2020-01-07 14:23:09 +03:00
// Copyright 2019 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package queue
import (
"context"
"fmt"
2020-01-29 04:01:06 +03:00
"time"
2020-01-07 14:23:09 +03:00
)
// ErrInvalidConfiguration is called when there is invalid configuration for a queue
type ErrInvalidConfiguration struct {
cfg interface { }
err error
}
func ( err ErrInvalidConfiguration ) Error ( ) string {
if err . err != nil {
return fmt . Sprintf ( "Invalid Configuration Argument: %v: Error: %v" , err . cfg , err . err )
}
return fmt . Sprintf ( "Invalid Configuration Argument: %v" , err . cfg )
}
// IsErrInvalidConfiguration checks if an error is an ErrInvalidConfiguration
func IsErrInvalidConfiguration ( err error ) bool {
_ , ok := err . ( ErrInvalidConfiguration )
return ok
}
// Type is a type of Queue
type Type string
// Data defines an type of queuable data
type Data interface { }
// HandlerFunc is a function that takes a variable amount of data and processes it
type HandlerFunc func ( ... Data )
// NewQueueFunc is a function that creates a queue
2022-01-20 20:46:10 +03:00
type NewQueueFunc func ( handler HandlerFunc , config , exemplar interface { } ) ( Queue , error )
2020-01-07 14:23:09 +03:00
// Shutdownable represents a queue that can be shutdown
type Shutdownable interface {
Shutdown ( )
Terminate ( )
}
// Named represents a queue with a name
type Named interface {
Name ( ) string
}
2020-01-29 04:01:06 +03:00
// Queue defines an interface of a queue-like item
//
// Queues will handle their own contents in the Run method
2020-01-07 14:23:09 +03:00
type Queue interface {
2020-01-29 04:01:06 +03:00
Flushable
2021-05-15 17:22:26 +03:00
Run ( atShutdown , atTerminate func ( func ( ) ) )
2020-01-07 14:23:09 +03:00
Push ( Data ) error
}
// DummyQueueType is the type for the dummy queue
const DummyQueueType Type = "dummy"
// NewDummyQueue creates a new DummyQueue
func NewDummyQueue ( handler HandlerFunc , opts , exemplar interface { } ) ( Queue , error ) {
return & DummyQueue { } , nil
}
// DummyQueue represents an empty queue
2022-01-20 20:46:10 +03:00
type DummyQueue struct { }
2020-01-07 14:23:09 +03:00
2020-01-29 04:01:06 +03:00
// Run does nothing
2021-05-15 17:22:26 +03:00
func ( * DummyQueue ) Run ( _ , _ func ( func ( ) ) ) { }
2020-01-07 14:23:09 +03:00
2020-01-29 04:01:06 +03:00
// Push fakes a push of data to the queue
2020-02-03 02:19:58 +03:00
func ( * DummyQueue ) Push ( Data ) error {
2020-01-07 14:23:09 +03:00
return nil
}
2020-02-03 02:19:58 +03:00
// PushFunc fakes a push of data to the queue with a function. The function is never run.
func ( * DummyQueue ) PushFunc ( Data , func ( ) error ) error {
return nil
}
// Has always returns false as this queue never does anything
func ( * DummyQueue ) Has ( Data ) ( bool , error ) {
return false , nil
}
2020-01-29 04:01:06 +03:00
// Flush always returns nil
2020-02-03 02:19:58 +03:00
func ( * DummyQueue ) Flush ( time . Duration ) error {
2020-01-29 04:01:06 +03:00
return nil
}
2020-01-07 14:23:09 +03:00
2020-01-29 04:01:06 +03:00
// FlushWithContext always returns nil
2020-02-03 02:19:58 +03:00
func ( * DummyQueue ) FlushWithContext ( context . Context ) error {
2020-01-29 04:01:06 +03:00
return nil
}
// IsEmpty asserts that the queue is empty
2020-02-03 02:19:58 +03:00
func ( * DummyQueue ) IsEmpty ( ) bool {
2020-01-29 04:01:06 +03:00
return true
2020-01-07 14:23:09 +03:00
}
2020-09-07 18:05:08 +03:00
// ImmediateType is the type to execute the function when push
const ImmediateType Type = "immediate"
// NewImmediate creates a new false queue to execute the function when push
func NewImmediate ( handler HandlerFunc , opts , exemplar interface { } ) ( Queue , error ) {
return & Immediate {
handler : handler ,
} , nil
}
// Immediate represents an direct execution queue
type Immediate struct {
handler HandlerFunc
}
// Run does nothing
2021-05-15 17:22:26 +03:00
func ( * Immediate ) Run ( _ , _ func ( func ( ) ) ) { }
2020-09-07 18:05:08 +03:00
// Push fakes a push of data to the queue
func ( q * Immediate ) Push ( data Data ) error {
return q . PushFunc ( data , nil )
}
// PushFunc fakes a push of data to the queue with a function. The function is never run.
func ( q * Immediate ) PushFunc ( data Data , f func ( ) error ) error {
if f != nil {
if err := f ( ) ; err != nil {
return err
}
}
q . handler ( data )
return nil
}
// Has always returns false as this queue never does anything
func ( * Immediate ) Has ( Data ) ( bool , error ) {
return false , nil
}
// Flush always returns nil
func ( * Immediate ) Flush ( time . Duration ) error {
return nil
}
// FlushWithContext always returns nil
func ( * Immediate ) FlushWithContext ( context . Context ) error {
return nil
}
// IsEmpty asserts that the queue is empty
func ( * Immediate ) IsEmpty ( ) bool {
return true
}
var queuesMap = map [ Type ] NewQueueFunc {
DummyQueueType : NewDummyQueue ,
ImmediateType : NewImmediate ,
}
2020-01-07 14:23:09 +03:00
// RegisteredTypes provides the list of requested types of queues
func RegisteredTypes ( ) [ ] Type {
types := make ( [ ] Type , len ( queuesMap ) )
i := 0
for key := range queuesMap {
types [ i ] = key
i ++
}
return types
}
// RegisteredTypesAsString provides the list of requested types of queues
func RegisteredTypesAsString ( ) [ ] string {
types := make ( [ ] string , len ( queuesMap ) )
i := 0
for key := range queuesMap {
types [ i ] = string ( key )
i ++
}
return types
}
2020-01-29 04:01:06 +03:00
// NewQueue takes a queue Type, HandlerFunc, some options and possibly an exemplar and returns a Queue or an error
2020-01-07 14:23:09 +03:00
func NewQueue ( queueType Type , handlerFunc HandlerFunc , opts , exemplar interface { } ) ( Queue , error ) {
newFn , ok := queuesMap [ queueType ]
if ! ok {
return nil , fmt . Errorf ( "Unsupported queue type: %v" , queueType )
}
return newFn ( handlerFunc , opts , exemplar )
}