2016-04-13 21:36:23 +03:00
package safe
import (
2017-02-02 23:07:44 +03:00
"context"
2016-12-08 15:32:12 +03:00
"fmt"
2016-04-13 21:36:23 +03:00
"runtime/debug"
"sync"
2017-04-30 15:39:49 +03:00
2019-08-03 04:58:23 +03:00
"github.com/cenkalti/backoff/v3"
"github.com/containous/traefik/v2/pkg/log"
2016-04-13 21:36:23 +03:00
)
// routine pairs a function started through Pool.Go with the channel the pool
// uses to ask it to stop.
type routine struct {
	// goroutine is the function that was launched.
	goroutine func(chan bool)
	// stop is the channel passed to goroutine; Pool.Stop sends true on it
	// and closes it once every routine has returned.
	stop chan bool
}
2016-08-18 14:03:10 +03:00
// routineCtx is the signature of the functions started through Pool.GoCtx;
// they are called with the pool's cancellable context.
type routineCtx func(context.Context)
// Pool is a pool of go routines
2016-04-13 21:36:23 +03:00
type Pool struct {
2020-01-20 19:42:05 +03:00
routines [ ] routine
waitGroup sync . WaitGroup
lock sync . Mutex
baseCtx context . Context
baseCancel context . CancelFunc
ctx context . Context
cancel context . CancelFunc
2016-08-18 14:03:10 +03:00
}
// NewPool creates a Pool
2016-08-18 15:20:11 +03:00
func NewPool ( parentCtx context . Context ) * Pool {
2016-11-15 22:14:11 +03:00
baseCtx , baseCancel := context . WithCancel ( parentCtx )
2016-08-18 14:03:10 +03:00
ctx , cancel := context . WithCancel ( baseCtx )
return & Pool {
2016-11-15 22:14:11 +03:00
baseCtx : baseCtx ,
baseCancel : baseCancel ,
ctx : ctx ,
cancel : cancel ,
2016-08-18 14:03:10 +03:00
}
}
// Ctx returns main context
func ( p * Pool ) Ctx ( ) context . Context {
2016-08-18 15:20:11 +03:00
return p . baseCtx
}
2018-07-03 11:02:03 +03:00
// GoCtx starts a recoverable goroutine with a context
2016-08-18 14:03:10 +03:00
func ( p * Pool ) GoCtx ( goroutine routineCtx ) {
p . lock . Lock ( )
p . waitGroup . Add ( 1 )
Go ( func ( ) {
2019-08-26 11:54:05 +03:00
defer p . waitGroup . Done ( )
2016-08-18 14:03:10 +03:00
goroutine ( p . ctx )
} )
p . lock . Unlock ( )
2016-04-13 21:36:23 +03:00
}
// Go starts a recoverable goroutine, and can be stopped with stop chan
func ( p * Pool ) Go ( goroutine func ( stop chan bool ) ) {
p . lock . Lock ( )
newRoutine := routine {
goroutine : goroutine ,
stop : make ( chan bool , 1 ) ,
}
p . routines = append ( p . routines , newRoutine )
p . waitGroup . Add ( 1 )
Go ( func ( ) {
2019-08-26 11:54:05 +03:00
defer p . waitGroup . Done ( )
2016-04-13 21:36:23 +03:00
goroutine ( newRoutine . stop )
} )
p . lock . Unlock ( )
}
// Stop stops all started routines, waiting for their termination
func ( p * Pool ) Stop ( ) {
p . lock . Lock ( )
2016-08-18 15:20:11 +03:00
defer p . lock . Unlock ( )
2016-08-18 14:03:10 +03:00
p . cancel ( )
2016-04-13 21:36:23 +03:00
for _ , routine := range p . routines {
routine . stop <- true
}
p . waitGroup . Wait ( )
for _ , routine := range p . routines {
close ( routine . stop )
}
}
2016-11-15 22:14:11 +03:00
// Cleanup releases resources used by the pool, and should be called when the
// pool will no longer be used. It stops every routine first, then cancels
// the base context returned by Ctx.
func (p *Pool) Cleanup() {
	p.Stop()

	p.lock.Lock()
	p.baseCancel()
	p.lock.Unlock()
}
2016-04-13 21:36:23 +03:00
// Go starts a recoverable goroutine: goroutine runs in its own goroutine and
// a panic raised by it is recovered and handed to defaultRecoverGoroutine.
func Go(goroutine func()) {
	GoWithRecover(goroutine, defaultRecoverGoroutine)
}
// GoWithRecover starts a recoverable goroutine using given customRecover()
// function: goroutine runs in a new goroutine and, if it panics with a
// non-nil value, that value is passed to customRecover instead of crashing
// the program.
func GoWithRecover(goroutine func(), customRecover func(err interface{})) {
	// handlePanic is deferred directly so that its recover() call is
	// effective.
	handlePanic := func() {
		if recovered := recover(); recovered != nil {
			customRecover(recovered)
		}
	}

	go func() {
		defer handlePanic()
		goroutine()
	}()
}
func defaultRecoverGoroutine ( err interface { } ) {
2019-09-13 20:28:04 +03:00
logger := log . WithoutContext ( )
logger . Errorf ( "Error in Go routine: %s" , err )
logger . Errorf ( "Stack: %s" , debug . Stack ( ) )
2016-04-13 21:36:23 +03:00
}
2016-12-08 15:32:12 +03:00
// OperationWithRecover wrap a backoff operation in a Recover
func OperationWithRecover ( operation backoff . Operation ) backoff . Operation {
return func ( ) ( err error ) {
defer func ( ) {
if res := recover ( ) ; res != nil {
2016-12-08 15:32:52 +03:00
defaultRecoverGoroutine ( res )
2018-07-03 11:02:03 +03:00
err = fmt . Errorf ( "panic in operation: %s" , err )
2016-12-08 15:32:12 +03:00
}
} ( )
2016-12-08 15:32:52 +03:00
return operation ( )
2016-12-08 15:32:12 +03:00
}
}