Add wait group for graceful shutdown, closes #302

Signed-off-by: Petu Eusebiu <peusebiu@cisco.com>
This commit is contained in:
Petu Eusebiu 2021-12-02 19:45:26 +02:00 committed by Ramkumar Chinchani
parent f011192615
commit 627cb97ef1
8 changed files with 68 additions and 105 deletions

View File

@ -1,12 +1,14 @@
package api
import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"io/ioutil"
"net"
"net/http"
goSync "sync"
"time"
"github.com/gorilla/handlers"
@ -34,6 +36,7 @@ type Controller struct {
Audit *log.Logger
Server *http.Server
Metrics monitoring.MetricServer
wgShutDown *goSync.WaitGroup // use it to gracefully shutdown goroutines
}
func NewController(config *config.Config) *Controller {
@ -43,6 +46,7 @@ func NewController(config *config.Config) *Controller {
controller.Config = config
controller.Log = logger
controller.wgShutDown = new(goSync.WaitGroup)
if config.Log.Audit != "" {
audit := log.NewAuditLogger(config.Log.Level, config.Log.Audit)
@ -195,7 +199,7 @@ func (c *Controller) Run() error {
// Enable extensions if extension config is provided
if c.Config.Extensions != nil && c.Config.Extensions.Sync != nil {
ext.EnableSyncExtension(c.Config, c.Log, c.StoreController)
ext.EnableSyncExtension(c.Config, c.wgShutDown, c.StoreController, c.Log)
}
monitoring.SetServerInfo(c.Metrics, c.Config.Commit, c.Config.BinaryType, c.Config.GoVersion, c.Config.Version)
@ -247,3 +251,11 @@ func (c *Controller) Run() error {
return server.Serve(l)
}
func (c *Controller) Shutdown() {
// wait gracefully
c.wgShutDown.Wait()
ctx := context.Background()
_ = c.Server.Shutdown(ctx)
}

View File

@ -1271,7 +1271,7 @@ func getImageManifest(rh *RouteHandler, is storage.ImageStore, name,
if rh.c.Config.Extensions != nil && rh.c.Config.Extensions.Sync != nil {
rh.c.Log.Info().Msgf("image not found, trying to get image %s:%s by syncing on demand", name, reference)
errSync := ext.SyncOneImage(rh.c.Config, rh.c.Log, rh.c.StoreController, name, reference)
errSync := ext.SyncOneImage(rh.c.Config, rh.c.StoreController, name, reference, rh.c.Log)
if errSync != nil {
rh.c.Log.Err(errSync).Msgf("error encounter while syncing image %s:%s", name, reference)
} else {
@ -1283,7 +1283,7 @@ func getImageManifest(rh *RouteHandler, is storage.ImageStore, name,
if rh.c.Config.Extensions != nil && rh.c.Config.Extensions.Sync != nil {
rh.c.Log.Info().Msgf("manifest not found, trying to get image %s:%s by syncing on demand", name, reference)
errSync := ext.SyncOneImage(rh.c.Config, rh.c.Log, rh.c.StoreController, name, reference)
errSync := ext.SyncOneImage(rh.c.Config, rh.c.StoreController, name, reference, rh.c.Log)
if errSync != nil {
rh.c.Log.Err(errSync).Msgf("error encounter while syncing image %s:%s", name, reference)
} else {

View File

@ -4,6 +4,7 @@
package extensions
import (
goSync "sync"
"time"
gqlHandler "github.com/99designs/gqlgen/graphql/handler"
@ -68,7 +69,8 @@ func EnableExtensions(config *config.Config, log log.Logger, rootDir string) {
}
// EnableSyncExtension enables sync extension.
func EnableSyncExtension(config *config.Config, log log.Logger, storeController storage.StoreController) {
func EnableSyncExtension(config *config.Config, wg *goSync.WaitGroup,
storeController storage.StoreController, log log.Logger) {
if config.Extensions.Sync != nil {
defaultPollInterval, _ := time.ParseDuration("1h")
for id, registryCfg := range config.Extensions.Sync.Registries {
@ -83,7 +85,7 @@ func EnableSyncExtension(config *config.Config, log log.Logger, storeController
}
}
if err := sync.Run(*config.Extensions.Sync, storeController, log); err != nil {
if err := sync.Run(*config.Extensions.Sync, storeController, wg, log); err != nil {
log.Error().Err(err).Msg("Error encountered while setting up syncing")
}
} else {
@ -128,11 +130,11 @@ func SetupRoutes(config *config.Config, router *mux.Router, storeController stor
}
// SyncOneImage syncs one image.
func SyncOneImage(config *config.Config, log log.Logger,
storeController storage.StoreController, repoName, reference string) error {
func SyncOneImage(config *config.Config, storeController storage.StoreController,
repoName, reference string, log log.Logger) error {
log.Info().Msgf("syncing image %s:%s", repoName, reference)
err := sync.OneImage(*config.Extensions.Sync, log, storeController, repoName, reference)
err := sync.OneImage(*config.Extensions.Sync, storeController, repoName, reference, log)
return err
}

View File

@ -4,6 +4,7 @@
package extensions
import (
goSync "sync"
"time"
"github.com/gorilla/mux"
@ -24,7 +25,8 @@ func EnableExtensions(config *config.Config, log log.Logger, rootDir string) {
}
// EnableSyncExtension ...
func EnableSyncExtension(config *config.Config, log log.Logger, storeController storage.StoreController) {
func EnableSyncExtension(config *config.Config, wg *goSync.WaitGroup,
storeController storage.StoreController, log log.Logger) {
log.Warn().Msg("skipping enabling sync extension because given zot binary doesn't support any extensions," +
"please build zot full binary for this feature")
}
@ -36,8 +38,8 @@ func SetupRoutes(conf *config.Config, router *mux.Router, storeController storag
}
// SyncOneImage ...
func SyncOneImage(config *config.Config, log log.Logger, storeController storage.StoreController,
repoName, reference string) error {
func SyncOneImage(config *config.Config, storeController storage.StoreController,
repoName, reference string, log log.Logger) error {
log.Warn().Msg("skipping syncing on demand because given zot binary doesn't support any extensions," +
"please build zot full binary for this feature")
return nil

View File

@ -17,8 +17,8 @@ import (
"zotregistry.io/zot/pkg/storage"
)
func OneImage(cfg Config, log log.Logger,
storeController storage.StoreController, repo, tag string) error {
func OneImage(cfg Config, storeController storage.StoreController,
repo, tag string, log log.Logger) error {
var credentialsFile CredentialsFile
if cfg.CredentialsFile != "" {

View File

@ -12,6 +12,7 @@ import (
"path"
"regexp"
"strings"
goSync "sync"
"time"
"github.com/Masterminds/semver"
@ -438,7 +439,7 @@ func getLocalContexts(log log.Logger) (*types.SystemContext, *signature.PolicyCo
return localCtx, policyContext, nil
}
func Run(cfg Config, storeController storage.StoreController, logger log.Logger) error {
func Run(cfg Config, storeController storage.StoreController, wg *goSync.WaitGroup, logger log.Logger) error {
var credentialsFile CredentialsFile
var err error
@ -468,6 +469,9 @@ func Run(cfg Config, storeController storage.StoreController, logger log.Logger)
continue
}
// increment reference since will be busy, so shutdown has to wait
wg.Add(1)
// schedule each registry sync
ticker := time.NewTicker(regCfg.PollInterval)
@ -484,6 +488,8 @@ func Run(cfg Config, storeController storage.StoreController, logger log.Logger)
l.Error().Err(err).Msg("sync exited with error, stopping it...")
ticker.Stop()
}
// mark as done after a single sync run
wg.Done()
}
}(regCfg, l)
}

View File

@ -7,6 +7,7 @@ import (
"io/ioutil"
"os"
"path"
goSync "sync"
"testing"
"time"
@ -101,7 +102,7 @@ func TestSyncInternal(t *testing.T) {
cfg := Config{Registries: []RegistryConfig{syncRegistryConfig}, CredentialsFile: "/invalid/path/to/file"}
So(Run(cfg, storage.StoreController{}, log.NewLogger("debug", "")), ShouldNotBeNil)
So(Run(cfg, storage.StoreController{}, new(goSync.WaitGroup), log.NewLogger("debug", "")), ShouldNotBeNil)
_, err = getFileCredentials("/invalid/path/to/file")
So(err, ShouldNotBeNil)

View File

@ -4,7 +4,6 @@
package sync_test
import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
@ -235,9 +234,7 @@ func TestSyncOnDemand(t *testing.T) {
defer os.RemoveAll(srcDir)
defer func() {
ctx := context.Background()
_ = sc.Server.Shutdown(ctx)
time.Sleep(500 * time.Millisecond)
sc.Shutdown()
}()
regex := ".*"
@ -269,9 +266,7 @@ func TestSyncOnDemand(t *testing.T) {
defer os.RemoveAll(destDir)
defer func() {
ctx := context.Background()
_ = dc.Server.Shutdown(ctx)
time.Sleep(500 * time.Millisecond)
dc.Shutdown()
}()
var srcTagsList TagsList
@ -369,9 +364,7 @@ func TestSync(t *testing.T) {
defer os.RemoveAll(srcDir)
defer func() {
ctx := context.Background()
_ = sc.Server.Shutdown(ctx)
time.Sleep(500 * time.Millisecond)
sc.Shutdown()
}()
regex := ".*"
@ -400,9 +393,7 @@ func TestSync(t *testing.T) {
defer os.RemoveAll(destDir)
defer func() {
ctx := context.Background()
_ = dc.Server.Shutdown(ctx)
time.Sleep(500 * time.Millisecond)
dc.Shutdown()
}()
var srcTagsList TagsList
@ -480,9 +471,7 @@ func TestSync(t *testing.T) {
defer os.RemoveAll(destDir)
defer func() {
ctx := context.Background()
_ = dc.Server.Shutdown(ctx)
time.Sleep(500 * time.Millisecond)
dc.Shutdown()
}()
var srcTagsList TagsList
@ -551,9 +540,7 @@ func TestSyncPermsDenied(t *testing.T) {
defer os.RemoveAll(srcDir)
defer func() {
ctx := context.Background()
_ = sc.Server.Shutdown(ctx)
time.Sleep(500 * time.Millisecond)
sc.Shutdown()
}()
regex := ".*"
@ -582,9 +569,7 @@ func TestSyncPermsDenied(t *testing.T) {
defer os.RemoveAll(destDir)
defer func() {
ctx := context.Background()
_ = dc.Server.Shutdown(ctx)
time.Sleep(500 * time.Millisecond)
dc.Shutdown()
}()
err := os.Chmod(path.Join(destDir, testImage, sync.SyncBlobUploadDir), 0000)
@ -608,9 +593,7 @@ func TestSyncBadTLS(t *testing.T) {
defer os.RemoveAll(srcDir)
defer func() {
ctx := context.Background()
_ = sc.Server.Shutdown(ctx)
time.Sleep(500 * time.Millisecond)
sc.Shutdown()
}()
regex := ".*"
@ -639,9 +622,7 @@ func TestSyncBadTLS(t *testing.T) {
defer os.RemoveAll(destDir)
defer func() {
ctx := context.Background()
_ = dc.Server.Shutdown(ctx)
time.Sleep(500 * time.Millisecond)
dc.Shutdown()
}()
// give it time to set up sync
@ -669,9 +650,7 @@ func TestSyncTLS(t *testing.T) {
defer os.RemoveAll(srcDir)
defer func() {
ctx := context.Background()
_ = sc.Server.Shutdown(ctx)
time.Sleep(500 * time.Millisecond)
sc.Shutdown()
}()
var srcIndex ispec.Index
@ -738,9 +717,7 @@ func TestSyncTLS(t *testing.T) {
defer os.RemoveAll(destDir)
defer func() {
ctx := context.Background()
_ = dc.Server.Shutdown(ctx)
time.Sleep(500 * time.Millisecond)
dc.Shutdown()
}()
// wait till ready
@ -781,9 +758,7 @@ func TestSyncBasicAuth(t *testing.T) {
defer os.RemoveAll(srcDir)
defer func() {
ctx := context.Background()
_ = sc.Server.Shutdown(ctx)
time.Sleep(500 * time.Millisecond)
sc.Shutdown()
}()
Convey("Verify sync basic auth with file credentials", func() {
@ -811,9 +786,7 @@ func TestSyncBasicAuth(t *testing.T) {
defer os.RemoveAll(destDir)
defer func() {
ctx := context.Background()
_ = dc.Server.Shutdown(ctx)
time.Sleep(500 * time.Millisecond)
dc.Shutdown()
}()
var srcTagsList TagsList
@ -915,9 +888,7 @@ func TestSyncBasicAuth(t *testing.T) {
}()
defer func() {
ctx := context.Background()
_ = dc.Server.Shutdown(ctx)
time.Sleep(500 * time.Millisecond)
dc.Shutdown()
}()
// wait till ready
@ -982,9 +953,7 @@ func TestSyncBasicAuth(t *testing.T) {
defer os.RemoveAll(destDir)
defer func() {
ctx := context.Background()
_ = dc.Server.Shutdown(ctx)
time.Sleep(500 * time.Millisecond)
dc.Shutdown()
}()
resp, err := destClient.R().Get(destBaseURL + "/v2/" + testImage + "/manifests/" + testImageTag)
@ -1028,9 +997,7 @@ func TestSyncBasicAuth(t *testing.T) {
defer os.RemoveAll(destDir)
defer func() {
ctx := context.Background()
_ = dc.Server.Shutdown(ctx)
time.Sleep(500 * time.Millisecond)
dc.Shutdown()
}()
var srcTagsList TagsList
@ -1119,9 +1086,7 @@ func TestSyncBadURL(t *testing.T) {
defer os.RemoveAll(destDir)
defer func() {
ctx := context.Background()
_ = dc.Server.Shutdown(ctx)
time.Sleep(500 * time.Millisecond)
dc.Shutdown()
}()
Convey("Test sync on POST request on /sync", func() {
@ -1141,9 +1106,7 @@ func TestSyncNoImagesByRegex(t *testing.T) {
defer os.RemoveAll(srcDir)
defer func() {
ctx := context.Background()
_ = sc.Server.Shutdown(ctx)
time.Sleep(500 * time.Millisecond)
sc.Shutdown()
}()
regex := "9.9.9"
@ -1170,9 +1133,7 @@ func TestSyncNoImagesByRegex(t *testing.T) {
defer os.RemoveAll(destDir)
defer func() {
ctx := context.Background()
_ = dc.Server.Shutdown(ctx)
time.Sleep(500 * time.Millisecond)
dc.Shutdown()
}()
Convey("Test sync on POST request on /sync", func() {
@ -1205,9 +1166,7 @@ func TestSyncInvalidRegex(t *testing.T) {
defer os.RemoveAll(srcDir)
defer func() {
ctx := context.Background()
_ = sc.Server.Shutdown(ctx)
time.Sleep(500 * time.Millisecond)
sc.Shutdown()
}()
regex := "["
@ -1234,9 +1193,7 @@ func TestSyncInvalidRegex(t *testing.T) {
defer os.RemoveAll(destDir)
defer func() {
ctx := context.Background()
_ = dc.Server.Shutdown(ctx)
time.Sleep(500 * time.Millisecond)
dc.Shutdown()
}()
Convey("Test sync on POST request on /sync", func() {
@ -1256,9 +1213,7 @@ func TestSyncNotSemver(t *testing.T) {
defer os.RemoveAll(srcDir)
defer func() {
ctx := context.Background()
_ = sc.Server.Shutdown(ctx)
time.Sleep(500 * time.Millisecond)
sc.Shutdown()
}()
// get manifest so we can update it with a semver non compliant tag
@ -1300,9 +1255,7 @@ func TestSyncNotSemver(t *testing.T) {
defer os.RemoveAll(destDir)
defer func() {
ctx := context.Background()
_ = dc.Server.Shutdown(ctx)
time.Sleep(500 * time.Millisecond)
dc.Shutdown()
}()
Convey("Test sync on POST request on /sync", func() {
@ -1335,9 +1288,7 @@ func TestSyncInvalidCerts(t *testing.T) {
defer os.RemoveAll(srcDir)
defer func() {
ctx := context.Background()
_ = sc.Server.Shutdown(ctx)
time.Sleep(500 * time.Millisecond)
sc.Shutdown()
}()
// copy client certs, use them in sync config
@ -1397,9 +1348,7 @@ func TestSyncInvalidCerts(t *testing.T) {
defer os.RemoveAll(destDir)
defer func() {
ctx := context.Background()
_ = dc.Server.Shutdown(ctx)
time.Sleep(500 * time.Millisecond)
dc.Shutdown()
}()
Convey("Test sync on POST request on /sync", func() {
@ -1456,9 +1405,7 @@ func TestSyncInvalidUrl(t *testing.T) {
defer os.RemoveAll(destDir)
defer func() {
ctx := context.Background()
_ = dc.Server.Shutdown(ctx)
time.Sleep(500 * time.Millisecond)
dc.Shutdown()
}()
resp, err := destClient.R().Get(destBaseURL + "/v2/" + testImage + "/manifests/" + testImageTag)
@ -1475,9 +1422,7 @@ func TestSyncInvalidTags(t *testing.T) {
defer os.RemoveAll(srcDir)
defer func() {
ctx := context.Background()
_ = sc.Server.Shutdown(ctx)
time.Sleep(500 * time.Millisecond)
sc.Shutdown()
}()
regex := ".*"
@ -1509,9 +1454,7 @@ func TestSyncInvalidTags(t *testing.T) {
defer os.RemoveAll(destDir)
defer func() {
ctx := context.Background()
_ = dc.Server.Shutdown(ctx)
time.Sleep(500 * time.Millisecond)
dc.Shutdown()
}()
resp, err := destClient.R().Get(destBaseURL + "/v2/" + testImage + "/manifests/" + "invalid:tag")
@ -1565,9 +1508,7 @@ func TestSyncSubPaths(t *testing.T) {
}
defer func() {
ctx := context.Background()
_ = sc.Server.Shutdown(ctx)
time.Sleep(500 * time.Millisecond)
sc.Shutdown()
}()
regex := ".*"
@ -1646,8 +1587,7 @@ func TestSyncSubPaths(t *testing.T) {
}
defer func() {
ctx := context.Background()
_ = dc.Server.Shutdown(ctx)
dc.Shutdown()
}()
var destTagsList TagsList