refactor: extract 'restart' piece of the runners into wrapper runner (#559)

This changes the `runner.Runner` API to add more methods, so that the
containerd runner can create the container object only once and then
start/stop tasks to implement restarts.

New API: `Open()` (initialize), `Run()` (run once until exits), `Stop()`
(stop running instance), `Close()` (free resource, no longer available
for new `Run()`).

So the sequence might be: `Open`, `Run`, `Stop`, `Run`, `Stop`, `Close`.

Process and containerd runners were updated for the new API, and
'restart' part was removed, now both runners only run the task once.

Restart piece was implemented in an abstract way for any wrapped
`runner.Runner` in the `runner/restart` package. Restart supports three
restart policies: `Once`, `UntilSuccess` and `Forever`.

Service API was changed slightly to return the `runner.Runner`
interface, and `system.Services` now handles running the service.

For all the services, the code was adjusted to either return the runner
directly (run once), or wrap it with the `restart` runner to provide a
restart policy.

Signed-off-by: Andrey Smirnov <smirnov.andrey@gmail.com>
This commit is contained in:
Andrey Smirnov 2019-04-23 01:25:26 +03:00 committed by GitHub
parent 544c9259fc
commit a858cb4986
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 567 additions and 214 deletions

View File

@ -6,6 +6,7 @@ package containerd
import (
"context"
"fmt"
"log"
"path/filepath"
"syscall"
@ -31,10 +32,11 @@ type containerdRunner struct {
stop chan struct{}
stopped chan struct{}
}
// errStopped is used internally to signal that task was stopped
var errStopped = errors.New("stopped")
client *containerd.Client
ctx context.Context
container containerd.Container
}
// NewRunner creates runner.Runner that runs a container in containerd
func NewRunner(data *userdata.UserData, args *runner.Args, setters ...runner.Option) runner.Runner {
@ -53,10 +55,8 @@ func NewRunner(data *userdata.UserData, args *runner.Args, setters ...runner.Opt
return r
}
// Run implements the Runner interface.
// nolint: gocyclo
func (c *containerdRunner) Run() error {
defer close(c.stopped)
// Open implements the Runner interface.
func (c *containerdRunner) Open() error {
// Wait for the containerd socket.
_, err := conditions.WaitForFileToExist(constants.ContainerdAddress)()
@ -66,15 +66,13 @@ func (c *containerdRunner) Run() error {
// Create the containerd client.
ctx := namespaces.WithNamespace(context.Background(), c.opts.Namespace)
client, err := containerd.New(constants.ContainerdAddress)
c.ctx = namespaces.WithNamespace(context.Background(), c.opts.Namespace)
c.client, err = containerd.New(constants.ContainerdAddress)
if err != nil {
return err
}
// nolint: errcheck
defer client.Close()
image, err := client.GetImage(ctx, c.opts.ContainerImage)
image, err := c.client.GetImage(c.ctx, c.opts.ContainerImage)
if err != nil {
return err
}
@ -83,59 +81,50 @@ func (c *containerdRunner) Run() error {
specOpts := c.newOCISpecOpts(image)
containerOpts := c.newContainerOpts(image, specOpts)
container, err := client.NewContainer(
ctx,
c.container, err = c.client.NewContainer(
c.ctx,
c.args.ID,
containerOpts...,
)
if err != nil {
return errors.Wrapf(err, "failed to create container %q", c.args.ID)
}
defer container.Delete(ctx, containerd.WithSnapshotCleanup) // nolint: errcheck
// Manage task lifecycle
switch c.opts.Type {
case runner.Once:
err = c.runOnce(ctx, container)
if err == errStopped {
err = nil
}
return err
case runner.Forever:
for {
err = c.runOnce(ctx, container)
if err == errStopped {
return nil
}
if err != nil {
log.Printf("error running %v, going to restart forever: %s", c.args.ID, err)
}
select {
case <-c.stop:
return nil
case <-time.After(c.opts.RestartInterval):
}
}
default:
panic("unsupported runner type")
}
return nil
}
func (c *containerdRunner) runOnce(ctx context.Context, container containerd.Container) error {
// Close implements runner.Runner interface
//
// It deletes the container stored on the runner (passing
// containerd.WithSnapshotCleanup) and then closes the containerd client.
// The client is closed even when deleting the container fails, so the client
// connection is not leaked on that path. If both steps fail, the deletion
// error is the one returned.
func (c *containerdRunner) Close() error {
	var deleteErr error

	if c.container != nil {
		deleteErr = c.container.Delete(c.ctx, containerd.WithSnapshotCleanup)
	}

	// Nothing more to release if the client was never created.
	if c.client == nil {
		return deleteErr
	}

	closeErr := c.client.Close()
	if deleteErr != nil {
		return deleteErr
	}

	return closeErr
}
// Run implements runner.Runner interface
func (c *containerdRunner) Run() error {
defer close(c.stopped)
// Create the task and start it.
task, err := container.NewTask(ctx, cio.LogFile(c.logPath()))
task, err := c.container.NewTask(c.ctx, cio.LogFile(c.logPath()))
if err != nil {
return errors.Wrapf(err, "failed to create task: %q", c.args.ID)
}
defer task.Delete(ctx) // nolint: errcheck
defer task.Delete(c.ctx) // nolint: errcheck
if err = task.Start(ctx); err != nil {
if err = task.Start(c.ctx); err != nil {
return errors.Wrapf(err, "failed to start task: %q", c.args.ID)
}
statusC, err := task.Wait(ctx)
statusC, err := task.Wait(c.ctx)
if err != nil {
return errors.Wrapf(err, "failed waiting for task: %q", c.args.ID)
}
@ -152,23 +141,23 @@ func (c *containerdRunner) runOnce(ctx context.Context, container containerd.Con
log.Printf("sending SIGTERM to %v", c.args.ID)
// nolint: errcheck
_ = task.Kill(ctx, syscall.SIGTERM, containerd.WithKillAll)
_ = task.Kill(c.ctx, syscall.SIGTERM, containerd.WithKillAll)
}
select {
case <-statusC:
// stopped process exited
return errStopped
return nil
case <-time.After(c.opts.GracefulShutdownTimeout):
// kill the process
log.Printf("sending SIGKILL to %v", c.args.ID)
// nolint: errcheck
_ = task.Kill(ctx, syscall.SIGKILL, containerd.WithKillAll)
_ = task.Kill(c.ctx, syscall.SIGKILL, containerd.WithKillAll)
}
<-statusC
return errStopped
return nil
}
// Stop implements runner.Runner interface
@ -177,6 +166,9 @@ func (c *containerdRunner) Stop() error {
<-c.stopped
c.stop = make(chan struct{})
c.stopped = make(chan struct{})
return nil
}
@ -210,3 +202,7 @@ func (c *containerdRunner) newOCISpecOpts(image oci.Image) []oci.SpecOpts {
// logPath returns the path of this runner's log file: "<ID>.log" joined onto
// the configured LogPath directory.
func (c *containerdRunner) logPath() string {
return filepath.Join(c.opts.LogPath, c.args.ID+".log")
}
// String returns a short description of the runner in the form
// "Containerd(<ID>)".
func (c *containerdRunner) String() string {
return fmt.Sprintf("Containerd(%v)", c.args.ID)
}

View File

@ -22,6 +22,7 @@ import (
"github.com/talos-systems/talos/internal/app/init/pkg/system/runner"
containerdrunner "github.com/talos-systems/talos/internal/app/init/pkg/system/runner/containerd"
"github.com/talos-systems/talos/internal/app/init/pkg/system/runner/process"
"github.com/talos-systems/talos/internal/app/init/pkg/system/runner/restart"
"github.com/talos-systems/talos/internal/pkg/constants"
"github.com/talos-systems/talos/pkg/userdata"
)
@ -57,13 +58,14 @@ func (suite *ContainerdSuite) SetupSuite() {
suite.containerdRunner = process.NewRunner(
&userdata.UserData{},
args,
runner.WithType(runner.Once),
runner.WithLogPath(suite.tmpDir),
runner.WithEnv([]string{"PATH=/rootfs/bin:" + constants.PATH}),
)
suite.Require().NoError(suite.containerdRunner.Open())
suite.containerdWg.Add(1)
go func() {
defer suite.containerdWg.Done()
defer func() { suite.Require().NoError(suite.containerdRunner.Close()) }()
suite.Require().NoError(suite.containerdRunner.Run())
}()
@ -90,30 +92,35 @@ func (suite *ContainerdSuite) TestRunSuccess() {
ID: "test",
ProcessArgs: []string{"/bin/sh", "-c", "exit 0"},
},
runner.WithType(runner.Once),
runner.WithLogPath(suite.tmpDir),
runner.WithNamespace(containerdNamespace),
runner.WithContainerImage(busyboxImage),
)
suite.Require().NoError(r.Open())
defer func() { suite.Assert().NoError(r.Close()) }()
suite.Assert().NoError(r.Run())
// calling stop when Run has finished is no-op
suite.Assert().NoError(r.Stop())
}
func (suite *ContainerdSuite) TestRunTwice() {
r := containerdrunner.NewRunner(&userdata.UserData{}, &runner.Args{
ID: "runtwice",
ProcessArgs: []string{"/bin/sh", "-c", "exit 0"},
},
runner.WithLogPath(suite.tmpDir),
runner.WithNamespace(containerdNamespace),
runner.WithContainerImage(busyboxImage),
)
suite.Require().NoError(r.Open())
defer func() { suite.Assert().NoError(r.Close()) }()
// running same container twice should be fine
// (checks that containerd state is cleaned up properly)
for i := 0; i < 2; i++ {
r := containerdrunner.NewRunner(&userdata.UserData{}, &runner.Args{
ID: "runtwice",
ProcessArgs: []string{"/bin/sh", "-c", "exit 0"},
},
runner.WithType(runner.Once),
runner.WithLogPath(suite.tmpDir),
runner.WithNamespace(containerdNamespace),
runner.WithContainerImage(busyboxImage),
)
suite.Assert().NoError(r.Run())
// calling stop when Run has finished is no-op
@ -126,12 +133,14 @@ func (suite *ContainerdSuite) TestRunLogs() {
ID: "logtest",
ProcessArgs: []string{"/bin/sh", "-c", "echo -n \"Test 1\nTest 2\n\""},
},
runner.WithType(runner.Once),
runner.WithLogPath(suite.tmpDir),
runner.WithNamespace(containerdNamespace),
runner.WithContainerImage(busyboxImage),
)
suite.Require().NoError(r.Open())
defer func() { suite.Assert().NoError(r.Close()) }()
suite.Assert().NoError(r.Run())
logFile, err := os.Open(filepath.Join(suite.tmpDir, "logtest.log"))
@ -154,13 +163,11 @@ func (suite *ContainerdSuite) TestStopFailingAndRestarting() {
// nolint: errcheck
_ = os.Remove(testFile)
r := containerdrunner.NewRunner(&userdata.UserData{}, &runner.Args{
r := restart.New(containerdrunner.NewRunner(&userdata.UserData{}, &runner.Args{
ID: "endless",
ProcessArgs: []string{"/bin/sh", "-c", "test -f " + testFile + " && echo ok || (echo fail; false)"},
},
runner.WithType(runner.Forever),
runner.WithLogPath(suite.tmpDir),
runner.WithRestartInterval(5*time.Millisecond),
runner.WithNamespace(containerdNamespace),
runner.WithContainerImage(busyboxImage),
runner.WithOCISpecOpts(
@ -168,8 +175,14 @@ func (suite *ContainerdSuite) TestStopFailingAndRestarting() {
{Type: "bind", Destination: testDir, Source: testDir, Options: []string{"bind", "ro"}},
}),
),
),
restart.WithType(restart.Forever),
restart.WithRestartInterval(5*time.Millisecond),
)
suite.Require().NoError(r.Open())
defer func() { suite.Assert().NoError(r.Close()) }()
done := make(chan error, 1)
go func() {
@ -217,15 +230,16 @@ func (suite *ContainerdSuite) TestStopFailingAndRestarting() {
func (suite *ContainerdSuite) TestStopSigKill() {
r := containerdrunner.NewRunner(&userdata.UserData{}, &runner.Args{
ID: "nokill",
ProcessArgs: []string{"/bin/sh", "-c", "trap -- '' SIGTERM; while true; do sleep 1; done"},
ProcessArgs: []string{"/bin/sh", "-c", "trap -- '' SIGTERM; while :; do :; done"},
},
runner.WithType(runner.Forever),
runner.WithLogPath(suite.tmpDir),
runner.WithNamespace(containerdNamespace),
runner.WithContainerImage(busyboxImage),
runner.WithRestartInterval(5*time.Millisecond),
runner.WithGracefulShutdownTimeout(10*time.Millisecond))
suite.Require().NoError(r.Open())
defer func() { suite.Assert().NoError(r.Close()) }()
done := make(chan error, 1)
go func() {

View File

@ -30,9 +30,6 @@ type processRunner struct {
stopped chan struct{}
}
// errStopped is used internally to signal that task was stopped
var errStopped = errors.New("stopped")
// NewRunner creates runner.Runner that runs a process on the host
func NewRunner(data *userdata.UserData, args *runner.Args, setters ...runner.Option) runner.Runner {
r := &processRunner{
@ -50,20 +47,16 @@ func NewRunner(data *userdata.UserData, args *runner.Args, setters ...runner.Opt
return r
}
// Open implements the Runner interface.
//
// The process runner does no work here; it always returns nil.
func (p *processRunner) Open() error {
return nil
}
// Run implements the Runner interface.
func (p *processRunner) Run() error {
defer close(p.stopped)
switch p.opts.Type {
case runner.Forever:
p.waitAndRestart()
case runner.Once:
p.waitForSuccess()
default:
panic("unexpected runner type")
}
return nil
return p.run()
}
// Stop implements the Runner interface
@ -72,6 +65,14 @@ func (p *processRunner) Stop() error {
<-p.stopped
p.stop = make(chan struct{})
p.stopped = make(chan struct{})
return nil
}
// Close implements the Runner interface.
//
// The process runner does no work here; it always returns nil.
func (p *processRunner) Close() error {
return nil
}
@ -122,6 +123,7 @@ func (p *processRunner) run() error {
return err
case <-p.stop:
// graceful stop the service
log.Printf("sending SIGTERM to %s", p)
// nolint: errcheck
_ = cmd.Process.Signal(syscall.SIGTERM)
@ -130,50 +132,20 @@ func (p *processRunner) run() error {
select {
case <-waitCh:
// stopped process exited
return errStopped
return nil
case <-time.After(p.opts.GracefulShutdownTimeout):
// kill the process
log.Printf("sending SIGKILL to %s", p)
// nolint: errcheck
_ = cmd.Process.Kill()
_ = cmd.Process.Signal(syscall.SIGKILL)
}
// wait for process to terminate
<-waitCh
return errStopped
return nil
}
func (p *processRunner) waitAndRestart() {
for {
err := p.run()
if err == errStopped {
return
}
if err != nil {
log.Printf("error running %v, going to restart forever: %s", p.args.ProcessArgs, err)
}
select {
case <-p.stop:
return
case <-time.After(p.opts.RestartInterval):
}
}
}
func (p *processRunner) waitForSuccess() {
for {
err := p.run()
if err == errStopped || err == nil {
break
}
log.Printf("error running %v, going to restart until it succeeds: %s", p.args.ProcessArgs, err)
select {
case <-p.stop:
return
case <-time.After(p.opts.RestartInterval):
}
}
// String returns a short description of the runner in the form
// "Process(<ProcessArgs>)", with the arguments formatted using %v.
func (p *processRunner) String() string {
return fmt.Sprintf("Process(%v)", p.args.ProcessArgs)
}

View File

@ -15,6 +15,7 @@ import (
"github.com/stretchr/testify/suite"
"github.com/talos-systems/talos/internal/app/init/pkg/system/runner"
"github.com/talos-systems/talos/internal/app/init/pkg/system/runner/process"
"github.com/talos-systems/talos/internal/app/init/pkg/system/runner/restart"
"github.com/talos-systems/talos/pkg/userdata"
)
@ -39,7 +40,10 @@ func (suite *ProcessSuite) TestRunSuccess() {
r := process.NewRunner(&userdata.UserData{}, &runner.Args{
ID: "test",
ProcessArgs: []string{"/bin/bash", "-c", "exit 0"},
}, runner.WithType(runner.Once), runner.WithLogPath(suite.tmpDir))
}, runner.WithLogPath(suite.tmpDir))
suite.Assert().NoError(r.Open())
defer func() { suite.Assert().NoError(r.Close()) }()
suite.Assert().NoError(r.Run())
// calling stop when Run has finished is no-op
@ -50,7 +54,10 @@ func (suite *ProcessSuite) TestRunLogs() {
r := process.NewRunner(&userdata.UserData{}, &runner.Args{
ID: "logtest",
ProcessArgs: []string{"/bin/bash", "-c", "echo -n \"Test 1\nTest 2\n\""},
}, runner.WithType(runner.Once), runner.WithLogPath(suite.tmpDir))
}, runner.WithLogPath(suite.tmpDir))
suite.Assert().NoError(r.Open())
defer func() { suite.Assert().NoError(r.Close()) }()
suite.Assert().NoError(r.Run())
@ -71,10 +78,13 @@ func (suite *ProcessSuite) TestRunRestartFailed() {
// nolint: errcheck
_ = os.Remove(testFile)
r := process.NewRunner(&userdata.UserData{}, &runner.Args{
r := restart.New(process.NewRunner(&userdata.UserData{}, &runner.Args{
ID: "restarter",
ProcessArgs: []string{"/bin/bash", "-c", "echo \"ran\"; test -f " + testFile},
}, runner.WithType(runner.Once), runner.WithLogPath(suite.tmpDir), runner.WithRestartInterval(time.Millisecond))
}, runner.WithLogPath(suite.tmpDir)), restart.WithType(restart.UntilSuccess), restart.WithRestartInterval(time.Millisecond))
suite.Assert().NoError(r.Open())
defer func() { suite.Assert().NoError(r.Close()) }()
var wg sync.WaitGroup
@ -108,10 +118,13 @@ func (suite *ProcessSuite) TestStopFailingAndRestarting() {
// nolint: errcheck
_ = os.Remove(testFile)
r := process.NewRunner(&userdata.UserData{}, &runner.Args{
r := restart.New(process.NewRunner(&userdata.UserData{}, &runner.Args{
ID: "endless",
ProcessArgs: []string{"/bin/bash", "-c", "test -f " + testFile},
}, runner.WithType(runner.Forever), runner.WithLogPath(suite.tmpDir), runner.WithRestartInterval(5*time.Millisecond))
}, runner.WithLogPath(suite.tmpDir)), restart.WithType(restart.Forever), restart.WithRestartInterval(5*time.Millisecond))
suite.Assert().NoError(r.Open())
defer func() { suite.Assert().NoError(r.Close()) }()
done := make(chan error, 1)
@ -148,9 +161,14 @@ func (suite *ProcessSuite) TestStopFailingAndRestarting() {
func (suite *ProcessSuite) TestStopSigKill() {
r := process.NewRunner(&userdata.UserData{}, &runner.Args{
ID: "nokill",
ProcessArgs: []string{"/bin/bash", "-c", "trap -- '' SIGTERM; while true; do sleep 1; done"},
}, runner.WithType(runner.Forever), runner.WithLogPath(suite.tmpDir),
runner.WithRestartInterval(5*time.Millisecond), runner.WithGracefulShutdownTimeout(10*time.Millisecond))
ProcessArgs: []string{"/bin/bash", "-c", "trap -- '' SIGTERM; while :; do :; done"},
},
runner.WithLogPath(suite.tmpDir),
runner.WithGracefulShutdownTimeout(10*time.Millisecond),
)
suite.Assert().NoError(r.Open())
defer func() { suite.Assert().NoError(r.Close()) }()
done := make(chan error, 1)

View File

@ -0,0 +1,174 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
package restart
import (
"fmt"
"log"
"time"
"github.com/talos-systems/talos/internal/app/init/pkg/system/runner"
)
// restarter is the runner.Runner implementation returned by New: it runs a
// wrapped runner, possibly several times, according to the restart policy
// held in opts.
type restarter struct {
// wrappedRunner is the runner whose Run is invoked (and re-invoked).
wrappedRunner runner.Runner
// opts holds the restart policy and the interval between restarts.
opts *Options
// stop is closed by Stop to ask Run to terminate.
stop chan struct{}
// stopped is closed when Run returns.
stopped chan struct{}
}
// New wraps runner.Runner with restart policy
//
// The options start out as DefaultOptions() and every supplied Option is
// applied to them in the order given.
func New(wrapRunner runner.Runner, opts ...Option) runner.Runner {
	wrapper := &restarter{
		opts:          DefaultOptions(),
		stopped:       make(chan struct{}),
		stop:          make(chan struct{}),
		wrappedRunner: wrapRunner,
	}

	for i := range opts {
		opts[i](wrapper.opts)
	}

	return wrapper
}
// Options is the functional options struct.
type Options struct {
// Type describes the service's restart policy.
Type Type
// RestartInterval is how long Run waits after the wrapped runner exits
// before starting it again.
RestartInterval time.Duration
// GracefulShutdownTimeout is the time to wait for process to exit after SIGTERM
// before sending SIGKILL
// NOTE(review): nothing in the restart code visible here reads this field
// or offers an option to set it — confirm whether it is still needed.
GracefulShutdownTimeout time.Duration
}
// Option is the functional option func.
//
// An Option mutates the Options it is given; New applies each one, in order,
// on top of DefaultOptions().
type Option func(*Options)
// Type represents the service's restart policy.
type Type int

const (
	// Forever restarts the process every time it exits.
	Forever Type = iota
	// Once runs the process exactly once.
	Once
	// UntilSuccess restarts the process until a run succeeds.
	UntilSuccess
)

// String returns the name of the restart policy, or "Unknown" for a value
// that is not one of the declared policies.
func (t Type) String() string {
	names := [...]string{
		Forever:      "Forever",
		Once:         "Once",
		UntilSuccess: "UntilSuccess",
	}

	if t < 0 || int(t) >= len(names) {
		return "Unknown"
	}

	return names[t]
}
// DefaultOptions describes the default options to a runner.
//
// The defaults are the Forever policy with a five second restart interval;
// every other field is left at its zero value.
func DefaultOptions() *Options {
	defaults := new(Options)
	defaults.Type = Forever
	defaults.RestartInterval = 5 * time.Second

	return defaults
}
// WithType sets the type of a service.
//
// The returned Option stores o as the restart policy.
func WithType(o Type) Option {
	setType := func(opts *Options) {
		opts.Type = o
	}

	return setType
}
// WithRestartInterval sets the interval between restarts of the failed task
//
// The returned Option stores interval as Options.RestartInterval.
func WithRestartInterval(interval time.Duration) Option {
	setInterval := func(opts *Options) {
		opts.RestartInterval = interval
	}

	return setInterval
}
// Open implements the Runner interface
//
// It delegates to the wrapped runner's Open and returns its result unchanged.
func (r *restarter) Open() error {
return r.wrappedRunner.Open()
}
// Run implements the Runner interface
//
// It runs the wrapped runner in a loop. Each iteration starts the wrapped
// runner's Run in a goroutine and waits for either its result or a stop
// request. What happens after the wrapped runner exits on its own depends on
// the policy: Once returns the result as is; UntilSuccess returns nil on
// success and retries on error; Forever always retries. Before every retry
// Run waits for RestartInterval, and a stop request during that wait makes
// Run return nil.
//
// The stopped channel is closed when Run returns, which unblocks Stop.
// nolint: gocyclo
func (r *restarter) Run() error {
defer close(r.stopped)
for {
// errCh carries the result of this single run of the wrapped runner.
errCh := make(chan error)
go func() {
errCh <- r.wrappedRunner.Run()
}()
var err error
select {
case <-r.stop:
// Stop was requested while the wrapped runner is running: stop it and
// return whatever its Run reports. The error from the wrapped Stop is
// deliberately ignored.
// nolint: errcheck
_ = r.wrappedRunner.Stop()
return <-errCh
case err = <-errCh:
}
// The wrapped runner exited on its own. Stop is still called on it before
// deciding whether to restart (presumably so it can reset its internal
// state for the next Run — confirm against the wrapped implementations);
// a failure here ends the loop.
errStop := r.wrappedRunner.Stop()
if errStop != nil {
return errStop
}
// A Type value that matches none of the cases below falls through to the
// restart wait without logging anything.
switch r.opts.Type {
case Once:
return err
case UntilSuccess:
if err == nil {
return nil
}
log.Printf("error running %s, going to restart until it succeeds: %s", r.wrappedRunner, err)
case Forever:
if err == nil {
log.Printf("runner %s exited without error, going to restart it", r.wrappedRunner)
} else {
log.Printf("error running %v, going to restart forever: %s", r.wrappedRunner, err)
}
}
// Wait out the restart interval, unless a stop request arrives first.
select {
case <-r.stop:
return nil
case <-time.After(r.opts.RestartInterval):
}
}
}
// Stop implements the Runner interface
//
// It asks Run to terminate by closing the stop channel and blocks until Run
// has returned. Both channels are then re-created, as the process and
// containerd runners do in their own Stop methods, so the restarter can go
// through another Run/Stop cycle instead of panicking on a second close of
// the same channel.
//
// It always returns nil.
func (r *restarter) Stop() error {
	close(r.stop)

	<-r.stopped

	// Run closes the previous `stopped` channel only as it returns, so by now
	// it no longer reads either field and fresh channels can be installed.
	r.stop = make(chan struct{})
	r.stopped = make(chan struct{})

	return nil
}
// Close implements the Runner interface
//
// It delegates to the wrapped runner's Close and returns its result unchanged.
func (r *restarter) Close() error {
return r.wrappedRunner.Close()
}
// String implements the Runner interface
//
// The result has the form "Restart(<policy>, <wrapped runner>)", where both
// parts are formatted with %s.
func (r *restarter) String() string {
return fmt.Sprintf("Restart(%s, %s)", r.opts.Type, r.wrappedRunner)
}

View File

@ -0,0 +1,168 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
package restart_test
import (
"errors"
"testing"
"time"
"github.com/stretchr/testify/suite"
"github.com/talos-systems/talos/internal/app/init/pkg/system/runner/restart"
)
// RestartSuite groups the tests for the restart runner; it embeds
// testify's suite.Suite and carries no state of its own.
type RestartSuite struct {
suite.Suite
}
// MockRunner is a test double for a runner: the result of each Run is fed in
// by the test through exitCh, and times counts how many runs ended that way.
type MockRunner struct {
	// exitCh delivers the value the next Run should return.
	exitCh chan error
	// times is the number of runs that finished by receiving from exitCh.
	times int

	// stop is closed by Stop to interrupt a blocked Run.
	stop chan struct{}
	// stopped is closed when Run returns.
	stopped chan struct{}
}

// Open installs fresh stop/stopped channels and always returns nil.
func (m *MockRunner) Open() error {
	m.stop, m.stopped = make(chan struct{}), make(chan struct{})

	return nil
}

// Close closes exitCh and always returns nil.
func (m *MockRunner) Close() error {
	close(m.exitCh)

	return nil
}

// Run blocks until either a value arrives on exitCh (it is counted and
// returned) or stop is closed (nil is returned). The stopped channel that was
// current when Run started is closed on the way out.
func (m *MockRunner) Run() error {
	finished := m.stopped
	defer close(finished)

	select {
	case <-m.stop:
		return nil
	case result := <-m.exitCh:
		m.times++

		return result
	}
}

// Stop interrupts Run, waits for it to finish and then re-arms the mock with
// fresh channels for the next Run/Stop cycle.
func (m *MockRunner) Stop() error {
	close(m.stop)
	<-m.stopped

	// Open does exactly the channel re-creation needed here and returns nil.
	return m.Open()
}

// String returns the fixed description "MockRunner()".
func (m *MockRunner) String() string {
	const description = "MockRunner()"

	return description
}
// TestString checks the textual form of a restart runner wrapping a
// MockRunner with the UntilSuccess policy.
func (suite *RestartSuite) TestString() {
	r := restart.New(&MockRunner{}, restart.WithType(restart.UntilSuccess))
	description := r.String()

	suite.Assert().Equal("Restart(UntilSuccess, MockRunner())", description)
}
// TestRunOnce checks that with the Once policy Run returns the error the
// wrapped runner exited with, and that a later Stop succeeds.
func (suite *RestartSuite) TestRunOnce() {
mock := MockRunner{
exitCh: make(chan error),
}
r := restart.New(&mock, restart.WithType(restart.Once))
suite.Assert().NoError(r.Open())
defer func() { suite.Assert().NoError(r.Close()) }()
failed := errors.New("failed")
// exitCh is unbuffered, so the send is done from a goroutine; it completes
// once the mock's Run is receiving.
go func() {
mock.exitCh <- failed
}()
suite.Assert().EqualError(r.Run(), failed.Error())
// Stop is called after Run has already returned and must not fail.
suite.Assert().NoError(r.Stop())
}
// TestRunOnceStop checks that Stop interrupts a restarter whose wrapped
// runner has not exited, and that Run then returns nil.
func (suite *RestartSuite) TestRunOnceStop() {
mock := MockRunner{
exitCh: make(chan error),
}
r := restart.New(&mock, restart.WithType(restart.Once))
suite.Assert().NoError(r.Open())
defer func() { suite.Assert().NoError(r.Close()) }()
errCh := make(chan error)
// Run blocks (nothing is ever sent on mock.exitCh), so it runs in a
// goroutine and reports its result through errCh.
go func() {
errCh <- r.Run()
}()
suite.Assert().NoError(r.Stop())
suite.Assert().NoError(<-errCh)
}
// TestRunUntilSuccess checks that with the UntilSuccess policy the wrapped
// runner is restarted after each failure and that Run returns nil after the
// first successful exit.
func (suite *RestartSuite) TestRunUntilSuccess() {
mock := MockRunner{
exitCh: make(chan error),
}
r := restart.New(&mock, restart.WithType(restart.UntilSuccess), restart.WithRestartInterval(time.Millisecond))
suite.Assert().NoError(r.Open())
defer func() { suite.Assert().NoError(r.Close()) }()
failed := errors.New("failed")
errCh := make(chan error)
go func() {
errCh <- r.Run()
}()
// Each send is consumed by one run of the mock: three failures, then success.
mock.exitCh <- failed
mock.exitCh <- failed
mock.exitCh <- failed
mock.exitCh <- nil
suite.Assert().NoError(<-errCh)
suite.Assert().NoError(r.Stop())
// One run per value sent above.
suite.Assert().Equal(4, mock.times)
}
// TestRunForever checks that with the Forever policy the wrapped runner is
// restarted after both failed and successful exits, and that Run only
// returns (with nil) once Stop is called.
func (suite *RestartSuite) TestRunForever() {
mock := MockRunner{
exitCh: make(chan error),
}
r := restart.New(&mock, restart.WithType(restart.Forever), restart.WithRestartInterval(time.Millisecond))
suite.Assert().NoError(r.Open())
defer func() { suite.Assert().NoError(r.Close()) }()
failed := errors.New("failed")
errCh := make(chan error)
go func() {
errCh <- r.Run()
}()
// Each send is consumed by one run of the mock, alternating failure and success.
mock.exitCh <- failed
mock.exitCh <- nil
mock.exitCh <- failed
mock.exitCh <- nil
// Non-blocking receive: Run must not have returned yet.
select {
case <-errCh:
suite.Assert().Fail("runner should be still running")
default:
}
suite.Assert().NoError(r.Stop())
suite.Assert().NoError(<-errCh)
// One run per value sent above.
suite.Assert().Equal(4, mock.times)
}
func TestRestartSuite(t *testing.T) {
suite.Run(t, new(RestartSuite))
}

View File

@ -5,6 +5,7 @@
package runner
import (
"fmt"
"time"
"github.com/containerd/containerd"
@ -13,8 +14,11 @@ import (
// Runner describes the requirements for running a process.
//
// The expected call sequence is Open, then any number of Run/Stop pairs,
// then Close.
type Runner interface {
// fmt.Stringer is embedded so that a runner can be formatted, e.g. in log
// messages.
fmt.Stringer
// Open initializes the runner.
Open() error
// Run runs once, until the running instance exits.
Run() error
// Stop stops the running instance.
Stop() error
// Close frees resources; the runner is no longer available for new Run calls.
Close() error
}
// Args represents the required options for services.
@ -36,12 +40,8 @@ type Options struct {
ContainerImage string
// Namespace is the containerd namespace.
Namespace string
// Type describes the service's restart policy.
Type Type
// LogPath is the root path to store logs
LogPath string
// RestartInterval is the interval between restarts for failed runs
RestartInterval time.Duration
// GracefulShutdownTimeout is the time to wait for process to exit after SIGTERM
// before sending SIGKILL
GracefulShutdownTimeout time.Duration
@ -50,35 +50,16 @@ type Options struct {
// Option is the functional option func.
type Option func(*Options)
// Type represents the service's restart policy.
type Type int
const (
// Forever will always restart a process.
Forever Type = iota
// Once will restart the process only if it did not exit successfully.
Once
)
// DefaultOptions describes the default options to a runner.
func DefaultOptions() *Options {
return &Options{
Env: []string{},
Type: Forever,
Namespace: "system",
LogPath: "/var/log",
RestartInterval: 5 * time.Second,
GracefulShutdownTimeout: 10 * time.Second,
}
}
// WithType sets the type of a service.
func WithType(o Type) Option {
return func(args *Options) {
args.Type = o
}
}
// WithEnv sets the environment variables of a service.
func WithEnv(o []string) Option {
return func(args *Options) {
@ -121,13 +102,6 @@ func WithLogPath(path string) Option {
}
}
// WithRestartInterval sets the interval between restarts of the failed task
func WithRestartInterval(interval time.Duration) Option {
return func(args *Options) {
args.RestartInterval = interval
}
}
// WithGracefulShutdownTimeout sets the timeout for the task to terminate before sending SIGKILL
func WithGracefulShutdownTimeout(timeout time.Duration) Option {
return func(args *Options) {

View File

@ -12,6 +12,7 @@ import (
"github.com/talos-systems/talos/internal/app/init/pkg/system/conditions"
"github.com/talos-systems/talos/internal/app/init/pkg/system/runner"
"github.com/talos-systems/talos/internal/app/init/pkg/system/runner/process"
"github.com/talos-systems/talos/internal/app/init/pkg/system/runner/restart"
"github.com/talos-systems/talos/internal/pkg/constants"
"github.com/talos-systems/talos/pkg/userdata"
)
@ -40,8 +41,8 @@ func (c *Containerd) ConditionFunc(data *userdata.UserData) conditions.Condition
return conditions.None()
}
// Start implements the Service interface.
func (c *Containerd) Start(data *userdata.UserData) error {
// Runner implements the Service interface.
func (c *Containerd) Runner(data *userdata.UserData) (runner.Runner, error) {
// Set the process arguments.
args := &runner.Args{
ID: c.ID(data),
@ -53,11 +54,11 @@ func (c *Containerd) Start(data *userdata.UserData) error {
env = append(env, fmt.Sprintf("%s=%s", key, val))
}
r := process.NewRunner(
return restart.New(process.NewRunner(
data,
args,
runner.WithEnv(env),
)
return r.Run()
),
restart.WithType(restart.Forever),
), nil
}

View File

@ -96,14 +96,14 @@ func (k *Kubeadm) ConditionFunc(data *userdata.UserData) conditions.ConditionFun
return conditions.WaitForFilesToExist(files...)
}
// Start implements the Service interface.
// Runner implements the Service interface.
// nolint: dupl
func (k *Kubeadm) Start(data *userdata.UserData) error {
func (k *Kubeadm) Runner(data *userdata.UserData) (runner.Runner, error) {
image := constants.KubernetesImage
// We only want to run kubeadm if it hasn't been run already.
if _, err := os.Stat("/etc/kubernetes/kubelet.conf"); !os.IsNotExist(err) {
return nil
return nil, nil
}
// Set the process arguments.
@ -170,7 +170,7 @@ func (k *Kubeadm) Start(data *userdata.UserData) error {
env = append(env, fmt.Sprintf("%s=%s", key, val))
}
r := containerd.NewRunner(
return containerd.NewRunner(
data,
&args,
runner.WithNamespace(criconstants.K8sContainerdNamespace),
@ -184,10 +184,7 @@ func (k *Kubeadm) Start(data *userdata.UserData) error {
oci.WithParentCgroupDevices,
oci.WithPrivileged,
),
runner.WithType(runner.Once),
)
return r.Run()
), nil
}
func enforceMasterOverrides(initConfiguration *kubeadmapi.InitConfiguration) {

View File

@ -17,6 +17,7 @@ import (
"github.com/talos-systems/talos/internal/app/init/pkg/system/conditions"
"github.com/talos-systems/talos/internal/app/init/pkg/system/runner"
"github.com/talos-systems/talos/internal/app/init/pkg/system/runner/containerd"
"github.com/talos-systems/talos/internal/app/init/pkg/system/runner/restart"
"github.com/talos-systems/talos/internal/pkg/constants"
"github.com/talos-systems/talos/pkg/userdata"
)
@ -62,8 +63,8 @@ func (k *Kubelet) ConditionFunc(data *userdata.UserData) conditions.ConditionFun
return conditions.WaitForFilesToExist("/var/lib/kubelet/kubeadm-flags.env", constants.ContainerdAddress)
}
// Start implements the Service interface.
func (k *Kubelet) Start(data *userdata.UserData) error {
// Runner implements the Service interface.
func (k *Kubelet) Runner(data *userdata.UserData) (runner.Runner, error) {
image := constants.KubernetesImage
// Set the process arguments.
@ -83,7 +84,7 @@ func (k *Kubelet) Start(data *userdata.UserData) error {
fileBytes, err := ioutil.ReadFile("/var/lib/kubelet/kubeadm-flags.env")
if err != nil {
return err
return nil, err
}
argsString := strings.TrimPrefix(string(fileBytes), "KUBELET_KUBEADM_ARGS=")
argsString = strings.TrimSuffix(argsString, "\n")
@ -106,7 +107,7 @@ func (k *Kubelet) Start(data *userdata.UserData) error {
// Add in the additional CNI mounts.
cniMounts, err := cni.Mounts(data)
if err != nil {
return err
return nil, err
}
mounts = append(mounts, cniMounts...)
@ -123,7 +124,7 @@ func (k *Kubelet) Start(data *userdata.UserData) error {
env = append(env, fmt.Sprintf("%s=%s", key, val))
}
r := containerd.NewRunner(
return restart.New(containerd.NewRunner(
data,
&args,
runner.WithNamespace(criconstants.K8sContainerdNamespace),
@ -136,8 +137,7 @@ func (k *Kubelet) Start(data *userdata.UserData) error {
oci.WithParentCgroupDevices,
oci.WithPrivileged,
),
runner.WithType(runner.Forever),
)
return r.Run()
),
restart.WithType(restart.Forever),
), nil
}

View File

@ -13,6 +13,7 @@ import (
"github.com/talos-systems/talos/internal/app/init/pkg/system/conditions"
"github.com/talos-systems/talos/internal/app/init/pkg/system/runner"
"github.com/talos-systems/talos/internal/app/init/pkg/system/runner/containerd"
"github.com/talos-systems/talos/internal/app/init/pkg/system/runner/restart"
"github.com/talos-systems/talos/internal/pkg/constants"
"github.com/talos-systems/talos/pkg/userdata"
)
@ -41,7 +42,7 @@ func (n *NTPd) ConditionFunc(data *userdata.UserData) conditions.ConditionFunc {
return conditions.None()
}
func (n *NTPd) Start(data *userdata.UserData) error {
func (n *NTPd) Runner(data *userdata.UserData) (runner.Runner, error) {
image := "talos/ntpd"
args := runner.Args{
@ -58,7 +59,7 @@ func (n *NTPd) Start(data *userdata.UserData) error {
env = append(env, fmt.Sprintf("%s=%s", key, val))
}
r := containerd.NewRunner(
return restart.New(containerd.NewRunner(
data,
&args,
runner.WithContainerImage(image),
@ -67,7 +68,7 @@ func (n *NTPd) Start(data *userdata.UserData) error {
containerd.WithMemoryLimit(int64(1000000*32)),
oci.WithMounts(mounts),
),
)
return r.Run()
),
restart.WithType(restart.Forever),
), nil
}

View File

@ -14,6 +14,7 @@ import (
"github.com/talos-systems/talos/internal/app/init/pkg/system/conditions"
"github.com/talos-systems/talos/internal/app/init/pkg/system/runner"
"github.com/talos-systems/talos/internal/app/init/pkg/system/runner/containerd"
"github.com/talos-systems/talos/internal/app/init/pkg/system/runner/restart"
"github.com/talos-systems/talos/internal/pkg/constants"
"github.com/talos-systems/talos/pkg/userdata"
)
@ -42,7 +43,7 @@ func (o *OSD) ConditionFunc(data *userdata.UserData) conditions.ConditionFunc {
return conditions.None()
}
func (o *OSD) Start(data *userdata.UserData) error {
func (o *OSD) Runner(data *userdata.UserData) (runner.Runner, error) {
image := "talos/osd"
// Set the process arguments.
@ -68,7 +69,7 @@ func (o *OSD) Start(data *userdata.UserData) error {
env = append(env, fmt.Sprintf("%s=%s", key, val))
}
r := containerd.NewRunner(
return restart.New(containerd.NewRunner(
data,
&args,
runner.WithContainerImage(image),
@ -76,7 +77,7 @@ func (o *OSD) Start(data *userdata.UserData) error {
runner.WithOCISpecOpts(
oci.WithMounts(mounts),
),
)
return r.Run()
),
restart.WithType(restart.Forever),
), nil
}

View File

@ -14,6 +14,7 @@ import (
"github.com/talos-systems/talos/internal/app/init/pkg/system/conditions"
"github.com/talos-systems/talos/internal/app/init/pkg/system/runner"
"github.com/talos-systems/talos/internal/app/init/pkg/system/runner/containerd"
"github.com/talos-systems/talos/internal/app/init/pkg/system/runner/restart"
"github.com/talos-systems/talos/pkg/userdata"
)
@ -41,7 +42,7 @@ func (p *Proxyd) ConditionFunc(data *userdata.UserData) conditions.ConditionFunc
return conditions.WaitForFilesToExist("/etc/kubernetes/pki/ca.crt", "/etc/kubernetes/admin.conf")
}
func (p *Proxyd) Start(data *userdata.UserData) error {
func (p *Proxyd) Runner(data *userdata.UserData) (runner.Runner, error) {
image := "talos/proxyd"
// Set the process arguments.
@ -62,7 +63,7 @@ func (p *Proxyd) Start(data *userdata.UserData) error {
env = append(env, fmt.Sprintf("%s=%s", key, val))
}
r := containerd.NewRunner(
return restart.New(containerd.NewRunner(
data,
&args,
runner.WithContainerImage(image),
@ -72,7 +73,7 @@ func (p *Proxyd) Start(data *userdata.UserData) error {
oci.WithMounts(mounts),
oci.WithPrivileged,
),
)
return r.Run()
),
restart.WithType(restart.Forever),
), nil
}

View File

@ -13,6 +13,7 @@ import (
"github.com/talos-systems/talos/internal/app/init/pkg/system/conditions"
"github.com/talos-systems/talos/internal/app/init/pkg/system/runner"
"github.com/talos-systems/talos/internal/app/init/pkg/system/runner/containerd"
"github.com/talos-systems/talos/internal/app/init/pkg/system/runner/restart"
"github.com/talos-systems/talos/internal/pkg/constants"
"github.com/talos-systems/talos/pkg/userdata"
)
@ -41,7 +42,7 @@ func (t *Trustd) ConditionFunc(data *userdata.UserData) conditions.ConditionFunc
return conditions.None()
}
func (t *Trustd) Start(data *userdata.UserData) error {
func (t *Trustd) Runner(data *userdata.UserData) (runner.Runner, error) {
image := "talos/trustd"
// Set the process arguments.
@ -62,7 +63,7 @@ func (t *Trustd) Start(data *userdata.UserData) error {
env = append(env, fmt.Sprintf("%s=%s", key, val))
}
r := containerd.NewRunner(
return restart.New(containerd.NewRunner(
data,
&args,
runner.WithContainerImage(image),
@ -71,7 +72,7 @@ func (t *Trustd) Start(data *userdata.UserData) error {
containerd.WithMemoryLimit(int64(1000000*512)),
oci.WithMounts(mounts),
),
)
return r.Run()
),
restart.WithType(restart.Forever),
), nil
}

View File

@ -10,6 +10,7 @@ import (
"github.com/talos-systems/talos/internal/app/init/pkg/system/conditions"
"github.com/talos-systems/talos/internal/app/init/pkg/system/runner"
"github.com/talos-systems/talos/internal/app/init/pkg/system/runner/process"
"github.com/talos-systems/talos/internal/app/init/pkg/system/runner/restart"
"github.com/talos-systems/talos/pkg/userdata"
)
@ -37,8 +38,8 @@ func (c *Udevd) ConditionFunc(data *userdata.UserData) conditions.ConditionFunc
return conditions.None()
}
// Start implements the Service interface.
func (c *Udevd) Start(data *userdata.UserData) error {
// Runner implements the Service interface.
func (c *Udevd) Runner(data *userdata.UserData) (runner.Runner, error) {
// Set the process arguments.
args := &runner.Args{
ID: c.ID(data),
@ -50,11 +51,11 @@ func (c *Udevd) Start(data *userdata.UserData) error {
env = append(env, fmt.Sprintf("%s=%s", key, val))
}
r := process.NewRunner(
return restart.New(process.NewRunner(
data,
args,
runner.WithEnv(env),
)
return r.Run()
),
restart.WithType(restart.Forever),
), nil
}

View File

@ -8,7 +8,9 @@ import (
"log"
"sync"
"github.com/pkg/errors"
"github.com/talos-systems/talos/internal/app/init/pkg/system/conditions"
"github.com/talos-systems/talos/internal/app/init/pkg/system/runner"
"github.com/talos-systems/talos/pkg/userdata"
)
@ -23,11 +25,11 @@ var once sync.Once
type Service interface {
// ID is the service id.
ID(*userdata.UserData) string
// PreFunc is invoked before a command is executed.
// PreFunc is invoked before a runner is created
PreFunc(*userdata.UserData) error
// Start
Start(*userdata.UserData) error
// PostFunc is invoked after a command is executed.
// Runner creates runner for the service
Runner(*userdata.UserData) (runner.Runner, error)
// PostFunc is invoked after a runner is closed.
PostFunc(*userdata.UserData) error
// ConditionFunc describes the conditions under which a service should
// start.
@ -45,6 +47,26 @@ func Services(data *userdata.UserData) *singleton {
return instance
}
// runService drives a single runner through its lifecycle: Open first, then
// Run, with Close deferred as soon as Open has succeeded.
//
// A nil runner means there is nothing to run and is reported as success.
// Failures from Open and Run are returned wrapped with context; the result
// of Close is deliberately discarded.
func runService(runnr runner.Runner) error {
	// special case - run nothing (TODO: we should handle it better, e.g. in PreFunc)
	if runnr == nil {
		return nil
	}

	openErr := runnr.Open()
	if openErr != nil {
		return errors.Wrap(openErr, "error opening runner")
	}

	// nolint: errcheck
	defer runnr.Close()

	// errors.Wrap yields nil for a nil error, so a clean Run still returns nil.
	return errors.Wrap(runnr.Run(), "error running service")
}
// Start will invoke the service's Pre, Condition, and Type funcs. If any
// error occurs in the Pre or Condition invocations, it is up to the caller
// to restart the service.
@ -64,11 +86,16 @@ func (s *singleton) Start(services ...Service) {
}
log.Printf("starting service %q", id)
if err := service.Start(s.UserData); err != nil {
log.Printf("failed to start service %q: %v", id, err)
runnr, err := service.Runner(s.UserData)
if err != nil {
log.Printf("failed to create runner for service %q: %v", id, err)
return
}
if err := runService(runnr); err != nil {
log.Printf("failed running service %q: %v", id, err)
}
if err := service.PostFunc(s.UserData); err != nil {
log.Printf("failed to run post stage of service %q: %v", id, err)
return

View File

@ -212,9 +212,16 @@ func (r *Registrator) Reset(ctx context.Context, in *empty.Empty) (reply *proto.
oci.WithParentCgroupDevices,
oci.WithPrivileged,
),
runner.WithType(runner.Once),
)
err = cr.Open()
if err != nil {
return nil, err
}
// nolint: errcheck
defer cr.Close()
err = cr.Run()
if err != nil {
return nil, err