2022-09-23 08:27:56 +03:00
package scheduler_test
import (
"context"
"errors"
"fmt"
"os"
2023-07-04 11:03:29 +03:00
"runtime"
2024-01-25 19:05:47 +02:00
"strings"
2022-09-23 08:27:56 +03:00
"testing"
"time"
. "github.com/smartystreets/goconvey/convey"
2022-10-20 19:39:20 +03:00
2024-02-01 06:34:07 +02:00
"zotregistry.dev/zot/pkg/api/config"
"zotregistry.dev/zot/pkg/extensions/monitoring"
"zotregistry.dev/zot/pkg/log"
"zotregistry.dev/zot/pkg/scheduler"
2022-09-23 08:27:56 +03:00
)
type task struct {
2024-01-25 19:05:47 +02:00
log log . Logger
msg string
err bool
delay time . Duration
2022-09-23 08:27:56 +03:00
}
var errInternal = errors . New ( "task: internal error" )
2023-09-05 19:48:56 +03:00
func ( t * task ) DoWork ( ctx context . Context ) error {
2022-09-23 08:27:56 +03:00
if t . err {
return errInternal
}
2023-12-11 20:00:34 +02:00
for idx := 0 ; idx < 5 ; idx ++ {
if ctx . Err ( ) != nil {
return ctx . Err ( )
}
2024-01-25 19:05:47 +02:00
time . Sleep ( t . delay )
2023-12-11 20:00:34 +02:00
}
2022-09-23 08:27:56 +03:00
t . log . Info ( ) . Msg ( t . msg )
return nil
}
2023-12-05 00:13:50 +02:00
func ( t * task ) String ( ) string {
return t . Name ( )
}
func ( t * task ) Name ( ) string {
return "TestTask"
}
2022-09-23 08:27:56 +03:00
type generator struct {
2024-01-25 19:05:47 +02:00
log log . Logger
priority string
done bool
index int
step int
limit int
taskDelay time . Duration
2022-09-23 08:27:56 +03:00
}
2024-02-01 19:15:53 +02:00
func ( g * generator ) Name ( ) string {
return "TestGenerator"
}
2023-07-04 11:03:29 +03:00
func ( g * generator ) Next ( ) ( scheduler . Task , error ) {
2024-01-25 19:05:47 +02:00
if g . step > g . limit {
2022-09-23 08:27:56 +03:00
g . done = true
}
g . step ++
g . index ++
2023-12-11 20:00:34 +02:00
if g . step % 11 == 0 {
return nil , nil
}
if g . step % 13 == 0 {
return nil , errInternal
}
2024-01-25 19:05:47 +02:00
return & task {
log : g . log ,
msg : fmt . Sprintf ( "executing %s task; index: %d" , g . priority , g . index ) ,
err : false ,
delay : g . taskDelay ,
} , nil
2022-09-23 08:27:56 +03:00
}
func ( g * generator ) IsDone ( ) bool {
return g . done
}
2023-08-07 22:55:19 +03:00
func ( g * generator ) IsReady ( ) bool {
return true
}
2022-09-23 08:27:56 +03:00
func ( g * generator ) Reset ( ) {
g . done = false
g . step = 0
}
2022-10-21 18:33:22 +03:00
type shortGenerator struct {
log log . Logger
priority string
done bool
index int
step int
}
2024-02-01 19:15:53 +02:00
func ( g * shortGenerator ) Name ( ) string {
return "ShortTestGenerator"
}
2023-07-04 11:03:29 +03:00
func ( g * shortGenerator ) Next ( ) ( scheduler . Task , error ) {
2022-10-21 18:33:22 +03:00
g . done = true
return & task { log : g . log , msg : fmt . Sprintf ( "executing %s task; index: %d" , g . priority , g . index ) , err : false } , nil
}
func ( g * shortGenerator ) IsDone ( ) bool {
return g . done
}
2023-08-07 22:55:19 +03:00
func ( g * shortGenerator ) IsReady ( ) bool {
return true
}
2022-10-21 18:33:22 +03:00
func ( g * shortGenerator ) Reset ( ) {
g . done = true
g . step = 0
}
2022-09-23 08:27:56 +03:00
func TestScheduler ( t * testing . T ) {
2022-10-21 18:33:22 +03:00
Convey ( "Test active to waiting periodic generator" , t , func ( ) {
logFile , err := os . CreateTemp ( "" , "zot-log*.txt" )
So ( err , ShouldBeNil )
defer os . Remove ( logFile . Name ( ) ) // clean up
logger := log . NewLogger ( "debug" , logFile . Name ( ) )
2023-12-05 00:13:50 +02:00
metrics := monitoring . NewMetricsServer ( true , logger )
sch := scheduler . NewScheduler ( config . New ( ) , metrics , logger )
2022-10-21 18:33:22 +03:00
genH := & shortGenerator { log : logger , priority : "high priority" }
// interval has to be higher than throttle value to simulate
2024-01-11 19:30:16 +02:00
sch . SubmitGenerator ( genH , 1 * time . Second , scheduler . HighPriority )
2022-10-21 18:33:22 +03:00
2023-12-11 20:00:34 +02:00
sch . RunScheduler ( )
2024-01-11 19:30:16 +02:00
time . Sleep ( 2 * time . Second )
2023-12-11 20:00:34 +02:00
sch . Shutdown ( )
2022-10-21 18:33:22 +03:00
data , err := os . ReadFile ( logFile . Name ( ) )
So ( err , ShouldBeNil )
So ( string ( data ) , ShouldContainSubstring , "waiting generator is ready, pushing to ready generators" )
} )
2022-09-23 08:27:56 +03:00
Convey ( "Test order of generators in queue" , t , func ( ) {
logFile , err := os . CreateTemp ( "" , "zot-log*.txt" )
So ( err , ShouldBeNil )
defer os . Remove ( logFile . Name ( ) ) // clean up
logger := log . NewLogger ( "debug" , logFile . Name ( ) )
2023-07-04 11:03:29 +03:00
cfg := config . New ( )
cfg . Scheduler = & config . SchedulerConfig { NumWorkers : 3 }
2023-12-05 00:13:50 +02:00
metrics := monitoring . NewMetricsServer ( true , logger )
sch := scheduler . NewScheduler ( cfg , metrics , logger )
2024-01-11 19:30:16 +02:00
sch . RateLimit = 5 * time . Second
2022-09-23 08:27:56 +03:00
2024-01-25 19:05:47 +02:00
genL := & generator { log : logger , priority : "low priority" , limit : 100 , taskDelay : 100 * time . Millisecond }
2022-09-23 08:27:56 +03:00
sch . SubmitGenerator ( genL , time . Duration ( 0 ) , scheduler . LowPriority )
2024-01-25 19:05:47 +02:00
genM := & generator { log : logger , priority : "medium priority" , limit : 100 , taskDelay : 100 * time . Millisecond }
2023-02-15 21:36:50 +02:00
sch . SubmitGenerator ( genM , time . Duration ( 0 ) , scheduler . MediumPriority )
2024-01-25 19:05:47 +02:00
genH := & generator { log : logger , priority : "high priority" , limit : 100 , taskDelay : 100 * time . Millisecond }
2022-09-23 08:27:56 +03:00
sch . SubmitGenerator ( genH , time . Duration ( 0 ) , scheduler . HighPriority )
2023-12-11 20:00:34 +02:00
sch . RunScheduler ( )
2023-02-15 21:36:50 +02:00
time . Sleep ( 4 * time . Second )
2023-12-11 20:00:34 +02:00
sch . Shutdown ( )
2022-09-23 08:27:56 +03:00
data , err := os . ReadFile ( logFile . Name ( ) )
So ( err , ShouldBeNil )
2023-02-15 21:36:50 +02:00
2022-09-23 08:27:56 +03:00
So ( string ( data ) , ShouldContainSubstring , "executing high priority task; index: 1" )
2023-02-15 21:36:50 +02:00
So ( string ( data ) , ShouldContainSubstring , "executing high priority task; index: 2" )
So ( string ( data ) , ShouldNotContainSubstring , "executing medium priority task; index: 1" )
2023-12-08 00:05:02 -08:00
So ( string ( data ) , ShouldNotContainSubstring , "failed to execute task" )
2022-09-23 08:27:56 +03:00
} )
2024-01-25 19:05:47 +02:00
Convey ( "Test reordering of generators in queue" , t , func ( ) {
logFile , err := os . CreateTemp ( "" , "zot-log*.txt" )
So ( err , ShouldBeNil )
defer os . Remove ( logFile . Name ( ) ) // clean up
logger := log . NewLogger ( "debug" , logFile . Name ( ) )
cfg := config . New ( )
cfg . Scheduler = & config . SchedulerConfig { NumWorkers : 3 }
metrics := monitoring . NewMetricsServer ( true , logger )
sch := scheduler . NewScheduler ( cfg , metrics , logger )
sch . RateLimit = 1 * time . Nanosecond
// Testing repordering of generators using the same medium priority, as well as reordering with
// a low priority generator
genL := & generator { log : logger , priority : "low priority" , limit : 110 , taskDelay : time . Nanosecond }
sch . SubmitGenerator ( genL , time . Duration ( 0 ) , scheduler . LowPriority )
genM := & generator { log : logger , priority : "medium 1 priority" , limit : 110 , taskDelay : time . Nanosecond }
sch . SubmitGenerator ( genM , time . Duration ( 0 ) , scheduler . MediumPriority )
genH := & generator { log : logger , priority : "medium 2 priority" , limit : 110 , taskDelay : time . Nanosecond }
sch . SubmitGenerator ( genH , time . Duration ( 0 ) , scheduler . MediumPriority )
sch . RunScheduler ( )
time . Sleep ( 1 * time . Second )
sch . Shutdown ( )
data , err := os . ReadFile ( logFile . Name ( ) )
So ( err , ShouldBeNil )
// Check all tasks show up in the logs
for i := 1 ; i < 110 ; i ++ {
if i % 11 == 0 || i % 13 == 0 {
continue
}
So ( string ( data ) , ShouldContainSubstring , fmt . Sprintf ( "executing medium 1 priority task; index: %d" , i ) )
So ( string ( data ) , ShouldContainSubstring , fmt . Sprintf ( "executing medium 2 priority task; index: %d" , i ) )
So ( string ( data ) , ShouldContainSubstring , fmt . Sprintf ( "executing low priority task; index: %d" , i ) )
}
taskCounter := 0
priorityFlippedCounter := 0
samePriorityFlippedCounter := 0
lastPriority := "medium"
lastMediumGenerator := "1"
for _ , line := range strings . Split ( strings . TrimSuffix ( string ( data ) , "\n" ) , "\n" ) {
if ! strings . Contains ( line , "priority task; index: " ) {
continue
}
taskCounter ++
// low priority tasks start executing later
// medium priority generators are prioritized until the rank 100/9 (8 generated tasks)
// starting with 100/10, a low priority generator could potentially be prioritized instead
// there will be at least 8 * 2 medium priority tasks executed before low priority tasks are pushed
if taskCounter < 17 {
So ( line , ShouldContainSubstring , "executing medium" )
}
// medium priority 2*110 medium priority tasks should have been generated,
// medium priority generators should be done
// add around 10 low priority tasks to the counter
// and an additional margin of 5 to make sure the test is stable
if taskCounter > 225 {
So ( line , ShouldContainSubstring , "executing low priority" )
}
if strings . Contains ( line , "executing medium" ) {
if ! strings . Contains ( line , fmt . Sprintf ( "executing medium %s" , lastMediumGenerator ) ) {
samePriorityFlippedCounter ++
if lastMediumGenerator == "1" {
lastMediumGenerator = "2"
} else {
lastMediumGenerator = "1"
}
}
}
if ! strings . Contains ( line , fmt . Sprintf ( "executing %s" , lastPriority ) ) {
priorityFlippedCounter ++
if lastPriority == "low" {
lastPriority = "medium"
} else {
lastPriority = "low"
}
}
}
// fairness: make sure none of the medium priority generators is favored by the algorithm
So ( samePriorityFlippedCounter , ShouldBeGreaterThanOrEqualTo , 60 )
t . Logf ( "Switched between medium priority generators %d times" , samePriorityFlippedCounter )
// fairness: make sure the algorithm alternates between generator priorities
So ( priorityFlippedCounter , ShouldBeGreaterThanOrEqualTo , 10 )
t . Logf ( "Switched between generator priorities %d times" , priorityFlippedCounter )
} )
2022-09-23 08:27:56 +03:00
Convey ( "Test task returning an error" , t , func ( ) {
logFile , err := os . CreateTemp ( "" , "zot-log*.txt" )
So ( err , ShouldBeNil )
defer os . Remove ( logFile . Name ( ) ) // clean up
logger := log . NewLogger ( "debug" , logFile . Name ( ) )
2023-12-05 00:13:50 +02:00
metrics := monitoring . NewMetricsServer ( true , logger )
sch := scheduler . NewScheduler ( config . New ( ) , metrics , logger )
2022-09-23 08:27:56 +03:00
t := & task { log : logger , msg : "" , err : true }
sch . SubmitTask ( t , scheduler . MediumPriority )
2023-12-11 20:00:34 +02:00
sch . RunScheduler ( )
2022-09-23 08:27:56 +03:00
time . Sleep ( 500 * time . Millisecond )
2023-12-11 20:00:34 +02:00
sch . Shutdown ( )
2022-09-23 08:27:56 +03:00
data , err := os . ReadFile ( logFile . Name ( ) )
So ( err , ShouldBeNil )
2023-12-08 00:05:02 -08:00
So ( string ( data ) , ShouldContainSubstring , "adding a new task" )
So ( string ( data ) , ShouldContainSubstring , "failed to execute task" )
2022-09-23 08:27:56 +03:00
} )
Convey ( "Test resubmit generator" , t , func ( ) {
logFile , err := os . CreateTemp ( "" , "zot-log*.txt" )
So ( err , ShouldBeNil )
defer os . Remove ( logFile . Name ( ) ) // clean up
logger := log . NewLogger ( "debug" , logFile . Name ( ) )
2023-12-05 00:13:50 +02:00
metrics := monitoring . NewMetricsServer ( true , logger )
sch := scheduler . NewScheduler ( config . New ( ) , metrics , logger )
2022-09-23 08:27:56 +03:00
2024-01-25 19:05:47 +02:00
genL := & generator { log : logger , priority : "low priority" , limit : 100 , taskDelay : 100 * time . Millisecond }
2022-09-23 08:27:56 +03:00
sch . SubmitGenerator ( genL , 20 * time . Millisecond , scheduler . LowPriority )
2023-12-11 20:00:34 +02:00
sch . RunScheduler ( )
2024-01-11 19:30:16 +02:00
time . Sleep ( 1 * time . Second )
2023-12-11 20:00:34 +02:00
sch . Shutdown ( )
2022-09-23 08:27:56 +03:00
data , err := os . ReadFile ( logFile . Name ( ) )
So ( err , ShouldBeNil )
So ( string ( data ) , ShouldContainSubstring , "executing low priority task; index: 1" )
So ( string ( data ) , ShouldContainSubstring , "executing low priority task; index: 2" )
} )
Convey ( "Try to add a task with wrong priority" , t , func ( ) {
logFile , err := os . CreateTemp ( "" , "zot-log*.txt" )
So ( err , ShouldBeNil )
defer os . Remove ( logFile . Name ( ) ) // clean up
logger := log . NewLogger ( "debug" , logFile . Name ( ) )
2023-12-05 00:13:50 +02:00
metrics := monitoring . NewMetricsServer ( true , logger )
sch := scheduler . NewScheduler ( config . New ( ) , metrics , logger )
2022-09-23 08:27:56 +03:00
t := & task { log : logger , msg : "" , err : false }
sch . SubmitTask ( t , - 1 )
data , err := os . ReadFile ( logFile . Name ( ) )
So ( err , ShouldBeNil )
2023-12-08 00:05:02 -08:00
So ( string ( data ) , ShouldNotContainSubstring , "adding a new task" )
2022-09-23 08:27:56 +03:00
} )
Convey ( "Test adding a new task when context is done" , t , func ( ) {
logFile , err := os . CreateTemp ( "" , "zot-log*.txt" )
So ( err , ShouldBeNil )
defer os . Remove ( logFile . Name ( ) ) // clean up
logger := log . NewLogger ( "debug" , logFile . Name ( ) )
2023-12-05 00:13:50 +02:00
metrics := monitoring . NewMetricsServer ( true , logger )
sch := scheduler . NewScheduler ( config . New ( ) , metrics , logger )
2022-09-23 08:27:56 +03:00
2023-12-11 20:00:34 +02:00
sch . RunScheduler ( )
sch . Shutdown ( )
2022-09-23 08:27:56 +03:00
time . Sleep ( 500 * time . Millisecond )
t := & task { log : logger , msg : "" , err : false }
sch . SubmitTask ( t , scheduler . LowPriority )
data , err := os . ReadFile ( logFile . Name ( ) )
So ( err , ShouldBeNil )
2023-12-08 00:05:02 -08:00
So ( string ( data ) , ShouldNotContainSubstring , "adding a new task" )
2022-09-23 08:27:56 +03:00
} )
2023-12-05 00:13:50 +02:00
2023-12-11 20:00:34 +02:00
Convey ( "Test stopping scheduler by calling Shutdown()" , t , func ( ) {
logFile , err := os . CreateTemp ( "" , "zot-log*.txt" )
So ( err , ShouldBeNil )
defer os . Remove ( logFile . Name ( ) ) // clean up
logger := log . NewLogger ( "debug" , logFile . Name ( ) )
metrics := monitoring . NewMetricsServer ( true , logger )
sch := scheduler . NewScheduler ( config . New ( ) , metrics , logger )
2024-01-25 19:05:47 +02:00
genL := & generator { log : logger , priority : "medium priority" , limit : 100 , taskDelay : 100 * time . Millisecond }
2023-12-11 20:00:34 +02:00
sch . SubmitGenerator ( genL , 20 * time . Millisecond , scheduler . MediumPriority )
sch . RunScheduler ( )
2024-01-11 19:30:16 +02:00
time . Sleep ( 1 * time . Second )
2023-12-11 20:00:34 +02:00
sch . Shutdown ( )
data , err := os . ReadFile ( logFile . Name ( ) )
So ( err , ShouldBeNil )
So ( string ( data ) , ShouldContainSubstring , "executing medium priority task; index: 1" )
So ( string ( data ) , ShouldContainSubstring , "executing medium priority task; index: 2" )
So ( string ( data ) , ShouldContainSubstring , "received stop signal, gracefully shutting down..." )
} )
2023-12-05 00:13:50 +02:00
Convey ( "Test scheduler Priority.String() method" , t , func ( ) {
var p scheduler . Priority //nolint: varnamelen
// test invalid priority
p = 6238734
So ( p . String ( ) , ShouldEqual , "invalid" )
p = scheduler . LowPriority
So ( p . String ( ) , ShouldEqual , "low" )
p = scheduler . MediumPriority
So ( p . String ( ) , ShouldEqual , "medium" )
p = scheduler . HighPriority
So ( p . String ( ) , ShouldEqual , "high" )
} )
Convey ( "Test scheduler State.String() method" , t , func ( ) {
var s scheduler . State //nolint: varnamelen
// test invalid state
s = - 67
So ( s . String ( ) , ShouldEqual , "invalid" )
s = scheduler . Ready
So ( s . String ( ) , ShouldEqual , "ready" )
s = scheduler . Waiting
So ( s . String ( ) , ShouldEqual , "waiting" )
s = scheduler . Done
So ( s . String ( ) , ShouldEqual , "done" )
} )
2022-09-23 08:27:56 +03:00
}
2023-07-04 11:03:29 +03:00
func TestGetNumWorkers ( t * testing . T ) {
Convey ( "Test setting the number of workers - default value" , t , func ( ) {
2023-12-05 00:13:50 +02:00
logger := log . NewLogger ( "debug" , "logFile" )
metrics := monitoring . NewMetricsServer ( true , logger )
sch := scheduler . NewScheduler ( config . New ( ) , metrics , logger )
2023-09-22 16:33:18 +03:00
defer os . Remove ( "logFile" )
2023-07-04 11:03:29 +03:00
So ( sch . NumWorkers , ShouldEqual , runtime . NumCPU ( ) * 4 )
} )
Convey ( "Test setting the number of workers - getting the value from config" , t , func ( ) {
cfg := config . New ( )
cfg . Scheduler = & config . SchedulerConfig { NumWorkers : 3 }
2023-12-05 00:13:50 +02:00
logger := log . NewLogger ( "debug" , "logFile" )
metrics := monitoring . NewMetricsServer ( true , logger )
sch := scheduler . NewScheduler ( cfg , metrics , logger )
2023-09-22 16:33:18 +03:00
defer os . Remove ( "logFile" )
2023-07-04 11:03:29 +03:00
So ( sch . NumWorkers , ShouldEqual , 3 )
} )
}