From 92fcf375b0d6e4d7420c084ef2f328366c227745 Mon Sep 17 00:00:00 2001 From: Tom Wilkie Date: Tue, 19 Feb 2019 15:49:57 +0000 Subject: [PATCH] Update vendored TSDB version. Signed-off-by: Tom Wilkie --- go.mod | 2 +- go.sum | 4 +- .../client_golang/prometheus/promauto/auto.go | 223 +++++++++ vendor/github.com/prometheus/tsdb/.travis.yml | 3 +- .../github.com/prometheus/tsdb/CHANGELOG.md | 15 +- .../prometheus/tsdb/Makefile.common | 65 +-- vendor/github.com/prometheus/tsdb/block.go | 32 +- .../prometheus/tsdb/chunkenc/bstream.go | 10 - .../prometheus/tsdb/chunkenc/chunk.go | 4 +- .../prometheus/tsdb/chunkenc/xor.go | 6 +- .../prometheus/tsdb/chunks/chunks.go | 109 ++++- vendor/github.com/prometheus/tsdb/compact.go | 196 ++++++-- vendor/github.com/prometheus/tsdb/db.go | 86 ++-- .../prometheus/tsdb/encoding_helpers.go | 44 +- .../prometheus/tsdb/fileutil/fileutil.go | 3 +- vendor/github.com/prometheus/tsdb/head.go | 129 +++-- .../prometheus/tsdb/index/encoding_helpers.go | 36 +- .../github.com/prometheus/tsdb/index/index.go | 4 +- .../prometheus/tsdb/index/postings.go | 5 +- vendor/github.com/prometheus/tsdb/querier.go | 191 +++++++- vendor/github.com/prometheus/tsdb/repair.go | 2 +- .../prometheus/tsdb/staticcheck.conf | 2 - .../prometheus/tsdb/wal/live_reader.go | 284 +++++++++++ .../github.com/prometheus/tsdb/wal/reader.go | 183 ++++++++ vendor/github.com/prometheus/tsdb/wal/wal.go | 443 ++---------------- vendor/modules.txt | 3 +- 26 files changed, 1418 insertions(+), 666 deletions(-) create mode 100644 vendor/github.com/prometheus/client_golang/prometheus/promauto/auto.go delete mode 100644 vendor/github.com/prometheus/tsdb/staticcheck.conf create mode 100644 vendor/github.com/prometheus/tsdb/wal/live_reader.go create mode 100644 vendor/github.com/prometheus/tsdb/wal/reader.go diff --git a/go.mod b/go.mod index 04c036a62..db21d92d7 100644 --- a/go.mod +++ b/go.mod @@ -90,7 +90,7 @@ require ( github.com/prometheus/client_golang v0.9.1 github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910 github.com/prometheus/common v0.0.0-20181119215939-b36ad289a3ea - github.com/prometheus/tsdb v0.4.0 + github.com/prometheus/tsdb v0.4.1-0.20190219143357-77d5a7d47a52 github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a // indirect github.com/rlmcpherson/s3gof3r v0.5.0 // indirect github.com/rubyist/circuitbreaker v2.2.1+incompatible // indirect diff --git a/go.sum b/go.sum index 06dbd4509..ab98fd23c 100644 --- a/go.sum +++ b/go.sum @@ -231,8 +231,8 @@ github.com/prometheus/common v0.0.0-20181119215939-b36ad289a3ea/go.mod h1:daVV7q github.com/prometheus/procfs v0.0.0-20180725123919-05ee40e3a273/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d h1:GoAlyOgbOEIFdaDqxJVlbOQ1DtGmZWs/Qau0hIlk+WQ= github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= -github.com/prometheus/tsdb v0.4.0 h1:pXJyEi/5p6UBmOrnzsZmYxLrZjxnRlEB78/qj3+a8Gk= -github.com/prometheus/tsdb v0.4.0/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= +github.com/prometheus/tsdb v0.4.1-0.20190219143357-77d5a7d47a52 h1:ULXRH8vXOu1QwA7l7N+zAKS/jfvs7HLCNH77FEdKTTQ= +github.com/prometheus/tsdb v0.4.1-0.20190219143357-77d5a7d47a52/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a h1:9ZKAASQSHhDYGoxY8uLVpewe1GDZ2vu2Tr/vTdVAkFQ= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod 
h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rlmcpherson/s3gof3r v0.5.0 h1:1izOJpTiohSibfOHuNyEA/yQnAirh05enzEdmhez43k= diff --git a/vendor/github.com/prometheus/client_golang/prometheus/promauto/auto.go b/vendor/github.com/prometheus/client_golang/prometheus/promauto/auto.go new file mode 100644 index 000000000..a00ba1eb8 --- /dev/null +++ b/vendor/github.com/prometheus/client_golang/prometheus/promauto/auto.go @@ -0,0 +1,223 @@ +// Copyright 2018 The Prometheus Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package promauto provides constructors for the usual Prometheus metrics that +// return them already registered with the global registry +// (prometheus.DefaultRegisterer). This allows very compact code, avoiding any +// references to the registry altogether, but all the constructors in this +// package will panic if the registration fails. +// +// The following example is a complete program to create a histogram of normally +// distributed random numbers from the math/rand package: +// +// package main +// +// import ( +// "math/rand" +// "net/http" +// +// "github.com/prometheus/client_golang/prometheus" +// "github.com/prometheus/client_golang/prometheus/promauto" +// "github.com/prometheus/client_golang/prometheus/promhttp" +// ) +// +// var histogram = promauto.NewHistogram(prometheus.HistogramOpts{ +// Name: "random_numbers", +// Help: "A histogram of normally distributed random numbers.", +// Buckets: prometheus.LinearBuckets(-3, .1, 61), +// }) +// +// func Random() { +// for { +// histogram.Observe(rand.NormFloat64()) +// } +// } +// +// func main() { +// go Random() +// http.Handle("/metrics", promhttp.Handler()) +// http.ListenAndServe(":1971", nil) +// } +// +// Prometheus's version of a minimal hello-world program: +// +// package main +// +// import ( +// "fmt" +// "net/http" +// +// "github.com/prometheus/client_golang/prometheus" +// "github.com/prometheus/client_golang/prometheus/promauto" +// "github.com/prometheus/client_golang/prometheus/promhttp" +// ) +// +// func main() { +// http.Handle("/", promhttp.InstrumentHandlerCounter( +// promauto.NewCounterVec( +// prometheus.CounterOpts{ +// Name: "hello_requests_total", +// Help: "Total number of hello-world requests by HTTP code.", +// }, +// []string{"code"}, +// ), +// http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { +// fmt.Fprint(w, "Hello, world!") +// }), +// )) +// http.Handle("/metrics", promhttp.Handler()) +// http.ListenAndServe(":1971", nil) +// } +// +// This appears very handy. So why are these constructors locked away in a +// separate package? There are two caveats: +// +// First, in more complex programs, global state is often quite problematic. +// That's the reason why the metrics constructors in the prometheus package do +// not interact with the global prometheus.DefaultRegisterer on their own. 
You +// are free to use the Register or MustRegister functions to register them with +// the global prometheus.DefaultRegisterer, but you could as well choose a local +// Registerer (usually created with prometheus.NewRegistry, but there are other +// scenarios, e.g. testing). +// +// The second issue is that registration may fail, e.g. if a metric inconsistent +// with the newly to be registered one is already registered. But how to signal +// and handle a panic in the automatic registration with the default registry? +// The only way is panicking. While panicking on invalid input provided by the +// programmer is certainly fine, things are a bit more subtle in this case: You +// might just add another package to the program, and that package (in its init +// function) happens to register a metric with the same name as your code. Now, +// all of a sudden, either your code or the code of the newly imported package +// panics, depending on initialization order, without any opportunity to handle +// the case gracefully. Even worse is a scenario where registration happens +// later during the runtime (e.g. upon loading some kind of plugin), where the +// panic could be triggered long after the code has been deployed to +// production. A possibility to panic should be explicitly called out by the +// Must… idiom, cf. prometheus.MustRegister. But adding a separate set of +// constructors in the prometheus package called MustRegisterNewCounterVec or +// similar would be quite unwieldy. Adding an extra MustRegister method to each +// metric, returning the registered metric, would result in nice code for those +// using the method, but would pollute every single metric interface for +// everybody avoiding the global registry. +// +// To address both issues, the problematic auto-registering and possibly +// panicking constructors are all in this package with a clear warning +// ahead. And whoever cares about avoiding global state and possibly panicking +// function calls can simply ignore the existence of the promauto package +// altogether. +// +// A final note: There is a similar case in the net/http package of the standard +// library. It has DefaultServeMux as a global instance of ServeMux, and the +// Handle function acts on it, panicking if a handler for the same pattern has +// already been registered. However, one might argue that the whole HTTP routing +// is usually set up closely together in the same package or file, while +// Prometheus metrics tend to be spread widely over the codebase, increasing the +// chance of surprising registration failures. Furthermore, the use of global +// state in net/http has been criticized widely, and some avoid it altogether. +package promauto + +import "github.com/prometheus/client_golang/prometheus" + +// NewCounter works like the function of the same name in the prometheus package +// but it automatically registers the Counter with the +// prometheus.DefaultRegisterer. If the registration fails, NewCounter panics. +func NewCounter(opts prometheus.CounterOpts) prometheus.Counter { + c := prometheus.NewCounter(opts) + prometheus.MustRegister(c) + return c +} + +// NewCounterVec works like the function of the same name in the prometheus +// package but it automatically registers the CounterVec with the +// prometheus.DefaultRegisterer. If the registration fails, NewCounterVec +// panics. 
+func NewCounterVec(opts prometheus.CounterOpts, labelNames []string) *prometheus.CounterVec { + c := prometheus.NewCounterVec(opts, labelNames) + prometheus.MustRegister(c) + return c +} + +// NewCounterFunc works like the function of the same name in the prometheus +// package but it automatically registers the CounterFunc with the +// prometheus.DefaultRegisterer. If the registration fails, NewCounterFunc +// panics. +func NewCounterFunc(opts prometheus.CounterOpts, function func() float64) prometheus.CounterFunc { + g := prometheus.NewCounterFunc(opts, function) + prometheus.MustRegister(g) + return g +} + +// NewGauge works like the function of the same name in the prometheus package +// but it automatically registers the Gauge with the +// prometheus.DefaultRegisterer. If the registration fails, NewGauge panics. +func NewGauge(opts prometheus.GaugeOpts) prometheus.Gauge { + g := prometheus.NewGauge(opts) + prometheus.MustRegister(g) + return g +} + +// NewGaugeVec works like the function of the same name in the prometheus +// package but it automatically registers the GaugeVec with the +// prometheus.DefaultRegisterer. If the registration fails, NewGaugeVec panics. +func NewGaugeVec(opts prometheus.GaugeOpts, labelNames []string) *prometheus.GaugeVec { + g := prometheus.NewGaugeVec(opts, labelNames) + prometheus.MustRegister(g) + return g +} + +// NewGaugeFunc works like the function of the same name in the prometheus +// package but it automatically registers the GaugeFunc with the +// prometheus.DefaultRegisterer. If the registration fails, NewGaugeFunc panics. +func NewGaugeFunc(opts prometheus.GaugeOpts, function func() float64) prometheus.GaugeFunc { + g := prometheus.NewGaugeFunc(opts, function) + prometheus.MustRegister(g) + return g +} + +// NewSummary works like the function of the same name in the prometheus package +// but it automatically registers the Summary with the +// prometheus.DefaultRegisterer. If the registration fails, NewSummary panics. +func NewSummary(opts prometheus.SummaryOpts) prometheus.Summary { + s := prometheus.NewSummary(opts) + prometheus.MustRegister(s) + return s +} + +// NewSummaryVec works like the function of the same name in the prometheus +// package but it automatically registers the SummaryVec with the +// prometheus.DefaultRegisterer. If the registration fails, NewSummaryVec +// panics. +func NewSummaryVec(opts prometheus.SummaryOpts, labelNames []string) *prometheus.SummaryVec { + s := prometheus.NewSummaryVec(opts, labelNames) + prometheus.MustRegister(s) + return s +} + +// NewHistogram works like the function of the same name in the prometheus +// package but it automatically registers the Histogram with the +// prometheus.DefaultRegisterer. If the registration fails, NewHistogram panics. +func NewHistogram(opts prometheus.HistogramOpts) prometheus.Histogram { + h := prometheus.NewHistogram(opts) + prometheus.MustRegister(h) + return h +} + +// NewHistogramVec works like the function of the same name in the prometheus +// package but it automatically registers the HistogramVec with the +// prometheus.DefaultRegisterer. If the registration fails, NewHistogramVec +// panics. 
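Every promauto constructor in this file reduces to the same three lines: build the collector with the plain prometheus constructor, pass it to prometheus.MustRegister (which panics on failure), and return it. A minimal hand-rolled equivalent of promauto.NewCounter, using an illustrative metric name:

    package main

    import "github.com/prometheus/client_golang/prometheus"

    func main() {
        // Same effect as promauto.NewCounter(...): panics if registration
        // with the default registerer fails, e.g. on a duplicate or
        // inconsistent metric name.
        c := prometheus.NewCounter(prometheus.CounterOpts{
            Name: "example_ops_total", // illustrative
            Help: "Total example operations.",
        })
        prometheus.MustRegister(c)
        c.Inc()
    }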
+func NewHistogramVec(opts prometheus.HistogramOpts, labelNames []string) *prometheus.HistogramVec { + h := prometheus.NewHistogramVec(opts, labelNames) + prometheus.MustRegister(h) + return h +} diff --git a/vendor/github.com/prometheus/tsdb/.travis.yml b/vendor/github.com/prometheus/tsdb/.travis.yml index c5012a34a..faed5148e 100644 --- a/vendor/github.com/prometheus/tsdb/.travis.yml +++ b/vendor/github.com/prometheus/tsdb/.travis.yml @@ -20,5 +20,4 @@ install: - make deps script: - # `staticcheck` target is omitted due to linting errors - - if [[ "$TRAVIS_OS_NAME" == "windows" ]]; then make test; else make; fi + - if [[ "$TRAVIS_OS_NAME" == "windows" ]]; then make test; else make all; fi diff --git a/vendor/github.com/prometheus/tsdb/CHANGELOG.md b/vendor/github.com/prometheus/tsdb/CHANGELOG.md index 184734537..705dc6b4c 100644 --- a/vendor/github.com/prometheus/tsdb/CHANGELOG.md +++ b/vendor/github.com/prometheus/tsdb/CHANGELOG.md @@ -1,12 +1,19 @@ ## master / unreleased + - [ENHANCEMENT] Time-ovelapping blocks are now allowed. [#370](https://github.com/prometheus/tsdb/pull/370) + - Added `MergeChunks` function in `chunkenc/xor.go` to merge 2 time-overlapping chunks. + - Added `MergeOverlappingChunks` function in `chunks/chunks.go` to merge multiple time-overlapping Chunk Metas. + - Added `MinTime` and `MaxTime` method for `BlockReader`. + - [CHANGE] `NewLeveledCompactor` takes a context so that a compaction is canceled when closing the db. + - [ENHANCEMENT] When closing the db any running compaction will be cancelled so it doesn't block. + - [CHANGE] `prometheus_tsdb_storage_blocks_bytes_total` is now `prometheus_tsdb_storage_blocks_bytes` ## 0.4.0 - [CHANGE] New `WALSegmentSize` option to override the `DefaultOptions.WALSegmentSize`. Added to allow using smaller wal files. For example using tmpfs on a RPI to minimise the SD card wear out from the constant WAL writes. As part of this change the `DefaultOptions.WALSegmentSize` constant was also exposed. - [CHANGE] Empty blocks are not written during compaction [#374](https://github.com/prometheus/tsdb/pull/374) - [FEATURE] Size base retention through `Options.MaxBytes`. As part of this change: - - added new metrics - `prometheus_tsdb_storage_blocks_bytes_total`, `prometheus_tsdb_size_retentions_total`, `prometheus_tsdb_time_retentions_total` - - new public interface `SizeReader: Size() int64` - - `OpenBlock` signature changed to take a logger. + - Added new metrics - `prometheus_tsdb_storage_blocks_bytes_total`, `prometheus_tsdb_size_retentions_total`, `prometheus_tsdb_time_retentions_total` + - New public interface `SizeReader: Size() int64` + - `OpenBlock` signature changed to take a logger. - [REMOVED] `PrefixMatcher` is considered unused so was removed. - [CLEANUP] `Options.WALFlushInterval` is removed as it wasn't used anywhere. - [FEATURE] Add new `LiveReader` to WAL pacakge. Added to allow live tailing of a WAL segment, used by Prometheus Remote Write after refactor. The main difference between the new reader and the existing `Reader` is that for `LiveReader` a call to `Next()` that returns false does not mean that there will never be more data to read. @@ -21,4 +28,4 @@ - [CHANGE] `Head.Init()` is changed to `Head.Init(minValidTime int64)` - [CHANGE] `SymbolTable()` renamed to `SymbolTableSize()` to make the name consistent with the `Block{ symbolTableSize uint64 }` field. - [CHANGE] `wal.Reader{}` now exposes `Segment()` for the current segment being read and `Offset()` for the current offset. 
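The LiveReader entry above implies a polling loop rather than a read-to-EOF loop: Next() returning false only means there is no more data yet. The new wal/live_reader.go is not reproduced in this diff, so the sketch below is an assumption-laden illustration; the wal.NewLiveReader constructor name and the exact error behaviour are guesses based on the changelog wording, while Next/Record mirror the documented Reader API.

    import (
        "io"
        "time"

        "github.com/prometheus/tsdb/wal"
    )

    func tailSegment(segment io.Reader, handle func(rec []byte)) error {
        r := wal.NewLiveReader(segment) // assumed constructor; see wal/live_reader.go
        for {
            for r.Next() {
                handle(r.Record())
            }
            // Unlike wal.Reader, Next() == false is not end-of-stream.
            if err := r.Err(); err != nil && err != io.EOF {
                return err
            }
            time.Sleep(100 * time.Millisecond) // wait for the writer to append more
        }
    }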
- -[FEATURE] tsdbutil analyze subcomand to find churn, high cardinality, etc. + - [FEATURE] tsdbutil analyze subcomand to find churn, high cardinality, etc. diff --git a/vendor/github.com/prometheus/tsdb/Makefile.common b/vendor/github.com/prometheus/tsdb/Makefile.common index 741579e60..7e86f4543 100644 --- a/vendor/github.com/prometheus/tsdb/Makefile.common +++ b/vendor/github.com/prometheus/tsdb/Makefile.common @@ -29,12 +29,15 @@ GO ?= go GOFMT ?= $(GO)fmt FIRST_GOPATH := $(firstword $(subst :, ,$(shell $(GO) env GOPATH))) GOOPTS ?= +GOHOSTOS ?= $(shell $(GO) env GOHOSTOS) +GOHOSTARCH ?= $(shell $(GO) env GOHOSTARCH) GO_VERSION ?= $(shell $(GO) version) GO_VERSION_NUMBER ?= $(word 3, $(GO_VERSION)) PRE_GO_111 ?= $(shell echo $(GO_VERSION_NUMBER) | grep -E 'go1\.(10|[0-9])\.') -unexport GOVENDOR +GOVENDOR := +GO111MODULE := ifeq (, $(PRE_GO_111)) ifneq (,$(wildcard go.mod)) # Enforce Go modules support just in case the directory is inside GOPATH (and for Travis CI). @@ -55,32 +58,43 @@ $(warning Some recipes may not work as expected as the current Go runtime is '$( # This repository isn't using Go modules (yet). GOVENDOR := $(FIRST_GOPATH)/bin/govendor endif - - unexport GO111MODULE endif PROMU := $(FIRST_GOPATH)/bin/promu STATICCHECK := $(FIRST_GOPATH)/bin/staticcheck pkgs = ./... -GO_VERSION ?= $(shell $(GO) version) -GO_BUILD_PLATFORM ?= $(subst /,-,$(lastword $(GO_VERSION))) +ifeq (arm, $(GOHOSTARCH)) + GOHOSTARM ?= $(shell GOARM= $(GO) env GOARM) + GO_BUILD_PLATFORM ?= $(GOHOSTOS)-$(GOHOSTARCH)v$(GOHOSTARM) +else + GO_BUILD_PLATFORM ?= $(GOHOSTOS)-$(GOHOSTARCH) +endif PROMU_VERSION ?= 0.2.0 PROMU_URL := https://github.com/prometheus/promu/releases/download/v$(PROMU_VERSION)/promu-$(PROMU_VERSION).$(GO_BUILD_PLATFORM).tar.gz +STATICCHECK_VERSION ?= 2019.1 +STATICCHECK_URL := https://github.com/dominikh/go-tools/releases/download/$(STATICCHECK_VERSION)/staticcheck_$(GOHOSTOS)_$(GOHOSTARCH) PREFIX ?= $(shell pwd) BIN_DIR ?= $(shell pwd) DOCKER_IMAGE_TAG ?= $(subst /,-,$(shell git rev-parse --abbrev-ref HEAD)) DOCKER_REPO ?= prom -.PHONY: all -all: precheck style staticcheck unused build test +ifeq ($(GOHOSTARCH),amd64) + ifeq ($(GOHOSTOS),$(filter $(GOHOSTOS),linux freebsd darwin windows)) + # Only supported on amd64 + test-flags := -race + endif +endif # This rule is used to forward a target like "build" to "common-build". This # allows a new "build" target to be defined in a Makefile which includes this # one and override "common-build" without override warnings. %: common-% ; +.PHONY: common-all +common-all: precheck style check_license staticcheck unused build test + .PHONY: common-style common-style: @echo ">> checking code style" @@ -110,12 +124,12 @@ common-test-short: .PHONY: common-test common-test: @echo ">> running all tests" - GO111MODULE=$(GO111MODULE) $(GO) test -race $(GOOPTS) $(pkgs) + GO111MODULE=$(GO111MODULE) $(GO) test $(test-flags) $(GOOPTS) $(pkgs) .PHONY: common-format common-format: @echo ">> formatting code" - GO111MODULE=$(GO111MODULE) $(GO) fmt $(GOOPTS) $(pkgs) + GO111MODULE=$(GO111MODULE) $(GO) fmt $(pkgs) .PHONY: common-vet common-vet: @@ -125,8 +139,12 @@ common-vet: .PHONY: common-staticcheck common-staticcheck: $(STATICCHECK) @echo ">> running staticcheck" + chmod +x $(STATICCHECK) ifdef GO111MODULE - GO111MODULE=$(GO111MODULE) $(STATICCHECK) -ignore "$(STATICCHECK_IGNORE)" -checks "SA*" $(pkgs) +# 'go list' needs to be executed before staticcheck to prepopulate the modules cache. 
+# Otherwise staticcheck might fail randomly for some reason not yet explained. + GO111MODULE=$(GO111MODULE) $(GO) list -e -compiled -test=true -export=false -deps=true -find=false -tags= -- ./... > /dev/null + GO111MODULE=$(GO111MODULE) $(STATICCHECK) -ignore "$(STATICCHECK_IGNORE)" $(pkgs) else $(STATICCHECK) -ignore "$(STATICCHECK_IGNORE)" $(pkgs) endif @@ -140,8 +158,9 @@ else ifdef GO111MODULE @echo ">> running check for unused/missing packages in go.mod" GO111MODULE=$(GO111MODULE) $(GO) mod tidy +ifeq (,$(wildcard vendor)) @git diff --exit-code -- go.sum go.mod -ifneq (,$(wildcard vendor)) +else @echo ">> running check for unused packages in vendor/" GO111MODULE=$(GO111MODULE) $(GO) mod vendor @git diff --exit-code -- go.sum go.mod vendor/ @@ -175,30 +194,20 @@ common-docker-tag-latest: promu: $(PROMU) $(PROMU): - curl -s -L $(PROMU_URL) | tar -xvz -C /tmp - mkdir -v -p $(FIRST_GOPATH)/bin - cp -v /tmp/promu-$(PROMU_VERSION).$(GO_BUILD_PLATFORM)/promu $(PROMU) + $(eval PROMU_TMP := $(shell mktemp -d)) + curl -s -L $(PROMU_URL) | tar -xvzf - -C $(PROMU_TMP) + mkdir -p $(FIRST_GOPATH)/bin + cp $(PROMU_TMP)/promu-$(PROMU_VERSION).$(GO_BUILD_PLATFORM)/promu $(FIRST_GOPATH)/bin/promu + rm -r $(PROMU_TMP) .PHONY: proto proto: @echo ">> generating code from proto files" @./scripts/genproto.sh -.PHONY: $(STATICCHECK) $(STATICCHECK): -ifdef GO111MODULE -# Get staticcheck from a temporary directory to avoid modifying the local go.{mod,sum}. -# See https://github.com/golang/go/issues/27643. -# For now, we are using the next branch of staticcheck because master isn't compatible yet with Go modules. - tmpModule=$$(mktemp -d 2>&1) && \ - mkdir -p $${tmpModule}/staticcheck && \ - cd "$${tmpModule}"/staticcheck && \ - GO111MODULE=on $(GO) mod init example.com/staticcheck && \ - GO111MODULE=on GOOS= GOARCH= $(GO) get -u honnef.co/go/tools/cmd/staticcheck@next && \ - rm -rf $${tmpModule}; -else - GOOS= GOARCH= GO111MODULE=off $(GO) get -u honnef.co/go/tools/cmd/staticcheck -endif + mkdir -p $(FIRST_GOPATH)/bin + curl -s -L $(STATICCHECK_URL) > $(STATICCHECK) ifdef GOVENDOR .PHONY: $(GOVENDOR) diff --git a/vendor/github.com/prometheus/tsdb/block.go b/vendor/github.com/prometheus/tsdb/block.go index 42e11d951..d9570bdd6 100644 --- a/vendor/github.com/prometheus/tsdb/block.go +++ b/vendor/github.com/prometheus/tsdb/block.go @@ -16,6 +16,7 @@ package tsdb import ( "encoding/json" + "io" "io/ioutil" "os" "path/filepath" @@ -134,6 +135,12 @@ type BlockReader interface { // Tombstones returns a TombstoneReader over the block's deleted data. Tombstones() (TombstoneReader, error) + + // MinTime returns the min time of the block. + MinTime() int64 + + // MaxTime returns the max time of the block. + MaxTime() int64 } // Appendable defines an entity to which data can be appended. @@ -269,10 +276,19 @@ type Block struct { // OpenBlock opens the block in the directory. It can be passed a chunk pool, which is used // to instantiate chunk structs. 
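The rewrite of OpenBlock below switches to a named error return so a deferred handler can close whatever readers (chunks, index, tombstones) were opened before a later step failed. The call site is unchanged; a minimal sketch relying only on the signature shown here (a nil logger is replaced with a NopLogger inside):

    func openBlock(dir string) (*tsdb.Block, error) {
        // On error, any partially opened readers have already been
        // closed by OpenBlock's deferred MultiError handler.
        return tsdb.OpenBlock(nil, dir, chunkenc.NewPool())
    }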
-func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (*Block, error) { +func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (pb *Block, err error) { if logger == nil { logger = log.NewNopLogger() } + var closers []io.Closer + defer func() { + if err != nil { + var merr MultiError + merr.Add(err) + merr.Add(closeAll(closers)) + err = merr.Err() + } + }() meta, err := readMetaFile(dir) if err != nil { return nil, err @@ -282,15 +298,19 @@ func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (*Block, error if err != nil { return nil, err } - ir, err := index.NewFileReader(filepath.Join(dir, "index")) + closers = append(closers, cr) + + ir, err := index.NewFileReader(filepath.Join(dir, indexFilename)) if err != nil { return nil, err } + closers = append(closers, ir) tr, tsr, err := readTombstones(dir) if err != nil { return nil, err } + closers = append(closers, tr) // TODO refactor to set this at block creation time as // that would be the logical place for a block size to be calculated. @@ -301,7 +321,7 @@ func OpenBlock(logger log.Logger, dir string, pool chunkenc.Pool) (*Block, error level.Warn(logger).Log("msg", "couldn't write the meta file for the block size", "block", dir, "err", err) } - pb := &Block{ + pb = &Block{ dir: dir, meta: *meta, chunkr: cr, @@ -349,6 +369,12 @@ func (pb *Block) Dir() string { return pb.dir } // Meta returns meta information about the block. func (pb *Block) Meta() BlockMeta { return pb.meta } +// MinTime returns the min time of the meta. +func (pb *Block) MinTime() int64 { return pb.meta.MinTime } + +// MaxTime returns the max time of the meta. +func (pb *Block) MaxTime() int64 { return pb.meta.MaxTime } + // Size returns the number of bytes that the block takes up. func (pb *Block) Size() int64 { return pb.meta.Stats.NumBytes } diff --git a/vendor/github.com/prometheus/tsdb/chunkenc/bstream.go b/vendor/github.com/prometheus/tsdb/chunkenc/bstream.go index ef04d44ba..0a02a7303 100644 --- a/vendor/github.com/prometheus/tsdb/chunkenc/bstream.go +++ b/vendor/github.com/prometheus/tsdb/chunkenc/bstream.go @@ -53,16 +53,6 @@ func newBReader(b []byte) bstream { return bstream{stream: b, count: 8} } -func newBWriter(size int) *bstream { - return &bstream{stream: make([]byte, 0, size), count: 0} -} - -func (b *bstream) clone() *bstream { - d := make([]byte, len(b.stream)) - copy(d, b.stream) - return &bstream{stream: d, count: b.count} -} - func (b *bstream) bytes() []byte { return b.stream } diff --git a/vendor/github.com/prometheus/tsdb/chunkenc/chunk.go b/vendor/github.com/prometheus/tsdb/chunkenc/chunk.go index 4c85fa054..12dc75403 100644 --- a/vendor/github.com/prometheus/tsdb/chunkenc/chunk.go +++ b/vendor/github.com/prometheus/tsdb/chunkenc/chunk.go @@ -52,7 +52,7 @@ type Chunk interface { func FromData(e Encoding, d []byte) (Chunk, error) { switch e { case EncXOR: - return &XORChunk{b: &bstream{count: 0, stream: d}}, nil + return &XORChunk{b: bstream{count: 0, stream: d}}, nil } return nil, fmt.Errorf("unknown chunk encoding: %d", e) } @@ -94,7 +94,7 @@ func NewPool() Pool { return &pool{ xor: sync.Pool{ New: func() interface{} { - return &XORChunk{b: &bstream{}} + return &XORChunk{b: bstream{}} }, }, } diff --git a/vendor/github.com/prometheus/tsdb/chunkenc/xor.go b/vendor/github.com/prometheus/tsdb/chunkenc/xor.go index 77cc3208b..1518772b3 100644 --- a/vendor/github.com/prometheus/tsdb/chunkenc/xor.go +++ b/vendor/github.com/prometheus/tsdb/chunkenc/xor.go @@ -51,13 +51,13 @@ import ( // XORChunk holds XOR encoded 
sample data. type XORChunk struct { - b *bstream + b bstream } // NewXORChunk returns a new chunk with XOR encoding of the given size. func NewXORChunk() *XORChunk { b := make([]byte, 2, 128) - return &XORChunk{b: &bstream{stream: b, count: 0}} + return &XORChunk{b: bstream{stream: b, count: 0}} } // Encoding returns the encoding type. @@ -89,7 +89,7 @@ func (c *XORChunk) Appender() (Appender, error) { } a := &xorAppender{ - b: c.b, + b: &c.b, t: it.t, v: it.val, tDelta: it.tDelta, diff --git a/vendor/github.com/prometheus/tsdb/chunks/chunks.go b/vendor/github.com/prometheus/tsdb/chunks/chunks.go index 8fb288384..3f643bc74 100644 --- a/vendor/github.com/prometheus/tsdb/chunks/chunks.go +++ b/vendor/github.com/prometheus/tsdb/chunks/chunks.go @@ -64,9 +64,7 @@ func (cm *Meta) OverlapsClosedInterval(mint, maxt int64) bool { } var ( - errInvalidSize = fmt.Errorf("invalid size") - errInvalidFlag = fmt.Errorf("invalid flag") - errInvalidChecksum = fmt.Errorf("invalid checksum") + errInvalidSize = fmt.Errorf("invalid size") ) var castagnoliTable *crc32.Table @@ -198,6 +196,84 @@ func (w *Writer) write(b []byte) error { return err } +// MergeOverlappingChunks removes the samples whose timestamp is overlapping. +// The last appearing sample is retained in case there is overlapping. +// This assumes that `chks []Meta` is sorted w.r.t. MinTime. +func MergeOverlappingChunks(chks []Meta) ([]Meta, error) { + if len(chks) < 2 { + return chks, nil + } + newChks := make([]Meta, 0, len(chks)) // Will contain the merged chunks. + newChks = append(newChks, chks[0]) + last := 0 + for _, c := range chks[1:] { + // We need to check only the last chunk in newChks. + // Reason: (1) newChks[last-1].MaxTime < newChks[last].MinTime (non overlapping) + // (2) As chks are sorted w.r.t. MinTime, newChks[last].MinTime < c.MinTime. + // So never overlaps with newChks[last-1] or anything before that. + if c.MinTime > newChks[last].MaxTime { + newChks = append(newChks, c) + continue + } + nc := &newChks[last] + if c.MaxTime > nc.MaxTime { + nc.MaxTime = c.MaxTime + } + chk, err := MergeChunks(nc.Chunk, c.Chunk) + if err != nil { + return nil, err + } + nc.Chunk = chk + } + + return newChks, nil +} + +// MergeChunks vertically merges a and b, i.e., if there is any sample +// with same timestamp in both a and b, the sample in a is discarded. +func MergeChunks(a, b chunkenc.Chunk) (*chunkenc.XORChunk, error) { + newChunk := chunkenc.NewXORChunk() + app, err := newChunk.Appender() + if err != nil { + return nil, err + } + ait := a.Iterator() + bit := b.Iterator() + aok, bok := ait.Next(), bit.Next() + for aok && bok { + at, av := ait.At() + bt, bv := bit.At() + if at < bt { + app.Append(at, av) + aok = ait.Next() + } else if bt < at { + app.Append(bt, bv) + bok = bit.Next() + } else { + app.Append(bt, bv) + aok = ait.Next() + bok = bit.Next() + } + } + for aok { + at, av := ait.At() + app.Append(at, av) + aok = ait.Next() + } + for bok { + bt, bv := bit.At() + app.Append(bt, bv) + bok = bit.Next() + } + if ait.Err() != nil { + return nil, ait.Err() + } + if bit.Err() != nil { + return nil, bit.Err() + } + return newChunk, nil +} + func (w *Writer) WriteChunks(chks ...Meta) error { // Calculate maximum space we need and cut a new segment in case // we don't fit into the current one. @@ -344,7 +420,7 @@ func NewDirReader(dir string, pool chunkenc.Pool) (*Reader, error) { } func (s *Reader) Close() error { - return closeAll(s.cs...) + return closeAll(s.cs) } // Size returns the size of the chunks. 
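MergeOverlappingChunks and MergeChunks above define the vertical-merge rule: samples are interleaved by timestamp and, when both chunks carry the same timestamp, the sample from the second chunk wins. A self-contained illustration built only from the chunkenc API shown in this patch:

    package main

    import (
        "fmt"

        "github.com/prometheus/tsdb/chunkenc"
        "github.com/prometheus/tsdb/chunks"
    )

    func main() {
        a, b := chunkenc.NewXORChunk(), chunkenc.NewXORChunk()
        appA, _ := a.Appender()
        appB, _ := b.Appender()
        appA.Append(1, 1.0) // t=1 only in a
        appA.Append(2, 1.0) // t=2 in both; b's value is kept
        appB.Append(2, 2.0)
        appB.Append(3, 2.0) // t=3 only in b

        merged, err := chunks.MergeChunks(a, b)
        if err != nil {
            panic(err)
        }
        it := merged.Iterator()
        for it.Next() {
            t, v := it.At()
            fmt.Println(t, v) // prints: 1 1, then 2 2, then 3 2
        }
    }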
@@ -352,30 +428,31 @@ func (s *Reader) Size() int64 { return s.size } +// Chunk returns a chunk from a given reference. func (s *Reader) Chunk(ref uint64) (chunkenc.Chunk, error) { var ( - seq = int(ref >> 32) - off = int((ref << 32) >> 32) + sgmSeq = int(ref >> 32) + sgmOffset = int((ref << 32) >> 32) ) - if seq >= len(s.bs) { - return nil, errors.Errorf("reference sequence %d out of range", seq) + if sgmSeq >= len(s.bs) { + return nil, errors.Errorf("reference sequence %d out of range", sgmSeq) } - b := s.bs[seq] + chkS := s.bs[sgmSeq] - if off >= b.Len() { - return nil, errors.Errorf("offset %d beyond data size %d", off, b.Len()) + if sgmOffset >= chkS.Len() { + return nil, errors.Errorf("offset %d beyond data size %d", sgmOffset, chkS.Len()) } // With the minimum chunk length this should never cause us reading // over the end of the slice. - r := b.Range(off, off+binary.MaxVarintLen32) + chk := chkS.Range(sgmOffset, sgmOffset+binary.MaxVarintLen32) - l, n := binary.Uvarint(r) + chkLen, n := binary.Uvarint(chk) if n <= 0 { return nil, errors.Errorf("reading chunk length failed with %d", n) } - r = b.Range(off+n, off+n+int(l)) + chk = chkS.Range(sgmOffset+n, sgmOffset+n+1+int(chkLen)) - return s.pool.Get(chunkenc.Encoding(r[0]), r[1:1+l]) + return s.pool.Get(chunkenc.Encoding(chk[0]), chk[1:1+chkLen]) } func nextSequenceFile(dir string) (string, int, error) { @@ -411,7 +488,7 @@ func sequenceFiles(dir string) ([]string, error) { return res, nil } -func closeAll(cs ...io.Closer) (err error) { +func closeAll(cs []io.Closer) (err error) { for _, c := range cs { if e := c.Close(); e != nil { err = e diff --git a/vendor/github.com/prometheus/tsdb/compact.go b/vendor/github.com/prometheus/tsdb/compact.go index 5d8155f51..065b43e75 100644 --- a/vendor/github.com/prometheus/tsdb/compact.go +++ b/vendor/github.com/prometheus/tsdb/compact.go @@ -14,8 +14,10 @@ package tsdb import ( + "context" "fmt" "io" + "math" "math/rand" "os" "path/filepath" @@ -71,20 +73,22 @@ type Compactor interface { // LeveledCompactor implements the Compactor interface. 
type LeveledCompactor struct { - dir string metrics *compactorMetrics logger log.Logger ranges []int64 chunkPool chunkenc.Pool + ctx context.Context } type compactorMetrics struct { - ran prometheus.Counter - failed prometheus.Counter - duration prometheus.Histogram - chunkSize prometheus.Histogram - chunkSamples prometheus.Histogram - chunkRange prometheus.Histogram + ran prometheus.Counter + populatingBlocks prometheus.Gauge + failed prometheus.Counter + overlappingBlocks prometheus.Counter + duration prometheus.Histogram + chunkSize prometheus.Histogram + chunkSamples prometheus.Histogram + chunkRange prometheus.Histogram } func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics { @@ -94,10 +98,18 @@ func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics { Name: "prometheus_tsdb_compactions_total", Help: "Total number of compactions that were executed for the partition.", }) + m.populatingBlocks = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "prometheus_tsdb_compaction_populating_block", + Help: "Set to 1 when a block is currently being written to the disk.", + }) m.failed = prometheus.NewCounter(prometheus.CounterOpts{ Name: "prometheus_tsdb_compactions_failed_total", Help: "Total number of compactions that failed for the partition.", }) + m.overlappingBlocks = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_tsdb_vertical_compactions_total", + Help: "Total number of compactions done on overlapping blocks.", + }) m.duration = prometheus.NewHistogram(prometheus.HistogramOpts{ Name: "prometheus_tsdb_compaction_duration_seconds", Help: "Duration of compaction runs", @@ -122,7 +134,9 @@ func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics { if r != nil { r.MustRegister( m.ran, + m.populatingBlocks, m.failed, + m.overlappingBlocks, m.duration, m.chunkRange, m.chunkSamples, @@ -133,18 +147,22 @@ func newCompactorMetrics(r prometheus.Registerer) *compactorMetrics { } // NewLeveledCompactor returns a LeveledCompactor. -func NewLeveledCompactor(r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool) (*LeveledCompactor, error) { +func NewLeveledCompactor(ctx context.Context, r prometheus.Registerer, l log.Logger, ranges []int64, pool chunkenc.Pool) (*LeveledCompactor, error) { if len(ranges) == 0 { return nil, errors.Errorf("at least one range must be provided") } if pool == nil { pool = chunkenc.NewPool() } + if l == nil { + l = log.NewNopLogger() + } return &LeveledCompactor{ ranges: ranges, chunkPool: pool, logger: l, metrics: newCompactorMetrics(r), + ctx: ctx, }, nil } @@ -179,11 +197,15 @@ func (c *LeveledCompactor) plan(dms []dirMeta) ([]string, error) { return dms[i].meta.MinTime < dms[j].meta.MinTime }) + res := c.selectOverlappingDirs(dms) + if len(res) > 0 { + return res, nil + } + // No overlapping blocks, do compaction the usual way. // We do not include a recently created block with max(minTime), so the block which was just created from WAL. // This gives users a window of a full block size to piece-wise backup new data without having to care about data overlap. dms = dms[:len(dms)-1] - var res []string for _, dm := range c.selectDirs(dms) { res = append(res, dm.dir) } @@ -244,6 +266,28 @@ func (c *LeveledCompactor) selectDirs(ds []dirMeta) []dirMeta { return nil } +// selectOverlappingDirs returns all dirs with overlaping time ranges. +// It expects sorted input by mint. 
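A worked example of the selection implemented below: for sorted dirs covering [0,20], [10,30] and [40,50], the second mint (10) is below the running max (20), so the first two dirs are returned, the first one back-filled when the initial overlap is detected; the third block starts past the updated running max (30) and is excluded. Under the dirMeta layout used in this file:

    ds := []dirMeta{
        {dir: "b1", meta: &BlockMeta{MinTime: 0, MaxTime: 20}},
        {dir: "b2", meta: &BlockMeta{MinTime: 10, MaxTime: 30}}, // overlaps b1
        {dir: "b3", meta: &BlockMeta{MinTime: 40, MaxTime: 50}}, // disjoint
    }
    // c.selectOverlappingDirs(ds) returns []string{"b1", "b2"}.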
+func (c *LeveledCompactor) selectOverlappingDirs(ds []dirMeta) []string { + if len(ds) < 2 { + return nil + } + var overlappingDirs []string + globalMaxt := ds[0].meta.MaxTime + for i, d := range ds[1:] { + if d.meta.MinTime < globalMaxt { + if len(overlappingDirs) == 0 { // When it is the first overlap, need to add the last one as well. + overlappingDirs = append(overlappingDirs, ds[i].dir) + } + overlappingDirs = append(overlappingDirs, d.dir) + } + if d.meta.MaxTime > globalMaxt { + globalMaxt = d.meta.MaxTime + } + } + return overlappingDirs +} + // splitByRange splits the directories by the time range. The range sequence starts at 0. // // For example, if we have blocks [0-10, 10-20, 50-60, 90-100] and the split range tr is 30 @@ -291,12 +335,17 @@ func compactBlockMetas(uid ulid.ULID, blocks ...*BlockMeta) *BlockMeta { res := &BlockMeta{ ULID: uid, MinTime: blocks[0].MinTime, - MaxTime: blocks[len(blocks)-1].MaxTime, } sources := map[ulid.ULID]struct{}{} + // For overlapping blocks, the Maxt can be + // in any block so we track it globally. + maxt := int64(math.MinInt64) for _, b := range blocks { + if b.MaxTime > maxt { + maxt = b.MaxTime + } if b.Compaction.Level > res.Compaction.Level { res.Compaction.Level = b.Compaction.Level } @@ -318,6 +367,7 @@ func compactBlockMetas(uid ulid.ULID, blocks ...*BlockMeta) *BlockMeta { return res.Compaction.Sources[i].Compare(res.Compaction.Sources[j]) < 0 }) + res.MaxTime = maxt return res } @@ -403,10 +453,11 @@ func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (u var merr MultiError merr.Add(err) - - for _, b := range bs { - if err := b.setCompactionFailed(); err != nil { - merr.Add(errors.Wrapf(err, "setting compaction failed for block: %s", b.Dir())) + if err != context.Canceled { + for _, b := range bs { + if err := b.setCompactionFailed(); err != nil { + merr.Add(errors.Wrapf(err, "setting compaction failed for block: %s", b.Dir())) + } } } @@ -414,6 +465,8 @@ func (c *LeveledCompactor) Compact(dest string, dirs []string, open []*Block) (u } func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64, parent *BlockMeta) (ulid.ULID, error) { + start := time.Now() + entropy := rand.New(rand.NewSource(time.Now().UnixNano())) uid := ulid.MustNew(ulid.Now(), entropy) @@ -440,7 +493,13 @@ func (c *LeveledCompactor) Write(dest string, b BlockReader, mint, maxt int64, p return ulid.ULID{}, nil } - level.Info(c.logger).Log("msg", "write block", "mint", meta.MinTime, "maxt", meta.MaxTime, "ulid", meta.ULID) + level.Info(c.logger).Log( + "msg", "write block", + "mint", meta.MinTime, + "maxt", meta.MaxTime, + "ulid", meta.ULID, + "duration", time.Since(start), + ) return uid, nil } @@ -468,14 +527,19 @@ func (w *instrumentedChunkWriter) WriteChunks(chunks ...chunks.Meta) error { func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockReader) (err error) { dir := filepath.Join(dest, meta.ULID.String()) tmp := dir + ".tmp" - + var closers []io.Closer defer func(t time.Time) { + var merr MultiError + merr.Add(err) + merr.Add(closeAll(closers)) + err = merr.Err() + + // RemoveAll returns no error when tmp doesn't exist so it is safe to always run it. + if err := os.RemoveAll(tmp); err != nil { + level.Error(c.logger).Log("msg", "removed tmp folder after failed compaction", "err", err.Error()) + } if err != nil { c.metrics.failed.Inc() - // TODO(gouthamve): Handle error how? 
- if err := os.RemoveAll(tmp); err != nil { - level.Error(c.logger).Log("msg", "removed tmp folder after failed compaction", "err", err.Error()) - } } c.metrics.ran.Inc() c.metrics.duration.Observe(time.Since(t).Seconds()) @@ -497,7 +561,7 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe if err != nil { return errors.Wrap(err, "open chunk writer") } - defer chunkw.Close() + closers = append(closers, chunkw) // Record written chunk sizes on level 1 compactions. if meta.Compaction.Level == 1 { chunkw = &instrumentedChunkWriter{ @@ -512,27 +576,33 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe if err != nil { return errors.Wrap(err, "open index writer") } - defer indexw.Close() + closers = append(closers, indexw) if err := c.populateBlock(blocks, meta, indexw, chunkw); err != nil { return errors.Wrap(err, "write compaction") } + + select { + case <-c.ctx.Done(): + return c.ctx.Err() + default: + } + // We are explicitly closing them here to check for error even // though these are covered under defer. This is because in Windows, // you cannot delete these unless they are closed and the defer is to // make sure they are closed if the function exits due to an error above. - if err = chunkw.Close(); err != nil { - return errors.Wrap(err, "close chunk writer") + var merr MultiError + for _, w := range closers { + merr.Add(w.Close()) } - if err = indexw.Close(); err != nil { - return errors.Wrap(err, "close index writer") + closers = closers[:0] // Avoid closing the writers twice in the defer. + if merr.Err() != nil { + return merr.Err() } - // Populated block is empty, so cleanup and exit. + // Populated block is empty, so exit early. if meta.Stats.NumSamples == 0 { - if err := os.RemoveAll(tmp); err != nil { - return errors.Wrap(err, "remove tmp folder after empty block failed") - } return nil } @@ -575,19 +645,46 @@ func (c *LeveledCompactor) write(dest string, meta *BlockMeta, blocks ...BlockRe // populateBlock fills the index and chunk writers with new data gathered as the union // of the provided blocks. It returns meta information for the new block. -func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter) error { +// It expects sorted blocks input by mint. +func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, indexw IndexWriter, chunkw ChunkWriter) (err error) { if len(blocks) == 0 { return errors.New("cannot populate block from no readers") } var ( - set ChunkSeriesSet - allSymbols = make(map[string]struct{}, 1<<16) - closers = []io.Closer{} + set ChunkSeriesSet + allSymbols = make(map[string]struct{}, 1<<16) + closers = []io.Closer{} + overlapping bool ) - defer func() { closeAll(closers...) 
}() + defer func() { + var merr MultiError + merr.Add(err) + merr.Add(closeAll(closers)) + err = merr.Err() + c.metrics.populatingBlocks.Set(0) + }() + c.metrics.populatingBlocks.Set(1) + globalMaxt := blocks[0].MaxTime() for i, b := range blocks { + select { + case <-c.ctx.Done(): + return c.ctx.Err() + default: + } + + if !overlapping { + if i > 0 && b.MinTime() < globalMaxt { + c.metrics.overlappingBlocks.Inc() + overlapping = true + level.Warn(c.logger).Log("msg", "found overlapping blocks during compaction", "ulid", meta.ULID) + } + if b.MaxTime() > globalMaxt { + globalMaxt = b.MaxTime() + } + } + indexr, err := b.Index() if err != nil { return errors.Wrapf(err, "open index reader for block %s", b) @@ -644,7 +741,19 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, } for set.Next() { + select { + case <-c.ctx.Done(): + return c.ctx.Err() + default: + } + lset, chks, dranges := set.At() // The chunks here are not fully deleted. + if overlapping { + // If blocks are overlapping, it is possible to have unsorted chunks. + sort.Slice(chks, func(i, j int) bool { + return chks[i].MinTime < chks[j].MinTime + }) + } // Skip the series with all deleted chunks. if len(chks) == 0 { @@ -678,21 +787,28 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, } } - if err := chunkw.WriteChunks(chks...); err != nil { + mergedChks := chks + if overlapping { + mergedChks, err = chunks.MergeOverlappingChunks(chks) + if err != nil { + return errors.Wrap(err, "merge overlapping chunks") + } + } + if err := chunkw.WriteChunks(mergedChks...); err != nil { return errors.Wrap(err, "write chunks") } - if err := indexw.AddSeries(i, lset, chks...); err != nil { + if err := indexw.AddSeries(i, lset, mergedChks...); err != nil { return errors.Wrap(err, "add series") } - meta.Stats.NumChunks += uint64(len(chks)) + meta.Stats.NumChunks += uint64(len(mergedChks)) meta.Stats.NumSeries++ - for _, chk := range chks { + for _, chk := range mergedChks { meta.Stats.NumSamples += uint64(chk.Chunk.NumSamples()) } - for _, chk := range chks { + for _, chk := range mergedChks { if err := c.chunkPool.Put(chk.Chunk); err != nil { return errors.Wrap(err, "put chunk") } diff --git a/vendor/github.com/prometheus/tsdb/db.go b/vendor/github.com/prometheus/tsdb/db.go index 1349dfbd5..fd457ca4c 100644 --- a/vendor/github.com/prometheus/tsdb/db.go +++ b/vendor/github.com/prometheus/tsdb/db.go @@ -16,6 +16,7 @@ package tsdb import ( "bytes" + "context" "fmt" "io" "io/ioutil" @@ -126,6 +127,9 @@ type DB struct { // changing the autoCompact var. autoCompactMtx sync.Mutex autoCompact bool + + // Cancel a running compaction when a shutdown is initiated. 
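This field is the shutdown half of the context now threaded through NewLeveledCompactor: Open creates a cancellable context, stores the cancel function here, and Close invokes it before waiting on donec, so an in-flight compaction unblocks with context.Canceled. The same wiring for a standalone compactor, as a sketch (the 2h range value is illustrative):

    func newCompactor(reg prometheus.Registerer) (*tsdb.LeveledCompactor, context.CancelFunc, error) {
        ctx, cancel := context.WithCancel(context.Background())
        c, err := tsdb.NewLeveledCompactor(ctx, reg, nil, // nil logger becomes a NopLogger
            []int64{2 * 60 * 60 * 1000}, // block range in milliseconds
            chunkenc.NewPool())
        if err != nil {
            cancel()
            return nil, nil, err
        }
        return c, cancel, nil // call cancel on shutdown to abort a running compaction
    }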
+ compactCancel context.CancelFunc } type dbMetrics struct { @@ -202,7 +206,7 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics { Help: "The time taken to recompact blocks to remove tombstones.", }) m.blocksBytes = prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "prometheus_tsdb_storage_blocks_bytes_total", + Name: "prometheus_tsdb_storage_blocks_bytes", Help: "The number of bytes that are currently used for local storage by all blocks.", }) m.sizeRetentionCount = prometheus.NewCounter(prometheus.CounterOpts{ @@ -271,10 +275,13 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db db.lockf = lockf } - db.compactor, err = NewLeveledCompactor(r, l, opts.BlockRanges, db.chunkPool) + ctx, cancel := context.WithCancel(context.Background()) + db.compactor, err = NewLeveledCompactor(ctx, r, l, opts.BlockRanges, db.chunkPool) if err != nil { + cancel() return nil, errors.Wrap(err, "create leveled compactor") } + db.compactCancel = cancel segmentSize := wal.DefaultSegmentSize if opts.WALSegmentSize > 0 { @@ -425,6 +432,9 @@ func (db *DB) compact() (err error) { runtime.GC() if err := db.reload(); err != nil { + if err := os.RemoveAll(filepath.Join(db.dir, uid.String())); err != nil { + return errors.Wrapf(err, "delete persisted head block after failed db reload:%s", uid) + } return errors.Wrap(err, "reload blocks") } if (uid == ulid.ULID{}) { @@ -454,12 +464,16 @@ func (db *DB) compact() (err error) { default: } - if _, err := db.compactor.Compact(db.dir, plan, db.blocks); err != nil { + uid, err := db.compactor.Compact(db.dir, plan, db.blocks) + if err != nil { return errors.Wrapf(err, "compact %s", plan) } runtime.GC() if err := db.reload(); err != nil { + if err := os.RemoveAll(filepath.Join(db.dir, uid.String())); err != nil { + return errors.Wrapf(err, "delete compacted block after failed db reload:%s", uid) + } return errors.Wrap(err, "reload blocks") } runtime.GC() @@ -505,7 +519,13 @@ func (db *DB) reload() (err error) { } } if len(corrupted) > 0 { - return errors.Wrap(err, "unexpected corrupted block") + // Close all new blocks to release the lock for windows. + for _, block := range loadable { + if _, loaded := db.getBlock(block.Meta().ULID); !loaded { + block.Close() + } + } + return fmt.Errorf("unexpected corrupted block:%v", corrupted) } // All deletable blocks should not be loaded. @@ -526,11 +546,8 @@ func (db *DB) reload() (err error) { db.metrics.blocksBytes.Set(float64(blocksSize)) sort.Slice(loadable, func(i, j int) bool { - return loadable[i].Meta().MaxTime < loadable[j].Meta().MaxTime + return loadable[i].Meta().MinTime < loadable[j].Meta().MinTime }) - if err := validateBlockSequence(loadable); err != nil { - return errors.Wrap(err, "invalid block sequence") - } // Swap new blocks first for subsequently created readers to be seen. 
db.mtx.Lock() @@ -538,6 +555,14 @@ func (db *DB) reload() (err error) { db.blocks = loadable db.mtx.Unlock() + blockMetas := make([]BlockMeta, 0, len(loadable)) + for _, b := range loadable { + blockMetas = append(blockMetas, b.Meta()) + } + if overlaps := OverlappingBlocks(blockMetas); len(overlaps) > 0 { + level.Warn(db.logger).Log("msg", "overlapping blocks found during reload", "detail", overlaps.String()) + } + for _, b := range oldBlocks { if _, ok := deletable[b.Meta().ULID]; ok { deletable[b.Meta().ULID] = b @@ -674,25 +699,6 @@ func (db *DB) deleteBlocks(blocks map[ulid.ULID]*Block) error { return nil } -// validateBlockSequence returns error if given block meta files indicate that some blocks overlaps within sequence. -func validateBlockSequence(bs []*Block) error { - if len(bs) <= 1 { - return nil - } - - var metas []BlockMeta - for _, b := range bs { - metas = append(metas, b.meta) - } - - overlaps := OverlappingBlocks(metas) - if len(overlaps) > 0 { - return errors.Errorf("block time ranges overlap: %s", overlaps) - } - - return nil -} - // TimeRange specifies minTime and maxTime range. type TimeRange struct { Min, Max int64 @@ -813,6 +819,7 @@ func (db *DB) Head() *Head { // Close the partition. func (db *DB) Close() error { close(db.stopc) + db.compactCancel() <-db.donec db.mtx.Lock() @@ -888,6 +895,7 @@ func (db *DB) Snapshot(dir string, withHead bool) error { // A goroutine must not handle more than one open Querier. func (db *DB) Querier(mint, maxt int64) (Querier, error) { var blocks []BlockReader + var blockMetas []BlockMeta db.mtx.RLock() defer db.mtx.RUnlock() @@ -895,6 +903,7 @@ func (db *DB) Querier(mint, maxt int64) (Querier, error) { for _, b := range db.blocks { if b.OverlapsClosedInterval(mint, maxt) { blocks = append(blocks, b) + blockMetas = append(blockMetas, b.Meta()) } } if maxt >= db.head.MinTime() { @@ -905,22 +914,31 @@ func (db *DB) Querier(mint, maxt int64) (Querier, error) { }) } - sq := &querier{ - blocks: make([]Querier, 0, len(blocks)), - } + blockQueriers := make([]Querier, 0, len(blocks)) for _, b := range blocks { q, err := NewBlockQuerier(b, mint, maxt) if err == nil { - sq.blocks = append(sq.blocks, q) + blockQueriers = append(blockQueriers, q) continue } // If we fail, all previously opened queriers must be closed. - for _, q := range sq.blocks { + for _, q := range blockQueriers { q.Close() } return nil, errors.Wrapf(err, "open querier for block %s", b) } - return sq, nil + + if len(OverlappingBlocks(blockMetas)) > 0 { + return &verticalQuerier{ + querier: querier{ + blocks: blockQueriers, + }, + }, nil + } + + return &querier{ + blocks: blockQueriers, + }, nil } func rangeForTimestamp(t int64, width int64) (maxt int64) { @@ -1084,7 +1102,7 @@ func (es MultiError) Err() error { return es } -func closeAll(cs ...io.Closer) error { +func closeAll(cs []io.Closer) error { var merr MultiError for _, c := range cs { diff --git a/vendor/github.com/prometheus/tsdb/encoding_helpers.go b/vendor/github.com/prometheus/tsdb/encoding_helpers.go index 6dd6e7c2e..9c10e3160 100644 --- a/vendor/github.com/prometheus/tsdb/encoding_helpers.go +++ b/vendor/github.com/prometheus/tsdb/encoding_helpers.go @@ -15,8 +15,6 @@ package tsdb import ( "encoding/binary" - "hash" - "hash/crc32" "unsafe" "github.com/pkg/errors" @@ -32,17 +30,12 @@ type encbuf struct { func (e *encbuf) reset() { e.b = e.b[:0] } func (e *encbuf) get() []byte { return e.b } -func (e *encbuf) len() int { return len(e.b) } func (e *encbuf) putString(s string) { e.b = append(e.b, s...) 
} -func (e *encbuf) putBytes(b []byte) { e.b = append(e.b, b...) } func (e *encbuf) putByte(c byte) { e.b = append(e.b, c) } -func (e *encbuf) putBE32int(x int) { e.putBE32(uint32(x)) } -func (e *encbuf) putBE64int(x int) { e.putBE64(uint64(x)) } -func (e *encbuf) putBE64int64(x int64) { e.putBE64(uint64(x)) } -func (e *encbuf) putUvarint32(x uint32) { e.putUvarint64(uint64(x)) } -func (e *encbuf) putUvarint(x int) { e.putUvarint64(uint64(x)) } +func (e *encbuf) putBE64int64(x int64) { e.putBE64(uint64(x)) } +func (e *encbuf) putUvarint(x int) { e.putUvarint64(uint64(x)) } func (e *encbuf) putBE32(x uint32) { binary.BigEndian.PutUint32(e.c[:], x) @@ -71,16 +64,6 @@ func (e *encbuf) putUvarintStr(s string) { e.putString(s) } -// putHash appends a hash over the buffers current contents to the buffer. -func (e *encbuf) putHash(h hash.Hash) { - h.Reset() - _, err := h.Write(e.b) - if err != nil { - panic(err) // The CRC32 implementation does not error - } - e.b = h.Sum(e.b) -} - // decbuf provides safe methods to extract data from a byte slice. It does all // necessary bounds checking and advancing of the byte slice. // Several datums can be extracted without checking for errors. However, before using @@ -90,15 +73,8 @@ type decbuf struct { e error } -func (d *decbuf) uvarint() int { return int(d.uvarint64()) } -func (d *decbuf) uvarint32() uint32 { return uint32(d.uvarint64()) } -func (d *decbuf) be32int() int { return int(d.be32()) } -func (d *decbuf) be64int64() int64 { return int64(d.be64()) } - -// crc32 returns a CRC32 checksum over the remaining bytes. -func (d *decbuf) crc32() uint32 { - return crc32.Checksum(d.b, castagnoliTable) -} +func (d *decbuf) uvarint() int { return int(d.uvarint64()) } +func (d *decbuf) be64int64() int64 { return int64(d.be64()) } func (d *decbuf) uvarintStr() string { l := d.uvarint64() @@ -179,18 +155,6 @@ func (d *decbuf) byte() byte { return x } -func (d *decbuf) decbuf(l int) decbuf { - if d.e != nil { - return decbuf{e: d.e} - } - if l > len(d.b) { - return decbuf{e: errInvalidSize} - } - r := decbuf{b: d.b[:l]} - d.b = d.b[l:] - return r -} - func (d *decbuf) err() error { return d.e } func (d *decbuf) len() int { return len(d.b) } func (d *decbuf) get() []byte { return d.b } diff --git a/vendor/github.com/prometheus/tsdb/fileutil/fileutil.go b/vendor/github.com/prometheus/tsdb/fileutil/fileutil.go index 677df8c09..154fa1844 100644 --- a/vendor/github.com/prometheus/tsdb/fileutil/fileutil.go +++ b/vendor/github.com/prometheus/tsdb/fileutil/fileutil.go @@ -77,9 +77,8 @@ func copyFile(src, dest string) error { // returns relative paths to all files and empty directories. func readDirs(src string) ([]string, error) { var files []string - var err error - err = filepath.Walk(src, func(path string, f os.FileInfo, err error) error { + err := filepath.Walk(src, func(path string, f os.FileInfo, err error) error { relativePath := strings.TrimPrefix(path, src) if len(relativePath) > 0 { files = append(files, relativePath) diff --git a/vendor/github.com/prometheus/tsdb/head.go b/vendor/github.com/prometheus/tsdb/head.go index cbc8661f8..c5ac06c91 100644 --- a/vendor/github.com/prometheus/tsdb/head.go +++ b/vendor/github.com/prometheus/tsdb/head.go @@ -48,6 +48,10 @@ var ( // ErrOutOfBounds is returned if an appended sample is out of the // writable time range. ErrOutOfBounds = errors.New("out of bounds") + + // emptyTombstoneReader is a no-op Tombstone Reader. + // This is used by head to satisfy the Tombstones() function call. 
+ emptyTombstoneReader = newMemTombstones() ) // Head handles reads and writes of time series data within a time window. @@ -71,8 +75,6 @@ type Head struct { values map[string]stringset // label names to possible values postings *index.MemPostings // postings lists for terms - - tombstones *memTombstones } type headMetrics struct { @@ -231,7 +233,6 @@ func NewHead(r prometheus.Registerer, l log.Logger, wal *wal.WAL, chunkRange int values: map[string]stringset{}, symbols: map[string]struct{}{}, postings: index.NewUnorderedMemPostings(), - tombstones: newMemTombstones(), } h.metrics = newHeadMetrics(h, r) @@ -334,12 +335,14 @@ func (h *Head) loadWAL(r *wal.Reader) error { } var ( - dec RecordDecoder - series []RefSeries - samples []RefSample - tstones []Stone - err error + dec RecordDecoder + series []RefSeries + samples []RefSample + tstones []Stone + allStones = newMemTombstones() + err error ) + defer allStones.Close() for r.Next() { series, samples, tstones = series[:0], samples[:0], tstones[:0] rec := r.Record() @@ -413,7 +416,7 @@ func (h *Head) loadWAL(r *wal.Reader) error { if itv.Maxt < h.minValidTime { continue } - h.tombstones.addInterval(s.ref, itv) + allStones.addInterval(s.ref, itv) } } default: @@ -436,6 +439,12 @@ func (h *Head) loadWAL(r *wal.Reader) error { } wg.Wait() + if err := allStones.Iter(func(ref uint64, dranges Intervals) error { + return h.chunkRewrite(ref, dranges) + }); err != nil { + return errors.Wrap(r.Err(), "deleting samples from tombstones") + } + if unknownRefs > 0 { level.Warn(h.logger).Log("msg", "unknown series references", "count", unknownRefs) } @@ -604,7 +613,15 @@ func (h *rangeHead) Chunks() (ChunkReader, error) { } func (h *rangeHead) Tombstones() (TombstoneReader, error) { - return h.head.tombstones, nil + return emptyTombstoneReader, nil +} + +func (h *rangeHead) MinTime() int64 { + return h.mint +} + +func (h *rangeHead) MaxTime() int64 { + return h.maxt } // initAppender is a helper to initialize the time bounds of the head @@ -849,7 +866,7 @@ func (h *Head) Delete(mint, maxt int64, ms ...labels.Matcher) error { } var stones []Stone - + dirty := false for p.Next() { series := h.series.getByID(p.At()) @@ -859,22 +876,61 @@ func (h *Head) Delete(mint, maxt int64, ms ...labels.Matcher) error { } // Delete only until the current values and not beyond. t0, t1 = clampInterval(mint, maxt, t0, t1) - stones = append(stones, Stone{p.At(), Intervals{{t0, t1}}}) + if h.wal != nil { + stones = append(stones, Stone{p.At(), Intervals{{t0, t1}}}) + } + if err := h.chunkRewrite(p.At(), Intervals{{t0, t1}}); err != nil { + return errors.Wrap(err, "delete samples") + } + dirty = true } - if p.Err() != nil { return p.Err() } var enc RecordEncoder - if h.wal != nil { + // Although we don't store the stones in the head + // we need to write them to the WAL to mark these as deleted + // after a restart while loeading the WAL. if err := h.wal.Log(enc.Tombstones(stones, nil)); err != nil { return err } } - for _, s := range stones { - h.tombstones.addInterval(s.ref, s.intervals[0]) + if dirty { + h.gc() } + + return nil +} + +// chunkRewrite re-writes the chunks which overlaps with deleted ranges +// and removes the samples in the deleted ranges. +// Chunks is deleted if no samples are left at the end. 
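With this change head deletions are applied eagerly: Head.Delete above rewrites each matched series in place via chunkRewrite instead of accumulating head tombstones, and still logs the Stones to the WAL so the deletion is replayed after a restart. The shape of the internal call, assuming Interval carries the Mint/Maxt fields referenced earlier in this file:

    // Drop all samples of series ref 42 falling in [100, 200].
    if err := h.chunkRewrite(42, Intervals{{Mint: 100, Maxt: 200}}); err != nil {
        return errors.Wrap(err, "delete samples")
    }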
+// chunkRewrite re-writes the chunks which overlap with the deleted ranges
+// and removes the samples in those ranges.
+// The chunk is deleted if no samples are left at the end.
+func (h *Head) chunkRewrite(ref uint64, dranges Intervals) (err error) {
+	if len(dranges) == 0 {
+		return nil
+	}
+
+	ms := h.series.getByID(ref)
+	ms.Lock()
+	defer ms.Unlock()
+	if len(ms.chunks) == 0 {
+		return nil
+	}
+
+	metas := ms.chunksMetas()
+	mint, maxt := metas[0].MinTime, metas[len(metas)-1].MaxTime
+	it := newChunkSeriesIterator(metas, dranges, mint, maxt)
+
+	ms.reset()
+	for it.Next() {
+		t, v := it.At()
+		ok, _ := ms.append(t, v)
+		if !ok {
+			level.Warn(h.logger).Log("msg", "failed to add sample during delete")
+		}
+	}
+
+	return nil
 }
 
@@ -926,7 +982,7 @@ func (h *Head) gc() {
 
 // Tombstones returns a new reader over the head's tombstones
 func (h *Head) Tombstones() (TombstoneReader, error) {
-	return h.tombstones, nil
+	return emptyTombstoneReader, nil
 }
 
 // Index returns an IndexReader against the block.
@@ -1406,6 +1462,16 @@ type memSeries struct {
 	app chunkenc.Appender // Current appender for the chunk.
 }
 
+func newMemSeries(lset labels.Labels, id uint64, chunkRange int64) *memSeries {
+	s := &memSeries{
+		lset:       lset,
+		ref:        id,
+		chunkRange: chunkRange,
+		nextAt:     math.MinInt64,
+	}
+	return s
+}
+
 func (s *memSeries) minTime() int64 {
 	if len(s.chunks) == 0 {
 		return math.MinInt64
@@ -1442,14 +1508,24 @@ func (s *memSeries) cut(mint int64) *memChunk {
 	return c
 }
 
-func newMemSeries(lset labels.Labels, id uint64, chunkRange int64) *memSeries {
-	s := &memSeries{
-		lset:       lset,
-		ref:        id,
-		chunkRange: chunkRange,
-		nextAt:     math.MinInt64,
+func (s *memSeries) chunksMetas() []chunks.Meta {
+	metas := make([]chunks.Meta, 0, len(s.chunks))
+	for _, chk := range s.chunks {
+		metas = append(metas, chunks.Meta{Chunk: chk.chunk, MinTime: chk.minTime, MaxTime: chk.maxTime})
 	}
-	return s
+	return metas
+}
+
+// reset re-initialises all the variables in the memSeries except 'lset', 'ref',
+// and 'chunkRange', so that it appears as it would after 'newMemSeries(...)'.
+func (s *memSeries) reset() {
+	s.chunks = nil
+	s.headChunk = nil
+	s.firstChunkID = 0
+	s.nextAt = math.MinInt64
+	s.sampleBuf = [4]sample{}
+	s.pendingCommit = false
+	s.app = nil
 }
 
 // appendable checks whether the given sample is valid for appending to the series.
@@ -1628,11 +1704,6 @@ func (ss stringset) set(s string) {
 	ss[s] = struct{}{}
 }
 
-func (ss stringset) has(s string) bool {
-	_, ok := ss[s]
-	return ok
-}
-
 func (ss stringset) String() string {
 	return strings.Join(ss.slice(), ",")
 }
diff --git a/vendor/github.com/prometheus/tsdb/index/encoding_helpers.go b/vendor/github.com/prometheus/tsdb/index/encoding_helpers.go
index 9104f1cb5..1ed130158 100644
--- a/vendor/github.com/prometheus/tsdb/index/encoding_helpers.go
+++ b/vendor/github.com/prometheus/tsdb/index/encoding_helpers.go
@@ -33,12 +33,9 @@ func (e *encbuf) get() []byte  { return e.b }
 func (e *encbuf) len() int     { return len(e.b) }
 
 func (e *encbuf) putString(s string) { e.b = append(e.b, s...) }
-func (e *encbuf) putBytes(b []byte)  { e.b = append(e.b, b...)
} func (e *encbuf) putByte(c byte) { e.b = append(e.b, c) } func (e *encbuf) putBE32int(x int) { e.putBE32(uint32(x)) } -func (e *encbuf) putBE64int(x int) { e.putBE64(uint64(x)) } -func (e *encbuf) putBE64int64(x int64) { e.putBE64(uint64(x)) } func (e *encbuf) putUvarint32(x uint32) { e.putUvarint64(uint64(x)) } func (e *encbuf) putUvarint(x int) { e.putUvarint64(uint64(x)) } @@ -142,10 +139,8 @@ func newDecbufUvarintAt(bs ByteSlice, off int) decbuf { return dec } -func (d *decbuf) uvarint() int { return int(d.uvarint64()) } -func (d *decbuf) uvarint32() uint32 { return uint32(d.uvarint64()) } -func (d *decbuf) be32int() int { return int(d.be32()) } -func (d *decbuf) be64int64() int64 { return int64(d.be64()) } +func (d *decbuf) uvarint() int { return int(d.uvarint64()) } +func (d *decbuf) be32int() int { return int(d.be32()) } // crc32 returns a CRC32 checksum over the remaining bytes. func (d *decbuf) crc32() uint32 { @@ -196,7 +191,7 @@ func (d *decbuf) be64() uint64 { if d.e != nil { return 0 } - if len(d.b) < 4 { + if len(d.b) < 8 { d.e = errInvalidSize return 0 } @@ -218,31 +213,6 @@ func (d *decbuf) be32() uint32 { return x } -func (d *decbuf) byte() byte { - if d.e != nil { - return 0 - } - if len(d.b) < 1 { - d.e = errInvalidSize - return 0 - } - x := d.b[0] - d.b = d.b[1:] - return x -} - -func (d *decbuf) decbuf(l int) decbuf { - if d.e != nil { - return decbuf{e: d.e} - } - if l > len(d.b) { - return decbuf{e: errInvalidSize} - } - r := decbuf{b: d.b[:l]} - d.b = d.b[l:] - return r -} - func (d *decbuf) err() error { return d.e } func (d *decbuf) len() int { return len(d.b) } func (d *decbuf) get() []byte { return d.b } diff --git a/vendor/github.com/prometheus/tsdb/index/index.go b/vendor/github.com/prometheus/tsdb/index/index.go index 74e08d465..442e0255f 100644 --- a/vendor/github.com/prometheus/tsdb/index/index.go +++ b/vendor/github.com/prometheus/tsdb/index/index.go @@ -45,6 +45,8 @@ const ( FormatV2 = 2 labelNameSeperator = "\xff" + + indexFilename = "index" ) type indexWriterSeries struct { @@ -752,7 +754,7 @@ func ReadSymbols(bs ByteSlice, version int, off int) ([]string, map[uint32]strin symbolSlice []string symbols = map[uint32]string{} ) - if version == 2 { + if version == FormatV2 { symbolSlice = make([]string, 0, cnt) } diff --git a/vendor/github.com/prometheus/tsdb/index/postings.go b/vendor/github.com/prometheus/tsdb/index/postings.go index 13df1c69a..6212d07b4 100644 --- a/vendor/github.com/prometheus/tsdb/index/postings.go +++ b/vendor/github.com/prometheus/tsdb/index/postings.go @@ -305,9 +305,8 @@ func Intersect(its ...Postings) Postings { } type intersectPostings struct { - a, b Postings - aok, bok bool - cur uint64 + a, b Postings + cur uint64 } func newIntersectPostings(a, b Postings) *intersectPostings { diff --git a/vendor/github.com/prometheus/tsdb/querier.go b/vendor/github.com/prometheus/tsdb/querier.go index 4a5a40636..61503d672 100644 --- a/vendor/github.com/prometheus/tsdb/querier.go +++ b/vendor/github.com/prometheus/tsdb/querier.go @@ -111,7 +111,6 @@ func (q *querier) LabelValuesFor(string, labels.Label) ([]string, error) { func (q *querier) Select(ms ...labels.Matcher) (SeriesSet, error) { return q.sel(q.blocks, ms) - } func (q *querier) sel(qs []Querier, ms []labels.Matcher) (SeriesSet, error) { @@ -143,6 +142,36 @@ func (q *querier) Close() error { return merr.Err() } +// verticalQuerier aggregates querying results from time blocks within +// a single partition. The block time ranges can be overlapping. 
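+// As an illustrative note (not upstream commentary): with four block
+// queriers q1..q4, sel below splits the slice in half recursively, so a
+// Select effectively evaluates merge(merge(q1, q2), merge(q3, q4)),
+// pairing the results with newMergedVerticalSeriesSet at each level.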
+type verticalQuerier struct {
+	querier
+}
+
+func (q *verticalQuerier) Select(ms ...labels.Matcher) (SeriesSet, error) {
+	return q.sel(q.blocks, ms)
+}
+
+func (q *verticalQuerier) sel(qs []Querier, ms []labels.Matcher) (SeriesSet, error) {
+	if len(qs) == 0 {
+		return EmptySeriesSet(), nil
+	}
+	if len(qs) == 1 {
+		return qs[0].Select(ms...)
+	}
+	l := len(qs) / 2
+
+	a, err := q.sel(qs[:l], ms)
+	if err != nil {
+		return nil, err
+	}
+	b, err := q.sel(qs[l:], ms)
+	if err != nil {
+		return nil, err
+	}
+	return newMergedVerticalSeriesSet(a, b), nil
+}
+
 // NewBlockQuerier returns a querier against the reader.
 func NewBlockQuerier(b BlockReader, mint, maxt int64) (Querier, error) {
 	indexr, err := b.Index()
@@ -444,6 +473,72 @@ func (s *mergedSeriesSet) Next() bool {
 	return true
 }
 
+type mergedVerticalSeriesSet struct {
+	a, b         SeriesSet
+	cur          Series
+	adone, bdone bool
+}
+
+// NewMergedVerticalSeriesSet merges two series sets into a single series set.
+// The input series sets must be sorted, and
+// their time ranges can be overlapping.
+func NewMergedVerticalSeriesSet(a, b SeriesSet) SeriesSet {
+	return newMergedVerticalSeriesSet(a, b)
+}
+
+func newMergedVerticalSeriesSet(a, b SeriesSet) *mergedVerticalSeriesSet {
+	s := &mergedVerticalSeriesSet{a: a, b: b}
+	// Initialize first elements of both sets as Next() needs
+	// one element look-ahead.
+	s.adone = !s.a.Next()
+	s.bdone = !s.b.Next()
+
+	return s
+}
+
+func (s *mergedVerticalSeriesSet) At() Series {
+	return s.cur
+}
+
+func (s *mergedVerticalSeriesSet) Err() error {
+	if s.a.Err() != nil {
+		return s.a.Err()
+	}
+	return s.b.Err()
+}
+
+func (s *mergedVerticalSeriesSet) compare() int {
+	if s.adone {
+		return 1
+	}
+	if s.bdone {
+		return -1
+	}
+	return labels.Compare(s.a.At().Labels(), s.b.At().Labels())
+}
+
+func (s *mergedVerticalSeriesSet) Next() bool {
+	if s.adone && s.bdone || s.Err() != nil {
+		return false
+	}
+
+	d := s.compare()
+
+	// Both sets contain the current series. Chain them into a single one.
+	if d > 0 {
+		s.cur = s.b.At()
+		s.bdone = !s.b.Next()
+	} else if d < 0 {
+		s.cur = s.a.At()
+		s.adone = !s.a.Next()
+	} else {
+		s.cur = &verticalChainedSeries{series: []Series{s.a.At(), s.b.At()}}
+		s.adone = !s.a.Next()
+		s.bdone = !s.b.Next()
+	}
+	return true
+}
+
 // ChunkSeriesSet exposes the chunks and intervals of a series instead of the
 // actual series itself.
 type ChunkSeriesSet interface {
@@ -739,6 +834,100 @@ func (it *chainedSeriesIterator) Err() error {
 	return it.cur.Err()
 }
 
+// verticalChainedSeries implements a series for a list of time-sorted, time-overlapping series.
+// They all must have the same labels.
+type verticalChainedSeries struct {
+	series []Series
+}
+
+func (s *verticalChainedSeries) Labels() labels.Labels {
+	return s.series[0].Labels()
+}
+
+func (s *verticalChainedSeries) Iterator() SeriesIterator {
+	return newVerticalMergeSeriesIterator(s.series...)
+}
+
+// verticalMergeSeriesIterator implements a series iterator over a list
+// of time-sorted, time-overlapping iterators.
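+// As a hypothetical walk-through (not upstream commentary), merging two
+// overlapping iterators proceeds sample by sample:
+//
+//	a:      (1, 1.0) (3, 3.0)
+//	b:      (2, 2.0) (3, 9.0) (4, 4.0)
+//	merged: (1, 1.0) (2, 2.0) (3, 9.0) (4, 4.0)
+//
+// On equal timestamps the sample from b wins and both iterators advance,
+// so the later of two overlapping inputs takes precedence.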
+type verticalMergeSeriesIterator struct { + a, b SeriesIterator + aok, bok, initialized bool + + curT int64 + curV float64 +} + +func newVerticalMergeSeriesIterator(s ...Series) SeriesIterator { + if len(s) == 1 { + return s[0].Iterator() + } else if len(s) == 2 { + return &verticalMergeSeriesIterator{ + a: s[0].Iterator(), + b: s[1].Iterator(), + } + } + return &verticalMergeSeriesIterator{ + a: s[0].Iterator(), + b: newVerticalMergeSeriesIterator(s[1:]...), + } +} + +func (it *verticalMergeSeriesIterator) Seek(t int64) bool { + it.aok, it.bok = it.a.Seek(t), it.b.Seek(t) + it.initialized = true + return it.Next() +} + +func (it *verticalMergeSeriesIterator) Next() bool { + if !it.initialized { + it.aok = it.a.Next() + it.bok = it.b.Next() + it.initialized = true + } + + if !it.aok && !it.bok { + return false + } + + if !it.aok { + it.curT, it.curV = it.b.At() + it.bok = it.b.Next() + return true + } + if !it.bok { + it.curT, it.curV = it.a.At() + it.aok = it.a.Next() + return true + } + + acurT, acurV := it.a.At() + bcurT, bcurV := it.b.At() + if acurT < bcurT { + it.curT, it.curV = acurT, acurV + it.aok = it.a.Next() + } else if acurT > bcurT { + it.curT, it.curV = bcurT, bcurV + it.bok = it.b.Next() + } else { + it.curT, it.curV = bcurT, bcurV + it.aok = it.a.Next() + it.bok = it.b.Next() + } + return true +} + +func (it *verticalMergeSeriesIterator) At() (t int64, v float64) { + return it.curT, it.curV +} + +func (it *verticalMergeSeriesIterator) Err() error { + if it.a.Err() != nil { + return it.a.Err() + } + return it.b.Err() +} + // chunkSeriesIterator implements a series iterator on top // of a list of time-sorted, non-overlapping chunks. type chunkSeriesIterator struct { diff --git a/vendor/github.com/prometheus/tsdb/repair.go b/vendor/github.com/prometheus/tsdb/repair.go index 15f79d5f7..4aeffb554 100644 --- a/vendor/github.com/prometheus/tsdb/repair.go +++ b/vendor/github.com/prometheus/tsdb/repair.go @@ -64,7 +64,7 @@ func repairBadIndexVersion(logger log.Logger, dir string) error { if err != nil { return wrapErr(err, d) } - broken, err := os.Open(filepath.Join(d, "index")) + broken, err := os.Open(filepath.Join(d, indexFilename)) if err != nil { return wrapErr(err, d) } diff --git a/vendor/github.com/prometheus/tsdb/staticcheck.conf b/vendor/github.com/prometheus/tsdb/staticcheck.conf deleted file mode 100644 index 3266a2e29..000000000 --- a/vendor/github.com/prometheus/tsdb/staticcheck.conf +++ /dev/null @@ -1,2 +0,0 @@ -# Enable only "legacy" staticcheck verifications. -checks = [ "SA*" ] diff --git a/vendor/github.com/prometheus/tsdb/wal/live_reader.go b/vendor/github.com/prometheus/tsdb/wal/live_reader.go new file mode 100644 index 000000000..8394bfd08 --- /dev/null +++ b/vendor/github.com/prometheus/tsdb/wal/live_reader.go @@ -0,0 +1,284 @@ +// Copyright 2019 The Prometheus Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+package wal
+
+import (
+	"encoding/binary"
+	"fmt"
+	"hash/crc32"
+	"io"
+
+	"github.com/go-kit/kit/log"
+	"github.com/go-kit/kit/log/level"
+	"github.com/pkg/errors"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
+)
+
+var (
+	readerCorruptionErrors = promauto.NewCounterVec(prometheus.CounterOpts{
+		Name: "prometheus_tsdb_wal_reader_corruption_errors",
+		Help: "Errors encountered when reading the WAL.",
+	}, []string{"error"})
+)
+
+// NewLiveReader returns a new live reader.
+func NewLiveReader(logger log.Logger, r io.Reader) *LiveReader {
+	return &LiveReader{
+		logger: logger,
+		rdr:    r,
+
+		// Until we understand how they come about, make readers permissive
+		// to records spanning pages.
+		permissive: true,
+	}
+}
+
+// LiveReader reads WAL records from an io.Reader. It allows reading of WALs
+// that are still in the process of being written, and returns records as soon
+// as they can be read.
+type LiveReader struct {
+	logger     log.Logger
+	rdr        io.Reader
+	err        error
+	rec        []byte
+	hdr        [recordHeaderSize]byte
+	buf        [pageSize]byte
+	readIndex  int   // Index in buf to start at for next read.
+	writeIndex int   // Index in buf to start at for next write.
+	total      int64 // Total bytes processed during reading in calls to Next().
+	index      int   // Used to track partial records, should be 0 at the start of every new record.
+
+	// For testing, we can treat EOF as a non-error.
+	eofNonErr bool
+
+	// We sometimes see records span page boundaries. Should never happen, but it
+	// does. Until we track down why, set permissive to true to tolerate it.
+	// NB the non-live Reader implementation allows for this.
+	permissive bool
+}
+
+// Err returns any errors encountered reading the WAL. io.EOFs are not terminal
+// and Next can be tried again. Non-EOFs are terminal, and the reader should
+// not be used again. It is up to the user to decide when to stop trying should
+// io.EOF be returned.
+func (r *LiveReader) Err() error {
+	if r.eofNonErr && r.err == io.EOF {
+		return nil
+	}
+	return r.err
+}
+
+// Offset returns the number of bytes consumed from this segment.
+func (r *LiveReader) Offset() int64 {
+	return r.total
+}
+
+func (r *LiveReader) fillBuffer() (int, error) {
+	n, err := r.rdr.Read(r.buf[r.writeIndex:len(r.buf)])
+	r.writeIndex += n
+	return n, err
+}
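+
+// A typical tailing loop over a live segment might look like this (a sketch;
+// dir, logger, and process are assumptions, not part of this file):
+//
+//	seg, _ := OpenReadSegment(SegmentName(dir, 0))
+//	reader := NewLiveReader(logger, seg)
+//	for {
+//		for reader.Next() {
+//			process(reader.Record())
+//		}
+//		if err := reader.Err(); err != nil && err != io.EOF {
+//			break // Terminal error; the segment is corrupt.
+//		}
+//		time.Sleep(10 * time.Millisecond) // io.EOF: wait for more writes.
+//	}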
+
+// Next returns true if Record() will contain a full record.
+// If Next returns false, you should always check the contents of Err():
+// a false return guarantees there are no more records if the segment is closed
+// and not corrupt; otherwise, if Err() == io.EOF, you should try again when more
+// data has been written.
+func (r *LiveReader) Next() bool {
+	for {
+		// If buildRecord returns a non-EOF error, it's game over: the segment is
+		// corrupt. If buildRecord returns an EOF, we try and read more in
+		// fillBuffer later on. If that fails to read anything (n=0 && err=EOF),
+		// we return EOF and the user can try again later. If we have a full
+		// page, buildRecord is guaranteed to return a record or a non-EOF; it
+		// checks that records fit within pages.
+		if ok, err := r.buildRecord(); ok {
+			return true
+		} else if err != nil && err != io.EOF {
+			r.err = err
+			return false
+		}
+
+		// If we've filled the page and not found a record, records have
+		// started to span pages. This shouldn't happen, but it does; until
+		// we find out why, we need to deal with it.
+		if r.permissive && r.writeIndex == pageSize && r.readIndex > 0 {
+			copy(r.buf[:], r.buf[r.readIndex:])
+			r.writeIndex -= r.readIndex
+			r.readIndex = 0
+			continue
+		}
+
+		if r.readIndex == pageSize {
+			r.writeIndex = 0
+			r.readIndex = 0
+		}
+
+		if r.writeIndex != pageSize {
+			n, err := r.fillBuffer()
+			if n == 0 || (err != nil && err != io.EOF) {
+				r.err = err
+				return false
+			}
+		}
+	}
+}
+
+// Record returns the current record.
+// The returned byte slice is only valid until the next call to Next.
+func (r *LiveReader) Record() []byte {
+	return r.rec
+}
+
+// Rebuild a full record from potentially partial records. Returns false
+// if there was an error or if we weren't able to read a record for any reason.
+// Returns true if we read a full record. Any record data is appended to
+// LiveReader.rec.
+func (r *LiveReader) buildRecord() (bool, error) {
+	for {
+		// Check that we have data in the internal buffer to read.
+		if r.writeIndex <= r.readIndex {
+			return false, nil
+		}
+
+		// Attempt to read a record, partial or otherwise.
+		temp, n, err := r.readRecord()
+		if err != nil {
+			return false, err
+		}
+
+		r.readIndex += n
+		r.total += int64(n)
+		if temp == nil {
+			return false, nil
+		}
+
+		rt := recType(r.hdr[0])
+		if rt == recFirst || rt == recFull {
+			r.rec = r.rec[:0]
+		}
+		r.rec = append(r.rec, temp...)
+
+		if err := validateRecord(rt, r.index); err != nil {
+			r.index = 0
+			return false, err
+		}
+		if rt == recLast || rt == recFull {
+			r.index = 0
+			return true, nil
+		}
+		// Only increment i for non-zero records since we use it
+		// to determine valid content record sequences.
+		r.index++
+	}
+}
+
+// Returns an error if the recType and i indicate an invalid record sequence.
+// As an example, if i is > 0 because we've read some amount of a partial record
+// (recFirst, recMiddle, etc. but not recLast) and then we get another recFirst or recFull
+// instead of a recLast or recMiddle we would have an invalid record.
+func validateRecord(typ recType, i int) error {
+	switch typ {
+	case recFull:
+		if i != 0 {
+			return errors.New("unexpected full record")
+		}
+		return nil
+	case recFirst:
+		if i != 0 {
+			return errors.New("unexpected first record, dropping buffer")
+		}
+		return nil
+	case recMiddle:
+		if i == 0 {
+			return errors.New("unexpected middle record, dropping buffer")
+		}
+		return nil
+	case recLast:
+		if i == 0 {
+			return errors.New("unexpected last record, dropping buffer")
+		}
+		return nil
+	default:
+		return errors.Errorf("unexpected record type %d", typ)
+	}
+}
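+
+// For reference, the header decoded below is recordHeaderSize (7) bytes: one
+// type byte, a big-endian uint16 length of the record body, and a big-endian
+// uint32 CRC32 (Castagnoli table) of the body. A writer-side sketch of
+// framing one full record (body is an assumption) mirrors that layout:
+//
+//	hdr := make([]byte, recordHeaderSize)
+//	hdr[0] = byte(recFull)
+//	binary.BigEndian.PutUint16(hdr[1:], uint16(len(body)))
+//	binary.BigEndian.PutUint32(hdr[3:], crc32.Checksum(body, castagnoliTable))
+//	out := append(hdr, body...)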
+
+// Read a sub-record (see recType) from the buffer. It could potentially
+// be a full record (recFull) if the record fits within the bounds of a single page.
+// Returns a byte slice of the record data read, the number of bytes read, and an error
+// if there's a non-zero byte in a page term record or the record checksum fails.
+func (r *LiveReader) readRecord() ([]byte, int, error) {
+	// Special case: for recPageTerm, check that all bytes to the end of the
+	// page are zero; consume them but don't return them.
+	if r.buf[r.readIndex] == byte(recPageTerm) {
+		// End of page won't necessarily be end of buffer, as we may have
+		// got misaligned by records spanning page boundaries.
+		// r.total % pageSize is the offset into the current page
+		// that r.readIndex points to in buf. Therefore
+		// pageSize - (r.total % pageSize) is the amount left to read of
+		// the current page.
+		remaining := int(pageSize - (r.total % pageSize))
+		if r.readIndex+remaining > r.writeIndex {
+			return nil, 0, io.EOF
+		}
+
+		for i := r.readIndex; i < r.readIndex+remaining; i++ {
+			if r.buf[i] != 0 {
+				return nil, 0, errors.New("unexpected non-zero byte in page term bytes")
+			}
+		}
+
+		return nil, remaining, nil
+	}
+
+	// Not a recPageTerm; read the record and check the checksum.
+	if r.writeIndex-r.readIndex < recordHeaderSize {
+		return nil, 0, io.EOF
+	}
+
+	copy(r.hdr[:], r.buf[r.readIndex:r.readIndex+recordHeaderSize])
+	length := int(binary.BigEndian.Uint16(r.hdr[1:]))
+	crc := binary.BigEndian.Uint32(r.hdr[3:])
+	if r.readIndex+recordHeaderSize+length > pageSize {
+		if !r.permissive {
+			return nil, 0, fmt.Errorf("record would overflow current page: %d > %d", r.readIndex+recordHeaderSize+length, pageSize)
+		}
+		readerCorruptionErrors.WithLabelValues("record_span_page").Inc()
+		level.Warn(r.logger).Log("msg", "record spans page boundaries", "start", r.readIndex, "end", recordHeaderSize+length, "pageSize", pageSize)
+	}
+	if recordHeaderSize+length > pageSize {
+		return nil, 0, fmt.Errorf("record length greater than a single page: %d > %d", recordHeaderSize+length, pageSize)
+	}
+	if r.readIndex+recordHeaderSize+length > r.writeIndex {
+		return nil, 0, io.EOF
+	}
+
+	rec := r.buf[r.readIndex+recordHeaderSize : r.readIndex+recordHeaderSize+length]
+	if c := crc32.Checksum(rec, castagnoliTable); c != crc {
+		return nil, 0, errors.Errorf("unexpected checksum %x, expected %x", c, crc)
+	}
+
+	return rec, length + recordHeaderSize, nil
+}
+
+func min(i, j int) int {
+	if i < j {
+		return i
+	}
+	return j
+}
diff --git a/vendor/github.com/prometheus/tsdb/wal/reader.go b/vendor/github.com/prometheus/tsdb/wal/reader.go
new file mode 100644
index 000000000..297463b00
--- /dev/null
+++ b/vendor/github.com/prometheus/tsdb/wal/reader.go
@@ -0,0 +1,183 @@
+// Copyright 2019 The Prometheus Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package wal
+
+import (
+	"encoding/binary"
+	"hash/crc32"
+	"io"
+
+	"github.com/pkg/errors"
+)
+
+// Reader reads WAL records from an io.Reader.
+type Reader struct {
+	rdr       io.Reader
+	err       error
+	rec       []byte
+	buf       [pageSize]byte
+	total     int64   // Total bytes processed.
+	curRecTyp recType // Used for checking that the last record is not torn.
+}
+
+// NewReader returns a new reader.
+func NewReader(r io.Reader) *Reader {
+	return &Reader{rdr: r}
+}
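+
+// Reading a directory of completed segments end-to-end might look like this
+// (a sketch; dir and process are assumptions, not part of this file):
+//
+//	sr, err := NewSegmentsReader(dir)
+//	if err != nil {
+//		return err
+//	}
+//	defer sr.Close()
+//	r := NewReader(sr)
+//	for r.Next() {
+//		process(r.Record())
+//	}
+//	return r.Err() // nil, or a *CorruptionErr carrying segment and offset.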
+
+// Next advances the reader to the next record and returns true if it exists.
+// It must not be called again after it returned false.
+func (r *Reader) Next() bool {
+	err := r.next()
+	if errors.Cause(err) == io.EOF {
+		// The last WAL segment record shouldn't be torn (it should be full or last).
+		// The last record would be torn after a crash just before
+		// the last record part could be persisted to disk.
+		if recType(r.curRecTyp) == recFirst || recType(r.curRecTyp) == recMiddle {
+			r.err = errors.New("last record is torn")
+		}
+		return false
+	}
+	r.err = err
+	return r.err == nil
+}
+
+func (r *Reader) next() (err error) {
+	// We have to use r.buf since allocating byte arrays here fails escape
+	// analysis and ends up on the heap, even though it seemingly should not.
+	hdr := r.buf[:recordHeaderSize]
+	buf := r.buf[recordHeaderSize:]
+
+	r.rec = r.rec[:0]
+
+	i := 0
+	for {
+		if _, err = io.ReadFull(r.rdr, hdr[:1]); err != nil {
+			return errors.Wrap(err, "read first header byte")
+		}
+		r.total++
+		r.curRecTyp = recType(hdr[0])
+
+		// Gobble up zero bytes.
+		if r.curRecTyp == recPageTerm {
+			// recPageTerm is a single byte that indicates the rest of the page is padded.
+			// If it's the first byte in a page, buf is too small and
+			// needs to be resized to fit pageSize-1 bytes.
+			buf = r.buf[1:]
+
+			// We are pedantic and check whether the zeros are actually up
+			// to a page boundary.
+			// It's not strictly necessary but may catch sketchy state early.
+			k := pageSize - (r.total % pageSize)
+			if k == pageSize {
+				continue // Initial 0 byte was last page byte.
+			}
+			n, err := io.ReadFull(r.rdr, buf[:k])
+			if err != nil {
+				return errors.Wrap(err, "read remaining zeros")
+			}
+			r.total += int64(n)
+
+			for _, c := range buf[:k] {
+				if c != 0 {
+					return errors.New("unexpected non-zero byte in padded page")
+				}
+			}
+			continue
+		}
+		n, err := io.ReadFull(r.rdr, hdr[1:])
+		if err != nil {
+			return errors.Wrap(err, "read remaining header")
+		}
+		r.total += int64(n)
+
+		var (
+			length = binary.BigEndian.Uint16(hdr[1:])
+			crc    = binary.BigEndian.Uint32(hdr[3:])
+		)
+
+		if length > pageSize-recordHeaderSize {
+			return errors.Errorf("invalid record size %d", length)
+		}
+		n, err = io.ReadFull(r.rdr, buf[:length])
+		if err != nil {
+			return err
+		}
+		r.total += int64(n)
+
+		if n != int(length) {
+			return errors.Errorf("invalid size: expected %d, got %d", length, n)
+		}
+		if c := crc32.Checksum(buf[:length], castagnoliTable); c != crc {
+			return errors.Errorf("unexpected checksum %x, expected %x", c, crc)
+		}
+		r.rec = append(r.rec, buf[:length]...)
+
+		if err := validateRecord(r.curRecTyp, i); err != nil {
+			return err
+		}
+		if r.curRecTyp == recLast || r.curRecTyp == recFull {
+			return nil
+		}
+
+		// Only increment i for non-zero records since we use it
+		// to determine valid content record sequences.
+		i++
+	}
+}
+
+// Err returns the last encountered error wrapped in a corruption error.
+// If the reader does not allow inferring a segment index and offset, a total
+// offset in the reader stream will be provided.
+func (r *Reader) Err() error {
+	if r.err == nil {
+		return nil
+	}
+	if b, ok := r.rdr.(*segmentBufReader); ok {
+		return &CorruptionErr{
+			Err:     r.err,
+			Dir:     b.segs[b.cur].Dir(),
+			Segment: b.segs[b.cur].Index(),
+			Offset:  int64(b.off),
+		}
+	}
+	return &CorruptionErr{
+		Err:     r.err,
+		Segment: -1,
+		Offset:  r.total,
+	}
+}
+
+// Record returns the current record. The returned byte slice is only
+// valid until the next call to Next.
+func (r *Reader) Record() []byte {
+	return r.rec
+}
+
+// Segment returns the current segment being read.
+func (r *Reader) Segment() int {
+	if b, ok := r.rdr.(*segmentBufReader); ok {
+		return b.segs[b.cur].Index()
+	}
+	return -1
+}
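+
+// The *CorruptionErr produced by Err() can be handed straight to WAL.Repair,
+// which repairs the WAL by dropping data from the corruption onwards (a
+// sketch; w is an assumed *WAL):
+//
+//	if err := r.Err(); err != nil {
+//		if cerr, ok := err.(*CorruptionErr); ok {
+//			return w.Repair(cerr)
+//		}
+//		return err
+//	}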
+
+// Offset returns the current position of the segment being read.
+func (r *Reader) Offset() int64 {
+	if b, ok := r.rdr.(*segmentBufReader); ok {
+		return int64(b.off)
+	}
+	return r.total
+}
diff --git a/vendor/github.com/prometheus/tsdb/wal/wal.go b/vendor/github.com/prometheus/tsdb/wal/wal.go
index fd90eb90e..dd0be9cd8 100644
--- a/vendor/github.com/prometheus/tsdb/wal/wal.go
+++ b/vendor/github.com/prometheus/tsdb/wal/wal.go
@@ -296,7 +296,7 @@ func (w *WAL) Repair(origErr error) error {
 	if err != nil {
 		return errors.Wrap(err, "list segments")
 	}
-	level.Warn(w.logger).Log("msg", "deleting all segments behind corruption", "segment", cerr.Segment)
+	level.Warn(w.logger).Log("msg", "deleting all segments newer than corrupted segment", "segment", cerr.Segment)
 
 	for _, s := range segs {
 		if w.segment.i == s.index {
@@ -681,20 +681,20 @@ func NewSegmentsRangeReader(sr ...SegmentRange) (io.ReadCloser, error) {
 
 // segmentBufReader is a buffered reader that reads in multiples of pages.
 // The main purpose is that we are able to track segment and offset for
-// corruption reporting.
+// corruption reporting. We have to be careful not to increment cur too
+// early, as it is used by Reader.Err() to tell Repair which segment is corrupt.
+// As such we pad the end of non-page-aligned segments with zeros.
 type segmentBufReader struct {
 	buf  *bufio.Reader
 	segs []*Segment
-	cur  int
-	off  int
-	more bool
+	cur  int // Index into segs.
+	off  int // Offset of read data into current segment.
 }
 
 func newSegmentBufReader(segs ...*Segment) *segmentBufReader {
 	return &segmentBufReader{
-		buf:  bufio.NewReaderSize(nil, 16*pageSize),
+		buf:  bufio.NewReaderSize(segs[0], 16*pageSize),
 		segs: segs,
-		cur:  -1,
 	}
 }
 
@@ -707,411 +707,38 @@ func (r *segmentBufReader) Close() (err error) {
 	return err
 }
 
+// Read implements io.Reader.
 func (r *segmentBufReader) Read(b []byte) (n int, err error) {
-	if !r.more {
-		if r.cur+1 >= len(r.segs) {
-			return 0, io.EOF
-		}
-		r.cur++
-		r.off = 0
-		r.more = true
-		r.buf.Reset(r.segs[r.cur])
-	}
 	n, err = r.buf.Read(b)
 	r.off += n
-	if err != io.EOF {
+
+	// If we succeeded, or hit a non-EOF error, we can stop.
+	if err == nil || err != io.EOF {
 		return n, err
 	}
-	// Just return what we read so far, but don't signal EOF.
-	// Only unset more so we don't invalidate the current segment and
-	// offset before the next read.
-	r.more = false
+
+	// We hit EOF; fake out zero padding at the end of short segments, so we
+	// don't increment cur too early and report the wrong segment as corrupt.
+	if r.off%pageSize != 0 {
+		i := 0
+		for ; n+i < len(b) && (r.off+i)%pageSize != 0; i++ {
+			b[n+i] = 0
+		}
+
+		// Return early, even if we didn't fill b.
+		r.off += i
+		return n + i, nil
+	}
+
+	// There is no more data left in the current segment and there are no more
+	// segments left. Return EOF.
+	if r.cur+1 >= len(r.segs) {
+		return n, io.EOF
+	}
+
+	// Move to next segment.
+	r.cur++
+	r.off = 0
+	r.buf.Reset(r.segs[r.cur])
 	return n, nil
 }
-
-// Reader reads WAL records from an io.Reader.
-type Reader struct {
-	rdr       io.Reader
-	err       error
-	rec       []byte
-	buf       [pageSize]byte
-	total     int64   // Total bytes processed.
-	curRecTyp recType // Used for checking that the last record is not torn.
-}
-
-// NewReader returns a new reader.
-func NewReader(r io.Reader) *Reader {
-	return &Reader{rdr: r}
-}
-
-// Next advances the reader to the next records and returns true if it exists.
-// It must not be called again after it returned false.
-func (r *Reader) Next() bool { - err := r.next() - if errors.Cause(err) == io.EOF { - // The last WAL segment record shouldn't be torn(should be full or last). - // The last record would be torn after a crash just before - // the last record part could be persisted to disk. - if recType(r.curRecTyp) == recFirst || recType(r.curRecTyp) == recMiddle { - r.err = errors.New("last record is torn") - } - return false - } - r.err = err - return r.err == nil -} - -func (r *Reader) next() (err error) { - // We have to use r.buf since allocating byte arrays here fails escape - // analysis and ends up on the heap, even though it seemingly should not. - hdr := r.buf[:recordHeaderSize] - buf := r.buf[recordHeaderSize:] - - r.rec = r.rec[:0] - - i := 0 - for { - if _, err = io.ReadFull(r.rdr, hdr[:1]); err != nil { - return errors.Wrap(err, "read first header byte") - } - r.total++ - r.curRecTyp = recType(hdr[0]) - - // Gobble up zero bytes. - if r.curRecTyp == recPageTerm { - // recPageTerm is a single byte that indicates the rest of the page is padded. - // If it's the first byte in a page, buf is too small and - // needs to be resized to fit pageSize-1 bytes. - buf = r.buf[1:] - - // We are pedantic and check whether the zeros are actually up - // to a page boundary. - // It's not strictly necessary but may catch sketchy state early. - k := pageSize - (r.total % pageSize) - if k == pageSize { - continue // Initial 0 byte was last page byte. - } - n, err := io.ReadFull(r.rdr, buf[:k]) - if err != nil { - return errors.Wrap(err, "read remaining zeros") - } - r.total += int64(n) - - for _, c := range buf[:k] { - if c != 0 { - return errors.New("unexpected non-zero byte in padded page") - } - } - continue - } - n, err := io.ReadFull(r.rdr, hdr[1:]) - if err != nil { - return errors.Wrap(err, "read remaining header") - } - r.total += int64(n) - - var ( - length = binary.BigEndian.Uint16(hdr[1:]) - crc = binary.BigEndian.Uint32(hdr[3:]) - ) - - if length > pageSize-recordHeaderSize { - return errors.Errorf("invalid record size %d", length) - } - n, err = io.ReadFull(r.rdr, buf[:length]) - if err != nil { - return err - } - r.total += int64(n) - - if n != int(length) { - return errors.Errorf("invalid size: expected %d, got %d", length, n) - } - if c := crc32.Checksum(buf[:length], castagnoliTable); c != crc { - return errors.Errorf("unexpected checksum %x, expected %x", c, crc) - } - r.rec = append(r.rec, buf[:length]...) - - if err := validateRecord(r.curRecTyp, i); err != nil { - return err - } - if r.curRecTyp == recLast || r.curRecTyp == recFull { - return nil - } - - // Only increment i for non-zero records since we use it - // to determine valid content record sequences. - i++ - } -} - -// Err returns the last encountered error wrapped in a corruption error. -// If the reader does not allow to infer a segment index and offset, a total -// offset in the reader stream will be provided. -func (r *Reader) Err() error { - if r.err == nil { - return nil - } - if b, ok := r.rdr.(*segmentBufReader); ok { - return &CorruptionErr{ - Err: r.err, - Dir: b.segs[b.cur].Dir(), - Segment: b.segs[b.cur].Index(), - Offset: int64(b.off), - } - } - return &CorruptionErr{ - Err: r.err, - Segment: -1, - Offset: r.total, - } -} - -// Record returns the current record. The returned byte slice is only -// valid until the next call to Next. -func (r *Reader) Record() []byte { - return r.rec -} - -// Segment returns the current segment being read. 
-func (r *Reader) Segment() int { - if b, ok := r.rdr.(*segmentBufReader); ok { - return b.segs[b.cur].Index() - } - return -1 -} - -// Offset returns the current position of the segment being read. -func (r *Reader) Offset() int64 { - if b, ok := r.rdr.(*segmentBufReader); ok { - return int64(b.off) - } - return r.total -} - -// NewLiveReader returns a new live reader. -func NewLiveReader(r io.Reader) *LiveReader { - return &LiveReader{rdr: r} -} - -// Reader reads WAL records from an io.Reader. It buffers partial record data for -// the next read. -type LiveReader struct { - rdr io.Reader - err error - rec []byte - hdr [recordHeaderSize]byte - buf [pageSize]byte - readIndex int // Index in buf to start at for next read. - writeIndex int // Index in buf to start at for next write. - total int64 // Total bytes processed during reading in calls to Next(). - index int // Used to track partial records, should be 0 at the start of every new record. -} - -func (r *LiveReader) Err() error { - return r.err -} - -func (r *LiveReader) TotalRead() int64 { - return r.total -} - -func (r *LiveReader) fillBuffer() error { - n, err := r.rdr.Read(r.buf[r.writeIndex:len(r.buf)]) - r.writeIndex += n - return err -} - -// Shift the buffer up to the read index. -func (r *LiveReader) shiftBuffer() { - copied := copy(r.buf[0:], r.buf[r.readIndex:r.writeIndex]) - r.readIndex = 0 - r.writeIndex = copied -} - -// Next returns true if r.rec will contain a full record. -// False does not indicate that there will never be more data to -// read for the current io.Reader. -func (r *LiveReader) Next() bool { - for { - if r.buildRecord() { - return true - } - if r.err != nil && r.err != io.EOF { - return false - } - if r.readIndex == pageSize { - r.shiftBuffer() - } - if r.writeIndex != pageSize { - if err := r.fillBuffer(); err != nil { - // We expect to get EOF, since we're reading the segment file as it's being written. - if err != io.EOF { - r.err = err - } - return false - } - } - } -} - -// Record returns the current record. -// The returned byte slice is only valid until the next call to Next. -func (r *LiveReader) Record() []byte { - return r.rec -} - -// Rebuild a full record from potentially partial records. Returns false -// if there was an error or if we weren't able to read a record for any reason. -// Returns true if we read a full record. Any record data is appeneded to -// LiveReader.rec -func (r *LiveReader) buildRecord() bool { - for { - // Check that we have data in the internal buffer to read. - if r.writeIndex <= r.readIndex { - return false - } - - // Attempt to read a record, partial or otherwise. - temp, n, err := readRecord(r.buf[r.readIndex:r.writeIndex], r.hdr[:], r.total) - r.readIndex += n - r.total += int64(n) - if err != nil { - r.err = err - return false - } - - if temp == nil { - return false - } - - rt := recType(r.hdr[0]) - - if rt == recFirst || rt == recFull { - r.rec = r.rec[:0] - } - r.rec = append(r.rec, temp...) - - if err := validateRecord(rt, r.index); err != nil { - r.err = err - r.index = 0 - return false - } - if rt == recLast || rt == recFull { - r.index = 0 - return true - } - // Only increment i for non-zero records since we use it - // to determine valid content record sequences. - r.index++ - } -} - -// Returns an error if the recType and i indicate an invalid record sequence. -// As an example, if i is > 0 because we've read some amount of a partial record -// (recFirst, recMiddle, etc. 
but not recLast) and then we get another recFirst or recFull -// instead of a recLast or recMiddle we would have an invalid record. -func validateRecord(typ recType, i int) error { - switch typ { - case recFull: - if i != 0 { - return errors.New("unexpected full record") - } - return nil - case recFirst: - if i != 0 { - return errors.New("unexpected first record, dropping buffer") - } - return nil - case recMiddle: - if i == 0 { - return errors.New("unexpected middle record, dropping buffer") - } - return nil - case recLast: - if i == 0 { - return errors.New("unexpected last record, dropping buffer") - } - return nil - default: - return errors.Errorf("unexpected record type %d", typ) - } -} - -// Read a sub-record (see recType) from the buffer. It could potentially -// be a full record (recFull) if the record fits within the bounds of a single page. -// Returns a byte slice of the record data read, the number of bytes read, and an error -// if there's a non-zero byte in a page term record or the record checksum fails. -// TODO(callum) the EOF errors we're returning from this function should theoretically -// never happen, add a metric for them. -func readRecord(buf []byte, header []byte, total int64) ([]byte, int, error) { - readIndex := 0 - header[0] = buf[0] - readIndex++ - total++ - - // The rest of this function is mostly from Reader.Next(). - typ := recType(header[0]) - // Gobble up zero bytes. - if typ == recPageTerm { - // We are pedantic and check whether the zeros are actually up to a page boundary. - // It's not strictly necessary but may catch sketchy state early. - k := pageSize - (total % pageSize) - if k == pageSize { - return nil, 1, nil // Initial 0 byte was last page byte. - } - - if k <= int64(len(buf)-readIndex) { - for _, v := range buf[readIndex : int64(readIndex)+k] { - readIndex++ - if v != 0 { - return nil, readIndex, errors.New("unexpected non-zero byte in page term bytes") - } - } - return nil, readIndex, nil - } - // Not enough bytes to read the rest of the page term rec. - // This theoretically should never happen, since we're only shifting the - // internal buffer of the live reader when we read to the end of page. - // Treat this the same as an EOF, it's an error we would expect to see. - return nil, 0, io.EOF - } - - if readIndex+recordHeaderSize-1 > len(buf) { - // Treat this the same as an EOF, it's an error we would expect to see. - return nil, 0, io.EOF - } - - copy(header[1:], buf[readIndex:readIndex+len(header[1:])]) - readIndex += recordHeaderSize - 1 - total += int64(recordHeaderSize - 1) - var ( - length = binary.BigEndian.Uint16(header[1:]) - crc = binary.BigEndian.Uint32(header[3:]) - ) - readTo := int(length) + readIndex - if readTo > len(buf) { - if (readTo - readIndex) > pageSize { - return nil, 0, errors.Errorf("invalid record, record size would be larger than max page size: %d", int(length)) - } - // Not enough data to read all of the record data. - // Treat this the same as an EOF, it's an error we would expect to see. - return nil, 0, io.EOF - } - recData := buf[readIndex:readTo] - readIndex += int(length) - total += int64(length) - - // TODO(callum) what should we do here, throw out the record? We should add a metric at least. 
- if c := crc32.Checksum(recData, castagnoliTable); c != crc { - return recData, readIndex, errors.Errorf("unexpected checksum %x, expected %x", c, crc) - } - return recData, readIndex, nil -} - -func min(i, j int) int { - if i < j { - return i - } - return j -} diff --git a/vendor/modules.txt b/vendor/modules.txt index eb31b3b52..f9017254b 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -225,6 +225,7 @@ github.com/prometheus/client_golang/api github.com/prometheus/client_golang/api/prometheus/v1 github.com/prometheus/client_golang/prometheus/promhttp github.com/prometheus/client_golang/prometheus/internal +github.com/prometheus/client_golang/prometheus/promauto # github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910 github.com/prometheus/client_model/go # github.com/prometheus/common v0.0.0-20181119215939-b36ad289a3ea @@ -241,7 +242,7 @@ github.com/prometheus/procfs github.com/prometheus/procfs/nfs github.com/prometheus/procfs/xfs github.com/prometheus/procfs/internal/util -# github.com/prometheus/tsdb v0.4.0 +# github.com/prometheus/tsdb v0.4.1-0.20190219143357-77d5a7d47a52 github.com/prometheus/tsdb github.com/prometheus/tsdb/fileutil github.com/prometheus/tsdb/wal
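A quick round-trip over the updated vendored wal package (a sketch assuming a throwaway temp directory; not part of the patch itself) exercises both the writer and the relocated Reader:

// wal_roundtrip_sketch.go (hypothetical, for verification only)
package main

import (
	"fmt"
	"io/ioutil"
	"log"

	"github.com/prometheus/tsdb/wal"
)

func main() {
	dir, err := ioutil.TempDir("", "wal-demo")
	if err != nil {
		log.Fatal(err)
	}

	// Write two records and close the WAL so the segment is complete.
	w, err := wal.New(nil, nil, dir)
	if err != nil {
		log.Fatal(err)
	}
	if err := w.Log([]byte("first"), []byte("second")); err != nil {
		log.Fatal(err)
	}
	if err := w.Close(); err != nil {
		log.Fatal(err)
	}

	// Read the records back through the new wal/reader.go.
	sr, err := wal.NewSegmentsReader(dir)
	if err != nil {
		log.Fatal(err)
	}
	defer sr.Close()

	r := wal.NewReader(sr)
	for r.Next() {
		fmt.Printf("record: %q\n", r.Record())
	}
	if r.Err() != nil {
		log.Fatal(r.Err())
	}
}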