From c4a3675ceca7c911166e63e4add4071ca0993352 Mon Sep 17 00:00:00 2001 From: ShyamsundarR Date: Sat, 22 Jun 2019 12:43:28 -0400 Subject: [PATCH] Move locks to more granular locking than CPU count based As detailed in issue #279, current lock scheme has hash buckets that are count of CPUs. This causes a lot of contention when parallel requests are made to the CSI plugin. To reduce lock contention, this commit introduces granular locks per identifier. The commit also changes the timeout for gRPC requests to Create and Delete volumes, as the current timeout is 10s (kubernetes documentation says 15s but code defaults are 10s). A virtual setup takes about 12-15s to complete a request at times, that leads to unwanted retries of the same request, hence the increased timeout to enable operation completion with minimal retries. Tests to create PVCs before and after these changes look like so, Before: Default master code + sidecar provisioner --timeout option set to 30 seconds 20 PVCs Creation: 3 runs, 396/391/400 seconds Deletion: 3 runs, 218/271/118 seconds - Once was stalled for more than 8 minutes and cancelled the run After: Current commit + sidecar provisioner --timeout option set to 30 sec 20 PVCs Creation: 3 runs, 42/59/65 seconds Deletion: 3 runs, 32/32/31 seconds Fixes: #279 Signed-off-by: ShyamsundarR --- .../templates/provisioner-statefulset.yaml | 2 + .../csi-cephfsplugin-provisioner.yaml | 2 + .../templates/provisioner-statefulset.yaml | 4 +- .../kubernetes/csi-rbdplugin-provisioner.yaml | 4 +- pkg/cephfs/controllerserver.go | 17 ++-- pkg/cephfs/nodeserver.go | 7 +- pkg/cephfs/util.go | 7 -- pkg/rbd/controllerserver.go | 68 +++++----------- pkg/rbd/nodeserver.go | 29 +++---- pkg/rbd/rbd_attach.go | 9 +-- pkg/rbd/rbd_util.go | 13 +--- pkg/util/idlocker.go | 77 +++++++++++++++++++ pkg/util/idlocker_test.go | 38 +++++++++ 13 files changed, 173 insertions(+), 104 deletions(-) create mode 100644 pkg/util/idlocker.go create mode 100644 pkg/util/idlocker_test.go diff --git a/deploy/cephfs/helm/templates/provisioner-statefulset.yaml b/deploy/cephfs/helm/templates/provisioner-statefulset.yaml index 51e56a722..6ed1ab2f2 100644 --- a/deploy/cephfs/helm/templates/provisioner-statefulset.yaml +++ b/deploy/cephfs/helm/templates/provisioner-statefulset.yaml @@ -32,6 +32,8 @@ spec: args: - "--csi-address=$(ADDRESS)" - "--v=5" + - "--timeout=60s" + - "--retry-interval-start=500ms" env: - name: ADDRESS value: "{{ .Values.socketDir }}/{{ .Values.socketFile }}" diff --git a/deploy/cephfs/kubernetes/csi-cephfsplugin-provisioner.yaml b/deploy/cephfs/kubernetes/csi-cephfsplugin-provisioner.yaml index 9c19a33c8..38d68b0b4 100644 --- a/deploy/cephfs/kubernetes/csi-cephfsplugin-provisioner.yaml +++ b/deploy/cephfs/kubernetes/csi-cephfsplugin-provisioner.yaml @@ -35,6 +35,8 @@ spec: args: - "--csi-address=$(ADDRESS)" - "--v=5" + - "--timeout=60s" + - "--retry-interval-start=500ms" env: - name: ADDRESS value: unix:///csi/csi-provisioner.sock diff --git a/deploy/rbd/helm/templates/provisioner-statefulset.yaml b/deploy/rbd/helm/templates/provisioner-statefulset.yaml index 7f80d0a3f..633cddcad 100644 --- a/deploy/rbd/helm/templates/provisioner-statefulset.yaml +++ b/deploy/rbd/helm/templates/provisioner-statefulset.yaml @@ -32,6 +32,8 @@ spec: args: - "--csi-address=$(ADDRESS)" - "--v=5" + - "--timeout=60s" + - "--retry-interval-start=500ms" env: - name: ADDRESS value: "{{ .Values.socketDir }}/{{ .Values.socketFile }}" @@ -46,8 +48,8 @@ spec: imagePullPolicy: {{ .Values.nodeplugin.plugin.image.pullPolicy }} args: - 
"--csi-address=$(ADDRESS)" - - "--connection-timeout=15s" - "--v=5" + - "--timeout=60s" env: - name: ADDRESS value: "{{ .Values.socketDir }}/{{ .Values.socketFile }}" diff --git a/deploy/rbd/kubernetes/csi-rbdplugin-provisioner.yaml b/deploy/rbd/kubernetes/csi-rbdplugin-provisioner.yaml index 5efb0bcb8..8511a2591 100644 --- a/deploy/rbd/kubernetes/csi-rbdplugin-provisioner.yaml +++ b/deploy/rbd/kubernetes/csi-rbdplugin-provisioner.yaml @@ -35,6 +35,8 @@ spec: args: - "--csi-address=$(ADDRESS)" - "--v=5" + - "--timeout=60s" + - "--retry-interval-start=500ms" env: - name: ADDRESS value: unix:///csi/csi-provisioner.sock @@ -46,8 +48,8 @@ spec: image: quay.io/k8scsi/csi-snapshotter:v1.1.0 args: - "--csi-address=$(ADDRESS)" - - "--connection-timeout=15s" - "--v=5" + - "--timeout=60s" env: - name: ADDRESS value: unix:///csi/csi-provisioner.sock diff --git a/pkg/cephfs/controllerserver.go b/pkg/cephfs/controllerserver.go index dfbc0e949..538cff351 100644 --- a/pkg/cephfs/controllerserver.go +++ b/pkg/cephfs/controllerserver.go @@ -25,7 +25,6 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "k8s.io/klog" - "k8s.io/utils/keymutex" ) // ControllerServer struct of CEPH CSI driver with supported methods of CSI @@ -41,8 +40,8 @@ type controllerCacheEntry struct { } var ( - mtxControllerVolumeID = keymutex.NewHashed(0) - mtxControllerVolumeName = keymutex.NewHashed(0) + volumeIDLocker = util.NewIDLocker() + volumeNameLocker = util.NewIDLocker() ) // createBackingVolume creates the backing subvolume and user/key for the given volOptions and vID, @@ -91,8 +90,8 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol } // Existence and conflict checks - mtxControllerVolumeName.LockKey(requestName) - defer mustUnlock(mtxControllerVolumeName, requestName) + idLk := volumeNameLocker.Lock(requestName) + defer volumeNameLocker.Unlock(idLk, requestName) vID, err := checkVolExists(volOptions, secret) if err != nil { @@ -181,8 +180,8 @@ func (cs *ControllerServer) deleteVolumeDeprecated(req *csi.DeleteVolumeRequest) return nil, status.Error(codes.InvalidArgument, err.Error()) } - mtxControllerVolumeID.LockKey(string(volID)) - defer mustUnlock(mtxControllerVolumeID, string(volID)) + idLk := volumeIDLocker.Lock(string(volID)) + defer volumeIDLocker.Unlock(idLk, string(volID)) if err = purgeVolume(volID, cr, &ce.VolOptions); err != nil { klog.Errorf("failed to delete volume %s: %v", volID, err) @@ -240,8 +239,8 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol // lock out parallel delete and create requests against the same volume name as we // cleanup the subvolume and associated omaps for the same - mtxControllerVolumeName.LockKey(volOptions.RequestName) - defer mustUnlock(mtxControllerVolumeName, volOptions.RequestName) + idLk := volumeNameLocker.Lock(volOptions.RequestName) + defer volumeNameLocker.Unlock(idLk, volOptions.RequestName) if err = purgeVolume(volumeID(vID.FsSubvolName), cr, volOptions); err != nil { klog.Errorf("failed to delete volume %s: %v", volID, err) diff --git a/pkg/cephfs/nodeserver.go b/pkg/cephfs/nodeserver.go index d9f48c677..9656cfd16 100644 --- a/pkg/cephfs/nodeserver.go +++ b/pkg/cephfs/nodeserver.go @@ -28,7 +28,6 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "k8s.io/klog" - "k8s.io/utils/keymutex" ) // NodeServer struct of ceph CSI driver with supported methods of CSI @@ -38,7 +37,7 @@ type NodeServer struct { } var ( - mtxNodeVolumeID = keymutex.NewHashed(0) + 
nodeVolumeIDLocker = util.NewIDLocker() ) func getCredentialsForVolume(volOptions *volumeOptions, volID volumeID, req *csi.NodeStageVolumeRequest) (*util.Credentials, error) { @@ -121,8 +120,8 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol return nil, status.Error(codes.Internal, err.Error()) } - mtxNodeVolumeID.LockKey(string(volID)) - defer mustUnlock(mtxNodeVolumeID, string(volID)) + idLk := nodeVolumeIDLocker.Lock(string(volID)) + defer nodeVolumeIDLocker.Unlock(idLk, string(volID)) // Check if the volume is already mounted diff --git a/pkg/cephfs/util.go b/pkg/cephfs/util.go index 193089cb5..573cedb39 100644 --- a/pkg/cephfs/util.go +++ b/pkg/cephfs/util.go @@ -31,17 +31,10 @@ import ( "github.com/ceph/ceph-csi/pkg/util" "github.com/container-storage-interface/spec/lib/go/csi" "k8s.io/kubernetes/pkg/util/mount" - "k8s.io/utils/keymutex" ) type volumeID string -func mustUnlock(m keymutex.KeyMutex, key string) { - if err := m.UnlockKey(key); err != nil { - klog.Fatalf("failed to unlock mutex for %s: %v", key, err) - } -} - func execCommand(program string, args ...string) (stdout, stderr []byte, err error) { var ( cmd = exec.Command(program, args...) // nolint: gosec diff --git a/pkg/rbd/controllerserver.go b/pkg/rbd/controllerserver.go index 53f452378..c42bc74f5 100644 --- a/pkg/rbd/controllerserver.go +++ b/pkg/rbd/controllerserver.go @@ -116,18 +116,14 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol return nil, status.Error(codes.Internal, err.Error()) } - volumeNameMutex.LockKey(req.GetName()) - defer func() { - if err = volumeNameMutex.UnlockKey(req.GetName()); err != nil { - klog.Warningf("failed to unlock mutex volume:%s %v", req.GetName(), err) - } - }() - rbdVol, err := cs.parseVolCreateRequest(req) if err != nil { return nil, err } + idLk := volumeNameLocker.Lock(req.GetName()) + defer volumeNameLocker.Unlock(idLk, req.GetName()) + found, err := checkVolExists(rbdVol, cr) if err != nil { if _, ok := err.(ErrVolNameConflict); ok { @@ -248,12 +244,6 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol if volumeID == "" { return nil, status.Error(codes.InvalidArgument, "empty volume ID in request") } - volumeIDMutex.LockKey(volumeID) - defer func() { - if err := volumeIDMutex.UnlockKey(volumeID); err != nil { - klog.Warningf("failed to unlock mutex volume:%s %v", volumeID, err) - } - }() rbdVol := &rbdVolume{} if err := genVolFromVolID(rbdVol, volumeID, cr); err != nil { @@ -272,12 +262,8 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol // If error is ErrImageNotFound then we failed to find the image, but found the imageOMap // to lead us to the image, hence the imageOMap needs to be garbage collected, by calling // unreserve for the same - volumeNameMutex.LockKey(rbdVol.RequestName) - defer func() { - if err := volumeNameMutex.UnlockKey(rbdVol.RequestName); err != nil { - klog.Warningf("failed to unlock mutex volume:%s %v", rbdVol.RequestName, err) - } - }() + idLk := volumeNameLocker.Lock(rbdVol.RequestName) + defer volumeNameLocker.Unlock(idLk, rbdVol.RequestName) if err := undoVolReservation(rbdVol, cr); err != nil { return nil, status.Error(codes.Internal, err.Error()) @@ -287,12 +273,8 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol // lock out parallel create requests against the same volume name as we // cleanup the image and associated omaps for the same - volumeNameMutex.LockKey(rbdVol.RequestName) 
- defer func() { - if err := volumeNameMutex.UnlockKey(rbdVol.RequestName); err != nil { - klog.Warningf("failed to unlock mutex volume:%s %v", rbdVol.RequestName, err) - } - }() + idLk := volumeNameLocker.Lock(rbdVol.RequestName) + defer volumeNameLocker.Unlock(idLk, rbdVol.RequestName) // Deleting rbd image klog.V(4).Infof("deleting image %s", rbdVol.RbdImageName) @@ -341,13 +323,6 @@ func (cs *ControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateS return nil, status.Error(codes.Internal, err.Error()) } - snapshotNameMutex.LockKey(req.GetName()) - defer func() { - if err = snapshotNameMutex.UnlockKey(req.GetName()); err != nil { - klog.Warningf("failed to unlock mutex snapshot:%s %v", req.GetName(), err) - } - }() - // Fetch source volume information rbdVol := new(rbdVolume) err = genVolFromVolID(rbdVol, req.GetSourceVolumeId(), cr) @@ -370,6 +345,9 @@ func (cs *ControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateS rbdSnap.SourceVolumeID = req.GetSourceVolumeId() rbdSnap.RequestName = req.GetName() + idLk := snapshotNameLocker.Lock(req.GetName()) + defer snapshotNameLocker.Unlock(idLk, req.GetName()) + // Need to check for already existing snapshot name, and if found // check for the requested source volume id and already allocated source volume id found, err := checkSnapExists(rbdSnap, cr) @@ -502,13 +480,6 @@ func (cs *ControllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteS return nil, status.Error(codes.InvalidArgument, "snapshot ID cannot be empty") } - snapshotIDMutex.LockKey(snapshotID) - defer func() { - if err = snapshotIDMutex.UnlockKey(snapshotID); err != nil { - klog.Warningf("failed to unlock mutex snapshot:%s %v", snapshotID, err) - } - }() - rbdSnap := &rbdSnapshot{} if err = genSnapFromSnapID(rbdSnap, snapshotID, cr); err != nil { // if error is ErrKeyNotFound, then a previous attempt at deletion was complete @@ -518,24 +489,25 @@ func (cs *ControllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteS return &csi.DeleteSnapshotResponse{}, nil } - // Consider missing snap as already deleted, and proceed to remove the omap values + // All errors other than ErrSnapNotFound should return an error back to the caller if _, ok := err.(ErrSnapNotFound); !ok { return nil, status.Error(codes.Internal, err.Error()) } + + // Consider missing snap as already deleted, and proceed to remove the omap values, + // safeguarding against parallel create or delete requests against the same name. 
+ idLk := snapshotNameLocker.Lock(rbdSnap.RequestName) + defer snapshotNameLocker.Unlock(idLk, rbdSnap.RequestName) + if err = undoSnapReservation(rbdSnap, cr); err != nil { return nil, status.Error(codes.Internal, err.Error()) } return &csi.DeleteSnapshotResponse{}, nil } - // lock out parallel create requests against the same snap name as we - // cleanup the image and associated omaps for the same - snapshotNameMutex.LockKey(rbdSnap.RequestName) - defer func() { - if err = snapshotNameMutex.UnlockKey(rbdSnap.RequestName); err != nil { - klog.Warningf("failed to unlock mutex snapshot:%s %v", rbdSnap.RequestName, err) - } - }() + // safeguard against parallel create or delete requests against the same name + idLk := snapshotNameLocker.Lock(rbdSnap.RequestName) + defer snapshotNameLocker.Unlock(idLk, rbdSnap.RequestName) // Unprotect snapshot err = unprotectSnapshot(rbdSnap, cr) diff --git a/pkg/rbd/nodeserver.go b/pkg/rbd/nodeserver.go index 369a38d3f..08a18d465 100644 --- a/pkg/rbd/nodeserver.go +++ b/pkg/rbd/nodeserver.go @@ -62,18 +62,10 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis return nil, status.Error(codes.Internal, err.Error()) } - targetPathMutex.LockKey(targetPath) - defer func() { - if err = targetPathMutex.UnlockKey(targetPath); err != nil { - klog.Warningf("failed to unlock mutex targetpath:%s %v", targetPath, err) - } - }() - disableInUseChecks := false + idLk := targetPathLocker.Lock(targetPath) + defer targetPathLocker.Unlock(idLk, targetPath) - volName, err := ns.getVolumeName(req) - if err != nil { - return nil, err - } + disableInUseChecks := false isBlock := req.GetVolumeCapability().GetBlock() != nil // Check if that target path exists properly @@ -100,7 +92,13 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis if err != nil { return nil, err } + + volName, err := ns.getVolumeName(req) + if err != nil { + return nil, err + } volOptions.RbdImageName = volName + // Mapping RBD image devicePath, err := attachRBDImage(volOptions, cr) if err != nil { @@ -206,13 +204,8 @@ func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu return nil, status.Error(codes.InvalidArgument, "empty volume ID in request") } - targetPathMutex.LockKey(targetPath) - - defer func() { - if err := targetPathMutex.UnlockKey(targetPath); err != nil { - klog.Warningf("failed to unlock mutex targetpath:%s %v", targetPath, err) - } - }() + idLk := targetPathLocker.Lock(targetPath) + defer targetPathLocker.Unlock(idLk, targetPath) notMnt, err := mount.IsNotMountPoint(ns.mounter, targetPath) if err != nil { diff --git a/pkg/rbd/rbd_attach.go b/pkg/rbd/rbd_attach.go index 2751e0be5..8b966a099 100644 --- a/pkg/rbd/rbd_attach.go +++ b/pkg/rbd/rbd_attach.go @@ -242,13 +242,8 @@ func attachRBDImage(volOptions *rbdVolume, cr *util.Credentials) (string, error) devicePath, found := waitForPath(volOptions.Pool, image, 1, useNBD) if !found { - attachdetachMutex.LockKey(imagePath) - - defer func() { - if err = attachdetachMutex.UnlockKey(imagePath); err != nil { - klog.Warningf("failed to unlock mutex imagepath:%s %v", imagePath, err) - } - }() + idLk := attachdetachLocker.Lock(imagePath) + defer attachdetachLocker.Unlock(idLk, imagePath) _, err = execCommand("modprobe", []string{moduleName}) if err != nil { diff --git a/pkg/rbd/rbd_util.go b/pkg/rbd/rbd_util.go index be7d6d40f..0b674089c 100644 --- a/pkg/rbd/rbd_util.go +++ b/pkg/rbd/rbd_util.go @@ -30,7 +30,6 @@ import ( "github.com/pkg/errors" 
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/klog" - "k8s.io/utils/keymutex" ) const ( @@ -83,17 +82,13 @@ type rbdSnapshot struct { var ( // serializes operations based on "/" as key - attachdetachMutex = keymutex.NewHashed(0) + attachdetachLocker = util.NewIDLocker() // serializes operations based on "volume name" as key - volumeNameMutex = keymutex.NewHashed(0) - // serializes operations based on "volume id" as key - volumeIDMutex = keymutex.NewHashed(0) + volumeNameLocker = util.NewIDLocker() // serializes operations based on "snapshot name" as key - snapshotNameMutex = keymutex.NewHashed(0) - // serializes operations based on "snapshot id" as key - snapshotIDMutex = keymutex.NewHashed(0) + snapshotNameLocker = util.NewIDLocker() // serializes operations based on "mount target path" as key - targetPathMutex = keymutex.NewHashed(0) + targetPathLocker = util.NewIDLocker() supportedFeatures = sets.NewString("layering") ) diff --git a/pkg/util/idlocker.go b/pkg/util/idlocker.go new file mode 100644 index 000000000..ef997578f --- /dev/null +++ b/pkg/util/idlocker.go @@ -0,0 +1,77 @@ +/* +Copyright 2019 ceph-csi authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "sync" +) + +/* +IDLock is a per identifier lock with a use counter that retains a number of users of the lock. +IDLocker is a map of IDLocks holding the IDLocks based on a passed in identifier. +Typical usage (post creating an IDLocker) is to Lock/Unlock based on identifiers as per the API. +*/ +type ( + IDLock struct { + mtx sync.Mutex + useCount int + } + + IDLocker struct { + lMutex sync.Mutex + lMap map[string]*IDLock + } +) + +func NewIDLocker() *IDLocker { + return &IDLocker{ + lMap: make(map[string]*IDLock), + } +} + +func (lkr *IDLocker) Lock(identifier string) *IDLock { + var ( + lk *IDLock + ok bool + ) + + newlk := new(IDLock) + + lkr.lMutex.Lock() + + if lk, ok = lkr.lMap[identifier]; !ok { + lk = newlk + lkr.lMap[identifier] = lk + } + lk.useCount++ + lkr.lMutex.Unlock() + + lk.mtx.Lock() + + return lk +} + +func (lkr *IDLocker) Unlock(lk *IDLock, identifier string) { + lk.mtx.Unlock() + + lkr.lMutex.Lock() + lk.useCount-- + if lk.useCount == 0 { + delete(lkr.lMap, identifier) + } + lkr.lMutex.Unlock() +} diff --git a/pkg/util/idlocker_test.go b/pkg/util/idlocker_test.go new file mode 100644 index 000000000..635cd387d --- /dev/null +++ b/pkg/util/idlocker_test.go @@ -0,0 +1,38 @@ +/* +Copyright 2019 ceph-csi authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package util + +import ( + "testing" +) + +// very basic tests for the moment +func TestIDLocker(t *testing.T) { + myIDLocker := NewIDLocker() + + lk1 := myIDLocker.Lock("lk1") + lk2 := myIDLocker.Lock("lk2") + lk3 := myIDLocker.Lock("lk3") + + if lk1 == lk2 || lk2 == lk3 || lk3 == lk1 { + t.Errorf("Failed: lock variables clash when they should not!") + } + + myIDLocker.Unlock(lk1, "lk1") + myIDLocker.Unlock(lk2, "lk2") + myIDLocker.Unlock(lk3, "lk3") +}
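
Note (not part of the patch): a minimal, standalone sketch of how callers are expected to use the new IDLocker, mirroring the Lock/Unlock pattern in the controller and node servers above. Only util.NewIDLocker, Lock, and Unlock come from this patch; the handleRequest helper and the request names are hypothetical. Requests for different identifiers proceed in parallel, while requests for the same identifier serialize, which is what removes the CPU-count hash-bucket contention described in the commit message.

```go
package main

import (
	"fmt"
	"sync"

	"github.com/ceph/ceph-csi/pkg/util"
)

// handleRequest stands in for a CSI handler such as CreateVolume: it takes the
// per-identifier lock, does its work, and releases the lock on return.
func handleRequest(locker *util.IDLocker, requestName string, wg *sync.WaitGroup) {
	defer wg.Done()

	// Lock returns the *IDLock for this identifier; Unlock needs both the lock
	// and the identifier so the locker can drop the map entry once it is unused.
	idLk := locker.Lock(requestName)
	defer locker.Unlock(idLk, requestName)

	fmt.Println("processing", requestName)
}

func main() {
	// One locker per kind of key, as in rbd_util.go (volume name, target path, ...).
	volumeNameLocker := util.NewIDLocker()

	var wg sync.WaitGroup
	// "pvc-1" contends with itself and serializes; "pvc-2" proceeds in parallel.
	for _, name := range []string{"pvc-1", "pvc-2", "pvc-1"} {
		wg.Add(1)
		go handleRequest(volumeNameLocker, name, &wg)
	}
	wg.Wait()
}
```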
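
Note (not part of the patch): the test above is intentionally basic ("very basic tests for the moment"). A possible follow-up test, sketched under the assumption that it lives in the same util package, checks that goroutines contending on one identifier never hold the critical section at the same time; the test name and the busy flag are illustrative, not from the patch.

```go
package util

import (
	"sync"
	"testing"
	"time"
)

// TestIDLockerSerialization checks that goroutines contending on the same
// identifier never hold the critical section concurrently.
func TestIDLockerSerialization(t *testing.T) {
	locker := NewIDLocker()

	var (
		wg   sync.WaitGroup
		busy bool // guarded by the per-identifier lock itself
	)

	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()

			lk := locker.Lock("same-id")
			defer locker.Unlock(lk, "same-id")

			if busy {
				t.Error("two goroutines held the same identifier lock at once")
			}
			busy = true
			time.Sleep(time.Millisecond)
			busy = false
		}()
	}
	wg.Wait()
}
```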