2017-11-08 19:44:03 +05:30
package metrics
import (
"bytes"
2018-11-14 10:18:03 +01:00
"context"
2018-05-29 16:58:03 -04:00
"fmt"
"net/url"
"regexp"
2017-11-08 19:44:03 +05:30
"time"
2019-08-03 03:58:23 +02:00
"github.com/containous/traefik/v2/pkg/log"
"github.com/containous/traefik/v2/pkg/safe"
"github.com/containous/traefik/v2/pkg/types"
2017-11-08 19:44:03 +05:30
kitlog "github.com/go-kit/kit/log"
"github.com/go-kit/kit/metrics/influx"
2019-07-18 21:36:05 +02:00
influxdb "github.com/influxdata/influxdb1-client/v2"
2017-11-08 19:44:03 +05:30
)
2018-05-29 16:58:03 -04:00
// influxDBClient is the package-level go-kit InfluxDB metrics client.
// RegisterInfluxDB creates it on first use (when it is nil) and derives every
// counter, gauge and histogram it registers from it; the push goroutine
// started by initInfluxDBTicker also reads it to run the write loop.
var influxDBClient * influx . Influx
2017-11-08 19:44:03 +05:30
// influxDBWriter is the batch-point writer handed to the InfluxDB client's
// WriteLoop. Its Write method opens a client from config on each call and
// sends the batch through it.
type influxDBWriter struct {
// buf is populated with a zero-value buffer when the writer is built.
// NOTE(review): no method visible in this file reads or writes it — confirm
// whether it is still needed.
buf bytes . Buffer
// config supplies the protocol, address, credentials, database and retention
// policy used when connecting and when creating a missing database.
config * types . InfluxDB
}
// influxDBTicker is the package-level ticker whose channel paces the metrics
// push loop. RegisterInfluxDB creates it when it is nil, and StopInfluxDB
// stops it and resets it to nil.
var influxDBTicker * time . Ticker
// Measurement names used when registering metrics on the InfluxDB client.
const (
	// Configuration reload metrics.
	influxDBConfigReloadsName           = "traefik.config.reload.total"
	influxDBConfigReloadsFailureName    = influxDBConfigReloadsName + ".failure"
	influxDBLastConfigReloadSuccessName = "traefik.config.reload.lastSuccessTimestamp"
	influxDBLastConfigReloadFailureName = "traefik.config.reload.lastFailureTimestamp"

	// Entry point metrics.
	influxDBEntryPointReqsName        = "traefik.entrypoint.requests.total"
	influxDBEntryPointReqDurationName = "traefik.entrypoint.request.duration"
	influxDBEntryPointOpenConnsName   = "traefik.entrypoint.connections.open"

	// Service metrics.
	influxDBMetricsServiceReqsName    = "traefik.service.requests.total"
	influxDBMetricsServiceLatencyName = "traefik.service.request.duration"
	influxDBRetriesTotalName          = "traefik.service.retries.total"
	influxDBOpenConnsName             = "traefik.service.connections.open"
	influxDBServerUpName              = "traefik.service.server.up"
)
2018-11-14 10:18:03 +01:00
// Values of the InfluxDB configuration's Protocol field that this package
// recognizes; any other value is replaced by protocolUDP when the client is
// initialized.
const (
	// protocolUDP selects the UDP client.
	protocolUDP = "udp"
	// protocolHTTP selects the HTTP client.
	protocolHTTP = "http"
)
2017-11-08 19:44:03 +05:30
// RegisterInfluxDB registers the metrics pusher if this didn't happen yet and creates a InfluxDB Registry instance.
2018-11-14 10:18:03 +01:00
func RegisterInfluxDB ( ctx context . Context , config * types . InfluxDB ) Registry {
2018-05-29 16:58:03 -04:00
if influxDBClient == nil {
2018-11-14 10:18:03 +01:00
influxDBClient = initInfluxDBClient ( ctx , config )
2018-05-29 16:58:03 -04:00
}
2017-11-08 19:44:03 +05:30
if influxDBTicker == nil {
2019-07-18 21:36:05 +02:00
influxDBTicker = initInfluxDBTicker ( ctx , config )
2017-11-08 19:44:03 +05:30
}
2019-07-18 21:36:05 +02:00
registry := & standardRegistry {
configReloadsCounter : influxDBClient . NewCounter ( influxDBConfigReloadsName ) ,
configReloadsFailureCounter : influxDBClient . NewCounter ( influxDBConfigReloadsFailureName ) ,
lastConfigReloadSuccessGauge : influxDBClient . NewGauge ( influxDBLastConfigReloadSuccessName ) ,
lastConfigReloadFailureGauge : influxDBClient . NewGauge ( influxDBLastConfigReloadFailureName ) ,
2017-11-08 19:44:03 +05:30
}
2019-07-18 21:36:05 +02:00
if config . AddEntryPointsLabels {
registry . epEnabled = config . AddEntryPointsLabels
registry . entryPointReqsCounter = influxDBClient . NewCounter ( influxDBEntryPointReqsName )
registry . entryPointReqDurationHistogram = influxDBClient . NewHistogram ( influxDBEntryPointReqDurationName )
registry . entryPointOpenConnsGauge = influxDBClient . NewGauge ( influxDBEntryPointOpenConnsName )
}
if config . AddServicesLabels {
registry . svcEnabled = config . AddServicesLabels
registry . serviceReqsCounter = influxDBClient . NewCounter ( influxDBMetricsServiceReqsName )
registry . serviceReqDurationHistogram = influxDBClient . NewHistogram ( influxDBMetricsServiceLatencyName )
registry . serviceRetriesCounter = influxDBClient . NewCounter ( influxDBRetriesTotalName )
registry . serviceOpenConnsGauge = influxDBClient . NewGauge ( influxDBOpenConnsName )
registry . serviceServerUpGauge = influxDBClient . NewGauge ( influxDBServerUpName )
}
return registry
2017-11-08 19:44:03 +05:30
}
2018-05-29 16:58:03 -04:00
// initInfluxDBTicker creates a influxDBClient
2018-11-14 10:18:03 +01:00
func initInfluxDBClient ( ctx context . Context , config * types . InfluxDB ) * influx . Influx {
logger := log . FromContext ( ctx )
2018-05-29 16:58:03 -04:00
// TODO deprecated: move this switch into configuration.SetEffectiveConfiguration when web provider will be removed.
switch config . Protocol {
2018-11-14 10:18:03 +01:00
case protocolUDP :
2018-05-29 16:58:03 -04:00
if len ( config . Database ) > 0 || len ( config . RetentionPolicy ) > 0 {
2018-11-14 10:18:03 +01:00
logger . Warn ( "Database and RetentionPolicy options have no effect with UDP." )
2018-05-29 16:58:03 -04:00
config . Database = ""
config . RetentionPolicy = ""
}
2018-11-14 10:18:03 +01:00
case protocolHTTP :
2018-05-29 16:58:03 -04:00
if u , err := url . Parse ( config . Address ) ; err == nil {
if u . Scheme != "http" && u . Scheme != "https" {
2018-11-14 10:18:03 +01:00
logger . Warnf ( "InfluxDB address %s should specify a scheme (http or https): falling back on HTTP." , config . Address )
2018-05-29 16:58:03 -04:00
config . Address = "http://" + config . Address
}
} else {
2018-11-14 10:18:03 +01:00
logger . Errorf ( "Unable to parse the InfluxDB address %v: falling back on UDP." , err )
config . Protocol = protocolUDP
2018-05-29 16:58:03 -04:00
config . Database = ""
config . RetentionPolicy = ""
}
default :
2018-11-14 10:18:03 +01:00
logger . Warnf ( "Unsupported protocol %s: falling back on UDP." , config . Protocol )
config . Protocol = protocolUDP
2018-05-29 16:58:03 -04:00
config . Database = ""
config . RetentionPolicy = ""
}
return influx . New (
map [ string ] string { } ,
influxdb . BatchPointsConfig {
Database : config . Database ,
RetentionPolicy : config . RetentionPolicy ,
} ,
kitlog . LoggerFunc ( func ( keyvals ... interface { } ) error {
2018-11-14 10:18:03 +01:00
log . WithoutContext ( ) . WithField ( log . MetricsProviderName , "influxdb" ) . Info ( keyvals )
2018-05-29 16:58:03 -04:00
return nil
} ) )
}
// initInfluxDBTicker initializes metrics pusher
2019-07-18 21:36:05 +02:00
func initInfluxDBTicker ( ctx context . Context , config * types . InfluxDB ) * time . Ticker {
2019-06-17 11:48:05 +02:00
report := time . NewTicker ( time . Duration ( config . PushInterval ) )
2017-11-08 19:44:03 +05:30
safe . Go ( func ( ) {
var buf bytes . Buffer
2019-07-18 21:36:05 +02:00
influxDBClient . WriteLoop ( ctx , report . C , & influxDBWriter { buf : buf , config : config } )
2017-11-08 19:44:03 +05:30
} )
return report
}
// StopInfluxDB stops the package-level ticker that paces the pushing of
// metrics to InfluxDB and resets it to nil, so that a later RegisterInfluxDB
// call creates a new one. It is a no-op when no ticker exists. The
// package-level client is left untouched.
func StopInfluxDB() {
	if influxDBTicker == nil {
		return
	}

	influxDBTicker.Stop()
	influxDBTicker = nil
}
2018-05-29 16:58:03 -04:00
// Write creates a http or udp client and attempts to write BatchPoints.
// If a "database not found" error is encountered, a CREATE DATABASE
// query is attempted when using protocol http.
2017-11-08 19:44:03 +05:30
func ( w * influxDBWriter ) Write ( bp influxdb . BatchPoints ) error {
2018-05-29 16:58:03 -04:00
c , err := w . initWriteClient ( )
2017-11-08 19:44:03 +05:30
if err != nil {
return err
}
defer c . Close ( )
2018-05-29 16:58:03 -04:00
if writeErr := c . Write ( bp ) ; writeErr != nil {
2018-11-14 10:18:03 +01:00
ctx := log . With ( context . Background ( ) , log . Str ( log . MetricsProviderName , "influxdb" ) )
log . FromContext ( ctx ) . Errorf ( "Error while writing to InfluxDB: %s" , writeErr . Error ( ) )
if handleErr := w . handleWriteError ( ctx , c , writeErr ) ; handleErr != nil {
2018-05-29 16:58:03 -04:00
return handleErr
}
// Retry write after successful handling of writeErr
return c . Write ( bp )
}
return nil
}
2018-07-03 10:02:03 +02:00
func ( w * influxDBWriter ) initWriteClient ( ) ( influxdb . Client , error ) {
2018-05-29 16:58:03 -04:00
if w . config . Protocol == "http" {
2018-07-03 10:02:03 +02:00
return influxdb . NewHTTPClient ( influxdb . HTTPConfig {
2018-07-11 17:50:03 +02:00
Addr : w . config . Address ,
Username : w . config . Username ,
Password : w . config . Password ,
2018-05-29 16:58:03 -04:00
} )
}
2018-07-03 10:02:03 +02:00
return influxdb . NewUDPClient ( influxdb . UDPConfig {
Addr : w . config . Address ,
} )
2018-05-29 16:58:03 -04:00
}
2018-11-14 10:18:03 +01:00
func ( w * influxDBWriter ) handleWriteError ( ctx context . Context , c influxdb . Client , writeErr error ) error {
if w . config . Protocol != protocolHTTP {
2018-05-29 16:58:03 -04:00
return writeErr
}
match , matchErr := regexp . MatchString ( "database not found" , writeErr . Error ( ) )
if matchErr != nil || ! match {
return writeErr
}
qStr := fmt . Sprintf ( "CREATE DATABASE \"%s\"" , w . config . Database )
if w . config . RetentionPolicy != "" {
qStr = fmt . Sprintf ( "%s WITH NAME \"%s\"" , qStr , w . config . RetentionPolicy )
}
2018-11-14 10:18:03 +01:00
logger := log . FromContext ( ctx )
logger . Debugf ( "InfluxDB database not found: attempting to create one with %s" , qStr )
2018-05-29 16:58:03 -04:00
q := influxdb . NewQuery ( qStr , "" , "" )
response , queryErr := c . Query ( q )
if queryErr == nil && response . Error ( ) != nil {
queryErr = response . Error ( )
}
if queryErr != nil {
2018-11-14 10:18:03 +01:00
logger . Errorf ( "Error while creating the InfluxDB database %s" , queryErr )
2018-05-29 16:58:03 -04:00
return queryErr
}
2018-11-14 10:18:03 +01:00
logger . Debugf ( "Successfully created the InfluxDB database %s" , w . config . Database )
2018-05-29 16:58:03 -04:00
return nil
2017-11-08 19:44:03 +05:30
}