As we reevaluate how to best support and maintain Staging Ref in the future, we encourage development teams using this environment to highlight their use cases in the following issue: https://gitlab.com/gitlab-com/gl-infra/software-delivery/framework/software-delivery-framework-issue-tracker/-/issues/36.

Skip to content
Snippets Groups Projects
Commit b69c08e2 authored by Igor Drozdov's avatar Igor Drozdov
Browse files

Merge branch 'jv-ssh-sidechannel' into 'main'

Add support for SSHUploadPackWithSidechannel RPC

See merge request gitlab-org/gitlab-shell!557
parents 0fbf4e47 cfd5e9f2
No related branches found
No related tags found
No related merge requests found
Showing with 511 additions and 89 deletions
package testserver
import (
"context"
"fmt"
"net"
"os"
"path"
Loading
Loading
@@ -8,12 +10,18 @@ import (
"testing"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/v14/client"
pb "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
"gitlab.com/gitlab-org/labkit/log"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/metadata"
)
type TestGitalyServer struct{ ReceivedMD metadata.MD }
type TestGitalyServer struct {
ReceivedMD metadata.MD
pb.UnimplementedSSHServiceServer
}
func (s *TestGitalyServer) SSHReceivePack(stream pb.SSHService_SSHReceivePackServer) error {
req, err := stream.Recv()
Loading
Loading
@@ -43,6 +51,26 @@ func (s *TestGitalyServer) SSHUploadPack(stream pb.SSHService_SSHUploadPackServe
return nil
}
func (s *TestGitalyServer) SSHUploadPackWithSidechannel(ctx context.Context, req *pb.SSHUploadPackWithSidechannelRequest) (*pb.SSHUploadPackWithSidechannelResponse, error) {
conn, err := client.OpenServerSidechannel(ctx)
if err != nil {
return nil, err
}
defer conn.Close()
s.ReceivedMD, _ = metadata.FromIncomingContext(ctx)
response := []byte("SSHUploadPackWithSidechannel: " + req.Repository.GlRepository)
if _, err := fmt.Fprintf(conn, "%04x\x01%s", len(response)+5, response); err != nil {
return nil, err
}
if err := conn.Close(); err != nil {
return nil, err
}
return &pb.SSHUploadPackWithSidechannelResponse{}, nil
}
func (s *TestGitalyServer) SSHUploadArchive(stream pb.SSHService_SSHUploadArchiveServer) error {
req, err := stream.Recv()
if err != nil {
Loading
Loading
@@ -67,7 +95,9 @@ func StartGitalyServer(t *testing.T) (string, *TestGitalyServer) {
err := os.MkdirAll(filepath.Dir(gitalySocketPath), 0700)
require.NoError(t, err)
server := grpc.NewServer()
server := grpc.NewServer(
client.SidechannelServer(log.ContextLogger(context.Background()), insecure.NewCredentials()),
)
listener, err := net.Listen("unix", gitalySocketPath)
require.NoError(t, err)
Loading
Loading
Loading
Loading
@@ -11,10 +11,10 @@ require (
github.com/pires/go-proxyproto v0.6.0
github.com/prometheus/client_golang v1.10.0
github.com/stretchr/testify v1.7.0
gitlab.com/gitlab-org/gitaly/v14 v14.0.0-rc1
gitlab.com/gitlab-org/gitaly/v14 v14.6.0-rc1.0.20220121102056-2e398afa0490
gitlab.com/gitlab-org/labkit v1.7.0
golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad
golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
google.golang.org/grpc v1.37.0
google.golang.org/grpc v1.38.0
gopkg.in/yaml.v2 v2.4.0
)
This diff is collapsed.
Loading
Loading
@@ -30,7 +30,7 @@ func (c *Command) performGitalyCall(ctx context.Context, response *accessverifie
GitConfigOptions: response.GitConfigOptions,
}
return gc.RunGitalyCommand(ctx, func(ctx context.Context, conn *grpc.ClientConn) (int32, error) {
return gc.RunGitalyCommand(ctx, func(ctx context.Context, conn *grpc.ClientConn, registry *client.SidechannelRegistry) (int32, error) {
ctx, cancel := gc.PrepareContext(ctx, request.Repository, response, c.Args.Env)
defer cancel()
Loading
Loading
Loading
Loading
@@ -23,7 +23,7 @@ func (c *Command) performGitalyCall(ctx context.Context, response *accessverifie
request := &pb.SSHUploadArchiveRequest{Repository: &response.Gitaly.Repo}
return gc.RunGitalyCommand(ctx, func(ctx context.Context, conn *grpc.ClientConn) (int32, error) {
return gc.RunGitalyCommand(ctx, func(ctx context.Context, conn *grpc.ClientConn, registry *client.SidechannelRegistry) (int32, error) {
ctx, cancel := gc.PrepareContext(ctx, request.Repository, response, c.Args.Env)
defer cancel()
Loading
Loading
Loading
Loading
@@ -21,13 +21,30 @@ func (c *Command) performGitalyCall(ctx context.Context, response *accessverifie
Features: response.Gitaly.Features,
}
if response.Gitaly.UseSidechannel {
gc.DialSidechannel = true
request := &pb.SSHUploadPackWithSidechannelRequest{
Repository: &response.Gitaly.Repo,
GitProtocol: c.Args.Env.GitProtocolVersion,
GitConfigOptions: response.GitConfigOptions,
}
return gc.RunGitalyCommand(ctx, func(ctx context.Context, conn *grpc.ClientConn, registry *client.SidechannelRegistry) (int32, error) {
ctx, cancel := gc.PrepareContext(ctx, request.Repository, response, c.Args.Env)
defer cancel()
rw := c.ReadWriter
return client.UploadPackWithSidechannel(ctx, conn, registry, rw.In, rw.Out, rw.ErrOut, request)
})
}
request := &pb.SSHUploadPackRequest{
Repository: &response.Gitaly.Repo,
GitProtocol: c.Args.Env.GitProtocolVersion,
GitConfigOptions: response.GitConfigOptions,
}
return gc.RunGitalyCommand(ctx, func(ctx context.Context, conn *grpc.ClientConn) (int32, error) {
return gc.RunGitalyCommand(ctx, func(ctx context.Context, conn *grpc.ClientConn, registry *client.SidechannelRegistry) (int32, error) {
ctx, cancel := gc.PrepareContext(ctx, request.Repository, response, c.Args.Env)
defer cancel()
Loading
Loading
Loading
Loading
@@ -71,3 +71,59 @@ func TestUploadPack(t *testing.T) {
require.Empty(t, testServer.ReceivedMD["some-other-ff"])
require.Equal(t, testServer.ReceivedMD["x-gitlab-correlation-id"][0], "a-correlation-id")
}
func TestUploadPack_withSidechannel(t *testing.T) {
gitalyAddress, testServer := testserver.StartGitalyServer(t)
requests := requesthandlers.BuildAllowedWithGitalyHandlersWithSidechannel(t, gitalyAddress)
url := testserver.StartHttpServer(t, requests)
output := &bytes.Buffer{}
input := &bytes.Buffer{}
userId := "1"
repo := "group/repo"
env := sshenv.Env{
IsSSHConnection: true,
OriginalCommand: "git-upload-pack " + repo,
RemoteAddr: "127.0.0.1",
}
args := &commandargs.Shell{
GitlabKeyId: userId,
CommandType: commandargs.UploadPack,
SshArgs: []string{"git-upload-pack", repo},
Env: env,
}
cmd := &Command{
Config: &config.Config{GitlabUrl: url},
Args: args,
ReadWriter: &readwriter.ReadWriter{ErrOut: output, Out: output, In: input},
}
ctx := correlation.ContextWithCorrelation(context.Background(), "a-correlation-id")
ctx = correlation.ContextWithClientName(ctx, "gitlab-shell-tests")
err := cmd.Execute(ctx)
require.NoError(t, err)
require.Equal(t, "SSHUploadPackWithSidechannel: "+repo, output.String())
for k, v := range map[string]string{
"gitaly-feature-cache_invalidator": "true",
"gitaly-feature-inforef_uploadpack_cache": "false",
"x-gitlab-client-name": "gitlab-shell-tests-git-upload-pack",
"key_id": "123",
"user_id": "1",
"remote_ip": "127.0.0.1",
"key_type": "key",
} {
actual := testServer.ReceivedMD[k]
require.Len(t, actual, 1)
require.Equal(t, v, actual[0])
}
require.Empty(t, testServer.ReceivedMD["some-other-ff"])
require.Equal(t, testServer.ReceivedMD["x-gitlab-correlation-id"][0], "a-correlation-id")
}
Loading
Loading
@@ -32,10 +32,11 @@ type Request struct {
}
type Gitaly struct {
Repo pb.Repository `json:"repository"`
Address string `json:"address"`
Token string `json:"token"`
Features map[string]string `json:"features"`
Repo pb.Repository `json:"repository"`
Address string `json:"address"`
Token string `json:"token"`
Features map[string]string `json:"features"`
UseSidechannel bool `json:"use_sidechannel"`
}
type CustomPayloadData struct {
Loading
Loading
Loading
Loading
@@ -11,7 +11,6 @@ import (
"github.com/stretchr/testify/require"
pb "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
"gitlab.com/gitlab-org/gitlab-shell/client"
"gitlab.com/gitlab-org/gitlab-shell/client/testserver"
"gitlab.com/gitlab-org/gitlab-shell/internal/command/commandargs"
"gitlab.com/gitlab-org/gitlab-shell/internal/config"
Loading
Loading
@@ -54,7 +53,11 @@ func buildExpectedResponse(who string) *Response {
}
func TestSuccessfulResponses(t *testing.T) {
client := setup(t, "")
okResponse := testResponse{body: responseBody(t, "allowed.json"), status: http.StatusOK}
client := setup(t,
map[string]testResponse{"first": okResponse},
map[string]testResponse{"1": okResponse},
)
testCases := []struct {
desc string
Loading
Loading
@@ -83,8 +86,48 @@ func TestSuccessfulResponses(t *testing.T) {
}
}
func TestSidechannelFlag(t *testing.T) {
okResponse := testResponse{body: responseBody(t, "allowed_sidechannel.json"), status: http.StatusOK}
client := setup(t,
map[string]testResponse{"first": okResponse},
map[string]testResponse{"1": okResponse},
)
testCases := []struct {
desc string
args *commandargs.Shell
who string
}{
{
desc: "Provide key id within the request",
args: &commandargs.Shell{GitlabKeyId: "1"},
who: "key-1",
}, {
desc: "Provide username within the request",
args: &commandargs.Shell{GitlabUsername: "first"},
who: "user-1",
},
}
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
result, err := client.Verify(context.Background(), tc.args, uploadPackAction, repo)
require.NoError(t, err)
response := buildExpectedResponse(tc.who)
response.Gitaly.UseSidechannel = true
require.Equal(t, response, result)
})
}
}
func TestGeoPushGetCustomAction(t *testing.T) {
client := setup(t, "responses/allowed_with_push_payload.json")
client := setup(t, map[string]testResponse{
"custom": {
body: responseBody(t, "allowed_with_push_payload.json"),
status: 300,
},
}, nil)
args := &commandargs.Shell{GitlabUsername: "custom"}
result, err := client.Verify(context.Background(), args, receivePackAction, repo)
Loading
Loading
@@ -106,7 +149,12 @@ func TestGeoPushGetCustomAction(t *testing.T) {
}
func TestGeoPullGetCustomAction(t *testing.T) {
client := setup(t, "responses/allowed_with_pull_payload.json")
client := setup(t, map[string]testResponse{
"custom": {
body: responseBody(t, "allowed_with_pull_payload.json"),
status: 300,
},
}, nil)
args := &commandargs.Shell{GitlabUsername: "custom"}
result, err := client.Verify(context.Background(), args, uploadPackAction, repo)
Loading
Loading
@@ -128,7 +176,11 @@ func TestGeoPullGetCustomAction(t *testing.T) {
}
func TestErrorResponses(t *testing.T) {
client := setup(t, "")
client := setup(t, nil, map[string]testResponse{
"2": {body: []byte(`{"message":"Not allowed!"}`), status: http.StatusForbidden},
"3": {body: []byte(`{"message":"broken json!`), status: http.StatusOK},
"4": {status: http.StatusForbidden},
})
testCases := []struct {
desc string
Loading
Loading
@@ -163,20 +215,21 @@ func TestErrorResponses(t *testing.T) {
}
}
func setup(t *testing.T, allowedPayload string) *Client {
testhelper.PrepareTestRootDir(t)
type testResponse struct {
body []byte
status int
}
body, err := os.ReadFile(path.Join(testhelper.TestRoot, "responses/allowed.json"))
func responseBody(t *testing.T, name string) []byte {
t.Helper()
testhelper.PrepareTestRootDir(t)
body, err := os.ReadFile(path.Join(testhelper.TestRoot, "responses", name))
require.NoError(t, err)
return body
}
var bodyWithPayload []byte
if allowedPayload != "" {
allowedWithPayloadPath := path.Join(testhelper.TestRoot, allowedPayload)
bodyWithPayload, err = os.ReadFile(allowedWithPayloadPath)
require.NoError(t, err)
}
func setup(t *testing.T, userResponses, keyResponses map[string]testResponse) *Client {
t.Helper()
requests := []testserver.TestRequestHandler{
{
Path: "/api/v4/internal/allowed",
Loading
Loading
@@ -187,36 +240,14 @@ func setup(t *testing.T, allowedPayload string) *Client {
var requestBody *Request
require.NoError(t, json.Unmarshal(b, &requestBody))
switch requestBody.Username {
case "first":
_, err = w.Write(body)
require.NoError(t, err)
case "second":
errBody := map[string]interface{}{
"status": false,
"message": "missing user",
}
require.NoError(t, json.NewEncoder(w).Encode(errBody))
case "custom":
w.WriteHeader(http.StatusMultipleChoices)
_, err = w.Write(bodyWithPayload)
if tr, ok := userResponses[requestBody.Username]; ok {
w.WriteHeader(tr.status)
_, err := w.Write(tr.body)
require.NoError(t, err)
}
switch requestBody.KeyId {
case "1":
_, err = w.Write(body)
} else if tr, ok := keyResponses[requestBody.KeyId]; ok {
w.WriteHeader(tr.status)
_, err := w.Write(tr.body)
require.NoError(t, err)
case "2":
w.WriteHeader(http.StatusForbidden)
errBody := &client.ErrorResponse{
Message: "Not allowed!",
}
require.NoError(t, json.NewEncoder(w).Encode(errBody))
case "3":
w.Write([]byte("{ \"message\": \"broken json!\""))
case "4":
w.WriteHeader(http.StatusForbidden)
}
},
},
Loading
Loading
Loading
Loading
@@ -29,21 +29,23 @@ import (
// GitalyHandlerFunc implementations are responsible for making
// an appropriate Gitaly call using the provided client and context
// and returning an error from the Gitaly call.
type GitalyHandlerFunc func(ctx context.Context, client *grpc.ClientConn) (int32, error)
type GitalyHandlerFunc func(ctx context.Context, client *grpc.ClientConn, registry *client.SidechannelRegistry) (int32, error)
type GitalyCommand struct {
Config *config.Config
ServiceName string
Address string
Token string
Features map[string]string
Config *config.Config
ServiceName string
Address string
Token string
Features map[string]string
DialSidechannel bool
}
// RunGitalyCommand provides a bootstrap for Gitaly commands executed
// through GitLab-Shell. It ensures that logging, tracing and other
// common concerns are configured before executing the `handler`.
func (gc *GitalyCommand) RunGitalyCommand(ctx context.Context, handler GitalyHandlerFunc) error {
conn, err := getConn(ctx, gc)
registry := client.NewSidechannelRegistry(log.ContextLogger(ctx))
conn, err := getConn(ctx, gc, registry)
if err != nil {
log.ContextLogger(ctx).WithError(fmt.Errorf("RunGitalyCommand: %v", err)).Error("Failed to get connection to execute Git command")
Loading
Loading
@@ -53,7 +55,7 @@ func (gc *GitalyCommand) RunGitalyCommand(ctx context.Context, handler GitalyHan
childCtx := withOutgoingMetadata(ctx, gc.Features)
ctxlog := log.ContextLogger(childCtx)
exitStatus, err := handler(childCtx, conn)
exitStatus, err := handler(childCtx, conn, registry)
if err != nil {
if grpcstatus.Convert(err).Code() == grpccodes.Unavailable {
Loading
Loading
@@ -116,7 +118,7 @@ func withOutgoingMetadata(ctx context.Context, features map[string]string) conte
return metadata.NewOutgoingContext(ctx, md)
}
func getConn(ctx context.Context, gc *GitalyCommand) (*grpc.ClientConn, error) {
func getConn(ctx context.Context, gc *GitalyCommand, registry *client.SidechannelRegistry) (*grpc.ClientConn, error) {
if gc.Address == "" {
return nil, fmt.Errorf("no gitaly_address given")
}
Loading
Loading
@@ -160,5 +162,9 @@ func getConn(ctx context.Context, gc *GitalyCommand) (*grpc.ClientConn, error) {
)
}
if gc.DialSidechannel {
return client.DialSidechannel(ctx, gc.Address, registry, connOpts)
}
return client.DialContext(ctx, gc.Address, connOpts)
}
Loading
Loading
@@ -11,14 +11,15 @@ import (
"google.golang.org/grpc/metadata"
grpcstatus "google.golang.org/grpc/status"
"gitlab.com/gitlab-org/gitaly/v14/client"
pb "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
"gitlab.com/gitlab-org/gitlab-shell/internal/config"
"gitlab.com/gitlab-org/gitlab-shell/internal/gitlabnet/accessverifier"
"gitlab.com/gitlab-org/gitlab-shell/internal/sshenv"
)
func makeHandler(t *testing.T, err error) func(context.Context, *grpc.ClientConn) (int32, error) {
return func(ctx context.Context, client *grpc.ClientConn) (int32, error) {
func makeHandler(t *testing.T, err error) func(context.Context, *grpc.ClientConn, *client.SidechannelRegistry) (int32, error) {
return func(ctx context.Context, client *grpc.ClientConn, registry *client.SidechannelRegistry) (int32, error) {
require.NotNil(t, ctx)
require.NotNil(t, client)
Loading
Loading
@@ -86,7 +87,7 @@ func TestRunGitalyCommandMetadata(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
cmd := tt.gc
err := cmd.RunGitalyCommand(context.Background(), func(ctx context.Context, _ *grpc.ClientConn) (int32, error) {
err := cmd.RunGitalyCommand(context.Background(), func(ctx context.Context, _ *grpc.ClientConn, _ *client.SidechannelRegistry) (int32, error) {
md, exists := metadata.FromOutgoingContext(ctx)
require.True(t, exists)
require.Equal(t, len(tt.want), md.Len())
Loading
Loading
Loading
Loading
@@ -29,6 +29,14 @@ func BuildDisallowedByApiHandlers(t *testing.T) []testserver.TestRequestHandler
}
func BuildAllowedWithGitalyHandlers(t *testing.T, gitalyAddress string) []testserver.TestRequestHandler {
return buildAllowedWithGitalyHandlers(t, gitalyAddress, false)
}
func BuildAllowedWithGitalyHandlersWithSidechannel(t *testing.T, gitalyAddress string) []testserver.TestRequestHandler {
return buildAllowedWithGitalyHandlers(t, gitalyAddress, true)
}
func buildAllowedWithGitalyHandlers(t *testing.T, gitalyAddress string, useSidechannel bool) []testserver.TestRequestHandler {
requests := []testserver.TestRequestHandler{
{
Path: "/api/v4/internal/allowed",
Loading
Loading
@@ -56,6 +64,9 @@ func BuildAllowedWithGitalyHandlers(t *testing.T, gitalyAddress string) []testse
},
},
}
if useSidechannel {
body["gitaly"].(map[string]interface{})["use_sidechannel"] = true
}
require.NoError(t, json.NewEncoder(w).Encode(body))
},
},
Loading
Loading
{
"status": true,
"gl_repository": "project-26",
"gl_project_path": "group/private",
"gl_id": "user-1",
"gl_username": "root",
"git_config_options": ["option"],
"gitaly": {
"repository": {
"storage_name": "default",
"relative_path": "@hashed/5f/9c/5f9c4ab08cac7457e9111a30e4664920607ea2c115a1433d7be98e97e64244ca.git",
"git_object_directory": "path/to/git_object_directory",
"git_alternate_object_directories": ["path/to/git_alternate_object_directory"],
"gl_repository": "project-26",
"gl_project_path": "group/private"
},
"address": "unix:gitaly.socket",
"token": "token",
"use_sidechannel": true
},
"git_protocol": "protocol",
"gl_console_messages": ["console", "message"]
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment