diff --git a/deploy/cephfs/helm/templates/provisioner-statefulset.yaml b/deploy/cephfs/helm/templates/provisioner-statefulset.yaml
index 51e56a722..6ed1ab2f2 100644
--- a/deploy/cephfs/helm/templates/provisioner-statefulset.yaml
+++ b/deploy/cephfs/helm/templates/provisioner-statefulset.yaml
@@ -32,6 +32,8 @@ spec:
           args:
             - "--csi-address=$(ADDRESS)"
             - "--v=5"
+            - "--timeout=60s"
+            - "--retry-interval-start=500ms"
           env:
             - name: ADDRESS
               value: "{{ .Values.socketDir }}/{{ .Values.socketFile }}"
diff --git a/deploy/cephfs/kubernetes/csi-cephfsplugin-provisioner.yaml b/deploy/cephfs/kubernetes/csi-cephfsplugin-provisioner.yaml
index 9c19a33c8..38d68b0b4 100644
--- a/deploy/cephfs/kubernetes/csi-cephfsplugin-provisioner.yaml
+++ b/deploy/cephfs/kubernetes/csi-cephfsplugin-provisioner.yaml
@@ -35,6 +35,8 @@ spec:
           args:
             - "--csi-address=$(ADDRESS)"
             - "--v=5"
+            - "--timeout=60s"
+            - "--retry-interval-start=500ms"
           env:
             - name: ADDRESS
               value: unix:///csi/csi-provisioner.sock
diff --git a/deploy/rbd/helm/templates/provisioner-statefulset.yaml b/deploy/rbd/helm/templates/provisioner-statefulset.yaml
index 7f80d0a3f..633cddcad 100644
--- a/deploy/rbd/helm/templates/provisioner-statefulset.yaml
+++ b/deploy/rbd/helm/templates/provisioner-statefulset.yaml
@@ -32,6 +32,8 @@ spec:
           args:
             - "--csi-address=$(ADDRESS)"
             - "--v=5"
+            - "--timeout=60s"
+            - "--retry-interval-start=500ms"
           env:
             - name: ADDRESS
               value: "{{ .Values.socketDir }}/{{ .Values.socketFile }}"
@@ -46,8 +48,8 @@ spec:
           imagePullPolicy: {{ .Values.nodeplugin.plugin.image.pullPolicy }}
           args:
             - "--csi-address=$(ADDRESS)"
-            - "--connection-timeout=15s"
            - "--v=5"
+            - "--timeout=60s"
           env:
             - name: ADDRESS
               value: "{{ .Values.socketDir }}/{{ .Values.socketFile }}"
diff --git a/deploy/rbd/kubernetes/csi-rbdplugin-provisioner.yaml b/deploy/rbd/kubernetes/csi-rbdplugin-provisioner.yaml
index 5efb0bcb8..8511a2591 100644
--- a/deploy/rbd/kubernetes/csi-rbdplugin-provisioner.yaml
+++ b/deploy/rbd/kubernetes/csi-rbdplugin-provisioner.yaml
@@ -35,6 +35,8 @@ spec:
           args:
             - "--csi-address=$(ADDRESS)"
            - "--v=5"
+            - "--timeout=60s"
+            - "--retry-interval-start=500ms"
           env:
             - name: ADDRESS
               value: unix:///csi/csi-provisioner.sock
@@ -46,8 +48,8 @@ spec:
           image: quay.io/k8scsi/csi-snapshotter:v1.1.0
           args:
             - "--csi-address=$(ADDRESS)"
-            - "--connection-timeout=15s"
            - "--v=5"
+            - "--timeout=60s"
           env:
             - name: ADDRESS
               value: unix:///csi/csi-provisioner.sock
diff --git a/pkg/cephfs/controllerserver.go b/pkg/cephfs/controllerserver.go
index dfbc0e949..538cff351 100644
--- a/pkg/cephfs/controllerserver.go
+++ b/pkg/cephfs/controllerserver.go
@@ -25,7 +25,6 @@ import (
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
 	"k8s.io/klog"
-	"k8s.io/utils/keymutex"
 )
 
 // ControllerServer struct of CEPH CSI driver with supported methods of CSI
@@ -41,8 +40,8 @@ type controllerCacheEntry struct {
 }
 
 var (
-	mtxControllerVolumeID   = keymutex.NewHashed(0)
-	mtxControllerVolumeName = keymutex.NewHashed(0)
+	volumeIDLocker   = util.NewIDLocker()
+	volumeNameLocker = util.NewIDLocker()
 )
 
 // createBackingVolume creates the backing subvolume and user/key for the given volOptions and vID,
@@ -91,8 +90,8 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
 	}
 
 	// Existence and conflict checks
-	mtxControllerVolumeName.LockKey(requestName)
-	defer mustUnlock(mtxControllerVolumeName, requestName)
+	idLk := volumeNameLocker.Lock(requestName)
+	defer volumeNameLocker.Unlock(idLk, requestName)
 
 	vID, err := checkVolExists(volOptions, secret)
 	if err != nil {
@@ -181,8 +180,8 @@ func (cs *ControllerServer) deleteVolumeDeprecated(req *csi.DeleteVolumeRequest)
 		return nil, status.Error(codes.InvalidArgument, err.Error())
 	}
 
-	mtxControllerVolumeID.LockKey(string(volID))
-	defer mustUnlock(mtxControllerVolumeID, string(volID))
+	idLk := volumeIDLocker.Lock(string(volID))
+	defer volumeIDLocker.Unlock(idLk, string(volID))
 
 	if err = purgeVolume(volID, cr, &ce.VolOptions); err != nil {
 		klog.Errorf("failed to delete volume %s: %v", volID, err)
@@ -240,8 +239,8 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
 
 	// lock out parallel delete and create requests against the same volume name as we
 	// cleanup the subvolume and associated omaps for the same
-	mtxControllerVolumeName.LockKey(volOptions.RequestName)
-	defer mustUnlock(mtxControllerVolumeName, volOptions.RequestName)
+	idLk := volumeNameLocker.Lock(volOptions.RequestName)
+	defer volumeNameLocker.Unlock(idLk, volOptions.RequestName)
 
 	if err = purgeVolume(volumeID(vID.FsSubvolName), cr, volOptions); err != nil {
 		klog.Errorf("failed to delete volume %s: %v", volID, err)
diff --git a/pkg/cephfs/nodeserver.go b/pkg/cephfs/nodeserver.go
index d9f48c677..9656cfd16 100644
--- a/pkg/cephfs/nodeserver.go
+++ b/pkg/cephfs/nodeserver.go
@@ -28,7 +28,6 @@ import (
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
 	"k8s.io/klog"
-	"k8s.io/utils/keymutex"
 )
 
 // NodeServer struct of ceph CSI driver with supported methods of CSI
@@ -38,7 +37,7 @@ type NodeServer struct {
 }
 
 var (
-	mtxNodeVolumeID = keymutex.NewHashed(0)
+	nodeVolumeIDLocker = util.NewIDLocker()
 )
 
 func getCredentialsForVolume(volOptions *volumeOptions, volID volumeID, req *csi.NodeStageVolumeRequest) (*util.Credentials, error) {
@@ -121,8 +120,8 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
 		return nil, status.Error(codes.Internal, err.Error())
 	}
 
-	mtxNodeVolumeID.LockKey(string(volID))
-	defer mustUnlock(mtxNodeVolumeID, string(volID))
+	idLk := nodeVolumeIDLocker.Lock(string(volID))
+	defer nodeVolumeIDLocker.Unlock(idLk, string(volID))
 
 	// Check if the volume is already mounted
 
diff --git a/pkg/cephfs/util.go b/pkg/cephfs/util.go
index 193089cb5..573cedb39 100644
--- a/pkg/cephfs/util.go
+++ b/pkg/cephfs/util.go
@@ -31,17 +31,10 @@ import (
 	"github.com/ceph/ceph-csi/pkg/util"
 	"github.com/container-storage-interface/spec/lib/go/csi"
 	"k8s.io/kubernetes/pkg/util/mount"
-	"k8s.io/utils/keymutex"
 )
 
 type volumeID string
 
-func mustUnlock(m keymutex.KeyMutex, key string) {
-	if err := m.UnlockKey(key); err != nil {
-		klog.Fatalf("failed to unlock mutex for %s: %v", key, err)
-	}
-}
-
 func execCommand(program string, args ...string) (stdout, stderr []byte, err error) {
 	var (
 		cmd = exec.Command(program, args...) // nolint: gosec
diff --git a/pkg/rbd/controllerserver.go b/pkg/rbd/controllerserver.go
index 53f452378..c42bc74f5 100644
--- a/pkg/rbd/controllerserver.go
+++ b/pkg/rbd/controllerserver.go
@@ -116,18 +116,14 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
 		return nil, status.Error(codes.Internal, err.Error())
 	}
 
-	volumeNameMutex.LockKey(req.GetName())
-	defer func() {
-		if err = volumeNameMutex.UnlockKey(req.GetName()); err != nil {
-			klog.Warningf("failed to unlock mutex volume:%s %v", req.GetName(), err)
-		}
-	}()
-
 	rbdVol, err := cs.parseVolCreateRequest(req)
 	if err != nil {
 		return nil, err
 	}
 
+	idLk := volumeNameLocker.Lock(req.GetName())
+	defer volumeNameLocker.Unlock(idLk, req.GetName())
+
 	found, err := checkVolExists(rbdVol, cr)
 	if err != nil {
 		if _, ok := err.(ErrVolNameConflict); ok {
@@ -248,12 +244,6 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
 	if volumeID == "" {
 		return nil, status.Error(codes.InvalidArgument, "empty volume ID in request")
 	}
-	volumeIDMutex.LockKey(volumeID)
-	defer func() {
-		if err := volumeIDMutex.UnlockKey(volumeID); err != nil {
-			klog.Warningf("failed to unlock mutex volume:%s %v", volumeID, err)
-		}
-	}()
 
 	rbdVol := &rbdVolume{}
 	if err := genVolFromVolID(rbdVol, volumeID, cr); err != nil {
@@ -272,12 +262,8 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
 		// If error is ErrImageNotFound then we failed to find the image, but found the imageOMap
 		// to lead us to the image, hence the imageOMap needs to be garbage collected, by calling
 		// unreserve for the same
-		volumeNameMutex.LockKey(rbdVol.RequestName)
-		defer func() {
-			if err := volumeNameMutex.UnlockKey(rbdVol.RequestName); err != nil {
-				klog.Warningf("failed to unlock mutex volume:%s %v", rbdVol.RequestName, err)
-			}
-		}()
+		idLk := volumeNameLocker.Lock(rbdVol.RequestName)
+		defer volumeNameLocker.Unlock(idLk, rbdVol.RequestName)
 
 		if err := undoVolReservation(rbdVol, cr); err != nil {
 			return nil, status.Error(codes.Internal, err.Error())
@@ -287,12 +273,8 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
 
 	// lock out parallel create requests against the same volume name as we
 	// cleanup the image and associated omaps for the same
-	volumeNameMutex.LockKey(rbdVol.RequestName)
-	defer func() {
-		if err := volumeNameMutex.UnlockKey(rbdVol.RequestName); err != nil {
-			klog.Warningf("failed to unlock mutex volume:%s %v", rbdVol.RequestName, err)
-		}
-	}()
+	idLk := volumeNameLocker.Lock(rbdVol.RequestName)
+	defer volumeNameLocker.Unlock(idLk, rbdVol.RequestName)
 
 	// Deleting rbd image
 	klog.V(4).Infof("deleting image %s", rbdVol.RbdImageName)
@@ -341,13 +323,6 @@ func (cs *ControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateS
 		return nil, status.Error(codes.Internal, err.Error())
 	}
 
-	snapshotNameMutex.LockKey(req.GetName())
-	defer func() {
-		if err = snapshotNameMutex.UnlockKey(req.GetName()); err != nil {
-			klog.Warningf("failed to unlock mutex snapshot:%s %v", req.GetName(), err)
-		}
-	}()
-
 	// Fetch source volume information
 	rbdVol := new(rbdVolume)
 	err = genVolFromVolID(rbdVol, req.GetSourceVolumeId(), cr)
@@ -370,6 +345,9 @@ func (cs *ControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateS
 	rbdSnap.SourceVolumeID = req.GetSourceVolumeId()
 	rbdSnap.RequestName = req.GetName()
 
+	idLk := snapshotNameLocker.Lock(req.GetName())
+	defer snapshotNameLocker.Unlock(idLk, req.GetName())
+
 	// Need to check for already existing snapshot name, and if found
 	// check for the requested source volume id and already allocated source volume id
 	found, err := checkSnapExists(rbdSnap, cr)
@@ -502,13 +480,6 @@ func (cs *ControllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteS
 		return nil, status.Error(codes.InvalidArgument, "snapshot ID cannot be empty")
 	}
 
-	snapshotIDMutex.LockKey(snapshotID)
-	defer func() {
-		if err = snapshotIDMutex.UnlockKey(snapshotID); err != nil {
-			klog.Warningf("failed to unlock mutex snapshot:%s %v", snapshotID, err)
-		}
-	}()
-
 	rbdSnap := &rbdSnapshot{}
 	if err = genSnapFromSnapID(rbdSnap, snapshotID, cr); err != nil {
 		// if error is ErrKeyNotFound, then a previous attempt at deletion was complete
@@ -518,24 +489,25 @@ func (cs *ControllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteS
 			return &csi.DeleteSnapshotResponse{}, nil
 		}
 
-		// Consider missing snap as already deleted, and proceed to remove the omap values
+		// All errors other than ErrSnapNotFound should return an error back to the caller
 		if _, ok := err.(ErrSnapNotFound); !ok {
 			return nil, status.Error(codes.Internal, err.Error())
 		}
+
+		// Consider missing snap as already deleted, and proceed to remove the omap values,
+		// safeguarding against parallel create or delete requests against the same name.
+		idLk := snapshotNameLocker.Lock(rbdSnap.RequestName)
+		defer snapshotNameLocker.Unlock(idLk, rbdSnap.RequestName)
+
 		if err = undoSnapReservation(rbdSnap, cr); err != nil {
 			return nil, status.Error(codes.Internal, err.Error())
 		}
 		return &csi.DeleteSnapshotResponse{}, nil
 	}
 
-	// lock out parallel create requests against the same snap name as we
-	// cleanup the image and associated omaps for the same
-	snapshotNameMutex.LockKey(rbdSnap.RequestName)
-	defer func() {
-		if err = snapshotNameMutex.UnlockKey(rbdSnap.RequestName); err != nil {
-			klog.Warningf("failed to unlock mutex snapshot:%s %v", rbdSnap.RequestName, err)
-		}
-	}()
+	// safeguard against parallel create or delete requests against the same name
+	idLk := snapshotNameLocker.Lock(rbdSnap.RequestName)
+	defer snapshotNameLocker.Unlock(idLk, rbdSnap.RequestName)
 
 	// Unprotect snapshot
 	err = unprotectSnapshot(rbdSnap, cr)
diff --git a/pkg/rbd/nodeserver.go b/pkg/rbd/nodeserver.go
index 369a38d3f..08a18d465 100644
--- a/pkg/rbd/nodeserver.go
+++ b/pkg/rbd/nodeserver.go
@@ -62,18 +62,10 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
 		return nil, status.Error(codes.Internal, err.Error())
 	}
 
-	targetPathMutex.LockKey(targetPath)
-	defer func() {
-		if err = targetPathMutex.UnlockKey(targetPath); err != nil {
-			klog.Warningf("failed to unlock mutex targetpath:%s %v", targetPath, err)
-		}
-	}()
-	disableInUseChecks := false
+	idLk := targetPathLocker.Lock(targetPath)
+	defer targetPathLocker.Unlock(idLk, targetPath)
 
-	volName, err := ns.getVolumeName(req)
-	if err != nil {
-		return nil, err
-	}
+	disableInUseChecks := false
 
 	isBlock := req.GetVolumeCapability().GetBlock() != nil
 	// Check if that target path exists properly
@@ -100,7 +92,13 @@ func (ns *NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
 	if err != nil {
 		return nil, err
 	}
+
+	volName, err := ns.getVolumeName(req)
+	if err != nil {
+		return nil, err
+	}
 	volOptions.RbdImageName = volName
+
 	// Mapping RBD image
 	devicePath, err := attachRBDImage(volOptions, cr)
 	if err != nil {
@@ -206,13 +204,8 @@ func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
 		return nil, status.Error(codes.InvalidArgument, "empty volume ID in request")
 	}
 
-	targetPathMutex.LockKey(targetPath)
-
-	defer func() {
-		if err := targetPathMutex.UnlockKey(targetPath); err != nil {
-			klog.Warningf("failed to unlock mutex targetpath:%s %v", targetPath, err)
-		}
-	}()
+	idLk := targetPathLocker.Lock(targetPath)
+	defer targetPathLocker.Unlock(idLk, targetPath)
 
 	notMnt, err := mount.IsNotMountPoint(ns.mounter, targetPath)
 	if err != nil {
diff --git a/pkg/rbd/rbd_attach.go b/pkg/rbd/rbd_attach.go
index 2751e0be5..8b966a099 100644
--- a/pkg/rbd/rbd_attach.go
+++ b/pkg/rbd/rbd_attach.go
@@ -242,13 +242,8 @@ func attachRBDImage(volOptions *rbdVolume, cr *util.Credentials) (string, error)
 
 	devicePath, found := waitForPath(volOptions.Pool, image, 1, useNBD)
 	if !found {
-		attachdetachMutex.LockKey(imagePath)
-
-		defer func() {
-			if err = attachdetachMutex.UnlockKey(imagePath); err != nil {
-				klog.Warningf("failed to unlock mutex imagepath:%s %v", imagePath, err)
-			}
-		}()
+		idLk := attachdetachLocker.Lock(imagePath)
+		defer attachdetachLocker.Unlock(idLk, imagePath)
 
 		_, err = execCommand("modprobe", []string{moduleName})
 		if err != nil {
diff --git a/pkg/rbd/rbd_util.go b/pkg/rbd/rbd_util.go
index be7d6d40f..0b674089c 100644
--- a/pkg/rbd/rbd_util.go
+++ b/pkg/rbd/rbd_util.go
@@ -30,7 +30,6 @@ import (
 	"github.com/pkg/errors"
 	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/klog"
-	"k8s.io/utils/keymutex"
 )
 
 const (
@@ -83,17 +82,13 @@ type rbdSnapshot struct {
 
 var (
 	// serializes operations based on "<pool>/<image>" as key
-	attachdetachMutex = keymutex.NewHashed(0)
+	attachdetachLocker = util.NewIDLocker()
 	// serializes operations based on "volume name" as key
-	volumeNameMutex = keymutex.NewHashed(0)
-	// serializes operations based on "volume id" as key
-	volumeIDMutex = keymutex.NewHashed(0)
+	volumeNameLocker = util.NewIDLocker()
 	// serializes operations based on "snapshot name" as key
-	snapshotNameMutex = keymutex.NewHashed(0)
-	// serializes operations based on "snapshot id" as key
-	snapshotIDMutex = keymutex.NewHashed(0)
+	snapshotNameLocker = util.NewIDLocker()
 	// serializes operations based on "mount target path" as key
-	targetPathMutex = keymutex.NewHashed(0)
+	targetPathLocker = util.NewIDLocker()
 
 	supportedFeatures = sets.NewString("layering")
 )
diff --git a/pkg/util/idlocker.go b/pkg/util/idlocker.go
new file mode 100644
index 000000000..ef997578f
--- /dev/null
+++ b/pkg/util/idlocker.go
@@ -0,0 +1,77 @@
+/*
+Copyright 2019 ceph-csi authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package util
+
+import (
+	"sync"
+)
+
+/*
+IDLock is a per identifier lock with a use counter that retains a number of users of the lock.
+IDLocker is a map of IDLocks holding the IDLocks based on a passed in identifier.
+Typical usage (post creating an IDLocker) is to Lock/Unlock based on identifiers as per the API.
+*/
+type (
+	IDLock struct {
+		mtx      sync.Mutex
+		useCount int
+	}
+
+	IDLocker struct {
+		lMutex sync.Mutex
+		lMap   map[string]*IDLock
+	}
+)
+
+func NewIDLocker() *IDLocker {
+	return &IDLocker{
+		lMap: make(map[string]*IDLock),
+	}
+}
+
+func (lkr *IDLocker) Lock(identifier string) *IDLock {
+	var (
+		lk *IDLock
+		ok bool
+	)
+
+	newlk := new(IDLock)
+
+	lkr.lMutex.Lock()
+
+	if lk, ok = lkr.lMap[identifier]; !ok {
+		lk = newlk
+		lkr.lMap[identifier] = lk
+	}
+	lk.useCount++
+	lkr.lMutex.Unlock()
+
+	lk.mtx.Lock()
+
+	return lk
+}
+
+func (lkr *IDLocker) Unlock(lk *IDLock, identifier string) {
+	lk.mtx.Unlock()
+
+	lkr.lMutex.Lock()
+	lk.useCount--
+	if lk.useCount == 0 {
+		delete(lkr.lMap, identifier)
+	}
+	lkr.lMutex.Unlock()
+}
diff --git a/pkg/util/idlocker_test.go b/pkg/util/idlocker_test.go
new file mode 100644
index 000000000..635cd387d
--- /dev/null
+++ b/pkg/util/idlocker_test.go
@@ -0,0 +1,38 @@
+/*
+Copyright 2019 ceph-csi authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package util
+
+import (
+	"testing"
+)
+
+// very basic tests for the moment
+func TestIDLocker(t *testing.T) {
+	myIDLocker := NewIDLocker()
+
+	lk1 := myIDLocker.Lock("lk1")
+	lk2 := myIDLocker.Lock("lk2")
+	lk3 := myIDLocker.Lock("lk3")
+
+	if lk1 == lk2 || lk2 == lk3 || lk3 == lk1 {
+		t.Errorf("Failed: lock variables clash when they should not!")
+	}
+
+	myIDLocker.Unlock(lk1, "lk1")
+	myIDLocker.Unlock(lk2, "lk2")
+	myIDLocker.Unlock(lk3, "lk3")
+}
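
A minimal usage sketch of the new util.IDLocker (illustration only, not part of the diff: the package main wrapper, the import path, and the "pvc-xyz" identifier are assumptions for this example; the Lock/Unlock API is the one introduced in pkg/util/idlocker.go above):

    package main

    import (
    	"fmt"
    	"sync"

    	"github.com/ceph/ceph-csi/pkg/util"
    )

    func main() {
    	// One locker per kind of identifier, mirroring volumeNameLocker and friends.
    	nameLocker := util.NewIDLocker()

    	var wg sync.WaitGroup
    	for i := 0; i < 3; i++ {
    		wg.Add(1)
    		go func(worker int) {
    			defer wg.Done()

    			// All workers contend on the same identifier, so the critical
    			// sections below run strictly one at a time.
    			idLk := nameLocker.Lock("pvc-xyz")
    			defer nameLocker.Unlock(idLk, "pvc-xyz")

    			fmt.Printf("worker %d holds the lock for pvc-xyz\n", worker)
    		}(i)
    	}
    	wg.Wait()
    }

Compared with keymutex.NewHashed(0), which hashes keys onto a fixed pool of mutexes so that unrelated identifiers can land on the same mutex and contend, IDLocker holds one IDLock per live identifier and uses the useCount reference count to delete the map entry once the last holder unlocks, so the map does not grow without bound. Unlock also cannot fail, which is what lets this change drop the error-logging defer blocks and the mustUnlock helper.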