2022-08-14 10:34:19 +08:00
package poller
import (
"context"
2022-09-29 22:19:21 +08:00
"errors"
2022-08-14 11:17:55 +08:00
"time"
2022-08-14 10:34:19 +08:00
"gitea.com/gitea/act_runner/client"
2022-08-17 14:26:58 +08:00
runnerv1 "gitea.com/gitea/proto-go/runner/v1"
2022-08-14 10:59:09 +08:00
2022-09-25 18:54:00 +08:00
"github.com/bufbuild/connect-go"
2022-08-14 10:34:19 +08:00
log "github.com/sirupsen/logrus"
)
2022-10-14 10:55:49 +08:00
const (
	// errorRetryCounterLimit is the threshold for the poller's error
	// counter: once the counter exceeds this value, Poll pauses before
	// trying again.
	errorRetryCounterLimit = 3

	// errorRetryTimeSleepSecs is the length of that pause, in seconds.
	errorRetryTimeSleepSecs = 30
)
2022-10-12 20:55:50 +08:00
2022-11-11 13:51:00 +08:00
// ErrDataLock is the sentinel error that poll treats as "task accepted by
// another runner": it is logged at info level and not propagated.
var ErrDataLock = errors.New("Data Lock Error")
2022-09-29 22:19:21 +08:00
2022-10-15 20:03:33 +08:00
func New ( cli client . Client , dispatch func ( context . Context , * runnerv1 . Task ) error ) * Poller {
2022-08-14 10:34:19 +08:00
return & Poller {
Client : cli ,
2022-08-14 13:29:00 +08:00
Dispatch : dispatch ,
2022-08-14 10:34:19 +08:00
routineGroup : newRoutineGroup ( ) ,
}
}
type Poller struct {
2022-08-14 13:29:00 +08:00
Client client . Client
2022-08-28 14:05:56 +08:00
Filter * client . Filter
2022-09-25 18:54:00 +08:00
Dispatch func ( context . Context , * runnerv1 . Task ) error
2022-08-14 10:34:19 +08:00
2022-10-12 20:55:50 +08:00
routineGroup * routineGroup
errorRetryCounter int
2022-08-14 10:34:19 +08:00
}
2022-11-11 13:51:00 +08:00
// Wait blocks until the poller's routine group reports that it is done.
func (p *Poller) Wait() {
	p.routineGroup.Wait()
}
2022-10-02 12:33:17 +08:00
func ( p * Poller ) Poll ( ctx context . Context , n int ) error {
2022-08-14 10:34:19 +08:00
for i := 0 ; i < n ; i ++ {
func ( i int ) {
p . routineGroup . Run ( func ( ) {
for {
select {
case <- ctx . Done ( ) :
log . Infof ( "stopped the runner: %d" , i + 1 )
return
default :
if ctx . Err ( ) != nil {
log . Infof ( "stopping the runner: %d" , i + 1 )
return
}
2022-09-01 15:09:22 +08:00
if err := p . poll ( ctx , i + 1 ) ; err != nil {
2022-09-03 15:59:44 +08:00
log . WithField ( "thread" , i + 1 ) .
WithError ( err ) . Error ( "poll error" )
2022-10-12 20:55:50 +08:00
if p . errorRetryCounter > errorRetryCounterLimit {
log . WithField ( "thread" , i + 1 ) . Error ( "poller: too many errors, sleeping for 30 seconds" )
// FIXME: it makes ctrl+c hang up
time . Sleep ( time . Second * errorRetryTimeSleepSecs )
}
2022-08-14 10:34:19 +08:00
}
}
}
} )
} ( i )
}
p . routineGroup . Wait ( )
2022-08-28 14:05:56 +08:00
return nil
2022-08-14 10:34:19 +08:00
}
2022-09-01 15:09:22 +08:00
func ( p * Poller ) poll ( ctx context . Context , thread int ) error {
2022-09-29 22:19:21 +08:00
l := log . WithField ( "thread" , thread )
l . Info ( "poller: request stage from remote server" )
2022-08-14 10:34:19 +08:00
2022-10-14 18:30:36 +08:00
reqCtx , cancel := context . WithTimeout ( ctx , 5 * time . Second )
2022-09-03 15:59:44 +08:00
defer cancel ( )
2022-08-14 11:17:55 +08:00
2022-09-01 15:09:22 +08:00
// request a new build stage for execution from the central
// build server.
2022-10-14 18:30:36 +08:00
resp , err := p . Client . FetchTask ( reqCtx , connect . NewRequest ( & runnerv1 . FetchTaskRequest { } ) )
2022-09-01 15:09:22 +08:00
if err == context . Canceled || err == context . DeadlineExceeded {
2022-09-29 22:19:21 +08:00
l . WithError ( err ) . Trace ( "poller: no stage returned" )
2022-10-12 20:55:50 +08:00
p . errorRetryCounter ++
2022-09-01 15:09:22 +08:00
return nil
}
2022-09-29 22:19:21 +08:00
if err != nil && err == ErrDataLock {
l . WithError ( err ) . Info ( "task accepted by another runner" )
2022-10-12 20:55:50 +08:00
p . errorRetryCounter ++
2022-09-29 22:19:21 +08:00
return nil
}
2022-09-01 15:09:22 +08:00
if err != nil {
2022-09-29 22:19:21 +08:00
l . WithError ( err ) . Error ( "cannot accept task" )
2022-10-12 20:55:50 +08:00
p . errorRetryCounter ++
2022-09-01 15:09:22 +08:00
return err
}
// exit if a nil or empty stage is returned from the system
// and allow the runner to retry.
2022-09-25 18:54:00 +08:00
if resp . Msg . Task == nil || resp . Msg . Task . Id == 0 {
2022-09-01 15:09:22 +08:00
return nil
}
2022-10-12 20:55:50 +08:00
p . errorRetryCounter = 0
2022-10-14 18:30:36 +08:00
runCtx , cancel := context . WithTimeout ( ctx , time . Hour )
defer cancel ( )
return p . Dispatch ( runCtx , resp . Msg . Task )
2022-08-14 10:34:19 +08:00
}