/* Copyright 2024 The Ceph-CSI Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package store import ( "context" "fmt" "github.com/ceph/ceph-csi/internal/cephfs/core" cerrors "github.com/ceph/ceph-csi/internal/cephfs/errors" fsutil "github.com/ceph/ceph-csi/internal/cephfs/util" "github.com/ceph/ceph-csi/internal/util" "github.com/ceph/ceph-csi/internal/util/log" "github.com/container-storage-interface/spec/lib/go/csi" ) type VolumeGroupOptions struct { *VolumeOptions } // NewVolumeGroupOptions generates a new instance of volumeGroupOptions from the provided // CSI request parameters. func NewVolumeGroupOptions( ctx context.Context, req *csi.CreateVolumeGroupSnapshotRequest, cr *util.Credentials, ) (*VolumeGroupOptions, error) { var ( opts = &VolumeGroupOptions{} err error ) volOptions := req.GetParameters() opts.VolumeOptions, err = getVolumeOptions(volOptions) if err != nil { return nil, err } if err = extractOptionalOption(&opts.NamePrefix, "volumeGroupNamePrefix", volOptions); err != nil { return nil, err } opts.RequestName = req.GetName() err = opts.Connect(cr) if err != nil { return nil, err } defer func() { if err != nil { opts.Destroy() } }() fs := core.NewFileSystem(opts.conn) opts.FscID, err = fs.GetFscID(ctx, opts.FsName) if err != nil { return nil, err } opts.MetadataPool, err = fs.GetMetadataPool(ctx, opts.FsName) if err != nil { return nil, err } return opts, nil } type VolumeGroupSnapshotIdentifier struct { ReservedID string FsVolumeGroupSnapshotName string VolumeGroupSnapshotID string RequestName string VolumeSnapshotMap map[string]string } // GetVolumeIDs returns the list of volumeIDs in the VolumeSnaphotMap. func (vgsi *VolumeGroupSnapshotIdentifier) GetVolumeIDs() []string { keys := make([]string, 0, len(vgsi.VolumeSnapshotMap)) for k := range vgsi.VolumeSnapshotMap { keys = append(keys, k) } return keys } // NewVolumeGroupOptionsFromID generates a new instance of volumeGroupOptions and GroupIdentifier // from the provided CSI volumeGroupSnapshotID. func NewVolumeGroupOptionsFromID( ctx context.Context, volumeGroupSnapshotID string, cr *util.Credentials, ) (*VolumeGroupOptions, *VolumeGroupSnapshotIdentifier, error) { var ( vi util.CSIIdentifier volOptions = &VolumeGroupOptions{} vgs VolumeGroupSnapshotIdentifier ) // Decode the snapID first, to detect pre-provisioned snapshot before other errors err := vi.DecomposeCSIID(volumeGroupSnapshotID) if err != nil { return nil, nil, cerrors.ErrInvalidVolID } volOptions.VolumeOptions = &VolumeOptions{} volOptions.ClusterID = vi.ClusterID vgs.VolumeGroupSnapshotID = volumeGroupSnapshotID volOptions.FscID = vi.LocationID vgs.ReservedID = vi.ObjectUUID if volOptions.Monitors, err = util.Mons(util.CsiConfigFile, vi.ClusterID); err != nil { return nil, nil, fmt.Errorf( "failed to fetch monitor list using clusterID (%s): %w", vi.ClusterID, err) } err = volOptions.Connect(cr) if err != nil { return nil, nil, err } // in case of an error, volOptions is returned, but callers may not // expect to need to call Destroy() on it. So, make sure to release any // resources that may have been allocated defer func() { if err != nil { volOptions.Destroy() } }() fs := core.NewFileSystem(volOptions.conn) volOptions.FsName, err = fs.GetFsName(ctx, volOptions.FscID) if err != nil { return nil, nil, err } volOptions.MetadataPool, err = fs.GetMetadataPool(ctx, volOptions.FsName) if err != nil { return nil, nil, err } j, err := VolumeGroupJournal.Connect(volOptions.Monitors, fsutil.RadosNamespace, cr) if err != nil { return nil, nil, err } defer j.Destroy() groupAttributes, err := j.GetVolumeGroupAttributes( ctx, volOptions.MetadataPool, vi.ObjectUUID) if err != nil { return nil, nil, err } vgs.RequestName = groupAttributes.RequestName vgs.FsVolumeGroupSnapshotName = groupAttributes.GroupName vgs.VolumeGroupSnapshotID = volumeGroupSnapshotID vgs.VolumeSnapshotMap = groupAttributes.VolumeSnapshotMap return volOptions, &vgs, nil } /* CheckVolumeGroupSnapExists checks to determine if passed in RequestName in volGroupOptions exists on the backend. **NOTE:** These functions manipulate the rados omaps that hold information regarding volume group snapshot names as requested by the CSI drivers. Hence, these need to be invoked only when the respective CSI driver generated volume group snapshot name based locks are held, as otherwise racy access to these omaps may end up leaving them in an inconsistent state. */ func CheckVolumeGroupSnapExists( ctx context.Context, volOptions *VolumeGroupOptions, cr *util.Credentials, ) (*VolumeGroupSnapshotIdentifier, error) { // Connect to cephfs' default radosNamespace (csi) j, err := VolumeGroupJournal.Connect(volOptions.Monitors, fsutil.RadosNamespace, cr) if err != nil { return nil, err } defer j.Destroy() volGroupData, err := j.CheckReservation( ctx, volOptions.MetadataPool, volOptions.RequestName, volOptions.NamePrefix) if err != nil { return nil, err } if volGroupData == nil { return nil, nil } vgs := &VolumeGroupSnapshotIdentifier{} vgs.RequestName = volOptions.RequestName vgs.ReservedID = volGroupData.GroupUUID vgs.FsVolumeGroupSnapshotName = volGroupData.GroupName vgs.VolumeSnapshotMap = volGroupData.VolumeGroupAttributes.VolumeSnapshotMap // found a snapshot already available, process and return it! vgs.VolumeGroupSnapshotID, err = util.GenerateVolID(ctx, volOptions.Monitors, cr, volOptions.FscID, "", volOptions.ClusterID, volGroupData.GroupUUID) if err != nil { return nil, err } log.DebugLog(ctx, "Found existing volume group snapshot (%s) with UUID (%s) for request (%s) and mapping %v", vgs.RequestName, volGroupData.GroupUUID, vgs.RequestName, vgs.VolumeSnapshotMap) return vgs, nil } // ReserveVolumeGroup is a helper routine to request a UUID reservation for the // CSI request name and, // to generate the volumegroup snapshot identifier for the reserved UUID. func ReserveVolumeGroup( ctx context.Context, volOptions *VolumeGroupOptions, cr *util.Credentials, ) (*VolumeGroupSnapshotIdentifier, error) { var ( vgsi VolumeGroupSnapshotIdentifier groupUUID string err error ) vgsi.RequestName = volOptions.RequestName // Connect to cephfs' default radosNamespace (csi) j, err := VolumeGroupJournal.Connect(volOptions.Monitors, fsutil.RadosNamespace, cr) if err != nil { return nil, err } defer j.Destroy() groupUUID, vgsi.FsVolumeGroupSnapshotName, err = j.ReserveName( ctx, volOptions.MetadataPool, util.InvalidPoolID, volOptions.RequestName, volOptions.NamePrefix) if err != nil { return nil, err } // generate the snapshot ID to return to the CO system vgsi.VolumeGroupSnapshotID, err = util.GenerateVolID(ctx, volOptions.Monitors, cr, volOptions.FscID, "", volOptions.ClusterID, groupUUID) if err != nil { return nil, err } log.DebugLog(ctx, "Generated volume group snapshot ID (%s) for request name (%s)", vgsi.VolumeGroupSnapshotID, volOptions.RequestName) return &vgsi, nil } // UndoVolumeGroupReservation is a helper routine to undo a name reservation // for a CSI volumeGroupSnapshot name. func UndoVolumeGroupReservation( ctx context.Context, volOptions *VolumeGroupOptions, vgsi *VolumeGroupSnapshotIdentifier, cr *util.Credentials, ) error { // Connect to cephfs' default radosNamespace (csi) j, err := VolumeGroupJournal.Connect(volOptions.Monitors, fsutil.RadosNamespace, cr) if err != nil { return err } defer j.Destroy() err = j.UndoReservation(ctx, volOptions.MetadataPool, vgsi.FsVolumeGroupSnapshotName, vgsi.RequestName) return err }