2018-05-16 08:02:55 -04:00
// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package wal
import (
"bufio"
"encoding/binary"
"fmt"
"hash/crc32"
"io"
2020-04-06 07:34:20 -06:00
"io/ioutil"
2018-05-16 08:02:55 -04:00
"os"
"path/filepath"
2018-06-18 07:52:57 -04:00
"sort"
2018-05-16 08:02:55 -04:00
"strconv"
"sync"
"time"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
2019-06-19 07:46:24 -06:00
"github.com/golang/snappy"
2018-05-16 08:02:55 -04:00
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
2020-10-22 11:00:08 +02:00
2019-08-13 14:04:14 +05:30
"github.com/prometheus/prometheus/tsdb/fileutil"
2018-05-16 08:02:55 -04:00
)
const (
	// DefaultSegmentSize is the segment size used when no explicit size is given.
	DefaultSegmentSize = 128 * 1024 * 1024 // 128 MB

	// pageSize is the unit in which segments are written to disk.
	pageSize = 32 * 1024 // 32KB

	// recordHeaderSize is the per-record header: 1 byte type, 2 bytes length,
	// 4 bytes CRC32 (see the write path in log()).
	recordHeaderSize = 7
)

// The table gets initialized with sync.Once but may still cause a race
// with any other use of the crc32 package anywhere. Thus we initialize it
// before.
var castagnoliTable = crc32.MakeTable(crc32.Castagnoli)
// page is an in memory buffer used to batch disk writes.
// Records bigger than the page size are split and flushed separately.
// A flush is triggered when a single record doesn't fit the page size or
// when the next record can't fit in the remaining free page space.
type page struct {
	alloc   int            // Number of bytes written into buf so far.
	flushed int            // Number of bytes of buf already flushed to disk.
	buf     [pageSize]byte // The page contents.
}
// remaining reports how many free bytes are left in the page buffer.
func (p *page) remaining() int {
	free := pageSize - p.alloc
	return free
}
// full reports whether the page lacks room for even a record header,
// i.e. no further record fragment can be started in it.
func (p *page) full() bool {
	return p.remaining() < recordHeaderSize
}
2019-08-30 13:08:36 +04:30
func ( p * page ) reset ( ) {
for i := range p . buf {
p . buf [ i ] = 0
}
p . alloc = 0
p . flushed = 0
}
// SegmentFile represents the underlying file used to store a segment.
type SegmentFile interface {
	// Stat reports file metadata; used to derive already-written pages.
	Stat() (os.FileInfo, error)
	// Sync flushes the file contents to stable storage.
	Sync() error
	io.Writer
	io.Reader
	io.Closer
}
// Segment represents a segment file.
type Segment struct {
	SegmentFile        // Embedded file handle providing I/O and Stat/Sync.
	dir         string // Directory the segment file lives in.
	i           int    // Index of the segment within dir.
}
// Index returns the index of the segment.
func (s *Segment) Index() int {
	idx := s.i
	return idx
}
// Dir returns the directory of the segment.
func (s *Segment) Dir() string {
	d := s.dir
	return d
}
// CorruptionErr is an error that's returned when corruption is encountered.
type CorruptionErr struct {
	Dir     string // Directory containing the corrupted segment.
	Segment int    // Index of the corrupted segment; negative when unknown.
	Offset  int64  // Byte offset of the corruption within the segment.
	Err     error  // Underlying error describing the corruption.
}
func ( e * CorruptionErr ) Error ( ) string {
if e . Segment < 0 {
return fmt . Sprintf ( "corruption after %d bytes: %s" , e . Offset , e . Err )
}
2018-11-30 16:46:16 +02:00
return fmt . Sprintf ( "corruption in segment %s at %d: %s" , SegmentName ( e . Dir , e . Segment ) , e . Offset , e . Err )
2018-05-17 09:00:32 -04:00
}
// OpenWriteSegment opens segment k in dir. The returned segment is ready for new appends.
func OpenWriteSegment(logger log.Logger, dir string, k int) (*Segment, error) {
	segName := SegmentName(dir, k)
	// Append-only: existing contents are never rewritten.
	f, err := os.OpenFile(segName, os.O_WRONLY|os.O_APPEND, 0666)
	if err != nil {
		return nil, err
	}
	stat, err := f.Stat()
	if err != nil {
		f.Close()
		return nil, err
	}
	// If the last page is torn, fill it with zeros.
	// In case it was torn after all records were written successfully, this
	// will just pad the page and everything will be fine.
	// If it was torn mid-record, a full read (which the caller should do anyway
	// to ensure integrity) will detect it as a corruption by the end.
	if d := stat.Size() % pageSize; d != 0 {
		level.Warn(logger).Log("msg", "Last page of the wal is torn, filling it with zeros", "segment", segName)
		if _, err := f.Write(make([]byte, pageSize-d)); err != nil {
			f.Close()
			return nil, errors.Wrap(err, "zero-pad torn page")
		}
	}
	return &Segment{SegmentFile: f, i: k, dir: dir}, nil
}
// CreateSegment creates a new segment k in dir.
func CreateSegment ( dir string , k int ) ( * Segment , error ) {
f , err := os . OpenFile ( SegmentName ( dir , k ) , os . O_WRONLY | os . O_CREATE | os . O_APPEND , 0666 )
if err != nil {
return nil , err
}
2020-10-29 11:37:03 +01:00
return & Segment { SegmentFile : f , i : k , dir : dir } , nil
2018-05-17 09:00:32 -04:00
}
2018-06-18 07:52:57 -04:00
// OpenReadSegment opens the segment with the given filename.
2018-05-17 09:00:32 -04:00
func OpenReadSegment ( fn string ) ( * Segment , error ) {
k , err := strconv . Atoi ( filepath . Base ( fn ) )
if err != nil {
return nil , errors . New ( "not a valid filename" )
}
f , err := os . Open ( fn )
if err != nil {
return nil , err
}
2020-10-29 11:37:03 +01:00
return & Segment { SegmentFile : f , i : k , dir : filepath . Dir ( fn ) } , nil
2018-05-17 09:00:32 -04:00
}
// WAL is a write ahead log that stores records in segment files.
// It must be read from start to end once before logging new data.
// If an error occurs during read, the repair procedure must be called
// before it's safe to do further writes.
//
// Segments are written to in pages of 32KB, with records possibly split
// across page boundaries.
// Records are never split across segments to allow full segments to be
// safely truncated. It also ensures that torn writes never corrupt records
// beyond the most recent segment.
type WAL struct {
	dir         string       // Directory the segment files are written to.
	logger      log.Logger   // Destination for warnings/errors; set to a nop logger when nil.
	segmentSize int          // Maximum size of one segment file, in bytes.
	mtx         sync.RWMutex // Serializes writes and shutdown.

	segment   *Segment // Active segment.
	donePages int      // Pages written to the segment.
	page      *page    // Active page.

	stopc  chan chan struct{} // Tells run() to stop; carries a done channel back.
	actorc chan func()        // Deferred work (fsync/close of old segments) for run().
	closed bool               // To allow calling Close() more than once without blocking.

	compress  bool   // Whether records are snappy-compressed before writing.
	snappyBuf []byte // Reusable scratch buffer for snappy compression.

	metrics *walMetrics
}
// walMetrics bundles the Prometheus metrics exposed by the WAL.
type walMetrics struct {
	fsyncDuration   prometheus.Summary // Duration of segment fsync calls.
	pageFlushes     prometheus.Counter // Total number of page flushes.
	pageCompletions prometheus.Counter // Total number of fully written pages.
	truncateFail    prometheus.Counter // Truncation attempts that failed.
	truncateTotal   prometheus.Counter // All truncation attempts.
	currentSegment  prometheus.Gauge   // Index of the segment currently written to.
	writesFailed    prometheus.Counter // Record writes that returned an error.
}
2020-02-15 21:48:47 +08:00
func newWALMetrics ( r prometheus . Registerer ) * walMetrics {
2019-09-19 19:24:34 +08:00
m := & walMetrics { }
m . fsyncDuration = prometheus . NewSummary ( prometheus . SummaryOpts {
Name : "prometheus_tsdb_wal_fsync_duration_seconds" ,
Help : "Duration of WAL fsync." ,
Objectives : map [ float64 ] float64 { 0.5 : 0.05 , 0.9 : 0.01 , 0.99 : 0.001 } ,
} )
m . pageFlushes = prometheus . NewCounter ( prometheus . CounterOpts {
Name : "prometheus_tsdb_wal_page_flushes_total" ,
Help : "Total number of page flushes." ,
} )
m . pageCompletions = prometheus . NewCounter ( prometheus . CounterOpts {
Name : "prometheus_tsdb_wal_completed_pages_total" ,
Help : "Total number of completed pages." ,
} )
m . truncateFail = prometheus . NewCounter ( prometheus . CounterOpts {
Name : "prometheus_tsdb_wal_truncations_failed_total" ,
Help : "Total number of WAL truncations that failed." ,
} )
m . truncateTotal = prometheus . NewCounter ( prometheus . CounterOpts {
Name : "prometheus_tsdb_wal_truncations_total" ,
Help : "Total number of WAL truncations attempted." ,
} )
m . currentSegment = prometheus . NewGauge ( prometheus . GaugeOpts {
Name : "prometheus_tsdb_wal_segment_current" ,
Help : "WAL segment index that TSDB is currently writing to." ,
} )
2020-01-17 13:12:04 -07:00
m . writesFailed = prometheus . NewCounter ( prometheus . CounterOpts {
Name : "prometheus_tsdb_wal_writes_failed_total" ,
Help : "Total number of WAL writes that failed." ,
} )
2019-09-19 19:24:34 +08:00
if r != nil {
r . MustRegister (
m . fsyncDuration ,
m . pageFlushes ,
m . pageCompletions ,
m . truncateFail ,
m . truncateTotal ,
m . currentSegment ,
2020-01-17 13:12:04 -07:00
m . writesFailed ,
2019-09-19 19:24:34 +08:00
)
}
return m
}
// New returns a new WAL over the given directory, using DefaultSegmentSize.
func New(logger log.Logger, reg prometheus.Registerer, dir string, compress bool) (*WAL, error) {
	return NewSize(logger, reg, dir, DefaultSegmentSize, compress)
}
// NewSize returns a new WAL over the given directory.
// New segments are created with the specified size.
func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSize int, compress bool) (*WAL, error) {
	// Segments are written in whole pages, so the size must be page-aligned.
	if segmentSize%pageSize != 0 {
		return nil, errors.New("invalid segment size")
	}
	if err := os.MkdirAll(dir, 0777); err != nil {
		return nil, errors.Wrap(err, "create dir")
	}
	if logger == nil {
		logger = log.NewNopLogger()
	}
	w := &WAL{
		dir:         dir,
		logger:      logger,
		segmentSize: segmentSize,
		page:        &page{},
		actorc:      make(chan func(), 100),
		stopc:       make(chan chan struct{}),
		compress:    compress,
	}
	w.metrics = newWALMetrics(reg)

	_, last, err := Segments(w.Dir())
	if err != nil {
		return nil, errors.Wrap(err, "get segment range")
	}

	// Index of the Segment we want to open and write to.
	writeSegmentIndex := 0
	// If some segments already exist create one with a higher index than the last segment.
	if last != -1 {
		writeSegmentIndex = last + 1
	}

	segment, err := CreateSegment(w.Dir(), writeSegmentIndex)
	if err != nil {
		return nil, err
	}

	if err := w.setSegment(segment); err != nil {
		return nil, err
	}

	// Background goroutine that executes queued work (fsync/close of old segments).
	go w.run()

	return w, nil
}
// Open an existing WAL.
// Only dir and logger are initialized: no segment is created or opened for
// writing, and no metrics or background goroutine are set up.
func Open(logger log.Logger, dir string) (*WAL, error) {
	if logger == nil {
		logger = log.NewNopLogger()
	}
	w := &WAL{
		dir:    dir,
		logger: logger,
	}
	return w, nil
}
2019-06-19 07:46:24 -06:00
// CompressionEnabled returns if compression is enabled on this WAL.
func ( w * WAL ) CompressionEnabled ( ) bool {
return w . compress
}
2018-05-16 08:02:55 -04:00
// Dir returns the directory of the WAL.
func ( w * WAL ) Dir ( ) string {
return w . dir
}
func ( w * WAL ) run ( ) {
2018-05-17 09:00:32 -04:00
Loop :
2018-05-16 08:02:55 -04:00
for {
select {
case f := <- w . actorc :
f ( )
2018-05-17 09:00:32 -04:00
case donec := <- w . stopc :
2018-06-18 07:52:57 -04:00
close ( w . actorc )
2018-05-17 09:00:32 -04:00
defer close ( donec )
break Loop
2018-05-16 08:02:55 -04:00
}
2018-05-17 09:00:32 -04:00
}
// Drain and process any remaining functions.
2018-06-18 07:52:57 -04:00
for f := range w . actorc {
f ( )
2018-05-16 08:02:55 -04:00
}
}
// Repair attempts to repair the WAL based on the error.
// It discards all data after the corruption.
func (w *WAL) Repair(origErr error) error {
	// We could probably have a mode that only discards torn records right around
	// the corruption to preserve as data much as possible.
	// But that's not generally applicable if the records have any kind of causality.
	// Maybe as an extra mode in the future if mid-WAL corruptions become
	// a frequent concern.
	err := errors.Cause(origErr) // So that we can pick up errors even if wrapped.

	cerr, ok := err.(*CorruptionErr)
	if !ok {
		return errors.Wrap(origErr, "cannot handle error")
	}
	if cerr.Segment < 0 {
		return errors.New("corruption error does not specify position")
	}
	level.Warn(w.logger).Log("msg", "Starting corruption repair",
		"segment", cerr.Segment, "offset", cerr.Offset)

	// All segments behind the corruption can no longer be used.
	segs, err := listSegments(w.Dir())
	if err != nil {
		return errors.Wrap(err, "list segments")
	}
	level.Warn(w.logger).Log("msg", "Deleting all segments newer than corrupted segment", "segment", cerr.Segment)

	for _, s := range segs {
		if w.segment.i == s.index {
			// The active segment needs to be removed,
			// close it first (Windows!). Can be closed safely
			// as we set the current segment to repaired file
			// below.
			if err := w.segment.Close(); err != nil {
				return errors.Wrap(err, "close active segment")
			}
		}
		// Segments at or before the corruption stay on disk.
		if s.index <= cerr.Segment {
			continue
		}
		if err := os.Remove(filepath.Join(w.Dir(), s.name)); err != nil {
			return errors.Wrapf(err, "delete segment:%v", s.index)
		}
	}
	// Regardless of the corruption offset, no record reaches into the previous segment.
	// So we can safely repair the WAL by removing the segment and re-inserting all
	// its records up to the corruption.
	level.Warn(w.logger).Log("msg", "Rewrite corrupted segment", "segment", cerr.Segment)

	// Move the corrupted segment aside so a clean one can take its place.
	fn := SegmentName(w.Dir(), cerr.Segment)
	tmpfn := fn + ".repair"
	if err := fileutil.Rename(fn, tmpfn); err != nil {
		return err
	}
	// Create a clean segment and make it the active one.
	s, err := CreateSegment(w.Dir(), cerr.Segment)
	if err != nil {
		return err
	}
	if err := w.setSegment(s); err != nil {
		return err
	}

	// Copy records from the corrupted segment back into the clean one,
	// stopping at the corruption offset.
	f, err := os.Open(tmpfn)
	if err != nil {
		return errors.Wrap(err, "open segment")
	}
	defer f.Close()

	r := NewReader(bufio.NewReader(f))

	for r.Next() {
		// Add records only up to the where the error was.
		if r.Offset() >= cerr.Offset {
			break
		}
		if err := w.Log(r.Record()); err != nil {
			return errors.Wrap(err, "insert record")
		}
	}
	// We expect an error here from r.Err(), so nothing to handle.

	// We need to pad to the end of the last page in the repaired segment
	if err := w.flushPage(true); err != nil {
		return errors.Wrap(err, "flush page in repair")
	}

	// We explicitly close even when there is a defer for Windows to be
	// able to delete it. The defer is in place to close it in-case there
	// are errors above.
	if err := f.Close(); err != nil {
		return errors.Wrap(err, "close corrupted file")
	}
	if err := os.Remove(tmpfn); err != nil {
		return errors.Wrap(err, "delete corrupted segment")
	}

	// Explicitly close the segment we just repaired to avoid issues with Windows.
	s.Close()

	// We always want to start writing to a new Segment rather than an existing
	// Segment, which is handled by NewSize, but earlier in Repair we're deleting
	// all segments that come after the corrupted Segment. Recreate a new Segment here.
	s, err = CreateSegment(w.Dir(), cerr.Segment+1)
	if err != nil {
		return err
	}
	if err := w.setSegment(s); err != nil {
		return err
	}
	return nil
}
2018-05-16 08:02:55 -04:00
// SegmentName builds a segment name for the directory.
func SegmentName ( dir string , i int ) string {
2018-06-18 07:52:57 -04:00
return filepath . Join ( dir , fmt . Sprintf ( "%08d" , i ) )
2018-05-16 08:02:55 -04:00
}
// NextSegment creates the next segment and closes the previous one.
// It takes the write lock and delegates to nextSegment.
func (w *WAL) NextSegment() error {
	w.mtx.Lock()
	defer w.mtx.Unlock()
	return w.nextSegment()
}
// nextSegment creates the next segment and closes the previous one.
// Callers must hold w.mtx (see NextSegment and Log).
func (w *WAL) nextSegment() error {
	// Only flush the current page if it actually holds data.
	if w.page.alloc > 0 {
		if err := w.flushPage(true); err != nil {
			return err
		}
	}
	next, err := CreateSegment(w.Dir(), w.segment.Index()+1)
	if err != nil {
		return errors.Wrap(err, "create new segment file")
	}
	prev := w.segment
	if err := w.setSegment(next); err != nil {
		return err
	}

	// Don't block further writes by fsyncing the last segment.
	w.actorc <- func() {
		if err := w.fsync(prev); err != nil {
			level.Error(w.logger).Log("msg", "sync previous segment", "err", err)
		}
		if err := prev.Close(); err != nil {
			level.Error(w.logger).Log("msg", "close previous segment", "err", err)
		}
	}
	return nil
}
2019-02-25 02:10:27 -08:00
func ( w * WAL ) setSegment ( segment * Segment ) error {
w . segment = segment
// Correctly initialize donePages.
stat , err := segment . Stat ( )
if err != nil {
return err
}
w . donePages = int ( stat . Size ( ) / pageSize )
2019-09-19 19:24:34 +08:00
w . metrics . currentSegment . Set ( float64 ( segment . Index ( ) ) )
2019-02-25 02:10:27 -08:00
return nil
}
2018-05-16 08:02:55 -04:00
// flushPage writes the new contents of the page to disk. If no more records will fit into
// the page, the remaining bytes will be set to zero and a new page will be started.
// If clear is true, this is enforced regardless of how many bytes are left in the page.
func ( w * WAL ) flushPage ( clear bool ) error {
2019-09-19 19:24:34 +08:00
w . metrics . pageFlushes . Inc ( )
2018-05-16 08:02:55 -04:00
p := w . page
clear = clear || p . full ( )
2019-05-16 16:40:43 +03:00
// No more data will fit into the page or an implicit clear.
// Enqueue and clear it.
2018-05-16 08:02:55 -04:00
if clear {
2018-11-08 10:27:16 +02:00
p . alloc = pageSize // Write till end of page.
2018-05-16 08:02:55 -04:00
}
2020-10-29 11:37:03 +01:00
2018-05-16 08:02:55 -04:00
n , err := w . segment . Write ( p . buf [ p . flushed : p . alloc ] )
if err != nil {
2020-10-29 11:37:03 +01:00
p . flushed += n
2018-05-16 08:02:55 -04:00
return err
}
p . flushed += n
2018-06-18 07:52:57 -04:00
// We flushed an entire page, prepare a new one.
2018-05-16 08:02:55 -04:00
if clear {
2019-08-30 13:08:36 +04:30
p . reset ( )
2018-05-16 08:02:55 -04:00
w . donePages ++
2019-09-19 19:24:34 +08:00
w . metrics . pageCompletions . Inc ( )
2018-05-16 08:02:55 -04:00
}
return nil
}
// First Byte of header format:
// [ 4 bits unallocated] [1 bit snappy compression flag] [ 3 bit record type ]
const (
	snappyMask  = 1 << 3         // Bit 3 marks a snappy-compressed record.
	recTypeMask = snappyMask - 1 // Low three bits carry the record type.
)

// recType identifies what kind of record fragment a header announces.
type recType uint8

const (
	recPageTerm recType = 0 // Rest of page is empty.
	recFull     recType = 1 // Full record.
	recFirst    recType = 2 // First fragment of a record.
	recMiddle   recType = 3 // Middle fragments of a record.
	recLast     recType = 4 // Final fragment of a record.
)

// recTypeFromHeader extracts the record type from the first header byte,
// masking off the compression flag.
func recTypeFromHeader(header byte) recType {
	return recType(header & recTypeMask)
}
2018-05-16 08:02:55 -04:00
func ( t recType ) String ( ) string {
switch t {
case recPageTerm :
return "zero"
case recFull :
return "full"
case recFirst :
return "first"
case recMiddle :
return "middle"
case recLast :
return "last"
default :
return "<invalid>"
}
}
// pagesPerSegment reports how many full pages fit into one segment.
func (w *WAL) pagesPerSegment() int {
	perSegment := w.segmentSize / pageSize
	return perSegment
}
// Log writes the records into the log.
// Multiple records can be passed at once to reduce writes and increase throughput.
func ( w * WAL ) Log ( recs ... [ ] byte ) error {
2018-05-29 11:46:06 -04:00
w . mtx . Lock ( )
defer w . mtx . Unlock ( )
2018-05-16 08:02:55 -04:00
// Callers could just implement their own list record format but adding
// a bit of extra logic here frees them from that overhead.
for i , r := range recs {
if err := w . log ( r , i == len ( recs ) - 1 ) ; err != nil {
2020-01-17 13:12:04 -07:00
w . metrics . writesFailed . Inc ( )
2018-05-16 08:02:55 -04:00
return err
}
}
return nil
}
// log writes rec to the log and forces a flush of the current page if:
// - the final record of a batch
// - the record is bigger than the page size
// - the current page is full.
func (w *WAL) log(rec []byte, final bool) error {
	// When the last page flush failed the page will remain full.
	// When the page is full, need to flush it before trying to add more records to it.
	if w.page.full() {
		if err := w.flushPage(true); err != nil {
			return err
		}
	}
	// If the record is too big to fit within the active page in the current
	// segment, terminate the active segment and advance to the next one.
	// This ensures that records do not cross segment boundaries.
	left := w.page.remaining() - recordHeaderSize                                   // Free space in the active page.
	left += (pageSize - recordHeaderSize) * (w.pagesPerSegment() - w.donePages - 1) // Free pages in the active segment.
	if len(rec) > left {
		if err := w.nextSegment(); err != nil {
			return err
		}
	}

	compressed := false
	if w.compress && len(rec) > 0 {
		// The snappy library uses `len` to calculate if we need a new buffer.
		// In order to allocate as few buffers as possible make the length
		// equal to the capacity.
		w.snappyBuf = w.snappyBuf[:cap(w.snappyBuf)]
		w.snappyBuf = snappy.Encode(w.snappyBuf, rec)
		// Only use the compressed form when it is actually smaller.
		if len(w.snappyBuf) < len(rec) {
			rec = w.snappyBuf
			compressed = true
		}
	}

	// Populate as many pages as necessary to fit the record.
	// Be careful to always do one pass to ensure we write zero-length records.
	for i := 0; i == 0 || len(rec) > 0; i++ {
		p := w.page

		// Find how much of the record we can fit into the page.
		var (
			l    = min(len(rec), (pageSize-p.alloc)-recordHeaderSize)
			part = rec[:l]
			buf  = p.buf[p.alloc:]
			typ  recType
		)

		switch {
		case i == 0 && len(part) == len(rec):
			typ = recFull
		case len(part) == len(rec):
			typ = recLast
		case i == 0:
			typ = recFirst
		default:
			typ = recMiddle
		}
		if compressed {
			typ |= snappyMask
		}

		// Header layout: 1 byte type (with compression flag), 2 bytes length, 4 bytes CRC.
		buf[0] = byte(typ)
		crc := crc32.Checksum(part, castagnoliTable)
		binary.BigEndian.PutUint16(buf[1:], uint16(len(part)))
		binary.BigEndian.PutUint32(buf[3:], crc)

		copy(buf[recordHeaderSize:], part)
		p.alloc += len(part) + recordHeaderSize

		if w.page.full() {
			if err := w.flushPage(true); err != nil {
				// TODO When the flushing fails at this point and the record has not been
				// fully written to the buffer, we end up with a corrupted WAL because some part of the
				// record have been written to the buffer, while the rest of the record will be discarded.
				return err
			}
		}
		rec = rec[l:]
	}

	// If it's the final record of the batch and the page is not empty, flush it.
	if final && w.page.alloc > 0 {
		if err := w.flushPage(false); err != nil {
			return err
		}
	}
	return nil
}
// Truncate drops all segments before i.
2018-09-25 19:18:33 +05:30
func ( w * WAL ) Truncate ( i int ) ( err error ) {
2019-09-19 19:24:34 +08:00
w . metrics . truncateTotal . Inc ( )
2018-09-25 19:18:33 +05:30
defer func ( ) {
if err != nil {
2019-09-19 19:24:34 +08:00
w . metrics . truncateFail . Inc ( )
2018-09-25 19:18:33 +05:30
}
} ( )
2020-09-01 09:16:57 +00:00
refs , err := listSegments ( w . Dir ( ) )
2018-05-16 08:02:55 -04:00
if err != nil {
return err
}
for _ , r := range refs {
2018-10-11 18:23:52 +03:00
if r . index >= i {
2018-05-16 08:02:55 -04:00
break
}
2020-09-01 09:16:57 +00:00
if err = os . Remove ( filepath . Join ( w . Dir ( ) , r . name ) ) ; err != nil {
2018-05-16 08:02:55 -04:00
return err
}
}
return nil
}
2018-05-17 09:00:32 -04:00
func ( w * WAL ) fsync ( f * Segment ) error {
2018-05-16 08:02:55 -04:00
start := time . Now ( )
2020-10-29 11:37:03 +01:00
err := f . Sync ( )
2019-09-19 19:24:34 +08:00
w . metrics . fsyncDuration . Observe ( time . Since ( start ) . Seconds ( ) )
2018-05-16 08:02:55 -04:00
return err
}
// Close flushes all writes and closes active segment.
func (w *WAL) Close() (err error) {
	w.mtx.Lock()
	defer w.mtx.Unlock()

	if w.closed {
		return errors.New("wal already closed")
	}

	// A WAL created via Open has no active segment to flush or sync.
	if w.segment == nil {
		w.closed = true
		return nil
	}

	// Flush the last page and zero out all its remaining size.
	// We must not flush an empty page as it would falsely signal
	// the segment is done if we start writing to it again after opening.
	if w.page.alloc > 0 {
		if err := w.flushPage(true); err != nil {
			return err
		}
	}

	// Stop the background goroutine and wait until its queued work drained.
	donec := make(chan struct{})
	w.stopc <- donec
	<-donec

	if err = w.fsync(w.segment); err != nil {
		level.Error(w.logger).Log("msg", "sync previous segment", "err", err)
	}
	if err := w.segment.Close(); err != nil {
		level.Error(w.logger).Log("msg", "close previous segment", "err", err)
	}
	w.closed = true
	return nil
}
2020-09-01 09:16:57 +00:00
// Segments returns the range [first, n] of currently existing segments.
// If no segments are found, first and n are -1.
func Segments ( walDir string ) ( first , last int , err error ) {
refs , err := listSegments ( walDir )
if err != nil {
return 0 , 0 , err
}
if len ( refs ) == 0 {
return - 1 , - 1 , nil
}
return refs [ 0 ] . index , refs [ len ( refs ) - 1 ] . index , nil
}
// segmentRef pairs a segment file name with its parsed index.
type segmentRef struct {
	name  string // File name within the WAL directory.
	index int    // Segment index parsed from the file name.
}
func listSegments ( dir string ) ( refs [ ] segmentRef , err error ) {
2020-04-06 07:34:20 -06:00
files , err := ioutil . ReadDir ( dir )
2018-05-16 08:02:55 -04:00
if err != nil {
return nil , err
}
2020-04-06 07:34:20 -06:00
for _ , f := range files {
fn := f . Name ( )
2018-05-16 08:02:55 -04:00
k , err := strconv . Atoi ( fn )
if err != nil {
continue
}
2018-10-11 18:23:52 +03:00
refs = append ( refs , segmentRef { name : fn , index : k } )
2018-05-16 08:02:55 -04:00
}
2018-06-18 07:52:57 -04:00
sort . Slice ( refs , func ( i , j int ) bool {
2018-10-11 18:23:52 +03:00
return refs [ i ] . index < refs [ j ] . index
2018-06-18 07:52:57 -04:00
} )
2020-03-23 15:46:28 +08:00
for i := 0 ; i < len ( refs ) - 1 ; i ++ {
if refs [ i ] . index + 1 != refs [ i + 1 ] . index {
return nil , errors . New ( "segments are not sequential" )
}
}
2018-05-16 08:02:55 -04:00
return refs , nil
}
// SegmentRange groups segments by the directory and the first and last index it includes.
type SegmentRange struct {
	Dir         string // Directory containing the segments.
	First, Last int    // Inclusive index bounds; -1 leaves the respective end open.
}
// NewSegmentsReader returns a new reader over all segments in the directory.
func NewSegmentsReader(dir string) (io.ReadCloser, error) {
	// An open range (-1, -1) selects every segment in dir.
	return NewSegmentsRangeReader(SegmentRange{dir, -1, -1})
}
// NewSegmentsRangeReader returns a new reader over the given WAL segment ranges.
// If first or last are -1, the range is open on the respective end.
func NewSegmentsRangeReader(sr ...SegmentRange) (io.ReadCloser, error) {
	var segs []*Segment

	for _, sgmRange := range sr {
		refs, err := listSegments(sgmRange.Dir)
		if err != nil {
			return nil, errors.Wrapf(err, "list segment in dir:%v", sgmRange.Dir)
		}

		for _, r := range refs {
			// Skip segments below the lower bound...
			if sgmRange.First >= 0 && r.index < sgmRange.First {
				continue
			}
			// ...and stop once past the upper bound (refs are sorted by index).
			if sgmRange.Last >= 0 && r.index > sgmRange.Last {
				break
			}
			s, err := OpenReadSegment(filepath.Join(sgmRange.Dir, r.name))
			if err != nil {
				return nil, errors.Wrapf(err, "open segment:%v in dir:%v", r.name, sgmRange.Dir)
			}
			segs = append(segs, s)
		}
	}
	return NewSegmentBufReader(segs...), nil
}
// segmentBufReader is a buffered reader that reads in multiples of pages.
// The main purpose is that we are able to track segment and offset for
// corruption reporting. We have to be careful not to increment curr too
// early, as it is used by Reader.Err() to tell Repair which segment is corrupt.
// As such we pad the end of non-page align segments with zeros.
type segmentBufReader struct {
	buf  *bufio.Reader // Buffered reader over the current segment.
	segs []*Segment    // Segments to read, in order.

	cur int // Index into segs.
	off int // Offset of read data into current segment.
}
2020-03-23 15:47:11 +01:00
// nolint:golint // TODO: Consider exporting segmentBufReader
2019-05-24 11:33:28 -07:00
func NewSegmentBufReader ( segs ... * Segment ) * segmentBufReader {
2018-05-17 09:00:32 -04:00
return & segmentBufReader {
2019-02-18 19:05:07 +00:00
buf : bufio . NewReaderSize ( segs [ 0 ] , 16 * pageSize ) ,
2018-05-17 09:00:32 -04:00
segs : segs ,
2018-05-16 08:02:55 -04:00
}
}
2018-05-17 09:00:32 -04:00
func ( r * segmentBufReader ) Close ( ) ( err error ) {
for _ , s := range r . segs {
2018-05-16 08:02:55 -04:00
if e := s . Close ( ) ; e != nil {
err = e
}
}
return err
}
// Read implements io.Reader.
func (r *segmentBufReader) Read(b []byte) (n int, err error) {
	n, err = r.buf.Read(b)
	r.off += n

	// If we succeeded, or hit a non-EOF, we can stop.
	if err == nil || err != io.EOF {
		return n, err
	}

	// We hit EOF; fake out zero padding at the end of short segments, so we
	// don't increment curr too early and report the wrong segment as corrupt.
	if r.off%pageSize != 0 {
		i := 0
		for ; n+i < len(b) && (r.off+i)%pageSize != 0; i++ {
			b[n+i] = 0
		}

		// Return early, even if we didn't fill b.
		r.off += i
		return n + i, nil
	}

	// There is no more data left in the curr segment and there are no more
	// segments left. Return EOF.
	if r.cur+1 >= len(r.segs) {
		return n, io.EOF
	}

	// Move to next segment.
	r.cur++
	r.off = 0
	r.buf.Reset(r.segs[r.cur])
	return n, nil
}
// Size computes the size of the WAL on disk.
// We do this by adding the sizes of all the files under the WAL dir.
func (w *WAL) Size() (int64, error) {
	return fileutil.DirSize(w.Dir())
}