1
0
mirror of https://github.com/containous/traefik.git synced 2025-01-10 01:17:55 +03:00

Add InfluxDB support for traefik metrics

This commit is contained in:
Aditya C S 2017-11-08 19:44:03 +05:30 committed by Traefiker
parent e3131481e9
commit 00d7c5972f
35 changed files with 4693 additions and 28 deletions

View File

@ -66,7 +66,7 @@ Run it and forget it!
- Hot-reloading of configuration. No need to restart the process
- Circuit breakers, retry
- Round Robin, rebalancer load-balancers
- Metrics (Rest, Prometheus, Datadog, Statd)
- Metrics (Rest, Prometheus, Datadog, Statsd, InfluxDB)
- Clean AngularJS Web UI
- Websocket, HTTP/2, GRPC ready
- Access Logs (JSON, CLF)

View File

@ -63,6 +63,10 @@ func NewTraefikDefaultPointersConfiguration() *TraefikConfiguration {
Address: "localhost:8125",
PushInterval: "10s",
},
InfluxDB: &types.InfluxDB{
Address: "localhost:8089",
PushInterval: "10s",
},
}
// default Marathon

View File

@ -158,6 +158,31 @@ pushinterval = "10s"
# ...
```
### InfluxDB
```toml
[web]
# ...
# InfluxDB metrics exporter type
[web.metrics.influxdb]
# InfluxDB's address.
#
# Required
# Default: "localhost:8089"
#
address = "localhost:8089"
# InfluxDB push interval
#
# Optional
# Default: "10s"
#
pushinterval = "10s"
# ...
```
## Statistics

View File

@ -44,7 +44,7 @@ Run it and forget it!
- Hot-reloading of configuration. No need to restart the process
- Circuit breakers, retry
- Round Robin, rebalancer load-balancers
- Metrics (Rest, Prometheus, Datadog, Statd)
- Metrics (Rest, Prometheus, Datadog, Statsd, InfluxDB)
- Clean AngularJS Web UI
- Websocket, HTTP/2, GRPC ready
- Access Logs (JSON, CLF)

14
glide.lock generated
View File

@ -1,4 +1,4 @@
hash: 1d18b9c76989feed304a8781b18ea24b43e858091b775316af97bfc210a06ea0
hash: 7fd36649e80749e16bbfa69777e0f90a017fbc2f67d7efd46373716a16b1a60a
updated: 2017-11-02T11:39:20.438135-04:00
imports:
- name: cloud.google.com/go
@ -250,6 +250,8 @@ imports:
- log
- metrics
- metrics/dogstatsd
- metrics/generic
- metrics/influx
- metrics/internal/lv
- metrics/internal/ratemap
- metrics/multi
@ -311,6 +313,12 @@ imports:
version: 3959339b333561bf62a38b424fd41517c2c90f40
- name: github.com/imdario/mergo
version: 3e95a51e0639b4cf372f2ccf74c86749d747fbdc
- name: github.com/influxdata/influxdb
version: 2d474a3089bcfce6b472779be9470a1f0ef3d5e4
subpackages:
- client/v2
- models
- pkg/escape
- name: github.com/JamesClonk/vultr
version: 2fd0705ce648e602e6c9c57329a174270a4f6688
subpackages:
@ -483,6 +491,8 @@ imports:
version: 824e85271811af89640ea25620c67f6c2eed987e
- name: github.com/urfave/negroni
version: 490e6a555d47ca891a89a150d0c1ef3922dfffe9
- name: github.com/VividCortex/gohistogram
version: 51564d9861991fb0ad0f531c99ef602d0f9866e6
- name: github.com/vulcand/oxy
version: 7e9763c4dc71b9758379da3581e6495c145caaab
repo: https://github.com/containous/oxy.git
@ -805,7 +815,7 @@ testImports:
- libcontainer/system
- libcontainer/user
- name: github.com/stvp/go-udp-testing
version: 06eb4f886d9f8242b0c176cf0d3ce5ec2cedda05
version: c4434f09ec131ecf30f986d5dcb1636508bfa49a
- name: github.com/vdemeester/shakers
version: 24d7f1d6a71aa5d9cbe7390e4afb66b7eef9e1b3
- name: github.com/xeipuuv/gojsonpointer

View File

@ -85,6 +85,10 @@ import:
version: ^1.1.0
- package: k8s.io/client-go
version: v2.0.0
- package: github.com/influxdata/influxdb
version: v1.3.7
subpackages:
- client/v2
- package: github.com/gambol99/go-marathon
version: dd6cbd4c2d71294a19fb89158f2a00d427f174ab
- package: github.com/ArthurHlt/go-eureka-client
@ -103,10 +107,13 @@ import:
- log
- metrics
- metrics/dogstatsd
- metrics/internal/lv
- metrics/internal/ratemap
- metrics/multi
- metrics/prometheus
- metrics/statsd
- util/conn
- metrics/influx
- package: github.com/prometheus/client_golang
version: 08fd2e12372a66e68e30523c7642e0cbc3e4fbde
subpackages:

90
metrics/influxdb.go Normal file
View File

@ -0,0 +1,90 @@
package metrics
import (
"bytes"
"time"
"github.com/containous/traefik/log"
"github.com/containous/traefik/safe"
"github.com/containous/traefik/types"
kitlog "github.com/go-kit/kit/log"
"github.com/go-kit/kit/metrics/influx"
influxdb "github.com/influxdata/influxdb/client/v2"
)
var influxDBClient = influx.New(map[string]string{}, influxdb.BatchPointsConfig{}, kitlog.LoggerFunc(func(keyvals ...interface{}) error {
log.Info(keyvals)
return nil
}))
type influxDBWriter struct {
buf bytes.Buffer
config *types.InfluxDB
}
var influxDBTicker *time.Ticker
const (
influxDBMetricsReqsName = "traefik.requests.total"
influxDBMetricsLatencyName = "traefik.request.duration"
influxDBRetriesTotalName = "traefik.backend.retries.total"
)
// RegisterInfluxDB registers the metrics pusher if this didn't happen yet and creates a InfluxDB Registry instance.
func RegisterInfluxDB(config *types.InfluxDB) Registry {
if influxDBTicker == nil {
influxDBTicker = initInfluxDBTicker(config)
}
return &standardRegistry{
enabled: true,
reqsCounter: influxDBClient.NewCounter(influxDBMetricsReqsName),
reqDurationHistogram: influxDBClient.NewHistogram(influxDBMetricsLatencyName),
retriesCounter: influxDBClient.NewCounter(influxDBRetriesTotalName),
}
}
// initInfluxDBTicker initializes metrics pusher and creates a influxDBClient if not created already
func initInfluxDBTicker(config *types.InfluxDB) *time.Ticker {
address := config.Address
if len(address) == 0 {
address = "localhost:8089"
}
pushInterval, err := time.ParseDuration(config.PushInterval)
if err != nil {
log.Warnf("Unable to parse %s into pushInterval, using 10s as default value", config.PushInterval)
pushInterval = 10 * time.Second
}
report := time.NewTicker(pushInterval)
safe.Go(func() {
var buf bytes.Buffer
influxDBClient.WriteLoop(report.C, &influxDBWriter{buf: buf, config: config})
})
return report
}
// StopInfluxDB stops internal influxDBTicker which controls the pushing of metrics to InfluxDB Agent and resets it to `nil`
func StopInfluxDB() {
if influxDBTicker != nil {
influxDBTicker.Stop()
}
influxDBTicker = nil
}
func (w *influxDBWriter) Write(bp influxdb.BatchPoints) error {
c, err := influxdb.NewUDPClient(influxdb.UDPConfig{
Addr: w.config.Address,
})
if err != nil {
return err
}
defer c.Close()
return c.Write(bp)
}

53
metrics/influxdb_test.go Normal file
View File

@ -0,0 +1,53 @@
package metrics
import (
"net/http"
"regexp"
"strconv"
"testing"
"time"
"github.com/containous/traefik/types"
"github.com/stvp/go-udp-testing"
)
func TestInfluxDB(t *testing.T) {
udp.SetAddr(":8089")
// This is needed to make sure that UDP Listener listens for data a bit longer, otherwise it will quit after a millisecond
udp.Timeout = 5 * time.Second
influxDBRegistry := RegisterInfluxDB(&types.InfluxDB{Address: ":8089", PushInterval: "1s"})
defer StopInfluxDB()
if !influxDBRegistry.IsEnabled() {
t.Fatalf("InfluxDB registry must be enabled")
}
expected := []string{
"(traefik.requests.total,code=200,method=GET,service=test count=1) [0-9]{19}",
"(traefik.requests.total,code=404,method=GET,service=test count=1) [0-9]{19}",
"(traefik.request.duration,code=200,method=GET,service=test p50=10000,p90=10000,p95=10000,p99=10000) [0-9]{19}",
"(traefik.backend.retries.total,code=404,method=GET,service=test count=2) [0-9]{19}",
}
msg := udp.ReceiveString(t, func() {
influxDBRegistry.ReqsCounter().With("service", "test", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet).Add(1)
influxDBRegistry.ReqsCounter().With("service", "test", "code", strconv.Itoa(http.StatusNotFound), "method", http.MethodGet).Add(1)
influxDBRegistry.RetriesCounter().With("service", "test").Add(1)
influxDBRegistry.RetriesCounter().With("service", "test").Add(1)
influxDBRegistry.ReqDurationHistogram().With("service", "test", "code", strconv.Itoa(http.StatusOK)).Observe(10000)
})
extractAndMatchMessage(t, expected, msg)
}
func extractAndMatchMessage(t *testing.T, patterns []string, msg string) {
t.Helper()
for _, pattern := range patterns {
re := regexp.MustCompile(pattern)
match := re.FindStringSubmatch(msg)
if len(match) != 2 {
t.Errorf("Got %q %v, want %q", msg, match, pattern)
}
}
}

View File

@ -17,6 +17,12 @@ var statsdClient = statsd.New("traefik.", kitlog.LoggerFunc(func(keyvals ...inte
var statsdTicker *time.Ticker
const (
statsdMetricsReqsName = "requests.total"
statsdMetricsLatencyName = "request.duration"
statsdRetriesTotalName = "backend.retries.total"
)
// RegisterStatsd registers the metrics pusher if this didn't happen yet and creates a statsd Registry instance.
func RegisterStatsd(config *types.Statsd) Registry {
if statsdTicker == nil {
@ -25,9 +31,9 @@ func RegisterStatsd(config *types.Statsd) Registry {
return &standardRegistry{
enabled: true,
reqsCounter: statsdClient.NewCounter(ddMetricsReqsName, 1.0),
reqDurationHistogram: statsdClient.NewTiming(ddMetricsLatencyName, 1.0),
retriesCounter: statsdClient.NewCounter(ddRetriesTotalName, 1.0),
reqsCounter: statsdClient.NewCounter(statsdMetricsReqsName, 1.0),
reqDurationHistogram: statsdClient.NewTiming(statsdMetricsLatencyName, 1.0),
retriesCounter: statsdClient.NewCounter(statsdRetriesTotalName, 1.0),
}
}

View File

@ -18,7 +18,7 @@ func TestStatsD(t *testing.T) {
defer StopStatsd()
if !statsdRegistry.IsEnabled() {
t.Errorf("PrometheusRegistry should return true for IsEnabled()")
t.Errorf("Statsd registry should return true for IsEnabled()")
}
expected := []string{

View File

@ -1244,6 +1244,10 @@ func (server *Server) registerMetricClients(metricsConfig *types.Metrics) {
registries = append(registries, metrics.RegisterStatsd(metricsConfig.StatsD))
log.Debugf("Configured StatsD metrics pushing to %s once every %s", metricsConfig.StatsD.Address, metricsConfig.StatsD.PushInterval)
}
if metricsConfig.InfluxDB != nil {
registries = append(registries, metrics.RegisterInfluxDB(metricsConfig.InfluxDB))
log.Debugf("Configured InfluxDB metrics pushing to %s once every %s", metricsConfig.InfluxDB.Address, metricsConfig.InfluxDB.PushInterval)
}
if len(registries) > 0 {
server.metricsRegistry = metrics.NewMultiRegistry(registries)
@ -1253,6 +1257,7 @@ func (server *Server) registerMetricClients(metricsConfig *types.Metrics) {
func stopMetricsClients() {
metrics.StopDatadog()
metrics.StopStatsd()
metrics.StopInfluxDB()
}
func (server *Server) buildRateLimiter(handler http.Handler, rlConfig *types.RateLimit) (http.Handler, error) {

View File

@ -368,6 +368,7 @@ type Metrics struct {
Prometheus *Prometheus `description:"Prometheus metrics exporter type" export:"true"`
Datadog *Datadog `description:"DataDog metrics exporter type" export:"true"`
StatsD *Statsd `description:"StatsD metrics exporter type" export:"true"`
InfluxDB *InfluxDB `description:"InfluxDB metrics exporter type"`
}
// Prometheus can contain specific configuration used by the Prometheus Metrics exporter
@ -384,7 +385,13 @@ type Datadog struct {
// Statsd contains address and metrics pushing interval configuration
type Statsd struct {
Address string `description:"StatsD address"`
PushInterval string `description:"DataDog push interval" export:"true"`
PushInterval string `description:"StatsD push interval" export:"true"`
}
// InfluxDB contains address and metrics pushing interval configuration
type InfluxDB struct {
Address string `description:"InfluxDB address"`
PushInterval string `description:"InfluxDB push interval"`
}
// Buckets holds Prometheus Buckets

19
vendor/github.com/VividCortex/gohistogram/LICENSE generated vendored Normal file
View File

@ -0,0 +1,19 @@
Copyright (c) 2013 VividCortex
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.

23
vendor/github.com/VividCortex/gohistogram/histogram.go generated vendored Normal file
View File

@ -0,0 +1,23 @@
package gohistogram
// Copyright (c) 2013 VividCortex, Inc. All rights reserved.
// Please see the LICENSE file for applicable license terms.
// Histogram is the interface that wraps the Add and Quantile methods.
type Histogram interface {
// Add adds a new value, n, to the histogram. Trimming is done
// automatically.
Add(n float64)
// Quantile returns an approximation.
Quantile(n float64) (q float64)
// String returns a string reprentation of the histogram,
// which is useful for printing to a terminal.
String() (str string)
}
type bin struct {
value float64
count float64
}

View File

@ -0,0 +1,160 @@
package gohistogram
// Copyright (c) 2013 VividCortex, Inc. All rights reserved.
// Please see the LICENSE file for applicable license terms.
import (
"fmt"
)
type NumericHistogram struct {
bins []bin
maxbins int
total uint64
}
// NewHistogram returns a new NumericHistogram with a maximum of n bins.
//
// There is no "optimal" bin count, but somewhere between 20 and 80 bins
// should be sufficient.
func NewHistogram(n int) *NumericHistogram {
return &NumericHistogram{
bins: make([]bin, 0),
maxbins: n,
total: 0,
}
}
func (h *NumericHistogram) Add(n float64) {
defer h.trim()
h.total++
for i := range h.bins {
if h.bins[i].value == n {
h.bins[i].count++
return
}
if h.bins[i].value > n {
newbin := bin{value: n, count: 1}
head := append(make([]bin, 0), h.bins[0:i]...)
head = append(head, newbin)
tail := h.bins[i:]
h.bins = append(head, tail...)
return
}
}
h.bins = append(h.bins, bin{count: 1, value: n})
}
func (h *NumericHistogram) Quantile(q float64) float64 {
count := q * float64(h.total)
for i := range h.bins {
count -= float64(h.bins[i].count)
if count <= 0 {
return h.bins[i].value
}
}
return -1
}
// CDF returns the value of the cumulative distribution function
// at x
func (h *NumericHistogram) CDF(x float64) float64 {
count := 0.0
for i := range h.bins {
if h.bins[i].value <= x {
count += float64(h.bins[i].count)
}
}
return count / float64(h.total)
}
// Mean returns the sample mean of the distribution
func (h *NumericHistogram) Mean() float64 {
if h.total == 0 {
return 0
}
sum := 0.0
for i := range h.bins {
sum += h.bins[i].value * h.bins[i].count
}
return sum / float64(h.total)
}
// Variance returns the variance of the distribution
func (h *NumericHistogram) Variance() float64 {
if h.total == 0 {
return 0
}
sum := 0.0
mean := h.Mean()
for i := range h.bins {
sum += (h.bins[i].count * (h.bins[i].value - mean) * (h.bins[i].value - mean))
}
return sum / float64(h.total)
}
func (h *NumericHistogram) Count() float64 {
return float64(h.total)
}
// trim merges adjacent bins to decrease the bin count to the maximum value
func (h *NumericHistogram) trim() {
for len(h.bins) > h.maxbins {
// Find closest bins in terms of value
minDelta := 1e99
minDeltaIndex := 0
for i := range h.bins {
if i == 0 {
continue
}
if delta := h.bins[i].value - h.bins[i-1].value; delta < minDelta {
minDelta = delta
minDeltaIndex = i
}
}
// We need to merge bins minDeltaIndex-1 and minDeltaIndex
totalCount := h.bins[minDeltaIndex-1].count + h.bins[minDeltaIndex].count
mergedbin := bin{
value: (h.bins[minDeltaIndex-1].value*
h.bins[minDeltaIndex-1].count +
h.bins[minDeltaIndex].value*
h.bins[minDeltaIndex].count) /
totalCount, // weighted average
count: totalCount, // summed heights
}
head := append(make([]bin, 0), h.bins[0:minDeltaIndex-1]...)
tail := append([]bin{mergedbin}, h.bins[minDeltaIndex+1:]...)
h.bins = append(head, tail...)
}
}
// String returns a string reprentation of the histogram,
// which is useful for printing to a terminal.
func (h *NumericHistogram) String() (str string) {
str += fmt.Sprintln("Total:", h.total)
for i := range h.bins {
var bar string
for j := 0; j < int(float64(h.bins[i].count)/float64(h.total)*200); j++ {
bar += "."
}
str += fmt.Sprintln(h.bins[i].value, "\t", bar)
}
return
}

View File

@ -0,0 +1,190 @@
// Package gohistogram contains implementations of weighted and exponential histograms.
package gohistogram
// Copyright (c) 2013 VividCortex, Inc. All rights reserved.
// Please see the LICENSE file for applicable license terms.
import "fmt"
// A WeightedHistogram implements Histogram. A WeightedHistogram has bins that have values
// which are exponentially weighted moving averages. This allows you keep inserting large
// amounts of data into the histogram and approximate quantiles with recency factored in.
type WeightedHistogram struct {
bins []bin
maxbins int
total float64
alpha float64
}
// NewWeightedHistogram returns a new WeightedHistogram with a maximum of n bins with a decay factor
// of alpha.
//
// There is no "optimal" bin count, but somewhere between 20 and 80 bins should be
// sufficient.
//
// Alpha should be set to 2 / (N+1), where N represents the average age of the moving window.
// For example, a 60-second window with an average age of 30 seconds would yield an
// alpha of 0.064516129.
func NewWeightedHistogram(n int, alpha float64) *WeightedHistogram {
return &WeightedHistogram{
bins: make([]bin, 0),
maxbins: n,
total: 0,
alpha: alpha,
}
}
func ewma(existingVal float64, newVal float64, alpha float64) (result float64) {
result = newVal*(1-alpha) + existingVal*alpha
return
}
func (h *WeightedHistogram) scaleDown(except int) {
for i := range h.bins {
if i != except {
h.bins[i].count = ewma(h.bins[i].count, 0, h.alpha)
}
}
}
func (h *WeightedHistogram) Add(n float64) {
defer h.trim()
for i := range h.bins {
if h.bins[i].value == n {
h.bins[i].count++
defer h.scaleDown(i)
return
}
if h.bins[i].value > n {
newbin := bin{value: n, count: 1}
head := append(make([]bin, 0), h.bins[0:i]...)
head = append(head, newbin)
tail := h.bins[i:]
h.bins = append(head, tail...)
defer h.scaleDown(i)
return
}
}
h.bins = append(h.bins, bin{count: 1, value: n})
}
func (h *WeightedHistogram) Quantile(q float64) float64 {
count := q * h.total
for i := range h.bins {
count -= float64(h.bins[i].count)
if count <= 0 {
return h.bins[i].value
}
}
return -1
}
// CDF returns the value of the cumulative distribution function
// at x
func (h *WeightedHistogram) CDF(x float64) float64 {
count := 0.0
for i := range h.bins {
if h.bins[i].value <= x {
count += float64(h.bins[i].count)
}
}
return count / h.total
}
// Mean returns the sample mean of the distribution
func (h *WeightedHistogram) Mean() float64 {
if h.total == 0 {
return 0
}
sum := 0.0
for i := range h.bins {
sum += h.bins[i].value * h.bins[i].count
}
return sum / h.total
}
// Variance returns the variance of the distribution
func (h *WeightedHistogram) Variance() float64 {
if h.total == 0 {
return 0
}
sum := 0.0
mean := h.Mean()
for i := range h.bins {
sum += (h.bins[i].count * (h.bins[i].value - mean) * (h.bins[i].value - mean))
}
return sum / h.total
}
func (h *WeightedHistogram) Count() float64 {
return h.total
}
func (h *WeightedHistogram) trim() {
total := 0.0
for i := range h.bins {
total += h.bins[i].count
}
h.total = total
for len(h.bins) > h.maxbins {
// Find closest bins in terms of value
minDelta := 1e99
minDeltaIndex := 0
for i := range h.bins {
if i == 0 {
continue
}
if delta := h.bins[i].value - h.bins[i-1].value; delta < minDelta {
minDelta = delta
minDeltaIndex = i
}
}
// We need to merge bins minDeltaIndex-1 and minDeltaIndex
totalCount := h.bins[minDeltaIndex-1].count + h.bins[minDeltaIndex].count
mergedbin := bin{
value: (h.bins[minDeltaIndex-1].value*
h.bins[minDeltaIndex-1].count +
h.bins[minDeltaIndex].value*
h.bins[minDeltaIndex].count) /
totalCount, // weighted average
count: totalCount, // summed heights
}
head := append(make([]bin, 0), h.bins[0:minDeltaIndex-1]...)
tail := append([]bin{mergedbin}, h.bins[minDeltaIndex+1:]...)
h.bins = append(head, tail...)
}
}
// String returns a string reprentation of the histogram,
// which is useful for printing to a terminal.
func (h *WeightedHistogram) String() (str string) {
str += fmt.Sprintln("Total:", h.total)
for i := range h.bins {
var bar string
for j := 0; j < int(float64(h.bins[i].count)/float64(h.total)*200); j++ {
bar += "."
}
str += fmt.Sprintln(h.bins[i].value, "\t", bar)
}
return
}

218
vendor/github.com/go-kit/kit/metrics/generic/generic.go generated vendored Normal file
View File

@ -0,0 +1,218 @@
// Package generic implements generic versions of each of the metric types. They
// can be embedded by other implementations, and converted to specific formats
// as necessary.
package generic
import (
"fmt"
"io"
"math"
"sync"
"sync/atomic"
"github.com/VividCortex/gohistogram"
"github.com/go-kit/kit/metrics"
"github.com/go-kit/kit/metrics/internal/lv"
)
// Counter is an in-memory implementation of a Counter.
type Counter struct {
Name string
lvs lv.LabelValues
bits uint64
}
// NewCounter returns a new, usable Counter.
func NewCounter(name string) *Counter {
return &Counter{
Name: name,
}
}
// With implements Counter.
func (c *Counter) With(labelValues ...string) metrics.Counter {
return &Counter{
bits: atomic.LoadUint64(&c.bits),
lvs: c.lvs.With(labelValues...),
}
}
// Add implements Counter.
func (c *Counter) Add(delta float64) {
for {
var (
old = atomic.LoadUint64(&c.bits)
newf = math.Float64frombits(old) + delta
new = math.Float64bits(newf)
)
if atomic.CompareAndSwapUint64(&c.bits, old, new) {
break
}
}
}
// Value returns the current value of the counter.
func (c *Counter) Value() float64 {
return math.Float64frombits(atomic.LoadUint64(&c.bits))
}
// ValueReset returns the current value of the counter, and resets it to zero.
// This is useful for metrics backends whose counter aggregations expect deltas,
// like Graphite.
func (c *Counter) ValueReset() float64 {
for {
var (
old = atomic.LoadUint64(&c.bits)
newf = 0.0
new = math.Float64bits(newf)
)
if atomic.CompareAndSwapUint64(&c.bits, old, new) {
return math.Float64frombits(old)
}
}
}
// LabelValues returns the set of label values attached to the counter.
func (c *Counter) LabelValues() []string {
return c.lvs
}
// Gauge is an in-memory implementation of a Gauge.
type Gauge struct {
Name string
lvs lv.LabelValues
bits uint64
}
// NewGauge returns a new, usable Gauge.
func NewGauge(name string) *Gauge {
return &Gauge{
Name: name,
}
}
// With implements Gauge.
func (g *Gauge) With(labelValues ...string) metrics.Gauge {
return &Gauge{
bits: atomic.LoadUint64(&g.bits),
lvs: g.lvs.With(labelValues...),
}
}
// Set implements Gauge.
func (g *Gauge) Set(value float64) {
atomic.StoreUint64(&g.bits, math.Float64bits(value))
}
// Value returns the current value of the gauge.
func (g *Gauge) Value() float64 {
return math.Float64frombits(atomic.LoadUint64(&g.bits))
}
// LabelValues returns the set of label values attached to the gauge.
func (g *Gauge) LabelValues() []string {
return g.lvs
}
// Histogram is an in-memory implementation of a streaming histogram, based on
// VividCortex/gohistogram. It dynamically computes quantiles, so it's not
// suitable for aggregation.
type Histogram struct {
Name string
lvs lv.LabelValues
h gohistogram.Histogram
}
// NewHistogram returns a numeric histogram based on VividCortex/gohistogram. A
// good default value for buckets is 50.
func NewHistogram(name string, buckets int) *Histogram {
return &Histogram{
Name: name,
h: gohistogram.NewHistogram(buckets),
}
}
// With implements Histogram.
func (h *Histogram) With(labelValues ...string) metrics.Histogram {
return &Histogram{
lvs: h.lvs.With(labelValues...),
h: h.h,
}
}
// Observe implements Histogram.
func (h *Histogram) Observe(value float64) {
h.h.Add(value)
}
// Quantile returns the value of the quantile q, 0.0 < q < 1.0.
func (h *Histogram) Quantile(q float64) float64 {
return h.h.Quantile(q)
}
// LabelValues returns the set of label values attached to the histogram.
func (h *Histogram) LabelValues() []string {
return h.lvs
}
// Print writes a string representation of the histogram to the passed writer.
// Useful for printing to a terminal.
func (h *Histogram) Print(w io.Writer) {
fmt.Fprintf(w, h.h.String())
}
// Bucket is a range in a histogram which aggregates observations.
type Bucket struct {
From, To, Count int64
}
// Quantile is a pair of a quantile (0..100) and its observed maximum value.
type Quantile struct {
Quantile int // 0..100
Value int64
}
// SimpleHistogram is an in-memory implementation of a Histogram. It only tracks
// an approximate moving average, so is likely too naïve for many use cases.
type SimpleHistogram struct {
mtx sync.RWMutex
lvs lv.LabelValues
avg float64
n uint64
}
// NewSimpleHistogram returns a SimpleHistogram, ready for observations.
func NewSimpleHistogram() *SimpleHistogram {
return &SimpleHistogram{}
}
// With implements Histogram.
func (h *SimpleHistogram) With(labelValues ...string) metrics.Histogram {
return &SimpleHistogram{
lvs: h.lvs.With(labelValues...),
avg: h.avg,
n: h.n,
}
}
// Observe implements Histogram.
func (h *SimpleHistogram) Observe(value float64) {
h.mtx.Lock()
defer h.mtx.Unlock()
h.n++
h.avg -= h.avg / float64(h.n)
h.avg += value / float64(h.n)
}
// ApproximateMovingAverage returns the approximate moving average of observations.
func (h *SimpleHistogram) ApproximateMovingAverage() float64 {
h.mtx.RLock()
defer h.mtx.RUnlock()
return h.avg
}
// LabelValues returns the set of label values attached to the histogram.
func (h *SimpleHistogram) LabelValues() []string {
return h.lvs
}

255
vendor/github.com/go-kit/kit/metrics/influx/influx.go generated vendored Normal file
View File

@ -0,0 +1,255 @@
// Package influx provides an InfluxDB implementation for metrics. The model is
// similar to other push-based instrumentation systems. Observations are
// aggregated locally and emitted to the Influx server on regular intervals.
package influx
import (
"time"
influxdb "github.com/influxdata/influxdb/client/v2"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/metrics"
"github.com/go-kit/kit/metrics/generic"
"github.com/go-kit/kit/metrics/internal/lv"
)
// Influx is a store for metrics that will be emitted to an Influx database.
//
// Influx is a general purpose time-series database, and has no native concepts
// of counters, gauges, or histograms. Counters are modeled as a timeseries with
// one data point per flush, with a "count" field that reflects all adds since
// the last flush. Gauges are modeled as a timeseries with one data point per
// flush, with a "value" field that reflects the current state of the gauge.
// Histograms are modeled as a timeseries with one data point per combination of tags,
// with a set of quantile fields that reflects the p50, p90, p95 & p99.
//
// Influx tags are attached to the Influx object, can be given to each
// metric at construction and can be updated anytime via With function. Influx fields
// are mapped to Go kit label values directly by this collector. Actual metric
// values are provided as fields with specific names depending on the metric.
//
// All observations are collected in memory locally, and flushed on demand.
type Influx struct {
counters *lv.Space
gauges *lv.Space
histograms *lv.Space
tags map[string]string
conf influxdb.BatchPointsConfig
logger log.Logger
}
// New returns an Influx, ready to create metrics and collect observations. Tags
// are applied to all metrics created from this object. The BatchPointsConfig is
// used during flushing.
func New(tags map[string]string, conf influxdb.BatchPointsConfig, logger log.Logger) *Influx {
return &Influx{
counters: lv.NewSpace(),
gauges: lv.NewSpace(),
histograms: lv.NewSpace(),
tags: tags,
conf: conf,
logger: logger,
}
}
// NewCounter returns an Influx counter.
func (in *Influx) NewCounter(name string) *Counter {
return &Counter{
name: name,
obs: in.counters.Observe,
}
}
// NewGauge returns an Influx gauge.
func (in *Influx) NewGauge(name string) *Gauge {
return &Gauge{
name: name,
obs: in.gauges.Observe,
}
}
// NewHistogram returns an Influx histogram.
func (in *Influx) NewHistogram(name string) *Histogram {
return &Histogram{
name: name,
obs: in.histograms.Observe,
}
}
// BatchPointsWriter captures a subset of the influxdb.Client methods necessary
// for emitting metrics observations.
type BatchPointsWriter interface {
Write(influxdb.BatchPoints) error
}
// WriteLoop is a helper method that invokes WriteTo to the passed writer every
// time the passed channel fires. This method blocks until the channel is
// closed, so clients probably want to run it in its own goroutine. For typical
// usage, create a time.Ticker and pass its C channel to this method.
func (in *Influx) WriteLoop(c <-chan time.Time, w BatchPointsWriter) {
for range c {
if err := in.WriteTo(w); err != nil {
in.logger.Log("during", "WriteTo", "err", err)
}
}
}
// WriteTo flushes the buffered content of the metrics to the writer, in an
// Influx BatchPoints format. WriteTo abides best-effort semantics, so
// observations are lost if there is a problem with the write. Clients should be
// sure to call WriteTo regularly, ideally through the WriteLoop helper method.
func (in *Influx) WriteTo(w BatchPointsWriter) (err error) {
bp, err := influxdb.NewBatchPoints(in.conf)
if err != nil {
return err
}
now := time.Now()
in.counters.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
tags := mergeTags(in.tags, lvs)
var p *influxdb.Point
fields := map[string]interface{}{"count": sum(values)}
p, err = influxdb.NewPoint(name, tags, fields, now)
if err != nil {
return false
}
bp.AddPoint(p)
return true
})
if err != nil {
return err
}
in.gauges.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
tags := mergeTags(in.tags, lvs)
var p *influxdb.Point
fields := map[string]interface{}{"value": last(values)}
p, err = influxdb.NewPoint(name, tags, fields, now)
if err != nil {
return false
}
bp.AddPoint(p)
return true
})
if err != nil {
return err
}
in.histograms.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
histogram := generic.NewHistogram(name, 50)
tags := mergeTags(in.tags, lvs)
var p *influxdb.Point
for _, v := range values {
histogram.Observe(v)
}
fields := map[string]interface{}{
"p50": histogram.Quantile(0.50),
"p90": histogram.Quantile(0.90),
"p95": histogram.Quantile(0.95),
"p99": histogram.Quantile(0.99),
}
p, err = influxdb.NewPoint(name, tags, fields, now)
if err != nil {
return false
}
bp.AddPoint(p)
return true
})
if err != nil {
return err
}
return w.Write(bp)
}
func mergeTags(tags map[string]string, labelValues []string) map[string]string {
if len(labelValues)%2 != 0 {
panic("mergeTags received a labelValues with an odd number of strings")
}
for i := 0; i < len(labelValues); i += 2 {
tags[labelValues[i]] = labelValues[i+1]
}
return tags
}
func sum(a []float64) float64 {
var v float64
for _, f := range a {
v += f
}
return v
}
func last(a []float64) float64 {
return a[len(a)-1]
}
type observeFunc func(name string, lvs lv.LabelValues, value float64)
// Counter is an Influx counter. Observations are forwarded to an Influx
// object, and aggregated (summed) per timeseries.
type Counter struct {
name string
lvs lv.LabelValues
obs observeFunc
}
// With implements metrics.Counter.
func (c *Counter) With(labelValues ...string) metrics.Counter {
return &Counter{
name: c.name,
lvs: c.lvs.With(labelValues...),
obs: c.obs,
}
}
// Add implements metrics.Counter.
func (c *Counter) Add(delta float64) {
c.obs(c.name, c.lvs, delta)
}
// Gauge is an Influx gauge. Observations are forwarded to a Dogstatsd
// object, and aggregated (the last observation selected) per timeseries.
type Gauge struct {
name string
lvs lv.LabelValues
obs observeFunc
}
// With implements metrics.Gauge.
func (g *Gauge) With(labelValues ...string) metrics.Gauge {
return &Gauge{
name: g.name,
lvs: g.lvs.With(labelValues...),
obs: g.obs,
}
}
// Set implements metrics.Gauge.
func (g *Gauge) Set(value float64) {
g.obs(g.name, g.lvs, value)
}
// Histogram is an Influx histrogram. Observations are aggregated into a
// generic.Histogram and emitted as per-quantile gauges to the Influx server.
type Histogram struct {
name string
lvs lv.LabelValues
obs observeFunc
}
// With implements metrics.Histogram.
func (h *Histogram) With(labelValues ...string) metrics.Histogram {
return &Histogram{
name: h.name,
lvs: h.lvs.With(labelValues...),
obs: h.obs,
}
}
// Observe implements metrics.Histogram.
func (h *Histogram) Observe(value float64) {
h.obs(h.name, h.lvs, value)
}

20
vendor/github.com/influxdata/influxdb/LICENSE generated vendored Normal file
View File

@ -0,0 +1,20 @@
The MIT License (MIT)
Copyright (c) 2013-2016 Errplane Inc.
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so,
subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

View File

@ -0,0 +1,25 @@
# List
- bootstrap 3.3.5 [MIT LICENSE](https://github.com/twbs/bootstrap/blob/master/LICENSE)
- collectd.org [ISC LICENSE](https://github.com/collectd/go-collectd/blob/master/LICENSE)
- github.com/BurntSushi/toml [WTFPL LICENSE](https://github.com/BurntSushi/toml/blob/master/COPYING)
- github.com/bmizerany/pat [MIT LICENSE](https://github.com/bmizerany/pat#license)
- github.com/boltdb/bolt [MIT LICENSE](https://github.com/boltdb/bolt/blob/master/LICENSE)
- github.com/cespare/xxhash [MIT LICENSE](https://github.com/cespare/xxhash/blob/master/LICENSE.txt)
- github.com/clarkduvall/hyperloglog [MIT LICENSE](https://github.com/clarkduvall/hyperloglog/blob/master/LICENSE)
- github.com/davecgh/go-spew/spew [ISC LICENSE](https://github.com/davecgh/go-spew/blob/master/LICENSE)
- github.com/dgrijalva/jwt-go [MIT LICENSE](https://github.com/dgrijalva/jwt-go/blob/master/LICENSE)
- github.com/dgryski/go-bits [MIT LICENSE](https://github.com/dgryski/go-bits/blob/master/LICENSE)
- github.com/dgryski/go-bitstream [MIT LICENSE](https://github.com/dgryski/go-bitstream/blob/master/LICENSE)
- github.com/gogo/protobuf/proto [BSD LICENSE](https://github.com/gogo/protobuf/blob/master/LICENSE)
- github.com/golang/snappy [BSD LICENSE](https://github.com/golang/snappy/blob/master/LICENSE)
- github.com/google/go-cmp [BSD LICENSE](https://github.com/google/go-cmp/blob/master/LICENSE)
- github.com/influxdata/usage-client [MIT LICENSE](https://github.com/influxdata/usage-client/blob/master/LICENSE.txt)
- github.com/jwilder/encoding [MIT LICENSE](https://github.com/jwilder/encoding/blob/master/LICENSE)
- github.com/paulbellamy/ratecounter [MIT LICENSE](https://github.com/paulbellamy/ratecounter/blob/master/LICENSE)
- github.com/peterh/liner [MIT LICENSE](https://github.com/peterh/liner/blob/master/COPYING)
- github.com/rakyll/statik [APACHE LICENSE](https://github.com/rakyll/statik/blob/master/LICENSE)
- github.com/retailnext/hllpp [BSD LICENSE](https://github.com/retailnext/hllpp/blob/master/LICENSE)
- github.com/uber-go/atomic [MIT LICENSE](https://github.com/uber-go/atomic/blob/master/LICENSE.txt)
- github.com/uber-go/zap [MIT LICENSE](https://github.com/uber-go/zap/blob/master/LICENSE.txt)
- golang.org/x/crypto [BSD LICENSE](https://github.com/golang/crypto/blob/master/LICENSE)
- jquery 2.1.4 [MIT LICENSE](https://github.com/jquery/jquery/blob/master/LICENSE.txt)

View File

@ -0,0 +1,609 @@
// Package client (v2) is the current official Go client for InfluxDB.
package client // import "github.com/influxdata/influxdb/client/v2"
import (
"bytes"
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"strconv"
"strings"
"time"
"github.com/influxdata/influxdb/models"
)
// HTTPConfig is the config data needed to create an HTTP Client.
type HTTPConfig struct {
// Addr should be of the form "http://host:port"
// or "http://[ipv6-host%zone]:port".
Addr string
// Username is the influxdb username, optional.
Username string
// Password is the influxdb password, optional.
Password string
// UserAgent is the http User Agent, defaults to "InfluxDBClient".
UserAgent string
// Timeout for influxdb writes, defaults to no timeout.
Timeout time.Duration
// InsecureSkipVerify gets passed to the http client, if true, it will
// skip https certificate verification. Defaults to false.
InsecureSkipVerify bool
// TLSConfig allows the user to set their own TLS config for the HTTP
// Client. If set, this option overrides InsecureSkipVerify.
TLSConfig *tls.Config
}
// BatchPointsConfig is the config data needed to create an instance of the BatchPoints struct.
type BatchPointsConfig struct {
// Precision is the write precision of the points, defaults to "ns".
Precision string
// Database is the database to write points to.
Database string
// RetentionPolicy is the retention policy of the points.
RetentionPolicy string
// Write consistency is the number of servers required to confirm write.
WriteConsistency string
}
// Client is a client interface for writing & querying the database.
type Client interface {
// Ping checks that status of cluster, and will always return 0 time and no
// error for UDP clients.
Ping(timeout time.Duration) (time.Duration, string, error)
// Write takes a BatchPoints object and writes all Points to InfluxDB.
Write(bp BatchPoints) error
// Query makes an InfluxDB Query on the database. This will fail if using
// the UDP client.
Query(q Query) (*Response, error)
// Close releases any resources a Client may be using.
Close() error
}
// NewHTTPClient returns a new Client from the provided config.
// Client is safe for concurrent use by multiple goroutines.
func NewHTTPClient(conf HTTPConfig) (Client, error) {
if conf.UserAgent == "" {
conf.UserAgent = "InfluxDBClient"
}
u, err := url.Parse(conf.Addr)
if err != nil {
return nil, err
} else if u.Scheme != "http" && u.Scheme != "https" {
m := fmt.Sprintf("Unsupported protocol scheme: %s, your address"+
" must start with http:// or https://", u.Scheme)
return nil, errors.New(m)
}
tr := &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: conf.InsecureSkipVerify,
},
}
if conf.TLSConfig != nil {
tr.TLSClientConfig = conf.TLSConfig
}
return &client{
url: *u,
username: conf.Username,
password: conf.Password,
useragent: conf.UserAgent,
httpClient: &http.Client{
Timeout: conf.Timeout,
Transport: tr,
},
transport: tr,
}, nil
}
// Ping will check to see if the server is up with an optional timeout on waiting for leader.
// Ping returns how long the request took, the version of the server it connected to, and an error if one occurred.
func (c *client) Ping(timeout time.Duration) (time.Duration, string, error) {
now := time.Now()
u := c.url
u.Path = "ping"
req, err := http.NewRequest("GET", u.String(), nil)
if err != nil {
return 0, "", err
}
req.Header.Set("User-Agent", c.useragent)
if c.username != "" {
req.SetBasicAuth(c.username, c.password)
}
if timeout > 0 {
params := req.URL.Query()
params.Set("wait_for_leader", fmt.Sprintf("%.0fs", timeout.Seconds()))
req.URL.RawQuery = params.Encode()
}
resp, err := c.httpClient.Do(req)
if err != nil {
return 0, "", err
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return 0, "", err
}
if resp.StatusCode != http.StatusNoContent {
var err = fmt.Errorf(string(body))
return 0, "", err
}
version := resp.Header.Get("X-Influxdb-Version")
return time.Since(now), version, nil
}
// Close releases the client's resources.
func (c *client) Close() error {
c.transport.CloseIdleConnections()
return nil
}
// client is safe for concurrent use as the fields are all read-only
// once the client is instantiated.
type client struct {
// N.B - if url.UserInfo is accessed in future modifications to the
// methods on client, you will need to syncronise access to url.
url url.URL
username string
password string
useragent string
httpClient *http.Client
transport *http.Transport
}
// BatchPoints is an interface into a batched grouping of points to write into
// InfluxDB together. BatchPoints is NOT thread-safe, you must create a separate
// batch for each goroutine.
type BatchPoints interface {
// AddPoint adds the given point to the Batch of points.
AddPoint(p *Point)
// AddPoints adds the given points to the Batch of points.
AddPoints(ps []*Point)
// Points lists the points in the Batch.
Points() []*Point
// Precision returns the currently set precision of this Batch.
Precision() string
// SetPrecision sets the precision of this batch.
SetPrecision(s string) error
// Database returns the currently set database of this Batch.
Database() string
// SetDatabase sets the database of this Batch.
SetDatabase(s string)
// WriteConsistency returns the currently set write consistency of this Batch.
WriteConsistency() string
// SetWriteConsistency sets the write consistency of this Batch.
SetWriteConsistency(s string)
// RetentionPolicy returns the currently set retention policy of this Batch.
RetentionPolicy() string
// SetRetentionPolicy sets the retention policy of this Batch.
SetRetentionPolicy(s string)
}
// NewBatchPoints returns a BatchPoints interface based on the given config.
func NewBatchPoints(conf BatchPointsConfig) (BatchPoints, error) {
if conf.Precision == "" {
conf.Precision = "ns"
}
if _, err := time.ParseDuration("1" + conf.Precision); err != nil {
return nil, err
}
bp := &batchpoints{
database: conf.Database,
precision: conf.Precision,
retentionPolicy: conf.RetentionPolicy,
writeConsistency: conf.WriteConsistency,
}
return bp, nil
}
type batchpoints struct {
points []*Point
database string
precision string
retentionPolicy string
writeConsistency string
}
func (bp *batchpoints) AddPoint(p *Point) {
bp.points = append(bp.points, p)
}
func (bp *batchpoints) AddPoints(ps []*Point) {
bp.points = append(bp.points, ps...)
}
func (bp *batchpoints) Points() []*Point {
return bp.points
}
func (bp *batchpoints) Precision() string {
return bp.precision
}
func (bp *batchpoints) Database() string {
return bp.database
}
func (bp *batchpoints) WriteConsistency() string {
return bp.writeConsistency
}
func (bp *batchpoints) RetentionPolicy() string {
return bp.retentionPolicy
}
func (bp *batchpoints) SetPrecision(p string) error {
if _, err := time.ParseDuration("1" + p); err != nil {
return err
}
bp.precision = p
return nil
}
func (bp *batchpoints) SetDatabase(db string) {
bp.database = db
}
func (bp *batchpoints) SetWriteConsistency(wc string) {
bp.writeConsistency = wc
}
func (bp *batchpoints) SetRetentionPolicy(rp string) {
bp.retentionPolicy = rp
}
// Point represents a single data point.
type Point struct {
pt models.Point
}
// NewPoint returns a point with the given timestamp. If a timestamp is not
// given, then data is sent to the database without a timestamp, in which case
// the server will assign local time upon reception. NOTE: it is recommended to
// send data with a timestamp.
func NewPoint(
name string,
tags map[string]string,
fields map[string]interface{},
t ...time.Time,
) (*Point, error) {
var T time.Time
if len(t) > 0 {
T = t[0]
}
pt, err := models.NewPoint(name, models.NewTags(tags), fields, T)
if err != nil {
return nil, err
}
return &Point{
pt: pt,
}, nil
}
// String returns a line-protocol string of the Point.
func (p *Point) String() string {
return p.pt.String()
}
// PrecisionString returns a line-protocol string of the Point,
// with the timestamp formatted for the given precision.
func (p *Point) PrecisionString(precison string) string {
return p.pt.PrecisionString(precison)
}
// Name returns the measurement name of the point.
func (p *Point) Name() string {
return string(p.pt.Name())
}
// Tags returns the tags associated with the point.
func (p *Point) Tags() map[string]string {
return p.pt.Tags().Map()
}
// Time return the timestamp for the point.
func (p *Point) Time() time.Time {
return p.pt.Time()
}
// UnixNano returns timestamp of the point in nanoseconds since Unix epoch.
func (p *Point) UnixNano() int64 {
return p.pt.UnixNano()
}
// Fields returns the fields for the point.
func (p *Point) Fields() (map[string]interface{}, error) {
return p.pt.Fields()
}
// NewPointFrom returns a point from the provided models.Point.
func NewPointFrom(pt models.Point) *Point {
return &Point{pt: pt}
}
func (c *client) Write(bp BatchPoints) error {
var b bytes.Buffer
for _, p := range bp.Points() {
if _, err := b.WriteString(p.pt.PrecisionString(bp.Precision())); err != nil {
return err
}
if err := b.WriteByte('\n'); err != nil {
return err
}
}
u := c.url
u.Path = "write"
req, err := http.NewRequest("POST", u.String(), &b)
if err != nil {
return err
}
req.Header.Set("Content-Type", "")
req.Header.Set("User-Agent", c.useragent)
if c.username != "" {
req.SetBasicAuth(c.username, c.password)
}
params := req.URL.Query()
params.Set("db", bp.Database())
params.Set("rp", bp.RetentionPolicy())
params.Set("precision", bp.Precision())
params.Set("consistency", bp.WriteConsistency())
req.URL.RawQuery = params.Encode()
resp, err := c.httpClient.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
if resp.StatusCode != http.StatusNoContent && resp.StatusCode != http.StatusOK {
var err = fmt.Errorf(string(body))
return err
}
return nil
}
// Query defines a query to send to the server.
type Query struct {
Command string
Database string
Precision string
Chunked bool
ChunkSize int
Parameters map[string]interface{}
}
// NewQuery returns a query object.
// The database and precision arguments can be empty strings if they are not needed for the query.
func NewQuery(command, database, precision string) Query {
return Query{
Command: command,
Database: database,
Precision: precision,
Parameters: make(map[string]interface{}),
}
}
// NewQueryWithParameters returns a query object.
// The database and precision arguments can be empty strings if they are not needed for the query.
// parameters is a map of the parameter names used in the command to their values.
func NewQueryWithParameters(command, database, precision string, parameters map[string]interface{}) Query {
return Query{
Command: command,
Database: database,
Precision: precision,
Parameters: parameters,
}
}
// Response represents a list of statement results.
type Response struct {
Results []Result
Err string `json:"error,omitempty"`
}
// Error returns the first error from any statement.
// It returns nil if no errors occurred on any statements.
func (r *Response) Error() error {
if r.Err != "" {
return fmt.Errorf(r.Err)
}
for _, result := range r.Results {
if result.Err != "" {
return fmt.Errorf(result.Err)
}
}
return nil
}
// Message represents a user message.
type Message struct {
Level string
Text string
}
// Result represents a resultset returned from a single statement.
type Result struct {
Series []models.Row
Messages []*Message
Err string `json:"error,omitempty"`
}
// Query sends a command to the server and returns the Response.
func (c *client) Query(q Query) (*Response, error) {
u := c.url
u.Path = "query"
jsonParameters, err := json.Marshal(q.Parameters)
if err != nil {
return nil, err
}
req, err := http.NewRequest("POST", u.String(), nil)
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "")
req.Header.Set("User-Agent", c.useragent)
if c.username != "" {
req.SetBasicAuth(c.username, c.password)
}
params := req.URL.Query()
params.Set("q", q.Command)
params.Set("db", q.Database)
params.Set("params", string(jsonParameters))
if q.Chunked {
params.Set("chunked", "true")
if q.ChunkSize > 0 {
params.Set("chunk_size", strconv.Itoa(q.ChunkSize))
}
}
if q.Precision != "" {
params.Set("epoch", q.Precision)
}
req.URL.RawQuery = params.Encode()
resp, err := c.httpClient.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
var response Response
if q.Chunked {
cr := NewChunkedResponse(resp.Body)
for {
r, err := cr.NextResponse()
if err != nil {
// If we got an error while decoding the response, send that back.
return nil, err
}
if r == nil {
break
}
response.Results = append(response.Results, r.Results...)
if r.Err != "" {
response.Err = r.Err
break
}
}
} else {
dec := json.NewDecoder(resp.Body)
dec.UseNumber()
decErr := dec.Decode(&response)
// ignore this error if we got an invalid status code
if decErr != nil && decErr.Error() == "EOF" && resp.StatusCode != http.StatusOK {
decErr = nil
}
// If we got a valid decode error, send that back
if decErr != nil {
return nil, fmt.Errorf("unable to decode json: received status code %d err: %s", resp.StatusCode, decErr)
}
}
// If we don't have an error in our json response, and didn't get statusOK
// then send back an error
if resp.StatusCode != http.StatusOK && response.Error() == nil {
return &response, fmt.Errorf("received status code %d from server",
resp.StatusCode)
}
return &response, nil
}
// duplexReader reads responses and writes it to another writer while
// satisfying the reader interface.
type duplexReader struct {
r io.Reader
w io.Writer
}
func (r *duplexReader) Read(p []byte) (n int, err error) {
n, err = r.r.Read(p)
if err == nil {
r.w.Write(p[:n])
}
return n, err
}
// ChunkedResponse represents a response from the server that
// uses chunking to stream the output.
type ChunkedResponse struct {
dec *json.Decoder
duplex *duplexReader
buf bytes.Buffer
}
// NewChunkedResponse reads a stream and produces responses from the stream.
func NewChunkedResponse(r io.Reader) *ChunkedResponse {
resp := &ChunkedResponse{}
resp.duplex = &duplexReader{r: r, w: &resp.buf}
resp.dec = json.NewDecoder(resp.duplex)
resp.dec.UseNumber()
return resp
}
// NextResponse reads the next line of the stream and returns a response.
func (r *ChunkedResponse) NextResponse() (*Response, error) {
var response Response
if err := r.dec.Decode(&response); err != nil {
if err == io.EOF {
return nil, nil
}
// A decoding error happened. This probably means the server crashed
// and sent a last-ditch error message to us. Ensure we have read the
// entirety of the connection to get any remaining error text.
io.Copy(ioutil.Discard, r.duplex)
return nil, errors.New(strings.TrimSpace(r.buf.String()))
}
r.buf.Reset()
return &response, nil
}

112
vendor/github.com/influxdata/influxdb/client/v2/udp.go generated vendored Normal file
View File

@ -0,0 +1,112 @@
package client
import (
"fmt"
"io"
"net"
"time"
)
const (
// UDPPayloadSize is a reasonable default payload size for UDP packets that
// could be travelling over the internet.
UDPPayloadSize = 512
)
// UDPConfig is the config data needed to create a UDP Client.
type UDPConfig struct {
// Addr should be of the form "host:port"
// or "[ipv6-host%zone]:port".
Addr string
// PayloadSize is the maximum size of a UDP client message, optional
// Tune this based on your network. Defaults to UDPPayloadSize.
PayloadSize int
}
// NewUDPClient returns a client interface for writing to an InfluxDB UDP
// service from the given config.
func NewUDPClient(conf UDPConfig) (Client, error) {
var udpAddr *net.UDPAddr
udpAddr, err := net.ResolveUDPAddr("udp", conf.Addr)
if err != nil {
return nil, err
}
conn, err := net.DialUDP("udp", nil, udpAddr)
if err != nil {
return nil, err
}
payloadSize := conf.PayloadSize
if payloadSize == 0 {
payloadSize = UDPPayloadSize
}
return &udpclient{
conn: conn,
payloadSize: payloadSize,
}, nil
}
// Close releases the udpclient's resources.
func (uc *udpclient) Close() error {
return uc.conn.Close()
}
type udpclient struct {
conn io.WriteCloser
payloadSize int
}
func (uc *udpclient) Write(bp BatchPoints) error {
var b = make([]byte, 0, uc.payloadSize) // initial buffer size, it will grow as needed
var d, _ = time.ParseDuration("1" + bp.Precision())
var delayedError error
var checkBuffer = func(n int) {
if len(b) > 0 && len(b)+n > uc.payloadSize {
if _, err := uc.conn.Write(b); err != nil {
delayedError = err
}
b = b[:0]
}
}
for _, p := range bp.Points() {
p.pt.Round(d)
pointSize := p.pt.StringSize() + 1 // include newline in size
//point := p.pt.RoundedString(d) + "\n"
checkBuffer(pointSize)
if p.Time().IsZero() || pointSize <= uc.payloadSize {
b = p.pt.AppendString(b)
b = append(b, '\n')
continue
}
points := p.pt.Split(uc.payloadSize - 1) // account for newline character
for _, sp := range points {
checkBuffer(sp.StringSize() + 1)
b = sp.AppendString(b)
b = append(b, '\n')
}
}
if len(b) > 0 {
if _, err := uc.conn.Write(b); err != nil {
return err
}
}
return delayedError
}
func (uc *udpclient) Query(q Query) (*Response, error) {
return nil, fmt.Errorf("Querying via UDP is not supported")
}
func (uc *udpclient) Ping(timeout time.Duration) (time.Duration, string, error) {
return 0, "", nil
}

42
vendor/github.com/influxdata/influxdb/errors.go generated vendored Normal file
View File

@ -0,0 +1,42 @@
package influxdb
import (
"errors"
"fmt"
"strings"
)
// ErrFieldTypeConflict is returned when a new field already exists with a
// different type.
var ErrFieldTypeConflict = errors.New("field type conflict")
// ErrDatabaseNotFound indicates that a database operation failed on the
// specified database because the specified database does not exist.
func ErrDatabaseNotFound(name string) error { return fmt.Errorf("database not found: %s", name) }
// ErrRetentionPolicyNotFound indicates that the named retention policy could
// not be found in the database.
func ErrRetentionPolicyNotFound(name string) error {
return fmt.Errorf("retention policy not found: %s", name)
}
// IsAuthorizationError indicates whether an error is due to an authorization failure
func IsAuthorizationError(err error) bool {
e, ok := err.(interface {
AuthorizationFailed() bool
})
return ok && e.AuthorizationFailed()
}
// IsClientError indicates whether an error is a known client error.
func IsClientError(err error) bool {
if err == nil {
return false
}
if strings.HasPrefix(err.Error(), ErrFieldTypeConflict.Error()) {
return true
}
return false
}

6
vendor/github.com/influxdata/influxdb/influxdb.go generated vendored Normal file
View File

@ -0,0 +1,6 @@
// Package influxdb is the root package of InfluxDB,
// the scalable datastore for metrics, events, and real-time analytics.
//
// If you're looking for the Go HTTP client for InfluxDB,
// see package github.com/influxdata/influxdb/client/v2.
package influxdb // import "github.com/influxdata/influxdb"

View File

@ -0,0 +1,48 @@
package models
import (
"errors"
"strings"
)
// ConsistencyLevel represent a required replication criteria before a write can
// be returned as successful.
//
// The consistency level is handled in open-source InfluxDB but only applicable to clusters.
type ConsistencyLevel int
const (
// ConsistencyLevelAny allows for hinted handoff, potentially no write happened yet.
ConsistencyLevelAny ConsistencyLevel = iota
// ConsistencyLevelOne requires at least one data node acknowledged a write.
ConsistencyLevelOne
// ConsistencyLevelQuorum requires a quorum of data nodes to acknowledge a write.
ConsistencyLevelQuorum
// ConsistencyLevelAll requires all data nodes to acknowledge a write.
ConsistencyLevelAll
)
var (
// ErrInvalidConsistencyLevel is returned when parsing the string version
// of a consistency level.
ErrInvalidConsistencyLevel = errors.New("invalid consistency level")
)
// ParseConsistencyLevel converts a consistency level string to the corresponding ConsistencyLevel const.
func ParseConsistencyLevel(level string) (ConsistencyLevel, error) {
switch strings.ToLower(level) {
case "any":
return ConsistencyLevelAny, nil
case "one":
return ConsistencyLevelOne, nil
case "quorum":
return ConsistencyLevelQuorum, nil
case "all":
return ConsistencyLevelAll, nil
default:
return 0, ErrInvalidConsistencyLevel
}
}

View File

@ -0,0 +1,32 @@
package models // import "github.com/influxdata/influxdb/models"
// from stdlib hash/fnv/fnv.go
const (
prime64 = 1099511628211
offset64 = 14695981039346656037
)
// InlineFNV64a is an alloc-free port of the standard library's fnv64a.
// See https://en.wikipedia.org/wiki/Fowler%E2%80%93Noll%E2%80%93Vo_hash_function.
type InlineFNV64a uint64
// NewInlineFNV64a returns a new instance of InlineFNV64a.
func NewInlineFNV64a() InlineFNV64a {
return offset64
}
// Write adds data to the running hash.
func (s *InlineFNV64a) Write(data []byte) (int, error) {
hash := uint64(*s)
for _, c := range data {
hash ^= uint64(c)
hash *= prime64
}
*s = InlineFNV64a(hash)
return len(data), nil
}
// Sum64 returns the uint64 of the current resulting hash.
func (s *InlineFNV64a) Sum64() uint64 {
return uint64(*s)
}

View File

@ -0,0 +1,38 @@
package models // import "github.com/influxdata/influxdb/models"
import (
"reflect"
"strconv"
"unsafe"
)
// parseIntBytes is a zero-alloc wrapper around strconv.ParseInt.
func parseIntBytes(b []byte, base int, bitSize int) (i int64, err error) {
s := unsafeBytesToString(b)
return strconv.ParseInt(s, base, bitSize)
}
// parseFloatBytes is a zero-alloc wrapper around strconv.ParseFloat.
func parseFloatBytes(b []byte, bitSize int) (float64, error) {
s := unsafeBytesToString(b)
return strconv.ParseFloat(s, bitSize)
}
// parseBoolBytes is a zero-alloc wrapper around strconv.ParseBool.
func parseBoolBytes(b []byte) (bool, error) {
return strconv.ParseBool(unsafeBytesToString(b))
}
// unsafeBytesToString converts a []byte to a string without a heap allocation.
//
// It is unsafe, and is intended to prepare input to short-lived functions
// that require strings.
func unsafeBytesToString(in []byte) string {
src := *(*reflect.SliceHeader)(unsafe.Pointer(&in))
dst := reflect.StringHeader{
Data: src.Data,
Len: src.Len,
}
s := *(*string)(unsafe.Pointer(&dst))
return s
}

2231
vendor/github.com/influxdata/influxdb/models/points.go generated vendored Normal file

File diff suppressed because it is too large Load Diff

62
vendor/github.com/influxdata/influxdb/models/rows.go generated vendored Normal file
View File

@ -0,0 +1,62 @@
package models
import (
"sort"
)
// Row represents a single row returned from the execution of a statement.
type Row struct {
Name string `json:"name,omitempty"`
Tags map[string]string `json:"tags,omitempty"`
Columns []string `json:"columns,omitempty"`
Values [][]interface{} `json:"values,omitempty"`
Partial bool `json:"partial,omitempty"`
}
// SameSeries returns true if r contains values for the same series as o.
func (r *Row) SameSeries(o *Row) bool {
return r.tagsHash() == o.tagsHash() && r.Name == o.Name
}
// tagsHash returns a hash of tag key/value pairs.
func (r *Row) tagsHash() uint64 {
h := NewInlineFNV64a()
keys := r.tagsKeys()
for _, k := range keys {
h.Write([]byte(k))
h.Write([]byte(r.Tags[k]))
}
return h.Sum64()
}
// tagKeys returns a sorted list of tag keys.
func (r *Row) tagsKeys() []string {
a := make([]string, 0, len(r.Tags))
for k := range r.Tags {
a = append(a, k)
}
sort.Strings(a)
return a
}
// Rows represents a collection of rows. Rows implements sort.Interface.
type Rows []*Row
// Len implements sort.Interface.
func (p Rows) Len() int { return len(p) }
// Less implements sort.Interface.
func (p Rows) Less(i, j int) bool {
// Sort by name first.
if p[i].Name != p[j].Name {
return p[i].Name < p[j].Name
}
// Sort by tag set hash. Tags don't have a meaningful sort order so we
// just compute a hash and sort by that instead. This allows the tests
// to receive rows in a predictable order every time.
return p[i].tagsHash() < p[j].tagsHash()
}
// Swap implements sort.Interface.
func (p Rows) Swap(i, j int) { p[i], p[j] = p[j], p[i] }

View File

@ -0,0 +1,42 @@
package models
// Statistic is the representation of a statistic used by the monitoring service.
type Statistic struct {
Name string `json:"name"`
Tags map[string]string `json:"tags"`
Values map[string]interface{} `json:"values"`
}
// NewStatistic returns an initialized Statistic.
func NewStatistic(name string) Statistic {
return Statistic{
Name: name,
Tags: make(map[string]string),
Values: make(map[string]interface{}),
}
}
// StatisticTags is a map that can be merged with others without causing
// mutations to either map.
type StatisticTags map[string]string
// Merge creates a new map containing the merged contents of tags and t.
// If both tags and the receiver map contain the same key, the value in tags
// is used in the resulting map.
//
// Merge always returns a usable map.
func (t StatisticTags) Merge(tags map[string]string) map[string]string {
// Add everything in tags to the result.
out := make(map[string]string, len(tags))
for k, v := range tags {
out[k] = v
}
// Only add values from t that don't appear in tags.
for k, v := range t {
if _, ok := tags[k]; !ok {
out[k] = v
}
}
return out
}

74
vendor/github.com/influxdata/influxdb/models/time.go generated vendored Normal file
View File

@ -0,0 +1,74 @@
package models
// Helper time methods since parsing time can easily overflow and we only support a
// specific time range.
import (
"fmt"
"math"
"time"
)
const (
// MinNanoTime is the minumum time that can be represented.
//
// 1677-09-21 00:12:43.145224194 +0000 UTC
//
// The two lowest minimum integers are used as sentinel values. The
// minimum value needs to be used as a value lower than any other value for
// comparisons and another separate value is needed to act as a sentinel
// default value that is unusable by the user, but usable internally.
// Because these two values need to be used for a special purpose, we do
// not allow users to write points at these two times.
MinNanoTime = int64(math.MinInt64) + 2
// MaxNanoTime is the maximum time that can be represented.
//
// 2262-04-11 23:47:16.854775806 +0000 UTC
//
// The highest time represented by a nanosecond needs to be used for an
// exclusive range in the shard group, so the maximum time needs to be one
// less than the possible maximum number of nanoseconds representable by an
// int64 so that we don't lose a point at that one time.
MaxNanoTime = int64(math.MaxInt64) - 1
)
var (
minNanoTime = time.Unix(0, MinNanoTime).UTC()
maxNanoTime = time.Unix(0, MaxNanoTime).UTC()
// ErrTimeOutOfRange gets returned when time is out of the representable range using int64 nanoseconds since the epoch.
ErrTimeOutOfRange = fmt.Errorf("time outside range %d - %d", MinNanoTime, MaxNanoTime)
)
// SafeCalcTime safely calculates the time given. Will return error if the time is outside the
// supported range.
func SafeCalcTime(timestamp int64, precision string) (time.Time, error) {
mult := GetPrecisionMultiplier(precision)
if t, ok := safeSignedMult(timestamp, mult); ok {
tme := time.Unix(0, t).UTC()
return tme, CheckTime(tme)
}
return time.Time{}, ErrTimeOutOfRange
}
// CheckTime checks that a time is within the safe range.
func CheckTime(t time.Time) error {
if t.Before(minNanoTime) || t.After(maxNanoTime) {
return ErrTimeOutOfRange
}
return nil
}
// Perform the multiplication and check to make sure it didn't overflow.
func safeSignedMult(a, b int64) (int64, bool) {
if a == 0 || b == 0 || a == 1 || b == 1 {
return a * b, true
}
if a == MinNanoTime || b == MaxNanoTime {
return 0, false
}
c := a * b
return c, c/b == a
}

121
vendor/github.com/influxdata/influxdb/node.go generated vendored Normal file
View File

@ -0,0 +1,121 @@
package influxdb
import (
"encoding/json"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"strconv"
)
const (
nodeFile = "node.json"
oldNodeFile = "id"
peersFilename = "peers.json"
)
type Node struct {
path string
ID uint64
}
// LoadNode will load the node information from disk if present
func LoadNode(path string) (*Node, error) {
// Always check to see if we are upgrading first
if err := upgradeNodeFile(path); err != nil {
return nil, err
}
n := &Node{
path: path,
}
f, err := os.Open(filepath.Join(path, nodeFile))
if err != nil {
return nil, err
}
defer f.Close()
if err := json.NewDecoder(f).Decode(n); err != nil {
return nil, err
}
return n, nil
}
// NewNode will return a new node
func NewNode(path string) *Node {
return &Node{
path: path,
}
}
// Save will save the node file to disk and replace the existing one if present
func (n *Node) Save() error {
file := filepath.Join(n.path, nodeFile)
tmpFile := file + "tmp"
f, err := os.Create(tmpFile)
if err != nil {
return err
}
if err = json.NewEncoder(f).Encode(n); err != nil {
f.Close()
return err
}
if err = f.Close(); nil != err {
return err
}
return os.Rename(tmpFile, file)
}
func upgradeNodeFile(path string) error {
oldFile := filepath.Join(path, oldNodeFile)
b, err := ioutil.ReadFile(oldFile)
if err != nil {
if os.IsNotExist(err) {
return nil
}
return err
}
// We shouldn't have an empty ID file, but if we do, ignore it
if len(b) == 0 {
return nil
}
peers := []string{}
pb, err := ioutil.ReadFile(filepath.Join(path, peersFilename))
if err != nil {
if os.IsNotExist(err) {
return nil
}
return err
}
err = json.Unmarshal(pb, &peers)
if err != nil {
return err
}
if len(peers) > 1 {
return fmt.Errorf("to upgrade a cluster, please contact support at influxdata")
}
n := &Node{
path: path,
}
if n.ID, err = strconv.ParseUint(string(b), 10, 64); err != nil {
return err
}
if err := n.Save(); err != nil {
return err
}
if err := os.Remove(oldFile); err != nil {
return err
}
return nil
}

View File

@ -0,0 +1,111 @@
// Package escape contains utilities for escaping parts of InfluxQL
// and InfluxDB line protocol.
package escape // import "github.com/influxdata/influxdb/pkg/escape"
import (
"bytes"
"strings"
)
// Codes is a map of bytes to be escaped.
var Codes = map[byte][]byte{
',': []byte(`\,`),
'"': []byte(`\"`),
' ': []byte(`\ `),
'=': []byte(`\=`),
}
// Bytes escapes characters on the input slice, as defined by Codes.
func Bytes(in []byte) []byte {
for b, esc := range Codes {
in = bytes.Replace(in, []byte{b}, esc, -1)
}
return in
}
const escapeChars = `," =`
// IsEscaped returns whether b has any escaped characters,
// i.e. whether b seems to have been processed by Bytes.
func IsEscaped(b []byte) bool {
for len(b) > 0 {
i := bytes.IndexByte(b, '\\')
if i < 0 {
return false
}
if i+1 < len(b) && strings.IndexByte(escapeChars, b[i+1]) >= 0 {
return true
}
b = b[i+1:]
}
return false
}
// AppendUnescaped appends the unescaped version of src to dst
// and returns the resulting slice.
func AppendUnescaped(dst, src []byte) []byte {
var pos int
for len(src) > 0 {
next := bytes.IndexByte(src[pos:], '\\')
if next < 0 || pos+next+1 >= len(src) {
return append(dst, src...)
}
if pos+next+1 < len(src) && strings.IndexByte(escapeChars, src[pos+next+1]) >= 0 {
if pos+next > 0 {
dst = append(dst, src[:pos+next]...)
}
src = src[pos+next+1:]
pos = 0
} else {
pos += next + 1
}
}
return dst
}
// Unescape returns a new slice containing the unescaped version of in.
func Unescape(in []byte) []byte {
if len(in) == 0 {
return nil
}
if bytes.IndexByte(in, '\\') == -1 {
return in
}
i := 0
inLen := len(in)
var out []byte
for {
if i >= inLen {
break
}
if in[i] == '\\' && i+1 < inLen {
switch in[i+1] {
case ',':
out = append(out, ',')
i += 2
continue
case '"':
out = append(out, '"')
i += 2
continue
case ' ':
out = append(out, ' ')
i += 2
continue
case '=':
out = append(out, '=')
i += 2
continue
}
}
out = append(out, in[i])
i += 1
}
return out
}

View File

@ -0,0 +1,21 @@
package escape
import "strings"
var (
escaper = strings.NewReplacer(`,`, `\,`, `"`, `\"`, ` `, `\ `, `=`, `\=`)
unescaper = strings.NewReplacer(`\,`, `,`, `\"`, `"`, `\ `, ` `, `\=`, `=`)
)
// UnescapeString returns unescaped version of in.
func UnescapeString(in string) string {
if strings.IndexByte(in, '\\') == -1 {
return in
}
return unescaper.Replace(in)
}
// String returns the escaped version of in.
func String(in string) string {
return escaper.Replace(in)
}

View File

@ -44,26 +44,21 @@ func getMessage(t *testing.T, body fn) string {
start(t)
defer stop(t)
result := make(chan string)
body()
go func() {
message := make([]byte, 1024*32)
var bufLen int
for {
listener.SetReadDeadline(time.Now().Add(Timeout))
n, _, _ := listener.ReadFrom(message[bufLen:])
if n == 0 {
result <- string(message[0:bufLen])
break
} else {
bufLen += n
}
}
}()
body()
return <-result
return string(message[0:bufLen])
}
func get(t *testing.T, match string, body fn) (got string, equals bool, contains bool) {
@ -190,3 +185,7 @@ func ShouldReceiveAllAndNotReceiveAny(t *testing.T, expected []string, unexpecte
t.Errorf("but got: %#v", got)
}
}
func ReceiveString(t *testing.T, body fn) string {
return getMessage(t, body)
}