fix(zb): fixed remote repositories cleanup (#1461)
fix(storage/local): also put deduped blobs in cache, not just origin blobs this caused an error when trying to delete deduped blobs from multiple repositories fix(storage/s3): check blob is present in cache before deleting this is an edge case where dedupe is false but cacheDriver is not nil (because in s3 we open the cache.db if storage find it in rootDir) it caused an error when trying to delete blobs uploaded with dedupe false Signed-off-by: Petu Eusebiu <peusebiu@cisco.com>
This commit is contained in:
parent
9ca85e0937
commit
1b184ceef8
@ -3,6 +3,7 @@ package main
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
@ -18,21 +19,83 @@ import (
|
||||
ispec "github.com/opencontainers/image-spec/specs-go/v1"
|
||||
"gopkg.in/resty.v1"
|
||||
|
||||
"zotregistry.io/zot/errors"
|
||||
"zotregistry.io/zot/pkg/api"
|
||||
"zotregistry.io/zot/pkg/test"
|
||||
)
|
||||
|
||||
func makeHTTPGetRequest(url string, resultPtr interface{}, client *resty.Client) error {
|
||||
resp, err := client.R().Get(url)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if resp.StatusCode() != http.StatusOK {
|
||||
log.Printf("unable to make GET request on %s, response status code: %d", url, resp.StatusCode())
|
||||
|
||||
return errors.New(string(resp.Body())) //nolint: goerr113
|
||||
}
|
||||
|
||||
err = json.Unmarshal(resp.Body(), resultPtr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func makeHTTPDeleteRequest(url string, client *resty.Client) error {
|
||||
resp, err := client.R().Delete(url)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if resp.StatusCode() != http.StatusAccepted {
|
||||
log.Printf("unable to make DELETE request on %s, response status code: %d", url, resp.StatusCode())
|
||||
|
||||
return errors.New(string(resp.Body())) //nolint: goerr113
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func deleteTestRepo(repos []string, url string, client *resty.Client) error {
|
||||
for _, repo := range repos {
|
||||
resp, err := client.R().Delete((fmt.Sprintf("%s/v2/%s/", url, repo)))
|
||||
var tags api.ImageTags
|
||||
|
||||
// get tags
|
||||
err := makeHTTPGetRequest(fmt.Sprintf("%s/v2/%s/tags/list", url, repo), &tags, client)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// request specific check
|
||||
statusCode := resp.StatusCode()
|
||||
if statusCode != http.StatusAccepted {
|
||||
return errors.ErrUnknownCode
|
||||
for _, tag := range tags.Tags {
|
||||
var manifest ispec.Manifest
|
||||
|
||||
// first get tag manifest to get containing blobs
|
||||
err := makeHTTPGetRequest(fmt.Sprintf("%s/v2/%s/manifests/%s", url, repo, tag), &manifest, client)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// delete blobs
|
||||
for _, blob := range manifest.Layers {
|
||||
err := makeHTTPDeleteRequest(fmt.Sprintf("%s/v2/%s/blobs/%s", url, repo, blob.Digest.String()), client)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// delete config blob
|
||||
err = makeHTTPDeleteRequest(fmt.Sprintf("%s/v2/%s/blobs/%s", url, repo, manifest.Config.Digest.String()), client)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// delete manifest
|
||||
err = makeHTTPDeleteRequest(fmt.Sprintf("%s/v2/%s/manifests/%s", url, repo, tag), client)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -273,7 +336,7 @@ func pushMonolithImage(workdir, url, trepo string, repos []string, config testCo
|
||||
// request specific check
|
||||
statusCode = resp.StatusCode()
|
||||
if statusCode != http.StatusAccepted {
|
||||
return nil, repos, errors.ErrUnknownCode
|
||||
return nil, repos, errors.New(string(resp.Body())) //nolint: goerr113
|
||||
}
|
||||
|
||||
loc := test.Location(url, resp)
|
||||
@ -311,7 +374,7 @@ func pushMonolithImage(workdir, url, trepo string, repos []string, config testCo
|
||||
// request specific check
|
||||
statusCode = resp.StatusCode()
|
||||
if statusCode != http.StatusCreated {
|
||||
return nil, repos, errors.ErrUnknownCode
|
||||
return nil, repos, errors.New(string(resp.Body())) //nolint: goerr113
|
||||
}
|
||||
|
||||
// upload image config blob
|
||||
@ -325,7 +388,7 @@ func pushMonolithImage(workdir, url, trepo string, repos []string, config testCo
|
||||
// request specific check
|
||||
statusCode = resp.StatusCode()
|
||||
if statusCode != http.StatusAccepted {
|
||||
return nil, repos, errors.ErrUnknownCode
|
||||
return nil, repos, errors.New(string(resp.Body())) //nolint: goerr113
|
||||
}
|
||||
|
||||
loc = test.Location(url, resp)
|
||||
@ -345,7 +408,7 @@ func pushMonolithImage(workdir, url, trepo string, repos []string, config testCo
|
||||
// request specific check
|
||||
statusCode = resp.StatusCode()
|
||||
if statusCode != http.StatusCreated {
|
||||
return nil, repos, errors.ErrUnknownCode
|
||||
return nil, repos, errors.New(string(resp.Body())) //nolint: goerr113
|
||||
}
|
||||
|
||||
// create a manifest
|
||||
@ -388,7 +451,7 @@ func pushMonolithImage(workdir, url, trepo string, repos []string, config testCo
|
||||
// request specific check
|
||||
statusCode = resp.StatusCode()
|
||||
if statusCode != http.StatusCreated {
|
||||
return nil, repos, errors.ErrUnknownCode
|
||||
return nil, repos, errors.New(string(resp.Body())) //nolint: goerr113
|
||||
}
|
||||
|
||||
manifestHash[repo] = manifestTag
|
||||
|
@ -18,6 +18,8 @@ func NewPerfRootCmd() *cobra.Command {
|
||||
|
||||
var concurrency, requests int
|
||||
|
||||
var skipCleanup bool
|
||||
|
||||
rootCmd := &cobra.Command{
|
||||
Use: "zb <url>",
|
||||
Short: "`zb`",
|
||||
@ -46,7 +48,7 @@ func NewPerfRootCmd() *cobra.Command {
|
||||
|
||||
requests = concurrency * (requests / concurrency)
|
||||
|
||||
Perf(workdir, url, auth, repo, concurrency, requests, outFmt, srcIPs, srcCIDR)
|
||||
Perf(workdir, url, auth, repo, concurrency, requests, outFmt, srcIPs, srcCIDR, skipCleanup)
|
||||
},
|
||||
}
|
||||
|
||||
@ -66,6 +68,8 @@ func NewPerfRootCmd() *cobra.Command {
|
||||
"Number of requests to perform")
|
||||
rootCmd.Flags().StringVarP(&outFmt, "output-format", "o", "",
|
||||
"Output format of test results: stdout (default), json, ci-cd")
|
||||
rootCmd.Flags().BoolVar(&skipCleanup, "skip-cleanup", false,
|
||||
"Clean up pushed repos from remote registry after running benchmark (default true)")
|
||||
|
||||
// "version"
|
||||
rootCmd.Flags().BoolVarP(&showVersion, "version", "v", false, "Show the version and exit")
|
||||
|
@ -270,6 +270,7 @@ type testFunc func(
|
||||
config testConfig,
|
||||
statsCh chan statsRecord,
|
||||
client *resty.Client,
|
||||
skipCleanup bool,
|
||||
) error
|
||||
|
||||
//nolint:gosec
|
||||
@ -279,6 +280,7 @@ func GetCatalog(
|
||||
config testConfig,
|
||||
statsCh chan statsRecord,
|
||||
client *resty.Client,
|
||||
skipCleanup bool,
|
||||
) error {
|
||||
var repos []string
|
||||
|
||||
@ -336,9 +338,11 @@ func GetCatalog(
|
||||
}
|
||||
|
||||
// clean up
|
||||
err = deleteTestRepo(repos, url, client)
|
||||
if err != nil {
|
||||
return err
|
||||
if !skipCleanup {
|
||||
err = deleteTestRepo(repos, url, client)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
@ -350,6 +354,7 @@ func PushMonolithStreamed(
|
||||
config testConfig,
|
||||
statsCh chan statsRecord,
|
||||
client *resty.Client,
|
||||
skipCleanup bool,
|
||||
) error {
|
||||
var repos []string
|
||||
|
||||
@ -363,9 +368,11 @@ func PushMonolithStreamed(
|
||||
}
|
||||
|
||||
// clean up
|
||||
err := deleteTestRepo(repos, url, client)
|
||||
if err != nil {
|
||||
return err
|
||||
if !skipCleanup {
|
||||
err := deleteTestRepo(repos, url, client)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
@ -377,6 +384,7 @@ func PushChunkStreamed(
|
||||
config testConfig,
|
||||
statsCh chan statsRecord,
|
||||
client *resty.Client,
|
||||
skipCleanup bool,
|
||||
) error {
|
||||
var repos []string
|
||||
|
||||
@ -390,9 +398,11 @@ func PushChunkStreamed(
|
||||
}
|
||||
|
||||
// clean up
|
||||
err := deleteTestRepo(repos, url, client)
|
||||
if err != nil {
|
||||
return err
|
||||
if !skipCleanup {
|
||||
err := deleteTestRepo(repos, url, client)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
@ -404,6 +414,7 @@ func Pull(
|
||||
config testConfig,
|
||||
statsCh chan statsRecord,
|
||||
client *resty.Client,
|
||||
skipCleanup bool,
|
||||
) error {
|
||||
var repos []string
|
||||
|
||||
@ -472,9 +483,11 @@ func Pull(
|
||||
}
|
||||
|
||||
// clean up
|
||||
err := deleteTestRepo(repos, url, client)
|
||||
if err != nil {
|
||||
return err
|
||||
if !skipCleanup {
|
||||
err := deleteTestRepo(repos, url, client)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
@ -486,6 +499,7 @@ func MixedPullAndPush(
|
||||
config testConfig,
|
||||
statsCh chan statsRecord,
|
||||
client *resty.Client,
|
||||
skipCleanup bool,
|
||||
) error {
|
||||
var repos []string
|
||||
|
||||
@ -519,9 +533,11 @@ func MixedPullAndPush(
|
||||
}
|
||||
|
||||
// clean up
|
||||
err = deleteTestRepo(repos, url, client)
|
||||
if err != nil {
|
||||
return err
|
||||
if !skipCleanup {
|
||||
err = deleteTestRepo(repos, url, client)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
@ -633,7 +649,7 @@ var testSuite = []testConfig{ //nolint:gochecknoglobals // used only in this tes
|
||||
func Perf(
|
||||
workdir, url, auth, repo string,
|
||||
concurrency int, requests int,
|
||||
outFmt string, srcIPs string, srcCIDR string,
|
||||
outFmt string, srcIPs string, srcCIDR string, skipCleanup bool,
|
||||
) {
|
||||
json := jsoniter.ConfigCompatibleWithStandardLibrary
|
||||
// logging
|
||||
@ -702,7 +718,10 @@ func Perf(
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
_ = tconfig.tfunc(workdir, url, repo, requests/concurrency, tconfig, statsCh, httpClient)
|
||||
err = tconfig.tfunc(workdir, url, repo, requests/concurrency, tconfig, statsCh, httpClient, skipCleanup)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
|
@ -1045,6 +1045,13 @@ retry:
|
||||
}
|
||||
}
|
||||
|
||||
// also put dedupe blob in cache
|
||||
if err := is.cache.PutBlob(dstDigest, dst); err != nil {
|
||||
is.log.Error().Err(err).Str("blobPath", dst).Msg("dedupe: unable to insert blob record")
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
if err := os.Remove(src); err != nil {
|
||||
is.log.Error().Err(err).Str("src", src).Msg("dedupe: uname to remove blob")
|
||||
|
||||
|
@ -733,6 +733,48 @@ func FuzzFullBlobUpload(f *testing.F) {
|
||||
})
|
||||
}
|
||||
|
||||
func TestStorageCacheErrors(t *testing.T) {
|
||||
Convey("get error in DedupeBlob() when cache.Put() deduped blob", t, func() {
|
||||
log := log.Logger{Logger: zerolog.New(os.Stdout)}
|
||||
metrics := monitoring.NewMetricsServer(false, log)
|
||||
|
||||
dir := t.TempDir()
|
||||
|
||||
originRepo := "dedupe1"
|
||||
dedupedRepo := "dedupe2"
|
||||
|
||||
cblob, cdigest := test.GetRandomImageConfig()
|
||||
|
||||
getBlobPath := ""
|
||||
imgStore := local.NewImageStore(dir, false, storage.DefaultGCDelay,
|
||||
true, true, log, metrics, nil, &mocks.CacheMock{
|
||||
PutBlobFn: func(digest godigest.Digest, path string) error {
|
||||
if strings.Contains(path, dedupedRepo) {
|
||||
return errCache
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
GetBlobFn: func(digest godigest.Digest) (string, error) {
|
||||
return getBlobPath, nil
|
||||
},
|
||||
})
|
||||
|
||||
err := imgStore.InitRepo(originRepo)
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
err = imgStore.InitRepo(dedupedRepo)
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
_, _, err = imgStore.FullBlobUpload(originRepo, bytes.NewReader(cblob), cdigest)
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
getBlobPath = strings.ReplaceAll(imgStore.BlobPath(originRepo, cdigest), imgStore.RootDir(), "")
|
||||
_, _, err = imgStore.FullBlobUpload(dedupedRepo, bytes.NewReader(cblob), cdigest)
|
||||
So(err, ShouldNotBeNil)
|
||||
})
|
||||
}
|
||||
|
||||
func FuzzDedupeBlob(f *testing.F) {
|
||||
f.Fuzz(func(t *testing.T, data string) {
|
||||
log := &log.Logger{Logger: zerolog.New(os.Stdout)}
|
||||
@ -1107,8 +1149,15 @@ func TestDedupeLinks(t *testing.T) {
|
||||
UseRelPaths: true,
|
||||
}, log)
|
||||
|
||||
imgStore := local.NewImageStore(dir, false, storage.DefaultGCDelay,
|
||||
testCase.dedupe, true, log, metrics, nil, cacheDriver)
|
||||
var imgStore storage.ImageStore
|
||||
|
||||
if testCase.dedupe {
|
||||
imgStore = local.NewImageStore(dir, false, storage.DefaultGCDelay,
|
||||
testCase.dedupe, true, log, metrics, nil, cacheDriver)
|
||||
} else {
|
||||
imgStore = local.NewImageStore(dir, false, storage.DefaultGCDelay,
|
||||
testCase.dedupe, true, log, metrics, nil, nil)
|
||||
}
|
||||
|
||||
Convey(fmt.Sprintf("Dedupe %t", testCase.dedupe), t, func(c C) {
|
||||
// manifest1
|
||||
@ -1238,6 +1287,16 @@ func TestDedupeLinks(t *testing.T) {
|
||||
So(os.SameFile(fi1, fi2), ShouldEqual, testCase.expected)
|
||||
|
||||
if !testCase.dedupe {
|
||||
Convey("delete blobs from storage/cache should work when dedupe is false", func() {
|
||||
So(blobDigest1, ShouldEqual, blobDigest2)
|
||||
|
||||
err = imgStore.DeleteBlob("dedupe1", godigest.NewDigestFromEncoded(godigest.SHA256, blobDigest1))
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
err = imgStore.DeleteBlob("dedupe2", godigest.NewDigestFromEncoded(godigest.SHA256, blobDigest2))
|
||||
So(err, ShouldBeNil)
|
||||
})
|
||||
|
||||
Convey("Intrerrupt rebuilding and restart, checking idempotency", func() {
|
||||
for i := 0; i < 10; i++ {
|
||||
taskScheduler, cancel := runAndGetScheduler()
|
||||
@ -1358,6 +1417,16 @@ func TestDedupeLinks(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
Convey("delete blobs from storage/cache should work when dedupe is true", func() {
|
||||
So(blobDigest1, ShouldEqual, blobDigest2)
|
||||
|
||||
err = imgStore.DeleteBlob("dedupe1", godigest.NewDigestFromEncoded(godigest.SHA256, blobDigest1))
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
err = imgStore.DeleteBlob("dedupe2", godigest.NewDigestFromEncoded(godigest.SHA256, blobDigest2))
|
||||
So(err, ShouldBeNil)
|
||||
})
|
||||
|
||||
Convey("storage and cache inconsistency", func() {
|
||||
// delete blobs
|
||||
err = os.Remove(path.Join(dir, "dedupe1", "blobs", "sha256", blobDigest1))
|
||||
|
@ -1358,11 +1358,13 @@ func (is *ObjectStorage) DeleteBlob(repo string, digest godigest.Digest) error {
|
||||
}
|
||||
|
||||
// remove cache entry and move blob contents to the next candidate if there is any
|
||||
if err := is.cache.DeleteBlob(digest, blobPath); err != nil {
|
||||
is.log.Error().Err(err).Str("digest", digest.String()).Str("blobPath", blobPath).
|
||||
Msg("unable to remove blob path from cache")
|
||||
if ok := is.cache.HasBlob(digest, blobPath); ok {
|
||||
if err := is.cache.DeleteBlob(digest, blobPath); err != nil {
|
||||
is.log.Error().Err(err).Str("digest", digest.String()).Str("blobPath", blobPath).
|
||||
Msg("unable to remove blob path from cache")
|
||||
|
||||
return err
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// if the deleted blob is one with content
|
||||
|
@ -1400,6 +1400,16 @@ func TestS3Dedupe(t *testing.T) {
|
||||
// deduped blob should be of size 0
|
||||
So(fi2.Size(), ShouldEqual, 0)
|
||||
|
||||
Convey("delete blobs from storage/cache should work when dedupe is true", func() {
|
||||
So(blobDigest1, ShouldEqual, blobDigest2)
|
||||
|
||||
err = imgStore.DeleteBlob("dedupe1", blobDigest1)
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
err = imgStore.DeleteBlob("dedupe2", blobDigest2)
|
||||
So(err, ShouldBeNil)
|
||||
})
|
||||
|
||||
Convey("Check that delete blobs moves the real content to the next contenders", func() {
|
||||
// if we delete blob1, the content should be moved to blob2
|
||||
err = imgStore.DeleteBlob("dedupe1", blobDigest1)
|
||||
@ -1537,6 +1547,19 @@ func TestS3Dedupe(t *testing.T) {
|
||||
// the new blob with dedupe false should be equal with the origin blob from dedupe1
|
||||
So(fi1.Size(), ShouldEqual, fi3.Size())
|
||||
|
||||
Convey("delete blobs from storage/cache should work when dedupe is false", func() {
|
||||
So(blobDigest1, ShouldEqual, blobDigest2)
|
||||
|
||||
err = imgStore.DeleteBlob("dedupe1", blobDigest1)
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
err = imgStore.DeleteBlob("dedupe2", blobDigest2)
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
err = imgStore.DeleteBlob("dedupe3", blobDigest2)
|
||||
So(err, ShouldBeNil)
|
||||
})
|
||||
|
||||
Convey("rebuild s3 dedupe index from true to false", func() { //nolint: dupl
|
||||
taskScheduler, cancel := runAndGetScheduler()
|
||||
|
||||
@ -1749,6 +1772,16 @@ func TestS3Dedupe(t *testing.T) {
|
||||
// deduped blob should be of size 0
|
||||
So(fi2.Size(), ShouldEqual, 0)
|
||||
|
||||
Convey("delete blobs from storage/cache should work when dedupe is true", func() {
|
||||
So(blobDigest1, ShouldEqual, blobDigest2)
|
||||
|
||||
err = imgStore.DeleteBlob("dedupe1", blobDigest1)
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
err = imgStore.DeleteBlob("dedupe2", blobDigest2)
|
||||
So(err, ShouldBeNil)
|
||||
})
|
||||
|
||||
Convey("rebuild s3 dedupe index from true to false", func() { //nolint: dupl
|
||||
taskScheduler, cancel := runAndGetScheduler()
|
||||
|
||||
@ -1777,6 +1810,16 @@ func TestS3Dedupe(t *testing.T) {
|
||||
So(err, ShouldBeNil)
|
||||
So(len(blobContent), ShouldEqual, fi1.Size())
|
||||
|
||||
Convey("delete blobs from storage/cache should work when dedupe is false", func() {
|
||||
So(blobDigest1, ShouldEqual, blobDigest2)
|
||||
|
||||
err = imgStore.DeleteBlob("dedupe1", blobDigest1)
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
err = imgStore.DeleteBlob("dedupe2", blobDigest2)
|
||||
So(err, ShouldBeNil)
|
||||
})
|
||||
|
||||
Convey("rebuild s3 dedupe index from false to true", func() {
|
||||
taskScheduler, cancel := runAndGetScheduler()
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user