mirror of
https://github.com/ceph/ceph-csi.git
synced 2024-12-11 15:40:24 +00:00
8e57efad98
The "slices" package has been introduced in Go 1.21 and can be used instead of the Kubernetes package that will be replaced by the standard package at one point too. Signed-off-by: Niels de Vos <ndevos@ibm.com>
780 lines
24 KiB
Go
780 lines
24 KiB
Go
/*
|
|
Copyright 2024 The Ceph-CSI Authors.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package cephfs
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"slices"
|
|
"sort"
|
|
"time"
|
|
|
|
"github.com/ceph/ceph-csi/internal/cephfs/core"
|
|
cerrors "github.com/ceph/ceph-csi/internal/cephfs/errors"
|
|
"github.com/ceph/ceph-csi/internal/cephfs/store"
|
|
fsutil "github.com/ceph/ceph-csi/internal/cephfs/util"
|
|
"github.com/ceph/ceph-csi/internal/util"
|
|
"github.com/ceph/ceph-csi/internal/util/log"
|
|
|
|
"github.com/ceph/go-ceph/cephfs/admin"
|
|
"github.com/container-storage-interface/spec/lib/go/csi"
|
|
"github.com/kubernetes-csi/csi-lib-utils/protosanitizer"
|
|
"google.golang.org/grpc/codes"
|
|
"google.golang.org/grpc/status"
|
|
"google.golang.org/protobuf/types/known/timestamppb"
|
|
)
|
|
|
|
// validateCreateVolumeGroupSnapshotRequest validates the request for creating
|
|
// a group snapshot of volumes.
|
|
func (cs *ControllerServer) validateCreateVolumeGroupSnapshotRequest(
|
|
ctx context.Context,
|
|
req *csi.CreateVolumeGroupSnapshotRequest,
|
|
) error {
|
|
if err := cs.Driver.ValidateGroupControllerServiceRequest(
|
|
csi.GroupControllerServiceCapability_RPC_CREATE_DELETE_GET_VOLUME_GROUP_SNAPSHOT); err != nil {
|
|
log.ErrorLog(ctx, "invalid create volume group snapshot req: %v", protosanitizer.StripSecrets(req))
|
|
|
|
return err
|
|
}
|
|
|
|
// Check sanity of request volume group snapshot Name, Source Volume Id's
|
|
if req.GetName() == "" {
|
|
return status.Error(codes.InvalidArgument, "volume group snapshot Name cannot be empty")
|
|
}
|
|
|
|
if len(req.GetSourceVolumeIds()) == 0 {
|
|
return status.Error(codes.InvalidArgument, "source volume ids cannot be empty")
|
|
}
|
|
|
|
param := req.GetParameters()
|
|
// check for ClusterID and fsName
|
|
if value, ok := param["clusterID"]; !ok || value == "" {
|
|
return status.Error(codes.InvalidArgument, "missing or empty clusterID")
|
|
}
|
|
|
|
if value, ok := param["fsName"]; !ok || value == "" {
|
|
return status.Error(codes.InvalidArgument, "missing or empty fsName")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// CreateVolumeGroupSnapshot creates a group snapshot of volumes.
|
|
func (cs *ControllerServer) CreateVolumeGroupSnapshot(
|
|
ctx context.Context,
|
|
req *csi.CreateVolumeGroupSnapshotRequest) (
|
|
*csi.CreateVolumeGroupSnapshotResponse,
|
|
error,
|
|
) {
|
|
if err := cs.validateCreateVolumeGroupSnapshotRequest(ctx, req); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
requestName := req.GetName()
|
|
// Existence and conflict checks
|
|
if acquired := cs.VolumeGroupLocks.TryAcquire(requestName); !acquired {
|
|
log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, requestName)
|
|
|
|
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, requestName)
|
|
}
|
|
defer cs.VolumeGroupLocks.Release(requestName)
|
|
|
|
cr, err := util.NewAdminCredentials(req.GetSecrets())
|
|
if err != nil {
|
|
return nil, status.Error(codes.Internal, err.Error())
|
|
}
|
|
defer cr.DeleteCredentials()
|
|
|
|
vg, err := store.NewVolumeGroupOptions(ctx, req, cr)
|
|
if err != nil {
|
|
log.ErrorLog(ctx, "failed to get volume group options: %v", err)
|
|
|
|
return nil, status.Error(codes.Internal, err.Error())
|
|
}
|
|
defer vg.Destroy()
|
|
|
|
vgs, err := store.CheckVolumeGroupSnapExists(ctx, vg, cr)
|
|
if err != nil {
|
|
log.ErrorLog(ctx, "failed to check volume group snapshot exists: %v", err)
|
|
|
|
return nil, status.Error(codes.Internal, err.Error())
|
|
}
|
|
|
|
// Get the fs names and subvolume from the volume ids to execute quiesce commands.
|
|
fsMap, err := getFsNamesAndSubVolumeFromVolumeIDs(ctx, req.GetSecrets(), req.GetSourceVolumeIds(), cr)
|
|
if err != nil {
|
|
log.ErrorLog(ctx, "failed to get fs names and subvolume from volume ids: %v", err)
|
|
|
|
return nil, status.Error(codes.Internal, err.Error())
|
|
}
|
|
defer destroyFSConnections(fsMap)
|
|
|
|
needRelease := checkIfFSNeedQuiesceRelease(vgs, req.GetSourceVolumeIds())
|
|
if needRelease {
|
|
return cs.releaseQuiesceAndGetVolumeGroupSnapshotResponse(ctx, req, vgs, fsMap, vg, cr)
|
|
}
|
|
|
|
// If the volume group snapshot does not exist, reserve the volume group
|
|
if vgs == nil {
|
|
vgs, err = store.ReserveVolumeGroup(ctx, vg, cr)
|
|
if err != nil {
|
|
log.ErrorLog(ctx, "failed to reserve volume group: %v", err)
|
|
|
|
return nil, status.Error(codes.Internal, err.Error())
|
|
}
|
|
}
|
|
|
|
inProgress, err := cs.queisceFileSystems(ctx, vgs, fsMap)
|
|
if err != nil {
|
|
log.ErrorLog(ctx, "failed to quiesce filesystems: %v", err)
|
|
if !errors.Is(err, cerrors.ErrQuiesceInProgress) {
|
|
uErr := cs.deleteSnapshotsAndUndoReservation(ctx, vgs, cr, fsMap, req.GetSecrets())
|
|
if uErr != nil {
|
|
log.ErrorLog(ctx, "failed to delete snapshot and undo reservation: %v", uErr)
|
|
}
|
|
}
|
|
|
|
return nil, status.Error(codes.Internal, err.Error())
|
|
}
|
|
|
|
if inProgress {
|
|
return nil, status.Error(codes.Internal, "Quiesce operation is in progress")
|
|
}
|
|
|
|
resp, err := cs.createSnapshotAddToVolumeGroupJournal(ctx, req, vg, vgs, cr, fsMap)
|
|
if err != nil {
|
|
log.ErrorLog(ctx, "failed to create snapshot and add to volume group journal: %v", err)
|
|
|
|
if !errors.Is(err, cerrors.ErrQuiesceInProgress) {
|
|
// Handle Undo reservation and timeout as well
|
|
uErr := cs.deleteSnapshotsAndUndoReservation(ctx, vgs, cr, fsMap, req.GetSecrets())
|
|
if uErr != nil {
|
|
log.ErrorLog(ctx, "failed to delete snapshot and undo reservation: %v", uErr)
|
|
}
|
|
}
|
|
|
|
return nil, status.Error(codes.Internal, err.Error())
|
|
}
|
|
|
|
response := &csi.CreateVolumeGroupSnapshotResponse{}
|
|
response.GroupSnapshot = &csi.VolumeGroupSnapshot{
|
|
GroupSnapshotId: vgs.VolumeGroupSnapshotID,
|
|
ReadyToUse: true,
|
|
CreationTime: timestamppb.New(time.Now()),
|
|
}
|
|
|
|
for _, r := range *resp {
|
|
r.Snapshot.GroupSnapshotId = vgs.VolumeGroupSnapshotID
|
|
response.GroupSnapshot.Snapshots = append(response.GroupSnapshot.Snapshots, r.Snapshot)
|
|
}
|
|
|
|
return response, nil
|
|
}
|
|
|
|
// queisceFileSystems quiesces the subvolumes and subvolume groups present in
|
|
// the filesystems of the volumeID's present in the
|
|
// CreateVolumeGroupSnapshotRequest.
|
|
func (cs *ControllerServer) queisceFileSystems(ctx context.Context,
|
|
vgs *store.VolumeGroupSnapshotIdentifier,
|
|
fsMap map[string]core.FSQuiesceClient,
|
|
) (bool, error) {
|
|
var inProgress bool
|
|
for _, fm := range fsMap {
|
|
// Quiesce the fs, subvolumes and subvolume groups
|
|
data, err := fm.FSQuiesce(ctx, vgs.RequestName)
|
|
if err != nil {
|
|
log.ErrorLog(ctx, "failed to quiesce filesystem: %v", err)
|
|
|
|
return inProgress, err
|
|
}
|
|
state := core.GetQuiesceState(data.State)
|
|
if state == core.Quiescing {
|
|
inProgress = true
|
|
} else if state != core.Quiesced {
|
|
return inProgress, fmt.Errorf("quiesce operation is in %s state", state)
|
|
}
|
|
}
|
|
|
|
return inProgress, nil
|
|
}
|
|
|
|
// releaseQuiesceAndGetVolumeGroupSnapshotResponse releases the quiesce of the
|
|
// subvolumes and subvolume groups in the filesystems for the volumeID's
|
|
// present in the CreateVolumeGroupSnapshotRequest.
|
|
func (cs *ControllerServer) releaseQuiesceAndGetVolumeGroupSnapshotResponse(
|
|
ctx context.Context,
|
|
req *csi.CreateVolumeGroupSnapshotRequest,
|
|
vgs *store.VolumeGroupSnapshotIdentifier,
|
|
fsMap map[string]core.FSQuiesceClient,
|
|
vg *store.VolumeGroupOptions,
|
|
cr *util.Credentials,
|
|
) (*csi.CreateVolumeGroupSnapshotResponse, error) {
|
|
matchesSourceVolumeIDs := matchesSourceVolumeIDs(vgs.GetVolumeIDs(), req.GetSourceVolumeIds())
|
|
if !matchesSourceVolumeIDs {
|
|
return nil, status.Errorf(
|
|
codes.InvalidArgument,
|
|
"source volume ids %v do not match in the existing volume group snapshot %v",
|
|
req.GetSourceVolumeIds(),
|
|
vgs.GetVolumeIDs())
|
|
}
|
|
// Release the quiesce of the subvolumes and subvolume groups in the
|
|
// filesystems for the volumes.
|
|
for _, fm := range fsMap {
|
|
// UnFreeze the filesystems, subvolumes and subvolume groups
|
|
data, err := fm.ReleaseFSQuiesce(ctx, vg.RequestName)
|
|
if err != nil {
|
|
log.ErrorLog(ctx, "failed to release filesystem quiesce: %v", err)
|
|
uErr := cs.deleteSnapshotsAndUndoReservation(ctx, vgs, cr, fsMap, req.GetSecrets())
|
|
if uErr != nil {
|
|
log.ErrorLog(ctx, "failed to delete snapshot and undo reservation: %v", uErr)
|
|
}
|
|
|
|
return nil, status.Errorf(codes.Internal, "failed to release filesystem quiesce: %v", err)
|
|
}
|
|
state := core.GetQuiesceState(data.State)
|
|
if state != core.Released {
|
|
return nil, status.Errorf(codes.Internal, "quiesce operation is in %s state", state)
|
|
}
|
|
}
|
|
var err error
|
|
defer func() {
|
|
if err != nil && !errors.Is(err, cerrors.ErrQuiesceInProgress) {
|
|
uErr := cs.deleteSnapshotsAndUndoReservation(ctx, vgs, cr, fsMap, req.GetSecrets())
|
|
if uErr != nil {
|
|
log.ErrorLog(ctx, "failed to delete snapshot and undo reservation: %v", uErr)
|
|
}
|
|
}
|
|
}()
|
|
snapshotResponses := make([]csi.CreateSnapshotResponse, 0)
|
|
for _, volID := range req.GetSourceVolumeIds() {
|
|
// Create the snapshot for the volumeID
|
|
clusterID := getClusterIDForVolumeID(fsMap, volID)
|
|
if clusterID == "" {
|
|
return nil, status.Errorf(codes.Internal, "failed to get clusterID for volumeID %s", volID)
|
|
}
|
|
|
|
req := formatCreateSnapshotRequest(volID, vgs.FsVolumeGroupSnapshotName,
|
|
clusterID,
|
|
req.GetSecrets())
|
|
var resp *csi.CreateSnapshotResponse
|
|
resp, err = cs.createSnapshotAndAddMapping(ctx, req, vg, vgs, cr)
|
|
if err != nil {
|
|
// Handle cleanup
|
|
log.ErrorLog(ctx, "failed to create snapshot: %v", err)
|
|
|
|
return nil, status.Errorf(codes.Internal,
|
|
"failed to create snapshot and add to volume group journal: %v",
|
|
err)
|
|
}
|
|
snapshotResponses = append(snapshotResponses, *resp)
|
|
}
|
|
|
|
response := &csi.CreateVolumeGroupSnapshotResponse{}
|
|
response.GroupSnapshot = &csi.VolumeGroupSnapshot{
|
|
GroupSnapshotId: vgs.VolumeGroupSnapshotID,
|
|
ReadyToUse: true,
|
|
CreationTime: timestamppb.New(time.Now()),
|
|
}
|
|
|
|
for _, r := range snapshotResponses {
|
|
r.Snapshot.GroupSnapshotId = vgs.VolumeGroupSnapshotID
|
|
response.GroupSnapshot.Snapshots = append(response.GroupSnapshot.Snapshots, r.Snapshot)
|
|
}
|
|
|
|
return response, nil
|
|
}
|
|
|
|
// createSnapshotAddToVolumeGroupJournal creates the snapshot and adds the
|
|
// snapshotID and volumeID to the volume group journal omap. If the freeze is
|
|
// true then it will freeze the subvolumes and subvolume groups before creating
|
|
// the snapshot and unfreeze them after creating the snapshot. If the freeze is
|
|
// false it will call createSnapshot and get the snapshot details for the
|
|
// volume and add the snapshotID and volumeID to the volume group journal omap.
|
|
// If any error occurs other than ErrInProgress it will delete the snapshots
|
|
// and undo the reservation and return the error.
|
|
func (cs *ControllerServer) createSnapshotAddToVolumeGroupJournal(
|
|
ctx context.Context,
|
|
req *csi.CreateVolumeGroupSnapshotRequest,
|
|
vgo *store.VolumeGroupOptions,
|
|
vgs *store.VolumeGroupSnapshotIdentifier,
|
|
cr *util.Credentials,
|
|
fsMap map[string]core.FSQuiesceClient) (
|
|
*[]csi.CreateSnapshotResponse,
|
|
error,
|
|
) {
|
|
var err error
|
|
var resp *csi.CreateSnapshotResponse
|
|
|
|
responses := make([]csi.CreateSnapshotResponse, 0)
|
|
for _, volID := range req.GetSourceVolumeIds() {
|
|
err = fsQuiesceWithExpireTimeout(ctx, vgo.RequestName, fsMap)
|
|
if err != nil {
|
|
log.ErrorLog(ctx, "failed to quiesce filesystem with timeout: %v", err)
|
|
|
|
return nil, err
|
|
}
|
|
|
|
// Create the snapshot for the volumeID
|
|
clusterID := getClusterIDForVolumeID(fsMap, volID)
|
|
if clusterID == "" {
|
|
return nil, fmt.Errorf("failed to get clusterID for volumeID %s", volID)
|
|
}
|
|
|
|
req := formatCreateSnapshotRequest(volID, vgs.FsVolumeGroupSnapshotName,
|
|
clusterID,
|
|
req.GetSecrets())
|
|
resp, err = cs.createSnapshotAndAddMapping(ctx, req, vgo, vgs, cr)
|
|
if err != nil {
|
|
// Handle cleanup
|
|
log.ErrorLog(ctx, "failed to create snapshot: %v", err)
|
|
|
|
return nil, err
|
|
}
|
|
responses = append(responses, *resp)
|
|
}
|
|
|
|
err = releaseFSQuiesce(ctx, vgo.RequestName, fsMap)
|
|
if err != nil {
|
|
log.ErrorLog(ctx, "failed to release filesystem quiesce: %v", err)
|
|
|
|
return nil, err
|
|
}
|
|
|
|
return &responses, nil
|
|
}
|
|
|
|
func formatCreateSnapshotRequest(volID, groupSnapshotName,
|
|
clusterID string,
|
|
secret map[string]string,
|
|
) *csi.CreateSnapshotRequest {
|
|
return &csi.CreateSnapshotRequest{
|
|
SourceVolumeId: volID,
|
|
Name: groupSnapshotName + "-" + volID,
|
|
Secrets: secret,
|
|
Parameters: map[string]string{
|
|
"clusterID": clusterID,
|
|
},
|
|
}
|
|
}
|
|
|
|
// releaseSubvolumeQuiesce releases the quiesce of the subvolumes and subvolume
|
|
// groups in the filesystems for the volumeID's present in the
|
|
// CreateVolumeGroupSnapshotRequest.
|
|
func releaseFSQuiesce(ctx context.Context,
|
|
requestName string,
|
|
fsMap map[string]core.FSQuiesceClient,
|
|
) error {
|
|
inProgress := false
|
|
var err error
|
|
var data *admin.QuiesceInfo
|
|
for _, fm := range fsMap {
|
|
// UnFreeze the filesystems, subvolumes and subvolume groups
|
|
data, err = fm.ReleaseFSQuiesce(ctx, requestName)
|
|
if err != nil {
|
|
log.ErrorLog(ctx, "failed to release filesystem quiesce: %v", err)
|
|
|
|
return err
|
|
}
|
|
state := core.GetQuiesceState(data.State)
|
|
if state != core.Released {
|
|
inProgress = true
|
|
}
|
|
}
|
|
|
|
if inProgress {
|
|
return cerrors.ErrQuiesceInProgress
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// fsQuiesceWithExpireTimeout quiesces the subvolumes and subvolume
|
|
// groups in the filesystems for the volumeID's present in the
|
|
// CreateVolumeGroupSnapshotRequest.
|
|
func fsQuiesceWithExpireTimeout(ctx context.Context,
|
|
requestName string,
|
|
fsMap map[string]core.FSQuiesceClient,
|
|
) error {
|
|
var err error
|
|
|
|
var data *admin.QuiesceInfo
|
|
inProgress := false
|
|
for _, fm := range fsMap {
|
|
// reinitialize the expiry timer for the quiesce
|
|
data, err = fm.FSQuiesceWithExpireTimeout(ctx, requestName)
|
|
if err != nil {
|
|
log.ErrorLog(ctx, "failed to quiesce filesystem with timeout: %v", err)
|
|
|
|
return err
|
|
}
|
|
state := core.GetQuiesceState(data.State)
|
|
if state == core.Quiescing {
|
|
inProgress = true
|
|
} else if state != core.Quiesced {
|
|
return fmt.Errorf("quiesce operation is in %s state", state)
|
|
}
|
|
}
|
|
|
|
if inProgress {
|
|
return cerrors.ErrQuiesceInProgress
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// createSnapshotAndAddMapping creates the snapshot and adds the snapshotID and
|
|
// volumeID to the volume group journal omap. If any error occurs it will
|
|
// delete the last created snapshot as its still not added to the journal.
|
|
func (cs *ControllerServer) createSnapshotAndAddMapping(
|
|
ctx context.Context,
|
|
req *csi.CreateSnapshotRequest,
|
|
vgo *store.VolumeGroupOptions,
|
|
vgs *store.VolumeGroupSnapshotIdentifier,
|
|
cr *util.Credentials,
|
|
) (*csi.CreateSnapshotResponse, error) {
|
|
// Create the snapshot
|
|
resp, err := cs.CreateSnapshot(ctx, req)
|
|
if err != nil {
|
|
// Handle cleanup
|
|
log.ErrorLog(ctx, "failed to create snapshot: %v", err)
|
|
|
|
return nil, err
|
|
}
|
|
j, err := store.VolumeGroupJournal.Connect(vgo.Monitors, fsutil.RadosNamespace, cr)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer j.Destroy()
|
|
// Add the snapshot to the volume group journal
|
|
err = j.AddVolumeSnapshotMapping(ctx,
|
|
vgo.MetadataPool,
|
|
vgs.ReservedID,
|
|
req.GetSourceVolumeId(),
|
|
resp.GetSnapshot().GetSnapshotId())
|
|
if err != nil {
|
|
log.ErrorLog(ctx, "failed to add volume snapshot mapping: %v", err)
|
|
// Delete the last created snapshot as its still not added to the
|
|
// journal
|
|
delReq := &csi.DeleteSnapshotRequest{
|
|
SnapshotId: resp.GetSnapshot().GetSnapshotId(),
|
|
Secrets: req.GetSecrets(),
|
|
}
|
|
_, dErr := cs.DeleteSnapshot(ctx, delReq)
|
|
if dErr != nil {
|
|
log.ErrorLog(ctx, "failed to delete snapshot %s: %v", resp.GetSnapshot().GetSnapshotId(), dErr)
|
|
}
|
|
|
|
return nil, err
|
|
}
|
|
|
|
return resp, nil
|
|
}
|
|
|
|
// checkIfFSNeedQuiesceRelease checks that do we have snapshots for all the
|
|
// volumes stored in the omap so that we can release the quiesce.
|
|
func checkIfFSNeedQuiesceRelease(vgs *store.VolumeGroupSnapshotIdentifier, volIDs []string) bool {
|
|
if vgs == nil {
|
|
return false
|
|
}
|
|
// If the number of volumes in the snapshot is not equal to the number of volumes
|
|
|
|
return len(vgs.GetVolumeIDs()) == len(volIDs)
|
|
}
|
|
|
|
// getClusterIDForVolumeID gets the clusterID for the volumeID from the fms map.
|
|
func getClusterIDForVolumeID(fms map[string]core.FSQuiesceClient, volumeID string) string {
|
|
for _, fm := range fms {
|
|
for _, vol := range fm.GetVolumes() {
|
|
if vol.VolumeID == volumeID {
|
|
return vol.ClusterID
|
|
}
|
|
}
|
|
}
|
|
|
|
return ""
|
|
}
|
|
|
|
// getFsNamesAndSubVolumeFromVolumeIDs gets the filesystem names and subvolumes
|
|
// from the volumeIDs present in the CreateVolumeGroupSnapshotRequest. It also
|
|
// returns the SubVolumeQuiesceClient for the filesystems present in the
|
|
// volumeIDs.
|
|
func getFsNamesAndSubVolumeFromVolumeIDs(ctx context.Context,
|
|
secret map[string]string,
|
|
volIDs []string,
|
|
cr *util.Credentials) (
|
|
map[string]core.FSQuiesceClient,
|
|
error,
|
|
) {
|
|
type fs struct {
|
|
fsName string
|
|
volumes []core.Volume
|
|
subVolumeGroupMapping map[string][]string
|
|
monitors string
|
|
}
|
|
fm := make(map[string]fs, 0)
|
|
for _, volID := range volIDs {
|
|
// Find the volume using the provided VolumeID
|
|
volOptions, _, err := store.NewVolumeOptionsFromVolID(ctx,
|
|
volID, nil, secret, "", false)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
volOptions.Destroy()
|
|
// choosing monitorIP's and fsName as the unique key
|
|
// TODO: Need to use something else as the unique key as users can
|
|
// still choose the different monitorIP's and fsName for subvolumes
|
|
uniqueName := volOptions.Monitors + volOptions.FsName
|
|
if _, ok := fm[uniqueName]; !ok {
|
|
fm[uniqueName] = fs{
|
|
fsName: volOptions.FsName,
|
|
volumes: make([]core.Volume, 0),
|
|
subVolumeGroupMapping: make(map[string][]string), // Initialize the map
|
|
monitors: volOptions.Monitors,
|
|
}
|
|
}
|
|
a := core.Volume{
|
|
VolumeID: volID,
|
|
ClusterID: volOptions.ClusterID,
|
|
}
|
|
// Retrieve the value, modify it, and assign it back
|
|
val := fm[uniqueName]
|
|
val.volumes = append(val.volumes, a)
|
|
existingVolIDInMap := val.subVolumeGroupMapping[volOptions.SubVolume.SubvolumeGroup]
|
|
val.subVolumeGroupMapping[volOptions.SubVolume.SubvolumeGroup] = append(
|
|
existingVolIDInMap,
|
|
volOptions.SubVolume.VolID)
|
|
fm[uniqueName] = val
|
|
}
|
|
fsk := map[string]core.FSQuiesceClient{}
|
|
var err error
|
|
defer func() {
|
|
if err != nil {
|
|
destroyFSConnections(fsk)
|
|
}
|
|
}()
|
|
for k, v := range fm {
|
|
conn := &util.ClusterConnection{}
|
|
if err = conn.Connect(v.monitors, cr); err != nil {
|
|
return nil, err
|
|
}
|
|
fsk[k], err = core.NewFSQuiesce(v.fsName, v.volumes, v.subVolumeGroupMapping, conn)
|
|
if err != nil {
|
|
log.ErrorLog(ctx, "failed to get subvolume quiesce: %v", err)
|
|
conn.Destroy()
|
|
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
return fsk, nil
|
|
}
|
|
|
|
// destroyFSConnections destroys connections of all FSQuiesceClient.
|
|
func destroyFSConnections(fsMap map[string]core.FSQuiesceClient) {
|
|
for _, fm := range fsMap {
|
|
if fm != nil {
|
|
fm.Destroy()
|
|
}
|
|
}
|
|
}
|
|
|
|
// matchesSourceVolumeIDs checks if the sourceVolumeIDs and volumeIDsInOMap are
|
|
// equal.
|
|
func matchesSourceVolumeIDs(sourceVolumeIDs, volumeIDsInOMap []string) bool {
|
|
// sort the array as its required for slices.Equal call.
|
|
sort.Strings(sourceVolumeIDs)
|
|
sort.Strings(volumeIDsInOMap)
|
|
|
|
return slices.Equal(sourceVolumeIDs, volumeIDsInOMap)
|
|
}
|
|
|
|
// deleteSnapshotsAndUndoReservation deletes the snapshots and undoes the
|
|
// volume group reservation. It also resets the quiesce of the subvolumes and
|
|
// subvolume groups in the filesystems for the volumeID's present in the
|
|
// CreateVolumeGroupSnapshotRequest.
|
|
func (cs *ControllerServer) deleteSnapshotsAndUndoReservation(ctx context.Context,
|
|
vgs *store.VolumeGroupSnapshotIdentifier,
|
|
cr *util.Credentials,
|
|
fsMap map[string]core.FSQuiesceClient,
|
|
secrets map[string]string,
|
|
) error {
|
|
// get the omap from the snapshot and volume mapping
|
|
vgo, vgsi, err := store.NewVolumeGroupOptionsFromID(ctx, vgs.VolumeGroupSnapshotID, cr)
|
|
if err != nil {
|
|
log.ErrorLog(ctx, "failed to get volume group options from id: %v", err)
|
|
|
|
return err
|
|
}
|
|
defer vgo.Destroy()
|
|
|
|
for volID, snapID := range vgsi.VolumeSnapshotMap {
|
|
// delete the snapshots
|
|
req := &csi.DeleteSnapshotRequest{
|
|
SnapshotId: snapID,
|
|
Secrets: secrets,
|
|
}
|
|
_, err = cs.DeleteSnapshot(ctx, req)
|
|
if err != nil {
|
|
log.ErrorLog(ctx, "failed to delete snapshot: %v", err)
|
|
|
|
return err
|
|
}
|
|
|
|
j, err := store.VolumeGroupJournal.Connect(vgo.Monitors, fsutil.RadosNamespace, cr)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// remove the entry from the omap
|
|
err = j.RemoveVolumeSnapshotMapping(
|
|
ctx,
|
|
vgo.MetadataPool,
|
|
vgsi.ReservedID,
|
|
volID)
|
|
j.Destroy()
|
|
if err != nil {
|
|
log.ErrorLog(ctx, "failed to remove volume snapshot mapping: %v", err)
|
|
|
|
return err
|
|
}
|
|
// undo the reservation
|
|
err = store.UndoVolumeGroupReservation(ctx, vgo, vgsi, cr)
|
|
if err != nil {
|
|
log.ErrorLog(ctx, "failed to undo volume group reservation: %v", err)
|
|
|
|
return err
|
|
}
|
|
}
|
|
|
|
for _, fm := range fsMap {
|
|
_, err := fm.ResetFSQuiesce(ctx, vgs.RequestName)
|
|
if err != nil {
|
|
log.ErrorLog(ctx, "failed to reset filesystem quiesce: %v", err)
|
|
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// validateVolumeGroupSnapshotDeleteRequest validates the request for creating a group
|
|
// snapshot of volumes.
|
|
func (cs *ControllerServer) validateVolumeGroupSnapshotDeleteRequest(
|
|
ctx context.Context,
|
|
req *csi.DeleteVolumeGroupSnapshotRequest,
|
|
) error {
|
|
if err := cs.Driver.ValidateGroupControllerServiceRequest(
|
|
csi.GroupControllerServiceCapability_RPC_CREATE_DELETE_GET_VOLUME_GROUP_SNAPSHOT); err != nil {
|
|
log.ErrorLog(ctx, "invalid create volume group snapshot req: %v", protosanitizer.StripSecrets(req))
|
|
|
|
return err
|
|
}
|
|
|
|
// Check sanity of request volume group snapshot Name, Source Volume Id's
|
|
if req.GetGroupSnapshotId() == "" {
|
|
return status.Error(codes.InvalidArgument, "volume group snapshot id cannot be empty")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// DeleteVolumeGroupSnapshot deletes a group snapshot of volumes.
|
|
func (cs *ControllerServer) DeleteVolumeGroupSnapshot(ctx context.Context,
|
|
req *csi.DeleteVolumeGroupSnapshotRequest) (
|
|
*csi.DeleteVolumeGroupSnapshotResponse,
|
|
error,
|
|
) {
|
|
if err := cs.validateVolumeGroupSnapshotDeleteRequest(ctx, req); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
groupSnapshotID := req.GroupSnapshotId
|
|
// Existence and conflict checks
|
|
if acquired := cs.VolumeGroupLocks.TryAcquire(groupSnapshotID); !acquired {
|
|
log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, groupSnapshotID)
|
|
|
|
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, groupSnapshotID)
|
|
}
|
|
defer cs.VolumeGroupLocks.Release(groupSnapshotID)
|
|
|
|
cr, err := util.NewAdminCredentials(req.GetSecrets())
|
|
if err != nil {
|
|
return nil, status.Error(codes.Internal, err.Error())
|
|
}
|
|
defer cr.DeleteCredentials()
|
|
|
|
vgo, vgsi, err := store.NewVolumeGroupOptionsFromID(ctx, req.GroupSnapshotId, cr)
|
|
if err != nil {
|
|
log.ErrorLog(ctx, "failed to get volume group options: %v", err)
|
|
err = extractDeleteVolumeGroupError(err)
|
|
if err != nil {
|
|
return nil, status.Error(codes.Internal, err.Error())
|
|
}
|
|
|
|
return &csi.DeleteVolumeGroupSnapshotResponse{}, nil
|
|
}
|
|
vgo.Destroy()
|
|
|
|
volIds := vgsi.GetVolumeIDs()
|
|
fsMap, err := getFsNamesAndSubVolumeFromVolumeIDs(ctx, req.GetSecrets(), volIds, cr)
|
|
err = extractDeleteVolumeGroupError(err)
|
|
if err != nil {
|
|
log.ErrorLog(ctx, "failed to get volume group options: %v", err)
|
|
err = extractDeleteVolumeGroupError(err)
|
|
if err != nil {
|
|
return nil, status.Error(codes.Internal, err.Error())
|
|
}
|
|
|
|
return &csi.DeleteVolumeGroupSnapshotResponse{}, nil
|
|
}
|
|
|
|
defer destroyFSConnections(fsMap)
|
|
|
|
err = cs.deleteSnapshotsAndUndoReservation(ctx, vgsi, cr, fsMap, req.GetSecrets())
|
|
if err != nil {
|
|
log.ErrorLog(ctx, "failed to delete snapshot and undo reservation: %v", err)
|
|
err = extractDeleteVolumeGroupError(err)
|
|
if err != nil {
|
|
return nil, status.Error(codes.Internal, err.Error())
|
|
}
|
|
|
|
return &csi.DeleteVolumeGroupSnapshotResponse{}, nil
|
|
}
|
|
|
|
return &csi.DeleteVolumeGroupSnapshotResponse{}, nil
|
|
}
|
|
|
|
// extractDeleteVolumeGroupError extracts the error from the delete volume
|
|
// group snapshot and returns the error if it is not a ErrKeyNotFound or
|
|
// ErrPoolNotFound error.
|
|
func extractDeleteVolumeGroupError(err error) error {
|
|
switch {
|
|
case errors.Is(err, util.ErrPoolNotFound):
|
|
// if error is ErrPoolNotFound, the pool is already deleted we dont
|
|
// need to worry about deleting snapshot or omap data, return success
|
|
return nil
|
|
case errors.Is(err, util.ErrKeyNotFound):
|
|
// if error is ErrKeyNotFound, then a previous attempt at deletion was complete
|
|
// or partially complete (snap and snapOMap are garbage collected already), hence return
|
|
// success as deletion is complete
|
|
return nil
|
|
}
|
|
|
|
return err
|
|
}
|