mirror of
https://github.com/ceph/ceph-csi.git
synced 2024-11-10 00:10:20 +00:00
Move locks to more granular locking than CPU count based
As detailed in issue #279, current lock scheme has hash buckets that are count of CPUs. This causes a lot of contention when parallel requests are made to the CSI plugin. To reduce lock contention, this commit introduces granular locks per identifier. The commit also changes the timeout for gRPC requests to Create and Delete volumes, as the current timeout is 10s (kubernetes documentation says 15s but code defaults are 10s). A virtual setup takes about 12-15s to complete a request at times, that leads to unwanted retries of the same request, hence the increased timeout to enable operation completion with minimal retries. Tests to create PVCs before and after these changes look like so, Before: Default master code + sidecar provisioner --timeout option set to 30 seconds 20 PVCs Creation: 3 runs, 396/391/400 seconds Deletion: 3 runs, 218/271/118 seconds - Once was stalled for more than 8 minutes and cancelled the run After: Current commit + sidecar provisioner --timeout option set to 30 sec 20 PVCs Creation: 3 runs, 42/59/65 seconds Deletion: 3 runs, 32/32/31 seconds Fixes: #279 Signed-off-by: ShyamsundarR <srangana@redhat.com>
This commit is contained in:
parent
bc39c523b7
commit
c4a3675cec
@ -32,6 +32,8 @@ spec:
|
|||||||
args:
|
args:
|
||||||
- "--csi-address=$(ADDRESS)"
|
- "--csi-address=$(ADDRESS)"
|
||||||
- "--v=5"
|
- "--v=5"
|
||||||
|
- "--timeout=60s"
|
||||||
|
- "--retry-interval-start=500ms"
|
||||||
env:
|
env:
|
||||||
- name: ADDRESS
|
- name: ADDRESS
|
||||||
value: "{{ .Values.socketDir }}/{{ .Values.socketFile }}"
|
value: "{{ .Values.socketDir }}/{{ .Values.socketFile }}"
|
||||||
|
@ -35,6 +35,8 @@ spec:
|
|||||||
args:
|
args:
|
||||||
- "--csi-address=$(ADDRESS)"
|
- "--csi-address=$(ADDRESS)"
|
||||||
- "--v=5"
|
- "--v=5"
|
||||||
|
- "--timeout=60s"
|
||||||
|
- "--retry-interval-start=500ms"
|
||||||
env:
|
env:
|
||||||
- name: ADDRESS
|
- name: ADDRESS
|
||||||
value: unix:///csi/csi-provisioner.sock
|
value: unix:///csi/csi-provisioner.sock
|
||||||
|
@ -32,6 +32,8 @@ spec:
|
|||||||
args:
|
args:
|
||||||
- "--csi-address=$(ADDRESS)"
|
- "--csi-address=$(ADDRESS)"
|
||||||
- "--v=5"
|
- "--v=5"
|
||||||
|
- "--timeout=60s"
|
||||||
|
- "--retry-interval-start=500ms"
|
||||||
env:
|
env:
|
||||||
- name: ADDRESS
|
- name: ADDRESS
|
||||||
value: "{{ .Values.socketDir }}/{{ .Values.socketFile }}"
|
value: "{{ .Values.socketDir }}/{{ .Values.socketFile }}"
|
||||||
@ -46,8 +48,8 @@ spec:
|
|||||||
imagePullPolicy: {{ .Values.nodeplugin.plugin.image.pullPolicy }}
|
imagePullPolicy: {{ .Values.nodeplugin.plugin.image.pullPolicy }}
|
||||||
args:
|
args:
|
||||||
- "--csi-address=$(ADDRESS)"
|
- "--csi-address=$(ADDRESS)"
|
||||||
- "--connection-timeout=15s"
|
|
||||||
- "--v=5"
|
- "--v=5"
|
||||||
|
- "--timeout=60s"
|
||||||
env:
|
env:
|
||||||
- name: ADDRESS
|
- name: ADDRESS
|
||||||
value: "{{ .Values.socketDir }}/{{ .Values.socketFile }}"
|
value: "{{ .Values.socketDir }}/{{ .Values.socketFile }}"
|
||||||
|
@ -35,6 +35,8 @@ spec:
|
|||||||
args:
|
args:
|
||||||
- "--csi-address=$(ADDRESS)"
|
- "--csi-address=$(ADDRESS)"
|
||||||
- "--v=5"
|
- "--v=5"
|
||||||
|
- "--timeout=60s"
|
||||||
|
- "--retry-interval-start=500ms"
|
||||||
env:
|
env:
|
||||||
- name: ADDRESS
|
- name: ADDRESS
|
||||||
value: unix:///csi/csi-provisioner.sock
|
value: unix:///csi/csi-provisioner.sock
|
||||||
@ -46,8 +48,8 @@ spec:
|
|||||||
image: quay.io/k8scsi/csi-snapshotter:v1.1.0
|
image: quay.io/k8scsi/csi-snapshotter:v1.1.0
|
||||||
args:
|
args:
|
||||||
- "--csi-address=$(ADDRESS)"
|
- "--csi-address=$(ADDRESS)"
|
||||||
- "--connection-timeout=15s"
|
|
||||||
- "--v=5"
|
- "--v=5"
|
||||||
|
- "--timeout=60s"
|
||||||
env:
|
env:
|
||||||
- name: ADDRESS
|
- name: ADDRESS
|
||||||
value: unix:///csi/csi-provisioner.sock
|
value: unix:///csi/csi-provisioner.sock
|
||||||
|
@ -25,7 +25,6 @@ import (
|
|||||||
"google.golang.org/grpc/codes"
|
"google.golang.org/grpc/codes"
|
||||||
"google.golang.org/grpc/status"
|
"google.golang.org/grpc/status"
|
||||||
"k8s.io/klog"
|
"k8s.io/klog"
|
||||||
"k8s.io/utils/keymutex"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// ControllerServer struct of CEPH CSI driver with supported methods of CSI
|
// ControllerServer struct of CEPH CSI driver with supported methods of CSI
|
||||||
@ -41,8 +40,8 @@ type controllerCacheEntry struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
mtxControllerVolumeID = keymutex.NewHashed(0)
|
volumeIDLocker = util.NewIDLocker()
|
||||||
mtxControllerVolumeName = keymutex.NewHashed(0)
|
volumeNameLocker = util.NewIDLocker()
|
||||||
)
|
)
|
||||||
|
|
||||||
// createBackingVolume creates the backing subvolume and user/key for the given volOptions and vID,
|
// createBackingVolume creates the backing subvolume and user/key for the given volOptions and vID,
|
||||||
@ -91,8 +90,8 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Existence and conflict checks
|
// Existence and conflict checks
|
||||||
mtxControllerVolumeName.LockKey(requestName)
|
idLk := volumeNameLocker.Lock(requestName)
|
||||||
defer mustUnlock(mtxControllerVolumeName, requestName)
|
defer volumeNameLocker.Unlock(idLk, requestName)
|
||||||
|
|
||||||
vID, err := checkVolExists(volOptions, secret)
|
vID, err := checkVolExists(volOptions, secret)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -181,8 +180,8 @@ func (cs *ControllerServer) deleteVolumeDeprecated(req *csi.DeleteVolumeRequest)
|
|||||||
return nil, status.Error(codes.InvalidArgument, err.Error())
|
return nil, status.Error(codes.InvalidArgument, err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
mtxControllerVolumeID.LockKey(string(volID))
|
idLk := volumeIDLocker.Lock(string(volID))
|
||||||
defer mustUnlock(mtxControllerVolumeID, string(volID))
|
defer volumeIDLocker.Unlock(idLk, string(volID))
|
||||||
|
|
||||||
if err = purgeVolume(volID, cr, &ce.VolOptions); err != nil {
|
if err = purgeVolume(volID, cr, &ce.VolOptions); err != nil {
|
||||||
klog.Errorf("failed to delete volume %s: %v", volID, err)
|
klog.Errorf("failed to delete volume %s: %v", volID, err)
|
||||||
@ -240,8 +239,8 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
|
|||||||
|
|
||||||
// lock out parallel delete and create requests against the same volume name as we
|
// lock out parallel delete and create requests against the same volume name as we
|
||||||
// cleanup the subvolume and associated omaps for the same
|
// cleanup the subvolume and associated omaps for the same
|
||||||
mtxControllerVolumeName.LockKey(volOptions.RequestName)
|
idLk := volumeNameLocker.Lock(volOptions.RequestName)
|
||||||
defer mustUnlock(mtxControllerVolumeName, volOptions.RequestName)
|
defer volumeNameLocker.Unlock(idLk, volOptions.RequestName)
|
||||||
|
|
||||||
if err = purgeVolume(volumeID(vID.FsSubvolName), cr, volOptions); err != nil {
|
if err = purgeVolume(volumeID(vID.FsSubvolName), cr, volOptions); err != nil {
|
||||||
klog.Errorf("failed to delete volume %s: %v", volID, err)
|
klog.Errorf("failed to delete volume %s: %v", volID, err)
|
||||||
|
@ -28,7 +28,6 @@ import (
|
|||||||
"google.golang.org/grpc/codes"
|
"google.golang.org/grpc/codes"
|
||||||
"google.golang.org/grpc/status"
|
"google.golang.org/grpc/status"
|
||||||
"k8s.io/klog"
|
"k8s.io/klog"
|
||||||
"k8s.io/utils/keymutex"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// NodeServer struct of ceph CSI driver with supported methods of CSI
|
// NodeServer struct of ceph CSI driver with supported methods of CSI
|
||||||
@ -38,7 +37,7 @@ type NodeServer struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
mtxNodeVolumeID = keymutex.NewHashed(0)
|
nodeVolumeIDLocker = util.NewIDLocker()
|
||||||
)
|
)
|
||||||
|
|
||||||
func getCredentialsForVolume(volOptions *volumeOptions, volID volumeID, req *csi.NodeStageVolumeRequest) (*util.Credentials, error) {
|
func getCredentialsForVolume(volOptions *volumeOptions, volID volumeID, req *csi.NodeStageVolumeRequest) (*util.Credentials, error) {
|
||||||
@ -121,8 +120,8 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
|
|||||||
return nil, status.Error(codes.Internal, err.Error())
|
return nil, status.Error(codes.Internal, err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
mtxNodeVolumeID.LockKey(string(volID))
|
idLk := nodeVolumeIDLocker.Lock(string(volID))
|
||||||
defer mustUnlock(mtxNodeVolumeID, string(volID))
|
defer nodeVolumeIDLocker.Unlock(idLk, string(volID))
|
||||||
|
|
||||||
// Check if the volume is already mounted
|
// Check if the volume is already mounted
|
||||||
|
|
||||||
|
@ -31,17 +31,10 @@ import (
|
|||||||
"github.com/ceph/ceph-csi/pkg/util"
|
"github.com/ceph/ceph-csi/pkg/util"
|
||||||
"github.com/container-storage-interface/spec/lib/go/csi"
|
"github.com/container-storage-interface/spec/lib/go/csi"
|
||||||
"k8s.io/kubernetes/pkg/util/mount"
|
"k8s.io/kubernetes/pkg/util/mount"
|
||||||
"k8s.io/utils/keymutex"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type volumeID string
|
type volumeID string
|
||||||
|
|
||||||
func mustUnlock(m keymutex.KeyMutex, key string) {
|
|
||||||
if err := m.UnlockKey(key); err != nil {
|
|
||||||
klog.Fatalf("failed to unlock mutex for %s: %v", key, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func execCommand(program string, args ...string) (stdout, stderr []byte, err error) {
|
func execCommand(program string, args ...string) (stdout, stderr []byte, err error) {
|
||||||
var (
|
var (
|
||||||
cmd = exec.Command(program, args...) // nolint: gosec
|
cmd = exec.Command(program, args...) // nolint: gosec
|
||||||
|
@ -116,18 +116,14 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
|
|||||||
return nil, status.Error(codes.Internal, err.Error())
|
return nil, status.Error(codes.Internal, err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
volumeNameMutex.LockKey(req.GetName())
|
|
||||||
defer func() {
|
|
||||||
if err = volumeNameMutex.UnlockKey(req.GetName()); err != nil {
|
|
||||||
klog.Warningf("failed to unlock mutex volume:%s %v", req.GetName(), err)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
rbdVol, err := cs.parseVolCreateRequest(req)
|
rbdVol, err := cs.parseVolCreateRequest(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
idLk := volumeNameLocker.Lock(req.GetName())
|
||||||
|
defer volumeNameLocker.Unlock(idLk, req.GetName())
|
||||||
|
|
||||||
found, err := checkVolExists(rbdVol, cr)
|
found, err := checkVolExists(rbdVol, cr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if _, ok := err.(ErrVolNameConflict); ok {
|
if _, ok := err.(ErrVolNameConflict); ok {
|
||||||
@ -248,12 +244,6 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
|
|||||||
if volumeID == "" {
|
if volumeID == "" {
|
||||||
return nil, status.Error(codes.InvalidArgument, "empty volume ID in request")
|
return nil, status.Error(codes.InvalidArgument, "empty volume ID in request")
|
||||||
}
|
}
|
||||||
volumeIDMutex.LockKey(volumeID)
|
|
||||||
defer func() {
|
|
||||||
if err := volumeIDMutex.UnlockKey(volumeID); err != nil {
|
|
||||||
klog.Warningf("failed to unlock mutex volume:%s %v", volumeID, err)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
rbdVol := &rbdVolume{}
|
rbdVol := &rbdVolume{}
|
||||||
if err := genVolFromVolID(rbdVol, volumeID, cr); err != nil {
|
if err := genVolFromVolID(rbdVol, volumeID, cr); err != nil {
|
||||||
@ -272,12 +262,8 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
|
|||||||
// If error is ErrImageNotFound then we failed to find the image, but found the imageOMap
|
// If error is ErrImageNotFound then we failed to find the image, but found the imageOMap
|
||||||
// to lead us to the image, hence the imageOMap needs to be garbage collected, by calling
|
// to lead us to the image, hence the imageOMap needs to be garbage collected, by calling
|
||||||
// unreserve for the same
|
// unreserve for the same
|
||||||
volumeNameMutex.LockKey(rbdVol.RequestName)
|
idLk := volumeNameLocker.Lock(rbdVol.RequestName)
|
||||||
defer func() {
|
defer volumeNameLocker.Unlock(idLk, rbdVol.RequestName)
|
||||||
if err := volumeNameMutex.UnlockKey(rbdVol.RequestName); err != nil {
|
|
||||||
klog.Warningf("failed to unlock mutex volume:%s %v", rbdVol.RequestName, err)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
if err := undoVolReservation(rbdVol, cr); err != nil {
|
if err := undoVolReservation(rbdVol, cr); err != nil {
|
||||||
return nil, status.Error(codes.Internal, err.Error())
|
return nil, status.Error(codes.Internal, err.Error())
|
||||||
@ -287,12 +273,8 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
|
|||||||
|
|
||||||
// lock out parallel create requests against the same volume name as we
|
// lock out parallel create requests against the same volume name as we
|
||||||
// cleanup the image and associated omaps for the same
|
// cleanup the image and associated omaps for the same
|
||||||
volumeNameMutex.LockKey(rbdVol.RequestName)
|
idLk := volumeNameLocker.Lock(rbdVol.RequestName)
|
||||||
defer func() {
|
defer volumeNameLocker.Unlock(idLk, rbdVol.RequestName)
|
||||||
if err := volumeNameMutex.UnlockKey(rbdVol.RequestName); err != nil {
|
|
||||||
klog.Warningf("failed to unlock mutex volume:%s %v", rbdVol.RequestName, err)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
// Deleting rbd image
|
// Deleting rbd image
|
||||||
klog.V(4).Infof("deleting image %s", rbdVol.RbdImageName)
|
klog.V(4).Infof("deleting image %s", rbdVol.RbdImageName)
|
||||||
@ -341,13 +323,6 @@ func (cs *ControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateS
|
|||||||
return nil, status.Error(codes.Internal, err.Error())
|
return nil, status.Error(codes.Internal, err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
snapshotNameMutex.LockKey(req.GetName())
|
|
||||||
defer func() {
|
|
||||||
if err = snapshotNameMutex.UnlockKey(req.GetName()); err != nil {
|
|
||||||
klog.Warningf("failed to unlock mutex snapshot:%s %v", req.GetName(), err)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
// Fetch source volume information
|
// Fetch source volume information
|
||||||
rbdVol := new(rbdVolume)
|
rbdVol := new(rbdVolume)
|
||||||
err = genVolFromVolID(rbdVol, req.GetSourceVolumeId(), cr)
|
err = genVolFromVolID(rbdVol, req.GetSourceVolumeId(), cr)
|
||||||
@ -370,6 +345,9 @@ func (cs *ControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateS
|
|||||||
rbdSnap.SourceVolumeID = req.GetSourceVolumeId()
|
rbdSnap.SourceVolumeID = req.GetSourceVolumeId()
|
||||||
rbdSnap.RequestName = req.GetName()
|
rbdSnap.RequestName = req.GetName()
|
||||||
|
|
||||||
|
idLk := snapshotNameLocker.Lock(req.GetName())
|
||||||
|
defer snapshotNameLocker.Unlock(idLk, req.GetName())
|
||||||
|
|
||||||
// Need to check for already existing snapshot name, and if found
|
// Need to check for already existing snapshot name, and if found
|
||||||
// check for the requested source volume id and already allocated source volume id
|
// check for the requested source volume id and already allocated source volume id
|
||||||
found, err := checkSnapExists(rbdSnap, cr)
|
found, err := checkSnapExists(rbdSnap, cr)
|
||||||
@ -502,13 +480,6 @@ func (cs *ControllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteS
|
|||||||
return nil, status.Error(codes.InvalidArgument, "snapshot ID cannot be empty")
|
return nil, status.Error(codes.InvalidArgument, "snapshot ID cannot be empty")
|
||||||
}
|
}
|
||||||
|
|
||||||
snapshotIDMutex.LockKey(snapshotID)
|
|
||||||
defer func() {
|
|
||||||
if err = snapshotIDMutex.UnlockKey(snapshotID); err != nil {
|
|
||||||
klog.Warningf("failed to unlock mutex snapshot:%s %v", snapshotID, err)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
rbdSnap := &rbdSnapshot{}
|
rbdSnap := &rbdSnapshot{}
|
||||||
if err = genSnapFromSnapID(rbdSnap, snapshotID, cr); err != nil {
|
if err = genSnapFromSnapID(rbdSnap, snapshotID, cr); err != nil {
|
||||||
// if error is ErrKeyNotFound, then a previous attempt at deletion was complete
|
// if error is ErrKeyNotFound, then a previous attempt at deletion was complete
|
||||||
@ -518,24 +489,25 @@ func (cs *ControllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteS
|
|||||||
return &csi.DeleteSnapshotResponse{}, nil
|
return &csi.DeleteSnapshotResponse{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Consider missing snap as already deleted, and proceed to remove the omap values
|
// All errors other than ErrSnapNotFound should return an error back to the caller
|
||||||
if _, ok := err.(ErrSnapNotFound); !ok {
|
if _, ok := err.(ErrSnapNotFound); !ok {
|
||||||
return nil, status.Error(codes.Internal, err.Error())
|
return nil, status.Error(codes.Internal, err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Consider missing snap as already deleted, and proceed to remove the omap values,
|
||||||
|
// safeguarding against parallel create or delete requests against the same name.
|
||||||
|
idLk := snapshotNameLocker.Lock(rbdSnap.RequestName)
|
||||||
|
defer snapshotNameLocker.Unlock(idLk, rbdSnap.RequestName)
|
||||||
|
|
||||||
if err = undoSnapReservation(rbdSnap, cr); err != nil {
|
if err = undoSnapReservation(rbdSnap, cr); err != nil {
|
||||||
return nil, status.Error(codes.Internal, err.Error())
|
return nil, status.Error(codes.Internal, err.Error())
|
||||||
}
|
}
|
||||||
return &csi.DeleteSnapshotResponse{}, nil
|
return &csi.DeleteSnapshotResponse{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// lock out parallel create requests against the same snap name as we
|
// safeguard against parallel create or delete requests against the same name
|
||||||
// cleanup the image and associated omaps for the same
|
idLk := snapshotNameLocker.Lock(rbdSnap.RequestName)
|
||||||
snapshotNameMutex.LockKey(rbdSnap.RequestName)
|
defer snapshotNameLocker.Unlock(idLk, rbdSnap.RequestName)
|
||||||
defer func() {
|
|
||||||
if err = snapshotNameMutex.UnlockKey(rbdSnap.RequestName); err != nil {
|
|
||||||
klog.Warningf("failed to unlock mutex snapshot:%s %v", rbdSnap.RequestName, err)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
// Unprotect snapshot
|
// Unprotect snapshot
|
||||||
err = unprotectSnapshot(rbdSnap, cr)
|
err = unprotectSnapshot(rbdSnap, cr)
|
||||||
|
@ -62,18 +62,10 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
|
|||||||
return nil, status.Error(codes.Internal, err.Error())
|
return nil, status.Error(codes.Internal, err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
targetPathMutex.LockKey(targetPath)
|
idLk := targetPathLocker.Lock(targetPath)
|
||||||
defer func() {
|
defer targetPathLocker.Unlock(idLk, targetPath)
|
||||||
if err = targetPathMutex.UnlockKey(targetPath); err != nil {
|
|
||||||
klog.Warningf("failed to unlock mutex targetpath:%s %v", targetPath, err)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
disableInUseChecks := false
|
|
||||||
|
|
||||||
volName, err := ns.getVolumeName(req)
|
disableInUseChecks := false
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
isBlock := req.GetVolumeCapability().GetBlock() != nil
|
isBlock := req.GetVolumeCapability().GetBlock() != nil
|
||||||
// Check if that target path exists properly
|
// Check if that target path exists properly
|
||||||
@ -100,7 +92,13 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
volName, err := ns.getVolumeName(req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
volOptions.RbdImageName = volName
|
volOptions.RbdImageName = volName
|
||||||
|
|
||||||
// Mapping RBD image
|
// Mapping RBD image
|
||||||
devicePath, err := attachRBDImage(volOptions, cr)
|
devicePath, err := attachRBDImage(volOptions, cr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -206,13 +204,8 @@ func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
|
|||||||
return nil, status.Error(codes.InvalidArgument, "empty volume ID in request")
|
return nil, status.Error(codes.InvalidArgument, "empty volume ID in request")
|
||||||
}
|
}
|
||||||
|
|
||||||
targetPathMutex.LockKey(targetPath)
|
idLk := targetPathLocker.Lock(targetPath)
|
||||||
|
defer targetPathLocker.Unlock(idLk, targetPath)
|
||||||
defer func() {
|
|
||||||
if err := targetPathMutex.UnlockKey(targetPath); err != nil {
|
|
||||||
klog.Warningf("failed to unlock mutex targetpath:%s %v", targetPath, err)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
notMnt, err := mount.IsNotMountPoint(ns.mounter, targetPath)
|
notMnt, err := mount.IsNotMountPoint(ns.mounter, targetPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -242,13 +242,8 @@ func attachRBDImage(volOptions *rbdVolume, cr *util.Credentials) (string, error)
|
|||||||
|
|
||||||
devicePath, found := waitForPath(volOptions.Pool, image, 1, useNBD)
|
devicePath, found := waitForPath(volOptions.Pool, image, 1, useNBD)
|
||||||
if !found {
|
if !found {
|
||||||
attachdetachMutex.LockKey(imagePath)
|
idLk := attachdetachLocker.Lock(imagePath)
|
||||||
|
defer attachdetachLocker.Unlock(idLk, imagePath)
|
||||||
defer func() {
|
|
||||||
if err = attachdetachMutex.UnlockKey(imagePath); err != nil {
|
|
||||||
klog.Warningf("failed to unlock mutex imagepath:%s %v", imagePath, err)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
_, err = execCommand("modprobe", []string{moduleName})
|
_, err = execCommand("modprobe", []string{moduleName})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -30,7 +30,6 @@ import (
|
|||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
"k8s.io/apimachinery/pkg/util/sets"
|
"k8s.io/apimachinery/pkg/util/sets"
|
||||||
"k8s.io/klog"
|
"k8s.io/klog"
|
||||||
"k8s.io/utils/keymutex"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@ -83,17 +82,13 @@ type rbdSnapshot struct {
|
|||||||
|
|
||||||
var (
|
var (
|
||||||
// serializes operations based on "<rbd pool>/<rbd image>" as key
|
// serializes operations based on "<rbd pool>/<rbd image>" as key
|
||||||
attachdetachMutex = keymutex.NewHashed(0)
|
attachdetachLocker = util.NewIDLocker()
|
||||||
// serializes operations based on "volume name" as key
|
// serializes operations based on "volume name" as key
|
||||||
volumeNameMutex = keymutex.NewHashed(0)
|
volumeNameLocker = util.NewIDLocker()
|
||||||
// serializes operations based on "volume id" as key
|
|
||||||
volumeIDMutex = keymutex.NewHashed(0)
|
|
||||||
// serializes operations based on "snapshot name" as key
|
// serializes operations based on "snapshot name" as key
|
||||||
snapshotNameMutex = keymutex.NewHashed(0)
|
snapshotNameLocker = util.NewIDLocker()
|
||||||
// serializes operations based on "snapshot id" as key
|
|
||||||
snapshotIDMutex = keymutex.NewHashed(0)
|
|
||||||
// serializes operations based on "mount target path" as key
|
// serializes operations based on "mount target path" as key
|
||||||
targetPathMutex = keymutex.NewHashed(0)
|
targetPathLocker = util.NewIDLocker()
|
||||||
|
|
||||||
supportedFeatures = sets.NewString("layering")
|
supportedFeatures = sets.NewString("layering")
|
||||||
)
|
)
|
||||||
|
77
pkg/util/idlocker.go
Normal file
77
pkg/util/idlocker.go
Normal file
@ -0,0 +1,77 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2019 ceph-csi authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package util
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
)
|
||||||
|
|
||||||
|
/*
|
||||||
|
IDLock is a per identifier lock with a use counter that retains a number of users of the lock.
|
||||||
|
IDLocker is a map of IDLocks holding the IDLocks based on a passed in identifier.
|
||||||
|
Typical usage (post creating an IDLocker) is to Lock/Unlock based on identifiers as per the API.
|
||||||
|
*/
|
||||||
|
type (
|
||||||
|
IDLock struct {
|
||||||
|
mtx sync.Mutex
|
||||||
|
useCount int
|
||||||
|
}
|
||||||
|
|
||||||
|
IDLocker struct {
|
||||||
|
lMutex sync.Mutex
|
||||||
|
lMap map[string]*IDLock
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
func NewIDLocker() *IDLocker {
|
||||||
|
return &IDLocker{
|
||||||
|
lMap: make(map[string]*IDLock),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (lkr *IDLocker) Lock(identifier string) *IDLock {
|
||||||
|
var (
|
||||||
|
lk *IDLock
|
||||||
|
ok bool
|
||||||
|
)
|
||||||
|
|
||||||
|
newlk := new(IDLock)
|
||||||
|
|
||||||
|
lkr.lMutex.Lock()
|
||||||
|
|
||||||
|
if lk, ok = lkr.lMap[identifier]; !ok {
|
||||||
|
lk = newlk
|
||||||
|
lkr.lMap[identifier] = lk
|
||||||
|
}
|
||||||
|
lk.useCount++
|
||||||
|
lkr.lMutex.Unlock()
|
||||||
|
|
||||||
|
lk.mtx.Lock()
|
||||||
|
|
||||||
|
return lk
|
||||||
|
}
|
||||||
|
|
||||||
|
func (lkr *IDLocker) Unlock(lk *IDLock, identifier string) {
|
||||||
|
lk.mtx.Unlock()
|
||||||
|
|
||||||
|
lkr.lMutex.Lock()
|
||||||
|
lk.useCount--
|
||||||
|
if lk.useCount == 0 {
|
||||||
|
delete(lkr.lMap, identifier)
|
||||||
|
}
|
||||||
|
lkr.lMutex.Unlock()
|
||||||
|
}
|
38
pkg/util/idlocker_test.go
Normal file
38
pkg/util/idlocker_test.go
Normal file
@ -0,0 +1,38 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2019 ceph-csi authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package util
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
// very basic tests for the moment
|
||||||
|
func TestIDLocker(t *testing.T) {
|
||||||
|
myIDLocker := NewIDLocker()
|
||||||
|
|
||||||
|
lk1 := myIDLocker.Lock("lk1")
|
||||||
|
lk2 := myIDLocker.Lock("lk2")
|
||||||
|
lk3 := myIDLocker.Lock("lk3")
|
||||||
|
|
||||||
|
if lk1 == lk2 || lk2 == lk3 || lk3 == lk1 {
|
||||||
|
t.Errorf("Failed: lock variables clash when they should not!")
|
||||||
|
}
|
||||||
|
|
||||||
|
myIDLocker.Unlock(lk1, "lk1")
|
||||||
|
myIDLocker.Unlock(lk2, "lk2")
|
||||||
|
myIDLocker.Unlock(lk3, "lk3")
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user