2020-01-07 11:23:09 +00:00
// Copyright 2019 The Gitea Authors. All rights reserved.
2022-11-27 13:20:29 -05:00
// SPDX-License-Identifier: MIT
2020-01-07 11:23:09 +00:00
package queue
import (
2021-07-17 18:09:56 +01:00
"sync"
2020-01-07 11:23:09 +00:00
"testing"
2022-01-22 21:22:14 +00:00
"time"
2020-01-07 11:23:09 +00:00
2022-01-22 21:22:14 +00:00
"code.gitea.io/gitea/modules/log"
2021-11-17 20:34:35 +08:00
2020-01-07 11:23:09 +00:00
"github.com/stretchr/testify/assert"
)
func TestPersistableChannelQueue ( t * testing . T ) {
handleChan := make ( chan * testData )
2022-01-22 21:22:14 +00:00
handle := func ( data ... Data ) [ ] Data {
2020-01-07 11:23:09 +00:00
for _ , datum := range data {
2021-11-19 01:13:25 +00:00
if datum == nil {
continue
}
2020-01-07 11:23:09 +00:00
testDatum := datum . ( * testData )
handleChan <- testDatum
}
2022-01-22 21:22:14 +00:00
return nil
2020-01-07 11:23:09 +00:00
}
2021-07-17 18:09:56 +01:00
lock := sync . Mutex { }
2020-01-07 11:23:09 +00:00
queueShutdown := [ ] func ( ) { }
queueTerminate := [ ] func ( ) { }
2022-09-04 23:14:53 +08:00
tmpDir := t . TempDir ( )
2020-01-07 11:23:09 +00:00
queue , err := NewPersistableChannelQueue ( handle , PersistableChannelQueueConfiguration {
2021-05-15 15:22:26 +01:00
DataDir : tmpDir ,
BatchLength : 2 ,
QueueLength : 20 ,
Workers : 1 ,
BoostWorkers : 0 ,
MaxWorkers : 10 ,
2023-02-28 22:55:43 +00:00
Name : "test-queue" ,
2020-01-07 11:23:09 +00:00
} , & testData { } )
assert . NoError ( t , err )
2021-11-19 01:13:25 +00:00
readyForShutdown := make ( chan struct { } )
readyForTerminate := make ( chan struct { } )
2021-05-15 15:22:26 +01:00
go queue . Run ( func ( shutdown func ( ) ) {
2021-07-17 18:09:56 +01:00
lock . Lock ( )
defer lock . Unlock ( )
2021-11-19 01:13:25 +00:00
select {
case <- readyForShutdown :
default :
close ( readyForShutdown )
}
2020-01-07 11:23:09 +00:00
queueShutdown = append ( queueShutdown , shutdown )
2021-05-15 15:22:26 +01:00
} , func ( terminate func ( ) ) {
2021-07-17 18:09:56 +01:00
lock . Lock ( )
defer lock . Unlock ( )
2021-11-19 01:13:25 +00:00
select {
case <- readyForTerminate :
default :
close ( readyForTerminate )
}
2020-01-07 11:23:09 +00:00
queueTerminate = append ( queueTerminate , terminate )
} )
test1 := testData { "A" , 1 }
test2 := testData { "B" , 2 }
err = queue . Push ( & test1 )
assert . NoError ( t , err )
go func ( ) {
2020-09-05 23:50:57 +01:00
err := queue . Push ( & test2 )
2020-01-07 11:23:09 +00:00
assert . NoError ( t , err )
} ( )
result1 := <- handleChan
assert . Equal ( t , test1 . TestString , result1 . TestString )
assert . Equal ( t , test1 . TestInt , result1 . TestInt )
result2 := <- handleChan
assert . Equal ( t , test2 . TestString , result2 . TestString )
assert . Equal ( t , test2 . TestInt , result2 . TestInt )
2021-05-15 15:22:26 +01:00
// test1 is a testData not a *testData so will be rejected
2020-01-07 11:23:09 +00:00
err = queue . Push ( test1 )
assert . Error ( t , err )
2021-11-19 01:13:25 +00:00
<- readyForShutdown
2021-05-15 15:22:26 +01:00
// Now shutdown the queue
2021-07-17 18:09:56 +01:00
lock . Lock ( )
callbacks := make ( [ ] func ( ) , len ( queueShutdown ) )
copy ( callbacks , queueShutdown )
lock . Unlock ( )
for _ , callback := range callbacks {
2020-01-07 11:23:09 +00:00
callback ( )
}
2021-05-15 15:22:26 +01:00
// Wait til it is closed
<- queue . ( * PersistableChannelQueue ) . closed
2020-01-07 11:23:09 +00:00
err = queue . Push ( & test1 )
assert . NoError ( t , err )
err = queue . Push ( & test2 )
assert . NoError ( t , err )
select {
case <- handleChan :
assert . Fail ( t , "Handler processing should have stopped" )
default :
}
2021-05-15 15:22:26 +01:00
// terminate the queue
2021-11-19 01:13:25 +00:00
<- readyForTerminate
2021-07-17 18:09:56 +01:00
lock . Lock ( )
callbacks = make ( [ ] func ( ) , len ( queueTerminate ) )
copy ( callbacks , queueTerminate )
lock . Unlock ( )
for _ , callback := range callbacks {
2020-01-07 11:23:09 +00:00
callback ( )
}
2021-05-15 15:22:26 +01:00
select {
case <- handleChan :
assert . Fail ( t , "Handler processing should have stopped" )
default :
}
2020-01-07 11:23:09 +00:00
// Reopen queue
queue , err = NewPersistableChannelQueue ( handle , PersistableChannelQueueConfiguration {
2021-05-15 15:22:26 +01:00
DataDir : tmpDir ,
BatchLength : 2 ,
QueueLength : 20 ,
Workers : 1 ,
BoostWorkers : 0 ,
MaxWorkers : 10 ,
2023-02-28 22:55:43 +00:00
Name : "test-queue" ,
2020-01-07 11:23:09 +00:00
} , & testData { } )
assert . NoError ( t , err )
2021-11-19 01:13:25 +00:00
readyForShutdown = make ( chan struct { } )
readyForTerminate = make ( chan struct { } )
2021-05-15 15:22:26 +01:00
go queue . Run ( func ( shutdown func ( ) ) {
2021-07-17 18:09:56 +01:00
lock . Lock ( )
defer lock . Unlock ( )
2021-11-19 01:13:25 +00:00
select {
case <- readyForShutdown :
default :
close ( readyForShutdown )
}
2020-01-07 11:23:09 +00:00
queueShutdown = append ( queueShutdown , shutdown )
2021-05-15 15:22:26 +01:00
} , func ( terminate func ( ) ) {
2021-07-17 18:09:56 +01:00
lock . Lock ( )
defer lock . Unlock ( )
2021-11-19 01:13:25 +00:00
select {
case <- readyForTerminate :
default :
close ( readyForTerminate )
}
2020-01-07 11:23:09 +00:00
queueTerminate = append ( queueTerminate , terminate )
} )
result3 := <- handleChan
assert . Equal ( t , test1 . TestString , result3 . TestString )
assert . Equal ( t , test1 . TestInt , result3 . TestInt )
result4 := <- handleChan
assert . Equal ( t , test2 . TestString , result4 . TestString )
assert . Equal ( t , test2 . TestInt , result4 . TestInt )
2021-07-17 18:09:56 +01:00
2021-11-19 01:13:25 +00:00
<- readyForShutdown
2021-07-17 18:09:56 +01:00
lock . Lock ( )
callbacks = make ( [ ] func ( ) , len ( queueShutdown ) )
copy ( callbacks , queueShutdown )
lock . Unlock ( )
for _ , callback := range callbacks {
2020-01-07 11:23:09 +00:00
callback ( )
}
2021-11-19 01:13:25 +00:00
<- readyForTerminate
2021-07-17 18:09:56 +01:00
lock . Lock ( )
callbacks = make ( [ ] func ( ) , len ( queueTerminate ) )
copy ( callbacks , queueTerminate )
lock . Unlock ( )
for _ , callback := range callbacks {
2020-01-07 11:23:09 +00:00
callback ( )
}
}
2022-01-22 21:22:14 +00:00
func TestPersistableChannelQueue_Pause ( t * testing . T ) {
lock := sync . Mutex { }
var queue Queue
var err error
pushBack := false
handleChan := make ( chan * testData )
handle := func ( data ... Data ) [ ] Data {
lock . Lock ( )
if pushBack {
if pausable , ok := queue . ( Pausable ) ; ok {
log . Info ( "pausing" )
pausable . Pause ( )
}
lock . Unlock ( )
return data
}
lock . Unlock ( )
for _ , datum := range data {
testDatum := datum . ( * testData )
handleChan <- testDatum
}
return nil
}
queueShutdown := [ ] func ( ) { }
queueTerminate := [ ] func ( ) { }
2022-04-02 08:59:04 +01:00
terminated := make ( chan struct { } )
2022-01-22 21:22:14 +00:00
2022-09-04 23:14:53 +08:00
tmpDir := t . TempDir ( )
2022-01-22 21:22:14 +00:00
queue , err = NewPersistableChannelQueue ( handle , PersistableChannelQueueConfiguration {
DataDir : tmpDir ,
BatchLength : 2 ,
QueueLength : 20 ,
Workers : 1 ,
BoostWorkers : 0 ,
MaxWorkers : 10 ,
2023-02-28 22:55:43 +00:00
Name : "test-queue" ,
2022-01-22 21:22:14 +00:00
} , & testData { } )
assert . NoError ( t , err )
2022-04-02 08:59:04 +01:00
go func ( ) {
queue . Run ( func ( shutdown func ( ) ) {
lock . Lock ( )
defer lock . Unlock ( )
queueShutdown = append ( queueShutdown , shutdown )
} , func ( terminate func ( ) ) {
lock . Lock ( )
defer lock . Unlock ( )
queueTerminate = append ( queueTerminate , terminate )
} )
close ( terminated )
} ( )
2022-01-22 21:22:14 +00:00
2022-01-29 11:37:08 +00:00
// Shutdown and Terminate in defer
defer func ( ) {
lock . Lock ( )
callbacks := make ( [ ] func ( ) , len ( queueShutdown ) )
copy ( callbacks , queueShutdown )
lock . Unlock ( )
for _ , callback := range callbacks {
callback ( )
}
lock . Lock ( )
log . Info ( "Finally terminating" )
callbacks = make ( [ ] func ( ) , len ( queueTerminate ) )
copy ( callbacks , queueTerminate )
lock . Unlock ( )
for _ , callback := range callbacks {
callback ( )
}
} ( )
2022-01-22 21:22:14 +00:00
test1 := testData { "A" , 1 }
test2 := testData { "B" , 2 }
err = queue . Push ( & test1 )
assert . NoError ( t , err )
pausable , ok := queue . ( Pausable )
if ! assert . True ( t , ok ) {
return
}
result1 := <- handleChan
assert . Equal ( t , test1 . TestString , result1 . TestString )
assert . Equal ( t , test1 . TestInt , result1 . TestInt )
pausable . Pause ( )
2022-01-29 11:37:08 +00:00
paused , _ := pausable . IsPausedIsResumed ( )
2022-01-22 21:22:14 +00:00
select {
case <- paused :
2022-01-29 11:37:08 +00:00
case <- time . After ( 100 * time . Millisecond ) :
2022-01-22 21:22:14 +00:00
assert . Fail ( t , "Queue is not paused" )
return
}
queue . Push ( & test2 )
var result2 * testData
select {
case result2 = <- handleChan :
assert . Fail ( t , "handler chan should be empty" )
case <- time . After ( 100 * time . Millisecond ) :
}
assert . Nil ( t , result2 )
pausable . Resume ( )
2022-01-29 11:37:08 +00:00
_ , resumed := pausable . IsPausedIsResumed ( )
2022-01-22 21:22:14 +00:00
select {
case <- resumed :
2022-01-29 11:37:08 +00:00
case <- time . After ( 100 * time . Millisecond ) :
2022-01-22 21:22:14 +00:00
assert . Fail ( t , "Queue should be resumed" )
2022-01-25 23:09:57 +00:00
return
2022-01-22 21:22:14 +00:00
}
select {
case result2 = <- handleChan :
case <- time . After ( 500 * time . Millisecond ) :
assert . Fail ( t , "handler chan should contain test2" )
}
assert . Equal ( t , test2 . TestString , result2 . TestString )
assert . Equal ( t , test2 . TestInt , result2 . TestInt )
2022-01-29 11:37:08 +00:00
// Set pushBack to so that the next handle will result in a Pause
2022-01-22 21:22:14 +00:00
lock . Lock ( )
pushBack = true
lock . Unlock ( )
2022-01-29 11:37:08 +00:00
// Ensure that we're still resumed
_ , resumed = pausable . IsPausedIsResumed ( )
2022-01-22 21:22:14 +00:00
select {
case <- resumed :
2022-01-29 11:37:08 +00:00
case <- time . After ( 100 * time . Millisecond ) :
2022-01-22 21:22:14 +00:00
assert . Fail ( t , "Queue is not resumed" )
return
}
2022-01-29 11:37:08 +00:00
// push test1
2022-01-22 21:22:14 +00:00
queue . Push ( & test1 )
2022-01-29 11:37:08 +00:00
// Now as this is handled it should pause
paused , _ = pausable . IsPausedIsResumed ( )
2022-01-22 21:22:14 +00:00
select {
case <- paused :
case <- handleChan :
assert . Fail ( t , "handler chan should not contain test1" )
return
case <- time . After ( 500 * time . Millisecond ) :
assert . Fail ( t , "queue should be paused" )
return
}
2022-01-29 11:37:08 +00:00
lock . Lock ( )
pushBack = false
lock . Unlock ( )
2022-01-22 21:22:14 +00:00
pausable . Resume ( )
2022-01-29 11:37:08 +00:00
_ , resumed = pausable . IsPausedIsResumed ( )
2022-01-22 21:22:14 +00:00
select {
case <- resumed :
2022-01-29 11:37:08 +00:00
case <- time . After ( 500 * time . Millisecond ) :
2022-01-22 21:22:14 +00:00
assert . Fail ( t , "Queue should be resumed" )
2022-01-25 23:09:57 +00:00
return
2022-01-22 21:22:14 +00:00
}
select {
case result1 = <- handleChan :
case <- time . After ( 500 * time . Millisecond ) :
assert . Fail ( t , "handler chan should contain test1" )
2022-01-25 23:09:57 +00:00
return
2022-01-22 21:22:14 +00:00
}
assert . Equal ( t , test1 . TestString , result1 . TestString )
assert . Equal ( t , test1 . TestInt , result1 . TestInt )
lock . Lock ( )
callbacks := make ( [ ] func ( ) , len ( queueShutdown ) )
copy ( callbacks , queueShutdown )
2022-01-29 11:37:08 +00:00
queueShutdown = queueShutdown [ : 0 ]
2022-01-22 21:22:14 +00:00
lock . Unlock ( )
// Now shutdown the queue
for _ , callback := range callbacks {
callback ( )
}
// Wait til it is closed
2022-01-25 23:09:57 +00:00
select {
case <- queue . ( * PersistableChannelQueue ) . closed :
case <- time . After ( 5 * time . Second ) :
assert . Fail ( t , "queue should close" )
return
}
2022-01-22 21:22:14 +00:00
err = queue . Push ( & test1 )
assert . NoError ( t , err )
err = queue . Push ( & test2 )
assert . NoError ( t , err )
select {
case <- handleChan :
assert . Fail ( t , "Handler processing should have stopped" )
2022-01-25 23:09:57 +00:00
return
2022-01-22 21:22:14 +00:00
default :
}
// terminate the queue
lock . Lock ( )
callbacks = make ( [ ] func ( ) , len ( queueTerminate ) )
copy ( callbacks , queueTerminate )
2022-01-29 11:37:08 +00:00
queueShutdown = queueTerminate [ : 0 ]
2022-01-22 21:22:14 +00:00
lock . Unlock ( )
for _ , callback := range callbacks {
callback ( )
}
select {
case <- handleChan :
assert . Fail ( t , "Handler processing should have stopped" )
2022-01-25 23:09:57 +00:00
return
2022-04-02 08:59:04 +01:00
case <- terminated :
case <- time . After ( 10 * time . Second ) :
assert . Fail ( t , "Queue should have terminated" )
return
2022-01-22 21:22:14 +00:00
}
lock . Lock ( )
pushBack = true
lock . Unlock ( )
// Reopen queue
2022-04-02 08:59:04 +01:00
terminated = make ( chan struct { } )
2022-01-22 21:22:14 +00:00
queue , err = NewPersistableChannelQueue ( handle , PersistableChannelQueueConfiguration {
DataDir : tmpDir ,
BatchLength : 1 ,
QueueLength : 20 ,
Workers : 1 ,
BoostWorkers : 0 ,
MaxWorkers : 10 ,
2023-02-28 22:55:43 +00:00
Name : "test-queue" ,
2022-01-22 21:22:14 +00:00
} , & testData { } )
assert . NoError ( t , err )
pausable , ok = queue . ( Pausable )
if ! assert . True ( t , ok ) {
return
}
paused , _ = pausable . IsPausedIsResumed ( )
2022-04-02 08:59:04 +01:00
go func ( ) {
queue . Run ( func ( shutdown func ( ) ) {
lock . Lock ( )
defer lock . Unlock ( )
queueShutdown = append ( queueShutdown , shutdown )
} , func ( terminate func ( ) ) {
lock . Lock ( )
defer lock . Unlock ( )
queueTerminate = append ( queueTerminate , terminate )
} )
close ( terminated )
} ( )
2022-01-22 21:22:14 +00:00
select {
case <- handleChan :
assert . Fail ( t , "Handler processing should have stopped" )
2022-01-25 23:09:57 +00:00
return
2022-01-22 21:22:14 +00:00
case <- paused :
}
2022-01-29 11:37:08 +00:00
paused , _ = pausable . IsPausedIsResumed ( )
2022-01-22 21:22:14 +00:00
select {
case <- paused :
2022-01-29 11:37:08 +00:00
case <- time . After ( 500 * time . Millisecond ) :
2022-01-22 21:22:14 +00:00
assert . Fail ( t , "Queue is not paused" )
return
}
select {
case <- handleChan :
assert . Fail ( t , "Handler processing should have stopped" )
2022-01-25 23:09:57 +00:00
return
2022-01-22 21:22:14 +00:00
default :
}
2022-01-29 11:37:08 +00:00
lock . Lock ( )
pushBack = false
lock . Unlock ( )
2022-01-22 21:22:14 +00:00
pausable . Resume ( )
2022-01-29 11:37:08 +00:00
_ , resumed = pausable . IsPausedIsResumed ( )
2022-01-25 23:09:57 +00:00
select {
case <- resumed :
2022-01-29 11:37:08 +00:00
case <- time . After ( 500 * time . Millisecond ) :
2022-01-25 23:09:57 +00:00
assert . Fail ( t , "Queue should be resumed" )
return
}
2022-01-22 21:22:14 +00:00
2022-01-25 23:09:57 +00:00
var result3 , result4 * testData
select {
case result3 = <- handleChan :
case <- time . After ( 1 * time . Second ) :
assert . Fail ( t , "Handler processing should have resumed" )
return
}
select {
case result4 = <- handleChan :
case <- time . After ( 1 * time . Second ) :
assert . Fail ( t , "Handler processing should have resumed" )
return
}
2022-01-22 21:22:14 +00:00
if result4 . TestString == test1 . TestString {
result3 , result4 = result4 , result3
}
assert . Equal ( t , test1 . TestString , result3 . TestString )
assert . Equal ( t , test1 . TestInt , result3 . TestInt )
assert . Equal ( t , test2 . TestString , result4 . TestString )
assert . Equal ( t , test2 . TestInt , result4 . TestInt )
2022-04-02 08:59:04 +01:00
lock . Lock ( )
callbacks = make ( [ ] func ( ) , len ( queueShutdown ) )
copy ( callbacks , queueShutdown )
queueShutdown = queueShutdown [ : 0 ]
lock . Unlock ( )
// Now shutdown the queue
for _ , callback := range callbacks {
callback ( )
}
// terminate the queue
lock . Lock ( )
callbacks = make ( [ ] func ( ) , len ( queueTerminate ) )
copy ( callbacks , queueTerminate )
queueShutdown = queueTerminate [ : 0 ]
lock . Unlock ( )
for _ , callback := range callbacks {
callback ( )
}
select {
case <- time . After ( 10 * time . Second ) :
assert . Fail ( t , "Queue should have terminated" )
return
case <- terminated :
}
2022-01-22 21:22:14 +00:00
}