As we reevaluate how to best support and maintain Staging Ref in the future, we encourage development teams using this environment to highlight their use cases in the following issue: https://gitlab.com/gitlab-com/gl-infra/software-delivery/framework/software-delivery-framework-issue-tracker/-/issues/36.

Skip to content
Snippets Groups Projects
Unverified Commit 302b7c2b authored by Igor Drozdov's avatar Igor Drozdov
Browse files

Poc: Send Geo requests as streamed multipart requests

parent 51eab44e
No related branches found
No related tags found
No related merge requests found
Loading
Loading
@@ -150,21 +150,10 @@ func (c *GitlabNetClient) Post(ctx context.Context, path string, data interface{
return c.DoRequest(ctx, http.MethodPost, normalizePath(path), data)
}
func (c *GitlabNetClient) DoRequest(ctx context.Context, method, path string, data interface{}) (*http.Response, error) {
request, err := newRequest(ctx, method, c.httpClient.Host, path, data)
if err != nil {
return nil, err
}
retryableRequest, err := newRetryableRequest(ctx, method, c.httpClient.Host, path, data)
if err != nil {
return nil, err
}
func (c *GitlabNetClient) prepareRequest(request *http.Request) error {
user, password := c.user, c.password
if user != "" && password != "" {
request.SetBasicAuth(user, password)
retryableRequest.SetBasicAuth(user, password)
}
claims := jwt.RegisteredClaims{
Loading
Loading
@@ -175,44 +164,33 @@ func (c *GitlabNetClient) DoRequest(ctx context.Context, method, path string, da
secretBytes := []byte(strings.TrimSpace(c.secret))
tokenString, err := jwt.NewWithClaims(jwt.SigningMethodHS256, claims).SignedString(secretBytes)
if err != nil {
return nil, err
return err
}
request.Header.Set(apiSecretHeaderName, tokenString)
retryableRequest.Header.Set(apiSecretHeaderName, tokenString)
originalRemoteIP, ok := ctx.Value(OriginalRemoteIPContextKey{}).(string)
originalRemoteIP, ok := request.Context().Value(OriginalRemoteIPContextKey{}).(string)
if ok {
request.Header.Add("X-Forwarded-For", originalRemoteIP)
retryableRequest.Header.Add("X-Forwarded-For", originalRemoteIP)
}
request.Header.Add("Content-Type", "application/json")
retryableRequest.Header.Add("Content-Type", "application/json")
request.Header.Add("User-Agent", c.userAgent)
retryableRequest.Header.Add("User-Agent", c.userAgent)
request.Close = true
retryableRequest.Close = true
start := time.Now()
return nil
}
var response *http.Response
var respErr error
if c.httpClient.HTTPClient != nil {
response, respErr = c.httpClient.HTTPClient.Do(request)
}
if os.Getenv("FF_GITLAB_SHELL_RETRYABLE_HTTP") == "1" && c.httpClient.RetryableHTTP != nil {
response, respErr = c.httpClient.RetryableHTTP.Do(retryableRequest)
}
func processResult(request *http.Request, response *http.Response, start time.Time, respErr error) error {
fields := log.Fields{
"method": method,
"method": request.Method,
"url": request.URL.String(),
"duration_ms": time.Since(start) / time.Millisecond,
}
logger := log.WithContextFields(ctx, fields)
logger := log.WithContextFields(request.Context(), fields)
if respErr != nil {
logger.WithError(respErr).Error("Internal API unreachable")
return nil, &ApiError{"Internal API unreachable"}
return &ApiError{"Internal API unreachable"}
}
if response != nil {
Loading
Loading
@@ -220,7 +198,7 @@ func (c *GitlabNetClient) DoRequest(ctx context.Context, method, path string, da
}
if err := parseError(response); err != nil {
logger.WithError(err).Error("Internal API error")
return nil, err
return err
}
if response.ContentLength >= 0 {
Loading
Loading
@@ -229,5 +207,55 @@ func (c *GitlabNetClient) DoRequest(ctx context.Context, method, path string, da
logger.Info("Finished HTTP request")
return nil
}
func (c *GitlabNetClient) AppendPath(path string) string {
return appendPath(c.httpClient.Host, path)
}
func (c *GitlabNetClient) DoRawRequest(request *http.Request) (*http.Response, error) {
c.prepareRequest(request)
start := time.Now()
response, respErr := c.httpClient.HTTPClient.Do(request)
if err := processResult(request, response, start, respErr); err != nil {
return nil, err
}
return response, nil
}
func (c *GitlabNetClient) DoRequest(ctx context.Context, method, path string, data interface{}) (*http.Response, error) {
request, err := newRequest(ctx, method, c.httpClient.Host, path, data)
if err != nil {
return nil, err
}
retryableRequest, err := newRetryableRequest(ctx, method, c.httpClient.Host, path, data)
if err != nil {
return nil, err
}
c.prepareRequest(request)
c.prepareRequest(retryableRequest.Request)
start := time.Now()
var response *http.Response
var respErr error
if c.httpClient.HTTPClient != nil {
response, respErr = c.httpClient.HTTPClient.Do(request)
}
if os.Getenv("FF_GITLAB_SHELL_RETRYABLE_HTTP") == "1" && c.httpClient.RetryableHTTP != nil {
response, respErr = c.httpClient.RetryableHTTP.Do(retryableRequest)
}
if err := processResult(request, response, start, respErr); err != nil {
return nil, err
}
return response, err
}
package customaction
import (
"bytes"
"context"
"errors"
"io"
"net/http"
"mime/multipart"
"gitlab.com/gitlab-org/labkit/log"
Loading
Loading
@@ -18,14 +18,12 @@ import (
)
type Request struct {
SecretToken []byte `json:"secret_token"`
Data accessverifier.CustomPayloadData `json:"data"`
Output []byte `json:"output"`
Output io.Reader
}
type Response struct {
Result []byte `json:"result"`
Message string `json:"message"`
}
type Command struct {
Loading
Loading
@@ -54,7 +52,7 @@ func (c *Command) processApiEndpoints(ctx context.Context, response *accessverif
data := response.Payload.Data
request := &Request{Data: data}
request.Data.UserId = response.Who
request.Data.UserId = response.Who
for _, endpoint := range data.ApiEndpoints {
ctxlog := log.WithContextFields(ctx, log.Fields{
Loading
Loading
@@ -64,64 +62,82 @@ func (c *Command) processApiEndpoints(ctx context.Context, response *accessverif
ctxlog.Info("customaction: processApiEndpoints: Performing custom action")
response, err := c.performRequest(ctx, client, endpoint, request)
httpRequest, err := c.prepareRequest(ctx, client.AppendPath(endpoint), request)
if err != nil {
return err
}
// Print to os.Stdout the result contained in the response
//
if err = c.displayResult(response.Result); err != nil {
if err := c.performRequest(ctx, client, httpRequest); err != nil {
return err
}
// In the context of the git push sequence of events, it's necessary to read
// stdin in order to capture output to pass onto subsequent commands
//
var output []byte
if c.EOFSent {
output, err = c.readFromStdin()
if err != nil {
return err
}
var w *io.PipeWriter
request.Output, w = io.Pipe()
go c.readFromStdin(w)
} else {
output = c.readFromStdinNoEOF()
// output = c.readFromStdinNoEOF()
}
ctxlog.WithFields(log.Fields{
"eof_sent": c.EOFSent,
"stdin_bytes": len(output),
// "stdin_bytes": len(output),
}).Debug("customaction: processApiEndpoints: stdin buffered")
request.Output = output
}
return nil
}
func (c *Command) performRequest(ctx context.Context, client *client.GitlabNetClient, endpoint string, request *Request) (*Response, error) {
response, err := client.DoRequest(ctx, http.MethodPost, endpoint, request)
func (c *Command) prepareRequest(ctx context.Context, endpoint string, request *Request) (*http.Request, error) {
body, pipeWriter := io.Pipe()
writer := multipart.NewWriter(pipeWriter)
go func() {
writer.WriteField("data[gl_id]", request.Data.UserId)
writer.WriteField("data[primary_repo]", request.Data.PrimaryRepo)
if request.Output != nil {
// Ignore errors, but may want to log them in a channel
binaryPart, _ := writer.CreateFormFile("output", "git-receive-pack")
io.Copy(binaryPart, request.Output)
}
writer.Close()
pipeWriter.Close()
}()
httpRequest, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, body)
if err != nil {
return nil, err
}
httpRequest.Header.Set("Content-Type", writer.FormDataContentType())
return httpRequest, nil
}
func (c *Command) performRequest(ctx context.Context, client *client.GitlabNetClient, request *http.Request) error {
response, err := client.DoRawRequest(request)
if err != nil {
return err
}
defer response.Body.Close()
cr := &Response{}
if err := gitlabnet.ParseJSON(response, cr); err != nil {
return nil, err
if _, err := io.Copy(c.ReadWriter.Out, response.Body); err != nil {
return err
}
return cr, nil
return nil
}
func (c *Command) readFromStdin() ([]byte, error) {
var output []byte
func (c *Command) readFromStdin(w *io.PipeWriter) {
var needsPackData bool
scanner := pktline.NewScanner(c.ReadWriter.In)
for scanner.Scan() {
line := scanner.Bytes()
output = append(output, line...)
w.Write(line)
if pktline.IsFlush(line) {
break
Loading
Loading
@@ -133,14 +149,10 @@ func (c *Command) readFromStdin() ([]byte, error) {
}
if needsPackData {
packData := new(bytes.Buffer)
_, err := io.Copy(packData, c.ReadWriter.In)
output = append(output, packData.Bytes()...)
return output, err
} else {
return output, nil
io.Copy(w, c.ReadWriter.In)
}
w.Close()
}
func (c *Command) readFromStdinNoEOF() []byte {
Loading
Loading
@@ -158,8 +170,3 @@ func (c *Command) readFromStdinNoEOF() []byte {
return output
}
func (c *Command) displayResult(result []byte) error {
_, err := io.Copy(c.ReadWriter.Out, bytes.NewReader(result))
return err
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment