cephfs: detect corrupt ceph-fuse mounts and try to remount

Mounts managed by ceph-fuse may get corrupted by e.g. the ceph-fuse process
exiting abruptly, or its parent container being terminated, taking down its
child processes with it.

This commit adds checks to NodeStageVolume and NodePublishVolume procedures
to detect whether a mountpoint in staging_target_path and/or target_path is
corrupted, and remount is performed if corruption is detected.

Signed-off-by: Robert Vasek <robert.vasek@cern.ch>
This commit is contained in:
Robert Vasek 2022-02-02 13:23:06 +01:00 committed by mergify[bot]
parent aa6297e164
commit 80dda7cc30
6 changed files with 520 additions and 2 deletions

View File

@ -126,6 +126,8 @@ spec:
mountPath: /etc/ceph-csi-config/
- name: keys-tmp-dir
mountPath: /tmp/csi/keys
- name: ceph-csi-mountinfo
mountPath: /csi/mountinfo
resources:
{{ toYaml .Values.nodeplugin.plugin.resources | indent 12 }}
{{- if .Values.nodeplugin.httpMetrics.enabled }}
@ -207,6 +209,10 @@ spec:
emptyDir: {
medium: "Memory"
}
- name: ceph-csi-mountinfo
hostPath:
path: {{ .Values.kubeletDir }}/plugins/{{ .Values.driverName }}/mountinfo
type: DirectoryOrCreate
{{- if .Values.nodeplugin.affinity }}
affinity:
{{ toYaml .Values.nodeplugin.affinity | indent 8 -}}

View File

@ -100,6 +100,8 @@ spec:
mountPath: /etc/ceph-csi-config/
- name: keys-tmp-dir
mountPath: /tmp/csi/keys
- name: ceph-csi-mountinfo
mountPath: /csi/mountinfo
- name: liveness-prometheus
securityContext:
privileged: true
@ -164,6 +166,10 @@ spec:
emptyDir: {
medium: "Memory"
}
- name: ceph-csi-mountinfo
hostPath:
path: /var/lib/kubelet/plugins/cephfs.csi.ceph.com/mountinfo
type: DirectoryOrCreate
---
# This is a service to expose the liveness metrics
apiVersion: v1

View File

@ -0,0 +1,273 @@
/*
Copyright 2022 The Ceph-CSI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cephfs
import (
"context"
"github.com/ceph/ceph-csi/internal/cephfs/mounter"
"github.com/ceph/ceph-csi/internal/cephfs/store"
fsutil "github.com/ceph/ceph-csi/internal/cephfs/util"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/log"
mountutil "k8s.io/mount-utils"
)
type (
mountState int
)
const (
msUnknown mountState = iota
msNotMounted
msMounted
msCorrupted
// ceph-fuse fsType in /proc/<PID>/mountinfo.
cephFuseFsType = "fuse.ceph-fuse"
)
func (ms mountState) String() string {
return [...]string{
"UNKNOWN",
"NOT_MOUNTED",
"MOUNTED",
"CORRUPTED",
}[int(ms)]
}
func getMountState(path string) (mountState, error) {
isMnt, err := util.IsMountPoint(path)
if err != nil {
if util.IsCorruptedMountError(err) {
return msCorrupted, nil
}
return msUnknown, err
}
if isMnt {
return msMounted, nil
}
return msNotMounted, nil
}
func findMountinfo(mountpoint string, mis []mountutil.MountInfo) int {
for i := range mis {
if mis[i].MountPoint == mountpoint {
return i
}
}
return -1
}
// Ensures that given mountpoint is of specified fstype.
// Returns true if fstype matches, or if no such mountpoint exists.
func validateFsType(mountpoint, fsType string, mis []mountutil.MountInfo) bool {
if idx := findMountinfo(mountpoint, mis); idx > 0 {
mi := mis[idx]
if mi.FsType != fsType {
return false
}
}
return true
}
// tryRestoreFuseMountsInNodePublish tries to restore staging and publish
// volume moutpoints inside the NodePublishVolume call.
//
// Restoration is performed in following steps:
// 1. Detection: staging target path must be a working mountpoint, and target
// path must not be a corrupted mountpoint (see getMountState()). If either
// of those checks fail, mount recovery is performed.
// 2. Recovery preconditions:
// * NodeStageMountinfo is present for this volume,
// * if staging target path and target path are mountpoints, they must be
// managed by ceph-fuse,
// * VolumeOptions.Mounter must evaluate to "fuse".
// 3. Recovery:
// * staging target path is unmounted and mounted again using ceph-fuse,
// * target path is only unmounted; NodePublishVolume is then expected to
// continue normally.
func (ns *NodeServer) tryRestoreFuseMountsInNodePublish(
ctx context.Context,
volID fsutil.VolumeID,
stagingTargetPath string,
targetPath string,
volContext map[string]string,
) error {
// Check if there is anything to restore.
stagingTargetMs, err := getMountState(stagingTargetPath)
if err != nil {
return err
}
targetMs, err := getMountState(targetPath)
if err != nil {
return err
}
if stagingTargetMs == msMounted && targetMs != msCorrupted {
// Mounts seem to be fine.
return nil
}
// Something is broken. Try to proceed with mount recovery.
log.WarningLog(ctx, "cephfs: mount problem detected when publishing a volume: %s is %s, %s is %s; attempting recovery",
stagingTargetPath, stagingTargetMs, targetPath, targetMs)
// NodeStageMountinfo entry must be present for this volume.
var nsMountinfo *fsutil.NodeStageMountinfo
if nsMountinfo, err = fsutil.GetNodeStageMountinfo(volID); err != nil {
return err
} else if nsMountinfo == nil {
log.WarningLog(ctx, "cephfs: cannot proceed with mount recovery because NodeStageMountinfo record is missing")
return nil
}
// Check that the existing stage and publish mounts for this volume are
// managed by ceph-fuse, and that the mounter is of the FuseMounter type.
// Then try to restore them.
var (
volMounter mounter.VolumeMounter
volOptions *store.VolumeOptions
)
procMountInfo, err := util.ReadMountInfoForProc("self")
if err != nil {
return err
}
if !validateFsType(stagingTargetPath, cephFuseFsType, procMountInfo) ||
!validateFsType(targetPath, cephFuseFsType, procMountInfo) {
// We can't restore mounts not managed by ceph-fuse.
log.WarningLog(ctx, "cephfs: cannot proceed with mount recovery on non-FUSE mountpoints")
return nil
}
volOptions, err = ns.getVolumeOptions(ctx, volID, volContext, nsMountinfo.Secrets)
if err != nil {
return err
}
volMounter, err = mounter.New(volOptions)
if err != nil {
return err
}
if _, ok := volMounter.(*mounter.FuseMounter); !ok {
// We can't restore mounts with non-FUSE mounter.
log.WarningLog(ctx, "cephfs: cannot proceed with mount recovery with non-FUSE mounter")
return nil
}
// Try to restore mount in staging target path.
// Unmount and mount the volume.
if stagingTargetMs != msMounted {
if err := mounter.UnmountVolume(ctx, stagingTargetPath); err != nil {
return err
}
if err := ns.mount(
ctx,
volMounter,
volOptions,
volID,
stagingTargetPath,
nsMountinfo.Secrets,
nsMountinfo.VolumeCapability,
); err != nil {
return err
}
}
// Try to restore mount in target path.
// Only unmount the bind mount. NodePublishVolume should then
// create the bind mount by itself.
if err := mounter.UnmountVolume(ctx, targetPath); err != nil {
return err
}
return nil
}
// Try to restore FUSE mount of the staging target path in NodeStageVolume.
// If corruption is detected, try to only unmount the volume. NodeStageVolume
// should be able to continue with mounting the volume normally afterwards.
func (ns *NodeServer) tryRestoreFuseMountInNodeStage(
ctx context.Context,
mnt mounter.VolumeMounter,
stagingTargetPath string,
) error {
// Check if there is anything to restore.
stagingTargetMs, err := getMountState(stagingTargetPath)
if err != nil {
return err
}
if stagingTargetMs != msCorrupted {
// Mounts seem to be fine.
return nil
}
// Something is broken. Try to proceed with mount recovery.
log.WarningLog(ctx, "cephfs: mountpoint problem detected when staging a volume: %s is %s; attempting recovery",
stagingTargetPath, stagingTargetMs)
// Check that the existing stage mount for this volume is managed by
// ceph-fuse, and that the mounter is FuseMounter. Then try to restore them.
procMountInfo, err := util.ReadMountInfoForProc("self")
if err != nil {
return err
}
if !validateFsType(stagingTargetPath, cephFuseFsType, procMountInfo) {
// We can't restore mounts not managed by ceph-fuse.
log.WarningLog(ctx, "cephfs: cannot proceed with mount recovery on non-FUSE mountpoints")
return nil
}
if _, ok := mnt.(*mounter.FuseMounter); !ok {
// We can't restore mounts with non-FUSE mounter.
log.WarningLog(ctx, "cephfs: cannot proceed with mount recovery with non-FUSE mounter")
return nil
}
// Restoration here means only unmounting the volume.
// NodeStageVolume should take care of the rest.
return mounter.UnmountVolume(ctx, stagingTargetPath)
}

View File

@ -135,6 +135,10 @@ func (ns *NodeServer) NodeStageVolume(
// Check if the volume is already mounted
if err = ns.tryRestoreFuseMountInNodeStage(ctx, mnt, stagingTargetPath); err != nil {
return nil, status.Errorf(codes.Internal, "failed to try to restore FUSE mounts: %v", err)
}
isMnt, err := util.IsMountPoint(stagingTargetPath)
if err != nil {
log.ErrorLog(ctx, "stat failed: %v", err)
@ -164,6 +168,25 @@ func (ns *NodeServer) NodeStageVolume(
log.DebugLog(ctx, "cephfs: successfully mounted volume %s to %s", volID, stagingTargetPath)
if _, isFuse := mnt.(*mounter.FuseMounter); isFuse {
// FUSE mount recovery needs NodeStageMountinfo records.
if err = fsutil.WriteNodeStageMountinfo(volID, &fsutil.NodeStageMountinfo{
VolumeCapability: req.GetVolumeCapability(),
Secrets: req.GetSecrets(),
}); err != nil {
log.ErrorLog(ctx, "cephfs: failed to write NodeStageMountinfo for volume %s: %v", volID, err)
// Try to clean node stage mount.
if unmountErr := mounter.UnmountVolume(ctx, stagingTargetPath); unmountErr != nil {
log.ErrorLog(ctx, "cephfs: failed to unmount %s in WriteNodeStageMountinfo clean up: %v",
stagingTargetPath, unmountErr)
}
return nil, status.Error(codes.Internal, err.Error())
}
}
return &csi.NodeStageVolumeResponse{}, nil
}
@ -237,12 +260,34 @@ func (ns *NodeServer) NodePublishVolume(
return nil, status.Error(codes.Internal, err.Error())
}
if err := ns.tryRestoreFuseMountsInNodePublish(
ctx,
volID,
stagingTargetPath,
targetPath,
req.GetVolumeContext(),
); err != nil {
return nil, status.Errorf(codes.Internal, "failed to try to restore FUSE mounts: %v", err)
}
if req.GetReadonly() {
mountOptions = append(mountOptions, "ro")
}
mountOptions = csicommon.ConstructMountOptions(mountOptions, req.GetVolumeCapability())
// Ensure staging target path is a mountpoint.
if isMnt, err := util.IsMountPoint(stagingTargetPath); err != nil {
log.ErrorLog(ctx, "stat failed: %v", err)
return nil, status.Error(codes.Internal, err.Error())
} else if !isMnt {
return nil, status.Errorf(
codes.Internal, "staging path %s for volume %s is not a mountpoint", stagingTargetPath, volID,
)
}
// Check if the volume is already mounted
isMnt, err := util.IsMountPoint(targetPath)
@ -284,11 +329,14 @@ func (ns *NodeServer) NodeUnpublishVolume(
if err = util.ValidateNodeUnpublishVolumeRequest(req); err != nil {
return nil, err
}
// considering kubelet make sure node operations like unpublish/unstage...etc can not be called
// at same time, an explicit locking at time of nodeunpublish is not required.
targetPath := req.GetTargetPath()
isMnt, err := util.IsMountPoint(targetPath)
if err != nil {
log.ErrorLog(ctx, "stat failed: %v", err)
if os.IsNotExist(err) {
// targetPath has already been deleted
log.DebugLog(ctx, "targetPath: %s has already been deleted", targetPath)
@ -296,7 +344,14 @@ func (ns *NodeServer) NodeUnpublishVolume(
return &csi.NodeUnpublishVolumeResponse{}, nil
}
return nil, status.Error(codes.Internal, err.Error())
if !util.IsCorruptedMountError(err) {
return nil, status.Error(codes.Internal, err.Error())
}
// Corrupted mounts need to be unmounted properly too,
// regardless of the mounter used. Continue as normal.
log.DebugLog(ctx, "cephfs: detected corrupted mount in publish target path %s, trying to unmount anyway", targetPath)
isMnt = true
}
if !isMnt {
if err = os.RemoveAll(targetPath); err != nil {
@ -340,8 +395,16 @@ func (ns *NodeServer) NodeUnstageVolume(
stagingTargetPath := req.GetStagingTargetPath()
if err = fsutil.RemoveNodeStageMountinfo(fsutil.VolumeID(volID)); err != nil {
log.ErrorLog(ctx, "cephfs: failed to remove NodeStageMountinfo for volume %s: %v", volID, err)
return nil, status.Error(codes.Internal, err.Error())
}
isMnt, err := util.IsMountPoint(stagingTargetPath)
if err != nil {
log.ErrorLog(ctx, "stat failed: %v", err)
if os.IsNotExist(err) {
// targetPath has already been deleted
log.DebugLog(ctx, "targetPath: %s has already been deleted", stagingTargetPath)
@ -349,7 +412,16 @@ func (ns *NodeServer) NodeUnstageVolume(
return &csi.NodeUnstageVolumeResponse{}, nil
}
return nil, status.Error(codes.Internal, err.Error())
if !util.IsCorruptedMountError(err) {
return nil, status.Error(codes.Internal, err.Error())
}
// Corrupted mounts need to be unmounted properly too,
// regardless of the mounter used. Continue as normal.
log.DebugLog(ctx,
"cephfs: detected corrupted mount in staging target path %s, trying to unmount anyway",
stagingTargetPath)
isMnt = true
}
if !isMnt {
return &csi.NodeUnstageVolumeResponse{}, nil

View File

@ -0,0 +1,149 @@
/*
Copyright 2022 The Ceph-CSI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"encoding/json"
"fmt"
"os"
"path"
"github.com/container-storage-interface/spec/lib/go/csi"
// google.golang.org/protobuf/encoding doesn't offer MessageV2().
"github.com/golang/protobuf/proto" // nolint:staticcheck // See comment above.
"google.golang.org/protobuf/encoding/protojson"
)
// This file provides functionality to store various mount information
// in a file. It's currently used to restore ceph-fuse mounts.
// Mount info is stored in `/csi/mountinfo`.
const (
mountinfoDir = "/csi/mountinfo"
)
// nodeStageMountinfoRecord describes a single
// record of mountinfo of a staged volume.
// encoding/json-friendly format.
// Only for internal use for marshaling and unmarshaling.
type nodeStageMountinfoRecord struct {
VolumeCapabilityProtoJSON string `json:",omitempty"`
MountOptions []string `json:",omitempty"`
Secrets map[string]string `json:",omitempty"`
}
// NodeStageMountinfo describes mountinfo of a volume.
type NodeStageMountinfo struct {
VolumeCapability *csi.VolumeCapability
Secrets map[string]string
MountOptions []string
}
func fmtNodeStageMountinfoFilename(volID VolumeID) string {
return path.Join(mountinfoDir, fmt.Sprintf("nodestage-%s.json", volID))
}
func (mi *NodeStageMountinfo) toNodeStageMountinfoRecord() (*nodeStageMountinfoRecord, error) {
bs, err := protojson.Marshal(proto.MessageV2(mi.VolumeCapability))
if err != nil {
return nil, err
}
return &nodeStageMountinfoRecord{
VolumeCapabilityProtoJSON: string(bs),
MountOptions: mi.MountOptions,
Secrets: mi.Secrets,
}, nil
}
func (r *nodeStageMountinfoRecord) toNodeStageMountinfo() (*NodeStageMountinfo, error) {
volCapability := &csi.VolumeCapability{}
if err := protojson.Unmarshal([]byte(r.VolumeCapabilityProtoJSON), proto.MessageV2(volCapability)); err != nil {
return nil, err
}
return &NodeStageMountinfo{
VolumeCapability: volCapability,
MountOptions: r.MountOptions,
Secrets: r.Secrets,
}, nil
}
// WriteNodeStageMountinfo writes mount info to a file.
func WriteNodeStageMountinfo(volID VolumeID, mi *NodeStageMountinfo) error {
// Write NodeStageMountinfo into JSON-formatted byte slice.
r, err := mi.toNodeStageMountinfoRecord()
if err != nil {
return err
}
bs, err := json.Marshal(r)
if err != nil {
return err
}
// Write the byte slice into file.
err = os.WriteFile(fmtNodeStageMountinfoFilename(volID), bs, 0o600)
if os.IsNotExist(err) {
return nil
}
return err
}
// GetNodeStageMountinfo tries to retrieve NodeStageMountinfoRecord for `volID`.
// If it doesn't exist, `(nil, nil)` is returned.
func GetNodeStageMountinfo(volID VolumeID) (*NodeStageMountinfo, error) {
// Read the file.
bs, err := os.ReadFile(fmtNodeStageMountinfoFilename(volID))
if err != nil {
if os.IsNotExist(err) {
return nil, nil
}
return nil, err
}
// Unmarshall JSON-formatted byte slice into NodeStageMountinfo struct.
r := &nodeStageMountinfoRecord{}
if err = json.Unmarshal(bs, r); err != nil {
return nil, err
}
mi, err := r.toNodeStageMountinfo()
if err != nil {
return nil, err
}
return mi, err
}
// RemoveNodeStageMountinfo tries to remove NodeStageMountinfo for `volID`.
// If no such record exists for `volID`, it's considered success too.
func RemoveNodeStageMountinfo(volID VolumeID) error {
if err := os.Remove(fmtNodeStageMountinfoFilename(volID)); err != nil {
if !os.IsNotExist(err) {
return err
}
}
return nil
}

View File

@ -308,6 +308,18 @@ func IsMountPoint(p string) (bool, error) {
return !notMnt, nil
}
// IsCorruptedMountError checks if the given error is a result of a corrupted
// mountpoint.
func IsCorruptedMountError(err error) bool {
return mount.IsCorruptedMnt(err)
}
// ReadMountInfoForProc reads /proc/<PID>/mountpoint and marshals it into
// MountInfo structs.
func ReadMountInfoForProc(proc string) ([]mount.MountInfo, error) {
return mount.ParseMountInfo(fmt.Sprintf("/proc/%s/mountinfo", proc))
}
// Mount mounts the source to target path.
func Mount(source, target, fstype string, options []string) error {
dummyMount := mount.New("")