mirror of
https://github.com/ceph/ceph-csi.git
synced 2024-11-22 14:20:19 +00:00
rbd: protect against concurrent gRPC calls
The timeout value in external-provisioner is fairly low. It's not uncommon that it times out and retries before the rbdplugin is done with CreateVolume. rbdplugin has to serialize calls and ensure that they are idempotent to deal with this.
This commit is contained in:
parent
188cdd1d68
commit
720ad4afeb
@ -55,6 +55,9 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
|
|||||||
return nil, status.Error(codes.InvalidArgument, "Volume Capabilities cannot be empty")
|
return nil, status.Error(codes.InvalidArgument, "Volume Capabilities cannot be empty")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
volumeNameMutex.LockKey(req.GetName())
|
||||||
|
defer volumeNameMutex.UnlockKey(req.GetName())
|
||||||
|
|
||||||
// Need to check for already existing volume name, and if found
|
// Need to check for already existing volume name, and if found
|
||||||
// check for the requested capacity and already allocated capacity
|
// check for the requested capacity and already allocated capacity
|
||||||
if exVol, err := getRBDVolumeByName(req.GetName()); err == nil {
|
if exVol, err := getRBDVolumeByName(req.GetName()); err == nil {
|
||||||
@ -156,6 +159,8 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
|
|||||||
}
|
}
|
||||||
// For now the image get unconditionally deleted, but here retention policy can be checked
|
// For now the image get unconditionally deleted, but here retention policy can be checked
|
||||||
volumeID := req.GetVolumeId()
|
volumeID := req.GetVolumeId()
|
||||||
|
volumeIDMutex.LockKey(volumeID)
|
||||||
|
defer volumeIDMutex.UnlockKey(volumeID)
|
||||||
rbdVol := &rbdVolume{}
|
rbdVol := &rbdVolume{}
|
||||||
if err := loadVolInfo(volumeID, path.Join(PluginFolder, "controller"), rbdVol); err != nil {
|
if err := loadVolInfo(volumeID, path.Join(PluginFolder, "controller"), rbdVol); err != nil {
|
||||||
if os.IsNotExist(errors.Cause(err)) {
|
if os.IsNotExist(errors.Cause(err)) {
|
||||||
@ -174,8 +179,6 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
|
|||||||
}
|
}
|
||||||
// Removing persistent storage file for the unmapped volume
|
// Removing persistent storage file for the unmapped volume
|
||||||
if err := deleteVolInfo(volumeID, path.Join(PluginFolder, "controller")); err != nil {
|
if err := deleteVolInfo(volumeID, path.Join(PluginFolder, "controller")); err != nil {
|
||||||
// TODO: we can theoretically end up here when two DeleteVolume calls
|
|
||||||
// get invoked concurrently. Serialize?
|
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -214,6 +217,9 @@ func (cs *controllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateS
|
|||||||
return nil, status.Error(codes.InvalidArgument, "Source Volume ID cannot be empty")
|
return nil, status.Error(codes.InvalidArgument, "Source Volume ID cannot be empty")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
snapshotNameMutex.LockKey(req.GetName())
|
||||||
|
defer snapshotNameMutex.UnlockKey(req.GetName())
|
||||||
|
|
||||||
// Need to check for already existing snapshot name, and if found
|
// Need to check for already existing snapshot name, and if found
|
||||||
// check for the requested source volume id and already allocated source volume id
|
// check for the requested source volume id and already allocated source volume id
|
||||||
if exSnap, err := getRBDSnapshotByName(req.GetName()); err == nil {
|
if exSnap, err := getRBDSnapshotByName(req.GetName()); err == nil {
|
||||||
@ -332,6 +338,9 @@ func (cs *controllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteS
|
|||||||
if len(snapshotID) == 0 {
|
if len(snapshotID) == 0 {
|
||||||
return nil, status.Error(codes.InvalidArgument, "Snapshot ID cannot be empty")
|
return nil, status.Error(codes.InvalidArgument, "Snapshot ID cannot be empty")
|
||||||
}
|
}
|
||||||
|
snapshotIDMutex.LockKey(snapshotID)
|
||||||
|
defer snapshotIDMutex.UnlockKey(snapshotID)
|
||||||
|
|
||||||
rbdSnap := &rbdSnapshot{}
|
rbdSnap := &rbdSnapshot{}
|
||||||
if err := loadSnapInfo(snapshotID, path.Join(PluginFolder, "controller-snap"), rbdSnap); err != nil {
|
if err := loadSnapInfo(snapshotID, path.Join(PluginFolder, "controller-snap"), rbdSnap); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -368,6 +377,7 @@ func (cs *controllerServer) ListSnapshots(ctx context.Context, req *csi.ListSnap
|
|||||||
sourceVolumeId := req.GetSourceVolumeId()
|
sourceVolumeId := req.GetSourceVolumeId()
|
||||||
|
|
||||||
// TODO (sngchlko) list with token
|
// TODO (sngchlko) list with token
|
||||||
|
// TODO (#94) protect concurrent access to global data structures
|
||||||
|
|
||||||
// list only a specific snapshot which has snapshot ID
|
// list only a specific snapshot which has snapshot ID
|
||||||
if snapshotID := req.GetSnapshotId(); len(snapshotID) != 0 {
|
if snapshotID := req.GetSnapshotId(); len(snapshotID) != 0 {
|
||||||
|
@ -47,6 +47,9 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
|
|||||||
s := strings.Split(strings.TrimSuffix(targetPath, "/mount"), "/")
|
s := strings.Split(strings.TrimSuffix(targetPath, "/mount"), "/")
|
||||||
volName := s[len(s)-1]
|
volName := s[len(s)-1]
|
||||||
|
|
||||||
|
targetPathMutex.LockKey(targetPath)
|
||||||
|
defer targetPathMutex.UnlockKey(targetPath)
|
||||||
|
|
||||||
notMnt, err := ns.mounter.IsLikelyNotMountPoint(targetPath)
|
notMnt, err := ns.mounter.IsLikelyNotMountPoint(targetPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if os.IsNotExist(err) {
|
if os.IsNotExist(err) {
|
||||||
@ -97,6 +100,8 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
|
|||||||
|
|
||||||
func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
|
func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
|
||||||
targetPath := req.GetTargetPath()
|
targetPath := req.GetTargetPath()
|
||||||
|
targetPathMutex.LockKey(targetPath)
|
||||||
|
defer targetPathMutex.UnlockKey(targetPath)
|
||||||
|
|
||||||
notMnt, err := ns.mounter.IsLikelyNotMountPoint(targetPath)
|
notMnt, err := ns.mounter.IsLikelyNotMountPoint(targetPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -75,7 +75,19 @@ type rbdSnapshot struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
// serializes operations based on "<rbd pool>/<rbd image>" as key
|
||||||
attachdetachMutex = keymutex.NewKeyMutex()
|
attachdetachMutex = keymutex.NewKeyMutex()
|
||||||
|
// serializes operations based on "volume name" as key
|
||||||
|
volumeNameMutex = keymutex.NewKeyMutex()
|
||||||
|
// serializes operations based on "volume id" as key
|
||||||
|
volumeIDMutex = keymutex.NewKeyMutex()
|
||||||
|
// serializes operations based on "snapshot name" as key
|
||||||
|
snapshotNameMutex = keymutex.NewKeyMutex()
|
||||||
|
// serializes operations based on "snapshot id" as key
|
||||||
|
snapshotIDMutex = keymutex.NewKeyMutex()
|
||||||
|
// serializes operations based on "mount target path" as key
|
||||||
|
targetPathMutex = keymutex.NewKeyMutex()
|
||||||
|
|
||||||
supportedFeatures = sets.NewString("layering")
|
supportedFeatures = sets.NewString("layering")
|
||||||
)
|
)
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user