2020-02-02 23:19:58 +00:00
// Copyright 2020 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package queue
import (
"context"
"fmt"
2022-03-31 18:01:43 +01:00
"runtime/pprof"
2020-02-02 23:19:58 +00:00
"sync"
2022-01-22 21:22:14 +00:00
"sync/atomic"
2020-02-02 23:19:58 +00:00
"time"
2021-07-25 00:03:58 +08:00
"code.gitea.io/gitea/modules/json"
2020-02-02 23:19:58 +00:00
"code.gitea.io/gitea/modules/log"
2022-01-22 21:22:14 +00:00
"code.gitea.io/gitea/modules/util"
2020-02-02 23:19:58 +00:00
)
// ByteFIFOQueueConfiguration is the configuration for a ByteFIFOQueue
type ByteFIFOQueueConfiguration struct {
WorkerPoolConfiguration
2021-05-15 15:22:26 +01:00
Workers int
WaitOnEmpty bool
2020-02-02 23:19:58 +00:00
}
2021-04-09 09:40:34 +02:00
var _ Queue = & ByteFIFOQueue { }
2020-02-02 23:19:58 +00:00
// ByteFIFOQueue is a Queue formed from a ByteFIFO and WorkerPool
type ByteFIFOQueue struct {
* WorkerPool
2021-05-15 15:22:26 +01:00
byteFIFO ByteFIFO
typ Type
shutdownCtx context . Context
shutdownCtxCancel context . CancelFunc
terminateCtx context . Context
terminateCtxCancel context . CancelFunc
exemplar interface { }
workers int
name string
lock sync . Mutex
waitOnEmpty bool
pushed chan struct { }
2020-02-02 23:19:58 +00:00
}
// NewByteFIFOQueue creates a new ByteFIFOQueue
func NewByteFIFOQueue ( typ Type , byteFIFO ByteFIFO , handle HandlerFunc , cfg , exemplar interface { } ) ( * ByteFIFOQueue , error ) {
configInterface , err := toConfig ( ByteFIFOQueueConfiguration { } , cfg )
if err != nil {
return nil , err
}
config := configInterface . ( ByteFIFOQueueConfiguration )
2021-05-15 15:22:26 +01:00
terminateCtx , terminateCtxCancel := context . WithCancel ( context . Background ( ) )
shutdownCtx , shutdownCtxCancel := context . WithCancel ( terminateCtx )
2022-01-22 21:22:14 +00:00
q := & ByteFIFOQueue {
2021-05-15 15:22:26 +01:00
byteFIFO : byteFIFO ,
typ : typ ,
shutdownCtx : shutdownCtx ,
shutdownCtxCancel : shutdownCtxCancel ,
terminateCtx : terminateCtx ,
terminateCtxCancel : terminateCtxCancel ,
exemplar : exemplar ,
workers : config . Workers ,
name : config . Name ,
waitOnEmpty : config . WaitOnEmpty ,
pushed : make ( chan struct { } , 1 ) ,
2022-01-22 21:22:14 +00:00
}
q . WorkerPool = NewWorkerPool ( func ( data ... Data ) ( failed [ ] Data ) {
for _ , unhandled := range handle ( data ... ) {
if fail := q . PushBack ( unhandled ) ; fail != nil {
failed = append ( failed , fail )
}
}
return
} , config . WorkerPoolConfiguration )
return q , nil
2020-02-02 23:19:58 +00:00
}
// Name returns the name of this queue
func ( q * ByteFIFOQueue ) Name ( ) string {
return q . name
}
// Push pushes data to the fifo
func ( q * ByteFIFOQueue ) Push ( data Data ) error {
return q . PushFunc ( data , nil )
}
2022-01-22 21:22:14 +00:00
// PushBack pushes data to the fifo
func ( q * ByteFIFOQueue ) PushBack ( data Data ) error {
if ! assignableTo ( data , q . exemplar ) {
2022-01-24 22:54:35 +00:00
return fmt . Errorf ( "unable to assign data: %v to same type as exemplar: %v in %s" , data , q . exemplar , q . name )
2022-01-22 21:22:14 +00:00
}
bs , err := json . Marshal ( data )
if err != nil {
return err
}
defer func ( ) {
select {
case q . pushed <- struct { } { } :
default :
}
} ( )
return q . byteFIFO . PushBack ( q . terminateCtx , bs )
}
2020-02-02 23:19:58 +00:00
// PushFunc pushes data to the fifo
func ( q * ByteFIFOQueue ) PushFunc ( data Data , fn func ( ) error ) error {
if ! assignableTo ( data , q . exemplar ) {
2022-01-24 22:54:35 +00:00
return fmt . Errorf ( "unable to assign data: %v to same type as exemplar: %v in %s" , data , q . exemplar , q . name )
2020-02-02 23:19:58 +00:00
}
bs , err := json . Marshal ( data )
if err != nil {
return err
}
2022-01-22 21:22:14 +00:00
defer func ( ) {
select {
case q . pushed <- struct { } { } :
default :
}
} ( )
2021-05-15 15:22:26 +01:00
return q . byteFIFO . PushFunc ( q . terminateCtx , bs , fn )
2020-02-02 23:19:58 +00:00
}
// IsEmpty checks if the queue is empty
func ( q * ByteFIFOQueue ) IsEmpty ( ) bool {
q . lock . Lock ( )
defer q . lock . Unlock ( )
if ! q . WorkerPool . IsEmpty ( ) {
return false
}
2021-05-15 15:22:26 +01:00
return q . byteFIFO . Len ( q . terminateCtx ) == 0
2020-02-02 23:19:58 +00:00
}
2022-02-12 05:31:26 +00:00
// NumberInQueue returns the number in the queue
func ( q * ByteFIFOQueue ) NumberInQueue ( ) int64 {
q . lock . Lock ( )
defer q . lock . Unlock ( )
return q . byteFIFO . Len ( q . terminateCtx ) + q . WorkerPool . NumberInQueue ( )
}
2022-01-22 21:22:14 +00:00
// Flush flushes the ByteFIFOQueue
func ( q * ByteFIFOQueue ) Flush ( timeout time . Duration ) error {
select {
case q . pushed <- struct { } { } :
default :
}
return q . WorkerPool . Flush ( timeout )
}
2020-02-02 23:19:58 +00:00
// Run runs the bytefifo queue
2021-05-15 15:22:26 +01:00
func ( q * ByteFIFOQueue ) Run ( atShutdown , atTerminate func ( func ( ) ) ) {
2022-03-31 18:01:43 +01:00
pprof . SetGoroutineLabels ( q . baseCtx )
2021-05-15 15:22:26 +01:00
atShutdown ( q . Shutdown )
atTerminate ( q . Terminate )
2020-02-02 23:19:58 +00:00
log . Debug ( "%s: %s Starting" , q . typ , q . name )
2021-05-15 15:22:26 +01:00
_ = q . AddWorkers ( q . workers , 0 )
2020-02-02 23:19:58 +00:00
2021-05-15 15:22:26 +01:00
log . Trace ( "%s: %s Now running" , q . typ , q . name )
q . readToChan ( )
2020-02-02 23:19:58 +00:00
2021-05-15 15:22:26 +01:00
<- q . shutdownCtx . Done ( )
2020-02-02 23:19:58 +00:00
log . Trace ( "%s: %s Waiting til done" , q . typ , q . name )
q . Wait ( )
log . Trace ( "%s: %s Waiting til cleaned" , q . typ , q . name )
2021-05-15 15:22:26 +01:00
q . CleanUp ( q . terminateCtx )
q . terminateCtxCancel ( )
2020-02-02 23:19:58 +00:00
}
2021-05-15 15:22:26 +01:00
const maxBackOffTime = time . Second * 3
2020-02-02 23:19:58 +00:00
func ( q * ByteFIFOQueue ) readToChan ( ) {
2021-05-08 17:29:47 +01:00
// handle quick cancels
select {
2021-05-15 15:22:26 +01:00
case <- q . shutdownCtx . Done ( ) :
2021-05-08 17:29:47 +01:00
// tell the pool to shutdown.
2021-05-15 15:22:26 +01:00
q . baseCtxCancel ( )
2021-05-08 17:29:47 +01:00
return
default :
}
2021-05-15 15:22:26 +01:00
// Default backoff values
2021-05-08 17:29:47 +01:00
backOffTime := time . Millisecond * 100
2022-01-22 21:22:14 +00:00
backOffTimer := time . NewTimer ( 0 )
util . StopTimer ( backOffTimer )
paused , _ := q . IsPausedIsResumed ( )
2020-02-02 23:19:58 +00:00
2021-05-15 15:22:26 +01:00
loop :
for {
2022-01-22 21:22:14 +00:00
select {
case <- paused :
log . Trace ( "Queue %s pausing" , q . name )
_ , resumed := q . IsPausedIsResumed ( )
2021-05-08 17:29:47 +01:00
select {
2022-01-22 21:22:14 +00:00
case <- resumed :
paused , _ = q . IsPausedIsResumed ( )
log . Trace ( "Queue %s resuming" , q . name )
if q . HasNoWorkerScaling ( ) {
log . Warn (
"Queue: %s is configured to be non-scaling and has no workers - this configuration is likely incorrect.\n" +
"The queue will be paused to prevent data-loss with the assumption that you will add workers and unpause as required." , q . name )
q . Pause ( )
continue loop
}
2021-05-15 15:22:26 +01:00
case <- q . shutdownCtx . Done ( ) :
2022-01-22 21:22:14 +00:00
// tell the pool to shutdown.
2021-05-15 15:22:26 +01:00
q . baseCtxCancel ( )
2021-05-08 17:29:47 +01:00
return
2022-01-29 11:37:08 +00:00
case data , ok := <- q . dataChan :
if ! ok {
return
}
2022-01-22 21:22:14 +00:00
if err := q . PushBack ( data ) ; err != nil {
log . Error ( "Unable to push back data into queue %s" , q . name )
}
atomic . AddInt64 ( & q . numInQueue , - 1 )
2020-02-02 23:19:58 +00:00
}
2022-01-22 21:22:14 +00:00
default :
2021-05-15 15:22:26 +01:00
}
2022-01-22 21:22:14 +00:00
// empty the pushed channel
select {
case <- q . pushed :
default :
2021-05-15 15:22:26 +01:00
}
2022-01-22 21:22:14 +00:00
err := q . doPop ( )
util . StopTimer ( backOffTimer )
2021-05-15 15:22:26 +01:00
if err != nil {
2022-01-22 21:22:14 +00:00
if err == errQueueEmpty && q . waitOnEmpty {
log . Trace ( "%s: %s Waiting on Empty" , q . typ , q . name )
// reset the backoff time but don't set the timer
backOffTime = 100 * time . Millisecond
} else if err == errUnmarshal {
// reset the timer and backoff
backOffTime = 100 * time . Millisecond
backOffTimer . Reset ( backOffTime )
} else {
// backoff
backOffTimer . Reset ( backOffTime )
}
2021-05-15 15:22:26 +01:00
// Need to Backoff
2021-05-08 17:29:47 +01:00
select {
2021-05-15 15:22:26 +01:00
case <- q . shutdownCtx . Done ( ) :
// Oops we've been shutdown whilst backing off
// Make sure the worker pool is shutdown too
q . baseCtxCancel ( )
2021-05-08 17:29:47 +01:00
return
2022-01-22 21:22:14 +00:00
case <- q . pushed :
// Data has been pushed to the fifo (or flush has been called)
// reset the backoff time
backOffTime = 100 * time . Millisecond
continue loop
case <- backOffTimer . C :
// Calculate the next backoff time
2021-05-15 15:22:26 +01:00
backOffTime += backOffTime / 2
if backOffTime > maxBackOffTime {
backOffTime = maxBackOffTime
}
continue loop
2020-02-02 23:19:58 +00:00
}
}
2022-01-22 21:22:14 +00:00
// Reset the backoff time
backOffTime = 100 * time . Millisecond
2021-05-15 15:22:26 +01:00
select {
case <- q . shutdownCtx . Done ( ) :
// Oops we've been shutdown
// Make sure the worker pool is shutdown too
q . baseCtxCancel ( )
return
default :
continue loop
}
2020-02-02 23:19:58 +00:00
}
}
2022-01-20 18:46:10 +01:00
var (
errQueueEmpty = fmt . Errorf ( "empty queue" )
errEmptyBytes = fmt . Errorf ( "empty bytes" )
errUnmarshal = fmt . Errorf ( "failed to unmarshal" )
)
2021-05-15 15:22:26 +01:00
func ( q * ByteFIFOQueue ) doPop ( ) error {
2021-05-08 17:29:47 +01:00
q . lock . Lock ( )
defer q . lock . Unlock ( )
2021-05-15 15:22:26 +01:00
bs , err := q . byteFIFO . Pop ( q . shutdownCtx )
2021-05-08 17:29:47 +01:00
if err != nil {
2021-05-15 15:22:26 +01:00
if err == context . Canceled {
q . baseCtxCancel ( )
return err
}
2021-05-08 17:29:47 +01:00
log . Error ( "%s: %s Error on Pop: %v" , q . typ , q . name , err )
2021-05-15 15:22:26 +01:00
return err
2021-05-08 17:29:47 +01:00
}
if len ( bs ) == 0 {
2021-05-15 15:22:26 +01:00
if q . waitOnEmpty && q . byteFIFO . Len ( q . shutdownCtx ) == 0 {
return errQueueEmpty
}
return errEmptyBytes
2021-05-08 17:29:47 +01:00
}
data , err := unmarshalAs ( bs , q . exemplar )
if err != nil {
log . Error ( "%s: %s Failed to unmarshal with error: %v" , q . typ , q . name , err )
2021-05-15 15:22:26 +01:00
return errUnmarshal
2021-05-08 17:29:47 +01:00
}
log . Trace ( "%s %s: Task found: %#v" , q . typ , q . name , data )
q . WorkerPool . Push ( data )
2021-05-15 15:22:26 +01:00
return nil
2021-05-08 17:29:47 +01:00
}
2020-02-02 23:19:58 +00:00
// Shutdown processing from this queue
func ( q * ByteFIFOQueue ) Shutdown ( ) {
log . Trace ( "%s: %s Shutting down" , q . typ , q . name )
select {
2021-05-15 15:22:26 +01:00
case <- q . shutdownCtx . Done ( ) :
return
2020-02-02 23:19:58 +00:00
default :
}
2021-05-15 15:22:26 +01:00
q . shutdownCtxCancel ( )
2020-02-02 23:19:58 +00:00
log . Debug ( "%s: %s Shutdown" , q . typ , q . name )
}
2021-02-10 22:28:32 +01:00
// IsShutdown returns a channel which is closed when this Queue is shutdown
func ( q * ByteFIFOQueue ) IsShutdown ( ) <- chan struct { } {
2021-05-15 15:22:26 +01:00
return q . shutdownCtx . Done ( )
2021-02-10 22:28:32 +01:00
}
2020-02-02 23:19:58 +00:00
// Terminate this queue and close the queue
func ( q * ByteFIFOQueue ) Terminate ( ) {
log . Trace ( "%s: %s Terminating" , q . typ , q . name )
q . Shutdown ( )
select {
2021-05-15 15:22:26 +01:00
case <- q . terminateCtx . Done ( ) :
2020-02-02 23:19:58 +00:00
return
default :
}
if log . IsDebug ( ) {
2021-05-15 15:22:26 +01:00
log . Debug ( "%s: %s Closing with %d tasks left in queue" , q . typ , q . name , q . byteFIFO . Len ( q . terminateCtx ) )
2020-02-02 23:19:58 +00:00
}
2021-05-15 15:22:26 +01:00
q . terminateCtxCancel ( )
2020-02-02 23:19:58 +00:00
if err := q . byteFIFO . Close ( ) ; err != nil {
log . Error ( "Error whilst closing internal byte fifo in %s: %s: %v" , q . typ , q . name , err )
}
2022-03-31 18:01:43 +01:00
q . baseCtxFinished ( )
2020-02-02 23:19:58 +00:00
log . Debug ( "%s: %s Terminated" , q . typ , q . name )
}
2021-02-10 22:28:32 +01:00
// IsTerminated returns a channel which is closed when this Queue is terminated
func ( q * ByteFIFOQueue ) IsTerminated ( ) <- chan struct { } {
2021-05-15 15:22:26 +01:00
return q . terminateCtx . Done ( )
2021-02-10 22:28:32 +01:00
}
2021-04-09 09:40:34 +02:00
var _ UniqueQueue = & ByteFIFOUniqueQueue { }
2020-02-02 23:19:58 +00:00
// ByteFIFOUniqueQueue represents a UniqueQueue formed from a UniqueByteFifo
type ByteFIFOUniqueQueue struct {
ByteFIFOQueue
}
// NewByteFIFOUniqueQueue creates a new ByteFIFOUniqueQueue
func NewByteFIFOUniqueQueue ( typ Type , byteFIFO UniqueByteFIFO , handle HandlerFunc , cfg , exemplar interface { } ) ( * ByteFIFOUniqueQueue , error ) {
configInterface , err := toConfig ( ByteFIFOQueueConfiguration { } , cfg )
if err != nil {
return nil , err
}
config := configInterface . ( ByteFIFOQueueConfiguration )
2021-05-15 15:22:26 +01:00
terminateCtx , terminateCtxCancel := context . WithCancel ( context . Background ( ) )
shutdownCtx , shutdownCtxCancel := context . WithCancel ( terminateCtx )
2020-02-02 23:19:58 +00:00
2022-01-22 21:22:14 +00:00
q := & ByteFIFOUniqueQueue {
2020-02-02 23:19:58 +00:00
ByteFIFOQueue : ByteFIFOQueue {
2021-05-15 15:22:26 +01:00
byteFIFO : byteFIFO ,
typ : typ ,
shutdownCtx : shutdownCtx ,
shutdownCtxCancel : shutdownCtxCancel ,
terminateCtx : terminateCtx ,
terminateCtxCancel : terminateCtxCancel ,
exemplar : exemplar ,
workers : config . Workers ,
name : config . Name ,
2020-02-02 23:19:58 +00:00
} ,
2022-01-22 21:22:14 +00:00
}
q . WorkerPool = NewWorkerPool ( func ( data ... Data ) ( failed [ ] Data ) {
for _ , unhandled := range handle ( data ... ) {
if fail := q . PushBack ( unhandled ) ; fail != nil {
failed = append ( failed , fail )
}
}
return
} , config . WorkerPoolConfiguration )
return q , nil
2020-02-02 23:19:58 +00:00
}
// Has checks if the provided data is in the queue
func ( q * ByteFIFOUniqueQueue ) Has ( data Data ) ( bool , error ) {
if ! assignableTo ( data , q . exemplar ) {
2022-01-24 22:54:35 +00:00
return false , fmt . Errorf ( "unable to assign data: %v to same type as exemplar: %v in %s" , data , q . exemplar , q . name )
2020-02-02 23:19:58 +00:00
}
bs , err := json . Marshal ( data )
if err != nil {
return false , err
}
2021-05-15 15:22:26 +01:00
return q . byteFIFO . ( UniqueByteFIFO ) . Has ( q . terminateCtx , bs )
2020-02-02 23:19:58 +00:00
}