diff --git a/internal/csi-addons/rbd/reclaimspace.go b/internal/csi-addons/rbd/reclaimspace.go index 9fb124a21..1d3f4bbfa 100644 --- a/internal/csi-addons/rbd/reclaimspace.go +++ b/internal/csi-addons/rbd/reclaimspace.go @@ -37,12 +37,13 @@ import ( // of CSI-addons reclaimspace controller service spec. type ReclaimSpaceControllerServer struct { *rs.UnimplementedReclaimSpaceControllerServer + volumeLocks *util.VolumeLocks } // NewReclaimSpaceControllerServer creates a new ReclaimSpaceControllerServer which handles // the ReclaimSpace Service requests from the CSI-Addons specification. -func NewReclaimSpaceControllerServer() *ReclaimSpaceControllerServer { - return &ReclaimSpaceControllerServer{} +func NewReclaimSpaceControllerServer(volumeLocks *util.VolumeLocks) *ReclaimSpaceControllerServer { + return &ReclaimSpaceControllerServer{volumeLocks: volumeLocks} } func (rscs *ReclaimSpaceControllerServer) RegisterService(server grpc.ServiceRegistrar) { @@ -64,6 +65,13 @@ func (rscs *ReclaimSpaceControllerServer) ControllerReclaimSpace( } defer cr.DeleteCredentials() + if acquired := rscs.volumeLocks.TryAcquire(volumeID); !acquired { + log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volumeID) + + return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID) + } + defer rscs.volumeLocks.Release(volumeID) + rbdVol, err := rbdutil.GenVolFromVolID(ctx, volumeID, cr, req.GetSecrets()) if err != nil { return nil, status.Errorf(codes.Aborted, "failed to find volume with ID %q: %s", volumeID, err.Error()) @@ -90,12 +98,13 @@ func (rscs *ReclaimSpaceControllerServer) ControllerReclaimSpace( // of CSI-addons reclaimspace controller service spec. type ReclaimSpaceNodeServer struct { *rs.UnimplementedReclaimSpaceNodeServer + volumeLocks *util.VolumeLocks } // NewReclaimSpaceNodeServer creates a new IdentityServer which handles the // Identity Service requests from the CSI-Addons specification. -func NewReclaimSpaceNodeServer() *ReclaimSpaceNodeServer { - return &ReclaimSpaceNodeServer{} +func NewReclaimSpaceNodeServer(volumeLocks *util.VolumeLocks) *ReclaimSpaceNodeServer { + return &ReclaimSpaceNodeServer{volumeLocks: volumeLocks} } func (rsns *ReclaimSpaceNodeServer) RegisterService(server grpc.ServiceRegistrar) { @@ -116,6 +125,13 @@ func (rsns *ReclaimSpaceNodeServer) NodeReclaimSpace( return nil, status.Error(codes.InvalidArgument, "empty volume ID in request") } + if acquired := rsns.volumeLocks.TryAcquire(volumeID); !acquired { + log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, volumeID) + + return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID) + } + defer rsns.volumeLocks.Release(volumeID) + // path can either be the staging path on the node, or the volume path // inside an application container path := req.GetStagingTargetPath() diff --git a/internal/csi-addons/rbd/reclaimspace_test.go b/internal/csi-addons/rbd/reclaimspace_test.go index 6effbc0fa..d9649f22a 100644 --- a/internal/csi-addons/rbd/reclaimspace_test.go +++ b/internal/csi-addons/rbd/reclaimspace_test.go @@ -20,6 +20,8 @@ import ( "context" "testing" + "github.com/ceph/ceph-csi/internal/util" + rs "github.com/csi-addons/spec/lib/go/reclaimspace" "github.com/stretchr/testify/require" ) @@ -30,7 +32,7 @@ import ( func TestControllerReclaimSpace(t *testing.T) { t.Parallel() - controller := NewReclaimSpaceControllerServer() + controller := NewReclaimSpaceControllerServer(util.NewVolumeLocks()) req := &rs.ControllerReclaimSpaceRequest{ VolumeId: "", @@ -47,7 +49,7 @@ func TestControllerReclaimSpace(t *testing.T) { func TestNodeReclaimSpace(t *testing.T) { t.Parallel() - node := NewReclaimSpaceNodeServer() + node := NewReclaimSpaceNodeServer(&util.VolumeLocks{}) req := &rs.NodeReclaimSpaceRequest{ VolumeId: "", diff --git a/internal/rbd/driver/driver.go b/internal/rbd/driver/driver.go index fd0dda43a..859c626af 100644 --- a/internal/rbd/driver/driver.go +++ b/internal/rbd/driver/driver.go @@ -144,10 +144,8 @@ func (r *Driver) Run(conf *util.Config) { if err != nil { log.FatalLogMsg(err.Error()) } - r.ns, err = NewNodeServer(r.cd, conf.Vtype, nodeLabels, topology, crushLocationMap) - if err != nil { - log.FatalLogMsg("failed to start node server, err %v\n", err) - } + r.ns = NewNodeServer(r.cd, conf.Vtype, nodeLabels, topology, crushLocationMap) + var attr string attr, err = rbd.GetKrbdSupportedFeatures() if err != nil && !errors.Is(err, os.ErrNotExist) { @@ -213,7 +211,7 @@ func (r *Driver) setupCSIAddonsServer(conf *util.Config) error { r.cas.RegisterService(is) if conf.IsControllerServer { - rs := casrbd.NewReclaimSpaceControllerServer() + rs := casrbd.NewReclaimSpaceControllerServer(r.cs.VolumeLocks) r.cas.RegisterService(rs) fcs := casrbd.NewFenceControllerServer() @@ -227,7 +225,7 @@ func (r *Driver) setupCSIAddonsServer(conf *util.Config) error { } if conf.IsNodeServer { - rs := casrbd.NewReclaimSpaceNodeServer() + rs := casrbd.NewReclaimSpaceNodeServer(r.ns.VolumeLocks) r.cas.RegisterService(rs) ekr := casrbd.NewEncryptionKeyRotationServer(r.ns.VolumeLocks)