2016-08-16 19:13:18 +02:00
package cluster
import (
2016-08-16 16:26:10 +01:00
"context"
2016-08-18 14:20:11 +02:00
"encoding/json"
2016-08-16 19:13:18 +02:00
"fmt"
2016-12-30 09:21:13 +01:00
"sync"
"time"
2016-09-23 18:27:01 +02:00
"github.com/cenk/backoff"
2016-08-16 19:13:18 +02:00
"github.com/containous/staert"
2016-09-23 18:27:01 +02:00
"github.com/containous/traefik/job"
2016-08-18 14:20:11 +02:00
"github.com/containous/traefik/log"
2016-12-08 13:32:12 +01:00
"github.com/containous/traefik/safe"
2016-08-16 19:13:18 +02:00
"github.com/docker/libkv/store"
"github.com/satori/go.uuid"
)
// Metadata stores Object plus metadata
type Metadata struct {
2016-08-18 14:20:11 +02:00
object Object
Object [ ] byte
Lock string
}
2016-09-29 15:36:52 +02:00
// NewMetadata returns new Metadata
func NewMetadata ( object Object ) * Metadata {
return & Metadata { object : object }
}
// Marshall marshalls object
func ( m * Metadata ) Marshall ( ) error {
2016-08-18 14:20:11 +02:00
var err error
m . Object , err = json . Marshal ( m . object )
return err
}
func ( m * Metadata ) unmarshall ( ) error {
if len ( m . Object ) == 0 {
return nil
}
return json . Unmarshal ( m . Object , m . object )
2016-08-16 19:13:18 +02:00
}
2016-08-18 14:20:11 +02:00
// Listener is called when Object has been changed in KV store
type Listener func ( Object ) error
var _ Store = ( * Datastore ) ( nil )
2016-08-16 19:13:18 +02:00
// Datastore holds a struct synced in a KV store
type Datastore struct {
2016-08-18 14:20:11 +02:00
kv staert . KvSource
2016-08-16 19:13:18 +02:00
ctx context . Context
localLock * sync . RWMutex
meta * Metadata
lockKey string
2016-08-18 14:20:11 +02:00
listener Listener
2016-08-16 19:13:18 +02:00
}
// NewDataStore creates a Datastore
2016-11-16 08:56:52 +00:00
func NewDataStore ( ctx context . Context , kvSource staert . KvSource , object Object , listener Listener ) ( * Datastore , error ) {
2016-08-16 19:13:18 +02:00
datastore := Datastore {
kv : kvSource ,
ctx : ctx ,
2016-08-18 14:20:11 +02:00
meta : & Metadata { object : object } ,
2016-08-16 19:13:18 +02:00
lockKey : kvSource . Prefix + "/lock" ,
localLock : & sync . RWMutex { } ,
2016-08-18 14:20:11 +02:00
listener : listener ,
2016-08-16 19:13:18 +02:00
}
err := datastore . watchChanges ( )
if err != nil {
return nil , err
}
return & datastore , nil
}
func ( d * Datastore ) watchChanges ( ) error {
stopCh := make ( chan struct { } )
kvCh , err := d . kv . Watch ( d . lockKey , stopCh )
if err != nil {
return err
}
2017-07-19 14:11:45 +02:00
safe . Go ( func ( ) {
2016-08-16 19:13:18 +02:00
ctx , cancel := context . WithCancel ( d . ctx )
operation := func ( ) error {
for {
select {
case <- ctx . Done ( ) :
stopCh <- struct { } { }
return nil
case _ , ok := <- kvCh :
if ! ok {
cancel ( )
return err
}
2016-09-23 18:27:01 +02:00
err = d . reload ( )
2016-08-16 19:13:18 +02:00
if err != nil {
return err
}
2016-08-18 14:20:11 +02:00
if d . listener != nil {
err := d . listener ( d . meta . object )
if err != nil {
log . Errorf ( "Error calling datastore listener: %s" , err )
}
}
2016-08-16 19:13:18 +02:00
}
}
}
notify := func ( err error , time time . Duration ) {
log . Errorf ( "Error in watch datastore: %+v, retrying in %s" , err , time )
}
2016-12-08 13:32:12 +01:00
err := backoff . RetryNotify ( safe . OperationWithRecover ( operation ) , job . NewBackOff ( backoff . NewExponentialBackOff ( ) ) , notify )
2016-08-16 19:13:18 +02:00
if err != nil {
log . Errorf ( "Error in watch datastore: %v" , err )
}
2017-07-19 14:11:45 +02:00
} )
2016-08-16 19:13:18 +02:00
return nil
}
2016-09-23 18:27:01 +02:00
func ( d * Datastore ) reload ( ) error {
2017-05-26 17:03:14 +02:00
log . Debug ( "Datastore reload" )
2016-09-23 18:27:01 +02:00
d . localLock . Lock ( )
err := d . kv . LoadConfig ( d . meta )
if err != nil {
d . localLock . Unlock ( )
return err
}
err = d . meta . unmarshall ( )
if err != nil {
d . localLock . Unlock ( )
return err
}
d . localLock . Unlock ( )
return nil
}
2016-08-16 19:13:18 +02:00
// Begin creates a transaction with the KV store.
2016-08-18 14:20:11 +02:00
func ( d * Datastore ) Begin ( ) ( Transaction , Object , error ) {
2016-08-18 13:03:10 +02:00
id := uuid . NewV4 ( ) . String ( )
2016-08-18 14:20:11 +02:00
log . Debugf ( "Transaction %s begins" , id )
2016-08-18 13:03:10 +02:00
remoteLock , err := d . kv . NewLock ( d . lockKey , & store . LockOptions { TTL : 20 * time . Second , Value : [ ] byte ( id ) } )
2016-08-16 19:13:18 +02:00
if err != nil {
2016-08-18 14:20:11 +02:00
return nil , nil , err
2016-08-16 19:13:18 +02:00
}
stopCh := make ( chan struct { } )
ctx , cancel := context . WithCancel ( d . ctx )
var errLock error
go func ( ) {
_ , errLock = remoteLock . Lock ( stopCh )
cancel ( )
} ( )
select {
case <- ctx . Done ( ) :
if errLock != nil {
2016-08-18 14:20:11 +02:00
return nil , nil , errLock
2016-08-16 19:13:18 +02:00
}
case <- d . ctx . Done ( ) :
stopCh <- struct { } { }
2016-08-18 14:20:11 +02:00
return nil , nil , d . ctx . Err ( )
2016-08-16 19:13:18 +02:00
}
// we got the lock! Now make sure we are synced with KV store
operation := func ( ) error {
meta := d . get ( )
2016-08-18 13:03:10 +02:00
if meta . Lock != id {
return fmt . Errorf ( "Object lock value: expected %s, got %s" , id , meta . Lock )
2016-08-16 19:13:18 +02:00
}
return nil
}
notify := func ( err error , time time . Duration ) {
log . Errorf ( "Datastore sync error: %v, retrying in %s" , err , time )
2016-09-23 18:27:01 +02:00
err = d . reload ( )
if err != nil {
log . Errorf ( "Error reloading: %+v" , err )
}
2016-08-16 19:13:18 +02:00
}
ebo := backoff . NewExponentialBackOff ( )
ebo . MaxElapsedTime = 60 * time . Second
2016-12-08 13:32:52 +01:00
err = backoff . RetryNotify ( safe . OperationWithRecover ( operation ) , ebo , notify )
2016-08-16 19:13:18 +02:00
if err != nil {
2016-08-18 14:20:11 +02:00
return nil , nil , fmt . Errorf ( "Datastore cannot sync: %v" , err )
2016-08-16 19:13:18 +02:00
}
// we synced with KV store, we can now return Setter
2016-08-18 14:20:11 +02:00
return & datastoreTransaction {
2016-08-16 19:13:18 +02:00
Datastore : d ,
remoteLock : remoteLock ,
2016-08-18 14:20:11 +02:00
id : id ,
} , d . meta . object , nil
2016-08-16 19:13:18 +02:00
}
func ( d * Datastore ) get ( ) * Metadata {
d . localLock . RLock ( )
defer d . localLock . RUnlock ( )
return d . meta
}
2016-08-18 14:20:11 +02:00
// Load load atomically a struct from the KV store
func ( d * Datastore ) Load ( ) ( Object , error ) {
d . localLock . Lock ( )
defer d . localLock . Unlock ( )
err := d . kv . LoadConfig ( d . meta )
if err != nil {
return nil , err
}
err = d . meta . unmarshall ( )
if err != nil {
return nil , err
}
return d . meta . object , nil
}
2016-08-16 19:13:18 +02:00
// Get atomically a struct from the KV store
func ( d * Datastore ) Get ( ) Object {
d . localLock . RLock ( )
defer d . localLock . RUnlock ( )
2016-08-18 14:20:11 +02:00
return d . meta . object
2016-08-16 19:13:18 +02:00
}
2016-08-18 14:20:11 +02:00
var _ Transaction = ( * datastoreTransaction ) ( nil )
type datastoreTransaction struct {
2016-08-16 19:13:18 +02:00
* Datastore
remoteLock store . Locker
dirty bool
2016-08-18 14:20:11 +02:00
id string
2016-08-16 19:13:18 +02:00
}
// Commit allows to set an object in the KV store
2016-08-18 14:20:11 +02:00
func ( s * datastoreTransaction ) Commit ( object Object ) error {
2016-08-16 19:13:18 +02:00
s . localLock . Lock ( )
defer s . localLock . Unlock ( )
if s . dirty {
2016-12-09 14:37:39 +01:00
return fmt . Errorf ( "Transaction already used, please begin a new one" )
2016-08-16 19:13:18 +02:00
}
2016-08-18 14:20:11 +02:00
s . Datastore . meta . object = object
2016-09-29 15:36:52 +02:00
err := s . Datastore . meta . Marshall ( )
2016-08-18 14:20:11 +02:00
if err != nil {
2016-12-09 14:37:39 +01:00
return fmt . Errorf ( "Marshall error: %s" , err )
2016-08-18 14:20:11 +02:00
}
err = s . kv . StoreConfig ( s . Datastore . meta )
2016-08-16 19:13:18 +02:00
if err != nil {
2016-12-09 14:37:39 +01:00
return fmt . Errorf ( "StoreConfig error: %s" , err )
2016-08-16 19:13:18 +02:00
}
err = s . remoteLock . Unlock ( )
if err != nil {
2016-12-09 14:37:39 +01:00
return fmt . Errorf ( "Unlock error: %s" , err )
2016-08-16 19:13:18 +02:00
}
s . dirty = true
2016-11-16 02:32:01 +00:00
log . Debugf ( "Transaction committed %s" , s . id )
2016-08-16 19:13:18 +02:00
return nil
}