// Copyright 2023 The Gitea Authors. All rights reserved.
// SPDX-License-Identifier: MIT

package actions

import (
"crypto/md5"
"encoding/base64"
"fmt"
"io"
"path/filepath"
"sort"
"time"
"code.gitea.io/gitea/models/actions"
"code.gitea.io/gitea/models/db"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/storage"
)
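
// saveUploadChunk saves a single uploaded chunk to storage and verifies its
// MD5 checksum against the request header. It returns the total artifact size
// declared in the Content-Range header.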
func saveUploadChunk(st storage.ObjectStorage, ctx *ArtifactContext,
	artifact *actions.ActionArtifact,
	contentSize, runID int64,
) (int64, error) {
	// parse Content-Range header, format: bytes 0-1023/146515
	contentRange := ctx.Req.Header.Get("Content-Range")
	start, end, length := int64(0), int64(0), int64(0)
	if _, err := fmt.Sscanf(contentRange, "bytes %d-%d/%d", &start, &end, &length); err != nil {
		log.Warn("parse content range error: %v, content-range: %s", err, contentRange)
		return -1, fmt.Errorf("parse content range error: %v", err)
	}
	// build the chunk storage path: tmp{runID}/{runID}-{artifactID}-{start}-{end}.chunk
	storagePath := fmt.Sprintf("tmp%d/%d-%d-%d-%d.chunk", runID, runID, artifact.ID, start, end)
	// use io.TeeReader to compute the md5 sum while the body streams to storage,
	// instead of reading the whole body into memory first.
	// if the hash does not match afterwards, the stored chunk is deleted.
	hasher := md5.New()
	r := io.TeeReader(ctx.Req.Body, hasher)
	// save chunk to storage
	writtenSize, err := st.Save(storagePath, r, -1)
	if err != nil {
		return -1, fmt.Errorf("save chunk to storage error: %v", err)
	}
	// verify md5
	reqMd5String := ctx.Req.Header.Get(artifactXActionsResultsMD5Header)
	chunkMd5String := base64.StdEncoding.EncodeToString(hasher.Sum(nil))
	log.Info("[artifact] check chunk md5, sum: %s, header: %s", chunkMd5String, reqMd5String)
	// if the md5 or the size does not match, delete the chunk
	if reqMd5String != chunkMd5String || writtenSize != contentSize {
		if err := st.Delete(storagePath); err != nil {
			log.Error("Error deleting chunk: %s, %v", storagePath, err)
		}
		return -1, fmt.Errorf("md5 not match")
	}
	log.Info("[artifact] save chunk %s, size: %d, artifact id: %d, start: %d, end: %d",
		storagePath, contentSize, artifact.ID, start, end)
	// return the total artifact size declared by the Content-Range header
	return length, nil
}
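
// chunkFileItem describes one uploaded chunk file, parsed from its
// storage file name: {runID}-{artifactID}-{start}-{end}.chunk.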
type chunkFileItem struct {
	RunID      int64
	ArtifactID int64
	Start      int64
	End        int64
	Path       string
}
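
// listChunksByRunID lists all chunk files uploaded for a run and groups them
// by artifact ID.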
func listChunksByRunID(st storage.ObjectStorage, runID int64) (map[int64][]*chunkFileItem, error) {
	storageDir := fmt.Sprintf("tmp%d", runID)
	var chunks []*chunkFileItem
	if err := st.IterateObjects(storageDir, func(fpath string, obj storage.Object) error {
		baseName := filepath.Base(fpath)
		// chunks read back from storage are identified by storage dir and basename only,
		// regardless of the subdirectory setting in the storage config
		item := chunkFileItem{Path: storageDir + "/" + baseName}
		if _, err := fmt.Sscanf(baseName, "%d-%d-%d-%d.chunk", &item.RunID, &item.ArtifactID, &item.Start, &item.End); err != nil {
			return fmt.Errorf("parse chunk file name error: %v", err)
		}
		chunks = append(chunks, &item)
		return nil
	}); err != nil {
		return nil, err
	}
	// group chunks by artifact id
	chunksMap := make(map[int64][]*chunkFileItem)
	for _, c := range chunks {
		chunksMap[c.ArtifactID] = append(chunksMap[c.ArtifactID], c)
	}
	return chunksMap, nil
}
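
// mergeChunksForRun merges the uploaded chunks for every artifact of the run
// that matches artifactName.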
func mergeChunksForRun(ctx *ArtifactContext, st storage.ObjectStorage, runID int64, artifactName string) error {
	// read all db artifacts by name
	artifacts, err := db.Find[actions.ActionArtifact](ctx, actions.FindArtifactsOptions{
		RunID:        runID,
		ArtifactName: artifactName,
	})
	if err != nil {
		return err
	}
	// read all uploaded chunks from storage
	chunksMap, err := listChunksByRunID(st, runID)
	if err != nil {
		return err
	}
	// iterate the db artifacts and merge the chunks of each
	for _, art := range artifacts {
		chunks, ok := chunksMap[art.ID]
		if !ok {
			log.Debug("artifact %d chunks not found", art.ID)
			continue
		}
		if err := mergeChunksForArtifact(ctx, chunks, st, art); err != nil {
			return err
		}
	}
	return nil
}
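
// mergeChunksForArtifact concatenates an artifact's chunks in order, saves the
// merged file to its final storage path, deletes the chunks, and marks the
// artifact upload as confirmed. It is a no-op if some chunks are still missing.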
func mergeChunksForArtifact(ctx *ArtifactContext, chunks []*chunkFileItem, st storage.ObjectStorage, artifact *actions.ActionArtifact) error {
	sort.Slice(chunks, func(i, j int) bool {
		return chunks[i].Start < chunks[j].Start
	})
	allChunks := make([]*chunkFileItem, 0)
	startAt := int64(-1)
	// check that all chunks are uploaded and contiguous, skipping duplicates
	for _, c := range chunks {
		// startAt == -1 means this is the first chunk;
		// c.Start == startAt+1 means this chunk directly follows the previous one;
		// otherwise a chunk is missing (or this one is a duplicate), so it is skipped
		if c.Start == (startAt + 1) {
			allChunks = append(allChunks, c)
			startAt = c.End
		}
	}
	// if the last chunk's End+1 does not equal the artifact's compressed size,
	// the chunks have not been uploaded completely
	if startAt+1 != artifact.FileCompressedSize {
		log.Debug("[artifact] chunks are not uploaded completely, artifact_id: %d", artifact.ID)
		return nil
	}
	// concatenate the ordered chunks with an io.MultiReader
	readers := make([]io.Reader, 0, len(allChunks))
	closeReaders := func() {
		for _, r := range readers {
			_ = r.(io.Closer).Close() // guaranteed to be an io.Closer: readers is only filled with st.Open results below
		}
		readers = nil
	}
	defer closeReaders()
	for _, c := range allChunks {
		var readCloser io.ReadCloser
		var err error
		if readCloser, err = st.Open(c.Path); err != nil {
			return fmt.Errorf("open chunk error: %v, %s", err, c.Path)
		}
		readers = append(readers, readCloser)
	}
	mergedReader := io.MultiReader(readers...)
	// if the chunk data is gzip-compressed, use .gz as the extension;
	// the download-artifact action uses the content-encoding header to decide
	// whether it should decompress the file
	extension := "chunk"
	if artifact.ContentEncoding == "gzip" {
		extension = "chunk.gz"
	}
	// save the merged file, sharding the directory layout by run ID and artifact ID
	storagePath := fmt.Sprintf("%d/%d/%d.%s", artifact.RunID%255, artifact.ID%255, time.Now().UnixNano(), extension)
	written, err := st.Save(storagePath, mergedReader, -1)
	if err != nil {
		return fmt.Errorf("save merged file error: %v", err)
	}
	if written != artifact.FileCompressedSize {
		return fmt.Errorf("merged file size is not equal to chunk length")
	}
	defer func() {
		closeReaders() // close the chunk readers before deleting them
		// drop the now-merged chunks
		for _, c := range chunks {
			if err := st.Delete(c.Path); err != nil {
				log.Warn("Error deleting chunk: %s, %v", c.Path, err)
			}
		}
	}()
	// save the storage path to the artifact
	log.Debug("[artifact] merge chunks to artifact: %d, %s, old: %s", artifact.ID, storagePath, artifact.StoragePath)
	// if the artifact was already uploaded, delete the old file
	if artifact.StoragePath != "" {
		if err := st.Delete(artifact.StoragePath); err != nil {
			log.Warn("Error deleting old artifact: %s, %v", artifact.StoragePath, err)
		}
	}
	artifact.StoragePath = storagePath
	artifact.Status = int64(actions.ArtifactStatusUploadConfirmed)
	if err := actions.UpdateArtifactByID(ctx, artifact.ID, artifact); err != nil {
		return fmt.Errorf("update artifact error: %v", err)
	}
	return nil
}