Reuse the same appender for report and scrape (#7562)

Additionally, implement isolation in collectResultAppender.

Signed-off-by: Julien Pivotto <roidelapluie@inuits.eu>
This commit is contained in:
Julien Pivotto 2020-07-16 13:53:39 +02:00 committed by GitHub
parent e0067a7bd8
commit 754461b74f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 254 additions and 116 deletions

View File

@ -40,8 +40,10 @@ type sample struct {
// collectResultAppender records the samples that are added through it and
// keeps track of whether they ended up committed or rolled back.
// It can be used as its zero value or be backed by another appender it writes samples through.
//
// Samples handed to Add and AddFast are first collected in pendingResult.
// Commit appends the pending samples to result, whereas Rollback moves them
// to rolledbackResult, replacing whatever an earlier Rollback stored there.
// Both calls reset pendingResult and are forwarded to next when it is set.
// mapper is indexed by series reference in AddFast to obtain the labels of
// the sample being recorded.
type collectResultAppender struct {
next storage.Appender
result []sample
next storage.Appender
result []sample
pendingResult []sample
rolledbackResult []sample
mapper map[uint64]labels.Labels
}
@ -55,7 +57,7 @@ func (a *collectResultAppender) AddFast(ref uint64, t int64, v float64) error {
if err != nil {
return err
}
a.result = append(a.result, sample{
a.pendingResult = append(a.pendingResult, sample{
metric: a.mapper[ref],
t: t,
v: v,
@ -64,7 +66,7 @@ func (a *collectResultAppender) AddFast(ref uint64, t int64, v float64) error {
}
func (a *collectResultAppender) Add(m labels.Labels, t int64, v float64) (uint64, error) {
a.result = append(a.result, sample{
a.pendingResult = append(a.pendingResult, sample{
metric: m,
t: t,
v: v,
@ -85,5 +87,20 @@ func (a *collectResultAppender) Add(m labels.Labels, t int64, v float64) (uint64
return ref, nil
}
func (a *collectResultAppender) Commit() error { return nil }
func (a *collectResultAppender) Rollback() error { return nil }
// Commit publishes the pending samples: they are appended to result and the
// pending list is cleared. When the appender wraps another one, that appender
// is committed as well and its error is returned; otherwise Commit returns nil.
func (a *collectResultAppender) Commit() error {
	committed := append(a.result, a.pendingResult...)
	a.result, a.pendingResult = committed, nil
	if a.next != nil {
		return a.next.Commit()
	}
	return nil
}
// Rollback drops the pending samples without adding them to result. The
// dropped samples stay inspectable through rolledbackResult, which is
// overwritten on every call. When the appender wraps another one, that
// appender is rolled back as well and its error is returned; otherwise
// Rollback returns nil.
func (a *collectResultAppender) Rollback() error {
	a.rolledbackResult, a.pendingResult = a.pendingResult, nil
	if a.next != nil {
		return a.next.Rollback()
	}
	return nil
}

View File

@ -932,61 +932,7 @@ mainLoop:
default:
}
var (
start = time.Now()
scrapeCtx, cancel = context.WithTimeout(sl.ctx, timeout)
)
// Only record after the first scrape.
if !last.IsZero() {
targetIntervalLength.WithLabelValues(interval.String()).Observe(
time.Since(last).Seconds(),
)
}
b := sl.buffers.Get(sl.lastScrapeSize).([]byte)
buf := bytes.NewBuffer(b)
contentType, scrapeErr := sl.scraper.scrape(scrapeCtx, buf)
cancel()
if scrapeErr == nil {
b = buf.Bytes()
// NOTE: There were issues with misbehaving clients in the past
// that occasionally returned empty results. We don't want those
// to falsely reset our buffer size.
if len(b) > 0 {
sl.lastScrapeSize = len(b)
}
} else {
level.Debug(sl.l).Log("msg", "Scrape failed", "err", scrapeErr.Error())
if errc != nil {
errc <- scrapeErr
}
}
// A failed scrape is the same as an empty scrape,
// we still call sl.append to trigger stale markers.
total, added, seriesAdded, appErr := sl.append(b, contentType, start)
if appErr != nil {
level.Debug(sl.l).Log("msg", "Append failed", "err", appErr)
// The append failed, probably due to a parse error or sample limit.
// Call sl.append again with an empty scrape to trigger stale markers.
if _, _, _, err := sl.append([]byte{}, "", start); err != nil {
level.Warn(sl.l).Log("msg", "Append failed", "err", err)
}
}
sl.buffers.Put(b)
if scrapeErr == nil {
scrapeErr = appErr
}
if err := sl.report(start, time.Since(start), total, added, seriesAdded, scrapeErr); err != nil {
level.Warn(sl.l).Log("msg", "Appending scrape report failed", "err", err)
}
last = start
last = sl.scrapeAndReport(interval, timeout, last, errc)
select {
case <-sl.parentCtx.Done():
@ -1005,6 +951,83 @@ mainLoop:
}
}
// scrapeAndReport performs a scrape and then appends the result to the storage
// together with reporting metrics, by using as few appenders as possible.
// In the happy scenario, a single appender is used.
//
// interval is only used to label the observed time between two scrapes,
// timeout bounds the scrape request, and last is the start time of the
// previous scrape (zero before the first one). A scrape error is sent to errc
// when errc is not nil. The returned value is the start time of this scrape,
// which the caller keeps as the new last.
func (sl *scrapeLoop) scrapeAndReport(interval, timeout time.Duration, last time.Time, errc chan<- error) time.Time {
var (
start = time.Now()
scrapeCtx, cancel = context.WithTimeout(sl.ctx, timeout)
)
// Only record after the first scrape.
if !last.IsZero() {
targetIntervalLength.WithLabelValues(interval.String()).Observe(
time.Since(last).Seconds(),
)
}
b := sl.buffers.Get(sl.lastScrapeSize).([]byte)
buf := bytes.NewBuffer(b)
contentType, scrapeErr := sl.scraper.scrape(scrapeCtx, buf)
// The scrape context is no longer needed once the scrape has returned.
cancel()
if scrapeErr == nil {
b = buf.Bytes()
// NOTE: There were issues with misbehaving clients in the past
// that occasionally returned empty results. We don't want those
// to falsely reset our buffer size.
if len(b) > 0 {
sl.lastScrapeSize = len(b)
}
} else {
level.Debug(sl.l).Log("msg", "Scrape failed", "err", scrapeErr.Error())
if errc != nil {
errc <- scrapeErr
}
}
// The same appender receives the scraped samples and the report samples.
// It is only replaced below when an append fails and had to be rolled back.
app := sl.appender()
var err error
// On return, decide the fate of whatever app refers to at that point:
// roll back if err is set (it is assigned by the report call below),
// commit otherwise. A commit failure is only logged.
defer func() {
if err != nil {
app.Rollback()
return
}
err = app.Commit()
if err != nil {
level.Error(sl.l).Log("msg", "Scrape commit failed", "err", err)
}
}()
// A failed scrape is the same as an empty scrape,
// we still call sl.append to trigger stale markers.
total, added, seriesAdded, appErr := sl.append(app, b, contentType, start)
if appErr != nil {
// Discard the partial append and continue with a fresh appender.
app.Rollback()
app = sl.appender()
level.Debug(sl.l).Log("msg", "Append failed", "err", appErr)
// The append failed, probably due to a parse error or sample limit.
// Call sl.append again with an empty scrape to trigger stale markers.
// The err declared here is scoped to this if statement and does not
// influence the deferred commit/rollback decision.
if _, _, _, err := sl.append(app, []byte{}, "", start); err != nil {
app.Rollback()
app = sl.appender()
level.Warn(sl.l).Log("msg", "Append failed", "err", err)
}
}
// Hand the scrape buffer back to the pool now that it has been parsed.
sl.buffers.Put(b)
// When the scrape itself succeeded, the append error (if any) is what gets
// reported.
if scrapeErr == nil {
scrapeErr = appErr
}
// This assignment sets the err inspected by the deferred function above.
if err = sl.report(app, start, time.Since(start), total, added, seriesAdded, scrapeErr); err != nil {
level.Warn(sl.l).Log("msg", "Appending scrape report failed", "err", err)
}
return start
}
func (sl *scrapeLoop) endOfRunStaleness(last time.Time, ticker *time.Ticker, interval time.Duration) {
// Scraping has stopped. We want to write stale markers but
// the target may be recreated, so we wait just over 2 scrape intervals
@ -1045,11 +1068,25 @@ func (sl *scrapeLoop) endOfRunStaleness(last time.Time, ticker *time.Ticker, int
// Call sl.append again with an empty scrape to trigger stale markers.
// If the target has since been recreated and scraped, the
// stale markers will be out of order and ignored.
if _, _, _, err := sl.append([]byte{}, "", staleTime); err != nil {
level.Error(sl.l).Log("msg", "stale append failed", "err", err)
app := sl.appender()
var err error
defer func() {
if err != nil {
app.Rollback()
return
}
err = app.Commit()
if err != nil {
level.Warn(sl.l).Log("msg", "Stale commit failed", "err", err)
}
}()
if _, _, _, err = sl.append(app, []byte{}, "", staleTime); err != nil {
app.Rollback()
app = sl.appender()
level.Warn(sl.l).Log("msg", "Stale append failed", "err", err)
}
if err := sl.reportStale(staleTime); err != nil {
level.Error(sl.l).Log("msg", "stale report failed", "err", err)
if err = sl.reportStale(app, staleTime); err != nil {
level.Warn(sl.l).Log("msg", "Stale report failed", "err", err)
}
}
@ -1074,9 +1111,8 @@ type appendErrors struct {
numOutOfBounds int
}
func (sl *scrapeLoop) append(b []byte, contentType string, ts time.Time) (total, added, seriesAdded int, err error) {
func (sl *scrapeLoop) append(app storage.Appender, b []byte, contentType string, ts time.Time) (total, added, seriesAdded int, err error) {
var (
app = sl.appender()
p = textparse.New(b, contentType)
defTime = timestamp.FromTime(ts)
appErrs = appendErrors{}
@ -1085,10 +1121,6 @@ func (sl *scrapeLoop) append(b []byte, contentType string, ts time.Time) (total,
defer func() {
if err != nil {
app.Rollback()
return
}
if err = app.Commit(); err != nil {
return
}
// Only perform cache cleaning if the scrape was not empty.
@ -1275,7 +1307,7 @@ const (
scrapeSeriesAddedMetricName = "scrape_series_added" + "\xff"
)
func (sl *scrapeLoop) report(start time.Time, duration time.Duration, scraped, added, seriesAdded int, scrapeErr error) (err error) {
func (sl *scrapeLoop) report(app storage.Appender, start time.Time, duration time.Duration, scraped, added, seriesAdded int, scrapeErr error) (err error) {
sl.scraper.Report(start, duration, scrapeErr)
ts := timestamp.FromTime(start)
@ -1284,14 +1316,6 @@ func (sl *scrapeLoop) report(start time.Time, duration time.Duration, scraped, a
if scrapeErr == nil {
health = 1
}
app := sl.appender()
defer func() {
if err != nil {
app.Rollback()
return
}
err = app.Commit()
}()
if err = sl.addReportSample(app, scrapeHealthMetricName, ts, health); err != nil {
return
@ -1311,16 +1335,8 @@ func (sl *scrapeLoop) report(start time.Time, duration time.Duration, scraped, a
return
}
func (sl *scrapeLoop) reportStale(start time.Time) (err error) {
func (sl *scrapeLoop) reportStale(app storage.Appender, start time.Time) (err error) {
ts := timestamp.FromTime(start)
app := sl.appender()
defer func() {
if err != nil {
app.Rollback()
return
}
err = app.Commit()
}()
stale := math.Float64frombits(value.StaleNaN)

View File

@ -612,7 +612,8 @@ func TestScrapeLoopMetadata(t *testing.T) {
)
defer cancel()
total, _, _, err := sl.append([]byte(`# TYPE test_metric counter
slApp := sl.appender()
total, _, _, err := sl.append(slApp, []byte(`# TYPE test_metric counter
# HELP test_metric some help text
# UNIT test_metric metric
test_metric 1
@ -620,6 +621,7 @@ test_metric 1
# HELP test_metric_no_type other help text
# EOF`), "application/openmetrics-text", time.Now())
testutil.Ok(t, err)
testutil.Ok(t, slApp.Commit())
testutil.Equals(t, 1, total)
md, ok := cache.GetMetadata("test_metric")
@ -661,13 +663,17 @@ func TestScrapeLoopSeriesAdded(t *testing.T) {
)
defer cancel()
total, added, seriesAdded, err := sl.append([]byte("test_metric 1\n"), "", time.Time{})
slApp := sl.appender()
total, added, seriesAdded, err := sl.append(slApp, []byte("test_metric 1\n"), "", time.Time{})
testutil.Ok(t, err)
testutil.Ok(t, slApp.Commit())
testutil.Equals(t, 1, total)
testutil.Equals(t, 1, added)
testutil.Equals(t, 1, seriesAdded)
total, added, seriesAdded, err = sl.append([]byte("test_metric 1\n"), "", time.Time{})
slApp = sl.appender()
total, added, seriesAdded, err = sl.append(slApp, []byte("test_metric 1\n"), "", time.Time{})
testutil.Ok(t, slApp.Commit())
testutil.Ok(t, err)
testutil.Equals(t, 1, total)
testutil.Equals(t, 1, added)
@ -854,9 +860,7 @@ func TestScrapeLoopCache(t *testing.T) {
// 1 successfully scraped sample, 1 stale marker after first fail, 5 report samples for
// each scrape successful or not.
if len(appender.result) != 26 {
t.Fatalf("Appended samples not as expected. Wanted: %d samples Got: %d", 26, len(appender.result))
}
testutil.Equals(t, 26, len(appender.result), "Appended samples not as expected")
}
func TestScrapeLoopCacheMemoryExhaustionProtection(t *testing.T) {
@ -992,8 +996,10 @@ func TestScrapeLoopAppend(t *testing.T) {
now := time.Now()
_, _, _, err := sl.append([]byte(test.scrapeLabels), "", now)
slApp := sl.appender()
_, _, _, err := sl.append(slApp, []byte(test.scrapeLabels), "", now)
testutil.Ok(t, err)
testutil.Ok(t, slApp.Commit())
expected := []sample{
{
@ -1043,8 +1049,10 @@ func TestScrapeLoopAppendCacheEntryButErrNotFound(t *testing.T) {
sl.cache.addRef(mets, fakeRef, lset, hash)
now := time.Now()
_, _, _, err := sl.append([]byte(metric), "", now)
slApp := sl.appender()
_, _, _, err := sl.append(slApp, []byte(metric), "", now)
testutil.Ok(t, err)
testutil.Ok(t, slApp.Commit())
expected := []sample{
{
@ -1084,10 +1092,12 @@ func TestScrapeLoopAppendSampleLimit(t *testing.T) {
beforeMetricValue := beforeMetric.GetCounter().GetValue()
now := time.Now()
total, added, seriesAdded, err := sl.append([]byte("metric_a 1\nmetric_b 1\nmetric_c 1\n"), "", now)
slApp := sl.appender()
total, added, seriesAdded, err := sl.append(app, []byte("metric_a 1\nmetric_b 1\nmetric_c 1\n"), "", now)
if err != errSampleLimit {
t.Fatalf("Did not see expected sample limit error: %s", err)
}
testutil.Ok(t, slApp.Rollback())
testutil.Equals(t, 3, total)
testutil.Equals(t, 3, added)
testutil.Equals(t, 1, seriesAdded)
@ -1110,13 +1120,15 @@ func TestScrapeLoopAppendSampleLimit(t *testing.T) {
v: 1,
},
}
testutil.Equals(t, want, resApp.result, "Appended samples not as expected")
testutil.Equals(t, want, resApp.rolledbackResult, "Appended samples not as expected")
now = time.Now()
total, added, seriesAdded, err = sl.append([]byte("metric_a 1\nmetric_b 1\nmetric_c{deleteme=\"yes\"} 1\nmetric_d 1\nmetric_e 1\nmetric_f 1\nmetric_g 1\nmetric_h{deleteme=\"yes\"} 1\nmetric_i{deleteme=\"yes\"} 1\n"), "", now)
slApp = sl.appender()
total, added, seriesAdded, err = sl.append(slApp, []byte("metric_a 1\nmetric_b 1\nmetric_c{deleteme=\"yes\"} 1\nmetric_d 1\nmetric_e 1\nmetric_f 1\nmetric_g 1\nmetric_h{deleteme=\"yes\"} 1\nmetric_i{deleteme=\"yes\"} 1\n"), "", now)
if err != errSampleLimit {
t.Fatalf("Did not see expected sample limit error: %s", err)
}
testutil.Ok(t, slApp.Rollback())
testutil.Equals(t, 9, total)
testutil.Equals(t, 6, added)
testutil.Equals(t, 0, seriesAdded)
@ -1144,11 +1156,15 @@ func TestScrapeLoop_ChangingMetricString(t *testing.T) {
)
now := time.Now()
_, _, _, err := sl.append([]byte(`metric_a{a="1",b="1"} 1`), "", now)
slApp := sl.appender()
_, _, _, err := sl.append(slApp, []byte(`metric_a{a="1",b="1"} 1`), "", now)
testutil.Ok(t, err)
testutil.Ok(t, slApp.Commit())
_, _, _, err = sl.append([]byte(`metric_a{b="1",a="1"} 2`), "", now.Add(time.Minute))
slApp = sl.appender()
_, _, _, err = sl.append(slApp, []byte(`metric_a{b="1",a="1"} 2`), "", now.Add(time.Minute))
testutil.Ok(t, err)
testutil.Ok(t, slApp.Commit())
// DeepEqual will report NaNs as being different, so replace with a different value.
want := []sample{
@ -1180,11 +1196,15 @@ func TestScrapeLoopAppendStaleness(t *testing.T) {
)
now := time.Now()
_, _, _, err := sl.append([]byte("metric_a 1\n"), "", now)
slApp := sl.appender()
_, _, _, err := sl.append(slApp, []byte("metric_a 1\n"), "", now)
testutil.Ok(t, err)
testutil.Ok(t, slApp.Commit())
_, _, _, err = sl.append([]byte(""), "", now.Add(time.Second))
slApp = sl.appender()
_, _, _, err = sl.append(slApp, []byte(""), "", now.Add(time.Second))
testutil.Ok(t, err)
testutil.Ok(t, slApp.Commit())
ingestedNaN := math.Float64bits(app.result[1].v)
testutil.Equals(t, value.StaleNaN, ingestedNaN, "Appended stale sample wasn't as expected")
@ -1219,11 +1239,15 @@ func TestScrapeLoopAppendNoStalenessIfTimestamp(t *testing.T) {
)
now := time.Now()
_, _, _, err := sl.append([]byte("metric_a 1 1000\n"), "", now)
slApp := sl.appender()
_, _, _, err := sl.append(slApp, []byte("metric_a 1 1000\n"), "", now)
testutil.Ok(t, err)
testutil.Ok(t, slApp.Commit())
_, _, _, err = sl.append([]byte(""), "", now.Add(time.Second))
slApp = sl.appender()
_, _, _, err = sl.append(slApp, []byte(""), "", now.Add(time.Second))
testutil.Ok(t, err)
testutil.Ok(t, slApp.Commit())
want := []sample{
{
@ -1328,8 +1352,10 @@ func TestScrapeLoopAppendGracefullyIfAmendOrOutOfOrderOrOutOfBounds(t *testing.T
)
now := time.Unix(1, 0)
total, added, seriesAdded, err := sl.append([]byte("out_of_order 1\namend 1\nnormal 1\nout_of_bounds 1\n"), "", now)
slApp := sl.appender()
total, added, seriesAdded, err := sl.append(slApp, []byte("out_of_order 1\namend 1\nnormal 1\nout_of_bounds 1\n"), "", now)
testutil.Ok(t, err)
testutil.Ok(t, slApp.Commit())
want := []sample{
{
@ -1363,12 +1389,14 @@ func TestScrapeLoopOutOfBoundsTimeError(t *testing.T) {
)
now := time.Now().Add(20 * time.Minute)
total, added, seriesAdded, err := sl.append([]byte("normal 1\n"), "", now)
slApp := sl.appender()
total, added, seriesAdded, err := sl.append(slApp, []byte("normal 1\n"), "", now)
testutil.Ok(t, err)
testutil.Ok(t, slApp.Commit())
testutil.Equals(t, 1, total)
testutil.Equals(t, 1, added)
testutil.Equals(t, 0, seriesAdded)
testutil.Ok(t, err)
}
func TestTargetScraperScrapeOK(t *testing.T) {
@ -1548,8 +1576,10 @@ func TestScrapeLoop_RespectTimestamps(t *testing.T) {
)
now := time.Now()
_, _, _, err := sl.append([]byte(`metric_a{a="1",b="1"} 1 0`), "", now)
slApp := sl.appender()
_, _, _, err := sl.append(slApp, []byte(`metric_a{a="1",b="1"} 1 0`), "", now)
testutil.Ok(t, err)
testutil.Ok(t, slApp.Commit())
want := []sample{
{
@ -1579,8 +1609,10 @@ func TestScrapeLoop_DiscardTimestamps(t *testing.T) {
)
now := time.Now()
_, _, _, err := sl.append([]byte(`metric_a{a="1",b="1"} 1 0`), "", now)
slApp := sl.appender()
_, _, _, err := sl.append(slApp, []byte(`metric_a{a="1",b="1"} 1 0`), "", now)
testutil.Ok(t, err)
testutil.Ok(t, slApp.Commit())
want := []sample{
{
@ -1612,8 +1644,10 @@ func TestScrapeLoopDiscardDuplicateLabels(t *testing.T) {
defer cancel()
// We add a good and a bad metric to check that both are discarded.
_, _, _, err := sl.append([]byte("test_metric{le=\"500\"} 1\ntest_metric{le=\"600\",le=\"700\"} 1\n"), "", time.Time{})
slApp := sl.appender()
_, _, _, err := sl.append(slApp, []byte("test_metric{le=\"500\"} 1\ntest_metric{le=\"600\",le=\"700\"} 1\n"), "", time.Time{})
testutil.NotOk(t, err)
testutil.Ok(t, slApp.Rollback())
q, err := s.Querier(ctx, time.Time{}.UnixNano(), 0)
testutil.Ok(t, err)
@ -1622,8 +1656,10 @@ func TestScrapeLoopDiscardDuplicateLabels(t *testing.T) {
testutil.Ok(t, series.Err())
// We add a good metric to check that it is recorded.
_, _, _, err = sl.append([]byte("test_metric{le=\"500\"} 1\n"), "", time.Time{})
slApp = sl.appender()
_, _, _, err = sl.append(slApp, []byte("test_metric{le=\"500\"} 1\n"), "", time.Time{})
testutil.Ok(t, err)
testutil.Ok(t, slApp.Commit())
q, err = s.Querier(ctx, time.Time{}.UnixNano(), 0)
testutil.Ok(t, err)
@ -1657,8 +1693,10 @@ func TestScrapeLoopDiscardUnnamedMetrics(t *testing.T) {
)
defer cancel()
_, _, _, err := sl.append([]byte("nok 1\nnok2{drop=\"drop\"} 1\n"), "", time.Time{})
slApp := sl.appender()
_, _, _, err := sl.append(slApp, []byte("nok 1\nnok2{drop=\"drop\"} 1\n"), "", time.Time{})
testutil.NotOk(t, err)
testutil.Ok(t, slApp.Rollback())
testutil.Equals(t, errNameLabelMandatory, err)
q, err := s.Querier(ctx, time.Time{}.UnixNano(), 0)
@ -1873,8 +1911,10 @@ func TestScrapeAddFast(t *testing.T) {
)
defer cancel()
_, _, _, err := sl.append([]byte("up 1\n"), "", time.Time{})
slApp := sl.appender()
_, _, _, err := sl.append(slApp, []byte("up 1\n"), "", time.Time{})
testutil.Ok(t, err)
testutil.Ok(t, slApp.Commit())
// Poison the cache. There is just one entry, and one series in the
// storage. Changing the ref will create a 'not found' error.
@ -1882,8 +1922,10 @@ func TestScrapeAddFast(t *testing.T) {
v.ref++
}
_, _, _, err = sl.append([]byte("up 1\n"), "", time.Time{}.Add(time.Second))
slApp = sl.appender()
_, _, _, err = sl.append(slApp, []byte("up 1\n"), "", time.Time{}.Add(time.Second))
testutil.Ok(t, err)
testutil.Ok(t, slApp.Commit())
}
func TestReuseCacheRace(t *testing.T) {
@ -1928,3 +1970,66 @@ func TestCheckAddError(t *testing.T) {
sl.checkAddError(nil, nil, nil, storage.ErrOutOfOrderSample, nil, &appErrs)
testutil.Equals(t, 1, appErrs.numOutOfOrder)
}
// TestScrapeReportSingleAppender runs a scrape loop against a test storage
// and, for three seconds, repeatedly counts every sample that is readable
// from that storage. The count must always be a multiple of 9.
// NOTE(review): 9 is presumably the 4 scraped series plus 5 report samples
// written per scrape, so a partial count would mean scrape and report samples
// became visible separately — confirm against report().
func TestScrapeReportSingleAppender(t *testing.T) {
s := teststorage.New(t)
defer s.Close()
var (
signal = make(chan struct{}, 1)
scraper = &testScraper{}
)
ctx, cancel := context.WithCancel(context.Background())
sl := newScrapeLoop(ctx,
scraper,
nil, nil,
nopMutator,
nopMutator,
s.Appender,
nil,
0,
true,
)
numScrapes := 0
// Every fourth scrape fails; all other scrapes expose the same four metrics.
scraper.scrapeFunc = func(ctx context.Context, w io.Writer) error {
numScrapes++
if numScrapes%4 == 0 {
return fmt.Errorf("scrape failed")
}
w.Write([]byte("metric_a 44\nmetric_b 44\nmetric_c 44\nmetric_d 44\n"))
return nil
}
// Run the loop in the background and signal once it has returned.
// Presumably the arguments are a 10ms interval and a 1h timeout — verify
// against the signature of run.
go func() {
sl.run(10*time.Millisecond, time.Hour, nil)
signal <- struct{}{}
}()
start := time.Now()
// Poll the storage while the loop keeps scraping concurrently.
for time.Since(start) < 3*time.Second {
q, err := s.Querier(ctx, time.Time{}.UnixNano(), time.Now().UnixNano())
testutil.Ok(t, err)
// Select every series that has a non-empty metric name.
series := q.Select(false, nil, labels.MustNewMatcher(labels.MatchRegexp, "__name__", ".+"))
// c is the total number of samples across all selected series.
c := 0
for series.Next() {
i := series.At().Iterator()
for i.Next() {
c++
}
}
testutil.Equals(t, 0, c%9, "Appended samples not as expected: %d", c)
q.Close()
}
// Stop the loop and wait for the background goroutine to finish.
cancel()
select {
case <-signal:
case <-time.After(5 * time.Second):
t.Fatalf("Scrape wasn't stopped.")
}
}