2017-11-08 19:44:03 +05:30
package metrics
import (
"bytes"
2018-11-14 10:18:03 +01:00
"context"
2018-05-29 16:58:03 -04:00
"fmt"
"net/url"
"regexp"
2017-11-08 19:44:03 +05:30
"time"
kitlog "github.com/go-kit/kit/log"
"github.com/go-kit/kit/metrics/influx"
2019-07-18 21:36:05 +02:00
influxdb "github.com/influxdata/influxdb1-client/v2"
2020-09-16 15:46:04 +02:00
"github.com/traefik/traefik/v2/pkg/log"
"github.com/traefik/traefik/v2/pkg/safe"
"github.com/traefik/traefik/v2/pkg/types"
2017-11-08 19:44:03 +05:30
)
2021-09-17 09:08:07 +02:00
var (
	// influxDBClient is the shared go-kit InfluxDB metrics client.
	// It is created on the first call to RegisterInfluxDB and reused afterwards.
	influxDBClient *influx.Influx
	// influxDBTicker drives the periodic push of the collected metrics.
	// It is created by RegisterInfluxDB and reset to nil by StopInfluxDB.
	influxDBTicker *time.Ticker
)
2017-11-08 19:44:03 +05:30
// Names of the metrics reported to InfluxDB.
const (
	// Configuration reload metrics.
	influxDBConfigReloadsName           = "traefik.config.reload.total"
	influxDBConfigReloadsFailureName    = influxDBConfigReloadsName + ".failure"
	influxDBLastConfigReloadSuccessName = "traefik.config.reload.lastSuccessTimestamp"
	influxDBLastConfigReloadFailureName = "traefik.config.reload.lastFailureTimestamp"

	// TLS certificate metrics.
	influxDBTLSCertsNotAfterTimestampName = "traefik.tls.certs.notAfterTimestamp"

	// Entry point metrics.
	influxDBEntryPointReqsName        = "traefik.entrypoint.requests.total"
	influxDBEntryPointReqsTLSName     = "traefik.entrypoint.requests.tls.total"
	influxDBEntryPointReqDurationName = "traefik.entrypoint.request.duration"
	influxDBEntryPointOpenConnsName   = "traefik.entrypoint.connections.open"
	influxDBEntryPointReqsBytesName   = "traefik.entrypoint.requests.bytes.total"
	influxDBEntryPointRespsBytesName  = "traefik.entrypoint.responses.bytes.total"

	// Router metrics.
	// The "ORouter" spelling of the open connections identifier is kept as is:
	// RegisterInfluxDB refers to it under that name.
	influxDBRouterReqsName         = "traefik.router.requests.total"
	influxDBRouterReqsTLSName      = "traefik.router.requests.tls.total"
	influxDBRouterReqsDurationName = "traefik.router.request.duration"
	influxDBORouterOpenConnsName   = "traefik.router.connections.open"
	influxDBRouterReqsBytesName    = "traefik.router.requests.bytes.total"
	influxDBRouterRespsBytesName   = "traefik.router.responses.bytes.total"

	// Service metrics.
	influxDBServiceReqsName         = "traefik.service.requests.total"
	influxDBServiceReqsTLSName      = "traefik.service.requests.tls.total"
	influxDBServiceReqsDurationName = "traefik.service.request.duration"
	influxDBServiceRetriesTotalName = "traefik.service.retries.total"
	influxDBServiceOpenConnsName    = "traefik.service.connections.open"
	influxDBServiceServerUpName     = "traefik.service.server.up"
	influxDBServiceReqsBytesName    = "traefik.service.requests.bytes.total"
	influxDBServiceRespsBytesName   = "traefik.service.responses.bytes.total"
)
2018-11-14 10:18:03 +01:00
// Protocols understood by the InfluxDB metrics client.
const (
	// protocolHTTP selects the HTTP client; database and retention policy options apply.
	protocolHTTP = "http"
	// protocolUDP selects the UDP client; it is also the fallback for unsupported protocols.
	protocolUDP = "udp"
)
2017-11-08 19:44:03 +05:30
// RegisterInfluxDB registers the metrics pusher if this didn't happen yet and creates a InfluxDB Registry instance.
2018-11-14 10:18:03 +01:00
func RegisterInfluxDB ( ctx context . Context , config * types . InfluxDB ) Registry {
2018-05-29 16:58:03 -04:00
if influxDBClient == nil {
2018-11-14 10:18:03 +01:00
influxDBClient = initInfluxDBClient ( ctx , config )
2018-05-29 16:58:03 -04:00
}
2017-11-08 19:44:03 +05:30
if influxDBTicker == nil {
2019-07-18 21:36:05 +02:00
influxDBTicker = initInfluxDBTicker ( ctx , config )
2017-11-08 19:44:03 +05:30
}
2019-07-18 21:36:05 +02:00
registry := & standardRegistry {
2020-12-18 18:44:03 +01:00
configReloadsCounter : influxDBClient . NewCounter ( influxDBConfigReloadsName ) ,
configReloadsFailureCounter : influxDBClient . NewCounter ( influxDBConfigReloadsFailureName ) ,
lastConfigReloadSuccessGauge : influxDBClient . NewGauge ( influxDBLastConfigReloadSuccessName ) ,
lastConfigReloadFailureGauge : influxDBClient . NewGauge ( influxDBLastConfigReloadFailureName ) ,
tlsCertsNotAfterTimestampGauge : influxDBClient . NewGauge ( influxDBTLSCertsNotAfterTimestampName ) ,
2017-11-08 19:44:03 +05:30
}
2019-07-18 21:36:05 +02:00
if config . AddEntryPointsLabels {
registry . epEnabled = config . AddEntryPointsLabels
2023-03-20 18:06:07 +01:00
registry . entryPointReqsCounter = NewCounterWithNoopHeaders ( influxDBClient . NewCounter ( influxDBEntryPointReqsName ) )
2021-04-30 10:22:04 +02:00
registry . entryPointReqsTLSCounter = influxDBClient . NewCounter ( influxDBEntryPointReqsTLSName )
2020-03-05 15:10:07 +01:00
registry . entryPointReqDurationHistogram , _ = NewHistogramWithScale ( influxDBClient . NewHistogram ( influxDBEntryPointReqDurationName ) , time . Second )
2019-07-18 21:36:05 +02:00
registry . entryPointOpenConnsGauge = influxDBClient . NewGauge ( influxDBEntryPointOpenConnsName )
2022-09-12 17:10:09 +02:00
registry . entryPointReqsBytesCounter = influxDBClient . NewCounter ( influxDBEntryPointReqsBytesName )
registry . entryPointRespsBytesCounter = influxDBClient . NewCounter ( influxDBEntryPointRespsBytesName )
2019-07-18 21:36:05 +02:00
}
2021-04-30 10:22:04 +02:00
if config . AddRoutersLabels {
registry . routerEnabled = config . AddRoutersLabels
2023-03-20 18:06:07 +01:00
registry . routerReqsCounter = NewCounterWithNoopHeaders ( influxDBClient . NewCounter ( influxDBRouterReqsName ) )
2021-04-30 10:22:04 +02:00
registry . routerReqsTLSCounter = influxDBClient . NewCounter ( influxDBRouterReqsTLSName )
registry . routerReqDurationHistogram , _ = NewHistogramWithScale ( influxDBClient . NewHistogram ( influxDBRouterReqsDurationName ) , time . Second )
registry . routerOpenConnsGauge = influxDBClient . NewGauge ( influxDBORouterOpenConnsName )
2022-09-12 17:10:09 +02:00
registry . routerReqsBytesCounter = influxDBClient . NewCounter ( influxDBRouterReqsBytesName )
registry . routerRespsBytesCounter = influxDBClient . NewCounter ( influxDBRouterRespsBytesName )
2021-04-30 10:22:04 +02:00
}
2019-07-18 21:36:05 +02:00
if config . AddServicesLabels {
registry . svcEnabled = config . AddServicesLabels
2023-03-20 18:06:07 +01:00
registry . serviceReqsCounter = NewCounterWithNoopHeaders ( influxDBClient . NewCounter ( influxDBServiceReqsName ) )
2021-04-30 10:22:04 +02:00
registry . serviceReqsTLSCounter = influxDBClient . NewCounter ( influxDBServiceReqsTLSName )
registry . serviceReqDurationHistogram , _ = NewHistogramWithScale ( influxDBClient . NewHistogram ( influxDBServiceReqsDurationName ) , time . Second )
registry . serviceRetriesCounter = influxDBClient . NewCounter ( influxDBServiceRetriesTotalName )
registry . serviceOpenConnsGauge = influxDBClient . NewGauge ( influxDBServiceOpenConnsName )
registry . serviceServerUpGauge = influxDBClient . NewGauge ( influxDBServiceServerUpName )
2022-09-12 17:10:09 +02:00
registry . serviceReqsBytesCounter = influxDBClient . NewCounter ( influxDBServiceReqsBytesName )
registry . serviceRespsBytesCounter = influxDBClient . NewCounter ( influxDBServiceRespsBytesName )
2019-07-18 21:36:05 +02:00
}
return registry
2017-11-08 19:44:03 +05:30
}
2022-02-09 17:32:12 +03:00
// initInfluxDBClient creates a influxDBClient.
2018-11-14 10:18:03 +01:00
func initInfluxDBClient ( ctx context . Context , config * types . InfluxDB ) * influx . Influx {
logger := log . FromContext ( ctx )
2018-05-29 16:58:03 -04:00
// TODO deprecated: move this switch into configuration.SetEffectiveConfiguration when web provider will be removed.
switch config . Protocol {
2018-11-14 10:18:03 +01:00
case protocolUDP :
2018-05-29 16:58:03 -04:00
if len ( config . Database ) > 0 || len ( config . RetentionPolicy ) > 0 {
2018-11-14 10:18:03 +01:00
logger . Warn ( "Database and RetentionPolicy options have no effect with UDP." )
2018-05-29 16:58:03 -04:00
config . Database = ""
config . RetentionPolicy = ""
}
2018-11-14 10:18:03 +01:00
case protocolHTTP :
2018-05-29 16:58:03 -04:00
if u , err := url . Parse ( config . Address ) ; err == nil {
if u . Scheme != "http" && u . Scheme != "https" {
2018-11-14 10:18:03 +01:00
logger . Warnf ( "InfluxDB address %s should specify a scheme (http or https): falling back on HTTP." , config . Address )
2018-05-29 16:58:03 -04:00
config . Address = "http://" + config . Address
}
} else {
2018-11-14 10:18:03 +01:00
logger . Errorf ( "Unable to parse the InfluxDB address %v: falling back on UDP." , err )
config . Protocol = protocolUDP
2018-05-29 16:58:03 -04:00
config . Database = ""
config . RetentionPolicy = ""
}
default :
2018-11-14 10:18:03 +01:00
logger . Warnf ( "Unsupported protocol %s: falling back on UDP." , config . Protocol )
config . Protocol = protocolUDP
2018-05-29 16:58:03 -04:00
config . Database = ""
config . RetentionPolicy = ""
}
return influx . New (
2021-09-17 09:08:07 +02:00
config . AdditionalLabels ,
2018-05-29 16:58:03 -04:00
influxdb . BatchPointsConfig {
Database : config . Database ,
RetentionPolicy : config . RetentionPolicy ,
} ,
kitlog . LoggerFunc ( func ( keyvals ... interface { } ) error {
2022-07-19 18:38:09 +02:00
log . WithoutContext ( ) . WithField ( log . MetricsProviderName , "influxdb" ) . Info ( keyvals ... )
2018-05-29 16:58:03 -04:00
return nil
} ) )
}
2020-05-11 12:06:07 +02:00
// initInfluxDBTicker initializes metrics pusher.
2019-07-18 21:36:05 +02:00
func initInfluxDBTicker ( ctx context . Context , config * types . InfluxDB ) * time . Ticker {
2019-06-17 11:48:05 +02:00
report := time . NewTicker ( time . Duration ( config . PushInterval ) )
2017-11-08 19:44:03 +05:30
safe . Go ( func ( ) {
var buf bytes . Buffer
2019-07-18 21:36:05 +02:00
influxDBClient . WriteLoop ( ctx , report . C , & influxDBWriter { buf : buf , config : config } )
2017-11-08 19:44:03 +05:30
} )
return report
}
2020-05-11 12:06:07 +02:00
// StopInfluxDB stops internal influxDBTicker which controls the pushing of metrics to InfluxDB Agent and resets it to `nil`.
2017-11-08 19:44:03 +05:30
func StopInfluxDB ( ) {
if influxDBTicker != nil {
influxDBTicker . Stop ( )
}
influxDBTicker = nil
}
2021-09-17 09:08:07 +02:00
// influxDBWriter writes batches of points to InfluxDB.
// A new client is created from config for every write.
type influxDBWriter struct {
	// buf is not read by any of the writer's methods in this file.
	buf bytes.Buffer
	// config holds the connection settings (protocol, address, credentials,
	// database and retention policy) used to reach InfluxDB.
	config *types.InfluxDB
}
2018-05-29 16:58:03 -04:00
// Write creates a http or udp client and attempts to write BatchPoints.
// If a "database not found" error is encountered, a CREATE DATABASE
// query is attempted when using protocol http.
2017-11-08 19:44:03 +05:30
func ( w * influxDBWriter ) Write ( bp influxdb . BatchPoints ) error {
2018-05-29 16:58:03 -04:00
c , err := w . initWriteClient ( )
2017-11-08 19:44:03 +05:30
if err != nil {
return err
}
defer c . Close ( )
2018-05-29 16:58:03 -04:00
if writeErr := c . Write ( bp ) ; writeErr != nil {
2018-11-14 10:18:03 +01:00
ctx := log . With ( context . Background ( ) , log . Str ( log . MetricsProviderName , "influxdb" ) )
log . FromContext ( ctx ) . Errorf ( "Error while writing to InfluxDB: %s" , writeErr . Error ( ) )
if handleErr := w . handleWriteError ( ctx , c , writeErr ) ; handleErr != nil {
2018-05-29 16:58:03 -04:00
return handleErr
}
// Retry write after successful handling of writeErr
return c . Write ( bp )
}
return nil
}
2018-07-03 10:02:03 +02:00
func ( w * influxDBWriter ) initWriteClient ( ) ( influxdb . Client , error ) {
2018-05-29 16:58:03 -04:00
if w . config . Protocol == "http" {
2018-07-03 10:02:03 +02:00
return influxdb . NewHTTPClient ( influxdb . HTTPConfig {
2018-07-11 17:50:03 +02:00
Addr : w . config . Address ,
Username : w . config . Username ,
Password : w . config . Password ,
2018-05-29 16:58:03 -04:00
} )
}
2018-07-03 10:02:03 +02:00
return influxdb . NewUDPClient ( influxdb . UDPConfig {
Addr : w . config . Address ,
} )
2018-05-29 16:58:03 -04:00
}
2018-11-14 10:18:03 +01:00
func ( w * influxDBWriter ) handleWriteError ( ctx context . Context , c influxdb . Client , writeErr error ) error {
if w . config . Protocol != protocolHTTP {
2018-05-29 16:58:03 -04:00
return writeErr
}
match , matchErr := regexp . MatchString ( "database not found" , writeErr . Error ( ) )
if matchErr != nil || ! match {
return writeErr
}
qStr := fmt . Sprintf ( "CREATE DATABASE \"%s\"" , w . config . Database )
if w . config . RetentionPolicy != "" {
qStr = fmt . Sprintf ( "%s WITH NAME \"%s\"" , qStr , w . config . RetentionPolicy )
}
2018-11-14 10:18:03 +01:00
logger := log . FromContext ( ctx )
logger . Debugf ( "InfluxDB database not found: attempting to create one with %s" , qStr )
2018-05-29 16:58:03 -04:00
q := influxdb . NewQuery ( qStr , "" , "" )
response , queryErr := c . Query ( q )
if queryErr == nil && response . Error ( ) != nil {
queryErr = response . Error ( )
}
if queryErr != nil {
2018-11-14 10:18:03 +01:00
logger . Errorf ( "Error while creating the InfluxDB database %s" , queryErr )
2018-05-29 16:58:03 -04:00
return queryErr
}
2018-11-14 10:18:03 +01:00
logger . Debugf ( "Successfully created the InfluxDB database %s" , w . config . Database )
2018-05-29 16:58:03 -04:00
return nil
2017-11-08 19:44:03 +05:30
}