rbd: add volume healer

Problem:
-------
For rbd-nbd userspace mounter backends, all the mounts start seeing IO
errors after a restart of the nodeplugin. This is because rbd-nbd backends
run one userspace mount daemon per volume, and once the nodeplugin pod
restarts there is no way to bring those daemons back to life.

Solution:
--------
The volume healer is a one-time activity triggered at startup of the rbd
nodeplugin. It walks the list of volume attachments on the node and acts
accordingly.

For now it is limited to nbd-type storage, but the design is flexible and
can be extended to other backend types as needed.

From a few feet above:
This solves a severe problem for nbd-backed CSI volumes. While walking the
list of volume attachments on the node, if the healer finds a volume that
is in the attached state and is of type nbd, it attempts to fix it by
sending a NodeStageVolume request with the required volume attributes
(secrets, device name, image attributes, etc.), which starts the required
rbd-nbd daemons in the nodeplugin csi-rbdplugin container. This reattaches
the backend images to the right nbd devices, allowing the applications to
perform IO without interruption even after a nodeplugin restart.

Signed-off-by: Prasanna Kumar Kalever <prasanna.kalever@redhat.com>
Author: Prasanna Kumar Kalever, 2021-05-31 16:43:54 +05:30 (committed by mergify[bot])
parent 6007fc9bfe
commit b6a88dd728
8 changed files with 302 additions and 25 deletions
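
Before the per-file changes, a rough sketch (not the exact code in this
commit) of the NodeStageVolume request the healer re-issues for each
attached rbd-nbd volume may help; the volume handle, staging path,
filesystem type and secret keys below are placeholders, and the only
healer-specific detail is the volumeHealerContext attribute:

package main

import (
	"fmt"

	"github.com/container-storage-interface/spec/lib/go/csi"
)

func main() {
	// Illustrative values only; the real healer fills these in from the PV
	// spec, the NodeStageSecretRef secret and the kubelet staging path.
	req := &csi.NodeStageVolumeRequest{
		VolumeId:          "<volume-handle-from-pv>",
		StagingTargetPath: "/var/lib/kubelet/plugins/kubernetes.io/csi/pv/<pv-name>/globalmount",
		VolumeCapability: &csi.VolumeCapability{
			AccessMode: &csi.VolumeCapability_AccessMode{
				Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER,
			},
			AccessType: &csi.VolumeCapability_Mount{
				Mount: &csi.VolumeCapability_MountVolume{FsType: "ext4"},
			},
		},
		Secrets: map[string]string{"userID": "<id>", "userKey": "<key>"},
		VolumeContext: map[string]string{
			"mounter": "rbd-nbd",
			// marks the request as healer-initiated so NodeStageVolume only
			// re-attaches the image and skips the already-present mount
			"volumeHealerContext": "true",
		},
	}
	fmt.Println("healer NodeStageVolume for", req.GetVolumeId())
}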


@ -66,6 +66,7 @@ spec:
args:
- "--nodeid=$(NODE_ID)"
- "--pluginpath={{ .Values.kubeletDir }}/plugins"
- "--stagingpath={{ .Values.kubeletDir }}/plugins/kubernetes.io/csi/pv/"
- "--type=rbd"
- "--nodeserver=true"
- "--pidlimit=-1"


@ -49,7 +49,8 @@ const (
// use default namespace if namespace is not set.
defaultNS = "default"
defaultPluginPath = "/var/lib/kubelet/plugins"
defaultStagingPath = defaultPluginPath + "/kubernetes.io/csi/pv/"
)
var conf util.Config
@ -62,6 +63,7 @@ func init() {
flag.StringVar(&conf.DriverNamespace, "drivernamespace", defaultNS, "namespace in which driver is deployed")
flag.StringVar(&conf.NodeID, "nodeid", "", "node id")
flag.StringVar(&conf.PluginPath, "pluginpath", defaultPluginPath, "plugin path")
flag.StringVar(&conf.StagingPath, "stagingpath", defaultStagingPath, "staging path")
flag.StringVar(&conf.InstanceID, "instanceid", "", "Unique ID distinguishing this instance of Ceph CSI among other"+
" instances, when sharing Ceph clusters across CSI instances for provisioning")
flag.IntVar(&conf.PidLimit, "pidlimit", 0, "the PID limit to configure through cgroups")


@ -52,6 +52,7 @@ spec:
args:
- "--nodeid=$(NODE_ID)"
- "--pluginpath=/var/lib/kubelet/plugins"
- "--stagingpath=/var/lib/kubelet/plugins/kubernetes.io/csi/pv/"
- "--type=rbd"
- "--nodeserver=true"
- "--endpoint=$(CSI_ENDPOINT)"


@ -199,5 +199,14 @@ func (r *Driver) Run(conf *util.Config) {
util.DebugLogMsg("Registering profiling handler")
go util.EnableProfiling()
}
if conf.IsNodeServer {
go func() {
// TODO: move the healer to csi-addons
err := runVolumeHealer(r.ns, conf)
if err != nil {
util.ErrorLogMsg("healer had failures, err %v\n", err)
}
}()
}
s.Wait()
}


@ -96,6 +96,20 @@ var (
xfsHasReflink = xfsReflinkUnset
)
// isHealerContext checks if the request has been made by the volumeHealer.
func isHealerContext(parameters map[string]string) bool {
var err error
healerContext := false
val, ok := parameters["volumeHealerContext"]
if ok {
if healerContext, err = strconv.ParseBool(val); err != nil {
return false
}
}
return healerContext
}
// isStaticVolume checks if the volume is static.
func isStaticVolume(parameters map[string]string) bool {
var err error
@ -110,6 +124,26 @@ func isStaticVolume(parameters map[string]string) bool {
return staticVol
}
// healerStageTransaction attempts to attach the rbd image to the device path
// previously recorded in the stash file.
func healerStageTransaction(ctx context.Context, cr *util.Credentials, volOps *rbdVolume, metaDataPath string) error {
imgInfo, err := lookupRBDImageMetadataStash(metaDataPath)
if err != nil {
util.ErrorLog(ctx, "failed to find image metadata, at stagingPath: %s, err: %v", metaDataPath, err)
return err
}
if imgInfo.DevicePath == "" {
return fmt.Errorf("device is empty in image metadata, at stagingPath: %s", metaDataPath)
}
var devicePath string
devicePath, err = attachRBDImage(ctx, volOps, imgInfo.DevicePath, cr)
if err != nil {
return err
}
util.DebugLog(ctx, "rbd volID: %s was successfully attached to device: %s", volOps.VolID, devicePath)
return nil
}
// NodeStageVolume mounts the volume to a staging path on the node.
// Implementation notes:
// - stagingTargetPath is the directory passed in the request where the volume needs to be staged
@ -124,6 +158,7 @@ func isStaticVolume(parameters map[string]string) bool {
// - Create the staging file/directory under staging path
// - Stage the device (mount the device mapped for image)
// TODO: make this function less complex.
// nolint:gocyclo // reduce complexity
func (ns *NodeServer) NodeStageVolume(
ctx context.Context,
req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
@ -168,13 +203,17 @@ func (ns *NodeServer) NodeStageVolume(
stagingParentPath := req.GetStagingTargetPath()
stagingTargetPath := stagingParentPath + "/" + volID
// check if stagingPath is already mounted
isNotMnt, err := isNotMountPoint(ns.mounter, stagingTargetPath)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
} else if !isNotMnt {
util.DebugLog(ctx, "rbd: volume %s is already mounted to %s, skipping", volID, stagingTargetPath)
return &csi.NodeStageVolumeResponse{}, nil
isHealer := isHealerContext(req.GetVolumeContext())
if !isHealer {
var isNotMnt bool
// check if stagingPath is already mounted
isNotMnt, err = isNotMountPoint(ns.mounter, stagingTargetPath)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
} else if !isNotMnt {
util.DebugLog(ctx, "rbd: volume %s is already mounted to %s, skipping", volID, stagingTargetPath)
return &csi.NodeStageVolumeResponse{}, nil
}
}
isStaticVol := isStaticVolume(req.GetVolumeContext())
@ -222,12 +261,26 @@ func (ns *NodeServer) NodeStageVolume(
}
volOptions.VolID = volID
transaction := stageTransaction{}
volOptions.MapOptions = req.GetVolumeContext()["mapOptions"]
volOptions.UnmapOptions = req.GetVolumeContext()["unmapOptions"]
volOptions.Mounter = req.GetVolumeContext()["mounter"]
err = volOptions.Connect(cr)
if err != nil {
util.ErrorLog(ctx, "failed to connect to volume %s: %v", volOptions, err)
return nil, status.Error(codes.Internal, err.Error())
}
defer volOptions.Destroy()
if isHealer {
err = healerStageTransaction(ctx, cr, volOptions, stagingParentPath)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
return &csi.NodeStageVolumeResponse{}, nil
}
transaction := stageTransaction{}
// Stash image details prior to mapping the image (useful during Unstage as it has no
// voloptions passed to the RPC as per the CSI spec)
err = stashRBDImageMetadata(volOptions, stagingParentPath)
@ -250,7 +303,7 @@ func (ns *NodeServer) NodeStageVolume(
util.DebugLog(
ctx,
"rbd: successfully mounted volume %s to stagingTargetPath %s",
req.GetVolumeId(),
volID,
stagingTargetPath)
return &csi.NodeStageVolumeResponse{}, nil
@ -275,13 +328,6 @@ func (ns *NodeServer) stageTransaction(
}
defer cr.DeleteCredentials()
err = volOptions.Connect(cr)
if err != nil {
util.ErrorLog(ctx, "failed to connect to volume %v: %v", volOptions.RbdImageName, err)
return transaction, err
}
defer volOptions.Destroy()
// Allow image to be mounted on multiple nodes if it is ROX
if req.VolumeCapability.AccessMode.Mode == csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY {
util.ExtendedLog(ctx, "setting disableInUseChecks on rbd volume to: %v", req.GetVolumeId)
@ -314,11 +360,12 @@ func (ns *NodeServer) stageTransaction(
}
// Mapping RBD image
var devicePath string
devicePath, err = attachRBDImage(ctx, volOptions, cr)
devicePath, err = attachRBDImage(ctx, volOptions, devicePath, cr)
if err != nil {
return transaction, err
}
transaction.devicePath = devicePath
util.DebugLog(ctx, "rbd image: %s/%s was successfully mapped at %s\n",
req.GetVolumeId(), volOptions.Pool, devicePath)


@ -47,6 +47,11 @@ const (
rbdUnmapCmdkRbdMissingMap = "rbd: %s: not a mapped image or snapshot"
rbdUnmapCmdNbdMissingMap = "rbd-nbd: %s is not mapped"
rbdMapConnectionTimeout = "Connection timed out"
defaultNbdReAttachTimeout = 300
useNbdNetlink = "try-netlink"
setNbdReattach = "reattach-timeout"
)
var hasNBD = false
@ -183,7 +188,7 @@ func checkRbdNbdTools() bool {
return true
}
func attachRBDImage(ctx context.Context, volOptions *rbdVolume, cr *util.Credentials) (string, error) {
func attachRBDImage(ctx context.Context, volOptions *rbdVolume, device string, cr *util.Credentials) (string, error) {
var err error
image := volOptions.RbdImageName
@ -205,7 +210,7 @@ func attachRBDImage(ctx context.Context, volOptions *rbdVolume, cr *util.Credent
if err != nil {
return "", err
}
devicePath, err = createPath(ctx, volOptions, cr)
devicePath, err = createPath(ctx, volOptions, device, cr)
}
return devicePath, err
@ -223,6 +228,13 @@ func appendDeviceTypeAndOptions(cmdArgs []string, isNbd, isThick bool, userOptio
// namespace (e.g. for Multus CNI). The network namespace must be
// owned by the initial user namespace.
cmdArgs = append(cmdArgs, "--options", "noudev")
} else {
if !strings.Contains(userOptions, useNbdNetlink) {
cmdArgs = append(cmdArgs, "--options", useNbdNetlink)
}
if !strings.Contains(userOptions, setNbdReattach) {
cmdArgs = append(cmdArgs, "--options", fmt.Sprintf("%s=%d", setNbdReattach, defaultNbdReAttachTimeout))
}
}
if isThick {
// When an image is thick-provisioned, any discard/unmap/trim
@ -238,7 +250,26 @@ func appendDeviceTypeAndOptions(cmdArgs []string, isNbd, isThick bool, userOptio
return cmdArgs
}
func createPath(ctx context.Context, volOpt *rbdVolume, cr *util.Credentials) (string, error) {
// appendRbdNbdCliOptions appends mandatory options and converts the list of
// user options from rbd integrated-CLI format to rbd-nbd CLI format.
func appendRbdNbdCliOptions(cmdArgs []string, userOptions string) []string {
if !strings.Contains(userOptions, useNbdNetlink) {
cmdArgs = append(cmdArgs, fmt.Sprintf("--%s", useNbdNetlink))
}
if !strings.Contains(userOptions, setNbdReattach) {
cmdArgs = append(cmdArgs, fmt.Sprintf("--%s=%d", setNbdReattach, defaultNbdReAttachTimeout))
}
if userOptions != "" {
options := strings.Split(userOptions, ",")
for _, opt := range options {
cmdArgs = append(cmdArgs, fmt.Sprintf("--%s", opt))
}
}
return cmdArgs
}
func createPath(ctx context.Context, volOpt *rbdVolume, device string, cr *util.Credentials) (string, error) {
isNbd := false
imagePath := volOpt.String()
@ -248,7 +279,6 @@ func createPath(ctx context.Context, volOpt *rbdVolume, cr *util.Credentials) (s
"--id", cr.ID,
"-m", volOpt.Monitors,
"--keyfile=" + cr.KeyFile,
"map", imagePath,
}
// Choose access protocol
@ -262,13 +292,23 @@ func createPath(ctx context.Context, volOpt *rbdVolume, cr *util.Credentials) (s
util.WarningLog(ctx, "failed to detect if image %q is thick-provisioned: %v", volOpt, err)
}
mapArgs = appendDeviceTypeAndOptions(mapArgs, isNbd, isThick, volOpt.MapOptions)
cli := rbd
if device != "" {
// TODO: use rbd cli for attach/detach in the future
cli = rbdNbdMounter
mapArgs = append(mapArgs, "attach", imagePath, "--device", device)
mapArgs = appendRbdNbdCliOptions(mapArgs, volOpt.MapOptions)
} else {
mapArgs = append(mapArgs, "map", imagePath)
mapArgs = appendDeviceTypeAndOptions(mapArgs, isNbd, isThick, volOpt.MapOptions)
}
if volOpt.readOnly {
mapArgs = append(mapArgs, "--read-only")
}
// Execute map
stdout, stderr, err := util.ExecCommand(ctx, rbd, mapArgs...)
stdout, stderr, err := util.ExecCommand(ctx, cli, mapArgs...)
if err != nil {
util.WarningLog(ctx, "rbd: map error %v, rbd output: %s", err, stderr)
// unmap rbd image if connection timeout
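
The net effect of the rbd_attach.go changes above: nbd maps now get
try-netlink and reattach-timeout=300 appended by default (unless the user
already set them), and the healer path re-attaches to the previously stashed
device via rbd-nbd attach instead of a fresh rbd map. A minimal sketch of the
two resulting invocations, with placeholder pool/image, monitor, keyfile and
device values:

package main

import (
	"fmt"
	"strings"
)

func main() {
	// common argument head shared by both paths (values are placeholders)
	common := []string{"--id", "admin", "-m", "10.0.0.1:6789", "--keyfile=/tmp/keyfile"}

	// fresh map (regular NodeStageVolume) through the rbd CLI
	mapArgs := append(append([]string{}, common...),
		"map", "pool/image",
		"--device-type", "nbd",
		"--options", "try-netlink",
		"--options", "reattach-timeout=300")
	fmt.Println("rbd", strings.Join(mapArgs, " "))

	// healer path: re-attach the image to the device recorded in the stash file
	attachArgs := append(append([]string{}, common...),
		"attach", "pool/image",
		"--device", "/dev/nbd0",
		"--try-netlink",
		"--reattach-timeout=300")
	fmt.Println("rbd-nbd", strings.Join(attachArgs, " "))
}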

internal/rbd/rbd_healer.go (new file, 176 lines)

@ -0,0 +1,176 @@
/*
Copyright 2021 The Ceph-CSI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package rbd
import (
"context"
"github.com/ceph/ceph-csi/internal/util"
"github.com/container-storage-interface/spec/lib/go/csi"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8s "k8s.io/client-go/kubernetes"
)
const (
fsTypeBlockName = "block"
)
// accessModeStrToInt converts an access mode string to the CSI access mode value.
// Make sure to update this function as and when new modes are introduced.
func accessModeStrToInt(mode v1.PersistentVolumeAccessMode) csi.VolumeCapability_AccessMode_Mode {
switch mode {
case v1.ReadWriteOnce:
return csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER
case v1.ReadOnlyMany:
return csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY
case v1.ReadWriteMany:
return csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER
}
return csi.VolumeCapability_AccessMode_UNKNOWN
}
// getSecret fetches the secret data by name.
func getSecret(c *k8s.Clientset, ns, name string) (map[string]string, error) {
deviceSecret := make(map[string]string)
secret, err := c.CoreV1().Secrets(ns).Get(context.TODO(), name, metav1.GetOptions{})
if err != nil {
util.ErrorLogMsg("get secret failed, err: %v", err)
return nil, err
}
for k, v := range secret.Data {
deviceSecret[k] = string(v)
}
return deviceSecret, nil
}
func callNodeStageVolume(ns *NodeServer, c *k8s.Clientset, pv *v1.PersistentVolume, stagingPath string) error {
publishContext := make(map[string]string)
volID := pv.Spec.PersistentVolumeSource.CSI.VolumeHandle
stagingParentPath := stagingPath + pv.Name + "/globalmount"
util.DefaultLog("sending nodeStageVolume for volID: %s, stagingPath: %s",
volID, stagingParentPath)
deviceSecret, err := getSecret(c,
pv.Spec.PersistentVolumeSource.CSI.NodeStageSecretRef.Namespace,
pv.Spec.PersistentVolumeSource.CSI.NodeStageSecretRef.Name)
if err != nil {
util.ErrorLogMsg("getSecret failed for volID: %s, err: %v", volID, err)
return err
}
volumeContext := pv.Spec.PersistentVolumeSource.CSI.VolumeAttributes
volumeContext["volumeHealerContext"] = "true"
req := &csi.NodeStageVolumeRequest{
VolumeId: volID,
PublishContext: publishContext,
StagingTargetPath: stagingParentPath,
VolumeCapability: &csi.VolumeCapability{
AccessMode: &csi.VolumeCapability_AccessMode{
Mode: accessModeStrToInt(pv.Spec.AccessModes[0]),
},
},
Secrets: deviceSecret,
VolumeContext: volumeContext,
}
if pv.Spec.PersistentVolumeSource.CSI.FSType == fsTypeBlockName {
req.VolumeCapability.AccessType = &csi.VolumeCapability_Block{
Block: &csi.VolumeCapability_BlockVolume{},
}
} else {
req.VolumeCapability.AccessType = &csi.VolumeCapability_Mount{
Mount: &csi.VolumeCapability_MountVolume{
FsType: pv.Spec.PersistentVolumeSource.CSI.FSType,
MountFlags: pv.Spec.MountOptions,
},
}
}
_, err = ns.NodeStageVolume(context.TODO(), req)
if err != nil {
util.ErrorLogMsg("nodeStageVolume request failed, volID: %s, stagingPath: %s, err: %v",
volID, stagingParentPath, err)
return err
}
return nil
}
// runVolumeHealer heals the volumes attached on a node.
func runVolumeHealer(ns *NodeServer, conf *util.Config) error {
c := util.NewK8sClient()
val, err := c.StorageV1().VolumeAttachments().List(context.TODO(), metav1.ListOptions{})
if err != nil {
util.ErrorLogMsg("list volumeAttachments failed, err: %v", err)
return err
}
for i := range val.Items {
// skip if the volumeattachment doesn't belong to the current node or driver
if val.Items[i].Spec.NodeName != conf.NodeID || val.Items[i].Spec.Attacher != conf.DriverName {
continue
}
pvName := *val.Items[i].Spec.Source.PersistentVolumeName
pv, err := c.CoreV1().PersistentVolumes().Get(context.TODO(), pvName, metav1.GetOptions{})
if err != nil {
// skip if volume doesn't exist
if !apierrors.IsNotFound(err) {
util.ErrorLogMsg("get persistentVolumes failed for pv: %s, err: %v", pvName, err)
}
continue
}
// TODO: check with pv delete annotations, for eg: what happens when the pv is marked for delete
// skip this volumeattachment if its pv is not bound
if pv.Status.Phase != v1.VolumeBound {
continue
}
// skip if mounter is not rbd-nbd
if pv.Spec.PersistentVolumeSource.CSI.VolumeAttributes["mounter"] != "rbd-nbd" {
continue
}
// ensure that the volume is still in attached state
va, err := c.StorageV1().VolumeAttachments().Get(context.TODO(), val.Items[i].Name, metav1.GetOptions{})
if err != nil {
// skip if volume attachment doesn't exist
if !apierrors.IsNotFound(err) {
util.ErrorLogMsg("get volumeAttachments failed for volumeAttachment: %s, volID: %s, err: %v",
val.Items[i].Name, pv.Spec.PersistentVolumeSource.CSI.VolumeHandle, err)
}
continue
}
if !va.Status.Attached {
continue
}
err = callNodeStageVolume(ns, c, pv, conf.StagingPath)
if err != nil {
util.ErrorLogMsg("callNodeStageVolume failed for VolID: %s, err: %v",
pv.Spec.PersistentVolumeSource.CSI.VolumeHandle, err)
}
}
return nil
}


@ -72,6 +72,7 @@ type Config struct {
NodeID string // node id
InstanceID string // unique ID distinguishing this instance of Ceph CSI
PluginPath string // location of cephcsi plugin
StagingPath string // location of cephcsi staging path
DomainLabels string // list of domain labels to read from the node
// metrics related flags