mirror of https://github.com/ceph/ceph-csi.git, synced 2025-04-11 18:13:00 +00:00
Merge pull request #429 from yati1998/DFBUGS-1011
DFBUGS-1011: Prevent data loss due to concurrent RPC calls (occurrence is very low)
This commit is contained in: commit edb16f257e
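The change guards NodePublishVolume and NodeUnpublishVolume with a per-target-path lock so that two RPCs racing on the same target path cannot interleave. The sketch below is a minimal, self-contained illustration of that pattern using assumed names (`pathLocks`, `unpublish`); it is not the actual `util.VolumeLocks` implementation, which the diff only describes as a map with atomic operations storing a set of in-flight IDs.

```go
package main

import (
	"fmt"
	"sync"
)

// pathLocks is a hypothetical stand-in for util.VolumeLocks: a set of
// in-flight target paths guarded by a mutex.
type pathLocks struct {
	mu    sync.Mutex
	locks map[string]struct{}
}

// TryAcquire marks targetPath as busy and returns false if another
// operation on the same path is already in flight.
func (p *pathLocks) TryAcquire(targetPath string) bool {
	p.mu.Lock()
	defer p.mu.Unlock()
	if _, busy := p.locks[targetPath]; busy {
		return false
	}
	p.locks[targetPath] = struct{}{}
	return true
}

// Release removes targetPath from the set of in-flight operations.
func (p *pathLocks) Release(targetPath string) {
	p.mu.Lock()
	defer p.mu.Unlock()
	delete(p.locks, targetPath)
}

// unpublish mimics the guarded section added to NodeUnpublishVolume: a
// concurrent call for the same target path is rejected instead of racing.
func unpublish(locks *pathLocks, targetPath string) error {
	if !locks.TryAcquire(targetPath) {
		return fmt.Errorf("an operation with the given target path %s already exists", targetPath)
	}
	defer locks.Release(targetPath)
	// ... unmount and remove targetPath here ...
	return nil
}

func main() {
	locks := &pathLocks{locks: map[string]struct{}{}}
	target := "/var/lib/kubelet/pods/example/volumes/kubernetes.io~csi/pvc/mount" // hypothetical path

	var wg sync.WaitGroup
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			// Depending on timing, the second call is either rejected or
			// runs after the first one releases the lock; the two never overlap.
			if err := unpublish(locks, target); err != nil {
				fmt.Printf("call %d rejected: %v\n", id, err)
			} else {
				fmt.Printf("call %d completed\n", id)
			}
		}(i)
	}
	wg.Wait()
}
```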
@@ -22,8 +22,8 @@ curl -X GET http://10.109.65.142:8080/metrics 2>/dev/null | grep csi
 csi_liveness 1
 ```

-Promethues can be deployed through the promethues operator described [here](https://coreos.com/operators/prometheus/docs/latest/user-guides/getting-started.html).
-The [service-monitor](../deploy/service-monitor.yaml) will tell promethues how
+Prometheus can be deployed through the prometheus operator described [here](https://coreos.com/operators/prometheus/docs/latest/user-guides/getting-started.html).
+The [service-monitor](../deploy/service-monitor.yaml) will tell prometheus how
 to pull metrics out of CSI.

 Each CSI pod has a service to expose the endpoint to prometheus. By default, rbd
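For reference, the scrape that the curl example in the hunk header above performs can also be done programmatically. This is a hypothetical client sketch, not part of ceph-csi; the address 10.109.65.142:8080 is copied from the example above and will differ per deployment.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"strings"
)

func main() {
	// Address taken from the curl example above; replace it with the
	// metrics Service address of your own deployment.
	resp, err := http.Get("http://10.109.65.142:8080/metrics")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		panic(err)
	}

	// Print only the csi_* lines, mirroring `grep csi` (e.g. "csi_liveness 1").
	for _, line := range strings.Split(string(body), "\n") {
		if strings.Contains(line, "csi") {
			fmt.Println(line)
		}
	}
}
```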
@@ -125,7 +125,7 @@ parameters:
 # "file": Enable file encryption on the mounted filesystem
 # "block": Encrypt RBD block device
 # When unspecified assume type "block". "file" and "block" are
-# mutally exclusive.
+# mutually exclusive.
 # encryptionType: "block"

 # (optional) Use external key management system for encryption passphrases by
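As a reading aid for the encryptionType parameter documented above, here is a hedged sketch of how the stated rules could be checked (a hypothetical helper, not ceph-csi's actual parameter parsing): an unset value means "block", and only "block" or "file" are accepted since the two modes are mutually exclusive.

```go
package main

import (
	"errors"
	"fmt"
)

// parseEncryptionType is a hypothetical helper mirroring the documented
// semantics: empty defaults to "block"; only "block" or "file" are valid.
func parseEncryptionType(params map[string]string) (string, error) {
	t, ok := params["encryptionType"]
	if !ok || t == "" {
		return "block", nil
	}
	switch t {
	case "block", "file":
		return t, nil
	default:
		return "", errors.New(`encryptionType must be either "block" or "file"`)
	}
}

func main() {
	examples := []map[string]string{
		{},                               // unspecified, defaults to "block"
		{"encryptionType": "file"},       // file encryption on the mounted filesystem
		{"encryptionType": "filesystem"}, // rejected, not one of the two modes
	}
	for _, p := range examples {
		t, err := parseEncryptionType(p)
		fmt.Println(t, err)
	}
}
```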
@@ -450,6 +450,13 @@ func (ns *NodeServer) NodePublishVolume(
 	targetPath := req.GetTargetPath()
 	volID := fsutil.VolumeID(req.GetVolumeId())

+	if acquired := ns.VolumeLocks.TryAcquire(targetPath); !acquired {
+		log.ErrorLog(ctx, util.TargetPathOperationAlreadyExistsFmt, targetPath)
+
+		return nil, status.Errorf(codes.Aborted, util.TargetPathOperationAlreadyExistsFmt, targetPath)
+	}
+	defer ns.VolumeLocks.Release(targetPath)
+
 	volOptions := &store.VolumeOptions{}
 	defer volOptions.Destroy()
@@ -462,9 +469,6 @@ func (ns *NodeServer) NodePublishVolume(
 		return nil, status.Errorf(codes.Internal, "failed to create mounter for volume %s: %v", volID, err.Error())
 	}

-	// Considering kubelet make sure the stage and publish operations
-	// are serialized, we dont need any extra locking in nodePublish
-
 	if err = util.CreateMountPoint(targetPath); err != nil {
 		log.ErrorLog(ctx, "failed to create mount point at %s: %v", targetPath, err)
@@ -555,12 +559,17 @@ func (ns *NodeServer) NodeUnpublishVolume(
 		return nil, err
 	}

-	// considering kubelet make sure node operations like unpublish/unstage...etc can not be called
-	// at same time, an explicit locking at time of nodeunpublish is not required.
 	targetPath := req.GetTargetPath()
+	volID := req.GetVolumeId()
+	if acquired := ns.VolumeLocks.TryAcquire(targetPath); !acquired {
+		log.ErrorLog(ctx, util.TargetPathOperationAlreadyExistsFmt, targetPath)
+
+		return nil, status.Errorf(codes.Aborted, util.TargetPathOperationAlreadyExistsFmt, targetPath)
+	}
+	defer ns.VolumeLocks.Release(targetPath)

 	// stop the health-checker that may have been started in NodeGetVolumeStats()
-	ns.healthChecker.StopChecker(req.GetVolumeId(), targetPath)
+	ns.healthChecker.StopChecker(volID, targetPath)

 	isMnt, err := util.IsMountPoint(ns.Mounter, targetPath)
 	if err != nil {
@@ -583,7 +592,7 @@ func (ns *NodeServer) NodeUnpublishVolume(
 		isMnt = true
 	}
 	if !isMnt {
-		if err = os.RemoveAll(targetPath); err != nil {
+		if err = os.Remove(targetPath); err != nil {
 			return nil, status.Error(codes.Internal, err.Error())
 		}
@@ -151,8 +151,8 @@ func validateMounter(m string) error {
 	return nil
 }

-func (v *VolumeOptions) DetectMounter(options map[string]string) error {
-	return extractMounter(&v.Mounter, options)
+func (vo *VolumeOptions) DetectMounter(options map[string]string) error {
+	return extractMounter(&vo.Mounter, options)
 }

 func extractMounter(dest *string, options map[string]string) error {
@@ -37,7 +37,7 @@ type checker struct {
 	// timeout contains the delay (interval + timeout)
 	timeout time.Duration

-	// mutex protects against concurrent access to healty, err and
+	// mutex protects against concurrent access to healthy, err and
 	// lastUpdate
 	mutex *sync.RWMutex
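The comment fixed above notes that the mutex protects healthy, err and lastUpdate; those fields are shared between the checker goroutine and callers reading the result. Below is a minimal, hypothetical sketch of that RWMutex access pattern, not the actual health-checker code.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// checker is a stripped-down, hypothetical version of the health-checker
// state: the mutex guards healthy, err and lastUpdate.
type checker struct {
	mutex      *sync.RWMutex
	healthy    bool
	err        error
	lastUpdate time.Time
}

// markHealthy is what a probe goroutine would call after a successful check.
func (c *checker) markHealthy() {
	c.mutex.Lock()
	defer c.mutex.Unlock()
	c.healthy = true
	c.err = nil
	c.lastUpdate = time.Now()
}

// isHealthy is what a reader (e.g. a NodeGetVolumeStats-style caller) would
// use; it only takes the read lock.
func (c *checker) isHealthy() (bool, error) {
	c.mutex.RLock()
	defer c.mutex.RUnlock()
	return c.healthy, c.err
}

func main() {
	c := &checker{mutex: &sync.RWMutex{}}
	c.markHealthy()
	ok, err := c.isHealthy()
	fmt.Println(ok, err)
}
```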
@@ -709,8 +709,12 @@ func (ns *NodeServer) NodePublishVolume(
 	volID := req.GetVolumeId()
 	stagingPath += "/" + volID

-	// Considering kubelet make sure the stage and publish operations
-	// are serialized, we dont need any extra locking in nodePublish
+	if acquired := ns.VolumeLocks.TryAcquire(targetPath); !acquired {
+		log.ErrorLog(ctx, util.TargetPathOperationAlreadyExistsFmt, targetPath)
+
+		return nil, status.Errorf(codes.Aborted, util.TargetPathOperationAlreadyExistsFmt, targetPath)
+	}
+	defer ns.VolumeLocks.Release(targetPath)

 	// Check if that target path exists properly
 	notMnt, err := ns.createTargetMountPath(ctx, targetPath, isBlock)
@@ -913,8 +917,14 @@ func (ns *NodeServer) NodeUnpublishVolume(
 	}

 	targetPath := req.GetTargetPath()
-	// considering kubelet make sure node operations like unpublish/unstage...etc can not be called
-	// at same time, an explicit locking at time of nodeunpublish is not required.
+
+	if acquired := ns.VolumeLocks.TryAcquire(targetPath); !acquired {
+		log.ErrorLog(ctx, util.TargetPathOperationAlreadyExistsFmt, targetPath)
+
+		return nil, status.Errorf(codes.Aborted, util.TargetPathOperationAlreadyExistsFmt, targetPath)
+	}
+	defer ns.VolumeLocks.Release(targetPath)
+
 	isMnt, err := ns.Mounter.IsMountPoint(targetPath)
 	if err != nil {
 		if os.IsNotExist(err) {
@@ -927,7 +937,7 @@ func (ns *NodeServer) NodeUnpublishVolume(
 		return nil, status.Error(codes.NotFound, err.Error())
 	}
 	if !isMnt {
-		if err = os.RemoveAll(targetPath); err != nil {
+		if err = os.Remove(targetPath); err != nil {
 			return nil, status.Error(codes.Internal, err.Error())
 		}
@@ -938,7 +948,7 @@ func (ns *NodeServer) NodeUnpublishVolume(
 		return nil, status.Error(codes.Internal, err.Error())
 	}

-	if err = os.RemoveAll(targetPath); err != nil {
+	if err = os.Remove(targetPath); err != nil {
 		return nil, status.Error(codes.Internal, err.Error())
 	}
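The switch from os.RemoveAll to os.Remove in the hunks above fits the data-loss prevention theme of the pull request: os.Remove only deletes a single file or an empty directory and fails otherwise, while os.RemoveAll would recursively delete whatever is still under the target path. A small standalone sketch of the difference, using a hypothetical temporary path:

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

func main() {
	// Hypothetical target path that unexpectedly still contains data.
	target, err := os.MkdirTemp("", "targetpath")
	if err != nil {
		panic(err)
	}
	defer os.RemoveAll(target) // cleanup for this example only

	if err := os.WriteFile(filepath.Join(target, "data"), []byte("payload"), 0o600); err != nil {
		panic(err)
	}

	// os.Remove refuses to delete a non-empty directory, so the data survives.
	fmt.Println("os.Remove:", os.Remove(target))

	// os.RemoveAll silently wipes the directory and everything inside it.
	fmt.Println("os.RemoveAll:", os.RemoveAll(target))
}
```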
@@ -60,7 +60,7 @@ func Test_getCrushLocationMap(t *testing.T) {
 			want: map[string]string{"zone": "zone1"},
 		},
 		{
-			name: "multuple matching crushlocation and node labels",
+			name: "multiple matching crushlocation and node labels",
 			args: input{
 				crushLocationLabels: "topology.io/zone,topology.io/rack",
 				nodeLabels: map[string]string{
@@ -28,6 +28,9 @@ const (

 	// SnapshotOperationAlreadyExistsFmt string format to return for concurrent operation.
 	SnapshotOperationAlreadyExistsFmt = "an operation with the given Snapshot ID %s already exists"
+
+	// TargetPathOperationAlreadyExistsFmt string format to return for concurrent operation on target path.
+	TargetPathOperationAlreadyExistsFmt = "an operation with the given target path %s already exists"
 )

 // VolumeLocks implements a map with atomic operations. It stores a set of all volume IDs