diff --git a/internal/csi-addons/rbd/volumegroup.go b/internal/csi-addons/rbd/volumegroup.go index 9eb46daa9..1e5e19417 100644 --- a/internal/csi-addons/rbd/volumegroup.go +++ b/internal/csi-addons/rbd/volumegroup.go @@ -76,15 +76,26 @@ func (vs *VolumeGroupServer) RegisterService(server grpc.ServiceRegistrar) { // // Implementation steps: // 1. resolve all volumes given in the volume_ids list (can be empty) -// 2. create the Volume Group -// 3. add all volumes to the Volume Group +// 2. check if the volumes belong to a (and all the same) group +// 3. create the Volume Group +// 4. verify that the Volume Group contains all the images (if it pre-exists) +// 5. add all volumes to the Volume Group // // Idempotency should be handled by the rbd.Manager, keeping this function and // the potential error handling as simple as possible. +// +//nolint:gocyclo,cyclop // FIXME: make this function simpler func (vs *VolumeGroupServer) CreateVolumeGroup( ctx context.Context, req *volumegroup.CreateVolumeGroupRequest, ) (*volumegroup.CreateVolumeGroupResponse, error) { + var ( + err error + vgHandle string + vg types.VolumeGroup + groupName = req.GetName() + ) + mgr := rbd.NewManager(vs.driverInstance, req.GetParameters(), req.GetSecrets()) defer mgr.Destroy(ctx) @@ -102,62 +113,137 @@ func (vs *VolumeGroupServer) CreateVolumeGroup( codes.InvalidArgument, "failed to find required volume %q for volume group %q: %s", id, - req.GetName(), + groupName, err.Error()) } volumes = append(volumes, vol) - } - log.DebugLog(ctx, "all %d Volumes for VolumeGroup %q have been found", len(volumes), req.GetName()) + // only resolve vgHandle the 1st time + if i != 0 { + continue + } - // create a RBDVolumeGroup - vg, err := mgr.CreateVolumeGroup(ctx, req.GetName()) - if err != nil { - return nil, status.Errorf( - codes.Internal, - "failed to create volume group %q: %s", - req.GetName(), - err.Error()) - } - - log.DebugLog(ctx, "VolumeGroup %q has been created: %+v", req.GetName(), vg) - - // extract the flatten mode - flattenMode, err := getFlattenMode(ctx, req.GetParameters()) - if err != nil { - return nil, err - } - // Flatten the image if the flatten mode is set to FlattenModeForce - // before adding it to the volume group. - for _, v := range volumes { - err = v.HandleParentImageExistence(ctx, flattenMode) - if err != nil { - err = fmt.Errorf("failed to handle parent image for volume group %q: %w", vg, err) - - return nil, getGRPCError(err) + // reuse the existing group, it contains the volumes already + // check for pre-existing volume group name + vgHandle, err = vol.GetVolumeGroupID(ctx, mgr) + if err != nil && !errors.Is(err, rbderrors.ErrGroupNotFound) { + return nil, status.Errorf( + codes.Internal, + "could not get name of group for volume %q: %v", + volumes[0], + err) } } - // add each rbd-image to the RBDVolumeGroup - for _, vol := range volumes { - err = vg.AddVolume(ctx, vol) + + log.DebugLog(ctx, "all %d Volumes for VolumeGroup %q have been found", len(volumes), groupName) + + // verify that the volumes are not in a group yet + // if one volume is in a group, all other volumes need to be in the same group + if groupMatches, err := mgr.VolumesInSameGroup(ctx, volumes); !groupMatches || err != nil { + return nil, status.Errorf( + codes.Internal, + "not all volumes belong to the same group: %v", + err) + } + + if vgHandle == "" { + // create a RBDVolumeGroup + vg, err = mgr.CreateVolumeGroup(ctx, groupName) if err != nil { return nil, status.Errorf( codes.Internal, - "failed to add volume %q to volume group %q: %s", - vol, - req.GetName(), + "failed to create volume group %q: %s", + groupName, err.Error()) } + + log.DebugLog(ctx, "volume group %q has been created: %+v", groupName, vg) + } else { + vg, err = mgr.GetVolumeGroupByID(ctx, vgHandle) + if err != nil { + return nil, status.Errorf( + codes.Internal, + "failed to get volume group with id %q: %s", + vgHandle, + err.Error()) + } + + groupName, err = vg.GetName(ctx) + if err != nil { + return nil, status.Errorf( + codes.Internal, + "failed to get name of volume group %q: %s", + vg, + err.Error()) + } + + log.DebugLog(ctx, "existing volume group %q has been resolved: %+v", groupName, vg) } - log.DebugLog(ctx, "all %d Volumes have been added to for VolumeGroup %q", len(volumes), req.GetName()) + // TODO: check the number of volumes in the vg, it needs to be empty, or match len(volumes) + matches, err := mgr.CompareVolumesInGroup(ctx, volumes, vg) + if err != nil { + return nil, status.Errorf( + codes.Internal, + "failed to compare %d volumes with volume group %q: %s", + len(volumes), + groupName, + err.Error()) + } else if !matches { + return nil, status.Errorf( + codes.Internal, + "volume group %q does not match with requested volumes", + groupName) + } + + vols, err := vg.ListVolumes(ctx) + if err != nil { + return nil, status.Errorf( + codes.Internal, + "failed to list volumes of volume group %q: %s", + vg, + err.Error()) + } else if len(vols) == 0 { + // need to add the volumes to the group + + // extract the flatten mode + var flattenMode types.FlattenMode + flattenMode, err = getFlattenMode(ctx, req.GetParameters()) + if err != nil { + return nil, err + } + // Flatten the image if the flatten mode is set to FlattenModeForce + // before adding it to the volume group. + for _, v := range volumes { + err = v.HandleParentImageExistence(ctx, flattenMode) + if err != nil { + err = fmt.Errorf("failed to handle parent image for volume group %q: %w", vg, err) + + return nil, getGRPCError(err) + } + } + // add each rbd-image to the RBDVolumeGroup + for _, vol := range volumes { + err = vg.AddVolume(ctx, vol) + if err != nil { + return nil, status.Errorf( + codes.Internal, + "failed to add volume %q to volume group %q: %s", + vol, + groupName, + err.Error()) + } + } + + log.DebugLog(ctx, "all %d Volumes have been added to for volume group %q", len(volumes), vg) + } csiVG, err := vg.ToCSI(ctx) if err != nil { return nil, status.Errorf( codes.Internal, "failed to convert volume group %q to CSI type: %s", - req.GetName(), + groupName, err.Error()) } diff --git a/internal/rbd/group_controllerserver.go b/internal/rbd/group_controllerserver.go index 2dc171b00..0f28938cc 100644 --- a/internal/rbd/group_controllerserver.go +++ b/internal/rbd/group_controllerserver.go @@ -40,7 +40,7 @@ import ( // group and its snapshot around, the group snapshot can be inspected to list // the snapshots of the images. // -//nolint:gocyclo,cyclop // TODO: reduce complexity. +//nolint:gocyclo,cyclop,gocognit // TODO: reduce complexity. func (cs *ControllerServer) CreateVolumeGroupSnapshot( ctx context.Context, req *csi.CreateVolumeGroupSnapshotRequest, @@ -53,6 +53,8 @@ func (cs *ControllerServer) CreateVolumeGroupSnapshot( // the VG and VGS should not have the same name vgName = req.GetName() + "-vg" // stable temporary name vgsName = req.GetName() + + vgNeedsDeletion = true ) // Existence and conflict checks @@ -70,7 +72,7 @@ func (cs *ControllerServer) CreateVolumeGroupSnapshot( volumes := make([]types.Volume, len(req.GetSourceVolumeIds())) defer func() { for _, volume := range volumes { - if vg != nil { + if vg != nil && vgNeedsDeletion { // 'normal' cleanup, remove all images from the group vgErr := vg.RemoveVolume(ctx, volume) if vgErr != nil { @@ -87,7 +89,7 @@ func (cs *ControllerServer) CreateVolumeGroupSnapshot( } } - if vg != nil { + if vg != nil && vgNeedsDeletion { // the VG should always be deleted, volumes can only belong to a single VG log.DebugLog(ctx, "removing temporary volume group %q", vg) @@ -116,6 +118,51 @@ func (cs *ControllerServer) CreateVolumeGroupSnapshot( log.DebugLog(ctx, "all %d Volumes for VolumeGroup %q have been found", len(volumes), vgsName) + // all volumes should belong to the same group, or to no group at all + groupMatches, err := mgr.VolumesInSameGroup(ctx, volumes) + if err != nil { + return nil, status.Errorf( + codes.Internal, + "failed to compare %d volumes: %s", + len(volumes), + err.Error()) + } else if !groupMatches { + return nil, status.Error( + codes.Internal, + "not all volumes belong to the same group") + } + + // check for pre-existing volume group name + vgHandle, err := volumes[0].GetVolumeGroupID(ctx, mgr) + if err != nil && !errors.Is(err, rbderrors.ErrGroupNotFound) { + return nil, status.Errorf( + codes.Internal, + "could not get volume group for volume %q: %v", + volumes[0], + err) + } else if vgHandle != "" { + vg, err = mgr.GetVolumeGroupByID(ctx, vgHandle) + // vg.Destroy(ctx) is called in a defer further up ^ + if err != nil { + return nil, status.Errorf( + codes.InvalidArgument, + "failed to resolve volume group with id %q: %s", + vgHandle, + err.Error()) + } + + vgName, err = vg.GetName(ctx) + if err != nil { + return nil, status.Errorf( + codes.InvalidArgument, + "failed to get name of volume group %q: %s", + vg, + err.Error()) + } + + vgNeedsDeletion = false + } + groupSnapshot, err = mgr.GetVolumeGroupSnapshotByName(ctx, vgsName) if groupSnapshot != nil { defer groupSnapshot.Destroy(ctx) @@ -155,24 +202,35 @@ func (cs *ControllerServer) CreateVolumeGroupSnapshot( errList) } - // create a temporary VolumeGroup with a different name - vg, err = mgr.CreateVolumeGroup(ctx, vgName) + if vg == nil { + // create a temporary VolumeGroup with a different name + vg, err = mgr.CreateVolumeGroup(ctx, vgName) + if err != nil { + return nil, status.Errorf( + codes.Internal, + "failed to create volume group %q: %s", + vgName, + err.Error()) + } + // vg.Destroy(ctx) is called in a defer further up ^ + + log.DebugLog(ctx, "VolumeGroup %q has been created: %+v", vgName, vg) + } + + vols, err := vg.ListVolumes(ctx) if err != nil { return nil, status.Errorf( codes.Internal, - "failed to create volume group %q: %s", - vgName, + "failed to list volumes of volume group %q: %s", + vg, err.Error()) - } - // vg.Destroy(ctx) is called in a defer further up ^ - - log.DebugLog(ctx, "VolumeGroup %q has been created: %+v", vgName, vg) - - // add images to the group - for _, volume := range volumes { - err = vg.AddVolume(ctx, volume) - if err != nil { - return nil, status.Error(codes.Aborted, err.Error()) + } else if len(vols) == 0 { + // add images to the group + for _, volume := range volumes { + err = vg.AddVolume(ctx, volume) + if err != nil { + return nil, status.Error(codes.Aborted, err.Error()) + } } }