fix(osd): Fix k8s.io namespace logs (#557)

Signed-off-by: Brad Beam <brad.beam@talos-systems.com>
This commit is contained in:
Brad Beam 2019-04-18 10:49:33 -05:00 committed by Andrew Rynhard
parent 7da7c8c2ff
commit 271d28244b
10 changed files with 176 additions and 92 deletions

View File

@ -13,13 +13,13 @@ import (
"github.com/containerd/containerd"
"github.com/containerd/containerd/cio"
"github.com/containerd/containerd/defaults"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/oci"
specs "github.com/opencontainers/runtime-spec/specs-go"
"github.com/pkg/errors"
"github.com/talos-systems/talos/internal/app/init/pkg/system/conditions"
"github.com/talos-systems/talos/internal/app/init/pkg/system/runner"
"github.com/talos-systems/talos/internal/pkg/constants"
"github.com/talos-systems/talos/pkg/userdata"
)
@ -59,7 +59,7 @@ func (c *containerdRunner) Run() error {
defer close(c.stopped)
// Wait for the containerd socket.
_, err := conditions.WaitForFileToExist(defaults.DefaultAddress)()
_, err := conditions.WaitForFileToExist(constants.ContainerdAddress)()
if err != nil {
return err
}
@ -67,7 +67,7 @@ func (c *containerdRunner) Run() error {
// Create the containerd client.
ctx := namespaces.WithNamespace(context.Background(), c.opts.Namespace)
client, err := containerd.New(defaults.DefaultAddress)
client, err := containerd.New(constants.ContainerdAddress)
if err != nil {
return err
}

View File

@ -15,7 +15,6 @@ import (
"time"
"github.com/containerd/containerd"
"github.com/containerd/containerd/defaults"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/oci"
specs "github.com/opencontainers/runtime-spec/specs-go"
@ -68,7 +67,7 @@ func (suite *ContainerdSuite) SetupSuite() {
suite.Require().NoError(suite.containerdRunner.Run())
}()
suite.client, err = containerd.New(defaults.DefaultAddress)
suite.client, err = containerd.New(constants.ContainerdAddress)
suite.Require().NoError(err)
ctx := namespaces.WithNamespace(context.Background(), containerdNamespace)

View File

@ -10,11 +10,11 @@ import (
"os"
"github.com/containerd/containerd"
"github.com/containerd/containerd/defaults"
"github.com/containerd/containerd/namespaces"
multierror "github.com/hashicorp/go-multierror"
"github.com/pkg/errors"
"github.com/talos-systems/talos/internal/app/init/pkg/system/conditions"
"github.com/talos-systems/talos/internal/pkg/constants"
)
// ImportRequest represents an image import request.
@ -25,13 +25,13 @@ type ImportRequest struct {
// Import imports the images specified by the import requests.
func Import(namespace string, reqs ...*ImportRequest) error {
_, err := conditions.WaitForFileToExist(defaults.DefaultAddress)()
_, err := conditions.WaitForFileToExist(constants.ContainerdAddress)()
if err != nil {
return err
}
ctx := namespaces.WithNamespace(context.Background(), namespace)
client, err := containerd.New(defaults.DefaultAddress)
client, err := containerd.New(constants.ContainerdAddress)
if err != nil {
return err
}

View File

@ -12,6 +12,7 @@ import (
"github.com/talos-systems/talos/internal/app/init/pkg/system/conditions"
"github.com/talos-systems/talos/internal/app/init/pkg/system/runner"
"github.com/talos-systems/talos/internal/app/init/pkg/system/runner/process"
"github.com/talos-systems/talos/internal/pkg/constants"
"github.com/talos-systems/talos/pkg/userdata"
)
@ -44,7 +45,7 @@ func (c *Containerd) Start(data *userdata.UserData) error {
// Set the process arguments.
args := &runner.Args{
ID: c.ID(data),
ProcessArgs: []string{"/bin/containerd"},
ProcessArgs: []string{"/bin/containerd", "--address", constants.ContainerdAddress},
}
env := []string{}

View File

@ -17,7 +17,6 @@ import (
"strings"
"time"
"github.com/containerd/containerd/defaults"
"github.com/containerd/containerd/oci"
criconstants "github.com/containerd/cri/pkg/constants"
specs "github.com/opencontainers/runtime-spec/specs-go"
@ -92,7 +91,7 @@ func (k *Kubeadm) PostFunc(data *userdata.UserData) error {
// ConditionFunc implements the Service interface.
func (k *Kubeadm) ConditionFunc(data *userdata.UserData) conditions.ConditionFunc {
files := []string{defaults.DefaultAddress}
files := []string{constants.ContainerdAddress}
return conditions.WaitForFilesToExist(files...)
}
@ -203,7 +202,7 @@ func writeKubeadmConfig(data *userdata.UserData) (err error) {
if !ok {
return errors.New("expected InitConfiguration")
}
initConfiguration.NodeRegistration.CRISocket = defaults.DefaultAddress
initConfiguration.NodeRegistration.CRISocket = constants.ContainerdAddress
enforceMasterOverrides(initConfiguration)
if err = cis.EnforceMasterRequirements(initConfiguration); err != nil {
return err
@ -217,7 +216,7 @@ func writeKubeadmConfig(data *userdata.UserData) (err error) {
if !ok {
return errors.New("expected JoinConfiguration")
}
joinConfiguration.NodeRegistration.CRISocket = defaults.DefaultAddress
joinConfiguration.NodeRegistration.CRISocket = constants.ContainerdAddress
if err = cis.EnforceWorkerRequirements(joinConfiguration); err != nil {
return err
}

View File

@ -10,7 +10,6 @@ import (
"os"
"strings"
"github.com/containerd/containerd/defaults"
"github.com/containerd/containerd/oci"
criconstants "github.com/containerd/cri/pkg/constants"
specs "github.com/opencontainers/runtime-spec/specs-go"
@ -60,7 +59,7 @@ func (k *Kubelet) PostFunc(data *userdata.UserData) (err error) {
// ConditionFunc implements the Service interface.
func (k *Kubelet) ConditionFunc(data *userdata.UserData) conditions.ConditionFunc {
return conditions.WaitForFilesToExist("/var/lib/kubelet/kubeadm-flags.env", defaults.DefaultAddress)
return conditions.WaitForFilesToExist("/var/lib/kubelet/kubeadm-flags.env", constants.ContainerdAddress)
}
// Start implements the Service interface.
@ -78,7 +77,7 @@ func (k *Kubelet) Start(data *userdata.UserData) error {
"--config=/var/lib/kubelet/config.yaml",
"--container-runtime=remote",
"--runtime-request-timeout=15m",
"--container-runtime-endpoint=unix://" + defaults.DefaultAddress,
"--container-runtime-endpoint=unix://" + constants.ContainerdAddress,
},
}

View File

@ -9,7 +9,6 @@ import (
"fmt"
"os"
"github.com/containerd/containerd/defaults"
"github.com/containerd/containerd/oci"
specs "github.com/opencontainers/runtime-spec/specs-go"
"github.com/talos-systems/talos/internal/app/init/pkg/system/conditions"
@ -56,9 +55,9 @@ func (o *OSD) Start(data *userdata.UserData) error {
mounts := []specs.Mount{
{Type: "bind", Destination: "/tmp", Source: "/tmp", Options: []string{"rbind", "rshared", "rw"}},
{Type: "bind", Destination: constants.UserDataPath, Source: constants.UserDataPath, Options: []string{"rbind", "ro"}},
{Type: "bind", Destination: defaults.DefaultAddress, Source: defaults.DefaultAddress, Options: []string{"bind", "ro"}},
{Type: "bind", Destination: "/var/run", Source: "/var/run", Options: []string{"rbind", "rw"}},
{Type: "bind", Destination: "/run", Source: "/run", Options: []string{"rbind", "rw"}},
{Type: "bind", Destination: constants.ContainerdAddress, Source: constants.ContainerdAddress, Options: []string{"bind", "ro"}},
{Type: "bind", Destination: "/etc/kubernetes", Source: "/etc/kubernetes", Options: []string{"bind", "rw"}},
{Type: "bind", Destination: "/etc/ssl", Source: "/etc/ssl", Options: []string{"bind", "ro"}},
{Type: "bind", Destination: "/var/log", Source: "/var/log", Options: []string{"rbind", "rw"}},

View File

@ -8,60 +8,81 @@ import (
"context"
"log"
"path"
"path/filepath"
"strings"
"github.com/containerd/containerd"
"github.com/containerd/containerd/defaults"
"github.com/containerd/containerd/namespaces"
"github.com/talos-systems/talos/internal/pkg/constants"
)
type containerProc struct {
Name string // Friendly name
ID string // container sha/id
Digest string // Container Digest
Status string // Running state of container
Pid uint32
type container struct {
Display string // Friendly Name
Name string // container name
ID string // container sha/id
Digest string // Container Digest
Image string
Status string // Running state of container
Pid uint32
LogFile string
}
Container containerd.Container
Context context.Context
type pod struct {
Name string
Sandbox string
Containers []*container
}
func connect(namespace string) (*containerd.Client, context.Context, error) {
client, err := containerd.New(defaults.DefaultAddress)
client, err := containerd.New(constants.ContainerdAddress)
return client, namespaces.WithNamespace(context.Background(), namespace), err
}
func containerID(namespace string) ([]containerProc, error) {
// nolint: gocyclo
func podInfo(namespace string) ([]*pod, error) {
pods := []*pod{}
client, ctx, err := connect(namespace)
if err != nil {
return nil, err
return pods, err
}
// nolint: errcheck
defer client.Close()
containers, err := client.Containers(ctx)
var imageList map[string]string
imageList, err = images(namespace)
if err != nil {
return nil, err
return pods, err
}
cps := make([]containerProc, len(containers))
containers, err := client.Containers(ctx)
if err != nil {
return pods, err
}
for _, container := range containers {
cp := containerProc{}
for _, cntr := range containers {
cp := &container{}
info, err := container.Info(ctx)
info, err := cntr.Info(ctx)
if err != nil {
log.Println(err)
continue
}
img, err := container.Image(ctx)
spec, err := cntr.Spec(ctx)
if err != nil {
log.Println(err)
continue
}
task, err := container.Task(ctx, nil)
img, err := cntr.Image(ctx)
if err != nil {
log.Println(err)
continue
}
task, err := cntr.Task(ctx, nil)
if err != nil {
log.Println(err)
continue
@ -73,22 +94,78 @@ func containerID(namespace string) ([]containerProc, error) {
continue
}
cp.ID = container.ID()
cp.Name = container.ID()
cp.ID = cntr.ID()
cp.Name = cntr.ID()
cp.Display = cntr.ID()
cp.Digest = img.Target().Digest.String()
cp.Container = container
cp.Context = ctx
cp.Image = imageList[img.Target().Digest.String()]
cp.Pid = task.Pid()
cp.Status = strings.ToUpper(string(status.Status))
if _, ok := info.Labels["io.kubernetes.pod.name"]; ok {
cp.Name = path.Join(info.Labels["io.kubernetes.pod.namespace"], info.Labels["io.kubernetes.pod.name"])
if cname, ok := info.Labels["io.kubernetes.pod.name"]; ok {
if cns, ok := info.Labels["io.kubernetes.pod.namespace"]; ok {
cp.Display = path.Join(cns, cname)
}
}
cps = append(cps, cp)
// Save off an identifier for the pod
// this is typically the container name (non-k8s namespace)
// or will be k8s namespace"/"k8s pod name":"container name
podName := cp.Display
// Typically on actual application containers inside the pod/sandbox
if _, ok := info.Labels["io.kubernetes.container.name"]; ok {
cp.Name = info.Labels["io.kubernetes.container.name"]
cp.Display = cp.Display + ":" + info.Labels["io.kubernetes.container.name"]
}
// Typically on the 'infrastructure' container, aka k8s.gcr.io/pause
var sandbox string
if _, ok := spec.Annotations["io.kubernetes.cri.sandbox-log-directory"]; ok {
sandbox = spec.Annotations["io.kubernetes.cri.sandbox-log-directory"]
}
// Figure out if we need to create a new pod or append
// to an existing pod
// Also set pod sandbox ID if defined
found := false
for _, pod := range pods {
if pod.Name != podName {
continue
}
if sandbox != "" {
pod.Sandbox = sandbox
}
pod.Containers = append(pod.Containers, cp)
found = true
break
}
if !found {
p := &pod{
Name: podName,
Containers: []*container{cp},
}
if sandbox != "" {
p.Sandbox = sandbox
}
pods = append(pods, p)
}
}
return cps, nil
// This seems janky because it is
// But we need to loop through everything again to associate
// the sandbox with the container name so we can get a proper
// filepath to the location of the logfile
for _, contents := range pods {
for _, cntr := range contents.Containers {
if strings.Contains(cntr.Display, ":") && contents.Sandbox != "" {
cntr.LogFile = filepath.Join(contents.Sandbox, cntr.Name, "0.log")
}
}
}
return pods, nil
}
func images(namespace string) (map[string]string, error) {
@ -107,6 +184,9 @@ func images(namespace string) (map[string]string, error) {
// create a map[sha]name for easier lookups later
imageList := make(map[string]string, len(images))
for _, image := range images {
if strings.HasPrefix(image.Name(), "sha256:") {
continue
}
imageList[image.Target().Digest.String()] = image.Name()
}
return imageList, nil

View File

@ -13,7 +13,6 @@ import (
"github.com/containerd/cgroups"
"github.com/containerd/containerd"
tasks "github.com/containerd/containerd/api/services/tasks/v1"
"github.com/containerd/containerd/defaults"
"github.com/containerd/containerd/namespaces"
"github.com/containerd/containerd/oci"
"github.com/containerd/typeurl"
@ -59,32 +58,25 @@ func (r *Registrator) Kubeconfig(ctx context.Context, in *empty.Empty) (data *pr
}
// Processes implements the proto.OSDServer interface.
// nolint: gocyclo
func (r *Registrator) Processes(ctx context.Context, in *proto.ProcessesRequest) (reply *proto.ProcessesReply, err error) {
imageList, err := images(in.Namespace)
if err != nil {
return nil, err
}
containers, err := containerID(in.Namespace)
pods, err := podInfo(in.Namespace)
if err != nil {
return nil, err
}
processes := []*proto.Process{}
for _, container := range containers {
if container.Name == "" {
continue
for _, containers := range pods {
for _, container := range containers.Containers {
process := &proto.Process{
Namespace: in.Namespace,
Id: container.Display,
Image: container.Image,
Pid: container.Pid,
Status: container.Status,
}
processes = append(processes, process)
}
process := &proto.Process{
Namespace: in.Namespace,
Id: container.Name,
Image: imageList[container.Digest],
Pid: container.Pid,
Status: container.Status,
}
processes = append(processes, process)
}
return &proto.ProcessesReply{Processes: processes}, nil
@ -168,7 +160,7 @@ func (r *Registrator) Stats(ctx context.Context, in *proto.StatsRequest) (reply
// Restart implements the proto.OSDServer interface.
func (r *Registrator) Restart(ctx context.Context, in *proto.RestartRequest) (reply *proto.RestartReply, err error) {
ctx = namespaces.WithNamespace(ctx, in.Namespace)
client, err := containerd.New(defaults.DefaultAddress)
client, err := containerd.New(constants.ContainerdAddress)
if err != nil {
return nil, err
}
@ -267,15 +259,16 @@ func (r *Registrator) Dmesg(ctx context.Context, in *empty.Empty) (data *proto.D
// Logs implements the proto.OSDServer interface. Service or container logs can
// be requested and the contents of the log file are streamed in chunks.
// nolint: gocyclo
func (r *Registrator) Logs(req *proto.LogsRequest, l proto.OSD_LogsServer) (err error) {
var (
client *containerd.Client
containers []containerProc
ctx context.Context
task *tasks.GetResponse
client *containerd.Client
ctx context.Context
pods []*pod
task *tasks.GetResponse
)
containers, err = containerID(req.Namespace)
pods, err = podInfo(req.Namespace)
if err != nil {
return err
}
@ -286,32 +279,39 @@ func (r *Registrator) Logs(req *proto.LogsRequest, l proto.OSD_LogsServer) (err
// nolint: errcheck
defer client.Close()
for _, container := range containers {
if container.Name != req.Id {
continue
}
for _, containers := range pods {
for _, container := range containers.Containers {
if container.Display != req.Id {
continue
}
task, err = client.TaskService().Get(ctx, &tasks.GetRequest{ContainerID: req.Id})
if err != nil {
return
}
if container.LogFile == "" {
task, err = client.TaskService().Get(ctx, &tasks.GetRequest{ContainerID: container.ID})
if err != nil {
return
}
var file *os.File
file, err = os.OpenFile(task.Process.Stdout, os.O_RDONLY, 0)
if err != nil {
return
}
chunk := filechunker.NewChunker(file)
container.LogFile = task.Process.Stdout
}
if chunk == nil {
err = errors.New("no log reader found")
return
}
for data := range chunk.Read(l.Context()) {
if err = l.Send(&proto.Data{Bytes: data}); err != nil {
var file *os.File
file, err = os.OpenFile(container.LogFile, os.O_RDONLY, 0)
if err != nil {
return
}
chunk := filechunker.NewChunker(file)
if chunk == nil {
err = errors.New("no log reader found")
return
}
for data := range chunk.Read(l.Context()) {
if err = l.Send(&proto.Data{Bytes: data}); err != nil {
return
}
}
}
}

View File

@ -4,6 +4,8 @@
package constants
import "github.com/containerd/containerd/defaults"
const (
// KernelParamUserData is the kernel parameter name for specifying the URL
// to the user data.
@ -109,3 +111,8 @@ const (
// nolint: golint
SYSLOG_ACTION_READ_ALL = 3
)
// Containerd
const (
ContainerdAddress = defaults.DefaultAddress
)