diff --git a/charts/ceph-csi-rbd/templates/nodeplugin-daemonset.yaml b/charts/ceph-csi-rbd/templates/nodeplugin-daemonset.yaml
index 78fb6cbac..1371eb7ed 100644
--- a/charts/ceph-csi-rbd/templates/nodeplugin-daemonset.yaml
+++ b/charts/ceph-csi-rbd/templates/nodeplugin-daemonset.yaml
@@ -66,6 +66,7 @@ spec:
           args:
             - "--nodeid=$(NODE_ID)"
             - "--pluginpath={{ .Values.kubeletDir }}/plugins"
+            - "--stagingpath={{ .Values.kubeletDir }}/plugins/kubernetes.io/csi/pv/"
            - "--type=rbd"
            - "--nodeserver=true"
            - "--pidlimit=-1"
diff --git a/cmd/cephcsi.go b/cmd/cephcsi.go
index b22c17ecb..b97869824 100644
--- a/cmd/cephcsi.go
+++ b/cmd/cephcsi.go
@@ -49,7 +49,8 @@ const (
 	// use default namespace if namespace is not set.
 	defaultNS = "default"
 
-	defaultPluginPath = "/var/lib/kubelet/plugins"
+	defaultPluginPath  = "/var/lib/kubelet/plugins"
+	defaultStagingPath = defaultPluginPath + "/kubernetes.io/csi/pv/"
 )
 
 var conf util.Config
@@ -62,6 +63,7 @@ func init() {
 	flag.StringVar(&conf.DriverNamespace, "drivernamespace", defaultNS, "namespace in which driver is deployed")
 	flag.StringVar(&conf.NodeID, "nodeid", "", "node id")
 	flag.StringVar(&conf.PluginPath, "pluginpath", defaultPluginPath, "plugin path")
+	flag.StringVar(&conf.StagingPath, "stagingpath", defaultStagingPath, "staging path")
 	flag.StringVar(&conf.InstanceID, "instanceid", "", "Unique ID distinguishing this instance of Ceph CSI among other"+
 		" instances, when sharing Ceph clusters across CSI instances for provisioning")
 	flag.IntVar(&conf.PidLimit, "pidlimit", 0, "the PID limit to configure through cgroups")
diff --git a/deploy/rbd/kubernetes/csi-rbdplugin.yaml b/deploy/rbd/kubernetes/csi-rbdplugin.yaml
index a5d3f8b38..07b9969ae 100644
--- a/deploy/rbd/kubernetes/csi-rbdplugin.yaml
+++ b/deploy/rbd/kubernetes/csi-rbdplugin.yaml
@@ -52,6 +52,7 @@ spec:
         args:
           - "--nodeid=$(NODE_ID)"
           - "--pluginpath=/var/lib/kubelet/plugins"
+          - "--stagingpath=/var/lib/kubelet/plugins/kubernetes.io/csi/pv/"
          - "--type=rbd"
          - "--nodeserver=true"
          - "--endpoint=$(CSI_ENDPOINT)"
diff --git a/internal/rbd/driver.go b/internal/rbd/driver.go
index 9cd5c214f..27738f000 100644
--- a/internal/rbd/driver.go
+++ b/internal/rbd/driver.go
@@ -199,5 +199,14 @@ func (r *Driver) Run(conf *util.Config) {
 		util.DebugLogMsg("Registering profiling handler")
 		go util.EnableProfiling()
 	}
+	if conf.IsNodeServer {
+		go func() {
+			// TODO: move the healer to csi-addons
+			err := runVolumeHealer(r.ns, conf)
+			if err != nil {
+				util.ErrorLogMsg("healer had failures, err %v\n", err)
+			}
+		}()
+	}
 	s.Wait()
 }
diff --git a/internal/rbd/nodeserver.go b/internal/rbd/nodeserver.go
index c7e0380fd..0fb058b1e 100644
--- a/internal/rbd/nodeserver.go
+++ b/internal/rbd/nodeserver.go
@@ -96,6 +96,20 @@ var (
 	xfsHasReflink = xfsReflinkUnset
 )
 
+// isHealerContext checks whether the request has been made by the volumeHealer.
+func isHealerContext(parameters map[string]string) bool {
+	var err error
+	healerContext := false
+
+	val, ok := parameters["volumeHealerContext"]
+	if ok {
+		if healerContext, err = strconv.ParseBool(val); err != nil {
+			return false
+		}
+	}
+	return healerContext
+}
+
 // isStaticVolume checks if the volume is static.
 func isStaticVolume(parameters map[string]string) bool {
 	var err error
@@ -110,6 +124,26 @@ func isStaticVolume(parameters map[string]string) bool {
 	return staticVol
 }
 
+// healerStageTransaction attempts to attach the rbd image with the device
+// path that was previously updated in the stash file.
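+// It is invoked from NodeStageVolume when the volumeHealerContext parameter
+// is set, allowing a freshly started rbd-nbd process to take over the device
+// that was recorded during the original staging.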
+func healerStageTransaction(ctx context.Context, cr *util.Credentials, volOps *rbdVolume, metaDataPath string) error { + imgInfo, err := lookupRBDImageMetadataStash(metaDataPath) + if err != nil { + util.ErrorLog(ctx, "failed to find image metadata, at stagingPath: %s, err: %v", metaDataPath, err) + return err + } + if imgInfo.DevicePath == "" { + return fmt.Errorf("device is empty in image metadata, at stagingPath: %s", metaDataPath) + } + var devicePath string + devicePath, err = attachRBDImage(ctx, volOps, imgInfo.DevicePath, cr) + if err != nil { + return err + } + util.DebugLog(ctx, "rbd volID: %s was successfully attached to device: %s", volOps.VolID, devicePath) + return nil +} + // NodeStageVolume mounts the volume to a staging path on the node. // Implementation notes: // - stagingTargetPath is the directory passed in the request where the volume needs to be staged @@ -124,6 +158,7 @@ func isStaticVolume(parameters map[string]string) bool { // - Create the staging file/directory under staging path // - Stage the device (mount the device mapped for image) // TODO: make this function less complex. +// nolint:gocyclo // reduce complexity func (ns *NodeServer) NodeStageVolume( ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) { @@ -168,13 +203,17 @@ func (ns *NodeServer) NodeStageVolume( stagingParentPath := req.GetStagingTargetPath() stagingTargetPath := stagingParentPath + "/" + volID - // check if stagingPath is already mounted - isNotMnt, err := isNotMountPoint(ns.mounter, stagingTargetPath) - if err != nil { - return nil, status.Error(codes.Internal, err.Error()) - } else if !isNotMnt { - util.DebugLog(ctx, "rbd: volume %s is already mounted to %s, skipping", volID, stagingTargetPath) - return &csi.NodeStageVolumeResponse{}, nil + isHealer := isHealerContext(req.GetVolumeContext()) + if !isHealer { + var isNotMnt bool + // check if stagingPath is already mounted + isNotMnt, err = isNotMountPoint(ns.mounter, stagingTargetPath) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } else if !isNotMnt { + util.DebugLog(ctx, "rbd: volume %s is already mounted to %s, skipping", volID, stagingTargetPath) + return &csi.NodeStageVolumeResponse{}, nil + } } isStaticVol := isStaticVolume(req.GetVolumeContext()) @@ -222,12 +261,26 @@ func (ns *NodeServer) NodeStageVolume( } volOptions.VolID = volID - transaction := stageTransaction{} - volOptions.MapOptions = req.GetVolumeContext()["mapOptions"] volOptions.UnmapOptions = req.GetVolumeContext()["unmapOptions"] volOptions.Mounter = req.GetVolumeContext()["mounter"] + err = volOptions.Connect(cr) + if err != nil { + util.ErrorLog(ctx, "failed to connect to volume %s: %v", volOptions, err) + return nil, status.Error(codes.Internal, err.Error()) + } + defer volOptions.Destroy() + + if isHealer { + err = healerStageTransaction(ctx, cr, volOptions, stagingParentPath) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + return &csi.NodeStageVolumeResponse{}, nil + } + + transaction := stageTransaction{} // Stash image details prior to mapping the image (useful during Unstage as it has no // voloptions passed to the RPC as per the CSI spec) err = stashRBDImageMetadata(volOptions, stagingParentPath) @@ -250,7 +303,7 @@ func (ns *NodeServer) NodeStageVolume( util.DebugLog( ctx, "rbd: successfully mounted volume %s to stagingTargetPath %s", - req.GetVolumeId(), + volID, stagingTargetPath) return &csi.NodeStageVolumeResponse{}, nil @@ -275,13 +328,6 @@ func (ns 
*NodeServer) stageTransaction(
 	}
 	defer cr.DeleteCredentials()
 
-	err = volOptions.Connect(cr)
-	if err != nil {
-		util.ErrorLog(ctx, "failed to connect to volume %v: %v", volOptions.RbdImageName, err)
-		return transaction, err
-	}
-	defer volOptions.Destroy()
-
 	// Allow image to be mounted on multiple nodes if it is ROX
 	if req.VolumeCapability.AccessMode.Mode == csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY {
 		util.ExtendedLog(ctx, "setting disableInUseChecks on rbd volume to: %v", req.GetVolumeId)
@@ -314,11 +360,12 @@ func (ns *NodeServer) stageTransaction(
 	}
 	// Mapping RBD image
 	var devicePath string
-	devicePath, err = attachRBDImage(ctx, volOptions, cr)
+	devicePath, err = attachRBDImage(ctx, volOptions, devicePath, cr)
 	if err != nil {
 		return transaction, err
 	}
 	transaction.devicePath = devicePath
+	util.DebugLog(ctx, "rbd image: %s/%s was successfully mapped at %s\n", req.GetVolumeId(), volOptions.Pool, devicePath)
diff --git a/internal/rbd/rbd_attach.go b/internal/rbd/rbd_attach.go
index 1e485208f..38144592b 100644
--- a/internal/rbd/rbd_attach.go
+++ b/internal/rbd/rbd_attach.go
@@ -47,6 +47,11 @@ const (
 	rbdUnmapCmdkRbdMissingMap = "rbd: %s: not a mapped image or snapshot"
 	rbdUnmapCmdNbdMissingMap  = "rbd-nbd: %s is not mapped"
 	rbdMapConnectionTimeout   = "Connection timed out"
+
+	defaultNbdReAttachTimeout = 300
+
+	useNbdNetlink  = "try-netlink"
+	setNbdReattach = "reattach-timeout"
 )
 
 var hasNBD = false
@@ -183,7 +188,7 @@ func checkRbdNbdTools() bool {
 	return true
 }
 
-func attachRBDImage(ctx context.Context, volOptions *rbdVolume, cr *util.Credentials) (string, error) {
+func attachRBDImage(ctx context.Context, volOptions *rbdVolume, device string, cr *util.Credentials) (string, error) {
 	var err error
 
 	image := volOptions.RbdImageName
@@ -205,7 +210,7 @@ func attachRBDImage(ctx context.Context, volOptions *rbdVolume, cr *util.Credent
 		if err != nil {
 			return "", err
 		}
-		devicePath, err = createPath(ctx, volOptions, cr)
+		devicePath, err = createPath(ctx, volOptions, device, cr)
 	}
 
 	return devicePath, err
@@ -223,6 +228,13 @@ func appendDeviceTypeAndOptions(cmdArgs []string, isNbd, isThick bool, userOptio
 		// namespace (e.g. for Multus CNI). The network namespace must be
 		// owned by the initial user namespace.
 		cmdArgs = append(cmdArgs, "--options", "noudev")
+	} else {
+		if !strings.Contains(userOptions, useNbdNetlink) {
+			cmdArgs = append(cmdArgs, "--options", useNbdNetlink)
+		}
+		if !strings.Contains(userOptions, setNbdReattach) {
+			cmdArgs = append(cmdArgs, "--options", fmt.Sprintf("%s=%d", setNbdReattach, defaultNbdReAttachTimeout))
+		}
 	}
 	if isThick {
 		// When an image is thick-provisioned, any discard/unmap/trim
@@ -238,7 +250,26 @@ func appendDeviceTypeAndOptions(cmdArgs []string, isNbd, isThick bool, userOptio
 	return cmdArgs
 }
 
-func createPath(ctx context.Context, volOpt *rbdVolume, cr *util.Credentials) (string, error) {
+// appendRbdNbdCliOptions appends the mandatory options and converts the list
+// of user options provided for the rbd integrated CLI into the rbd-nbd
+// CLI-specific format.
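+// For example, a userOptions value of "io-timeout=9000" (an illustrative
+// rbd-nbd option) would produce the arguments:
+// --try-netlink --reattach-timeout=300 --io-timeout=9000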
+func appendRbdNbdCliOptions(cmdArgs []string, userOptions string) []string {
+	if !strings.Contains(userOptions, useNbdNetlink) {
+		cmdArgs = append(cmdArgs, fmt.Sprintf("--%s", useNbdNetlink))
+	}
+	if !strings.Contains(userOptions, setNbdReattach) {
+		cmdArgs = append(cmdArgs, fmt.Sprintf("--%s=%d", setNbdReattach, defaultNbdReAttachTimeout))
+	}
+	if userOptions != "" {
+		options := strings.Split(userOptions, ",")
+		for _, opt := range options {
+			cmdArgs = append(cmdArgs, fmt.Sprintf("--%s", opt))
+		}
+	}
+
+	return cmdArgs
+}
+
+func createPath(ctx context.Context, volOpt *rbdVolume, device string, cr *util.Credentials) (string, error) {
 	isNbd := false
 	imagePath := volOpt.String()
@@ -248,7 +279,6 @@ func createPath(ctx context.Context, volOpt *rbdVolume, cr *util.Credentials) (s
 		"--id", cr.ID,
 		"-m", volOpt.Monitors,
 		"--keyfile=" + cr.KeyFile,
-		"map", imagePath,
 	}
 
 	// Choose access protocol
@@ -262,13 +292,23 @@ func createPath(ctx context.Context, volOpt *rbdVolume, cr *util.Credentials) (s
 		util.WarningLog(ctx, "failed to detect if image %q is thick-provisioned: %v", volOpt, err)
 	}
 
-	mapArgs = appendDeviceTypeAndOptions(mapArgs, isNbd, isThick, volOpt.MapOptions)
+	cli := rbd
+	if device != "" {
+		// TODO: use rbd cli for attach/detach in the future
+		cli = rbdNbdMounter
+		mapArgs = append(mapArgs, "attach", imagePath, "--device", device)
+		mapArgs = appendRbdNbdCliOptions(mapArgs, volOpt.MapOptions)
+	} else {
+		mapArgs = append(mapArgs, "map", imagePath)
+		mapArgs = appendDeviceTypeAndOptions(mapArgs, isNbd, isThick, volOpt.MapOptions)
+	}
+
 	if volOpt.readOnly {
 		mapArgs = append(mapArgs, "--read-only")
 	}
 
 	// Execute map
-	stdout, stderr, err := util.ExecCommand(ctx, rbd, mapArgs...)
+	stdout, stderr, err := util.ExecCommand(ctx, cli, mapArgs...)
 	if err != nil {
 		util.WarningLog(ctx, "rbd: map error %v, rbd output: %s", err, stderr)
 		// unmap rbd image if connection timeout
diff --git a/internal/rbd/rbd_healer.go b/internal/rbd/rbd_healer.go
new file mode 100644
index 000000000..25ae0eebc
--- /dev/null
+++ b/internal/rbd/rbd_healer.go
@@ -0,0 +1,176 @@
+/*
+Copyright 2021 The Ceph-CSI Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package rbd
+
+import (
+	"context"
+
+	"github.com/ceph/ceph-csi/internal/util"
+
+	"github.com/container-storage-interface/spec/lib/go/csi"
+	v1 "k8s.io/api/core/v1"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	k8s "k8s.io/client-go/kubernetes"
+)
+
+const (
+	fsTypeBlockName = "block"
+)
+
+// accessModeStrToInt converts an access mode type string to the equivalent
+// CSI access mode (int32). Make sure to update this function as and when
+// new modes are introduced.
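+// Any mode not matched below (for example, a newer mode such as
+// ReadWriteOncePod, if the cluster supports it) falls back to UNKNOWN.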
+func accessModeStrToInt(mode v1.PersistentVolumeAccessMode) csi.VolumeCapability_AccessMode_Mode {
+	switch mode {
+	case v1.ReadWriteOnce:
+		return csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER
+	case v1.ReadOnlyMany:
+		return csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY
+	case v1.ReadWriteMany:
+		return csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER
+	}
+	return csi.VolumeCapability_AccessMode_UNKNOWN
+}
+
+// getSecret gets the secret details by name.
+func getSecret(c *k8s.Clientset, ns, name string) (map[string]string, error) {
+	deviceSecret := make(map[string]string)
+
+	secret, err := c.CoreV1().Secrets(ns).Get(context.TODO(), name, metav1.GetOptions{})
+	if err != nil {
+		util.ErrorLogMsg("get secret failed, err: %v", err)
+		return nil, err
+	}
+
+	for k, v := range secret.Data {
+		deviceSecret[k] = string(v)
+	}
+
+	return deviceSecret, nil
+}
+
+func callNodeStageVolume(ns *NodeServer, c *k8s.Clientset, pv *v1.PersistentVolume, stagingPath string) error {
+	publishContext := make(map[string]string)
+
+	volID := pv.Spec.PersistentVolumeSource.CSI.VolumeHandle
+	stagingParentPath := stagingPath + pv.Name + "/globalmount"
+
+	util.DefaultLog("sending nodeStageVolume for volID: %s, stagingPath: %s",
+		volID, stagingParentPath)
+
+	deviceSecret, err := getSecret(c,
+		pv.Spec.PersistentVolumeSource.CSI.NodeStageSecretRef.Namespace,
+		pv.Spec.PersistentVolumeSource.CSI.NodeStageSecretRef.Name)
+	if err != nil {
+		util.ErrorLogMsg("getSecret failed for volID: %s, err: %v", volID, err)
+		return err
+	}
+
+	volumeContext := pv.Spec.PersistentVolumeSource.CSI.VolumeAttributes
+	volumeContext["volumeHealerContext"] = "true"
+
+	req := &csi.NodeStageVolumeRequest{
+		VolumeId:          volID,
+		PublishContext:    publishContext,
+		StagingTargetPath: stagingParentPath,
+		VolumeCapability: &csi.VolumeCapability{
+			AccessMode: &csi.VolumeCapability_AccessMode{
+				Mode: accessModeStrToInt(pv.Spec.AccessModes[0]),
+			},
+		},
+		Secrets:       deviceSecret,
+		VolumeContext: volumeContext,
+	}
+	if pv.Spec.PersistentVolumeSource.CSI.FSType == fsTypeBlockName {
+		req.VolumeCapability.AccessType = &csi.VolumeCapability_Block{
+			Block: &csi.VolumeCapability_BlockVolume{},
+		}
+	} else {
+		req.VolumeCapability.AccessType = &csi.VolumeCapability_Mount{
+			Mount: &csi.VolumeCapability_MountVolume{
+				FsType:     pv.Spec.PersistentVolumeSource.CSI.FSType,
+				MountFlags: pv.Spec.MountOptions,
+			},
+		}
+	}
+
+	_, err = ns.NodeStageVolume(context.TODO(), req)
+	if err != nil {
+		util.ErrorLogMsg("nodeStageVolume request failed, volID: %s, stagingPath: %s, err: %v",
+			volID, stagingParentPath, err)
+		return err
+	}
+
+	return nil
+}
+
+// runVolumeHealer heals the volumes attached on a node.
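+// It walks the VolumeAttachments in the cluster and, for each attachment that
+// belongs to this node and driver, is bound, uses the rbd-nbd mounter, and is
+// still attached, re-issues NodeStageVolume to restore the rbd-nbd mapping.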
+func runVolumeHealer(ns *NodeServer, conf *util.Config) error {
+	c := util.NewK8sClient()
+	val, err := c.StorageV1().VolumeAttachments().List(context.TODO(), metav1.ListOptions{})
+	if err != nil {
+		util.ErrorLogMsg("list volumeAttachments failed, err: %v", err)
+		return err
+	}
+
+	for i := range val.Items {
+		// skip if the volumeattachment doesn't belong to the current node or driver
+		if val.Items[i].Spec.NodeName != conf.NodeID || val.Items[i].Spec.Attacher != conf.DriverName {
+			continue
+		}
+		pvName := *val.Items[i].Spec.Source.PersistentVolumeName
+		pv, err := c.CoreV1().PersistentVolumes().Get(context.TODO(), pvName, metav1.GetOptions{})
+		if err != nil {
+			// skip if volume doesn't exist
+			if !apierrors.IsNotFound(err) {
+				util.ErrorLogMsg("get persistentVolumes failed for pv: %s, err: %v", pvName, err)
+			}
+			continue
+		}
+		// TODO: check the pv delete annotations, e.g. what happens when the pv is marked for delete
+		// skip this volumeattachment if its pv is not bound
+		if pv.Status.Phase != v1.VolumeBound {
+			continue
+		}
+		// skip if mounter is not rbd-nbd
+		if pv.Spec.PersistentVolumeSource.CSI.VolumeAttributes["mounter"] != "rbd-nbd" {
+			continue
+		}
+
+		// ensure that the volume is still in attached state
+		va, err := c.StorageV1().VolumeAttachments().Get(context.TODO(), val.Items[i].Name, metav1.GetOptions{})
+		if err != nil {
+			// skip if volume attachment doesn't exist
+			if !apierrors.IsNotFound(err) {
+				util.ErrorLogMsg("get volumeAttachments failed for volumeAttachment: %s, volID: %s, err: %v",
+					val.Items[i].Name, pv.Spec.PersistentVolumeSource.CSI.VolumeHandle, err)
+			}
+			continue
+		}
+		if !va.Status.Attached {
+			continue
+		}
+
+		err = callNodeStageVolume(ns, c, pv, conf.StagingPath)
+		if err != nil {
+			util.ErrorLogMsg("callNodeStageVolume failed for VolID: %s, err: %v",
+				pv.Spec.PersistentVolumeSource.CSI.VolumeHandle, err)
+		}
+	}
+
+	return nil
+}
diff --git a/internal/util/util.go b/internal/util/util.go
index 3fa54a40c..10972c1da 100644
--- a/internal/util/util.go
+++ b/internal/util/util.go
@@ -72,6 +72,7 @@ type Config struct {
 	NodeID       string // node id
 	InstanceID   string // unique ID distinguishing this instance of Ceph CSI
 	PluginPath   string // location of cephcsi plugin
+	StagingPath  string // location of cephcsi staging path
 	DomainLabels string // list of domain labels to read from the node
 
 	// metrics related flags