As we reevaluate how to best support and maintain Staging Ref in the future, we encourage development teams using this environment to highlight their use cases in the following issue: https://gitlab.com/gitlab-com/gl-infra/software-delivery/framework/software-delivery-framework-issue-tracker/-/issues/36.

Skip to content
Snippets Groups Projects
Commit cfd5e9f2 authored by Jacob Vosmaer's avatar Jacob Vosmaer Committed by Igor Drozdov
Browse files

Optionally use SSHUploadPackWithSidechannel

If the GitLab API returns an allowed response with use_sidechannel set
to true, gitlab-shell will establish a sidechannel connection and use
SSHUploadPackWithSidechannel instead of SSHUploadPack. This is an
efficiency improvement.
parent 250c0db0
No related branches found
No related tags found
No related merge requests found
package testserver
import (
"context"
"fmt"
"net"
"os"
"path"
Loading
Loading
@@ -8,8 +10,11 @@ import (
"testing"
"github.com/stretchr/testify/require"
"gitlab.com/gitlab-org/gitaly/v14/client"
pb "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
"gitlab.com/gitlab-org/labkit/log"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/metadata"
)
Loading
Loading
@@ -46,6 +51,26 @@ func (s *TestGitalyServer) SSHUploadPack(stream pb.SSHService_SSHUploadPackServe
return nil
}
func (s *TestGitalyServer) SSHUploadPackWithSidechannel(ctx context.Context, req *pb.SSHUploadPackWithSidechannelRequest) (*pb.SSHUploadPackWithSidechannelResponse, error) {
conn, err := client.OpenServerSidechannel(ctx)
if err != nil {
return nil, err
}
defer conn.Close()
s.ReceivedMD, _ = metadata.FromIncomingContext(ctx)
response := []byte("SSHUploadPackWithSidechannel: " + req.Repository.GlRepository)
if _, err := fmt.Fprintf(conn, "%04x\x01%s", len(response)+5, response); err != nil {
return nil, err
}
if err := conn.Close(); err != nil {
return nil, err
}
return &pb.SSHUploadPackWithSidechannelResponse{}, nil
}
func (s *TestGitalyServer) SSHUploadArchive(stream pb.SSHService_SSHUploadArchiveServer) error {
req, err := stream.Recv()
if err != nil {
Loading
Loading
@@ -70,7 +95,9 @@ func StartGitalyServer(t *testing.T) (string, *TestGitalyServer) {
err := os.MkdirAll(filepath.Dir(gitalySocketPath), 0700)
require.NoError(t, err)
server := grpc.NewServer()
server := grpc.NewServer(
client.SidechannelServer(log.ContextLogger(context.Background()), insecure.NewCredentials()),
)
listener, err := net.Listen("unix", gitalySocketPath)
require.NoError(t, err)
Loading
Loading
Loading
Loading
@@ -30,7 +30,7 @@ func (c *Command) performGitalyCall(ctx context.Context, response *accessverifie
GitConfigOptions: response.GitConfigOptions,
}
return gc.RunGitalyCommand(ctx, func(ctx context.Context, conn *grpc.ClientConn) (int32, error) {
return gc.RunGitalyCommand(ctx, func(ctx context.Context, conn *grpc.ClientConn, registry *client.SidechannelRegistry) (int32, error) {
ctx, cancel := gc.PrepareContext(ctx, request.Repository, response, c.Args.Env)
defer cancel()
Loading
Loading
Loading
Loading
@@ -23,7 +23,7 @@ func (c *Command) performGitalyCall(ctx context.Context, response *accessverifie
request := &pb.SSHUploadArchiveRequest{Repository: &response.Gitaly.Repo}
return gc.RunGitalyCommand(ctx, func(ctx context.Context, conn *grpc.ClientConn) (int32, error) {
return gc.RunGitalyCommand(ctx, func(ctx context.Context, conn *grpc.ClientConn, registry *client.SidechannelRegistry) (int32, error) {
ctx, cancel := gc.PrepareContext(ctx, request.Repository, response, c.Args.Env)
defer cancel()
Loading
Loading
Loading
Loading
@@ -21,13 +21,30 @@ func (c *Command) performGitalyCall(ctx context.Context, response *accessverifie
Features: response.Gitaly.Features,
}
if response.Gitaly.UseSidechannel {
gc.DialSidechannel = true
request := &pb.SSHUploadPackWithSidechannelRequest{
Repository: &response.Gitaly.Repo,
GitProtocol: c.Args.Env.GitProtocolVersion,
GitConfigOptions: response.GitConfigOptions,
}
return gc.RunGitalyCommand(ctx, func(ctx context.Context, conn *grpc.ClientConn, registry *client.SidechannelRegistry) (int32, error) {
ctx, cancel := gc.PrepareContext(ctx, request.Repository, response, c.Args.Env)
defer cancel()
rw := c.ReadWriter
return client.UploadPackWithSidechannel(ctx, conn, registry, rw.In, rw.Out, rw.ErrOut, request)
})
}
request := &pb.SSHUploadPackRequest{
Repository: &response.Gitaly.Repo,
GitProtocol: c.Args.Env.GitProtocolVersion,
GitConfigOptions: response.GitConfigOptions,
}
return gc.RunGitalyCommand(ctx, func(ctx context.Context, conn *grpc.ClientConn) (int32, error) {
return gc.RunGitalyCommand(ctx, func(ctx context.Context, conn *grpc.ClientConn, registry *client.SidechannelRegistry) (int32, error) {
ctx, cancel := gc.PrepareContext(ctx, request.Repository, response, c.Args.Env)
defer cancel()
Loading
Loading
Loading
Loading
@@ -71,3 +71,59 @@ func TestUploadPack(t *testing.T) {
require.Empty(t, testServer.ReceivedMD["some-other-ff"])
require.Equal(t, testServer.ReceivedMD["x-gitlab-correlation-id"][0], "a-correlation-id")
}
func TestUploadPack_withSidechannel(t *testing.T) {
gitalyAddress, testServer := testserver.StartGitalyServer(t)
requests := requesthandlers.BuildAllowedWithGitalyHandlersWithSidechannel(t, gitalyAddress)
url := testserver.StartHttpServer(t, requests)
output := &bytes.Buffer{}
input := &bytes.Buffer{}
userId := "1"
repo := "group/repo"
env := sshenv.Env{
IsSSHConnection: true,
OriginalCommand: "git-upload-pack " + repo,
RemoteAddr: "127.0.0.1",
}
args := &commandargs.Shell{
GitlabKeyId: userId,
CommandType: commandargs.UploadPack,
SshArgs: []string{"git-upload-pack", repo},
Env: env,
}
cmd := &Command{
Config: &config.Config{GitlabUrl: url},
Args: args,
ReadWriter: &readwriter.ReadWriter{ErrOut: output, Out: output, In: input},
}
ctx := correlation.ContextWithCorrelation(context.Background(), "a-correlation-id")
ctx = correlation.ContextWithClientName(ctx, "gitlab-shell-tests")
err := cmd.Execute(ctx)
require.NoError(t, err)
require.Equal(t, "SSHUploadPackWithSidechannel: "+repo, output.String())
for k, v := range map[string]string{
"gitaly-feature-cache_invalidator": "true",
"gitaly-feature-inforef_uploadpack_cache": "false",
"x-gitlab-client-name": "gitlab-shell-tests-git-upload-pack",
"key_id": "123",
"user_id": "1",
"remote_ip": "127.0.0.1",
"key_type": "key",
} {
actual := testServer.ReceivedMD[k]
require.Len(t, actual, 1)
require.Equal(t, v, actual[0])
}
require.Empty(t, testServer.ReceivedMD["some-other-ff"])
require.Equal(t, testServer.ReceivedMD["x-gitlab-correlation-id"][0], "a-correlation-id")
}
Loading
Loading
@@ -29,21 +29,23 @@ import (
// GitalyHandlerFunc implementations are responsible for making
// an appropriate Gitaly call using the provided client and context
// and returning an error from the Gitaly call.
type GitalyHandlerFunc func(ctx context.Context, client *grpc.ClientConn) (int32, error)
type GitalyHandlerFunc func(ctx context.Context, client *grpc.ClientConn, registry *client.SidechannelRegistry) (int32, error)
type GitalyCommand struct {
Config *config.Config
ServiceName string
Address string
Token string
Features map[string]string
Config *config.Config
ServiceName string
Address string
Token string
Features map[string]string
DialSidechannel bool
}
// RunGitalyCommand provides a bootstrap for Gitaly commands executed
// through GitLab-Shell. It ensures that logging, tracing and other
// common concerns are configured before executing the `handler`.
func (gc *GitalyCommand) RunGitalyCommand(ctx context.Context, handler GitalyHandlerFunc) error {
conn, err := getConn(ctx, gc)
registry := client.NewSidechannelRegistry(log.ContextLogger(ctx))
conn, err := getConn(ctx, gc, registry)
if err != nil {
log.ContextLogger(ctx).WithError(fmt.Errorf("RunGitalyCommand: %v", err)).Error("Failed to get connection to execute Git command")
Loading
Loading
@@ -53,7 +55,7 @@ func (gc *GitalyCommand) RunGitalyCommand(ctx context.Context, handler GitalyHan
childCtx := withOutgoingMetadata(ctx, gc.Features)
ctxlog := log.ContextLogger(childCtx)
exitStatus, err := handler(childCtx, conn)
exitStatus, err := handler(childCtx, conn, registry)
if err != nil {
if grpcstatus.Convert(err).Code() == grpccodes.Unavailable {
Loading
Loading
@@ -116,7 +118,7 @@ func withOutgoingMetadata(ctx context.Context, features map[string]string) conte
return metadata.NewOutgoingContext(ctx, md)
}
func getConn(ctx context.Context, gc *GitalyCommand) (*grpc.ClientConn, error) {
func getConn(ctx context.Context, gc *GitalyCommand, registry *client.SidechannelRegistry) (*grpc.ClientConn, error) {
if gc.Address == "" {
return nil, fmt.Errorf("no gitaly_address given")
}
Loading
Loading
@@ -160,5 +162,9 @@ func getConn(ctx context.Context, gc *GitalyCommand) (*grpc.ClientConn, error) {
)
}
if gc.DialSidechannel {
return client.DialSidechannel(ctx, gc.Address, registry, connOpts)
}
return client.DialContext(ctx, gc.Address, connOpts)
}
Loading
Loading
@@ -11,14 +11,15 @@ import (
"google.golang.org/grpc/metadata"
grpcstatus "google.golang.org/grpc/status"
"gitlab.com/gitlab-org/gitaly/v14/client"
pb "gitlab.com/gitlab-org/gitaly/v14/proto/go/gitalypb"
"gitlab.com/gitlab-org/gitlab-shell/internal/config"
"gitlab.com/gitlab-org/gitlab-shell/internal/gitlabnet/accessverifier"
"gitlab.com/gitlab-org/gitlab-shell/internal/sshenv"
)
func makeHandler(t *testing.T, err error) func(context.Context, *grpc.ClientConn) (int32, error) {
return func(ctx context.Context, client *grpc.ClientConn) (int32, error) {
func makeHandler(t *testing.T, err error) func(context.Context, *grpc.ClientConn, *client.SidechannelRegistry) (int32, error) {
return func(ctx context.Context, client *grpc.ClientConn, registry *client.SidechannelRegistry) (int32, error) {
require.NotNil(t, ctx)
require.NotNil(t, client)
Loading
Loading
@@ -86,7 +87,7 @@ func TestRunGitalyCommandMetadata(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
cmd := tt.gc
err := cmd.RunGitalyCommand(context.Background(), func(ctx context.Context, _ *grpc.ClientConn) (int32, error) {
err := cmd.RunGitalyCommand(context.Background(), func(ctx context.Context, _ *grpc.ClientConn, _ *client.SidechannelRegistry) (int32, error) {
md, exists := metadata.FromOutgoingContext(ctx)
require.True(t, exists)
require.Equal(t, len(tt.want), md.Len())
Loading
Loading
Loading
Loading
@@ -29,6 +29,14 @@ func BuildDisallowedByApiHandlers(t *testing.T) []testserver.TestRequestHandler
}
func BuildAllowedWithGitalyHandlers(t *testing.T, gitalyAddress string) []testserver.TestRequestHandler {
return buildAllowedWithGitalyHandlers(t, gitalyAddress, false)
}
func BuildAllowedWithGitalyHandlersWithSidechannel(t *testing.T, gitalyAddress string) []testserver.TestRequestHandler {
return buildAllowedWithGitalyHandlers(t, gitalyAddress, true)
}
func buildAllowedWithGitalyHandlers(t *testing.T, gitalyAddress string, useSidechannel bool) []testserver.TestRequestHandler {
requests := []testserver.TestRequestHandler{
{
Path: "/api/v4/internal/allowed",
Loading
Loading
@@ -56,6 +64,9 @@ func BuildAllowedWithGitalyHandlers(t *testing.T, gitalyAddress string) []testse
},
},
}
if useSidechannel {
body["gitaly"].(map[string]interface{})["use_sidechannel"] = true
}
require.NoError(t, json.NewEncoder(w).Encode(body))
},
},
Loading
Loading
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment