2020-05-04 11:40:46 +02:00
package pilot
import (
"bytes"
"context"
"encoding/json"
"fmt"
2020-12-03 15:52:05 +01:00
"hash/fnv"
2021-03-04 20:08:03 +01:00
"io"
2020-05-04 11:40:46 +02:00
"net/http"
"time"
"github.com/cenkalti/backoff/v4"
2020-12-03 15:52:05 +01:00
"github.com/traefik/traefik/v2/pkg/anonymize"
"github.com/traefik/traefik/v2/pkg/config/dynamic"
2020-09-16 15:46:04 +02:00
"github.com/traefik/traefik/v2/pkg/log"
"github.com/traefik/traefik/v2/pkg/metrics"
"github.com/traefik/traefik/v2/pkg/safe"
"github.com/traefik/traefik/v2/pkg/version"
2020-05-04 11:40:46 +02:00
)
const (
2020-12-03 15:52:05 +01:00
baseInstanceInfoURL = "https://instance-info.pilot.traefik.io/public"
baseGatewayURL = "https://gateway.pilot.traefik.io"
2020-05-04 11:40:46 +02:00
)
2020-12-03 15:52:05 +01:00
const (
tokenHeader = "X-Token"
tokenHashHeader = "X-Token-Hash"
)
2020-05-04 11:40:46 +02:00
2020-12-03 15:52:05 +01:00
const (
pilotInstanceInfoTimer = 5 * time . Minute
pilotDynConfTimer = 12 * time . Hour
maxElapsedTime = 4 * time . Minute
2021-02-01 14:42:04 +01:00
initialInterval = 5 * time . Second
multiplier = 3
2020-12-03 15:52:05 +01:00
)
2020-05-04 11:40:46 +02:00
type instanceInfo struct {
2020-12-03 15:52:05 +01:00
ID string ` json:"id,omitempty" `
Metrics [ ] metrics . PilotMetric ` json:"metrics,omitempty" `
2020-05-04 11:40:46 +02:00
}
// New creates a new Pilot.
2020-08-10 15:26:04 +02:00
func New ( token string , metricsRegistry * metrics . PilotRegistry , pool * safe . Pool ) * Pilot {
2020-12-03 15:52:05 +01:00
tokenHash := fnv . New64a ( )
// the `sum64a` implementation of the `Write` method never returns an error.
_ , _ = tokenHash . Write ( [ ] byte ( token ) )
2020-05-04 11:40:46 +02:00
return & Pilot {
2020-12-03 15:52:05 +01:00
dynamicConfigCh : make ( chan dynamic . Configuration ) ,
2020-05-04 11:40:46 +02:00
client : & client {
2020-12-03 15:52:05 +01:00
token : token ,
tokenHash : fmt . Sprintf ( "%x" , tokenHash . Sum64 ( ) ) ,
httpClient : & http . Client { Timeout : 5 * time . Second } ,
baseInstanceInfoURL : baseInstanceInfoURL ,
baseGatewayURL : baseGatewayURL ,
2020-05-04 11:40:46 +02:00
} ,
2020-08-10 15:26:04 +02:00
routinesPool : pool ,
metricsRegistry : metricsRegistry ,
2020-05-04 11:40:46 +02:00
}
}
// Pilot connector with Pilot.
type Pilot struct {
routinesPool * safe . Pool
client * client
2020-12-03 15:52:05 +01:00
dynamicConfig dynamic . Configuration
dynamicConfigCh chan dynamic . Configuration
2020-08-10 15:26:04 +02:00
metricsRegistry * metrics . PilotRegistry
2020-05-04 11:40:46 +02:00
}
2020-12-03 15:52:05 +01:00
// SetDynamicConfiguration stores the dynamic configuration.
func ( p * Pilot ) SetDynamicConfiguration ( dynamicConfig dynamic . Configuration ) {
p . dynamicConfigCh <- dynamicConfig
2020-05-04 11:40:46 +02:00
}
2020-12-03 15:52:05 +01:00
func ( p * Pilot ) sendAnonDynConf ( ctx context . Context , config dynamic . Configuration ) {
err := p . client . SendAnonDynConf ( ctx , config )
if err != nil {
log . WithoutContext ( ) . Error ( err )
2020-05-04 11:40:46 +02:00
}
}
2020-12-03 15:52:05 +01:00
func ( p * Pilot ) sendInstanceInfo ( ctx context . Context , pilotMetrics [ ] metrics . PilotMetric ) {
err := p . client . SendInstanceInfo ( ctx , pilotMetrics )
2020-05-04 11:40:46 +02:00
if err != nil {
log . WithoutContext ( ) . Error ( err )
}
}
// Tick sends data periodically.
func ( p * Pilot ) Tick ( ctx context . Context ) {
2020-08-10 15:26:04 +02:00
pilotMetrics := p . metricsRegistry . Data ( )
2020-05-04 11:40:46 +02:00
p . routinesPool . GoCtx ( func ( ctxRt context . Context ) {
2020-12-03 15:52:05 +01:00
p . sendInstanceInfo ( ctxRt , pilotMetrics )
2020-05-04 11:40:46 +02:00
} )
2020-12-03 15:52:05 +01:00
instanceInfoTicker := time . NewTicker ( pilotInstanceInfoTimer )
dynConfTicker := time . NewTicker ( pilotDynConfTimer )
2020-05-04 11:40:46 +02:00
for {
select {
2020-12-03 15:52:05 +01:00
case tick := <- instanceInfoTicker . C :
log . WithoutContext ( ) . Debugf ( "Send instance info to pilot: %s" , tick )
2020-05-04 11:40:46 +02:00
2020-08-10 15:26:04 +02:00
pilotMetrics := p . metricsRegistry . Data ( )
2020-05-04 11:40:46 +02:00
p . routinesPool . GoCtx ( func ( ctxRt context . Context ) {
2020-12-03 15:52:05 +01:00
p . sendInstanceInfo ( ctxRt , pilotMetrics )
} )
case tick := <- dynConfTicker . C :
log . WithoutContext ( ) . Debugf ( "Send anonymized dynamic configuration to pilot: %s" , tick )
p . routinesPool . GoCtx ( func ( ctxRt context . Context ) {
p . sendAnonDynConf ( ctxRt , p . dynamicConfig )
2020-05-04 11:40:46 +02:00
} )
2020-12-03 15:52:05 +01:00
case dynamicConfig := <- p . dynamicConfigCh :
p . dynamicConfig = dynamicConfig
2020-05-04 11:40:46 +02:00
case <- ctx . Done ( ) :
return
}
}
}
type client struct {
2020-12-03 15:52:05 +01:00
httpClient * http . Client
baseInstanceInfoURL string
baseGatewayURL string
token string
tokenHash string
uuid string
2020-05-04 11:40:46 +02:00
}
func ( c * client ) createUUID ( ) ( string , error ) {
data := [ ] byte ( ` { "version":" ` + version . Version + ` ","codeName":" ` + version . Codename + ` "} ` )
2020-12-03 15:52:05 +01:00
req , err := http . NewRequest ( http . MethodPost , c . baseInstanceInfoURL + "/" , bytes . NewBuffer ( data ) )
2020-05-04 11:40:46 +02:00
if err != nil {
return "" , fmt . Errorf ( "failed to create request: %w" , err )
}
req . Header . Set ( "Content-Type" , "application/json" )
req . Header . Set ( tokenHeader , c . token )
resp , err := c . httpClient . Do ( req )
if err != nil {
return "" , fmt . Errorf ( "failed call Pilot: %w" , err )
}
defer resp . Body . Close ( )
2021-03-04 20:08:03 +01:00
body , err := io . ReadAll ( resp . Body )
2020-05-04 11:40:46 +02:00
if err != nil {
return "" , fmt . Errorf ( "failed read response body: %w" , err )
}
if resp . StatusCode / 100 != 2 {
return "" , fmt . Errorf ( "wrong status code while sending configuration: %d: %s" , resp . StatusCode , body )
}
created := instanceInfo { }
err = json . Unmarshal ( body , & created )
if err != nil {
return "" , fmt . Errorf ( "failed to unmarshal response body: %w" , err )
}
return created . ID , nil
}
2020-12-03 15:52:05 +01:00
// SendAnonDynConf sends anonymized dynamic configuration to Pilot.
func ( c * client ) SendAnonDynConf ( ctx context . Context , config dynamic . Configuration ) error {
anonConfig , err := anonymize . Do ( & config , false )
if err != nil {
return fmt . Errorf ( "unable to anonymize dynamic configuration: %w" , err )
}
2020-05-04 11:40:46 +02:00
2020-12-03 15:52:05 +01:00
req , err := http . NewRequest ( http . MethodPost , c . baseGatewayURL + "/collect" , bytes . NewReader ( [ ] byte ( anonConfig ) ) )
if err != nil {
return fmt . Errorf ( "failed to create request: %w" , err )
}
return c . sendDataRetryable ( ctx , req )
2020-05-04 11:40:46 +02:00
}
2020-12-03 15:52:05 +01:00
// SendInstanceInfo sends instance information to Pilot.
func ( c * client ) SendInstanceInfo ( ctx context . Context , pilotMetrics [ ] metrics . PilotMetric ) error {
2020-05-04 11:40:46 +02:00
if len ( c . uuid ) == 0 {
var err error
c . uuid , err = c . createUUID ( )
if err != nil {
return fmt . Errorf ( "failed to create UUID: %w" , err )
}
version . UUID = c . uuid
}
info := instanceInfo {
2020-08-10 15:26:04 +02:00
ID : c . uuid ,
Metrics : pilotMetrics ,
2020-05-04 11:40:46 +02:00
}
b , err := json . Marshal ( info )
if err != nil {
return fmt . Errorf ( "failed to marshall request body: %w" , err )
}
2020-12-03 15:52:05 +01:00
req , err := http . NewRequest ( http . MethodPost , c . baseInstanceInfoURL + "/command" , bytes . NewReader ( b ) )
2020-05-04 11:40:46 +02:00
if err != nil {
2020-12-03 15:52:05 +01:00
return fmt . Errorf ( "failed to create instance info request: %w" , err )
2020-05-04 11:40:46 +02:00
}
2020-12-03 15:52:05 +01:00
req . Header . Set ( tokenHeader , c . token )
2020-05-04 11:40:46 +02:00
2020-12-03 15:52:05 +01:00
return c . sendDataRetryable ( ctx , req )
}
2020-05-04 11:40:46 +02:00
2020-12-03 15:52:05 +01:00
func ( c * client ) sendDataRetryable ( ctx context . Context , req * http . Request ) error {
exponentialBackOff := backoff . NewExponentialBackOff ( )
exponentialBackOff . MaxElapsedTime = maxElapsedTime
2021-02-01 14:42:04 +01:00
exponentialBackOff . InitialInterval = initialInterval
exponentialBackOff . Multiplier = multiplier
2020-05-04 11:40:46 +02:00
2020-12-03 15:52:05 +01:00
req . Header . Set ( "Content-Type" , "application/json" )
req . Header . Set ( tokenHashHeader , c . tokenHash )
2020-05-04 11:40:46 +02:00
2020-12-03 15:52:05 +01:00
return backoff . RetryNotify (
func ( ) error {
resp , err := c . httpClient . Do ( req )
if err != nil {
return fmt . Errorf ( "failed to call Pilot: %w" , err )
}
defer func ( ) { _ = resp . Body . Close ( ) } ( )
2020-05-04 11:40:46 +02:00
2021-03-04 20:08:03 +01:00
body , err := io . ReadAll ( resp . Body )
2020-12-03 15:52:05 +01:00
if err != nil {
return fmt . Errorf ( "failed to read response body: %w" , err )
}
if resp . StatusCode != http . StatusOK {
return fmt . Errorf ( "wrong status code while sending configuration: %d: %s" , resp . StatusCode , body )
}
return nil
} ,
backoff . WithContext ( exponentialBackOff , ctx ) ,
func ( err error , duration time . Duration ) {
log . WithoutContext ( ) . Errorf ( "retry in %s due to: %v " , duration , err )
} )
2020-05-04 11:40:46 +02:00
}