2022-02-09 17:32:12 +03:00
package metrics
import (
"context"
"errors"
"time"
kitlog "github.com/go-kit/kit/log"
"github.com/go-kit/kit/metrics/influx"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
influxdb2api "github.com/influxdata/influxdb-client-go/v2/api"
"github.com/influxdata/influxdb-client-go/v2/api/write"
influxdb2log "github.com/influxdata/influxdb-client-go/v2/log"
influxdb "github.com/influxdata/influxdb1-client/v2"
"github.com/traefik/traefik/v2/pkg/log"
"github.com/traefik/traefik/v2/pkg/safe"
"github.com/traefik/traefik/v2/pkg/types"
)
var (
influxDB2Ticker * time . Ticker
influxDB2Store * influx . Influx
influxDB2Client influxdb2 . Client
)
// RegisterInfluxDB2 creates metrics exporter for InfluxDB2.
func RegisterInfluxDB2 ( ctx context . Context , config * types . InfluxDB2 ) Registry {
if influxDB2Client == nil {
var err error
if influxDB2Client , err = newInfluxDB2Client ( config ) ; err != nil {
log . FromContext ( ctx ) . Error ( err )
return nil
}
}
if influxDB2Store == nil {
influxDB2Store = influx . New (
config . AdditionalLabels ,
influxdb . BatchPointsConfig { } ,
kitlog . LoggerFunc ( func ( kv ... interface { } ) error {
2022-07-19 18:38:09 +02:00
log . FromContext ( ctx ) . Error ( kv ... )
2022-02-09 17:32:12 +03:00
return nil
} ) ,
)
influxDB2Ticker = time . NewTicker ( time . Duration ( config . PushInterval ) )
safe . Go ( func ( ) {
wc := influxDB2Client . WriteAPIBlocking ( config . Org , config . Bucket )
influxDB2Store . WriteLoop ( ctx , influxDB2Ticker . C , influxDB2Writer { wc : wc } )
} )
}
registry := & standardRegistry {
configReloadsCounter : influxDB2Store . NewCounter ( influxDBConfigReloadsName ) ,
configReloadsFailureCounter : influxDB2Store . NewCounter ( influxDBConfigReloadsFailureName ) ,
lastConfigReloadSuccessGauge : influxDB2Store . NewGauge ( influxDBLastConfigReloadSuccessName ) ,
lastConfigReloadFailureGauge : influxDB2Store . NewGauge ( influxDBLastConfigReloadFailureName ) ,
tlsCertsNotAfterTimestampGauge : influxDB2Store . NewGauge ( influxDBTLSCertsNotAfterTimestampName ) ,
}
if config . AddEntryPointsLabels {
registry . epEnabled = config . AddEntryPointsLabels
registry . entryPointReqsCounter = influxDB2Store . NewCounter ( influxDBEntryPointReqsName )
registry . entryPointReqsTLSCounter = influxDB2Store . NewCounter ( influxDBEntryPointReqsTLSName )
registry . entryPointReqDurationHistogram , _ = NewHistogramWithScale ( influxDB2Store . NewHistogram ( influxDBEntryPointReqDurationName ) , time . Second )
registry . entryPointOpenConnsGauge = influxDB2Store . NewGauge ( influxDBEntryPointOpenConnsName )
2022-09-12 17:10:09 +02:00
registry . entryPointReqsBytesCounter = influxDB2Store . NewCounter ( influxDBEntryPointReqsBytesName )
registry . entryPointRespsBytesCounter = influxDB2Store . NewCounter ( influxDBEntryPointRespsBytesName )
2022-02-09 17:32:12 +03:00
}
if config . AddRoutersLabels {
registry . routerEnabled = config . AddRoutersLabels
registry . routerReqsCounter = influxDB2Store . NewCounter ( influxDBRouterReqsName )
registry . routerReqsTLSCounter = influxDB2Store . NewCounter ( influxDBRouterReqsTLSName )
registry . routerReqDurationHistogram , _ = NewHistogramWithScale ( influxDB2Store . NewHistogram ( influxDBRouterReqsDurationName ) , time . Second )
registry . routerOpenConnsGauge = influxDB2Store . NewGauge ( influxDBORouterOpenConnsName )
2022-09-12 17:10:09 +02:00
registry . routerReqsBytesCounter = influxDB2Store . NewCounter ( influxDBRouterReqsBytesName )
registry . routerRespsBytesCounter = influxDB2Store . NewCounter ( influxDBRouterRespsBytesName )
2022-02-09 17:32:12 +03:00
}
if config . AddServicesLabels {
registry . svcEnabled = config . AddServicesLabels
registry . serviceReqsCounter = influxDB2Store . NewCounter ( influxDBServiceReqsName )
registry . serviceReqsTLSCounter = influxDB2Store . NewCounter ( influxDBServiceReqsTLSName )
registry . serviceReqDurationHistogram , _ = NewHistogramWithScale ( influxDB2Store . NewHistogram ( influxDBServiceReqsDurationName ) , time . Second )
registry . serviceRetriesCounter = influxDB2Store . NewCounter ( influxDBServiceRetriesTotalName )
registry . serviceOpenConnsGauge = influxDB2Store . NewGauge ( influxDBServiceOpenConnsName )
registry . serviceServerUpGauge = influxDB2Store . NewGauge ( influxDBServiceServerUpName )
2022-09-12 17:10:09 +02:00
registry . serviceReqsBytesCounter = influxDB2Store . NewCounter ( influxDBServiceReqsBytesName )
registry . serviceRespsBytesCounter = influxDB2Store . NewCounter ( influxDBServiceRespsBytesName )
2022-02-09 17:32:12 +03:00
}
return registry
}
// StopInfluxDB2 stops and resets InfluxDB2 client, ticker and store.
func StopInfluxDB2 ( ) {
if influxDB2Client != nil {
influxDB2Client . Close ( )
}
influxDB2Client = nil
if influxDB2Ticker != nil {
influxDB2Ticker . Stop ( )
}
influxDB2Ticker = nil
influxDB2Store = nil
}
// newInfluxDB2Client creates an influxdb2.Client.
func newInfluxDB2Client ( config * types . InfluxDB2 ) ( influxdb2 . Client , error ) {
if config . Token == "" || config . Org == "" || config . Bucket == "" {
return nil , errors . New ( "token, org or bucket property is missing" )
}
// Disable InfluxDB2 logs.
// See https://github.com/influxdata/influxdb-client-go/blob/v2.7.0/options.go#L128
influxdb2log . Log = nil
return influxdb2 . NewClient ( config . Address , config . Token ) , nil
}
type influxDB2Writer struct {
wc influxdb2api . WriteAPIBlocking
}
func ( w influxDB2Writer ) Write ( bp influxdb . BatchPoints ) error {
ctx := log . With ( context . Background ( ) , log . Str ( log . MetricsProviderName , "influxdb2" ) )
logger := log . FromContext ( ctx )
wps := make ( [ ] * write . Point , 0 , len ( bp . Points ( ) ) )
for _ , p := range bp . Points ( ) {
fields , err := p . Fields ( )
if err != nil {
logger . Errorf ( "Error while getting %s point fields: %s" , p . Name ( ) , err )
continue
}
wps = append ( wps , influxdb2 . NewPoint (
p . Name ( ) ,
p . Tags ( ) ,
fields ,
p . Time ( ) ,
) )
}
return w . wc . WritePoint ( ctx , wps ... )
}