Change the logic of locking

if any on going opearation is seen,we
have to return Abort error message

Signed-off-by: Madhu Rajanna <madhupr007@gmail.com>
This commit is contained in:
Madhu Rajanna 2019-09-12 10:23:37 +05:30 committed by mergify[bot]
parent 18151e9ca8
commit 6aac399075
9 changed files with 214 additions and 122 deletions

View File

@ -33,6 +33,9 @@ import (
type ControllerServer struct { type ControllerServer struct {
*csicommon.DefaultControllerServer *csicommon.DefaultControllerServer
MetadataStore util.CachePersister MetadataStore util.CachePersister
// A map storing all volumes with ongoing operations so that additional operations
// for that same volume (as defined by VolumeID/volume name) return an Aborted error
VolumeLocks *util.VolumeLocks
} }
type controllerCacheEntry struct { type controllerCacheEntry struct {
@ -40,11 +43,6 @@ type controllerCacheEntry struct {
VolumeID volumeID VolumeID volumeID
} }
var (
volumeIDLocker = util.NewIDLocker()
volumeNameLocker = util.NewIDLocker()
)
// createBackingVolume creates the backing subvolume and on any error cleans up any created entities // createBackingVolume creates the backing subvolume and on any error cleans up any created entities
func (cs *ControllerServer) createBackingVolume(ctx context.Context, volOptions *volumeOptions, vID *volumeIdentifier, secret map[string]string) error { func (cs *ControllerServer) createBackingVolume(ctx context.Context, volOptions *volumeOptions, vID *volumeIdentifier, secret map[string]string) error {
cr, err := util.NewAdminCredentials(secret) cr, err := util.NewAdminCredentials(secret)
@ -78,6 +76,14 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
// Configuration // Configuration
secret := req.GetSecrets() secret := req.GetSecrets()
requestName := req.GetName() requestName := req.GetName()
// Existence and conflict checks
if acquired := cs.VolumeLocks.TryAcquire(requestName); !acquired {
klog.Infof(util.Log(ctx, util.VolumeOperationAlreadyExistsFmt), requestName)
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, requestName)
}
defer cs.VolumeLocks.Release(requestName)
volOptions, err := newVolumeOptions(ctx, requestName, req.GetCapacityRange().GetRequiredBytes(), volOptions, err := newVolumeOptions(ctx, requestName, req.GetCapacityRange().GetRequiredBytes(),
req.GetParameters(), secret) req.GetParameters(), secret)
if err != nil { if err != nil {
@ -85,10 +91,6 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
return nil, status.Error(codes.InvalidArgument, err.Error()) return nil, status.Error(codes.InvalidArgument, err.Error())
} }
// Existence and conflict checks
idLk := volumeNameLocker.Lock(requestName)
defer volumeNameLocker.Unlock(idLk, requestName)
vID, err := checkVolExists(ctx, volOptions, secret) vID, err := checkVolExists(ctx, volOptions, secret)
if err != nil { if err != nil {
return nil, status.Error(codes.Internal, err.Error()) return nil, status.Error(codes.Internal, err.Error())
@ -177,8 +179,11 @@ func (cs *ControllerServer) deleteVolumeDeprecated(ctx context.Context, req *csi
} }
defer cr.DeleteCredentials() defer cr.DeleteCredentials()
idLk := volumeIDLocker.Lock(string(volID)) if acquired := cs.VolumeLocks.TryAcquire(string(volID)); !acquired {
defer volumeIDLocker.Unlock(idLk, string(volID)) klog.Infof(util.Log(ctx, util.VolumeOperationAlreadyExistsFmt), volID)
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, string(volID))
}
defer cs.VolumeLocks.Release(string(volID))
if err = purgeVolumeDeprecated(ctx, volID, cr, &ce.VolOptions); err != nil { if err = purgeVolumeDeprecated(ctx, volID, cr, &ce.VolOptions); err != nil {
klog.Errorf(util.Log(ctx, "failed to delete volume %s: %v"), volID, err) klog.Errorf(util.Log(ctx, "failed to delete volume %s: %v"), volID, err)
@ -209,6 +214,13 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
volID := volumeID(req.GetVolumeId()) volID := volumeID(req.GetVolumeId())
secrets := req.GetSecrets() secrets := req.GetSecrets()
// lock out parallel delete operations
if acquired := cs.VolumeLocks.TryAcquire(string(volID)); !acquired {
klog.Infof(util.Log(ctx, util.VolumeOperationAlreadyExistsFmt), volID)
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, string(volID))
}
defer cs.VolumeLocks.Release(string(volID))
// Find the volume using the provided VolumeID // Find the volume using the provided VolumeID
volOptions, vID, err := newVolumeOptionsFromVolID(ctx, string(volID), nil, secrets) volOptions, vID, err := newVolumeOptionsFromVolID(ctx, string(volID), nil, secrets)
if err != nil { if err != nil {
@ -227,6 +239,13 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
return nil, status.Error(codes.Internal, err.Error()) return nil, status.Error(codes.Internal, err.Error())
} }
// lock out parallel delete and create requests against the same volume name as we
// cleanup the subvolume and associated omaps for the same
if acquired := cs.VolumeLocks.TryAcquire(volOptions.RequestName); !acquired {
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volOptions.RequestName)
}
defer cs.VolumeLocks.Release(string(volID))
// Deleting a volume requires admin credentials // Deleting a volume requires admin credentials
cr, err := util.NewAdminCredentials(secrets) cr, err := util.NewAdminCredentials(secrets)
if err != nil { if err != nil {
@ -235,11 +254,6 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
} }
defer cr.DeleteCredentials() defer cr.DeleteCredentials()
// lock out parallel delete and create requests against the same volume name as we
// cleanup the subvolume and associated omaps for the same
idLk := volumeNameLocker.Lock(volOptions.RequestName)
defer volumeNameLocker.Unlock(idLk, volOptions.RequestName)
if err = purgeVolume(ctx, volumeID(vID.FsSubvolName), cr, volOptions); err != nil { if err = purgeVolume(ctx, volumeID(vID.FsSubvolName), cr, volOptions); err != nil {
klog.Errorf(util.Log(ctx, "failed to delete volume %s: %v"), volID, err) klog.Errorf(util.Log(ctx, "failed to delete volume %s: %v"), volID, err)
return nil, status.Error(codes.Internal, err.Error()) return nil, status.Error(codes.Internal, err.Error())

View File

@ -76,6 +76,7 @@ func NewControllerServer(d *csicommon.CSIDriver, cachePersister util.CachePersis
return &ControllerServer{ return &ControllerServer{
DefaultControllerServer: csicommon.NewDefaultControllerServer(d), DefaultControllerServer: csicommon.NewDefaultControllerServer(d),
MetadataStore: cachePersister, MetadataStore: cachePersister,
VolumeLocks: util.NewVolumeLocks(),
} }
} }
@ -83,6 +84,7 @@ func NewControllerServer(d *csicommon.CSIDriver, cachePersister util.CachePersis
func NewNodeServer(d *csicommon.CSIDriver, t string) *NodeServer { func NewNodeServer(d *csicommon.CSIDriver, t string) *NodeServer {
return &NodeServer{ return &NodeServer{
DefaultNodeServer: csicommon.NewDefaultNodeServer(d, t), DefaultNodeServer: csicommon.NewDefaultNodeServer(d, t),
VolumeLocks: util.NewVolumeLocks(),
} }
} }

View File

@ -34,12 +34,11 @@ import (
// node server spec. // node server spec.
type NodeServer struct { type NodeServer struct {
*csicommon.DefaultNodeServer *csicommon.DefaultNodeServer
// A map storing all volumes with ongoing operations so that additional operations
// for that same volume (as defined by VolumeID) return an Aborted error
VolumeLocks *util.VolumeLocks
} }
var (
nodeVolumeIDLocker = util.NewIDLocker()
)
func getCredentialsForVolume(volOptions *volumeOptions, req *csi.NodeStageVolumeRequest) (*util.Credentials, error) { func getCredentialsForVolume(volOptions *volumeOptions, req *csi.NodeStageVolumeRequest) (*util.Credentials, error) {
var ( var (
err error err error
@ -80,6 +79,12 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
stagingTargetPath := req.GetStagingTargetPath() stagingTargetPath := req.GetStagingTargetPath()
volID := volumeID(req.GetVolumeId()) volID := volumeID(req.GetVolumeId())
if acquired := ns.VolumeLocks.TryAcquire(req.GetVolumeId()); !acquired {
klog.Infof(util.Log(ctx, util.VolumeOperationAlreadyExistsFmt), volID)
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, req.GetVolumeId())
}
defer ns.VolumeLocks.Release(req.GetVolumeId())
volOptions, _, err := newVolumeOptionsFromVolID(ctx, string(volID), req.GetVolumeContext(), req.GetSecrets()) volOptions, _, err := newVolumeOptionsFromVolID(ctx, string(volID), req.GetVolumeContext(), req.GetSecrets())
if err != nil { if err != nil {
if _, ok := err.(ErrInvalidVolID); !ok { if _, ok := err.(ErrInvalidVolID); !ok {
@ -102,9 +107,6 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
} }
} }
idLk := nodeVolumeIDLocker.Lock(string(volID))
defer nodeVolumeIDLocker.Unlock(idLk, string(volID))
// Check if the volume is already mounted // Check if the volume is already mounted
isMnt, err := util.IsMountPoint(stagingTargetPath) isMnt, err := util.IsMountPoint(stagingTargetPath)
@ -167,11 +169,15 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
return nil, err return nil, err
} }
// Configuration
targetPath := req.GetTargetPath() targetPath := req.GetTargetPath()
volID := req.GetVolumeId() volID := req.GetVolumeId()
if acquired := ns.VolumeLocks.TryAcquire(volID); !acquired {
klog.Infof(util.Log(ctx, util.VolumeOperationAlreadyExistsFmt), volID)
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volID)
}
defer ns.VolumeLocks.Release(volID)
if err := util.CreateMountPoint(targetPath); err != nil { if err := util.CreateMountPoint(targetPath); err != nil {
klog.Errorf(util.Log(ctx, "failed to create mount point at %s: %v"), targetPath, err) klog.Errorf(util.Log(ctx, "failed to create mount point at %s: %v"), targetPath, err)
return nil, status.Error(codes.Internal, err.Error()) return nil, status.Error(codes.Internal, err.Error())
@ -243,9 +249,15 @@ func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
return nil, err return nil, err
} }
volID := req.GetVolumeId()
targetPath := req.GetTargetPath() targetPath := req.GetTargetPath()
volID := req.GetVolumeId() if acquired := ns.VolumeLocks.TryAcquire(volID); !acquired {
klog.Infof(util.Log(ctx, util.VolumeOperationAlreadyExistsFmt), volID)
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volID)
}
defer ns.VolumeLocks.Release(volID)
if err = volumeMountCache.nodeUnPublishVolume(ctx, volID, targetPath); err != nil { if err = volumeMountCache.nodeUnPublishVolume(ctx, volID, targetPath); err != nil {
klog.Warningf(util.Log(ctx, "mount-cache: failed to unpublish volume %s %s: %v"), volID, targetPath, err) klog.Warningf(util.Log(ctx, "mount-cache: failed to unpublish volume %s %s: %v"), volID, targetPath, err)
} }
@ -271,9 +283,15 @@ func (ns *NodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag
return nil, err return nil, err
} }
volID := req.GetVolumeId()
if acquired := ns.VolumeLocks.TryAcquire(volID); !acquired {
klog.Infof(util.Log(ctx, util.VolumeOperationAlreadyExistsFmt), volID)
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volID)
}
defer ns.VolumeLocks.Release(volID)
stagingTargetPath := req.GetStagingTargetPath() stagingTargetPath := req.GetStagingTargetPath()
volID := req.GetVolumeId()
if err = volumeMountCache.nodeUnStageVolume(volID); err != nil { if err = volumeMountCache.nodeUnStageVolume(volID); err != nil {
klog.Warningf(util.Log(ctx, "mount-cache: failed to unstage volume %s %s: %v"), volID, stagingTargetPath, err) klog.Warningf(util.Log(ctx, "mount-cache: failed to unstage volume %s %s: %v"), volID, stagingTargetPath, err)
} }

View File

@ -39,6 +39,13 @@ const (
type ControllerServer struct { type ControllerServer struct {
*csicommon.DefaultControllerServer *csicommon.DefaultControllerServer
MetadataStore util.CachePersister MetadataStore util.CachePersister
// A map storing all volumes with ongoing operations so that additional operations
// for that same volume (as defined by VolumeID/volume name) return an Aborted error
VolumeLocks *util.VolumeLocks
// A map storing all volumes with ongoing operations so that additional operations
// for that same snapshot (as defined by SnapshotID/snapshot name) return an Aborted error
SnapshotLocks *util.VolumeLocks
} }
func (cs *ControllerServer) validateVolumeReq(ctx context.Context, req *csi.CreateVolumeRequest) error { func (cs *ControllerServer) validateVolumeReq(ctx context.Context, req *csi.CreateVolumeRequest) error {
@ -127,8 +134,12 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
return nil, err return nil, err
} }
idLk := volumeNameLocker.Lock(req.GetName()) // Existence and conflict checks
defer volumeNameLocker.Unlock(idLk, req.GetName()) if acquired := cs.VolumeLocks.TryAcquire(req.GetName()); !acquired {
klog.Infof(util.Log(ctx, util.VolumeOperationAlreadyExistsFmt), req.GetName())
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, req.GetName())
}
defer cs.VolumeLocks.Release(req.GetName())
found, err := checkVolExists(ctx, rbdVol, cr) found, err := checkVolExists(ctx, rbdVol, cr)
if err != nil { if err != nil {
@ -243,8 +254,11 @@ func (cs *ControllerServer) DeleteLegacyVolume(ctx context.Context, req *csi.Del
" proceed with deleting legacy volume ID (%s)", volumeID) " proceed with deleting legacy volume ID (%s)", volumeID)
} }
idLk := legacyVolumeIDLocker.Lock(volumeID) if acquired := cs.VolumeLocks.TryAcquire(volumeID); !acquired {
defer legacyVolumeIDLocker.Unlock(idLk, volumeID) klog.Infof(util.Log(ctx, util.VolumeOperationAlreadyExistsFmt), volumeID)
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID)
}
defer cs.VolumeLocks.Release(volumeID)
rbdVol := &rbdVolume{} rbdVol := &rbdVolume{}
if err := cs.MetadataStore.Get(volumeID, rbdVol); err != nil { if err := cs.MetadataStore.Get(volumeID, rbdVol); err != nil {
@ -298,6 +312,12 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
return nil, status.Error(codes.InvalidArgument, "empty volume ID in request") return nil, status.Error(codes.InvalidArgument, "empty volume ID in request")
} }
if acquired := cs.VolumeLocks.TryAcquire(volumeID); !acquired {
klog.Infof(util.Log(ctx, util.VolumeOperationAlreadyExistsFmt), volumeID)
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID)
}
defer cs.VolumeLocks.Release(volumeID)
rbdVol := &rbdVolume{} rbdVol := &rbdVolume{}
if err := genVolFromVolID(ctx, rbdVol, volumeID, cr); err != nil { if err := genVolFromVolID(ctx, rbdVol, volumeID, cr); err != nil {
// If error is ErrInvalidVolID it could be a version 1.0.0 or lower volume, attempt // If error is ErrInvalidVolID it could be a version 1.0.0 or lower volume, attempt
@ -327,8 +347,11 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
// If error is ErrImageNotFound then we failed to find the image, but found the imageOMap // If error is ErrImageNotFound then we failed to find the image, but found the imageOMap
// to lead us to the image, hence the imageOMap needs to be garbage collected, by calling // to lead us to the image, hence the imageOMap needs to be garbage collected, by calling
// unreserve for the same // unreserve for the same
idLk := volumeNameLocker.Lock(rbdVol.RequestName) if acquired := cs.VolumeLocks.TryAcquire(rbdVol.RequestName); !acquired {
defer volumeNameLocker.Unlock(idLk, rbdVol.RequestName) klog.Infof(util.Log(ctx, util.VolumeOperationAlreadyExistsFmt), rbdVol.RequestName)
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, rbdVol.RequestName)
}
defer cs.VolumeLocks.Release(rbdVol.RequestName)
if err := undoVolReservation(ctx, rbdVol, cr); err != nil { if err := undoVolReservation(ctx, rbdVol, cr); err != nil {
return nil, status.Error(codes.Internal, err.Error()) return nil, status.Error(codes.Internal, err.Error())
@ -338,8 +361,11 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
// lock out parallel create requests against the same volume name as we // lock out parallel create requests against the same volume name as we
// cleanup the image and associated omaps for the same // cleanup the image and associated omaps for the same
idLk := volumeNameLocker.Lock(rbdVol.RequestName) if acquired := cs.VolumeLocks.TryAcquire(rbdVol.RequestName); !acquired {
defer volumeNameLocker.Unlock(idLk, rbdVol.RequestName) klog.Infof(util.Log(ctx, util.VolumeOperationAlreadyExistsFmt), rbdVol.RequestName)
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, rbdVol.RequestName)
}
defer cs.VolumeLocks.Release(rbdVol.RequestName)
// Deleting rbd image // Deleting rbd image
klog.V(4).Infof(util.Log(ctx, "deleting image %s"), rbdVol.RbdImageName) klog.V(4).Infof(util.Log(ctx, "deleting image %s"), rbdVol.RbdImageName)
@ -417,8 +443,11 @@ func (cs *ControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateS
rbdSnap.SourceVolumeID = req.GetSourceVolumeId() rbdSnap.SourceVolumeID = req.GetSourceVolumeId()
rbdSnap.RequestName = req.GetName() rbdSnap.RequestName = req.GetName()
idLk := snapshotNameLocker.Lock(req.GetName()) if acquired := cs.SnapshotLocks.TryAcquire(req.GetName()); !acquired {
defer snapshotNameLocker.Unlock(idLk, req.GetName()) klog.Infof(util.Log(ctx, util.SnapshotOperationAlreadyExistsFmt), req.GetName())
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, req.GetName())
}
defer cs.SnapshotLocks.Release(req.GetName())
// Need to check for already existing snapshot name, and if found // Need to check for already existing snapshot name, and if found
// check for the requested source volume id and already allocated source volume id // check for the requested source volume id and already allocated source volume id
@ -553,6 +582,12 @@ func (cs *ControllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteS
return nil, status.Error(codes.InvalidArgument, "snapshot ID cannot be empty") return nil, status.Error(codes.InvalidArgument, "snapshot ID cannot be empty")
} }
if acquired := cs.SnapshotLocks.TryAcquire(snapshotID); !acquired {
klog.Infof(util.Log(ctx, util.SnapshotOperationAlreadyExistsFmt), snapshotID)
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, snapshotID)
}
defer cs.SnapshotLocks.Release(snapshotID)
rbdSnap := &rbdSnapshot{} rbdSnap := &rbdSnapshot{}
if err = genSnapFromSnapID(ctx, rbdSnap, snapshotID, cr); err != nil { if err = genSnapFromSnapID(ctx, rbdSnap, snapshotID, cr); err != nil {
// if error is ErrKeyNotFound, then a previous attempt at deletion was complete // if error is ErrKeyNotFound, then a previous attempt at deletion was complete
@ -568,9 +603,13 @@ func (cs *ControllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteS
} }
// Consider missing snap as already deleted, and proceed to remove the omap values, // Consider missing snap as already deleted, and proceed to remove the omap values,
// safeguarding against parallel create or delete requests against the same name. // safeguarding against parallel create or delete requests against the
idLk := snapshotNameLocker.Lock(rbdSnap.RequestName) // same name.
defer snapshotNameLocker.Unlock(idLk, rbdSnap.RequestName) if acquired := cs.SnapshotLocks.TryAcquire(rbdSnap.RequestName); !acquired {
klog.Infof(util.Log(ctx, util.SnapshotOperationAlreadyExistsFmt), rbdSnap.RequestName)
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, rbdSnap.RequestName)
}
defer cs.SnapshotLocks.Release(rbdSnap.RequestName)
if err = undoSnapReservation(ctx, rbdSnap, cr); err != nil { if err = undoSnapReservation(ctx, rbdSnap, cr); err != nil {
return nil, status.Error(codes.Internal, err.Error()) return nil, status.Error(codes.Internal, err.Error())
@ -578,9 +617,13 @@ func (cs *ControllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteS
return &csi.DeleteSnapshotResponse{}, nil return &csi.DeleteSnapshotResponse{}, nil
} }
// safeguard against parallel create or delete requests against the same name // safeguard against parallel create or delete requests against the same
idLk := snapshotNameLocker.Lock(rbdSnap.RequestName) // name
defer snapshotNameLocker.Unlock(idLk, rbdSnap.RequestName) if acquired := cs.SnapshotLocks.TryAcquire(rbdSnap.RequestName); !acquired {
klog.Infof(util.Log(ctx, util.SnapshotOperationAlreadyExistsFmt), rbdSnap.RequestName)
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, rbdSnap.RequestName)
}
defer cs.SnapshotLocks.Release(rbdSnap.RequestName)
// Unprotect snapshot // Unprotect snapshot
err = unprotectSnapshot(ctx, rbdSnap, cr) err = unprotectSnapshot(ctx, rbdSnap, cr)

View File

@ -74,6 +74,8 @@ func NewControllerServer(d *csicommon.CSIDriver, cachePersister util.CachePersis
return &ControllerServer{ return &ControllerServer{
DefaultControllerServer: csicommon.NewDefaultControllerServer(d), DefaultControllerServer: csicommon.NewDefaultControllerServer(d),
MetadataStore: cachePersister, MetadataStore: cachePersister,
VolumeLocks: util.NewVolumeLocks(),
SnapshotLocks: util.NewVolumeLocks(),
} }
} }
@ -90,6 +92,7 @@ func NewNodeServer(d *csicommon.CSIDriver, containerized bool, t string) (*NodeS
return &NodeServer{ return &NodeServer{
DefaultNodeServer: csicommon.NewDefaultNodeServer(d, t), DefaultNodeServer: csicommon.NewDefaultNodeServer(d, t),
mounter: mounter, mounter: mounter,
VolumeLocks: util.NewVolumeLocks(),
}, nil }, nil
} }

View File

@ -37,6 +37,9 @@ import (
type NodeServer struct { type NodeServer struct {
*csicommon.DefaultNodeServer *csicommon.DefaultNodeServer
mounter mount.Interface mounter mount.Interface
// A map storing all volumes with ongoing operations so that additional operations
// for that same volume (as defined by VolumeID) return an Aborted error
VolumeLocks *util.VolumeLocks
} }
// NodeStageVolume mounts the volume to a staging path on the node. // NodeStageVolume mounts the volume to a staging path on the node.
@ -77,12 +80,18 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
} }
defer cr.DeleteCredentials() defer cr.DeleteCredentials()
if acquired := ns.VolumeLocks.TryAcquire(volID); !acquired {
klog.Infof(util.Log(ctx, util.VolumeOperationAlreadyExistsFmt), volID)
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volID)
}
defer ns.VolumeLocks.Release(volID)
isLegacyVolume := false isLegacyVolume := false
volName, err := getVolumeName(req.GetVolumeId()) volName, err := getVolumeName(volID)
if err != nil { if err != nil {
// error ErrInvalidVolID may mean this is an 1.0.0 version volume, check for name // error ErrInvalidVolID may mean this is an 1.0.0 version volume, check for name
// pattern match in addition to error to ensure this is a likely v1.0.0 volume // pattern match in addition to error to ensure this is a likely v1.0.0 volume
if _, ok := err.(ErrInvalidVolID); !ok || !isLegacyVolumeID(req.GetVolumeId()) { if _, ok := err.(ErrInvalidVolID); !ok || !isLegacyVolumeID(volID) {
return nil, status.Error(codes.InvalidArgument, err.Error()) return nil, status.Error(codes.InvalidArgument, err.Error())
} }
@ -94,10 +103,7 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
} }
stagingParentPath := req.GetStagingTargetPath() stagingParentPath := req.GetStagingTargetPath()
stagingTargetPath := stagingParentPath + "/" + req.GetVolumeId() stagingTargetPath := stagingParentPath + "/" + volID
idLk := nodeVolumeIDLocker.Lock(volID)
defer nodeVolumeIDLocker.Unlock(idLk, volID)
var isNotMnt bool var isNotMnt bool
// check if stagingPath is already mounted // check if stagingPath is already mounted
@ -238,10 +244,14 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
targetPath := req.GetTargetPath() targetPath := req.GetTargetPath()
isBlock := req.GetVolumeCapability().GetBlock() != nil isBlock := req.GetVolumeCapability().GetBlock() != nil
stagingPath := req.GetStagingTargetPath() stagingPath := req.GetStagingTargetPath()
stagingPath += "/" + req.GetVolumeId() volID := req.GetVolumeId()
stagingPath += "/" + volID
idLk := targetPathLocker.Lock(targetPath) if acquired := ns.VolumeLocks.TryAcquire(volID); !acquired {
defer targetPathLocker.Unlock(idLk, targetPath) klog.Infof(util.Log(ctx, util.VolumeOperationAlreadyExistsFmt), volID)
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volID)
}
defer ns.VolumeLocks.Release(volID)
// Check if that target path exists properly // Check if that target path exists properly
notMnt, err := ns.createTargetMountPath(ctx, targetPath, isBlock) notMnt, err := ns.createTargetMountPath(ctx, targetPath, isBlock)
@ -378,6 +388,14 @@ func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
} }
targetPath := req.GetTargetPath() targetPath := req.GetTargetPath()
volID := req.GetVolumeId()
if acquired := ns.VolumeLocks.TryAcquire(volID); !acquired {
klog.Infof(util.Log(ctx, util.VolumeOperationAlreadyExistsFmt), volID)
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volID)
}
defer ns.VolumeLocks.Release(volID)
notMnt, err := mount.IsNotMountPoint(ns.mounter, targetPath) notMnt, err := mount.IsNotMountPoint(ns.mounter, targetPath)
if err != nil { if err != nil {
if os.IsNotExist(err) { if os.IsNotExist(err) {
@ -414,6 +432,14 @@ func (ns *NodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag
return nil, err return nil, err
} }
volID := req.GetVolumeId()
if acquired := ns.VolumeLocks.TryAcquire(volID); !acquired {
klog.Infof(util.Log(ctx, util.VolumeOperationAlreadyExistsFmt), volID)
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volID)
}
defer ns.VolumeLocks.Release(volID)
stagingParentPath := req.GetStagingTargetPath() stagingParentPath := req.GetStagingTargetPath()
stagingTargetPath := stagingParentPath + "/" + req.GetVolumeId() stagingTargetPath := stagingParentPath + "/" + req.GetVolumeId()

View File

@ -101,17 +101,6 @@ type rbdSnapshot struct {
} }
var ( var (
// serializes operations based on "volume name" as key
volumeNameLocker = util.NewIDLocker()
// serializes operations based on "snapshot name" as key
snapshotNameLocker = util.NewIDLocker()
// serializes delete operations on legacy volumes
legacyVolumeIDLocker = util.NewIDLocker()
// serializes operations based on "mount staging path" as key
nodeVolumeIDLocker = util.NewIDLocker()
// serializes operations based on "mount target path" as key
targetPathLocker = util.NewIDLocker()
supportedFeatures = sets.NewString("layering") supportedFeatures = sets.NewString("layering")
) )

View File

@ -1,12 +1,9 @@
/* /*
Copyright 2019 ceph-csi authors. Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License"); Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License. you may not use this file except in compliance with the License.
You may obtain a copy of the License at You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0 http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS, distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@ -18,60 +15,46 @@ package util
import ( import (
"sync" "sync"
"k8s.io/apimachinery/pkg/util/sets"
) )
/* const (
IDLock is a per identifier lock with a use counter that retains a number of users of the lock. // VolumeOperationAlreadyExistsFmt string format to return for concerrent operation
IDLocker is a map of IDLocks holding the IDLocks based on a passed in identifier. VolumeOperationAlreadyExistsFmt = "an operation with the given Volume ID %s already exists"
Typical usage (post creating an IDLocker) is to Lock/Unlock based on identifiers as per the API.
*/
type (
IDLock struct {
mtx sync.Mutex
useCount int
}
IDLocker struct { // SnapshotOperationAlreadyExistsFmt string format to return for concerrent operation
lMutex sync.Mutex SnapshotOperationAlreadyExistsFmt = "an operation with the given Snapshot ID %s already exists"
lMap map[string]*IDLock
}
) )
func NewIDLocker() *IDLocker { // VolumeLocks implements a map with atomic operations. It stores a set of all volume IDs
return &IDLocker{ // with an ongoing operation.
lMap: make(map[string]*IDLock), type VolumeLocks struct {
locks sets.String
mux sync.Mutex
}
// NewVolumeLocks returns new VolumeLocks
func NewVolumeLocks() *VolumeLocks {
return &VolumeLocks{
locks: sets.NewString(),
} }
} }
func (lkr *IDLocker) Lock(identifier string) *IDLock { // TryAcquire tries to acquire the lock for operating on volumeID and returns true if successful.
var ( // If another operation is already using volumeID, returns false.
lk *IDLock func (vl *VolumeLocks) TryAcquire(volumeID string) bool {
ok bool vl.mux.Lock()
) defer vl.mux.Unlock()
if vl.locks.Has(volumeID) {
newlk := new(IDLock) return false
lkr.lMutex.Lock()
if lk, ok = lkr.lMap[identifier]; !ok {
lk = newlk
lkr.lMap[identifier] = lk
} }
lk.useCount++ vl.locks.Insert(volumeID)
lkr.lMutex.Unlock() return true
lk.mtx.Lock()
return lk
} }
func (lkr *IDLocker) Unlock(lk *IDLock, identifier string) { func (vl *VolumeLocks) Release(volumeID string) {
lk.mtx.Unlock() vl.mux.Lock()
defer vl.mux.Unlock()
lkr.lMutex.Lock() vl.locks.Delete(volumeID)
lk.useCount--
if lk.useCount == 0 {
delete(lkr.lMap, identifier)
}
lkr.lMutex.Unlock()
} }

View File

@ -22,17 +22,31 @@ import (
// very basic tests for the moment // very basic tests for the moment
func TestIDLocker(t *testing.T) { func TestIDLocker(t *testing.T) {
myIDLocker := NewIDLocker() fakeID := "fake-id"
locks := NewVolumeLocks()
// acquire lock for fake-id
ok := locks.TryAcquire(fakeID)
lk1 := myIDLocker.Lock("lk1") if !ok {
lk2 := myIDLocker.Lock("lk2") t.Errorf("TryAcquire failed: want (%v), got (%v)",
lk3 := myIDLocker.Lock("lk3") true, ok)
if lk1 == lk2 || lk2 == lk3 || lk3 == lk1 {
t.Errorf("Failed: lock variables clash when they should not!")
} }
myIDLocker.Unlock(lk1, "lk1") // try to acquire lock again for fake-id, as lock is already present
myIDLocker.Unlock(lk2, "lk2") // it should fail
myIDLocker.Unlock(lk3, "lk3") ok = locks.TryAcquire(fakeID)
if ok {
t.Errorf("TryAcquire failed: want (%v), got (%v)",
false, ok)
}
// release the lock for fake-id and try to get lock again, it should pass
locks.Release(fakeID)
ok = locks.TryAcquire(fakeID)
if !ok {
t.Errorf("TryAcquire failed: want (%v), got (%v)",
true, ok)
}
} }