fix(cache): make dynamoDB aware of orignal/deduped blobs (#1881)
Signed-off-by: Petu Eusebiu <peusebiu@cisco.com>
This commit is contained in:
parent
039648a445
commit
53f97eb265
60
pkg/storage/cache/boltdb_test.go
vendored
60
pkg/storage/cache/boltdb_test.go
vendored
@ -60,5 +60,65 @@ func TestBoltDBCache(t *testing.T) {
|
||||
err = cacheDriver.PutBlob("key", "")
|
||||
So(err, ShouldNotBeNil)
|
||||
So(err, ShouldEqual, errors.ErrEmptyValue)
|
||||
|
||||
cacheDriver, _ = storage.Create("boltdb", cache.BoltDBDriverParameters{t.TempDir(), "cache_test", false}, log)
|
||||
So(cacheDriver, ShouldNotBeNil)
|
||||
|
||||
err = cacheDriver.PutBlob("key1", "originalBlobPath")
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
err = cacheDriver.PutBlob("key1", "duplicateBlobPath")
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
val, err = cacheDriver.GetBlob("key1")
|
||||
So(val, ShouldEqual, "originalBlobPath")
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
err = cacheDriver.DeleteBlob("key1", "duplicateBlobPath")
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
val, err = cacheDriver.GetBlob("key1")
|
||||
So(val, ShouldEqual, "originalBlobPath")
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
err = cacheDriver.PutBlob("key1", "duplicateBlobPath")
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
err = cacheDriver.DeleteBlob("key1", "originalBlobPath")
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
val, err = cacheDriver.GetBlob("key1")
|
||||
So(val, ShouldEqual, "duplicateBlobPath")
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
err = cacheDriver.DeleteBlob("key1", "duplicateBlobPath")
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
// should be empty
|
||||
val, err = cacheDriver.GetBlob("key1")
|
||||
So(err, ShouldNotBeNil)
|
||||
So(val, ShouldBeEmpty)
|
||||
|
||||
// try to add three same values
|
||||
err = cacheDriver.PutBlob("key2", "duplicate")
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
err = cacheDriver.PutBlob("key2", "duplicate")
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
err = cacheDriver.PutBlob("key2", "duplicate")
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
val, err = cacheDriver.GetBlob("key2")
|
||||
So(val, ShouldEqual, "duplicate")
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
err = cacheDriver.DeleteBlob("key2", "duplicate")
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
// should be empty
|
||||
val, err = cacheDriver.GetBlob("key2")
|
||||
So(err, ShouldNotBeNil)
|
||||
So(val, ShouldBeEmpty)
|
||||
})
|
||||
}
|
||||
|
119
pkg/storage/cache/dynamodb.go
vendored
119
pkg/storage/cache/dynamodb.go
vendored
@ -26,11 +26,11 @@ type DynamoDBDriverParameters struct {
|
||||
}
|
||||
|
||||
type Blob struct {
|
||||
Digest string `dynamodbav:"Digest,string"`
|
||||
BlobPath []string `dynamodbav:"BlobPath,stringset"`
|
||||
Digest string `dynamodbav:"Digest,string"`
|
||||
DuplicateBlobPath []string `dynamodbav:"DuplicateBlobPath,stringset"`
|
||||
OriginalBlobPath string `dynamodbav:"OriginalBlobPath,string"`
|
||||
}
|
||||
|
||||
// Use ONLY for tests.
|
||||
func (d *DynamoDBDriver) NewTable(tableName string) error {
|
||||
//nolint:gomnd
|
||||
_, err := d.client.CreateTable(context.TODO(), &dynamodb.CreateTableInput{
|
||||
@ -107,7 +107,7 @@ func (d *DynamoDBDriver) Name() string {
|
||||
return "dynamodb"
|
||||
}
|
||||
|
||||
// Returns the first path of the blob if it exists.
|
||||
// Returns the original blob.
|
||||
func (d *DynamoDBDriver) GetBlob(digest godigest.Digest) (string, error) {
|
||||
resp, err := d.client.GetItem(context.TODO(), &dynamodb.GetItemInput{
|
||||
TableName: aws.String(d.tableName),
|
||||
@ -129,11 +129,7 @@ func (d *DynamoDBDriver) GetBlob(digest godigest.Digest) (string, error) {
|
||||
|
||||
_ = attributevalue.UnmarshalMap(resp.Item, &out)
|
||||
|
||||
if len(out.BlobPath) == 0 {
|
||||
return "", nil
|
||||
}
|
||||
|
||||
return out.BlobPath[0], nil
|
||||
return out.OriginalBlobPath, nil
|
||||
}
|
||||
|
||||
func (d *DynamoDBDriver) PutBlob(digest godigest.Digest, path string) error {
|
||||
@ -143,17 +139,18 @@ func (d *DynamoDBDriver) PutBlob(digest godigest.Digest, path string) error {
|
||||
return zerr.ErrEmptyValue
|
||||
}
|
||||
|
||||
marshaledKey, _ := attributevalue.MarshalMap(map[string]interface{}{"Digest": digest.String()})
|
||||
expression := "ADD BlobPath :i"
|
||||
if originBlob, _ := d.GetBlob(digest); originBlob == "" {
|
||||
// first entry, so add original blob
|
||||
if err := d.putOriginBlob(digest, path); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
expression := "ADD DuplicateBlobPath :i"
|
||||
attrPath := types.AttributeValueMemberSS{Value: []string{path}}
|
||||
|
||||
if _, err := d.client.UpdateItem(context.TODO(), &dynamodb.UpdateItemInput{
|
||||
Key: marshaledKey,
|
||||
TableName: &d.tableName,
|
||||
UpdateExpression: &expression,
|
||||
ExpressionAttributeValues: map[string]types.AttributeValue{":i": &attrPath},
|
||||
}); err != nil {
|
||||
d.log.Error().Err(err)
|
||||
if err := d.updateItem(digest, expression, map[string]types.AttributeValue{":i": &attrPath}); err != nil {
|
||||
d.log.Error().Err(err).Str("digest", digest.String()).Str("path", path).Msg("unable to put blob")
|
||||
|
||||
return err
|
||||
}
|
||||
@ -184,7 +181,11 @@ func (d *DynamoDBDriver) HasBlob(digest godigest.Digest, path string) bool {
|
||||
|
||||
_ = attributevalue.UnmarshalMap(resp.Item, &out)
|
||||
|
||||
for _, item := range out.BlobPath {
|
||||
if out.OriginalBlobPath == path {
|
||||
return true
|
||||
}
|
||||
|
||||
for _, item := range out.DuplicateBlobPath {
|
||||
if item == path {
|
||||
return true
|
||||
}
|
||||
@ -198,24 +199,28 @@ func (d *DynamoDBDriver) HasBlob(digest godigest.Digest, path string) bool {
|
||||
func (d *DynamoDBDriver) DeleteBlob(digest godigest.Digest, path string) error {
|
||||
marshaledKey, _ := attributevalue.MarshalMap(map[string]interface{}{"Digest": digest.String()})
|
||||
|
||||
expression := "DELETE BlobPath :i"
|
||||
expression := "DELETE DuplicateBlobPath :i"
|
||||
attrPath := types.AttributeValueMemberSS{Value: []string{path}}
|
||||
|
||||
_, err := d.client.UpdateItem(context.TODO(), &dynamodb.UpdateItemInput{
|
||||
Key: marshaledKey,
|
||||
TableName: &d.tableName,
|
||||
UpdateExpression: &expression,
|
||||
ExpressionAttributeValues: map[string]types.AttributeValue{":i": &attrPath},
|
||||
})
|
||||
if err != nil {
|
||||
if err := d.updateItem(digest, expression, map[string]types.AttributeValue{":i": &attrPath}); err != nil {
|
||||
d.log.Error().Err(err).Str("digest", digest.String()).Str("path", path).Msg("unable to delete")
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
result, _ := d.GetBlob(digest)
|
||||
originBlob, _ := d.GetBlob(digest)
|
||||
// if original blob is the one deleted
|
||||
if originBlob == path {
|
||||
// move duplicate blob to original, storage will move content here
|
||||
originBlob, _ = d.getDuplicateBlob(digest)
|
||||
if originBlob != "" {
|
||||
if err := d.putOriginBlob(digest, originBlob); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if result == "" {
|
||||
if originBlob == "" {
|
||||
d.log.Debug().Str("digest", digest.String()).Str("path", path).Msg("deleting empty bucket")
|
||||
|
||||
_, _ = d.client.DeleteItem(context.TODO(), &dynamodb.DeleteItemInput{
|
||||
@ -226,3 +231,59 @@ func (d *DynamoDBDriver) DeleteBlob(digest godigest.Digest, path string) error {
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *DynamoDBDriver) getDuplicateBlob(digest godigest.Digest) (string, error) {
|
||||
resp, err := d.client.GetItem(context.TODO(), &dynamodb.GetItemInput{
|
||||
TableName: aws.String(d.tableName),
|
||||
Key: map[string]types.AttributeValue{
|
||||
"Digest": &types.AttributeValueMemberS{Value: digest.String()},
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
d.log.Error().Err(err).Str("tableName", d.tableName).Msg("failed to get blob")
|
||||
|
||||
return "", err
|
||||
}
|
||||
|
||||
out := Blob{}
|
||||
|
||||
if resp.Item == nil {
|
||||
return "", zerr.ErrCacheMiss
|
||||
}
|
||||
|
||||
_ = attributevalue.UnmarshalMap(resp.Item, &out)
|
||||
|
||||
if len(out.DuplicateBlobPath) == 0 {
|
||||
return "", nil
|
||||
}
|
||||
|
||||
return out.DuplicateBlobPath[0], nil
|
||||
}
|
||||
|
||||
func (d *DynamoDBDriver) putOriginBlob(digest godigest.Digest, path string) error {
|
||||
expression := "SET OriginalBlobPath = :s"
|
||||
attrPath := types.AttributeValueMemberS{Value: path}
|
||||
|
||||
if err := d.updateItem(digest, expression, map[string]types.AttributeValue{":s": &attrPath}); err != nil {
|
||||
d.log.Error().Err(err).Str("digest", digest.String()).Str("path", path).Msg("unable to put original blob")
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *DynamoDBDriver) updateItem(digest godigest.Digest, expression string,
|
||||
expressionAttVals map[string]types.AttributeValue,
|
||||
) error {
|
||||
marshaledKey, _ := attributevalue.MarshalMap(map[string]interface{}{"Digest": digest.String()})
|
||||
|
||||
_, err := d.client.UpdateItem(context.TODO(), &dynamodb.UpdateItemInput{
|
||||
Key: marshaledKey,
|
||||
TableName: &d.tableName,
|
||||
UpdateExpression: &expression,
|
||||
ExpressionAttributeValues: expressionAttVals,
|
||||
})
|
||||
|
||||
return err
|
||||
}
|
||||
|
57
pkg/storage/cache/dynamodb_test.go
vendored
57
pkg/storage/cache/dynamodb_test.go
vendored
@ -100,5 +100,62 @@ func TestDynamoDB(t *testing.T) {
|
||||
|
||||
err = cacheDriver.DeleteBlob(keyDigest, path.Join(dir, "value2"))
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
err = cacheDriver.PutBlob("key1", "originalBlobPath")
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
err = cacheDriver.PutBlob("key1", "duplicateBlobPath")
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
val, err = cacheDriver.GetBlob("key1")
|
||||
So(val, ShouldEqual, "originalBlobPath")
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
err = cacheDriver.DeleteBlob("key1", "duplicateBlobPath")
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
val, err = cacheDriver.GetBlob("key1")
|
||||
So(val, ShouldEqual, "originalBlobPath")
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
err = cacheDriver.PutBlob("key1", "duplicateBlobPath")
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
err = cacheDriver.DeleteBlob("key1", "originalBlobPath")
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
val, err = cacheDriver.GetBlob("key1")
|
||||
So(val, ShouldEqual, "duplicateBlobPath")
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
err = cacheDriver.DeleteBlob("key1", "duplicateBlobPath")
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
// should be empty
|
||||
val, err = cacheDriver.GetBlob("key1")
|
||||
So(err, ShouldNotBeNil)
|
||||
So(val, ShouldBeEmpty)
|
||||
|
||||
// try to add three same values
|
||||
err = cacheDriver.PutBlob("key2", "duplicate")
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
err = cacheDriver.PutBlob("key2", "duplicate")
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
err = cacheDriver.PutBlob("key2", "duplicate")
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
val, err = cacheDriver.GetBlob("key2")
|
||||
So(val, ShouldEqual, "duplicate")
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
err = cacheDriver.DeleteBlob("key2", "duplicate")
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
// should be empty
|
||||
val, err = cacheDriver.GetBlob("key2")
|
||||
So(err, ShouldNotBeNil)
|
||||
So(val, ShouldBeEmpty)
|
||||
})
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user