feat: allow running only one sequence at a time
Fix `Talos` sequencer to run only a single sequence at the same time. Sequences priority was updated. To match the table: | what is running (columns) what is requested (rows) | boot | reboot | reset | upgrade | |----------------------------------------------------|------|--------|-------|---------| | reboot | Y | Y | Y | N | | reset | Y | N | N | N | | upgrade | Y | N | N | N | With a small addition that `WithTakeover` is still there. If set, priority is ignored. This is mainly used for `Shutdown` sequence invokation. And if doing apply config with reboot enabled. Signed-off-by: Artem Chernyshev <artem.chernyshev@talos-systems.com>
This commit is contained in:
parent
ec05aee040
commit
ae1bec59e9
@ -46,6 +46,7 @@ extend google.protobuf.ServiceOptions {
|
||||
enum Code {
|
||||
FATAL = 0;
|
||||
LOCKED = 1;
|
||||
CANCELED = 2;
|
||||
}
|
||||
|
||||
message Error {
|
||||
|
@ -275,10 +275,6 @@ Config diff:
|
||||
if !runtime.IsRebootError(err) {
|
||||
log.Println("apply configuration failed:", err)
|
||||
}
|
||||
|
||||
if err != runtime.ErrLocked {
|
||||
s.server.GracefulStop()
|
||||
}
|
||||
}
|
||||
}()
|
||||
default:
|
||||
@ -315,16 +311,10 @@ func (s *Server) Reboot(ctx context.Context, in *machine.RebootRequest) (reply *
|
||||
}
|
||||
|
||||
go func() {
|
||||
if err := s.Controller.Run(context.Background(), runtime.SequenceReboot, in, runtime.WithTakeover()); err != nil {
|
||||
if err := s.Controller.Run(context.Background(), runtime.SequenceReboot, in); err != nil {
|
||||
if !runtime.IsRebootError(err) {
|
||||
log.Println("reboot failed:", err)
|
||||
}
|
||||
|
||||
if err != runtime.ErrLocked {
|
||||
// NB: We stop the gRPC server since a failed sequence triggers a
|
||||
// reboot.
|
||||
s.server.GracefulStop()
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
@ -393,16 +383,10 @@ func (s *Server) Rollback(ctx context.Context, in *machine.RollbackRequest) (*ma
|
||||
}
|
||||
|
||||
go func() {
|
||||
if err := s.Controller.Run(context.Background(), runtime.SequenceReboot, in, runtime.WithForce(), runtime.WithTakeover()); err != nil {
|
||||
if err := s.Controller.Run(context.Background(), runtime.SequenceReboot, in, runtime.WithTakeover()); err != nil {
|
||||
if !runtime.IsRebootError(err) {
|
||||
log.Println("reboot failed:", err)
|
||||
}
|
||||
|
||||
if err != runtime.ErrLocked {
|
||||
// NB: We stop the gRPC server since a failed sequence triggers a
|
||||
// reboot.
|
||||
s.server.GracefulStop()
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
@ -464,12 +448,6 @@ func (s *Server) Shutdown(ctx context.Context, in *machine.ShutdownRequest) (rep
|
||||
if !runtime.IsRebootError(err) {
|
||||
log.Println("shutdown failed:", err)
|
||||
}
|
||||
|
||||
if err != runtime.ErrLocked {
|
||||
// NB: We stop the gRPC server since a failed sequence triggers a
|
||||
// reboot.
|
||||
s.server.GracefulStop()
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
@ -563,12 +541,6 @@ func (s *Server) Upgrade(ctx context.Context, in *machine.UpgradeRequest) (reply
|
||||
if !runtime.IsRebootError(err) {
|
||||
log.Println("reboot for staged upgrade failed:", err)
|
||||
}
|
||||
|
||||
if err != runtime.ErrLocked {
|
||||
// NB: We stop the gRPC server since a failed sequence triggers a
|
||||
// reboot.
|
||||
s.server.GracefulStop()
|
||||
}
|
||||
}
|
||||
}()
|
||||
} else {
|
||||
@ -581,12 +553,6 @@ func (s *Server) Upgrade(ctx context.Context, in *machine.UpgradeRequest) (reply
|
||||
if !runtime.IsRebootError(err) {
|
||||
log.Println("upgrade failed:", err)
|
||||
}
|
||||
|
||||
if err != runtime.ErrLocked {
|
||||
// NB: We stop the gRPC server since a failed sequence triggers a
|
||||
// reboot.
|
||||
s.server.GracefulStop()
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
@ -674,12 +640,6 @@ func (s *Server) Reset(ctx context.Context, in *machine.ResetRequest) (reply *ma
|
||||
if !runtime.IsRebootError(err) {
|
||||
log.Println("reset failed:", err)
|
||||
}
|
||||
|
||||
if err != runtime.ErrLocked {
|
||||
// NB: We stop the gRPC server since a failed sequence triggers a
|
||||
// reboot.
|
||||
s.server.GracefulStop()
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
|
@ -272,8 +272,9 @@ func run() error {
|
||||
switch msg := event.Payload.(type) {
|
||||
case *machine.SequenceEvent:
|
||||
if msg.Error != nil {
|
||||
if msg.Error.GetCode() == common.Code_LOCKED {
|
||||
// ignore sequence lock errors, they're not fatal
|
||||
if msg.Error.GetCode() == common.Code_LOCKED ||
|
||||
msg.Error.GetCode() == common.Code_CANCELED {
|
||||
// ignore sequence lock and canceled errors, they're not fatal
|
||||
continue
|
||||
}
|
||||
|
||||
|
@ -25,27 +25,17 @@ type Phase struct {
|
||||
Tasks []TaskSetupFunc
|
||||
}
|
||||
|
||||
// ControllerOptions represents the options for a controller.
|
||||
type ControllerOptions struct {
|
||||
Force bool
|
||||
// LockOptions represents the options for a controller.
|
||||
type LockOptions struct {
|
||||
Takeover bool
|
||||
}
|
||||
|
||||
// ControllerOption represents an option setter.
|
||||
type ControllerOption func(o *ControllerOptions) error
|
||||
|
||||
// WithForce sets the force option to true.
|
||||
func WithForce() ControllerOption {
|
||||
return func(o *ControllerOptions) error {
|
||||
o.Force = true
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
||||
// LockOption represents an option setter.
|
||||
type LockOption func(o *LockOptions) error
|
||||
|
||||
// WithTakeover sets the take option to true.
|
||||
func WithTakeover() ControllerOption {
|
||||
return func(o *ControllerOptions) error {
|
||||
func WithTakeover() LockOption {
|
||||
return func(o *LockOptions) error {
|
||||
o.Takeover = true
|
||||
|
||||
return nil
|
||||
@ -53,8 +43,8 @@ func WithTakeover() ControllerOption {
|
||||
}
|
||||
|
||||
// DefaultControllerOptions returns the default controller options.
|
||||
func DefaultControllerOptions() ControllerOptions {
|
||||
return ControllerOptions{}
|
||||
func DefaultControllerOptions() LockOptions {
|
||||
return LockOptions{}
|
||||
}
|
||||
|
||||
// Controller represents the controller responsible for managing the execution
|
||||
@ -62,7 +52,7 @@ func DefaultControllerOptions() ControllerOptions {
|
||||
type Controller interface {
|
||||
Runtime() Runtime
|
||||
Sequencer() Sequencer
|
||||
Run(context.Context, Sequence, interface{}, ...ControllerOption) error
|
||||
Run(context.Context, Sequence, interface{}, ...LockOption) error
|
||||
V1Alpha2() V1Alpha2Controller
|
||||
}
|
||||
|
||||
|
@ -14,8 +14,10 @@ import (
|
||||
type Sequence int
|
||||
|
||||
const (
|
||||
// SequenceNoop is the noop sequence.
|
||||
SequenceNoop Sequence = iota
|
||||
// SequenceBoot is the boot sequence.
|
||||
SequenceBoot Sequence = iota
|
||||
SequenceBoot
|
||||
// SequenceInitialize is the initialize sequence.
|
||||
SequenceInitialize
|
||||
// SequenceInstall is the install sequence.
|
||||
@ -30,8 +32,6 @@ const (
|
||||
SequenceReset
|
||||
// SequenceReboot is the reboot sequence.
|
||||
SequenceReboot
|
||||
// SequenceNoop is the noop sequence.
|
||||
SequenceNoop
|
||||
)
|
||||
|
||||
const (
|
||||
@ -46,9 +46,44 @@ const (
|
||||
noop = "noop"
|
||||
)
|
||||
|
||||
var sequenceTakeOver = map[Sequence]map[Sequence]struct{}{
|
||||
SequenceBoot: {
|
||||
SequenceReboot: {},
|
||||
SequenceReset: {},
|
||||
SequenceUpgrade: {},
|
||||
},
|
||||
SequenceReboot: {
|
||||
SequenceReboot: {},
|
||||
},
|
||||
SequenceReset: {
|
||||
SequenceReboot: {},
|
||||
},
|
||||
}
|
||||
|
||||
// String returns the string representation of a `Sequence`.
|
||||
func (s Sequence) String() string {
|
||||
return [...]string{boot, initialize, install, shutdown, upgrade, stageUpgrade, reset, reboot, noop}[s]
|
||||
return [...]string{noop, boot, initialize, install, shutdown, upgrade, stageUpgrade, reset, reboot}[s]
|
||||
}
|
||||
|
||||
// CanTakeOver defines sequences priority.
|
||||
//
|
||||
// | what is running (columns) what is requested (rows) | boot | reboot | reset | upgrade |
|
||||
// |----------------------------------------------------|------|--------|-------|---------|
|
||||
// | reboot | Y | Y | Y | N |
|
||||
// | reset | Y | N | N | N |
|
||||
// | upgrade | Y | N | N | N |.
|
||||
func (s Sequence) CanTakeOver(running Sequence) bool {
|
||||
if running == SequenceNoop {
|
||||
return true
|
||||
}
|
||||
|
||||
if sequences, ok := sequenceTakeOver[running]; ok {
|
||||
if _, ok = sequences[s]; ok {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
// ParseSequence returns a `Sequence` that matches the specified string.
|
||||
|
@ -6,6 +6,7 @@ package v1alpha1
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
@ -13,12 +14,9 @@ import (
|
||||
"os/signal"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/talos-systems/go-retry/retry"
|
||||
"golang.org/x/sync/errgroup"
|
||||
|
||||
"github.com/talos-systems/talos/internal/app/machined/pkg/runtime"
|
||||
@ -32,13 +30,11 @@ import (
|
||||
// Controller represents the controller responsible for managing the execution
|
||||
// of sequences.
|
||||
type Controller struct {
|
||||
s runtime.Sequencer
|
||||
r *Runtime
|
||||
s *Sequencer
|
||||
v2 *v1alpha2.Controller
|
||||
|
||||
semaphore int32
|
||||
cancelCtx context.CancelFunc
|
||||
ctxMutex sync.Mutex
|
||||
priorityLock *PriorityLock[runtime.Sequence]
|
||||
}
|
||||
|
||||
// NewController intializes and returns a controller.
|
||||
@ -62,8 +58,9 @@ func NewController() (*Controller, error) {
|
||||
l := logging.NewCircularBufferLoggingManager(log.New(os.Stdout, "machined fallback logger: ", log.Flags()))
|
||||
|
||||
ctlr := &Controller{
|
||||
r: NewRuntime(nil, s, e, l),
|
||||
s: NewSequencer(),
|
||||
r: NewRuntime(nil, s, e, l),
|
||||
s: NewSequencer(),
|
||||
priorityLock: NewPriorityLock[runtime.Sequence](),
|
||||
}
|
||||
|
||||
ctlr.v2, err = v1alpha2.NewController(ctlr.r)
|
||||
@ -77,50 +74,16 @@ func NewController() (*Controller, error) {
|
||||
// Run executes all phases known to the controller in serial. `Controller`
|
||||
// aborts immediately if any phase fails.
|
||||
//nolint:gocyclo
|
||||
func (c *Controller) Run(ctx context.Context, seq runtime.Sequence, data interface{}, setters ...runtime.ControllerOption) error {
|
||||
func (c *Controller) Run(ctx context.Context, seq runtime.Sequence, data interface{}, setters ...runtime.LockOption) error {
|
||||
// We must ensure that the runtime is configured since all sequences depend
|
||||
// on the runtime.
|
||||
if c.r == nil {
|
||||
return runtime.ErrUndefinedRuntime
|
||||
}
|
||||
|
||||
opts := runtime.DefaultControllerOptions()
|
||||
|
||||
for _, f := range setters {
|
||||
if err := f(&opts); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Allow only one sequence to run at a time with the exception of bootstrap
|
||||
// and reset sequences.
|
||||
switch seq { //nolint:exhaustive
|
||||
case runtime.SequenceReset:
|
||||
// Do not attempt to lock.
|
||||
default:
|
||||
if opts.Force {
|
||||
break
|
||||
}
|
||||
|
||||
if opts.Takeover {
|
||||
c.ctxMutex.Lock()
|
||||
if c.cancelCtx != nil {
|
||||
c.cancelCtx()
|
||||
}
|
||||
|
||||
c.ctxMutex.Unlock()
|
||||
|
||||
err := retry.Constant(time.Minute*1, retry.WithUnits(time.Millisecond*100)).RetryWithContext(ctx, func(ctx context.Context) error {
|
||||
if c.TryLock() {
|
||||
return retry.ExpectedError(fmt.Errorf("failed to acquire lock"))
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
} else if c.TryLock() {
|
||||
ctx, err := c.priorityLock.Lock(ctx, time.Minute, seq, setters...)
|
||||
if err != nil {
|
||||
if errors.Is(err, runtime.ErrLocked) {
|
||||
c.Runtime().Events().Publish(&machine.SequenceEvent{
|
||||
Sequence: seq.String(),
|
||||
Action: machine.SequenceEvent_NOOP,
|
||||
@ -129,23 +92,13 @@ func (c *Controller) Run(ctx context.Context, seq runtime.Sequence, data interfa
|
||||
Message: fmt.Sprintf("sequence not started: %s", runtime.ErrLocked.Error()),
|
||||
},
|
||||
})
|
||||
|
||||
return runtime.ErrLocked
|
||||
}
|
||||
|
||||
defer c.Unlock()
|
||||
|
||||
c.ctxMutex.Lock()
|
||||
ctx, c.cancelCtx = context.WithCancel(ctx)
|
||||
c.ctxMutex.Unlock()
|
||||
|
||||
defer func() {
|
||||
c.ctxMutex.Lock()
|
||||
c.cancelCtx = nil
|
||||
c.ctxMutex.Unlock()
|
||||
}()
|
||||
return err
|
||||
}
|
||||
|
||||
defer c.priorityLock.Unlock()
|
||||
|
||||
phases, err := c.phases(seq, data)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -153,11 +106,17 @@ func (c *Controller) Run(ctx context.Context, seq runtime.Sequence, data interfa
|
||||
|
||||
err = c.run(ctx, seq, phases, data)
|
||||
if err != nil {
|
||||
code := common.Code_FATAL
|
||||
|
||||
if errors.Is(err, context.Canceled) {
|
||||
code = common.Code_CANCELED
|
||||
}
|
||||
|
||||
c.Runtime().Events().Publish(&machine.SequenceEvent{
|
||||
Sequence: seq.String(),
|
||||
Action: machine.SequenceEvent_NOOP,
|
||||
Error: &common.Error{
|
||||
Code: common.Code_FATAL,
|
||||
Code: code,
|
||||
Message: fmt.Sprintf("sequence failed: %s", err.Error()),
|
||||
},
|
||||
})
|
||||
@ -230,18 +189,6 @@ func (c *Controller) ListenForEvents(ctx context.Context) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// TryLock attempts to set a lock that prevents multiple sequences from running
|
||||
// at once. If currently locked, a value of true will be returned. If not
|
||||
// currently locked, a value of false will be returned.
|
||||
func (c *Controller) TryLock() bool {
|
||||
return !atomic.CompareAndSwapInt32(&c.semaphore, 0, 1)
|
||||
}
|
||||
|
||||
// Unlock removes the lock set by `TryLock`.
|
||||
func (c *Controller) Unlock() bool {
|
||||
return atomic.CompareAndSwapInt32(&c.semaphore, 1, 0)
|
||||
}
|
||||
|
||||
func (c *Controller) run(ctx context.Context, seq runtime.Sequence, phases []runtime.Phase, data interface{}) error {
|
||||
c.Runtime().Events().Publish(&machine.SequenceEvent{
|
||||
Sequence: seq.String(),
|
||||
@ -314,7 +261,7 @@ func (c *Controller) runPhase(ctx context.Context, phase runtime.Phase, seq runt
|
||||
Action: machine.PhaseEvent_START,
|
||||
})
|
||||
|
||||
var eg errgroup.Group
|
||||
eg, ctx := errgroup.WithContext(ctx)
|
||||
|
||||
for number, task := range phase.Tasks {
|
||||
// Make the task number human friendly.
|
||||
|
@ -0,0 +1,210 @@
|
||||
// This Source Code Form is subject to the terms of the Mozilla Public
|
||||
// License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
||||
|
||||
//nolint:scopelint,testpackage
|
||||
package v1alpha1
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
"golang.org/x/sync/errgroup"
|
||||
|
||||
v1alpha1server "github.com/talos-systems/talos/internal/app/machined/internal/server/v1alpha1"
|
||||
"github.com/talos-systems/talos/internal/app/machined/pkg/runtime"
|
||||
"github.com/talos-systems/talos/internal/app/machined/pkg/runtime/logging"
|
||||
"github.com/talos-systems/talos/pkg/machinery/api/machine"
|
||||
)
|
||||
|
||||
type mockSequencer struct {
|
||||
callsMu sync.Mutex
|
||||
calls map[runtime.Sequence]int
|
||||
|
||||
phases map[runtime.Sequence]PhaseList
|
||||
}
|
||||
|
||||
func (m *mockSequencer) Boot(r runtime.Runtime) []runtime.Phase {
|
||||
return m.phases[runtime.SequenceBoot]
|
||||
}
|
||||
|
||||
func (m *mockSequencer) Initialize(r runtime.Runtime) []runtime.Phase {
|
||||
return m.phases[runtime.SequenceInitialize]
|
||||
}
|
||||
|
||||
func (m *mockSequencer) Install(r runtime.Runtime) []runtime.Phase {
|
||||
return m.phases[runtime.SequenceInstall]
|
||||
}
|
||||
|
||||
func (m *mockSequencer) Reboot(r runtime.Runtime) []runtime.Phase {
|
||||
return m.phases[runtime.SequenceReboot]
|
||||
}
|
||||
|
||||
func (m *mockSequencer) Reset(r runtime.Runtime, opts runtime.ResetOptions) []runtime.Phase {
|
||||
return m.phases[runtime.SequenceReset]
|
||||
}
|
||||
|
||||
func (m *mockSequencer) Shutdown(r runtime.Runtime, req *machine.ShutdownRequest) []runtime.Phase {
|
||||
return m.phases[runtime.SequenceShutdown]
|
||||
}
|
||||
|
||||
func (m *mockSequencer) StageUpgrade(r runtime.Runtime, req *machine.UpgradeRequest) []runtime.Phase {
|
||||
return m.phases[runtime.SequenceStageUpgrade]
|
||||
}
|
||||
|
||||
func (m *mockSequencer) Upgrade(r runtime.Runtime, req *machine.UpgradeRequest) []runtime.Phase {
|
||||
return m.phases[runtime.SequenceUpgrade]
|
||||
}
|
||||
|
||||
func (m *mockSequencer) trackCall(name string, doneCh chan struct{}) func(runtime.Sequence, interface{}) (runtime.TaskExecutionFunc, string) {
|
||||
return func(seq runtime.Sequence, data interface{}) (runtime.TaskExecutionFunc, string) {
|
||||
return func(ctx context.Context, logger *log.Logger, r runtime.Runtime) error {
|
||||
if doneCh != nil {
|
||||
defer func() {
|
||||
select {
|
||||
case doneCh <- struct{}{}:
|
||||
case <-time.After(time.Second):
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
m.callsMu.Lock()
|
||||
defer m.callsMu.Unlock()
|
||||
|
||||
m.calls[seq]++
|
||||
|
||||
return nil
|
||||
}, name
|
||||
}
|
||||
}
|
||||
|
||||
func TestRun(t *testing.T) {
|
||||
require := require.New(t)
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
from runtime.Sequence
|
||||
to runtime.Sequence
|
||||
expectError error
|
||||
dataFrom interface{}
|
||||
dataTo interface{}
|
||||
}{
|
||||
{
|
||||
name: "reboot should take over boot",
|
||||
from: runtime.SequenceBoot,
|
||||
to: runtime.SequenceReboot,
|
||||
expectError: context.Canceled,
|
||||
},
|
||||
{
|
||||
name: "reset should take over boot",
|
||||
from: runtime.SequenceBoot,
|
||||
to: runtime.SequenceReset,
|
||||
expectError: context.Canceled,
|
||||
dataTo: &v1alpha1server.ResetOptions{},
|
||||
},
|
||||
{
|
||||
name: "upgrade should take over boot",
|
||||
from: runtime.SequenceBoot,
|
||||
to: runtime.SequenceUpgrade,
|
||||
expectError: context.Canceled,
|
||||
dataTo: &machine.UpgradeRequest{},
|
||||
},
|
||||
{
|
||||
name: "boot should not take over reboot",
|
||||
from: runtime.SequenceReboot,
|
||||
to: runtime.SequenceBoot,
|
||||
expectError: runtime.ErrLocked,
|
||||
},
|
||||
{
|
||||
name: "reset should not take over upgrade",
|
||||
from: runtime.SequenceUpgrade,
|
||||
to: runtime.SequenceReset,
|
||||
expectError: runtime.ErrLocked,
|
||||
dataFrom: &machine.UpgradeRequest{},
|
||||
dataTo: &v1alpha1server.ResetOptions{},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
e := NewEvents(1000, 10)
|
||||
|
||||
t.Setenv("PLATFORM", "container")
|
||||
s, err := NewState()
|
||||
require.NoError(err)
|
||||
|
||||
sequencer := &mockSequencer{
|
||||
calls: map[runtime.Sequence]int{},
|
||||
phases: map[runtime.Sequence]PhaseList{},
|
||||
}
|
||||
|
||||
var (
|
||||
eg errgroup.Group
|
||||
doneCh = make(chan struct{})
|
||||
)
|
||||
|
||||
sequencer.phases[tt.from] = sequencer.phases[tt.from].
|
||||
Append(tt.from.String(), sequencer.trackCall(tt.from.String(), doneCh)).
|
||||
Append("wait", wait)
|
||||
|
||||
sequencer.phases[tt.to] = sequencer.phases[tt.to].Append(tt.to.String(), sequencer.trackCall(tt.to.String(), nil))
|
||||
|
||||
l := logging.NewCircularBufferLoggingManager(log.New(os.Stdout, "machined fallback logger: ", log.Flags()))
|
||||
|
||||
r := NewRuntime(nil, s, e, l)
|
||||
|
||||
controller := Controller{
|
||||
r: r,
|
||||
s: sequencer,
|
||||
priorityLock: NewPriorityLock[runtime.Sequence](),
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*200)
|
||||
defer cancel()
|
||||
|
||||
eg.Go(func() error {
|
||||
return controller.Run(ctx, tt.from, tt.dataFrom)
|
||||
})
|
||||
|
||||
eg.Go(func() error {
|
||||
select {
|
||||
case <-doneCh:
|
||||
case <-time.After(time.Second):
|
||||
return fmt.Errorf("timed out waiting for %s sequence to start", tt.from.String())
|
||||
}
|
||||
|
||||
return controller.Run(ctx, tt.to, tt.dataTo)
|
||||
})
|
||||
|
||||
require.ErrorIs(eg.Wait(), tt.expectError)
|
||||
|
||||
if errors.Is(tt.expectError, runtime.ErrLocked) {
|
||||
return
|
||||
}
|
||||
|
||||
sequencer.callsMu.Lock()
|
||||
defer sequencer.callsMu.Unlock()
|
||||
|
||||
require.Equal(1, sequencer.calls[tt.to])
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func wait(seq runtime.Sequence, data interface{}) (runtime.TaskExecutionFunc, string) {
|
||||
return func(ctx context.Context, logger *log.Logger, r runtime.Runtime) error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case <-time.After(time.Second * 1):
|
||||
}
|
||||
|
||||
return nil
|
||||
}, "wait"
|
||||
}
|
@ -0,0 +1,119 @@
|
||||
// This Source Code Form is subject to the terms of the Mozilla Public
|
||||
// License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
||||
|
||||
package v1alpha1
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/talos-systems/talos/internal/app/machined/pkg/runtime"
|
||||
)
|
||||
|
||||
// Priority describes the running priority of a process.
|
||||
//
|
||||
// If CanTakeOver returns true, current process with "lower" priority
|
||||
// will be canceled and "higher" priority process will be run.
|
||||
type Priority[T any] interface {
|
||||
comparable
|
||||
CanTakeOver(another T) bool
|
||||
}
|
||||
|
||||
// PriorityLock is a lock that makes sure that only a single process can run at a time.
|
||||
//
|
||||
// If a process with "higher" priority tries to acquire the lock, previous process is stopped
|
||||
// and new process with "higher" priority is run.
|
||||
type PriorityLock[T Priority[T]] struct {
|
||||
runningCh chan struct{}
|
||||
takeoverCh chan struct{}
|
||||
|
||||
mu sync.Mutex
|
||||
runningPriority T
|
||||
cancelCtx context.CancelFunc
|
||||
}
|
||||
|
||||
// NewPriorityLock returns a new PriorityLock.
|
||||
func NewPriorityLock[T Priority[T]]() *PriorityLock[T] {
|
||||
runningCh := make(chan struct{}, 1)
|
||||
runningCh <- struct{}{}
|
||||
|
||||
return &PriorityLock[T]{
|
||||
runningCh: runningCh,
|
||||
takeoverCh: make(chan struct{}, 1),
|
||||
}
|
||||
}
|
||||
|
||||
func (lock *PriorityLock[T]) getRunningPriority() T {
|
||||
lock.mu.Lock()
|
||||
defer lock.mu.Unlock()
|
||||
|
||||
return lock.runningPriority
|
||||
}
|
||||
|
||||
func (lock *PriorityLock[T]) setRunningPriority(seq T, cancelCtx context.CancelFunc) {
|
||||
lock.mu.Lock()
|
||||
defer lock.mu.Unlock()
|
||||
|
||||
var zeroSeq T
|
||||
|
||||
if seq == zeroSeq && lock.cancelCtx != nil {
|
||||
lock.cancelCtx()
|
||||
}
|
||||
|
||||
lock.runningPriority, lock.cancelCtx = seq, cancelCtx
|
||||
}
|
||||
|
||||
// Lock acquires the lock according the priority rules and returns a context that should be used within the process.
|
||||
//
|
||||
// Process should terminate as soon as the context is canceled.
|
||||
// Argument seq defines the priority of the process.
|
||||
// Argument takeOverTimeout defines the maximum time to wait for the low-priority process to terminate.
|
||||
func (lock *PriorityLock[T]) Lock(ctx context.Context, takeOverTimeout time.Duration, seq T, options ...runtime.LockOption) (context.Context, error) {
|
||||
opts := runtime.DefaultControllerOptions()
|
||||
for _, o := range options {
|
||||
if err := o(&opts); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
takeOverTimer := time.NewTimer(takeOverTimeout)
|
||||
defer takeOverTimer.Stop()
|
||||
|
||||
select {
|
||||
case lock.takeoverCh <- struct{}{}:
|
||||
case <-takeOverTimer.C:
|
||||
return nil, fmt.Errorf("failed to acquire lock: timeout")
|
||||
}
|
||||
|
||||
defer func() {
|
||||
<-lock.takeoverCh
|
||||
}()
|
||||
|
||||
if !seq.CanTakeOver(lock.getRunningPriority()) && !opts.Takeover {
|
||||
return nil, runtime.ErrLocked
|
||||
}
|
||||
|
||||
if lock.cancelCtx != nil {
|
||||
lock.cancelCtx()
|
||||
}
|
||||
|
||||
select {
|
||||
case <-lock.runningCh:
|
||||
seqCtx, seqCancel := context.WithCancel(ctx)
|
||||
lock.setRunningPriority(seq, seqCancel)
|
||||
|
||||
return seqCtx, nil
|
||||
case <-takeOverTimer.C:
|
||||
return nil, fmt.Errorf("failed to acquire lock: timeout")
|
||||
}
|
||||
}
|
||||
|
||||
func (lock *PriorityLock[T]) Unlock() {
|
||||
var zeroSeq T
|
||||
|
||||
lock.setRunningPriority(zeroSeq, nil)
|
||||
lock.runningCh <- struct{}{}
|
||||
}
|
@ -0,0 +1,156 @@
|
||||
// This Source Code Form is subject to the terms of the Mozilla Public
|
||||
// License, v. 2.0. If a copy of the MPL was not distributed with this
|
||||
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
||||
|
||||
package v1alpha1_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
"golang.org/x/sync/errgroup"
|
||||
|
||||
"github.com/talos-systems/talos/internal/app/machined/pkg/runtime"
|
||||
"github.com/talos-systems/talos/internal/app/machined/pkg/runtime/v1alpha1"
|
||||
)
|
||||
|
||||
type testSequenceNumber int
|
||||
|
||||
func (candidate testSequenceNumber) CanTakeOver(running testSequenceNumber) bool {
|
||||
return candidate > running
|
||||
}
|
||||
|
||||
func TestPriorityLock(t *testing.T) {
|
||||
require := require.New(t)
|
||||
|
||||
lock := v1alpha1.NewPriorityLock[testSequenceNumber]()
|
||||
ctx := context.Background()
|
||||
|
||||
ctx1, err := lock.Lock(ctx, time.Second, 2)
|
||||
require.NoError(err)
|
||||
|
||||
select {
|
||||
case <-ctx1.Done():
|
||||
require.FailNow("should not be canceled")
|
||||
default:
|
||||
}
|
||||
|
||||
_, err = lock.Lock(ctx, time.Millisecond, 1)
|
||||
require.Error(err)
|
||||
require.EqualError(err, runtime.ErrLocked.Error())
|
||||
|
||||
errCh := make(chan error)
|
||||
|
||||
go func() {
|
||||
_, err2 := lock.Lock(ctx, time.Second, 3)
|
||||
errCh <- err2
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-ctx1.Done():
|
||||
case <-time.After(time.Second):
|
||||
require.FailNow("should be canceled")
|
||||
}
|
||||
|
||||
select {
|
||||
case <-errCh:
|
||||
require.FailNow("should not be reached")
|
||||
default:
|
||||
}
|
||||
|
||||
lock.Unlock()
|
||||
|
||||
select {
|
||||
case err = <-errCh:
|
||||
require.NoError(err)
|
||||
case <-time.After(time.Second):
|
||||
require.FailNow("should be canceled")
|
||||
}
|
||||
}
|
||||
|
||||
func TestPriorityLockSequential(t *testing.T) {
|
||||
require := require.New(t)
|
||||
|
||||
lock := v1alpha1.NewPriorityLock[testSequenceNumber]()
|
||||
ctx := context.Background()
|
||||
|
||||
_, err := lock.Lock(ctx, time.Second, 2)
|
||||
require.NoError(err)
|
||||
|
||||
lock.Unlock()
|
||||
|
||||
_, err = lock.Lock(ctx, time.Second, 1)
|
||||
require.NoError(err)
|
||||
|
||||
lock.Unlock()
|
||||
}
|
||||
|
||||
//nolint:gocyclo
|
||||
func TestPriorityLockConcurrent(t *testing.T) {
|
||||
require := require.New(t)
|
||||
|
||||
lock := v1alpha1.NewPriorityLock[testSequenceNumber]()
|
||||
|
||||
globalCtx, globalCtxCancel := context.WithCancel(context.Background())
|
||||
defer globalCtxCancel()
|
||||
|
||||
var eg errgroup.Group
|
||||
|
||||
sequenceCh := make(chan testSequenceNumber)
|
||||
|
||||
for seq := testSequenceNumber(1); seq <= 20; seq++ {
|
||||
seq := seq
|
||||
|
||||
eg.Go(func() error {
|
||||
ctx, err := lock.Lock(globalCtx, time.Second, seq)
|
||||
if errors.Is(err, runtime.ErrLocked) {
|
||||
return nil
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
select {
|
||||
case sequenceCh <- seq:
|
||||
<-ctx.Done()
|
||||
case <-ctx.Done():
|
||||
}
|
||||
|
||||
lock.Unlock()
|
||||
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
timer := time.NewTimer(5 * time.Second)
|
||||
defer timer.Stop()
|
||||
|
||||
var prevSeq testSequenceNumber
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-timer.C:
|
||||
require.FailNow("timeout")
|
||||
case seq := <-sequenceCh:
|
||||
t.Logf("sequence running: %d", seq)
|
||||
|
||||
if prevSeq >= seq {
|
||||
require.FailNowf("can take over inversion", "sequence %d should be greater than %d", seq, prevSeq)
|
||||
}
|
||||
|
||||
prevSeq = seq
|
||||
}
|
||||
|
||||
if prevSeq == 20 {
|
||||
globalCtxCancel()
|
||||
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
require.NoError(eg.Wait())
|
||||
}
|
@ -18,6 +18,7 @@ import (
|
||||
|
||||
"github.com/talos-systems/talos/internal/integration/base"
|
||||
"github.com/talos-systems/talos/pkg/machinery/client"
|
||||
"github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1/machine"
|
||||
)
|
||||
|
||||
// RebootSuite ...
|
||||
@ -70,6 +71,51 @@ func (suite *RebootSuite) TestRebootNodeByNode() {
|
||||
}
|
||||
}
|
||||
|
||||
// TestRebootMultiple reboots a node, issues consequent reboots
|
||||
// reboot should cancel boot sequence, and cancel another reboot.
|
||||
func (suite *RebootSuite) TestRebootMultiple() {
|
||||
if !suite.Capabilities().SupportsReboot {
|
||||
suite.T().Skip("cluster doesn't support reboots")
|
||||
}
|
||||
|
||||
node := suite.RandomDiscoveredNodeInternalIP(machine.TypeWorker)
|
||||
nodeCtx := client.WithNodes(suite.ctx, node)
|
||||
|
||||
bootID := suite.ReadBootIDWithRetry(nodeCtx, time.Minute*5)
|
||||
|
||||
// Issue reboot.
|
||||
suite.Require().NoError(base.IgnoreGRPCUnavailable(
|
||||
suite.Client.Reboot(nodeCtx),
|
||||
))
|
||||
|
||||
// Issue reboot once again and wait for node to get a new boot id.
|
||||
suite.Require().NoError(base.IgnoreGRPCUnavailable(
|
||||
suite.Client.Reboot(nodeCtx),
|
||||
))
|
||||
|
||||
suite.AssertBootIDChanged(nodeCtx, bootID, node, time.Minute*5)
|
||||
|
||||
bootID = suite.ReadBootIDWithRetry(nodeCtx, time.Minute*5)
|
||||
|
||||
suite.Require().NoError(retry.Constant(time.Second * 5).Retry(func() error {
|
||||
// Issue reboot while the node is still booting.
|
||||
err := suite.Client.Reboot(nodeCtx)
|
||||
if err != nil {
|
||||
return retry.ExpectedError(err)
|
||||
}
|
||||
|
||||
// Reboot again and wait for cluster to become healthy.
|
||||
suite.Require().NoError(base.IgnoreGRPCUnavailable(
|
||||
suite.Client.Reboot(nodeCtx),
|
||||
))
|
||||
|
||||
return nil
|
||||
}))
|
||||
|
||||
suite.AssertBootIDChanged(nodeCtx, bootID, node, time.Minute*5)
|
||||
suite.WaitForBootDone(suite.ctx)
|
||||
}
|
||||
|
||||
// TestRebootAllNodes reboots all cluster nodes at the same time.
|
||||
//
|
||||
//nolint:gocyclo
|
||||
|
@ -13,8 +13,11 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/talos-systems/go-retry/retry"
|
||||
|
||||
"github.com/talos-systems/talos/internal/integration/base"
|
||||
machineapi "github.com/talos-systems/talos/pkg/machinery/api/machine"
|
||||
"github.com/talos-systems/talos/pkg/machinery/client"
|
||||
"github.com/talos-systems/talos/pkg/machinery/config/types/v1alpha1/machine"
|
||||
"github.com/talos-systems/talos/pkg/machinery/constants"
|
||||
)
|
||||
@ -243,6 +246,65 @@ func (suite *ResetSuite) TestResetWithSpecState() {
|
||||
suite.Assert().Equal(preReset, postReset, "ephemeral partition was not reset")
|
||||
}
|
||||
|
||||
// TestResetDuringBoot resets the node multiple times, second reset is done
|
||||
// before boot sequence is complete.
|
||||
//
|
||||
//nolint:dupl
|
||||
func (suite *ResetSuite) TestResetDuringBoot() {
|
||||
if !suite.Capabilities().SupportsReboot {
|
||||
suite.T().Skip("cluster doesn't support reboot (and reset)")
|
||||
}
|
||||
|
||||
node := suite.RandomDiscoveredNodeInternalIP()
|
||||
nodeCtx := client.WithNodes(suite.ctx, node)
|
||||
|
||||
suite.T().Log("Resetting node", node)
|
||||
|
||||
for i := 0; i < 2; i++ {
|
||||
var (
|
||||
bootID string
|
||||
err error
|
||||
)
|
||||
|
||||
err = retry.Constant(5*time.Minute, retry.WithUnits(time.Millisecond*1000)).Retry(
|
||||
func() error {
|
||||
bootID, err = suite.ReadBootID(nodeCtx)
|
||||
if err != nil {
|
||||
return retry.ExpectedError(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
)
|
||||
|
||||
err = retry.Constant(5*time.Minute, retry.WithUnits(time.Millisecond*1000)).Retry(
|
||||
func() error {
|
||||
// force reboot after reset, as this is the only mode we can test
|
||||
return base.IgnoreGRPCUnavailable(
|
||||
suite.Client.ResetGeneric(
|
||||
client.WithNodes(suite.ctx, node), &machineapi.ResetRequest{
|
||||
Reboot: true,
|
||||
Graceful: true,
|
||||
SystemPartitionsToWipe: []*machineapi.ResetPartitionSpec{
|
||||
{
|
||||
Label: constants.EphemeralPartitionLabel,
|
||||
Wipe: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
),
|
||||
)
|
||||
},
|
||||
)
|
||||
|
||||
suite.AssertBootIDChanged(nodeCtx, bootID, node, time.Minute*5)
|
||||
suite.Require().NoError(err)
|
||||
suite.ClearConnectionRefused(suite.ctx, node)
|
||||
}
|
||||
|
||||
suite.WaitForBootDone(suite.ctx)
|
||||
}
|
||||
|
||||
func init() {
|
||||
allSuites = append(allSuites, new(ResetSuite))
|
||||
}
|
||||
|
@ -218,31 +218,85 @@ func (apiSuite *APISuite) ReadBootID(ctx context.Context) (string, error) {
|
||||
return bootID, reader.Close()
|
||||
}
|
||||
|
||||
// ReadBootIDWithRetry reads node boot_id.
|
||||
//
|
||||
// Context provided might have specific node attached for API call.
|
||||
func (apiSuite *APISuite) ReadBootIDWithRetry(ctx context.Context, timeout time.Duration) string {
|
||||
var bootID string
|
||||
|
||||
apiSuite.Require().NoError(retry.Constant(timeout, retry.WithUnits(time.Millisecond*1000)).Retry(
|
||||
func() error {
|
||||
var err error
|
||||
|
||||
bootID, err = apiSuite.ReadBootID(ctx)
|
||||
if err != nil {
|
||||
return retry.ExpectedError(err)
|
||||
}
|
||||
|
||||
if bootID == "" {
|
||||
return retry.ExpectedErrorf("boot id is empty")
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
))
|
||||
|
||||
return bootID
|
||||
}
|
||||
|
||||
// AssertRebooted verifies that node got rebooted as result of running some API call.
|
||||
//
|
||||
// Verification happens via reading boot_id of the node.
|
||||
func (apiSuite *APISuite) AssertRebooted(ctx context.Context, node string, rebootFunc func(nodeCtx context.Context) error, timeout time.Duration) {
|
||||
apiSuite.AssertRebootedNoChecks(ctx, node, rebootFunc, timeout)
|
||||
|
||||
if apiSuite.Cluster != nil {
|
||||
// without cluster state we can't do deep checks, but basic reboot test still works
|
||||
// NB: using `ctx` here to have client talking to init node by default
|
||||
apiSuite.AssertClusterHealthy(ctx)
|
||||
}
|
||||
}
|
||||
|
||||
// AssertRebootedNoChecks waits for node to be rebooted without waiting for cluster to become healthy afterwards.
|
||||
func (apiSuite *APISuite) AssertRebootedNoChecks(ctx context.Context, node string, rebootFunc func(nodeCtx context.Context) error, timeout time.Duration) {
|
||||
// timeout for single node Reset
|
||||
ctx, ctxCancel := context.WithTimeout(ctx, timeout)
|
||||
defer ctxCancel()
|
||||
|
||||
nodeCtx := client.WithNodes(ctx, node)
|
||||
|
||||
// read boot_id before Reset
|
||||
bootIDBefore, err := apiSuite.ReadBootID(nodeCtx)
|
||||
var (
|
||||
bootIDBefore string
|
||||
err error
|
||||
)
|
||||
|
||||
err = retry.Constant(time.Minute * 5).Retry(func() error {
|
||||
// read boot_id before reboot
|
||||
bootIDBefore, err = apiSuite.ReadBootID(nodeCtx)
|
||||
|
||||
if err != nil {
|
||||
return retry.ExpectedError(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
apiSuite.Require().NoError(err)
|
||||
|
||||
apiSuite.Assert().NoError(rebootFunc(nodeCtx))
|
||||
|
||||
var bootIDAfter string
|
||||
apiSuite.AssertBootIDChanged(nodeCtx, bootIDBefore, node, timeout)
|
||||
}
|
||||
|
||||
// AssertBootIDChanged waits until node boot id changes.
|
||||
func (apiSuite *APISuite) AssertBootIDChanged(nodeCtx context.Context, bootIDBefore, node string, timeout time.Duration) {
|
||||
apiSuite.Assert().NotEmpty(bootIDBefore)
|
||||
|
||||
apiSuite.Require().NoError(retry.Constant(timeout).Retry(func() error {
|
||||
requestCtx, requestCtxCancel := context.WithTimeout(nodeCtx, 5*time.Second)
|
||||
requestCtx, requestCtxCancel := context.WithTimeout(nodeCtx, time.Second)
|
||||
defer requestCtxCancel()
|
||||
|
||||
bootIDAfter, err = apiSuite.ReadBootID(requestCtx)
|
||||
|
||||
bootIDAfter, err := apiSuite.ReadBootID(requestCtx)
|
||||
if err != nil {
|
||||
// API might be unresponsive during reboot
|
||||
return retry.ExpectedError(err)
|
||||
@ -255,44 +309,54 @@ func (apiSuite *APISuite) AssertRebooted(ctx context.Context, node string, reboo
|
||||
|
||||
return nil
|
||||
}))
|
||||
|
||||
if apiSuite.Cluster != nil {
|
||||
// without cluster state we can't do deep checks, but basic reboot test still works
|
||||
// NB: using `ctx` here to have client talking to init node by default
|
||||
apiSuite.AssertClusterHealthy(ctx)
|
||||
}
|
||||
}
|
||||
|
||||
// WaitForBootDone waits for boot phase done event.
|
||||
func (apiSuite *APISuite) WaitForBootDone(ctx context.Context) {
|
||||
nodes := apiSuite.DiscoverNodeInternalIPs(ctx)
|
||||
apiSuite.WaitForSequenceDone(
|
||||
ctx,
|
||||
runtime.SequenceBoot,
|
||||
apiSuite.DiscoverNodeInternalIPs(ctx)...,
|
||||
)
|
||||
}
|
||||
|
||||
nodesNotDoneBooting := make(map[string]struct{})
|
||||
// WaitForSequenceDone waits for sequence done event.
|
||||
func (apiSuite *APISuite) WaitForSequenceDone(ctx context.Context, sequence runtime.Sequence, nodes ...string) {
|
||||
nodesNotDone := make(map[string]struct{})
|
||||
|
||||
for _, node := range nodes {
|
||||
nodesNotDoneBooting[node] = struct{}{}
|
||||
nodesNotDone[node] = struct{}{}
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(client.WithNodes(ctx, nodes...), 3*time.Minute)
|
||||
defer cancel()
|
||||
|
||||
apiSuite.Require().NoError(apiSuite.Client.EventsWatch(ctx, func(ch <-chan client.Event) {
|
||||
apiSuite.Require().NoError(retry.Constant(5*time.Minute, retry.WithUnits(time.Second*10)).Retry(func() error {
|
||||
eventsCtx, cancel := context.WithTimeout(client.WithNodes(ctx, nodes...), 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
for event := range ch {
|
||||
if msg, ok := event.Payload.(*machineapi.SequenceEvent); ok {
|
||||
if msg.GetAction() == machineapi.SequenceEvent_STOP && msg.GetSequence() == runtime.SequenceBoot.String() {
|
||||
delete(nodesNotDoneBooting, event.Node)
|
||||
err := apiSuite.Client.EventsWatch(eventsCtx, func(ch <-chan client.Event) {
|
||||
defer cancel()
|
||||
|
||||
if len(nodesNotDoneBooting) == 0 {
|
||||
return
|
||||
for event := range ch {
|
||||
if msg, ok := event.Payload.(*machineapi.SequenceEvent); ok {
|
||||
if msg.GetAction() == machineapi.SequenceEvent_STOP && msg.GetSequence() == sequence.String() {
|
||||
delete(nodesNotDone, event.Node)
|
||||
|
||||
if len(nodesNotDone) == 0 {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}, client.WithTailEvents(-1))
|
||||
if err != nil {
|
||||
return retry.ExpectedError(err)
|
||||
}
|
||||
}, client.WithTailEvents(-1)))
|
||||
|
||||
apiSuite.Require().Empty(nodesNotDoneBooting)
|
||||
if len(nodesNotDone) > 0 {
|
||||
return retry.ExpectedErrorf("nodes %#v sequence %s is not completed", nodesNotDone, sequence.String())
|
||||
}
|
||||
|
||||
return nil
|
||||
}))
|
||||
}
|
||||
|
||||
// ClearConnectionRefused clears cached connection refused errors which might be left after node reboot.
|
||||
|
@ -27,8 +27,9 @@ const (
|
||||
type Code int32
|
||||
|
||||
const (
|
||||
Code_FATAL Code = 0
|
||||
Code_LOCKED Code = 1
|
||||
Code_FATAL Code = 0
|
||||
Code_LOCKED Code = 1
|
||||
Code_CANCELED Code = 2
|
||||
)
|
||||
|
||||
// Enum value maps for Code.
|
||||
@ -36,10 +37,12 @@ var (
|
||||
Code_name = map[int32]string{
|
||||
0: "FATAL",
|
||||
1: "LOCKED",
|
||||
2: "CANCELED",
|
||||
}
|
||||
Code_value = map[string]int32{
|
||||
"FATAL": 0,
|
||||
"LOCKED": 1,
|
||||
"FATAL": 0,
|
||||
"LOCKED": 1,
|
||||
"CANCELED": 2,
|
||||
}
|
||||
)
|
||||
|
||||
@ -581,51 +584,52 @@ var file_common_common_proto_rawDesc = []byte{
|
||||
0x3a, 0x0a, 0x0d, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65,
|
||||
0x12, 0x29, 0x0a, 0x08, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03,
|
||||
0x28, 0x0b, 0x32, 0x0d, 0x2e, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x2e, 0x45, 0x6d, 0x70, 0x74,
|
||||
0x79, 0x52, 0x08, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x2a, 0x1d, 0x0a, 0x04, 0x43,
|
||||
0x79, 0x52, 0x08, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x2a, 0x2b, 0x0a, 0x04, 0x43,
|
||||
0x6f, 0x64, 0x65, 0x12, 0x09, 0x0a, 0x05, 0x46, 0x41, 0x54, 0x41, 0x4c, 0x10, 0x00, 0x12, 0x0a,
|
||||
0x0a, 0x06, 0x4c, 0x4f, 0x43, 0x4b, 0x45, 0x44, 0x10, 0x01, 0x2a, 0x2a, 0x0a, 0x0f, 0x43, 0x6f,
|
||||
0x6e, 0x74, 0x61, 0x69, 0x6e, 0x65, 0x72, 0x44, 0x72, 0x69, 0x76, 0x65, 0x72, 0x12, 0x0e, 0x0a,
|
||||
0x0a, 0x43, 0x4f, 0x4e, 0x54, 0x41, 0x49, 0x4e, 0x45, 0x52, 0x44, 0x10, 0x00, 0x12, 0x07, 0x0a,
|
||||
0x03, 0x43, 0x52, 0x49, 0x10, 0x01, 0x3a, 0x5d, 0x0a, 0x19, 0x72, 0x65, 0x6d, 0x6f, 0x76, 0x65,
|
||||
0x5f, 0x64, 0x65, 0x70, 0x72, 0x65, 0x63, 0x61, 0x74, 0x65, 0x64, 0x5f, 0x6d, 0x65, 0x73, 0x73,
|
||||
0x61, 0x67, 0x65, 0x12, 0x1f, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f,
|
||||
0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x4f, 0x70, 0x74,
|
||||
0x69, 0x6f, 0x6e, 0x73, 0x18, 0xbd, 0xd7, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x17, 0x72, 0x65,
|
||||
0x0a, 0x06, 0x4c, 0x4f, 0x43, 0x4b, 0x45, 0x44, 0x10, 0x01, 0x12, 0x0c, 0x0a, 0x08, 0x43, 0x41,
|
||||
0x4e, 0x43, 0x45, 0x4c, 0x45, 0x44, 0x10, 0x02, 0x2a, 0x2a, 0x0a, 0x0f, 0x43, 0x6f, 0x6e, 0x74,
|
||||
0x61, 0x69, 0x6e, 0x65, 0x72, 0x44, 0x72, 0x69, 0x76, 0x65, 0x72, 0x12, 0x0e, 0x0a, 0x0a, 0x43,
|
||||
0x4f, 0x4e, 0x54, 0x41, 0x49, 0x4e, 0x45, 0x52, 0x44, 0x10, 0x00, 0x12, 0x07, 0x0a, 0x03, 0x43,
|
||||
0x52, 0x49, 0x10, 0x01, 0x3a, 0x5d, 0x0a, 0x19, 0x72, 0x65, 0x6d, 0x6f, 0x76, 0x65, 0x5f, 0x64,
|
||||
0x65, 0x70, 0x72, 0x65, 0x63, 0x61, 0x74, 0x65, 0x64, 0x5f, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67,
|
||||
0x65, 0x12, 0x1f, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f,
|
||||
0x62, 0x75, 0x66, 0x2e, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x4f, 0x70, 0x74, 0x69, 0x6f,
|
||||
0x6e, 0x73, 0x18, 0xbd, 0xd7, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x17, 0x72, 0x65, 0x6d, 0x6f,
|
||||
0x76, 0x65, 0x44, 0x65, 0x70, 0x72, 0x65, 0x63, 0x61, 0x74, 0x65, 0x64, 0x4d, 0x65, 0x73, 0x73,
|
||||
0x61, 0x67, 0x65, 0x3a, 0x57, 0x0a, 0x17, 0x72, 0x65, 0x6d, 0x6f, 0x76, 0x65, 0x5f, 0x64, 0x65,
|
||||
0x70, 0x72, 0x65, 0x63, 0x61, 0x74, 0x65, 0x64, 0x5f, 0x66, 0x69, 0x65, 0x6c, 0x64, 0x12, 0x1d,
|
||||
0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66,
|
||||
0x2e, 0x46, 0x69, 0x65, 0x6c, 0x64, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0xbd, 0xd7,
|
||||
0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x15, 0x72, 0x65, 0x6d, 0x6f, 0x76, 0x65, 0x44, 0x65, 0x70,
|
||||
0x72, 0x65, 0x63, 0x61, 0x74, 0x65, 0x64, 0x46, 0x69, 0x65, 0x6c, 0x64, 0x3a, 0x54, 0x0a, 0x16,
|
||||
0x72, 0x65, 0x6d, 0x6f, 0x76, 0x65, 0x5f, 0x64, 0x65, 0x70, 0x72, 0x65, 0x63, 0x61, 0x74, 0x65,
|
||||
0x64, 0x5f, 0x65, 0x6e, 0x75, 0x6d, 0x12, 0x1c, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e,
|
||||
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6e, 0x75, 0x6d, 0x4f, 0x70, 0x74,
|
||||
0x69, 0x6f, 0x6e, 0x73, 0x18, 0xbd, 0xd7, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x14, 0x72, 0x65,
|
||||
0x6d, 0x6f, 0x76, 0x65, 0x44, 0x65, 0x70, 0x72, 0x65, 0x63, 0x61, 0x74, 0x65, 0x64, 0x45, 0x6e,
|
||||
0x75, 0x6d, 0x3a, 0x64, 0x0a, 0x1c, 0x72, 0x65, 0x6d, 0x6f, 0x76, 0x65, 0x5f, 0x64, 0x65, 0x70,
|
||||
0x72, 0x65, 0x63, 0x61, 0x74, 0x65, 0x64, 0x5f, 0x65, 0x6e, 0x75, 0x6d, 0x5f, 0x76, 0x61, 0x6c,
|
||||
0x75, 0x65, 0x12, 0x21, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74,
|
||||
0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6e, 0x75, 0x6d, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x4f, 0x70,
|
||||
0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0xbd, 0xd7, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x19, 0x72,
|
||||
0x65, 0x6d, 0x6f, 0x76, 0x65, 0x44, 0x65, 0x70, 0x72, 0x65, 0x63, 0x61, 0x74, 0x65, 0x64, 0x45,
|
||||
0x6e, 0x75, 0x6d, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x5a, 0x0a, 0x18, 0x72, 0x65, 0x6d, 0x6f,
|
||||
0x76, 0x65, 0x5f, 0x64, 0x65, 0x70, 0x72, 0x65, 0x63, 0x61, 0x74, 0x65, 0x64, 0x5f, 0x6d, 0x65,
|
||||
0x74, 0x68, 0x6f, 0x64, 0x12, 0x1e, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72,
|
||||
0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x4d, 0x65, 0x74, 0x68, 0x6f, 0x64, 0x4f, 0x70, 0x74,
|
||||
0x69, 0x6f, 0x6e, 0x73, 0x18, 0xbd, 0xd7, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x16, 0x72, 0x65,
|
||||
0x6d, 0x6f, 0x76, 0x65, 0x44, 0x65, 0x70, 0x72, 0x65, 0x63, 0x61, 0x74, 0x65, 0x64, 0x4d, 0x65,
|
||||
0x73, 0x73, 0x61, 0x67, 0x65, 0x3a, 0x57, 0x0a, 0x17, 0x72, 0x65, 0x6d, 0x6f, 0x76, 0x65, 0x5f,
|
||||
0x64, 0x65, 0x70, 0x72, 0x65, 0x63, 0x61, 0x74, 0x65, 0x64, 0x5f, 0x66, 0x69, 0x65, 0x6c, 0x64,
|
||||
0x12, 0x1d, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62,
|
||||
0x75, 0x66, 0x2e, 0x46, 0x69, 0x65, 0x6c, 0x64, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18,
|
||||
0xbd, 0xd7, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x15, 0x72, 0x65, 0x6d, 0x6f, 0x76, 0x65, 0x44,
|
||||
0x65, 0x70, 0x72, 0x65, 0x63, 0x61, 0x74, 0x65, 0x64, 0x46, 0x69, 0x65, 0x6c, 0x64, 0x3a, 0x54,
|
||||
0x0a, 0x16, 0x72, 0x65, 0x6d, 0x6f, 0x76, 0x65, 0x5f, 0x64, 0x65, 0x70, 0x72, 0x65, 0x63, 0x61,
|
||||
0x74, 0x65, 0x64, 0x5f, 0x65, 0x6e, 0x75, 0x6d, 0x12, 0x1c, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c,
|
||||
0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6e, 0x75, 0x6d, 0x4f,
|
||||
0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0xbd, 0xd7, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x14,
|
||||
0x72, 0x65, 0x6d, 0x6f, 0x76, 0x65, 0x44, 0x65, 0x70, 0x72, 0x65, 0x63, 0x61, 0x74, 0x65, 0x64,
|
||||
0x45, 0x6e, 0x75, 0x6d, 0x3a, 0x64, 0x0a, 0x1c, 0x72, 0x65, 0x6d, 0x6f, 0x76, 0x65, 0x5f, 0x64,
|
||||
0x65, 0x70, 0x72, 0x65, 0x63, 0x61, 0x74, 0x65, 0x64, 0x5f, 0x65, 0x6e, 0x75, 0x6d, 0x5f, 0x76,
|
||||
0x61, 0x6c, 0x75, 0x65, 0x12, 0x21, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72,
|
||||
0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6e, 0x75, 0x6d, 0x56, 0x61, 0x6c, 0x75, 0x65,
|
||||
0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0xbd, 0xd7, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52,
|
||||
0x19, 0x72, 0x65, 0x6d, 0x6f, 0x76, 0x65, 0x44, 0x65, 0x70, 0x72, 0x65, 0x63, 0x61, 0x74, 0x65,
|
||||
0x64, 0x45, 0x6e, 0x75, 0x6d, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x5a, 0x0a, 0x18, 0x72, 0x65,
|
||||
0x6d, 0x6f, 0x76, 0x65, 0x5f, 0x64, 0x65, 0x70, 0x72, 0x65, 0x63, 0x61, 0x74, 0x65, 0x64, 0x5f,
|
||||
0x6d, 0x65, 0x74, 0x68, 0x6f, 0x64, 0x12, 0x1e, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e,
|
||||
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x4d, 0x65, 0x74, 0x68, 0x6f, 0x64, 0x4f,
|
||||
0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0xbd, 0xd7, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x16,
|
||||
0x72, 0x65, 0x6d, 0x6f, 0x76, 0x65, 0x44, 0x65, 0x70, 0x72, 0x65, 0x63, 0x61, 0x74, 0x65, 0x64,
|
||||
0x4d, 0x65, 0x74, 0x68, 0x6f, 0x64, 0x3a, 0x5d, 0x0a, 0x19, 0x72, 0x65, 0x6d, 0x6f, 0x76, 0x65,
|
||||
0x5f, 0x64, 0x65, 0x70, 0x72, 0x65, 0x63, 0x61, 0x74, 0x65, 0x64, 0x5f, 0x73, 0x65, 0x72, 0x76,
|
||||
0x69, 0x63, 0x65, 0x12, 0x1f, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f,
|
||||
0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x4f, 0x70, 0x74,
|
||||
0x69, 0x6f, 0x6e, 0x73, 0x18, 0xbd, 0xd7, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x17, 0x72, 0x65,
|
||||
0x6d, 0x6f, 0x76, 0x65, 0x44, 0x65, 0x70, 0x72, 0x65, 0x63, 0x61, 0x74, 0x65, 0x64, 0x53, 0x65,
|
||||
0x72, 0x76, 0x69, 0x63, 0x65, 0x42, 0x39, 0x5a, 0x37, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e,
|
||||
0x63, 0x6f, 0x6d, 0x2f, 0x74, 0x61, 0x6c, 0x6f, 0x73, 0x2d, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d,
|
||||
0x73, 0x2f, 0x74, 0x61, 0x6c, 0x6f, 0x73, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x6d, 0x61, 0x63, 0x68,
|
||||
0x69, 0x6e, 0x65, 0x72, 0x79, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e,
|
||||
0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
|
||||
0x74, 0x68, 0x6f, 0x64, 0x3a, 0x5d, 0x0a, 0x19, 0x72, 0x65, 0x6d, 0x6f, 0x76, 0x65, 0x5f, 0x64,
|
||||
0x65, 0x70, 0x72, 0x65, 0x63, 0x61, 0x74, 0x65, 0x64, 0x5f, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63,
|
||||
0x65, 0x12, 0x1f, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f,
|
||||
0x62, 0x75, 0x66, 0x2e, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x4f, 0x70, 0x74, 0x69, 0x6f,
|
||||
0x6e, 0x73, 0x18, 0xbd, 0xd7, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x17, 0x72, 0x65, 0x6d, 0x6f,
|
||||
0x76, 0x65, 0x44, 0x65, 0x70, 0x72, 0x65, 0x63, 0x61, 0x74, 0x65, 0x64, 0x53, 0x65, 0x72, 0x76,
|
||||
0x69, 0x63, 0x65, 0x42, 0x39, 0x5a, 0x37, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f,
|
||||
0x6d, 0x2f, 0x74, 0x61, 0x6c, 0x6f, 0x73, 0x2d, 0x73, 0x79, 0x73, 0x74, 0x65, 0x6d, 0x73, 0x2f,
|
||||
0x74, 0x61, 0x6c, 0x6f, 0x73, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x6d, 0x61, 0x63, 0x68, 0x69, 0x6e,
|
||||
0x65, 0x72, 0x79, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x62, 0x06,
|
||||
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
|
||||
}
|
||||
|
||||
var (
|
||||
|
@ -327,6 +327,7 @@ Common metadata message nested in all reply message types
|
||||
| ---- | ------ | ----------- |
|
||||
| FATAL | 0 | |
|
||||
| LOCKED | 1 | |
|
||||
| CANCELED | 2 | |
|
||||
|
||||
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user