diff --git a/promql/engine_test.go b/promql/engine_test.go index e4171eb5b..19bd78144 100644 --- a/promql/engine_test.go +++ b/promql/engine_test.go @@ -17,6 +17,7 @@ import ( "context" "errors" "fmt" + "math" "sort" "strconv" "strings" @@ -29,11 +30,13 @@ import ( "github.com/prometheus/prometheus/model/histogram" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/timestamp" + "github.com/prometheus/prometheus/model/value" "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/promql/parser" "github.com/prometheus/prometheus/promql/parser/posrange" "github.com/prometheus/prometheus/promql/promqltest" "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/util/annotations" "github.com/prometheus/prometheus/util/stats" "github.com/prometheus/prometheus/util/teststorage" @@ -3781,3 +3784,115 @@ func TestRateAnnotations(t *testing.T) { }) } } + +func TestHistogramRateWithFloatStaleness(t *testing.T) { + // Make a chunk with two normal histograms of the same value. + h1 := histogram.Histogram{ + Schema: 2, + Count: 10, + Sum: 100, + PositiveSpans: []histogram.Span{{Offset: 0, Length: 1}}, + PositiveBuckets: []int64{100}, + } + + c1 := chunkenc.NewHistogramChunk() + app, err := c1.Appender() + require.NoError(t, err) + var ( + newc chunkenc.Chunk + recoded bool + ) + + newc, recoded, app, err = app.AppendHistogram(nil, 0, h1.Copy(), false) + require.NoError(t, err) + require.False(t, recoded) + require.Nil(t, newc) + + newc, recoded, _, err = app.AppendHistogram(nil, 10, h1.Copy(), false) + require.NoError(t, err) + require.False(t, recoded) + require.Nil(t, newc) + + // Make a chunk with a single float stale marker. + c2 := chunkenc.NewXORChunk() + app, err = c2.Appender() + require.NoError(t, err) + + app.Append(20, math.Float64frombits(value.StaleNaN)) + + // Make a chunk with two normal histograms that have zero value. + h2 := histogram.Histogram{ + Schema: 2, + } + + c3 := chunkenc.NewHistogramChunk() + app, err = c3.Appender() + require.NoError(t, err) + + newc, recoded, app, err = app.AppendHistogram(nil, 30, h2.Copy(), false) + require.NoError(t, err) + require.False(t, recoded) + require.Nil(t, newc) + + newc, recoded, _, err = app.AppendHistogram(nil, 40, h2.Copy(), false) + require.NoError(t, err) + require.False(t, recoded) + require.Nil(t, newc) + + querier := storage.MockQuerier{ + SelectMockFunction: func(_ bool, _ *storage.SelectHints, _ ...*labels.Matcher) storage.SeriesSet { + return &singleSeriesSet{ + series: mockSeries{chunks: []chunkenc.Chunk{c1, c2, c3}, labelSet: []string{"__name__", "foo"}}, + } + }, + } + + queriable := storage.MockQueryable{MockQuerier: &querier} + + engine := promqltest.NewTestEngine(t, false, 0, promqltest.DefaultMaxSamplesPerQuery) + + q, err := engine.NewInstantQuery(context.Background(), &queriable, nil, "rate(foo[40s])", timestamp.Time(45)) + require.NoError(t, err) + defer q.Close() + + res := q.Exec(context.Background()) + require.NoError(t, res.Err) + + vec, err := res.Vector() + require.NoError(t, err) + + // Single sample result. + require.Len(t, vec, 1) + // The result is a histogram. + require.NotNil(t, vec[0].H) + // The result should be zero as the histogram has not increased, so the rate is zero. + require.Equal(t, 0.0, vec[0].H.Count) + require.Equal(t, 0.0, vec[0].H.Sum) +} + +type singleSeriesSet struct { + series storage.Series + consumed bool +} + +func (s *singleSeriesSet) Next() bool { c := s.consumed; s.consumed = true; return !c } +func (s singleSeriesSet) At() storage.Series { return s.series } +func (s singleSeriesSet) Err() error { return nil } +func (s singleSeriesSet) Warnings() annotations.Annotations { return nil } + +type mockSeries struct { + chunks []chunkenc.Chunk + labelSet []string +} + +func (s mockSeries) Labels() labels.Labels { + return labels.FromStrings(s.labelSet...) +} + +func (s mockSeries) Iterator(it chunkenc.Iterator) chunkenc.Iterator { + iterables := []chunkenc.Iterator{} + for _, c := range s.chunks { + iterables = append(iterables, c.Iterator(nil)) + } + return storage.ChainSampleIteratorFromIterators(it, iterables) +} diff --git a/storage/buffer_test.go b/storage/buffer_test.go index b5c6443ac..6e8e83db8 100644 --- a/storage/buffer_test.go +++ b/storage/buffer_test.go @@ -314,6 +314,56 @@ func TestBufferedSeriesIteratorMixedHistograms(t *testing.T) { require.Equal(t, histograms[1].ToFloat(nil), fh) } +func TestBufferedSeriesIteratorMixedFloatsAndHistograms(t *testing.T) { + histograms := tsdbutil.GenerateTestHistograms(5) + + it := NewBufferIterator(NewListSeriesIteratorWithCopy(samples{ + hSample{t: 1, h: histograms[0].Copy()}, + fSample{t: 2, f: 2}, + hSample{t: 3, h: histograms[1].Copy()}, + hSample{t: 4, h: histograms[2].Copy()}, + fhSample{t: 3, fh: histograms[3].ToFloat(nil)}, + fhSample{t: 4, fh: histograms[4].ToFloat(nil)}, + }), 6) + + require.Equal(t, chunkenc.ValNone, it.Seek(7)) + require.NoError(t, it.Err()) + + buf := it.Buffer() + + require.Equal(t, chunkenc.ValHistogram, buf.Next()) + _, h0 := buf.AtHistogram() + require.Equal(t, histograms[0], h0) + + require.Equal(t, chunkenc.ValFloat, buf.Next()) + _, v := buf.At() + require.Equal(t, 2.0, v) + + require.Equal(t, chunkenc.ValHistogram, buf.Next()) + _, h1 := buf.AtHistogram() + require.Equal(t, histograms[1], h1) + + require.Equal(t, chunkenc.ValHistogram, buf.Next()) + _, h2 := buf.AtHistogram() + require.Equal(t, histograms[2], h2) + + require.Equal(t, chunkenc.ValFloatHistogram, buf.Next()) + _, h3 := buf.AtFloatHistogram(nil) + require.Equal(t, histograms[3].ToFloat(nil), h3) + + require.Equal(t, chunkenc.ValFloatHistogram, buf.Next()) + _, h4 := buf.AtFloatHistogram(nil) + require.Equal(t, histograms[4].ToFloat(nil), h4) + + // Test for overwrite bug where the buffered histogram was reused + // between items in the buffer. + require.Equal(t, histograms[0], h0) + require.Equal(t, histograms[1], h1) + require.Equal(t, histograms[2], h2) + require.Equal(t, histograms[3].ToFloat(nil), h3) + require.Equal(t, histograms[4].ToFloat(nil), h4) +} + func BenchmarkBufferedSeriesIterator(b *testing.B) { // Simulate a 5 minute rate. it := NewBufferIterator(newFakeSeriesIterator(int64(b.N), 30), 5*60) diff --git a/storage/series.go b/storage/series.go index 70e3d0a19..a3dbec708 100644 --- a/storage/series.go +++ b/storage/series.go @@ -171,6 +171,34 @@ func (it *listSeriesIterator) Seek(t int64) chunkenc.ValueType { func (it *listSeriesIterator) Err() error { return nil } +type listSeriesIteratorWithCopy struct { + *listSeriesIterator +} + +func NewListSeriesIteratorWithCopy(samples Samples) chunkenc.Iterator { + return &listSeriesIteratorWithCopy{ + listSeriesIterator: &listSeriesIterator{samples: samples, idx: -1}, + } +} + +func (it *listSeriesIteratorWithCopy) AtHistogram(h *histogram.Histogram) (int64, *histogram.Histogram) { + t, ih := it.listSeriesIterator.AtHistogram(nil) + if h == nil || ih == nil { + return t, ih + } + ih.CopyTo(h) + return t, h +} + +func (it *listSeriesIteratorWithCopy) AtFloatHistogram(fh *histogram.FloatHistogram) (int64, *histogram.FloatHistogram) { + t, ih := it.listSeriesIterator.AtFloatHistogram(nil) + if fh == nil || ih == nil { + return t, ih + } + ih.CopyTo(fh) + return t, fh +} + type listChunkSeriesIterator struct { chks []chunks.Meta idx int