From 403cad682ceb6abe55fd1eb170be68f652b43e14 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Wed, 17 Oct 2018 14:52:45 +0200 Subject: [PATCH] rbd: protect against concurrent gRPC calls The timeout value in external-provisioner is fairly low. It's not uncommon that it times out and retries before the rbdplugin is done with CreateVolume. rbdplugin has to serialize calls and ensure that they are idempotent to deal with this. --- pkg/rbd/controllerserver.go | 14 ++++++++++++-- pkg/rbd/nodeserver.go | 5 +++++ pkg/rbd/rbd_util.go | 12 ++++++++++++ 3 files changed, 29 insertions(+), 2 deletions(-) diff --git a/pkg/rbd/controllerserver.go b/pkg/rbd/controllerserver.go index b8a5c96ec..f7a968483 100644 --- a/pkg/rbd/controllerserver.go +++ b/pkg/rbd/controllerserver.go @@ -55,6 +55,9 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol return nil, status.Error(codes.InvalidArgument, "Volume Capabilities cannot be empty") } + volumeNameMutex.LockKey(req.GetName()) + defer volumeNameMutex.UnlockKey(req.GetName()) + // Need to check for already existing volume name, and if found // check for the requested capacity and already allocated capacity if exVol, err := getRBDVolumeByName(req.GetName()); err == nil { @@ -156,6 +159,8 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol } // For now the image get unconditionally deleted, but here retention policy can be checked volumeID := req.GetVolumeId() + volumeIDMutex.LockKey(volumeID) + defer volumeIDMutex.UnlockKey(volumeID) rbdVol := &rbdVolume{} if err := loadVolInfo(volumeID, path.Join(PluginFolder, "controller"), rbdVol); err != nil { if os.IsNotExist(errors.Cause(err)) { @@ -174,8 +179,6 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol } // Removing persistent storage file for the unmapped volume if err := deleteVolInfo(volumeID, path.Join(PluginFolder, "controller")); err != nil { - // TODO: we can theoretically end up here when two DeleteVolume calls - // get invoked concurrently. Serialize? return nil, err } @@ -214,6 +217,9 @@ func (cs *controllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateS return nil, status.Error(codes.InvalidArgument, "Source Volume ID cannot be empty") } + snapshotNameMutex.LockKey(req.GetName()) + defer snapshotNameMutex.UnlockKey(req.GetName()) + // Need to check for already existing snapshot name, and if found // check for the requested source volume id and already allocated source volume id if exSnap, err := getRBDSnapshotByName(req.GetName()); err == nil { @@ -332,6 +338,9 @@ func (cs *controllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteS if len(snapshotID) == 0 { return nil, status.Error(codes.InvalidArgument, "Snapshot ID cannot be empty") } + snapshotIDMutex.LockKey(snapshotID) + defer snapshotIDMutex.UnlockKey(snapshotID) + rbdSnap := &rbdSnapshot{} if err := loadSnapInfo(snapshotID, path.Join(PluginFolder, "controller-snap"), rbdSnap); err != nil { return nil, err @@ -368,6 +377,7 @@ func (cs *controllerServer) ListSnapshots(ctx context.Context, req *csi.ListSnap sourceVolumeId := req.GetSourceVolumeId() // TODO (sngchlko) list with token + // TODO (#94) protect concurrent access to global data structures // list only a specific snapshot which has snapshot ID if snapshotID := req.GetSnapshotId(); len(snapshotID) != 0 { diff --git a/pkg/rbd/nodeserver.go b/pkg/rbd/nodeserver.go index eb75cf48b..65c5ad194 100644 --- a/pkg/rbd/nodeserver.go +++ b/pkg/rbd/nodeserver.go @@ -47,6 +47,9 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis s := strings.Split(strings.TrimSuffix(targetPath, "/mount"), "/") volName := s[len(s)-1] + targetPathMutex.LockKey(targetPath) + defer targetPathMutex.UnlockKey(targetPath) + notMnt, err := ns.mounter.IsLikelyNotMountPoint(targetPath) if err != nil { if os.IsNotExist(err) { @@ -97,6 +100,8 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) { targetPath := req.GetTargetPath() + targetPathMutex.LockKey(targetPath) + defer targetPathMutex.UnlockKey(targetPath) notMnt, err := ns.mounter.IsLikelyNotMountPoint(targetPath) if err != nil { diff --git a/pkg/rbd/rbd_util.go b/pkg/rbd/rbd_util.go index ce3d87a59..accde1091 100644 --- a/pkg/rbd/rbd_util.go +++ b/pkg/rbd/rbd_util.go @@ -75,7 +75,19 @@ type rbdSnapshot struct { } var ( + // serializes operations based on "/" as key attachdetachMutex = keymutex.NewKeyMutex() + // serializes operations based on "volume name" as key + volumeNameMutex = keymutex.NewKeyMutex() + // serializes operations based on "volume id" as key + volumeIDMutex = keymutex.NewKeyMutex() + // serializes operations based on "snapshot name" as key + snapshotNameMutex = keymutex.NewKeyMutex() + // serializes operations based on "snapshot id" as key + snapshotIDMutex = keymutex.NewKeyMutex() + // serializes operations based on "mount target path" as key + targetPathMutex = keymutex.NewKeyMutex() + supportedFeatures = sets.NewString("layering") )