diff --git a/charts/ceph-csi-cephfs/templates/nodeplugin-daemonset.yaml b/charts/ceph-csi-cephfs/templates/nodeplugin-daemonset.yaml index f0fe4fe58..b2244d195 100644 --- a/charts/ceph-csi-cephfs/templates/nodeplugin-daemonset.yaml +++ b/charts/ceph-csi-cephfs/templates/nodeplugin-daemonset.yaml @@ -126,6 +126,8 @@ spec: mountPath: /etc/ceph-csi-config/ - name: keys-tmp-dir mountPath: /tmp/csi/keys + - name: ceph-csi-mountinfo + mountPath: /csi/mountinfo resources: {{ toYaml .Values.nodeplugin.plugin.resources | indent 12 }} {{- if .Values.nodeplugin.httpMetrics.enabled }} @@ -207,6 +209,10 @@ spec: emptyDir: { medium: "Memory" } + - name: ceph-csi-mountinfo + hostPath: + path: {{ .Values.kubeletDir }}/plugins/{{ .Values.driverName }}/mountinfo + type: DirectoryOrCreate {{- if .Values.nodeplugin.affinity }} affinity: {{ toYaml .Values.nodeplugin.affinity | indent 8 -}} diff --git a/deploy/cephfs/kubernetes/csi-cephfsplugin.yaml b/deploy/cephfs/kubernetes/csi-cephfsplugin.yaml index f044fc958..5e1d0f9d6 100644 --- a/deploy/cephfs/kubernetes/csi-cephfsplugin.yaml +++ b/deploy/cephfs/kubernetes/csi-cephfsplugin.yaml @@ -100,6 +100,8 @@ spec: mountPath: /etc/ceph-csi-config/ - name: keys-tmp-dir mountPath: /tmp/csi/keys + - name: ceph-csi-mountinfo + mountPath: /csi/mountinfo - name: liveness-prometheus securityContext: privileged: true @@ -164,6 +166,10 @@ spec: emptyDir: { medium: "Memory" } + - name: ceph-csi-mountinfo + hostPath: + path: /var/lib/kubelet/plugins/cephfs.csi.ceph.com/mountinfo + type: DirectoryOrCreate --- # This is a service to expose the liveness metrics apiVersion: v1 diff --git a/internal/cephfs/fuserecovery.go b/internal/cephfs/fuserecovery.go new file mode 100644 index 000000000..32018a328 --- /dev/null +++ b/internal/cephfs/fuserecovery.go @@ -0,0 +1,273 @@ +/* +Copyright 2022 The Ceph-CSI Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cephfs + +import ( + "context" + + "github.com/ceph/ceph-csi/internal/cephfs/mounter" + "github.com/ceph/ceph-csi/internal/cephfs/store" + fsutil "github.com/ceph/ceph-csi/internal/cephfs/util" + "github.com/ceph/ceph-csi/internal/util" + "github.com/ceph/ceph-csi/internal/util/log" + + mountutil "k8s.io/mount-utils" +) + +type ( + mountState int +) + +const ( + msUnknown mountState = iota + msNotMounted + msMounted + msCorrupted + + // ceph-fuse fsType in /proc//mountinfo. + cephFuseFsType = "fuse.ceph-fuse" +) + +func (ms mountState) String() string { + return [...]string{ + "UNKNOWN", + "NOT_MOUNTED", + "MOUNTED", + "CORRUPTED", + }[int(ms)] +} + +func getMountState(path string) (mountState, error) { + isMnt, err := util.IsMountPoint(path) + if err != nil { + if util.IsCorruptedMountError(err) { + return msCorrupted, nil + } + + return msUnknown, err + } + + if isMnt { + return msMounted, nil + } + + return msNotMounted, nil +} + +func findMountinfo(mountpoint string, mis []mountutil.MountInfo) int { + for i := range mis { + if mis[i].MountPoint == mountpoint { + return i + } + } + + return -1 +} + +// Ensures that given mountpoint is of specified fstype. +// Returns true if fstype matches, or if no such mountpoint exists. +func validateFsType(mountpoint, fsType string, mis []mountutil.MountInfo) bool { + if idx := findMountinfo(mountpoint, mis); idx > 0 { + mi := mis[idx] + + if mi.FsType != fsType { + return false + } + } + + return true +} + +// tryRestoreFuseMountsInNodePublish tries to restore staging and publish +// volume moutpoints inside the NodePublishVolume call. +// +// Restoration is performed in following steps: +// 1. Detection: staging target path must be a working mountpoint, and target +// path must not be a corrupted mountpoint (see getMountState()). If either +// of those checks fail, mount recovery is performed. +// 2. Recovery preconditions: +// * NodeStageMountinfo is present for this volume, +// * if staging target path and target path are mountpoints, they must be +// managed by ceph-fuse, +// * VolumeOptions.Mounter must evaluate to "fuse". +// 3. Recovery: +// * staging target path is unmounted and mounted again using ceph-fuse, +// * target path is only unmounted; NodePublishVolume is then expected to +// continue normally. +func (ns *NodeServer) tryRestoreFuseMountsInNodePublish( + ctx context.Context, + volID fsutil.VolumeID, + stagingTargetPath string, + targetPath string, + volContext map[string]string, +) error { + // Check if there is anything to restore. + + stagingTargetMs, err := getMountState(stagingTargetPath) + if err != nil { + return err + } + + targetMs, err := getMountState(targetPath) + if err != nil { + return err + } + + if stagingTargetMs == msMounted && targetMs != msCorrupted { + // Mounts seem to be fine. + return nil + } + + // Something is broken. Try to proceed with mount recovery. + + log.WarningLog(ctx, "cephfs: mount problem detected when publishing a volume: %s is %s, %s is %s; attempting recovery", + stagingTargetPath, stagingTargetMs, targetPath, targetMs) + + // NodeStageMountinfo entry must be present for this volume. + + var nsMountinfo *fsutil.NodeStageMountinfo + + if nsMountinfo, err = fsutil.GetNodeStageMountinfo(volID); err != nil { + return err + } else if nsMountinfo == nil { + log.WarningLog(ctx, "cephfs: cannot proceed with mount recovery because NodeStageMountinfo record is missing") + + return nil + } + + // Check that the existing stage and publish mounts for this volume are + // managed by ceph-fuse, and that the mounter is of the FuseMounter type. + // Then try to restore them. + + var ( + volMounter mounter.VolumeMounter + volOptions *store.VolumeOptions + ) + + procMountInfo, err := util.ReadMountInfoForProc("self") + if err != nil { + return err + } + + if !validateFsType(stagingTargetPath, cephFuseFsType, procMountInfo) || + !validateFsType(targetPath, cephFuseFsType, procMountInfo) { + // We can't restore mounts not managed by ceph-fuse. + log.WarningLog(ctx, "cephfs: cannot proceed with mount recovery on non-FUSE mountpoints") + + return nil + } + + volOptions, err = ns.getVolumeOptions(ctx, volID, volContext, nsMountinfo.Secrets) + if err != nil { + return err + } + + volMounter, err = mounter.New(volOptions) + if err != nil { + return err + } + + if _, ok := volMounter.(*mounter.FuseMounter); !ok { + // We can't restore mounts with non-FUSE mounter. + log.WarningLog(ctx, "cephfs: cannot proceed with mount recovery with non-FUSE mounter") + + return nil + } + + // Try to restore mount in staging target path. + // Unmount and mount the volume. + + if stagingTargetMs != msMounted { + if err := mounter.UnmountVolume(ctx, stagingTargetPath); err != nil { + return err + } + + if err := ns.mount( + ctx, + volMounter, + volOptions, + volID, + stagingTargetPath, + nsMountinfo.Secrets, + nsMountinfo.VolumeCapability, + ); err != nil { + return err + } + } + + // Try to restore mount in target path. + // Only unmount the bind mount. NodePublishVolume should then + // create the bind mount by itself. + + if err := mounter.UnmountVolume(ctx, targetPath); err != nil { + return err + } + + return nil +} + +// Try to restore FUSE mount of the staging target path in NodeStageVolume. +// If corruption is detected, try to only unmount the volume. NodeStageVolume +// should be able to continue with mounting the volume normally afterwards. +func (ns *NodeServer) tryRestoreFuseMountInNodeStage( + ctx context.Context, + mnt mounter.VolumeMounter, + stagingTargetPath string, +) error { + // Check if there is anything to restore. + + stagingTargetMs, err := getMountState(stagingTargetPath) + if err != nil { + return err + } + + if stagingTargetMs != msCorrupted { + // Mounts seem to be fine. + return nil + } + + // Something is broken. Try to proceed with mount recovery. + + log.WarningLog(ctx, "cephfs: mountpoint problem detected when staging a volume: %s is %s; attempting recovery", + stagingTargetPath, stagingTargetMs) + + // Check that the existing stage mount for this volume is managed by + // ceph-fuse, and that the mounter is FuseMounter. Then try to restore them. + + procMountInfo, err := util.ReadMountInfoForProc("self") + if err != nil { + return err + } + + if !validateFsType(stagingTargetPath, cephFuseFsType, procMountInfo) { + // We can't restore mounts not managed by ceph-fuse. + log.WarningLog(ctx, "cephfs: cannot proceed with mount recovery on non-FUSE mountpoints") + + return nil + } + + if _, ok := mnt.(*mounter.FuseMounter); !ok { + // We can't restore mounts with non-FUSE mounter. + log.WarningLog(ctx, "cephfs: cannot proceed with mount recovery with non-FUSE mounter") + + return nil + } + + // Restoration here means only unmounting the volume. + // NodeStageVolume should take care of the rest. + return mounter.UnmountVolume(ctx, stagingTargetPath) +} diff --git a/internal/cephfs/nodeserver.go b/internal/cephfs/nodeserver.go index a6a6cb794..21457ff38 100644 --- a/internal/cephfs/nodeserver.go +++ b/internal/cephfs/nodeserver.go @@ -135,6 +135,10 @@ func (ns *NodeServer) NodeStageVolume( // Check if the volume is already mounted + if err = ns.tryRestoreFuseMountInNodeStage(ctx, mnt, stagingTargetPath); err != nil { + return nil, status.Errorf(codes.Internal, "failed to try to restore FUSE mounts: %v", err) + } + isMnt, err := util.IsMountPoint(stagingTargetPath) if err != nil { log.ErrorLog(ctx, "stat failed: %v", err) @@ -164,6 +168,25 @@ func (ns *NodeServer) NodeStageVolume( log.DebugLog(ctx, "cephfs: successfully mounted volume %s to %s", volID, stagingTargetPath) + if _, isFuse := mnt.(*mounter.FuseMounter); isFuse { + // FUSE mount recovery needs NodeStageMountinfo records. + + if err = fsutil.WriteNodeStageMountinfo(volID, &fsutil.NodeStageMountinfo{ + VolumeCapability: req.GetVolumeCapability(), + Secrets: req.GetSecrets(), + }); err != nil { + log.ErrorLog(ctx, "cephfs: failed to write NodeStageMountinfo for volume %s: %v", volID, err) + + // Try to clean node stage mount. + if unmountErr := mounter.UnmountVolume(ctx, stagingTargetPath); unmountErr != nil { + log.ErrorLog(ctx, "cephfs: failed to unmount %s in WriteNodeStageMountinfo clean up: %v", + stagingTargetPath, unmountErr) + } + + return nil, status.Error(codes.Internal, err.Error()) + } + } + return &csi.NodeStageVolumeResponse{}, nil } @@ -237,12 +260,34 @@ func (ns *NodeServer) NodePublishVolume( return nil, status.Error(codes.Internal, err.Error()) } + if err := ns.tryRestoreFuseMountsInNodePublish( + ctx, + volID, + stagingTargetPath, + targetPath, + req.GetVolumeContext(), + ); err != nil { + return nil, status.Errorf(codes.Internal, "failed to try to restore FUSE mounts: %v", err) + } + if req.GetReadonly() { mountOptions = append(mountOptions, "ro") } mountOptions = csicommon.ConstructMountOptions(mountOptions, req.GetVolumeCapability()) + // Ensure staging target path is a mountpoint. + + if isMnt, err := util.IsMountPoint(stagingTargetPath); err != nil { + log.ErrorLog(ctx, "stat failed: %v", err) + + return nil, status.Error(codes.Internal, err.Error()) + } else if !isMnt { + return nil, status.Errorf( + codes.Internal, "staging path %s for volume %s is not a mountpoint", stagingTargetPath, volID, + ) + } + // Check if the volume is already mounted isMnt, err := util.IsMountPoint(targetPath) @@ -284,11 +329,14 @@ func (ns *NodeServer) NodeUnpublishVolume( if err = util.ValidateNodeUnpublishVolumeRequest(req); err != nil { return nil, err } + // considering kubelet make sure node operations like unpublish/unstage...etc can not be called // at same time, an explicit locking at time of nodeunpublish is not required. targetPath := req.GetTargetPath() isMnt, err := util.IsMountPoint(targetPath) if err != nil { + log.ErrorLog(ctx, "stat failed: %v", err) + if os.IsNotExist(err) { // targetPath has already been deleted log.DebugLog(ctx, "targetPath: %s has already been deleted", targetPath) @@ -296,7 +344,14 @@ func (ns *NodeServer) NodeUnpublishVolume( return &csi.NodeUnpublishVolumeResponse{}, nil } - return nil, status.Error(codes.Internal, err.Error()) + if !util.IsCorruptedMountError(err) { + return nil, status.Error(codes.Internal, err.Error()) + } + + // Corrupted mounts need to be unmounted properly too, + // regardless of the mounter used. Continue as normal. + log.DebugLog(ctx, "cephfs: detected corrupted mount in publish target path %s, trying to unmount anyway", targetPath) + isMnt = true } if !isMnt { if err = os.RemoveAll(targetPath); err != nil { @@ -340,8 +395,16 @@ func (ns *NodeServer) NodeUnstageVolume( stagingTargetPath := req.GetStagingTargetPath() + if err = fsutil.RemoveNodeStageMountinfo(fsutil.VolumeID(volID)); err != nil { + log.ErrorLog(ctx, "cephfs: failed to remove NodeStageMountinfo for volume %s: %v", volID, err) + + return nil, status.Error(codes.Internal, err.Error()) + } + isMnt, err := util.IsMountPoint(stagingTargetPath) if err != nil { + log.ErrorLog(ctx, "stat failed: %v", err) + if os.IsNotExist(err) { // targetPath has already been deleted log.DebugLog(ctx, "targetPath: %s has already been deleted", stagingTargetPath) @@ -349,7 +412,16 @@ func (ns *NodeServer) NodeUnstageVolume( return &csi.NodeUnstageVolumeResponse{}, nil } - return nil, status.Error(codes.Internal, err.Error()) + if !util.IsCorruptedMountError(err) { + return nil, status.Error(codes.Internal, err.Error()) + } + + // Corrupted mounts need to be unmounted properly too, + // regardless of the mounter used. Continue as normal. + log.DebugLog(ctx, + "cephfs: detected corrupted mount in staging target path %s, trying to unmount anyway", + stagingTargetPath) + isMnt = true } if !isMnt { return &csi.NodeUnstageVolumeResponse{}, nil diff --git a/internal/cephfs/util/mountinfo.go b/internal/cephfs/util/mountinfo.go new file mode 100644 index 000000000..cca1320f6 --- /dev/null +++ b/internal/cephfs/util/mountinfo.go @@ -0,0 +1,149 @@ +/* +Copyright 2022 The Ceph-CSI Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "encoding/json" + "fmt" + "os" + "path" + + "github.com/container-storage-interface/spec/lib/go/csi" + // google.golang.org/protobuf/encoding doesn't offer MessageV2(). + "github.com/golang/protobuf/proto" // nolint:staticcheck // See comment above. + "google.golang.org/protobuf/encoding/protojson" +) + +// This file provides functionality to store various mount information +// in a file. It's currently used to restore ceph-fuse mounts. +// Mount info is stored in `/csi/mountinfo`. + +const ( + mountinfoDir = "/csi/mountinfo" +) + +// nodeStageMountinfoRecord describes a single +// record of mountinfo of a staged volume. +// encoding/json-friendly format. +// Only for internal use for marshaling and unmarshaling. +type nodeStageMountinfoRecord struct { + VolumeCapabilityProtoJSON string `json:",omitempty"` + MountOptions []string `json:",omitempty"` + Secrets map[string]string `json:",omitempty"` +} + +// NodeStageMountinfo describes mountinfo of a volume. +type NodeStageMountinfo struct { + VolumeCapability *csi.VolumeCapability + Secrets map[string]string + MountOptions []string +} + +func fmtNodeStageMountinfoFilename(volID VolumeID) string { + return path.Join(mountinfoDir, fmt.Sprintf("nodestage-%s.json", volID)) +} + +func (mi *NodeStageMountinfo) toNodeStageMountinfoRecord() (*nodeStageMountinfoRecord, error) { + bs, err := protojson.Marshal(proto.MessageV2(mi.VolumeCapability)) + if err != nil { + return nil, err + } + + return &nodeStageMountinfoRecord{ + VolumeCapabilityProtoJSON: string(bs), + MountOptions: mi.MountOptions, + Secrets: mi.Secrets, + }, nil +} + +func (r *nodeStageMountinfoRecord) toNodeStageMountinfo() (*NodeStageMountinfo, error) { + volCapability := &csi.VolumeCapability{} + if err := protojson.Unmarshal([]byte(r.VolumeCapabilityProtoJSON), proto.MessageV2(volCapability)); err != nil { + return nil, err + } + + return &NodeStageMountinfo{ + VolumeCapability: volCapability, + MountOptions: r.MountOptions, + Secrets: r.Secrets, + }, nil +} + +// WriteNodeStageMountinfo writes mount info to a file. +func WriteNodeStageMountinfo(volID VolumeID, mi *NodeStageMountinfo) error { + // Write NodeStageMountinfo into JSON-formatted byte slice. + + r, err := mi.toNodeStageMountinfoRecord() + if err != nil { + return err + } + + bs, err := json.Marshal(r) + if err != nil { + return err + } + + // Write the byte slice into file. + + err = os.WriteFile(fmtNodeStageMountinfoFilename(volID), bs, 0o600) + if os.IsNotExist(err) { + return nil + } + + return err +} + +// GetNodeStageMountinfo tries to retrieve NodeStageMountinfoRecord for `volID`. +// If it doesn't exist, `(nil, nil)` is returned. +func GetNodeStageMountinfo(volID VolumeID) (*NodeStageMountinfo, error) { + // Read the file. + + bs, err := os.ReadFile(fmtNodeStageMountinfoFilename(volID)) + if err != nil { + if os.IsNotExist(err) { + return nil, nil + } + + return nil, err + } + + // Unmarshall JSON-formatted byte slice into NodeStageMountinfo struct. + + r := &nodeStageMountinfoRecord{} + if err = json.Unmarshal(bs, r); err != nil { + return nil, err + } + + mi, err := r.toNodeStageMountinfo() + if err != nil { + return nil, err + } + + return mi, err +} + +// RemoveNodeStageMountinfo tries to remove NodeStageMountinfo for `volID`. +// If no such record exists for `volID`, it's considered success too. +func RemoveNodeStageMountinfo(volID VolumeID) error { + if err := os.Remove(fmtNodeStageMountinfoFilename(volID)); err != nil { + if !os.IsNotExist(err) { + return err + } + } + + return nil +} diff --git a/internal/util/util.go b/internal/util/util.go index 6bf522713..c67fd0570 100644 --- a/internal/util/util.go +++ b/internal/util/util.go @@ -308,6 +308,18 @@ func IsMountPoint(p string) (bool, error) { return !notMnt, nil } +// IsCorruptedMountError checks if the given error is a result of a corrupted +// mountpoint. +func IsCorruptedMountError(err error) bool { + return mount.IsCorruptedMnt(err) +} + +// ReadMountInfoForProc reads /proc//mountpoint and marshals it into +// MountInfo structs. +func ReadMountInfoForProc(proc string) ([]mount.MountInfo, error) { + return mount.ParseMountInfo(fmt.Sprintf("/proc/%s/mountinfo", proc)) +} + // Mount mounts the source to target path. func Mount(source, target, fstype string, options []string) error { dummyMount := mount.New("")