refactor: containerd runner refactoring and unit-tests (#551)

Signed-off-by: Andrey Smirnov <smirnov.andrey@gmail.com>
Andrey Smirnov 2019-04-16 23:56:52 +03:00 committed by Andrew Rynhard
parent bf94cbcb2c
commit d29e27ee33
13 changed files with 439 additions and 97 deletions
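This change replaces the stateless Containerd service wrapper with a stateful containerdRunner that is constructed once via NewRunner and then driven through Run and Stop. A minimal sketch of the runner.Runner contract implied by the diff, assuming no methods beyond the two used here (the real interface is defined elsewhere in the tree):

package runner

// Runner, as implied by this commit: Run blocks until the workload exits or
// Stop is called; Stop requests a graceful shutdown and waits for Run to return.
type Runner interface {
	Run() error
	Stop() error
}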


@@ -64,7 +64,6 @@ steps:
environment:
BUILDKIT_HOST: tcp://buildkitd.ci.svc:1234
commands:
- make clean
- make
depends_on:
- fetch


@@ -227,7 +227,8 @@ ENTRYPOINT ["entrypoint.sh"]
# The test target performs tests on the codebase.
FROM base-src AS test
# xfsprogs is required by the tests
# xfsprogs and containerd are required by the tests
COPY --from=rootfs-build /rootfs /rootfs
ENV PATH /rootfs/bin:$PATH
COPY hack/golang/test.sh /bin


@@ -148,7 +148,7 @@ talos: buildkitd
$(COMMON_ARGS)
@docker load < build/$@.tar
test: buildkitd
test: buildkitd rootfs
@mkdir -p build
@buildctl --addr $(BUILDKIT_HOST) \
build \


@@ -6,63 +6,67 @@ package containerd
import (
"context"
"fmt"
"log"
"path/filepath"
"syscall"
"time"
"github.com/containerd/containerd"
"github.com/containerd/containerd/cio"
"github.com/containerd/containerd/containers"
"github.com/containerd/containerd/defaults"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/oci"
"github.com/containerd/containerd/runtime/restart"
specs "github.com/opencontainers/runtime-spec/specs-go"
"github.com/pkg/errors"
"github.com/talos-systems/talos/internal/app/init/pkg/system/conditions"
"github.com/talos-systems/talos/internal/app/init/pkg/system/runner"
"github.com/talos-systems/talos/pkg/userdata"
)
// Containerd represents a service to be run in a container.
type Containerd struct{}
// containerdRunner is a runner.Runner that runs a container in containerd
type containerdRunner struct {
data *userdata.UserData
args *runner.Args
opts *runner.Options
// WithMemoryLimit sets the linux resource memory limit field.
func WithMemoryLimit(limit int64) oci.SpecOpts {
return func(_ context.Context, _ oci.Client, _ *containers.Container, s *specs.Spec) error {
s.Linux.Resources.Memory = &specs.LinuxMemory{
Limit: &limit,
// DisableOOMKiller: &disable,
}
return nil
}
stop chan struct{}
stopped chan struct{}
}
// WithRootfsPropagation sets the root filesystem propagation.
func WithRootfsPropagation(rp string) oci.SpecOpts {
return func(_ context.Context, _ oci.Client, _ *containers.Container, s *specs.Spec) error {
s.Linux.RootfsPropagation = rp
return nil
// errStopped is used internally to signal that the task was stopped
var errStopped = errors.New("stopped")
// NewRunner creates a runner.Runner that runs a container in containerd
func NewRunner(data *userdata.UserData, args *runner.Args, setters ...runner.Option) runner.Runner {
r := &containerdRunner{
data: data,
args: args,
opts: runner.DefaultOptions(),
stop: make(chan struct{}),
stopped: make(chan struct{}),
}
for _, setter := range setters {
setter(r.opts)
}
return r
}
// Run implements the Runner interface.
// nolint: gocyclo
func (c *Containerd) Run(data *userdata.UserData, args runner.Args, setters ...runner.Option) error {
// Wait for the containerd socket.
func (c *containerdRunner) Run() error {
defer close(c.stopped)
// Wait for the containerd socket.
_, err := conditions.WaitForFileToExist(defaults.DefaultAddress)()
if err != nil {
return err
}
// Create the default runner options.
opts := runner.DefaultOptions()
for _, setter := range setters {
setter(opts)
}
// Create the containerd client.
ctx := namespaces.WithNamespace(context.Background(), opts.Namespace)
ctx := namespaces.WithNamespace(context.Background(), c.opts.Namespace)
client, err := containerd.New(defaults.DefaultAddress)
if err != nil {
return err
@@ -70,83 +74,139 @@ func (c *Containerd) Run(data *userdata.UserData, args runner.Args, setters ...r
// nolint: errcheck
defer client.Close()
image, err := client.GetImage(ctx, opts.ContainerImage)
image, err := client.GetImage(ctx, c.opts.ContainerImage)
if err != nil {
return err
}
// Create the container.
specOpts := newOCISpecOpts(image, args, opts)
containerOpts := newContainerOpts(image, args, opts, specOpts)
specOpts := c.newOCISpecOpts(image)
containerOpts := c.newContainerOpts(image, specOpts)
container, err := client.NewContainer(
ctx,
args.ID,
c.args.ID,
containerOpts...,
)
if err != nil {
return fmt.Errorf("failed to create container %q: %v", args.ID, err)
return errors.Wrapf(err, "failed to create container %q", c.args.ID)
}
defer container.Delete(ctx, containerd.WithSnapshotCleanup) // nolint: errcheck
// Create the task and start it.
task, err := container.NewTask(ctx, cio.LogFile(logPath(args)))
if err != nil {
return fmt.Errorf("failed to create task: %q: %v", args.ID, err)
}
if err := task.Start(ctx); err != nil {
return fmt.Errorf("failed to start task: %q: %v", args.ID, err)
}
// Wait for the task exit code.
if opts.Type == runner.Once {
defer container.Delete(ctx, containerd.WithSnapshotCleanup) // nolint: errcheck
defer task.Delete(ctx) // nolint: errcheck
statusC, err := task.Wait(ctx)
if err != nil {
return fmt.Errorf("failed waiting for task: %q: %v", args.ID, err)
// Manage task lifecycle
switch c.opts.Type {
case runner.Once:
err = c.runOnce(ctx, container)
if err == errStopped {
err = nil
}
status := <-statusC
return err
case runner.Forever:
for {
err = c.runOnce(ctx, container)
if err == errStopped {
return nil
}
if err != nil {
log.Printf("error running %v, going to restart forever: %s", c.args.ID, err)
}
select {
case <-c.stop:
return nil
case <-time.After(c.opts.RestartInterval):
}
}
default:
panic("unsupported runner type")
}
}
func (c *containerdRunner) runOnce(ctx context.Context, container containerd.Container) error {
// Create the task and start it.
task, err := container.NewTask(ctx, cio.LogFile(c.logPath()))
if err != nil {
return errors.Wrapf(err, "failed to create task: %q", c.args.ID)
}
defer task.Delete(ctx) // nolint: errcheck
if err = task.Start(ctx); err != nil {
return errors.Wrapf(err, "failed to start task: %q", c.args.ID)
}
statusC, err := task.Wait(ctx)
if err != nil {
return errors.Wrapf(err, "failed waiting for task: %q", c.args.ID)
}
select {
case status := <-statusC:
code := status.ExitCode()
if code != 0 {
return fmt.Errorf("task %q failed: exit code %d", args.ID, code)
return errors.Errorf("task %q failed: exit code %d", c.args.ID, code)
}
return nil
case <-c.stop:
// gracefully stop the task
log.Printf("sending SIGTERM to %v", c.args.ID)
// nolint: errcheck
_ = task.Kill(ctx, syscall.SIGTERM, containerd.WithKillAll)
}
select {
case <-statusC:
// stopped process exited
return errStopped
case <-time.After(c.opts.GracefulShutdownTimeout):
// kill the process
log.Printf("sending SIGKILL to %v", c.args.ID)
// nolint: errcheck
_ = task.Kill(ctx, syscall.SIGKILL, containerd.WithKillAll)
}
<-statusC
return errStopped
}
// Stop implements the runner.Runner interface
func (c *containerdRunner) Stop() error {
close(c.stop)
<-c.stopped
return nil
}
func newContainerOpts(image containerd.Image, args runner.Args, opts *runner.Options, specOpts []oci.SpecOpts) []containerd.NewContainerOpts {
func (c *containerdRunner) newContainerOpts(image containerd.Image, specOpts []oci.SpecOpts) []containerd.NewContainerOpts {
containerOpts := []containerd.NewContainerOpts{
containerd.WithImage(image),
containerd.WithNewSnapshot(args.ID, image),
containerd.WithNewSnapshot(c.args.ID, image),
containerd.WithNewSpec(specOpts...),
}
if opts.Type == runner.Forever {
containerOpts = append(containerOpts, restart.WithStatus(containerd.Running), restart.WithLogPath(logPath(args)))
}
containerOpts = append(containerOpts, opts.ContainerOpts...)
containerOpts = append(containerOpts, c.opts.ContainerOpts...)
return containerOpts
}
func newOCISpecOpts(image oci.Image, args runner.Args, opts *runner.Options) []oci.SpecOpts {
func (c *containerdRunner) newOCISpecOpts(image oci.Image) []oci.SpecOpts {
specOpts := []oci.SpecOpts{
oci.WithImageConfig(image),
oci.WithProcessArgs(args.ProcessArgs...),
oci.WithEnv(opts.Env),
oci.WithProcessArgs(c.args.ProcessArgs...),
oci.WithEnv(c.opts.Env),
oci.WithHostNamespace(specs.NetworkNamespace),
oci.WithHostNamespace(specs.PIDNamespace),
oci.WithHostHostsFile,
oci.WithHostResolvconf,
oci.WithPrivileged,
}
specOpts = append(specOpts, opts.OCISpecOpts...)
specOpts = append(specOpts, c.opts.OCISpecOpts...)
return specOpts
}
func logPath(args runner.Args) string {
return "/var/log/" + args.ID + ".log"
func (c *containerdRunner) logPath() string {
return filepath.Join(c.opts.LogPath, c.args.ID+".log")
}
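With the new API each caller builds the runner once and then drives it, as the service changes below do. A usage sketch, assuming containerd is listening on its default socket and the image has already been pulled into the namespace; the ID, namespace and image values here are illustrative rather than taken from production code:

package main

import (
	"log"

	"github.com/talos-systems/talos/internal/app/init/pkg/system/runner"
	containerdrunner "github.com/talos-systems/talos/internal/app/init/pkg/system/runner/containerd"
	"github.com/talos-systems/talos/pkg/userdata"
)

func main() {
	args := &runner.Args{
		ID:          "example",
		ProcessArgs: []string{"/bin/sh", "-c", "exit 0"},
	}
	r := containerdrunner.NewRunner(&userdata.UserData{}, args,
		runner.WithNamespace("example"),
		runner.WithContainerImage("docker.io/library/busybox:latest"),
		runner.WithType(runner.Once),
	)
	// Run blocks until the task exits; Stop may be called from another goroutine
	// to request SIGTERM and, after the graceful shutdown timeout, SIGKILL.
	if err := r.Run(); err != nil {
		log.Fatalf("runner failed: %v", err)
	}
}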


@@ -0,0 +1,251 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
package containerd_test
import (
"bytes"
"context"
"io/ioutil"
"os"
"path/filepath"
"sync"
"testing"
"time"
"github.com/containerd/containerd"
"github.com/containerd/containerd/defaults"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/oci"
specs "github.com/opencontainers/runtime-spec/specs-go"
"github.com/stretchr/testify/suite"
"github.com/talos-systems/talos/internal/app/init/pkg/system/runner"
containerdrunner "github.com/talos-systems/talos/internal/app/init/pkg/system/runner/containerd"
"github.com/talos-systems/talos/internal/app/init/pkg/system/runner/process"
"github.com/talos-systems/talos/internal/pkg/constants"
"github.com/talos-systems/talos/pkg/userdata"
)
const (
containerdNamespace = "talostest"
busyboxImage = "docker.io/library/busybox:latest"
)
type ContainerdSuite struct {
suite.Suite
tmpDir string
containerdRunner runner.Runner
containerdWg sync.WaitGroup
client *containerd.Client
image containerd.Image
}
func (suite *ContainerdSuite) SetupSuite() {
var err error
args := &runner.Args{
ID: "containerd",
ProcessArgs: []string{"/rootfs/bin/containerd"},
}
suite.tmpDir, err = ioutil.TempDir("", "talos")
suite.Require().NoError(err)
suite.containerdRunner = process.NewRunner(
&userdata.UserData{},
args,
runner.WithType(runner.Once),
runner.WithLogPath(suite.tmpDir),
runner.WithEnv([]string{"PATH=/rootfs/bin:" + constants.PATH}),
)
suite.containerdWg.Add(1)
go func() {
defer suite.containerdWg.Done()
suite.Require().NoError(suite.containerdRunner.Run())
}()
suite.client, err = containerd.New(defaults.DefaultAddress)
suite.Require().NoError(err)
ctx := namespaces.WithNamespace(context.Background(), containerdNamespace)
suite.image, err = suite.client.Pull(ctx, busyboxImage, containerd.WithPullUnpack)
suite.Require().NoError(err)
}
func (suite *ContainerdSuite) TearDownSuite() {
suite.Require().NoError(suite.client.Close())
suite.Require().NoError(suite.containerdRunner.Stop())
suite.containerdWg.Wait()
suite.Require().NoError(os.RemoveAll(suite.tmpDir))
}
func (suite *ContainerdSuite) TestRunSuccess() {
r := containerdrunner.NewRunner(&userdata.UserData{}, &runner.Args{
ID: "test",
ProcessArgs: []string{"/bin/sh", "-c", "exit 0"},
},
runner.WithType(runner.Once),
runner.WithLogPath(suite.tmpDir),
runner.WithNamespace(containerdNamespace),
runner.WithContainerImage(busyboxImage),
)
suite.Assert().NoError(r.Run())
// calling Stop when Run has finished is a no-op
suite.Assert().NoError(r.Stop())
}
func (suite *ContainerdSuite) TestRunTwice() {
// running same container twice should be fine
// (checks that containerd state is cleaned up properly)
for i := 0; i < 2; i++ {
r := containerdrunner.NewRunner(&userdata.UserData{}, &runner.Args{
ID: "runtwice",
ProcessArgs: []string{"/bin/sh", "-c", "exit 0"},
},
runner.WithType(runner.Once),
runner.WithLogPath(suite.tmpDir),
runner.WithNamespace(containerdNamespace),
runner.WithContainerImage(busyboxImage),
)
suite.Assert().NoError(r.Run())
// calling Stop when Run has finished is a no-op
suite.Assert().NoError(r.Stop())
}
}
func (suite *ContainerdSuite) TestRunLogs() {
r := containerdrunner.NewRunner(&userdata.UserData{}, &runner.Args{
ID: "logtest",
ProcessArgs: []string{"/bin/sh", "-c", "echo -n \"Test 1\nTest 2\n\""},
},
runner.WithType(runner.Once),
runner.WithLogPath(suite.tmpDir),
runner.WithNamespace(containerdNamespace),
runner.WithContainerImage(busyboxImage),
)
suite.Assert().NoError(r.Run())
logFile, err := os.Open(filepath.Join(suite.tmpDir, "logtest.log"))
suite.Assert().NoError(err)
// nolint: errcheck
defer logFile.Close()
logContents, err := ioutil.ReadAll(logFile)
suite.Assert().NoError(err)
suite.Assert().Equal([]byte("Test 1\nTest 2\n"), logContents)
}
func (suite *ContainerdSuite) TestStopFailingAndRestarting() {
testDir := filepath.Join(suite.tmpDir, "test")
suite.Assert().NoError(os.Mkdir(testDir, 0770))
testFile := filepath.Join(testDir, "talos-test")
// nolint: errcheck
_ = os.Remove(testFile)
r := containerdrunner.NewRunner(&userdata.UserData{}, &runner.Args{
ID: "endless",
ProcessArgs: []string{"/bin/sh", "-c", "test -f " + testFile + " && echo ok || (echo fail; false)"},
},
runner.WithType(runner.Forever),
runner.WithLogPath(suite.tmpDir),
runner.WithRestartInterval(5*time.Millisecond),
runner.WithNamespace(containerdNamespace),
runner.WithContainerImage(busyboxImage),
runner.WithOCISpecOpts(
oci.WithMounts([]specs.Mount{
{Type: "bind", Destination: testDir, Source: testDir, Options: []string{"bind", "ro"}},
}),
),
)
done := make(chan error, 1)
go func() {
done <- r.Run()
}()
time.Sleep(500 * time.Millisecond)
select {
case err := <-done:
suite.Assert().Failf("task should be running", "error: %s", err)
return
default:
}
f, err := os.Create(testFile)
suite.Assert().NoError(err)
suite.Assert().NoError(f.Close())
time.Sleep(500 * time.Millisecond)
select {
case err = <-done:
suite.Assert().Failf("task should be running", "error: %s", err)
return
default:
}
suite.Assert().NoError(r.Stop())
<-done
logFile, err := os.Open(filepath.Join(suite.tmpDir, "endless.log"))
suite.Assert().NoError(err)
// nolint: errcheck
defer logFile.Close()
logContents, err := ioutil.ReadAll(logFile)
suite.Assert().NoError(err)
suite.Assert().Truef(bytes.Contains(logContents, []byte("ok\n")), "logContents doesn't contain success entry: %v", logContents)
suite.Assert().Truef(bytes.Contains(logContents, []byte("fail\n")), "logContents doesn't contain fail entry: %v", logContents)
}
func (suite *ContainerdSuite) TestStopSigKill() {
r := containerdrunner.NewRunner(&userdata.UserData{}, &runner.Args{
ID: "nokill",
ProcessArgs: []string{"/bin/sh", "-c", "trap -- '' SIGTERM; while true; do sleep 1; done"},
},
runner.WithType(runner.Forever),
runner.WithLogPath(suite.tmpDir),
runner.WithNamespace(containerdNamespace),
runner.WithContainerImage(busyboxImage),
runner.WithRestartInterval(5*time.Millisecond),
runner.WithGracefulShutdownTimeout(10*time.Millisecond))
done := make(chan error, 1)
go func() {
done <- r.Run()
}()
time.Sleep(50 * time.Millisecond)
select {
case <-done:
suite.Assert().Fail("container should be still running")
default:
}
time.Sleep(100 * time.Millisecond)
suite.Assert().NoError(r.Stop())
<-done
}
func TestContainerdSuite(t *testing.T) {
suite.Run(t, new(ContainerdSuite))
}


@@ -0,0 +1,32 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
package containerd
import (
"context"
"github.com/containerd/containerd/containers"
"github.com/containerd/containerd/oci"
specs "github.com/opencontainers/runtime-spec/specs-go"
)
// WithMemoryLimit sets the linux resource memory limit field.
func WithMemoryLimit(limit int64) oci.SpecOpts {
return func(_ context.Context, _ oci.Client, _ *containers.Container, s *specs.Spec) error {
s.Linux.Resources.Memory = &specs.LinuxMemory{
Limit: &limit,
// DisableOOMKiller: &disable,
}
return nil
}
}
// WithRootfsPropagation sets the root filesystem propagation.
func WithRootfsPropagation(rp string) oci.SpecOpts {
return func(_ context.Context, _ oci.Client, _ *containers.Container, s *specs.Spec) error {
s.Linux.RootfsPropagation = rp
return nil
}
}
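These helpers are consumed through runner.WithOCISpecOpts, as the Reset handler at the end of this commit does. A trimmed sketch of that wiring; the mounts are a placeholder and the memory figure simply mirrors the 512 MB limit used below:

package example

import (
	"github.com/containerd/containerd/oci"
	specs "github.com/opencontainers/runtime-spec/specs-go"

	"github.com/talos-systems/talos/internal/app/init/pkg/system/runner"
	containerdrunner "github.com/talos-systems/talos/internal/app/init/pkg/system/runner/containerd"
)

// exampleSpecOpts bundles a memory limit and bind mounts into a single
// runner.Option that can be passed to containerdrunner.NewRunner.
func exampleSpecOpts(mounts []specs.Mount) runner.Option {
	return runner.WithOCISpecOpts(
		containerdrunner.WithMemoryLimit(int64(1000000*512)),
		oci.WithMounts(mounts),
	)
}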


@@ -171,11 +171,9 @@ func (k *Kubeadm) Start(data *userdata.UserData) error {
env = append(env, fmt.Sprintf("%s=%s", key, val))
}
r := containerd.Containerd{}
return r.Run(
r := containerd.NewRunner(
data,
args,
&args,
runner.WithNamespace(criconstants.K8sContainerdNamespace),
runner.WithContainerImage(image),
runner.WithEnv(env),
@@ -189,6 +187,8 @@ func (k *Kubeadm) Start(data *userdata.UserData) error {
),
runner.WithType(runner.Once),
)
return r.Run()
}
func enforceMasterOverrides(initConfiguration *kubeadmapi.InitConfiguration) {


@@ -124,11 +124,9 @@ func (k *Kubelet) Start(data *userdata.UserData) error {
env = append(env, fmt.Sprintf("%s=%s", key, val))
}
r := containerd.Containerd{}
return r.Run(
r := containerd.NewRunner(
data,
args,
&args,
runner.WithNamespace(criconstants.K8sContainerdNamespace),
runner.WithContainerImage(image),
runner.WithEnv(env),
@@ -141,4 +139,6 @@ func (k *Kubelet) Start(data *userdata.UserData) error {
),
runner.WithType(runner.Forever),
)
return r.Run()
}


@@ -58,11 +58,9 @@ func (n *NTPd) Start(data *userdata.UserData) error {
env = append(env, fmt.Sprintf("%s=%s", key, val))
}
r := containerd.Containerd{}
return r.Run(
r := containerd.NewRunner(
data,
args,
&args,
runner.WithContainerImage(image),
runner.WithEnv(env),
runner.WithOCISpecOpts(
@@ -70,4 +68,6 @@ func (n *NTPd) Start(data *userdata.UserData) error {
oci.WithMounts(mounts),
),
)
return r.Run()
}


@@ -69,15 +69,15 @@ func (o *OSD) Start(data *userdata.UserData) error {
env = append(env, fmt.Sprintf("%s=%s", key, val))
}
r := containerd.Containerd{}
return r.Run(
r := containerd.NewRunner(
data,
args,
&args,
runner.WithContainerImage(image),
runner.WithEnv(env),
runner.WithOCISpecOpts(
oci.WithMounts(mounts),
),
)
return r.Run()
}


@@ -62,11 +62,9 @@ func (p *Proxyd) Start(data *userdata.UserData) error {
env = append(env, fmt.Sprintf("%s=%s", key, val))
}
r := containerd.Containerd{}
return r.Run(
r := containerd.NewRunner(
data,
args,
&args,
runner.WithContainerImage(image),
runner.WithEnv(env),
runner.WithOCISpecOpts(
@@ -75,4 +73,6 @@ func (p *Proxyd) Start(data *userdata.UserData) error {
oci.WithPrivileged,
),
)
return r.Run()
}


@@ -62,11 +62,9 @@ func (t *Trustd) Start(data *userdata.UserData) error {
env = append(env, fmt.Sprintf("%s=%s", key, val))
}
r := containerd.Containerd{}
return r.Run(
r := containerd.NewRunner(
data,
args,
&args,
runner.WithContainerImage(image),
runner.WithEnv(env),
runner.WithOCISpecOpts(
@@ -74,4 +72,6 @@ func (t *Trustd) Start(data *userdata.UserData) error {
oci.WithMounts(mounts),
),
)
return r.Run()
}


@@ -232,11 +232,9 @@ func (r *Registrator) Reset(ctx context.Context, in *empty.Empty) (reply *proto.
{Type: "bind", Destination: "/bin/kubeadm", Source: "/bin/kubeadm", Options: []string{"bind", "ro"}},
}
cr := containerdrunner.Containerd{}
err = cr.Run(
cr := containerdrunner.NewRunner(
r.Data,
args,
&args,
runner.WithContainerImage(constants.KubernetesImage),
runner.WithOCISpecOpts(
containerdrunner.WithMemoryLimit(int64(1000000*512)),
@@ -249,6 +247,7 @@ func (r *Registrator) Reset(ctx context.Context, in *empty.Empty) (reply *proto.
runner.WithType(runner.Once),
)
err = cr.Run()
if err != nil {
return nil, err
}