From 143003bcfdacd4b4ef32dd90b43b2a4d7055c2db Mon Sep 17 00:00:00 2001
From: gman
Date: Tue, 26 Feb 2019 11:06:25 +0100
Subject: [PATCH] cephfs: added locks for {Create,Delete}Volume, NodeStageVolume

---
 pkg/cephfs/controllerserver.go | 11 +++++++++++
 pkg/cephfs/nodeserver.go       | 10 +++++++++-
 2 files changed, 20 insertions(+), 1 deletion(-)

diff --git a/pkg/cephfs/controllerserver.go b/pkg/cephfs/controllerserver.go
index 67a342040..a3ea1290e 100644
--- a/pkg/cephfs/controllerserver.go
+++ b/pkg/cephfs/controllerserver.go
@@ -24,6 +24,7 @@ import (
 
 	"github.com/container-storage-interface/spec/lib/go/csi"
 	"github.com/kubernetes-csi/drivers/pkg/csi-common"
+	"k8s.io/kubernetes/pkg/util/keymutex"
 
 	"github.com/ceph/ceph-csi/pkg/util"
 )
@@ -40,6 +41,10 @@ type controllerCacheEntry struct {
 	VolumeID   volumeID
 }
 
+var (
+	mtxControllerVolumeID = keymutex.NewHashed(0)
+)
+
 // CreateVolume creates the volume in backend and store the volume metadata
 func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
 	if err := cs.validateCreateVolumeRequest(req); err != nil {
@@ -58,6 +63,9 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
 
 	volID := makeVolumeID(req.GetName())
 
+	mtxControllerVolumeID.LockKey(string(volID))
+	defer mustUnlock(mtxControllerVolumeID, string(volID))
+
 	// Create a volume in case the user didn't provide one
 
 	if volOptions.ProvisionVolume {
@@ -143,6 +151,9 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
 		return nil, status.Error(codes.InvalidArgument, err.Error())
 	}
 
+	mtxControllerVolumeID.LockKey(string(volID))
+	defer mustUnlock(mtxControllerVolumeID, string(volID))
+
 	if err = purgeVolume(volID, cr, &ce.VolOptions); err != nil {
 		klog.Errorf("failed to delete volume %s: %v", volID, err)
 		return nil, status.Error(codes.Internal, err.Error())
diff --git a/pkg/cephfs/nodeserver.go b/pkg/cephfs/nodeserver.go
index b9ec7284c..a5ffe1ad3 100644
--- a/pkg/cephfs/nodeserver.go
+++ b/pkg/cephfs/nodeserver.go
@@ -24,6 +24,7 @@ import (
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
 	"k8s.io/klog"
+	"k8s.io/kubernetes/pkg/util/keymutex"
 
 	"github.com/container-storage-interface/spec/lib/go/csi"
 	"github.com/kubernetes-csi/drivers/pkg/csi-common"
@@ -35,6 +36,10 @@ type NodeServer struct {
 	*csicommon.DefaultNodeServer
 }
 
+var (
+	mtxNodeVolumeID = keymutex.NewHashed(0)
+)
+
 func getCredentialsForVolume(volOptions *volumeOptions, volID volumeID, req *csi.NodeStageVolumeRequest) (*credentials, error) {
 	var (
 		cr *credentials
@@ -44,7 +49,7 @@ func getCredentialsForVolume(volOptions *volumeOptions, volID volumeID, req *csi
 
 	if volOptions.ProvisionVolume {
 		// The volume is provisioned dynamically, get the credentials directly from Ceph
-		// First, store admin credentials - those are needed for retrieving the user credentials
+		// First, get admin credentials - those are needed for retrieving the user credentials
 
 		adminCr, err := getAdminCredentials(secrets)
 		if err != nil {
@@ -100,6 +105,9 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
 		return nil, status.Error(codes.Internal, err.Error())
 	}
 
+	mtxNodeVolumeID.LockKey(string(volID))
+	defer mustUnlock(mtxNodeVolumeID, string(volID))
+
 	// Check if the volume is already mounted
 	isMnt, err := isMountPoint(stagingTargetPath)
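
Note: the added lines defer to a mustUnlock helper that is not part of this diff. A minimal sketch of what such a helper might look like, assuming keymutex.KeyMutex's UnlockKey returns an error; the helper's name, placement, and logging behavior here are illustrative rather than taken from this patch:

	package cephfs

	import (
		"k8s.io/klog"
		"k8s.io/kubernetes/pkg/util/keymutex"
	)

	// mustUnlock releases the per-key lock taken with LockKey and treats a
	// failed unlock as a fatal programming error, so a deferred call never
	// silently leaks a lock for that volume ID.
	func mustUnlock(m keymutex.KeyMutex, key string) {
		if err := m.UnlockKey(key); err != nil {
			klog.Fatalf("failed to unlock mutex for %s: %v", key, err)
		}
	}

Keying the lock on the volume ID serializes concurrent CreateVolume, DeleteVolume, and NodeStageVolume calls that target the same volume while letting operations on different volumes proceed in parallel; keymutex.NewHashed(0) hashes keys onto a default-sized pool of mutexes rather than allocating one mutex per key.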