2020-02-02 23:19:58 +00:00
// Copyright 2019 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package queue
import (
2021-05-15 15:22:26 +01:00
"context"
2020-09-27 22:09:46 +01:00
"code.gitea.io/gitea/modules/nosql"
2020-02-02 23:19:58 +00:00
"gitea.com/lunny/levelqueue"
)
// LevelUniqueQueueType is the queue Type identifier under which the
// LevelDB-backed unique queue is registered.
const LevelUniqueQueueType = Type("unique-level")
// LevelUniqueQueueConfiguration is the configuration for a LevelUniqueQueue
type LevelUniqueQueueConfiguration struct {
ByteFIFOQueueConfiguration
2020-09-27 22:09:46 +01:00
DataDir string
ConnectionString string
QueueName string
2020-02-02 23:19:58 +00:00
}
// LevelUniqueQueue is a unique queue whose items are stored through a
// LevelUniqueQueueByteFIFO. Its queue behaviour comes from the embedded
// ByteFIFOUniqueQueue.
type LevelUniqueQueue struct {
	*ByteFIFOUniqueQueue
}
// NewLevelUniqueQueue creates a ledis local queue
//
// Please note that this Queue does not guarantee that a particular
// task cannot be processed twice or more at the same time. Uniqueness is
// only guaranteed whilst the task is waiting in the queue.
func NewLevelUniqueQueue ( handle HandlerFunc , cfg , exemplar interface { } ) ( Queue , error ) {
configInterface , err := toConfig ( LevelUniqueQueueConfiguration { } , cfg )
if err != nil {
return nil , err
}
config := configInterface . ( LevelUniqueQueueConfiguration )
2020-09-27 22:09:46 +01:00
if len ( config . ConnectionString ) == 0 {
config . ConnectionString = config . DataDir
}
2021-05-15 15:22:26 +01:00
config . WaitOnEmpty = true
2020-09-27 22:09:46 +01:00
byteFIFO , err := NewLevelUniqueQueueByteFIFO ( config . ConnectionString , config . QueueName )
2020-02-02 23:19:58 +00:00
if err != nil {
return nil , err
}
byteFIFOQueue , err := NewByteFIFOUniqueQueue ( LevelUniqueQueueType , byteFIFO , handle , config . ByteFIFOQueueConfiguration , exemplar )
if err != nil {
return nil , err
}
queue := & LevelUniqueQueue {
ByteFIFOUniqueQueue : byteFIFOQueue ,
}
queue . qid = GetManager ( ) . Add ( queue , LevelUniqueQueueType , config , exemplar )
return queue , nil
}
2021-04-09 09:40:34 +02:00
// Compile-time check that *LevelUniqueQueueByteFIFO implements UniqueByteFIFO.
var _ UniqueByteFIFO = (*LevelUniqueQueueByteFIFO)(nil)
2020-02-02 23:19:58 +00:00
// LevelUniqueQueueByteFIFO represents a ByteFIFO formed from a LevelUniqueQueue
type LevelUniqueQueueByteFIFO struct {
2020-09-27 22:09:46 +01:00
internal * levelqueue . UniqueQueue
connection string
2020-02-02 23:19:58 +00:00
}
// NewLevelUniqueQueueByteFIFO creates a new ByteFIFO formed from a LevelUniqueQueue
2020-09-27 22:09:46 +01:00
func NewLevelUniqueQueueByteFIFO ( connection , prefix string ) ( * LevelUniqueQueueByteFIFO , error ) {
db , err := nosql . GetManager ( ) . GetLevelDB ( connection )
if err != nil {
return nil , err
}
internal , err := levelqueue . NewUniqueQueue ( db , [ ] byte ( prefix ) , [ ] byte ( prefix + "-unique" ) , false )
2020-02-02 23:19:58 +00:00
if err != nil {
return nil , err
}
return & LevelUniqueQueueByteFIFO {
2020-09-27 22:09:46 +01:00
connection : connection ,
internal : internal ,
2020-02-02 23:19:58 +00:00
} , nil
}
// PushFunc pushes data to the end of the fifo and calls the callback if it is added
2021-05-15 15:22:26 +01:00
func ( fifo * LevelUniqueQueueByteFIFO ) PushFunc ( ctx context . Context , data [ ] byte , fn func ( ) error ) error {
2020-02-02 23:19:58 +00:00
return fifo . internal . LPushFunc ( data , fn )
}
2022-01-22 21:22:14 +00:00
// PushBack pushes data to the top of the fifo, i.e. onto the same end that
// Pop reads from (RPush here, RPop there).
//
// ctx is not used by this implementation.
func (fifo *LevelUniqueQueueByteFIFO) PushBack(ctx context.Context, data []byte) error {
	queue := fifo.internal
	return queue.RPush(data)
}
2020-02-02 23:19:58 +00:00
// Pop pops data from the start of the fifo
2021-05-15 15:22:26 +01:00
func ( fifo * LevelUniqueQueueByteFIFO ) Pop ( ctx context . Context ) ( [ ] byte , error ) {
2020-02-02 23:19:58 +00:00
data , err := fifo . internal . RPop ( )
if err != nil && err != levelqueue . ErrNotFound {
return nil , err
}
return data , nil
}
// Len returns the length of the fifo
2021-05-15 15:22:26 +01:00
func ( fifo * LevelUniqueQueueByteFIFO ) Len ( ctx context . Context ) int64 {
2020-02-02 23:19:58 +00:00
return fifo . internal . Len ( )
}
// Has returns whether the fifo contains this data
2021-05-15 15:22:26 +01:00
func ( fifo * LevelUniqueQueueByteFIFO ) Has ( ctx context . Context , data [ ] byte ) ( bool , error ) {
2020-02-02 23:19:58 +00:00
return fifo . internal . Has ( data )
}
// Close this fifo
func ( fifo * LevelUniqueQueueByteFIFO ) Close ( ) error {
2020-09-27 22:09:46 +01:00
err := fifo . internal . Close ( )
_ = nosql . GetManager ( ) . CloseLevelDB ( fifo . connection )
return err
2020-02-02 23:19:58 +00:00
}
// init registers NewLevelUniqueQueue in queuesMap as the constructor for
// LevelUniqueQueueType.
func init() {
	queuesMap[LevelUniqueQueueType] = NewLevelUniqueQueue
}