2020-01-07 14:23:09 +03:00
// Copyright 2019 The Gitea Authors. All rights reserved.
2022-11-27 21:20:29 +03:00
// SPDX-License-Identifier: MIT
2020-01-07 14:23:09 +03:00
package queue
import (
"context"
2022-03-31 20:01:43 +03:00
"fmt"
"runtime/pprof"
2020-01-07 14:23:09 +03:00
"sync"
2020-01-29 04:01:06 +03:00
"sync/atomic"
2020-01-07 14:23:09 +03:00
"time"
"code.gitea.io/gitea/modules/log"
2022-03-31 20:01:43 +03:00
"code.gitea.io/gitea/modules/process"
2020-05-08 18:46:05 +03:00
"code.gitea.io/gitea/modules/util"
2020-01-07 14:23:09 +03:00
)
2020-01-29 04:01:06 +03:00
// WorkerPool represent a dynamically growable worker pool for a
// provided handler function. They have an internal channel which
// they use to detect if there is a block and will grow and shrink in
// response to demand as per configuration.
2020-01-07 14:23:09 +03:00
type WorkerPool struct {
2022-04-27 18:32:04 +03:00
// This field requires to be the first one in the struct.
// This is to allow 64 bit atomic operations on 32-bit machines.
// See: https://pkg.go.dev/sync/atomic#pkg-note-BUG & Gitea issue 19518
numInQueue int64
2020-01-07 14:23:09 +03:00
lock sync . Mutex
baseCtx context . Context
2021-05-15 17:22:26 +03:00
baseCtxCancel context . CancelFunc
2022-03-31 20:01:43 +03:00
baseCtxFinished process . FinishedFunc
2022-01-23 00:22:14 +03:00
paused chan struct { }
resumed chan struct { }
2020-01-07 14:23:09 +03:00
cond * sync . Cond
qid int64
maxNumberOfWorkers int
numberOfWorkers int
batchLength int
handle HandlerFunc
dataChan chan Data
blockTimeout time . Duration
boostTimeout time . Duration
boostWorkers int
2020-01-29 04:01:06 +03:00
}
2022-01-23 00:22:14 +03:00
var (
_ Flushable = & WorkerPool { }
_ ManagedPool = & WorkerPool { }
)
2020-01-29 04:01:06 +03:00
// WorkerPoolConfiguration is the basic configuration for a WorkerPool
type WorkerPoolConfiguration struct {
2022-03-31 20:01:43 +03:00
Name string
2020-01-29 04:01:06 +03:00
QueueLength int
BatchLength int
BlockTimeout time . Duration
BoostTimeout time . Duration
BoostWorkers int
MaxWorkers int
}
// NewWorkerPool creates a new worker pool
func NewWorkerPool ( handle HandlerFunc , config WorkerPoolConfiguration ) * WorkerPool {
2022-03-31 20:01:43 +03:00
ctx , cancel , finished := process . GetManager ( ) . AddTypedContext ( context . Background ( ) , fmt . Sprintf ( "Queue: %s" , config . Name ) , process . SystemProcessType , false )
2020-01-29 04:01:06 +03:00
dataChan := make ( chan Data , config . QueueLength )
pool := & WorkerPool {
baseCtx : ctx ,
2021-05-15 17:22:26 +03:00
baseCtxCancel : cancel ,
2022-03-31 20:01:43 +03:00
baseCtxFinished : finished ,
2020-01-29 04:01:06 +03:00
batchLength : config . BatchLength ,
dataChan : dataChan ,
2022-01-25 01:54:35 +03:00
resumed : closedChan ,
2022-01-23 00:22:14 +03:00
paused : make ( chan struct { } ) ,
2020-01-29 04:01:06 +03:00
handle : handle ,
blockTimeout : config . BlockTimeout ,
boostTimeout : config . BoostTimeout ,
boostWorkers : config . BoostWorkers ,
maxNumberOfWorkers : config . MaxWorkers ,
}
return pool
2020-01-07 14:23:09 +03:00
}
2022-02-05 23:51:25 +03:00
// Done returns when this worker pool's base context has been cancelled
func ( p * WorkerPool ) Done ( ) <- chan struct { } {
return p . baseCtx . Done ( )
}
2020-01-07 14:23:09 +03:00
// Push pushes the data to the internal channel
func ( p * WorkerPool ) Push ( data Data ) {
2020-01-29 04:01:06 +03:00
atomic . AddInt64 ( & p . numInQueue , 1 )
2020-01-07 14:23:09 +03:00
p . lock . Lock ( )
2022-01-23 00:22:14 +03:00
select {
case <- p . paused :
p . lock . Unlock ( )
p . dataChan <- data
return
default :
}
2020-01-07 14:23:09 +03:00
if p . blockTimeout > 0 && p . boostTimeout > 0 && ( p . numberOfWorkers <= p . maxNumberOfWorkers || p . maxNumberOfWorkers < 0 ) {
2021-05-02 10:22:30 +03:00
if p . numberOfWorkers == 0 {
p . zeroBoost ( )
} else {
p . lock . Unlock ( )
}
2020-01-07 14:23:09 +03:00
p . pushBoost ( data )
} else {
p . lock . Unlock ( )
p . dataChan <- data
}
}
2022-01-23 00:22:14 +03:00
// HasNoWorkerScaling will return true if the queue has no workers, and has no worker boosting
func ( p * WorkerPool ) HasNoWorkerScaling ( ) bool {
p . lock . Lock ( )
defer p . lock . Unlock ( )
return p . hasNoWorkerScaling ( )
}
func ( p * WorkerPool ) hasNoWorkerScaling ( ) bool {
return p . numberOfWorkers == 0 && ( p . boostTimeout == 0 || p . boostWorkers == 0 || p . maxNumberOfWorkers == 0 )
}
2022-02-08 17:02:32 +03:00
// zeroBoost will add a temporary boost worker for a no worker queue
// p.lock must be locked at the start of this function BUT it will be unlocked by the end of this function
// (This is because addWorkers has to be called whilst unlocked)
2021-05-02 10:22:30 +03:00
func ( p * WorkerPool ) zeroBoost ( ) {
2021-05-15 17:22:26 +03:00
ctx , cancel := context . WithTimeout ( p . baseCtx , p . boostTimeout )
2021-05-02 10:22:30 +03:00
mq := GetManager ( ) . GetManagedQueue ( p . qid )
boost := p . boostWorkers
if ( boost + p . numberOfWorkers ) > p . maxNumberOfWorkers && p . maxNumberOfWorkers >= 0 {
boost = p . maxNumberOfWorkers - p . numberOfWorkers
}
if mq != nil {
2022-01-29 23:52:37 +03:00
log . Debug ( "WorkerPool: %d (for %s) has zero workers - adding %d temporary workers for %s" , p . qid , mq . Name , boost , p . boostTimeout )
2021-05-02 10:22:30 +03:00
start := time . Now ( )
pid := mq . RegisterWorkers ( boost , start , true , start . Add ( p . boostTimeout ) , cancel , false )
2021-05-15 17:22:26 +03:00
cancel = func ( ) {
2021-05-02 10:22:30 +03:00
mq . RemoveWorkers ( pid )
2021-05-15 17:22:26 +03:00
}
2021-05-02 10:22:30 +03:00
} else {
2022-01-29 23:52:37 +03:00
log . Debug ( "WorkerPool: %d has zero workers - adding %d temporary workers for %s" , p . qid , p . boostWorkers , p . boostTimeout )
2021-05-02 10:22:30 +03:00
}
p . lock . Unlock ( )
2021-05-15 17:22:26 +03:00
p . addWorkers ( ctx , cancel , boost )
2021-05-02 10:22:30 +03:00
}
2020-01-07 14:23:09 +03:00
func ( p * WorkerPool ) pushBoost ( data Data ) {
select {
case p . dataChan <- data :
default :
p . lock . Lock ( )
if p . blockTimeout <= 0 {
p . lock . Unlock ( )
p . dataChan <- data
return
}
ourTimeout := p . blockTimeout
timer := time . NewTimer ( p . blockTimeout )
p . lock . Unlock ( )
select {
case p . dataChan <- data :
2020-05-08 18:46:05 +03:00
util . StopTimer ( timer )
2020-01-07 14:23:09 +03:00
case <- timer . C :
p . lock . Lock ( )
if p . blockTimeout > ourTimeout || ( p . numberOfWorkers > p . maxNumberOfWorkers && p . maxNumberOfWorkers >= 0 ) {
p . lock . Unlock ( )
p . dataChan <- data
return
}
p . blockTimeout *= 2
2021-05-15 17:22:26 +03:00
boostCtx , boostCtxCancel := context . WithCancel ( p . baseCtx )
2020-01-07 14:23:09 +03:00
mq := GetManager ( ) . GetManagedQueue ( p . qid )
boost := p . boostWorkers
if ( boost + p . numberOfWorkers ) > p . maxNumberOfWorkers && p . maxNumberOfWorkers >= 0 {
boost = p . maxNumberOfWorkers - p . numberOfWorkers
}
if mq != nil {
2022-01-29 23:52:37 +03:00
log . Debug ( "WorkerPool: %d (for %s) Channel blocked for %v - adding %d temporary workers for %s, block timeout now %v" , p . qid , mq . Name , ourTimeout , boost , p . boostTimeout , p . blockTimeout )
2020-01-07 14:23:09 +03:00
start := time . Now ( )
2021-05-15 17:22:26 +03:00
pid := mq . RegisterWorkers ( boost , start , true , start . Add ( p . boostTimeout ) , boostCtxCancel , false )
2020-01-07 14:23:09 +03:00
go func ( ) {
2021-05-15 17:22:26 +03:00
<- boostCtx . Done ( )
2020-01-07 14:23:09 +03:00
mq . RemoveWorkers ( pid )
2021-05-15 17:22:26 +03:00
boostCtxCancel ( )
2020-01-07 14:23:09 +03:00
} ( )
} else {
2022-01-29 23:52:37 +03:00
log . Debug ( "WorkerPool: %d Channel blocked for %v - adding %d temporary workers for %s, block timeout now %v" , p . qid , ourTimeout , p . boostWorkers , p . boostTimeout , p . blockTimeout )
2020-01-07 14:23:09 +03:00
}
go func ( ) {
<- time . After ( p . boostTimeout )
2021-05-15 17:22:26 +03:00
boostCtxCancel ( )
2020-01-07 14:23:09 +03:00
p . lock . Lock ( )
p . blockTimeout /= 2
p . lock . Unlock ( )
} ( )
p . lock . Unlock ( )
2021-05-15 17:22:26 +03:00
p . addWorkers ( boostCtx , boostCtxCancel , boost )
2020-01-07 14:23:09 +03:00
p . dataChan <- data
}
}
}
// NumberOfWorkers returns the number of current workers in the pool
func ( p * WorkerPool ) NumberOfWorkers ( ) int {
p . lock . Lock ( )
defer p . lock . Unlock ( )
return p . numberOfWorkers
}
2022-02-12 08:31:26 +03:00
// NumberInQueue returns the number of items in the queue
func ( p * WorkerPool ) NumberInQueue ( ) int64 {
return atomic . LoadInt64 ( & p . numInQueue )
}
2020-01-07 14:23:09 +03:00
// MaxNumberOfWorkers returns the maximum number of workers automatically added to the pool
func ( p * WorkerPool ) MaxNumberOfWorkers ( ) int {
p . lock . Lock ( )
defer p . lock . Unlock ( )
return p . maxNumberOfWorkers
}
// BoostWorkers returns the number of workers for a boost
func ( p * WorkerPool ) BoostWorkers ( ) int {
p . lock . Lock ( )
defer p . lock . Unlock ( )
return p . boostWorkers
}
// BoostTimeout returns the timeout of the next boost
func ( p * WorkerPool ) BoostTimeout ( ) time . Duration {
p . lock . Lock ( )
defer p . lock . Unlock ( )
return p . boostTimeout
}
// BlockTimeout returns the timeout til the next boost
func ( p * WorkerPool ) BlockTimeout ( ) time . Duration {
p . lock . Lock ( )
defer p . lock . Unlock ( )
return p . blockTimeout
}
2020-01-29 04:01:06 +03:00
// SetPoolSettings sets the setable boost values
func ( p * WorkerPool ) SetPoolSettings ( maxNumberOfWorkers , boostWorkers int , timeout time . Duration ) {
2020-01-07 14:23:09 +03:00
p . lock . Lock ( )
defer p . lock . Unlock ( )
p . maxNumberOfWorkers = maxNumberOfWorkers
p . boostWorkers = boostWorkers
p . boostTimeout = timeout
}
// SetMaxNumberOfWorkers sets the maximum number of workers automatically added to the pool
// Changing this number will not change the number of current workers but will change the limit
// for future additions
func ( p * WorkerPool ) SetMaxNumberOfWorkers ( newMax int ) {
p . lock . Lock ( )
defer p . lock . Unlock ( )
p . maxNumberOfWorkers = newMax
}
2020-01-29 04:01:06 +03:00
func ( p * WorkerPool ) commonRegisterWorkers ( number int , timeout time . Duration , isFlusher bool ) ( context . Context , context . CancelFunc ) {
2020-01-07 14:23:09 +03:00
var ctx context . Context
var cancel context . CancelFunc
start := time . Now ( )
end := start
hasTimeout := false
if timeout > 0 {
ctx , cancel = context . WithTimeout ( p . baseCtx , timeout )
end = start . Add ( timeout )
hasTimeout = true
} else {
ctx , cancel = context . WithCancel ( p . baseCtx )
}
mq := GetManager ( ) . GetManagedQueue ( p . qid )
if mq != nil {
2020-01-29 04:01:06 +03:00
pid := mq . RegisterWorkers ( number , start , hasTimeout , end , cancel , isFlusher )
2020-01-07 14:23:09 +03:00
log . Trace ( "WorkerPool: %d (for %s) adding %d workers with group id: %d" , p . qid , mq . Name , number , pid )
2021-05-15 17:22:26 +03:00
return ctx , func ( ) {
mq . RemoveWorkers ( pid )
}
2020-01-07 14:23:09 +03:00
}
2021-05-15 17:22:26 +03:00
log . Trace ( "WorkerPool: %d adding %d workers (no group id)" , p . qid , number )
2020-01-29 04:01:06 +03:00
return ctx , cancel
}
// AddWorkers adds workers to the pool - this allows the number of workers to go above the limit
func ( p * WorkerPool ) AddWorkers ( number int , timeout time . Duration ) context . CancelFunc {
ctx , cancel := p . commonRegisterWorkers ( number , timeout , false )
2021-05-15 17:22:26 +03:00
p . addWorkers ( ctx , cancel , number )
2020-01-07 14:23:09 +03:00
return cancel
}
// addWorkers adds workers to the pool
2021-05-15 17:22:26 +03:00
func ( p * WorkerPool ) addWorkers ( ctx context . Context , cancel context . CancelFunc , number int ) {
2020-01-07 14:23:09 +03:00
for i := 0 ; i < number ; i ++ {
p . lock . Lock ( )
if p . cond == nil {
p . cond = sync . NewCond ( & p . lock )
}
p . numberOfWorkers ++
p . lock . Unlock ( )
go func ( ) {
2022-03-31 20:01:43 +03:00
pprof . SetGoroutineLabels ( ctx )
2020-01-07 14:23:09 +03:00
p . doWork ( ctx )
p . lock . Lock ( )
p . numberOfWorkers --
if p . numberOfWorkers == 0 {
p . cond . Broadcast ( )
2021-05-15 17:22:26 +03:00
cancel ( )
2020-01-07 14:23:09 +03:00
} else if p . numberOfWorkers < 0 {
// numberOfWorkers can't go negative but...
log . Warn ( "Number of Workers < 0 for QID %d - this shouldn't happen" , p . qid )
p . numberOfWorkers = 0
p . cond . Broadcast ( )
2021-05-15 17:22:26 +03:00
cancel ( )
2020-01-07 14:23:09 +03:00
}
2022-02-08 17:02:32 +03:00
select {
case <- p . baseCtx . Done ( ) :
2022-02-08 21:53:34 +03:00
// Don't warn or check for ongoing work if the baseCtx is shutdown
case <- p . paused :
// Don't warn or check for ongoing work if the pool is paused
2022-02-08 17:02:32 +03:00
default :
2022-02-08 21:53:34 +03:00
if p . hasNoWorkerScaling ( ) {
log . Warn (
"Queue: %d is configured to be non-scaling and has no workers - this configuration is likely incorrect.\n" +
"The queue will be paused to prevent data-loss with the assumption that you will add workers and unpause as required." , p . qid )
p . pause ( )
} else if p . numberOfWorkers == 0 && atomic . LoadInt64 ( & p . numInQueue ) > 0 {
2022-02-08 17:02:32 +03:00
// OK there are no workers but... there's still work to be done -> Reboost
p . zeroBoost ( )
// p.lock will be unlocked by zeroBoost
return
}
}
2020-01-07 14:23:09 +03:00
p . lock . Unlock ( )
} ( )
}
}
// Wait for WorkerPool to finish
func ( p * WorkerPool ) Wait ( ) {
p . lock . Lock ( )
defer p . lock . Unlock ( )
if p . cond == nil {
p . cond = sync . NewCond ( & p . lock )
}
if p . numberOfWorkers <= 0 {
return
}
p . cond . Wait ( )
}
2022-01-23 00:22:14 +03:00
// IsPaused returns if the pool is paused
func ( p * WorkerPool ) IsPaused ( ) bool {
p . lock . Lock ( )
defer p . lock . Unlock ( )
select {
case <- p . paused :
return true
default :
return false
}
}
// IsPausedIsResumed returns if the pool is paused and a channel that is closed when it is resumed
func ( p * WorkerPool ) IsPausedIsResumed ( ) ( <- chan struct { } , <- chan struct { } ) {
p . lock . Lock ( )
defer p . lock . Unlock ( )
return p . paused , p . resumed
}
// Pause pauses the WorkerPool
func ( p * WorkerPool ) Pause ( ) {
p . lock . Lock ( )
defer p . lock . Unlock ( )
p . pause ( )
}
func ( p * WorkerPool ) pause ( ) {
select {
case <- p . paused :
default :
p . resumed = make ( chan struct { } )
close ( p . paused )
}
}
// Resume resumes the WorkerPool
func ( p * WorkerPool ) Resume ( ) {
2022-02-08 21:53:34 +03:00
p . lock . Lock ( ) // can't defer unlock because of the zeroBoost at the end
2022-01-23 00:22:14 +03:00
select {
case <- p . resumed :
2022-02-08 21:53:34 +03:00
// already resumed - there's nothing to do
p . lock . Unlock ( )
return
2022-01-23 00:22:14 +03:00
default :
}
2022-02-08 21:53:34 +03:00
p . paused = make ( chan struct { } )
close ( p . resumed )
// OK now we need to check if we need to add some workers...
if p . numberOfWorkers > 0 || p . hasNoWorkerScaling ( ) || atomic . LoadInt64 ( & p . numInQueue ) == 0 {
// We either have workers, can't scale or there's no work to be done -> so just resume
p . lock . Unlock ( )
return
}
// OK we got some work but no workers we need to think about boosting
select {
case <- p . baseCtx . Done ( ) :
// don't bother boosting if the baseCtx is done
p . lock . Unlock ( )
return
default :
}
// OK we'd better add some boost workers!
p . zeroBoost ( )
// p.zeroBoost will unlock the lock
2022-01-23 00:22:14 +03:00
}
2020-01-07 14:23:09 +03:00
// CleanUp will drain the remaining contents of the channel
// This should be called after AddWorkers context is closed
func ( p * WorkerPool ) CleanUp ( ctx context . Context ) {
log . Trace ( "WorkerPool: %d CleanUp" , p . qid )
close ( p . dataChan )
for data := range p . dataChan {
2022-01-23 00:22:14 +03:00
if unhandled := p . handle ( data ) ; unhandled != nil {
if unhandled != nil {
log . Error ( "Unhandled Data in clean-up of queue %d" , p . qid )
}
}
2020-01-29 04:01:06 +03:00
atomic . AddInt64 ( & p . numInQueue , - 1 )
2020-01-07 14:23:09 +03:00
select {
case <- ctx . Done ( ) :
log . Warn ( "WorkerPool: %d Cleanup context closed before finishing clean-up" , p . qid )
return
default :
}
}
log . Trace ( "WorkerPool: %d CleanUp Done" , p . qid )
}
2020-01-29 04:01:06 +03:00
// Flush flushes the channel with a timeout - the Flush worker will be registered as a flush worker with the manager
func ( p * WorkerPool ) Flush ( timeout time . Duration ) error {
ctx , cancel := p . commonRegisterWorkers ( 1 , timeout , true )
defer cancel ( )
return p . FlushWithContext ( ctx )
}
// IsEmpty returns if true if the worker queue is empty
func ( p * WorkerPool ) IsEmpty ( ) bool {
return atomic . LoadInt64 ( & p . numInQueue ) == 0
}
2022-12-30 03:06:47 +03:00
// contextError returns either ctx.Done(), the base context's error or nil
func ( p * WorkerPool ) contextError ( ctx context . Context ) error {
select {
case <- p . baseCtx . Done ( ) :
return p . baseCtx . Err ( )
case <- ctx . Done ( ) :
return ctx . Err ( )
default :
return nil
}
}
2020-01-29 04:01:06 +03:00
// FlushWithContext is very similar to CleanUp but it will return as soon as the dataChan is empty
// NB: The worker will not be registered with the manager.
func ( p * WorkerPool ) FlushWithContext ( ctx context . Context ) error {
log . Trace ( "WorkerPool: %d Flush" , p . qid )
2022-12-30 03:06:47 +03:00
paused , _ := p . IsPausedIsResumed ( )
2020-01-29 04:01:06 +03:00
for {
2022-12-30 03:06:47 +03:00
// Because select will return any case that is satisified at random we precheck here before looking at dataChan.
select {
case <- paused :
// Ensure that even if paused that the cancelled error is still sent
return p . contextError ( ctx )
case <- p . baseCtx . Done ( ) :
return p . baseCtx . Err ( )
case <- ctx . Done ( ) :
return ctx . Err ( )
default :
}
2020-01-29 04:01:06 +03:00
select {
2022-12-30 03:06:47 +03:00
case <- paused :
return p . contextError ( ctx )
case data , ok := <- p . dataChan :
if ! ok {
return nil
}
2022-01-23 00:22:14 +03:00
if unhandled := p . handle ( data ) ; unhandled != nil {
log . Error ( "Unhandled Data whilst flushing queue %d" , p . qid )
}
2020-01-29 04:01:06 +03:00
atomic . AddInt64 ( & p . numInQueue , - 1 )
case <- p . baseCtx . Done ( ) :
return p . baseCtx . Err ( )
case <- ctx . Done ( ) :
return ctx . Err ( )
default :
return nil
}
}
}
2020-01-07 14:23:09 +03:00
func ( p * WorkerPool ) doWork ( ctx context . Context ) {
2022-03-31 20:01:43 +03:00
pprof . SetGoroutineLabels ( ctx )
2020-01-07 14:23:09 +03:00
delay := time . Millisecond * 300
2022-01-23 00:22:14 +03:00
// Create a common timer - we will use this elsewhere
timer := time . NewTimer ( 0 )
util . StopTimer ( timer )
paused , _ := p . IsPausedIsResumed ( )
2022-01-20 20:46:10 +03:00
data := make ( [ ] Data , 0 , p . batchLength )
2020-01-07 14:23:09 +03:00
for {
2022-12-30 03:06:47 +03:00
// Because select will return any case that is satisified at random we precheck here before looking at dataChan.
2020-01-07 14:23:09 +03:00
select {
2022-01-23 00:22:14 +03:00
case <- paused :
log . Trace ( "Worker for Queue %d Pausing" , p . qid )
if len ( data ) > 0 {
log . Trace ( "Handling: %d data, %v" , len ( data ) , data )
if unhandled := p . handle ( data ... ) ; unhandled != nil {
log . Error ( "Unhandled Data in queue %d" , p . qid )
}
atomic . AddInt64 ( & p . numInQueue , - 1 * int64 ( len ( data ) ) )
}
_ , resumed := p . IsPausedIsResumed ( )
select {
case <- resumed :
paused , _ = p . IsPausedIsResumed ( )
log . Trace ( "Worker for Queue %d Resuming" , p . qid )
util . StopTimer ( timer )
case <- ctx . Done ( ) :
log . Trace ( "Worker shutting down" )
return
}
2022-12-30 03:06:47 +03:00
case <- ctx . Done ( ) :
if len ( data ) > 0 {
log . Trace ( "Handling: %d data, %v" , len ( data ) , data )
if unhandled := p . handle ( data ... ) ; unhandled != nil {
log . Error ( "Unhandled Data in queue %d" , p . qid )
}
atomic . AddInt64 ( & p . numInQueue , - 1 * int64 ( len ( data ) ) )
}
log . Trace ( "Worker shutting down" )
return
2022-01-23 00:22:14 +03:00
default :
}
2022-12-30 03:06:47 +03:00
2022-01-23 00:22:14 +03:00
select {
case <- paused :
// go back around
2020-01-07 14:23:09 +03:00
case <- ctx . Done ( ) :
if len ( data ) > 0 {
log . Trace ( "Handling: %d data, %v" , len ( data ) , data )
2022-01-23 00:22:14 +03:00
if unhandled := p . handle ( data ... ) ; unhandled != nil {
log . Error ( "Unhandled Data in queue %d" , p . qid )
}
2020-01-29 04:01:06 +03:00
atomic . AddInt64 ( & p . numInQueue , - 1 * int64 ( len ( data ) ) )
2020-01-07 14:23:09 +03:00
}
log . Trace ( "Worker shutting down" )
return
case datum , ok := <- p . dataChan :
if ! ok {
// the dataChan has been closed - we should finish up:
if len ( data ) > 0 {
log . Trace ( "Handling: %d data, %v" , len ( data ) , data )
2022-01-23 00:22:14 +03:00
if unhandled := p . handle ( data ... ) ; unhandled != nil {
log . Error ( "Unhandled Data in queue %d" , p . qid )
}
2020-01-29 04:01:06 +03:00
atomic . AddInt64 ( & p . numInQueue , - 1 * int64 ( len ( data ) ) )
2020-01-07 14:23:09 +03:00
}
log . Trace ( "Worker shutting down" )
return
}
data = append ( data , datum )
2022-01-23 00:22:14 +03:00
util . StopTimer ( timer )
2020-01-07 14:23:09 +03:00
if len ( data ) >= p . batchLength {
log . Trace ( "Handling: %d data, %v" , len ( data ) , data )
2022-01-23 00:22:14 +03:00
if unhandled := p . handle ( data ... ) ; unhandled != nil {
log . Error ( "Unhandled Data in queue %d" , p . qid )
}
2020-01-29 04:01:06 +03:00
atomic . AddInt64 ( & p . numInQueue , - 1 * int64 ( len ( data ) ) )
2020-01-07 14:23:09 +03:00
data = make ( [ ] Data , 0 , p . batchLength )
2022-01-23 00:22:14 +03:00
} else {
timer . Reset ( delay )
2020-01-07 14:23:09 +03:00
}
2022-01-23 00:22:14 +03:00
case <- timer . C :
delay = time . Millisecond * 100
if len ( data ) > 0 {
log . Trace ( "Handling: %d data, %v" , len ( data ) , data )
if unhandled := p . handle ( data ... ) ; unhandled != nil {
log . Error ( "Unhandled Data in queue %d" , p . qid )
2020-01-07 14:23:09 +03:00
}
2022-01-23 00:22:14 +03:00
atomic . AddInt64 ( & p . numInQueue , - 1 * int64 ( len ( data ) ) )
data = make ( [ ] Data , 0 , p . batchLength )
2020-01-07 14:23:09 +03:00
}
}
}
}