feat(init): implement graceful shutdown of 'init' (#562)

The most crucial changes are in `init/main.go`: on shutdown, Talos now tries
to gracefully stop all the services. All the shutdown paths are unified,
including poweroff, reboot and panic handling on startup.

While I was at it, I also fixed a bug with containers failing to start
when an old snapshot is still around.

Service lifecycle is wrapped with `ServiceRunner` object now which
handles state transitions and captures events related to state changes.
Every change goes to the log as well.

There's no way to capture service state yet, but that is planned to be
implemented as RPC API for `init` which is exposed via `osd` to `osctl`.

Future steps:

1. Implement service dependencies for correct startup order and
shutdown order.

2. Implement service health, so that we can say "start trustd when
containerd is up and healthy".

3. Implement gRPC API for init, expose via osd (service status, restart,
poweroff, ...)

4. Implement 'String()' for conditions, so that we can see what a service
is waiting on right now.

Signed-off-by: Andrey Smirnov <smirnov.andrey@gmail.com>
This commit is contained in:
Andrey Smirnov 2019-04-26 16:53:19 +03:00 committed by GitHub
parent 1a5be8da47
commit 505b5022c4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 1117 additions and 190 deletions

View File

@ -9,6 +9,8 @@ import (
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"
"github.com/containerd/containerd"
@ -30,6 +32,7 @@ import (
var (
switchRoot *bool
inContainer *bool
rebootFlag = unix.LINUX_REBOOT_CMD_RESTART
)
func init() {
@ -140,6 +143,7 @@ func initram() (err error) {
return nil
}
// nolint: gocyclo
func root() (err error) {
if !*inContainer {
// Setup logging to /dev/kmsg.
@ -175,25 +179,39 @@ func root() (err error) {
}
}
go func() {
if err := listenForPowerButton(); err != nil {
log.Printf("WARNING: power off events will be ignored: %+v", err)
}
}()
poweroffCh, err := listenForPowerButton()
if err != nil {
log.Printf("WARNING: power off events will be ignored: %+v", err)
}
startupErrCh := make(chan error)
termCh := make(chan os.Signal, 1)
signal.Notify(termCh, syscall.SIGTERM)
// Get a handle to the system services API.
svcs := system.Services(data)
defer svcs.Shutdown()
// Start containerd.
svcs.Start(&services.Containerd{})
go startSystemServices(data)
go startKubernetesServices(data)
go startSystemServices(startupErrCh, data)
go startKubernetesServices(startupErrCh, data)
select {
case <-poweroffCh:
// poweroff, proceed to shutdown but mark as poweroff
rebootFlag = unix.LINUX_REBOOT_CMD_POWER_OFF
case err = <-startupErrCh:
panic(err)
case <-termCh:
log.Printf("SIGTERM received, rebooting...")
}
return nil
}
func startSystemServices(data *userdata.UserData) {
func startSystemServices(startupErrCh chan<- error, data *userdata.UserData) {
var err error
svcs := system.Services(data)
@ -226,7 +244,8 @@ func startSystemServices(data *userdata.UserData) {
},
}
if err = ctrdrunner.Import(constants.SystemContainerdNamespace, reqs...); err != nil {
panic(err)
startupErrCh <- err
return
}
log.Println("starting system services")
@ -257,7 +276,7 @@ func startSystemServices(data *userdata.UserData) {
}
}
func startKubernetesServices(data *userdata.UserData) {
func startKubernetesServices(startupErrCh chan<- error, data *userdata.UserData) {
svcs := system.Services(data)
// Import the Kubernetes images.
@ -277,7 +296,8 @@ func startKubernetesServices(data *userdata.UserData) {
},
}
if err := ctrdrunner.Import(criconstants.K8sContainerdNamespace, reqs...); err != nil {
panic(err)
startupErrCh <- err
return
}
log.Println("starting kubernetes services")
@ -287,6 +307,45 @@ func startKubernetesServices(data *userdata.UserData) {
)
}
// sync flushes filesystem buffers via unix.Sync and waits up to roughly
// 30 seconds for the flush to complete.
//
// unix.Sync runs in its own goroutine so that a flush which never finishes
// cannot block the caller forever; on timeout that goroutine is simply left
// behind and the function returns after logging the failure.
func sync() {
	finished := make(chan struct{})

	go func() {
		defer close(finished)

		unix.Sync()
	}()

	log.Printf("Waiting for sync...")

	for remaining := 29; remaining >= 0; remaining-- {
		select {
		case <-finished:
			log.Printf("Sync done")
			return
		case <-time.After(time.Second):
			// Another second has passed; report the remaining budget on
			// every tick except the last one.
			if remaining > 0 {
				log.Printf("Waiting %d more seconds for sync to finish", remaining)
			}
		}
	}

	log.Printf("Sync hasn't completed in time, aborting...")
}
// reboot syncs buffers and then invokes unix.Reboot with the action stored
// in rebootFlag.
//
// When running in a container the function returns after the call; otherwise
// it blocks forever.
func reboot() {
	// See http://man7.org/linux/man-pages/man2/reboot.2.html.
	sync()

	// nolint: errcheck
	unix.Reboot(rebootFlag)

	if !*inContainer {
		// Not in a container: never return from here.
		select {}
	}
}
func recovery() {
if r := recover(); r != nil {
log.Printf("recovered from: %+v\n", r)
@ -294,15 +353,24 @@ func recovery() {
log.Printf("rebooting in %d seconds\n", i)
time.Sleep(1 * time.Second)
}
// nolint: errcheck
unix.Reboot(int(unix.LINUX_REBOOT_CMD_RESTART))
}
select {}
}
func main() {
// This is the main entrypoint into init() execution: after kernel boot, control is passed
// to this function.
//
// When initram() finishes, it execs into itself with -switch-root flag, so control is passed
// once again into this function.
//
// When init() terminates either on normal shutdown (reboot, poweroff), or due to panic, control
// goes through recovery() and reboot() functions below, which finalize node state - sync buffers,
// initiate poweroff or reboot. Also on shutdown, other deferred functions are called, for example
// services are gracefully shut down.
// on any return from init.main(), initiate host reboot or shutdown
defer reboot()
// handle any panics in the main goroutine, and proceed to reboot() above
defer recovery()
// TODO(andrewrynhard): Remove this and be explicit.
@ -310,16 +378,14 @@ func main() {
panic(errors.New("error setting PATH"))
}
if *switchRoot {
switch {
case *switchRoot:
if err := root(); err != nil {
panic(errors.Wrap(err, "boot failed"))
}
// Hang forever.
select {}
}
if *inContainer {
// root() hangs until reboot
case *inContainer:
if err := container(); err != nil {
panic(errors.Wrap(err, "failed to prepare container based deploy"))
}
@ -327,14 +393,13 @@ func main() {
panic(errors.Wrap(err, "boot failed"))
}
// Hang forever.
select {}
}
// root() hangs until reboot
default:
if err := initram(); err != nil {
panic(errors.Wrap(err, "early boot failed"))
}
if err := initram(); err != nil {
panic(errors.Wrap(err, "early boot failed"))
// We should never reach this point if things are working as intended.
panic(errors.New("unknown error"))
}
// We should never reach this point if things are working as intended.
panic(errors.New("unknown error"))
}

View File

@ -5,16 +5,17 @@
package conditions
import (
"context"
"os"
"time"
)
// ConditionFunc is the signature that all condition funcs must have.
type ConditionFunc = func() (bool, error)
type ConditionFunc = func(ctx context.Context) (bool, error)
// None is a service condition that has no conditions.
func None() ConditionFunc {
return func() (bool, error) {
return func(ctx context.Context) (bool, error) {
return true, nil
}
}
@ -22,7 +23,7 @@ func None() ConditionFunc {
// FileExists is a service condition that checks for the existence of a file
// once and only once.
func FileExists(file string) ConditionFunc {
return func() (bool, error) {
return func(ctx context.Context) (bool, error) {
_, err := os.Stat(file)
if err != nil {
if os.IsNotExist(err) {
@ -39,9 +40,9 @@ func FileExists(file string) ConditionFunc {
// WaitForFileToExist is a service condition that will wait for the existence of
// a file.
func WaitForFileToExist(file string) ConditionFunc {
return func() (bool, error) {
return func(ctx context.Context) (bool, error) {
for {
exists, err := FileExists(file)()
exists, err := FileExists(file)(ctx)
if err != nil {
return false, err
}
@ -49,7 +50,12 @@ func WaitForFileToExist(file string) ConditionFunc {
if exists {
return true, nil
}
time.Sleep(1 * time.Second)
select {
case <-ctx.Done():
return false, ctx.Err()
case <-time.After(1 * time.Second):
}
}
}
}
@ -57,23 +63,25 @@ func WaitForFileToExist(file string) ConditionFunc {
// WaitForFilesToExist is a service condition that will wait for the existence a
// set of files.
func WaitForFilesToExist(files ...string) ConditionFunc {
return func() (exists bool, err error) {
L:
return func(ctx context.Context) (bool, error) {
for {
allExist := true
for _, f := range files {
exists, err = FileExists(f)()
exists, err := FileExists(f)(ctx)
if err != nil {
return false, err
}
if !exists {
time.Sleep(1 * time.Second)
continue L
}
allExist = allExist && exists
}
if exists {
if allExist {
return true, nil
}
time.Sleep(1 * time.Second)
select {
case <-ctx.Done():
return false, ctx.Err()
case <-time.After(1 * time.Second):
}
}
}
}

View File

@ -4,11 +4,160 @@
package conditions_test
import "testing"
import (
"context"
"io/ioutil"
"os"
"path/filepath"
"testing"
"time"
func TestEmpty(t *testing.T) {
// added for accurate coverage estimation
//
// please remove it once any unit-test is added
// for this package
"github.com/stretchr/testify/suite"
"github.com/talos-systems/talos/internal/app/init/pkg/system/conditions"
)
// ConditionsSuite exercises the conditions package against files created in
// a scratch directory.
type ConditionsSuite struct {
	suite.Suite

	// tempDir is the scratch directory created in SetupSuite and removed in
	// TearDownSuite.
	tempDir string
}

// SetupSuite creates the temporary directory used by the tests.
func (suite *ConditionsSuite) SetupSuite() {
	dir, err := ioutil.TempDir("", "talos")
	suite.Require().NoError(err)

	suite.tempDir = dir
}

// TearDownSuite removes the temporary directory and everything in it.
func (suite *ConditionsSuite) TearDownSuite() {
	err := os.RemoveAll(suite.tempDir)
	suite.Require().NoError(err)
}

// createFile creates (or truncates) an empty file called name inside the
// suite's temporary directory and returns its full path.
func (suite *ConditionsSuite) createFile(name string) (path string) {
	path = filepath.Join(suite.tempDir, name)

	file, err := os.Create(path)
	suite.Require().NoError(err)
	suite.Require().NoError(file.Close())

	return path
}
// TestFileExists checks that FileExists reports false for a missing file and
// true for a file that was just created, without returning an error in
// either case.
func (suite *ConditionsSuite) TestFileExists() {
	ctx := context.Background()

	missing, err := conditions.FileExists("no-such-file")(ctx)
	suite.Require().NoError(err)
	suite.Require().False(missing)

	present, err := conditions.FileExists(suite.createFile("a.txt"))(ctx)
	suite.Require().NoError(err)
	suite.Require().True(present)
}
// TestWaitForFileToExist covers three scenarios for WaitForFileToExist:
// the file already exists, the file appears while the condition is waiting,
// and the wait is aborted through context cancellation.
//
// The waiting goroutines only forward their results over a channel; every
// assertion runs on the test goroutine, because Require() ends up calling
// t.FailNow, which must not be called from other goroutines (and would
// otherwise leave the test blocked on the channel receive).
func (suite *ConditionsSuite) TestWaitForFileToExist() {
	// waitResult carries both return values of the condition out of the
	// waiting goroutine.
	type waitResult struct {
		exists bool
		err    error
	}

	path := suite.createFile("w.txt")

	// File is already there: the condition returns immediately.
	exists, err := conditions.WaitForFileToExist(path)(context.Background())
	suite.Require().NoError(err)
	suite.Require().True(exists)

	suite.Require().NoError(os.Remove(path))

	// Buffered so that a waiting goroutine can always deliver its result and
	// exit, even if the test has already failed.
	resultCh := make(chan waitResult, 1)

	go func() {
		ok, waitErr := conditions.WaitForFileToExist(path)(context.Background())
		resultCh <- waitResult{exists: ok, err: waitErr}
	}()

	time.Sleep(50 * time.Millisecond)

	select {
	case <-resultCh:
		// The result is consumed here, so stop the test instead of blocking
		// on the receive below.
		suite.Require().Fail("unexpected return")
	default:
	}

	// File appears while the condition is waiting.
	suite.createFile("w.txt")

	result := <-resultCh
	suite.Require().NoError(result.err)
	suite.Require().True(result.exists)

	suite.Require().NoError(os.Remove(path))

	// File never appears: cancelling the context must abort the wait.
	ctx, ctxCancel := context.WithCancel(context.Background())
	defer ctxCancel()

	go func() {
		ok, waitErr := conditions.WaitForFileToExist(path)(ctx)
		resultCh <- waitResult{exists: ok, err: waitErr}
	}()

	time.Sleep(50 * time.Millisecond)

	select {
	case <-resultCh:
		suite.Require().Fail("unexpected return")
	default:
	}

	ctxCancel()

	suite.Require().EqualError((<-resultCh).err, context.Canceled.Error())
}
// TestWaitForFilesToExist covers three scenarios for WaitForFilesToExist:
// all files already exist, a missing file appears while the condition is
// waiting, and the wait is aborted through context cancellation.
//
// The waiting goroutines only forward their results over a channel; every
// assertion runs on the test goroutine, because Require() ends up calling
// t.FailNow, which must not be called from other goroutines (and would
// otherwise leave the test blocked on the channel receive).
func (suite *ConditionsSuite) TestWaitForFilesToExist() {
	// waitResult carries both return values of the condition out of the
	// waiting goroutine.
	type waitResult struct {
		exists bool
		err    error
	}

	pathA := suite.createFile("wA.txt")
	pathB := suite.createFile("wB.txt")

	// Both files are already there: the condition returns immediately.
	exists, err := conditions.WaitForFilesToExist(pathA, pathB)(context.Background())
	suite.Require().NoError(err)
	suite.Require().True(exists)

	suite.Require().NoError(os.Remove(pathB))

	// Buffered so that a waiting goroutine can always deliver its result and
	// exit, even if the test has already failed.
	resultCh := make(chan waitResult, 1)

	go func() {
		ok, waitErr := conditions.WaitForFilesToExist(pathA, pathB)(context.Background())
		resultCh <- waitResult{exists: ok, err: waitErr}
	}()

	time.Sleep(50 * time.Millisecond)

	select {
	case <-resultCh:
		// The result is consumed here, so stop the test instead of blocking
		// on the receive below.
		suite.Require().Fail("unexpected return")
	default:
	}

	// The missing file appears while the condition is waiting.
	suite.createFile("wB.txt")

	result := <-resultCh
	suite.Require().NoError(result.err)
	suite.Require().True(result.exists)

	suite.Require().NoError(os.Remove(pathA))
	suite.Require().NoError(os.Remove(pathB))

	// Files never appear: cancelling the context must abort the wait.
	ctx, ctxCancel := context.WithCancel(context.Background())
	defer ctxCancel()

	go func() {
		ok, waitErr := conditions.WaitForFilesToExist(pathA, pathB)(ctx)
		resultCh <- waitResult{exists: ok, err: waitErr}
	}()

	time.Sleep(50 * time.Millisecond)

	select {
	case <-resultCh:
		suite.Require().Fail("unexpected return")
	default:
	}

	ctxCancel()

	suite.Require().EqualError((<-resultCh).err, context.Canceled.Error())
}
// TestConditionsSuite runs ConditionsSuite under `go test`.
func TestConditionsSuite(t *testing.T) {
	suite.Run(t, &ConditionsSuite{})
}

View File

@ -0,0 +1,104 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
package events
import (
"time"
)
// MaxEventsToKeep is maximum number of events to keep per service before dropping old entries
const MaxEventsToKeep = 64
// ServiceState enumerates the run states a service can be in.
type ServiceState int

// ServiceState constants, numbered from zero in declaration order.
const (
	StateInitialized ServiceState = iota
	StatePreparing
	StateWaiting
	StateRunning
	StateStopping
	StateFinished
	StateFailed
)

// serviceStateNames maps every known ServiceState to its display name.
var serviceStateNames = [...]string{
	StateInitialized: "Initialized",
	StatePreparing:   "Preparing",
	StateWaiting:     "Waiting",
	StateRunning:     "Running",
	StateStopping:    "Stopping",
	StateFinished:    "Finished",
	StateFailed:      "Failed",
}

// String returns the display name of the state, or "Unknown" for a value
// outside the declared constants.
func (state ServiceState) String() string {
	if state < 0 || int(state) >= len(serviceStateNames) {
		return "Unknown"
	}

	return serviceStateNames[state]
}
// ServiceEvent describes state change of the running service
type ServiceEvent struct {
	Message   string
	State     ServiceState
	Timestamp time.Time
}

// ServiceEvents is a fixed length history of events, stored as a ring buffer
// of MaxEventsToKeep entries. The zero value is an empty history ready to use.
type ServiceEvents struct {
	events []ServiceEvent
	// pos is the index of the slot the next Push writes to.
	pos int
	// size is the number of slots currently holding a pushed event.
	size int
	// discarded counts events that were overwritten by newer ones.
	discarded uint
}

// Push appends new event to the history popping out oldest event on overflow
//
// Occupancy is tracked explicitly rather than inferred from the event
// contents, so events with an empty Message are stored and counted like any
// other event.
func (events *ServiceEvents) Push(event ServiceEvent) {
	if events.events == nil {
		events.events = make([]ServiceEvent, MaxEventsToKeep)
	}

	if events.size == len(events.events) {
		// buffer is full: the oldest entry is about to be overwritten
		events.discarded++
	} else {
		events.size++
	}

	events.events[events.pos] = event
	events.pos = (events.pos + 1) % len(events.events)
}

// Get return a copy of event history, with most recent event being the last one
//
// At most count events are returned; fewer if the history holds fewer. A nil
// slice is returned when there is nothing to return (empty history or
// count <= 0).
func (events *ServiceEvents) Get(count int) (result []ServiceEvent) {
	if count > events.size {
		count = events.size
	}

	if count <= 0 {
		return nil
	}

	n := len(events.events)
	result = make([]ServiceEvent, 0, count)

	// Start count slots behind the write position and walk forward, wrapping
	// around the end of the buffer.
	for i := (events.pos - count + n) % n; count > 0; i = (i + 1) % n {
		result = append(result, events.events[i])
		count--
	}

	return result
}
// Recorder adds new event to the history of events, formatting message with args using Sprintf
//
// newstate is the state reported with the event; message and args form a
// Sprintf-style format string and its arguments.
type Recorder func(newstate ServiceState, message string, args ...interface{})
// NullRecorder discards events
//
// It has the Recorder signature and an empty body, so it can be passed
// wherever a Recorder is required but the events are of no interest.
func NullRecorder(newstate ServiceState, message string, args ...interface{}) {
}

View File

@ -0,0 +1,77 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
package events_test
import (
"strconv"
"testing"
"github.com/stretchr/testify/suite"
"github.com/talos-systems/talos/internal/app/init/pkg/system/events"
)
// EventsSuite groups the tests for the events.ServiceEvents history.
type EventsSuite struct {
	suite.Suite
}

// assertEvents asserts that the messages of evs, in order, equal
// expectedMessages.
func (suite *EventsSuite) assertEvents(expectedMessages []string, evs []events.ServiceEvent) {
	messages := make([]string, 0, len(evs))

	for _, ev := range evs {
		messages = append(messages, ev.Message)
	}

	suite.Assert().Equal(expectedMessages, messages)
}
// TestEmpty checks that a zero-value history returns a nil slice.
func (suite *EventsSuite) TestEmpty() {
	history := events.ServiceEvents{}

	var none []events.ServiceEvent

	suite.Assert().Equal(none, history.Get(100))
}
// TestSome pushes five events ("0".."4") and checks which of them Get
// returns for various counts, including counts larger than the number of
// stored events.
func (suite *EventsSuite) TestSome() {
	var history events.ServiceEvents

	for i := 0; i < 5; i++ {
		history.Push(events.ServiceEvent{
			Message: strconv.Itoa(i),
		})
	}

	// Asking for zero events yields a nil slice.
	suite.Assert().Equal([]events.ServiceEvent(nil), history.Get(0))

	everything := []string{"0", "1", "2", "3", "4"}

	for _, tc := range []struct {
		count int
		want  []string
	}{
		{count: 1, want: []string{"4"}},
		{count: 4, want: []string{"1", "2", "3", "4"}},
		{count: 5, want: everything},
		{count: 6, want: everything},
		{count: 100, want: everything},
	} {
		suite.assertEvents(tc.want, history.Get(tc.count))
	}
}
// TestOverflow pushes more than twice MaxEventsToKeep events and checks that
// only the most recent MaxEventsToKeep of them are retained, in push order.
func (suite *EventsSuite) TestOverflow() {
	var history events.ServiceEvents

	total := events.MaxEventsToKeep*2 + 3

	for i := 0; i < total; i++ {
		history.Push(events.ServiceEvent{
			Message: strconv.Itoa(i),
		})
	}

	suite.Assert().Equal([]events.ServiceEvent(nil), history.Get(0))
	suite.assertEvents([]string{strconv.Itoa(total - 1)}, history.Get(1))

	// Messages of the events expected to survive: the last MaxEventsToKeep.
	retained := make([]string, 0, events.MaxEventsToKeep)

	for i := total - events.MaxEventsToKeep; i < total; i++ {
		retained = append(retained, strconv.Itoa(i))
	}

	suite.assertEvents(retained, history.Get(events.MaxEventsToKeep*10))

	lastThree := retained[len(retained)-3:]
	suite.assertEvents(lastThree, history.Get(3))
}
// TestEventsSuite runs EventsSuite under `go test`.
func TestEventsSuite(t *testing.T) {
	suite.Run(t, &EventsSuite{})
}

View File

@ -0,0 +1,80 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
package system_test
import (
"context"
"github.com/talos-systems/talos/internal/app/init/pkg/system/conditions"
"github.com/talos-systems/talos/internal/app/init/pkg/system/events"
"github.com/talos-systems/talos/internal/app/init/pkg/system/runner"
"github.com/talos-systems/talos/pkg/userdata"
)
// MockService is a configurable service stub for tests: every method returns
// the matching field, falling back to a default where one is defined.
type MockService struct {
	name        string
	preError    error
	runnerError error
	runner      runner.Runner
	condition   conditions.ConditionFunc
	postError   error
}

// ID returns the configured name, or "MockRunner" when no name is set.
func (m *MockService) ID(*userdata.UserData) string {
	if m.name == "" {
		return "MockRunner"
	}

	return m.name
}

// PreFunc returns the configured preError.
func (m *MockService) PreFunc(*userdata.UserData) error {
	return m.preError
}

// Runner returns the configured runner, or a fresh MockRunner when none is
// set, together with the configured runnerError.
func (m *MockService) Runner(*userdata.UserData) (runner.Runner, error) {
	if m.runner == nil {
		return &MockRunner{exitCh: make(chan error)}, m.runnerError
	}

	return m.runner, m.runnerError
}

// PostFunc returns the configured postError.
func (m *MockService) PostFunc(*userdata.UserData) error {
	return m.postError
}

// ConditionFunc returns the configured condition, or conditions.None() when
// none is set.
func (m *MockService) ConditionFunc(*userdata.UserData) conditions.ConditionFunc {
	if m.condition == nil {
		return conditions.None()
	}

	return m.condition
}
// MockRunner is a runner stub whose Run blocks until a value is received
// from exitCh or the channel is closed.
type MockRunner struct {
	// exitCh delivers the error Run returns; Stop closes it.
	exitCh chan error
}

// Open does nothing and always succeeds.
func (m *MockRunner) Open(ctx context.Context) error {
	return nil
}

// Close does nothing and always succeeds.
func (m *MockRunner) Close() error {
	return nil
}

// Run reports StateRunning to eventSink and then blocks on exitCh, returning
// whatever is received from it (nil once the channel is closed).
func (m *MockRunner) Run(eventSink events.Recorder) error {
	eventSink(events.StateRunning, "Running")

	err := <-m.exitCh

	return err
}

// Stop closes exitCh, which unblocks Run.
func (m *MockRunner) Stop() error {
	close(m.exitCh)

	return nil
}

// String returns a fixed description of the runner.
func (m *MockRunner) String() string {
	return "MockRunner()"
}

View File

@ -7,7 +7,6 @@ package containerd
import (
"context"
"fmt"
"log"
"path/filepath"
"syscall"
"time"
@ -19,6 +18,7 @@ import (
specs "github.com/opencontainers/runtime-spec/specs-go"
"github.com/pkg/errors"
"github.com/talos-systems/talos/internal/app/init/pkg/system/conditions"
"github.com/talos-systems/talos/internal/app/init/pkg/system/events"
"github.com/talos-systems/talos/internal/app/init/pkg/system/runner"
"github.com/talos-systems/talos/internal/pkg/constants"
"github.com/talos-systems/talos/pkg/userdata"
@ -56,10 +56,10 @@ func NewRunner(data *userdata.UserData, args *runner.Args, setters ...runner.Opt
}
// Open implements the Runner interface.
func (c *containerdRunner) Open() error {
func (c *containerdRunner) Open(ctx context.Context) error {
// Wait for the containerd socket.
_, err := conditions.WaitForFileToExist(constants.ContainerdAddress)()
_, err := conditions.WaitForFileToExist(constants.ContainerdAddress)(ctx)
if err != nil {
return err
}
@ -77,8 +77,15 @@ func (c *containerdRunner) Open() error {
return err
}
// Create the container.
// See if there's previous container/snapshot to clean up
var oldcontainer containerd.Container
if oldcontainer, err = c.client.LoadContainer(c.ctx, c.args.ID); err == nil {
if err = oldcontainer.Delete(c.ctx, containerd.WithSnapshotCleanup); err != nil {
return errors.Wrap(err, "error deleting old container instance")
}
}
// Create the container.
specOpts := c.newOCISpecOpts(image)
containerOpts := c.newContainerOpts(image, specOpts)
c.container, err = c.client.NewContainer(
@ -110,7 +117,9 @@ func (c *containerdRunner) Close() error {
}
// Run implements runner.Runner interface
func (c *containerdRunner) Run() error {
//
// nolint: gocyclo
func (c *containerdRunner) Run(eventSink events.Recorder) error {
defer close(c.stopped)
// Create the task and start it.
@ -124,6 +133,8 @@ func (c *containerdRunner) Run() error {
return errors.Wrapf(err, "failed to start task: %q", c.args.ID)
}
eventSink(events.StateRunning, "Started task %s (PID %d) for container %s", task.ID(), task.Pid(), c.container.ID())
statusC, err := task.Wait(c.ctx)
if err != nil {
return errors.Wrapf(err, "failed waiting for task: %q", c.args.ID)
@ -138,10 +149,11 @@ func (c *containerdRunner) Run() error {
return nil
case <-c.stop:
// graceful stop the task
log.Printf("sending SIGTERM to %v", c.args.ID)
eventSink(events.StateStopping, "Sending SIGTERM to task %s (PID %d, container %s)", task.ID(), task.Pid(), c.container.ID())
// nolint: errcheck
_ = task.Kill(c.ctx, syscall.SIGTERM, containerd.WithKillAll)
if err = task.Kill(c.ctx, syscall.SIGTERM, containerd.WithKillAll); err != nil {
return errors.Wrap(err, "error sending SIGTERM")
}
}
select {
@ -150,13 +162,15 @@ func (c *containerdRunner) Run() error {
return nil
case <-time.After(c.opts.GracefulShutdownTimeout):
// kill the process
log.Printf("sending SIGKILL to %v", c.args.ID)
eventSink(events.StateStopping, "Sending SIGKILL to task %s (PID %d, container %s)", task.ID(), task.Pid(), c.container.ID())
// nolint: errcheck
_ = task.Kill(c.ctx, syscall.SIGKILL, containerd.WithKillAll)
if err = task.Kill(c.ctx, syscall.SIGKILL, containerd.WithKillAll); err != nil {
return errors.Wrap(err, "error sending SIGKILL")
}
}
<-statusC
return nil
}

View File

@ -7,7 +7,9 @@ package containerd_test
import (
"bytes"
"context"
"fmt"
"io/ioutil"
"log"
"os"
"path/filepath"
"sync"
@ -19,6 +21,7 @@ import (
"github.com/containerd/containerd/oci"
specs "github.com/opencontainers/runtime-spec/specs-go"
"github.com/stretchr/testify/suite"
"github.com/talos-systems/talos/internal/app/init/pkg/system/events"
"github.com/talos-systems/talos/internal/app/init/pkg/system/runner"
containerdrunner "github.com/talos-systems/talos/internal/app/init/pkg/system/runner/containerd"
"github.com/talos-systems/talos/internal/app/init/pkg/system/runner/process"
@ -32,6 +35,10 @@ const (
busyboxImage = "docker.io/library/busybox:latest"
)
// MockEventSink is an event recorder for tests: it formats message with args
// via fmt.Sprintf and writes the state and resulting text to the standard
// logger.
func MockEventSink(state events.ServiceState, message string, args ...interface{}) {
	formatted := fmt.Sprintf(message, args...)

	log.Printf("state %s: %s", state, formatted)
}
type ContainerdSuite struct {
suite.Suite
@ -61,12 +68,12 @@ func (suite *ContainerdSuite) SetupSuite() {
runner.WithLogPath(suite.tmpDir),
runner.WithEnv([]string{"PATH=/rootfs/bin:" + constants.PATH}),
)
suite.Require().NoError(suite.containerdRunner.Open())
suite.Require().NoError(suite.containerdRunner.Open(context.Background()))
suite.containerdWg.Add(1)
go func() {
defer suite.containerdWg.Done()
defer func() { suite.Require().NoError(suite.containerdRunner.Close()) }()
suite.Require().NoError(suite.containerdRunner.Run())
suite.Require().NoError(suite.containerdRunner.Run(MockEventSink))
}()
suite.client, err = containerd.New(constants.ContainerdAddress)
@ -97,10 +104,10 @@ func (suite *ContainerdSuite) TestRunSuccess() {
runner.WithContainerImage(busyboxImage),
)
suite.Require().NoError(r.Open())
suite.Require().NoError(r.Open(context.Background()))
defer func() { suite.Assert().NoError(r.Close()) }()
suite.Assert().NoError(r.Run())
suite.Assert().NoError(r.Run(MockEventSink))
// calling stop when Run has finished is no-op
suite.Assert().NoError(r.Stop())
}
@ -115,19 +122,54 @@ func (suite *ContainerdSuite) TestRunTwice() {
runner.WithContainerImage(busyboxImage),
)
suite.Require().NoError(r.Open())
suite.Require().NoError(r.Open(context.Background()))
defer func() { suite.Assert().NoError(r.Close()) }()
// running same container twice should be fine
// (checks that containerd state is cleaned up properly)
for i := 0; i < 2; i++ {
suite.Assert().NoError(r.Run())
suite.Assert().NoError(r.Run(MockEventSink))
// calling stop when Run has finished is no-op
suite.Assert().NoError(r.Stop())
// TODO: workaround containerd (?) bug: https://github.com/docker/for-linux/issues/643
time.Sleep(100 * time.Millisecond)
}
}
// TestContainerCleanup checks that a runner can open and run a container
// whose ID is already taken by a container left behind by another runner.
func (suite *ContainerdSuite) TestContainerCleanup() {
// create two runners with the same container ID
//
// open first runner, but don't close it; second runner should be
// able to start the container by cleaning up container created by the first
// runner
r1 := containerdrunner.NewRunner(&userdata.UserData{}, &runner.Args{
ID: "cleanup1",
ProcessArgs: []string{"/bin/sh", "-c", "exit 1"},
},
runner.WithLogPath(suite.tmpDir),
runner.WithNamespace(containerdNamespace),
runner.WithContainerImage(busyboxImage),
)
// r1 is deliberately left open (no Close) so that its container stays around
suite.Require().NoError(r1.Open(context.Background()))
// r2 reuses the ID "cleanup1" but runs a command that exits successfully
r2 := containerdrunner.NewRunner(&userdata.UserData{}, &runner.Args{
ID: "cleanup1",
ProcessArgs: []string{"/bin/sh", "-c", "exit 0"},
},
runner.WithLogPath(suite.tmpDir),
runner.WithNamespace(containerdNamespace),
runner.WithContainerImage(busyboxImage),
)
// opening r2 must succeed even though r1's container still exists
suite.Require().NoError(r2.Open(context.Background()))
defer func() { suite.Assert().NoError(r2.Close()) }()
suite.Assert().NoError(r2.Run(MockEventSink))
// calling stop when Run has finished is no-op
suite.Assert().NoError(r2.Stop())
}
func (suite *ContainerdSuite) TestRunLogs() {
r := containerdrunner.NewRunner(&userdata.UserData{}, &runner.Args{
ID: "logtest",
@ -138,10 +180,10 @@ func (suite *ContainerdSuite) TestRunLogs() {
runner.WithContainerImage(busyboxImage),
)
suite.Require().NoError(r.Open())
suite.Require().NoError(r.Open(context.Background()))
defer func() { suite.Assert().NoError(r.Close()) }()
suite.Assert().NoError(r.Run())
suite.Assert().NoError(r.Run(MockEventSink))
logFile, err := os.Open(filepath.Join(suite.tmpDir, "logtest.log"))
suite.Assert().NoError(err)
@ -180,13 +222,13 @@ func (suite *ContainerdSuite) TestStopFailingAndRestarting() {
restart.WithRestartInterval(5*time.Millisecond),
)
suite.Require().NoError(r.Open())
suite.Require().NoError(r.Open(context.Background()))
defer func() { suite.Assert().NoError(r.Close()) }()
done := make(chan error, 1)
go func() {
done <- r.Run()
done <- r.Run(MockEventSink)
}()
time.Sleep(500 * time.Millisecond)
@ -237,13 +279,13 @@ func (suite *ContainerdSuite) TestStopSigKill() {
runner.WithContainerImage(busyboxImage),
runner.WithGracefulShutdownTimeout(10*time.Millisecond))
suite.Require().NoError(r.Open())
suite.Require().NoError(r.Open(context.Background()))
defer func() { suite.Assert().NoError(r.Close()) }()
done := make(chan error, 1)
go func() {
done <- r.Run()
done <- r.Run(MockEventSink)
}()
time.Sleep(50 * time.Millisecond)

View File

@ -25,7 +25,7 @@ type ImportRequest struct {
// Import imports the images specified by the import requests.
func Import(namespace string, reqs ...*ImportRequest) error {
_, err := conditions.WaitForFileToExist(constants.ContainerdAddress)()
_, err := conditions.WaitForFileToExist(constants.ContainerdAddress)(context.Background())
if err != nil {
return err
}

View File

@ -5,15 +5,16 @@
package process
import (
"context"
"fmt"
"io"
"log"
"os"
"os/exec"
"syscall"
"time"
"github.com/pkg/errors"
"github.com/talos-systems/talos/internal/app/init/pkg/system/events"
"github.com/talos-systems/talos/internal/app/init/pkg/system/runner"
processlogger "github.com/talos-systems/talos/internal/app/init/pkg/system/runner/process/log"
"github.com/talos-systems/talos/internal/pkg/constants"
@ -48,15 +49,15 @@ func NewRunner(data *userdata.UserData, args *runner.Args, setters ...runner.Opt
}
// Open implements the Runner interface.
func (p *processRunner) Open() error {
func (p *processRunner) Open(ctx context.Context) error {
return nil
}
// Run implements the Runner interface.
func (p *processRunner) Run() error {
func (p *processRunner) Run(eventSink events.Recorder) error {
defer close(p.stopped)
return p.run()
return p.run(eventSink)
}
// Stop implements the Runner interface
@ -101,7 +102,7 @@ func (p *processRunner) build() (cmd *exec.Cmd, err error) {
return cmd, nil
}
func (p *processRunner) run() error {
func (p *processRunner) run(eventSink events.Recorder) error {
cmd, err := p.build()
if err != nil {
return errors.Wrap(err, "error building command")
@ -111,6 +112,8 @@ func (p *processRunner) run() error {
return errors.Wrap(err, "error starting process")
}
eventSink(events.StateRunning, "Process %s started with PID %d", p, cmd.Process.Pid)
waitCh := make(chan error)
go func() {
@ -123,7 +126,7 @@ func (p *processRunner) run() error {
return err
case <-p.stop:
// graceful stop the service
log.Printf("sending SIGTERM to %s", p)
eventSink(events.StateStopping, "Sending SIGTERM to %s", p)
// nolint: errcheck
_ = cmd.Process.Signal(syscall.SIGTERM)
@ -135,7 +138,7 @@ func (p *processRunner) run() error {
return nil
case <-time.After(p.opts.GracefulShutdownTimeout):
// kill the process
log.Printf("sending SIGKILL to %s", p)
eventSink(events.StateStopping, "Sending SIGKILL to %s", p)
// nolint: errcheck
_ = cmd.Process.Signal(syscall.SIGKILL)
@ -147,5 +150,5 @@ func (p *processRunner) run() error {
}
func (p *processRunner) String() string {
return fmt.Sprintf("Process(%v)", p.args.ProcessArgs)
return fmt.Sprintf("Process(%q)", p.args.ProcessArgs)
}

View File

@ -5,7 +5,10 @@
package process_test
import (
"context"
"fmt"
"io/ioutil"
"log"
"os"
"path/filepath"
"sync"
@ -13,12 +16,17 @@ import (
"time"
"github.com/stretchr/testify/suite"
"github.com/talos-systems/talos/internal/app/init/pkg/system/events"
"github.com/talos-systems/talos/internal/app/init/pkg/system/runner"
"github.com/talos-systems/talos/internal/app/init/pkg/system/runner/process"
"github.com/talos-systems/talos/internal/app/init/pkg/system/runner/restart"
"github.com/talos-systems/talos/pkg/userdata"
)
// MockEventSink is an event recorder for tests: it formats message with args
// via fmt.Sprintf and writes the state and resulting text to the standard
// logger.
func MockEventSink(state events.ServiceState, message string, args ...interface{}) {
	formatted := fmt.Sprintf(message, args...)

	log.Printf("state %s: %s", state, formatted)
}
type ProcessSuite struct {
suite.Suite
@ -42,10 +50,10 @@ func (suite *ProcessSuite) TestRunSuccess() {
ProcessArgs: []string{"/bin/bash", "-c", "exit 0"},
}, runner.WithLogPath(suite.tmpDir))
suite.Assert().NoError(r.Open())
suite.Assert().NoError(r.Open(context.Background()))
defer func() { suite.Assert().NoError(r.Close()) }()
suite.Assert().NoError(r.Run())
suite.Assert().NoError(r.Run(MockEventSink))
// calling stop when Run has finished is no-op
suite.Assert().NoError(r.Stop())
}
@ -56,10 +64,10 @@ func (suite *ProcessSuite) TestRunLogs() {
ProcessArgs: []string{"/bin/bash", "-c", "echo -n \"Test 1\nTest 2\n\""},
}, runner.WithLogPath(suite.tmpDir))
suite.Assert().NoError(r.Open())
suite.Assert().NoError(r.Open(context.Background()))
defer func() { suite.Assert().NoError(r.Close()) }()
suite.Assert().NoError(r.Run())
suite.Assert().NoError(r.Run(MockEventSink))
logFile, err := os.Open(filepath.Join(suite.tmpDir, "logtest.log"))
suite.Assert().NoError(err)
@ -83,7 +91,7 @@ func (suite *ProcessSuite) TestRunRestartFailed() {
ProcessArgs: []string{"/bin/bash", "-c", "echo \"ran\"; test -f " + testFile},
}, runner.WithLogPath(suite.tmpDir)), restart.WithType(restart.UntilSuccess), restart.WithRestartInterval(time.Millisecond))
suite.Assert().NoError(r.Open())
suite.Assert().NoError(r.Open(context.Background()))
defer func() { suite.Assert().NoError(r.Close()) }()
var wg sync.WaitGroup
@ -91,7 +99,7 @@ func (suite *ProcessSuite) TestRunRestartFailed() {
wg.Add(1)
go func() {
defer wg.Done()
suite.Assert().NoError(r.Run())
suite.Assert().NoError(r.Run(MockEventSink))
}()
time.Sleep(200 * time.Millisecond)
@ -123,13 +131,13 @@ func (suite *ProcessSuite) TestStopFailingAndRestarting() {
ProcessArgs: []string{"/bin/bash", "-c", "test -f " + testFile},
}, runner.WithLogPath(suite.tmpDir)), restart.WithType(restart.Forever), restart.WithRestartInterval(5*time.Millisecond))
suite.Assert().NoError(r.Open())
suite.Assert().NoError(r.Open(context.Background()))
defer func() { suite.Assert().NoError(r.Close()) }()
done := make(chan error, 1)
go func() {
done <- r.Run()
done <- r.Run(MockEventSink)
}()
time.Sleep(40 * time.Millisecond)
@ -167,13 +175,13 @@ func (suite *ProcessSuite) TestStopSigKill() {
runner.WithGracefulShutdownTimeout(10*time.Millisecond),
)
suite.Assert().NoError(r.Open())
suite.Assert().NoError(r.Open(context.Background()))
defer func() { suite.Assert().NoError(r.Close()) }()
done := make(chan error, 1)
go func() {
done <- r.Run()
done <- r.Run(MockEventSink)
}()
time.Sleep(100 * time.Millisecond)

View File

@ -5,10 +5,11 @@
package restart
import (
"context"
"fmt"
"log"
"time"
"github.com/talos-systems/talos/internal/app/init/pkg/system/events"
"github.com/talos-systems/talos/internal/app/init/pkg/system/runner"
)
@ -42,9 +43,6 @@ type Options struct {
Type Type
// RestartInterval is the interval between restarts for failed runs
RestartInterval time.Duration
// GracefulShutdownTimeout is the time to wait for process to exit after SIGTERM
// before sending SIGKILL
GracefulShutdownTimeout time.Duration
}
// Option is the functional option func.
@ -98,20 +96,20 @@ func WithRestartInterval(interval time.Duration) Option {
}
// Open implements the Runner interface
func (r *restarter) Open() error {
return r.wrappedRunner.Open()
func (r *restarter) Open(ctx context.Context) error {
return r.wrappedRunner.Open(ctx)
}
// Run implements the Runner interface
// nolint: gocyclo
func (r *restarter) Run() error {
func (r *restarter) Run(eventSink events.Recorder) error {
defer close(r.stopped)
for {
errCh := make(chan error)
go func() {
errCh <- r.wrappedRunner.Run()
errCh <- r.wrappedRunner.Run(eventSink)
}()
var err error
@ -137,17 +135,18 @@ func (r *restarter) Run() error {
if err == nil {
return nil
}
log.Printf("error running %s, going to restart until it succeeds: %s", r.wrappedRunner, err)
eventSink(events.StateWaiting, "Error running %s, going to restart until it succeeds: %s", r.wrappedRunner, err)
case Forever:
if err == nil {
log.Printf("runner %s exited without error, going to restart it", r.wrappedRunner)
eventSink(events.StateWaiting, "Runner %s exited without error, going to restart it", r.wrappedRunner)
} else {
log.Printf("error running %v, going to restart forever: %s", r.wrappedRunner, err)
eventSink(events.StateWaiting, "Error running %v, going to restart forever: %s", r.wrappedRunner, err)
}
}
select {
case <-r.stop:
eventSink(events.StateStopping, "Aborting restart sequence")
return nil
case <-time.After(r.opts.RestartInterval):
}

View File

@ -5,12 +5,16 @@
package restart_test
import (
"context"
"errors"
"fmt"
"log"
"testing"
"time"
"github.com/stretchr/testify/suite"
"github.com/talos-systems/talos/internal/app/init/pkg/system/events"
"github.com/talos-systems/talos/internal/app/init/pkg/system/runner/restart"
)
@ -25,7 +29,7 @@ type MockRunner struct {
stopped chan struct{}
}
func (m *MockRunner) Open() error {
func (m *MockRunner) Open(ctx context.Context) error {
m.stop = make(chan struct{})
m.stopped = make(chan struct{})
return nil
@ -36,7 +40,7 @@ func (m *MockRunner) Close() error {
return nil
}
func (m *MockRunner) Run() error {
func (m *MockRunner) Run(eventSink events.Recorder) error {
defer close(m.stopped)
select {
@ -63,6 +67,10 @@ func (m *MockRunner) String() string {
return "MockRunner()"
}
// MockEventSink is the event recorder handed to Run in these tests: it
// renders the message with its arguments and writes it, prefixed with the
// reported state, to the standard logger.
func MockEventSink(state events.ServiceState, message string, args ...interface{}) {
	rendered := fmt.Sprintf(message, args...)

	log.Printf("state %s: %s", state, rendered)
}
func (suite *RestartSuite) TestString() {
suite.Assert().Equal("Restart(UntilSuccess, MockRunner())", restart.New(&MockRunner{}, restart.WithType(restart.UntilSuccess)).String())
}
@ -73,7 +81,7 @@ func (suite *RestartSuite) TestRunOnce() {
}
r := restart.New(&mock, restart.WithType(restart.Once))
suite.Assert().NoError(r.Open())
suite.Assert().NoError(r.Open(context.Background()))
defer func() { suite.Assert().NoError(r.Close()) }()
failed := errors.New("failed")
@ -82,7 +90,7 @@ func (suite *RestartSuite) TestRunOnce() {
mock.exitCh <- failed
}()
suite.Assert().EqualError(r.Run(), failed.Error())
suite.Assert().EqualError(r.Run(MockEventSink), failed.Error())
suite.Assert().NoError(r.Stop())
}
@ -92,13 +100,13 @@ func (suite *RestartSuite) TestRunOnceStop() {
}
r := restart.New(&mock, restart.WithType(restart.Once))
suite.Assert().NoError(r.Open())
suite.Assert().NoError(r.Open(context.Background()))
defer func() { suite.Assert().NoError(r.Close()) }()
errCh := make(chan error)
go func() {
errCh <- r.Run()
errCh <- r.Run(MockEventSink)
}()
suite.Assert().NoError(r.Stop())
@ -111,14 +119,14 @@ func (suite *RestartSuite) TestRunUntilSuccess() {
}
r := restart.New(&mock, restart.WithType(restart.UntilSuccess), restart.WithRestartInterval(time.Millisecond))
suite.Assert().NoError(r.Open())
suite.Assert().NoError(r.Open(context.Background()))
defer func() { suite.Assert().NoError(r.Close()) }()
failed := errors.New("failed")
errCh := make(chan error)
go func() {
errCh <- r.Run()
errCh <- r.Run(MockEventSink)
}()
mock.exitCh <- failed
@ -137,14 +145,14 @@ func (suite *RestartSuite) TestRunForever() {
}
r := restart.New(&mock, restart.WithType(restart.Forever), restart.WithRestartInterval(time.Millisecond))
suite.Assert().NoError(r.Open())
suite.Assert().NoError(r.Open(context.Background()))
defer func() { suite.Assert().NoError(r.Close()) }()
failed := errors.New("failed")
errCh := make(chan error)
go func() {
errCh <- r.Run()
errCh <- r.Run(MockEventSink)
}()
mock.exitCh <- failed

View File

@ -5,18 +5,21 @@
package runner
import (
"context"
"fmt"
"time"
"github.com/containerd/containerd"
"github.com/containerd/containerd/oci"
"github.com/talos-systems/talos/internal/app/init/pkg/system/events"
)
// Runner describes the requirements for running a process.
type Runner interface {
fmt.Stringer
Open() error
Run() error
Open(ctx context.Context) error
Run(events.Recorder) error
Stop() error
Close() error
}

View File

@ -0,0 +1,150 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
package system
import (
"context"
"fmt"
"log"
"sync"
"time"
"github.com/pkg/errors"
"github.com/talos-systems/talos/internal/app/init/pkg/system/events"
"github.com/talos-systems/talos/internal/app/init/pkg/system/runner"
"github.com/talos-systems/talos/pkg/userdata"
)
// ServiceRunner wraps the state of the service (running, stopped, ...)
//
// It keeps the current state, the recorded event history and the context
// used to abort the service.
type ServiceRunner struct {
// mu is held by UpdateState and GetEventHistory while they access
// state and events.
mu sync.Mutex
// userData is handed to every hook of the wrapped service.
userData *userdata.UserData
// service is the wrapped Service instance.
service Service
// id is the service ID computed once by NewServiceRunner; it is used as
// the prefix of log lines.
id string
// state is the state set by the most recent UpdateState call.
state events.ServiceState
// events collects every event pushed by UpdateState.
events events.ServiceEvents
// ctx is passed to the service condition and to the runner's Open;
// ctxCancel cancels it and is invoked by Shutdown.
ctx context.Context
ctxCancel context.CancelFunc
}
// NewServiceRunner creates new ServiceRunner around Service instance
//
// The returned runner starts in events.StateInitialized and owns a fresh
// cancellable context derived from context.Background().
func NewServiceRunner(service Service, userData *userdata.UserData) *ServiceRunner {
	svcrunner := &ServiceRunner{
		service:  service,
		userData: userData,
		id:       service.ID(userData),
		state:    events.StateInitialized,
	}

	svcrunner.ctx, svcrunner.ctxCancel = context.WithCancel(context.Background())

	return svcrunner
}
// UpdateState implements events.Recorder
//
// It formats the message, stores newstate as the current state, appends the
// resulting event to the history and logs it. All of this happens while
// holding the runner's mutex.
func (svcrunner *ServiceRunner) UpdateState(newstate events.ServiceState, message string, args ...interface{}) {
	svcrunner.mu.Lock()
	defer svcrunner.mu.Unlock()

	text := fmt.Sprintf(message, args...)

	svcrunner.state = newstate
	svcrunner.events.Push(events.ServiceEvent{
		Message:   text,
		State:     newstate,
		Timestamp: time.Now(),
	})

	log.Printf("service[%s](%s): %s", svcrunner.id, newstate, text)
}
// GetEventHistory returns history of events for this service
//
// count is forwarded unchanged to the underlying event store; the lookup is
// done under the runner's mutex.
func (svcrunner *ServiceRunner) GetEventHistory(count int) []events.ServiceEvent {
	svcrunner.mu.Lock()
	defer svcrunner.mu.Unlock()

	history := svcrunner.events.Get(count)

	return history
}
// Start initializes the service and runs it
//
// The stages are executed in order: the service's PreFunc, its condition,
// creation of the runner, the run itself and finally PostFunc. A failure in
// any stage before the run records events.StateFailed and aborts; PostFunc
// is invoked after the run regardless of whether the run succeeded.
//
// Start should be run in a goroutine.
func (svcrunner *ServiceRunner) Start() {
	svcrunner.UpdateState(events.StatePreparing, "Running pre state")

	err := svcrunner.service.PreFunc(svcrunner.userData)
	if err != nil {
		svcrunner.UpdateState(events.StateFailed, "Failed to run pre stage: %v", err)
		return
	}

	svcrunner.UpdateState(events.StateWaiting, "Waiting for conditions")

	// the condition receives the runner context, so Shutdown aborts the wait
	waitFor := svcrunner.service.ConditionFunc(svcrunner.userData)
	if _, err = waitFor(svcrunner.ctx); err != nil {
		svcrunner.UpdateState(events.StateFailed, "Condition failed: %v", err)
		return
	}

	svcrunner.UpdateState(events.StatePreparing, "Creating service runner")

	runnr, err := svcrunner.service.Runner(svcrunner.userData)
	if err != nil {
		svcrunner.UpdateState(events.StateFailed, "Failed to create runner: %v", err)
		return
	}

	if runErr := svcrunner.run(runnr); runErr == nil {
		svcrunner.UpdateState(events.StateFinished, "Service finished successfully")
	} else {
		svcrunner.UpdateState(events.StateFailed, "Failed running service: %v", runErr)
	}

	if postErr := svcrunner.service.PostFunc(svcrunner.userData); postErr != nil {
		svcrunner.UpdateState(events.StateFailed, "Failed to run post stage: %v", postErr)
	}
}
// run opens the given runner, executes it in a background goroutine and
// waits until either the runner exits on its own or the runner context is
// cancelled. On cancellation the runner is stopped and run waits for its Run
// call to return; the Run result is discarded in that case and only a Stop
// error is reported. The runner is closed before run returns.
func (svcrunner *ServiceRunner) run(runnr runner.Runner) error {
	// special case - run nothing (TODO: we should handle it better, e.g. in PreFunc)
	if runnr == nil {
		return nil
	}

	if openErr := runnr.Open(svcrunner.ctx); openErr != nil {
		return errors.Wrap(openErr, "error opening runner")
	}

	// nolint: errcheck
	defer runnr.Close()

	runResult := make(chan error)

	go func() {
		runResult <- runnr.Run(svcrunner.UpdateState)
	}()

	select {
	case runErr := <-runResult:
		if runErr != nil {
			return errors.Wrap(runErr, "error running service")
		}

		return nil
	case <-svcrunner.ctx.Done():
	}

	// shutdown requested: stop the runner, then drain the result channel so
	// the goroutine above can finish
	stopErr := runnr.Stop()
	<-runResult

	if stopErr != nil {
		return errors.Wrap(stopErr, "error stopping service")
	}

	return nil
}
// Shutdown initiates shutdown of the service runner
//
// It only cancels the runner context and returns immediately; it does not
// wait for the service to stop.
//
// Shutdown completes when Start() returns
func (svcrunner *ServiceRunner) Shutdown() {
svcrunner.ctxCancel()
}

View File

@ -0,0 +1,176 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
package system_test
import (
"testing"
"time"
"github.com/pkg/errors"
"github.com/stretchr/testify/suite"
"github.com/talos-systems/talos/internal/app/init/pkg/system"
"github.com/talos-systems/talos/internal/app/init/pkg/system/conditions"
"github.com/talos-systems/talos/internal/app/init/pkg/system/events"
)
// ServiceRunnerSuite groups the tests exercising system.ServiceRunner.
type ServiceRunnerSuite struct {
suite.Suite
}
// assertStateSequence asserts that the states of the events recorded by sr
// (up to 1000 entries are requested) equal expectedStates, in order.
func (suite *ServiceRunnerSuite) assertStateSequence(expectedStates []events.ServiceState, sr *system.ServiceRunner) {
	history := sr.GetEventHistory(1000)

	// a non-nil (possibly empty) slice is built on purpose so that the
	// comparison below sees the same kind of value as a slice literal
	actual := make([]events.ServiceState, 0, len(history))
	for i := range history {
		actual = append(actual, history[i].State)
	}

	suite.Assert().Equal(expectedStates, actual)
}
// TestFullFlow starts a runner around a default MockService, verifies after
// 50ms that Start has not returned yet, then shuts the runner down and
// expects the complete state sequence ending in StateFinished.
func (suite *ServiceRunnerSuite) TestFullFlow() {
	sr := system.NewServiceRunner(&MockService{}, nil)

	done := make(chan struct{})

	go func() {
		defer close(done)

		sr.Start()
	}()

	time.Sleep(50 * time.Millisecond)

	// non-blocking probe: Start must still be in progress at this point
	select {
	case <-done:
		suite.Require().Fail("service running should be still running")
	default:
	}

	sr.Shutdown()

	<-done

	expected := []events.ServiceState{
		events.StatePreparing,
		events.StateWaiting,
		events.StatePreparing,
		events.StateRunning,
		events.StateFinished,
	}

	suite.assertStateSequence(expected, sr)
}
// TestPreStageFail checks that an error configured for the pre stage makes
// Start stop right after StatePreparing with StateFailed.
func (suite *ServiceRunnerSuite) TestPreStageFail() {
	sr := system.NewServiceRunner(&MockService{preError: errors.New("pre failed")}, nil)

	sr.Start()

	expected := []events.ServiceState{
		events.StatePreparing,
		events.StateFailed,
	}

	suite.assertStateSequence(expected, sr)
}
// TestRunnerStageFail checks that an error configured for runner creation
// ends the sequence with StateFailed after the second StatePreparing.
func (suite *ServiceRunnerSuite) TestRunnerStageFail() {
	sr := system.NewServiceRunner(&MockService{runnerError: errors.New("runner failed")}, nil)

	sr.Start()

	expected := []events.ServiceState{
		events.StatePreparing,
		events.StateWaiting,
		events.StatePreparing,
		events.StateFailed,
	}

	suite.assertStateSequence(expected, sr)
}
// TestAbortOnCondition uses a condition waiting for a file that is expected
// never to exist, verifies after 50ms that Start is still blocked, then
// shuts the runner down and expects the sequence to end with StateFailed
// right after StateWaiting.
func (suite *ServiceRunnerSuite) TestAbortOnCondition() {
	svc := &MockService{
		condition: conditions.WaitForFileToExist("/doesntexistever"),
	}
	sr := system.NewServiceRunner(svc, nil)

	done := make(chan struct{})

	go func() {
		defer close(done)

		sr.Start()
	}()

	time.Sleep(50 * time.Millisecond)

	// non-blocking probe: Start must still be in progress at this point
	select {
	case <-done:
		suite.Require().Fail("service running should be still running")
	default:
	}

	sr.Shutdown()

	<-done

	expected := []events.ServiceState{
		events.StatePreparing,
		events.StateWaiting,
		events.StateFailed,
	}

	suite.assertStateSequence(expected, sr)
}
// TestPostStateFail configures an error for the post stage, requests
// shutdown right after launching Start and expects StateFailed to follow
// StateFinished at the end of the sequence.
func (suite *ServiceRunnerSuite) TestPostStateFail() {
	svc := &MockService{postError: errors.New("post failed")}
	sr := system.NewServiceRunner(svc, nil)

	done := make(chan struct{})

	go func() {
		defer close(done)

		sr.Start()
	}()

	sr.Shutdown()

	<-done

	expected := []events.ServiceState{
		events.StatePreparing,
		events.StateWaiting,
		events.StatePreparing,
		events.StateRunning,
		events.StateFinished,
		events.StateFailed,
	}

	suite.assertStateSequence(expected, sr)
}
// TestRunFail feeds an error into the mock runner's exit channel while the
// service is running and expects the sequence to end with StateFailed after
// StateRunning.
// NOTE(review): presumably MockRunner.Run returns the error received on
// exitCh — confirm against the mock's definition.
func (suite *ServiceRunnerSuite) TestRunFail() {
	mockRunner := &MockRunner{exitCh: make(chan error)}
	sr := system.NewServiceRunner(&MockService{runner: mockRunner}, nil)

	done := make(chan struct{})

	go func() {
		defer close(done)

		sr.Start()
	}()

	mockRunner.exitCh <- errors.New("run failed")

	<-done

	expected := []events.ServiceState{
		events.StatePreparing,
		events.StateWaiting,
		events.StatePreparing,
		events.StateRunning,
		events.StateFailed,
	}

	suite.assertStateSequence(expected, sr)
}
// TestServiceRunnerSuite is the go test entry point that runs every test of
// ServiceRunnerSuite.
func TestServiceRunnerSuite(t *testing.T) {
	suite.Run(t, &ServiceRunnerSuite{})
}

View File

@ -5,10 +5,9 @@
package system
import (
"log"
"sync"
"time"
"github.com/pkg/errors"
"github.com/talos-systems/talos/internal/app/init/pkg/system/conditions"
"github.com/talos-systems/talos/internal/app/init/pkg/system/runner"
"github.com/talos-systems/talos/pkg/userdata"
@ -16,6 +15,13 @@ import (
// singleton holds the user data together with the runners of all started
// services.
type singleton struct {
UserData *userdata.UserData
// State of running services by ID
State map[string]*ServiceRunner
// mu is locked by Start and Shutdown while they access State and terminating.
mu sync.Mutex
// wg counts the service goroutines launched by Start; Shutdown waits on it.
wg sync.WaitGroup
// terminating is set by Shutdown; Start checks it before starting services.
terminating bool
}
var instance *singleton
@ -42,64 +48,77 @@ type Service interface {
// nolint: golint
func Services(data *userdata.UserData) *singleton {
once.Do(func() {
instance = &singleton{UserData: data}
instance = &singleton{
UserData: data,
State: make(map[string]*ServiceRunner),
}
})
return instance
}
func runService(runnr runner.Runner) error {
if runnr == nil {
// special case - run nothing (TODO: we should handle it better, e.g. in PreFunc)
return nil
}
if err := runnr.Open(); err != nil {
return errors.Wrap(err, "error opening runner")
}
// nolint: errcheck
defer runnr.Close()
if err := runnr.Run(); err != nil {
return errors.Wrap(err, "error running service")
}
return nil
}
// Start creates a ServiceRunner for every given service that is not yet
// tracked in s.State and launches it in its own goroutine.
//
// Services whose ID is already present in s.State are skipped. Once Shutdown
// has marked the singleton as terminating, Start does nothing. Start does not
// wait for the services; failures are recorded by the individual
// ServiceRunner.
func (s *singleton) Start(services ...Service) {
	s.mu.Lock()
	// The unlock is deferred before the terminating check so that the early
	// return below releases the mutex too. Returning with the lock held would
	// block every later Start or Shutdown call forever.
	defer s.mu.Unlock()

	if s.terminating {
		return
	}

	for _, service := range services {
		id := service.ID(s.UserData)

		if _, exists := s.State[id]; exists {
			// service already started?
			// TODO: it might be nice to handle case when service
			// should be restarted (e.g. kubeadm after reset)
			continue
		}

		svcrunner := NewServiceRunner(service, s.UserData)
		s.State[id] = svcrunner

		// registered before the goroutine starts so that Shutdown's Wait
		// cannot miss it
		s.wg.Add(1)

		go func(svcrunner *ServiceRunner) {
			defer s.wg.Done()

			svcrunner.Start()
		}(svcrunner)
	}
}
// ShutdownHackySleep is a variable to allow tests to override it
//
// It is the pause Shutdown takes between stopping the first group of
// services and stopping all of them; it defaults to 10 seconds.
//
// TODO: part of a hack below
var ShutdownHackySleep = 10 * time.Second
// Shutdown all the services
//
// The first call marks the singleton as terminating, asks the service
// runners to shut down in two passes and then waits for all service
// goroutines to finish. Calls made after termination has begun return
// immediately.
func (s *singleton) Shutdown() {
	s.mu.Lock()

	if s.terminating {
		s.mu.Unlock()
		return
	}

	s.terminating = true

	// TODO: this is a hack, we stop all service runners but containerd/udevd first.
	// This is required for correct shutdown until service dependencies
	// are implemented properly.
	for id, runner := range s.State {
		switch id {
		case "containerd", "udevd":
			// left running until the second pass below
		default:
			runner.Shutdown()
		}
	}

	// TODO: 2nd part of a hack above
	// sleep a bit to let containers actually terminate before stopping containerd
	time.Sleep(ShutdownHackySleep)

	for _, runner := range s.State {
		runner.Shutdown()
	}

	s.mu.Unlock()

	// wait outside of the lock for every service goroutine to return
	s.wg.Wait()
}

View File

@ -4,11 +4,27 @@
package system_test
import "testing"
import (
"testing"
func TestEmpty(t *testing.T) {
// added for accurate coverage estimation
//
// please remove it once any unit-test is added
// for this package
"github.com/stretchr/testify/suite"
"github.com/talos-systems/talos/internal/app/init/pkg/system"
)
// SystemServicesSuite groups the tests exercising the system.Services singleton.
type SystemServicesSuite struct {
suite.Suite
}
// TestStartShutdown starts two mock services through the services singleton
// and shuts them down again. The shutdown pause is set to zero for the
// duration of the test and restored afterwards.
func (suite *SystemServicesSuite) TestStartShutdown() {
	savedSleep := system.ShutdownHackySleep
	defer func() { system.ShutdownHackySleep = savedSleep }()

	system.ShutdownHackySleep = 0

	services := system.Services(nil)

	services.Start(
		&MockService{name: "containerd"},
		&MockService{name: "proxyd"},
	)
	services.Shutdown()
}
// TestSystemServicesSuite is the go test entry point that runs every test of
// SystemServicesSuite.
func TestSystemServicesSuite(t *testing.T) {
	suite.Run(t, &SystemServicesSuite{})
}

View File

@ -5,10 +5,11 @@
package main
import (
"log"
"github.com/mdlayher/genetlink"
"github.com/mdlayher/netlink"
"github.com/pkg/errors"
"golang.org/x/sys/unix"
)
const (
@ -17,19 +18,19 @@ const (
acpiGenlMcastGroupName = "acpi_mc_group"
)
func listenForPowerButton() (err error) {
func listenForPowerButton() (poweroffCh <-chan struct{}, err error) {
// Get the acpi_event family.
conn, err := genetlink.Dial(nil)
if err != nil {
return err
return nil, err
}
// nolint: errcheck
defer conn.Close()
f, err := conn.GetFamily(acpiGenlFamilyName)
if netlink.IsNotExist(err) {
return errors.Wrap(err, acpiGenlFamilyName+" not available")
return nil, errors.Wrap(err, acpiGenlFamilyName+" not available")
}
var id uint32
@ -39,22 +40,25 @@ func listenForPowerButton() (err error) {
}
}
if err = conn.JoinGroup(id); err != nil {
return err
return nil, err
}
// Listen for ACPI events.
ch := make(chan struct{})
msgs, _, err := conn.Receive()
if err != nil {
return err
}
if len(msgs) > 0 {
// TODO(andrewrynhard): Stop all running containerd tasks.
// See http://man7.org/linux/man-pages/man2/reboot.2.html.
unix.Sync()
// nolint: errcheck
unix.Reboot(unix.LINUX_REBOOT_CMD_POWER_OFF)
}
go func() {
for {
msgs, _, err := conn.Receive()
if err != nil {
log.Printf("error reading from ACPI channel: %s", err)
return
}
if len(msgs) > 0 {
close(ch)
return
}
}
}()
return nil
return ch, nil
}

View File

@ -21,6 +21,7 @@ import (
"github.com/golang/protobuf/ptypes/empty"
specs "github.com/opencontainers/runtime-spec/specs-go"
"github.com/pkg/errors"
"github.com/talos-systems/talos/internal/app/init/pkg/system/events"
"github.com/talos-systems/talos/internal/app/init/pkg/system/runner"
containerdrunner "github.com/talos-systems/talos/internal/app/init/pkg/system/runner/containerd"
"github.com/talos-systems/talos/internal/app/osd/proto"
@ -217,7 +218,7 @@ func (r *Registrator) Reset(ctx context.Context, in *empty.Empty) (reply *proto.
),
)
err = cr.Open()
err = cr.Open(context.Background())
if err != nil {
return nil, err
}
@ -225,7 +226,8 @@ func (r *Registrator) Reset(ctx context.Context, in *empty.Empty) (reply *proto.
// nolint: errcheck
defer cr.Close()
err = cr.Run()
// TODO: should this go through system.Services?
err = cr.Run(events.NullRecorder)
if err != nil {
return nil, err
}