From 6b3484f285a832da1dea41d389d65715dc266717 Mon Sep 17 00:00:00 2001 From: Madhu Rajanna Date: Mon, 5 Feb 2024 09:11:51 +0100 Subject: [PATCH 01/17] cephfs: add volumegroup service capability Add GROUP_CONTROLLER_SERVICE capabilities to the GetPluginCapabilities of the cephFS plugin. Signed-off-by: Madhu Rajanna --- internal/cephfs/identityserver.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/internal/cephfs/identityserver.go b/internal/cephfs/identityserver.go index 625fc3842..b2a041a32 100644 --- a/internal/cephfs/identityserver.go +++ b/internal/cephfs/identityserver.go @@ -58,6 +58,13 @@ func (is *IdentityServer) GetPluginCapabilities( }, }, }, + { + Type: &csi.PluginCapability_Service_{ + Service: &csi.PluginCapability_Service{ + Type: csi.PluginCapability_Service_GROUP_CONTROLLER_SERVICE, + }, + }, + }, }, }, nil } From f17ea38736b28922d7f7a26c4f7be9635da3d8cf Mon Sep 17 00:00:00 2001 From: Madhu Rajanna Date: Mon, 5 Feb 2024 10:05:32 +0100 Subject: [PATCH 02/17] cephfs: advertise group snapshot capability Advertise VOLUME_GROUP_SNAPSHOT capability from the cephfs driver. Signed-off-by: Madhu Rajanna --- internal/cephfs/driver.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/internal/cephfs/driver.go b/internal/cephfs/driver.go index 9f1957fd6..d12f3fd69 100644 --- a/internal/cephfs/driver.go +++ b/internal/cephfs/driver.go @@ -146,6 +146,10 @@ func (fs *Driver) Run(conf *util.Config) { csi.VolumeCapability_AccessMode_SINGLE_NODE_MULTI_WRITER, csi.VolumeCapability_AccessMode_SINGLE_NODE_SINGLE_WRITER, }) + + fs.cd.AddGroupControllerServiceCapabilities([]csi.GroupControllerServiceCapability_RPC_Type{ + csi.GroupControllerServiceCapability_RPC_CREATE_DELETE_GET_VOLUME_GROUP_SNAPSHOT, + }) } // Create gRPC servers From 68e93a31ccac7b9b383d55f8d1c7d3eef26f2a23 Mon Sep 17 00:00:00 2001 From: Madhu Rajanna Date: Tue, 19 Mar 2024 14:48:27 +0100 Subject: [PATCH 03/17] journal: fix connection problem with groupjournal Same group jounral config need to be reused for multiple connection where different monitors and users are used, for that reason create a unique connection each time. Signed-off-by: Madhu Rajanna --- internal/journal/volumegroupjournal.go | 106 +++++++++++++------------ 1 file changed, 55 insertions(+), 51 deletions(-) diff --git a/internal/journal/volumegroupjournal.go b/internal/journal/volumegroupjournal.go index 1498748d3..75f06a355 100644 --- a/internal/journal/volumegroupjournal.go +++ b/internal/journal/volumegroupjournal.go @@ -32,15 +32,7 @@ const ( ) type VolumeGroupJournal interface { - // Connect establishes a new connection to a ceph cluster for journal metadata. - Connect( - monitors, - namespace string, - cr *util.Credentials) error - // Destroy frees any resources and invalidates the journal connection. Destroy() - // SetNamespace sets the namespace for the journal. - SetNamespace(ns string) CheckReservation( ctx context.Context, journalPool, @@ -78,16 +70,20 @@ type VolumeGroupJournal interface { volumeID string) error } -// volumeGroupJournalConfig contains the configuration and connection details. -type volumeGroupJournalConfig struct { - *Config - *Connection +// VolumeGroupJournalConfig contains the configuration. +type VolumeGroupJournalConfig struct { + Config +} + +type VolumeGroupJournalConnection struct { + config *VolumeGroupJournalConfig + connection *Connection } // NewCSIVolumeGroupJournal returns an instance of VolumeGroupJournal for groups. 
-func NewCSIVolumeGroupJournal(suffix string) VolumeGroupJournal { - return &volumeGroupJournalConfig{ - Config: &Config{ +func NewCSIVolumeGroupJournal(suffix string) VolumeGroupJournalConfig { + return VolumeGroupJournalConfig{ + Config: Config{ csiDirectory: "csi.groups." + suffix, csiNameKeyPrefix: "csi.volume.group.", cephUUIDDirectoryPrefix: "csi.volume.group.", @@ -98,35 +94,42 @@ func NewCSIVolumeGroupJournal(suffix string) VolumeGroupJournal { } } -func (sgj *volumeGroupJournalConfig) SetNamespace(ns string) { - sgj.Config.namespace = ns +// SetNamespace sets the namespace for the journal. +func (vgc *VolumeGroupJournalConfig) SetNamespace(ns string) { + vgc.Config.namespace = ns } // NewCSIVolumeGroupJournalWithNamespace returns an instance of VolumeGroupJournal for // volume groups using a predetermined namespace value. -func NewCSIVolumeGroupJournalWithNamespace(suffix, ns string) VolumeGroupJournal { +func NewCSIVolumeGroupJournalWithNamespace(suffix, ns string) VolumeGroupJournalConfig { j := NewCSIVolumeGroupJournal(suffix) j.SetNamespace(ns) return j } -func (sgj *volumeGroupJournalConfig) Connect( +// Connect establishes a new connection to a ceph cluster for journal metadata. +func (vgc *VolumeGroupJournalConfig) Connect( monitors, namespace string, cr *util.Credentials, -) error { - conn, err := sgj.Config.Connect(monitors, namespace, cr) - if err != nil { - return err +) (VolumeGroupJournal, error) { + vgjc := &VolumeGroupJournalConnection{} + vgjc.config = &VolumeGroupJournalConfig{ + Config: vgc.Config, } - sgj.Connection = conn + conn, err := vgc.Config.Connect(monitors, namespace, cr) + if err != nil { + return nil, err + } + vgjc.connection = conn - return nil + return vgjc, nil } -func (sgj *volumeGroupJournalConfig) Destroy() { - sgj.Connection.Destroy() +// Destroy frees any resources and invalidates the journal connection. +func (vgjc *VolumeGroupJournalConnection) Destroy() { + vgjc.connection.Destroy() } // VolumeGroupData contains the GroupUUID and VolumeGroupAttributes for a @@ -162,11 +165,11 @@ Return values: reservation found. - error: non-nil in case of any errors. 
*/ -func (sgj *volumeGroupJournalConfig) CheckReservation(ctx context.Context, +func (vgjc *VolumeGroupJournalConnection) CheckReservation(ctx context.Context, journalPool, reqName, namePrefix string, ) (*VolumeGroupData, error) { var ( - cj = sgj.Config + cj = vgjc.config volGroupData = &VolumeGroupData{} ) @@ -175,7 +178,7 @@ func (sgj *volumeGroupJournalConfig) CheckReservation(ctx context.Context, cj.csiNameKeyPrefix + reqName, } values, err := getOMapValues( - ctx, sgj.Connection, journalPool, cj.namespace, cj.csiDirectory, + ctx, vgjc.connection, journalPool, cj.namespace, cj.csiDirectory, cj.commonPrefix, fetchKeys) if err != nil { if errors.Is(err, util.ErrKeyNotFound) || errors.Is(err, util.ErrPoolNotFound) { @@ -195,13 +198,13 @@ func (sgj *volumeGroupJournalConfig) CheckReservation(ctx context.Context, } volGroupData.GroupUUID = objUUID - savedVolumeGroupAttributes, err := sgj.GetVolumeGroupAttributes(ctx, journalPool, + savedVolumeGroupAttributes, err := vgjc.GetVolumeGroupAttributes(ctx, journalPool, objUUID) if err != nil { // error should specifically be not found, for image to be absent, any other error // is not conclusive, and we should not proceed if errors.Is(err, util.ErrKeyNotFound) { - err = sgj.UndoReservation(ctx, journalPool, + err = vgjc.UndoReservation(ctx, journalPool, generateVolumeGroupName(namePrefix, objUUID), reqName) } @@ -239,11 +242,11 @@ Input arguments: - groupID: ID of the volume group, generated from the UUID - reqName: Request name for the volume group */ -func (sgj *volumeGroupJournalConfig) UndoReservation(ctx context.Context, +func (vgjc *VolumeGroupJournalConnection) UndoReservation(ctx context.Context, csiJournalPool, groupID, reqName string, ) error { // delete volume UUID omap (first, inverse of create order) - cj := sgj.Config + cj := vgjc.config if groupID != "" { if len(groupID) < uuidEncodedLength { return fmt.Errorf("unable to parse UUID from %s, too short", groupID) @@ -256,8 +259,8 @@ func (sgj *volumeGroupJournalConfig) UndoReservation(ctx context.Context, err := util.RemoveObject( ctx, - sgj.Connection.monitors, - sgj.Connection.cr, + vgjc.connection.monitors, + vgjc.connection.cr, csiJournalPool, cj.namespace, cj.cephUUIDDirectoryPrefix+groupUUID) @@ -271,7 +274,7 @@ func (sgj *volumeGroupJournalConfig) UndoReservation(ctx context.Context, } // delete the request name key (last, inverse of create order) - err := removeMapKeys(ctx, sgj.Connection, csiJournalPool, cj.namespace, cj.csiDirectory, + err := removeMapKeys(ctx, vgjc.connection, csiJournalPool, cj.namespace, cj.csiDirectory, []string{cj.csiNameKeyPrefix + reqName}) if err != nil { log.ErrorLog(ctx, "failed removing oMap key %s (%s)", cj.csiNameKeyPrefix+reqName, err) @@ -299,11 +302,11 @@ Return values: - string: Contains the VolumeGroup name that was reserved for the passed in reqName - error: non-nil in case of any errors */ -func (sgj *volumeGroupJournalConfig) ReserveName(ctx context.Context, +func (vgjc *VolumeGroupJournalConnection) ReserveName(ctx context.Context, journalPool string, journalPoolID int64, reqName, namePrefix string, ) (string, string, error) { - cj := sgj.Config + cj := vgjc.config // Create the UUID based omap first, to reserve the same and avoid conflicts // NOTE: If any service loss occurs post creation of the UUID directory, and before @@ -311,8 +314,8 @@ func (sgj *volumeGroupJournalConfig) ReserveName(ctx context.Context, // UUID directory key will be leaked objUUID, err := reserveOMapName( ctx, - sgj.Connection.monitors, - sgj.Connection.cr, + 
vgjc.connection.monitors, + vgjc.connection.cr, journalPool, cj.namespace, cj.cephUUIDDirectoryPrefix, @@ -325,7 +328,7 @@ func (sgj *volumeGroupJournalConfig) ReserveName(ctx context.Context, // After generating the UUID Directory omap, we populate the csiDirectory // omap with a key-value entry to map the request to the backend volume group: // `csiNameKeyPrefix + reqName: nameKeyVal` - err = setOMapKeys(ctx, sgj.Connection, journalPool, cj.namespace, cj.csiDirectory, + err = setOMapKeys(ctx, vgjc.connection, journalPool, cj.namespace, cj.csiDirectory, map[string]string{cj.csiNameKeyPrefix + reqName: nameKeyVal}) if err != nil { return "", "", err @@ -333,7 +336,7 @@ func (sgj *volumeGroupJournalConfig) ReserveName(ctx context.Context, defer func() { if err != nil { log.WarningLog(ctx, "reservation failed for volume group: %s", reqName) - errDefer := sgj.UndoReservation(ctx, journalPool, groupName, reqName) + errDefer := vgjc.UndoReservation(ctx, journalPool, groupName, reqName) if errDefer != nil { log.WarningLog(ctx, "failed undoing reservation of volume group: %s (%v)", reqName, errDefer) } @@ -347,7 +350,7 @@ func (sgj *volumeGroupJournalConfig) ReserveName(ctx context.Context, omapValues[cj.csiNameKey] = reqName omapValues[cj.csiImageKey] = groupName - err = setOMapKeys(ctx, sgj.Connection, journalPool, cj.namespace, oid, omapValues) + err = setOMapKeys(ctx, vgjc.connection, journalPool, cj.namespace, oid, omapValues) if err != nil { return "", "", err } @@ -363,18 +366,18 @@ type VolumeGroupAttributes struct { VolumeSnapshotMap map[string]string // Contains the volumeID and the corresponding snapshotID mapping } -func (sgj *volumeGroupJournalConfig) GetVolumeGroupAttributes( +func (vgjc *VolumeGroupJournalConnection) GetVolumeGroupAttributes( ctx context.Context, pool, objectUUID string, ) (*VolumeGroupAttributes, error) { var ( err error groupAttributes = &VolumeGroupAttributes{} - cj = sgj.Config + cj = vgjc.config ) values, err := listOMapValues( - ctx, sgj.Connection, pool, cj.namespace, cj.cephUUIDDirectoryPrefix+objectUUID, + ctx, vgjc.connection, pool, cj.namespace, cj.cephUUIDDirectoryPrefix+objectUUID, cj.commonPrefix) if err != nil { if !errors.Is(err, util.ErrKeyNotFound) && !errors.Is(err, util.ErrPoolNotFound) { @@ -398,14 +401,14 @@ func (sgj *volumeGroupJournalConfig) GetVolumeGroupAttributes( return groupAttributes, nil } -func (sgj *volumeGroupJournalConfig) AddVolumeSnapshotMapping( +func (vgjc *VolumeGroupJournalConnection) AddVolumeSnapshotMapping( ctx context.Context, pool, reservedUUID, volumeID, snapshotID string, ) error { - err := setOMapKeys(ctx, sgj.Connection, pool, sgj.Config.namespace, sgj.Config.cephUUIDDirectoryPrefix+reservedUUID, + err := setOMapKeys(ctx, vgjc.connection, pool, vgjc.config.namespace, vgjc.config.cephUUIDDirectoryPrefix+reservedUUID, map[string]string{volumeID: snapshotID}) if err != nil { log.ErrorLog(ctx, "failed adding volume snapshot mapping: %v", err) @@ -416,13 +419,14 @@ func (sgj *volumeGroupJournalConfig) AddVolumeSnapshotMapping( return nil } -func (sgj *volumeGroupJournalConfig) RemoveVolumeSnapshotMapping( +func (vgjc *VolumeGroupJournalConnection) RemoveVolumeSnapshotMapping( ctx context.Context, pool, reservedUUID, volumeID string, ) error { - err := removeMapKeys(ctx, sgj.Connection, pool, sgj.Config.namespace, sgj.Config.cephUUIDDirectoryPrefix+reservedUUID, + err := removeMapKeys(ctx, vgjc.connection, pool, vgjc.config.namespace, + vgjc.config.cephUUIDDirectoryPrefix+reservedUUID, []string{volumeID}) if err != nil { 
log.ErrorLog(ctx, "failed removing volume snapshot mapping: %v", err) From 445de7926d877141824086e58a84eb25bbd279a0 Mon Sep 17 00:00:00 2001 From: Madhu Rajanna Date: Mon, 5 Feb 2024 11:49:45 +0100 Subject: [PATCH 04/17] cephfs: add validateCreateVolumeGroupSnapshotRequest added validateCreateVolumeGroupSnapshotRequest to validate the CreateVolumeGroupSnapshotRequest request and ensure that all the requirement options are set. if not, reject the RPC request. Signed-off-by: Madhu Rajanna --- internal/cephfs/groupcontrollerserver.go | 63 ++++++++++++++++++++++++ 1 file changed, 63 insertions(+) create mode 100644 internal/cephfs/groupcontrollerserver.go diff --git a/internal/cephfs/groupcontrollerserver.go b/internal/cephfs/groupcontrollerserver.go new file mode 100644 index 000000000..6092f8a14 --- /dev/null +++ b/internal/cephfs/groupcontrollerserver.go @@ -0,0 +1,63 @@ +/* +Copyright 2024 The Ceph-CSI Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cephfs + +import ( + "context" + + "github.com/ceph/ceph-csi/internal/util/log" + + "github.com/container-storage-interface/spec/lib/go/csi" + "github.com/kubernetes-csi/csi-lib-utils/protosanitizer" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +// validateCreateVolumeGroupSnapshotRequest validates the request for creating +// a group snapshot of volumes. 
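+// It rejects the call when the CREATE_DELETE_GET_VOLUME_GROUP_SNAPSHOT
+// capability is not enabled, and returns an InvalidArgument error when the
+// group snapshot name, the source volume IDs, or the clusterID/fsName
+// parameters are missing.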
+func (cs *ControllerServer) validateCreateVolumeGroupSnapshotRequest( + ctx context.Context, + req *csi.CreateVolumeGroupSnapshotRequest, +) error { + if err := cs.Driver.ValidateGroupControllerServiceRequest( + csi.GroupControllerServiceCapability_RPC_CREATE_DELETE_GET_VOLUME_GROUP_SNAPSHOT); err != nil { + log.ErrorLog(ctx, "invalid create volume group snapshot req: %v", protosanitizer.StripSecrets(req)) + + return err + } + + // Check sanity of request volume group snapshot Name, Source Volume Id's + if req.GetName() == "" { + return status.Error(codes.InvalidArgument, "volume group snapshot Name cannot be empty") + } + + if len(req.GetSourceVolumeIds()) == 0 { + return status.Error(codes.InvalidArgument, "source volume ids cannot be empty") + } + + param := req.GetParameters() + // check for ClusterID and fsName + if value, ok := param["clusterID"]; !ok || value == "" { + return status.Error(codes.InvalidArgument, "missing or empty clusterID") + } + + if value, ok := param["fsName"]; !ok || value == "" { + return status.Error(codes.InvalidArgument, "missing or empty fsName") + } + + return nil +} From ff6eda0de1b1253b340a8f34c4a8d8815dd8ebf5 Mon Sep 17 00:00:00 2001 From: Madhu Rajanna Date: Mon, 5 Feb 2024 13:21:26 +0100 Subject: [PATCH 05/17] cephfs: initialize VolumeGroupJournal initialize VolumeGroupJournal which is required for volumegroup rados communication Signed-off-by: Madhu Rajanna --- internal/cephfs/driver.go | 4 ++++ internal/cephfs/store/fsjournal.go | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/internal/cephfs/driver.go b/internal/cephfs/driver.go index d12f3fd69..3aa8caf6b 100644 --- a/internal/cephfs/driver.go +++ b/internal/cephfs/driver.go @@ -124,6 +124,10 @@ func (fs *Driver) Run(conf *util.Config) { store.VolJournal = journal.NewCSIVolumeJournalWithNamespace(CSIInstanceID, fsutil.RadosNamespace) store.SnapJournal = journal.NewCSISnapshotJournalWithNamespace(CSIInstanceID, fsutil.RadosNamespace) + + store.VolumeGroupJournal = journal.NewCSIVolumeGroupJournalWithNamespace( + CSIInstanceID, + fsutil.RadosNamespace) // Initialize default library driver fs.cd = csicommon.NewCSIDriver(conf.DriverName, util.DriverVersion, conf.NodeID) diff --git a/internal/cephfs/store/fsjournal.go b/internal/cephfs/store/fsjournal.go index a8fbcdb50..c9f9a16d7 100644 --- a/internal/cephfs/store/fsjournal.go +++ b/internal/cephfs/store/fsjournal.go @@ -40,6 +40,10 @@ var ( // SnapJournal is used to maintain RADOS based journals for CO generated. // SnapshotName to backing CephFS subvolumes. SnapJournal *journal.Config + + // VolumeGroupJournal is used to maintain RADOS based journals for CO + // generate request name to CephFS snapshot group attributes. + VolumeGroupJournal journal.VolumeGroupJournalConfig ) // VolumeIdentifier structure contains an association between the CSI VolumeID to its subvolume From 6ec86879e6ee3a310684ab3a77eade34f3e59544 Mon Sep 17 00:00:00 2001 From: Madhu Rajanna Date: Tue, 6 Feb 2024 14:54:38 +0100 Subject: [PATCH 06/17] cephfs: register group controller register the group controller service for the cephfs. 
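The ControllerServer instance is wired into the gRPC server as the
GroupControllerServer as well (GS: fs.cs), since the group snapshot RPCs are
added to the same struct in later commits of this series.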
Signed-off-by: Madhu Rajanna --- internal/cephfs/driver.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/cephfs/driver.go b/internal/cephfs/driver.go index 3aa8caf6b..d0f90c1e4 100644 --- a/internal/cephfs/driver.go +++ b/internal/cephfs/driver.go @@ -200,6 +200,7 @@ func (fs *Driver) Run(conf *util.Config) { IS: fs.is, CS: fs.cs, NS: fs.ns, + GS: fs.cs, } server.Start(conf.Endpoint, srv) From b30da094b04cc6a393720772679da2d6c133102e Mon Sep 17 00:00:00 2001 From: Madhu Rajanna Date: Fri, 1 Mar 2024 16:11:55 +0100 Subject: [PATCH 07/17] build: add ceph_preview to GO_TAGS_LIST added required ceph_preview tag to the GO_TAGS_LIST in Makefile which is required for FSQuiesce API. Signed-off-by: Madhu Rajanna --- Makefile | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Makefile b/Makefile index 24116fbb0..57024dc20 100644 --- a/Makefile +++ b/Makefile @@ -47,7 +47,8 @@ endif GO_PROJECT=github.com/ceph/ceph-csi CEPH_VERSION ?= $(shell . $(CURDIR)/build.env ; echo $${CEPH_VERSION}) -GO_TAGS_LIST ?= $(CEPH_VERSION) +# TODO: ceph_preview tag required for FSQuiesce API +GO_TAGS_LIST ?= $(CEPH_VERSION) ceph_preview # go build flags LDFLAGS ?= From ef25a816a75d31b2787e2f763db23988868de8f7 Mon Sep 17 00:00:00 2001 From: Madhu Rajanna Date: Tue, 13 Feb 2024 14:24:45 +0100 Subject: [PATCH 08/17] cephfs: add locks for volumegroup Adding a lock for the volumegroup so that we can take care of serializing the same requests to ensure same requests are not served in parallel. Signed-off-by: Madhu Rajanna --- internal/cephfs/controllerserver.go | 4 ++++ internal/cephfs/driver.go | 1 + 2 files changed, 5 insertions(+) diff --git a/internal/cephfs/controllerserver.go b/internal/cephfs/controllerserver.go index dac4bad3b..e6de82ea5 100644 --- a/internal/cephfs/controllerserver.go +++ b/internal/cephfs/controllerserver.go @@ -56,6 +56,10 @@ type ControllerServer struct { // A map storing all volumes/snapshots with ongoing operations. OperationLocks *util.OperationLock + // A map storing all volumes with ongoing operations so that additional operations + // for that same volume (as defined by volumegroup ID/volumegroup name) return an Aborted error + VolumeGroupLocks *util.VolumeLocks + // Cluster name ClusterName string diff --git a/internal/cephfs/driver.go b/internal/cephfs/driver.go index d0f90c1e4..dd78cf5c8 100644 --- a/internal/cephfs/driver.go +++ b/internal/cephfs/driver.go @@ -67,6 +67,7 @@ func NewControllerServer(d *csicommon.CSIDriver) *ControllerServer { DefaultControllerServer: csicommon.NewDefaultControllerServer(d), VolumeLocks: util.NewVolumeLocks(), SnapshotLocks: util.NewVolumeLocks(), + VolumeGroupLocks: util.NewVolumeLocks(), OperationLocks: util.NewOperationLock(), } } From eff0fe3a238731d4fde83fa7d4281c6a693715d9 Mon Sep 17 00:00:00 2001 From: Madhu Rajanna Date: Tue, 13 Feb 2024 14:33:10 +0100 Subject: [PATCH 09/17] cephfs: add error for quiesce operation added ErrInProgress to indicate the the quiesce operation is in progress. Signed-off-by: Madhu Rajanna --- internal/cephfs/errors/errors.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/internal/cephfs/errors/errors.go b/internal/cephfs/errors/errors.go index 790119d9a..a354aa57e 100644 --- a/internal/cephfs/errors/errors.go +++ b/internal/cephfs/errors/errors.go @@ -58,6 +58,9 @@ var ( // ErrVolumeHasSnapshots is returned when a subvolume has snapshots. ErrVolumeHasSnapshots = coreError.New("volume has snapshots") + + // ErrQuiesceInProgress is returned when quiesce operation is in progress. 
+ ErrQuiesceInProgress = coreError.New("quiesce operation is in progress") ) // IsCloneRetryError returns true if the clone error is pending,in-progress From 6a4c45deebc7cff7c2679a5c80cc411eaff5d54b Mon Sep 17 00:00:00 2001 From: Madhu Rajanna Date: Tue, 13 Feb 2024 14:34:38 +0100 Subject: [PATCH 10/17] cephfs: add helper for quiesce api added helper function which calls the go-ceph API for the quiesce operations. Signed-off-by: Madhu Rajanna --- internal/cephfs/core/quiesce.go | 250 ++++++++++++++++++++++++++++++++ 1 file changed, 250 insertions(+) create mode 100644 internal/cephfs/core/quiesce.go diff --git a/internal/cephfs/core/quiesce.go b/internal/cephfs/core/quiesce.go new file mode 100644 index 000000000..d867e3ef9 --- /dev/null +++ b/internal/cephfs/core/quiesce.go @@ -0,0 +1,250 @@ +/* +Copyright 2024 The Ceph-CSI Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package core + +import ( + "context" + + "github.com/ceph/ceph-csi/internal/util" + "github.com/ceph/ceph-csi/internal/util/log" + + "github.com/ceph/go-ceph/cephfs/admin" +) + +type QuiesceState string + +const ( + Released QuiesceState = "RELEASED" + Quiescing QuiesceState = "QUIESCING" + Quiesced QuiesceState = "QUIESCED" +) + +// GetQuiesceState returns the quiesce state of the filesystem. +func GetQuiesceState(set admin.QuiesceState) QuiesceState { + var state QuiesceState + switch set.Name { + case "RELEASED": + state = Released + case "QUIESCING": + state = Quiescing + case "QUIESCED": + state = Quiesced + default: + state = QuiesceState(set.Name) + } + + return state +} + +type FSQuiesceClient interface { + // Destroy destroys the connection used for FSAdmin. + Destroy() + // FSQuiesce quiesces the subvolumes in the filesystem. + FSQuiesce( + ctx context.Context, + reserveName string, + ) (*admin.QuiesceInfo, error) + // GetVolumes returns the list of volumes in the filesystem that are to be + // quiesced. + GetVolumes() []Volume + // FSQuiesceWithExpireTimeout quiesces the subvolumes in the filesystem + // with an expiration timeout. it should be used after FSQuiesce to reset + // the expire timeout. This helps in keeping the subvolumes in the + // filesystem in quiesced state until all snapshots are taken. + FSQuiesceWithExpireTimeout(ctx context.Context, + reserveName string, + ) (*admin.QuiesceInfo, error) + // ResetFSQuiesce resets the quiesce timeout for the subvolumes in + // the filesystem. + ResetFSQuiesce(ctx context.Context, + reserveName string, + ) (*admin.QuiesceInfo, error) + // ReleaseFSQuiesce releases the quiesce on the subvolumes in the + // filesystem. + ReleaseFSQuiesce(ctx context.Context, + reserveName string, + ) (*admin.QuiesceInfo, error) +} + +type Volume struct { + VolumeID string + ClusterID string +} + +type fsQuiesce struct { + connection *util.ClusterConnection + fsName string + volumes []Volume + // subVolumeGroupMapping is a map of subvolumes to groups. + subVolumeGroupMapping map[string][]string + fsa *admin.FSAdmin +} + +// NewFSQuiesce returns a new instance of fsQuiesce. 
It +// take the filesystem name, the list of volumes to be quiesced, the mapping of +// subvolumes to groups and the cluster connection as input. +func NewFSQuiesce( + fsName string, + volumes []Volume, + mapping map[string][]string, + conn *util.ClusterConnection, +) (FSQuiesceClient, error) { + fsa, err := conn.GetFSAdmin() + if err != nil { + return nil, err + } + + return &fsQuiesce{ + connection: conn, + fsName: fsName, + volumes: volumes, + subVolumeGroupMapping: mapping, + fsa: fsa, + }, nil +} + +// Destroy destroys the connection used for FSAdmin. +func (fq *fsQuiesce) Destroy() { + if fq.connection != nil { + fq.connection.Destroy() + } +} + +// GetVolumes returns the list of volumes in the filesystem that are to be +// quiesced. +func (fq *fsQuiesce) GetVolumes() []Volume { + return fq.volumes +} + +// getMembers returns the list of names in the format +// group/subvolume that are to be quiesced. This is the format that the +// ceph fs quiesce expects. +// Example: ["group1/subvolume1", "group1/subvolume2", "group2/subvolume1"]. +func (fq *fsQuiesce) getMembers() []string { + volName := []string{} + for svg, sb := range fq.subVolumeGroupMapping { + for _, s := range sb { + name := svg + "/" + s + volName = append(volName, name) + } + } + + return volName +} + +func (fq *fsQuiesce) FSQuiesce( + ctx context.Context, + reserveName string, +) (*admin.QuiesceInfo, error) { + opt := &admin.FSQuiesceOptions{ + Timeout: 180, + AwaitFor: 0, + Expiration: 180, + } + log.DebugLog(ctx, + "FSQuiesce for reserveName %s: members:%v options:%v", + reserveName, + fq.getMembers(), + opt) + resp, err := fq.fsa.FSQuiesce(fq.fsName, admin.NoGroup, fq.getMembers(), reserveName, opt) + if resp != nil { + qInfo := resp.Sets[reserveName] + + return &qInfo, nil + } + + log.ErrorLog(ctx, "failed to quiesce filesystem %s", err) + + return nil, err +} + +func (fq *fsQuiesce) FSQuiesceWithExpireTimeout(ctx context.Context, + reserveName string, +) (*admin.QuiesceInfo, error) { + opt := &admin.FSQuiesceOptions{ + Timeout: 180, + AwaitFor: 0, + Expiration: 180, + } + log.DebugLog(ctx, + "FSQuiesceWithExpireTimeout for reserveName %s: members:%v options:%v", + reserveName, + fq.getMembers(), + opt) + resp, err := fq.fsa.FSQuiesce(fq.fsName, admin.NoGroup, fq.getMembers(), reserveName, opt) + if resp != nil { + qInfo := resp.Sets[reserveName] + + return &qInfo, nil + } + + log.ErrorLog(ctx, "failed to quiesce filesystem with expire timeout %s", err) + + return nil, err +} + +func (fq *fsQuiesce) ResetFSQuiesce(ctx context.Context, + reserveName string, +) (*admin.QuiesceInfo, error) { + opt := &admin.FSQuiesceOptions{ + Reset: true, + AwaitFor: 0, + Timeout: 180, + Expiration: 180, + } + // Reset the filesystem quiesce so that the timer will be reset, and we can + // reuse the same reservation if it has already failed or timed out. 
+ log.DebugLog(ctx, + "ResetFSQuiesce for reserveName %s: members:%v options:%v", + reserveName, + fq.getMembers(), + opt) + resp, err := fq.fsa.FSQuiesce(fq.fsName, admin.NoGroup, fq.getMembers(), reserveName, opt) + if resp != nil { + qInfo := resp.Sets[reserveName] + + return &qInfo, nil + } + + log.ErrorLog(ctx, "failed to reset timeout for quiesce filesystem %s", err) + + return nil, err +} + +func (fq *fsQuiesce) ReleaseFSQuiesce(ctx context.Context, + reserveName string, +) (*admin.QuiesceInfo, error) { + opt := &admin.FSQuiesceOptions{ + AwaitFor: 0, + Release: true, + } + log.DebugLog(ctx, + "ReleaseFSQuiesce for reserveName %s: members:%v options:%v", + reserveName, + fq.getMembers(), + opt) + resp, err := fq.fsa.FSQuiesce(fq.fsName, admin.NoGroup, []string{}, reserveName, opt) + if resp != nil { + qInfo := resp.Sets[reserveName] + + return &qInfo, nil + } + + log.ErrorLog(ctx, "failed to release quiesce of filesystem %s", err) + + return nil, err +} From 86bf74bb5cf87cc9c303497f556ea216b2011d2b Mon Sep 17 00:00:00 2001 From: Madhu Rajanna Date: Mon, 19 Feb 2024 09:38:28 +0100 Subject: [PATCH 11/17] cephfs: add helper function to getVolumeOptions added helper function to extract basic details from the parameters related to volume options. Signed-off-by: Madhu Rajanna --- internal/cephfs/store/volumeoptions.go | 39 ++++++++++++++++++-------- 1 file changed, 27 insertions(+), 12 deletions(-) diff --git a/internal/cephfs/store/volumeoptions.go b/internal/cephfs/store/volumeoptions.go index 068afd725..5ed493e86 100644 --- a/internal/cephfs/store/volumeoptions.go +++ b/internal/cephfs/store/volumeoptions.go @@ -209,10 +209,32 @@ func fmtBackingSnapshotOptionMismatch(optName, expected, actual string) error { optName, actual, expected) } +// getVolumeOptions validates the basic required basic options provided in the +// volume parameters and extract the volumeOptions from volume parameters. +// It contains the following checks: +// - clusterID must be set +// - monitors must be set +// - fsName must be set. +func getVolumeOptions(vo map[string]string) (*VolumeOptions, error) { + opts := VolumeOptions{} + clusterData, err := GetClusterInformation(vo) + if err != nil { + return nil, err + } + + opts.ClusterID = clusterData.ClusterID + opts.Monitors = strings.Join(clusterData.Monitors, ",") + opts.SubvolumeGroup = clusterData.CephFS.SubvolumeGroup + + if err = extractOption(&opts.FsName, "fsName", vo); err != nil { + return nil, err + } + + return &opts, nil +} + // NewVolumeOptions generates a new instance of volumeOptions from the provided // CSI request parameters. 
-// -//nolint:gocyclo,cyclop // TODO: reduce complexity func NewVolumeOptions( ctx context.Context, requestName, @@ -222,20 +244,17 @@ func NewVolumeOptions( cr *util.Credentials, ) (*VolumeOptions, error) { var ( - opts VolumeOptions + opts *VolumeOptions backingSnapshotBool string err error ) volOptions := req.GetParameters() - clusterData, err := GetClusterInformation(volOptions) + opts, err = getVolumeOptions(volOptions) if err != nil { return nil, err } - opts.ClusterID = clusterData.ClusterID - opts.Monitors = strings.Join(clusterData.Monitors, ",") - opts.SubvolumeGroup = clusterData.CephFS.SubvolumeGroup opts.Owner = k8s.GetOwner(volOptions) opts.BackingSnapshot = IsShallowVolumeSupported(req) @@ -247,10 +266,6 @@ func NewVolumeOptions( return nil, err } - if err = extractOption(&opts.FsName, "fsName", volOptions); err != nil { - return nil, err - } - if err = extractOptionalOption(&opts.KernelMountOptions, "kernelMountOptions", volOptions); err != nil { return nil, err } @@ -323,7 +338,7 @@ func NewVolumeOptions( } } - return &opts, nil + return opts, nil } // IsShallowVolumeSupported returns true only for ReadOnly volume requests From ffb2b1144dfe3d01934fa71bac45abbda8b5c68a Mon Sep 17 00:00:00 2001 From: Madhu Rajanna Date: Tue, 13 Feb 2024 14:49:48 +0100 Subject: [PATCH 12/17] cephfs: add helper for group options volumegroup.go holders all the helpers to extra the group details from the request and also to extra group details from the groupID. This also provide helpers to reserve group for the request Name and also an undo function incase if somethings goes wrong and we need to cleanup the reserved omap entries. Signed-off-by: Madhu Rajanna --- internal/cephfs/store/volumegroup.go | 285 +++++++++++++++++++++++++++ 1 file changed, 285 insertions(+) create mode 100644 internal/cephfs/store/volumegroup.go diff --git a/internal/cephfs/store/volumegroup.go b/internal/cephfs/store/volumegroup.go new file mode 100644 index 000000000..4286ad76f --- /dev/null +++ b/internal/cephfs/store/volumegroup.go @@ -0,0 +1,285 @@ +/* +Copyright 2024 The Ceph-CSI Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package store + +import ( + "context" + "fmt" + + "github.com/ceph/ceph-csi/internal/cephfs/core" + cerrors "github.com/ceph/ceph-csi/internal/cephfs/errors" + fsutil "github.com/ceph/ceph-csi/internal/cephfs/util" + "github.com/ceph/ceph-csi/internal/util" + "github.com/ceph/ceph-csi/internal/util/log" + + "github.com/container-storage-interface/spec/lib/go/csi" +) + +type VolumeGroupOptions struct { + *VolumeOptions +} + +// NewVolumeGroupOptions generates a new instance of volumeGroupOptions from the provided +// CSI request parameters. 
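+// It requires the clusterID and fsName parameters to be present, accepts an
+// optional volumeGroupNamePrefix parameter, and resolves the filesystem ID
+// and the metadata pool for the given fsName over a newly established
+// cluster connection.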
+func NewVolumeGroupOptions( + ctx context.Context, + req *csi.CreateVolumeGroupSnapshotRequest, + cr *util.Credentials, +) (*VolumeGroupOptions, error) { + var ( + opts = &VolumeGroupOptions{} + err error + ) + + volOptions := req.GetParameters() + opts.VolumeOptions, err = getVolumeOptions(volOptions) + if err != nil { + return nil, err + } + + if err = extractOptionalOption(&opts.NamePrefix, "volumeGroupNamePrefix", volOptions); err != nil { + return nil, err + } + + opts.RequestName = req.GetName() + + err = opts.Connect(cr) + if err != nil { + return nil, err + } + + defer func() { + if err != nil { + opts.Destroy() + } + }() + + fs := core.NewFileSystem(opts.conn) + opts.FscID, err = fs.GetFscID(ctx, opts.FsName) + if err != nil { + return nil, err + } + + opts.MetadataPool, err = fs.GetMetadataPool(ctx, opts.FsName) + if err != nil { + return nil, err + } + + return opts, nil +} + +type VolumeGroupSnapshotIdentifier struct { + ReservedID string + FsVolumeGroupSnapshotName string + VolumeGroupSnapshotID string + RequestName string + VolumeSnapshotMap map[string]string +} + +// GetVolumeIDs returns the list of volumeIDs in the VolumeSnaphotMap. +func (vgsi *VolumeGroupSnapshotIdentifier) GetVolumeIDs() []string { + keys := make([]string, 0, len(vgsi.VolumeSnapshotMap)) + for k := range vgsi.VolumeSnapshotMap { + keys = append(keys, k) + } + + return keys +} + +// NewVolumeGroupOptionsFromID generates a new instance of volumeGroupOptions and GroupIdentifier +// from the provided CSI volumeGroupSnapshotID. +func NewVolumeGroupOptionsFromID( + ctx context.Context, + volumeGroupSnapshotID string, + cr *util.Credentials, +) (*VolumeGroupOptions, *VolumeGroupSnapshotIdentifier, error) { + var ( + vi util.CSIIdentifier + volOptions = &VolumeGroupOptions{} + vgs VolumeGroupSnapshotIdentifier + ) + // Decode the snapID first, to detect pre-provisioned snapshot before other errors + err := vi.DecomposeCSIID(volumeGroupSnapshotID) + if err != nil { + return nil, nil, cerrors.ErrInvalidVolID + } + volOptions.VolumeOptions = &VolumeOptions{} + volOptions.ClusterID = vi.ClusterID + vgs.VolumeGroupSnapshotID = volumeGroupSnapshotID + volOptions.FscID = vi.LocationID + vgs.ReservedID = vi.ObjectUUID + + if volOptions.Monitors, err = util.Mons(util.CsiConfigFile, vi.ClusterID); err != nil { + return nil, nil, fmt.Errorf( + "failed to fetch monitor list using clusterID (%s): %w", + vi.ClusterID, + err) + } + + err = volOptions.Connect(cr) + if err != nil { + return nil, nil, err + } + // in case of an error, volOptions is returned, but callers may not + // expect to need to call Destroy() on it. 
So, make sure to release any + // resources that may have been allocated + defer func() { + if err != nil { + volOptions.Destroy() + } + }() + + fs := core.NewFileSystem(volOptions.conn) + volOptions.FsName, err = fs.GetFsName(ctx, volOptions.FscID) + if err != nil { + return nil, nil, err + } + + volOptions.MetadataPool, err = fs.GetMetadataPool(ctx, volOptions.FsName) + if err != nil { + return nil, nil, err + } + + j, err := VolumeGroupJournal.Connect(volOptions.Monitors, fsutil.RadosNamespace, cr) + if err != nil { + return nil, nil, err + } + defer j.Destroy() + + groupAttributes, err := j.GetVolumeGroupAttributes( + ctx, volOptions.MetadataPool, vi.ObjectUUID) + if err != nil { + return nil, nil, err + } + + vgs.RequestName = groupAttributes.RequestName + vgs.FsVolumeGroupSnapshotName = groupAttributes.GroupName + vgs.VolumeGroupSnapshotID = volumeGroupSnapshotID + vgs.VolumeSnapshotMap = groupAttributes.VolumeSnapshotMap + + return volOptions, &vgs, nil +} + +/* +CheckVolumeGroupSnapExists checks to determine if passed in RequestName in +volGroupOptions exists on the backend. + +**NOTE:** These functions manipulate the rados omaps that hold information +regarding volume group snapshot names as requested by the CSI drivers. Hence, +these need to be invoked only when the respective CSI driver generated volume +group snapshot name based locks are held, as otherwise racy access to these +omaps may end up leaving them in an inconsistent state. +*/ +func CheckVolumeGroupSnapExists( + ctx context.Context, + volOptions *VolumeGroupOptions, + cr *util.Credentials, +) (*VolumeGroupSnapshotIdentifier, error) { + // Connect to cephfs' default radosNamespace (csi) + j, err := VolumeGroupJournal.Connect(volOptions.Monitors, fsutil.RadosNamespace, cr) + if err != nil { + return nil, err + } + defer j.Destroy() + + volGroupData, err := j.CheckReservation( + ctx, volOptions.MetadataPool, volOptions.RequestName, volOptions.NamePrefix) + if err != nil { + return nil, err + } + if volGroupData == nil { + return nil, nil + } + vgs := &VolumeGroupSnapshotIdentifier{} + vgs.RequestName = volOptions.RequestName + vgs.ReservedID = volGroupData.GroupUUID + vgs.FsVolumeGroupSnapshotName = volGroupData.GroupName + vgs.VolumeSnapshotMap = volGroupData.VolumeGroupAttributes.VolumeSnapshotMap + + // found a snapshot already available, process and return it! + vgs.VolumeGroupSnapshotID, err = util.GenerateVolID(ctx, volOptions.Monitors, cr, volOptions.FscID, + "", volOptions.ClusterID, volGroupData.GroupUUID) + if err != nil { + return nil, err + } + log.DebugLog(ctx, "Found existing volume group snapshot (%s) with UUID (%s) for request (%s) and mapping %v", + vgs.RequestName, volGroupData.GroupUUID, vgs.RequestName, vgs.VolumeSnapshotMap) + + return vgs, nil +} + +// ReserveVolumeGroup is a helper routine to request a UUID reservation for the +// CSI request name and, +// to generate the volumegroup snapshot identifier for the reserved UUID. 
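+// The name is reserved in the volume group journal of the filesystem's
+// metadata pool, and the returned VolumeGroupSnapshotID encodes the
+// clusterID, the filesystem ID and the reserved UUID.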
+func ReserveVolumeGroup( + ctx context.Context, + volOptions *VolumeGroupOptions, + cr *util.Credentials, +) (*VolumeGroupSnapshotIdentifier, error) { + var ( + vgsi VolumeGroupSnapshotIdentifier + groupUUID string + err error + ) + + vgsi.RequestName = volOptions.RequestName + // Connect to cephfs' default radosNamespace (csi) + j, err := VolumeGroupJournal.Connect(volOptions.Monitors, fsutil.RadosNamespace, cr) + if err != nil { + return nil, err + } + defer j.Destroy() + + groupUUID, vgsi.FsVolumeGroupSnapshotName, err = j.ReserveName( + ctx, volOptions.MetadataPool, util.InvalidPoolID, volOptions.RequestName, volOptions.NamePrefix) + if err != nil { + return nil, err + } + + // generate the snapshot ID to return to the CO system + vgsi.VolumeGroupSnapshotID, err = util.GenerateVolID(ctx, volOptions.Monitors, cr, volOptions.FscID, + "", volOptions.ClusterID, groupUUID) + if err != nil { + return nil, err + } + + log.DebugLog(ctx, "Generated volume group snapshot ID (%s) for request name (%s)", + vgsi.VolumeGroupSnapshotID, volOptions.RequestName) + + return &vgsi, nil +} + +// UndoVolumeGroupReservation is a helper routine to undo a name reservation +// for a CSI volumeGroupSnapshot name. +func UndoVolumeGroupReservation( + ctx context.Context, + volOptions *VolumeGroupOptions, + vgsi *VolumeGroupSnapshotIdentifier, + cr *util.Credentials, +) error { + // Connect to cephfs' default radosNamespace (csi) + j, err := VolumeGroupJournal.Connect(volOptions.Monitors, fsutil.RadosNamespace, cr) + if err != nil { + return err + } + defer j.Destroy() + + err = j.UndoReservation(ctx, volOptions.MetadataPool, + vgsi.FsVolumeGroupSnapshotName, vgsi.RequestName) + + return err +} From df770e4139de7b4a5448d15c1f9c4ea26c7ca503 Mon Sep 17 00:00:00 2001 From: Madhu Rajanna Date: Wed, 14 Feb 2024 13:31:50 +0100 Subject: [PATCH 13/17] cephfs: implement CreateVolumeGroupSnapshot RPC implemented CreateVolumeGroupSnapshot RPC which does below operations * Basic request validation * Reserve the UUID for the group name * Quiesce the filesystem for all the subvolumes from the input volumeId's * Take the snapshot for all the input volumeId's * Add the mapping between volumeId's and snapshot Id's in omap * Release the quiesce for the filesystem for all the subvolumes from the input volumeId's Undo all the operations if anything fails. 
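For reference, a request that passes the validation added earlier in this
series looks roughly like the sketch below; the name, volume IDs, clusterID,
fsName and secret contents are placeholders and not values from a real
cluster:

req := &csi.CreateVolumeGroupSnapshotRequest{
	// CO generated group snapshot name, must not be empty.
	Name: "group-snap-1",
	// CSI VolumeIDs of the CephFS volumes to snapshot together.
	SourceVolumeIds: []string{"<volume-id-1>", "<volume-id-2>"},
	Parameters: map[string]string{
		"clusterID": "<cluster-id>",      // required, must exist in the CSI configuration
		"fsName":    "<filesystem-name>", // required
	},
	// Ceph admin credentials, as consumed by util.NewAdminCredentials().
	Secrets: map[string]string{},
}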
Signed-off-by: Madhu Rajanna --- internal/cephfs/groupcontrollerserver.go | 610 +++++++++++++++++++++++ 1 file changed, 610 insertions(+) diff --git a/internal/cephfs/groupcontrollerserver.go b/internal/cephfs/groupcontrollerserver.go index 6092f8a14..ae05b7195 100644 --- a/internal/cephfs/groupcontrollerserver.go +++ b/internal/cephfs/groupcontrollerserver.go @@ -18,13 +18,25 @@ package cephfs import ( "context" + "errors" + "fmt" + "sort" + "time" + "github.com/ceph/ceph-csi/internal/cephfs/core" + cerrors "github.com/ceph/ceph-csi/internal/cephfs/errors" + "github.com/ceph/ceph-csi/internal/cephfs/store" + fsutil "github.com/ceph/ceph-csi/internal/cephfs/util" + "github.com/ceph/ceph-csi/internal/util" "github.com/ceph/ceph-csi/internal/util/log" + "github.com/ceph/go-ceph/cephfs/admin" "github.com/container-storage-interface/spec/lib/go/csi" "github.com/kubernetes-csi/csi-lib-utils/protosanitizer" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/timestamppb" + "k8s.io/utils/strings/slices" ) // validateCreateVolumeGroupSnapshotRequest validates the request for creating @@ -61,3 +73,601 @@ func (cs *ControllerServer) validateCreateVolumeGroupSnapshotRequest( return nil } + +// CreateVolumeGroupSnapshot creates a group snapshot of volumes. +func (cs *ControllerServer) CreateVolumeGroupSnapshot( + ctx context.Context, + req *csi.CreateVolumeGroupSnapshotRequest) ( + *csi.CreateVolumeGroupSnapshotResponse, + error, +) { + if err := cs.validateCreateVolumeGroupSnapshotRequest(ctx, req); err != nil { + return nil, err + } + + requestName := req.GetName() + // Existence and conflict checks + if acquired := cs.VolumeGroupLocks.TryAcquire(requestName); !acquired { + log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, requestName) + + return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, requestName) + } + defer cs.VolumeGroupLocks.Release(requestName) + + cr, err := util.NewAdminCredentials(req.GetSecrets()) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + defer cr.DeleteCredentials() + + vg, err := store.NewVolumeGroupOptions(ctx, req, cr) + if err != nil { + log.ErrorLog(ctx, "failed to get volume group options: %v", err) + + return nil, status.Error(codes.Internal, err.Error()) + } + defer vg.Destroy() + + vgs, err := store.CheckVolumeGroupSnapExists(ctx, vg, cr) + if err != nil { + log.ErrorLog(ctx, "failed to check volume group snapshot exists: %v", err) + + return nil, status.Error(codes.Internal, err.Error()) + } + + // Get the fs names and subvolume from the volume ids to execute quiesce commands. 
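+	// fsMap groups the source volumes per (monitors, fsName) pair, so that
+	// each CephFS filesystem involved in the group snapshot is quiesced only
+	// once.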
+ fsMap, err := getFsNamesAndSubVolumeFromVolumeIDs(ctx, req.GetSecrets(), req.GetSourceVolumeIds(), cr) + if err != nil { + log.ErrorLog(ctx, "failed to get fs names and subvolume from volume ids: %v", err) + + return nil, status.Error(codes.Internal, err.Error()) + } + defer destroyFSConnections(fsMap) + + needRelease := checkIfFSNeedQuiesceRelease(vgs, req.GetSourceVolumeIds()) + if needRelease { + return cs.releaseQuiesceAndGetVolumeGroupSnapshotResponse(ctx, req, vgs, fsMap, vg, cr) + } + + // If the volume group snapshot does not exist, reserve the volume group + if vgs == nil { + vgs, err = store.ReserveVolumeGroup(ctx, vg, cr) + if err != nil { + log.ErrorLog(ctx, "failed to reserve volume group: %v", err) + + return nil, status.Error(codes.Internal, err.Error()) + } + } + + inProgress, err := cs.queisceFileSystems(ctx, vgs, fsMap) + if err != nil { + log.ErrorLog(ctx, "failed to quiesce filesystems: %v", err) + if !errors.Is(err, cerrors.ErrQuiesceInProgress) { + uErr := cs.deleteSnapshotsAndUndoReservation(ctx, vgs, cr, fsMap, req.GetSecrets()) + if uErr != nil { + log.ErrorLog(ctx, "failed to delete snapshot and undo reservation: %v", uErr) + } + } + + return nil, status.Error(codes.Internal, err.Error()) + } + + if inProgress { + return nil, status.Error(codes.Internal, "Quiesce operation is in progress") + } + + resp, err := cs.createSnapshotAddToVolumeGroupJournal(ctx, req, vg, vgs, cr, fsMap) + if err != nil { + log.ErrorLog(ctx, "failed to create snapshot and add to volume group journal: %v", err) + + if !errors.Is(err, cerrors.ErrQuiesceInProgress) { + // Handle Undo reservation and timeout as well + uErr := cs.deleteSnapshotsAndUndoReservation(ctx, vgs, cr, fsMap, req.GetSecrets()) + if uErr != nil { + log.ErrorLog(ctx, "failed to delete snapshot and undo reservation: %v", uErr) + } + } + + return nil, status.Error(codes.Internal, err.Error()) + } + + response := &csi.CreateVolumeGroupSnapshotResponse{} + response.GroupSnapshot = &csi.VolumeGroupSnapshot{ + GroupSnapshotId: vgs.VolumeGroupSnapshotID, + ReadyToUse: true, + CreationTime: timestamppb.New(time.Now()), + } + + for _, r := range *resp { + r.Snapshot.GroupSnapshotId = vgs.VolumeGroupSnapshotID + response.GroupSnapshot.Snapshots = append(response.GroupSnapshot.Snapshots, r.Snapshot) + } + + return response, nil +} + +// queisceFileSystems quiesces the subvolumes and subvolume groups present in +// the filesystems of the volumeID's present in the +// CreateVolumeGroupSnapshotRequest. +func (cs *ControllerServer) queisceFileSystems(ctx context.Context, + vgs *store.VolumeGroupSnapshotIdentifier, + fsMap map[string]core.FSQuiesceClient, +) (bool, error) { + var inProgress bool + for _, fm := range fsMap { + // Quiesce the fs, subvolumes and subvolume groups + data, err := fm.FSQuiesce(ctx, vgs.RequestName) + if err != nil { + log.ErrorLog(ctx, "failed to quiesce filesystem: %v", err) + + return inProgress, err + } + state := core.GetQuiesceState(data.State) + if state == core.Quiescing { + inProgress = true + } else if state != core.Quiesced { + return inProgress, fmt.Errorf("quiesce operation is in %s state", state) + } + } + + return inProgress, nil +} + +// releaseQuiesceAndGetVolumeGroupSnapshotResponse releases the quiesce of the +// subvolumes and subvolume groups in the filesystems for the volumeID's +// present in the CreateVolumeGroupSnapshotRequest. 
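+// This path is taken when the group journal already holds a snapshot mapping
+// for every requested source volume (a retried request): the quiesce is
+// released and the per-volume CreateSnapshot calls return the already
+// existing snapshots, which are used to build the response.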
+func (cs *ControllerServer) releaseQuiesceAndGetVolumeGroupSnapshotResponse( + ctx context.Context, + req *csi.CreateVolumeGroupSnapshotRequest, + vgs *store.VolumeGroupSnapshotIdentifier, + fsMap map[string]core.FSQuiesceClient, + vg *store.VolumeGroupOptions, + cr *util.Credentials, +) (*csi.CreateVolumeGroupSnapshotResponse, error) { + matchesSourceVolumeIDs := matchesSourceVolumeIDs(vgs.GetVolumeIDs(), req.GetSourceVolumeIds()) + if !matchesSourceVolumeIDs { + return nil, status.Errorf( + codes.InvalidArgument, + "source volume ids %v do not match in the existing volume group snapshot %v", + req.GetSourceVolumeIds(), + vgs.GetVolumeIDs()) + } + // Release the quiesce of the subvolumes and subvolume groups in the + // filesystems for the volumes. + for _, fm := range fsMap { + // UnFreeze the filesystems, subvolumes and subvolume groups + data, err := fm.ReleaseFSQuiesce(ctx, vg.RequestName) + if err != nil { + log.ErrorLog(ctx, "failed to release filesystem quiesce: %v", err) + uErr := cs.deleteSnapshotsAndUndoReservation(ctx, vgs, cr, fsMap, req.GetSecrets()) + if uErr != nil { + log.ErrorLog(ctx, "failed to delete snapshot and undo reservation: %v", uErr) + } + + return nil, status.Errorf(codes.Internal, "failed to release filesystem quiesce: %v", err) + } + state := core.GetQuiesceState(data.State) + if state != core.Released { + return nil, status.Errorf(codes.Internal, "quiesce operation is in %s state", state) + } + } + var err error + defer func() { + if err != nil && !errors.Is(err, cerrors.ErrQuiesceInProgress) { + uErr := cs.deleteSnapshotsAndUndoReservation(ctx, vgs, cr, fsMap, req.GetSecrets()) + if uErr != nil { + log.ErrorLog(ctx, "failed to delete snapshot and undo reservation: %v", uErr) + } + } + }() + snapshotResponses := make([]csi.CreateSnapshotResponse, 0) + for _, volID := range req.GetSourceVolumeIds() { + // Create the snapshot for the volumeID + clusterID := getClusterIDForVolumeID(fsMap, volID) + if clusterID == "" { + return nil, status.Errorf(codes.Internal, "failed to get clusterID for volumeID %s", volID) + } + + req := formatCreateSnapshotRequest(volID, vgs.FsVolumeGroupSnapshotName, + clusterID, + req.GetSecrets()) + var resp *csi.CreateSnapshotResponse + resp, err = cs.createSnapshotAndAddMapping(ctx, req, vg, vgs, cr) + if err != nil { + // Handle cleanup + log.ErrorLog(ctx, "failed to create snapshot: %v", err) + + return nil, status.Errorf(codes.Internal, + "failed to create snapshot and add to volume group journal: %v", + err) + } + snapshotResponses = append(snapshotResponses, *resp) + } + + response := &csi.CreateVolumeGroupSnapshotResponse{} + response.GroupSnapshot = &csi.VolumeGroupSnapshot{ + GroupSnapshotId: vgs.VolumeGroupSnapshotID, + ReadyToUse: true, + CreationTime: timestamppb.New(time.Now()), + } + + for _, r := range snapshotResponses { + r.Snapshot.GroupSnapshotId = vgs.VolumeGroupSnapshotID + response.GroupSnapshot.Snapshots = append(response.GroupSnapshot.Snapshots, r.Snapshot) + } + + return response, nil +} + +// createSnapshotAddToVolumeGroupJournal creates the snapshot and adds the +// snapshotID and volumeID to the volume group journal omap. If the freeze is +// true then it will freeze the subvolumes and subvolume groups before creating +// the snapshot and unfreeze them after creating the snapshot. If the freeze is +// false it will call createSnapshot and get the snapshot details for the +// volume and add the snapshotID and volumeID to the volume group journal omap. 
+// If any error occurs other than ErrInProgress it will delete the snapshots +// and undo the reservation and return the error. +func (cs *ControllerServer) createSnapshotAddToVolumeGroupJournal( + ctx context.Context, + req *csi.CreateVolumeGroupSnapshotRequest, + vgo *store.VolumeGroupOptions, + vgs *store.VolumeGroupSnapshotIdentifier, + cr *util.Credentials, + fsMap map[string]core.FSQuiesceClient) ( + *[]csi.CreateSnapshotResponse, + error, +) { + var err error + var resp *csi.CreateSnapshotResponse + + responses := make([]csi.CreateSnapshotResponse, 0) + for _, volID := range req.GetSourceVolumeIds() { + err = fsQuiesceWithExpireTimeout(ctx, vgo.RequestName, fsMap) + if err != nil { + log.ErrorLog(ctx, "failed to quiesce filesystem with timeout: %v", err) + + return nil, err + } + + // Create the snapshot for the volumeID + clusterID := getClusterIDForVolumeID(fsMap, volID) + if clusterID == "" { + return nil, fmt.Errorf("failed to get clusterID for volumeID %s", volID) + } + + req := formatCreateSnapshotRequest(volID, vgs.FsVolumeGroupSnapshotName, + clusterID, + req.GetSecrets()) + resp, err = cs.createSnapshotAndAddMapping(ctx, req, vgo, vgs, cr) + if err != nil { + // Handle cleanup + log.ErrorLog(ctx, "failed to create snapshot: %v", err) + + return nil, err + } + responses = append(responses, *resp) + } + + err = releaseFSQuiesce(ctx, vgo.RequestName, fsMap) + if err != nil { + log.ErrorLog(ctx, "failed to release filesystem quiesce: %v", err) + + return nil, err + } + + return &responses, nil +} + +func formatCreateSnapshotRequest(volID, groupSnapshotName, + clusterID string, + secret map[string]string, +) *csi.CreateSnapshotRequest { + return &csi.CreateSnapshotRequest{ + SourceVolumeId: volID, + Name: groupSnapshotName + "-" + volID, + Secrets: secret, + Parameters: map[string]string{ + "clusterID": clusterID, + }, + } +} + +// releaseSubvolumeQuiesce releases the quiesce of the subvolumes and subvolume +// groups in the filesystems for the volumeID's present in the +// CreateVolumeGroupSnapshotRequest. +func releaseFSQuiesce(ctx context.Context, + requestName string, + fsMap map[string]core.FSQuiesceClient, +) error { + inProgress := false + var err error + var data *admin.QuiesceInfo + for _, fm := range fsMap { + // UnFreeze the filesystems, subvolumes and subvolume groups + data, err = fm.ReleaseFSQuiesce(ctx, requestName) + if err != nil { + log.ErrorLog(ctx, "failed to release filesystem quiesce: %v", err) + + return err + } + state := core.GetQuiesceState(data.State) + if state != core.Released { + inProgress = true + } + } + + if inProgress { + return cerrors.ErrQuiesceInProgress + } + + return nil +} + +// fsQuiesceWithExpireTimeout quiesces the subvolumes and subvolume +// groups in the filesystems for the volumeID's present in the +// CreateVolumeGroupSnapshotRequest. 
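+// It is invoked before each per-volume snapshot to re-arm the expiration
+// timer of the quiesce set, so the set does not expire while the snapshots
+// are taken one by one; ErrQuiesceInProgress is returned while the set is
+// still in the QUIESCING state.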
+func fsQuiesceWithExpireTimeout(ctx context.Context, + requestName string, + fsMap map[string]core.FSQuiesceClient, +) error { + var err error + + var data *admin.QuiesceInfo + inProgress := false + for _, fm := range fsMap { + // reinitialize the expiry timer for the quiesce + data, err = fm.FSQuiesceWithExpireTimeout(ctx, requestName) + if err != nil { + log.ErrorLog(ctx, "failed to quiesce filesystem with timeout: %v", err) + + return err + } + state := core.GetQuiesceState(data.State) + if state == core.Quiescing { + inProgress = true + } else if state != core.Quiesced { + return fmt.Errorf("quiesce operation is in %s state", state) + } + } + + if inProgress { + return cerrors.ErrQuiesceInProgress + } + + return nil +} + +// createSnapshotAndAddMapping creates the snapshot and adds the snapshotID and +// volumeID to the volume group journal omap. If any error occurs it will +// delete the last created snapshot as its still not added to the journal. +func (cs *ControllerServer) createSnapshotAndAddMapping( + ctx context.Context, + req *csi.CreateSnapshotRequest, + vgo *store.VolumeGroupOptions, + vgs *store.VolumeGroupSnapshotIdentifier, + cr *util.Credentials, +) (*csi.CreateSnapshotResponse, error) { + // Create the snapshot + resp, err := cs.CreateSnapshot(ctx, req) + if err != nil { + // Handle cleanup + log.ErrorLog(ctx, "failed to create snapshot: %v", err) + + return nil, err + } + j, err := store.VolumeGroupJournal.Connect(vgo.Monitors, fsutil.RadosNamespace, cr) + if err != nil { + return nil, err + } + defer j.Destroy() + // Add the snapshot to the volume group journal + err = j.AddVolumeSnapshotMapping(ctx, + vgo.MetadataPool, + vgs.ReservedID, + req.GetSourceVolumeId(), + resp.GetSnapshot().GetSnapshotId()) + if err != nil { + log.ErrorLog(ctx, "failed to add volume snapshot mapping: %v", err) + // Delete the last created snapshot as its still not added to the + // journal + delReq := &csi.DeleteSnapshotRequest{ + SnapshotId: resp.GetSnapshot().GetSnapshotId(), + Secrets: req.GetSecrets(), + } + _, dErr := cs.DeleteSnapshot(ctx, delReq) + if dErr != nil { + log.ErrorLog(ctx, "failed to delete snapshot %s: %v", resp.GetSnapshot().GetSnapshotId(), dErr) + } + + return nil, err + } + + return resp, nil +} + +// checkIfFSNeedQuiesceRelease checks that do we have snapshots for all the +// volumes stored in the omap so that we can release the quiesce. +func checkIfFSNeedQuiesceRelease(vgs *store.VolumeGroupSnapshotIdentifier, volIDs []string) bool { + if vgs == nil { + return false + } + // If the number of volumes in the snapshot is not equal to the number of volumes + + return len(vgs.GetVolumeIDs()) == len(volIDs) +} + +// getClusterIDForVolumeID gets the clusterID for the volumeID from the fms map. +func getClusterIDForVolumeID(fms map[string]core.FSQuiesceClient, volumeID string) string { + for _, fm := range fms { + for _, vol := range fm.GetVolumes() { + if vol.VolumeID == volumeID { + return vol.ClusterID + } + } + } + + return "" +} + +// getFsNamesAndSubVolumeFromVolumeIDs gets the filesystem names and subvolumes +// from the volumeIDs present in the CreateVolumeGroupSnapshotRequest. It also +// returns the SubVolumeQuiesceClient for the filesystems present in the +// volumeIDs. 
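+// The returned map is keyed by the monitor list plus the filesystem name,
+// so that all subvolumes residing on the same filesystem share a single
+// cluster connection and FSQuiesceClient.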
+func getFsNamesAndSubVolumeFromVolumeIDs(ctx context.Context, + secret map[string]string, + volIDs []string, + cr *util.Credentials) ( + map[string]core.FSQuiesceClient, + error, +) { + type fs struct { + fsName string + volumes []core.Volume + subVolumeGroupMapping map[string][]string + monitors string + } + fm := make(map[string]fs, 0) + for _, volID := range volIDs { + // Find the volume using the provided VolumeID + volOptions, _, err := store.NewVolumeOptionsFromVolID(ctx, + volID, nil, secret, "", false) + if err != nil { + return nil, err + } + volOptions.Destroy() + // choosing monitorIP's and fsName as the unique key + // TODO: Need to use something else as the unique key as users can + // still choose the different monitorIP's and fsName for subvolumes + uniqueName := volOptions.Monitors + volOptions.FsName + if _, ok := fm[uniqueName]; !ok { + fm[uniqueName] = fs{ + fsName: volOptions.FsName, + volumes: make([]core.Volume, 0), + subVolumeGroupMapping: make(map[string][]string), // Initialize the map + monitors: volOptions.Monitors, + } + } + a := core.Volume{ + VolumeID: volID, + ClusterID: volOptions.ClusterID, + } + // Retrieve the value, modify it, and assign it back + val := fm[uniqueName] + val.volumes = append(val.volumes, a) + existingVolIDInMap := val.subVolumeGroupMapping[volOptions.SubVolume.SubvolumeGroup] + val.subVolumeGroupMapping[volOptions.SubVolume.SubvolumeGroup] = append( + existingVolIDInMap, + volOptions.SubVolume.VolID) + fm[uniqueName] = val + } + fsk := map[string]core.FSQuiesceClient{} + var err error + defer func() { + if err != nil { + destroyFSConnections(fsk) + } + }() + for k, v := range fm { + conn := &util.ClusterConnection{} + if err = conn.Connect(v.monitors, cr); err != nil { + return nil, err + } + fsk[k], err = core.NewFSQuiesce(v.fsName, v.volumes, v.subVolumeGroupMapping, conn) + if err != nil { + log.ErrorLog(ctx, "failed to get subvolume quiesce: %v", err) + conn.Destroy() + + return nil, err + } + } + + return fsk, nil +} + +// destroyFSConnections destroys connections of all FSQuiesceClient. +func destroyFSConnections(fsMap map[string]core.FSQuiesceClient) { + for _, fm := range fsMap { + if fm != nil { + fm.Destroy() + } + } +} + +// matchesSourceVolumeIDs checks if the sourceVolumeIDs and volumeIDsInOMap are +// equal. +func matchesSourceVolumeIDs(sourceVolumeIDs, volumeIDsInOMap []string) bool { + // sort the array as its required for slices.Equal call. + sort.Strings(sourceVolumeIDs) + sort.Strings(volumeIDsInOMap) + + return slices.Equal(sourceVolumeIDs, volumeIDsInOMap) +} + +// deleteSnapshotsAndUndoReservation deletes the snapshots and undoes the +// volume group reservation. It also resets the quiesce of the subvolumes and +// subvolume groups in the filesystems for the volumeID's present in the +// CreateVolumeGroupSnapshotRequest. 
+func (cs *ControllerServer) deleteSnapshotsAndUndoReservation(ctx context.Context,
+	vgs *store.VolumeGroupSnapshotIdentifier,
+	cr *util.Credentials,
+	fsMap map[string]core.FSQuiesceClient,
+	secrets map[string]string,
+) error {
+	// get the omap from the snapshot and volume mapping
+	vgo, vgsi, err := store.NewVolumeGroupOptionsFromID(ctx, vgs.VolumeGroupSnapshotID, cr)
+	if err != nil {
+		log.ErrorLog(ctx, "failed to get volume group options from id: %v", err)
+
+		return err
+	}
+	defer vgo.Destroy()
+
+	for volID, snapID := range vgsi.VolumeSnapshotMap {
+		// delete the snapshots
+		req := &csi.DeleteSnapshotRequest{
+			SnapshotId: snapID,
+			Secrets:    secrets,
+		}
+		_, err = cs.DeleteSnapshot(ctx, req)
+		if err != nil {
+			log.ErrorLog(ctx, "failed to delete snapshot: %v", err)
+
+			return err
+		}
+
+		j, err := store.VolumeGroupJournal.Connect(vgo.Monitors, fsutil.RadosNamespace, cr)
+		if err != nil {
+			return err
+		}
+		// remove the entry from the omap
+		err = j.RemoveVolumeSnapshotMapping(
+			ctx,
+			vgo.MetadataPool,
+			vgsi.ReservedID,
+			volID)
+		j.Destroy()
+		if err != nil {
+			log.ErrorLog(ctx, "failed to remove volume snapshot mapping: %v", err)
+
+			return err
+		}
+		// undo the reservation
+		err = store.UndoVolumeGroupReservation(ctx, vgo, vgsi, cr)
+		if err != nil {
+			log.ErrorLog(ctx, "failed to undo volume group reservation: %v", err)
+
+			return err
+		}
+	}
+
+	for _, fm := range fsMap {
+		_, err := fm.ResetFSQuiesce(ctx, vgs.RequestName)
+		if err != nil {
+			log.ErrorLog(ctx, "failed to reset filesystem quiesce: %v", err)
+
+			return err
+		}
+	}
+
+	return nil
+}

From 0f724480f5d93f70a6c6a55d048bf5d6a50d546d Mon Sep 17 00:00:00 2001
From: Madhu Rajanna
Date: Wed, 14 Feb 2024 13:52:58 +0100
Subject: [PATCH 14/17] cephfs: implement DeleteVolumeGroupSnapshot RPC

Implemented the DeleteVolumeGroupSnapshot RPC, which performs the
following operations:

* Basic request validation
* Get the snapshotID and volumeID mappings reserved for the UUID
* Delete each snapshot and remove its mapping from the omap
* Repeat the above steps until all the mappings are removed
* Remove the reserved UUID from the omap
* Reset the filesystem quiesce. This might be required as CephFS
  doesn't provide any option to remove the quiesce; if we get another
  request with the same ID we can reuse the quiesce API for the same
  set-id
* Return success if the received error is "pool not found" or "key not
  found"

Signed-off-by: Madhu Rajanna
---
 internal/cephfs/groupcontrollerserver.go | 106 +++++++++++++++++++++++
 1 file changed, 106 insertions(+)

diff --git a/internal/cephfs/groupcontrollerserver.go b/internal/cephfs/groupcontrollerserver.go
index ae05b7195..42ab7aec1 100644
--- a/internal/cephfs/groupcontrollerserver.go
+++ b/internal/cephfs/groupcontrollerserver.go
@@ -671,3 +671,109 @@ func (cs *ControllerServer) deleteSnapshotsAndUndoReservation(ctx context.Contex
 
 	return nil
 }
+
+// validateVolumeGroupSnapshotDeleteRequest validates the request for deleting a
+// volume group snapshot.
+func (cs *ControllerServer) validateVolumeGroupSnapshotDeleteRequest(
+	ctx context.Context,
+	req *csi.DeleteVolumeGroupSnapshotRequest,
+) error {
+	if err := cs.Driver.ValidateGroupControllerServiceRequest(
+		csi.GroupControllerServiceCapability_RPC_CREATE_DELETE_GET_VOLUME_GROUP_SNAPSHOT); err != nil {
+		log.ErrorLog(ctx, "invalid delete volume group snapshot req: %v", protosanitizer.StripSecrets(req))
+
+		return err
+	}
+
+	// Check sanity of the request: the group snapshot ID must be set
+	if req.GetGroupSnapshotId() == "" {
+		return status.Error(codes.InvalidArgument, "volume group snapshot id cannot be empty")
+	}
+
+	return nil
+}
+
+// DeleteVolumeGroupSnapshot deletes a group snapshot of volumes.
+func (cs *ControllerServer) DeleteVolumeGroupSnapshot(ctx context.Context,
+	req *csi.DeleteVolumeGroupSnapshotRequest) (
+	*csi.DeleteVolumeGroupSnapshotResponse,
+	error,
+) {
+	if err := cs.validateVolumeGroupSnapshotDeleteRequest(ctx, req); err != nil {
+		return nil, err
+	}
+
+	groupSnapshotID := req.GroupSnapshotId
+	// Existence and conflict checks
+	if acquired := cs.VolumeGroupLocks.TryAcquire(groupSnapshotID); !acquired {
+		log.ErrorLog(ctx, util.VolumeOperationAlreadyExistsFmt, groupSnapshotID)
+
+		return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, groupSnapshotID)
+	}
+	defer cs.VolumeGroupLocks.Release(groupSnapshotID)
+
+	cr, err := util.NewAdminCredentials(req.GetSecrets())
+	if err != nil {
+		return nil, status.Error(codes.Internal, err.Error())
+	}
+	defer cr.DeleteCredentials()
+
+	vgo, vgsi, err := store.NewVolumeGroupOptionsFromID(ctx, req.GroupSnapshotId, cr)
+	if err != nil {
+		log.ErrorLog(ctx, "failed to get volume group options: %v", err)
+		err = extractDeleteVolumeGroupError(err)
+		if err != nil {
+			return nil, status.Error(codes.Internal, err.Error())
+		}
+
+		return &csi.DeleteVolumeGroupSnapshotResponse{}, nil
+	}
+	vgo.Destroy()
+
+	volIds := vgsi.GetVolumeIDs()
+	fsMap, err := getFsNamesAndSubVolumeFromVolumeIDs(ctx, req.GetSecrets(), volIds, cr)
+	err = extractDeleteVolumeGroupError(err)
+	if err != nil {
+		log.ErrorLog(ctx, "failed to get filesystem clients for the volumeIDs: %v", err)
+		err = extractDeleteVolumeGroupError(err)
+		if err != nil {
+			return nil, status.Error(codes.Internal, err.Error())
+		}
+
+		return &csi.DeleteVolumeGroupSnapshotResponse{}, nil
+	}
+
+	defer destroyFSConnections(fsMap)
+
+	err = cs.deleteSnapshotsAndUndoReservation(ctx, vgsi, cr, fsMap, req.GetSecrets())
+	if err != nil {
+		log.ErrorLog(ctx, "failed to delete snapshots and undo reservation: %v", err)
+		err = extractDeleteVolumeGroupError(err)
+		if err != nil {
+			return nil, status.Error(codes.Internal, err.Error())
+		}
+
+		return &csi.DeleteVolumeGroupSnapshotResponse{}, nil
+	}
+
+	return &csi.DeleteVolumeGroupSnapshotResponse{}, nil
+}
+
+// extractDeleteVolumeGroupError extracts the error from the delete volume
+// group snapshot operation and returns it unless it is an ErrKeyNotFound or
+// ErrPoolNotFound error.
+func extractDeleteVolumeGroupError(err error) error {
+	switch {
+	case errors.Is(err, util.ErrPoolNotFound):
+		// if the error is ErrPoolNotFound, the pool is already deleted and we don't
+		// need to worry about deleting the snapshot or omap data; return success
+		return nil
+	case errors.Is(err, util.ErrKeyNotFound):
+		// if the error is ErrKeyNotFound, a previous attempt at deletion was complete
+		// or partially complete (snap and snapOMap are already garbage collected), hence
+		// return success as deletion is complete
+		return nil
+	}
+
+	return err
+}

From 728a7f5ac796e5a83f9507922ebbfb555c13fb44 Mon Sep 17 00:00:00 2001
From: Madhu Rajanna
Date: Wed, 14 Feb 2024 14:02:31 +0100
Subject: [PATCH 15/17] util: add UnimplementedGroupControllerServer

Add UnimplementedGroupControllerServer to the DefaultControllerServer
struct to avoid build errors when some non-mandatory RPCs are not
implemented.

Signed-off-by: Madhu Rajanna
---
 internal/csi-common/controllerserver-default.go | 1 +
 1 file changed, 1 insertion(+)

diff --git a/internal/csi-common/controllerserver-default.go b/internal/csi-common/controllerserver-default.go
index 2ce290928..114638056 100644
--- a/internal/csi-common/controllerserver-default.go
+++ b/internal/csi-common/controllerserver-default.go
@@ -29,6 +29,7 @@ import (
 // DefaultControllerServer points to default driver.
 type DefaultControllerServer struct {
 	csi.UnimplementedControllerServer
+	csi.UnimplementedGroupControllerServer
 	Driver *CSIDriver
 }

From aa2094ba307c944c04aa41b502b86b99596af066 Mon Sep 17 00:00:00 2001
From: Madhu Rajanna
Date: Thu, 15 Feb 2024 08:19:08 +0100
Subject: [PATCH 16/17] cephfs: unit test for validateVolumeGroupSnapshotRequest

Added a unit test for the validateCreateVolumeGroupSnapshotRequest API,
which validates the incoming CreateVolumeGroupSnapshotRequest.

Signed-off-by: Madhu Rajanna
---
 internal/cephfs/groupcontrollerserver_test.go | 121 ++++++++++++++++++
 1 file changed, 121 insertions(+)
 create mode 100644 internal/cephfs/groupcontrollerserver_test.go

diff --git a/internal/cephfs/groupcontrollerserver_test.go b/internal/cephfs/groupcontrollerserver_test.go
new file mode 100644
index 000000000..eae8d834a
--- /dev/null
+++ b/internal/cephfs/groupcontrollerserver_test.go
@@ -0,0 +1,121 @@
+/*
+Copyright 2024 The Ceph-CSI Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package cephfs
+
+import (
+	"context"
+	"testing"
+
+	csicommon "github.com/ceph/ceph-csi/internal/csi-common"
+
+	"github.com/container-storage-interface/spec/lib/go/csi"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+)
+
+func TestControllerServer_validateCreateVolumeGroupSnapshotRequest(t *testing.T) {
+	t.Parallel()
+	cs := ControllerServer{
+		DefaultControllerServer: csicommon.NewDefaultControllerServer(
+			csicommon.NewCSIDriver("cephfs.csi.ceph.com", "1.0.0", "test")),
+	}
+
+	type args struct {
+		ctx context.Context
+		req *csi.CreateVolumeGroupSnapshotRequest
+	}
+	tests := []struct {
+		name    string
+		args    args
+		wantErr bool
+		code    codes.Code
+	}{
+		{
+			"valid CreateVolumeGroupSnapshotRequest",
+			args{
+				context.Background(), &csi.CreateVolumeGroupSnapshotRequest{
+					Name:            "vg-snap-1",
+					SourceVolumeIds: []string{"vg-1"},
+					Parameters: map[string]string{
+						"clusterID": "value",
+						"fsName":    "value",
+					},
+				},
+			},
+			false,
+			codes.OK,
+		},
+		{
+			"empty request name in CreateVolumeGroupSnapshotRequest",
+			args{
+				context.Background(), &csi.CreateVolumeGroupSnapshotRequest{
+					SourceVolumeIds: []string{"vg-1"},
+				},
+			},
+			true,
+			codes.InvalidArgument,
+		},
+		{
+			"empty SourceVolumeIds in CreateVolumeGroupSnapshotRequest",
+			args{
+				context.Background(), &csi.CreateVolumeGroupSnapshotRequest{
+					Name:            "vg-snap-1",
+					SourceVolumeIds: []string{},
+				},
+			},
+			true,
+			codes.InvalidArgument,
+		},
+		{
+			"empty clusterID in CreateVolumeGroupSnapshotRequest",
+			args{
+				context.Background(), &csi.CreateVolumeGroupSnapshotRequest{
+					Name:            "vg-snap-1",
+					SourceVolumeIds: []string{"vg-1"},
+					Parameters:      map[string]string{"fsName": "value"},
+				},
+			},
+			true,
+			codes.InvalidArgument,
+		},
+		{
+			"empty fsName in CreateVolumeGroupSnapshotRequest",
+			args{
+				context.Background(), &csi.CreateVolumeGroupSnapshotRequest{
+					Name:            "vg-snap-1",
+					SourceVolumeIds: []string{"vg-1"},
+					Parameters:      map[string]string{"clusterID": "value"},
+				},
+			},
+			true,
+			codes.InvalidArgument,
+		},
+	}
+	for _, tt := range tests {
+		ts := tt
+		t.Run(ts.name, func(t *testing.T) {
+			t.Parallel()
+			err := cs.validateCreateVolumeGroupSnapshotRequest(ts.args.ctx, ts.args.req)
+			if ts.wantErr {
+				c := status.Code(err)
+				if c != ts.code {
+					t.Errorf("ControllerServer.validateCreateVolumeGroupSnapshotRequest() error = %v, got code %v, want %v", err, c, ts.code)
+				}
+			}
+		})
+	}
+}

From cd184904513057b8a7543515d8a3b8280a4a8eb6 Mon Sep 17 00:00:00 2001
From: Madhu Rajanna
Date: Tue, 19 Mar 2024 12:37:51 +0100
Subject: [PATCH 17/17] rebase: update go-ceph to latest commit

Update go-ceph to the latest commit to pull in the changes required for
the CephFS quiesce API. This also updates the AWS SDK dependency.
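
As a rough, untested sketch only (not part of this change): the new
FSAdmin.FSQuiesce API that this update pulls in can be driven roughly as
shown below. The quiesceSubvolumes helper name, the option values and
the connection handling are illustrative assumptions, and the
ceph_preview build tag is needed because the API is still a preview one.

    //go:build ceph_preview

    package example

    import (
        "fmt"

        fsadmin "github.com/ceph/go-ceph/cephfs/admin"
        "github.com/ceph/go-ceph/rados"
    )

    // quiesceSubvolumes quiesces the given subvolumes of a CephFS volume
    // and prints the state of the resulting quiesce set(s). conn is an
    // already connected rados.Conn.
    func quiesceSubvolumes(conn *rados.Conn, volume, group string, subvolumes []string, setID string) error {
        fsa := fsadmin.NewFromConn(conn)

        // Ask the MDS to quiesce the members and wait (Await) up to
        // AwaitFor seconds for the set to settle; Timeout and Expiration
        // keep a stale quiesce set from lingering. Values are placeholders.
        info, err := fsa.FSQuiesce(volume, group, subvolumes, setID, &fsadmin.FSQuiesceOptions{
            Timeout:    180,
            Expiration: 180,
            AwaitFor:   10,
            Await:      true,
        })
        if err != nil {
            return fmt.Errorf("fs quiesce failed: %w", err)
        }

        for name, set := range info.Sets {
            fmt.Printf("quiesce set %q is in state %q\n", name, set.State.Name)
        }

        return nil
    }

In ceph-csi itself the calls go through the core.FSQuiesceClient wrapper
added earlier in this series rather than through FSAdmin directly.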
Signed-off-by: Madhu Rajanna --- go.mod | 4 +- go.sum | 8 +- .../aws/aws-sdk-go/aws/endpoints/defaults.go | 210 ++++++++++++++++++ .../github.com/aws/aws-sdk-go/aws/version.go | 2 +- .../ceph/go-ceph/cephfs/admin/fs_quiesce.go | 135 +++++++++++ .../ceph/go-ceph/rbd/snap_group_namespace.go | 53 +++++ vendor/modules.txt | 4 +- 7 files changed, 407 insertions(+), 9 deletions(-) create mode 100644 vendor/github.com/ceph/go-ceph/cephfs/admin/fs_quiesce.go create mode 100644 vendor/github.com/ceph/go-ceph/rbd/snap_group_namespace.go diff --git a/go.mod b/go.mod index b5e20db05..ac06b7d44 100644 --- a/go.mod +++ b/go.mod @@ -6,10 +6,10 @@ toolchain go1.21.5 require ( github.com/IBM/keyprotect-go-client v0.12.2 - github.com/aws/aws-sdk-go v1.50.26 + github.com/aws/aws-sdk-go v1.50.32 github.com/aws/aws-sdk-go-v2/service/sts v1.28.1 github.com/ceph/ceph-csi/api v0.0.0-00010101000000-000000000000 - github.com/ceph/go-ceph v0.26.0 + github.com/ceph/go-ceph v0.26.1-0.20240319113421-755481f8c243 github.com/container-storage-interface/spec v1.9.0 github.com/csi-addons/spec v0.2.1-0.20230606140122-d20966d2e444 github.com/gemalto/kmip-go v0.0.10 diff --git a/go.sum b/go.sum index 14e0f5f64..cbf0c5133 100644 --- a/go.sum +++ b/go.sum @@ -833,8 +833,8 @@ github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkY github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a h1:idn718Q4B6AGu/h5Sxe66HYVdqdGu2l9Iebqhi/AEoA= github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY= github.com/aws/aws-sdk-go v1.44.164/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI= -github.com/aws/aws-sdk-go v1.50.26 h1:tuv8+dje59DBK1Pj65tSCdD36oamBxKYJgbng4bFylc= -github.com/aws/aws-sdk-go v1.50.26/go.mod h1:LF8svs817+Nz+DmiMQKTO3ubZ/6IaTpq3TjupRn3Eqk= +github.com/aws/aws-sdk-go v1.50.32 h1:POt81DvegnpQKM4DMDLlHz1CO6OBnEoQ1gRhYFd7QRY= +github.com/aws/aws-sdk-go v1.50.32/go.mod h1:LF8svs817+Nz+DmiMQKTO3ubZ/6IaTpq3TjupRn3Eqk= github.com/aws/aws-sdk-go-v2 v1.25.2 h1:/uiG1avJRgLGiQM9X3qJM8+Qa6KRGK5rRPuXE0HUM+w= github.com/aws/aws-sdk-go-v2 v1.25.2/go.mod h1:Evoc5AsmtveRt1komDwIsjHFyrP5tDuF1D1U+6z6pNo= github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.2 h1:bNo4LagzUKbjdxE0tIcR9pMzLR2U/Tgie1Hq1HQ3iH8= @@ -870,8 +870,8 @@ github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyY github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/census-instrumentation/opencensus-proto v0.3.0/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/census-instrumentation/opencensus-proto v0.4.1/go.mod h1:4T9NM4+4Vw91VeyqjLS6ao50K5bOcLKN6Q42XnYaRYw= -github.com/ceph/go-ceph v0.26.0 h1:LZoATo25ZH5aeL5t85BwIbrNLKCDfcDM+e0qV0cmwHY= -github.com/ceph/go-ceph v0.26.0/go.mod h1:ISxb295GszZwtLPkeWi+L2uLYBVsqbsh0M104jZMOX4= +github.com/ceph/go-ceph v0.26.1-0.20240319113421-755481f8c243 h1:O99PJ2rNxY+XiN2swRSmJC24V3YInVt5Lk48Em1cdVE= +github.com/ceph/go-ceph v0.26.1-0.20240319113421-755481f8c243/go.mod h1:PS15ql+uqcnZN8uD3WuxlImxdaTYtxqJoaTmlFJYnbI= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= diff --git a/vendor/github.com/aws/aws-sdk-go/aws/endpoints/defaults.go b/vendor/github.com/aws/aws-sdk-go/aws/endpoints/defaults.go index 
a18c83304..25055d6b8 100644 --- a/vendor/github.com/aws/aws-sdk-go/aws/endpoints/defaults.go +++ b/vendor/github.com/aws/aws-sdk-go/aws/endpoints/defaults.go @@ -12547,6 +12547,9 @@ var awsPartition = partition{ endpointKey{ Region: "eu-south-1", }: endpoint{}, + endpointKey{ + Region: "eu-south-2", + }: endpoint{}, endpointKey{ Region: "eu-west-1", }: endpoint{}, @@ -14554,6 +14557,9 @@ var awsPartition = partition{ endpointKey{ Region: "ca-central-1", }: endpoint{}, + endpointKey{ + Region: "ca-west-1", + }: endpoint{}, endpointKey{ Region: "eu-central-1", }: endpoint{}, @@ -19213,66 +19219,222 @@ var awsPartition = partition{ endpointKey{ Region: "af-south-1", }: endpoint{}, + endpointKey{ + Region: "af-south-1", + Variant: dualStackVariant, + }: endpoint{ + Hostname: "logs.af-south-1.api.aws", + }, endpointKey{ Region: "ap-east-1", }: endpoint{}, + endpointKey{ + Region: "ap-east-1", + Variant: dualStackVariant, + }: endpoint{ + Hostname: "logs.ap-east-1.api.aws", + }, endpointKey{ Region: "ap-northeast-1", }: endpoint{}, + endpointKey{ + Region: "ap-northeast-1", + Variant: dualStackVariant, + }: endpoint{ + Hostname: "logs.ap-northeast-1.api.aws", + }, endpointKey{ Region: "ap-northeast-2", }: endpoint{}, + endpointKey{ + Region: "ap-northeast-2", + Variant: dualStackVariant, + }: endpoint{ + Hostname: "logs.ap-northeast-2.api.aws", + }, endpointKey{ Region: "ap-northeast-3", }: endpoint{}, + endpointKey{ + Region: "ap-northeast-3", + Variant: dualStackVariant, + }: endpoint{ + Hostname: "logs.ap-northeast-3.api.aws", + }, endpointKey{ Region: "ap-south-1", }: endpoint{}, + endpointKey{ + Region: "ap-south-1", + Variant: dualStackVariant, + }: endpoint{ + Hostname: "logs.ap-south-1.api.aws", + }, endpointKey{ Region: "ap-south-2", }: endpoint{}, + endpointKey{ + Region: "ap-south-2", + Variant: dualStackVariant, + }: endpoint{ + Hostname: "logs.ap-south-2.api.aws", + }, endpointKey{ Region: "ap-southeast-1", }: endpoint{}, + endpointKey{ + Region: "ap-southeast-1", + Variant: dualStackVariant, + }: endpoint{ + Hostname: "logs.ap-southeast-1.api.aws", + }, endpointKey{ Region: "ap-southeast-2", }: endpoint{}, + endpointKey{ + Region: "ap-southeast-2", + Variant: dualStackVariant, + }: endpoint{ + Hostname: "logs.ap-southeast-2.api.aws", + }, endpointKey{ Region: "ap-southeast-3", }: endpoint{}, + endpointKey{ + Region: "ap-southeast-3", + Variant: dualStackVariant, + }: endpoint{ + Hostname: "logs.ap-southeast-3.api.aws", + }, endpointKey{ Region: "ap-southeast-4", }: endpoint{}, + endpointKey{ + Region: "ap-southeast-4", + Variant: dualStackVariant, + }: endpoint{ + Hostname: "logs.ap-southeast-4.api.aws", + }, endpointKey{ Region: "ca-central-1", }: endpoint{}, + endpointKey{ + Region: "ca-central-1", + Variant: dualStackVariant, + }: endpoint{ + Hostname: "logs.ca-central-1.api.aws", + }, + endpointKey{ + Region: "ca-central-1", + Variant: fipsVariant, + }: endpoint{ + Hostname: "logs-fips.ca-central-1.amazonaws.com", + }, endpointKey{ Region: "ca-west-1", }: endpoint{}, + endpointKey{ + Region: "ca-west-1", + Variant: dualStackVariant, + }: endpoint{ + Hostname: "logs.ca-west-1.api.aws", + }, + endpointKey{ + Region: "ca-west-1", + Variant: fipsVariant, + }: endpoint{ + Hostname: "logs-fips.ca-west-1.amazonaws.com", + }, endpointKey{ Region: "eu-central-1", }: endpoint{}, + endpointKey{ + Region: "eu-central-1", + Variant: dualStackVariant, + }: endpoint{ + Hostname: "logs.eu-central-1.api.aws", + }, endpointKey{ Region: "eu-central-2", }: endpoint{}, + endpointKey{ + Region: 
"eu-central-2", + Variant: dualStackVariant, + }: endpoint{ + Hostname: "logs.eu-central-2.api.aws", + }, endpointKey{ Region: "eu-north-1", }: endpoint{}, + endpointKey{ + Region: "eu-north-1", + Variant: dualStackVariant, + }: endpoint{ + Hostname: "logs.eu-north-1.api.aws", + }, endpointKey{ Region: "eu-south-1", }: endpoint{}, + endpointKey{ + Region: "eu-south-1", + Variant: dualStackVariant, + }: endpoint{ + Hostname: "logs.eu-south-1.api.aws", + }, endpointKey{ Region: "eu-south-2", }: endpoint{}, + endpointKey{ + Region: "eu-south-2", + Variant: dualStackVariant, + }: endpoint{ + Hostname: "logs.eu-south-2.api.aws", + }, endpointKey{ Region: "eu-west-1", }: endpoint{}, + endpointKey{ + Region: "eu-west-1", + Variant: dualStackVariant, + }: endpoint{ + Hostname: "logs.eu-west-1.api.aws", + }, endpointKey{ Region: "eu-west-2", }: endpoint{}, + endpointKey{ + Region: "eu-west-2", + Variant: dualStackVariant, + }: endpoint{ + Hostname: "logs.eu-west-2.api.aws", + }, endpointKey{ Region: "eu-west-3", }: endpoint{}, + endpointKey{ + Region: "eu-west-3", + Variant: dualStackVariant, + }: endpoint{ + Hostname: "logs.eu-west-3.api.aws", + }, + endpointKey{ + Region: "fips-ca-central-1", + }: endpoint{ + Hostname: "logs-fips.ca-central-1.amazonaws.com", + CredentialScope: credentialScope{ + Region: "ca-central-1", + }, + Deprecated: boxedTrue, + }, + endpointKey{ + Region: "fips-ca-west-1", + }: endpoint{ + Hostname: "logs-fips.ca-west-1.amazonaws.com", + CredentialScope: credentialScope{ + Region: "ca-west-1", + }, + Deprecated: boxedTrue, + }, endpointKey{ Region: "fips-us-east-1", }: endpoint{ @@ -19312,18 +19474,48 @@ var awsPartition = partition{ endpointKey{ Region: "il-central-1", }: endpoint{}, + endpointKey{ + Region: "il-central-1", + Variant: dualStackVariant, + }: endpoint{ + Hostname: "logs.il-central-1.api.aws", + }, endpointKey{ Region: "me-central-1", }: endpoint{}, + endpointKey{ + Region: "me-central-1", + Variant: dualStackVariant, + }: endpoint{ + Hostname: "logs.me-central-1.api.aws", + }, endpointKey{ Region: "me-south-1", }: endpoint{}, + endpointKey{ + Region: "me-south-1", + Variant: dualStackVariant, + }: endpoint{ + Hostname: "logs.me-south-1.api.aws", + }, endpointKey{ Region: "sa-east-1", }: endpoint{}, + endpointKey{ + Region: "sa-east-1", + Variant: dualStackVariant, + }: endpoint{ + Hostname: "logs.sa-east-1.api.aws", + }, endpointKey{ Region: "us-east-1", }: endpoint{}, + endpointKey{ + Region: "us-east-1", + Variant: dualStackVariant, + }: endpoint{ + Hostname: "logs.us-east-1.api.aws", + }, endpointKey{ Region: "us-east-1", Variant: fipsVariant, @@ -19333,6 +19525,12 @@ var awsPartition = partition{ endpointKey{ Region: "us-east-2", }: endpoint{}, + endpointKey{ + Region: "us-east-2", + Variant: dualStackVariant, + }: endpoint{ + Hostname: "logs.us-east-2.api.aws", + }, endpointKey{ Region: "us-east-2", Variant: fipsVariant, @@ -19342,6 +19540,12 @@ var awsPartition = partition{ endpointKey{ Region: "us-west-1", }: endpoint{}, + endpointKey{ + Region: "us-west-1", + Variant: dualStackVariant, + }: endpoint{ + Hostname: "logs.us-west-1.api.aws", + }, endpointKey{ Region: "us-west-1", Variant: fipsVariant, @@ -19351,6 +19555,12 @@ var awsPartition = partition{ endpointKey{ Region: "us-west-2", }: endpoint{}, + endpointKey{ + Region: "us-west-2", + Variant: dualStackVariant, + }: endpoint{ + Hostname: "logs.us-west-2.api.aws", + }, endpointKey{ Region: "us-west-2", Variant: fipsVariant, diff --git a/vendor/github.com/aws/aws-sdk-go/aws/version.go 
b/vendor/github.com/aws/aws-sdk-go/aws/version.go index ed0729d44..554b0ebde 100644 --- a/vendor/github.com/aws/aws-sdk-go/aws/version.go +++ b/vendor/github.com/aws/aws-sdk-go/aws/version.go @@ -5,4 +5,4 @@ package aws const SDKName = "aws-sdk-go" // SDKVersion is the version of this SDK -const SDKVersion = "1.50.26" +const SDKVersion = "1.50.32" diff --git a/vendor/github.com/ceph/go-ceph/cephfs/admin/fs_quiesce.go b/vendor/github.com/ceph/go-ceph/cephfs/admin/fs_quiesce.go new file mode 100644 index 000000000..3c93dcc8c --- /dev/null +++ b/vendor/github.com/ceph/go-ceph/cephfs/admin/fs_quiesce.go @@ -0,0 +1,135 @@ +//go:build ceph_preview + +package admin + +import "fmt" + +// fixedPointFloat is a custom type that implements the MarshalJSON interface. +// This is used to format float64 values to two decimal places. +// By default these get converted to integers in the JSON output and +// fail the command. +type fixedPointFloat float64 + +// MarshalJSON provides a custom implementation for the JSON marshalling +// of fixedPointFloat. It formats the float to two decimal places. +func (fpf fixedPointFloat) MarshalJSON() ([]byte, error) { + return []byte(fmt.Sprintf("%.2f", float64(fpf))), nil +} + +// fSQuiesceFields is the internal type used to create JSON for ceph. +// See FSQuiesceOptions for the type that users of the library +// interact with. +type fSQuiesceFields struct { + Prefix string `json:"prefix"` + VolName string `json:"vol_name"` + GroupName string `json:"group_name,omitempty"` + Members []string `json:"members,omitempty"` + SetId string `json:"set_id,omitempty"` + Timeout fixedPointFloat `json:"timeout,omitempty"` + Expiration fixedPointFloat `json:"expiration,omitempty"` + AwaitFor fixedPointFloat `json:"await_for,omitempty"` + Await bool `json:"await,omitempty"` + IfVersion int `json:"if_version,omitempty"` + Include bool `json:"include,omitempty"` + Exclude bool `json:"exclude,omitempty"` + Reset bool `json:"reset,omitempty"` + Release bool `json:"release,omitempty"` + Query bool `json:"query,omitempty"` + All bool `json:"all,omitempty"` + Cancel bool `json:"cancel,omitempty"` +} + +// FSQuiesceOptions are used to specify optional, non-identifying, values +// to be used when quiescing a cephfs volume. +type FSQuiesceOptions struct { + Timeout float64 + Expiration float64 + AwaitFor float64 + Await bool + IfVersion int + Include bool + Exclude bool + Reset bool + Release bool + Query bool + All bool + Cancel bool +} + +// toFields is used to convert the FSQuiesceOptions to the internal +// fSQuiesceFields type. +func (o *FSQuiesceOptions) toFields(volume, group string, subvolumes []string, setId string) *fSQuiesceFields { + return &fSQuiesceFields{ + Prefix: "fs quiesce", + VolName: volume, + GroupName: group, + Members: subvolumes, + SetId: setId, + Timeout: fixedPointFloat(o.Timeout), + Expiration: fixedPointFloat(o.Expiration), + AwaitFor: fixedPointFloat(o.AwaitFor), + Await: o.Await, + IfVersion: o.IfVersion, + Include: o.Include, + Exclude: o.Exclude, + Reset: o.Reset, + Release: o.Release, + Query: o.Query, + All: o.All, + Cancel: o.Cancel, + } +} + +// QuiesceState is used to report the state of a quiesced fs volume. +type QuiesceState struct { + Name string `json:"name"` + Age float64 `json:"age"` +} + +// QuiesceInfoMember is used to report the state of a quiesced fs volume. +// This is part of sets members object array in the json. 
+type QuiesceInfoMember struct { + Excluded bool `json:"excluded"` + State QuiesceState `json:"state"` +} + +// QuiesceInfo reports various informational values about a quiesced volume. +// This is returned as sets object array in the json. +type QuiesceInfo struct { + Version int `json:"version"` + AgeRef float64 `json:"age_ref"` + State QuiesceState `json:"state"` + Timeout float64 `json:"timeout"` + Expiration float64 `json:"expiration"` + Members map[string]QuiesceInfoMember `json:"members"` +} + +// FSQuiesceInfo reports various informational values about quiesced volumes. +type FSQuiesceInfo struct { + Epoch int `json:"epoch"` + SetVersion int `json:"set_version"` + Sets map[string]QuiesceInfo `json:"sets"` +} + +// parseFSQuiesceInfo is used to parse the response from the quiesce command. It returns a FSQuiesceInfo object. +func parseFSQuiesceInfo(res response) (*FSQuiesceInfo, error) { + var info FSQuiesceInfo + if err := res.NoStatus().Unmarshal(&info).End(); err != nil { + return nil, err + } + return &info, nil +} + +// FSQuiesce will quiesce the specified subvolumes in a volume. +// Quiescing a fs will prevent new writes to the subvolumes. +// Similar To: +// +// ceph fs quiesce +func (fsa *FSAdmin) FSQuiesce(volume, group string, subvolumes []string, setId string, o *FSQuiesceOptions) (*FSQuiesceInfo, error) { + if o == nil { + o = &FSQuiesceOptions{} + } + f := o.toFields(volume, group, subvolumes, setId) + + return parseFSQuiesceInfo(fsa.marshalMgrCommand(f)) +} diff --git a/vendor/github.com/ceph/go-ceph/rbd/snap_group_namespace.go b/vendor/github.com/ceph/go-ceph/rbd/snap_group_namespace.go new file mode 100644 index 000000000..88f975d05 --- /dev/null +++ b/vendor/github.com/ceph/go-ceph/rbd/snap_group_namespace.go @@ -0,0 +1,53 @@ +//go:build ceph_preview + +package rbd + +// #cgo LDFLAGS: -lrbd +// #include +import "C" + +// SnapGroupNamespace provides details about a single snapshot that was taken +// as part of an RBD group. +type SnapGroupNamespace struct { + Pool uint64 + GroupName string + GroupSnapName string +} + +// GetSnapGroupNamespace returns the SnapGroupNamespace of the snapshot which +// is part of a group. The caller should make sure that the snapshot ID passed +// in this function belongs to a snapshot that was taken as part of a group +// snapshot. 
+// +// Implements: +// +// int rbd_snap_get_group_namespace(rbd_image_t image, uint64_t snap_id, +// rbd_snap_group_namespace_t *group_snap, +// size_t group_snap_size) +func (image *Image) GetSnapGroupNamespace(snapID uint64) (*SnapGroupNamespace, error) { + if err := image.validate(imageIsOpen); err != nil { + return nil, err + } + + var ( + err error + sgn C.rbd_snap_group_namespace_t + ) + + ret := C.rbd_snap_get_group_namespace(image.image, + C.uint64_t(snapID), + &sgn, + C.sizeof_rbd_snap_group_namespace_t) + err = getError(ret) + if err != nil { + return nil, err + } + + defer C.rbd_snap_group_namespace_cleanup(&sgn, C.sizeof_rbd_snap_group_namespace_t) + + return &SnapGroupNamespace{ + Pool: uint64(sgn.group_pool), + GroupName: C.GoString(sgn.group_name), + GroupSnapName: C.GoString(sgn.group_snap_name), + }, nil +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 0ff4bae3d..914a8b8f1 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -81,7 +81,7 @@ github.com/antlr/antlr4/runtime/Go/antlr/v4 # github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a ## explicit github.com/asaskevich/govalidator -# github.com/aws/aws-sdk-go v1.50.26 +# github.com/aws/aws-sdk-go v1.50.32 ## explicit; go 1.19 github.com/aws/aws-sdk-go/aws github.com/aws/aws-sdk-go/aws/auth/bearer @@ -204,7 +204,7 @@ github.com/ceph/ceph-csi/api/deploy/kubernetes/cephfs github.com/ceph/ceph-csi/api/deploy/kubernetes/nfs github.com/ceph/ceph-csi/api/deploy/kubernetes/rbd github.com/ceph/ceph-csi/api/deploy/ocp -# github.com/ceph/go-ceph v0.26.0 +# github.com/ceph/go-ceph v0.26.1-0.20240319113421-755481f8c243 ## explicit; go 1.19 github.com/ceph/go-ceph/cephfs github.com/ceph/go-ceph/cephfs/admin