diff --git a/pkg/cephfs/nodeserver.go b/pkg/cephfs/nodeserver.go index 02553fcc1..bcbbfcd4b 100644 --- a/pkg/cephfs/nodeserver.go +++ b/pkg/cephfs/nodeserver.go @@ -18,7 +18,6 @@ package cephfs import ( "context" - "fmt" "github.com/golang/glog" "google.golang.org/grpc/codes" @@ -26,15 +25,12 @@ import ( "github.com/container-storage-interface/spec/lib/go/csi/v0" "github.com/kubernetes-csi/drivers/pkg/csi-common" - "k8s.io/kubernetes/pkg/util/keymutex" ) type nodeServer struct { *csicommon.DefaultNodeServer } -var nsMtx = keymutex.NewKeyMutex() - func validateNodePublishVolumeRequest(req *csi.NodePublishVolumeRequest) error { if req.GetVolumeCapability() == nil { return status.Error(codes.InvalidArgument, "Volume capability missing in request") @@ -63,54 +59,11 @@ func validateNodeUnpublishVolumeRequest(req *csi.NodeUnpublishVolumeRequest) err return nil } -func newMounter(volOptions *volumeOptions, key string, readOnly bool) (volumeMounter, error) { - var m volumeMounter - - if volOptions.Mounter == volumeMounter_fuse { - keyring := cephKeyringData{ - User: volOptions.User, - Key: key, - RootPath: volOptions.RootPath, - ReadOnly: readOnly, - } - - if err := keyring.writeToFile(); err != nil { - msg := fmt.Sprintf("couldn't write ceph keyring for user %s: %v", volOptions.User, err) - glog.Error(msg) - return nil, status.Error(codes.Internal, msg) - } - - m = &fuseMounter{} - } else if volOptions.Mounter == volumeMounter_kernel { - secret := cephSecretData{ - User: volOptions.User, - Key: key, - } - - if err := secret.writeToFile(); err != nil { - msg := fmt.Sprintf("couldn't write ceph secret for user %s: %v", volOptions.User, err) - glog.Error(msg) - return nil, status.Error(codes.Internal, msg) - } - - m = &kernelMounter{} - } - - return m, nil -} - func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) { if err := validateNodePublishVolumeRequest(req); err != nil { return nil, err } - /* - if err = tryLock(volId, nsMtx, "NodeServer"); err != nil { - return nil, err - } - defer nsMtx.UnlockKey(volId) - */ - // Configuration targetPath := req.GetTargetPath() @@ -178,13 +131,6 @@ func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu volId := req.GetVolumeId() - /* - if err := tryLock(volId, nsMtx, "NodeServer"); err != nil { - return nil, err - } - defer nsMtx.UnlockKey(volId) - */ - if err := unmountVolume(req.GetTargetPath()); err != nil { return nil, status.Error(codes.Internal, err.Error()) } diff --git a/pkg/cephfs/util.go b/pkg/cephfs/util.go index 1f9a3ab1d..471b9659f 100644 --- a/pkg/cephfs/util.go +++ b/pkg/cephfs/util.go @@ -20,6 +20,7 @@ import ( "fmt" "os/exec" + "github.com/golang/glog" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -62,3 +63,39 @@ func getKeyFromCredentials(creds map[string]string) (string, error) { return "", fmt.Errorf("missing key in credentials") } } + +func newMounter(volOptions *volumeOptions, key string, readOnly bool) (volumeMounter, error) { + var m volumeMounter + + if volOptions.Mounter == volumeMounter_fuse { + keyring := cephKeyringData{ + User: volOptions.User, + Key: key, + RootPath: volOptions.RootPath, + ReadOnly: readOnly, + } + + if err := keyring.writeToFile(); err != nil { + msg := fmt.Sprintf("couldn't write ceph keyring for user %s: %v", volOptions.User, err) + glog.Error(msg) + return nil, status.Error(codes.Internal, msg) + } + + m = &fuseMounter{} + } else if volOptions.Mounter == volumeMounter_kernel { + secret := cephSecretData{ + User: volOptions.User, + Key: key, + } + + if err := secret.writeToFile(); err != nil { + msg := fmt.Sprintf("couldn't write ceph secret for user %s: %v", volOptions.User, err) + glog.Error(msg) + return nil, status.Error(codes.Internal, msg) + } + + m = &kernelMounter{} + } + + return m, nil +} diff --git a/pkg/cephfs/volume.go b/pkg/cephfs/volume.go index febdca282..6aaa46f59 100644 --- a/pkg/cephfs/volume.go +++ b/pkg/cephfs/volume.go @@ -32,46 +32,37 @@ type volumeMounter interface { type fuseMounter struct{} -func (m *fuseMounter) mount(mountPoint string, volOptions *volumeOptions) error { - out, err := execCommand("ceph-fuse", mountPoint, "-n", "client."+volOptions.User, "-r", volOptions.RootPath) +func execCommandAndValidate(program string, args ...string) error { + out, err := execCommand(program, args...) if err != nil { - return fmt.Errorf("cephfs: ceph-fuse failed with following error: %s\ncephfs: cephf-fuse output: %s", err, out) + return fmt.Errorf("cephfs: %s failed with following error: %s\ncephfs: %s output: %s", program, err, program, out) } return nil } +func (m *fuseMounter) mount(mountPoint string, volOptions *volumeOptions) error { + return execCommandAndValidate("ceph-fuse", mountPoint, "-n", "client."+volOptions.User, "-r", volOptions.RootPath) +} + type kernelMounter struct{} func (m *kernelMounter) mount(mountPoint string, volOptions *volumeOptions) error { - out, err := execCommand("modprobe", "ceph") - if err != nil { - return fmt.Errorf("cephfs: modprobe failed with following error, %s\ncephfs: modprobe output: %s", err, out) + if err := execCommandAndValidate("modprobe", "ceph"); err != nil { + return err } - args := [...]string{ + return execCommandAndValidate("mount", "-t", "ceph", fmt.Sprintf("%s:%s", volOptions.Monitors, volOptions.RootPath), mountPoint, "-o", fmt.Sprintf("name=%s,secretfile=%s", volOptions.User, getCephSecretPath(volOptions.User)), - } - - out, err = execCommand("mount", args[:]...) - if err != nil { - return fmt.Errorf("cephfs: mount.ceph failed with following error: %s\ncephfs: mount.ceph output: %s", err, out) - } - - return nil + ) } func unmountVolume(mountPoint string) error { - out, err := execCommand("umount", mountPoint) - if err != nil { - return fmt.Errorf("cephfs: umount failed with following error: %v\ncephfs: umount output: %s", err, out) - } - - return nil + return execCommandAndValidate("umount", mountPoint) } func createMountPoint(root string) error {