mirror of
https://github.com/ceph/ceph-csi.git
synced 2024-11-26 00:00:23 +00:00
Merge pull request #92 from pohly/concurrency
rbd: protect against concurrent gRPC calls
This commit is contained in:
commit
47a7b1ff8e
@ -55,6 +55,9 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
|
|||||||
return nil, status.Error(codes.InvalidArgument, "Volume Capabilities cannot be empty")
|
return nil, status.Error(codes.InvalidArgument, "Volume Capabilities cannot be empty")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
volumeNameMutex.LockKey(req.GetName())
|
||||||
|
defer volumeNameMutex.UnlockKey(req.GetName())
|
||||||
|
|
||||||
// Need to check for already existing volume name, and if found
|
// Need to check for already existing volume name, and if found
|
||||||
// check for the requested capacity and already allocated capacity
|
// check for the requested capacity and already allocated capacity
|
||||||
if exVol, err := getRBDVolumeByName(req.GetName()); err == nil {
|
if exVol, err := getRBDVolumeByName(req.GetName()); err == nil {
|
||||||
@ -156,6 +159,8 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
|
|||||||
}
|
}
|
||||||
// For now the image get unconditionally deleted, but here retention policy can be checked
|
// For now the image get unconditionally deleted, but here retention policy can be checked
|
||||||
volumeID := req.GetVolumeId()
|
volumeID := req.GetVolumeId()
|
||||||
|
volumeIDMutex.LockKey(volumeID)
|
||||||
|
defer volumeIDMutex.UnlockKey(volumeID)
|
||||||
rbdVol := &rbdVolume{}
|
rbdVol := &rbdVolume{}
|
||||||
if err := loadVolInfo(volumeID, path.Join(PluginFolder, "controller"), rbdVol); err != nil {
|
if err := loadVolInfo(volumeID, path.Join(PluginFolder, "controller"), rbdVol); err != nil {
|
||||||
if os.IsNotExist(errors.Cause(err)) {
|
if os.IsNotExist(errors.Cause(err)) {
|
||||||
@ -174,8 +179,6 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
|
|||||||
}
|
}
|
||||||
// Removing persistent storage file for the unmapped volume
|
// Removing persistent storage file for the unmapped volume
|
||||||
if err := deleteVolInfo(volumeID, path.Join(PluginFolder, "controller")); err != nil {
|
if err := deleteVolInfo(volumeID, path.Join(PluginFolder, "controller")); err != nil {
|
||||||
// TODO: we can theoretically end up here when two DeleteVolume calls
|
|
||||||
// get invoked concurrently. Serialize?
|
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -214,6 +217,9 @@ func (cs *controllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateS
|
|||||||
return nil, status.Error(codes.InvalidArgument, "Source Volume ID cannot be empty")
|
return nil, status.Error(codes.InvalidArgument, "Source Volume ID cannot be empty")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
snapshotNameMutex.LockKey(req.GetName())
|
||||||
|
defer snapshotNameMutex.UnlockKey(req.GetName())
|
||||||
|
|
||||||
// Need to check for already existing snapshot name, and if found
|
// Need to check for already existing snapshot name, and if found
|
||||||
// check for the requested source volume id and already allocated source volume id
|
// check for the requested source volume id and already allocated source volume id
|
||||||
if exSnap, err := getRBDSnapshotByName(req.GetName()); err == nil {
|
if exSnap, err := getRBDSnapshotByName(req.GetName()); err == nil {
|
||||||
@ -332,6 +338,9 @@ func (cs *controllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteS
|
|||||||
if len(snapshotID) == 0 {
|
if len(snapshotID) == 0 {
|
||||||
return nil, status.Error(codes.InvalidArgument, "Snapshot ID cannot be empty")
|
return nil, status.Error(codes.InvalidArgument, "Snapshot ID cannot be empty")
|
||||||
}
|
}
|
||||||
|
snapshotIDMutex.LockKey(snapshotID)
|
||||||
|
defer snapshotIDMutex.UnlockKey(snapshotID)
|
||||||
|
|
||||||
rbdSnap := &rbdSnapshot{}
|
rbdSnap := &rbdSnapshot{}
|
||||||
if err := loadSnapInfo(snapshotID, path.Join(PluginFolder, "controller-snap"), rbdSnap); err != nil {
|
if err := loadSnapInfo(snapshotID, path.Join(PluginFolder, "controller-snap"), rbdSnap); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -368,6 +377,7 @@ func (cs *controllerServer) ListSnapshots(ctx context.Context, req *csi.ListSnap
|
|||||||
sourceVolumeId := req.GetSourceVolumeId()
|
sourceVolumeId := req.GetSourceVolumeId()
|
||||||
|
|
||||||
// TODO (sngchlko) list with token
|
// TODO (sngchlko) list with token
|
||||||
|
// TODO (#94) protect concurrent access to global data structures
|
||||||
|
|
||||||
// list only a specific snapshot which has snapshot ID
|
// list only a specific snapshot which has snapshot ID
|
||||||
if snapshotID := req.GetSnapshotId(); len(snapshotID) != 0 {
|
if snapshotID := req.GetSnapshotId(); len(snapshotID) != 0 {
|
||||||
|
@ -47,6 +47,9 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
|
|||||||
s := strings.Split(strings.TrimSuffix(targetPath, "/mount"), "/")
|
s := strings.Split(strings.TrimSuffix(targetPath, "/mount"), "/")
|
||||||
volName := s[len(s)-1]
|
volName := s[len(s)-1]
|
||||||
|
|
||||||
|
targetPathMutex.LockKey(targetPath)
|
||||||
|
defer targetPathMutex.UnlockKey(targetPath)
|
||||||
|
|
||||||
notMnt, err := ns.mounter.IsLikelyNotMountPoint(targetPath)
|
notMnt, err := ns.mounter.IsLikelyNotMountPoint(targetPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if os.IsNotExist(err) {
|
if os.IsNotExist(err) {
|
||||||
@ -97,6 +100,8 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
|
|||||||
|
|
||||||
func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
|
func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
|
||||||
targetPath := req.GetTargetPath()
|
targetPath := req.GetTargetPath()
|
||||||
|
targetPathMutex.LockKey(targetPath)
|
||||||
|
defer targetPathMutex.UnlockKey(targetPath)
|
||||||
|
|
||||||
notMnt, err := ns.mounter.IsLikelyNotMountPoint(targetPath)
|
notMnt, err := ns.mounter.IsLikelyNotMountPoint(targetPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -75,7 +75,19 @@ type rbdSnapshot struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
// serializes operations based on "<rbd pool>/<rbd image>" as key
|
||||||
attachdetachMutex = keymutex.NewKeyMutex()
|
attachdetachMutex = keymutex.NewKeyMutex()
|
||||||
|
// serializes operations based on "volume name" as key
|
||||||
|
volumeNameMutex = keymutex.NewKeyMutex()
|
||||||
|
// serializes operations based on "volume id" as key
|
||||||
|
volumeIDMutex = keymutex.NewKeyMutex()
|
||||||
|
// serializes operations based on "snapshot name" as key
|
||||||
|
snapshotNameMutex = keymutex.NewKeyMutex()
|
||||||
|
// serializes operations based on "snapshot id" as key
|
||||||
|
snapshotIDMutex = keymutex.NewKeyMutex()
|
||||||
|
// serializes operations based on "mount target path" as key
|
||||||
|
targetPathMutex = keymutex.NewKeyMutex()
|
||||||
|
|
||||||
supportedFeatures = sets.NewString("layering")
|
supportedFeatures = sets.NewString("layering")
|
||||||
)
|
)
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user