ceph-csi/internal/cephfs/groupcontrollerserver.go
Madhu Rajanna df770e4139 cephfs: implement CreateVolumeGroupSnapshot RPC
implemented CreateVolumeGroupSnapshot RPC which
does below operations

* Basic request validation
* Reserve the UUID for the group name
* Quiesce the filesystem for all the subvolumes
from the input volumeId's
* Take the snapshot for all the input volumeId's
* Add the mapping between volumeId's and snapshot
Id's in omap
* Release the quiesce for the filesystem for
all the subvolumes from the input volumeId's

Undo all the operations if anything fails.

Signed-off-by: Madhu Rajanna <madhupr007@gmail.com>
2024-03-21 06:16:42 +00:00

674 lines
21 KiB
Go

/*
Copyright 2024 The Ceph-CSI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cephfs
import (
"context"
"errors"
"fmt"
"sort"
"time"
"github.com/ceph/ceph-csi/internal/cephfs/core"
cerrors "github.com/ceph/ceph-csi/internal/cephfs/errors"
"github.com/ceph/ceph-csi/internal/cephfs/store"
fsutil "github.com/ceph/ceph-csi/internal/cephfs/util"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/log"
"github.com/ceph/go-ceph/cephfs/admin"
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/kubernetes-csi/csi-lib-utils/protosanitizer"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/timestamppb"
"k8s.io/utils/strings/slices"
)
// validateCreateVolumeGroupSnapshotRequest validates the request for creating
// a group snapshot of volumes.
func (cs *ControllerServer) validateCreateVolumeGroupSnapshotRequest(
ctx context.Context,
req *csi.CreateVolumeGroupSnapshotRequest,
) error {
if err := cs.Driver.ValidateGroupControllerServiceRequest(
csi.GroupControllerServiceCapability_RPC_CREATE_DELETE_GET_VOLUME_GROUP_SNAPSHOT); err != nil {
log.ErrorLog(ctx, "invalid create volume group snapshot req: %v", protosanitizer.StripSecrets(req))
return err
}
// Check sanity of request volume group snapshot Name, Source Volume Id's
if req.GetName() == "" {
return status.Error(codes.InvalidArgument, "volume group snapshot Name cannot be empty")
}
if len(req.GetSourceVolumeIds()) == 0 {
return status.Error(codes.InvalidArgument, "source volume ids cannot be empty")
}
param := req.GetParameters()
// check for ClusterID and fsName
if value, ok := param["clusterID"]; !ok || value == "" {
return status.Error(codes.InvalidArgument, "missing or empty clusterID")
}
if value, ok := param["fsName"]; !ok || value == "" {
return status.Error(codes.InvalidArgument, "missing or empty fsName")
}
return nil
}
// CreateVolumeGroupSnapshot creates a group snapshot of volumes.
func (cs *ControllerServer) CreateVolumeGroupSnapshot(
ctx context.Context,
req *csi.CreateVolumeGroupSnapshotRequest) (
*csi.CreateVolumeGroupSnapshotResponse,
error,
) {
if err := cs.validateCreateVolumeGroupSnapshotRequest(ctx, req); err != nil {
return nil, err
}
requestName := req.GetName()
// Existence and conflict checks
if acquired := cs.VolumeGroupLocks.TryAcquire(requestName); !acquired {
log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, requestName)
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, requestName)
}
defer cs.VolumeGroupLocks.Release(requestName)
cr, err := util.NewAdminCredentials(req.GetSecrets())
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
defer cr.DeleteCredentials()
vg, err := store.NewVolumeGroupOptions(ctx, req, cr)
if err != nil {
log.ErrorLog(ctx, "failed to get volume group options: %v", err)
return nil, status.Error(codes.Internal, err.Error())
}
defer vg.Destroy()
vgs, err := store.CheckVolumeGroupSnapExists(ctx, vg, cr)
if err != nil {
log.ErrorLog(ctx, "failed to check volume group snapshot exists: %v", err)
return nil, status.Error(codes.Internal, err.Error())
}
// Get the fs names and subvolume from the volume ids to execute quiesce commands.
fsMap, err := getFsNamesAndSubVolumeFromVolumeIDs(ctx, req.GetSecrets(), req.GetSourceVolumeIds(), cr)
if err != nil {
log.ErrorLog(ctx, "failed to get fs names and subvolume from volume ids: %v", err)
return nil, status.Error(codes.Internal, err.Error())
}
defer destroyFSConnections(fsMap)
needRelease := checkIfFSNeedQuiesceRelease(vgs, req.GetSourceVolumeIds())
if needRelease {
return cs.releaseQuiesceAndGetVolumeGroupSnapshotResponse(ctx, req, vgs, fsMap, vg, cr)
}
// If the volume group snapshot does not exist, reserve the volume group
if vgs == nil {
vgs, err = store.ReserveVolumeGroup(ctx, vg, cr)
if err != nil {
log.ErrorLog(ctx, "failed to reserve volume group: %v", err)
return nil, status.Error(codes.Internal, err.Error())
}
}
inProgress, err := cs.queisceFileSystems(ctx, vgs, fsMap)
if err != nil {
log.ErrorLog(ctx, "failed to quiesce filesystems: %v", err)
if !errors.Is(err, cerrors.ErrQuiesceInProgress) {
uErr := cs.deleteSnapshotsAndUndoReservation(ctx, vgs, cr, fsMap, req.GetSecrets())
if uErr != nil {
log.ErrorLog(ctx, "failed to delete snapshot and undo reservation: %v", uErr)
}
}
return nil, status.Error(codes.Internal, err.Error())
}
if inProgress {
return nil, status.Error(codes.Internal, "Quiesce operation is in progress")
}
resp, err := cs.createSnapshotAddToVolumeGroupJournal(ctx, req, vg, vgs, cr, fsMap)
if err != nil {
log.ErrorLog(ctx, "failed to create snapshot and add to volume group journal: %v", err)
if !errors.Is(err, cerrors.ErrQuiesceInProgress) {
// Handle Undo reservation and timeout as well
uErr := cs.deleteSnapshotsAndUndoReservation(ctx, vgs, cr, fsMap, req.GetSecrets())
if uErr != nil {
log.ErrorLog(ctx, "failed to delete snapshot and undo reservation: %v", uErr)
}
}
return nil, status.Error(codes.Internal, err.Error())
}
response := &csi.CreateVolumeGroupSnapshotResponse{}
response.GroupSnapshot = &csi.VolumeGroupSnapshot{
GroupSnapshotId: vgs.VolumeGroupSnapshotID,
ReadyToUse: true,
CreationTime: timestamppb.New(time.Now()),
}
for _, r := range *resp {
r.Snapshot.GroupSnapshotId = vgs.VolumeGroupSnapshotID
response.GroupSnapshot.Snapshots = append(response.GroupSnapshot.Snapshots, r.Snapshot)
}
return response, nil
}
// queisceFileSystems quiesces the subvolumes and subvolume groups present in
// the filesystems of the volumeID's present in the
// CreateVolumeGroupSnapshotRequest.
func (cs *ControllerServer) queisceFileSystems(ctx context.Context,
vgs *store.VolumeGroupSnapshotIdentifier,
fsMap map[string]core.FSQuiesceClient,
) (bool, error) {
var inProgress bool
for _, fm := range fsMap {
// Quiesce the fs, subvolumes and subvolume groups
data, err := fm.FSQuiesce(ctx, vgs.RequestName)
if err != nil {
log.ErrorLog(ctx, "failed to quiesce filesystem: %v", err)
return inProgress, err
}
state := core.GetQuiesceState(data.State)
if state == core.Quiescing {
inProgress = true
} else if state != core.Quiesced {
return inProgress, fmt.Errorf("quiesce operation is in %s state", state)
}
}
return inProgress, nil
}
// releaseQuiesceAndGetVolumeGroupSnapshotResponse releases the quiesce of the
// subvolumes and subvolume groups in the filesystems for the volumeID's
// present in the CreateVolumeGroupSnapshotRequest.
func (cs *ControllerServer) releaseQuiesceAndGetVolumeGroupSnapshotResponse(
ctx context.Context,
req *csi.CreateVolumeGroupSnapshotRequest,
vgs *store.VolumeGroupSnapshotIdentifier,
fsMap map[string]core.FSQuiesceClient,
vg *store.VolumeGroupOptions,
cr *util.Credentials,
) (*csi.CreateVolumeGroupSnapshotResponse, error) {
matchesSourceVolumeIDs := matchesSourceVolumeIDs(vgs.GetVolumeIDs(), req.GetSourceVolumeIds())
if !matchesSourceVolumeIDs {
return nil, status.Errorf(
codes.InvalidArgument,
"source volume ids %v do not match in the existing volume group snapshot %v",
req.GetSourceVolumeIds(),
vgs.GetVolumeIDs())
}
// Release the quiesce of the subvolumes and subvolume groups in the
// filesystems for the volumes.
for _, fm := range fsMap {
// UnFreeze the filesystems, subvolumes and subvolume groups
data, err := fm.ReleaseFSQuiesce(ctx, vg.RequestName)
if err != nil {
log.ErrorLog(ctx, "failed to release filesystem quiesce: %v", err)
uErr := cs.deleteSnapshotsAndUndoReservation(ctx, vgs, cr, fsMap, req.GetSecrets())
if uErr != nil {
log.ErrorLog(ctx, "failed to delete snapshot and undo reservation: %v", uErr)
}
return nil, status.Errorf(codes.Internal, "failed to release filesystem quiesce: %v", err)
}
state := core.GetQuiesceState(data.State)
if state != core.Released {
return nil, status.Errorf(codes.Internal, "quiesce operation is in %s state", state)
}
}
var err error
defer func() {
if err != nil && !errors.Is(err, cerrors.ErrQuiesceInProgress) {
uErr := cs.deleteSnapshotsAndUndoReservation(ctx, vgs, cr, fsMap, req.GetSecrets())
if uErr != nil {
log.ErrorLog(ctx, "failed to delete snapshot and undo reservation: %v", uErr)
}
}
}()
snapshotResponses := make([]csi.CreateSnapshotResponse, 0)
for _, volID := range req.GetSourceVolumeIds() {
// Create the snapshot for the volumeID
clusterID := getClusterIDForVolumeID(fsMap, volID)
if clusterID == "" {
return nil, status.Errorf(codes.Internal, "failed to get clusterID for volumeID %s", volID)
}
req := formatCreateSnapshotRequest(volID, vgs.FsVolumeGroupSnapshotName,
clusterID,
req.GetSecrets())
var resp *csi.CreateSnapshotResponse
resp, err = cs.createSnapshotAndAddMapping(ctx, req, vg, vgs, cr)
if err != nil {
// Handle cleanup
log.ErrorLog(ctx, "failed to create snapshot: %v", err)
return nil, status.Errorf(codes.Internal,
"failed to create snapshot and add to volume group journal: %v",
err)
}
snapshotResponses = append(snapshotResponses, *resp)
}
response := &csi.CreateVolumeGroupSnapshotResponse{}
response.GroupSnapshot = &csi.VolumeGroupSnapshot{
GroupSnapshotId: vgs.VolumeGroupSnapshotID,
ReadyToUse: true,
CreationTime: timestamppb.New(time.Now()),
}
for _, r := range snapshotResponses {
r.Snapshot.GroupSnapshotId = vgs.VolumeGroupSnapshotID
response.GroupSnapshot.Snapshots = append(response.GroupSnapshot.Snapshots, r.Snapshot)
}
return response, nil
}
// createSnapshotAddToVolumeGroupJournal creates the snapshot and adds the
// snapshotID and volumeID to the volume group journal omap. If the freeze is
// true then it will freeze the subvolumes and subvolume groups before creating
// the snapshot and unfreeze them after creating the snapshot. If the freeze is
// false it will call createSnapshot and get the snapshot details for the
// volume and add the snapshotID and volumeID to the volume group journal omap.
// If any error occurs other than ErrInProgress it will delete the snapshots
// and undo the reservation and return the error.
func (cs *ControllerServer) createSnapshotAddToVolumeGroupJournal(
ctx context.Context,
req *csi.CreateVolumeGroupSnapshotRequest,
vgo *store.VolumeGroupOptions,
vgs *store.VolumeGroupSnapshotIdentifier,
cr *util.Credentials,
fsMap map[string]core.FSQuiesceClient) (
*[]csi.CreateSnapshotResponse,
error,
) {
var err error
var resp *csi.CreateSnapshotResponse
responses := make([]csi.CreateSnapshotResponse, 0)
for _, volID := range req.GetSourceVolumeIds() {
err = fsQuiesceWithExpireTimeout(ctx, vgo.RequestName, fsMap)
if err != nil {
log.ErrorLog(ctx, "failed to quiesce filesystem with timeout: %v", err)
return nil, err
}
// Create the snapshot for the volumeID
clusterID := getClusterIDForVolumeID(fsMap, volID)
if clusterID == "" {
return nil, fmt.Errorf("failed to get clusterID for volumeID %s", volID)
}
req := formatCreateSnapshotRequest(volID, vgs.FsVolumeGroupSnapshotName,
clusterID,
req.GetSecrets())
resp, err = cs.createSnapshotAndAddMapping(ctx, req, vgo, vgs, cr)
if err != nil {
// Handle cleanup
log.ErrorLog(ctx, "failed to create snapshot: %v", err)
return nil, err
}
responses = append(responses, *resp)
}
err = releaseFSQuiesce(ctx, vgo.RequestName, fsMap)
if err != nil {
log.ErrorLog(ctx, "failed to release filesystem quiesce: %v", err)
return nil, err
}
return &responses, nil
}
func formatCreateSnapshotRequest(volID, groupSnapshotName,
clusterID string,
secret map[string]string,
) *csi.CreateSnapshotRequest {
return &csi.CreateSnapshotRequest{
SourceVolumeId: volID,
Name: groupSnapshotName + "-" + volID,
Secrets: secret,
Parameters: map[string]string{
"clusterID": clusterID,
},
}
}
// releaseSubvolumeQuiesce releases the quiesce of the subvolumes and subvolume
// groups in the filesystems for the volumeID's present in the
// CreateVolumeGroupSnapshotRequest.
func releaseFSQuiesce(ctx context.Context,
requestName string,
fsMap map[string]core.FSQuiesceClient,
) error {
inProgress := false
var err error
var data *admin.QuiesceInfo
for _, fm := range fsMap {
// UnFreeze the filesystems, subvolumes and subvolume groups
data, err = fm.ReleaseFSQuiesce(ctx, requestName)
if err != nil {
log.ErrorLog(ctx, "failed to release filesystem quiesce: %v", err)
return err
}
state := core.GetQuiesceState(data.State)
if state != core.Released {
inProgress = true
}
}
if inProgress {
return cerrors.ErrQuiesceInProgress
}
return nil
}
// fsQuiesceWithExpireTimeout quiesces the subvolumes and subvolume
// groups in the filesystems for the volumeID's present in the
// CreateVolumeGroupSnapshotRequest.
func fsQuiesceWithExpireTimeout(ctx context.Context,
requestName string,
fsMap map[string]core.FSQuiesceClient,
) error {
var err error
var data *admin.QuiesceInfo
inProgress := false
for _, fm := range fsMap {
// reinitialize the expiry timer for the quiesce
data, err = fm.FSQuiesceWithExpireTimeout(ctx, requestName)
if err != nil {
log.ErrorLog(ctx, "failed to quiesce filesystem with timeout: %v", err)
return err
}
state := core.GetQuiesceState(data.State)
if state == core.Quiescing {
inProgress = true
} else if state != core.Quiesced {
return fmt.Errorf("quiesce operation is in %s state", state)
}
}
if inProgress {
return cerrors.ErrQuiesceInProgress
}
return nil
}
// createSnapshotAndAddMapping creates the snapshot and adds the snapshotID and
// volumeID to the volume group journal omap. If any error occurs it will
// delete the last created snapshot as its still not added to the journal.
func (cs *ControllerServer) createSnapshotAndAddMapping(
ctx context.Context,
req *csi.CreateSnapshotRequest,
vgo *store.VolumeGroupOptions,
vgs *store.VolumeGroupSnapshotIdentifier,
cr *util.Credentials,
) (*csi.CreateSnapshotResponse, error) {
// Create the snapshot
resp, err := cs.CreateSnapshot(ctx, req)
if err != nil {
// Handle cleanup
log.ErrorLog(ctx, "failed to create snapshot: %v", err)
return nil, err
}
j, err := store.VolumeGroupJournal.Connect(vgo.Monitors, fsutil.RadosNamespace, cr)
if err != nil {
return nil, err
}
defer j.Destroy()
// Add the snapshot to the volume group journal
err = j.AddVolumeSnapshotMapping(ctx,
vgo.MetadataPool,
vgs.ReservedID,
req.GetSourceVolumeId(),
resp.GetSnapshot().GetSnapshotId())
if err != nil {
log.ErrorLog(ctx, "failed to add volume snapshot mapping: %v", err)
// Delete the last created snapshot as its still not added to the
// journal
delReq := &csi.DeleteSnapshotRequest{
SnapshotId: resp.GetSnapshot().GetSnapshotId(),
Secrets: req.GetSecrets(),
}
_, dErr := cs.DeleteSnapshot(ctx, delReq)
if dErr != nil {
log.ErrorLog(ctx, "failed to delete snapshot %s: %v", resp.GetSnapshot().GetSnapshotId(), dErr)
}
return nil, err
}
return resp, nil
}
// checkIfFSNeedQuiesceRelease checks that do we have snapshots for all the
// volumes stored in the omap so that we can release the quiesce.
func checkIfFSNeedQuiesceRelease(vgs *store.VolumeGroupSnapshotIdentifier, volIDs []string) bool {
if vgs == nil {
return false
}
// If the number of volumes in the snapshot is not equal to the number of volumes
return len(vgs.GetVolumeIDs()) == len(volIDs)
}
// getClusterIDForVolumeID gets the clusterID for the volumeID from the fms map.
func getClusterIDForVolumeID(fms map[string]core.FSQuiesceClient, volumeID string) string {
for _, fm := range fms {
for _, vol := range fm.GetVolumes() {
if vol.VolumeID == volumeID {
return vol.ClusterID
}
}
}
return ""
}
// getFsNamesAndSubVolumeFromVolumeIDs gets the filesystem names and subvolumes
// from the volumeIDs present in the CreateVolumeGroupSnapshotRequest. It also
// returns the SubVolumeQuiesceClient for the filesystems present in the
// volumeIDs.
func getFsNamesAndSubVolumeFromVolumeIDs(ctx context.Context,
secret map[string]string,
volIDs []string,
cr *util.Credentials) (
map[string]core.FSQuiesceClient,
error,
) {
type fs struct {
fsName string
volumes []core.Volume
subVolumeGroupMapping map[string][]string
monitors string
}
fm := make(map[string]fs, 0)
for _, volID := range volIDs {
// Find the volume using the provided VolumeID
volOptions, _, err := store.NewVolumeOptionsFromVolID(ctx,
volID, nil, secret, "", false)
if err != nil {
return nil, err
}
volOptions.Destroy()
// choosing monitorIP's and fsName as the unique key
// TODO: Need to use something else as the unique key as users can
// still choose the different monitorIP's and fsName for subvolumes
uniqueName := volOptions.Monitors + volOptions.FsName
if _, ok := fm[uniqueName]; !ok {
fm[uniqueName] = fs{
fsName: volOptions.FsName,
volumes: make([]core.Volume, 0),
subVolumeGroupMapping: make(map[string][]string), // Initialize the map
monitors: volOptions.Monitors,
}
}
a := core.Volume{
VolumeID: volID,
ClusterID: volOptions.ClusterID,
}
// Retrieve the value, modify it, and assign it back
val := fm[uniqueName]
val.volumes = append(val.volumes, a)
existingVolIDInMap := val.subVolumeGroupMapping[volOptions.SubVolume.SubvolumeGroup]
val.subVolumeGroupMapping[volOptions.SubVolume.SubvolumeGroup] = append(
existingVolIDInMap,
volOptions.SubVolume.VolID)
fm[uniqueName] = val
}
fsk := map[string]core.FSQuiesceClient{}
var err error
defer func() {
if err != nil {
destroyFSConnections(fsk)
}
}()
for k, v := range fm {
conn := &util.ClusterConnection{}
if err = conn.Connect(v.monitors, cr); err != nil {
return nil, err
}
fsk[k], err = core.NewFSQuiesce(v.fsName, v.volumes, v.subVolumeGroupMapping, conn)
if err != nil {
log.ErrorLog(ctx, "failed to get subvolume quiesce: %v", err)
conn.Destroy()
return nil, err
}
}
return fsk, nil
}
// destroyFSConnections destroys connections of all FSQuiesceClient.
func destroyFSConnections(fsMap map[string]core.FSQuiesceClient) {
for _, fm := range fsMap {
if fm != nil {
fm.Destroy()
}
}
}
// matchesSourceVolumeIDs checks if the sourceVolumeIDs and volumeIDsInOMap are
// equal.
func matchesSourceVolumeIDs(sourceVolumeIDs, volumeIDsInOMap []string) bool {
// sort the array as its required for slices.Equal call.
sort.Strings(sourceVolumeIDs)
sort.Strings(volumeIDsInOMap)
return slices.Equal(sourceVolumeIDs, volumeIDsInOMap)
}
// deleteSnapshotsAndUndoReservation deletes the snapshots and undoes the
// volume group reservation. It also resets the quiesce of the subvolumes and
// subvolume groups in the filesystems for the volumeID's present in the
// CreateVolumeGroupSnapshotRequest.
func (cs *ControllerServer) deleteSnapshotsAndUndoReservation(ctx context.Context,
vgs *store.VolumeGroupSnapshotIdentifier,
cr *util.Credentials,
fsMap map[string]core.FSQuiesceClient,
secrets map[string]string,
) error {
// get the omap from the snapshot and volume mapping
vgo, vgsi, err := store.NewVolumeGroupOptionsFromID(ctx, vgs.VolumeGroupSnapshotID, cr)
if err != nil {
log.ErrorLog(ctx, "failed to get volume group options from id: %v", err)
return err
}
defer vgo.Destroy()
for volID, snapID := range vgsi.VolumeSnapshotMap {
// delete the snapshots
req := &csi.DeleteSnapshotRequest{
SnapshotId: snapID,
Secrets: secrets,
}
_, err = cs.DeleteSnapshot(ctx, req)
if err != nil {
log.ErrorLog(ctx, "failed to delete snapshot: %v", err)
return err
}
j, err := store.VolumeGroupJournal.Connect(vgo.Monitors, fsutil.RadosNamespace, cr)
if err != nil {
return err
}
// remove the entry from the omap
err = j.RemoveVolumeSnapshotMapping(
ctx,
vgo.MetadataPool,
vgsi.ReservedID,
volID)
j.Destroy()
if err != nil {
log.ErrorLog(ctx, "failed to remove volume snapshot mapping: %v", err)
return err
}
// undo the reservation
err = store.UndoVolumeGroupReservation(ctx, vgo, vgsi, cr)
if err != nil {
log.ErrorLog(ctx, "failed to undo volume group reservation: %v", err)
return err
}
}
for _, fm := range fsMap {
_, err := fm.ResetFSQuiesce(ctx, vgs.RequestName)
if err != nil {
log.ErrorLog(ctx, "failed to reset filesystem quiesce: %v", err)
return err
}
}
return nil
}