2022-09-23 08:27:56 +03:00
package scheduler
import (
"container/heap"
"context"
2024-01-25 19:05:47 +02:00
"math"
2023-07-04 11:03:29 +03:00
"runtime"
2022-09-23 08:27:56 +03:00
"sync"
2023-11-24 10:40:10 +02:00
"sync/atomic"
2022-09-23 08:27:56 +03:00
"time"
2024-02-01 06:34:07 +02:00
"zotregistry.dev/zot/pkg/api/config"
"zotregistry.dev/zot/pkg/extensions/monitoring"
"zotregistry.dev/zot/pkg/log"
2022-09-23 08:27:56 +03:00
)
// Task is a unit of work executed by the scheduler's worker pool.
type Task interface {
	// DoWork runs the task. ctx is the context the scheduler hands to its workers;
	// a non-nil error is logged by the worker that ran the task.
	DoWork(ctx context.Context) error
	// Name returns the label under which the task duration is reported to the metric server.
	Name() string
	// String returns the task description used in the scheduler's log messages.
	String() string
}
type generatorsPriorityQueue [ ] * generator
func ( pq generatorsPriorityQueue ) Len ( ) int {
return len ( pq )
}
func ( pq generatorsPriorityQueue ) Less ( i , j int ) bool {
2024-01-25 19:05:47 +02:00
return pq [ i ] . getRanking ( ) > pq [ j ] . getRanking ( )
2022-09-23 08:27:56 +03:00
}
func ( pq generatorsPriorityQueue ) Swap ( i , j int ) {
pq [ i ] , pq [ j ] = pq [ j ] , pq [ i ]
pq [ i ] . index = i
pq [ j ] . index = j
}
func ( pq * generatorsPriorityQueue ) Push ( x any ) {
n := len ( * pq )
item , ok := x . ( * generator )
if ! ok {
return
}
item . index = n
* pq = append ( * pq , item )
}
func ( pq * generatorsPriorityQueue ) Pop ( ) any {
old := * pq
n := len ( old )
item := old [ n - 1 ]
old [ n - 1 ] = nil
item . index = - 1
* pq = old [ 0 : n - 1 ]
return item
}
2023-04-07 19:49:24 +03:00
const (
	// rateLimiterScheduler is the buffer size of each per-priority task queue.
	rateLimiterScheduler = 400
	// rateLimit is the default value of Scheduler.RateLimit.
	rateLimit = 50 * time.Millisecond
	// NumWorkersMultiplier is multiplied by the number of CPUs to obtain the
	// default number of workers.
	NumWorkersMultiplier = 4
	// sendMetricsInterval is the period at which scheduler metrics are published.
	sendMetricsInterval = 5 * time.Second
)
2022-09-23 08:27:56 +03:00
type Scheduler struct {
tasksQLow chan Task
tasksQMedium chan Task
tasksQHigh chan Task
2023-12-05 00:13:50 +02:00
tasksDoWork int
tasksLock * sync . Mutex
2022-09-23 08:27:56 +03:00
generators generatorsPriorityQueue
waitingGenerators [ ] * generator
2023-12-05 00:13:50 +02:00
doneGenerators [ ] * generator
2022-09-23 08:27:56 +03:00
generatorsLock * sync . Mutex
log log . Logger
2023-04-07 19:49:24 +03:00
RateLimit time . Duration
2023-07-04 11:03:29 +03:00
NumWorkers int
2023-11-24 10:40:10 +02:00
workerChan chan Task
2023-12-05 00:13:50 +02:00
metricsChan chan struct { }
2023-11-24 10:40:10 +02:00
workerWg * sync . WaitGroup
isShuttingDown atomic . Bool
2023-12-05 00:13:50 +02:00
metricServer monitoring . MetricServer
2023-12-11 20:00:34 +02:00
cancelFunc context . CancelFunc
2022-09-23 08:27:56 +03:00
}
2023-12-05 00:13:50 +02:00
func NewScheduler ( cfg * config . Config , ms monitoring . MetricServer , logC log . Logger ) * Scheduler { //nolint: varnamelen
2022-09-23 08:27:56 +03:00
chLow := make ( chan Task , rateLimiterScheduler )
chMedium := make ( chan Task , rateLimiterScheduler )
chHigh := make ( chan Task , rateLimiterScheduler )
generatorPQ := make ( generatorsPriorityQueue , 0 )
2023-07-04 11:03:29 +03:00
numWorkers := getNumWorkers ( cfg )
2022-09-23 08:27:56 +03:00
sublogger := logC . With ( ) . Str ( "component" , "scheduler" ) . Logger ( )
heap . Init ( & generatorPQ )
2023-12-05 00:13:50 +02:00
// force pushing this metric (for zot minimal metrics are enabled on first scraping)
monitoring . SetSchedulerNumWorkers ( ms , numWorkers )
2022-09-23 08:27:56 +03:00
return & Scheduler {
tasksQLow : chLow ,
tasksQMedium : chMedium ,
tasksQHigh : chHigh ,
2023-12-05 00:13:50 +02:00
tasksDoWork : 0 , // number of tasks that are in working state
tasksLock : new ( sync . Mutex ) ,
2022-09-23 08:27:56 +03:00
generators : generatorPQ ,
generatorsLock : new ( sync . Mutex ) ,
log : log . Logger { Logger : sublogger } ,
2023-04-07 19:49:24 +03:00
// default value
2023-12-11 20:00:34 +02:00
metricServer : ms ,
2023-12-05 00:13:50 +02:00
RateLimit : rateLimit ,
NumWorkers : numWorkers ,
workerChan : make ( chan Task , numWorkers ) ,
metricsChan : make ( chan struct { } , 1 ) ,
workerWg : new ( sync . WaitGroup ) ,
2022-09-23 08:27:56 +03:00
}
}
2023-11-24 10:40:10 +02:00
func ( scheduler * Scheduler ) poolWorker ( ctx context . Context ) {
for i := 0 ; i < scheduler . NumWorkers ; i ++ {
2023-02-15 21:36:50 +02:00
go func ( workerID int ) {
2023-11-24 10:40:10 +02:00
defer scheduler . workerWg . Done ( )
2023-12-05 00:13:50 +02:00
var workStart time . Time
var workDuration time . Duration
2023-11-24 10:40:10 +02:00
for task := range scheduler . workerChan {
2023-12-05 00:13:50 +02:00
// leave below line here (for zot minimal metrics can be enabled on first scraping)
metricsEnabled := scheduler . metricServer . IsEnabled ( )
2023-12-08 00:05:02 -08:00
scheduler . log . Debug ( ) . Int ( "worker" , workerID ) . Str ( "task" , task . String ( ) ) . Msg ( "starting task" )
2023-12-05 00:13:50 +02:00
if metricsEnabled {
scheduler . tasksLock . Lock ( )
scheduler . tasksDoWork ++
scheduler . tasksLock . Unlock ( )
workStart = time . Now ( )
}
2023-02-15 21:36:50 +02:00
2023-09-05 19:48:56 +03:00
if err := task . DoWork ( ctx ) ; err != nil {
2023-12-05 00:13:50 +02:00
scheduler . log . Error ( ) . Int ( "worker" , workerID ) . Str ( "task" , task . String ( ) ) . Err ( err ) .
2023-12-08 00:05:02 -08:00
Msg ( "failed to execute task" )
2023-02-15 21:36:50 +02:00
}
2023-12-05 00:13:50 +02:00
if metricsEnabled {
scheduler . tasksLock . Lock ( )
scheduler . tasksDoWork --
scheduler . tasksLock . Unlock ( )
workDuration = time . Since ( workStart )
monitoring . ObserveWorkersTasksDuration ( scheduler . metricServer , task . Name ( ) , workDuration )
}
2023-12-08 00:05:02 -08:00
scheduler . log . Debug ( ) . Int ( "worker" , workerID ) . Str ( "task" , task . String ( ) ) . Msg ( "finished task" )
2023-02-15 21:36:50 +02:00
}
} ( i + 1 )
}
}
2022-09-23 08:27:56 +03:00
2023-12-05 00:13:50 +02:00
// metricsWorker publishes, every sendMetricsInterval, the number of generators
// per priority and state, the length of each task queue and the number of
// idle/working workers. It returns when the scheduler is shutting down or
// when metricsChan is closed.
func (scheduler *Scheduler) metricsWorker() {
	ticker := time.NewTicker(sendMetricsInterval)
	// stop the ticker on every exit path, including the inShutdown() one
	defer ticker.Stop()

	priorities := []Priority{LowPriority, MediumPriority, HighPriority}
	states := []State{Ready, Waiting, Done}

	for {
		if scheduler.inShutdown() {
			return
		}

		select {
		case <-scheduler.metricsChan:
			return
		case <-ticker.C:
			// initialize map so that every priority/state pair is reported, even when zero
			genMap := make(map[string]map[string]uint64, len(priorities))

			for _, p := range priorities {
				counts := make(map[string]uint64, len(states))
				for _, s := range states {
					counts[s.String()] = 0
				}

				genMap[p.String()] = counts
			}

			tasksMap := make(map[string]int, len(priorities))

			scheduler.generatorsLock.Lock()

			// walk the three lists separately instead of appending them together:
			// appending to scheduler.generators could write into the spare
			// capacity of the heap's backing array
			groups := [][]*generator{
				scheduler.generators,
				scheduler.waitingGenerators,
				scheduler.doneGenerators,
			}

			for _, group := range groups {
				for _, gen := range group {
					p := gen.priority.String()
					s := gen.getState().String()
					genMap[p][s]++
				}
			}

			// tasks queue size by priority
			tasksMap[LowPriority.String()] = len(scheduler.tasksQLow)
			tasksMap[MediumPriority.String()] = len(scheduler.tasksQMedium)
			tasksMap[HighPriority.String()] = len(scheduler.tasksQHigh)

			scheduler.generatorsLock.Unlock()

			monitoring.SetSchedulerGenerators(scheduler.metricServer, genMap)
			monitoring.SetSchedulerTasksQueue(scheduler.metricServer, tasksMap)

			workersMap := make(map[string]int)

			scheduler.tasksLock.Lock()
			workersMap["idle"] = scheduler.NumWorkers - scheduler.tasksDoWork
			workersMap["working"] = scheduler.tasksDoWork
			scheduler.tasksLock.Unlock()

			monitoring.SetSchedulerWorkers(scheduler.metricServer, workersMap)
		}
	}
}
2023-12-11 20:00:34 +02:00
/ *
Scheduler can be stopped by calling Shutdown ( ) .
it will wait for all tasks being run to finish their work before exiting .
* /
2023-11-24 10:40:10 +02:00
func ( scheduler * Scheduler ) Shutdown ( ) {
2023-12-11 20:00:34 +02:00
defer scheduler . workerWg . Wait ( )
2023-11-24 10:40:10 +02:00
if ! scheduler . inShutdown ( ) {
scheduler . shutdown ( )
}
}
// inShutdown reports whether the shutting-down flag of the scheduler is set.
func (scheduler *Scheduler) inShutdown() bool {
	shuttingDown := scheduler.isShuttingDown.Load()

	return shuttingDown
}
func ( scheduler * Scheduler ) shutdown ( ) {
scheduler . isShuttingDown . Store ( true )
2023-12-11 20:00:34 +02:00
scheduler . cancelFunc ( )
close ( scheduler . metricsChan )
2023-11-24 10:40:10 +02:00
}
2023-12-11 20:00:34 +02:00
func ( scheduler * Scheduler ) RunScheduler ( ) {
/ * This context is passed to all task generators
calling scheduler . Shutdown ( ) will cancel this context and will wait for all tasks
to finish their work gracefully . * /
ctx , cancel := context . WithCancel ( context . Background ( ) )
scheduler . cancelFunc = cancel
2024-01-11 19:30:16 +02:00
throttle := time . NewTicker ( scheduler . RateLimit ) . C
2023-07-04 11:03:29 +03:00
numWorkers := scheduler . NumWorkers
2023-11-24 10:40:10 +02:00
// wait all workers to finish their work before exiting from Shutdown()
scheduler . workerWg . Add ( numWorkers )
2023-02-15 21:36:50 +02:00
// start worker pool
2023-11-24 10:40:10 +02:00
go scheduler . poolWorker ( ctx )
2022-09-23 08:27:56 +03:00
2023-12-05 00:13:50 +02:00
// periodically send metrics
go scheduler . metricsWorker ( )
2022-09-23 08:27:56 +03:00
go func ( ) {
2023-12-11 20:00:34 +02:00
// will close workers chan when either ctx is canceled or scheduler.Shutdown()
defer close ( scheduler . workerChan )
2022-09-23 08:27:56 +03:00
for {
select {
case <- ctx . Done ( ) :
2023-11-24 10:40:10 +02:00
if ! scheduler . inShutdown ( ) {
scheduler . shutdown ( )
}
2022-09-23 08:27:56 +03:00
2023-12-08 00:05:02 -08:00
scheduler . log . Debug ( ) . Msg ( "received stop signal, gracefully shutting down..." )
2023-04-07 19:49:24 +03:00
2022-09-23 08:27:56 +03:00
return
default :
2023-12-11 20:00:34 +02:00
// we don't want to block on sending task in workerChan.
if len ( scheduler . workerChan ) == scheduler . NumWorkers {
<- throttle
continue
2022-09-23 08:27:56 +03:00
}
2023-02-15 21:36:50 +02:00
2023-12-11 20:00:34 +02:00
task := scheduler . getTask ( )
2024-01-11 19:30:16 +02:00
if task == nil {
<- throttle
continue
2023-12-11 20:00:34 +02:00
}
2024-01-11 19:30:16 +02:00
// push tasks into worker pool until workerChan is full.
scheduler . workerChan <- task
2023-12-11 20:00:34 +02:00
}
2022-09-23 08:27:56 +03:00
}
} ( )
}
func ( scheduler * Scheduler ) pushReadyGenerators ( ) {
// iterate through waiting generators list and resubmit those which become ready to run
2022-10-21 18:33:22 +03:00
for {
modified := false
for i , gen := range scheduler . waitingGenerators {
2023-12-05 00:13:50 +02:00
if gen . getState ( ) == Ready {
2022-10-21 18:33:22 +03:00
gen . done = false
heap . Push ( & scheduler . generators , gen )
scheduler . waitingGenerators = append ( scheduler . waitingGenerators [ : i ] , scheduler . waitingGenerators [ i + 1 : ] ... )
modified = true
2024-02-01 19:15:53 +02:00
scheduler . log . Debug ( ) . Str ( "generator" , gen . taskGenerator . Name ( ) ) .
Msg ( "waiting generator is ready, pushing to ready generators" )
2022-10-21 18:33:22 +03:00
break
}
}
if ! modified {
break
2022-09-23 08:27:56 +03:00
}
}
}
func ( scheduler * Scheduler ) generateTasks ( ) {
scheduler . generatorsLock . Lock ( )
defer scheduler . generatorsLock . Unlock ( )
// resubmit ready generators(which were in a waiting state) to generators priority queue
scheduler . pushReadyGenerators ( )
// get the highest priority generator from queue
if scheduler . generators . Len ( ) == 0 {
return
}
var gen * generator
2023-12-05 00:13:50 +02:00
// check if the generator with highest priority is ready to run
if scheduler . generators [ 0 ] . getState ( ) == Ready {
2024-01-25 19:05:47 +02:00
// we are not popping it as we will generate multiple tasks until it is done
// we are going to pop after all tasks are generated
2022-09-23 08:27:56 +03:00
gen = scheduler . generators [ 0 ]
2024-01-25 19:05:47 +02:00
// trigger a generator reorder, as generating a task may impact the order
// equivalent of pop/remove followed by push, but more efficient
heap . Fix ( & scheduler . generators , 0 )
2022-09-23 08:27:56 +03:00
} else {
gen , _ = heap . Pop ( & scheduler . generators ) . ( * generator )
2023-12-05 00:13:50 +02:00
if gen . getState ( ) == Waiting {
2022-09-23 08:27:56 +03:00
scheduler . waitingGenerators = append ( scheduler . waitingGenerators , gen )
2023-12-05 00:13:50 +02:00
} else if gen . getState ( ) == Done {
scheduler . doneGenerators = append ( scheduler . doneGenerators , gen )
2022-09-23 08:27:56 +03:00
}
return
}
// run generator to generate a new task which will be added to a channel by priority
gen . generate ( scheduler )
}
// getTask triggers one round of task generation and then returns, without
// blocking, the first task found when looking at the high, medium and low
// priority queues in that order. It returns nil when all queues are empty.
func (scheduler *Scheduler) getTask() Task {
	// first, generate a task with highest possible priority
	scheduler.generateTasks()

	// then, return a task with highest possible priority
	byPriority := []chan Task{
		scheduler.tasksQHigh,
		scheduler.tasksQMedium,
		scheduler.tasksQLow,
	}

	for _, queue := range byPriority {
		select {
		case task := <-queue:
			return task
		default:
		}
	}

	return nil
}
// getTasksChannelByPriority returns the task queue matching the given
// priority, or nil when the priority is not one of the known values.
func (scheduler *Scheduler) getTasksChannelByPriority(priority Priority) chan Task {
	var queue chan Task

	switch priority {
	case HighPriority:
		queue = scheduler.tasksQHigh
	case MediumPriority:
		queue = scheduler.tasksQMedium
	case LowPriority:
		queue = scheduler.tasksQLow
	}

	return queue
}
func ( scheduler * Scheduler ) SubmitTask ( task Task , priority Priority ) {
// get by priority the channel where the task should be added to
tasksQ := scheduler . getTasksChannelByPriority ( priority )
if tasksQ == nil {
return
}
2023-12-05 00:13:50 +02:00
// check if the scheduler is still running in order to add the task to the channel
2023-11-24 10:40:10 +02:00
if scheduler . inShutdown ( ) {
2022-09-23 08:27:56 +03:00
return
}
select {
case tasksQ <- task :
2023-12-08 00:05:02 -08:00
scheduler . log . Info ( ) . Msg ( "adding a new task" )
2023-11-24 10:40:10 +02:00
default :
if scheduler . inShutdown ( ) {
return
}
2022-09-23 08:27:56 +03:00
}
}
// Priority selects the queue a task is placed on and weighs the ranking of a
// generator; a greater value means a higher priority.
type Priority int

// Known priorities, from the lowest to the highest.
const (
	LowPriority Priority = iota
	MediumPriority
	HighPriority
)
2023-12-05 00:13:50 +02:00
// State is the state of a generator as reported by getState().
type State int

const (
	// Ready means the generator can generate a new task.
	Ready State = iota
	// Waiting means the generator cannot generate a task for now.
	Waiting
	// Done means the generator finished its work.
	Done
)
2023-07-04 11:03:29 +03:00
type TaskGenerator interface {
Next ( ) ( Task , error )
2022-09-23 08:27:56 +03:00
IsDone ( ) bool
2023-08-07 22:55:19 +03:00
IsReady ( ) bool
2024-02-01 19:15:53 +02:00
Name ( ) string
2022-09-23 08:27:56 +03:00
Reset ( )
}
type generator struct {
interval time . Duration
lastRun time . Time
done bool
priority Priority
2023-07-04 11:03:29 +03:00
taskGenerator TaskGenerator
2022-09-23 08:27:56 +03:00
remainingTask Task
index int
2024-01-25 19:05:47 +02:00
taskCount int64
2022-09-23 08:27:56 +03:00
}
func ( gen * generator ) generate ( sch * Scheduler ) {
// get by priority the channel where the new generated task should be added to
taskQ := sch . getTasksChannelByPriority ( gen . priority )
task := gen . remainingTask
// in case there is no task already generated, generate a new task
if gen . remainingTask == nil {
2023-07-04 11:03:29 +03:00
nextTask , err := gen . taskGenerator . Next ( )
2022-09-23 08:27:56 +03:00
if err != nil {
2024-02-01 19:15:53 +02:00
sch . log . Error ( ) . Err ( err ) . Str ( "generator" , gen . taskGenerator . Name ( ) ) .
Msg ( "failed to execute generator" )
2022-09-23 08:27:56 +03:00
return
}
// check if the generator is done
if gen . taskGenerator . IsDone ( ) {
gen . done = true
gen . lastRun = time . Now ( )
2024-01-25 19:05:47 +02:00
gen . taskCount = 0
2022-09-23 08:27:56 +03:00
gen . taskGenerator . Reset ( )
2024-02-01 19:15:53 +02:00
sch . log . Debug ( ) . Str ( "generator" , gen . taskGenerator . Name ( ) ) .
Msg ( "generator is done" )
2022-09-23 08:27:56 +03:00
return
}
2023-12-05 00:13:50 +02:00
task = nextTask
2022-09-23 08:27:56 +03:00
}
2024-01-25 19:05:47 +02:00
// keep track of generated task count to use it for generator ordering
gen . taskCount ++
2022-09-23 08:27:56 +03:00
// check if it's possible to add a new task to the channel
// if not, keep the generated task and retry to add it next time
select {
case taskQ <- task :
gen . remainingTask = nil
return
default :
gen . remainingTask = task
}
}
// getState() returns the state of a generator.
// if the generator is not periodic then it can be done or ready to generate a new task.
// if the generator is periodic then it can be waiting (finished its work and wait for its interval to pass)
// or ready to generate a new task.
2023-12-05 00:13:50 +02:00
func ( gen * generator ) getState ( ) State {
2022-09-23 08:27:56 +03:00
if gen . interval == time . Duration ( 0 ) {
if gen . done && gen . remainingTask == nil {
2023-12-05 00:13:50 +02:00
return Done
2022-09-23 08:27:56 +03:00
}
} else {
if gen . done && time . Since ( gen . lastRun ) < gen . interval && gen . remainingTask == nil {
2023-12-05 00:13:50 +02:00
return Waiting
2022-09-23 08:27:56 +03:00
}
}
2023-08-07 22:55:19 +03:00
if ! gen . taskGenerator . IsReady ( ) {
2023-12-05 00:13:50 +02:00
return Waiting
2023-08-07 22:55:19 +03:00
}
2023-12-05 00:13:50 +02:00
return Ready
2022-09-23 08:27:56 +03:00
}
2024-01-25 19:05:47 +02:00
// getRanking returns the value used to order generators in the priority queue.
// It grows with the priority (a power of ten) and shrinks as more tasks are
// counted for the generator in its current run.
func (gen *generator) getRanking() float64 {
	priorityWeight := math.Pow(10, float64(gen.priority)) //nolint:gomnd
	tasksSoFar := float64(gen.taskCount)

	return priorityWeight / (1 + tasksSoFar)
}
2023-07-04 11:03:29 +03:00
func ( scheduler * Scheduler ) SubmitGenerator ( taskGenerator TaskGenerator , interval time . Duration , priority Priority ) {
2022-09-23 08:27:56 +03:00
newGenerator := & generator {
interval : interval ,
done : false ,
priority : priority ,
taskGenerator : taskGenerator ,
2024-01-25 19:05:47 +02:00
taskCount : 0 ,
2022-09-23 08:27:56 +03:00
remainingTask : nil ,
}
scheduler . generatorsLock . Lock ( )
defer scheduler . generatorsLock . Unlock ( )
// add generator to the generators priority queue
heap . Push ( & scheduler . generators , newGenerator )
2023-12-05 00:13:50 +02:00
// force pushing this metric (for zot minimal metrics are enabled on first scraping)
monitoring . IncSchedulerGenerators ( scheduler . metricServer )
2022-09-23 08:27:56 +03:00
}
2023-07-04 11:03:29 +03:00
func getNumWorkers ( cfg * config . Config ) int {
if cfg . Scheduler != nil && cfg . Scheduler . NumWorkers != 0 {
return cfg . Scheduler . NumWorkers
}
2023-12-05 00:13:50 +02:00
return runtime . NumCPU ( ) * NumWorkersMultiplier
}
// String returns the lowercase name of the priority, or "invalid" for a
// value which is not one of the known priorities.
func (p Priority) String() string {
	switch p {
	case LowPriority:
		return "low"
	case MediumPriority:
		return "medium"
	case HighPriority:
		return "high"
	}

	return "invalid"
}
func ( s State ) String ( ) string {
var status string
switch s {
case Ready :
status = "ready"
case Waiting :
status = "waiting"
case Done :
status = "done"
default :
status = "invalid"
}
return status
2023-07-04 11:03:29 +03:00
}