mirror of
https://github.com/ceph/ceph-csi.git
synced 2024-11-09 16:00:22 +00:00
Change the logic of locking
if any on going opearation is seen,we
have to return Abort error message
Signed-off-by: Madhu Rajanna <madhupr007@gmail.com>
(cherry picked from commit 6aac399075
)
This commit is contained in:
parent
cbd42a2181
commit
1ae09be924
@ -33,6 +33,9 @@ import (
|
||||
type ControllerServer struct {
|
||||
*csicommon.DefaultControllerServer
|
||||
MetadataStore util.CachePersister
|
||||
// A map storing all volumes with ongoing operations so that additional operations
|
||||
// for that same volume (as defined by VolumeID/volume name) return an Aborted error
|
||||
VolumeLocks *util.VolumeLocks
|
||||
}
|
||||
|
||||
type controllerCacheEntry struct {
|
||||
@ -40,11 +43,6 @@ type controllerCacheEntry struct {
|
||||
VolumeID volumeID
|
||||
}
|
||||
|
||||
var (
|
||||
volumeIDLocker = util.NewIDLocker()
|
||||
volumeNameLocker = util.NewIDLocker()
|
||||
)
|
||||
|
||||
// createBackingVolume creates the backing subvolume and on any error cleans up any created entities
|
||||
func (cs *ControllerServer) createBackingVolume(ctx context.Context, volOptions *volumeOptions, vID *volumeIdentifier, secret map[string]string) error {
|
||||
cr, err := util.NewAdminCredentials(secret)
|
||||
@ -78,6 +76,14 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
|
||||
// Configuration
|
||||
secret := req.GetSecrets()
|
||||
requestName := req.GetName()
|
||||
|
||||
// Existence and conflict checks
|
||||
if acquired := cs.VolumeLocks.TryAcquire(requestName); !acquired {
|
||||
klog.Infof(util.Log(ctx, util.VolumeOperationAlreadyExistsFmt), requestName)
|
||||
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, requestName)
|
||||
}
|
||||
defer cs.VolumeLocks.Release(requestName)
|
||||
|
||||
volOptions, err := newVolumeOptions(ctx, requestName, req.GetCapacityRange().GetRequiredBytes(),
|
||||
req.GetParameters(), secret)
|
||||
if err != nil {
|
||||
@ -85,10 +91,6 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
|
||||
return nil, status.Error(codes.InvalidArgument, err.Error())
|
||||
}
|
||||
|
||||
// Existence and conflict checks
|
||||
idLk := volumeNameLocker.Lock(requestName)
|
||||
defer volumeNameLocker.Unlock(idLk, requestName)
|
||||
|
||||
vID, err := checkVolExists(ctx, volOptions, secret)
|
||||
if err != nil {
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
@ -177,8 +179,11 @@ func (cs *ControllerServer) deleteVolumeDeprecated(ctx context.Context, req *csi
|
||||
}
|
||||
defer cr.DeleteCredentials()
|
||||
|
||||
idLk := volumeIDLocker.Lock(string(volID))
|
||||
defer volumeIDLocker.Unlock(idLk, string(volID))
|
||||
if acquired := cs.VolumeLocks.TryAcquire(string(volID)); !acquired {
|
||||
klog.Infof(util.Log(ctx, util.VolumeOperationAlreadyExistsFmt), volID)
|
||||
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, string(volID))
|
||||
}
|
||||
defer cs.VolumeLocks.Release(string(volID))
|
||||
|
||||
if err = purgeVolumeDeprecated(ctx, volID, cr, &ce.VolOptions); err != nil {
|
||||
klog.Errorf(util.Log(ctx, "failed to delete volume %s: %v"), volID, err)
|
||||
@ -209,6 +214,13 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
|
||||
volID := volumeID(req.GetVolumeId())
|
||||
secrets := req.GetSecrets()
|
||||
|
||||
// lock out parallel delete operations
|
||||
if acquired := cs.VolumeLocks.TryAcquire(string(volID)); !acquired {
|
||||
klog.Infof(util.Log(ctx, util.VolumeOperationAlreadyExistsFmt), volID)
|
||||
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, string(volID))
|
||||
}
|
||||
defer cs.VolumeLocks.Release(string(volID))
|
||||
|
||||
// Find the volume using the provided VolumeID
|
||||
volOptions, vID, err := newVolumeOptionsFromVolID(ctx, string(volID), nil, secrets)
|
||||
if err != nil {
|
||||
@ -227,6 +239,13 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
// lock out parallel delete and create requests against the same volume name as we
|
||||
// cleanup the subvolume and associated omaps for the same
|
||||
if acquired := cs.VolumeLocks.TryAcquire(volOptions.RequestName); !acquired {
|
||||
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volOptions.RequestName)
|
||||
}
|
||||
defer cs.VolumeLocks.Release(string(volID))
|
||||
|
||||
// Deleting a volume requires admin credentials
|
||||
cr, err := util.NewAdminCredentials(secrets)
|
||||
if err != nil {
|
||||
@ -235,11 +254,6 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
|
||||
}
|
||||
defer cr.DeleteCredentials()
|
||||
|
||||
// lock out parallel delete and create requests against the same volume name as we
|
||||
// cleanup the subvolume and associated omaps for the same
|
||||
idLk := volumeNameLocker.Lock(volOptions.RequestName)
|
||||
defer volumeNameLocker.Unlock(idLk, volOptions.RequestName)
|
||||
|
||||
if err = purgeVolume(ctx, volumeID(vID.FsSubvolName), cr, volOptions); err != nil {
|
||||
klog.Errorf(util.Log(ctx, "failed to delete volume %s: %v"), volID, err)
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
|
@ -76,6 +76,7 @@ func NewControllerServer(d *csicommon.CSIDriver, cachePersister util.CachePersis
|
||||
return &ControllerServer{
|
||||
DefaultControllerServer: csicommon.NewDefaultControllerServer(d),
|
||||
MetadataStore: cachePersister,
|
||||
VolumeLocks: util.NewVolumeLocks(),
|
||||
}
|
||||
}
|
||||
|
||||
@ -83,6 +84,7 @@ func NewControllerServer(d *csicommon.CSIDriver, cachePersister util.CachePersis
|
||||
func NewNodeServer(d *csicommon.CSIDriver, t string) *NodeServer {
|
||||
return &NodeServer{
|
||||
DefaultNodeServer: csicommon.NewDefaultNodeServer(d, t),
|
||||
VolumeLocks: util.NewVolumeLocks(),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -34,12 +34,11 @@ import (
|
||||
// node server spec.
|
||||
type NodeServer struct {
|
||||
*csicommon.DefaultNodeServer
|
||||
// A map storing all volumes with ongoing operations so that additional operations
|
||||
// for that same volume (as defined by VolumeID) return an Aborted error
|
||||
VolumeLocks *util.VolumeLocks
|
||||
}
|
||||
|
||||
var (
|
||||
nodeVolumeIDLocker = util.NewIDLocker()
|
||||
)
|
||||
|
||||
func getCredentialsForVolume(volOptions *volumeOptions, req *csi.NodeStageVolumeRequest) (*util.Credentials, error) {
|
||||
var (
|
||||
err error
|
||||
@ -80,6 +79,12 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
|
||||
stagingTargetPath := req.GetStagingTargetPath()
|
||||
volID := volumeID(req.GetVolumeId())
|
||||
|
||||
if acquired := ns.VolumeLocks.TryAcquire(req.GetVolumeId()); !acquired {
|
||||
klog.Infof(util.Log(ctx, util.VolumeOperationAlreadyExistsFmt), volID)
|
||||
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, req.GetVolumeId())
|
||||
}
|
||||
defer ns.VolumeLocks.Release(req.GetVolumeId())
|
||||
|
||||
volOptions, _, err := newVolumeOptionsFromVolID(ctx, string(volID), req.GetVolumeContext(), req.GetSecrets())
|
||||
if err != nil {
|
||||
if _, ok := err.(ErrInvalidVolID); !ok {
|
||||
@ -102,9 +107,6 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
|
||||
}
|
||||
}
|
||||
|
||||
idLk := nodeVolumeIDLocker.Lock(string(volID))
|
||||
defer nodeVolumeIDLocker.Unlock(idLk, string(volID))
|
||||
|
||||
// Check if the volume is already mounted
|
||||
|
||||
isMnt, err := util.IsMountPoint(stagingTargetPath)
|
||||
@ -167,11 +169,15 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Configuration
|
||||
|
||||
targetPath := req.GetTargetPath()
|
||||
volID := req.GetVolumeId()
|
||||
|
||||
if acquired := ns.VolumeLocks.TryAcquire(volID); !acquired {
|
||||
klog.Infof(util.Log(ctx, util.VolumeOperationAlreadyExistsFmt), volID)
|
||||
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volID)
|
||||
}
|
||||
defer ns.VolumeLocks.Release(volID)
|
||||
|
||||
if err := util.CreateMountPoint(targetPath); err != nil {
|
||||
klog.Errorf(util.Log(ctx, "failed to create mount point at %s: %v"), targetPath, err)
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
@ -243,9 +249,15 @@ func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
|
||||
return nil, err
|
||||
}
|
||||
|
||||
volID := req.GetVolumeId()
|
||||
targetPath := req.GetTargetPath()
|
||||
|
||||
volID := req.GetVolumeId()
|
||||
if acquired := ns.VolumeLocks.TryAcquire(volID); !acquired {
|
||||
klog.Infof(util.Log(ctx, util.VolumeOperationAlreadyExistsFmt), volID)
|
||||
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volID)
|
||||
}
|
||||
defer ns.VolumeLocks.Release(volID)
|
||||
|
||||
if err = volumeMountCache.nodeUnPublishVolume(ctx, volID, targetPath); err != nil {
|
||||
klog.Warningf(util.Log(ctx, "mount-cache: failed to unpublish volume %s %s: %v"), volID, targetPath, err)
|
||||
}
|
||||
@ -271,9 +283,15 @@ func (ns *NodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag
|
||||
return nil, err
|
||||
}
|
||||
|
||||
volID := req.GetVolumeId()
|
||||
if acquired := ns.VolumeLocks.TryAcquire(volID); !acquired {
|
||||
klog.Infof(util.Log(ctx, util.VolumeOperationAlreadyExistsFmt), volID)
|
||||
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volID)
|
||||
}
|
||||
defer ns.VolumeLocks.Release(volID)
|
||||
|
||||
stagingTargetPath := req.GetStagingTargetPath()
|
||||
|
||||
volID := req.GetVolumeId()
|
||||
if err = volumeMountCache.nodeUnStageVolume(volID); err != nil {
|
||||
klog.Warningf(util.Log(ctx, "mount-cache: failed to unstage volume %s %s: %v"), volID, stagingTargetPath, err)
|
||||
}
|
||||
|
@ -39,6 +39,13 @@ const (
|
||||
type ControllerServer struct {
|
||||
*csicommon.DefaultControllerServer
|
||||
MetadataStore util.CachePersister
|
||||
// A map storing all volumes with ongoing operations so that additional operations
|
||||
// for that same volume (as defined by VolumeID/volume name) return an Aborted error
|
||||
VolumeLocks *util.VolumeLocks
|
||||
|
||||
// A map storing all volumes with ongoing operations so that additional operations
|
||||
// for that same snapshot (as defined by SnapshotID/snapshot name) return an Aborted error
|
||||
SnapshotLocks *util.VolumeLocks
|
||||
}
|
||||
|
||||
func (cs *ControllerServer) validateVolumeReq(ctx context.Context, req *csi.CreateVolumeRequest) error {
|
||||
@ -123,8 +130,12 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
|
||||
return nil, err
|
||||
}
|
||||
|
||||
idLk := volumeNameLocker.Lock(req.GetName())
|
||||
defer volumeNameLocker.Unlock(idLk, req.GetName())
|
||||
// Existence and conflict checks
|
||||
if acquired := cs.VolumeLocks.TryAcquire(req.GetName()); !acquired {
|
||||
klog.Infof(util.Log(ctx, util.VolumeOperationAlreadyExistsFmt), req.GetName())
|
||||
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, req.GetName())
|
||||
}
|
||||
defer cs.VolumeLocks.Release(req.GetName())
|
||||
|
||||
found, err := checkVolExists(ctx, rbdVol, cr)
|
||||
if err != nil {
|
||||
@ -239,8 +250,11 @@ func (cs *ControllerServer) DeleteLegacyVolume(ctx context.Context, req *csi.Del
|
||||
" proceed with deleting legacy volume ID (%s)", volumeID)
|
||||
}
|
||||
|
||||
idLk := legacyVolumeIDLocker.Lock(volumeID)
|
||||
defer legacyVolumeIDLocker.Unlock(idLk, volumeID)
|
||||
if acquired := cs.VolumeLocks.TryAcquire(volumeID); !acquired {
|
||||
klog.Infof(util.Log(ctx, util.VolumeOperationAlreadyExistsFmt), volumeID)
|
||||
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID)
|
||||
}
|
||||
defer cs.VolumeLocks.Release(volumeID)
|
||||
|
||||
rbdVol := &rbdVolume{}
|
||||
if err := cs.MetadataStore.Get(volumeID, rbdVol); err != nil {
|
||||
@ -294,6 +308,12 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
|
||||
return nil, status.Error(codes.InvalidArgument, "empty volume ID in request")
|
||||
}
|
||||
|
||||
if acquired := cs.VolumeLocks.TryAcquire(volumeID); !acquired {
|
||||
klog.Infof(util.Log(ctx, util.VolumeOperationAlreadyExistsFmt), volumeID)
|
||||
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID)
|
||||
}
|
||||
defer cs.VolumeLocks.Release(volumeID)
|
||||
|
||||
rbdVol := &rbdVolume{}
|
||||
if err := genVolFromVolID(ctx, rbdVol, volumeID, cr); err != nil {
|
||||
// If error is ErrInvalidVolID it could be a version 1.0.0 or lower volume, attempt
|
||||
@ -323,8 +343,11 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
|
||||
// If error is ErrImageNotFound then we failed to find the image, but found the imageOMap
|
||||
// to lead us to the image, hence the imageOMap needs to be garbage collected, by calling
|
||||
// unreserve for the same
|
||||
idLk := volumeNameLocker.Lock(rbdVol.RequestName)
|
||||
defer volumeNameLocker.Unlock(idLk, rbdVol.RequestName)
|
||||
if acquired := cs.VolumeLocks.TryAcquire(rbdVol.RequestName); !acquired {
|
||||
klog.Infof(util.Log(ctx, util.VolumeOperationAlreadyExistsFmt), rbdVol.RequestName)
|
||||
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, rbdVol.RequestName)
|
||||
}
|
||||
defer cs.VolumeLocks.Release(rbdVol.RequestName)
|
||||
|
||||
if err := undoVolReservation(ctx, rbdVol, cr); err != nil {
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
@ -334,8 +357,11 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
|
||||
|
||||
// lock out parallel create requests against the same volume name as we
|
||||
// cleanup the image and associated omaps for the same
|
||||
idLk := volumeNameLocker.Lock(rbdVol.RequestName)
|
||||
defer volumeNameLocker.Unlock(idLk, rbdVol.RequestName)
|
||||
if acquired := cs.VolumeLocks.TryAcquire(rbdVol.RequestName); !acquired {
|
||||
klog.Infof(util.Log(ctx, util.VolumeOperationAlreadyExistsFmt), rbdVol.RequestName)
|
||||
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, rbdVol.RequestName)
|
||||
}
|
||||
defer cs.VolumeLocks.Release(rbdVol.RequestName)
|
||||
|
||||
// Deleting rbd image
|
||||
klog.V(4).Infof(util.Log(ctx, "deleting image %s"), rbdVol.RbdImageName)
|
||||
@ -413,8 +439,11 @@ func (cs *ControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateS
|
||||
rbdSnap.SourceVolumeID = req.GetSourceVolumeId()
|
||||
rbdSnap.RequestName = req.GetName()
|
||||
|
||||
idLk := snapshotNameLocker.Lock(req.GetName())
|
||||
defer snapshotNameLocker.Unlock(idLk, req.GetName())
|
||||
if acquired := cs.SnapshotLocks.TryAcquire(req.GetName()); !acquired {
|
||||
klog.Infof(util.Log(ctx, util.SnapshotOperationAlreadyExistsFmt), req.GetName())
|
||||
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, req.GetName())
|
||||
}
|
||||
defer cs.SnapshotLocks.Release(req.GetName())
|
||||
|
||||
// Need to check for already existing snapshot name, and if found
|
||||
// check for the requested source volume id and already allocated source volume id
|
||||
@ -549,6 +578,12 @@ func (cs *ControllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteS
|
||||
return nil, status.Error(codes.InvalidArgument, "snapshot ID cannot be empty")
|
||||
}
|
||||
|
||||
if acquired := cs.SnapshotLocks.TryAcquire(snapshotID); !acquired {
|
||||
klog.Infof(util.Log(ctx, util.SnapshotOperationAlreadyExistsFmt), snapshotID)
|
||||
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, snapshotID)
|
||||
}
|
||||
defer cs.SnapshotLocks.Release(snapshotID)
|
||||
|
||||
rbdSnap := &rbdSnapshot{}
|
||||
if err = genSnapFromSnapID(ctx, rbdSnap, snapshotID, cr); err != nil {
|
||||
// if error is ErrKeyNotFound, then a previous attempt at deletion was complete
|
||||
@ -564,9 +599,13 @@ func (cs *ControllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteS
|
||||
}
|
||||
|
||||
// Consider missing snap as already deleted, and proceed to remove the omap values,
|
||||
// safeguarding against parallel create or delete requests against the same name.
|
||||
idLk := snapshotNameLocker.Lock(rbdSnap.RequestName)
|
||||
defer snapshotNameLocker.Unlock(idLk, rbdSnap.RequestName)
|
||||
// safeguarding against parallel create or delete requests against the
|
||||
// same name.
|
||||
if acquired := cs.SnapshotLocks.TryAcquire(rbdSnap.RequestName); !acquired {
|
||||
klog.Infof(util.Log(ctx, util.SnapshotOperationAlreadyExistsFmt), rbdSnap.RequestName)
|
||||
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, rbdSnap.RequestName)
|
||||
}
|
||||
defer cs.SnapshotLocks.Release(rbdSnap.RequestName)
|
||||
|
||||
if err = undoSnapReservation(ctx, rbdSnap, cr); err != nil {
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
@ -574,9 +613,13 @@ func (cs *ControllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteS
|
||||
return &csi.DeleteSnapshotResponse{}, nil
|
||||
}
|
||||
|
||||
// safeguard against parallel create or delete requests against the same name
|
||||
idLk := snapshotNameLocker.Lock(rbdSnap.RequestName)
|
||||
defer snapshotNameLocker.Unlock(idLk, rbdSnap.RequestName)
|
||||
// safeguard against parallel create or delete requests against the same
|
||||
// name
|
||||
if acquired := cs.SnapshotLocks.TryAcquire(rbdSnap.RequestName); !acquired {
|
||||
klog.Infof(util.Log(ctx, util.SnapshotOperationAlreadyExistsFmt), rbdSnap.RequestName)
|
||||
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, rbdSnap.RequestName)
|
||||
}
|
||||
defer cs.SnapshotLocks.Release(rbdSnap.RequestName)
|
||||
|
||||
// Unprotect snapshot
|
||||
err = unprotectSnapshot(ctx, rbdSnap, cr)
|
||||
|
@ -74,6 +74,8 @@ func NewControllerServer(d *csicommon.CSIDriver, cachePersister util.CachePersis
|
||||
return &ControllerServer{
|
||||
DefaultControllerServer: csicommon.NewDefaultControllerServer(d),
|
||||
MetadataStore: cachePersister,
|
||||
VolumeLocks: util.NewVolumeLocks(),
|
||||
SnapshotLocks: util.NewVolumeLocks(),
|
||||
}
|
||||
}
|
||||
|
||||
@ -90,6 +92,7 @@ func NewNodeServer(d *csicommon.CSIDriver, containerized bool, t string) (*NodeS
|
||||
return &NodeServer{
|
||||
DefaultNodeServer: csicommon.NewDefaultNodeServer(d, t),
|
||||
mounter: mounter,
|
||||
VolumeLocks: util.NewVolumeLocks(),
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
@ -37,6 +37,9 @@ import (
|
||||
type NodeServer struct {
|
||||
*csicommon.DefaultNodeServer
|
||||
mounter mount.Interface
|
||||
// A map storing all volumes with ongoing operations so that additional operations
|
||||
// for that same volume (as defined by VolumeID) return an Aborted error
|
||||
VolumeLocks *util.VolumeLocks
|
||||
}
|
||||
|
||||
// NodeStageVolume mounts the volume to a staging path on the node.
|
||||
@ -77,12 +80,18 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
|
||||
}
|
||||
defer cr.DeleteCredentials()
|
||||
|
||||
if acquired := ns.VolumeLocks.TryAcquire(volID); !acquired {
|
||||
klog.Infof(util.Log(ctx, util.VolumeOperationAlreadyExistsFmt), volID)
|
||||
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volID)
|
||||
}
|
||||
defer ns.VolumeLocks.Release(volID)
|
||||
|
||||
isLegacyVolume := false
|
||||
volName, err := getVolumeName(req.GetVolumeId())
|
||||
volName, err := getVolumeName(volID)
|
||||
if err != nil {
|
||||
// error ErrInvalidVolID may mean this is an 1.0.0 version volume, check for name
|
||||
// pattern match in addition to error to ensure this is a likely v1.0.0 volume
|
||||
if _, ok := err.(ErrInvalidVolID); !ok || !isLegacyVolumeID(req.GetVolumeId()) {
|
||||
if _, ok := err.(ErrInvalidVolID); !ok || !isLegacyVolumeID(volID) {
|
||||
return nil, status.Error(codes.InvalidArgument, err.Error())
|
||||
}
|
||||
|
||||
@ -94,10 +103,7 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
|
||||
}
|
||||
|
||||
stagingParentPath := req.GetStagingTargetPath()
|
||||
stagingTargetPath := stagingParentPath + "/" + req.GetVolumeId()
|
||||
|
||||
idLk := nodeVolumeIDLocker.Lock(volID)
|
||||
defer nodeVolumeIDLocker.Unlock(idLk, volID)
|
||||
stagingTargetPath := stagingParentPath + "/" + volID
|
||||
|
||||
var isNotMnt bool
|
||||
// check if stagingPath is already mounted
|
||||
@ -238,10 +244,14 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
|
||||
targetPath := req.GetTargetPath()
|
||||
isBlock := req.GetVolumeCapability().GetBlock() != nil
|
||||
stagingPath := req.GetStagingTargetPath()
|
||||
stagingPath += "/" + req.GetVolumeId()
|
||||
volID := req.GetVolumeId()
|
||||
stagingPath += "/" + volID
|
||||
|
||||
idLk := targetPathLocker.Lock(targetPath)
|
||||
defer targetPathLocker.Unlock(idLk, targetPath)
|
||||
if acquired := ns.VolumeLocks.TryAcquire(volID); !acquired {
|
||||
klog.Infof(util.Log(ctx, util.VolumeOperationAlreadyExistsFmt), volID)
|
||||
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volID)
|
||||
}
|
||||
defer ns.VolumeLocks.Release(volID)
|
||||
|
||||
// Check if that target path exists properly
|
||||
notMnt, err := ns.createTargetMountPath(ctx, targetPath, isBlock)
|
||||
@ -378,6 +388,14 @@ func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
|
||||
}
|
||||
|
||||
targetPath := req.GetTargetPath()
|
||||
volID := req.GetVolumeId()
|
||||
|
||||
if acquired := ns.VolumeLocks.TryAcquire(volID); !acquired {
|
||||
klog.Infof(util.Log(ctx, util.VolumeOperationAlreadyExistsFmt), volID)
|
||||
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volID)
|
||||
}
|
||||
defer ns.VolumeLocks.Release(volID)
|
||||
|
||||
notMnt, err := mount.IsNotMountPoint(ns.mounter, targetPath)
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
@ -414,6 +432,14 @@ func (ns *NodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag
|
||||
return nil, err
|
||||
}
|
||||
|
||||
volID := req.GetVolumeId()
|
||||
|
||||
if acquired := ns.VolumeLocks.TryAcquire(volID); !acquired {
|
||||
klog.Infof(util.Log(ctx, util.VolumeOperationAlreadyExistsFmt), volID)
|
||||
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volID)
|
||||
}
|
||||
defer ns.VolumeLocks.Release(volID)
|
||||
|
||||
stagingParentPath := req.GetStagingTargetPath()
|
||||
stagingTargetPath := stagingParentPath + "/" + req.GetVolumeId()
|
||||
|
||||
|
@ -100,17 +100,6 @@ type rbdSnapshot struct {
|
||||
}
|
||||
|
||||
var (
|
||||
// serializes operations based on "volume name" as key
|
||||
volumeNameLocker = util.NewIDLocker()
|
||||
// serializes operations based on "snapshot name" as key
|
||||
snapshotNameLocker = util.NewIDLocker()
|
||||
// serializes delete operations on legacy volumes
|
||||
legacyVolumeIDLocker = util.NewIDLocker()
|
||||
// serializes operations based on "mount staging path" as key
|
||||
nodeVolumeIDLocker = util.NewIDLocker()
|
||||
// serializes operations based on "mount target path" as key
|
||||
targetPathLocker = util.NewIDLocker()
|
||||
|
||||
supportedFeatures = sets.NewString("layering")
|
||||
)
|
||||
|
||||
|
@ -1,12 +1,9 @@
|
||||
/*
|
||||
Copyright 2019 ceph-csi authors.
|
||||
|
||||
Copyright 2019 The Kubernetes Authors.
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
@ -18,60 +15,46 @@ package util
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
)
|
||||
|
||||
/*
|
||||
IDLock is a per identifier lock with a use counter that retains a number of users of the lock.
|
||||
IDLocker is a map of IDLocks holding the IDLocks based on a passed in identifier.
|
||||
Typical usage (post creating an IDLocker) is to Lock/Unlock based on identifiers as per the API.
|
||||
*/
|
||||
type (
|
||||
IDLock struct {
|
||||
mtx sync.Mutex
|
||||
useCount int
|
||||
}
|
||||
const (
|
||||
// VolumeOperationAlreadyExistsFmt string format to return for concerrent operation
|
||||
VolumeOperationAlreadyExistsFmt = "an operation with the given Volume ID %s already exists"
|
||||
|
||||
IDLocker struct {
|
||||
lMutex sync.Mutex
|
||||
lMap map[string]*IDLock
|
||||
}
|
||||
// SnapshotOperationAlreadyExistsFmt string format to return for concerrent operation
|
||||
SnapshotOperationAlreadyExistsFmt = "an operation with the given Snapshot ID %s already exists"
|
||||
)
|
||||
|
||||
func NewIDLocker() *IDLocker {
|
||||
return &IDLocker{
|
||||
lMap: make(map[string]*IDLock),
|
||||
// VolumeLocks implements a map with atomic operations. It stores a set of all volume IDs
|
||||
// with an ongoing operation.
|
||||
type VolumeLocks struct {
|
||||
locks sets.String
|
||||
mux sync.Mutex
|
||||
}
|
||||
|
||||
// NewVolumeLocks returns new VolumeLocks
|
||||
func NewVolumeLocks() *VolumeLocks {
|
||||
return &VolumeLocks{
|
||||
locks: sets.NewString(),
|
||||
}
|
||||
}
|
||||
|
||||
func (lkr *IDLocker) Lock(identifier string) *IDLock {
|
||||
var (
|
||||
lk *IDLock
|
||||
ok bool
|
||||
)
|
||||
|
||||
newlk := new(IDLock)
|
||||
|
||||
lkr.lMutex.Lock()
|
||||
|
||||
if lk, ok = lkr.lMap[identifier]; !ok {
|
||||
lk = newlk
|
||||
lkr.lMap[identifier] = lk
|
||||
// TryAcquire tries to acquire the lock for operating on volumeID and returns true if successful.
|
||||
// If another operation is already using volumeID, returns false.
|
||||
func (vl *VolumeLocks) TryAcquire(volumeID string) bool {
|
||||
vl.mux.Lock()
|
||||
defer vl.mux.Unlock()
|
||||
if vl.locks.Has(volumeID) {
|
||||
return false
|
||||
}
|
||||
lk.useCount++
|
||||
lkr.lMutex.Unlock()
|
||||
|
||||
lk.mtx.Lock()
|
||||
|
||||
return lk
|
||||
vl.locks.Insert(volumeID)
|
||||
return true
|
||||
}
|
||||
|
||||
func (lkr *IDLocker) Unlock(lk *IDLock, identifier string) {
|
||||
lk.mtx.Unlock()
|
||||
|
||||
lkr.lMutex.Lock()
|
||||
lk.useCount--
|
||||
if lk.useCount == 0 {
|
||||
delete(lkr.lMap, identifier)
|
||||
}
|
||||
lkr.lMutex.Unlock()
|
||||
func (vl *VolumeLocks) Release(volumeID string) {
|
||||
vl.mux.Lock()
|
||||
defer vl.mux.Unlock()
|
||||
vl.locks.Delete(volumeID)
|
||||
}
|
||||
|
@ -22,17 +22,31 @@ import (
|
||||
|
||||
// very basic tests for the moment
|
||||
func TestIDLocker(t *testing.T) {
|
||||
myIDLocker := NewIDLocker()
|
||||
fakeID := "fake-id"
|
||||
locks := NewVolumeLocks()
|
||||
// acquire lock for fake-id
|
||||
ok := locks.TryAcquire(fakeID)
|
||||
|
||||
lk1 := myIDLocker.Lock("lk1")
|
||||
lk2 := myIDLocker.Lock("lk2")
|
||||
lk3 := myIDLocker.Lock("lk3")
|
||||
|
||||
if lk1 == lk2 || lk2 == lk3 || lk3 == lk1 {
|
||||
t.Errorf("Failed: lock variables clash when they should not!")
|
||||
if !ok {
|
||||
t.Errorf("TryAcquire failed: want (%v), got (%v)",
|
||||
true, ok)
|
||||
}
|
||||
|
||||
myIDLocker.Unlock(lk1, "lk1")
|
||||
myIDLocker.Unlock(lk2, "lk2")
|
||||
myIDLocker.Unlock(lk3, "lk3")
|
||||
// try to acquire lock again for fake-id, as lock is already present
|
||||
// it should fail
|
||||
ok = locks.TryAcquire(fakeID)
|
||||
|
||||
if ok {
|
||||
t.Errorf("TryAcquire failed: want (%v), got (%v)",
|
||||
false, ok)
|
||||
}
|
||||
|
||||
// release the lock for fake-id and try to get lock again, it should pass
|
||||
locks.Release(fakeID)
|
||||
ok = locks.TryAcquire(fakeID)
|
||||
|
||||
if !ok {
|
||||
t.Errorf("TryAcquire failed: want (%v), got (%v)",
|
||||
true, ok)
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user