rbd: protect against concurrent gRPC calls

The timeout value in external-provisioner is fairly low. It's not
uncommon that it times out and retries before the rbdplugin is done
with CreateVolume. rbdplugin has to serialize calls and ensure that
they are idempotent to deal with this.
This commit is contained in:
Patrick Ohly 2018-10-17 14:52:45 +02:00 committed by Huamin Chen
parent 188cdd1d68
commit 403cad682c
3 changed files with 29 additions and 2 deletions

View File

@ -55,6 +55,9 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
return nil, status.Error(codes.InvalidArgument, "Volume Capabilities cannot be empty") return nil, status.Error(codes.InvalidArgument, "Volume Capabilities cannot be empty")
} }
volumeNameMutex.LockKey(req.GetName())
defer volumeNameMutex.UnlockKey(req.GetName())
// Need to check for already existing volume name, and if found // Need to check for already existing volume name, and if found
// check for the requested capacity and already allocated capacity // check for the requested capacity and already allocated capacity
if exVol, err := getRBDVolumeByName(req.GetName()); err == nil { if exVol, err := getRBDVolumeByName(req.GetName()); err == nil {
@ -156,6 +159,8 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
} }
// For now the image get unconditionally deleted, but here retention policy can be checked // For now the image get unconditionally deleted, but here retention policy can be checked
volumeID := req.GetVolumeId() volumeID := req.GetVolumeId()
volumeIDMutex.LockKey(volumeID)
defer volumeIDMutex.UnlockKey(volumeID)
rbdVol := &rbdVolume{} rbdVol := &rbdVolume{}
if err := loadVolInfo(volumeID, path.Join(PluginFolder, "controller"), rbdVol); err != nil { if err := loadVolInfo(volumeID, path.Join(PluginFolder, "controller"), rbdVol); err != nil {
if os.IsNotExist(errors.Cause(err)) { if os.IsNotExist(errors.Cause(err)) {
@ -174,8 +179,6 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
} }
// Removing persistent storage file for the unmapped volume // Removing persistent storage file for the unmapped volume
if err := deleteVolInfo(volumeID, path.Join(PluginFolder, "controller")); err != nil { if err := deleteVolInfo(volumeID, path.Join(PluginFolder, "controller")); err != nil {
// TODO: we can theoretically end up here when two DeleteVolume calls
// get invoked concurrently. Serialize?
return nil, err return nil, err
} }
@ -214,6 +217,9 @@ func (cs *controllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateS
return nil, status.Error(codes.InvalidArgument, "Source Volume ID cannot be empty") return nil, status.Error(codes.InvalidArgument, "Source Volume ID cannot be empty")
} }
snapshotNameMutex.LockKey(req.GetName())
defer snapshotNameMutex.UnlockKey(req.GetName())
// Need to check for already existing snapshot name, and if found // Need to check for already existing snapshot name, and if found
// check for the requested source volume id and already allocated source volume id // check for the requested source volume id and already allocated source volume id
if exSnap, err := getRBDSnapshotByName(req.GetName()); err == nil { if exSnap, err := getRBDSnapshotByName(req.GetName()); err == nil {
@ -332,6 +338,9 @@ func (cs *controllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteS
if len(snapshotID) == 0 { if len(snapshotID) == 0 {
return nil, status.Error(codes.InvalidArgument, "Snapshot ID cannot be empty") return nil, status.Error(codes.InvalidArgument, "Snapshot ID cannot be empty")
} }
snapshotIDMutex.LockKey(snapshotID)
defer snapshotIDMutex.UnlockKey(snapshotID)
rbdSnap := &rbdSnapshot{} rbdSnap := &rbdSnapshot{}
if err := loadSnapInfo(snapshotID, path.Join(PluginFolder, "controller-snap"), rbdSnap); err != nil { if err := loadSnapInfo(snapshotID, path.Join(PluginFolder, "controller-snap"), rbdSnap); err != nil {
return nil, err return nil, err
@ -368,6 +377,7 @@ func (cs *controllerServer) ListSnapshots(ctx context.Context, req *csi.ListSnap
sourceVolumeId := req.GetSourceVolumeId() sourceVolumeId := req.GetSourceVolumeId()
// TODO (sngchlko) list with token // TODO (sngchlko) list with token
// TODO (#94) protect concurrent access to global data structures
// list only a specific snapshot which has snapshot ID // list only a specific snapshot which has snapshot ID
if snapshotID := req.GetSnapshotId(); len(snapshotID) != 0 { if snapshotID := req.GetSnapshotId(); len(snapshotID) != 0 {

View File

@ -47,6 +47,9 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
s := strings.Split(strings.TrimSuffix(targetPath, "/mount"), "/") s := strings.Split(strings.TrimSuffix(targetPath, "/mount"), "/")
volName := s[len(s)-1] volName := s[len(s)-1]
targetPathMutex.LockKey(targetPath)
defer targetPathMutex.UnlockKey(targetPath)
notMnt, err := ns.mounter.IsLikelyNotMountPoint(targetPath) notMnt, err := ns.mounter.IsLikelyNotMountPoint(targetPath)
if err != nil { if err != nil {
if os.IsNotExist(err) { if os.IsNotExist(err) {
@ -97,6 +100,8 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) { func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
targetPath := req.GetTargetPath() targetPath := req.GetTargetPath()
targetPathMutex.LockKey(targetPath)
defer targetPathMutex.UnlockKey(targetPath)
notMnt, err := ns.mounter.IsLikelyNotMountPoint(targetPath) notMnt, err := ns.mounter.IsLikelyNotMountPoint(targetPath)
if err != nil { if err != nil {

View File

@ -75,7 +75,19 @@ type rbdSnapshot struct {
} }
var ( var (
// serializes operations based on "<rbd pool>/<rbd image>" as key
attachdetachMutex = keymutex.NewKeyMutex() attachdetachMutex = keymutex.NewKeyMutex()
// serializes operations based on "volume name" as key
volumeNameMutex = keymutex.NewKeyMutex()
// serializes operations based on "volume id" as key
volumeIDMutex = keymutex.NewKeyMutex()
// serializes operations based on "snapshot name" as key
snapshotNameMutex = keymutex.NewKeyMutex()
// serializes operations based on "snapshot id" as key
snapshotIDMutex = keymutex.NewKeyMutex()
// serializes operations based on "mount target path" as key
targetPathMutex = keymutex.NewKeyMutex()
supportedFeatures = sets.NewString("layering") supportedFeatures = sets.NewString("layering")
) )