2016-04-13 20:36:23 +02:00
package safe
import (
2017-02-02 21:07:44 +01:00
"context"
2016-12-08 13:32:12 +01:00
"fmt"
2016-04-13 20:36:23 +02:00
"runtime/debug"
"sync"
2017-04-30 14:39:49 +02:00
"github.com/cenk/backoff"
"github.com/containous/traefik/log"
2016-04-13 20:36:23 +02:00
)
// routine pairs a stoppable function with the channel used to ask it to stop.
type routine struct {
	// goroutine is the function to run; it receives the stop channel.
	goroutine func(chan bool)
	// stop is the channel handed to goroutine when it is run.
	stop chan bool
}
2016-08-18 13:03:10 +02:00
// routineCtx is a function run by the pool that receives a context.Context.
type routineCtx func(context.Context)
// Pool is a pool of go routines
2016-04-13 20:36:23 +02:00
type Pool struct {
2016-08-18 13:03:10 +02:00
routines [ ] routine
routinesCtx [ ] routineCtx
waitGroup sync . WaitGroup
lock sync . Mutex
baseCtx context . Context
2016-11-15 19:14:11 +00:00
baseCancel context . CancelFunc
2016-08-18 13:03:10 +02:00
ctx context . Context
cancel context . CancelFunc
}
// NewPool creates a Pool
2016-08-18 14:20:11 +02:00
func NewPool ( parentCtx context . Context ) * Pool {
2016-11-15 19:14:11 +00:00
baseCtx , baseCancel := context . WithCancel ( parentCtx )
2016-08-18 13:03:10 +02:00
ctx , cancel := context . WithCancel ( baseCtx )
return & Pool {
2016-11-15 19:14:11 +00:00
baseCtx : baseCtx ,
baseCancel : baseCancel ,
ctx : ctx ,
cancel : cancel ,
2016-08-18 13:03:10 +02:00
}
}
// Ctx returns main context
func ( p * Pool ) Ctx ( ) context . Context {
2016-08-18 14:20:11 +02:00
return p . baseCtx
}
//AddGoCtx adds a recoverable goroutine with a context without starting it
func ( p * Pool ) AddGoCtx ( goroutine routineCtx ) {
p . lock . Lock ( )
p . routinesCtx = append ( p . routinesCtx , goroutine )
p . lock . Unlock ( )
2016-08-18 13:03:10 +02:00
}
//GoCtx starts a recoverable goroutine with a context
func ( p * Pool ) GoCtx ( goroutine routineCtx ) {
p . lock . Lock ( )
p . routinesCtx = append ( p . routinesCtx , goroutine )
p . waitGroup . Add ( 1 )
Go ( func ( ) {
goroutine ( p . ctx )
p . waitGroup . Done ( )
} )
p . lock . Unlock ( )
2016-04-13 20:36:23 +02:00
}
// Go registers a routine with the pool and starts it immediately in a
// recoverable goroutine.
//
// The routine receives a stop channel (buffered, capacity 1) on which Stop
// sends true to request termination.
func (p *Pool) Go(goroutine func(stop chan bool)) {
	p.lock.Lock()
	defer p.lock.Unlock()

	newRoutine := routine{
		goroutine: goroutine,
		stop:      make(chan bool, 1),
	}
	p.routines = append(p.routines, newRoutine)

	p.waitGroup.Add(1)
	Go(func() {
		// Deferred so the wait group is released even when the routine
		// panics; the panic is recovered by Go, and without this Stop
		// would wait forever.
		defer p.waitGroup.Done()
		goroutine(newRoutine.stop)
	})
}
// Stop stops all started routines, waiting for their termination
func ( p * Pool ) Stop ( ) {
p . lock . Lock ( )
2016-08-18 14:20:11 +02:00
defer p . lock . Unlock ( )
2016-08-18 13:03:10 +02:00
p . cancel ( )
2016-04-13 20:36:23 +02:00
for _ , routine := range p . routines {
routine . stop <- true
}
p . waitGroup . Wait ( )
for _ , routine := range p . routines {
close ( routine . stop )
}
}
2016-11-15 19:14:11 +00:00
// Cleanup stops the pool and then cancels its base context. It should be
// called when the pool will no longer be used.
func (p *Pool) Cleanup() {
	// Stop takes and releases the lock itself, so it must run before the
	// lock is acquired below.
	p.Stop()

	p.lock.Lock()
	p.baseCancel()
	p.lock.Unlock()
}
2016-08-18 14:20:11 +02:00
// Start starts all stopped routines
2016-08-18 13:03:10 +02:00
func ( p * Pool ) Start ( ) {
p . lock . Lock ( )
2016-08-18 14:20:11 +02:00
defer p . lock . Unlock ( )
2016-08-18 13:03:10 +02:00
p . ctx , p . cancel = context . WithCancel ( p . baseCtx )
2017-04-30 14:39:49 +02:00
for i := range p . routines {
2016-08-18 13:03:10 +02:00
p . waitGroup . Add ( 1 )
2017-04-30 14:39:49 +02:00
p . routines [ i ] . stop = make ( chan bool , 1 )
2016-08-18 13:03:10 +02:00
Go ( func ( ) {
2017-04-30 14:39:49 +02:00
p . routines [ i ] . goroutine ( p . routines [ i ] . stop )
2016-08-18 13:03:10 +02:00
p . waitGroup . Done ( )
} )
}
for _ , routine := range p . routinesCtx {
p . waitGroup . Add ( 1 )
Go ( func ( ) {
routine ( p . ctx )
p . waitGroup . Done ( )
} )
}
}
2016-04-13 20:36:23 +02:00
// Go runs goroutine in a new goroutine via GoWithRecover, using
// defaultRecoverGoroutine as the panic handler.
func Go(goroutine func()) {
	GoWithRecover(goroutine, defaultRecoverGoroutine)
}
// GoWithRecover runs goroutine in a new goroutine. If goroutine panics, the
// panic is recovered and its value is passed to customRecover.
func GoWithRecover(goroutine func(), customRecover func(err interface{})) {
	// handlePanic is deferred directly, so its recover call intercepts a
	// panic raised by goroutine.
	handlePanic := func() {
		if recovered := recover(); recovered != nil {
			customRecover(recovered)
		}
	}

	go func() {
		defer handlePanic()
		goroutine()
	}()
}
func defaultRecoverGoroutine ( err interface { } ) {
2016-08-18 14:20:11 +02:00
log . Errorf ( "Error in Go routine: %s" , err )
2016-04-13 20:36:23 +02:00
debug . PrintStack ( )
}
2016-12-08 13:32:12 +01:00
// OperationWithRecover wrap a backoff operation in a Recover
func OperationWithRecover ( operation backoff . Operation ) backoff . Operation {
return func ( ) ( err error ) {
defer func ( ) {
if res := recover ( ) ; res != nil {
2016-12-08 13:32:52 +01:00
defaultRecoverGoroutine ( res )
2016-12-08 13:32:12 +01:00
err = fmt . Errorf ( "Panic in operation: %s" , err )
}
} ( )
2016-12-08 13:32:52 +01:00
return operation ( )
2016-12-08 13:32:12 +01:00
}
}