diff --git a/cmd/zb/helper.go b/cmd/zb/helper.go index 3f7df5bf..a9ec1988 100644 --- a/cmd/zb/helper.go +++ b/cmd/zb/helper.go @@ -3,6 +3,7 @@ package main import ( "bytes" "encoding/json" + "errors" "fmt" "io" "log" @@ -18,21 +19,83 @@ import ( ispec "github.com/opencontainers/image-spec/specs-go/v1" "gopkg.in/resty.v1" - "zotregistry.io/zot/errors" + "zotregistry.io/zot/pkg/api" "zotregistry.io/zot/pkg/test" ) +func makeHTTPGetRequest(url string, resultPtr interface{}, client *resty.Client) error { + resp, err := client.R().Get(url) + if err != nil { + return err + } + + if resp.StatusCode() != http.StatusOK { + log.Printf("unable to make GET request on %s, response status code: %d", url, resp.StatusCode()) + + return errors.New(string(resp.Body())) //nolint: goerr113 + } + + err = json.Unmarshal(resp.Body(), resultPtr) + if err != nil { + return err + } + + return nil +} + +func makeHTTPDeleteRequest(url string, client *resty.Client) error { + resp, err := client.R().Delete(url) + if err != nil { + return err + } + + if resp.StatusCode() != http.StatusAccepted { + log.Printf("unable to make DELETE request on %s, response status code: %d", url, resp.StatusCode()) + + return errors.New(string(resp.Body())) //nolint: goerr113 + } + + return nil +} + func deleteTestRepo(repos []string, url string, client *resty.Client) error { for _, repo := range repos { - resp, err := client.R().Delete((fmt.Sprintf("%s/v2/%s/", url, repo))) + var tags api.ImageTags + + // get tags + err := makeHTTPGetRequest(fmt.Sprintf("%s/v2/%s/tags/list", url, repo), &tags, client) if err != nil { return err } - // request specific check - statusCode := resp.StatusCode() - if statusCode != http.StatusAccepted { - return errors.ErrUnknownCode + for _, tag := range tags.Tags { + var manifest ispec.Manifest + + // first get tag manifest to get containing blobs + err := makeHTTPGetRequest(fmt.Sprintf("%s/v2/%s/manifests/%s", url, repo, tag), &manifest, client) + if err != nil { + return err + } + + // delete blobs + for _, blob := range manifest.Layers { + err := makeHTTPDeleteRequest(fmt.Sprintf("%s/v2/%s/blobs/%s", url, repo, blob.Digest.String()), client) + if err != nil { + return err + } + } + + // delete config blob + err = makeHTTPDeleteRequest(fmt.Sprintf("%s/v2/%s/blobs/%s", url, repo, manifest.Config.Digest.String()), client) + if err != nil { + return err + } + + // delete manifest + err = makeHTTPDeleteRequest(fmt.Sprintf("%s/v2/%s/manifests/%s", url, repo, tag), client) + if err != nil { + return err + } } } @@ -273,7 +336,7 @@ func pushMonolithImage(workdir, url, trepo string, repos []string, config testCo // request specific check statusCode = resp.StatusCode() if statusCode != http.StatusAccepted { - return nil, repos, errors.ErrUnknownCode + return nil, repos, errors.New(string(resp.Body())) //nolint: goerr113 } loc := test.Location(url, resp) @@ -311,7 +374,7 @@ func pushMonolithImage(workdir, url, trepo string, repos []string, config testCo // request specific check statusCode = resp.StatusCode() if statusCode != http.StatusCreated { - return nil, repos, errors.ErrUnknownCode + return nil, repos, errors.New(string(resp.Body())) //nolint: goerr113 } // upload image config blob @@ -325,7 +388,7 @@ func pushMonolithImage(workdir, url, trepo string, repos []string, config testCo // request specific check statusCode = resp.StatusCode() if statusCode != http.StatusAccepted { - return nil, repos, errors.ErrUnknownCode + return nil, repos, errors.New(string(resp.Body())) //nolint: goerr113 } loc = test.Location(url, resp) @@ -345,7 +408,7 @@ func pushMonolithImage(workdir, url, trepo string, repos []string, config testCo // request specific check statusCode = resp.StatusCode() if statusCode != http.StatusCreated { - return nil, repos, errors.ErrUnknownCode + return nil, repos, errors.New(string(resp.Body())) //nolint: goerr113 } // create a manifest @@ -388,7 +451,7 @@ func pushMonolithImage(workdir, url, trepo string, repos []string, config testCo // request specific check statusCode = resp.StatusCode() if statusCode != http.StatusCreated { - return nil, repos, errors.ErrUnknownCode + return nil, repos, errors.New(string(resp.Body())) //nolint: goerr113 } manifestHash[repo] = manifestTag diff --git a/cmd/zb/main.go b/cmd/zb/main.go index a802d4f4..a520b9ad 100644 --- a/cmd/zb/main.go +++ b/cmd/zb/main.go @@ -18,6 +18,8 @@ func NewPerfRootCmd() *cobra.Command { var concurrency, requests int + var skipCleanup bool + rootCmd := &cobra.Command{ Use: "zb ", Short: "`zb`", @@ -46,7 +48,7 @@ func NewPerfRootCmd() *cobra.Command { requests = concurrency * (requests / concurrency) - Perf(workdir, url, auth, repo, concurrency, requests, outFmt, srcIPs, srcCIDR) + Perf(workdir, url, auth, repo, concurrency, requests, outFmt, srcIPs, srcCIDR, skipCleanup) }, } @@ -66,6 +68,8 @@ func NewPerfRootCmd() *cobra.Command { "Number of requests to perform") rootCmd.Flags().StringVarP(&outFmt, "output-format", "o", "", "Output format of test results: stdout (default), json, ci-cd") + rootCmd.Flags().BoolVar(&skipCleanup, "skip-cleanup", false, + "Clean up pushed repos from remote registry after running benchmark (default true)") // "version" rootCmd.Flags().BoolVarP(&showVersion, "version", "v", false, "Show the version and exit") diff --git a/cmd/zb/perf.go b/cmd/zb/perf.go index 4fbf6c8e..ad0edff9 100644 --- a/cmd/zb/perf.go +++ b/cmd/zb/perf.go @@ -270,6 +270,7 @@ type testFunc func( config testConfig, statsCh chan statsRecord, client *resty.Client, + skipCleanup bool, ) error //nolint:gosec @@ -279,6 +280,7 @@ func GetCatalog( config testConfig, statsCh chan statsRecord, client *resty.Client, + skipCleanup bool, ) error { var repos []string @@ -336,9 +338,11 @@ func GetCatalog( } // clean up - err = deleteTestRepo(repos, url, client) - if err != nil { - return err + if !skipCleanup { + err = deleteTestRepo(repos, url, client) + if err != nil { + return err + } } return nil @@ -350,6 +354,7 @@ func PushMonolithStreamed( config testConfig, statsCh chan statsRecord, client *resty.Client, + skipCleanup bool, ) error { var repos []string @@ -363,9 +368,11 @@ func PushMonolithStreamed( } // clean up - err := deleteTestRepo(repos, url, client) - if err != nil { - return err + if !skipCleanup { + err := deleteTestRepo(repos, url, client) + if err != nil { + return err + } } return nil @@ -377,6 +384,7 @@ func PushChunkStreamed( config testConfig, statsCh chan statsRecord, client *resty.Client, + skipCleanup bool, ) error { var repos []string @@ -390,9 +398,11 @@ func PushChunkStreamed( } // clean up - err := deleteTestRepo(repos, url, client) - if err != nil { - return err + if !skipCleanup { + err := deleteTestRepo(repos, url, client) + if err != nil { + return err + } } return nil @@ -404,6 +414,7 @@ func Pull( config testConfig, statsCh chan statsRecord, client *resty.Client, + skipCleanup bool, ) error { var repos []string @@ -472,9 +483,11 @@ func Pull( } // clean up - err := deleteTestRepo(repos, url, client) - if err != nil { - return err + if !skipCleanup { + err := deleteTestRepo(repos, url, client) + if err != nil { + return err + } } return nil @@ -486,6 +499,7 @@ func MixedPullAndPush( config testConfig, statsCh chan statsRecord, client *resty.Client, + skipCleanup bool, ) error { var repos []string @@ -519,9 +533,11 @@ func MixedPullAndPush( } // clean up - err = deleteTestRepo(repos, url, client) - if err != nil { - return err + if !skipCleanup { + err = deleteTestRepo(repos, url, client) + if err != nil { + return err + } } return nil @@ -633,7 +649,7 @@ var testSuite = []testConfig{ //nolint:gochecknoglobals // used only in this tes func Perf( workdir, url, auth, repo string, concurrency int, requests int, - outFmt string, srcIPs string, srcCIDR string, + outFmt string, srcIPs string, srcCIDR string, skipCleanup bool, ) { json := jsoniter.ConfigCompatibleWithStandardLibrary // logging @@ -702,7 +718,10 @@ func Perf( log.Fatal(err) } - _ = tconfig.tfunc(workdir, url, repo, requests/concurrency, tconfig, statsCh, httpClient) + err = tconfig.tfunc(workdir, url, repo, requests/concurrency, tconfig, statsCh, httpClient, skipCleanup) + if err != nil { + log.Fatal(err) + } }() } wg.Wait() diff --git a/pkg/storage/local/local.go b/pkg/storage/local/local.go index 6eed01fa..c2b2c4db 100644 --- a/pkg/storage/local/local.go +++ b/pkg/storage/local/local.go @@ -1045,6 +1045,13 @@ retry: } } + // also put dedupe blob in cache + if err := is.cache.PutBlob(dstDigest, dst); err != nil { + is.log.Error().Err(err).Str("blobPath", dst).Msg("dedupe: unable to insert blob record") + + return err + } + if err := os.Remove(src); err != nil { is.log.Error().Err(err).Str("src", src).Msg("dedupe: uname to remove blob") diff --git a/pkg/storage/local/local_test.go b/pkg/storage/local/local_test.go index 677376b4..60879506 100644 --- a/pkg/storage/local/local_test.go +++ b/pkg/storage/local/local_test.go @@ -733,6 +733,48 @@ func FuzzFullBlobUpload(f *testing.F) { }) } +func TestStorageCacheErrors(t *testing.T) { + Convey("get error in DedupeBlob() when cache.Put() deduped blob", t, func() { + log := log.Logger{Logger: zerolog.New(os.Stdout)} + metrics := monitoring.NewMetricsServer(false, log) + + dir := t.TempDir() + + originRepo := "dedupe1" + dedupedRepo := "dedupe2" + + cblob, cdigest := test.GetRandomImageConfig() + + getBlobPath := "" + imgStore := local.NewImageStore(dir, false, storage.DefaultGCDelay, + true, true, log, metrics, nil, &mocks.CacheMock{ + PutBlobFn: func(digest godigest.Digest, path string) error { + if strings.Contains(path, dedupedRepo) { + return errCache + } + + return nil + }, + GetBlobFn: func(digest godigest.Digest) (string, error) { + return getBlobPath, nil + }, + }) + + err := imgStore.InitRepo(originRepo) + So(err, ShouldBeNil) + + err = imgStore.InitRepo(dedupedRepo) + So(err, ShouldBeNil) + + _, _, err = imgStore.FullBlobUpload(originRepo, bytes.NewReader(cblob), cdigest) + So(err, ShouldBeNil) + + getBlobPath = strings.ReplaceAll(imgStore.BlobPath(originRepo, cdigest), imgStore.RootDir(), "") + _, _, err = imgStore.FullBlobUpload(dedupedRepo, bytes.NewReader(cblob), cdigest) + So(err, ShouldNotBeNil) + }) +} + func FuzzDedupeBlob(f *testing.F) { f.Fuzz(func(t *testing.T, data string) { log := &log.Logger{Logger: zerolog.New(os.Stdout)} @@ -1107,8 +1149,15 @@ func TestDedupeLinks(t *testing.T) { UseRelPaths: true, }, log) - imgStore := local.NewImageStore(dir, false, storage.DefaultGCDelay, - testCase.dedupe, true, log, metrics, nil, cacheDriver) + var imgStore storage.ImageStore + + if testCase.dedupe { + imgStore = local.NewImageStore(dir, false, storage.DefaultGCDelay, + testCase.dedupe, true, log, metrics, nil, cacheDriver) + } else { + imgStore = local.NewImageStore(dir, false, storage.DefaultGCDelay, + testCase.dedupe, true, log, metrics, nil, nil) + } Convey(fmt.Sprintf("Dedupe %t", testCase.dedupe), t, func(c C) { // manifest1 @@ -1238,6 +1287,16 @@ func TestDedupeLinks(t *testing.T) { So(os.SameFile(fi1, fi2), ShouldEqual, testCase.expected) if !testCase.dedupe { + Convey("delete blobs from storage/cache should work when dedupe is false", func() { + So(blobDigest1, ShouldEqual, blobDigest2) + + err = imgStore.DeleteBlob("dedupe1", godigest.NewDigestFromEncoded(godigest.SHA256, blobDigest1)) + So(err, ShouldBeNil) + + err = imgStore.DeleteBlob("dedupe2", godigest.NewDigestFromEncoded(godigest.SHA256, blobDigest2)) + So(err, ShouldBeNil) + }) + Convey("Intrerrupt rebuilding and restart, checking idempotency", func() { for i := 0; i < 10; i++ { taskScheduler, cancel := runAndGetScheduler() @@ -1358,6 +1417,16 @@ func TestDedupeLinks(t *testing.T) { }) } + Convey("delete blobs from storage/cache should work when dedupe is true", func() { + So(blobDigest1, ShouldEqual, blobDigest2) + + err = imgStore.DeleteBlob("dedupe1", godigest.NewDigestFromEncoded(godigest.SHA256, blobDigest1)) + So(err, ShouldBeNil) + + err = imgStore.DeleteBlob("dedupe2", godigest.NewDigestFromEncoded(godigest.SHA256, blobDigest2)) + So(err, ShouldBeNil) + }) + Convey("storage and cache inconsistency", func() { // delete blobs err = os.Remove(path.Join(dir, "dedupe1", "blobs", "sha256", blobDigest1)) diff --git a/pkg/storage/s3/s3.go b/pkg/storage/s3/s3.go index f01b028a..faf23735 100644 --- a/pkg/storage/s3/s3.go +++ b/pkg/storage/s3/s3.go @@ -1358,11 +1358,13 @@ func (is *ObjectStorage) DeleteBlob(repo string, digest godigest.Digest) error { } // remove cache entry and move blob contents to the next candidate if there is any - if err := is.cache.DeleteBlob(digest, blobPath); err != nil { - is.log.Error().Err(err).Str("digest", digest.String()).Str("blobPath", blobPath). - Msg("unable to remove blob path from cache") + if ok := is.cache.HasBlob(digest, blobPath); ok { + if err := is.cache.DeleteBlob(digest, blobPath); err != nil { + is.log.Error().Err(err).Str("digest", digest.String()).Str("blobPath", blobPath). + Msg("unable to remove blob path from cache") - return err + return err + } } // if the deleted blob is one with content diff --git a/pkg/storage/s3/s3_test.go b/pkg/storage/s3/s3_test.go index a152805a..5b1c338e 100644 --- a/pkg/storage/s3/s3_test.go +++ b/pkg/storage/s3/s3_test.go @@ -1400,6 +1400,16 @@ func TestS3Dedupe(t *testing.T) { // deduped blob should be of size 0 So(fi2.Size(), ShouldEqual, 0) + Convey("delete blobs from storage/cache should work when dedupe is true", func() { + So(blobDigest1, ShouldEqual, blobDigest2) + + err = imgStore.DeleteBlob("dedupe1", blobDigest1) + So(err, ShouldBeNil) + + err = imgStore.DeleteBlob("dedupe2", blobDigest2) + So(err, ShouldBeNil) + }) + Convey("Check that delete blobs moves the real content to the next contenders", func() { // if we delete blob1, the content should be moved to blob2 err = imgStore.DeleteBlob("dedupe1", blobDigest1) @@ -1537,6 +1547,19 @@ func TestS3Dedupe(t *testing.T) { // the new blob with dedupe false should be equal with the origin blob from dedupe1 So(fi1.Size(), ShouldEqual, fi3.Size()) + Convey("delete blobs from storage/cache should work when dedupe is false", func() { + So(blobDigest1, ShouldEqual, blobDigest2) + + err = imgStore.DeleteBlob("dedupe1", blobDigest1) + So(err, ShouldBeNil) + + err = imgStore.DeleteBlob("dedupe2", blobDigest2) + So(err, ShouldBeNil) + + err = imgStore.DeleteBlob("dedupe3", blobDigest2) + So(err, ShouldBeNil) + }) + Convey("rebuild s3 dedupe index from true to false", func() { //nolint: dupl taskScheduler, cancel := runAndGetScheduler() @@ -1749,6 +1772,16 @@ func TestS3Dedupe(t *testing.T) { // deduped blob should be of size 0 So(fi2.Size(), ShouldEqual, 0) + Convey("delete blobs from storage/cache should work when dedupe is true", func() { + So(blobDigest1, ShouldEqual, blobDigest2) + + err = imgStore.DeleteBlob("dedupe1", blobDigest1) + So(err, ShouldBeNil) + + err = imgStore.DeleteBlob("dedupe2", blobDigest2) + So(err, ShouldBeNil) + }) + Convey("rebuild s3 dedupe index from true to false", func() { //nolint: dupl taskScheduler, cancel := runAndGetScheduler() @@ -1777,6 +1810,16 @@ func TestS3Dedupe(t *testing.T) { So(err, ShouldBeNil) So(len(blobContent), ShouldEqual, fi1.Size()) + Convey("delete blobs from storage/cache should work when dedupe is false", func() { + So(blobDigest1, ShouldEqual, blobDigest2) + + err = imgStore.DeleteBlob("dedupe1", blobDigest1) + So(err, ShouldBeNil) + + err = imgStore.DeleteBlob("dedupe2", blobDigest2) + So(err, ShouldBeNil) + }) + Convey("rebuild s3 dedupe index from false to true", func() { taskScheduler, cancel := runAndGetScheduler()