feat(metrics): add scheduler related metrics (#2076)

Signed-off-by: Alexei Dodon <adodon@cisco.com>
Alexei Dodon 2023-12-05 00:13:50 +02:00 committed by GitHub
parent 8bac653dd2
commit 2e733b3f4f
21 changed files with 566 additions and 69 deletions

View File

@ -377,7 +377,7 @@ func (c *Controller) Shutdown() {
}
func (c *Controller) StartBackgroundTasks(reloadCtx context.Context) {
c.taskScheduler = scheduler.NewScheduler(c.Config, c.Log)
c.taskScheduler = scheduler.NewScheduler(c.Config, c.Metrics, c.Log)
c.taskScheduler.RunScheduler(reloadCtx)
// Enable running garbage-collect periodically for DefaultStore

View File

@ -3,6 +3,7 @@ package api
import (
"context"
"encoding/gob"
"fmt"
"io/fs"
"os"
"path"
@ -157,3 +158,12 @@ func (cleanTask *CleanTask) DoWork(ctx context.Context) error {
return nil
}
func (cleanTask *CleanTask) String() string {
return fmt.Sprintf("{Name: %s, sessions: %s}",
cleanTask.Name(), cleanTask.sessions)
}
func (cleanTask *CleanTask) Name() string {
return "SessionCleanupTask"
}

View File

@ -10,6 +10,7 @@ import (
"fmt"
"math/big"
"net/http"
"runtime"
"strings"
"sync"
"testing"
@ -25,6 +26,7 @@ import (
zotcfg "zotregistry.io/zot/pkg/api/config"
"zotregistry.io/zot/pkg/exporter/api"
"zotregistry.io/zot/pkg/extensions/monitoring"
"zotregistry.io/zot/pkg/scheduler"
. "zotregistry.io/zot/pkg/test/common"
)
@ -69,12 +71,22 @@ func readDefaultMetrics(collector *api.Collector, chMetric chan prometheus.Metri
So(err, ShouldBeNil)
So(*metric.Gauge.Value, ShouldEqual, 1)
pmMetric = <-chMetric
So(pmMetric.Desc().String(), ShouldEqual, collector.MetricsDesc["zot_scheduler_workers_total"].String())
err = pmMetric.Write(&metric)
So(err, ShouldBeNil)
So(*metric.Gauge.Value, ShouldEqual, runtime.NumCPU()*scheduler.NumWorkersMultiplier)
pmMetric = <-chMetric
So(pmMetric.Desc().String(), ShouldEqual, collector.MetricsDesc["zot_info"].String())
err = pmMetric.Write(&metric)
So(err, ShouldBeNil)
So(*metric.Gauge.Value, ShouldEqual, 0)
pmMetric = <-chMetric
So(pmMetric.Desc().String(), ShouldEqual, collector.MetricsDesc["zot_scheduler_generators_total"].String())
}
func TestNewExporter(t *testing.T) {

View File

@ -18,6 +18,6 @@ func IsBuiltWithUserPrefsExtension() bool {
func SetupUserPreferencesRoutes(config *config.Config, router *mux.Router,
metaDB mTypes.MetaDB, log log.Logger,
) {
log.Warn().Msg("userprefs extension is disabled because given zot binary doesn't" +
log.Warn().Msg("userprefs extension is disabled because given zot binary doesn't " +
"include this feature please build a binary that does so")
}

View File

@ -5,6 +5,7 @@ package imagetrust
import (
"context"
"fmt"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
@ -274,3 +275,13 @@ func (validityT *validityTask) DoWork(ctx context.Context) error {
return nil
}
func (validityT *validityTask) String() string {
return fmt.Sprintf("{sigValidityTaskGenerator: %s, repo: %s}",
"signatures validity task", // description of generator's task purpose
validityT.repo.Name)
}
func (validityT *validityTask) Name() string {
return "SignatureValidityTask"
}

View File

@ -83,6 +83,53 @@ var (
},
[]string{"storageName", "lockType"},
)
schedulerGenerators = promauto.NewCounter( //nolint: gochecknoglobals
prometheus.CounterOpts{
Namespace: metricsNamespace,
Name: "scheduler_generators_total",
Help: "Total number of generators registered in scheduler",
},
)
schedulerGeneratorsStatus = promauto.NewGaugeVec( //nolint: gochecknoglobals
prometheus.GaugeOpts{
Namespace: metricsNamespace,
Name: "scheduler_generators_status",
Help: "Scheduler generators by priority & state",
},
[]string{"priority", "state"},
)
schedulerNumWorkers = promauto.NewGauge( //nolint: gochecknoglobals
prometheus.GaugeOpts{ //nolint: promlinter
Namespace: metricsNamespace,
Name: "scheduler_workers_total",
Help: "Total number of available workers to perform scheduler tasks",
},
)
schedulerWorkers = promauto.NewGaugeVec( //nolint: gochecknoglobals
prometheus.GaugeOpts{
Namespace: metricsNamespace,
Name: "scheduler_workers",
Help: "Scheduler workers state",
},
[]string{"state"},
)
schedulerTasksQueue = promauto.NewGaugeVec( //nolint: gochecknoglobals
prometheus.GaugeOpts{
Namespace: metricsNamespace,
Name: "scheduler_tasksqueue_length",
Help: "Number of tasks waiting in the queue to pe processed by scheduler workers",
},
[]string{"priority"},
)
workersTasksDuration = promauto.NewHistogramVec( //nolint: gochecknoglobals
prometheus.HistogramOpts{
Namespace: metricsNamespace,
Name: "scheduler_workers_tasks_duration_seconds",
Help: "How long it takes for a worker to execute a task",
Buckets: GetDefaultBuckets(),
},
[]string{"name"},
)
)
type metricServer struct {
@ -169,7 +216,7 @@ func IncDownloadCounter(ms MetricServer, repo string) {
}
func SetStorageUsage(ms MetricServer, rootDir, repo string) {
ms.SendMetric(func() {
ms.ForceSendMetric(func() {
dir := path.Join(rootDir, repo)
repoSize, err := GetDirSize(dir)
@ -196,3 +243,47 @@ func ObserveStorageLockLatency(ms MetricServer, latency time.Duration, storageNa
storageLockLatency.WithLabelValues(storageName, lockType).Observe(latency.Seconds())
})
}
func IncSchedulerGenerators(ms MetricServer) {
ms.ForceSendMetric(func() {
schedulerGenerators.Inc()
})
}
func SetSchedulerGenerators(ms MetricServer, gen map[string]map[string]uint64) {
ms.SendMetric(func() {
for priority, states := range gen {
for state, value := range states {
schedulerGeneratorsStatus.WithLabelValues(priority, state).Set(float64(value))
}
}
})
}
func SetSchedulerNumWorkers(ms MetricServer, total int) {
ms.SendMetric(func() {
schedulerNumWorkers.Set(float64(total))
})
}
func SetSchedulerWorkers(ms MetricServer, w map[string]int) {
ms.SendMetric(func() {
for state, value := range w {
schedulerWorkers.WithLabelValues(state).Set(float64(value))
}
})
}
func SetSchedulerTasksQueue(ms MetricServer, tq map[string]int) {
ms.SendMetric(func() {
for priority, value := range tq {
schedulerTasksQueue.WithLabelValues(priority).Set(float64(value))
}
})
}
func ObserveWorkersTasksDuration(ms MetricServer, taskName string, duration time.Duration) {
ms.SendMetric(func() {
workersTasksDuration.WithLabelValues(taskName).Observe(duration.Seconds())
})
}
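
For orientation, here is a hedged sketch of how these new helpers are intended to be driven (the scheduler's metrics worker does this periodically elsewhere in this change); the numeric values, label values and task name are purely illustrative:
```
package main

import (
	"runtime"
	"time"

	"zotregistry.io/zot/pkg/extensions/monitoring"
	"zotregistry.io/zot/pkg/log"
)

func main() {
	logger := log.NewLogger("debug", "") // empty output path is assumed to log to stdout
	ms := monitoring.NewMetricsServer(true, logger)

	// zot_scheduler_workers_total
	monitoring.SetSchedulerNumWorkers(ms, runtime.NumCPU()*4)

	// zot_scheduler_generators_total (force-sent here so it shows up even before metrics are enabled)
	monitoring.IncSchedulerGenerators(ms)

	// zot_scheduler_generators_status{priority, state}
	monitoring.SetSchedulerGenerators(ms, map[string]map[string]uint64{
		"high": {"ready": 1, "waiting": 0, "done": 0},
	})

	// zot_scheduler_workers{state}
	monitoring.SetSchedulerWorkers(ms, map[string]int{"idle": 14, "working": 2})

	// zot_scheduler_tasksqueue_length{priority}
	monitoring.SetSchedulerTasksQueue(ms, map[string]int{"low": 3, "medium": 0, "high": 1})

	// zot_scheduler_workers_tasks_duration_seconds{name}
	monitoring.ObserveWorkersTasksDuration(ms, "GCTask", 250*time.Millisecond)
}
```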

View File

@ -18,17 +18,23 @@ import (
const (
metricsNamespace = "zot"
// Counters.
httpConnRequests = metricsNamespace + ".http.requests"
repoDownloads = metricsNamespace + ".repo.downloads"
repoUploads = metricsNamespace + ".repo.uploads"
httpConnRequests = metricsNamespace + ".http.requests"
repoDownloads = metricsNamespace + ".repo.downloads"
repoUploads = metricsNamespace + ".repo.uploads"
schedulerGenerators = metricsNamespace + ".scheduler.generators"
// Gauge.
repoStorageBytes = metricsNamespace + ".repo.storage.bytes"
serverInfo = metricsNamespace + ".info"
repoStorageBytes = metricsNamespace + ".repo.storage.bytes"
serverInfo = metricsNamespace + ".info"
schedulerNumWorkers = metricsNamespace + ".scheduler.workers.total"
schedulerWorkers = metricsNamespace + ".scheduler.workers"
schedulerGeneratorsStatus = metricsNamespace + ".scheduler.generators.status"
schedulerTasksQueue = metricsNamespace + ".scheduler.tasksqueue.length"
// Summary.
httpRepoLatencySeconds = metricsNamespace + ".http.repo.latency.seconds"
// Histogram.
httpMethodLatencySeconds = metricsNamespace + ".http.method.latency.seconds"
storageLockLatencySeconds = metricsNamespace + ".storage.lock.latency.seconds"
workersTasksDuration = metricsNamespace + ".scheduler.workers.tasks.duration.seconds"
metricsScrapeTimeout = 2 * time.Minute
metricsScrapeCheckInterval = 30 * time.Second
@ -39,7 +45,7 @@ type metricServer struct {
lastCheck time.Time
reqChan chan interface{}
cache *MetricsInfo
cacheChan chan *MetricsInfo
cacheChan chan MetricsCopy
bucketsF2S map[float64]string // float64 to string conversion of buckets label
log log.Logger
lock *sync.RWMutex
@ -51,6 +57,12 @@ type MetricsInfo struct {
Summaries []*SummaryValue
Histograms []*HistogramValue
}
type MetricsCopy struct {
Counters []CounterValue
Gauges []GaugeValue
Summaries []SummaryValue
Histograms []HistogramValue
}
// CounterValue stores info about a metric that is incremented over time,
// such as the number of requests to an HTTP endpoint.
@ -118,7 +130,7 @@ func (ms *metricServer) ReceiveMetrics() interface{} {
ms.enabled = true
}
ms.lock.Unlock()
ms.cacheChan <- &MetricsInfo{}
ms.cacheChan <- MetricsCopy{}
return <-ms.cacheChan
}
@ -145,7 +157,29 @@ func (ms *metricServer) Run() {
select {
case <-ms.cacheChan:
ms.lastCheck = time.Now()
ms.cacheChan <- ms.cache
// make a copy of cache values to prevent data race
metrics := MetricsCopy{
Counters: make([]CounterValue, len(ms.cache.Counters)),
Gauges: make([]GaugeValue, len(ms.cache.Gauges)),
Summaries: make([]SummaryValue, len(ms.cache.Summaries)),
Histograms: make([]HistogramValue, len(ms.cache.Histograms)),
}
for i, cv := range ms.cache.Counters {
metrics.Counters[i] = *cv
}
for i, gv := range ms.cache.Gauges {
metrics.Gauges[i] = *gv
}
for i, sv := range ms.cache.Summaries {
metrics.Summaries[i] = *sv
}
for i, hv := range ms.cache.Histograms {
metrics.Histograms[i] = *hv
}
ms.cacheChan <- metrics
case m := <-ms.reqChan:
switch v := m.(type) {
case CounterValue:
@ -200,7 +234,7 @@ func NewMetricsServer(enabled bool, log log.Logger) MetricServer {
ms := &metricServer{
enabled: enabled,
reqChan: make(chan interface{}),
cacheChan: make(chan *MetricsInfo),
cacheChan: make(chan MetricsCopy),
cache: mi,
bucketsF2S: bucketsFloat2String,
log: log,
@ -215,16 +249,21 @@ func NewMetricsServer(enabled bool, log log.Logger) MetricServer {
// contains a map with key=CounterName and value=CounterLabels.
func GetCounters() map[string][]string {
return map[string][]string{
httpConnRequests: {"method", "code"},
repoDownloads: {"repo"},
repoUploads: {"repo"},
httpConnRequests: {"method", "code"},
repoDownloads: {"repo"},
repoUploads: {"repo"},
schedulerGenerators: {},
}
}
func GetGauges() map[string][]string {
return map[string][]string{
repoStorageBytes: {"repo"},
serverInfo: {"commit", "binaryType", "goVersion", "version"},
repoStorageBytes: {"repo"},
serverInfo: {"commit", "binaryType", "goVersion", "version"},
schedulerNumWorkers: {},
schedulerGeneratorsStatus: {"priority", "state"},
schedulerTasksQueue: {"priority"},
schedulerWorkers: {"state"},
}
}
@ -238,6 +277,7 @@ func GetHistograms() map[string][]string {
return map[string][]string{
httpMethodLatencySeconds: {"method"},
storageLockLatencySeconds: {"storageName", "lockType"},
workersTasksDuration: {"name"},
}
}
@ -533,3 +573,66 @@ func GetBuckets(metricName string) []float64 {
return GetDefaultBuckets()
}
}
func SetSchedulerNumWorkers(ms MetricServer, workers int) {
numWorkers := GaugeValue{
Name: schedulerNumWorkers,
Value: float64(workers),
}
ms.ForceSendMetric(numWorkers)
}
func IncSchedulerGenerators(ms MetricServer) {
genCounter := CounterValue{
Name: schedulerGenerators,
}
ms.ForceSendMetric(genCounter)
}
func ObserveWorkersTasksDuration(ms MetricServer, taskName string, duration time.Duration) {
h := HistogramValue{
Name: workersTasksDuration,
Sum: duration.Seconds(), // convenient temporary store for Histogram latency value
LabelNames: []string{"name"},
LabelValues: []string{taskName},
}
ms.SendMetric(h)
}
func SetSchedulerGenerators(ms MetricServer, gen map[string]map[string]uint64) {
for priority, states := range gen {
for state, value := range states {
generator := GaugeValue{
Name: schedulerGeneratorsStatus,
Value: float64(value),
LabelNames: []string{"priority", "state"},
LabelValues: []string{priority, state},
}
ms.SendMetric(generator)
}
}
}
func SetSchedulerTasksQueue(ms MetricServer, tq map[string]int) {
for priority, value := range tq {
tasks := GaugeValue{
Name: schedulerTasksQueue,
Value: float64(value),
LabelNames: []string{"priority"},
LabelValues: []string{priority},
}
ms.SendMetric(tasks)
}
}
func SetSchedulerWorkers(ms MetricServer, w map[string]int) {
for state, value := range w {
workers := GaugeValue{
Name: schedulerWorkers,
Value: float64(value),
LabelNames: []string{"state"},
LabelValues: []string{state},
}
ms.SendMetric(workers)
}
}

View File

@ -461,7 +461,8 @@ func TestPopulateStorageMetrics(t *testing.T) {
err = WriteImageToFileSystem(CreateDefaultImage(), "busybox", "0.0.1", srcStorageCtlr)
So(err, ShouldBeNil)
sch := scheduler.NewScheduler(conf, ctlr.Log)
metrics := monitoring.NewMetricsServer(true, ctlr.Log)
sch := scheduler.NewScheduler(conf, metrics, ctlr.Log)
ctx, cancel := context.WithCancel(context.Background())
sch.RunScheduler(ctx)

View File

@ -63,3 +63,13 @@ func NewTask(imgStore storageTypes.ImageStore, repo string, log log.Logger) *Tas
func (scrubT *Task) DoWork(ctx context.Context) error {
return RunScrubRepo(ctx, scrubT.imgStore, scrubT.repo, scrubT.log) //nolint: contextcheck
}
func (scrubT *Task) String() string {
return fmt.Sprintf("{taskGenerator: \"%s\", repo: \"%s\"}",
"image scrub", // description of generator's task purpose
scrubT.repo)
}
func (scrubT *Task) Name() string {
return "ScrubTask"
}

View File

@ -2,6 +2,7 @@ package cveinfo
import (
"context"
"fmt"
"sync"
"zotregistry.io/zot/pkg/log"
@ -194,3 +195,12 @@ func (st *scanTask) DoWork(ctx context.Context) error {
return nil
}
func (st *scanTask) String() string {
return fmt.Sprintf("{Name: \"%s\", repo: \"%s\", digest: \"%s\"}",
st.Name(), st.repo, st.digest)
}
func (st *scanTask) Name() string {
return "ScanTask"
}

View File

@ -57,7 +57,8 @@ func TestScanGeneratorWithMockedData(t *testing.T) { //nolint: gocyclo
cfg := config.New()
cfg.Scheduler = &config.SchedulerConfig{NumWorkers: 3}
sch := scheduler.NewScheduler(cfg, logger)
metrics := monitoring.NewMetricsServer(true, logger)
sch := scheduler.NewScheduler(cfg, metrics, logger)
params := boltdb.DBParameters{
RootDir: t.TempDir(),
@ -502,8 +503,9 @@ func TestScanGeneratorWithRealData(t *testing.T) {
metaDB, err := boltdb.New(boltDriver, logger)
So(err, ShouldBeNil)
metrics := monitoring.NewMetricsServer(true, logger)
imageStore := local.NewImageStore(rootDir, false, false,
logger, monitoring.NewMetricsServer(false, logger), nil, nil)
logger, metrics, nil, nil)
storeController := storage.StoreController{DefaultStore: imageStore}
image := CreateRandomVulnerableImage()
@ -520,7 +522,7 @@ func TestScanGeneratorWithRealData(t *testing.T) {
So(scanner.IsResultCached(image.DigestStr()), ShouldBeFalse)
sch := scheduler.NewScheduler(cfg, logger)
sch := scheduler.NewScheduler(cfg, metrics, logger)
generator := cveinfo.NewScanTaskGenerator(metaDB, scanner, logger)

View File

@ -2,6 +2,7 @@ package cveinfo
import (
"context"
"fmt"
"sync"
"time"
@ -118,3 +119,11 @@ func (dbt *dbUpdateTask) DoWork(ctx context.Context) error {
return nil
}
func (dbt *dbUpdateTask) String() string {
return fmt.Sprintf("{Name: %s}", dbt.Name())
}
func (dbt *dbUpdateTask) Name() string {
return "DBUpdateTask"
}

View File

@ -14,6 +14,7 @@ import (
. "github.com/smartystreets/goconvey/convey"
"zotregistry.io/zot/pkg/api/config"
"zotregistry.io/zot/pkg/extensions/monitoring"
cveinfo "zotregistry.io/zot/pkg/extensions/search/cve"
"zotregistry.io/zot/pkg/log"
mTypes "zotregistry.io/zot/pkg/meta/types"
@ -37,7 +38,8 @@ func TestCVEDBGenerator(t *testing.T) {
cfg := config.New()
cfg.Scheduler = &config.SchedulerConfig{NumWorkers: 3}
sch := scheduler.NewScheduler(cfg, logger)
metrics := monitoring.NewMetricsServer(true, logger)
sch := scheduler.NewScheduler(cfg, metrics, logger)
metaDB := &mocks.MetaDBMock{
GetRepoMetaFn: func(ctx context.Context, repo string) (mTypes.RepoMeta, error) {

View File

@ -5,6 +5,7 @@ package sync
import (
"context"
"fmt"
"github.com/containers/common/pkg/retry"
"github.com/containers/image/v5/types"
@ -137,3 +138,12 @@ func newSyncRepoTask(repo string, service Service) *syncRepoTask {
func (srt *syncRepoTask) DoWork(ctx context.Context) error {
return srt.service.SyncRepo(ctx, srt.repo)
}
func (srt *syncRepoTask) String() string {
return fmt.Sprintf("{Name: \"%s\", repo: \"%s\"}",
srt.Name(), srt.repo)
}
func (srt *syncRepoTask) Name() string {
return "SyncTask"
}

View File

@ -2,9 +2,9 @@
## What is a generator and how should it be implemented?
In order to create a new generator (which will generate new tasks one by one) and add it to the scheduler, there are 4 methods which should be implemented:
1. ***GenerateTask() (Task, error)***
1. ***Next() (Task, error)***
```
This method should implement the logic for generating a new task.
This method should implement the logic for generating a new task.
Basically, when this method is called by the scheduler it should return the next task until there are no more tasks to be generated.
Also, the Task returned by this method should implement DoWork(ctx context.Context) method which should contain the logic that should be executed when this task is run by the scheduler.
```
@ -27,15 +27,28 @@ In order to create a new generator (which will generate new tasks one by one) an
The scheduler accepts both periodic and non-periodic generators.
To submit a generator to the scheduler, ***SubmitGenerator*** should be called with the implemented generator, the interval of time (time.Duration(0) for a non-periodic generator, or the desired interval for a periodic one) and the priority of the tasks this generator will produce as parameters; a minimal sketch follows the notes below.
Notes:
- A generator should submit only tasks having the same priority
- The priority of a task can be: LowPriorirty, MediumPriority or HighPriority
- The priority of a task can be: LowPriority, MediumPriority or HighPriority
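
For illustration only, here is a minimal sketch of a non-periodic generator. ***Next()***, ***IsDone()*** and ***IsReady()*** appear in this change; the fourth method is assumed here to be ***Reset()***, and the type and field names are hypothetical:
```
package demo

import (
	"context"

	"zotregistry.io/zot/pkg/scheduler"
)

// noopTask is a placeholder task; the Task interface itself is covered in the next section.
type noopTask struct{ repo string }

func (t *noopTask) DoWork(ctx context.Context) error { return nil }
func (t *noopTask) Name() string                     { return "NoopTask" }
func (t *noopTask) String() string                   { return "{Name: NoopTask, repo: " + t.repo + "}" }

// repoGenerator emits one task per repository, then reports itself as done.
type repoGenerator struct {
	repos []string
	next  int
}

func (g *repoGenerator) Next() (scheduler.Task, error) {
	if g.next >= len(g.repos) {
		return nil, nil // nothing left to generate (end-of-stream handling is a simplification)
	}

	task := &noopTask{repo: g.repos[g.next]}
	g.next++

	return task, nil
}

func (g *repoGenerator) IsDone() bool  { return g.next >= len(g.repos) }
func (g *repoGenerator) IsReady() bool { return true }
func (g *repoGenerator) Reset()        { g.next = 0 }
```
Such a generator would then be registered with ***SubmitGenerator***, e.g. `sch.SubmitGenerator(&repoGenerator{repos: repos}, time.Duration(0), scheduler.LowPriority)`.
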
# How to submit a Task to the scheduler
In order to create a new task and add it to the scheduler ***DoWork(ctx context.Context) error*** is the method that should be implemented. This should contain the logic that should be executed when this task is run by the scheduler.
In order to create a new task and add it to the scheduler, the following methods need to be implemented:
1. ***DoWork(ctx context.Context) error***
```
This should contain the logic that should be executed when this task is run by the scheduler.
```
2. ***Name() string***
```
Name of the task.
```
3. ***String() string***
```
Description of the task. Used in debug logging to identify the executed task.
```
To submit a task to the scheduler, ***SubmitTask*** should be called with the implemented task and its priority as parameters.
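
Below is a hedged end-to-end sketch that wires a task into the scheduler using the new ***NewScheduler(cfg, metrics, logger)*** signature introduced by this change; the task body, log destination and sleep-based wait are illustrative only:
```
package main

import (
	"context"
	"fmt"
	"time"

	"zotregistry.io/zot/pkg/api/config"
	"zotregistry.io/zot/pkg/extensions/monitoring"
	"zotregistry.io/zot/pkg/log"
	"zotregistry.io/zot/pkg/scheduler"
)

// helloTask is a hypothetical task showing the three required methods.
type helloTask struct{ msg string }

func (t *helloTask) DoWork(ctx context.Context) error {
	fmt.Println(t.msg)

	return nil
}

func (t *helloTask) Name() string { return "HelloTask" }

func (t *helloTask) String() string {
	return fmt.Sprintf("{Name: %s, msg: %s}", t.Name(), t.msg)
}

func main() {
	logger := log.NewLogger("debug", "") // empty output path is assumed to log to stdout
	metrics := monitoring.NewMetricsServer(true, logger)

	// the scheduler now takes the metric server as its second argument
	sch := scheduler.NewScheduler(config.New(), metrics, logger)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	sch.RunScheduler(ctx)
	sch.SubmitTask(&helloTask{msg: "hello"}, scheduler.MediumPriority)

	// give the worker pool a moment to pick the task up before shutting down
	time.Sleep(100 * time.Millisecond)
	sch.Shutdown()
}
```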

View File

@ -9,11 +9,14 @@ import (
"time"
"zotregistry.io/zot/pkg/api/config"
"zotregistry.io/zot/pkg/extensions/monitoring"
"zotregistry.io/zot/pkg/log"
)
type Task interface {
DoWork(ctx context.Context) error
Name() string
String() string
}
type generatorsPriorityQueue []*generator
@ -58,25 +61,31 @@ func (pq *generatorsPriorityQueue) Pop() any {
const (
rateLimiterScheduler = 400
rateLimit = 5 * time.Second
numWorkersMultiplier = 4
NumWorkersMultiplier = 4
sendMetricsInterval = 5 * time.Second
)
type Scheduler struct {
tasksQLow chan Task
tasksQMedium chan Task
tasksQHigh chan Task
tasksDoWork int
tasksLock *sync.Mutex
generators generatorsPriorityQueue
waitingGenerators []*generator
doneGenerators []*generator
generatorsLock *sync.Mutex
log log.Logger
RateLimit time.Duration
NumWorkers int
workerChan chan Task
metricsChan chan struct{}
workerWg *sync.WaitGroup
isShuttingDown atomic.Bool
metricServer monitoring.MetricServer
}
func NewScheduler(cfg *config.Config, logC log.Logger) *Scheduler {
func NewScheduler(cfg *config.Config, ms monitoring.MetricServer, logC log.Logger) *Scheduler { //nolint: varnamelen
chLow := make(chan Task, rateLimiterScheduler)
chMedium := make(chan Task, rateLimiterScheduler)
chHigh := make(chan Task, rateLimiterScheduler)
@ -85,19 +94,25 @@ func NewScheduler(cfg *config.Config, logC log.Logger) *Scheduler {
sublogger := logC.With().Str("component", "scheduler").Logger()
heap.Init(&generatorPQ)
// force pushing this metric (for zot minimal, metrics are enabled on first scraping)
monitoring.SetSchedulerNumWorkers(ms, numWorkers)
return &Scheduler{
tasksQLow: chLow,
tasksQMedium: chMedium,
tasksQHigh: chHigh,
tasksDoWork: 0, // number of tasks that are in working state
tasksLock: new(sync.Mutex),
generators: generatorPQ,
generatorsLock: new(sync.Mutex),
log: log.Logger{Logger: sublogger},
// default value
RateLimit: rateLimit,
NumWorkers: numWorkers,
workerChan: make(chan Task, numWorkers),
workerWg: new(sync.WaitGroup),
RateLimit: rateLimit,
NumWorkers: numWorkers,
workerChan: make(chan Task, numWorkers),
metricsChan: make(chan struct{}, 1),
workerWg: new(sync.WaitGroup),
metricServer: ms,
}
}
@ -106,19 +121,95 @@ func (scheduler *Scheduler) poolWorker(ctx context.Context) {
go func(workerID int) {
defer scheduler.workerWg.Done()
for task := range scheduler.workerChan {
scheduler.log.Debug().Int("worker", workerID).Msg("scheduler: starting task")
var workStart time.Time
if err := task.DoWork(ctx); err != nil {
scheduler.log.Error().Int("worker", workerID).Err(err).Msg("scheduler: error while executing task")
var workDuration time.Duration
for task := range scheduler.workerChan {
// leave the line below here (for zot minimal, metrics can be enabled on first scraping)
metricsEnabled := scheduler.metricServer.IsEnabled()
scheduler.log.Debug().Int("worker", workerID).Str("task", task.String()).Msg("scheduler: starting task")
if metricsEnabled {
scheduler.tasksLock.Lock()
scheduler.tasksDoWork++
scheduler.tasksLock.Unlock()
workStart = time.Now()
}
scheduler.log.Debug().Int("worker", workerID).Msg("scheduler: finished task")
if err := task.DoWork(ctx); err != nil {
scheduler.log.Error().Int("worker", workerID).Str("task", task.String()).Err(err).
Msg("scheduler: error while executing task")
}
if metricsEnabled {
scheduler.tasksLock.Lock()
scheduler.tasksDoWork--
scheduler.tasksLock.Unlock()
workDuration = time.Since(workStart)
monitoring.ObserveWorkersTasksDuration(scheduler.metricServer, task.Name(), workDuration)
}
scheduler.log.Debug().Int("worker", workerID).Str("task", task.String()).Msg("scheduler: finished task")
}
}(i + 1)
}
}
func (scheduler *Scheduler) metricsWorker() {
ticker := time.NewTicker(sendMetricsInterval)
for {
if scheduler.inShutdown() {
return
}
select {
case <-scheduler.metricsChan:
ticker.Stop()
return
case <-ticker.C:
genMap := make(map[string]map[string]uint64)
tasksMap := make(map[string]int)
// initialize map
for _, p := range []Priority{LowPriority, MediumPriority, HighPriority} {
priority := p.String()
genMap[priority] = make(map[string]uint64)
for _, s := range []State{Ready, Waiting, Done} {
genMap[priority][s.String()] = 0
}
}
scheduler.generatorsLock.Lock()
generators := append(append(scheduler.generators, scheduler.waitingGenerators...),
scheduler.doneGenerators...)
for _, gen := range generators {
p := gen.priority.String()
s := gen.getState().String()
genMap[p][s]++
}
// tasks queue size by priority
tasksMap[LowPriority.String()] = len(scheduler.tasksQLow)
tasksMap[MediumPriority.String()] = len(scheduler.tasksQMedium)
tasksMap[HighPriority.String()] = len(scheduler.tasksQHigh)
scheduler.generatorsLock.Unlock()
monitoring.SetSchedulerGenerators(scheduler.metricServer, genMap)
monitoring.SetSchedulerTasksQueue(scheduler.metricServer, tasksMap)
workersMap := make(map[string]int)
scheduler.tasksLock.Lock()
workersMap["idle"] = scheduler.NumWorkers - scheduler.tasksDoWork
workersMap["working"] = scheduler.tasksDoWork
scheduler.tasksLock.Unlock()
monitoring.SetSchedulerWorkers(scheduler.metricServer, workersMap)
}
}
}
func (scheduler *Scheduler) Shutdown() {
if !scheduler.inShutdown() {
scheduler.shutdown()
@ -133,6 +224,7 @@ func (scheduler *Scheduler) inShutdown() bool {
func (scheduler *Scheduler) shutdown() {
close(scheduler.workerChan)
close(scheduler.metricsChan)
scheduler.isShuttingDown.Store(true)
}
@ -147,6 +239,9 @@ func (scheduler *Scheduler) RunScheduler(ctx context.Context) {
// start worker pool
go scheduler.poolWorker(ctx)
// periodically send metrics
go scheduler.metricsWorker()
go func() {
for {
select {
@ -166,7 +261,7 @@ func (scheduler *Scheduler) RunScheduler(ctx context.Context) {
if task != nil {
// push tasks into worker pool
if !scheduler.inShutdown() {
scheduler.log.Debug().Msg("scheduler: pushing task into worker pool")
scheduler.log.Debug().Str("task", task.String()).Msg("scheduler: pushing task into worker pool")
scheduler.workerChan <- task
}
}
@ -185,7 +280,7 @@ func (scheduler *Scheduler) pushReadyGenerators() {
modified := false
for i, gen := range scheduler.waitingGenerators {
if gen.getState() == ready {
if gen.getState() == Ready {
gen.done = false
heap.Push(&scheduler.generators, gen)
scheduler.waitingGenerators = append(scheduler.waitingGenerators[:i], scheduler.waitingGenerators[i+1:]...)
@ -217,13 +312,15 @@ func (scheduler *Scheduler) generateTasks() {
var gen *generator
// check if the generator with highest prioriy is ready to run
if scheduler.generators[0].getState() == ready {
// check if the generator with highest priority is ready to run
if scheduler.generators[0].getState() == Ready {
gen = scheduler.generators[0]
} else {
gen, _ = heap.Pop(&scheduler.generators).(*generator)
if gen.getState() == waiting {
if gen.getState() == Waiting {
scheduler.waitingGenerators = append(scheduler.waitingGenerators, gen)
} else if gen.getState() == Done {
scheduler.doneGenerators = append(scheduler.doneGenerators, gen)
}
return
@ -279,7 +376,7 @@ func (scheduler *Scheduler) SubmitTask(task Task, priority Priority) {
return
}
// check if the scheduler it's still running in order to add the task to the channel
// check if the scheduler is still running in order to add the task to the channel
if scheduler.inShutdown() {
return
}
@ -302,12 +399,12 @@ const (
HighPriority
)
type state int
type State int
const (
ready state = iota
waiting
done
Ready State = iota
Waiting
Done
)
type TaskGenerator interface {
@ -342,8 +439,6 @@ func (gen *generator) generate(sch *Scheduler) {
return
}
task = nextTask
// check if the generator is done
if gen.taskGenerator.IsDone() {
gen.done = true
@ -352,6 +447,8 @@ func (gen *generator) generate(sch *Scheduler) {
return
}
task = nextTask
}
// check if it's possible to add a new task to the channel
@ -370,22 +467,22 @@ func (gen *generator) generate(sch *Scheduler) {
// if the generator is not periodic then it can be done or ready to generate a new task.
// if the generator is periodic then it can be waiting (finished its work and wait for its interval to pass)
// or ready to generate a new task.
func (gen *generator) getState() state {
func (gen *generator) getState() State {
if gen.interval == time.Duration(0) {
if gen.done && gen.remainingTask == nil {
return done
return Done
}
} else {
if gen.done && time.Since(gen.lastRun) < gen.interval && gen.remainingTask == nil {
return waiting
return Waiting
}
}
if !gen.taskGenerator.IsReady() {
return waiting
return Waiting
}
return ready
return Ready
}
func (scheduler *Scheduler) SubmitGenerator(taskGenerator TaskGenerator, interval time.Duration, priority Priority) {
@ -402,6 +499,8 @@ func (scheduler *Scheduler) SubmitGenerator(taskGenerator TaskGenerator, interva
// add generator to the generators priority queue
heap.Push(&scheduler.generators, newGenerator)
// force pushing this metric (for zot minimal, metrics are enabled on first scraping)
monitoring.IncSchedulerGenerators(scheduler.metricServer)
}
func getNumWorkers(cfg *config.Config) int {
@ -409,5 +508,39 @@ func getNumWorkers(cfg *config.Config) int {
return cfg.Scheduler.NumWorkers
}
return runtime.NumCPU() * numWorkersMultiplier
return runtime.NumCPU() * NumWorkersMultiplier
}
func (p Priority) String() string {
var priority string
switch p {
case LowPriority:
priority = "low"
case MediumPriority:
priority = "medium"
case HighPriority:
priority = "high"
default:
priority = "invalid"
}
return priority
}
func (s State) String() string {
var status string
switch s {
case Ready:
status = "ready"
case Waiting:
status = "waiting"
case Done:
status = "done"
default:
status = "invalid"
}
return status
}

View File

@ -12,6 +12,7 @@ import (
. "github.com/smartystreets/goconvey/convey"
"zotregistry.io/zot/pkg/api/config"
"zotregistry.io/zot/pkg/extensions/monitoring"
"zotregistry.io/zot/pkg/log"
"zotregistry.io/zot/pkg/scheduler"
)
@ -34,6 +35,14 @@ func (t *task) DoWork(ctx context.Context) error {
return nil
}
func (t *task) String() string {
return t.Name()
}
func (t *task) Name() string {
return "TestTask"
}
type generator struct {
log log.Logger
priority string
@ -100,7 +109,8 @@ func TestScheduler(t *testing.T) {
defer os.Remove(logFile.Name()) // clean up
logger := log.NewLogger("debug", logFile.Name())
sch := scheduler.NewScheduler(config.New(), logger)
metrics := monitoring.NewMetricsServer(true, logger)
sch := scheduler.NewScheduler(config.New(), metrics, logger)
genH := &shortGenerator{log: logger, priority: "high priority"}
// interval has to be higher than throttle value to simulate
@ -126,7 +136,8 @@ func TestScheduler(t *testing.T) {
logger := log.NewLogger("debug", logFile.Name())
cfg := config.New()
cfg.Scheduler = &config.SchedulerConfig{NumWorkers: 3}
sch := scheduler.NewScheduler(cfg, logger)
metrics := monitoring.NewMetricsServer(true, logger)
sch := scheduler.NewScheduler(cfg, metrics, logger)
genL := &generator{log: logger, priority: "low priority"}
sch.SubmitGenerator(genL, time.Duration(0), scheduler.LowPriority)
@ -160,7 +171,8 @@ func TestScheduler(t *testing.T) {
defer os.Remove(logFile.Name()) // clean up
logger := log.NewLogger("debug", logFile.Name())
sch := scheduler.NewScheduler(config.New(), logger)
metrics := monitoring.NewMetricsServer(true, logger)
sch := scheduler.NewScheduler(config.New(), metrics, logger)
t := &task{log: logger, msg: "", err: true}
sch.SubmitTask(t, scheduler.MediumPriority)
@ -184,7 +196,8 @@ func TestScheduler(t *testing.T) {
defer os.Remove(logFile.Name()) // clean up
logger := log.NewLogger("debug", logFile.Name())
sch := scheduler.NewScheduler(config.New(), logger)
metrics := monitoring.NewMetricsServer(true, logger)
sch := scheduler.NewScheduler(config.New(), metrics, logger)
genL := &generator{log: logger, priority: "low priority"}
sch.SubmitGenerator(genL, 20*time.Millisecond, scheduler.LowPriority)
@ -208,7 +221,8 @@ func TestScheduler(t *testing.T) {
defer os.Remove(logFile.Name()) // clean up
logger := log.NewLogger("debug", logFile.Name())
sch := scheduler.NewScheduler(config.New(), logger)
metrics := monitoring.NewMetricsServer(true, logger)
sch := scheduler.NewScheduler(config.New(), metrics, logger)
t := &task{log: logger, msg: "", err: false}
sch.SubmitTask(t, -1)
@ -225,7 +239,8 @@ func TestScheduler(t *testing.T) {
defer os.Remove(logFile.Name()) // clean up
logger := log.NewLogger("debug", logFile.Name())
sch := scheduler.NewScheduler(config.New(), logger)
metrics := monitoring.NewMetricsServer(true, logger)
sch := scheduler.NewScheduler(config.New(), metrics, logger)
ctx, cancel := context.WithCancel(context.Background())
@ -240,11 +255,39 @@ func TestScheduler(t *testing.T) {
So(err, ShouldBeNil)
So(string(data), ShouldNotContainSubstring, "scheduler: adding a new task")
})
Convey("Test scheduler Priority.String() method", t, func() {
var p scheduler.Priority //nolint: varnamelen
// test invalid priority
p = 6238734
So(p.String(), ShouldEqual, "invalid")
p = scheduler.LowPriority
So(p.String(), ShouldEqual, "low")
p = scheduler.MediumPriority
So(p.String(), ShouldEqual, "medium")
p = scheduler.HighPriority
So(p.String(), ShouldEqual, "high")
})
Convey("Test scheduler State.String() method", t, func() {
var s scheduler.State //nolint: varnamelen
// test invalid state
s = -67
So(s.String(), ShouldEqual, "invalid")
s = scheduler.Ready
So(s.String(), ShouldEqual, "ready")
s = scheduler.Waiting
So(s.String(), ShouldEqual, "waiting")
s = scheduler.Done
So(s.String(), ShouldEqual, "done")
})
}
func TestGetNumWorkers(t *testing.T) {
Convey("Test setting the number of workers - default value", t, func() {
sch := scheduler.NewScheduler(config.New(), log.NewLogger("debug", "logFile"))
logger := log.NewLogger("debug", "logFile")
metrics := monitoring.NewMetricsServer(true, logger)
sch := scheduler.NewScheduler(config.New(), metrics, logger)
defer os.Remove("logFile")
So(sch.NumWorkers, ShouldEqual, runtime.NumCPU()*4)
})
@ -252,7 +295,9 @@ func TestGetNumWorkers(t *testing.T) {
Convey("Test setting the number of workers - getting the value from config", t, func() {
cfg := config.New()
cfg.Scheduler = &config.SchedulerConfig{NumWorkers: 3}
sch := scheduler.NewScheduler(cfg, log.NewLogger("debug", "logFile"))
logger := log.NewLogger("debug", "logFile")
metrics := monitoring.NewMetricsServer(true, logger)
sch := scheduler.NewScheduler(cfg, metrics, logger)
defer os.Remove("logFile")
So(sch.NumWorkers, ShouldEqual, 3)
})

View File

@ -1061,6 +1061,15 @@ func (dt *dedupeTask) DoWork(ctx context.Context) error {
return err
}
func (dt *dedupeTask) String() string {
return fmt.Sprintf("{Name: %s, digest: %s, dedupe: %t}",
dt.Name(), dt.digest, dt.dedupe)
}
func (dt *dedupeTask) Name() string {
return "DedupeTask"
}
type StorageMetricsInitGenerator struct {
ImgStore storageTypes.ImageStore
done bool
@ -1132,3 +1141,12 @@ func (smt *smTask) DoWork(ctx context.Context) error {
return nil
}
func (smt *smTask) String() string {
return fmt.Sprintf("{Name: \"%s\", repo: \"%s\"}",
smt.Name(), smt.repo)
}
func (smt *smTask) Name() string {
return "StorageMetricsTask"
}

View File

@ -851,3 +851,12 @@ func (gct *gcTask) DoWork(ctx context.Context) error {
// run task
return gct.gc.CleanRepo(ctx, gct.repo) //nolint: contextcheck
}
func (gct *gcTask) String() string {
return fmt.Sprintf("{Name: %s, repo: %s}",
gct.Name(), gct.repo)
}
func (gct *gcTask) Name() string {
return "GCTask"
}

View File

@ -63,7 +63,9 @@ var DeleteReferrers = config.ImageRetention{ //nolint: gochecknoglobals
var errCache = errors.New("new cache error")
func runAndGetScheduler() (*scheduler.Scheduler, context.CancelFunc) {
taskScheduler := scheduler.NewScheduler(config.New(), zlog.Logger{})
log := zlog.Logger{}
metrics := monitoring.NewMetricsServer(true, log)
taskScheduler := scheduler.NewScheduler(config.New(), metrics, log)
taskScheduler.RateLimit = 50 * time.Millisecond
ctx, cancel := context.WithCancel(context.Background())

View File

@ -62,7 +62,7 @@ func cleanupStorage(store driver.StorageDriver, name string) {
func createMockStorage(rootDir string, cacheDir string, dedupe bool, store driver.StorageDriver,
) storageTypes.ImageStore {
log := log.Logger{Logger: zerolog.New(os.Stdout)}
metrics := monitoring.NewMetricsServer(false, log)
metrics := monitoring.NewMetricsServer(true, log)
var cacheDriver cache.Cache
@ -187,7 +187,9 @@ func createObjectsStoreDynamo(rootDir string, cacheDir string, dedupe bool, tabl
}
func runAndGetScheduler() (*scheduler.Scheduler, context.CancelFunc) {
taskScheduler := scheduler.NewScheduler(config.New(), log.Logger{})
logger := log.Logger{}
metrics := monitoring.NewMetricsServer(false, logger)
taskScheduler := scheduler.NewScheduler(config.New(), metrics, logger)
taskScheduler.RateLimit = 50 * time.Millisecond
ctx, cancel := context.WithCancel(context.Background())
@ -2048,7 +2050,9 @@ func TestRebuildDedupeIndex(t *testing.T) {
Convey("Intrerrupt rebuilding and restart, checking idempotency", func() {
for i := 0; i < 10; i++ {
taskScheduler := scheduler.NewScheduler(config.New(), log.Logger{})
logger := log.Logger{}
metrics := monitoring.NewMetricsServer(false, logger)
taskScheduler := scheduler.NewScheduler(config.New(), metrics, logger)
taskScheduler.RateLimit = 1 * time.Millisecond
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
@ -2087,7 +2091,9 @@ func TestRebuildDedupeIndex(t *testing.T) {
// now from dedupe false to true
for i := 0; i < 10; i++ {
taskScheduler := scheduler.NewScheduler(config.New(), log.Logger{})
logger := log.Logger{}
metrics := monitoring.NewMetricsServer(false, logger)
taskScheduler := scheduler.NewScheduler(config.New(), metrics, logger)
taskScheduler.RateLimit = 1 * time.Millisecond
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)