2020-01-07 14:23:09 +03:00
// Copyright 2019 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package queue
import (
"fmt"
2020-02-03 02:19:58 +03:00
"strings"
2020-01-07 14:23:09 +03:00
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/setting"
2021-03-02 00:08:10 +03:00
jsoniter "github.com/json-iterator/go"
2020-01-07 14:23:09 +03:00
)
// validType resolves the configured type name t to a registered queue Type.
//
// An empty name selects PersistableChannelQueueType without error. A name that
// matches none of the values returned by RegisteredTypes also yields
// PersistableChannelQueueType, but together with an error describing the
// unknown name so the caller can report the fallback.
func validType(t string) (Type, error) {
	if t == "" {
		return PersistableChannelQueueType, nil
	}

	for _, candidate := range RegisteredTypes() {
		if string(candidate) == t {
			return candidate, nil
		}
	}

	fallback := PersistableChannelQueueType
	return fallback, fmt.Errorf("Unknown queue type: %s defaulting to %s", t, string(PersistableChannelQueueType))
}
2020-01-29 04:01:06 +03:00
func getQueueSettings ( name string ) ( setting . QueueSettings , [ ] byte ) {
2020-01-07 14:23:09 +03:00
q := setting . GetQueueSettings ( name )
2021-03-02 00:08:10 +03:00
json := jsoniter . ConfigCompatibleWithStandardLibrary
2020-10-16 00:40:03 +03:00
cfg , err := json . Marshal ( q )
2020-01-07 14:23:09 +03:00
if err != nil {
2020-10-16 00:40:03 +03:00
log . Error ( "Unable to marshall generic options: %v Error: %v" , q , err )
2020-01-07 14:23:09 +03:00
log . Error ( "Unable to create queue for %s" , name , err )
2020-01-29 04:01:06 +03:00
return q , [ ] byte { }
}
return q , cfg
}
// CreateQueue for name with provided handler and exemplar
func CreateQueue ( name string , handle HandlerFunc , exemplar interface { } ) Queue {
q , cfg := getQueueSettings ( name )
if len ( cfg ) == 0 {
2020-01-07 14:23:09 +03:00
return nil
}
2020-01-29 04:01:06 +03:00
typ , err := validType ( q . Type )
if err != nil {
log . Error ( "Invalid type %s provided for queue named %s defaulting to %s" , q . Type , name , string ( typ ) )
}
2020-01-07 14:23:09 +03:00
returnable , err := NewQueue ( typ , handle , cfg , exemplar )
if q . WrapIfNecessary && err != nil {
log . Warn ( "Unable to create queue for %s: %v" , name , err )
log . Warn ( "Attempting to create wrapped queue" )
returnable , err = NewQueue ( WrappedQueueType , handle , WrappedQueueConfiguration {
2020-01-29 04:01:06 +03:00
Underlying : typ ,
2020-01-07 14:23:09 +03:00
Timeout : q . Timeout ,
MaxAttempts : q . MaxAttempts ,
Config : cfg ,
2020-10-16 00:40:03 +03:00
QueueLength : q . QueueLength ,
2020-07-05 22:38:03 +03:00
Name : name ,
2020-01-07 14:23:09 +03:00
} , exemplar )
}
if err != nil {
log . Error ( "Unable to create queue for %s: %v" , name , err )
return nil
}
return returnable
}
2020-02-03 02:19:58 +03:00
// CreateUniqueQueue for name with provided handler and exemplar
func CreateUniqueQueue ( name string , handle HandlerFunc , exemplar interface { } ) UniqueQueue {
q , cfg := getQueueSettings ( name )
if len ( cfg ) == 0 {
return nil
}
if len ( q . Type ) > 0 && q . Type != "dummy" && ! strings . HasPrefix ( q . Type , "unique-" ) {
q . Type = "unique-" + q . Type
}
typ , err := validType ( q . Type )
if err != nil || typ == PersistableChannelQueueType {
typ = PersistableChannelUniqueQueueType
if err != nil {
log . Error ( "Invalid type %s provided for queue named %s defaulting to %s" , q . Type , name , string ( typ ) )
}
}
returnable , err := NewQueue ( typ , handle , cfg , exemplar )
if q . WrapIfNecessary && err != nil {
log . Warn ( "Unable to create unique queue for %s: %v" , name , err )
log . Warn ( "Attempting to create wrapped queue" )
returnable , err = NewQueue ( WrappedUniqueQueueType , handle , WrappedUniqueQueueConfiguration {
Underlying : typ ,
Timeout : q . Timeout ,
MaxAttempts : q . MaxAttempts ,
Config : cfg ,
2020-10-16 00:40:03 +03:00
QueueLength : q . QueueLength ,
2020-02-03 02:19:58 +03:00
} , exemplar )
}
if err != nil {
log . Error ( "Unable to create unique queue for %s: %v" , name , err )
return nil
}
return returnable . ( UniqueQueue )
}