chore: replace underlying event implementation with single slice

The idea here is to use a single slice of events for all consumers.
Each consumer keeps its own position within the stream, and the stream
is structured as a circular buffer to avoid using too much memory.

This implementation allows for one more feature: looking "back" into
the event history and returning past events starting from some offset
(e.g. a timestamp, event ID, etc.). This feature is not implemented yet.
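
For illustration, a minimal sketch of how the stream is meant to be used
(the SequenceEvent payload and the sync/log wiring here are illustrative,
not part of this change):

    e := NewEvents(100) // a single slice of 100 events, shared by all consumers

    var wg sync.WaitGroup
    wg.Add(1)

    // Each Watch call gets its own channel, fed from the consumer's own
    // position within the shared stream.
    e.Watch(func(events <-chan runtime.Event) {
        defer wg.Done()

        event := <-events
        log.Printf("%s: %v", event.TypeURL, event.Payload)
    })

    // Publish appends to the circular buffer and wakes up any waiting consumers.
    e.Publish(&machine.SequenceEvent{Sequence: "boot"})

    wg.Wait()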

Signed-off-by: Andrey Smirnov <smirnov.andrey@gmail.com>
Andrey Smirnov 2020-05-15 18:19:06 +03:00 committed by talos-bot
parent 644b464227
commit fb585902a3
7 changed files with 292 additions and 213 deletions

go.mod

@@ -72,6 +72,7 @@ require (
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e
golang.org/x/sys v0.0.0-20200202164722-d101bd2416d5
golang.org/x/text v0.3.2
golang.org/x/time v0.0.0-20191024005414-555d28b269f0
google.golang.org/grpc v1.26.0
gopkg.in/freddierice/go-losetup.v1 v1.0.0-20170407175016-fc9adea44124
gopkg.in/fsnotify.v1 v1.4.7


@@ -683,17 +683,21 @@ func (s *Server) Read(in *machine.ReadRequest, srv machine.MachineService_ReadSe
func (s *Server) Events(req *machine.EventsRequest, l machine.MachineService_EventsServer) error {
errCh := make(chan error)
s.Controller.Runtime().Events().Watch(func(events <-chan machine.Event) {
s.Controller.Runtime().Events().Watch(func(events <-chan runtime.Event) {
errCh <- func() error {
for {
select {
case <-l.Context().Done():
return l.Context().Err()
case event := <-events:
err := l.Send(&event)
msg, err := event.ToMachineEvent()
if err != nil {
return err
}
if err = l.Send(msg); err != nil {
return err
}
}
}
}()


@@ -19,7 +19,6 @@ import (
"golang.org/x/net/http/httpproxy"
"golang.org/x/sys/unix"
"github.com/gogo/protobuf/proto"
"github.com/talos-systems/go-procfs/procfs"
"github.com/talos-systems/talos/api/machine"
@@ -154,17 +153,10 @@ func main() {
}
// Watch and handle runtime events.
c.Runtime().Events().Watch(func(events <-chan machine.Event) {
c.Runtime().Events().Watch(func(events <-chan runtime.Event) {
for {
for event := range events {
if event.Data.TypeUrl == "talos/runtime/"+proto.MessageName(&machine.SequenceEvent{}) {
msg := &machine.SequenceEvent{}
if err = proto.Unmarshal(event.GetData().GetValue(), msg); err != nil {
log.Printf("failed to unmarshal message: %v", err)
continue
}
if msg, ok := event.Payload.(*machine.SequenceEvent); ok {
if msg.Error != nil {
handle(fmt.Errorf("fatal sequencer error in %q sequence: %v", msg.GetSequence(), msg.GetError().String()))
}


@@ -6,12 +6,19 @@ package runtime
import (
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes/any"
"github.com/talos-systems/talos/api/machine"
)
// Event is what is sent on the wire.
type Event struct {
TypeURL string
Payload proto.Message
}
// WatchFunc defines the watcher callback function.
type WatchFunc func(<-chan machine.Event)
type WatchFunc func(<-chan Event)
// Watcher defines a runtime event watcher.
type Watcher interface {
@@ -28,3 +35,18 @@ type EventStream interface {
Watcher
Publisher
}
// ToMachineEvent serializes Event as proto message machine.Event.
func (event *Event) ToMachineEvent() (*machine.Event, error) {
value, err := proto.Marshal(event.Payload)
if err != nil {
return nil, err
}
return &machine.Event{
Data: &any.Any{
TypeUrl: event.TypeURL,
Value: value,
},
}, nil
}
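
As a usage sketch (the SequenceEvent payload here is illustrative), a runtime
event can be converted for the machine API roughly like this:

    ev := Event{
        TypeURL: "talos/runtime/" + proto.MessageName(&machine.SequenceEvent{}),
        Payload: &machine.SequenceEvent{Sequence: "boot"},
    }

    msg, err := ev.ToMachineEvent() // *machine.Event with the payload packed into any.Any
    if err != nil {
        return err
    }

    // msg can now be sent on a MachineService_EventsServer stream,
    // as the Events handler above does.
    _ = msg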


@@ -60,7 +60,8 @@ func NewController(b []byte) (*Controller, error) {
}
}
e := NewEvents(100)
// TODO: this should be the streaming capacity and probably a constant
e := NewEvents(1000)
ctlr := &Controller{
r: NewRuntime(cfg, s, e),


@@ -5,97 +5,186 @@
package v1alpha1
import (
"context"
"fmt"
"log"
"sync"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes/any"
"github.com/talos-systems/talos/api/machine"
"github.com/talos-systems/talos/internal/app/machined/pkg/runtime"
)
// Events represents the runtime event stream.
//
// Events is implemented internally as a circular buffer of `runtime.Event`.
// The `e.stream` slice is allocated to its full capacity up front and its size doesn't change
// throughout the lifetime of Events.
//
// To explain the internals, let's call the `Publish()` method the 'Publisher' (there might be
// multiple callers of it), and each `Watch()` handler a 'Consumer'.
//
// For the Publisher, `Events` keeps `e.writePos` and `e.gen`: `e.writePos` is the index into
// `e.stream` to write the next event to. After the write, `e.writePos` is incremented.
// When `e.writePos` goes above capacity-1, it wraps around to zero and `e.gen` is incremented.
//
// So at any time `0 <= e.writePos < e.cap`, while `e.gen` indicates how many times the `e.stream`
// slice has been fully overwritten.
//
// Each Consumer captures the position it starts consuming from as `pos` and `gen`, which are
// local to each Consumer, as Consumers are free to work at their own pace. The following diagram
// shows the Publisher and three Consumers:
//
// Consumer 3 Consumer 2
// pos = 9 pos = 16
// gen = 1 gen = 1
// e.stream []Event | |
// | |
// +----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
// | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | 10 | 11 | 12 | 13 | 14 | 15 | 16 |17 |
// +----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+
// | |
// | |
// Consumer 1 Publisher
// pos = 7 e.writePos = 14
// gen = 2 e.gen = 2
//
// The capacity of Events in this diagram is 18; the Publisher has already published 14 + 2 * 18 = 50
// events, so it has already fully overwritten `e.stream` twice (e.gen = 2).
//
// Consumer 1 is keeping up with the Publisher: it's on the same `gen`, and it has 14 - 7 = 7 events
// to catch up on.
//
// Consumer 2 is on `gen` 1, so it is reading events which were published before the Publisher's last
// wraparound of `e.writePos` at `e.gen == 1`. Consumer 2 has a lot of events to catch up on, but as
// long as it keeps moving, it can still do that.
//
// Consumer 3 is in trouble: it's on `gen` 1, but the Publisher already overwrote this element while
// moving on to `gen` 2, so Consumer 3 would be reading incorrect data and should error out.
//
// Synchronization: at the moment a single mutex protects `e.stream`, `e.writePos` and `e.gen`;
// Consumers keep their positions as local variables, so they don't require synchronization. If a
// Consumer catches up with the Publisher, it sleeps on a condition variable to be woken up by the
// Publisher on the next publish.
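//
// In other words, counting absolute offsets: the Publisher has written e.gen*e.cap + e.writePos
// events in total, while a Consumer has consumed gen*e.cap + pos events; once that difference
// exceeds e.cap, the Consumer has been overrun (its next event was already overwritten) and has
// to bail out.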
type Events struct {
subscribers []chan machine.Event
// stream is used as ring buffer of events
stream []runtime.Event
sync.Mutex
// writePos is the index into stream for the next write (publish)
writePos int
// gen tracks the number of wraparounds in stream
gen int64
// cap is the capacity of stream
cap int
// mutex protects access to writePos, gen and stream
mu sync.Mutex
c *sync.Cond
}
// NewEvents initializes and returns the v1alpha1 runtime event stream.
func NewEvents(n int) *Events {
func NewEvents(cap int) *Events {
e := &Events{
subscribers: make([]chan machine.Event, 0, n),
stream: make([]runtime.Event, cap),
cap: cap,
}
e.c = sync.NewCond(&e.mu)
return e
}
// Watch implements the Events interface.
//
//nolint: gocyclo
func (e *Events) Watch(f runtime.WatchFunc) {
ch := e.add()
// context is used to abort the loop when WatchFunc exits
ctx, ctxCancel := context.WithCancel(context.Background())
ch := make(chan runtime.Event)
go func() {
defer ctxCancel()
f(ch)
}()
// capture the initial consumer position/gen; the consumer starts consuming from the next
// event to be published
e.mu.Lock()
pos := e.writePos
gen := e.gen
e.mu.Unlock()
go func() {
defer close(ch)
defer e.delete(ch)
f(ch)
for {
e.mu.Lock()
// while there's no data to consume (pos == e.writePos), wait for the condition variable signal,
// then recheck the wait condition.
for pos == e.writePos {
e.c.Wait()
select {
case <-ctx.Done():
e.mu.Unlock()
return
default:
}
}
if e.gen > gen+1 || (e.gen > gen && pos < e.writePos) {
// buffer overrun, there's no way to signal an error in this case,
// so for now just return
//
// why a buffer overrun?
// if gen is 2 generations behind e.gen, the buffer was overwritten anyway
// if gen is 1 generation behind e.gen, the buffer was overwritten if the consumer's
// position is behind the publisher's write position.
e.mu.Unlock()
return
}
event := e.stream[pos]
pos = (pos + 1) % e.cap
if pos == 0 {
// consumer wraps around e.cap-1, so increment gen
gen++
}
e.mu.Unlock()
// send event to WatchFunc, wait for it to process the event
select {
case ch <- event:
case <-ctx.Done():
return
}
}
}()
}
// Publish implements the Events interface.
func (e *Events) Publish(msg proto.Message) {
value, err := proto.Marshal(msg)
if err != nil {
log.Printf("failed to marshal message: %v", err)
return
event := runtime.Event{
// In the future, we can publish `talos/runtime`, and
// `talos/plugin/<plugin>` (or something along those lines) events.
TypeURL: fmt.Sprintf("talos/runtime/%s", proto.MessageName(msg)),
Payload: msg,
}
event := machine.Event{
Data: &any.Any{
// In the future, we can publish `talos/runtime`, and
// `talos/plugin/<plugin>` (or something along those lines) events.
TypeUrl: fmt.Sprintf("talos/runtime/%s", proto.MessageName(msg)),
Value: value,
},
e.mu.Lock()
defer e.mu.Unlock()
e.stream[e.writePos] = event
e.writePos = (e.writePos + 1) % e.cap
if e.writePos == 0 {
// wrapped around at e.cap-1, increment the generation
e.gen++
}
e.Lock()
defer e.Unlock()
for _, sub := range e.subscribers {
// drop the event if some subscriber is stuck
// dropping is bad, but better than blocking event propagation
select {
case sub <- event:
default:
}
}
}
func (e *Events) add() chan machine.Event {
e.Lock()
defer e.Unlock()
ch := make(chan machine.Event, 100)
e.subscribers = append(e.subscribers, ch)
return ch
}
func (e *Events) delete(ch chan machine.Event) {
e.Lock()
defer e.Unlock()
for i, sub := range e.subscribers {
if sub == ch {
l := len(e.subscribers)
e.subscribers[i] = e.subscribers[l-1]
e.subscribers[l-1] = nil
e.subscribers = e.subscribers[:l-1]
}
}
e.c.Broadcast()
}


@@ -6,189 +6,159 @@
package v1alpha1
import (
"reflect"
"context"
"fmt"
"strconv"
"sync"
"sync/atomic"
"testing"
"github.com/golang/protobuf/proto"
"golang.org/x/time/rate"
"github.com/talos-systems/talos/api/machine"
"github.com/talos-systems/talos/internal/app/machined/pkg/runtime"
)
func TestNewEvents(t *testing.T) {
type args struct {
n int
}
func TestEvents_Publish(t *testing.T) {
tests := []struct {
name string
args args
want *Events
name string
cap int
watchers int
messages int
}{
{
name: "success",
args: args{
n: 100,
},
want: &Events{
subscribers: make([]chan machine.Event, 0, 100),
},
name: "nowatchers",
cap: 100,
watchers: 0,
messages: 100,
},
{
name: "onemessage",
cap: 100,
watchers: 10,
messages: 1,
},
{
name: "manymessages_singlewatcher",
cap: 100,
watchers: 1,
messages: 50,
},
{
name: "manymessages_manywatchers",
cap: 100,
watchers: 20,
messages: 50,
},
{
name: "manymessages_overcap",
cap: 10,
watchers: 5,
messages: 200,
},
{
name: "megamessages_overcap",
cap: 1000,
watchers: 1,
messages: 2000,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := NewEvents(tt.args.n); !reflect.DeepEqual(got, tt.want) {
t.Errorf("NewEvents() = %v, want %v", got, tt.want)
e := NewEvents(tt.cap)
var wg sync.WaitGroup
wg.Add(tt.watchers)
got := uint32(0)
for i := 0; i < tt.watchers; i++ {
e.Watch(func(events <-chan runtime.Event) {
defer wg.Done()
for j := 0; j < tt.messages; j++ {
event := <-events
seq, err := strconv.Atoi(event.Payload.(*machine.SequenceEvent).Sequence) //nolint: errcheck
if err != nil {
t.Fatalf("failed to convert sequence to number: %s", err)
}
if seq != j {
t.Fatalf("unexpected sequence: %d != %d", seq, j)
}
atomic.AddUint32(&got, 1)
}
})
}
l := rate.NewLimiter(1000, tt.cap/2)
for i := 0; i < tt.messages; i++ {
_ = l.Wait(context.Background()) //nolint: errcheck
e.Publish(&machine.SequenceEvent{
Sequence: strconv.Itoa(i),
})
}
wg.Wait()
if got != uint32(tt.messages*tt.watchers) {
t.Errorf("Watch() = got %v, want %v", got, tt.messages*tt.watchers)
}
})
}
}
func BenchmarkWatch(b *testing.B) {
e := NewEvents(b.N)
e := NewEvents(100)
var wg sync.WaitGroup
wg.Add(b.N)
for i := 0; i < b.N; i++ {
e.Watch(func(events <-chan machine.Event) { wg.Done() })
e.Watch(func(events <-chan runtime.Event) { wg.Done() })
}
wg.Wait()
}
func TestEvents_Watch(t *testing.T) {
type fields struct {
subscribers []chan machine.Event
}
tests := []struct {
name string
count int
fields fields
}{
{
name: "success",
count: 10,
fields: fields{
subscribers: make([]chan machine.Event, 0, 100),
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
e := &Events{
subscribers: tt.fields.subscribers,
}
func BenchmarkPublish(bb *testing.B) {
for _, watchers := range []int{0, 1, 10} {
bb.Run(fmt.Sprintf("Watchers-%d", watchers), func(b *testing.B) {
e := NewEvents(10000)
var wg sync.WaitGroup
wg.Add(tt.count)
for i := 0; i < tt.count; i++ {
e.Watch(func(events <-chan machine.Event) { wg.Done() })
}
watchers := 10
wg.Wait()
wg.Add(watchers)
// We need to lock here to prevent a race condition when checking the
// number of subscribers.
e.Lock()
defer e.Unlock()
for j := 0; j < watchers; j++ {
e.Watch(func(events <-chan runtime.Event) {
defer wg.Done()
// We can only check if the number of subscribers decreases because there
// is a race condition between the tear down of subscriber and the above
// lock. In other words, there is a chance that the number of subscribers
// is not zero.
if len(e.subscribers) > tt.count {
t.Errorf("Watch() = got %v subscribers, expected to be < %v", len(e.subscribers), tt.count)
}
})
}
}
func TestEvents_Publish(t *testing.T) {
type fields struct {
subscribers []chan machine.Event
Mutex *sync.Mutex
}
type args struct {
event proto.Message
}
tests := []struct {
name string
count int
fields fields
args args
}{
{
name: "success",
count: 10,
fields: fields{
subscribers: make([]chan machine.Event, 0, 100),
Mutex: &sync.Mutex{},
},
args: args{
event: &machine.SequenceEvent{},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
e := &Events{
subscribers: tt.fields.subscribers,
}
var wg sync.WaitGroup
wg.Add(tt.count)
mu := &sync.Mutex{}
got := 0
for i := 0; i < tt.count; i++ {
e.Watch(func(events <-chan machine.Event) {
<-events
mu.Lock()
got++
mu.Unlock()
wg.Done()
for i := 0; i < b.N; i++ {
if _, ok := <-events; !ok {
return
}
}
})
}
e.Publish(tt.args.event)
ev := machine.SequenceEvent{}
b.ResetTimer()
for i := 0; i < b.N; i++ {
e.Publish(&ev)
}
wg.Wait()
if got != tt.count {
t.Errorf("Watch() = got %v, want %v", got, tt.count)
}
})
}
}
func BenchmarkPublish(b *testing.B) {
e := NewEvents(b.N)
var wg sync.WaitGroup
wg.Add(b.N)
for i := 0; i < b.N; i++ {
e.Watch(func(events <-chan machine.Event) {
<-events
wg.Done()
})
}
e.Publish(&machine.SequenceEvent{})
wg.Wait()
}