feat(init): add node join functionality (#38)

This commit is contained in:
Andrew Rynhard
2018-04-02 13:44:19 -07:00
committed by GitHub
parent f244075f84
commit 0251868146
6 changed files with 143 additions and 56 deletions

View File

@ -49,10 +49,6 @@ func main() {
if err := mount.Mount(); err != nil {
panic(err)
}
// Execute the user data.
if err := userdata.Execute(); err != nil {
panic(err)
}
// Move the initial file systems to the new root.
if err := mount.Move(); err != nil {
panic(err)
@ -68,17 +64,18 @@ func main() {
}
}
// Execute the user data.
data, err := userdata.Execute()
if err != nil {
panic(err)
}
// Start the processes essential to running Kubernetes.
processManager := process.NewManager()
if err := processManager.Start(&process.CRIO{}); err != nil {
panic(err)
}
if err := processManager.Start(&process.Kubeadm{}); err != nil {
panic(err)
}
if err := processManager.Start(&process.Kubelet{}); err != nil {
panic(err)
processManager := &process.Manager{
UserData: data,
}
processManager.Start(&process.CRIO{})
processManager.Start(&process.Kubeadm{})
processManager.Start(&process.Kubelet{})
// TODO: Authn/Authz.
// TODO: Errors API that admins can use to debug.

View File

@ -1,9 +1,11 @@
package process
import (
"fmt"
"io/ioutil"
"github.com/autonomy/dianemo/initramfs/src/init/pkg/process/conditions"
"github.com/autonomy/dianemo/initramfs/src/init/pkg/userdata"
)
const crioConf = `
@ -179,15 +181,18 @@ const crioPolicy = `
type CRIO struct{}
func init() {
func (p *CRIO) Pre(data userdata.UserData) error {
if err := ioutil.WriteFile("/etc/crio/crio.conf", []byte(crioConf), 0644); err != nil {
return fmt.Errorf("write crio.conf: %s", err.Error())
}
if err := ioutil.WriteFile("/etc/containers/policy.json", []byte(crioPolicy), 0644); err != nil {
return fmt.Errorf("write policy.json: %s", err.Error())
}
return nil
}
func (p *CRIO) Cmd() (name string, args []string) {
func (p *CRIO) Cmd(data userdata.UserData) (name string, args []string) {
name = "/bin/crio"
args = []string{
"--runtime=/bin/runc",

View File

@ -1,14 +1,20 @@
package process
import (
"bytes"
"fmt"
"io/ioutil"
"text/template"
"github.com/autonomy/dianemo/initramfs/src/init/pkg/process/conditions"
"github.com/autonomy/dianemo/initramfs/src/init/pkg/userdata"
)
const MasterConfiguration = `
kind: MasterConfiguration
apiVersion: kubeadm.k8s.io/v1alpha1
token: {{ .Token }}
TokenTTL: 0s
criSocket: /var/run/crio/crio.sock
skipTokenPrint: true
kubernetesVersion: v1.10.0
@ -23,19 +29,61 @@ featureGates:
DynamicKubeletConfig: true
`
const NodeConfiguration = `
kind: NodeConfiguration
apiVersion: kubeadm.k8s.io/v1alpha1
token: {{ .Token }}
criSocket: /var/run/crio/crio.sock
discoveryTokenAPIServers:
- {{ .APIServer }}
discoveryTokenUnsafeSkipCAVerification: true
nodeName: {{ .NodeName }}
`
type Kubeadm struct{}
func init() {
if err := ioutil.WriteFile("/etc/kubernetes/kubeadm.yaml", []byte(MasterConfiguration), 0644); err != nil {
var cmd string
func (p *Kubeadm) Pre(data userdata.UserData) error {
var configuration string
if data.Join {
configuration = NodeConfiguration
} else {
configuration = MasterConfiguration
}
tmpl, err := template.New("").Parse(configuration)
if err != nil {
return err
}
var buf []byte
writer := bytes.NewBuffer(buf)
err = tmpl.Execute(writer, data)
if err != nil {
return err
}
if err := ioutil.WriteFile("/etc/kubernetes/kubeadm.yaml", writer.Bytes(), 0644); err != nil {
return fmt.Errorf("write kubeadm.yaml: %s", err.Error())
}
// TODO: "modprobe -a ip_vs ip_vs_rr ip_vs_wrr ip_vs_sh nf_conntrack_ipv4"
return nil
}
func (p *Kubeadm) Cmd() (name string, args []string) {
func (p *Kubeadm) Cmd(data userdata.UserData) (name string, args []string) {
var cmd string
if data.Join {
cmd = "join"
} else {
cmd = "init"
}
name = "/bin/kubeadm"
args = []string{
"init",
cmd,
"--config=/etc/kubernetes/kubeadm.yaml",
"--ignore-preflight-errors=cri",
}
return name, args

View File

@ -1,19 +1,30 @@
package process
import (
"fmt"
"os"
"github.com/autonomy/dianemo/initramfs/src/init/pkg/process/conditions"
"github.com/autonomy/dianemo/initramfs/src/init/pkg/userdata"
)
type Kubelet struct{}
func init() {
os.Mkdir("/run/flannel", os.ModeDir)
os.MkdirAll("/etc/cni/net.d", os.ModeDir)
func (p *Kubelet) Pre(data userdata.UserData) error {
if err := os.Mkdir("/run/flannel", os.ModeDir); err != nil {
return fmt.Errorf("create /run/flannel: %s", err.Error())
}
if err := os.MkdirAll("/etc/cni/net.d", os.ModeDir); err != nil {
return fmt.Errorf("create /etc/cni/net.d: %s", err.Error())
}
if err := os.MkdirAll("/etc/kubernetes/manifests", os.ModeDir); err != nil {
return fmt.Errorf("create /etc/kubernetes/manifests: %s", err.Error())
}
return nil
}
func (p *Kubelet) Cmd() (name string, args []string) {
func (p *Kubelet) Cmd(data userdata.UserData) (name string, args []string) {
name = "/bin/kubelet"
args = []string{
"--container-runtime=remote",
@ -38,11 +49,19 @@ func (p *Kubelet) Cmd() (name string, args []string) {
"--v=4",
}
if data.Join {
labels := "--node-labels="
for k, v := range data.Labels {
labels += k + "=" + v + ","
}
args = append(args, labels)
}
return name, args
}
func (p *Kubelet) Condition() func() (bool, error) {
return conditions.WaitForFileExists("/etc/kubernetes/kubelet.conf")
return conditions.WaitForFileExists("/etc/containers/policy.json")
}
func (p *Kubelet) Env() []string { return []string{} }

View File

@ -9,6 +9,7 @@ import (
"github.com/autonomy/dianemo/initramfs/src/init/pkg/constants"
logstream "github.com/autonomy/dianemo/initramfs/src/init/pkg/log"
"github.com/autonomy/dianemo/initramfs/src/init/pkg/userdata"
)
type Type int
@ -20,20 +21,19 @@ const (
)
type Process interface {
Cmd() (string, []string)
Pre(userdata.UserData) error
Cmd(userdata.UserData) (string, []string)
Condition() func() (bool, error)
Env() []string
Type() Type
}
type Manager struct{}
func NewManager() *Manager {
return &Manager{}
type Manager struct {
UserData userdata.UserData
}
func (m *Manager) build(proc Process) (*exec.Cmd, error) {
name, args := proc.Cmd()
name, args := proc.Cmd(m.UserData)
cmd := exec.Command(name, args...)
// Set the environment for the process.
cmd.Env = append(proc.Env(), fmt.Sprintf("PATH=%s", constants.PATH))
@ -47,34 +47,43 @@ func (m *Manager) build(proc Process) (*exec.Cmd, error) {
return cmd, nil
}
func (m *Manager) Start(proc Process) error {
func (m *Manager) Start(proc Process) {
go func(proc Process) {
err := proc.Pre(m.UserData)
if err != nil {
log.Printf("pre: %s", err.Error())
}
satisfied, err := proc.Condition()()
if !satisfied || err != nil {
if err != nil {
// TODO: Write the error to the log writer.
log.Printf("condition: %s", err.Error())
}
if !satisfied {
log.Printf("condition not satisfied")
return
}
// Wait for the command to exit. Then, based on the process Type, take
// the appropriate actions.
switch proc.Type() {
case Forever:
m.waitAndRestart(proc)
if err := m.waitAndRestart(proc); err != nil {
log.Printf("run: %s", err.Error())
}
case Once:
m.waitForSuccess(proc)
if err := m.waitForSuccess(proc); err != nil {
log.Printf("run: %s", err.Error())
}
}
}(proc)
return nil
}
func (m *Manager) waitAndRestart(proc Process) {
func (m *Manager) waitAndRestart(proc Process) error {
cmd, err := m.build(proc)
if err != nil {
return
return err
}
if err := cmd.Start(); err != nil {
log.Println(err.Error())
return
return err
}
state, err := cmd.Process.Wait()
if err != nil {
@ -82,18 +91,19 @@ func (m *Manager) waitAndRestart(proc Process) {
}
if state.Exited() {
time.Sleep(5 * time.Second)
m.waitAndRestart(proc)
return m.waitAndRestart(proc)
}
return nil
}
func (m *Manager) waitForSuccess(proc Process) {
func (m *Manager) waitForSuccess(proc Process) error {
cmd, err := m.build(proc)
if err != nil {
return
return err
}
if err := cmd.Start(); err != nil {
log.Println(err.Error())
return
return err
}
state, err := cmd.Process.Wait()
if err != nil {
@ -101,6 +111,8 @@ func (m *Manager) waitForSuccess(proc Process) {
}
if !state.Success() {
time.Sleep(5 * time.Second)
m.waitForSuccess(proc)
return m.waitForSuccess(proc)
}
return nil
}

View File

@ -12,31 +12,37 @@ import (
// UserData represents the user data.
type UserData struct {
Version string `yaml:"version"`
Version string `yaml:"version"`
Token string `yaml:"token"`
Join bool `yaml:"join,omitempty"`
APIServer string `yaml:"apiServer,omitempty"`
NodeName string `yaml:"nodeName,omitempty"`
Labels map[string]string `yaml:"labels,omitempty"`
}
// Execute downloads the user data and executes the instructions.
func Execute() error {
func Execute() (UserData, error) {
userData := UserData{}
arguments, err := kernel.ParseProcCmdline()
if err != nil {
return fmt.Errorf("parse /proc/cmdline: %s", err.Error())
return userData, fmt.Errorf("parse kernel parameters: %s", err.Error())
}
url, ok := arguments[constants.UserDataURLFlag]
if !ok {
return nil
return userData, nil
}
userDataBytes, err := download(url)
if err != nil {
return fmt.Errorf("download user data: %s", err.Error())
return userData, fmt.Errorf("download user data: %s", err.Error())
}
userData := &UserData{}
if err := yaml.Unmarshal(userDataBytes, userData); err != nil {
return fmt.Errorf("decode user data: %s", err.Error())
if err := yaml.Unmarshal(userDataBytes, &userData); err != nil {
return userData, fmt.Errorf("decode user data: %s", err.Error())
}
return nil
return userData, nil
}
func download(url string) ([]byte, error) {