rbd: delete group only if its primary

This is a work of Nikhil which need to be
applied on top of this PR to test the
feature.

Signed-off-by: Madhu Rajanna <madhupr007@gmail.com>
This commit is contained in:
Madhu Rajanna 2025-01-20 10:01:20 +01:00
parent 22966e8a1c
commit 2b96558842
4 changed files with 99 additions and 28 deletions

View File

@ -27,6 +27,7 @@ import (
"github.com/ceph/ceph-csi/internal/rbd/types"
"github.com/ceph/ceph-csi/internal/util/log"
"github.com/csi-addons/spec/lib/go/replication"
"github.com/csi-addons/spec/lib/go/volumegroup"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
@ -206,27 +207,41 @@ func (vs *VolumeGroupServer) DeleteVolumeGroup(
log.DebugLog(ctx, "VolumeGroup %q has been found", req.GetVolumeGroupId())
// verify that the volume group is empty
volumes, err := vg.ListVolumes(ctx)
volumes, mirror, err := mgr.GetMirrorSource(ctx, req.GetVolumeGroupId(), &replication.ReplicationSource{
Type: &replication.ReplicationSource_Volumegroup{
Volumegroup: &replication.ReplicationSource_VolumeGroupSource{
VolumeGroupId: req.GetVolumeGroupId(),
},
},
})
if err != nil {
return nil, status.Errorf(
codes.NotFound,
"could not list volumes for voluem group %q: %s",
req.GetVolumeGroupId(),
err.Error())
return nil, getGRPCError(err)
}
defer destoryVolumes(ctx, volumes)
log.DebugLog(ctx, "VolumeGroup %q contains %d volumes", req.GetVolumeGroupId(), len(volumes))
vgrMirrorInfo, err := mirror.GetMirroringInfo(ctx)
if len(volumes) != 0 {
return nil, status.Errorf(
codes.FailedPrecondition,
"rejecting to delete non-empty volume group %q",
req.GetVolumeGroupId())
// verify that the volume group is empty, if the group is primary
if vgrMirrorInfo.IsPrimary() {
volumes, err := vg.ListVolumes(ctx)
if err != nil {
return nil, status.Errorf(
codes.NotFound,
"could not list volumes for volume group %q: %s",
req.GetVolumeGroupId(),
err.Error())
}
log.DebugLog(ctx, "VolumeGroup %q contains %d volumes", req.GetVolumeGroupId(), len(volumes))
if len(volumes) != 0 {
return nil, status.Errorf(
codes.FailedPrecondition,
"rejecting to delete non-empty volume group %q",
req.GetVolumeGroupId())
}
}
// delete the volume group
err = vg.Delete(ctx)
err = vg.Delete(ctx, vgrMirrorInfo, mirror)
if err != nil {
return nil, status.Errorf(codes.Internal,
"failed to delete volume group %q: %s",
@ -336,10 +351,27 @@ func (vs *VolumeGroupServer) ModifyVolumeGroupMembership(
}
}
vol, mirror, err := mgr.GetMirrorSource(ctx, req.GetVolumeGroupId(), &replication.ReplicationSource{
Type: &replication.ReplicationSource_Volumegroup{
Volumegroup: &replication.ReplicationSource_VolumeGroupSource{
VolumeGroupId: req.GetVolumeGroupId(),
},
},
})
if err != nil {
return nil, getGRPCError(err)
}
defer destoryVolumes(ctx, vol)
vgrMirrorInfo, err := mirror.GetMirroringInfo(ctx)
// Skip removing images from group if the group is secondary
removeImageFromGroup := vgrMirrorInfo.IsPrimary()
// remove the volume that should not be part of the group
for _, id := range toRemove {
vol := beforeIDs[id]
err = vg.RemoveVolume(ctx, vol)
err = vg.RemoveVolume(ctx, vol, removeImageFromGroup)
if err != nil {
return nil, status.Errorf(
codes.Internal,

View File

@ -26,6 +26,8 @@ import (
librbd "github.com/ceph/go-ceph/rbd"
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/csi-addons/spec/lib/go/volumegroup"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/ceph/ceph-csi/internal/rbd/types"
"github.com/ceph/ceph-csi/internal/util"
@ -184,7 +186,7 @@ func (vg *volumeGroup) Create(ctx context.Context) error {
return nil
}
func (vg *volumeGroup) Delete(ctx context.Context) error {
func (vg *volumeGroup) Delete(ctx context.Context, vgrMirrorInfo types.MirrorInfo, mirror types.Mirror) error {
name, err := vg.GetName(ctx)
if err != nil {
return err
@ -195,6 +197,41 @@ func (vg *volumeGroup) Delete(ctx context.Context) error {
return err
}
// Cleanup only omap data if the following condition is met
// Mirroring is enabled on the group
// Local group is secondary
// Local group is in up+replaying state
log.DebugLog(ctx, "volume group %v is in %v state and is primary %v", vg, vgrMirrorInfo.GetState, vgrMirrorInfo.IsPrimary())
if vgrMirrorInfo != nil && vgrMirrorInfo.GetState() == librbd.MirrorGroupEnabled.String() && !vgrMirrorInfo.IsPrimary() {
// If the group is in a secondary state and its up+replaying means its
// an healthy secondary and the group is primary somewhere in the
// remote cluster and the local group is getting replayed. Delete the
// OMAP data generated as we cannot delete the secondary group. When
// the group on the primary cluster gets deleted/mirroring disabled,
// the group on all the remote (secondary) clusters will get
// auto-deleted. This helps in garbage collecting the OMAP, VR, VGR,
// VGRC, PVC and PV objects after failback operation.
if mirror != nil {
sts, rErr := mirror.GetGlobalMirroringStatus(ctx)
if rErr != nil {
return status.Error(codes.Internal, rErr.Error())
}
localStatus, rErr := sts.GetLocalSiteStatus()
if rErr != nil {
log.ErrorLog(ctx, "failed to get local status for volume group%s: %w", name, rErr)
return status.Error(codes.Internal, rErr.Error())
}
log.DebugLog(ctx, "local status is %v and local state is %v", localStatus.IsUP(), localStatus.GetState())
if localStatus.IsUP() && localStatus.GetState() == librbd.MirrorGroupStatusStateReplaying.String() {
return vg.commonVolumeGroup.Delete(ctx)
}
log.ErrorLog(ctx,
"secondary group status is up=%t and state=%s",
localStatus.IsUP(),
localStatus.GetState())
}
}
err = librbd.GroupRemove(ioctx, name)
if err != nil && !errors.Is(err, rados.ErrNotFound) {
return fmt.Errorf("failed to remove volume group %q: %w", vg, err)
@ -252,21 +289,23 @@ func (vg *volumeGroup) AddVolume(ctx context.Context, vol types.Volume) error {
return nil
}
func (vg *volumeGroup) RemoveVolume(ctx context.Context, vol types.Volume) error {
func (vg *volumeGroup) RemoveVolume(ctx context.Context, vol types.Volume, removeImageFromGroup bool) error {
// volume was already removed from the group
if len(vg.volumes) == 0 {
return nil
}
err := vol.RemoveFromGroup(ctx, vg)
if err != nil {
if errors.Is(err, librbd.ErrNotExist) {
return nil
if removeImageFromGroup {
log.DebugLog(ctx, "removing image %v from group %v", vol, vg)
err := vol.RemoveFromGroup(ctx, vg)
if err != nil {
if errors.Is(err, librbd.ErrNotExist) {
return nil
}
return fmt.Errorf("failed to remove volume %q from volume group %q: %w", vol, vg, err)
}
return fmt.Errorf("failed to remove volume %q from volume group %q: %w", vol, vg, err)
}
// toRemove contain the ID of the volume that is removed from the group
toRemove, err := vol.GetID(ctx)
if err != nil {

View File

@ -72,7 +72,7 @@ func (cs *ControllerServer) CreateVolumeGroupSnapshot(
for _, volume := range volumes {
if vg != nil {
// 'normal' cleanup, remove all images from the group
vgErr := vg.RemoveVolume(ctx, volume)
vgErr := vg.RemoveVolume(ctx, volume, true)
if vgErr != nil {
log.ErrorLog(
ctx,
@ -91,7 +91,7 @@ func (cs *ControllerServer) CreateVolumeGroupSnapshot(
// the VG should always be deleted, volumes can only belong to a single VG
log.DebugLog(ctx, "removing temporary volume group %q", vg)
vgErr := vg.Delete(ctx)
vgErr := vg.Delete(ctx, nil, nil)
if vgErr != nil {
log.ErrorLog(ctx, "failed to remove temporary volume group %q: %v", vg, vgErr)
}

View File

@ -58,13 +58,13 @@ type VolumeGroup interface {
Create(ctx context.Context) error
// Delete removes the VolumeGroup from the backend storage.
Delete(ctx context.Context) error
Delete(ctx context.Context, vgMirrorInfo MirrorInfo, mirror Mirror) error
// AddVolume adds the Volume to the VolumeGroup.
AddVolume(ctx context.Context, volume Volume) error
// RemoveVolume removes the Volume from the VolumeGroup.
RemoveVolume(ctx context.Context, volume Volume) error
RemoveVolume(ctx context.Context, volume Volume, removeImageFromGroup bool) error
// ListVolumes returns a slice with all Volumes in the VolumeGroup.
ListVolumes(ctx context.Context) ([]Volume, error)