2021-04-09 01:25:57 +03:00
// Copyright 2021 The Gitea Authors. All rights reserved.
2022-11-27 21:20:29 +03:00
// SPDX-License-Identifier: MIT
2021-04-09 01:25:57 +03:00
package lfs
import (
"bytes"
"context"
"errors"
"fmt"
2023-09-18 11:40:50 +03:00
"io"
2021-04-09 01:25:57 +03:00
"net/http"
"net/url"
"strings"
2021-07-24 19:03:58 +03:00
"code.gitea.io/gitea/modules/json"
2021-04-09 01:25:57 +03:00
"code.gitea.io/gitea/modules/log"
2021-08-18 16:10:39 +03:00
"code.gitea.io/gitea/modules/proxy"
2024-10-30 08:41:55 +03:00
"code.gitea.io/gitea/modules/setting"
2024-11-04 07:49:08 +03:00
"golang.org/x/sync/errgroup"
2021-04-09 01:25:57 +03:00
)
// HTTPClient is used to communicate with the LFS server
// https://github.com/git-lfs/git-lfs/blob/main/docs/api/batch.md
type HTTPClient struct {
	// client issues the batch API requests; newHTTPClient hands the same
	// instance to the basic transfer adapter.
	client *http.Client
	// endpoint is the LFS server base URL with any trailing "/" removed;
	// "/objects/batch" is appended to it for batch requests.
	endpoint string
	// transfers maps a transfer adapter name to its implementation. The
	// adapter to use is looked up by the name returned in the batch response.
	transfers map[string]TransferAdapter
}
2021-06-14 20:20:43 +03:00
// BatchSize returns the preferred size of batchs to process
func ( c * HTTPClient ) BatchSize ( ) int {
2024-10-30 08:41:55 +03:00
return setting . LFSClient . BatchSize
2021-06-14 20:20:43 +03:00
}
2021-11-20 12:34:05 +03:00
func newHTTPClient ( endpoint * url . URL , httpTransport * http . Transport ) * HTTPClient {
if httpTransport == nil {
httpTransport = & http . Transport {
Proxy : proxy . Proxy ( ) ,
}
}
2021-08-18 16:10:39 +03:00
hc := & http . Client {
2021-11-20 12:34:05 +03:00
Transport : httpTransport ,
2021-08-18 16:10:39 +03:00
}
2021-04-09 01:25:57 +03:00
2023-09-18 11:40:50 +03:00
basic := & BasicTransferAdapter { hc }
2021-04-09 01:25:57 +03:00
client := & HTTPClient {
2023-09-18 11:40:50 +03:00
client : hc ,
endpoint : strings . TrimSuffix ( endpoint . String ( ) , "/" ) ,
transfers : map [ string ] TransferAdapter {
basic . Name ( ) : basic ,
} ,
2021-04-09 01:25:57 +03:00
}
return client
}
// transferNames returns the names of every registered transfer adapter.
// The order is unspecified because it follows map iteration.
func (c *HTTPClient) transferNames() []string {
	names := make([]string, 0, len(c.transfers))
	for name := range c.transfers {
		names = append(names, name)
	}
	return names
}
func ( c * HTTPClient ) batch ( ctx context . Context , operation string , objects [ ] Pointer ) ( * BatchResponse , error ) {
2021-06-14 20:20:43 +03:00
log . Trace ( "BATCH operation with objects: %v" , objects )
2021-04-09 01:25:57 +03:00
url := fmt . Sprintf ( "%s/objects/batch" , c . endpoint )
request := & BatchRequest { operation , c . transferNames ( ) , nil , objects }
payload := new ( bytes . Buffer )
2021-07-24 19:03:58 +03:00
err := json . NewEncoder ( payload ) . Encode ( request )
2021-04-09 01:25:57 +03:00
if err != nil {
2021-06-14 20:20:43 +03:00
log . Error ( "Error encoding json: %v" , err )
return nil , err
2021-04-09 01:25:57 +03:00
}
2023-12-28 06:59:00 +03:00
req , err := createRequest ( ctx , http . MethodPost , url , map [ string ] string { "Content-Type" : MediaType } , payload )
2021-04-09 01:25:57 +03:00
if err != nil {
2021-06-14 20:20:43 +03:00
return nil , err
2021-04-09 01:25:57 +03:00
}
2023-09-18 11:40:50 +03:00
res , err := performRequest ( ctx , c . client , req )
2021-04-09 01:25:57 +03:00
if err != nil {
2021-06-14 20:20:43 +03:00
return nil , err
2021-04-09 01:25:57 +03:00
}
defer res . Body . Close ( )
var response BatchResponse
2021-07-24 19:03:58 +03:00
err = json . NewDecoder ( res . Body ) . Decode ( & response )
2021-04-09 01:25:57 +03:00
if err != nil {
2021-06-14 20:20:43 +03:00
log . Error ( "Error decoding json: %v" , err )
return nil , err
2021-04-09 01:25:57 +03:00
}
if len ( response . Transfer ) == 0 {
response . Transfer = "basic"
}
return & response , nil
}
// Download reads the specific LFS object from the LFS server
2021-06-14 20:20:43 +03:00
func ( c * HTTPClient ) Download ( ctx context . Context , objects [ ] Pointer , callback DownloadCallback ) error {
return c . performOperation ( ctx , objects , callback , nil )
}
// Upload sends the given LFS objects to the server. callback is asked for
// each object's content, or is told about the error the server reported for it.
func (c *HTTPClient) Upload(ctx context.Context, objects []Pointer, callback UploadCallback) error {
	var noDownload DownloadCallback
	return c.performOperation(ctx, objects, noDownload, callback)
}
2021-04-09 01:25:57 +03:00
2024-11-04 07:49:08 +03:00
// performOperation takes a slice of LFS object pointers, batches them, and performs the upload/download operations concurrently in each batch
2021-06-14 20:20:43 +03:00
func ( c * HTTPClient ) performOperation ( ctx context . Context , objects [ ] Pointer , dc DownloadCallback , uc UploadCallback ) error {
if len ( objects ) == 0 {
return nil
}
operation := "download"
if uc != nil {
operation = "upload"
}
result , err := c . batch ( ctx , operation , objects )
2021-04-09 01:25:57 +03:00
if err != nil {
2021-06-14 20:20:43 +03:00
return err
2021-04-09 01:25:57 +03:00
}
transferAdapter , ok := c . transfers [ result . Transfer ]
if ! ok {
2021-06-14 20:20:43 +03:00
return fmt . Errorf ( "TransferAdapter not found: %s" , result . Transfer )
2021-04-09 01:25:57 +03:00
}
2024-11-05 16:10:57 +03:00
if setting . LFSClient . BatchOperationConcurrency <= 0 {
panic ( "BatchOperationConcurrency must be greater than 0, forgot to init?" )
}
2024-11-04 07:49:08 +03:00
errGroup , groupCtx := errgroup . WithContext ( ctx )
errGroup . SetLimit ( setting . LFSClient . BatchOperationConcurrency )
2021-06-14 20:20:43 +03:00
for _ , object := range result . Objects {
2024-11-04 07:49:08 +03:00
errGroup . Go ( func ( ) error {
return performSingleOperation ( groupCtx , object , dc , uc , transferAdapter )
} )
}
2021-04-09 01:25:57 +03:00
2024-11-04 07:49:08 +03:00
// only the first error is returned, preserving legacy behavior before concurrency
return errGroup . Wait ( )
}
2021-06-14 20:20:43 +03:00
2024-11-04 07:49:08 +03:00
// performSingleOperation performs an LFS upload or download operation on a single object
func performSingleOperation ( ctx context . Context , object * ObjectResponse , dc DownloadCallback , uc UploadCallback , transferAdapter TransferAdapter ) error {
// the response from a lfs batch api request for this specific object id contained an error
if object . Error != nil {
log . Trace ( "Error on object %v: %v" , object . Pointer , object . Error )
2021-06-14 20:20:43 +03:00
2024-11-04 07:49:08 +03:00
// this was an 'upload' request inside the batch request
if uc != nil {
if _ , err := uc ( object . Pointer , object . Error ) ; err != nil {
2021-06-14 20:20:43 +03:00
return err
}
2024-11-04 07:49:08 +03:00
} else {
// this was NOT an 'upload' request inside the batch request, meaning it must be a 'download' request
if err := dc ( object . Pointer , nil , object . Error ) ; err != nil {
2021-06-14 20:20:43 +03:00
return err
}
2024-11-04 07:49:08 +03:00
}
// if the callback returns no err, then the error could be ignored, and the operations should continue
return nil
}
2021-06-14 20:20:43 +03:00
2024-11-04 07:49:08 +03:00
// the response from an lfs batch api request contained necessary upload/download fields to act upon
if uc != nil {
if len ( object . Actions ) == 0 {
log . Trace ( "%v already present on server" , object . Pointer )
return nil
}
2021-06-14 20:20:43 +03:00
2024-11-04 07:49:08 +03:00
link , ok := object . Actions [ "upload" ]
if ! ok {
return errors . New ( "missing action 'upload'" )
}
content , err := uc ( object . Pointer , nil )
if err != nil {
return err
}
2021-06-14 20:20:43 +03:00
2024-11-04 07:49:08 +03:00
err = transferAdapter . Upload ( ctx , link , object . Pointer , content )
if err != nil {
return err
}
link , ok = object . Actions [ "verify" ]
if ok {
if err := transferAdapter . Verify ( ctx , link , object . Pointer ) ; err != nil {
2021-06-14 20:20:43 +03:00
return err
}
}
2024-11-04 07:49:08 +03:00
} else {
link , ok := object . Actions [ "download" ]
if ! ok {
// no actions block in response, try legacy response schema
link , ok = object . Links [ "download" ]
}
if ! ok {
log . Debug ( "%+v" , object )
return errors . New ( "missing action 'download'" )
}
2021-06-14 20:20:43 +03:00
2024-11-04 07:49:08 +03:00
content , err := transferAdapter . Download ( ctx , link )
if err != nil {
return err
}
if err := dc ( object . Pointer , content , nil ) ; err != nil {
return err
}
}
2021-06-14 20:20:43 +03:00
return nil
2021-04-09 01:25:57 +03:00
}
2023-09-18 11:40:50 +03:00
// createRequest creates a new request, and sets the headers.
func createRequest ( ctx context . Context , method , url string , headers map [ string ] string , body io . Reader ) ( * http . Request , error ) {
log . Trace ( "createRequest: %s" , url )
req , err := http . NewRequestWithContext ( ctx , method , url , body )
if err != nil {
log . Error ( "Error creating request: %v" , err )
return nil , err
}
for key , value := range headers {
req . Header . Set ( key , value )
}
2024-06-12 01:22:28 +03:00
req . Header . Set ( "Accept" , AcceptHeader )
2023-09-18 11:40:50 +03:00
return req , nil
}
// performRequest sends req using client and returns the response.
//
//   - If the status code is 200, the response is returned with a nil error,
//     and its non-nil Body must be closed by the caller.
//   - For any other status, the Body is closed before returning, and the error
//     comes from handleErrorResponse.
//   - If sending fails while ctx is already done, ctx's error is returned
//     instead of the transport error.
func performRequest(ctx context.Context, client *http.Client, req *http.Request) (*http.Response, error) {
	log.Trace("performRequest: %s", req.URL)

	res, err := client.Do(req)
	switch {
	case err != nil && ctx.Err() != nil:
		// The failure is presumably caused by cancellation or a deadline; report that.
		return res, ctx.Err()
	case err != nil:
		log.Error("Error while processing request: %v", err)
		return res, err
	case res.StatusCode != http.StatusOK:
		defer res.Body.Close()
		return res, handleErrorResponse(res)
	default:
		return res, nil
	}
}
func handleErrorResponse ( resp * http . Response ) error {
var er ErrorResponse
err := json . NewDecoder ( resp . Body ) . Decode ( & er )
if err != nil {
if err == io . EOF {
return io . ErrUnexpectedEOF
}
log . Error ( "Error decoding json: %v" , err )
return err
}
2024-06-12 01:22:28 +03:00
log . Trace ( "ErrorResponse(%v): %v" , resp . Status , er )
2023-09-18 11:40:50 +03:00
return errors . New ( er . Message )
}