diff --git a/internal/cephfs/driver.go b/internal/cephfs/driver.go
index 2ff56750f..781c89e9a 100644
--- a/internal/cephfs/driver.go
+++ b/internal/cephfs/driver.go
@@ -25,6 +25,7 @@ import (
 	casceph "github.com/ceph/ceph-csi/internal/csi-addons/cephfs"
 	csiaddons "github.com/ceph/ceph-csi/internal/csi-addons/server"
 	csicommon "github.com/ceph/ceph-csi/internal/csi-common"
+	hc "github.com/ceph/ceph-csi/internal/health-checker"
 	"github.com/ceph/ceph-csi/internal/journal"
 	"github.com/ceph/ceph-csi/internal/util"
 	"github.com/ceph/ceph-csi/internal/util/log"
@@ -82,6 +83,7 @@ func NewNodeServer(
 		VolumeLocks:        util.NewVolumeLocks(),
 		kernelMountOptions: kernelMountOptions,
 		fuseMountOptions:   fuseMountOptions,
+		healthChecker:      hc.NewHealthCheckManager(),
 	}
 }
diff --git a/internal/cephfs/nodeserver.go b/internal/cephfs/nodeserver.go
index 7287cde76..94a9cb492 100644
--- a/internal/cephfs/nodeserver.go
+++ b/internal/cephfs/nodeserver.go
@@ -29,6 +29,7 @@ import (
 	"github.com/ceph/ceph-csi/internal/cephfs/store"
 	fsutil "github.com/ceph/ceph-csi/internal/cephfs/util"
 	csicommon "github.com/ceph/ceph-csi/internal/csi-common"
+	hc "github.com/ceph/ceph-csi/internal/health-checker"
 	"github.com/ceph/ceph-csi/internal/util"
 	"github.com/ceph/ceph-csi/internal/util/fscrypt"
 	"github.com/ceph/ceph-csi/internal/util/log"
@@ -47,6 +48,7 @@ type NodeServer struct {
 	VolumeLocks        *util.VolumeLocks
 	kernelMountOptions string
 	fuseMountOptions   string
+	healthChecker      hc.Manager
 }
 
 func getCredentialsForVolume(
@@ -228,6 +230,8 @@ func (ns *NodeServer) NodeStageVolume(
 			return nil, status.Error(codes.Internal, err.Error())
 		}
 
+		ns.startSharedHealthChecker(ctx, req.GetVolumeId(), stagingTargetPath)
+
 		return &csi.NodeStageVolumeResponse{}, nil
 	}
 
@@ -270,9 +274,24 @@
 		}
 	}
 
+	ns.startSharedHealthChecker(ctx, req.GetVolumeId(), stagingTargetPath)
+
 	return &csi.NodeStageVolumeResponse{}, nil
 }
 
+// startSharedHealthChecker starts a health-checker on the stagingTargetPath.
+// This checker can be shared between multiple containers.
+//
+// TODO: start a FileChecker for read-writable volumes that have an app-data subdir.
+func (ns *NodeServer) startSharedHealthChecker(ctx context.Context, volumeID, dir string) {
+	// The StatChecker works for volumes that do not have a dedicated app-data
+	// subdirectory, or are read-only.
+	err := ns.healthChecker.StartSharedChecker(volumeID, dir, hc.StatCheckerType)
+	if err != nil {
+		log.WarningLog(ctx, "failed to start healthchecker: %v", err)
+	}
+}
+
 func (ns *NodeServer) mount(
 	ctx context.Context,
 	mnt mounter.VolumeMounter,
@@ -479,7 +498,8 @@ func (ns *NodeServer) NodePublishVolume(
 
 	// Ensure staging target path is a mountpoint.
-	if isMnt, err := util.IsMountPoint(ns.Mounter, stagingTargetPath); err != nil {
+	isMnt, err := util.IsMountPoint(ns.Mounter, stagingTargetPath)
+	if err != nil {
 		log.ErrorLog(ctx, "stat failed: %v", err)
 
 		return nil, status.Error(codes.Internal, err.Error())
@@ -491,7 +511,7 @@
 
 	// Check if the volume is already mounted
-	isMnt, err := util.IsMountPoint(ns.Mounter, targetPath)
+	isMnt, err = util.IsMountPoint(ns.Mounter, targetPath)
 	if err != nil {
 		log.ErrorLog(ctx, "stat failed: %v", err)
 
@@ -545,6 +565,10 @@ func (ns *NodeServer) NodeUnpublishVolume(
 	// considering kubelet make sure node operations like unpublish/unstage...etc can not be called
 	// at same time, an explicit locking at time of nodeunpublish is not required.
 	targetPath := req.GetTargetPath()
+
+	// stop the health-checker that may have been started in NodeGetVolumeStats()
+	ns.healthChecker.StopChecker(req.GetVolumeId(), targetPath)
+
 	isMnt, err := util.IsMountPoint(ns.Mounter, targetPath)
 	if err != nil {
 		log.ErrorLog(ctx, "stat failed: %v", err)
@@ -599,6 +623,9 @@ func (ns *NodeServer) NodeUnstageVolume(
 	}
 
 	volID := req.GetVolumeId()
+
+	ns.healthChecker.StopSharedChecker(volID)
+
 	if acquired := ns.VolumeLocks.TryAcquire(volID); !acquired {
 		log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volID)
 
@@ -670,6 +697,13 @@ func (ns *NodeServer) NodeGetCapabilities(
 				},
 			},
 		},
+		{
+			Type: &csi.NodeServiceCapability_Rpc{
+				Rpc: &csi.NodeServiceCapability_RPC{
+					Type: csi.NodeServiceCapability_RPC_VOLUME_CONDITION,
+				},
+			},
+		},
 		{
 			Type: &csi.NodeServiceCapability_Rpc{
 				Rpc: &csi.NodeServiceCapability_RPC{
@@ -694,6 +728,35 @@ func (ns *NodeServer) NodeGetVolumeStats(
 		return nil, status.Error(codes.InvalidArgument, err.Error())
 	}
 
+	// health check first, return without stats if unhealthy
+	healthy, msg := ns.healthChecker.IsHealthy(req.GetVolumeId(), targetPath)
+
+	// If healthy and an error is returned, it means that the checker was not
+	// started. This could happen when the node-plugin was restarted and the
+	// volume is already staged and published.
+	if healthy && msg != nil {
+		// Start a StatChecker for the mounted targetPath, this prevents
+		// writing a file in the user-visible location. Ideally a (shared)
+		// FileChecker is started with the stagingTargetPath, but we can't
+		// get the stagingPath from the request easily.
+		// TODO: resolve the stagingPath like rbd.getStagingPath() does
+		err = ns.healthChecker.StartChecker(req.GetVolumeId(), targetPath, hc.StatCheckerType)
+		if err != nil {
+			log.WarningLog(ctx, "failed to start healthchecker: %v", err)
+		}
+	}
+
+	// !healthy indicates a problem with the volume
+	if !healthy {
+		return &csi.NodeGetVolumeStatsResponse{
+			VolumeCondition: &csi.VolumeCondition{
+				Abnormal: true,
+				Message:  msg.Error(),
+			},
+		}, nil
+	}
+
+	// warning: stat() may hang on an unhealthy volume
 	stat, err := os.Stat(targetPath)
 	if err != nil {
 		if util.IsCorruptedMountError(err) {
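
For reference, below is a minimal sketch of the hc.Manager surface this change relies on, inferred only from the calls visible in the diff (NewHealthCheckManager, StartSharedChecker, StartChecker, StopChecker, StopSharedChecker, IsHealthy, StatCheckerType). The package name, the CheckerType name and the FileCheckerType value are assumptions; the authoritative definition lives in internal/health-checker and may differ.

package healthchecker

// CheckerType selects the probe implementation. The type name is an
// assumption; only the StatCheckerType value appears in the diff above.
type CheckerType int

const (
	// StatCheckerType periodically stat()s the monitored path.
	StatCheckerType CheckerType = iota
	// FileCheckerType (assumed) writes/reads a probe file, as mentioned in
	// the TODOs about read-writable volumes with an app-data subdirectory.
	FileCheckerType
)

// Manager is the call surface used by the NodeServer in this change. Shared
// checkers are keyed by volumeID (one per staged volume), plain checkers by
// the (volumeID, path) pair.
type Manager interface {
	// StartSharedChecker starts one checker per volume on the
	// stagingTargetPath (called from NodeStageVolume).
	StartSharedChecker(volumeID, path string, ct CheckerType) error
	// StartChecker starts a checker for a specific published path (fallback
	// in NodeGetVolumeStats after a node-plugin restart).
	StartChecker(volumeID, path string, ct CheckerType) error
	// StopChecker stops the checker for a (volumeID, path) pair
	// (called from NodeUnpublishVolume).
	StopChecker(volumeID, path string)
	// StopSharedChecker stops the per-volume checker
	// (called from NodeUnstageVolume).
	StopSharedChecker(volumeID string)
	// IsHealthy reports the last known state; healthy==true together with a
	// non-nil error means no checker was running for this volume/path yet.
	IsHealthy(volumeID, path string) (bool, error)
}

NewHealthCheckManager() (used in NewNodeServer above) is assumed to return an implementation of this interface; its concrete type is not part of this diff.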