2020-01-07 14:23:09 +03:00
// Copyright 2019 The Gitea Authors. All rights reserved.
2022-11-27 21:20:29 +03:00
// SPDX-License-Identifier: MIT
2020-01-07 14:23:09 +03:00
package queue
import (
"context"
"fmt"
2022-03-31 20:01:43 +03:00
"runtime/pprof"
2022-01-23 00:22:14 +03:00
"sync/atomic"
"time"
2020-01-07 14:23:09 +03:00
"code.gitea.io/gitea/modules/log"
)
// ChannelQueueType is the type for channel queue
const ChannelQueueType Type = "channel"
// ChannelQueueConfiguration is the configuration for a ChannelQueue
type ChannelQueueConfiguration struct {
2020-01-29 04:01:06 +03:00
WorkerPoolConfiguration
Workers int
2020-01-07 14:23:09 +03:00
}
2020-01-29 04:01:06 +03:00
// ChannelQueue implements Queue
//
// A channel queue is not persistable and does not shutdown or terminate cleanly
// It is basically a very thin wrapper around a WorkerPool
2020-01-07 14:23:09 +03:00
type ChannelQueue struct {
2020-01-29 04:01:06 +03:00
* WorkerPool
2021-05-15 17:22:26 +03:00
shutdownCtx context . Context
shutdownCtxCancel context . CancelFunc
terminateCtx context . Context
terminateCtxCancel context . CancelFunc
exemplar interface { }
workers int
name string
2020-01-07 14:23:09 +03:00
}
2020-01-29 04:01:06 +03:00
// NewChannelQueue creates a memory channel queue
2020-01-07 14:23:09 +03:00
func NewChannelQueue ( handle HandlerFunc , cfg , exemplar interface { } ) ( Queue , error ) {
configInterface , err := toConfig ( ChannelQueueConfiguration { } , cfg )
if err != nil {
return nil , err
}
config := configInterface . ( ChannelQueueConfiguration )
if config . BatchLength == 0 {
config . BatchLength = 1
}
2021-05-15 17:22:26 +03:00
terminateCtx , terminateCtxCancel := context . WithCancel ( context . Background ( ) )
shutdownCtx , shutdownCtxCancel := context . WithCancel ( terminateCtx )
2020-01-07 14:23:09 +03:00
queue := & ChannelQueue {
2021-05-15 17:22:26 +03:00
shutdownCtx : shutdownCtx ,
shutdownCtxCancel : shutdownCtxCancel ,
terminateCtx : terminateCtx ,
terminateCtxCancel : terminateCtxCancel ,
exemplar : exemplar ,
workers : config . Workers ,
name : config . Name ,
2020-01-07 14:23:09 +03:00
}
2022-01-23 00:22:14 +03:00
queue . WorkerPool = NewWorkerPool ( func ( data ... Data ) [ ] Data {
unhandled := handle ( data ... )
if len ( unhandled ) > 0 {
// We can only pushback to the channel if we're paused.
if queue . IsPaused ( ) {
atomic . AddInt64 ( & queue . numInQueue , int64 ( len ( unhandled ) ) )
go func ( ) {
for _ , datum := range data {
queue . dataChan <- datum
}
} ( )
return nil
}
}
return unhandled
} , config . WorkerPoolConfiguration )
2020-01-29 04:01:06 +03:00
queue . qid = GetManager ( ) . Add ( queue , ChannelQueueType , config , exemplar )
2020-01-07 14:23:09 +03:00
return queue , nil
}
// Run starts to run the queue
2021-05-15 17:22:26 +03:00
func ( q * ChannelQueue ) Run ( atShutdown , atTerminate func ( func ( ) ) ) {
2022-03-31 20:01:43 +03:00
pprof . SetGoroutineLabels ( q . baseCtx )
2021-05-15 17:22:26 +03:00
atShutdown ( q . Shutdown )
atTerminate ( q . Terminate )
2020-02-03 02:19:58 +03:00
log . Debug ( "ChannelQueue: %s Starting" , q . name )
2021-05-15 17:22:26 +03:00
_ = q . AddWorkers ( q . workers , 0 )
2020-01-07 14:23:09 +03:00
}
// Push will push data into the queue
2020-02-03 02:19:58 +03:00
func ( q * ChannelQueue ) Push ( data Data ) error {
if ! assignableTo ( data , q . exemplar ) {
2022-01-25 01:54:35 +03:00
return fmt . Errorf ( "unable to assign data: %v to same type as exemplar: %v in queue: %s" , data , q . exemplar , q . name )
2020-01-07 14:23:09 +03:00
}
2020-02-03 02:19:58 +03:00
q . WorkerPool . Push ( data )
2020-01-07 14:23:09 +03:00
return nil
}
2022-01-23 00:22:14 +03:00
// Flush flushes the channel with a timeout - the Flush worker will be registered as a flush worker with the manager
func ( q * ChannelQueue ) Flush ( timeout time . Duration ) error {
if q . IsPaused ( ) {
return nil
}
ctx , cancel := q . commonRegisterWorkers ( 1 , timeout , true )
defer cancel ( )
return q . FlushWithContext ( ctx )
}
2021-05-15 17:22:26 +03:00
// Shutdown processing from this queue
func ( q * ChannelQueue ) Shutdown ( ) {
q . lock . Lock ( )
defer q . lock . Unlock ( )
select {
case <- q . shutdownCtx . Done ( ) :
log . Trace ( "ChannelQueue: %s Already Shutting down" , q . name )
return
default :
}
log . Trace ( "ChannelQueue: %s Shutting down" , q . name )
go func ( ) {
log . Trace ( "ChannelQueue: %s Flushing" , q . name )
2022-01-23 00:22:14 +03:00
// We can't use Cleanup here because that will close the channel
2021-05-15 17:22:26 +03:00
if err := q . FlushWithContext ( q . terminateCtx ) ; err != nil {
2023-03-01 01:55:43 +03:00
count := atomic . LoadInt64 ( & q . numInQueue )
if count > 0 {
log . Warn ( "ChannelQueue: %s Terminated before completed flushing" , q . name )
}
2021-05-15 17:22:26 +03:00
return
}
log . Debug ( "ChannelQueue: %s Flushed" , q . name )
} ( )
q . shutdownCtxCancel ( )
log . Debug ( "ChannelQueue: %s Shutdown" , q . name )
}
// Terminate this queue and close the queue
func ( q * ChannelQueue ) Terminate ( ) {
log . Trace ( "ChannelQueue: %s Terminating" , q . name )
q . Shutdown ( )
select {
case <- q . terminateCtx . Done ( ) :
return
default :
}
q . terminateCtxCancel ( )
2022-03-31 20:01:43 +03:00
q . baseCtxFinished ( )
2021-05-15 17:22:26 +03:00
log . Debug ( "ChannelQueue: %s Terminated" , q . name )
}
2020-01-07 14:23:09 +03:00
// Name returns the name of this queue
2020-02-03 02:19:58 +03:00
func ( q * ChannelQueue ) Name ( ) string {
return q . name
2020-01-07 14:23:09 +03:00
}
func init ( ) {
queuesMap [ ChannelQueueType ] = NewChannelQueue
}