feat: support lvm auto activation

Support lvm auto-activation as per
https://man7.org/linux/man-pages/man7/lvmautoactivation.7.html.

This changes Talos from unconditionally trying to activate all volume
groups to activating them based on udev events.

Fixes: #9300

Signed-off-by: Noel Georgi <git@frezbo.dev>
(cherry picked from commit d8ab4981b626ff41fbcdb526a032a5584519e3df)
Noel Georgi 2024-09-18 16:06:51 +05:30 committed by Andrey Smirnov
parent 4d44677f41
commit 70d3c91fb7
10 changed files with 695 additions and 410 deletions
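For context, a minimal sketch (not part of the commit) of how the pvscan --udevoutput result is interpreted, assuming it prints either LVM_VG_NAME_INCOMPLETE=... or LVM_VG_NAME_COMPLETE='&lt;vg&gt;' as the new checkVGNeedsActivation below expects; parsePVScanUdevOutput is a hypothetical helper name:

package sketch

import "strings"

// parsePVScanUdevOutput mirrors the parsing done in checkVGNeedsActivation:
// it returns the VG name only once pvscan reports the volume group as complete.
func parsePVScanUdevOutput(stdOut string) (vgName string, complete bool) {
	if strings.HasPrefix(stdOut, "LVM_VG_NAME_INCOMPLETE") {
		return "", false
	}

	vgName = strings.TrimSuffix(strings.TrimPrefix(strings.TrimSuffix(stdOut, "\n"), "LVM_VG_NAME_COMPLETE='"), "'")

	return vgName, vgName != ""
}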


@ -1,6 +1,6 @@
# THIS FILE WAS AUTOMATICALLY GENERATED, PLEASE DO NOT EDIT.
#
# Generated on 2024-09-12T16:43:46Z by kres 8be5fa7.
# Generated on 2024-09-20T00:40:37Z by kres 8be5fa7.
name: default
concurrency:
@ -2771,7 +2771,7 @@ jobs:
- name: e2e-qemu
env:
IMAGE_REGISTRY: registry.dev.siderolabs.io
QEMU_EXTRA_DISKS: "2"
QEMU_EXTRA_DISKS: "3"
QEMU_EXTRA_DISKS_DRIVERS: ide,nvme
QEMU_EXTRA_DISKS_SIZE: "10240"
WITH_CONFIG_PATCH_WORKER: '@hack/test/patches/ephemeral-nvme.yaml'


@ -1,6 +1,6 @@
# THIS FILE WAS AUTOMATICALLY GENERATED, PLEASE DO NOT EDIT.
#
# Generated on 2024-09-09T12:40:13Z by kres 8be5fa7.
# Generated on 2024-09-21T09:26:00Z by kres 8be5fa7.
name: integration-extensions-cron
concurrency:


@ -1,6 +1,6 @@
# THIS FILE WAS AUTOMATICALLY GENERATED, PLEASE DO NOT EDIT.
#
# Generated on 2024-09-09T12:40:13Z by kres 8be5fa7.
# Generated on 2024-09-21T09:26:00Z by kres 8be5fa7.
name: integration-qemu-cron
concurrency:
@ -81,7 +81,7 @@ jobs:
- name: e2e-qemu
env:
IMAGE_REGISTRY: registry.dev.siderolabs.io
QEMU_EXTRA_DISKS: "2"
QEMU_EXTRA_DISKS: "3"
QEMU_EXTRA_DISKS_DRIVERS: ide,nvme
QEMU_EXTRA_DISKS_SIZE: "10240"
WITH_CONFIG_PATCH_WORKER: '@hack/test/patches/ephemeral-nvme.yaml'


@ -329,7 +329,7 @@ spec:
withSudo: true
environment:
IMAGE_REGISTRY: registry.dev.siderolabs.io
QEMU_EXTRA_DISKS: "2"
QEMU_EXTRA_DISKS: "3"
QEMU_EXTRA_DISKS_SIZE: "10240"
QEMU_EXTRA_DISKS_DRIVERS: "ide,nvme"
WITH_CONFIG_PATCH_WORKER: "@hack/test/patches/ephemeral-nvme.yaml"


@ -0,0 +1,176 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
package block
import (
"context"
"fmt"
"strings"
"github.com/cosi-project/runtime/pkg/controller"
"github.com/cosi-project/runtime/pkg/safe"
"github.com/cosi-project/runtime/pkg/state"
"github.com/hashicorp/go-multierror"
"github.com/siderolabs/gen/optional"
"github.com/siderolabs/go-cmd/pkg/cmd"
"go.uber.org/zap"
"github.com/siderolabs/talos/pkg/machinery/constants"
"github.com/siderolabs/talos/pkg/machinery/resources/block"
runtimeres "github.com/siderolabs/talos/pkg/machinery/resources/runtime"
"github.com/siderolabs/talos/pkg/machinery/resources/v1alpha1"
)
// LVMActivationController activates LVM volumes when they are discovered by the block.DiscoveryController.
type LVMActivationController struct {
seenVolumes map[string]struct{}
activatedVGs map[string]struct{}
}
// Name implements controller.Controller interface.
func (ctrl *LVMActivationController) Name() string {
return "block.LVMActivationController"
}
// Inputs implements controller.Controller interface.
func (ctrl *LVMActivationController) Inputs() []controller.Input {
return []controller.Input{
{
Namespace: v1alpha1.NamespaceName,
Type: runtimeres.MountStatusType,
ID: optional.Some(constants.EphemeralPartitionLabel),
Kind: controller.InputWeak,
},
{
Namespace: block.NamespaceName,
Type: block.DiscoveredVolumeType,
Kind: controller.InputWeak,
},
}
}
// Outputs implements controller.Controller interface.
func (ctrl *LVMActivationController) Outputs() []controller.Output {
return nil
}
// Run implements controller.Controller interface.
//
//nolint:gocyclo
func (ctrl *LVMActivationController) Run(ctx context.Context, r controller.Runtime, logger *zap.Logger) error {
if ctrl.seenVolumes == nil {
ctrl.seenVolumes = make(map[string]struct{})
}
if ctrl.activatedVGs == nil {
ctrl.activatedVGs = make(map[string]struct{})
}
for {
select {
case <-ctx.Done():
return nil
case <-r.EventCh():
}
if _, err := safe.ReaderGetByID[*runtimeres.MountStatus](ctx, r, constants.EphemeralPartitionLabel); err != nil {
if state.IsNotFoundError(err) {
// wait for the mount status to be available
continue
}
return fmt.Errorf("failed to get mount status: %w", err)
}
discoveredVolumes, err := safe.ReaderListAll[*block.DiscoveredVolume](ctx, r)
if err != nil {
return fmt.Errorf("failed to list discovered volumes: %w", err)
}
var multiErr error
for iterator := discoveredVolumes.Iterator(); iterator.Next(); {
if _, ok := ctrl.seenVolumes[iterator.Value().Metadata().ID()]; ok {
continue
}
if iterator.Value().TypedSpec().Name != "lvm2-pv" {
ctrl.seenVolumes[iterator.Value().Metadata().ID()] = struct{}{}
continue
}
logger.Info("checking device for LVM volume activation", zap.String("device", iterator.Value().TypedSpec().DevPath))
vgName, err := ctrl.checkVGNeedsActivation(ctx, iterator.Value().TypedSpec().DevPath)
if err != nil {
multiErr = multierror.Append(multiErr, err)
continue
}
if vgName == "" {
ctrl.seenVolumes[iterator.Value().Metadata().ID()] = struct{}{}
continue
}
if _, ok := ctrl.activatedVGs[vgName]; ok {
continue
}
logger.Info("activating LVM volume", zap.String("name", vgName))
// activate the volume group
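// roughly: "-aay" ("--activate ay") asks LVM to activate only what autoactivation
// policy allows, and "--autoactivation event" marks this as an event-driven call,
// per lvmautoactivation(7); this is a summary, not a full description of the flags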
if _, err = cmd.RunContext(ctx,
"/sbin/lvm",
"vgchange",
"-aay",
"--autoactivation",
"event",
vgName,
); err != nil {
return fmt.Errorf("failed to activate LVM volume %s: %w", vgName, err)
}
ctrl.activatedVGs[vgName] = struct{}{}
}
if multiErr != nil {
return multiErr
}
}
}
// checkVGNeedsActivation checks if the device is part of a volume group and returns the volume group name
// if it needs to be activated, otherwise it returns an empty string.
func (ctrl *LVMActivationController) checkVGNeedsActivation(ctx context.Context, devicePath string) (string, error) {
// first we check if all associated volumes are available
// https://man7.org/linux/man-pages/man7/lvmautoactivation.7.html
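// roughly: "--cache" records this PV as online, "--checkcomplete" reports whether
// the VG now has all of its PVs, and "--udevoutput" prints the result as LVM_*
// KEY='value' pairs that are parsed below (see the man page above for details)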
stdOut, err := cmd.RunContext(ctx,
"/sbin/lvm",
"pvscan",
"--cache",
"--verbose",
"--listvg",
"--checkcomplete",
"--vgonline",
"--autoactivation",
"event",
"--udevoutput",
devicePath,
)
if err != nil {
return "", fmt.Errorf("failed to check if LVM volume backed by device %s needs activation: %w", devicePath, err)
}
if strings.HasPrefix(stdOut, "LVM_VG_NAME_INCOMPLETE") {
return "", nil
}
vgName := strings.TrimSuffix(strings.TrimPrefix(strings.TrimSuffix(stdOut, "\n"), "LVM_VG_NAME_COMPLETE='"), "'")
return vgName, nil
}


@ -93,6 +93,7 @@ func (ctrl *Controller) Run(ctx context.Context, drainer *runtime.Drainer) error
},
&block.DiscoveryController{},
&block.DisksController{},
&block.LVMActivationController{},
&block.SystemDiskController{},
&block.UserDiskConfigController{},
&block.VolumeConfigController{},


@ -8,7 +8,6 @@ package api
import (
"context"
"crypto/rand"
"fmt"
"io"
"net"
@ -36,7 +35,7 @@ import (
"github.com/siderolabs/talos/pkg/machinery/resources/network"
)
// ExtensionsSuiteQEMU verifies Talos is securebooted.
// ExtensionsSuiteQEMU verifies Talos extensions on QEMU.
type ExtensionsSuiteQEMU struct {
base.K8sSuite
@ -146,134 +145,126 @@ func (suite *ExtensionsSuiteQEMU) TestExtensionsISCSI() {
ctx := client.WithNode(suite.ctx, node)
iscsiTargetExists := func() bool {
var iscsiTargetExists bool
iscsiCreatePodDef, err := suite.NewPodOp("iscsi-create", "kube-system")
suite.Require().NoError(err)
disks, err := safe.ReaderListAll[*block.Disk](ctx, suite.Client.COSI)
suite.Require().NoError(err)
suite.Require().NoError(iscsiCreatePodDef.Create(suite.ctx, 5*time.Minute))
for iter := disks.Iterator(); iter.Next(); {
if iter.Value().TypedSpec().Transport == "iscsi" {
iscsiTargetExists = true
defer iscsiCreatePodDef.Delete(suite.ctx) //nolint:errcheck
break
}
}
reader, err := suite.Client.Read(ctx, "/system/iscsi/initiatorname.iscsi")
suite.Require().NoError(err)
return iscsiTargetExists
}
defer reader.Close() //nolint:errcheck
if !iscsiTargetExists() {
_, err := suite.Clientset.CoreV1().Pods("kube-system").Create(suite.ctx, &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "iscsi-test",
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "iscsi-test",
Image: "alpine",
Command: []string{
"tail",
"-f",
"/dev/null",
},
SecurityContext: &corev1.SecurityContext{
Privileged: pointer.To(true),
},
},
},
HostNetwork: true,
HostPID: true,
},
}, metav1.CreateOptions{})
defer suite.Clientset.CoreV1().Pods("kube-system").Delete(suite.ctx, "iscsi-test", metav1.DeleteOptions{}) //nolint:errcheck
body, err := io.ReadAll(reader)
suite.Require().NoError(err)
suite.Require().NoError(err)
initiatorName := strings.TrimPrefix(strings.TrimSpace(string(body)), "InitiatorName=")
// wait for the pod to be ready
suite.Require().NoError(suite.WaitForPodToBeRunning(suite.ctx, 5*time.Minute, "kube-system", "iscsi-test"))
stdout, stderr, err := iscsiCreatePodDef.Exec(
suite.ctx,
fmt.Sprintf("nsenter --mount=/proc/1/ns/mnt -- tgtadm --lld iscsi --op new --mode target --tid 1 -T %s", initiatorName),
)
suite.Require().NoError(err)
reader, err := suite.Client.Read(ctx, "/system/iscsi/initiatorname.iscsi")
suite.Require().NoError(err)
suite.Require().Equal("", stderr)
suite.Require().Equal("", stdout)
defer reader.Close() //nolint:errcheck
stdout, stderr, err = iscsiCreatePodDef.Exec(
suite.ctx,
"dd if=/dev/zero of=/proc/$(pgrep tgtd)/root/var/run/tgtd/iscsi.disk bs=1M count=100",
)
suite.Require().NoError(err)
body, err := io.ReadAll(reader)
suite.Require().NoError(err)
suite.Require().Contains(stderr, "100+0 records in\n100+0 records out\n")
suite.Require().Equal("", stdout)
initiatorName := strings.TrimPrefix(strings.TrimSpace(string(body)), "InitiatorName=")
stdout, stderr, err = iscsiCreatePodDef.Exec(
suite.ctx,
"nsenter --mount=/proc/1/ns/mnt -- tgtadm --lld iscsi --op new --mode logicalunit --tid 1 --lun 1 -b /var/run/tgtd/iscsi.disk",
)
suite.Require().NoError(err)
stdout, stderr, err := suite.ExecuteCommandInPod(
suite.Require().Equal("", stderr)
suite.Require().Equal("", stdout)
stdout, stderr, err = iscsiCreatePodDef.Exec(
suite.ctx,
"nsenter --mount=/proc/1/ns/mnt -- tgtadm --lld iscsi --op bind --mode target --tid 1 -I ALL",
)
suite.Require().NoError(err)
suite.Require().Equal("", stderr)
suite.Require().Equal("", stdout)
stdout, stderr, err = iscsiCreatePodDef.Exec(
suite.ctx,
fmt.Sprintf("nsenter --mount=/proc/$(pgrep iscsid)/ns/mnt --net=/proc/$(pgrep iscsid)/ns/net -- iscsiadm --mode discovery --type sendtargets --portal %s:3260", node),
)
suite.Require().NoError(err)
suite.Require().Equal("", stderr)
suite.Require().Equal(fmt.Sprintf("%s:3260,1 %s\n", node, initiatorName), stdout)
stdout, stderr, err = iscsiCreatePodDef.Exec(
suite.ctx,
fmt.Sprintf("nsenter --mount=/proc/$(pgrep iscsid)/ns/mnt --net=/proc/$(pgrep iscsid)/ns/net -- iscsiadm --mode node --targetname %s --portal %s:3260 --login", initiatorName, node),
)
suite.Require().NoError(err)
suite.Require().Equal("", stderr)
suite.Require().Contains(stdout, "successful.")
defer func() {
stdout, stderr, err = iscsiCreatePodDef.Exec(
suite.ctx,
"kube-system",
"iscsi-test",
fmt.Sprintf("nsenter --mount=/proc/1/ns/mnt -- tgtadm --lld iscsi --op new --mode target --tid 1 -T %s", initiatorName),
fmt.Sprintf("nsenter --mount=/proc/$(pgrep iscsid)/ns/mnt --net=/proc/$(pgrep iscsid)/ns/net -- iscsiadm --mode node --targetname %s --portal %s:3260 --logout", initiatorName, node),
)
suite.Require().NoError(err)
suite.Require().Equal("", stderr)
stdout, stderr, err = iscsiCreatePodDef.Exec(
suite.ctx,
"nsenter --mount=/proc/1/ns/mnt -- tgtadm --lld iscsi --op delete --mode logicalunit --tid 1 --lun 1",
)
suite.Require().NoError(err)
suite.Require().Equal("", stderr)
suite.Require().Equal("", stdout)
stdout, stderr, err = suite.ExecuteCommandInPod(
stdout, stderr, err = iscsiCreatePodDef.Exec(
suite.ctx,
"kube-system",
"iscsi-test",
"/bin/sh -c 'dd if=/dev/zero of=/proc/$(pgrep tgtd)/root/var/run/tgtd/iscsi.disk bs=1M count=100'",
"nsenter --mount=/proc/1/ns/mnt -- tgtadm --lld iscsi --op delete --mode target --tid 1",
)
suite.Require().NoError(err)
suite.Require().Contains(stderr, "100+0 records in\n100+0 records out\n")
suite.Require().Equal("", stdout)
stdout, stderr, err = suite.ExecuteCommandInPod(
suite.ctx,
"kube-system",
"iscsi-test",
"nsenter --mount=/proc/1/ns/mnt -- tgtadm --lld iscsi --op new --mode logicalunit --tid 1 --lun 1 -b /var/run/tgtd/iscsi.disk",
)
suite.Require().NoError(err)
suite.Require().Equal("", stderr)
suite.Require().Equal("", stdout)
stdout, stderr, err = suite.ExecuteCommandInPod(
suite.ctx,
"kube-system",
"iscsi-test",
"nsenter --mount=/proc/1/ns/mnt -- tgtadm --lld iscsi --op bind --mode target --tid 1 -I ALL",
)
suite.Require().NoError(err)
suite.Require().Equal("", stderr)
suite.Require().Equal("", stdout)
stdout, stderr, err = suite.ExecuteCommandInPod(
suite.ctx,
"kube-system",
"iscsi-test",
fmt.Sprintf("/bin/sh -c 'nsenter --mount=/proc/$(pgrep iscsid)/ns/mnt --net=/proc/$(pgrep iscsid)/ns/net -- iscsiadm --mode discovery --type sendtargets --portal %s:3260'", node),
)
suite.Require().NoError(err)
suite.Require().Equal("", stderr)
suite.Require().Equal(fmt.Sprintf("%s:3260,1 %s\n", node, initiatorName), stdout)
stdout, stderr, err = suite.ExecuteCommandInPod(
suite.ctx,
"kube-system",
"iscsi-test",
fmt.Sprintf("/bin/sh -c 'nsenter --mount=/proc/$(pgrep iscsid)/ns/mnt --net=/proc/$(pgrep iscsid)/ns/net -- iscsiadm --mode node --targetname %s --portal %s:3260 --login'", initiatorName, node),
)
suite.Require().NoError(err)
suite.Require().Equal("", stderr)
suite.Require().Contains(stdout, "successful.")
}
}()
suite.Eventually(func() bool {
return iscsiTargetExists()
}, 5*time.Second, 1*time.Second)
return suite.iscsiTargetExists()
}, 5*time.Second, 1*time.Second, "expected iscsi target to exist")
}
func (suite *ExtensionsSuiteQEMU) iscsiTargetExists() bool {
node := suite.RandomDiscoveredNodeInternalIP(machine.TypeWorker)
ctx := client.WithNode(suite.ctx, node)
disks, err := safe.ReaderListAll[*block.Disk](ctx, suite.Client.COSI)
suite.Require().NoError(err)
for iter := disks.Iterator(); iter.Next(); {
if iter.Value().TypedSpec().Transport == "iscsi" {
return true
}
}
return false
}
// TestExtensionsNutClient verifies nut client is working.
@ -448,59 +439,55 @@ func (suite *ExtensionsSuiteQEMU) TestExtensionsStargz() {
func (suite *ExtensionsSuiteQEMU) TestExtensionsMdADM() {
node := suite.RandomDiscoveredNodeInternalIP(machine.TypeWorker)
var mdADMArrayExists bool
userDisks, err := suite.UserDisks(suite.ctx, node)
suite.Require().NoError(err)
uuid := suite.mdADMScan()
if uuid != "" {
mdADMArrayExists = true
}
suite.Require().GreaterOrEqual(len(userDisks), 2, "expected at least two user disks to be available")
if !mdADMArrayExists {
userDisks, err := suite.UserDisks(suite.ctx, node, 4)
userDisksJoined := strings.Join(userDisks[:2], " ")
mdAdmCreatePodDef, err := suite.NewPodOp("mdadm-create", "kube-system")
suite.Require().NoError(err)
suite.Require().NoError(mdAdmCreatePodDef.Create(suite.ctx, 5*time.Minute))
defer mdAdmCreatePodDef.Delete(suite.ctx) //nolint:errcheck
stdout, _, err := mdAdmCreatePodDef.Exec(
suite.ctx,
fmt.Sprintf("nsenter --mount=/proc/1/ns/mnt -- mdadm --create /dev/md/testmd --raid-devices=2 --metadata=1.2 --level=1 %s", userDisksJoined),
)
suite.Require().NoError(err)
suite.Require().Contains(stdout, "mdadm: array /dev/md/testmd started.")
defer func() {
hostNameStatus, err := safe.StateGetByID[*network.HostnameStatus](client.WithNode(suite.ctx, node), suite.Client.COSI, "hostname")
suite.Require().NoError(err)
suite.Require().GreaterOrEqual(len(userDisks), 2, "expected at least two user disks with size greater than 4GB to be available")
_, err = suite.Clientset.CoreV1().Pods("kube-system").Create(suite.ctx, &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "mdadm-create",
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "mdadm-create",
Image: "alpine",
Command: []string{
"tail",
"-f",
"/dev/null",
},
SecurityContext: &corev1.SecurityContext{
Privileged: pointer.To(true),
},
},
},
HostNetwork: true,
HostPID: true,
},
}, metav1.CreateOptions{})
defer suite.Clientset.CoreV1().Pods("kube-system").Delete(suite.ctx, "mdadm-create", metav1.DeleteOptions{}) //nolint:errcheck
hostname := hostNameStatus.TypedSpec().Hostname
deletePodDef, err := suite.NewPodOp("mdadm-destroy", "kube-system")
suite.Require().NoError(err)
// wait for the pod to be ready
suite.Require().NoError(suite.WaitForPodToBeRunning(suite.ctx, 5*time.Minute, "kube-system", "mdadm-create"))
suite.Require().NoError(deletePodDef.Create(suite.ctx, 5*time.Minute))
_, stderr, err := suite.ExecuteCommandInPod(
defer deletePodDef.Delete(suite.ctx) //nolint:errcheck
if _, _, err := deletePodDef.Exec(
suite.ctx,
"kube-system",
"mdadm-create",
fmt.Sprintf("nsenter --mount=/proc/1/ns/mnt -- mdadm --create --verbose /dev/md0 --metadata=0.90 --level=1 --raid-devices=2 %s", strings.Join(userDisks[:2], " ")),
)
suite.Require().NoError(err)
fmt.Sprintf("nsenter --mount=/proc/1/ns/mnt -- mdadm --wait --stop /dev/md/%s:testmd", hostname),
); err != nil {
suite.T().Logf("failed to stop mdadm array: %v", err)
}
suite.Require().Contains(stderr, "mdadm: size set to")
}
if _, _, err := deletePodDef.Exec(
suite.ctx,
fmt.Sprintf("nsenter --mount=/proc/1/ns/mnt -- mdadm --zero-superblock %s", userDisksJoined),
); err != nil {
suite.T().Logf("failed to remove md array backed by volumes %s: %v", userDisksJoined, err)
}
}()
// now we want to reboot the node and make sure the array is still mounted
suite.AssertRebooted(
@ -509,60 +496,24 @@ func (suite *ExtensionsSuiteQEMU) TestExtensionsMdADM() {
}, 5*time.Minute,
)
suite.Require().NotEmpty(suite.mdADMScan())
suite.Require().True(suite.mdADMArrayExists(), "expected mdadm array to be present")
}
func (suite *ExtensionsSuiteQEMU) mdADMScan() string {
// create a random suffix for the mdadm-scan pod
randomSuffix := make([]byte, 4)
_, err := rand.Read(randomSuffix)
func (suite *ExtensionsSuiteQEMU) mdADMArrayExists() bool {
node := suite.RandomDiscoveredNodeInternalIP(machine.TypeWorker)
ctx := client.WithNode(suite.ctx, node)
disks, err := safe.StateListAll[*block.Disk](ctx, suite.Client.COSI)
suite.Require().NoError(err)
podName := fmt.Sprintf("mdadm-scan-%x", randomSuffix)
for iterator := disks.Iterator(); iterator.Next(); {
if strings.HasPrefix(iterator.Value().TypedSpec().DevPath, "/dev/md") {
return true
}
}
_, err = suite.Clientset.CoreV1().Pods("kube-system").Create(suite.ctx, &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: podName,
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: podName,
Image: "alpine",
Command: []string{
"tail",
"-f",
"/dev/null",
},
SecurityContext: &corev1.SecurityContext{
Privileged: pointer.To(true),
},
},
},
HostNetwork: true,
HostPID: true,
},
}, metav1.CreateOptions{})
defer suite.Clientset.CoreV1().Pods("kube-system").Delete(suite.ctx, podName, metav1.DeleteOptions{}) //nolint:errcheck
suite.Require().NoError(err)
// wait for the pod to be ready
suite.Require().NoError(suite.WaitForPodToBeRunning(suite.ctx, 5*time.Minute, "kube-system", podName))
stdout, stderr, err := suite.ExecuteCommandInPod(
suite.ctx,
"kube-system",
podName,
"nsenter --mount=/proc/1/ns/mnt -- mdadm --detail --scan",
)
suite.Require().NoError(err)
suite.Require().Equal("", stderr)
stdOutSplit := strings.Split(stdout, " ")
return strings.TrimPrefix(stdOutSplit[len(stdOutSplit)-1], "UUID=")
return false
}
// TestExtensionsZFS verifies zfs is working, udev rules work and the pool is mounted on reboot.
@ -570,148 +521,60 @@ func (suite *ExtensionsSuiteQEMU) TestExtensionsZFS() {
node := suite.RandomDiscoveredNodeInternalIP(machine.TypeWorker)
suite.AssertServicesRunning(suite.ctx, node, map[string]string{"ext-zpool-importer": "Finished"})
ctx := client.WithNode(suite.ctx, node)
var zfsPoolExists bool
_, err := suite.Clientset.CoreV1().Pods("kube-system").Create(suite.ctx, &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "zpool-list",
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "zpool-list",
Image: "alpine",
Command: []string{
"tail",
"-f",
"/dev/null",
},
SecurityContext: &corev1.SecurityContext{
Privileged: pointer.To(true),
},
},
},
HostNetwork: true,
HostPID: true,
},
}, metav1.CreateOptions{})
defer suite.Clientset.CoreV1().Pods("kube-system").Delete(suite.ctx, "zpool-list", metav1.DeleteOptions{}) //nolint:errcheck
userDisks, err := suite.UserDisks(suite.ctx, node)
suite.Require().NoError(err)
// wait for the pod to be ready
suite.Require().NoError(suite.WaitForPodToBeRunning(suite.ctx, 5*time.Minute, "kube-system", "zpool-list"))
suite.Require().NotEmpty(userDisks, "expected at least one user disks to be available")
stdout, stderr, err := suite.ExecuteCommandInPod(
zfsPodDef, err := suite.NewPodOp("zpool-create", "kube-system")
suite.Require().NoError(err)
suite.Require().NoError(zfsPodDef.Create(suite.ctx, 5*time.Minute))
defer zfsPodDef.Delete(suite.ctx) //nolint:errcheck
stdout, stderr, err := zfsPodDef.Exec(
suite.ctx,
"kube-system",
"zpool-list",
"nsenter --mount=/proc/1/ns/mnt -- zpool list",
fmt.Sprintf("nsenter --mount=/proc/1/ns/mnt -- zpool create -m /var/tank tank %s", userDisks[0]),
)
suite.Require().NoError(err)
suite.Require().Equal("", stderr)
suite.Require().NotEmpty(stdout)
suite.Require().Equal("", stdout)
if stdout != "no pools available\n" {
zfsPoolExists = true
}
stdout, stderr, err = zfsPodDef.Exec(
suite.ctx,
"nsenter --mount=/proc/1/ns/mnt -- zfs create -V 1gb tank/vol",
)
suite.Require().NoError(err)
if !zfsPoolExists {
userDisks, err := suite.UserDisks(suite.ctx, node, 4)
suite.Require().Equal("", stderr)
suite.Require().Equal("", stdout)
defer func() {
deletePodDef, err := suite.NewPodOp("zpool-destroy", "kube-system")
suite.Require().NoError(err)
suite.Require().NotEmpty(userDisks, "expected at least one user disk with size greater than 4GB to be available")
suite.Require().NoError(deletePodDef.Create(suite.ctx, 5*time.Minute))
_, err = suite.Clientset.CoreV1().Pods("kube-system").Create(suite.ctx, &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "zpool-create",
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "zpool-create",
Image: "alpine",
Command: []string{
"tail",
"-f",
"/dev/null",
},
SecurityContext: &corev1.SecurityContext{
Privileged: pointer.To(true),
},
},
},
HostNetwork: true,
HostPID: true,
},
}, metav1.CreateOptions{})
defer suite.Clientset.CoreV1().Pods("kube-system").Delete(suite.ctx, "zpool-create", metav1.DeleteOptions{}) //nolint:errcheck
defer deletePodDef.Delete(suite.ctx) //nolint:errcheck
suite.Require().NoError(err)
// wait for the pod to be ready
suite.Require().NoError(suite.WaitForPodToBeRunning(suite.ctx, 5*time.Minute, "kube-system", "zpool-create"))
stdout, stderr, err := suite.ExecuteCommandInPod(
if _, _, err := deletePodDef.Exec(
suite.ctx,
"kube-system",
"zpool-create",
fmt.Sprintf("nsenter --mount=/proc/1/ns/mnt -- zpool create -m /var/tank tank %s", userDisks[0]),
)
suite.Require().NoError(err)
suite.Require().Equal("", stderr)
suite.Require().Equal("", stdout)
stdout, stderr, err = suite.ExecuteCommandInPod(
suite.ctx,
"kube-system",
"zpool-create",
"nsenter --mount=/proc/1/ns/mnt -- zfs create -V 1gb tank/vol",
)
suite.Require().NoError(err)
suite.Require().Equal("", stderr)
suite.Require().Equal("", stdout)
}
checkZFSPoolMounted := func() bool {
mountsResp, err := suite.Client.Mounts(ctx)
suite.Require().NoError(err)
for _, msg := range mountsResp.Messages {
for _, stats := range msg.Stats {
if stats.MountedOn == "/var/tank" {
return true
}
}
"nsenter --mount=/proc/1/ns/mnt -- zfs destroy tank/vol",
); err != nil {
suite.T().Logf("failed to remove zfs dataset tank/vol: %v", err)
}
return false
}
if _, _, err := deletePodDef.Exec(
suite.ctx,
"nsenter --mount=/proc/1/ns/mnt -- zpool destroy tank",
); err != nil {
suite.T().Logf("failed to remove zpool tank: %v", err)
}
}()
checkZFSVolumePathPopulatedByUdev := func() {
// this is the path that udev will populate, which is a symlink to the actual device
path := "/dev/zvol/tank/vol"
stream, err := suite.Client.LS(ctx, &machineapi.ListRequest{
Root: path,
})
suite.Require().NoError(err)
suite.Require().NoError(helpers.ReadGRPCStream(stream, func(info *machineapi.FileInfo, node string, multipleNodes bool) error {
suite.Require().Equal("/dev/zd0", info.Name, "expected %s to exist", path)
return nil
}))
}
suite.Require().True(checkZFSPoolMounted())
checkZFSVolumePathPopulatedByUdev()
suite.Require().True(suite.checkZFSPoolMounted(), "expected zfs pool to be mounted")
// now we want to reboot the node and make sure the pool is still mounted
suite.AssertRebooted(
@ -720,46 +583,37 @@ func (suite *ExtensionsSuiteQEMU) TestExtensionsZFS() {
}, 5*time.Minute,
)
suite.Require().True(checkZFSPoolMounted())
checkZFSVolumePathPopulatedByUdev()
suite.Require().True(suite.checkZFSPoolMounted(), "expected zfs pool to be mounted")
}
func (suite *ExtensionsSuiteQEMU) checkZFSPoolMounted() bool {
node := suite.RandomDiscoveredNodeInternalIP(machine.TypeWorker)
ctx := client.WithNode(suite.ctx, node)
disks, err := safe.StateListAll[*block.Disk](ctx, suite.Client.COSI)
suite.Require().NoError(err)
for iterator := disks.Iterator(); iterator.Next(); {
if strings.HasPrefix(iterator.Value().TypedSpec().DevPath, "/dev/zd") {
return true
}
}
return false
}
// TestExtensionsUtilLinuxTools verifies util-linux-tools are working.
func (suite *ExtensionsSuiteQEMU) TestExtensionsUtilLinuxTools() {
_, err := suite.Clientset.CoreV1().Pods("kube-system").Create(suite.ctx, &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "util-linux-tools-test",
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "util-linux-tools-test",
Image: "alpine",
Command: []string{
"tail",
"-f",
"/dev/null",
},
SecurityContext: &corev1.SecurityContext{
Privileged: pointer.To(true),
},
},
},
HostNetwork: true,
HostPID: true,
},
}, metav1.CreateOptions{})
defer suite.Clientset.CoreV1().Pods("kube-system").Delete(suite.ctx, "util-linux-tools-test", metav1.DeleteOptions{}) //nolint:errcheck
utilLinuxPodDef, err := suite.NewPodOp("util-linux-tools-test", "kube-system")
suite.Require().NoError(err)
// wait for the pod to be ready
suite.Require().NoError(suite.WaitForPodToBeRunning(suite.ctx, 10*time.Minute, "kube-system", "util-linux-tools-test"))
suite.Require().NoError(utilLinuxPodDef.Create(suite.ctx, 5*time.Minute))
stdout, stderr, err := suite.ExecuteCommandInPod(
defer utilLinuxPodDef.Delete(suite.ctx) //nolint:errcheck
stdout, stderr, err := utilLinuxPodDef.Exec(
suite.ctx,
"kube-system",
"util-linux-tools-test",
"nsenter --mount=/proc/1/ns/mnt -- /usr/local/sbin/fstrim --version",
)
suite.Require().NoError(err)


@ -8,18 +8,22 @@ package api
import (
"context"
"fmt"
"strings"
"testing"
"time"
"github.com/cosi-project/runtime/pkg/safe"
"github.com/siderolabs/talos/internal/integration/base"
"github.com/siderolabs/talos/pkg/machinery/client"
"github.com/siderolabs/talos/pkg/machinery/config/machine"
"github.com/siderolabs/talos/pkg/machinery/resources/block"
)
// VolumesSuite ...
type VolumesSuite struct {
base.APISuite
base.K8sSuite
ctx context.Context //nolint:containedctx
ctxCancel context.CancelFunc
@ -175,6 +179,105 @@ func (suite *VolumesSuite) TestDisks() {
}
}
// TestLVMActivation verifies that an LVM volume group is activated after a reboot.
func (suite *VolumesSuite) TestLVMActivation() {
if testing.Short() {
suite.T().Skip("skipping test in short mode.")
}
node := suite.RandomDiscoveredNodeInternalIP(machine.TypeWorker)
userDisks, err := suite.UserDisks(suite.ctx, node)
suite.Require().NoError(err)
suite.Require().GreaterOrEqual(len(userDisks), 2, "expected at least two user disks to be available")
userDisksJoined := strings.Join(userDisks[:2], " ")
podDef, err := suite.NewPodOp("pv-create", "kube-system")
suite.Require().NoError(err)
suite.Require().NoError(podDef.Create(suite.ctx, 5*time.Minute))
defer podDef.Delete(suite.ctx) //nolint:errcheck
stdout, _, err := podDef.Exec(
suite.ctx,
fmt.Sprintf("nsenter --mount=/proc/1/ns/mnt -- vgcreate vg0 %s", userDisksJoined),
)
suite.Require().NoError(err)
suite.Require().Contains(stdout, "Volume group \"vg0\" successfully created")
stdout, _, err = podDef.Exec(
suite.ctx,
"nsenter --mount=/proc/1/ns/mnt -- lvcreate -n lv0 -L 1G vg0",
)
suite.Require().NoError(err)
suite.Require().Contains(stdout, "Logical volume \"lv0\" created.")
stdout, _, err = podDef.Exec(
suite.ctx,
"nsenter --mount=/proc/1/ns/mnt -- lvcreate -n lv1 -L 1G vg0",
)
suite.Require().NoError(err)
suite.Require().Contains(stdout, "Logical volume \"lv1\" created.")
defer func() {
deletePodDef, err := suite.NewPodOp("pv-destroy", "kube-system")
suite.Require().NoError(err)
suite.Require().NoError(deletePodDef.Create(suite.ctx, 5*time.Minute))
defer deletePodDef.Delete(suite.ctx) //nolint:errcheck
if _, _, err := deletePodDef.Exec(
suite.ctx,
"nsenter --mount=/proc/1/ns/mnt -- vgremove --yes vg0",
); err != nil {
suite.T().Logf("failed to remove pv vg0: %v", err)
}
if _, _, err := deletePodDef.Exec(
suite.ctx,
fmt.Sprintf("nsenter --mount=/proc/1/ns/mnt -- pvremove --yes %s", userDisksJoined),
); err != nil {
suite.T().Logf("failed to remove pv backed by volumes %s: %v", userDisksJoined, err)
}
}()
// now we want to reboot the node and make sure the volume group is still activated
suite.AssertRebooted(
suite.ctx, node, func(nodeCtx context.Context) error {
return base.IgnoreGRPCUnavailable(suite.Client.Reboot(nodeCtx))
}, 5*time.Minute,
)
suite.Require().True(suite.lvmVolumeExists(), "LVM volume group was not activated after reboot")
}
func (suite *VolumesSuite) lvmVolumeExists() bool {
node := suite.RandomDiscoveredNodeInternalIP(machine.TypeWorker)
ctx := client.WithNode(suite.ctx, node)
disks, err := safe.StateListAll[*block.Disk](ctx, suite.Client.COSI)
suite.Require().NoError(err)
var lvmVolumeCount int
for iterator := disks.Iterator(); iterator.Next(); {
if strings.HasPrefix(iterator.Value().TypedSpec().DevPath, "/dev/dm") {
lvmVolumeCount++
}
}
// the test above creates a volume group with two logical volumes
return lvmVolumeCount == 2
}
func init() {
allSuites = append(allSuites, new(VolumesSuite))
}


@ -28,13 +28,11 @@ import (
"google.golang.org/grpc/codes"
"gopkg.in/yaml.v3"
"github.com/siderolabs/talos/cmd/talosctl/pkg/talos/helpers"
"github.com/siderolabs/talos/internal/app/machined/pkg/runtime"
"github.com/siderolabs/talos/pkg/cluster"
"github.com/siderolabs/talos/pkg/cluster/check"
"github.com/siderolabs/talos/pkg/machinery/api/common"
machineapi "github.com/siderolabs/talos/pkg/machinery/api/machine"
"github.com/siderolabs/talos/pkg/machinery/api/storage"
"github.com/siderolabs/talos/pkg/machinery/client"
clientconfig "github.com/siderolabs/talos/pkg/machinery/client/config"
"github.com/siderolabs/talos/pkg/machinery/config"
@ -44,6 +42,7 @@ import (
"github.com/siderolabs/talos/pkg/machinery/config/machine"
"github.com/siderolabs/talos/pkg/machinery/config/types/v1alpha1"
"github.com/siderolabs/talos/pkg/machinery/constants"
"github.com/siderolabs/talos/pkg/machinery/resources/block"
configres "github.com/siderolabs/talos/pkg/machinery/resources/config"
runtimeres "github.com/siderolabs/talos/pkg/machinery/resources/runtime"
"github.com/siderolabs/talos/pkg/provision"
@ -467,69 +466,40 @@ func (apiSuite *APISuite) ReadConfigFromNode(nodeCtx context.Context) (config.Pr
}
// UserDisks returns a list of user disks with size greater than sizeGreaterThanGB and not having any partitions present.
//
//nolint:gocyclo
func (apiSuite *APISuite) UserDisks(ctx context.Context, node string, sizeGreaterThanGB int) ([]string, error) {
nodeCtx := client.WithNodes(ctx, node)
func (apiSuite *APISuite) UserDisks(ctx context.Context, node string) ([]string, error) {
nodeCtx := client.WithNode(ctx, node)
resp, err := apiSuite.Client.Disks(nodeCtx)
disks, err := safe.ReaderListAll[*block.Disk](nodeCtx, apiSuite.Client.COSI)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to list disks: %w", err)
}
var disks []string
var candidateDisks []string
blockDeviceInUse := func(deviceName string) (bool, error) {
devicePart := strings.Split(deviceName, "/dev/")[1]
for iterator := disks.Iterator(); iterator.Next(); {
// skip CD-ROM, readonly and disks without transport (this is usually lvms, md, zfs devices etc)
// also skip iscsi disks (these are created in tests)
if iterator.Value().TypedSpec().Readonly || iterator.Value().TypedSpec().CDROM || iterator.Value().TypedSpec().Transport == "" || iterator.Value().TypedSpec().Transport == "iscsi" {
continue
}
// https://unix.stackexchange.com/questions/111779/how-to-find-out-easily-whether-a-block-device-or-a-part-of-it-is-mounted-someh
// this was the only easy way I could find to check if the block device is already in use by something like raid
stream, err := apiSuite.Client.LS(nodeCtx, &machineapi.ListRequest{
Root: fmt.Sprintf("/sys/block/%s/holders", devicePart),
})
candidateDisks = append(candidateDisks, iterator.Value().Metadata().ID())
}
var availableDisks []string
for _, disk := range candidateDisks {
discoveredVolume, err := safe.ReaderGetByID[*block.DiscoveredVolume](nodeCtx, apiSuite.Client.COSI, disk)
if err != nil {
return false, err
return nil, fmt.Errorf("failed to get discovered volume: %w", err)
}
counter := 0
if err = helpers.ReadGRPCStream(stream, func(info *machineapi.FileInfo, node string, multipleNodes bool) error {
counter++
return nil
}); err != nil {
return false, err
}
if counter > 1 {
return true, nil
}
return false, nil
}
for _, msg := range resp.Messages {
for _, disk := range msg.Disks {
if disk.SystemDisk || disk.Readonly || disk.Type == storage.Disk_CD {
continue
}
if disk.BusPath == "/virtual" {
continue
}
blockDeviceUsed, err := blockDeviceInUse(disk.DeviceName)
if err != nil {
return nil, err
}
if disk.Size > uint64(sizeGreaterThanGB)*1024*1024*1024 && !blockDeviceUsed {
disks = append(disks, disk.DeviceName)
}
if discoveredVolume.TypedSpec().Name == "" {
availableDisks = append(availableDisks, discoveredVolume.TypedSpec().DevPath)
}
}
return disks, nil
return availableDisks, nil
}
// AssertServicesRunning verifies that services are running on the node.


@ -10,6 +10,7 @@ import (
"bufio"
"bytes"
"context"
"crypto/rand"
"encoding/json"
"fmt"
"io"
@ -21,6 +22,7 @@ import (
"time"
"github.com/siderolabs/gen/xslices"
"github.com/siderolabs/go-pointer"
"github.com/siderolabs/go-retry/retry"
corev1 "k8s.io/api/core/v1"
eventsv1 "k8s.io/api/events/v1"
@ -196,6 +198,185 @@ func (k8sSuite *K8sSuite) WaitForEventExists(ctx context.Context, ns string, che
})
}
type podInfo interface {
Name() string
Create(ctx context.Context, waitTimeout time.Duration) error
Delete(ctx context.Context) error
Exec(ctx context.Context, command string) (string, string, error)
}
type pod struct {
name string
namespace string
client *kubernetes.Clientset
restConfig *rest.Config
logF func(format string, args ...any)
}
func (p *pod) Name() string {
return p.name
}
func (p *pod) Create(ctx context.Context, waitTimeout time.Duration) error {
_, err := p.client.CoreV1().Pods(p.namespace).Create(ctx, &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: p.name,
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: p.name,
Image: "alpine",
Command: []string{
"/bin/sh",
"-c",
"--",
},
Args: []string{
"trap : TERM INT; (tail -f /dev/null) & wait",
},
SecurityContext: &corev1.SecurityContext{
Privileged: pointer.To(true),
},
// lvm commands, even though executed in the host mount namespace, still need access to /dev 🤷🏼,
// otherwise lvcreate commands hang on the semop syscall
VolumeMounts: []corev1.VolumeMount{
{
Name: "dev",
MountPath: "/dev",
},
},
},
},
Volumes: []corev1.Volume{
{
Name: "dev",
VolumeSource: corev1.VolumeSource{
HostPath: &corev1.HostPathVolumeSource{
Path: "/dev",
},
},
},
},
HostNetwork: true,
HostIPC: true,
HostPID: true,
},
}, metav1.CreateOptions{})
if err != nil {
return err
}
return p.waitForRunning(ctx, waitTimeout)
}
func (p *pod) Exec(ctx context.Context, command string) (string, string, error) {
cmd := []string{
"/bin/sh",
"-c",
command,
}
req := p.client.CoreV1().RESTClient().Post().Resource("pods").Name(p.name).
Namespace(p.namespace).SubResource("exec")
option := &corev1.PodExecOptions{
Command: cmd,
Stdin: false,
Stdout: true,
Stderr: true,
TTY: false,
}
req.VersionedParams(
option,
scheme.ParameterCodec,
)
exec, err := remotecommand.NewSPDYExecutor(p.restConfig, "POST", req.URL())
if err != nil {
return "", "", err
}
var stdout, stderr strings.Builder
err = exec.StreamWithContext(ctx, remotecommand.StreamOptions{
Stdout: &stdout,
Stderr: &stderr,
})
if err != nil {
p.logF(
"error executing command in pod %s/%s: %v\n\ncommand %q stdout:\n%s\n\ncommand %q stderr:\n%s",
p.namespace,
p.name,
err,
command,
stdout.String(),
command,
stderr.String(),
)
}
return stdout.String(), stderr.String(), err
}
func (p *pod) Delete(ctx context.Context) error {
return p.client.CoreV1().Pods(p.namespace).Delete(ctx, p.name, metav1.DeleteOptions{})
}
func (p *pod) waitForRunning(ctx context.Context, timeout time.Duration) error {
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
watcher, err := p.client.CoreV1().Pods(p.namespace).Watch(ctx, metav1.ListOptions{
FieldSelector: fields.OneTermEqualSelector("metadata.name", p.name).String(),
})
if err != nil {
return err
}
defer watcher.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
case event := <-watcher.ResultChan():
if event.Type == watch.Error {
return fmt.Errorf("error watching pod: %v", event.Object)
}
pod, ok := event.Object.(*corev1.Pod)
if !ok {
continue
}
if pod.Name == p.name && pod.Status.Phase == corev1.PodRunning {
return nil
}
}
}
}
// NewPodOp creates a new pod operation with the given name and namespace.
func (k8sSuite *K8sSuite) NewPodOp(name, namespace string) (podInfo, error) {
randomSuffix := make([]byte, 4)
if _, err := rand.Read(randomSuffix); err != nil {
return nil, fmt.Errorf("failed to generate random suffix: %w", err)
}
return &pod{
name: fmt.Sprintf("%s-%x", name, randomSuffix),
namespace: namespace,
client: k8sSuite.Clientset,
restConfig: k8sSuite.RestConfig,
logF: k8sSuite.T().Logf,
}, nil
}
// WaitForPodToBeRunning waits for the pod with the given namespace and name to be running.
func (k8sSuite *K8sSuite) WaitForPodToBeRunning(ctx context.Context, timeout time.Duration, namespace, podName string) error {
ctx, cancel := context.WithTimeout(ctx, timeout)