rbd: use the existing VolumeGroup if contents are matching

When a VolumeGroup has been created through the CSI-Addons API, the
VolumeGroupSnapshot CSI API will now use the existing VolumeGroup. There
are checks in place to validate that the Volumes in the VolumeGroup
match the Volumes in the VolumeGroupSnapshot request.

Signed-off-by: Niels de Vos <ndevos@ibm.com>
This commit is contained in:
Niels de Vos
2025-03-20 18:27:10 +01:00
committed by mergify[bot]
parent e489413dbd
commit 63df17171a
2 changed files with 199 additions and 55 deletions

View File

@ -76,15 +76,26 @@ func (vs *VolumeGroupServer) RegisterService(server grpc.ServiceRegistrar) {
//
// Implementation steps:
// 1. resolve all volumes given in the volume_ids list (can be empty)
// 2. create the Volume Group
// 3. add all volumes to the Volume Group
// 2. check if the volumes belong to a (and all the same) group
// 3. create the Volume Group
// 4. verify that the Volume Group contains all the images (if it pre-exists)
// 5. add all volumes to the Volume Group
//
// Idempotency should be handled by the rbd.Manager, keeping this function and
// the potential error handling as simple as possible.
//
//nolint:gocyclo,cyclop // FIXME: make this function simpler
func (vs *VolumeGroupServer) CreateVolumeGroup(
ctx context.Context,
req *volumegroup.CreateVolumeGroupRequest,
) (*volumegroup.CreateVolumeGroupResponse, error) {
var (
err error
vgHandle string
vg types.VolumeGroup
groupName = req.GetName()
)
mgr := rbd.NewManager(vs.driverInstance, req.GetParameters(), req.GetSecrets())
defer mgr.Destroy(ctx)
@ -102,62 +113,137 @@ func (vs *VolumeGroupServer) CreateVolumeGroup(
codes.InvalidArgument,
"failed to find required volume %q for volume group %q: %s",
id,
req.GetName(),
groupName,
err.Error())
}
volumes = append(volumes, vol)
}
log.DebugLog(ctx, "all %d Volumes for VolumeGroup %q have been found", len(volumes), req.GetName())
// only resolve vgHandle the 1st time
if i != 0 {
continue
}
// create a RBDVolumeGroup
vg, err := mgr.CreateVolumeGroup(ctx, req.GetName())
if err != nil {
return nil, status.Errorf(
codes.Internal,
"failed to create volume group %q: %s",
req.GetName(),
err.Error())
}
log.DebugLog(ctx, "VolumeGroup %q has been created: %+v", req.GetName(), vg)
// extract the flatten mode
flattenMode, err := getFlattenMode(ctx, req.GetParameters())
if err != nil {
return nil, err
}
// Flatten the image if the flatten mode is set to FlattenModeForce
// before adding it to the volume group.
for _, v := range volumes {
err = v.HandleParentImageExistence(ctx, flattenMode)
if err != nil {
err = fmt.Errorf("failed to handle parent image for volume group %q: %w", vg, err)
return nil, getGRPCError(err)
// reuse the existing group, it contains the volumes already
// check for pre-existing volume group name
vgHandle, err = vol.GetVolumeGroupID(ctx, mgr)
if err != nil && !errors.Is(err, rbderrors.ErrGroupNotFound) {
return nil, status.Errorf(
codes.Internal,
"could not get name of group for volume %q: %v",
volumes[0],
err)
}
}
// add each rbd-image to the RBDVolumeGroup
for _, vol := range volumes {
err = vg.AddVolume(ctx, vol)
log.DebugLog(ctx, "all %d Volumes for VolumeGroup %q have been found", len(volumes), groupName)
// verify that the volumes are not in a group yet
// if one volume is in a group, all other volumes need to be in the same group
if groupMatches, err := mgr.VolumesInSameGroup(ctx, volumes); !groupMatches || err != nil {
return nil, status.Errorf(
codes.Internal,
"not all volumes belong to the same group: %v",
err)
}
if vgHandle == "" {
// create a RBDVolumeGroup
vg, err = mgr.CreateVolumeGroup(ctx, groupName)
if err != nil {
return nil, status.Errorf(
codes.Internal,
"failed to add volume %q to volume group %q: %s",
vol,
req.GetName(),
"failed to create volume group %q: %s",
groupName,
err.Error())
}
log.DebugLog(ctx, "volume group %q has been created: %+v", groupName, vg)
} else {
vg, err = mgr.GetVolumeGroupByID(ctx, vgHandle)
if err != nil {
return nil, status.Errorf(
codes.Internal,
"failed to get volume group with id %q: %s",
vgHandle,
err.Error())
}
groupName, err = vg.GetName(ctx)
if err != nil {
return nil, status.Errorf(
codes.Internal,
"failed to get name of volume group %q: %s",
vg,
err.Error())
}
log.DebugLog(ctx, "existing volume group %q has been resolved: %+v", groupName, vg)
}
log.DebugLog(ctx, "all %d Volumes have been added to for VolumeGroup %q", len(volumes), req.GetName())
// TODO: check the number of volumes in the vg, it needs to be empty, or match len(volumes)
matches, err := mgr.CompareVolumesInGroup(ctx, volumes, vg)
if err != nil {
return nil, status.Errorf(
codes.Internal,
"failed to compare %d volumes with volume group %q: %s",
len(volumes),
groupName,
err.Error())
} else if !matches {
return nil, status.Errorf(
codes.Internal,
"volume group %q does not match with requested volumes",
groupName)
}
vols, err := vg.ListVolumes(ctx)
if err != nil {
return nil, status.Errorf(
codes.Internal,
"failed to list volumes of volume group %q: %s",
vg,
err.Error())
} else if len(vols) == 0 {
// need to add the volumes to the group
// extract the flatten mode
var flattenMode types.FlattenMode
flattenMode, err = getFlattenMode(ctx, req.GetParameters())
if err != nil {
return nil, err
}
// Flatten the image if the flatten mode is set to FlattenModeForce
// before adding it to the volume group.
for _, v := range volumes {
err = v.HandleParentImageExistence(ctx, flattenMode)
if err != nil {
err = fmt.Errorf("failed to handle parent image for volume group %q: %w", vg, err)
return nil, getGRPCError(err)
}
}
// add each rbd-image to the RBDVolumeGroup
for _, vol := range volumes {
err = vg.AddVolume(ctx, vol)
if err != nil {
return nil, status.Errorf(
codes.Internal,
"failed to add volume %q to volume group %q: %s",
vol,
groupName,
err.Error())
}
}
log.DebugLog(ctx, "all %d Volumes have been added to for volume group %q", len(volumes), vg)
}
csiVG, err := vg.ToCSI(ctx)
if err != nil {
return nil, status.Errorf(
codes.Internal,
"failed to convert volume group %q to CSI type: %s",
req.GetName(),
groupName,
err.Error())
}