diff --git a/.github/workflows/ci-cd.yml b/.github/workflows/ci-cd.yml index 1614dcb4..20315406 100644 --- a/.github/workflows/ci-cd.yml +++ b/.github/workflows/ci-cd.yml @@ -43,15 +43,18 @@ jobs: - name: Install dependencies run: | cd $GITHUB_WORKSPACE - go get -u github.com/swaggo/swag/cmd/swag + go install github.com/swaggo/swag/cmd/swag@latest go mod download sudo apt-get update sudo apt-get install rpm - sudo apt install snapd - sudo apt-get install skopeo + sudo apt-get install snapd + sudo apt-get install libgpgme-dev libassuan-dev libbtrfs-dev libdevmapper-dev pkg-config + git clone https://github.com/containers/skopeo -b v1.6.1 $GITHUB_WORKSPACE/src/github.com/containers/skopeo + cd $GITHUB_WORKSPACE/src/github.com/containers/skopeo && make bin/skopeo + cd $GITHUB_WORKSPACE curl -Lo notation.tar.gz https://github.com/notaryproject/notation/releases/download/v0.7.1-alpha.1/notation_0.7.1-alpha.1_linux_amd64.tar.gz sudo tar xvzf notation.tar.gz -C /usr/bin notation - go get github.com/wadey/gocovmerge + go install github.com/wadey/gocovmerge@latest - name: Run build and test timeout-minutes: 60 run: | diff --git a/Makefile b/Makefile index dbadaf0a..8cd8cf2d 100644 --- a/Makefile +++ b/Makefile @@ -54,8 +54,9 @@ test: check-skopeo $(NOTATION) go test -tags extended,containers_image_openpgp -v -trimpath -race -timeout 15m -cover -coverpkg ./... -coverprofile=coverage-extended.txt -covermode=atomic ./... go test -tags minimal,containers_image_openpgp -v -trimpath -race -cover -coverpkg ./... -coverprofile=coverage-minimal.txt -covermode=atomic ./... # development-mode unit tests possibly using failure injection - go test -tags dev,extended,containers_image_openpgp -v -trimpath -race -timeout 15m -cover -coverpkg ./... -coverprofile=coverage-dev-extended.txt -covermode=atomic ./pkg/test/... ./pkg/storage/... ./pkg/extensions/sync/... -run ^TestInject + go test -tags dev,extended,containers_image_openpgp -v -trimpath -race -timeout 15m -cover -coverpkg ./... 
-coverprofile=coverage-dev-extended.txt -covermode=atomic ./pkg/test/... ./pkg/api/... ./pkg/storage/... ./pkg/extensions/sync/... -run ^TestInject go test -tags dev,minimal,containers_image_openpgp -v -trimpath -race -cover -coverpkg ./... -coverprofile=coverage-dev-minimal.txt -covermode=atomic ./pkg/test/... ./pkg/storage/... ./pkg/extensions/sync/... -run ^TestInject + go test -tags stress,extended,containers_image_openpgp -v -trimpath -race -timeout 15m ./pkg/cli/stress_test.go .PHONY: run-bench run-bench: binary bench @@ -95,6 +96,7 @@ check: ./golangcilint.yaml $(GOLINTER) $(GOLINTER) --config ./golangcilint.yaml run --enable-all --out-format=colored-line-number --build-tags extended,containers_image_openpgp ./... $(GOLINTER) --config ./golangcilint.yaml run --enable-all --out-format=colored-line-number --build-tags dev,minimal,containers_image_openpgp ./... $(GOLINTER) --config ./golangcilint.yaml run --enable-all --out-format=colored-line-number --build-tags dev,extended,containers_image_openpgp ./... + $(GOLINTER) --config ./golangcilint.yaml run --enable-all --out-format=colored-line-number --build-tags stress,extended,containers_image_openpgp ./... 
swagger/docs.go: swag -v || go install github.com/swaggo/swag/cmd/swag diff --git a/pkg/api/controller_test.go b/pkg/api/controller_test.go index 3eaaabee..bed76546 100644 --- a/pkg/api/controller_test.go +++ b/pkg/api/controller_test.go @@ -5,6 +5,7 @@ package api_test import ( "bufio" + "bytes" "context" "crypto/tls" "crypto/x509" @@ -4124,6 +4125,353 @@ func TestStorageCommit(t *testing.T) { }) } +func TestInjectInterruptedImageManifest(t *testing.T) { + Convey("Make a new controller", t, func() { + port := test.GetFreePort() + baseURL := test.GetBaseURL(port) + conf := config.New() + conf.HTTP.Port = port + + ctlr := api.NewController(conf) + dir := t.TempDir() + ctlr.Config.Storage.RootDirectory = dir + + go startServer(ctlr) + defer stopServer(ctlr) + test.WaitTillServerReady(baseURL) + + rthdlr := api.NewRouteHandler(ctlr) + + Convey("Upload a blob & a config blob; Create an image manifest", func() { + // create a blob/layer + resp, err := resty.R().Post(baseURL + "/v2/repotest/blobs/uploads/") + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusAccepted) + loc := test.Location(baseURL, resp) + So(loc, ShouldNotBeEmpty) + + // since we are not specifying any prefix i.e provided in config while starting server, + // so it should store repotest to global root dir + _, err = os.Stat(path.Join(dir, "repotest")) + So(err, ShouldBeNil) + + resp, err = resty.R().Get(loc) + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusNoContent) + content := []byte("this is a dummy blob") + digest := godigest.FromBytes(content) + So(digest, ShouldNotBeNil) + // monolithic blob upload: success + resp, err = resty.R().SetQueryParam("digest", digest.String()). 
+ SetHeader("Content-Type", "application/octet-stream").SetBody(content).Put(loc) + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusCreated) + blobLoc := resp.Header().Get("Location") + So(blobLoc, ShouldNotBeEmpty) + So(resp.Header().Get("Content-Length"), ShouldEqual, "0") + So(resp.Header().Get(constants.DistContentDigestKey), ShouldNotBeEmpty) + + // upload image config blob + resp, err = resty.R().Post(baseURL + "/v2/repotest/blobs/uploads/") + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusAccepted) + loc = test.Location(baseURL, resp) + cblob, cdigest := test.GetRandomImageConfig() + + resp, err = resty.R(). + SetContentLength(true). + SetHeader("Content-Length", fmt.Sprintf("%d", len(cblob))). + SetHeader("Content-Type", "application/octet-stream"). + SetQueryParam("digest", cdigest.String()). + SetBody(cblob). + Put(loc) + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusCreated) + + // create a manifest + manifest := ispec.Manifest{ + Config: ispec.Descriptor{ + MediaType: "application/vnd.oci.image.config.v1+json", + Digest: cdigest, + Size: int64(len(cblob)), + }, + Layers: []ispec.Descriptor{ + { + MediaType: "application/vnd.oci.image.layer.v1.tar", + Digest: digest, + Size: int64(len(content)), + }, + }, + } + manifest.SchemaVersion = 2 + content, err = json.Marshal(manifest) + So(err, ShouldBeNil) + digest = godigest.FromBytes(content) + So(digest, ShouldNotBeNil) + + // Testing router path: @Router /v2/{name}/manifests/{reference} [put] + Convey("Uploading an image manifest blob (when injected simulates an interrupted image manifest upload)", func() { + injected := test.InjectFailure(0) + + request, _ := http.NewRequestWithContext(context.TODO(), "PUT", baseURL, bytes.NewReader(content)) + request = mux.SetURLVars(request, map[string]string{"name": "repotest", "reference": "1.0"}) + request.Header.Set("Content-Type", "application/vnd.oci.image.manifest.v1+json") + response := 
httptest.NewRecorder() + + rthdlr.UpdateManifest(response, request) + + resp := response.Result() + defer resp.Body.Close() + + So(resp, ShouldNotBeNil) + + if injected { + So(resp.StatusCode, ShouldEqual, http.StatusInternalServerError) + } else { + So(resp.StatusCode, ShouldEqual, http.StatusCreated) + } + }) + }) + }) +} + +func TestInjectTooManyOpenFiles(t *testing.T) { + Convey("Make a new controller", t, func() { + port := test.GetFreePort() + baseURL := test.GetBaseURL(port) + conf := config.New() + conf.HTTP.Port = port + + ctlr := api.NewController(conf) + dir := t.TempDir() + ctlr.Config.Storage.RootDirectory = dir + + go startServer(ctlr) + defer stopServer(ctlr) + test.WaitTillServerReady(baseURL) + + rthdlr := api.NewRouteHandler(ctlr) + + // create a blob/layer + resp, err := resty.R().Post(baseURL + "/v2/repotest/blobs/uploads/") + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusAccepted) + loc := test.Location(baseURL, resp) + So(loc, ShouldNotBeEmpty) + + // since we are not specifying any prefix i.e provided in config while starting server, + // so it should store repotest to global root dir + _, err = os.Stat(path.Join(dir, "repotest")) + So(err, ShouldBeNil) + + resp, err = resty.R().Get(loc) + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusNoContent) + content := []byte("this is a dummy blob") + digest := godigest.FromBytes(content) + So(digest, ShouldNotBeNil) + + // monolithic blob upload + injected := test.InjectFailure(0) + if injected { + request, _ := http.NewRequestWithContext(context.TODO(), "PUT", loc, bytes.NewReader(content)) + tokens := strings.Split(loc, "/") + request = mux.SetURLVars(request, map[string]string{"name": "repotest", "session_id": tokens[len(tokens)-1]}) + q := request.URL.Query() + q.Add("digest", digest.String()) + request.URL.RawQuery = q.Encode() + request.Header.Set("Content-Type", "application/octet-stream") + request.Header.Set("Content-Length", fmt.Sprintf("%d", 
len(content))) + response := httptest.NewRecorder() + + rthdlr.UpdateBlobUpload(response, request) + + resp := response.Result() + defer resp.Body.Close() + + So(resp, ShouldNotBeNil) + So(resp.StatusCode, ShouldEqual, http.StatusInternalServerError) + } else { + resp, err = resty.R().SetQueryParam("digest", digest.String()). + SetHeader("Content-Type", "application/octet-stream"). + SetBody(content).Put(loc) + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusCreated) + + blobLoc := resp.Header().Get("Location") + So(blobLoc, ShouldNotBeEmpty) + So(resp.Header().Get("Content-Length"), ShouldEqual, "0") + So(resp.Header().Get(constants.DistContentDigestKey), ShouldNotBeEmpty) + } + + // upload image config blob + resp, err = resty.R().Post(baseURL + "/v2/repotest/blobs/uploads/") + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusAccepted) + loc = test.Location(baseURL, resp) + cblob, cdigest := test.GetRandomImageConfig() + + resp, err = resty.R(). + SetContentLength(true). + SetHeader("Content-Length", fmt.Sprintf("%d", len(cblob))). + SetHeader("Content-Type", "application/octet-stream"). + SetQueryParam("digest", cdigest.String()). + SetBody(cblob). 
+ Put(loc) + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusCreated) + + // create a manifest + manifest := ispec.Manifest{ + Config: ispec.Descriptor{ + MediaType: "application/vnd.oci.image.config.v1+json", + Digest: cdigest, + Size: int64(len(cblob)), + }, + Layers: []ispec.Descriptor{ + { + MediaType: "application/vnd.oci.image.layer.v1.tar", + Digest: digest, + Size: int64(len(content)), + }, + }, + } + manifest.SchemaVersion = 2 + content, err = json.Marshal(manifest) + So(err, ShouldBeNil) + digest = godigest.FromBytes(content) + So(digest, ShouldNotBeNil) + + // Testing router path: @Router /v2/{name}/manifests/{reference} [put] + // nolint: lll + Convey("Uploading an image manifest blob (when injected simulates that PutImageManifest failed due to 'too many open files' error)", func() { + injected := test.InjectFailure(1) + + request, _ := http.NewRequestWithContext(context.TODO(), "PUT", baseURL, bytes.NewReader(content)) + request = mux.SetURLVars(request, map[string]string{"name": "repotest", "reference": "1.0"}) + request.Header.Set("Content-Type", "application/vnd.oci.image.manifest.v1+json") + response := httptest.NewRecorder() + + rthdlr.UpdateManifest(response, request) + + resp := response.Result() + So(resp, ShouldNotBeNil) + defer resp.Body.Close() + + if injected { + So(resp.StatusCode, ShouldEqual, http.StatusInternalServerError) + } else { + So(resp.StatusCode, ShouldEqual, http.StatusCreated) + } + }) + Convey("when injected simulates a `too many open files` error inside PutImageManifest method of img store", func() { + injected := test.InjectFailure(2) + + request, _ := http.NewRequestWithContext(context.TODO(), "PUT", baseURL, bytes.NewReader(content)) + request = mux.SetURLVars(request, map[string]string{"name": "repotest", "reference": "1.0"}) + request.Header.Set("Content-Type", "application/vnd.oci.image.manifest.v1+json") + response := httptest.NewRecorder() + + rthdlr.UpdateManifest(response, request) + + resp := 
response.Result() + defer resp.Body.Close() + + So(resp, ShouldNotBeNil) + + if injected { + So(resp.StatusCode, ShouldEqual, http.StatusInternalServerError) + } else { + So(resp.StatusCode, ShouldEqual, http.StatusCreated) + } + }) + Convey("code coverage: error inside PutImageManifest method of img store (unable to marshal JSON)", func() { + injected := test.InjectFailure(1) + + request, _ := http.NewRequestWithContext(context.TODO(), "PUT", baseURL, bytes.NewReader(content)) + request = mux.SetURLVars(request, map[string]string{"name": "repotest", "reference": "1.0"}) + request.Header.Set("Content-Type", "application/vnd.oci.image.manifest.v1+json") + response := httptest.NewRecorder() + + rthdlr.UpdateManifest(response, request) + + resp := response.Result() + defer resp.Body.Close() + + So(resp, ShouldNotBeNil) + + if injected { + So(resp.StatusCode, ShouldEqual, http.StatusInternalServerError) + } else { + So(resp.StatusCode, ShouldEqual, http.StatusCreated) + } + }) + Convey("code coverage: error inside PutImageManifest method of img store (umoci.OpenLayout error)", func() { + injected := test.InjectFailure(3) + + request, _ := http.NewRequestWithContext(context.TODO(), "PUT", baseURL, bytes.NewReader(content)) + request = mux.SetURLVars(request, map[string]string{"name": "repotest", "reference": "1.0"}) + request.Header.Set("Content-Type", "application/vnd.oci.image.manifest.v1+json") + response := httptest.NewRecorder() + + rthdlr.UpdateManifest(response, request) + + resp := response.Result() + defer resp.Body.Close() + + So(resp, ShouldNotBeNil) + + if injected { + So(resp.StatusCode, ShouldEqual, http.StatusInternalServerError) + } else { + So(resp.StatusCode, ShouldEqual, http.StatusCreated) + } + }) + Convey("code coverage: error inside PutImageManifest method of img store (oci.GC)", func() { + injected := test.InjectFailure(4) + + request, _ := http.NewRequestWithContext(context.TODO(), "PUT", baseURL, bytes.NewReader(content)) + request = 
mux.SetURLVars(request, map[string]string{"name": "repotest", "reference": "1.0"}) + request.Header.Set("Content-Type", "application/vnd.oci.image.manifest.v1+json") + response := httptest.NewRecorder() + + rthdlr.UpdateManifest(response, request) + + resp := response.Result() + defer resp.Body.Close() + + So(resp, ShouldNotBeNil) + + if injected { + So(resp.StatusCode, ShouldEqual, http.StatusInternalServerError) + } else { + So(resp.StatusCode, ShouldEqual, http.StatusCreated) + } + }) + Convey("when index.json is not in json format", func() { + resp, err = resty.R().SetHeader("Content-Type", "application/vnd.oci.image.manifest.v1+json"). + SetBody(content).Put(baseURL + "/v2/repotest/manifests/v1.0") + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusCreated) + digestHdr := resp.Header().Get(constants.DistContentDigestKey) + So(digestHdr, ShouldNotBeEmpty) + So(digestHdr, ShouldEqual, digest.String()) + + indexFile := path.Join(dir, "repotest", "index.json") + _, err = os.Stat(indexFile) + So(err, ShouldBeNil) + indexContent := []byte(`not a JSON content`) + err = ioutil.WriteFile(indexFile, indexContent, 0o600) + So(err, ShouldBeNil) + + resp, err = resty.R().SetHeader("Content-Type", "application/vnd.oci.image.manifest.v1+json"). 
+ SetBody(content).Put(baseURL + "/v2/repotest/manifests/v1.1") + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusInternalServerError) + }) + }) +} + func getAllBlobs(imagePath string) []string { blobList := make([]string, 0) diff --git a/pkg/api/errors.go b/pkg/api/errors.go index 5f20bca7..c0bfb53d 100644 --- a/pkg/api/errors.go +++ b/pkg/api/errors.go @@ -34,6 +34,7 @@ const ( UNAUTHORIZED DENIED UNSUPPORTED + INVALID_INDEX ) func (e ErrorCode) String() string { @@ -53,6 +54,7 @@ func (e ErrorCode) String() string { UNAUTHORIZED: "UNAUTHORIZED", DENIED: "DENIED", UNSUPPORTED: "UNSUPPORTED", + INVALID_INDEX: "INVALID_INDEX", } return errMap[e] @@ -152,6 +154,11 @@ func NewError(code ErrorCode, detail ...interface{}) Error { //nolint: interface Description: `The operation was unsupported due to a missing implementation or invalid set of parameters.`, }, + + INVALID_INDEX: { + Message: "Invalid format of index.json file of the repo", + Description: "index.json file does not contain data in json format", + }, } err, ok := errMap[code] diff --git a/pkg/api/routes.go b/pkg/api/routes.go index 06796507..6d093e08 100644 --- a/pkg/api/routes.go +++ b/pkg/api/routes.go @@ -33,6 +33,7 @@ import ( ext "zotregistry.io/zot/pkg/extensions" "zotregistry.io/zot/pkg/log" "zotregistry.io/zot/pkg/storage" + "zotregistry.io/zot/pkg/test" // as required by swaggo. 
_ "zotregistry.io/zot/swagger" @@ -428,7 +429,9 @@ func (rh *RouteHandler) UpdateManifest(response http.ResponseWriter, request *ht } body, err := ioutil.ReadAll(request.Body) - if err != nil { + // hard to reach test case, injected error (simulates an interrupted image manifest upload) + // err could be io.ErrUnexpectedEOF + if err := test.Error(err); err != nil { rh.c.Log.Error().Err(err).Msg("unexpected error") response.WriteHeader(http.StatusInternalServerError) @@ -449,8 +452,20 @@ func (rh *RouteHandler) UpdateManifest(response http.ResponseWriter, request *ht } else if errors.Is(err, zerr.ErrBlobNotFound) { WriteJSON(response, http.StatusBadRequest, NewErrorList(NewError(BLOB_UNKNOWN, map[string]string{"blob": digest}))) + } else if errors.Is(err, zerr.ErrRepoBadVersion) { + WriteJSON(response, http.StatusInternalServerError, + NewErrorList(NewError(INVALID_INDEX, map[string]string{"name": name}))) } else { - rh.c.Log.Error().Err(err).Msg("unexpected error") + // could be syscall.EMFILE (Err:0x18 too many opened files), etc + rh.c.Log.Error().Err(err).Msg("unexpected error: performing cleanup") + + if err = imgStore.DeleteImageManifest(name, reference); err != nil { + // deletion of image manifest is important, but not critical for image repo consistency + // in the worst scenario a partial manifest file written to disk will not affect the repo because + // the new manifest was not added to "index.json" file (it is possible that GC will take care of it) + rh.c.Log.Error().Err(err).Msgf("couldn't remove image manifest %s in repo %s", reference, name) + } + response.WriteHeader(http.StatusInternalServerError) } @@ -954,15 +969,7 @@ func (rh *RouteHandler) PatchBlobUpload(response http.ResponseWriter, request *h } if err != nil { - if errors.Is(err, io.ErrUnexpectedEOF) { //nolint:gocritic // errorslint conflicts with gocritic:IfElseChain - rh.c.Log.Warn().Msg("received unexpected EOF, removing .uploads/ files") - - if err = imgStore.DeleteBlobUpload(name, 
sessionID); err != nil { - rh.c.Log.Error().Err(err).Msgf("couldn't remove blobUpload %s in repo %s", sessionID, name) - } - - response.WriteHeader(http.StatusInternalServerError) - } else if errors.Is(err, zerr.ErrBadUploadRange) { + if errors.Is(err, zerr.ErrBadUploadRange) { //nolint:gocritic // errorslint conflicts with gocritic:IfElseChain WriteJSON(response, http.StatusRequestedRangeNotSatisfiable, NewErrorList(NewError(BLOB_UPLOAD_INVALID, map[string]string{"session_id": sessionID}))) } else if errors.Is(err, zerr.ErrRepoNotFound) { @@ -972,7 +979,12 @@ func (rh *RouteHandler) PatchBlobUpload(response http.ResponseWriter, request *h WriteJSON(response, http.StatusNotFound, NewErrorList(NewError(BLOB_UPLOAD_UNKNOWN, map[string]string{"session_id": sessionID}))) } else { - rh.c.Log.Error().Err(err).Msg("unexpected error") + // could be io.ErrUnexpectedEOF, syscall.EMFILE (Err:0x18 too many opened files), etc + rh.c.Log.Error().Err(err).Msg("unexpected error: removing .uploads/ files") + + if err = imgStore.DeleteBlobUpload(name, sessionID); err != nil { + rh.c.Log.Error().Err(err).Msgf("couldn't remove blobUpload %s in repo %s", sessionID, name) + } response.WriteHeader(http.StatusInternalServerError) } @@ -1071,15 +1083,7 @@ func (rh *RouteHandler) UpdateBlobUpload(response http.ResponseWriter, request * _, err = imgStore.PutBlobChunk(name, sessionID, from, to, request.Body) if err != nil { - if errors.Is(err, io.ErrUnexpectedEOF) { //nolint:gocritic // errorslint conflicts with gocritic:IfElseChain - rh.c.Log.Warn().Msg("received unexpected EOF, removing .uploads/ files") - - if err = imgStore.DeleteBlobUpload(name, sessionID); err != nil { - rh.c.Log.Error().Err(err).Msgf("couldn't remove blobUpload %s in repo %s", sessionID, name) - } - - response.WriteHeader(http.StatusInternalServerError) - } else if errors.Is(err, zerr.ErrBadUploadRange) { + if errors.Is(err, zerr.ErrBadUploadRange) { //nolint:gocritic // errorslint conflicts with gocritic:IfElseChain 
WriteJSON(response, http.StatusBadRequest, NewErrorList(NewError(BLOB_UPLOAD_INVALID, map[string]string{"session_id": sessionID}))) } else if errors.Is(err, zerr.ErrRepoNotFound) { @@ -1089,7 +1093,12 @@ func (rh *RouteHandler) UpdateBlobUpload(response http.ResponseWriter, request * WriteJSON(response, http.StatusNotFound, NewErrorList(NewError(BLOB_UPLOAD_UNKNOWN, map[string]string{"session_id": sessionID}))) } else { - rh.c.Log.Error().Err(err).Msg("unexpected error") + // could be io.ErrUnexpectedEOF, syscall.EMFILE (Err:0x18 too many opened files), etc + rh.c.Log.Error().Err(err).Msg("unexpected error: removing .uploads/ files") + + if err = imgStore.DeleteBlobUpload(name, sessionID); err != nil { + rh.c.Log.Error().Err(err).Msgf("couldn't remove blobUpload %s in repo %s", sessionID, name) + } response.WriteHeader(http.StatusInternalServerError) } @@ -1113,7 +1122,12 @@ finish: WriteJSON(response, http.StatusNotFound, NewErrorList(NewError(BLOB_UPLOAD_UNKNOWN, map[string]string{"session_id": sessionID}))) } else { - rh.c.Log.Error().Err(err).Msg("unexpected error") + // could be io.ErrUnexpectedEOF, syscall.EMFILE (Err:0x18 too many opened files), etc + rh.c.Log.Error().Err(err).Msg("unexpected error: removing .uploads/ files") + + if err = imgStore.DeleteBlobUpload(name, sessionID); err != nil { + rh.c.Log.Error().Err(err).Msgf("couldn't remove blobUpload %s in repo %s", sessionID, name) + } response.WriteHeader(http.StatusInternalServerError) } diff --git a/pkg/cli/stress_test.go b/pkg/cli/stress_test.go new file mode 100644 index 00000000..56202c15 --- /dev/null +++ b/pkg/cli/stress_test.go @@ -0,0 +1,223 @@ +//go:build stress +// +build stress + +package cli_test + +import ( + "context" + "fmt" + "io/ioutil" + "os" + "os/exec" + "sync" + "syscall" + "testing" + "time" + + . 
"github.com/smartystreets/goconvey/convey" + "zotregistry.io/zot/pkg/api" + "zotregistry.io/zot/pkg/api/config" + "zotregistry.io/zot/pkg/cli" + "zotregistry.io/zot/pkg/test" +) + +const ( + MaxFileDescriptors = 512 + WorkerRunningTime = 60 * time.Second +) + +func TestSressTooManyOpenFiles(t *testing.T) { + oldArgs := os.Args + + defer func() { os.Args = oldArgs }() + + Convey("configure zot with dedupe=false", t, func(c C) { + // In case one of the So()-assertions will fail it will allow us to print + // all the log files to figure out what happened in this test (zot log file, scrub output, storage rootFS tree) + SetDefaultFailureMode(FailureContinues) + + err := setMaxOpenFilesLimit(MaxFileDescriptors) + So(err, ShouldBeNil) + + port := test.GetFreePort() + baseURL := test.GetBaseURL(port) + conf := config.New() + conf.HTTP.Port = port + conf.Storage.Dedupe = false + conf.Storage.GC = true + + logFile, err := ioutil.TempFile("", "zot-log*.txt") + So(err, ShouldBeNil) + + defer func() { + data, err := os.ReadFile(logFile.Name()) + if err != nil { + fmt.Printf("error when reading zot log file:\n%s\n", err) + } + fmt.Printf("\n\n Zot log file content:\n%s\n", string(data)) + os.Remove(logFile.Name()) + }() + fmt.Println("Log file is: ", logFile.Name()) + conf.Log.Output = logFile.Name() + + ctlr := api.NewController(conf) + dir, err := ioutil.TempDir("", "oci-repo-test") + if err != nil { + panic(err) + } + + defer func() { + // list the content of the directory (useful in case of test fail) + cmd := fmt.Sprintf("du -ab %s", dir) + out, err := exec.Command("bash", "-c", cmd).Output() + if err != nil { + fmt.Printf("error when listing storage files:\n%s\n", err) + } + fmt.Printf("Listing Storage root FS:\n%s\n", out) + os.RemoveAll(dir) + }() + + fmt.Println("Storage root dir is: ", dir) + ctlr.Config.Storage.RootDirectory = dir + + go startServer(ctlr) + test.WaitTillServerReady(baseURL) + content := fmt.Sprintf(`{ + "storage": { + "rootDirectory": "%s", + 
"dedupe": %t, + "gc": %t + }, + "http": { + "address": "127.0.0.1", + "port": "%s" + }, + "log": { + "level": "debug", + "output": "%s" + } + }`, dir, conf.Storage.Dedupe, conf.Storage.GC, port, logFile.Name()) + + cfgfile, err := ioutil.TempFile("", "zot-test*.json") + So(err, ShouldBeNil) + defer os.Remove(cfgfile.Name()) // clean up + _, err = cfgfile.Write([]byte(content)) + So(err, ShouldBeNil) + err = cfgfile.Close() + So(err, ShouldBeNil) + + cmd := fmt.Sprintf("skopeo copy --format=oci --dest-tls-verify=false "+ + "--insecure-policy docker://public.ecr.aws/zomato/alpine:3.11.3 dir:%s/alpine", dir) + out, err := exec.Command("bash", "-c", cmd).Output() + if err != nil { + fmt.Printf("\nCopy skopeo docker image:\n%s\n", out) + fmt.Printf("\nerror on skopeo copy:\n%s\n", err) + } + + So(err, ShouldBeNil) + + var wg sync.WaitGroup + for i := 1; i <= MaxFileDescriptors; i++ { + wg.Add(1) + + i := i + + go func() { + defer wg.Done() + worker(i, port, dir) + }() + } + wg.Wait() + + data, err := os.ReadFile(logFile.Name()) + So(err, ShouldBeNil) + So(string(data), ShouldContainSubstring, "too many open files") + + stopServer(ctlr) + time.Sleep(2 * time.Second) + + scrubFile, err := ioutil.TempFile("", "zot-scrub*.txt") + So(err, ShouldBeNil) + + defer func() { + data, err := os.ReadFile(scrubFile.Name()) + if err != nil { + fmt.Printf("error when reading zot scrub file:\n%s\n", err) + } + fmt.Printf("\n\n Zot scrub file content:\n%s\n", string(data)) + os.Remove(scrubFile.Name()) + }() + fmt.Println("Scrub file is: ", scrubFile.Name()) + + os.Args = []string{"cli_test", "scrub", cfgfile.Name()} + cobraCmd := cli.NewServerRootCmd() + cobraCmd.SetOut(scrubFile) + err = cobraCmd.Execute() + So(err, ShouldBeNil) + + data, err = os.ReadFile(scrubFile.Name()) + So(err, ShouldBeNil) + So(string(data), ShouldNotContainSubstring, "affected") + }) +} + +func worker(id int, zotPort, rootDir string) { + start := time.Now() + + for i := 0; ; i++ { + cmd := fmt.Sprintf("skopeo 
copy --format=oci --insecure-policy --dest-tls-verify=false "+ + "dir:%s/alpine docker://localhost:%s/client%d:%d", rootDir, zotPort, id, i) + err := exec.Command("bash", "-c", cmd).Run() + if err != nil { // nolint: wsl + continue // we expect clients to receive errors due to FD limit reached on server + } + + time.Sleep(100 * time.Millisecond) + end := time.Now() + latency := end.Sub(start) + + if latency > WorkerRunningTime { + break + } + } +} + +func setMaxOpenFilesLimit(limit uint64) error { + var rLimit syscall.Rlimit + + err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &rLimit) + if err != nil { + return err + } + + fmt.Println("Current max. open files ", rLimit.Cur) + rLimit.Cur = limit + fmt.Println("Changing max. open files to ", limit) + + err = syscall.Setrlimit(syscall.RLIMIT_NOFILE, &rLimit) + if err != nil { + return err + } + + err = syscall.Getrlimit(syscall.RLIMIT_NOFILE, &rLimit) + if err != nil { + return err + } + + fmt.Println("Max. open files is set to", rLimit.Cur) + + return nil +} + +func startServer(c *api.Controller) { + // this blocks + ctx := context.Background() + if err := c.Run(ctx); err != nil { + return + } +} + +func stopServer(c *api.Controller) { + ctx := context.Background() + _ = c.Server.Shutdown(ctx) +} diff --git a/pkg/storage/storage_fs.go b/pkg/storage/storage_fs.go index 519f98f9..e67c3d8a 100644 --- a/pkg/storage/storage_fs.go +++ b/pkg/storage/storage_fs.go @@ -677,13 +677,14 @@ func (is *ImageStoreFS) PutImageManifest(repo string, reference string, mediaTyp file = path.Join(dir, "index.json") buf, err = json.Marshal(index) - if err != nil { + if err := test.Error(err); err != nil { is.log.Error().Err(err).Str("file", file).Msg("unable to marshal JSON") return "", err } - if err := is.writeFile(file, buf); err != nil { + err = is.writeFile(file, buf) + if err := test.Error(err); err != nil { is.log.Error().Err(err).Str("file", file).Msg("unable to write") return "", err @@ -691,12 +692,13 @@ func (is *ImageStoreFS) 
PutImageManifest(repo string, reference string, mediaTyp if is.gc { oci, err := umoci.OpenLayout(dir) - if err != nil { + if err := test.Error(err); err != nil { return "", err } defer oci.Close() - if err := oci.GC(context.Background(), ifOlderThan(is, repo, is.gcDelay)); err != nil { + err = oci.GC(context.Background(), ifOlderThan(is, repo, is.gcDelay)) + if err := test.Error(err); err != nil { return "", err } } @@ -1041,7 +1043,8 @@ func (is *ImageStoreFS) FinishBlobUpload(repo string, uuid string, body io.Reade dst := is.BlobPath(repo, dstDigest) if is.dedupe && is.cache != nil { - if err := is.DedupeBlob(src, dstDigest, dst); err != nil { + err = is.DedupeBlob(src, dstDigest, dst) + if err := test.Error(err); err != nil { is.log.Error().Err(err).Str("src", src).Str("dstDigest", dstDigest.String()). Str("dst", dst).Msg("unable to dedupe blob")