// Copyright 2016 The Gogs Authors. All rights reserved.
// Copyright 2019 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

package sync
import (
2019-08-23 19:40:30 +03:00
"github.com/unknwon/com"
2016-08-31 01:50:30 +03:00
)
// UniqueQueue is a queue which guarantees only one instance of same
// identity is in the line. Instances with same identity will be
// discarded if there is already one in the line.
//
// This queue is particularly useful for preventing duplicated task
// of same purpose.
type UniqueQueue struct {
2019-12-15 12:51:28 +03:00
table * StatusTable
queue chan string
closed chan struct { }
2016-08-31 01:50:30 +03:00
}
// NewUniqueQueue initializes and returns a new UniqueQueue object.
func NewUniqueQueue ( queueLength int ) * UniqueQueue {
if queueLength <= 0 {
queueLength = 100
}
return & UniqueQueue {
2019-12-15 12:51:28 +03:00
table : NewStatusTable ( ) ,
queue : make ( chan string , queueLength ) ,
closed : make ( chan struct { } ) ,
2016-08-31 01:50:30 +03:00
}
}
2019-12-15 12:51:28 +03:00
// Close marks the queue as closed by closing the channel returned from
// IsClosed. Calling Close again after the queue is closed does nothing.
func (q *UniqueQueue) Close() {
	// Cheap check without the lock: nothing to do if already closed.
	select {
	case <-q.closed:
		return
	default:
	}

	q.table.lock.Lock()
	defer q.table.lock.Unlock()

	// Check again under the lock so that only one caller ever closes
	// the channel.
	select {
	case <-q.closed:
	default:
		close(q.closed)
	}
}
// IsClosed exposes a receive-only channel that becomes readable once
// Close has been called on this queue.
func (q *UniqueQueue) IsClosed() <-chan struct{} {
	return q.closed
}
// IDs returns a snapshot of the identities currently held in the status
// table's pool. The snapshot is taken under the table lock; its order
// follows map iteration and is therefore unspecified.
func (q *UniqueQueue) IDs() []interface{} {
	q.table.lock.Lock()
	defer q.table.lock.Unlock()

	pool := q.table.pool
	snapshot := make([]interface{}, len(pool))
	next := 0
	for key := range pool {
		snapshot[next] = key
		next++
	}
	return snapshot
}
2016-08-31 01:50:30 +03:00
// Queue exposes the receive-only channel from which consumers read the
// identities that have been added to the queue.
func (q *UniqueQueue) Queue() <-chan string {
	return q.queue
}
2017-03-15 03:52:01 +03:00
// Exist returns true if there is an instance with given identity
2016-08-31 01:50:30 +03:00
// exists in the queue.
func ( q * UniqueQueue ) Exist ( id interface { } ) bool {
return q . table . IsRunning ( com . ToStr ( id ) )
}
// AddFunc adds new instance to the queue with a custom runnable function,
// the queue is blocked until the function exits.
func ( q * UniqueQueue ) AddFunc ( id interface { } , fn func ( ) ) {
idStr := com . ToStr ( id )
q . table . lock . Lock ( )
2019-12-15 12:51:28 +03:00
if _ , ok := q . table . pool [ idStr ] ; ok {
2020-01-16 00:58:33 +03:00
q . table . lock . Unlock ( )
2019-12-15 12:51:28 +03:00
return
}
2017-02-09 09:39:06 +03:00
q . table . pool [ idStr ] = struct { } { }
2016-08-31 01:50:30 +03:00
if fn != nil {
fn ( )
}
q . table . lock . Unlock ( )
2019-12-15 12:51:28 +03:00
select {
case <- q . closed :
return
case q . queue <- idStr :
return
}
2016-08-31 01:50:30 +03:00
}
// Add enqueues id unless an instance with the same identity is already
// present. It is AddFunc without a callback.
func (q *UniqueQueue) Add(id interface{}) {
	q.AddFunc(id, nil)
}
// Remove clears the given identity by calling Stop on the status table
// with the identity's string form.
// NOTE(review): this method does not touch the queue channel, so an
// identity that was already sent presumably stays readable from Queue;
// confirm against callers.
func (q *UniqueQueue) Remove(id interface{}) {
	key := com.ToStr(id)
	q.table.Stop(key)
}