cephfs: added locks for {Create,Delete}Volume, NodeStageVolume

This commit is contained in:
gman 2019-02-26 11:06:25 +01:00
parent 60588d8968
commit 143003bcfd
2 changed files with 20 additions and 1 deletions

View File

@ -24,6 +24,7 @@ import (
"github.com/container-storage-interface/spec/lib/go/csi" "github.com/container-storage-interface/spec/lib/go/csi"
"github.com/kubernetes-csi/drivers/pkg/csi-common" "github.com/kubernetes-csi/drivers/pkg/csi-common"
"k8s.io/kubernetes/pkg/util/keymutex"
"github.com/ceph/ceph-csi/pkg/util" "github.com/ceph/ceph-csi/pkg/util"
) )
@ -40,6 +41,10 @@ type controllerCacheEntry struct {
VolumeID volumeID VolumeID volumeID
} }
var (
mtxControllerVolumeID = keymutex.NewHashed(0)
)
// CreateVolume creates the volume in backend and store the volume metadata // CreateVolume creates the volume in backend and store the volume metadata
func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) { func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
if err := cs.validateCreateVolumeRequest(req); err != nil { if err := cs.validateCreateVolumeRequest(req); err != nil {
@ -58,6 +63,9 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
volID := makeVolumeID(req.GetName()) volID := makeVolumeID(req.GetName())
mtxControllerVolumeID.LockKey(string(volID))
defer mustUnlock(mtxControllerVolumeID, string(volID))
// Create a volume in case the user didn't provide one // Create a volume in case the user didn't provide one
if volOptions.ProvisionVolume { if volOptions.ProvisionVolume {
@ -143,6 +151,9 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
return nil, status.Error(codes.InvalidArgument, err.Error()) return nil, status.Error(codes.InvalidArgument, err.Error())
} }
mtxControllerVolumeID.LockKey(string(volID))
defer mustUnlock(mtxControllerVolumeID, string(volID))
if err = purgeVolume(volID, cr, &ce.VolOptions); err != nil { if err = purgeVolume(volID, cr, &ce.VolOptions); err != nil {
klog.Errorf("failed to delete volume %s: %v", volID, err) klog.Errorf("failed to delete volume %s: %v", volID, err)
return nil, status.Error(codes.Internal, err.Error()) return nil, status.Error(codes.Internal, err.Error())

View File

@ -24,6 +24,7 @@ import (
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
"k8s.io/klog" "k8s.io/klog"
"k8s.io/kubernetes/pkg/util/keymutex"
"github.com/container-storage-interface/spec/lib/go/csi" "github.com/container-storage-interface/spec/lib/go/csi"
"github.com/kubernetes-csi/drivers/pkg/csi-common" "github.com/kubernetes-csi/drivers/pkg/csi-common"
@ -35,6 +36,10 @@ type NodeServer struct {
*csicommon.DefaultNodeServer *csicommon.DefaultNodeServer
} }
var (
mtxNodeVolumeID = keymutex.NewHashed(0)
)
func getCredentialsForVolume(volOptions *volumeOptions, volID volumeID, req *csi.NodeStageVolumeRequest) (*credentials, error) { func getCredentialsForVolume(volOptions *volumeOptions, volID volumeID, req *csi.NodeStageVolumeRequest) (*credentials, error) {
var ( var (
cr *credentials cr *credentials
@ -44,7 +49,7 @@ func getCredentialsForVolume(volOptions *volumeOptions, volID volumeID, req *csi
if volOptions.ProvisionVolume { if volOptions.ProvisionVolume {
// The volume is provisioned dynamically, get the credentials directly from Ceph // The volume is provisioned dynamically, get the credentials directly from Ceph
// First, store admin credentials - those are needed for retrieving the user credentials // First, get admin credentials - those are needed for retrieving the user credentials
adminCr, err := getAdminCredentials(secrets) adminCr, err := getAdminCredentials(secrets)
if err != nil { if err != nil {
@ -100,6 +105,9 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
return nil, status.Error(codes.Internal, err.Error()) return nil, status.Error(codes.Internal, err.Error())
} }
mtxNodeVolumeID.LockKey(string(volID))
defer mustUnlock(mtxNodeVolumeID, string(volID))
// Check if the volume is already mounted // Check if the volume is already mounted
isMnt, err := isMountPoint(stagingTargetPath) isMnt, err := isMountPoint(stagingTargetPath)