feat(init): implement complete API for service lifecycle (start/stop)

It is now possible to `start`/`stop`/`restart` any service via `osctl`
commands.

There are some changes in `ServiceRunner` to support re-use (re-entering the
running state). The `Services` singleton now tracks the running state of each
service to avoid calling `Start()` on an already running `ServiceRunner` instance.
The former `Start()` method was renamed to `LoadAndStart()` to separate service
loading (adding to the list of services) from the actual service start.

Signed-off-by: Andrey Smirnov <smirnov.andrey@gmail.com>
This commit is contained in:
Andrey Smirnov 2019-07-31 00:49:42 +03:00 committed by Andrew Rynhard
parent 91ac1d7a8c
commit 9c63f4ed0a
11 changed files with 266 additions and 45 deletions

View File

@ -20,21 +20,44 @@ import (
// serviceCmd represents the service command
var serviceCmd = &cobra.Command{
Use: "service [<id>]",
Use: "service [<id> [start|stop|restart|status]]",
Aliases: []string{"services"},
Short: "Retrieve the state of a service (or all services)",
Long: ``,
Short: "Retrieve the state of a service (or all services), control service state",
Long: `Service control command. If run without arguments, lists all the services and their state.
If service ID is specified, default action 'status' is executed which shows status of a single list service.
With actions 'start', 'stop', 'restart', service state is updated respectively.`,
Run: func(cmd *cobra.Command, args []string) {
if len(args) > 1 {
if len(args) > 2 {
helpers.Should(cmd.Usage())
os.Exit(1)
}
action := "status"
serviceID := ""
if len(args) >= 1 {
serviceID = args[0]
}
if len(args) == 2 {
action = args[1]
}
setupClient(func(c *client.Client) {
if len(args) == 0 {
serviceList(c)
} else {
serviceInfo(c, args[0])
switch action {
case "status":
if serviceID == "" {
serviceList(c)
} else {
serviceInfo(c, serviceID)
}
case "start":
serviceStart(c, serviceID)
case "stop":
serviceStop(c, serviceID)
case "restart":
serviceStop(c, serviceID)
serviceStart(c, serviceID)
default:
helpers.Fatalf("unsupported service action: %q", action)
}
})
},
@ -87,6 +110,24 @@ func serviceInfo(c *client.Client, id string) {
}
}
// serviceStart asks the API to start the service with the given id and
// prints the reply to stderr. Any API error terminates the command.
func serviceStart(c *client.Client, id string) {
	reply, startErr := c.Start(globalCtx, id)
	if startErr != nil {
		helpers.Fatalf("error starting service: %s", startErr)
	}

	fmt.Fprintln(os.Stderr, reply)
}
// serviceStop asks the API to stop the service with the given id and
// prints the reply to stderr. Any API error terminates the command.
func serviceStop(c *client.Client, id string) {
	resp, err := c.Stop(globalCtx, id)
	if err != nil {
		// report the operation that actually failed (was "starting")
		helpers.Fatalf("error stopping service: %s", err)
	}

	fmt.Fprintln(os.Stderr, resp)
}
type serviceInfoWrapper struct {
*initproto.ServiceInfo
}

View File

@ -316,3 +316,23 @@ func (c *Client) ServiceInfo(ctx context.Context, id string) (*initproto.Service
return nil, nil
}
// Start requests the start of the service identified by id and returns the
// textual response from the init API.
func (c *Client) Start(ctx context.Context, id string) (string, error) {
	request := &initproto.StartRequest{Id: id}

	reply, err := c.initClient.Start(ctx, request)
	if err == nil {
		return reply.Resp, nil
	}

	return "", err
}
// Stop requests the stop of the service identified by id and returns the
// textual response from the init API.
func (c *Client) Stop(ctx context.Context, id string) (string, error) {
	request := &initproto.StopRequest{Id: id}

	reply, err := c.initClient.Stop(ctx, request)
	if err == nil {
		return reply.Resp, nil
	}

	return "", err
}

View File

@ -142,6 +142,17 @@ func (r *Registrator) ServiceList(ctx context.Context, in *empty.Empty) (result
return result, nil
}
// Start implements the proto.InitServer interface. It asks the services
// singleton to start the service named by in.Id and replies with a
// confirmation message; on failure an empty reply is returned with the error.
func (r *Registrator) Start(ctx context.Context, in *proto.StartRequest) (reply *proto.StartReply, err error) {
	err = system.Services(r.Data).Start(in.Id)
	if err != nil {
		return &proto.StartReply{}, err
	}

	return &proto.StartReply{
		Resp: fmt.Sprintf("Service %q started", in.Id),
	}, nil
}
// Stop implements the proto.InitServer interface and stops a
// service running on Talos.
func (r *Registrator) Stop(ctx context.Context, in *proto.StopRequest) (reply *proto.StopReply, err error) {

View File

@ -36,7 +36,7 @@ func (task *Services) runtime(platform platform.Platform, data *userdata.UserDat
func startSystemServices(data *userdata.UserData) {
svcs := system.Services(data)
// Start the services common to all nodes.
svcs.Start(
svcs.LoadAndStart(
&services.Networkd{},
&services.Containerd{},
&services.Udevd{},
@ -46,7 +46,7 @@ func startSystemServices(data *userdata.UserData) {
)
// Start the services common to all master nodes.
if data.Services.Kubeadm.IsControlPlane() {
svcs.Start(
svcs.LoadAndStart(
&services.Trustd{},
&services.Proxyd{},
)
@ -56,7 +56,7 @@ func startSystemServices(data *userdata.UserData) {
func startKubernetesServices(data *userdata.UserData) {
svcs := system.Services(data)
svcs.Start(
svcs.LoadAndStart(
&services.Kubelet{},
&services.Kubeadm{},
)

View File

@ -28,7 +28,7 @@ type serviceCondition struct {
func (sc *serviceCondition) Wait(ctx context.Context) error {
instance.mu.Lock()
svcrunner := instance.State[sc.service]
svcrunner := instance.state[sc.service]
instance.mu.Unlock()
if svcrunner == nil {

View File

@ -41,6 +41,7 @@ type ServiceRunner struct {
stateSubscribers map[StateEvent][]chan<- struct{}
ctxMu sync.Mutex
ctx context.Context
ctxCancel context.CancelFunc
}
@ -53,10 +54,10 @@ func NewServiceRunner(service Service, userData *userdata.UserData) *ServiceRunn
service: service,
userData: userData,
id: service.ID(userData),
ctx: ctx,
ctxCancel: ctxCancel,
state: events.StateInitialized,
stateSubscribers: make(map[StateEvent][]chan<- struct{}),
ctx: ctx,
ctxCancel: ctxCancel,
}
}
@ -128,13 +129,13 @@ func (svcrunner *ServiceRunner) GetEventHistory(count int) []events.ServiceEvent
return svcrunner.events.Get(count)
}
func (svcrunner *ServiceRunner) waitFor(condition conditions.Condition) error {
func (svcrunner *ServiceRunner) waitFor(ctx context.Context, condition conditions.Condition) error {
description := condition.String()
svcrunner.UpdateState(events.StateWaiting, "Waiting for %s", description)
errCh := make(chan error)
go func() {
errCh <- condition.Wait(svcrunner.ctx)
errCh <- condition.Wait(ctx)
}()
ticker := time.NewTicker(WaitConditionCheckInterval)
@ -160,6 +161,17 @@ func (svcrunner *ServiceRunner) waitFor(condition conditions.Condition) error {
// Start should be run in a goroutine.
// nolint: gocyclo
func (svcrunner *ServiceRunner) Start() {
defer func() {
// reset context for the next run
svcrunner.ctxMu.Lock()
svcrunner.ctx, svcrunner.ctxCancel = context.WithCancel(context.Background())
svcrunner.ctxMu.Unlock()
}()
svcrunner.ctxMu.Lock()
ctx := svcrunner.ctx
svcrunner.ctxMu.Unlock()
condition := svcrunner.service.Condition(svcrunner.userData)
dependencies := svcrunner.service.DependsOn(svcrunner.userData)
if len(dependencies) > 0 {
@ -176,14 +188,14 @@ func (svcrunner *ServiceRunner) Start() {
}
if condition != nil {
if err := svcrunner.waitFor(condition); err != nil {
if err := svcrunner.waitFor(ctx, condition); err != nil {
svcrunner.UpdateState(events.StateFailed, "Condition failed: %v", err)
return
}
}
svcrunner.UpdateState(events.StatePreparing, "Running pre state")
if err := svcrunner.service.PreFunc(svcrunner.ctx, svcrunner.userData); err != nil {
if err := svcrunner.service.PreFunc(ctx, svcrunner.userData); err != nil {
svcrunner.UpdateState(events.StateFailed, "Failed to run pre stage: %v", err)
return
}
@ -200,7 +212,7 @@ func (svcrunner *ServiceRunner) Start() {
return
}
if err := svcrunner.run(runnr); err != nil {
if err := svcrunner.run(ctx, runnr); err != nil {
svcrunner.UpdateState(events.StateFailed, "Failed running service: %v", err)
} else {
svcrunner.UpdateState(events.StateFinished, "Service finished successfully")
@ -213,13 +225,13 @@ func (svcrunner *ServiceRunner) Start() {
}
// nolint: gocyclo
func (svcrunner *ServiceRunner) run(runnr runner.Runner) error {
func (svcrunner *ServiceRunner) run(ctx context.Context, runnr runner.Runner) error {
if runnr == nil {
// special case - run nothing (TODO: we should handle it better, e.g. in PreFunc)
return nil
}
if err := runnr.Open(svcrunner.ctx); err != nil {
if err := runnr.Open(ctx); err != nil {
return errors.Wrap(err, "error opening runner")
}
@ -241,7 +253,7 @@ func (svcrunner *ServiceRunner) run(runnr runner.Runner) error {
defer healthWg.Done()
// nolint: errcheck
health.Run(svcrunner.ctx, healthSvc.HealthSettings(svcrunner.userData), &svcrunner.healthState, healthSvc.HealthFunc(svcrunner.userData))
health.Run(ctx, healthSvc.HealthSettings(svcrunner.userData), &svcrunner.healthState, healthSvc.HealthFunc(svcrunner.userData))
}()
notifyCh := make(chan health.StateChange, 2)
@ -254,7 +266,7 @@ func (svcrunner *ServiceRunner) run(runnr runner.Runner) error {
for {
select {
case <-svcrunner.ctx.Done():
case <-ctx.Done():
return
case change := <-notifyCh:
svcrunner.healthUpdate(change)
@ -269,7 +281,7 @@ func (svcrunner *ServiceRunner) run(runnr runner.Runner) error {
defer svcrunner.ctxCancel()
select {
case <-svcrunner.ctx.Done():
case <-ctx.Done():
err := runnr.Stop()
<-errCh
if err != nil {
@ -288,6 +300,8 @@ func (svcrunner *ServiceRunner) run(runnr runner.Runner) error {
//
// Shutdown completes when Start() returns
func (svcrunner *ServiceRunner) Shutdown() {
	// ctxMu guards ctxCancel, which Start() replaces when it returns
	svcrunner.ctxMu.Lock()
	defer svcrunner.ctxMu.Unlock()

	cancel := svcrunner.ctxCancel
	cancel()
}

View File

@ -320,6 +320,62 @@ func (suite *ServiceRunnerSuite) TestRunFail() {
}, sr)
}
// TestFullFlowRestart drives one ServiceRunner through two complete
// start/shutdown cycles and checks that the recorded state sequence is the
// single-run sequence repeated twice.
func (suite *ServiceRunnerSuite) TestFullFlowRestart() {
	sr := system.NewServiceRunner(&MockService{
		condition: conditions.None(),
	}, nil)

	// each cycle: start, verify the runner has not exited, shut down, wait for exit
	for cycle := 0; cycle < 2; cycle++ {
		done := make(chan struct{})

		go func() {
			defer close(done)
			sr.Start()
		}()

		time.Sleep(50 * time.Millisecond)

		select {
		case <-done:
			suite.Require().Fail("service running should be still running")
		default:
		}

		sr.Shutdown()
		<-done
	}

	singleRun := []events.ServiceState{
		events.StateWaiting,
		events.StatePreparing,
		events.StatePreparing,
		events.StateRunning,
		events.StateFinished,
	}

	expected := make([]events.ServiceState, 0, 2*len(singleRun))
	expected = append(expected, singleRun...)
	expected = append(expected, singleRun...)

	suite.assertStateSequence(expected, sr)
}
func TestServiceRunnerSuite(t *testing.T) {
suite.Run(t, new(ServiceRunnerSuite))
}

View File

@ -11,6 +11,9 @@ import (
"sync"
"time"
"github.com/hashicorp/go-multierror"
"github.com/pkg/errors"
"github.com/talos-systems/talos/internal/app/machined/pkg/system/conditions"
"github.com/talos-systems/talos/pkg/userdata"
)
@ -19,7 +22,14 @@ type singleton struct {
UserData *userdata.UserData
// State of running services by ID
State map[string]*ServiceRunner
state map[string]*ServiceRunner
// List of running services at the moment.
//
// Service might be in any state, but service ID in the map
// implies ServiceRunner.Start() method is running at the moment
runningMu sync.Mutex
running map[string]struct{}
mu sync.Mutex
wg sync.WaitGroup
@ -35,41 +45,94 @@ func Services(data *userdata.UserData) *singleton {
once.Do(func() {
instance = &singleton{
UserData: data,
State: make(map[string]*ServiceRunner),
state: make(map[string]*ServiceRunner),
running: make(map[string]struct{}),
}
})
return instance
}
// Start will invoke the service's Pre, Condition, and Type funcs. If the any
// error occurs in the Pre or Condition invocations, it is up to the caller to
// to restart the service.
func (s *singleton) Start(services ...Service) {
// Load adds service to the list of services managed by the runner.
//
// Load returns service IDs for each of the services.
func (s *singleton) Load(services ...Service) []string {
s.mu.Lock()
defer s.mu.Unlock()
if s.terminating {
return
return nil
}
ids := make([]string, 0, len(services))
for _, service := range services {
id := service.ID(s.UserData)
ids = append(ids, id)
if _, exists := s.State[id]; exists {
// service already started?
// TODO: it might be nice to handle case when service
// should be restarted (e.g. kubeadm after reset)
if _, exists := s.state[id]; exists {
// service already loaded, ignore
continue
}
svcrunner := NewServiceRunner(service, s.UserData)
s.State[id] = svcrunner
s.state[id] = svcrunner
}
return ids
}
// Start launches the already loaded services identified by serviceIDs, each
// in its own goroutine running ServiceRunner.Start(). If any error occurs
// while a service is running, it is up to the caller to restart the service.
//
// IDs that were never loaded are reported in the returned error (one entry
// per unknown ID) and are skipped; the remaining services are still started.
// Services that are already running are skipped silently. When the singleton
// is terminating, Start does nothing and returns nil.
func (s *singleton) Start(serviceIDs ...string) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	if s.terminating {
		return nil
	}

	var multiErr *multierror.Error

	for _, id := range serviceIDs {
		svcrunner := s.state[id]
		if svcrunner == nil {
			multiErr = multierror.Append(multiErr, errors.Errorf("service %q not defined", id))
			// there is no runner for an unknown ID: falling through would
			// call Start() on a nil *ServiceRunner inside the goroutine
			continue
		}

		// Check and mark the service as running in one critical section,
		// before the goroutine is launched. Marking inside the goroutine
		// would let a second Start() call pass the check before the first
		// goroutine had a chance to run.
		s.runningMu.Lock()
		_, running := s.running[id]
		if !running {
			s.running[id] = struct{}{}
		}
		s.runningMu.Unlock()

		if running {
			// service already running, skip
			continue
		}

		s.wg.Add(1)

		go func(id string, svcrunner *ServiceRunner) {
			// deferred calls run in reverse order: wg.Done() first, then
			// the service is removed from the running set
			defer func() {
				s.runningMu.Lock()
				delete(s.running, id)
				s.runningMu.Unlock()
			}()
			defer s.wg.Done()

			svcrunner.Start()
		}(id, svcrunner)
	}

	return multiErr.ErrorOrNil()
}
// LoadAndStart registers the given services via Load and immediately starts
// them via Start, panicking if Start reports an error.
func (s *singleton) LoadAndStart(services ...Service) {
	ids := s.Load(services...)

	if err := s.Start(ids...); err != nil {
		// should never happen
		panic(err)
	}
}
@ -82,7 +145,7 @@ func (s *singleton) Shutdown() {
}
stateCopy := make(map[string]*ServiceRunner)
s.terminating = true
for name, svcrunner := range s.State {
for name, svcrunner := range s.state {
stateCopy[name] = svcrunner
}
s.mu.Unlock()
@ -128,8 +191,8 @@ func (s *singleton) List() (result []*ServiceRunner) {
s.mu.Lock()
defer s.mu.Unlock()
result = make([]*ServiceRunner, 0, len(s.State))
for _, svcrunner := range s.State {
result = make([]*ServiceRunner, 0, len(s.state))
for _, svcrunner := range s.state {
result = append(result, svcrunner)
}
@ -155,10 +218,10 @@ func (s *singleton) Stop(ctx context.Context, serviceIDs ...string) (err error)
// Copy current service state
stateCopy := make(map[string]*ServiceRunner)
for _, id := range serviceIDs {
if _, ok := s.State[id]; !ok {
if _, ok := s.state[id]; !ok {
return fmt.Errorf("service not found: %s", id)
}
stateCopy[id] = s.State[id]
stateCopy[id] = s.state[id]
}
s.mu.Unlock()

View File

@ -18,7 +18,7 @@ type SystemServicesSuite struct {
}
func (suite *SystemServicesSuite) TestStartShutdown() {
system.Services(nil).Start(
system.Services(nil).LoadAndStart(
&MockService{name: "containerd"},
&MockService{name: "proxyd", dependencies: []string{"containerd"}},
&MockService{name: "trustd", dependencies: []string{"containerd", "proxyd"}},
@ -33,7 +33,7 @@ func TestSystemServicesSuite(t *testing.T) {
}
func (suite *SystemServicesSuite) TestStartStop() {
system.Services(nil).Start(
system.Services(nil).LoadAndStart(
&MockService{name: "yolo"},
)
time.Sleep(10 * time.Millisecond)

View File

@ -14,6 +14,8 @@ service Init {
rpc Reboot(google.protobuf.Empty) returns (RebootReply) {}
rpc Reset(google.protobuf.Empty) returns (ResetReply) {}
rpc Shutdown(google.protobuf.Empty) returns (ShutdownReply) {}
rpc Start(StartRequest) returns (StartReply) {}
rpc Stop(StopRequest) returns (StopReply) {}
rpc Upgrade(UpgradeRequest) returns (UpgradeReply) {}
rpc ServiceList(google.protobuf.Empty) returns (ServiceListReply) {}
}
@ -55,6 +57,10 @@ message ServiceHealth {
google.protobuf.Timestamp lastChange = 4;
}
// StartRequest identifies, by ID, the service to start.
message StartRequest { string id = 1; }
// StartReply carries the textual response to a Start call.
message StartReply { string resp = 1; }
// StopRequest identifies, by ID, the service to stop.
message StopRequest { string id = 1; }
// StopReply carries the textual response to a Stop call.
message StopReply { string resp = 1; }

View File

@ -54,6 +54,16 @@ func (c *InitServiceClient) Reset(ctx context.Context, in *empty.Empty) (data *p
return c.InitClient.Reset(ctx, in)
}
// Start forwards the request to the Start() method of the wrapped init client.
func (c *InitServiceClient) Start(ctx context.Context, in *proto.StartRequest) (data *proto.StartReply, err error) {
	data, err = c.InitClient.Start(ctx, in)

	return data, err
}
// Stop forwards the request to the Stop() method of the wrapped init client.
func (c *InitServiceClient) Stop(ctx context.Context, in *proto.StopRequest) (data *proto.StopReply, err error) {
	data, err = c.InitClient.Stop(ctx, in)

	return data, err
}
// ServiceList executes the init ServiceList() API.
func (c *InitServiceClient) ServiceList(ctx context.Context, in *empty.Empty) (data *proto.ServiceListReply, err error) {
return c.InitClient.ServiceList(ctx, in)