Leave zot repositories in a consistent state after zot hits fd limit closes #359

Signed-off-by: Alexei Dodon <adodon@cisco.com>
This commit is contained in:
Alexei Dodon 2022-01-23 22:26:02 +02:00 committed by Ramkumar Chinchani
parent ba41368469
commit ad519e2d3e
7 changed files with 633 additions and 33 deletions

View File

@ -43,15 +43,18 @@ jobs:
- name: Install dependencies
run: |
cd $GITHUB_WORKSPACE
go get -u github.com/swaggo/swag/cmd/swag
go install github.com/swaggo/swag/cmd/swag@latest
go mod download
sudo apt-get update
sudo apt-get install rpm
sudo apt install snapd
sudo apt-get install skopeo
sudo apt-get install snapd
sudo apt-get install libgpgme-dev libassuan-dev libbtrfs-dev libdevmapper-dev pkg-config
git clone https://github.com/containers/skopeo -b v1.6.1 $GITHUB_WORKSPACE/src/github.com/containers/skopeo
cd $GITHUB_WORKSPACE/src/github.com/containers/skopeo && make bin/skopeo
cd $GITHUB_WORKSPACE
curl -Lo notation.tar.gz https://github.com/notaryproject/notation/releases/download/v0.7.1-alpha.1/notation_0.7.1-alpha.1_linux_amd64.tar.gz
sudo tar xvzf notation.tar.gz -C /usr/bin notation
go get github.com/wadey/gocovmerge
go install github.com/wadey/gocovmerge@latest
- name: Run build and test
timeout-minutes: 60
run: |

View File

@ -54,8 +54,9 @@ test: check-skopeo $(NOTATION)
go test -tags extended,containers_image_openpgp -v -trimpath -race -timeout 15m -cover -coverpkg ./... -coverprofile=coverage-extended.txt -covermode=atomic ./...
go test -tags minimal,containers_image_openpgp -v -trimpath -race -cover -coverpkg ./... -coverprofile=coverage-minimal.txt -covermode=atomic ./...
# development-mode unit tests possibly using failure injection
go test -tags dev,extended,containers_image_openpgp -v -trimpath -race -timeout 15m -cover -coverpkg ./... -coverprofile=coverage-dev-extended.txt -covermode=atomic ./pkg/test/... ./pkg/storage/... ./pkg/extensions/sync/... -run ^TestInject
go test -tags dev,extended,containers_image_openpgp -v -trimpath -race -timeout 15m -cover -coverpkg ./... -coverprofile=coverage-dev-extended.txt -covermode=atomic ./pkg/test/... ./pkg/api/... ./pkg/storage/... ./pkg/extensions/sync/... -run ^TestInject
go test -tags dev,minimal,containers_image_openpgp -v -trimpath -race -cover -coverpkg ./... -coverprofile=coverage-dev-minimal.txt -covermode=atomic ./pkg/test/... ./pkg/storage/... ./pkg/extensions/sync/... -run ^TestInject
go test -tags stress,extended,containers_image_openpgp -v -trimpath -race -timeout 15m ./pkg/cli/stress_test.go
.PHONY: run-bench
run-bench: binary bench
@ -95,6 +96,7 @@ check: ./golangcilint.yaml $(GOLINTER)
$(GOLINTER) --config ./golangcilint.yaml run --enable-all --out-format=colored-line-number --build-tags extended,containers_image_openpgp ./...
$(GOLINTER) --config ./golangcilint.yaml run --enable-all --out-format=colored-line-number --build-tags dev,minimal,containers_image_openpgp ./...
$(GOLINTER) --config ./golangcilint.yaml run --enable-all --out-format=colored-line-number --build-tags dev,extended,containers_image_openpgp ./...
$(GOLINTER) --config ./golangcilint.yaml run --enable-all --out-format=colored-line-number --build-tags stress,extended,containers_image_openpgp ./...
swagger/docs.go:
swag -v || go install github.com/swaggo/swag/cmd/swag

View File

@ -5,6 +5,7 @@ package api_test
import (
"bufio"
"bytes"
"context"
"crypto/tls"
"crypto/x509"
@ -4124,6 +4125,353 @@ func TestStorageCommit(t *testing.T) {
})
}
func TestInjectInterruptedImageManifest(t *testing.T) {
Convey("Make a new controller", t, func() {
port := test.GetFreePort()
baseURL := test.GetBaseURL(port)
conf := config.New()
conf.HTTP.Port = port
ctlr := api.NewController(conf)
dir := t.TempDir()
ctlr.Config.Storage.RootDirectory = dir
go startServer(ctlr)
defer stopServer(ctlr)
test.WaitTillServerReady(baseURL)
rthdlr := api.NewRouteHandler(ctlr)
Convey("Upload a blob & a config blob; Create an image manifest", func() {
// create a blob/layer
resp, err := resty.R().Post(baseURL + "/v2/repotest/blobs/uploads/")
So(err, ShouldBeNil)
So(resp.StatusCode(), ShouldEqual, http.StatusAccepted)
loc := test.Location(baseURL, resp)
So(loc, ShouldNotBeEmpty)
// since we are not specifying any prefix i.e provided in config while starting server,
// so it should store repotest to global root dir
_, err = os.Stat(path.Join(dir, "repotest"))
So(err, ShouldBeNil)
resp, err = resty.R().Get(loc)
So(err, ShouldBeNil)
So(resp.StatusCode(), ShouldEqual, http.StatusNoContent)
content := []byte("this is a dummy blob")
digest := godigest.FromBytes(content)
So(digest, ShouldNotBeNil)
// monolithic blob upload: success
resp, err = resty.R().SetQueryParam("digest", digest.String()).
SetHeader("Content-Type", "application/octet-stream").SetBody(content).Put(loc)
So(err, ShouldBeNil)
So(resp.StatusCode(), ShouldEqual, http.StatusCreated)
blobLoc := resp.Header().Get("Location")
So(blobLoc, ShouldNotBeEmpty)
So(resp.Header().Get("Content-Length"), ShouldEqual, "0")
So(resp.Header().Get(constants.DistContentDigestKey), ShouldNotBeEmpty)
// upload image config blob
resp, err = resty.R().Post(baseURL + "/v2/repotest/blobs/uploads/")
So(err, ShouldBeNil)
So(resp.StatusCode(), ShouldEqual, http.StatusAccepted)
loc = test.Location(baseURL, resp)
cblob, cdigest := test.GetRandomImageConfig()
resp, err = resty.R().
SetContentLength(true).
SetHeader("Content-Length", fmt.Sprintf("%d", len(cblob))).
SetHeader("Content-Type", "application/octet-stream").
SetQueryParam("digest", cdigest.String()).
SetBody(cblob).
Put(loc)
So(err, ShouldBeNil)
So(resp.StatusCode(), ShouldEqual, http.StatusCreated)
// create a manifest
manifest := ispec.Manifest{
Config: ispec.Descriptor{
MediaType: "application/vnd.oci.image.config.v1+json",
Digest: cdigest,
Size: int64(len(cblob)),
},
Layers: []ispec.Descriptor{
{
MediaType: "application/vnd.oci.image.layer.v1.tar",
Digest: digest,
Size: int64(len(content)),
},
},
}
manifest.SchemaVersion = 2
content, err = json.Marshal(manifest)
So(err, ShouldBeNil)
digest = godigest.FromBytes(content)
So(digest, ShouldNotBeNil)
// Testing router path: @Router /v2/{name}/manifests/{reference} [put]
Convey("Uploading an image manifest blob (when injected simulates an interrupted image manifest upload)", func() {
injected := test.InjectFailure(0)
request, _ := http.NewRequestWithContext(context.TODO(), "PUT", baseURL, bytes.NewReader(content))
request = mux.SetURLVars(request, map[string]string{"name": "repotest", "reference": "1.0"})
request.Header.Set("Content-Type", "application/vnd.oci.image.manifest.v1+json")
response := httptest.NewRecorder()
rthdlr.UpdateManifest(response, request)
resp := response.Result()
defer resp.Body.Close()
So(resp, ShouldNotBeNil)
if injected {
So(resp.StatusCode, ShouldEqual, http.StatusInternalServerError)
} else {
So(resp.StatusCode, ShouldEqual, http.StatusCreated)
}
})
})
})
}
func TestInjectTooManyOpenFiles(t *testing.T) {
Convey("Make a new controller", t, func() {
port := test.GetFreePort()
baseURL := test.GetBaseURL(port)
conf := config.New()
conf.HTTP.Port = port
ctlr := api.NewController(conf)
dir := t.TempDir()
ctlr.Config.Storage.RootDirectory = dir
go startServer(ctlr)
defer stopServer(ctlr)
test.WaitTillServerReady(baseURL)
rthdlr := api.NewRouteHandler(ctlr)
// create a blob/layer
resp, err := resty.R().Post(baseURL + "/v2/repotest/blobs/uploads/")
So(err, ShouldBeNil)
So(resp.StatusCode(), ShouldEqual, http.StatusAccepted)
loc := test.Location(baseURL, resp)
So(loc, ShouldNotBeEmpty)
// since we are not specifying any prefix i.e provided in config while starting server,
// so it should store repotest to global root dir
_, err = os.Stat(path.Join(dir, "repotest"))
So(err, ShouldBeNil)
resp, err = resty.R().Get(loc)
So(err, ShouldBeNil)
So(resp.StatusCode(), ShouldEqual, http.StatusNoContent)
content := []byte("this is a dummy blob")
digest := godigest.FromBytes(content)
So(digest, ShouldNotBeNil)
// monolithic blob upload
injected := test.InjectFailure(0)
if injected {
request, _ := http.NewRequestWithContext(context.TODO(), "PUT", loc, bytes.NewReader(content))
tokens := strings.Split(loc, "/")
request = mux.SetURLVars(request, map[string]string{"name": "repotest", "session_id": tokens[len(tokens)-1]})
q := request.URL.Query()
q.Add("digest", digest.String())
request.URL.RawQuery = q.Encode()
request.Header.Set("Content-Type", "application/octet-stream")
request.Header.Set("Content-Length", fmt.Sprintf("%d", len(content)))
response := httptest.NewRecorder()
rthdlr.UpdateBlobUpload(response, request)
resp := response.Result()
defer resp.Body.Close()
So(resp, ShouldNotBeNil)
So(resp.StatusCode, ShouldEqual, http.StatusInternalServerError)
} else {
resp, err = resty.R().SetQueryParam("digest", digest.String()).
SetHeader("Content-Type", "application/octet-stream").
SetBody(content).Put(loc)
So(err, ShouldBeNil)
So(resp.StatusCode(), ShouldEqual, http.StatusCreated)
blobLoc := resp.Header().Get("Location")
So(blobLoc, ShouldNotBeEmpty)
So(resp.Header().Get("Content-Length"), ShouldEqual, "0")
So(resp.Header().Get(constants.DistContentDigestKey), ShouldNotBeEmpty)
}
// upload image config blob
resp, err = resty.R().Post(baseURL + "/v2/repotest/blobs/uploads/")
So(err, ShouldBeNil)
So(resp.StatusCode(), ShouldEqual, http.StatusAccepted)
loc = test.Location(baseURL, resp)
cblob, cdigest := test.GetRandomImageConfig()
resp, err = resty.R().
SetContentLength(true).
SetHeader("Content-Length", fmt.Sprintf("%d", len(cblob))).
SetHeader("Content-Type", "application/octet-stream").
SetQueryParam("digest", cdigest.String()).
SetBody(cblob).
Put(loc)
So(err, ShouldBeNil)
So(resp.StatusCode(), ShouldEqual, http.StatusCreated)
// create a manifest
manifest := ispec.Manifest{
Config: ispec.Descriptor{
MediaType: "application/vnd.oci.image.config.v1+json",
Digest: cdigest,
Size: int64(len(cblob)),
},
Layers: []ispec.Descriptor{
{
MediaType: "application/vnd.oci.image.layer.v1.tar",
Digest: digest,
Size: int64(len(content)),
},
},
}
manifest.SchemaVersion = 2
content, err = json.Marshal(manifest)
So(err, ShouldBeNil)
digest = godigest.FromBytes(content)
So(digest, ShouldNotBeNil)
// Testing router path: @Router /v2/{name}/manifests/{reference} [put]
// nolint: lll
Convey("Uploading an image manifest blob (when injected simulates that PutImageManifest failed due to 'too many open files' error)", func() {
injected := test.InjectFailure(1)
request, _ := http.NewRequestWithContext(context.TODO(), "PUT", baseURL, bytes.NewReader(content))
request = mux.SetURLVars(request, map[string]string{"name": "repotest", "reference": "1.0"})
request.Header.Set("Content-Type", "application/vnd.oci.image.manifest.v1+json")
response := httptest.NewRecorder()
rthdlr.UpdateManifest(response, request)
resp := response.Result()
So(resp, ShouldNotBeNil)
defer resp.Body.Close()
if injected {
So(resp.StatusCode, ShouldEqual, http.StatusInternalServerError)
} else {
So(resp.StatusCode, ShouldEqual, http.StatusCreated)
}
})
Convey("when injected simulates a `too many open files` error inside PutImageManifest method of img store", func() {
injected := test.InjectFailure(2)
request, _ := http.NewRequestWithContext(context.TODO(), "PUT", baseURL, bytes.NewReader(content))
request = mux.SetURLVars(request, map[string]string{"name": "repotest", "reference": "1.0"})
request.Header.Set("Content-Type", "application/vnd.oci.image.manifest.v1+json")
response := httptest.NewRecorder()
rthdlr.UpdateManifest(response, request)
resp := response.Result()
defer resp.Body.Close()
So(resp, ShouldNotBeNil)
if injected {
So(resp.StatusCode, ShouldEqual, http.StatusInternalServerError)
} else {
So(resp.StatusCode, ShouldEqual, http.StatusCreated)
}
})
Convey("code coverage: error inside PutImageManifest method of img store (unable to marshal JSON)", func() {
injected := test.InjectFailure(1)
request, _ := http.NewRequestWithContext(context.TODO(), "PUT", baseURL, bytes.NewReader(content))
request = mux.SetURLVars(request, map[string]string{"name": "repotest", "reference": "1.0"})
request.Header.Set("Content-Type", "application/vnd.oci.image.manifest.v1+json")
response := httptest.NewRecorder()
rthdlr.UpdateManifest(response, request)
resp := response.Result()
defer resp.Body.Close()
So(resp, ShouldNotBeNil)
if injected {
So(resp.StatusCode, ShouldEqual, http.StatusInternalServerError)
} else {
So(resp.StatusCode, ShouldEqual, http.StatusCreated)
}
})
Convey("code coverage: error inside PutImageManifest method of img store (umoci.OpenLayout error)", func() {
injected := test.InjectFailure(3)
request, _ := http.NewRequestWithContext(context.TODO(), "PUT", baseURL, bytes.NewReader(content))
request = mux.SetURLVars(request, map[string]string{"name": "repotest", "reference": "1.0"})
request.Header.Set("Content-Type", "application/vnd.oci.image.manifest.v1+json")
response := httptest.NewRecorder()
rthdlr.UpdateManifest(response, request)
resp := response.Result()
defer resp.Body.Close()
So(resp, ShouldNotBeNil)
if injected {
So(resp.StatusCode, ShouldEqual, http.StatusInternalServerError)
} else {
So(resp.StatusCode, ShouldEqual, http.StatusCreated)
}
})
Convey("code coverage: error inside PutImageManifest method of img store (oci.GC)", func() {
injected := test.InjectFailure(4)
request, _ := http.NewRequestWithContext(context.TODO(), "PUT", baseURL, bytes.NewReader(content))
request = mux.SetURLVars(request, map[string]string{"name": "repotest", "reference": "1.0"})
request.Header.Set("Content-Type", "application/vnd.oci.image.manifest.v1+json")
response := httptest.NewRecorder()
rthdlr.UpdateManifest(response, request)
resp := response.Result()
defer resp.Body.Close()
So(resp, ShouldNotBeNil)
if injected {
So(resp.StatusCode, ShouldEqual, http.StatusInternalServerError)
} else {
So(resp.StatusCode, ShouldEqual, http.StatusCreated)
}
})
Convey("when index.json is not in json format", func() {
resp, err = resty.R().SetHeader("Content-Type", "application/vnd.oci.image.manifest.v1+json").
SetBody(content).Put(baseURL + "/v2/repotest/manifests/v1.0")
So(err, ShouldBeNil)
So(resp.StatusCode(), ShouldEqual, http.StatusCreated)
digestHdr := resp.Header().Get(constants.DistContentDigestKey)
So(digestHdr, ShouldNotBeEmpty)
So(digestHdr, ShouldEqual, digest.String())
indexFile := path.Join(dir, "repotest", "index.json")
_, err = os.Stat(indexFile)
So(err, ShouldBeNil)
indexContent := []byte(`not a JSON content`)
err = ioutil.WriteFile(indexFile, indexContent, 0o600)
So(err, ShouldBeNil)
resp, err = resty.R().SetHeader("Content-Type", "application/vnd.oci.image.manifest.v1+json").
SetBody(content).Put(baseURL + "/v2/repotest/manifests/v1.1")
So(err, ShouldBeNil)
So(resp.StatusCode(), ShouldEqual, http.StatusInternalServerError)
})
})
}
func getAllBlobs(imagePath string) []string {
blobList := make([]string, 0)

View File

@ -34,6 +34,7 @@ const (
UNAUTHORIZED
DENIED
UNSUPPORTED
INVALID_INDEX
)
func (e ErrorCode) String() string {
@ -53,6 +54,7 @@ func (e ErrorCode) String() string {
UNAUTHORIZED: "UNAUTHORIZED",
DENIED: "DENIED",
UNSUPPORTED: "UNSUPPORTED",
INVALID_INDEX: "INVALID_INDEX",
}
return errMap[e]
@ -152,6 +154,11 @@ func NewError(code ErrorCode, detail ...interface{}) Error { //nolint: interface
Description: `The operation was unsupported due to a missing
implementation or invalid set of parameters.`,
},
INVALID_INDEX: {
Message: "Invalid format of index.json file of the repo",
Description: "index.json file does not contain data in json format",
},
}
err, ok := errMap[code]

View File

@ -33,6 +33,7 @@ import (
ext "zotregistry.io/zot/pkg/extensions"
"zotregistry.io/zot/pkg/log"
"zotregistry.io/zot/pkg/storage"
"zotregistry.io/zot/pkg/test"
// as required by swaggo.
_ "zotregistry.io/zot/swagger"
@ -428,7 +429,9 @@ func (rh *RouteHandler) UpdateManifest(response http.ResponseWriter, request *ht
}
body, err := ioutil.ReadAll(request.Body)
if err != nil {
// hard to reach test case, injected error (simulates an interrupted image manifest upload)
// err could be io.ErrUnexpectedEOF
if err := test.Error(err); err != nil {
rh.c.Log.Error().Err(err).Msg("unexpected error")
response.WriteHeader(http.StatusInternalServerError)
@ -449,8 +452,20 @@ func (rh *RouteHandler) UpdateManifest(response http.ResponseWriter, request *ht
} else if errors.Is(err, zerr.ErrBlobNotFound) {
WriteJSON(response, http.StatusBadRequest,
NewErrorList(NewError(BLOB_UNKNOWN, map[string]string{"blob": digest})))
} else if errors.Is(err, zerr.ErrRepoBadVersion) {
WriteJSON(response, http.StatusInternalServerError,
NewErrorList(NewError(INVALID_INDEX, map[string]string{"name": name})))
} else {
rh.c.Log.Error().Err(err).Msg("unexpected error")
// could be syscall.EMFILE (Err:0x18 too many opened files), etc
rh.c.Log.Error().Err(err).Msg("unexpected error: performing cleanup")
if err = imgStore.DeleteImageManifest(name, reference); err != nil {
// deletion of image manifest is important, but not critical for image repo consistancy
// in the worst scenario a partial manifest file written to disk will not affect the repo because
// the new manifest was not added to "index.json" file (it is possible that GC will take care of it)
rh.c.Log.Error().Err(err).Msgf("couldn't remove image manifest %s in repo %s", reference, name)
}
response.WriteHeader(http.StatusInternalServerError)
}
@ -954,15 +969,7 @@ func (rh *RouteHandler) PatchBlobUpload(response http.ResponseWriter, request *h
}
if err != nil {
if errors.Is(err, io.ErrUnexpectedEOF) { //nolint:gocritic // errorslint conflicts with gocritic:IfElseChain
rh.c.Log.Warn().Msg("received unexpected EOF, removing .uploads/ files")
if err = imgStore.DeleteBlobUpload(name, sessionID); err != nil {
rh.c.Log.Error().Err(err).Msgf("couldn't remove blobUpload %s in repo %s", sessionID, name)
}
response.WriteHeader(http.StatusInternalServerError)
} else if errors.Is(err, zerr.ErrBadUploadRange) {
if errors.Is(err, zerr.ErrBadUploadRange) { //nolint:gocritic // errorslint conflicts with gocritic:IfElseChain
WriteJSON(response, http.StatusRequestedRangeNotSatisfiable,
NewErrorList(NewError(BLOB_UPLOAD_INVALID, map[string]string{"session_id": sessionID})))
} else if errors.Is(err, zerr.ErrRepoNotFound) {
@ -972,7 +979,12 @@ func (rh *RouteHandler) PatchBlobUpload(response http.ResponseWriter, request *h
WriteJSON(response, http.StatusNotFound,
NewErrorList(NewError(BLOB_UPLOAD_UNKNOWN, map[string]string{"session_id": sessionID})))
} else {
rh.c.Log.Error().Err(err).Msg("unexpected error")
// could be io.ErrUnexpectedEOF, syscall.EMFILE (Err:0x18 too many opened files), etc
rh.c.Log.Error().Err(err).Msg("unexpected error: removing .uploads/ files")
if err = imgStore.DeleteBlobUpload(name, sessionID); err != nil {
rh.c.Log.Error().Err(err).Msgf("couldn't remove blobUpload %s in repo %s", sessionID, name)
}
response.WriteHeader(http.StatusInternalServerError)
}
@ -1071,15 +1083,7 @@ func (rh *RouteHandler) UpdateBlobUpload(response http.ResponseWriter, request *
_, err = imgStore.PutBlobChunk(name, sessionID, from, to, request.Body)
if err != nil {
if errors.Is(err, io.ErrUnexpectedEOF) { //nolint:gocritic // errorslint conflicts with gocritic:IfElseChain
rh.c.Log.Warn().Msg("received unexpected EOF, removing .uploads/ files")
if err = imgStore.DeleteBlobUpload(name, sessionID); err != nil {
rh.c.Log.Error().Err(err).Msgf("couldn't remove blobUpload %s in repo %s", sessionID, name)
}
response.WriteHeader(http.StatusInternalServerError)
} else if errors.Is(err, zerr.ErrBadUploadRange) {
if errors.Is(err, zerr.ErrBadUploadRange) { //nolint:gocritic // errorslint conflicts with gocritic:IfElseChain
WriteJSON(response, http.StatusBadRequest,
NewErrorList(NewError(BLOB_UPLOAD_INVALID, map[string]string{"session_id": sessionID})))
} else if errors.Is(err, zerr.ErrRepoNotFound) {
@ -1089,7 +1093,12 @@ func (rh *RouteHandler) UpdateBlobUpload(response http.ResponseWriter, request *
WriteJSON(response, http.StatusNotFound,
NewErrorList(NewError(BLOB_UPLOAD_UNKNOWN, map[string]string{"session_id": sessionID})))
} else {
rh.c.Log.Error().Err(err).Msg("unexpected error")
// could be io.ErrUnexpectedEOF, syscall.EMFILE (Err:0x18 too many opened files), etc
rh.c.Log.Error().Err(err).Msg("unexpected error: removing .uploads/ files")
if err = imgStore.DeleteBlobUpload(name, sessionID); err != nil {
rh.c.Log.Error().Err(err).Msgf("couldn't remove blobUpload %s in repo %s", sessionID, name)
}
response.WriteHeader(http.StatusInternalServerError)
}
@ -1113,7 +1122,12 @@ finish:
WriteJSON(response, http.StatusNotFound,
NewErrorList(NewError(BLOB_UPLOAD_UNKNOWN, map[string]string{"session_id": sessionID})))
} else {
rh.c.Log.Error().Err(err).Msg("unexpected error")
// could be io.ErrUnexpectedEOF, syscall.EMFILE (Err:0x18 too many opened files), etc
rh.c.Log.Error().Err(err).Msg("unexpected error: removing .uploads/ files")
if err = imgStore.DeleteBlobUpload(name, sessionID); err != nil {
rh.c.Log.Error().Err(err).Msgf("couldn't remove blobUpload %s in repo %s", sessionID, name)
}
response.WriteHeader(http.StatusInternalServerError)
}

223
pkg/cli/stress_test.go Normal file
View File

@ -0,0 +1,223 @@
//go:build stress
// +build stress
package cli_test
import (
"context"
"fmt"
"io/ioutil"
"os"
"os/exec"
"sync"
"syscall"
"testing"
"time"
. "github.com/smartystreets/goconvey/convey"
"zotregistry.io/zot/pkg/api"
"zotregistry.io/zot/pkg/api/config"
"zotregistry.io/zot/pkg/cli"
"zotregistry.io/zot/pkg/test"
)
const (
MaxFileDescriptors = 512
WorkerRunningTime = 60 * time.Second
)
func TestSressTooManyOpenFiles(t *testing.T) {
oldArgs := os.Args
defer func() { os.Args = oldArgs }()
Convey("configure zot with dedupe=false", t, func(c C) {
// In case one of the So()-assertions will fail it will allow us to print
// all the log files to figure out what happened in this test (zot log file, scrub output, storage rootFS tree)
SetDefaultFailureMode(FailureContinues)
err := setMaxOpenFilesLimit(MaxFileDescriptors)
So(err, ShouldBeNil)
port := test.GetFreePort()
baseURL := test.GetBaseURL(port)
conf := config.New()
conf.HTTP.Port = port
conf.Storage.Dedupe = false
conf.Storage.GC = true
logFile, err := ioutil.TempFile("", "zot-log*.txt")
So(err, ShouldBeNil)
defer func() {
data, err := os.ReadFile(logFile.Name())
if err != nil {
fmt.Printf("error when reading zot log file:\n%s\n", err)
}
fmt.Printf("\n\n Zot log file content:\n%s\n", string(data))
os.Remove(logFile.Name())
}()
fmt.Println("Log file is: ", logFile.Name())
conf.Log.Output = logFile.Name()
ctlr := api.NewController(conf)
dir, err := ioutil.TempDir("", "oci-repo-test")
if err != nil {
panic(err)
}
defer func() {
// list the content of the directory (useful in case of test fail)
cmd := fmt.Sprintf("du -ab %s", dir)
out, err := exec.Command("bash", "-c", cmd).Output()
if err != nil {
fmt.Printf("error when listing storage files:\n%s\n", err)
}
fmt.Printf("Listing Storage root FS:\n%s\n", out)
os.RemoveAll(dir)
}()
fmt.Println("Storage root dir is: ", dir)
ctlr.Config.Storage.RootDirectory = dir
go startServer(ctlr)
test.WaitTillServerReady(baseURL)
content := fmt.Sprintf(`{
"storage": {
"rootDirectory": "%s",
"dedupe": %t,
"gc": %t
},
"http": {
"address": "127.0.0.1",
"port": "%s"
},
"log": {
"level": "debug",
"output": "%s"
}
}`, dir, conf.Storage.Dedupe, conf.Storage.GC, port, logFile.Name())
cfgfile, err := ioutil.TempFile("", "zot-test*.json")
So(err, ShouldBeNil)
defer os.Remove(cfgfile.Name()) // clean up
_, err = cfgfile.Write([]byte(content))
So(err, ShouldBeNil)
err = cfgfile.Close()
So(err, ShouldBeNil)
cmd := fmt.Sprintf("skopeo copy --format=oci --dest-tls-verify=false "+
"--insecure-policy docker://public.ecr.aws/zomato/alpine:3.11.3 dir:%s/alpine", dir)
out, err := exec.Command("bash", "-c", cmd).Output()
if err != nil {
fmt.Printf("\nCopy skopeo docker image:\n%s\n", out)
fmt.Printf("\nerror on skopeo copy:\n%s\n", err)
}
So(err, ShouldBeNil)
var wg sync.WaitGroup
for i := 1; i <= MaxFileDescriptors; i++ {
wg.Add(1)
i := i
go func() {
defer wg.Done()
worker(i, port, dir)
}()
}
wg.Wait()
data, err := os.ReadFile(logFile.Name())
So(err, ShouldBeNil)
So(string(data), ShouldContainSubstring, "too many open files")
stopServer(ctlr)
time.Sleep(2 * time.Second)
scrubFile, err := ioutil.TempFile("", "zot-scrub*.txt")
So(err, ShouldBeNil)
defer func() {
data, err := os.ReadFile(scrubFile.Name())
if err != nil {
fmt.Printf("error when reading zot scrub file:\n%s\n", err)
}
fmt.Printf("\n\n Zot scrub file content:\n%s\n", string(data))
os.Remove(scrubFile.Name())
}()
fmt.Println("Scrub file is: ", scrubFile.Name())
os.Args = []string{"cli_test", "scrub", cfgfile.Name()}
cobraCmd := cli.NewServerRootCmd()
cobraCmd.SetOut(scrubFile)
err = cobraCmd.Execute()
So(err, ShouldBeNil)
data, err = os.ReadFile(scrubFile.Name())
So(err, ShouldBeNil)
So(string(data), ShouldNotContainSubstring, "affected")
})
}
func worker(id int, zotPort, rootDir string) {
start := time.Now()
for i := 0; ; i++ {
cmd := fmt.Sprintf("skopeo copy --format=oci --insecure-policy --dest-tls-verify=false "+
"dir:%s/alpine docker://localhost:%s/client%d:%d", rootDir, zotPort, id, i)
err := exec.Command("bash", "-c", cmd).Run()
if err != nil { // nolint: wsl
continue // we expect clients to receive errors due to FD limit reached on server
}
time.Sleep(100 * time.Millisecond)
end := time.Now()
latency := end.Sub(start)
if latency > WorkerRunningTime {
break
}
}
}
func setMaxOpenFilesLimit(limit uint64) error {
var rLimit syscall.Rlimit
err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &rLimit)
if err != nil {
return err
}
fmt.Println("Current max. open files ", rLimit.Cur)
rLimit.Cur = limit
fmt.Println("Changing max. open files to ", limit)
err = syscall.Setrlimit(syscall.RLIMIT_NOFILE, &rLimit)
if err != nil {
return err
}
err = syscall.Getrlimit(syscall.RLIMIT_NOFILE, &rLimit)
if err != nil {
return err
}
fmt.Println("Max. open files is set to", rLimit.Cur)
return nil
}
func startServer(c *api.Controller) {
// this blocks
ctx := context.Background()
if err := c.Run(ctx); err != nil {
return
}
}
func stopServer(c *api.Controller) {
ctx := context.Background()
_ = c.Server.Shutdown(ctx)
}

View File

@ -677,13 +677,14 @@ func (is *ImageStoreFS) PutImageManifest(repo string, reference string, mediaTyp
file = path.Join(dir, "index.json")
buf, err = json.Marshal(index)
if err != nil {
if err := test.Error(err); err != nil {
is.log.Error().Err(err).Str("file", file).Msg("unable to marshal JSON")
return "", err
}
if err := is.writeFile(file, buf); err != nil {
err = is.writeFile(file, buf)
if err := test.Error(err); err != nil {
is.log.Error().Err(err).Str("file", file).Msg("unable to write")
return "", err
@ -691,12 +692,13 @@ func (is *ImageStoreFS) PutImageManifest(repo string, reference string, mediaTyp
if is.gc {
oci, err := umoci.OpenLayout(dir)
if err != nil {
if err := test.Error(err); err != nil {
return "", err
}
defer oci.Close()
if err := oci.GC(context.Background(), ifOlderThan(is, repo, is.gcDelay)); err != nil {
err = oci.GC(context.Background(), ifOlderThan(is, repo, is.gcDelay))
if err := test.Error(err); err != nil {
return "", err
}
}
@ -1041,7 +1043,8 @@ func (is *ImageStoreFS) FinishBlobUpload(repo string, uuid string, body io.Reade
dst := is.BlobPath(repo, dstDigest)
if is.dedupe && is.cache != nil {
if err := is.DedupeBlob(src, dstDigest, dst); err != nil {
err = is.DedupeBlob(src, dstDigest, dst)
if err := test.Error(err); err != nil {
is.log.Error().Err(err).Str("src", src).Str("dstDigest", dstDigest.String()).
Str("dst", dst).Msg("unable to dedupe blob")