fix(osd): Fix osctl ps output (#554)

Signed-off-by: Brad Beam <brad.beam@talos-systems.com>
This commit is contained in:
Brad Beam 2019-04-17 08:51:19 -05:00 committed by GitHub
parent 7cbc177a59
commit 46bdf2371c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 158 additions and 50 deletions

View File

@ -0,0 +1,113 @@
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
package reg
import (
"context"
"log"
"path"
"strings"
"github.com/containerd/containerd"
"github.com/containerd/containerd/defaults"
"github.com/containerd/containerd/namespaces"
)
type containerProc struct {
Name string // Friendly name
ID string // container sha/id
Digest string // Container Digest
Status string // Running state of container
Pid uint32
Container containerd.Container
Context context.Context
}
func connect(namespace string) (*containerd.Client, context.Context, error) {
client, err := containerd.New(defaults.DefaultAddress)
return client, namespaces.WithNamespace(context.Background(), namespace), err
}
func containerID(namespace string) ([]containerProc, error) {
client, ctx, err := connect(namespace)
if err != nil {
return nil, err
}
// nolint: errcheck
defer client.Close()
containers, err := client.Containers(ctx)
if err != nil {
return nil, err
}
cps := make([]containerProc, len(containers))
for _, container := range containers {
cp := containerProc{}
info, err := container.Info(ctx)
if err != nil {
log.Println(err)
continue
}
img, err := container.Image(ctx)
if err != nil {
log.Println(err)
continue
}
task, err := container.Task(ctx, nil)
if err != nil {
log.Println(err)
continue
}
status, err := task.Status(ctx)
if err != nil {
log.Println(err)
continue
}
cp.ID = container.ID()
cp.Name = container.ID()
cp.Digest = img.Target().Digest.String()
cp.Container = container
cp.Context = ctx
cp.Pid = task.Pid()
cp.Status = strings.ToUpper(string(status.Status))
if _, ok := info.Labels["io.kubernetes.pod.name"]; ok {
cp.Name = path.Join(info.Labels["io.kubernetes.pod.namespace"], info.Labels["io.kubernetes.pod.name"])
}
cps = append(cps, cp)
}
return cps, nil
}
func images(namespace string) (map[string]string, error) {
client, ctx, err := connect(namespace)
if err != nil {
return nil, err
}
// nolint: errcheck
defer client.Close()
images, err := client.ListImages(ctx, "")
if err != nil {
return nil, err
}
// create a map[sha]name for easier lookups later
imageList := make(map[string]string, len(images))
for _, image := range images {
imageList[image.Target().Digest.String()] = image.Name()
}
return imageList, nil
}

View File

@ -9,7 +9,6 @@ import (
"io/ioutil"
"log"
"os"
"strings"
"github.com/containerd/cgroups"
"github.com/containerd/containerd"
@ -62,16 +61,12 @@ func (r *Registrator) Kubeconfig(ctx context.Context, in *empty.Empty) (data *pr
// Processes implements the proto.OSDServer interface.
// nolint: gocyclo
func (r *Registrator) Processes(ctx context.Context, in *proto.ProcessesRequest) (reply *proto.ProcessesReply, err error) {
client, err := containerd.New(defaults.DefaultAddress)
imageList, err := images(in.Namespace)
if err != nil {
return nil, err
}
// nolint: errcheck
defer client.Close()
ctx = namespaces.WithNamespace(ctx, in.Namespace)
containers, err := client.Containers(ctx)
containers, err := containerID(in.Namespace)
if err != nil {
return nil, err
}
@ -79,52 +74,33 @@ func (r *Registrator) Processes(ctx context.Context, in *proto.ProcessesRequest)
processes := []*proto.Process{}
for _, container := range containers {
task, err := container.Task(ctx, nil)
if err != nil {
log.Println(err)
if container.Name == "" {
continue
}
image, err := container.Image(ctx)
if err != nil {
log.Println(err)
continue
}
status, err := task.Status(ctx)
if err != nil {
log.Println(err)
continue
}
process := &proto.Process{
Namespace: in.Namespace,
Id: container.ID(),
Image: image.Name(),
Pid: task.Pid(),
Status: strings.ToUpper(string(status.Status)),
Id: container.Name,
Image: imageList[container.Digest],
Pid: container.Pid,
Status: container.Status,
}
processes = append(processes, process)
}
reply = &proto.ProcessesReply{Processes: processes}
return &proto.ProcessesReply{Processes: processes}, nil
return reply, nil
}
// Stats implements the proto.OSDServer interface.
// nolint: gocyclo
func (r *Registrator) Stats(ctx context.Context, in *proto.StatsRequest) (reply *proto.StatsReply, err error) {
client, err := containerd.New(defaults.DefaultAddress)
client, _, err := connect(in.Namespace)
if err != nil {
return nil, err
}
// nolint: errcheck
defer client.Close()
ctx = namespaces.WithNamespace(ctx, in.Namespace)
containers, err := client.Containers(ctx)
if err != nil {
return nil, err
@ -292,32 +268,51 @@ func (r *Registrator) Dmesg(ctx context.Context, in *empty.Empty) (data *proto.D
// Logs implements the proto.OSDServer interface. Service or container logs can
// be requested and the contents of the log file are streamed in chunks.
func (r *Registrator) Logs(req *proto.LogsRequest, l proto.OSD_LogsServer) (err error) {
ctx := namespaces.WithNamespace(context.Background(), req.Namespace)
client, err := containerd.New(defaults.DefaultAddress)
var (
client *containerd.Client
containers []containerProc
ctx context.Context
task *tasks.GetResponse
)
containers, err = containerID(req.Namespace)
if err != nil {
return err
}
client, ctx, err = connect(req.Namespace)
if err != nil {
return err
}
// nolint: errcheck
defer client.Close()
task, err := client.TaskService().Get(ctx, &tasks.GetRequest{ContainerID: req.Id})
if err != nil {
return err
}
file, _err := os.OpenFile(task.Process.Stdout, os.O_RDONLY, 0)
if _err != nil {
err = _err
return
}
chunk := filechunker.NewChunker(file)
if chunk == nil {
return errors.New("no log reader found")
}
for _, container := range containers {
if container.Name != req.Id {
continue
}
for data := range chunk.Read(l.Context()) {
if err = l.Send(&proto.Data{Bytes: data}); err != nil {
task, err = client.TaskService().Get(ctx, &tasks.GetRequest{ContainerID: req.Id})
if err != nil {
return
}
var file *os.File
file, err = os.OpenFile(task.Process.Stdout, os.O_RDONLY, 0)
if err != nil {
return
}
chunk := filechunker.NewChunker(file)
if chunk == nil {
err = errors.New("no log reader found")
return
}
for data := range chunk.Read(l.Context()) {
if err = l.Send(&proto.Data{Bytes: data}); err != nil {
return
}
}
}
return nil