TSDB: Fix some edge cases when OOO is enabled (#14710)

Fix some edge cases when OOO is enabled

Signed-off-by: Vanshikav123 <vanshikav928@gmail.com>
Signed-off-by: Vanshika <102902652+Vanshikav123@users.noreply.github.com>
Signed-off-by: Jesus Vazquez <jesusvzpg@gmail.com>
Co-authored-by: Jesus Vazquez <jesusvzpg@gmail.com>
This commit is contained in:
Vanshika 2024-10-23 21:04:28 +05:30 committed by GitHub
parent 7c7116fea8
commit cccbe72514
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 388 additions and 10 deletions

View File

@ -3,8 +3,9 @@
## unreleased
* [CHANGE] Scraping: Remove implicit fallback to the Prometheus text format in case of invalid/missing Content-Type and fail the scrape instead. Add ability to specify a `fallback_scrape_protocol` in the scrape config. #15136
* [BUGFIX] PromQL: Fix stddev+stdvar aggregations to always ignore native histograms. #14941
* [BUGFIX] PromQL: Fix stddev+stdvar aggregations to treat Infinity consistently. #14941
* [ENHANCEMENT] Scraping, rules: handle targets reappearing, or rules moving group, when out-of-order is enabled. #14710
- [BUGFIX] PromQL: Fix stddev+stdvar aggregations to always ignore native histograms. #14941
- [BUGFIX] PromQL: Fix stddev+stdvar aggregations to treat Infinity consistently. #14941
## 3.0.0-beta.1 / 2024-10-09
@ -20,7 +21,6 @@
* [ENHANCEMENT] PromQL: Introduce exponential interpolation for native histograms. #14677
* [ENHANCEMENT] TSDB: Add support for ingestion of out-of-order native histogram samples. #14850, #14546
* [ENHANCEMENT] Alerts: remove metrics for removed Alertmanagers. #13909
* [ENHANCEMENT] Scraping: support Created-Timestamp feature on native histograms. #14694
* [ENHANCEMENT] Kubernetes SD: Support sidecar containers in endpoint discovery. #14929
* [ENHANCEMENT] Consul SD: Support catalog filters. #11224
* [PERF] TSDB: Parallelize deletion of postings after head compaction. #14975
@ -41,6 +41,10 @@ Release 3.0.0-beta.0 includes new features such as a brand new UI and UTF-8 supp
As is traditional with a beta release, we do **not** recommend users install 3.0.0-beta on critical production systems, but we do want everyone to test it out and find bugs.
<<<<<<< HEAD
<<<<<<< HEAD
=======
>>>>>>> b10c3696c (Revert "updated changelog")
* [CHANGE] UI: The old web UI has been replaced by a completely new one that is less cluttered and adds a few new features (PromLens-style tree view, better metrics explorer, "Explain" tab). However, it is still missing some features of the old UI (notably, exemplar display and heatmaps). To switch back to the old UI, you can use the feature flag `--enable-feature=old-ui` for the time being. #14872
* [CHANGE] PromQL: Range selectors and the lookback delta are now left-open, i.e. a sample coinciding with the lower time limit is excluded rather than included. #13904
* [CHANGE] Kubernetes SD: Remove support for `discovery.k8s.io/v1beta1` API version of EndpointSlice. This version is no longer served as of Kubernetes v1.25. #14365
@ -52,6 +56,7 @@ As is traditional with a beta release, we do **not** recommend users install 3.0
* [CHANGE] Remove deprecated `remote-write-receiver`,`promql-at-modifier`, and `promql-negative-offset` feature flags. #13456, #14526
* [CHANGE] Remove deprecated `storage.tsdb.allow-overlapping-blocks`, `alertmanager.timeout`, and `storage.tsdb.retention` flags. #14640, #14643
* [ENHANCEMENT] Move AM discovery page from "Monitoring status" to "Server status". #14875
<<<<<<< HEAD
* [FEATURE] Support config reload automatically - feature flag `auto-reload-config`. #14769
* [BUGFIX] Scrape: Do not override target parameter labels with config params. #11029
@ -85,6 +90,87 @@ As is traditional with a beta release, we do **not** recommend users install 3.0
* [BUGFIX] Remote-Write: Return 4xx not 5xx when timeseries has duplicate label. #14716
* [BUGFIX] Experimental Native Histograms: many fixes for incorrect results, panics, warnings. #14513, #14575, #14598, #14609, #14611, #14771, #14821
* [BUGFIX] TSDB: Only count unknown record types in `record_decode_failures_total` metric. #14042
=======
- [CHANGE] UI: The old web UI has been replaced by a completely new one that is less cluttered and adds a few new features (PromLens-style tree view, better metrics explorer, "Explain" tab). However, it is still missing some features of the old UI (notably, exemplar display and heatmaps). To switch back to the old UI, you can use the feature flag `--enable-feature=old-ui` for the time being. #14872
- [CHANGE] PromQL: Range selectors and the lookback delta are now left-open, i.e. a sample coinciding with the lower time limit is excluded rather than included. #13904
- [CHANGE] Kubernetes SD: Remove support for `discovery.k8s.io/v1beta1` API version of EndpointSlice. This version is no longer served as of Kubernetes v1.25. #14365
- [CHANGE] Kubernetes SD: Remove support for `networking.k8s.io/v1beta1` API version of Ingress. This version is no longer served as of Kubernetes v1.22. #14365
- [CHANGE] UTF-8: Enable UTF-8 support by default. Prometheus now allows all UTF-8 characters in metric and label names. The corresponding `utf8-name` feature flag has been removed. #14705
- [CHANGE] Console: Remove example files for the console feature. Users can continue using the console feature by supplying their own JavaScript and templates. #14807
- [CHANGE] SD: Enable the new service discovery manager by default. This SD manager does not restart unchanged discoveries upon reloading. This makes reloads faster and reduces pressure on service discoveries' sources. The corresponding `new-service-discovery-manager` feature flag has been removed. #14770
- [CHANGE] Agent mode has been promoted to stable. The feature flag `agent` has been removed. To run Prometheus in Agent mode, use the new `--agent` cmdline arg instead. #14747
- [CHANGE] Remove deprecated `remote-write-receiver`,`promql-at-modifier`, and `promql-negative-offset` feature flags. #13456, #14526
- [CHANGE] Remove deprecated `storage.tsdb.allow-overlapping-blocks`, `alertmanager.timeout`, and `storage.tsdb.retention` flags. #14640, #14643
- [ENHANCEMENT] Move AM discovery page from "Monitoring status" to "Server status". #14875
- [BUGFIX] Scrape: Do not override target parameter labels with config params. #11029
## 2.55.0-rc.0 / 2024-09-20
- [FEATURE] Support UTF-8 characters in label names - feature flag `utf8-names`. #14482, #14880, #14736, #14727
- [FEATURE] Support config reload automatically - feature flag `auto-reload-config`. #14769
- [FEATURE] Scraping: Add the ability to set custom `http_headers` in config. #14817
- [FEATURE] Scraping: Support feature flag `created-timestamp-zero-ingestion` in OpenMetrics. #14356, #14815
- [FEATURE] Scraping: `scrape_failure_log_file` option to log failures to a file. #14734
- [FEATURE] OTLP receiver: Optional promotion of resource attributes to series labels. #14200
- [FEATURE] Remote-Write: Support Google Cloud Monitoring authorization. #14346
- [FEATURE] Promtool: `tsdb create-blocks` new option to add labels. #14403
- [FEATURE] Promtool: `promtool test` adds `--junit` flag to format results. #14506
- [ENHANCEMENT] OTLP receiver: Warn on exponential histograms with zero count and non-zero sum. #14706
- [ENHANCEMENT] OTLP receiver: Interrupt translation on context cancellation/timeout. #14612
- [ENHANCEMENT] Remote Read client: Enable streaming remote read if the server supports it. #11379
- [ENHANCEMENT] Remote-Write: Don't reshard if we haven't successfully sent a sample since last update. #14450
- [ENHANCEMENT] PromQL: Delay deletion of `__name__` label to the end of the query evaluation. This is **experimental** and enabled under the feature-flag `promql-delayed-name-removal`. #14477
- [ENHANCEMENT] PromQL: Experimental `sort_by_label` and `sort_by_label_desc` sort by all labels when label is equal. #14655
- [ENHANCEMENT] PromQL: Clarify error message logged when Go runtime panic occurs during query evaluation. #14621
- [ENHANCEMENT] PromQL: Use Kahan summation for better accuracy in `avg` and `avg_over_time`. #14413
- [ENHANCEMENT] Tracing: Improve PromQL tracing, including showing the operation performed for aggregates, operators, and calls. #14816
- [ENHANCEMENT] API: Support multiple listening addresses. #14665
- [ENHANCEMENT] TSDB: Backward compatibility with upcoming index v3. #14934
- [PERF] TSDB: Query in-order and out-of-order series together. #14354, #14693, #14714, #14831, #14874, #14948
- [PERF] TSDB: Streamline reading of overlapping out-of-order head chunks. #14729
- [BUGFIX] SD: Fix dropping targets (with feature flag `new-service-discovery-manager`). #13147
- [BUGFIX] SD: Stop storing stale targets (with feature flag `new-service-discovery-manager`). #13622
- [BUGFIX] Scraping: exemplars could be dropped in protobuf scraping. #14810
- [BUGFIX] Remote-Write: fix metadata sending for experimental Remote-Write V2. #14766
- [BUGFIX] Remote-Write: Return 4xx not 5xx when timeseries has duplicate label. #14716
- [BUGFIX] Experimental Native Histograms: many fixes for incorrect results, panics, warnings. #14513, #14575, #14598, #14609, #14611, #14771, #14821
- [BUGFIX] TSDB: Only count unknown record types in `record_decode_failures_total` metric. #14042
>>>>>>> 58173ab1e (updated changelog)
=======
* [BUGFIX] Scrape: Do not override target parameter labels with config params. #11029
## 2.55.0-rc.0 / 2024-09-20
* [FEATURE] Support UTF-8 characters in label names - feature flag `utf8-names`. #14482, #14880, #14736, #14727
* [FEATURE] Support config reload automatically - feature flag `auto-reload-config`. #14769
* [FEATURE] Scraping: Add the ability to set custom `http_headers` in config. #14817
* [FEATURE] Scraping: Support feature flag `created-timestamp-zero-ingestion` in OpenMetrics. #14356, #14815
* [FEATURE] Scraping: `scrape_failure_log_file` option to log failures to a file. #14734
* [FEATURE] OTLP receiver: Optional promotion of resource attributes to series labels. #14200
* [FEATURE] Remote-Write: Support Google Cloud Monitoring authorization. #14346
* [FEATURE] Promtool: `tsdb create-blocks` new option to add labels. #14403
* [FEATURE] Promtool: `promtool test` adds `--junit` flag to format results. #14506
* [ENHANCEMENT] OTLP receiver: Warn on exponential histograms with zero count and non-zero sum. #14706
* [ENHANCEMENT] OTLP receiver: Interrupt translation on context cancellation/timeout. #14612
* [ENHANCEMENT] Remote Read client: Enable streaming remote read if the server supports it. #11379
* [ENHANCEMENT] Remote-Write: Don't reshard if we haven't successfully sent a sample since last update. #14450
* [ENHANCEMENT] PromQL: Delay deletion of `__name__` label to the end of the query evaluation. This is **experimental** and enabled under the feature-flag `promql-delayed-name-removal`. #14477
* [ENHANCEMENT] PromQL: Experimental `sort_by_label` and `sort_by_label_desc` sort by all labels when label is equal. #14655
* [ENHANCEMENT] PromQL: Clarify error message logged when Go runtime panic occurs during query evaluation. #14621
* [ENHANCEMENT] PromQL: Use Kahan summation for better accuracy in `avg` and `avg_over_time`. #14413
* [ENHANCEMENT] Tracing: Improve PromQL tracing, including showing the operation performed for aggregates, operators, and calls. #14816
* [ENHANCEMENT] API: Support multiple listening addresses. #14665
* [ENHANCEMENT] TSDB: Backward compatibility with upcoming index v3. #14934
* [PERF] TSDB: Query in-order and out-of-order series together. #14354, #14693, #14714, #14831, #14874, #14948
* [PERF] TSDB: Streamline reading of overlapping out-of-order head chunks. #14729
* [BUGFIX] SD: Fix dropping targets (with feature flag `new-service-discovery-manager`). #13147
* [BUGFIX] SD: Stop storing stale targets (with feature flag `new-service-discovery-manager`). #13622
* [BUGFIX] Scraping: exemplars could be dropped in protobuf scraping. #14810
* [BUGFIX] Remote-Write: fix metadata sending for experimental Remote-Write V2. #14766
* [BUGFIX] Remote-Write: Return 4xx not 5xx when timeseries has duplicate label. #14716
* [BUGFIX] Experimental Native Histograms: many fixes for incorrect results, panics, warnings. #14513, #14575, #14598, #14609, #14611, #14771, #14821
* [BUGFIX] TSDB: Only count unknown record types in `record_decode_failures_total` metric. #14042
>>>>>>> b10c3696c (Revert "updated changelog")
## 2.54.1 / 2024-08-27

View File

@ -1639,6 +1639,9 @@ func (s *readyStorage) Appender(ctx context.Context) storage.Appender {
type notReadyAppender struct{}
// SetOptions does nothing in this appender implementation.
func (n notReadyAppender) SetOptions(opts *storage.AppendOptions) {}
func (n notReadyAppender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
return 0, tsdb.ErrNotReady
}

View File

@ -0,0 +1,5 @@
groups:
- name: test_1
rules:
- record: test_2
expr: vector(2)

View File

@ -75,6 +75,7 @@ type Group struct {
// concurrencyController controls the rules evaluation concurrency.
concurrencyController RuleConcurrencyController
appOpts *storage.AppendOptions
}
// GroupEvalIterationFunc is used to implement and extend rule group
@ -145,6 +146,7 @@ func NewGroup(o GroupOptions) *Group {
metrics: metrics,
evalIterationFunc: evalIterationFunc,
concurrencyController: concurrencyController,
appOpts: &storage.AppendOptions{DiscardOutOfOrder: true},
}
}
@ -564,6 +566,7 @@ func (g *Group) Eval(ctx context.Context, ts time.Time) {
if s.H != nil {
_, err = app.AppendHistogram(0, s.Metric, s.T, nil, s.H)
} else {
app.SetOptions(g.appOpts)
_, err = app.Append(0, s.Metric, s.T, s.F)
}
@ -660,6 +663,7 @@ func (g *Group) cleanupStaleSeries(ctx context.Context, ts time.Time) {
return
}
app := g.opts.Appendable.Appender(ctx)
app.SetOptions(g.appOpts)
queryOffset := g.QueryOffset()
for _, s := range g.staleSeries {
// Rule that produced series no longer configured, mark it stale.

View File

@ -1195,6 +1195,53 @@ func countStaleNaN(t *testing.T, st storage.Storage) int {
return c
}
func TestRuleMovedBetweenGroups(t *testing.T) {
if testing.Short() {
t.Skip("skipping test in short mode.")
}
storage := teststorage.New(t, 600000)
defer storage.Close()
opts := promql.EngineOpts{
Logger: nil,
Reg: nil,
MaxSamples: 10,
Timeout: 10 * time.Second,
}
engine := promql.NewEngine(opts)
ruleManager := NewManager(&ManagerOptions{
Appendable: storage,
Queryable: storage,
QueryFunc: EngineQueryFunc(engine, storage),
Context: context.Background(),
Logger: promslog.NewNopLogger(),
})
var stopped bool
ruleManager.start()
defer func() {
if !stopped {
ruleManager.Stop()
}
}()
rule2 := "fixtures/rules2.yaml"
rule1 := "fixtures/rules1.yaml"
// Load initial configuration of rules2
require.NoError(t, ruleManager.Update(1*time.Second, []string{rule2}, labels.EmptyLabels(), "", nil))
// Wait for rule to be evaluated
time.Sleep(3 * time.Second)
// Reload configuration of rules1
require.NoError(t, ruleManager.Update(1*time.Second, []string{rule1}, labels.EmptyLabels(), "", nil))
// Wait for rule to be evaluated in new location and potential staleness marker
time.Sleep(3 * time.Second)
require.Equal(t, 0, countStaleNaN(t, storage)) // Not expecting any stale markers.
}
func TestGroupHasAlertingRules(t *testing.T) {
tests := []struct {
group *Group

View File

@ -43,6 +43,8 @@ func (a nopAppendable) Appender(_ context.Context) storage.Appender {
type nopAppender struct{}
func (a nopAppender) SetOptions(opts *storage.AppendOptions) {}
func (a nopAppender) Append(storage.SeriesRef, labels.Labels, int64, float64) (storage.SeriesRef, error) {
return 0, nil
}
@ -114,6 +116,8 @@ type collectResultAppender struct {
pendingMetadata []metadata.Metadata
}
func (a *collectResultAppender) SetOptions(opts *storage.AppendOptions) {}
func (a *collectResultAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
a.mtx.Lock()
defer a.mtx.Unlock()

View File

@ -1864,7 +1864,9 @@ loop:
if err == nil {
sl.cache.forEachStale(func(lset labels.Labels) bool {
// Series no longer exposed, mark it stale.
app.SetOptions(&storage.AppendOptions{DiscardOutOfOrder: true})
_, err = app.Append(0, lset, defTime, math.Float64frombits(value.StaleNaN))
app.SetOptions(nil)
switch {
case errors.Is(err, storage.ErrOutOfOrderSample), errors.Is(err, storage.ErrDuplicateSampleForTimestamp):
// Do not count these in logging, as this is expected if a target
@ -1970,7 +1972,7 @@ func (sl *scrapeLoop) report(app storage.Appender, start time.Time, duration tim
func (sl *scrapeLoop) reportStale(app storage.Appender, start time.Time) (err error) {
ts := timestamp.FromTime(start)
app.SetOptions(&storage.AppendOptions{DiscardOutOfOrder: true})
stale := math.Float64frombits(value.StaleNaN)
b := labels.NewBuilder(labels.EmptyLabels())

View File

@ -86,6 +86,97 @@ func TestNewScrapePool(t *testing.T) {
require.NotNil(t, sp.newLoop, "newLoop function not initialized.")
}
func TestStorageHandlesOutOfOrderTimestamps(t *testing.T) {
// Test with default OutOfOrderTimeWindow (0)
t.Run("Out-Of-Order Sample Disabled", func(t *testing.T) {
s := teststorage.New(t)
defer s.Close()
runScrapeLoopTest(t, s, false)
})
// Test with specific OutOfOrderTimeWindow (600000)
t.Run("Out-Of-Order Sample Enabled", func(t *testing.T) {
s := teststorage.New(t, 600000)
defer s.Close()
runScrapeLoopTest(t, s, true)
})
}
func runScrapeLoopTest(t *testing.T, s *teststorage.TestStorage, expectOutOfOrder bool) {
// Create an appender for adding samples to the storage.
app := s.Appender(context.Background())
capp := &collectResultAppender{next: app}
sl := newBasicScrapeLoop(t, context.Background(), nil, func(ctx context.Context) storage.Appender { return capp }, 0)
// Current time for generating timestamps.
now := time.Now()
// Calculate timestamps for the samples based on the current time.
now = now.Truncate(time.Minute) // round down the now timestamp to the nearest minute
timestampInorder1 := now
timestampOutOfOrder := now.Add(-5 * time.Minute)
timestampInorder2 := now.Add(5 * time.Minute)
slApp := sl.appender(context.Background())
_, _, _, err := sl.append(slApp, []byte(`metric_a{a="1",b="1"} 1`), "", timestampInorder1)
require.NoError(t, err)
_, _, _, err = sl.append(slApp, []byte(`metric_a{a="1",b="1"} 2`), "", timestampOutOfOrder)
require.NoError(t, err)
_, _, _, err = sl.append(slApp, []byte(`metric_a{a="1",b="1"} 3`), "", timestampInorder2)
require.NoError(t, err)
require.NoError(t, slApp.Commit())
// Query the samples back from the storage.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
q, err := s.Querier(time.Time{}.UnixNano(), time.Now().UnixNano())
require.NoError(t, err)
defer q.Close()
// Use a matcher to filter the metric name.
series := q.Select(ctx, false, nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", "metric_a"))
var results []floatSample
for series.Next() {
it := series.At().Iterator(nil)
for it.Next() == chunkenc.ValFloat {
t, v := it.At()
results = append(results, floatSample{
metric: series.At().Labels(),
t: t,
f: v,
})
}
require.NoError(t, it.Err())
}
require.NoError(t, series.Err())
// Define the expected results
want := []floatSample{
{
metric: labels.FromStrings("__name__", "metric_a", "a", "1", "b", "1"),
t: timestamp.FromTime(timestampInorder1),
f: 1,
},
{
metric: labels.FromStrings("__name__", "metric_a", "a", "1", "b", "1"),
t: timestamp.FromTime(timestampInorder2),
f: 3,
},
}
if expectOutOfOrder {
require.NotEqual(t, want, results, "Expected results to include out-of-order sample:\n%s", results)
} else {
require.Equal(t, want, results, "Appended samples not as expected:\n%s", results)
}
}
func TestDroppedTargetsList(t *testing.T) {
var (
app = &nopAppendable{}
@ -1157,6 +1248,87 @@ func BenchmarkScrapeLoopAppendOM(b *testing.B) {
}
}
func TestSetOptionsHandlingStaleness(t *testing.T) {
s := teststorage.New(t, 600000)
defer s.Close()
signal := make(chan struct{}, 1)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Function to run the scrape loop
runScrapeLoop := func(ctx context.Context, t *testing.T, cue int, action func(*scrapeLoop)) {
var (
scraper = &testScraper{}
app = func(ctx context.Context) storage.Appender {
return s.Appender(ctx)
}
)
sl := newBasicScrapeLoop(t, ctx, scraper, app, 10*time.Millisecond)
numScrapes := 0
scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
numScrapes++
if numScrapes == cue {
action(sl)
}
w.Write([]byte(fmt.Sprintf("metric_a{a=\"1\",b=\"1\"} %d\n", 42+numScrapes)))
return nil
}
sl.run(nil)
}
go func() {
runScrapeLoop(ctx, t, 2, func(sl *scrapeLoop) {
go sl.stop()
// Wait a bit then start a new target.
time.Sleep(100 * time.Millisecond)
go func() {
runScrapeLoop(ctx, t, 4, func(_ *scrapeLoop) {
cancel()
})
signal <- struct{}{}
}()
})
}()
select {
case <-signal:
case <-time.After(10 * time.Second):
t.Fatalf("Scrape wasn't stopped.")
}
ctx1, cancel := context.WithCancel(context.Background())
defer cancel()
q, err := s.Querier(0, time.Now().UnixNano())
require.NoError(t, err)
defer q.Close()
series := q.Select(ctx1, false, nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", "metric_a"))
var results []floatSample
for series.Next() {
it := series.At().Iterator(nil)
for it.Next() == chunkenc.ValFloat {
t, v := it.At()
results = append(results, floatSample{
metric: series.At().Labels(),
t: t,
f: v,
})
}
require.NoError(t, it.Err())
}
require.NoError(t, series.Err())
var c int
for _, s := range results {
if value.IsStaleNaN(s.f) {
c++
}
}
require.Equal(t, 0, c, "invalid count of staleness markers after stopping the engine")
}
func TestScrapeLoopRunCreatesStaleMarkersOnFailedScrape(t *testing.T) {
appender := &collectResultAppender{}
var (
@ -4032,7 +4204,6 @@ func TestScrapeLoopRunCreatesStaleMarkersOnFailedScrapeForTimestampedMetrics(t *
case <-time.After(5 * time.Second):
t.Fatalf("Scrape wasn't stopped.")
}
// 1 successfully scraped sample, 1 stale marker after first fail, 5 report samples for
// each scrape successful or not.
require.Len(t, appender.resultFloats, 27, "Appended samples not as expected:\n%s", appender)

View File

@ -147,6 +147,16 @@ type fanoutAppender struct {
secondaries []Appender
}
// SetOptions propagates the hints to both primary and secondary appenders.
func (f *fanoutAppender) SetOptions(opts *AppendOptions) {
if f.primary != nil {
f.primary.SetOptions(opts)
}
for _, appender := range f.secondaries {
appender.SetOptions(opts)
}
}
func (f *fanoutAppender) Append(ref SeriesRef, l labels.Labels, t int64, v float64) (SeriesRef, error) {
ref, err := f.primary.Append(ref, l, t, v)
if err != nil {

View File

@ -243,6 +243,10 @@ func (f QueryableFunc) Querier(mint, maxt int64) (Querier, error) {
return f(mint, maxt)
}
type AppendOptions struct {
DiscardOutOfOrder bool
}
// Appender provides batched appends against a storage.
// It must be completed with a call to Commit or Rollback and must not be reused afterwards.
//
@ -271,6 +275,10 @@ type Appender interface {
// Appender has to be discarded after rollback.
Rollback() error
// SetOptions configures the appender with specific append options such as
// discarding out-of-order samples even if out-of-order is enabled in the TSDB.
SetOptions(opts *AppendOptions)
ExemplarAppender
HistogramAppender
MetadataUpdater

View File

@ -278,6 +278,7 @@ func (rws *WriteStorage) Close() error {
type timestampTracker struct {
writeStorage *WriteStorage
appendOptions *storage.AppendOptions
samples int64
exemplars int64
histograms int64
@ -285,6 +286,10 @@ type timestampTracker struct {
highestRecvTimestamp *maxTimestamp
}
func (t *timestampTracker) SetOptions(opts *storage.AppendOptions) {
t.appendOptions = opts
}
// Append implements storage.Appender.
func (t *timestampTracker) Append(_ storage.SeriesRef, _ labels.Labels, ts int64, _ float64) (storage.SeriesRef, error) {
t.samples++

View File

@ -833,6 +833,10 @@ func (m *mockAppendable) Appender(_ context.Context) storage.Appender {
return m
}
func (m *mockAppendable) SetOptions(opts *storage.AppendOptions) {
panic("unimplemented")
}
func (m *mockAppendable) Append(_ storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
if m.appendSampleErr != nil {
return 0, m.appendSampleErr

View File

@ -763,6 +763,7 @@ func (db *DB) Close() error {
type appender struct {
*DB
hints *storage.AppendOptions
pendingSeries []record.RefSeries
pendingSamples []record.RefSample
@ -783,6 +784,10 @@ type appender struct {
floatHistogramSeries []*memSeries
}
func (a *appender) SetOptions(opts *storage.AppendOptions) {
a.hints = opts
}
func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
// series references and chunk references are identical for agent mode.
headRef := chunks.HeadSeriesRef(ref)

View File

@ -40,6 +40,12 @@ type initAppender struct {
var _ storage.GetRef = &initAppender{}
func (a *initAppender) SetOptions(opts *storage.AppendOptions) {
if a.app != nil {
a.app.SetOptions(opts)
}
}
func (a *initAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
if a.app != nil {
return a.app.Append(ref, lset, t, v)
@ -326,6 +332,11 @@ type headAppender struct {
appendID, cleanupAppendIDsBelow uint64
closed bool
hints *storage.AppendOptions
}
func (a *headAppender) SetOptions(opts *storage.AppendOptions) {
a.hints = opts
}
func (a *headAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
@ -359,13 +370,18 @@ func (a *headAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64
}
s.Lock()
defer s.Unlock()
// TODO(codesome): If we definitely know at this point that the sample is ooo, then optimise
// to skip that sample from the WAL and write only in the WBL.
_, delta, err := s.appendable(t, v, a.headMaxt, a.minValidTime, a.oooTimeWindow)
isOOO, delta, err := s.appendable(t, v, a.headMaxt, a.minValidTime, a.oooTimeWindow)
if err == nil {
if isOOO && a.hints != nil && a.hints.DiscardOutOfOrder {
a.head.metrics.outOfOrderSamples.WithLabelValues(sampleMetricTypeFloat).Inc()
return 0, storage.ErrOutOfOrderSample
}
s.pendingCommit = true
}
s.Unlock()
if delta > 0 {
a.head.metrics.oooHistogram.Observe(float64(delta) / 1000)
}

View File

@ -30,15 +30,15 @@ import (
// New returns a new TestStorage for testing purposes
// that removes all associated files on closing.
func New(t testutil.T) *TestStorage {
stor, err := NewWithError()
func New(t testutil.T, outOfOrderTimeWindow ...int64) *TestStorage {
stor, err := NewWithError(outOfOrderTimeWindow...)
require.NoError(t, err)
return stor
}
// NewWithError returns a new TestStorage for user facing tests, which reports
// errors directly.
func NewWithError() (*TestStorage, error) {
func NewWithError(outOfOrderTimeWindow ...int64) (*TestStorage, error) {
dir, err := os.MkdirTemp("", "test_storage")
if err != nil {
return nil, fmt.Errorf("opening test directory: %w", err)
@ -51,6 +51,14 @@ func NewWithError() (*TestStorage, error) {
opts.MaxBlockDuration = int64(24 * time.Hour / time.Millisecond)
opts.RetentionDuration = 0
opts.EnableNativeHistograms = true
// Set OutOfOrderTimeWindow if provided, otherwise use default (0)
if len(outOfOrderTimeWindow) > 0 {
opts.OutOfOrderTimeWindow = outOfOrderTimeWindow[0]
} else {
opts.OutOfOrderTimeWindow = 0 // Default value is zero
}
db, err := tsdb.Open(dir, nil, nil, opts, tsdb.NewDBStats())
if err != nil {
return nil, fmt.Errorf("opening test storage: %w", err)