2020-01-07 14:23:09 +03:00
// Copyright 2019 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package queue
import (
"context"
"fmt"
"sync"
2020-01-29 04:01:06 +03:00
"sync/atomic"
2020-01-07 14:23:09 +03:00
"time"
"code.gitea.io/gitea/modules/log"
)
// WrappedQueueType is the type for a wrapped delayed starting queue
const WrappedQueueType Type = "wrapped"
// WrappedQueueConfiguration is the configuration for a WrappedQueue
type WrappedQueueConfiguration struct {
Underlying Type
Timeout time . Duration
MaxAttempts int
Config interface { }
QueueLength int
Name string
}
type delayedStarter struct {
internal Queue
underlying Type
cfg interface { }
timeout time . Duration
maxAttempts int
name string
}
// setInternal must be called with the lock locked.
func ( q * delayedStarter ) setInternal ( atShutdown func ( context . Context , func ( ) ) , handle HandlerFunc , exemplar interface { } ) error {
var ctx context . Context
var cancel context . CancelFunc
if q . timeout > 0 {
ctx , cancel = context . WithTimeout ( context . Background ( ) , q . timeout )
} else {
ctx , cancel = context . WithCancel ( context . Background ( ) )
}
defer cancel ( )
// Ensure we also stop at shutdown
atShutdown ( ctx , func ( ) {
cancel ( )
} )
i := 1
for q . internal == nil {
select {
case <- ctx . Done ( ) :
Improve config logging when WrappedQueue times out (#11174)
Before
```sh
Unable to set the internal queue for -wrapper Error: Timedout creating queue redis with cfg []byte{0x7b, 0x22, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x65, 0x73, 0x22, 0x3a, 0x22, 0x31, 0x32, 0x37, 0x2e, 0x30, 0x2e, 0x30, 0x2e, 0x31, 0x3a, 0x36, 0x33, 0x37, 0x39, 0x22, 0x2c, 0x22, 0x42, 0x61, 0x74, 0x63, 0x68, 0x4c, 0x65, 0x6e, 0x67, 0x74, 0x68, 0x22, 0x3a, 0x32, 0x30, 0x2c, 0x22, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x54, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x22, 0x3a, 0x31, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x2c, 0x22, 0x42, 0x6f, 0x6f, 0x73, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x6f, 0x75, 0x74, 0x22, 0x3a, 0x33, 0x30, 0x30, 0x30, 0x30, 0x30
......
```
After
```sh
Unable to set the internal queue for -wrapper Error: Timedout creating queue redis with cfg "{\"Addresses\":\"127.0.0.1:6379\",\"BatchLength\":20,\"BlockTimeout\":1000000000,\"BoostTimeout\":300000000000,\"BoostWorkers\":5,\"DBIndex\":0,\"DataDir\":\".../data/queues/mail\",\"MaxWorkers\":10,\"Name\":\"mail\",\"Network\":\"\",\"Password\":\"\",\"QueueLength\":20,\"QueueName\":\"mail_queue\",\"SetName\":\"\",\"Workers\":1}" in
```
2020-04-22 15:38:40 +03:00
var cfg = q . cfg
if s , ok := cfg . ( [ ] byte ) ; ok {
cfg = string ( s )
}
return fmt . Errorf ( "Timedout creating queue %v with cfg %#v in %s" , q . underlying , cfg , q . name )
2020-01-07 14:23:09 +03:00
default :
queue , err := NewQueue ( q . underlying , handle , q . cfg , exemplar )
if err == nil {
q . internal = queue
break
}
if err . Error ( ) != "resource temporarily unavailable" {
2020-03-29 10:12:15 +03:00
if bs , ok := q . cfg . ( [ ] byte ) ; ok {
log . Warn ( "[Attempt: %d] Failed to create queue: %v for %s cfg: %s error: %v" , i , q . underlying , q . name , string ( bs ) , err )
} else {
log . Warn ( "[Attempt: %d] Failed to create queue: %v for %s cfg: %#v error: %v" , i , q . underlying , q . name , q . cfg , err )
}
2020-01-07 14:23:09 +03:00
}
i ++
if q . maxAttempts > 0 && i > q . maxAttempts {
2020-03-29 10:12:15 +03:00
if bs , ok := q . cfg . ( [ ] byte ) ; ok {
return fmt . Errorf ( "Unable to create queue %v for %s with cfg %s by max attempts: error: %v" , q . underlying , q . name , string ( bs ) , err )
}
2020-03-16 11:59:21 +03:00
return fmt . Errorf ( "Unable to create queue %v for %s with cfg %#v by max attempts: error: %v" , q . underlying , q . name , q . cfg , err )
2020-01-07 14:23:09 +03:00
}
sleepTime := 100 * time . Millisecond
if q . timeout > 0 && q . maxAttempts > 0 {
sleepTime = ( q . timeout - 200 * time . Millisecond ) / time . Duration ( q . maxAttempts )
}
t := time . NewTimer ( sleepTime )
select {
case <- ctx . Done ( ) :
t . Stop ( )
case <- t . C :
}
}
}
return nil
}
// WrappedQueue wraps a delayed starting queue
type WrappedQueue struct {
delayedStarter
2020-01-29 04:01:06 +03:00
lock sync . Mutex
handle HandlerFunc
exemplar interface { }
channel chan Data
numInQueue int64
2020-01-07 14:23:09 +03:00
}
// NewWrappedQueue will attempt to create a queue of the provided type,
// but if there is a problem creating this queue it will instead create
// a WrappedQueue with delayed startup of the queue instead and a
// channel which will be redirected to the queue
func NewWrappedQueue ( handle HandlerFunc , cfg , exemplar interface { } ) ( Queue , error ) {
configInterface , err := toConfig ( WrappedQueueConfiguration { } , cfg )
if err != nil {
return nil , err
}
config := configInterface . ( WrappedQueueConfiguration )
queue , err := NewQueue ( config . Underlying , handle , config . Config , exemplar )
if err == nil {
// Just return the queue there is no need to wrap
return queue , nil
}
if IsErrInvalidConfiguration ( err ) {
// Retrying ain't gonna make this any better...
return nil , ErrInvalidConfiguration { cfg : cfg }
}
queue = & WrappedQueue {
handle : handle ,
channel : make ( chan Data , config . QueueLength ) ,
exemplar : exemplar ,
delayedStarter : delayedStarter {
cfg : config . Config ,
underlying : config . Underlying ,
timeout : config . Timeout ,
maxAttempts : config . MaxAttempts ,
name : config . Name ,
} ,
}
2020-01-29 04:01:06 +03:00
_ = GetManager ( ) . Add ( queue , WrappedQueueType , config , exemplar )
2020-01-07 14:23:09 +03:00
return queue , nil
}
// Name returns the name of the queue
func ( q * WrappedQueue ) Name ( ) string {
return q . name + "-wrapper"
}
// Push will push the data to the internal channel checking it against the exemplar
func ( q * WrappedQueue ) Push ( data Data ) error {
2020-01-29 04:01:06 +03:00
if ! assignableTo ( data , q . exemplar ) {
return fmt . Errorf ( "unable to assign data: %v to same type as exemplar: %v in %s" , data , q . exemplar , q . name )
2020-01-07 14:23:09 +03:00
}
2020-01-29 04:01:06 +03:00
atomic . AddInt64 ( & q . numInQueue , 1 )
2020-01-07 14:23:09 +03:00
q . channel <- data
return nil
}
2020-01-29 04:01:06 +03:00
func ( q * WrappedQueue ) flushInternalWithContext ( ctx context . Context ) error {
q . lock . Lock ( )
if q . internal == nil {
q . lock . Unlock ( )
return fmt . Errorf ( "not ready to flush wrapped queue %s yet" , q . Name ( ) )
}
q . lock . Unlock ( )
select {
case <- ctx . Done ( ) :
return ctx . Err ( )
default :
}
return q . internal . FlushWithContext ( ctx )
}
// Flush flushes the queue and blocks till the queue is empty
func ( q * WrappedQueue ) Flush ( timeout time . Duration ) error {
var ctx context . Context
var cancel context . CancelFunc
if timeout > 0 {
ctx , cancel = context . WithTimeout ( context . Background ( ) , timeout )
} else {
ctx , cancel = context . WithCancel ( context . Background ( ) )
}
defer cancel ( )
return q . FlushWithContext ( ctx )
}
// FlushWithContext implements the final part of Flushable
func ( q * WrappedQueue ) FlushWithContext ( ctx context . Context ) error {
log . Trace ( "WrappedQueue: %s FlushWithContext" , q . Name ( ) )
errChan := make ( chan error , 1 )
go func ( ) {
errChan <- q . flushInternalWithContext ( ctx )
close ( errChan )
} ( )
select {
case err := <- errChan :
return err
case <- ctx . Done ( ) :
go func ( ) {
<- errChan
} ( )
return ctx . Err ( )
}
}
// IsEmpty checks whether the queue is empty
func ( q * WrappedQueue ) IsEmpty ( ) bool {
if atomic . LoadInt64 ( & q . numInQueue ) != 0 {
return false
}
q . lock . Lock ( )
defer q . lock . Unlock ( )
if q . internal == nil {
return false
}
return q . internal . IsEmpty ( )
}
2020-01-07 14:23:09 +03:00
// Run starts to run the queue and attempts to create the internal queue
func ( q * WrappedQueue ) Run ( atShutdown , atTerminate func ( context . Context , func ( ) ) ) {
2020-01-29 04:01:06 +03:00
log . Debug ( "WrappedQueue: %s Starting" , q . name )
2020-01-07 14:23:09 +03:00
q . lock . Lock ( )
if q . internal == nil {
err := q . setInternal ( atShutdown , q . handle , q . exemplar )
q . lock . Unlock ( )
if err != nil {
log . Fatal ( "Unable to set the internal queue for %s Error: %v" , q . Name ( ) , err )
return
}
go func ( ) {
for data := range q . channel {
_ = q . internal . Push ( data )
2020-01-29 04:01:06 +03:00
atomic . AddInt64 ( & q . numInQueue , - 1 )
2020-01-07 14:23:09 +03:00
}
} ( )
} else {
q . lock . Unlock ( )
}
q . internal . Run ( atShutdown , atTerminate )
log . Trace ( "WrappedQueue: %s Done" , q . name )
}
// Shutdown this queue and stop processing
func ( q * WrappedQueue ) Shutdown ( ) {
2020-01-29 04:01:06 +03:00
log . Trace ( "WrappedQueue: %s Shutting down" , q . name )
2020-01-07 14:23:09 +03:00
q . lock . Lock ( )
defer q . lock . Unlock ( )
if q . internal == nil {
return
}
if shutdownable , ok := q . internal . ( Shutdownable ) ; ok {
shutdownable . Shutdown ( )
}
2020-01-29 04:01:06 +03:00
log . Debug ( "WrappedQueue: %s Shutdown" , q . name )
2020-01-07 14:23:09 +03:00
}
// Terminate this queue and close the queue
func ( q * WrappedQueue ) Terminate ( ) {
log . Trace ( "WrappedQueue: %s Terminating" , q . name )
q . lock . Lock ( )
defer q . lock . Unlock ( )
if q . internal == nil {
return
}
if shutdownable , ok := q . internal . ( Shutdownable ) ; ok {
shutdownable . Terminate ( )
}
2020-01-29 04:01:06 +03:00
log . Debug ( "WrappedQueue: %s Terminated" , q . name )
2020-01-07 14:23:09 +03:00
}
func init ( ) {
queuesMap [ WrappedQueueType ] = NewWrappedQueue
}