cephfs: enable VolumeCondition with new health-checker

The HealthChecker is configured to use the Staging path of the volume,
with a `.csi/` subdirectory. In the future, this directory could be moved
outside of the Published directory.

Fixes: #4219
Signed-off-by: Niels de Vos <ndevos@ibm.com>
Author: Niels de Vos <ndevos@ibm.com>, 2023-09-13 16:05:27 +02:00 (committed by mergify[bot])
parent 54fc65a561
commit 7d96cafad7
2 changed files with 67 additions and 2 deletions
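
The commit message above says the health-checker uses the volume's staging path with a `.csi/` subdirectory, and the diff below selects `hc.StatCheckerType`. As a rough illustration only (the internal/health-checker package itself is not part of this diff), the following sketch shows what a periodic stat-based probe of such a directory could look like; the `statChecker` type, its fields, and the example path are hypothetical and will differ from the real implementation.

// Illustrative sketch, not code from this commit: a minimal stat-based
// health probe on <stagingPath>/.csi, in the spirit of StatCheckerType.
package main

import (
	"fmt"
	"os"
	"path/filepath"
	"sync"
	"time"
)

type statChecker struct {
	dir      string        // e.g. <stagingTargetPath>/.csi
	interval time.Duration

	mu      sync.Mutex
	lastErr error
	done    chan struct{}
}

func newStatChecker(stagingPath string) *statChecker {
	return &statChecker{
		dir:      filepath.Join(stagingPath, ".csi"),
		interval: time.Minute,
		done:     make(chan struct{}),
	}
}

func (sc *statChecker) start() {
	// probe once immediately, then re-probe on every tick
	check := func() {
		// a failing stat() marks the volume as unhealthy; a hanging stat()
		// would need an extra timeout in a real checker (simplified here)
		_, err := os.Stat(sc.dir)
		sc.mu.Lock()
		sc.lastErr = err
		sc.mu.Unlock()
	}

	go func() {
		check()
		ticker := time.NewTicker(sc.interval)
		defer ticker.Stop()
		for {
			select {
			case <-sc.done:
				return
			case <-ticker.C:
				check()
			}
		}
	}()
}

func (sc *statChecker) isHealthy() (bool, error) {
	sc.mu.Lock()
	defer sc.mu.Unlock()

	return sc.lastErr == nil, sc.lastErr
}

func (sc *statChecker) stop() {
	close(sc.done)
}

func main() {
	// hypothetical staging path, only for demonstration
	sc := newStatChecker("/var/lib/kubelet/plugins/kubernetes.io/csi/staging/volume-1")
	sc.start()

	time.Sleep(100 * time.Millisecond)
	healthy, err := sc.isHealthy()
	fmt.Println("healthy:", healthy, "err:", err)

	sc.stop()
}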

Changed file 1 of 2:

@@ -25,6 +25,7 @@ import (
 	casceph "github.com/ceph/ceph-csi/internal/csi-addons/cephfs"
 	csiaddons "github.com/ceph/ceph-csi/internal/csi-addons/server"
 	csicommon "github.com/ceph/ceph-csi/internal/csi-common"
+	hc "github.com/ceph/ceph-csi/internal/health-checker"
 	"github.com/ceph/ceph-csi/internal/journal"
 	"github.com/ceph/ceph-csi/internal/util"
 	"github.com/ceph/ceph-csi/internal/util/log"
@@ -82,6 +83,7 @@ func NewNodeServer(
 		VolumeLocks:        util.NewVolumeLocks(),
 		kernelMountOptions: kernelMountOptions,
 		fuseMountOptions:   fuseMountOptions,
+		healthChecker:      hc.NewHealthCheckManager(),
 	}
 }
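
The node server now constructs an `hc.Manager` via `hc.NewHealthCheckManager()` and, in the second file below, calls `StartSharedChecker`, `StartChecker`, `StopChecker`, `StopSharedChecker` and `IsHealthy` on it. For orientation, here is a sketch of that interface as inferred purely from those call sites; the real definitions live in internal/health-checker and may differ in names, signatures, and return values.

// Sketch only: the health-checker API surface as implied by the call sites
// in this commit. Not the actual internal/health-checker code.
package healthchecker

// CheckerType selects how a volume is probed.
type CheckerType int

const (
	// StatCheckerType stat()s a directory and does not write into it, so it
	// is usable for read-only volumes and user-visible paths.
	StatCheckerType CheckerType = iota
	// FileCheckerType would write/read a probe file (mentioned in the TODOs).
	FileCheckerType
)

// Manager keeps track of running checkers, keyed by volume (and by path for
// non-shared checkers).
type Manager interface {
	// StartSharedChecker starts a checker on the staging path; all publishes
	// of the volume share it.
	StartSharedChecker(volumeID, path string, ct CheckerType) error

	// StartChecker starts a checker for a single published path.
	StartChecker(volumeID, path string, ct CheckerType) error

	// StopSharedChecker and StopChecker stop checkers started above.
	StopSharedChecker(volumeID string)
	StopChecker(volumeID, path string)

	// IsHealthy reports false plus an error describing the failure, or true
	// plus a non-nil error when no checker was running for the volume.
	IsHealthy(volumeID, path string) (bool, error)
}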

Changed file 2 of 2:

@@ -29,6 +29,7 @@ import (
 	"github.com/ceph/ceph-csi/internal/cephfs/store"
 	fsutil "github.com/ceph/ceph-csi/internal/cephfs/util"
 	csicommon "github.com/ceph/ceph-csi/internal/csi-common"
+	hc "github.com/ceph/ceph-csi/internal/health-checker"
 	"github.com/ceph/ceph-csi/internal/util"
 	"github.com/ceph/ceph-csi/internal/util/fscrypt"
 	"github.com/ceph/ceph-csi/internal/util/log"
@@ -47,6 +48,7 @@ type NodeServer struct {
 	VolumeLocks        *util.VolumeLocks
 	kernelMountOptions string
 	fuseMountOptions   string
+	healthChecker      hc.Manager
 }

 func getCredentialsForVolume(
@@ -228,6 +230,8 @@ func (ns *NodeServer) NodeStageVolume(
 		return nil, status.Error(codes.Internal, err.Error())
 	}

+	ns.startSharedHealthChecker(ctx, req.GetVolumeId(), stagingTargetPath)
+
 	return &csi.NodeStageVolumeResponse{}, nil
 }
@@ -270,9 +274,24 @@ func (ns *NodeServer) NodeStageVolume(
 		}
 	}

+	ns.startSharedHealthChecker(ctx, req.GetVolumeId(), stagingTargetPath)
+
 	return &csi.NodeStageVolumeResponse{}, nil
 }

+// startSharedHealthChecker starts a health-checker on the stagingTargetPath.
+// This checker can be shared between multiple containers.
+//
+// TODO: start a FileChecker for read-writable volumes that have an app-data subdir.
+func (ns *NodeServer) startSharedHealthChecker(ctx context.Context, volumeID, dir string) {
+	// The StatChecker works for volumes that do not have a dedicated app-data
+	// subdirectory, or are read-only.
+	err := ns.healthChecker.StartSharedChecker(volumeID, dir, hc.StatCheckerType)
+	if err != nil {
+		log.WarningLog(ctx, "failed to start healthchecker: %v", err)
+	}
+}
+
 func (ns *NodeServer) mount(
 	ctx context.Context,
 	mnt mounter.VolumeMounter,
@@ -479,7 +498,8 @@ func (ns *NodeServer) NodePublishVolume(
 	// Ensure staging target path is a mountpoint.

-	if isMnt, err := util.IsMountPoint(ns.Mounter, stagingTargetPath); err != nil {
+	isMnt, err := util.IsMountPoint(ns.Mounter, stagingTargetPath)
+	if err != nil {
 		log.ErrorLog(ctx, "stat failed: %v", err)

 		return nil, status.Error(codes.Internal, err.Error())
@@ -491,7 +511,7 @@ func (ns *NodeServer) NodePublishVolume(
 	// Check if the volume is already mounted

-	isMnt, err := util.IsMountPoint(ns.Mounter, targetPath)
+	isMnt, err = util.IsMountPoint(ns.Mounter, targetPath)
 	if err != nil {
 		log.ErrorLog(ctx, "stat failed: %v", err)
@@ -545,6 +565,10 @@ func (ns *NodeServer) NodeUnpublishVolume(
 	// considering kubelet make sure node operations like unpublish/unstage...etc can not be called
 	// at same time, an explicit locking at time of nodeunpublish is not required.
 	targetPath := req.GetTargetPath()

+	// stop the health-checker that may have been started in NodeGetVolumeStats()
+	ns.healthChecker.StopChecker(req.GetVolumeId(), targetPath)
+
 	isMnt, err := util.IsMountPoint(ns.Mounter, targetPath)
 	if err != nil {
 		log.ErrorLog(ctx, "stat failed: %v", err)
@@ -599,6 +623,9 @@ func (ns *NodeServer) NodeUnstageVolume(
 	}

 	volID := req.GetVolumeId()

+	ns.healthChecker.StopSharedChecker(volID)
+
 	if acquired := ns.VolumeLocks.TryAcquire(volID); !acquired {
 		log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volID)
@@ -670,6 +697,13 @@ func (ns *NodeServer) NodeGetCapabilities(
 				},
 			},
 		},
+		{
+			Type: &csi.NodeServiceCapability_Rpc{
+				Rpc: &csi.NodeServiceCapability_RPC{
+					Type: csi.NodeServiceCapability_RPC_VOLUME_CONDITION,
+				},
+			},
+		},
 		{
 			Type: &csi.NodeServiceCapability_Rpc{
 				Rpc: &csi.NodeServiceCapability_RPC{
@@ -694,6 +728,35 @@ func (ns *NodeServer) NodeGetVolumeStats(
 		return nil, status.Error(codes.InvalidArgument, err.Error())
 	}

+	// health check first, return without stats if unhealthy
+	healthy, msg := ns.healthChecker.IsHealthy(req.GetVolumeId(), targetPath)
+
+	// If healthy and an error is returned, it means that the checker was not
+	// started. This could happen when the node-plugin was restarted and the
+	// volume is already staged and published.
+	if healthy && msg != nil {
+		// Start a StatChecker for the mounted targetPath, this prevents
+		// writing a file in the user-visible location. Ideally a (shared)
+		// FileChecker is started with the stagingTargetPath, but we can't
+		// get the stagingPath from the request easily.
+		// TODO: resolve the stagingPath like rbd.getStagingPath() does
+		err = ns.healthChecker.StartChecker(req.GetVolumeId(), targetPath, hc.StatCheckerType)
+		if err != nil {
+			log.WarningLog(ctx, "failed to start healthchecker: %v", err)
+		}
+	}
+
+	// !healthy indicates a problem with the volume
+	if !healthy {
+		return &csi.NodeGetVolumeStatsResponse{
+			VolumeCondition: &csi.VolumeCondition{
+				Abnormal: true,
+				Message:  msg.Error(),
+			},
+		}, nil
+	}
+
+	// warning: stat() may hang on an unhealthy volume
 	stat, err := os.Stat(targetPath)
 	if err != nil {
 		if util.IsCorruptedMountError(err) {