cleanup: create k8s.io/mount-utils Mounter only once

Recently the k8s.io/mount-utils package added more runtime dectection.
When creating a new Mounter, the detect is run every time. This is
unfortunate, as it logs a message like the following:

```
mount_linux.go:283] Detected umount with safe 'not mounted' behavior
```

This message might be useful, so it probably good to keep it.

In Ceph-CSI there are various locations where Mounter instances are
created. Moving that to the DefaultNodeServer type reduces it to a
single place. Some utility functions need to accept the additional
parameter too, so that has been modified as well.

See-also: kubernetes/kubernetes#109676
Signed-off-by: Niels de Vos <ndevos@redhat.com>
This commit is contained in:
Niels de Vos 2022-07-18 18:13:36 +02:00 committed by mergify[bot]
parent 5ed305850f
commit 011d4fc81c
8 changed files with 33 additions and 32 deletions

View File

@ -51,8 +51,8 @@ func (ms mountState) String() string {
}[int(ms)] }[int(ms)]
} }
func getMountState(path string) (mountState, error) { func (ns *NodeServer) getMountState(path string) (mountState, error) {
isMnt, err := util.IsMountPoint(path) isMnt, err := util.IsMountPoint(ns.Mounter, path)
if err != nil { if err != nil {
if util.IsCorruptedMountError(err) { if util.IsCorruptedMountError(err) {
return msCorrupted, nil return msCorrupted, nil
@ -117,12 +117,12 @@ func (ns *NodeServer) tryRestoreFuseMountsInNodePublish(
) error { ) error {
// Check if there is anything to restore. // Check if there is anything to restore.
stagingTargetMs, err := getMountState(stagingTargetPath) stagingTargetMs, err := ns.getMountState(stagingTargetPath)
if err != nil { if err != nil {
return err return err
} }
targetMs, err := getMountState(targetPath) targetMs, err := ns.getMountState(targetPath)
if err != nil { if err != nil {
return err return err
} }
@ -230,7 +230,7 @@ func (ns *NodeServer) tryRestoreFuseMountInNodeStage(
) error { ) error {
// Check if there is anything to restore. // Check if there is anything to restore.
stagingTargetMs, err := getMountState(stagingTargetPath) stagingTargetMs, err := ns.getMountState(stagingTargetPath)
if err != nil { if err != nil {
return err return err
} }

View File

@ -176,7 +176,7 @@ func (ns *NodeServer) NodeStageVolume(
return nil, status.Errorf(codes.Internal, "failed to try to restore FUSE mounts: %v", err) return nil, status.Errorf(codes.Internal, "failed to try to restore FUSE mounts: %v", err)
} }
isMnt, err := util.IsMountPoint(stagingTargetPath) isMnt, err := util.IsMountPoint(ns.Mounter, stagingTargetPath)
if err != nil { if err != nil {
log.ErrorLog(ctx, "stat failed: %v", err) log.ErrorLog(ctx, "stat failed: %v", err)
@ -426,7 +426,7 @@ func (ns *NodeServer) NodePublishVolume(
// Ensure staging target path is a mountpoint. // Ensure staging target path is a mountpoint.
if isMnt, err := util.IsMountPoint(stagingTargetPath); err != nil { if isMnt, err := util.IsMountPoint(ns.Mounter, stagingTargetPath); err != nil {
log.ErrorLog(ctx, "stat failed: %v", err) log.ErrorLog(ctx, "stat failed: %v", err)
return nil, status.Error(codes.Internal, err.Error()) return nil, status.Error(codes.Internal, err.Error())
@ -438,7 +438,7 @@ func (ns *NodeServer) NodePublishVolume(
// Check if the volume is already mounted // Check if the volume is already mounted
isMnt, err := util.IsMountPoint(targetPath) isMnt, err := util.IsMountPoint(ns.Mounter, targetPath)
if err != nil { if err != nil {
log.ErrorLog(ctx, "stat failed: %v", err) log.ErrorLog(ctx, "stat failed: %v", err)
@ -482,7 +482,7 @@ func (ns *NodeServer) NodeUnpublishVolume(
// considering kubelet make sure node operations like unpublish/unstage...etc can not be called // considering kubelet make sure node operations like unpublish/unstage...etc can not be called
// at same time, an explicit locking at time of nodeunpublish is not required. // at same time, an explicit locking at time of nodeunpublish is not required.
targetPath := req.GetTargetPath() targetPath := req.GetTargetPath()
isMnt, err := util.IsMountPoint(targetPath) isMnt, err := util.IsMountPoint(ns.Mounter, targetPath)
if err != nil { if err != nil {
log.ErrorLog(ctx, "stat failed: %v", err) log.ErrorLog(ctx, "stat failed: %v", err)
@ -551,7 +551,7 @@ func (ns *NodeServer) NodeUnstageVolume(
return nil, status.Error(codes.Internal, err.Error()) return nil, status.Error(codes.Internal, err.Error())
} }
isMnt, err := util.IsMountPoint(stagingTargetPath) isMnt, err := util.IsMountPoint(ns.Mounter, stagingTargetPath)
if err != nil { if err != nil {
log.ErrorLog(ctx, "stat failed: %v", err) log.ErrorLog(ctx, "stat failed: %v", err)
@ -637,7 +637,7 @@ func (ns *NodeServer) NodeGetVolumeStats(
} }
if stat.Mode().IsDir() { if stat.Mode().IsDir() {
return csicommon.FilesystemNodeGetVolumeStats(ctx, targetPath) return csicommon.FilesystemNodeGetVolumeStats(ctx, ns.Mounter, targetPath)
} }
return nil, status.Errorf(codes.InvalidArgument, "targetpath %q is not a directory or device", targetPath) return nil, status.Errorf(codes.InvalidArgument, "targetpath %q is not a directory or device", targetPath)

View File

@ -24,12 +24,14 @@ import (
"github.com/container-storage-interface/spec/lib/go/csi" "github.com/container-storage-interface/spec/lib/go/csi"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
mount "k8s.io/mount-utils"
) )
// DefaultNodeServer stores driver object. // DefaultNodeServer stores driver object.
type DefaultNodeServer struct { type DefaultNodeServer struct {
Driver *CSIDriver Driver *CSIDriver
Type string Type string
Mounter mount.Interface
} }
// NodeExpandVolume returns unimplemented response. // NodeExpandVolume returns unimplemented response.

View File

@ -38,6 +38,7 @@ import (
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
"k8s.io/klog/v2" "k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/volume" "k8s.io/kubernetes/pkg/volume"
mount "k8s.io/mount-utils"
) )
func parseEndpoint(ep string) (string, string, error) { func parseEndpoint(ep string) (string, string, error) {
@ -63,6 +64,7 @@ func NewDefaultNodeServer(d *CSIDriver, t string, topology map[string]string) *D
return &DefaultNodeServer{ return &DefaultNodeServer{
Driver: d, Driver: d,
Type: t, Type: t,
Mounter: mount.New(""),
} }
} }
@ -229,8 +231,12 @@ func panicHandler(
// requested by the NodeGetVolumeStats CSI procedure. // requested by the NodeGetVolumeStats CSI procedure.
// It is shared for FileMode volumes, both the CephFS and RBD NodeServers call // It is shared for FileMode volumes, both the CephFS and RBD NodeServers call
// this. // this.
func FilesystemNodeGetVolumeStats(ctx context.Context, targetPath string) (*csi.NodeGetVolumeStatsResponse, error) { func FilesystemNodeGetVolumeStats(
isMnt, err := util.IsMountPoint(targetPath) ctx context.Context,
mounter mount.Interface,
targetPath string,
) (*csi.NodeGetVolumeStatsResponse, error) {
isMnt, err := util.IsMountPoint(mounter, targetPath)
if err != nil { if err != nil {
if os.IsNotExist(err) { if os.IsNotExist(err) {
return nil, status.Errorf(codes.InvalidArgument, "targetpath %s does not exist", targetPath) return nil, status.Errorf(codes.InvalidArgument, "targetpath %s does not exist", targetPath)

View File

@ -26,6 +26,7 @@ import (
"github.com/container-storage-interface/spec/lib/go/csi" "github.com/container-storage-interface/spec/lib/go/csi"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
mount "k8s.io/mount-utils"
) )
var fakeID = "fake-id" var fakeID = "fake-id"
@ -87,7 +88,7 @@ func TestFilesystemNodeGetVolumeStats(t *testing.T) {
// retry until a mountpoint is found // retry until a mountpoint is found
for { for {
stats, err := FilesystemNodeGetVolumeStats(context.TODO(), cwd) stats, err := FilesystemNodeGetVolumeStats(context.TODO(), mount.New(""), cwd)
if err != nil && cwd != "/" && strings.HasSuffix(err.Error(), "is not mounted") { if err != nil && cwd != "/" && strings.HasSuffix(err.Error(), "is not mounted") {
// try again with the parent directory // try again with the parent directory
cwd = filepath.Dir(cwd) cwd = filepath.Dir(cwd)

View File

@ -29,7 +29,6 @@ import (
"github.com/ceph/ceph-csi/internal/util/log" "github.com/ceph/ceph-csi/internal/util/log"
"github.com/container-storage-interface/spec/lib/go/csi" "github.com/container-storage-interface/spec/lib/go/csi"
mount "k8s.io/mount-utils"
) )
// Driver contains the default identity,node and controller struct. // Driver contains the default identity,node and controller struct.
@ -73,11 +72,8 @@ func NewReplicationServer(c *rbd.ControllerServer) *rbd.ReplicationServer {
// NewNodeServer initialize a node server for rbd CSI driver. // NewNodeServer initialize a node server for rbd CSI driver.
func NewNodeServer(d *csicommon.CSIDriver, t string, topology map[string]string) (*rbd.NodeServer, error) { func NewNodeServer(d *csicommon.CSIDriver, t string, topology map[string]string) (*rbd.NodeServer, error) {
mounter := mount.New("")
return &rbd.NodeServer{ return &rbd.NodeServer{
DefaultNodeServer: csicommon.NewDefaultNodeServer(d, t, topology), DefaultNodeServer: csicommon.NewDefaultNodeServer(d, t, topology),
Mounter: mounter,
VolumeLocks: util.NewVolumeLocks(), VolumeLocks: util.NewVolumeLocks(),
}, nil }, nil
} }

View File

@ -42,7 +42,6 @@ import (
// node server spec. // node server spec.
type NodeServer struct { type NodeServer struct {
*csicommon.DefaultNodeServer *csicommon.DefaultNodeServer
Mounter mount.Interface
// A map storing all volumes with ongoing operations so that additional operations // A map storing all volumes with ongoing operations so that additional operations
// for that same volume (as defined by VolumeID) return an Aborted error // for that same volume (as defined by VolumeID) return an Aborted error
VolumeLocks *util.VolumeLocks VolumeLocks *util.VolumeLocks
@ -806,7 +805,7 @@ func (ns *NodeServer) mountVolume(ctx context.Context, stagingPath string, req *
if readOnly { if readOnly {
mountOptions = append(mountOptions, "ro") mountOptions = append(mountOptions, "ro")
} }
if err := util.Mount(stagingPath, targetPath, fsType, mountOptions); err != nil { if err := util.Mount(ns.Mounter, stagingPath, targetPath, fsType, mountOptions); err != nil {
return status.Error(codes.Internal, err.Error()) return status.Error(codes.Internal, err.Error())
} }
@ -1241,7 +1240,7 @@ func (ns *NodeServer) NodeGetVolumeStats(
} }
if stat.Mode().IsDir() { if stat.Mode().IsDir() {
return csicommon.FilesystemNodeGetVolumeStats(ctx, targetPath) return csicommon.FilesystemNodeGetVolumeStats(ctx, ns.Mounter, targetPath)
} else if (stat.Mode() & os.ModeDevice) == os.ModeDevice { } else if (stat.Mode() & os.ModeDevice) == os.ModeDevice {
return blockNodeGetVolumeStats(ctx, targetPath) return blockNodeGetVolumeStats(ctx, targetPath)
} }

View File

@ -325,9 +325,8 @@ func checkDirExists(p string) bool {
} }
// IsMountPoint checks if the given path is mountpoint or not. // IsMountPoint checks if the given path is mountpoint or not.
func IsMountPoint(p string) (bool, error) { func IsMountPoint(mounter mount.Interface, p string) (bool, error) {
dummyMount := mount.New("") notMnt, err := mounter.IsLikelyNotMountPoint(p)
notMnt, err := dummyMount.IsLikelyNotMountPoint(p)
if err != nil { if err != nil {
return false, err return false, err
} }
@ -348,10 +347,8 @@ func ReadMountInfoForProc(proc string) ([]mount.MountInfo, error) {
} }
// Mount mounts the source to target path. // Mount mounts the source to target path.
func Mount(source, target, fstype string, options []string) error { func Mount(mounter mount.Interface, source, target, fstype string, options []string) error {
dummyMount := mount.New("") return mounter.MountSensitiveWithoutSystemd(source, target, fstype, options, nil)
return dummyMount.MountSensitiveWithoutSystemd(source, target, fstype, options, nil)
} }
// MountOptionsAdd adds the `add` mount options to the `options` and returns a // MountOptionsAdd adds the `add` mount options to the `options` and returns a