s3: bugfix, use sync.Map instead of map for storing multi part uploads references

add storage lock in GetIndexContent

Signed-off-by: Petu Eusebiu <peusebiu@cisco.com>
This commit is contained in:
Petu Eusebiu 2022-03-02 20:29:48 +02:00 committed by Ramkumar Chinchani
parent bf21435d42
commit 9cffbcaccb
2 changed files with 55 additions and 42 deletions

View File

@ -45,8 +45,8 @@ type ObjectStorage struct {
// We must keep track of multi part uploads to s3, because the lib
// which we are using doesn't cancel multiparts uploads
// see: https://github.com/distribution/distribution/blob/main/registry/storage/driver/s3-aws/s3.go#L545
isMultiPartUpload map[string]bool
metrics monitoring.MetricServer
multiPartUploads sync.Map
metrics monitoring.MetricServer
}
func (is *ObjectStorage) RootDir() string {
@ -67,13 +67,13 @@ func NewImageStore(rootDir string, gc bool, gcDelay time.Duration, dedupe, commi
log zlog.Logger, metrics monitoring.MetricServer,
store driver.StorageDriver) storage.ImageStore {
imgStore := &ObjectStorage{
rootDir: rootDir,
store: store,
lock: &sync.RWMutex{},
blobUploads: make(map[string]storage.BlobUpload),
log: log.With().Caller().Logger(),
isMultiPartUpload: make(map[string]bool),
metrics: metrics,
rootDir: rootDir,
store: store,
lock: &sync.RWMutex{},
blobUploads: make(map[string]storage.BlobUpload),
log: log.With().Caller().Logger(),
multiPartUploads: sync.Map{},
metrics: metrics,
}
return imgStore
@ -281,16 +281,11 @@ func (is *ObjectStorage) GetRepositories() ([]string, error) {
// GetImageTags returns a list of image tags available in the specified repository.
func (is *ObjectStorage) GetImageTags(repo string) ([]string, error) {
var lockLatency time.Time
dir := path.Join(is.rootDir, repo)
if fi, err := is.store.Stat(context.Background(), dir); err != nil || !fi.IsDir() {
return nil, zerr.ErrRepoNotFound
}
is.RLock(&lockLatency)
defer is.RUnlock(&lockLatency)
buf, err := is.GetIndexContent(repo)
if err != nil {
return nil, err
@ -324,9 +319,6 @@ func (is *ObjectStorage) GetImageManifest(repo string, reference string) ([]byte
return nil, "", "", zerr.ErrRepoNotFound
}
is.RLock(&lockLatency)
defer is.RUnlock(&lockLatency)
buf, err := is.GetIndexContent(repo)
if err != nil {
return nil, "", "", err
@ -368,11 +360,14 @@ func (is *ObjectStorage) GetImageManifest(repo string, reference string) ([]byte
return nil, "", "", zerr.ErrManifestNotFound
}
p := path.Join(dir, "blobs", digest.Algorithm().String(), digest.Encoded())
manifestPath := path.Join(dir, "blobs", digest.Algorithm().String(), digest.Encoded())
buf, err = is.store.GetContent(context.Background(), p)
is.RLock(&lockLatency)
defer is.RUnlock(&lockLatency)
buf, err = is.store.GetContent(context.Background(), manifestPath)
if err != nil {
is.log.Error().Err(err).Str("blob", p).Msg("failed to read manifest")
is.log.Error().Err(err).Str("blob", manifestPath).Msg("failed to read manifest")
return nil, "", "", err
}
@ -451,11 +446,6 @@ func (is *ObjectStorage) PutImageManifest(repo string, reference string, mediaTy
refIsDigest = true
}
var lockLatency time.Time
is.Lock(&lockLatency)
defer is.Unlock(&lockLatency)
dir := path.Join(is.rootDir, repo)
buf, err := is.GetIndexContent(repo)
@ -463,6 +453,11 @@ func (is *ObjectStorage) PutImageManifest(repo string, reference string, mediaTy
return "", err
}
var lockLatency time.Time
is.Lock(&lockLatency)
defer is.Unlock(&lockLatency)
var index ispec.Index
if err := json.Unmarshal(buf, &index); err != nil {
is.log.Error().Err(err).Str("dir", dir).Msg("invalid JSON")
@ -574,9 +569,6 @@ func (is *ObjectStorage) DeleteImageManifest(repo string, reference string) erro
isTag = true
}
is.Lock(&lockLatency)
defer is.Unlock(&lockLatency)
buf, err := is.GetIndexContent(repo)
if err != nil {
return err
@ -623,6 +615,9 @@ func (is *ObjectStorage) DeleteImageManifest(repo string, reference string) erro
return zerr.ErrManifestNotFound
}
is.Lock(&lockLatency)
defer is.Unlock(&lockLatency)
// now update "index.json"
dir = path.Join(is.rootDir, repo)
file := path.Join(dir, "index.json")
@ -707,8 +702,8 @@ func (is *ObjectStorage) GetBlobUpload(repo string, uuid string) (int64, error)
// if it's not a multipart upload check for the regular empty file
// created by NewBlobUpload, it should have 0 size every time
isMultiPartStarted, ok := is.isMultiPartUpload[blobUploadPath]
if !isMultiPartStarted || !ok {
_, hasStarted := is.multiPartUploads.Load(blobUploadPath)
if !hasStarted {
binfo, err := is.store.Stat(context.Background(), blobUploadPath)
if err != nil {
var perr driver.PathNotFoundError
@ -800,7 +795,9 @@ func (is *ObjectStorage) PutBlobChunk(repo string, uuid string, from int64, to i
defer file.Close()
if from != file.Size() {
// cancel multipart upload created earlier
// cancel multipart upload
is.multiPartUploads.Delete(blobUploadPath)
err := file.Cancel()
if err != nil {
is.log.Error().Err(err).Msg("failed to cancel multipart upload")
@ -830,8 +827,6 @@ func (is *ObjectStorage) PutBlobChunk(repo string, uuid string, from int64, to i
return -1, err
}
is.isMultiPartUpload[blobUploadPath] = true
return int64(nbytes), err
}
@ -843,8 +838,8 @@ func (is *ObjectStorage) BlobUploadInfo(repo string, uuid string) (int64, error)
// if it's not a multipart upload check for the regular empty file
// created by NewBlobUpload, it should have 0 size every time
isMultiPartStarted, ok := is.isMultiPartUpload[blobUploadPath]
if !isMultiPartStarted || !ok {
_, hasStarted := is.multiPartUploads.Load(blobUploadPath)
if !hasStarted {
uploadInfo, err := is.store.Stat(context.Background(), blobUploadPath)
if err != nil {
is.log.Error().Err(err).Str("blob", blobUploadPath).Msg("failed to stat blob")
@ -887,6 +882,8 @@ func (is *ObjectStorage) FinishBlobUpload(repo string, uuid string, body io.Read
return zerr.ErrBadBlobDigest
}
defer fileWriter.Close()
if err := fileWriter.Commit(); err != nil {
is.log.Error().Err(err).Msg("failed to commit file")
@ -922,6 +919,11 @@ func (is *ObjectStorage) FinishBlobUpload(repo string, uuid string, body io.Read
dst := is.BlobPath(repo, dstDigest)
var lockLatency time.Time
is.Lock(&lockLatency)
defer is.Unlock(&lockLatency)
if err := is.store.Move(context.Background(), src, dst); err != nil {
is.log.Error().Err(err).Str("src", src).Str("dstDigest", dstDigest.String()).
Str("dst", dst).Msg("unable to finish blob")
@ -929,8 +931,7 @@ func (is *ObjectStorage) FinishBlobUpload(repo string, uuid string, body io.Read
return err
}
// remove multipart upload, not needed anymore
delete(is.isMultiPartUpload, src)
is.multiPartUploads.Delete(src)
return nil
}
@ -1115,8 +1116,13 @@ func (is *ObjectStorage) GetReferrers(repo, digest string, mediaType string) ([]
}
func (is *ObjectStorage) GetIndexContent(repo string) ([]byte, error) {
var lockLatency time.Time
dir := path.Join(is.rootDir, repo)
is.RLock(&lockLatency)
defer is.RUnlock(&lockLatency)
buf, err := is.store.GetContent(context.Background(), path.Join(dir, "index.json"))
if err != nil {
is.log.Error().Err(err).Str("dir", dir).Msg("failed to read index.json")
@ -1181,21 +1187,23 @@ func writeFile(store driver.StorageDriver, filepath string, buf []byte) (int, er
return n, nil
}
// Because we can not create an empty multipart upload, we store multi part uploads
// so that we know when to create a fileWriter with append=true or with append=false
// Trying and handling errors results in weird s3 api errors.
// get a multipart upload FileWriter based on wheather or not one has already been started.
func getMultipartFileWriter(imgStore *ObjectStorage, filepath string) (driver.FileWriter, error) {
var file driver.FileWriter
var err error
isMultiPartStarted, ok := imgStore.isMultiPartUpload[filepath]
if !isMultiPartStarted || !ok {
_, hasStarted := imgStore.multiPartUploads.Load(filepath)
if !hasStarted {
// start multipart upload
file, err = imgStore.store.Writer(context.Background(), filepath, false)
if err != nil {
return file, err
}
imgStore.multiPartUploads.Store(filepath, true)
} else {
// continue multipart upload
file, err = imgStore.store.Writer(context.Background(), filepath, true)
if err != nil {
return file, err

View File

@ -1387,8 +1387,13 @@ func (is *ImageStoreFS) GetBlobContent(repo string, digest string) ([]byte, erro
}
func (is *ImageStoreFS) GetIndexContent(repo string) ([]byte, error) {
var lockLatency time.Time
dir := path.Join(is.rootDir, repo)
is.RLock(&lockLatency)
defer is.RUnlock(&lockLatency)
buf, err := ioutil.ReadFile(path.Join(dir, "index.json"))
if err != nil {
if os.IsNotExist(err) {