diff --git a/internal/cephfs/controllerserver.go b/internal/cephfs/controllerserver.go
index 14d3db153..d7f5fd6d7 100644
--- a/internal/cephfs/controllerserver.go
+++ b/internal/cephfs/controllerserver.go
@@ -52,6 +52,9 @@ type ControllerServer struct {
 
 	// A map storing all volumes/snapshots with ongoing operations.
 	OperationLocks *util.OperationLock
+
+	// Cluster name
+	ClusterName string
 }
 
 // createBackingVolume creates the backing subvolume and on any error cleans up any created entities.
@@ -63,7 +66,8 @@ func (cs *ControllerServer) createBackingVolume(
 	sID *store.SnapshotIdentifier,
 ) error {
 	var err error
-	volClient := core.NewSubVolume(volOptions.GetConnection(), &volOptions.SubVolume, volOptions.ClusterID)
+	volClient := core.NewSubVolume(volOptions.GetConnection(),
+		&volOptions.SubVolume, volOptions.ClusterID, cs.ClusterName)
 
 	if sID != nil {
 		return cs.createBackingVolumeFromSnapshotSource(ctx, volOptions, parentVolOpt, volClient, sID)
@@ -97,7 +101,7 @@ func (cs *ControllerServer) createBackingVolumeFromSnapshotSource(
 	defer cs.OperationLocks.ReleaseRestoreLock(sID.SnapshotID)
 
 	if volOptions.BackingSnapshot {
-		if err := store.AddSnapshotBackedVolumeRef(ctx, volOptions); err != nil {
+		if err := store.AddSnapshotBackedVolumeRef(ctx, volOptions, cs.ClusterName); err != nil {
 			log.ErrorLog(ctx, "failed to create snapshot-backed volume from snapshot %s: %v",
 				sID.FsSnapshotName, err)
@@ -146,6 +150,7 @@ func checkContentSource(
 	ctx context.Context,
 	req *csi.CreateVolumeRequest,
 	cr *util.Credentials,
+	clusterName string,
 ) (*store.VolumeOptions, *store.VolumeIdentifier, *store.SnapshotIdentifier, error) {
 	if req.VolumeContentSource == nil {
 		return nil, nil, nil, nil
 	}
@@ -154,7 +159,7 @@ func checkContentSource(
 	switch volumeSource.Type.(type) {
 	case *csi.VolumeContentSource_Snapshot:
 		snapshotID := req.VolumeContentSource.GetSnapshot().GetSnapshotId()
-		volOpt, _, sid, err := store.NewSnapshotOptionsFromID(ctx, snapshotID, cr)
+		volOpt, _, sid, err := store.NewSnapshotOptionsFromID(ctx, snapshotID, cr, clusterName)
 		if err != nil {
 			if errors.Is(err, cerrors.ErrSnapNotFound) {
 				return nil, nil, nil, status.Error(codes.NotFound, err.Error())
 			}
@@ -167,7 +172,7 @@ func checkContentSource(
 	case *csi.VolumeContentSource_Volume:
 		// Find the volume using the provided VolumeID
 		volID := req.VolumeContentSource.GetVolume().GetVolumeId()
-		parentVol, pvID, err := store.NewVolumeOptionsFromVolID(ctx, volID, nil, req.Secrets)
+		parentVol, pvID, err := store.NewVolumeOptionsFromVolID(ctx, volID, nil, req.Secrets, clusterName)
 		if err != nil {
 			if !errors.Is(err, cerrors.ErrVolumeNotFound) {
 				return nil, nil, nil, status.Error(codes.NotFound, err.Error())
 			}
@@ -252,7 +257,7 @@ func (cs *ControllerServer) CreateVolume(
 	}
 	defer cs.VolumeLocks.Release(requestName)
 
-	volOptions, err := store.NewVolumeOptions(ctx, requestName, req, cr)
+	volOptions, err := store.NewVolumeOptions(ctx, requestName, cs.ClusterName, req, cr)
 	if err != nil {
 		log.ErrorLog(ctx, "validation and extraction of volume options failed: %v", err)
 
@@ -264,7 +269,7 @@ func (cs *ControllerServer) CreateVolume(
 		volOptions.Size = util.RoundOffCephFSVolSize(req.GetCapacityRange().GetRequiredBytes())
 	}
 
-	parentVol, pvID, sID, err := checkContentSource(ctx, req, cr)
+	parentVol, pvID, sID, err := checkContentSource(ctx, req, cr, cs.ClusterName)
 	if err != nil {
 		return nil, err
 	}
@@ -277,7 +282,7 @@ func (cs *ControllerServer) CreateVolume(
 		return nil, status.Error(codes.InvalidArgument, err.Error())
 	}
 
-	vID, err := store.CheckVolExists(ctx, volOptions, parentVol, pvID, sID, cr)
+	vID, err := store.CheckVolExists(ctx, volOptions, parentVol, pvID, sID, cr, cs.ClusterName)
 	if err != nil {
 		if cerrors.IsCloneRetryError(err) {
 			return nil, status.Error(codes.Aborted, err.Error())
 		}
@@ -289,7 +294,8 @@ func (cs *ControllerServer) CreateVolume(
 	metadata := k8s.GetVolumeMetadata(req.GetParameters())
 
 	if vID != nil {
-		volClient := core.NewSubVolume(volOptions.GetConnection(), &volOptions.SubVolume, volOptions.ClusterID)
+		volClient := core.NewSubVolume(volOptions.GetConnection(), &volOptions.SubVolume,
+			volOptions.ClusterID, cs.ClusterName)
 		if sID != nil || pvID != nil && !volOptions.BackingSnapshot {
 			err = volClient.ExpandVolume(ctx, volOptions.Size)
 			if err != nil {
@@ -369,12 +375,13 @@ func (cs *ControllerServer) CreateVolume(
 		return nil, err
 	}
 
+	volClient := core.NewSubVolume(volOptions.GetConnection(),
+		&volOptions.SubVolume, volOptions.ClusterID, cs.ClusterName)
 	if !volOptions.BackingSnapshot {
 		// Get root path for the created subvolume.
 		// Note that root path for snapshot-backed volumes has been already set when
 		// building VolumeOptions.
-		volClient := core.NewSubVolume(volOptions.GetConnection(), &volOptions.SubVolume, volOptions.ClusterID)
 		volOptions.RootPath, err = volClient.GetVolumeRootPathCeph(ctx)
 		if err != nil {
 			purgeErr := volClient.PurgeVolume(ctx, true)
@@ -457,7 +464,7 @@ func (cs *ControllerServer) DeleteVolume(
 	defer cs.OperationLocks.ReleaseDeleteLock(req.GetVolumeId())
 
 	// Find the volume using the provided VolumeID
-	volOptions, vID, err := store.NewVolumeOptionsFromVolID(ctx, string(volID), nil, secrets)
+	volOptions, vID, err := store.NewVolumeOptionsFromVolID(ctx, string(volID), nil, secrets, cs.ClusterName)
 	if err != nil {
 		// if error is ErrPoolNotFound, the pool is already deleted we dont
 		// need to worry about deleting subvolume or omap data, return success
@@ -512,7 +519,7 @@ func (cs *ControllerServer) DeleteVolume(
 	}
 	defer cr.DeleteCredentials()
 
-	if err := cleanUpBackingVolume(ctx, volOptions, vID, cr); err != nil {
+	if err := cleanUpBackingVolume(ctx, volOptions, vID, cr, cs.ClusterName); err != nil {
 		return nil, err
 	}
 
@@ -530,11 +537,13 @@ func cleanUpBackingVolume(
 	volOptions *store.VolumeOptions,
 	volID *store.VolumeIdentifier,
 	cr *util.Credentials,
+	clusterName string,
 ) error {
 	if !volOptions.BackingSnapshot {
 		// Regular volumes need to be purged.
-		volClient := core.NewSubVolume(volOptions.GetConnection(), &volOptions.SubVolume, volOptions.ClusterID)
+		volClient := core.NewSubVolume(volOptions.GetConnection(),
+			&volOptions.SubVolume, volOptions.ClusterID, clusterName)
 		if err := volClient.PurgeVolume(ctx, false); err != nil {
 			log.ErrorLog(ctx, "failed to delete volume %s: %v", volID, err)
 			if errors.Is(err, cerrors.ErrVolumeHasSnapshots) {
@@ -566,7 +575,8 @@ func cleanUpBackingVolume(
 		return nil
 	}
 
-	snapParentVolOptions, _, snapID, err := store.NewSnapshotOptionsFromID(ctx, volOptions.BackingSnapshotID, cr)
+	snapParentVolOptions, _, snapID, err := store.NewSnapshotOptionsFromID(ctx,
+		volOptions.BackingSnapshotID, cr, clusterName)
 	if err != nil {
 		absorbErrs := []error{
 			util.ErrPoolNotFound,
@@ -659,7 +669,7 @@ func (cs *ControllerServer) ControllerExpandVolume(
 	}
 	defer cr.DeleteCredentials()
 
-	volOptions, volIdentifier, err := store.NewVolumeOptionsFromVolID(ctx, volID, nil, secret)
+	volOptions, volIdentifier, err := store.NewVolumeOptionsFromVolID(ctx, volID, nil, secret, cs.ClusterName)
 	if err != nil {
 		log.ErrorLog(ctx, "validation and extraction of volume options failed: %v", err)
 
@@ -673,7 +683,8 @@ func (cs *ControllerServer) ControllerExpandVolume(
 
 	RoundOffSize := util.RoundOffCephFSVolSize(req.GetCapacityRange().GetRequiredBytes())
 
-	volClient := core.NewSubVolume(volOptions.GetConnection(), &volOptions.SubVolume, volOptions.ClusterID)
+	volClient := core.NewSubVolume(volOptions.GetConnection(),
+		&volOptions.SubVolume, volOptions.ClusterID, cs.ClusterName)
 	if err = volClient.ResizeVolume(ctx, RoundOffSize); err != nil {
 		log.ErrorLog(ctx, "failed to expand volume %s: %v", fsutil.VolumeID(volIdentifier.FsSubvolName), err)
 
@@ -726,7 +737,8 @@ func (cs *ControllerServer) CreateSnapshot(
 	defer cs.OperationLocks.ReleaseSnapshotCreateLock(sourceVolID)
 
 	// Find the volume using the provided VolumeID
-	parentVolOptions, vid, err := store.NewVolumeOptionsFromVolID(ctx, sourceVolID, nil, req.GetSecrets())
+	parentVolOptions, vid, err := store.NewVolumeOptionsFromVolID(ctx,
+		sourceVolID, nil, req.GetSecrets(), cs.ClusterName)
 	if err != nil {
 		if errors.Is(err, util.ErrPoolNotFound) {
 			log.WarningLog(ctx, "failed to get backend volume for %s: %v", sourceVolID, err)
@@ -780,7 +792,8 @@ func (cs *ControllerServer) CreateSnapshot(
 	// too.
 	volClient := core.NewSubVolume(parentVolOptions.GetConnection(),
 		&parentVolOptions.SubVolume,
-		parentVolOptions.ClusterID)
+		parentVolOptions.ClusterID,
+		cs.ClusterName)
 	info, err := volClient.GetSubVolumeInfo(ctx)
 	if err != nil {
 		// Check error code value against ErrInvalidCommand to understand the cluster
@@ -806,7 +819,8 @@ func (cs *ControllerServer) CreateSnapshot(
 		// check snapshot is protected
 		protected := true
 		if !(snapInfo.Protected == core.SnapshotIsProtected) {
-			snapClient := core.NewSnapshot(parentVolOptions.GetConnection(), sid.FsSnapshotName, &parentVolOptions.SubVolume)
+			snapClient := core.NewSnapshot(parentVolOptions.GetConnection(),
+				sid.FsSnapshotName, &parentVolOptions.SubVolume)
 			err = snapClient.ProtectSnapshot(ctx)
 			if err != nil {
 				protected = false
@@ -956,7 +970,7 @@ func (cs *ControllerServer) DeleteSnapshot(
 	}
 	defer cs.OperationLocks.ReleaseDeleteLock(snapshotID)
 
-	volOpt, snapInfo, sid, err := store.NewSnapshotOptionsFromID(ctx, snapshotID, cr)
+	volOpt, snapInfo, sid, err := store.NewSnapshotOptionsFromID(ctx, snapshotID, cr, cs.ClusterName)
 	if err != nil {
 		switch {
 		case errors.Is(err, util.ErrPoolNotFound):
diff --git a/internal/cephfs/core/metadata.go b/internal/cephfs/core/metadata.go
index 369aaf772..f15e662c7 100644
--- a/internal/cephfs/core/metadata.go
+++ b/internal/cephfs/core/metadata.go
@@ -21,6 +21,11 @@ import (
 	"strings"
 )
 
+const (
+	// clusterNameKey is the cluster name metadata key, set on the cephfs subvolume.
+	clusterNameKey = "csi.ceph.com/cluster/name"
+)
+
 // setMetadata sets custom metadata on the subvolume in a volume as a
 // key-value pair.
 func (s *subVolumeClient) setMetadata(key, value string) error {
@@ -52,6 +57,14 @@ func (s *subVolumeClient) SetAllMetadata(parameters map[string]string) error {
 		}
 	}
 
+	if s.clusterName != "" {
+		err := s.setMetadata(clusterNameKey, s.clusterName)
+		if err != nil {
+			return fmt.Errorf("failed to set metadata key %q, value %q on subvolume %v: %w",
+				clusterNameKey, s.clusterName, s, err)
+		}
+	}
+
 	return nil
 }
diff --git a/internal/cephfs/core/volume.go b/internal/cephfs/core/volume.go
index c5792c5bb..ab255f166 100644
--- a/internal/cephfs/core/volume.go
+++ b/internal/cephfs/core/volume.go
@@ -81,9 +81,10 @@ type SubVolumeClient interface {
 
 // subVolumeClient implements SubVolumeClient interface.
 type subVolumeClient struct {
-	*SubVolume                        // Embedded SubVolume struct.
-	clusterID string                  // Cluster ID to check subvolumegroup and resize functionality.
-	conn      *util.ClusterConnection // Cluster connection.
+	*SubVolume                          // Embedded SubVolume struct.
+	clusterID   string                  // Cluster ID to check subvolumegroup and resize functionality.
+	clusterName string                  // Cluster name
+	conn        *util.ClusterConnection // Cluster connection.
 }
 
 // SubVolume holds the information about the subvolume.
@@ -97,11 +98,12 @@ type SubVolume struct {
 }
 
 // NewSubVolume returns a new subvolume client.
-func NewSubVolume(conn *util.ClusterConnection, vol *SubVolume, clusterID string) SubVolumeClient {
+func NewSubVolume(conn *util.ClusterConnection, vol *SubVolume, clusterID, clusterName string) SubVolumeClient {
 	return &subVolumeClient{
-		SubVolume: vol,
-		clusterID: clusterID,
-		conn:      conn,
+		SubVolume:   vol,
+		clusterID:   clusterID,
+		clusterName: clusterName,
+		conn:        conn,
 	}
 }
diff --git a/internal/cephfs/driver.go b/internal/cephfs/driver.go
index b9e39cb68..94d4d2d4d 100644
--- a/internal/cephfs/driver.go
+++ b/internal/cephfs/driver.go
@@ -135,6 +135,7 @@ func (fs *Driver) Run(conf *util.Config) {
 
 	if conf.IsControllerServer {
 		fs.cs = NewControllerServer(fs.cd)
+		fs.cs.ClusterName = conf.ClusterName
 	}
 	if !conf.IsControllerServer && !conf.IsNodeServer {
 		topology, err = util.GetTopologyFromDomainLabels(conf.DomainLabels, conf.NodeID, conf.DriverName)
diff --git a/internal/cephfs/nodeserver.go b/internal/cephfs/nodeserver.go
index 4f9af2253..d8cdf0063 100644
--- a/internal/cephfs/nodeserver.go
+++ b/internal/cephfs/nodeserver.go
@@ -82,7 +82,7 @@ func (ns *NodeServer) getVolumeOptions(
 	volContext, volSecrets map[string]string,
 ) (*store.VolumeOptions, error) {
-	volOptions, _, err := store.NewVolumeOptionsFromVolID(ctx, string(volID), volContext, volSecrets)
+	volOptions, _, err := store.NewVolumeOptionsFromVolID(ctx, string(volID), volContext, volSecrets, "")
 	if err != nil {
 		if !errors.Is(err, cerrors.ErrInvalidVolID) {
 			return nil, status.Error(codes.Internal, err.Error())
diff --git a/internal/cephfs/store/backingsnapshot.go b/internal/cephfs/store/backingsnapshot.go
index 2c22839d3..d9500e28c 100644
--- a/internal/cephfs/store/backingsnapshot.go
+++ b/internal/cephfs/store/backingsnapshot.go
@@ -34,6 +34,7 @@ func fmtBackingSnapshotReftrackerName(backingSnapID string) string {
 func AddSnapshotBackedVolumeRef(
 	ctx context.Context,
 	volOptions *VolumeOptions,
+	clusterName string,
 ) error {
 	ioctx, err := volOptions.conn.GetIoctx(volOptions.MetadataPool)
 	if err != nil {
@@ -95,7 +96,7 @@ func AddSnapshotBackedVolumeRef(
 	// There may have been a race between adding a ref to the reftracker and
 	// deleting the backing snapshot. Make sure the snapshot still exists by
 	// trying to retrieve it again.
-	_, _, _, err = NewSnapshotOptionsFromID(ctx, volOptions.BackingSnapshotID, volOptions.conn.Creds)
+	_, _, _, err = NewSnapshotOptionsFromID(ctx, volOptions.BackingSnapshotID, volOptions.conn.Creds, clusterName)
 	if err != nil {
 		log.ErrorLog(ctx, "failed to get backing snapshot %s: %v", volOptions.BackingSnapshotID, err)
 	}
diff --git a/internal/cephfs/store/fsjournal.go b/internal/cephfs/store/fsjournal.go
index 9d84256fd..82c226de0 100644
--- a/internal/cephfs/store/fsjournal.go
+++ b/internal/cephfs/store/fsjournal.go
@@ -78,6 +78,7 @@ func CheckVolExists(ctx context.Context,
 	pvID *VolumeIdentifier,
 	sID *SnapshotIdentifier,
 	cr *util.Credentials,
+	clusterName string,
 ) (*VolumeIdentifier, error) {
 	var vid VolumeIdentifier
 	// Connect to cephfs' default radosNamespace (csi)
@@ -99,7 +100,7 @@ func CheckVolExists(ctx context.Context,
 	vid.FsSubvolName = imageData.ImageAttributes.ImageName
 	volOptions.VolID = vid.FsSubvolName
 
-	vol := core.NewSubVolume(volOptions.conn, &volOptions.SubVolume, volOptions.ClusterID)
+	vol := core.NewSubVolume(volOptions.conn, &volOptions.SubVolume, volOptions.ClusterID, clusterName)
 	if (sID != nil || pvID != nil) && imageData.ImageAttributes.BackingSnapshotID == "" {
 		cloneState, cloneStateErr := vol.GetCloneState(ctx)
 		if cloneStateErr != nil {
diff --git a/internal/cephfs/store/volumeoptions.go b/internal/cephfs/store/volumeoptions.go
index b1765d2f1..70ace8cb1 100644
--- a/internal/cephfs/store/volumeoptions.go
+++ b/internal/cephfs/store/volumeoptions.go
@@ -196,7 +196,7 @@ func fmtBackingSnapshotOptionMismatch(optName, expected, actual string) error {
 
 // NewVolumeOptions generates a new instance of volumeOptions from the provided
 // CSI request parameters.
-func NewVolumeOptions(ctx context.Context, requestName string, req *csi.CreateVolumeRequest,
+func NewVolumeOptions(ctx context.Context, requestName, clusterName string, req *csi.CreateVolumeRequest,
 	cr *util.Credentials,
 ) (*VolumeOptions, error) {
 	var (
@@ -289,7 +289,7 @@ func NewVolumeOptions(ctx context.Context, requestName, clusterName string, req *csi.CreateVo
 		opts.BackingSnapshotID = req.GetVolumeContentSource().GetSnapshot().GetSnapshotId()
 
-		err = opts.populateVolumeOptionsFromBackingSnapshot(ctx, cr)
+		err = opts.populateVolumeOptionsFromBackingSnapshot(ctx, cr, clusterName)
 		if err != nil {
 			return nil, err
 		}
@@ -304,6 +304,7 @@ func NewVolumeOptionsFromVolID(
 	ctx context.Context,
 	volID string,
 	volOpt, secrets map[string]string,
+	clusterName string,
 ) (*VolumeOptions, *VolumeIdentifier, error) {
 	var (
 		vi util.CSIIdentifier
@@ -407,16 +408,16 @@ func NewVolumeOptionsFromVolID(
 	volOptions.SubVolume.VolID = vid.FsSubvolName
 
 	if volOptions.BackingSnapshot {
-		err = volOptions.populateVolumeOptionsFromBackingSnapshot(ctx, cr)
+		err = volOptions.populateVolumeOptionsFromBackingSnapshot(ctx, cr, clusterName)
 	} else {
-		err = volOptions.populateVolumeOptionsFromSubvolume(ctx)
+		err = volOptions.populateVolumeOptionsFromSubvolume(ctx, clusterName)
 	}
 
 	return &volOptions, &vid, err
 }
 
-func (vo *VolumeOptions) populateVolumeOptionsFromSubvolume(ctx context.Context) error {
-	vol := core.NewSubVolume(vo.conn, &vo.SubVolume, vo.ClusterID)
+func (vo *VolumeOptions) populateVolumeOptionsFromSubvolume(ctx context.Context, clusterName string) error {
+	vol := core.NewSubVolume(vo.conn, &vo.SubVolume, vo.ClusterID, clusterName)
 	var info *core.Subvolume
 	info, err := vol.GetSubVolumeInfo(ctx)
@@ -436,6 +437,7 @@ func (vo *VolumeOptions) populateVolumeOptionsFromSubvolume(ctx context.Context)
 func (vo *VolumeOptions) populateVolumeOptionsFromBackingSnapshot(
 	ctx context.Context,
 	cr *util.Credentials,
+	clusterName string,
 ) error {
 	// As of CephFS snapshot v2 API, snapshots may be found in two locations:
 	//
@@ -457,7 +459,7 @@ func (vo *VolumeOptions) populateVolumeOptionsFromBackingSnapshot(
 		return nil
 	}
 
-	parentBackingSnapVolOpts, _, snapID, err := NewSnapshotOptionsFromID(ctx, vo.BackingSnapshotID, cr)
+	parentBackingSnapVolOpts, _, snapID, err := NewSnapshotOptionsFromID(ctx, vo.BackingSnapshotID, cr, clusterName)
 	if err != nil {
 		return fmt.Errorf("failed to retrieve backing snapshot %s: %w", vo.BackingSnapshotID, err)
 	}
@@ -652,6 +654,7 @@ func NewSnapshotOptionsFromID(
 	ctx context.Context,
 	snapID string,
 	cr *util.Credentials,
+	clusterName string,
 ) (*VolumeOptions, *core.SnapshotInfo, *SnapshotIdentifier, error) {
 	var (
 		vi util.CSIIdentifier
@@ -723,7 +726,7 @@ func NewSnapshotOptionsFromID(
 	sid.FsSubvolName = imageAttributes.SourceName
 	volOptions.SubVolume.VolID = sid.FsSubvolName
 
-	vol := core.NewSubVolume(volOptions.conn, &volOptions.SubVolume, volOptions.ClusterID)
+	vol := core.NewSubVolume(volOptions.conn, &volOptions.SubVolume, volOptions.ClusterID, clusterName)
 	subvolInfo, err := vol.GetSubVolumeInfo(ctx)
 	if err != nil {
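
Reviewer note: the net effect of the patch is that the cluster name is threaded from `util.Config` through `ControllerServer` into `core.NewSubVolume`, and `SetAllMetadata` then records it under the `csi.ceph.com/cluster/name` key, but only when the name is non-empty, so deployments that never set a cluster name are unaffected (the node server passes `""` for the same reason). The sketch below is a hedged, self-contained approximation of that flow, not ceph-csi code: `newSubVolume` and the in-memory `metadata` map are stand-ins for the real client and the Ceph Manager calls, and all values in `main` are made up.

```go
package main

import "fmt"

// clusterNameKey mirrors the constant the patch adds in
// internal/cephfs/core/metadata.go.
const clusterNameKey = "csi.ceph.com/cluster/name"

// subVolumeClient is a stand-in for the real client: setMetadata records the
// key/value pair in a map instead of calling the Ceph subvolume metadata API.
type subVolumeClient struct {
	clusterID   string
	clusterName string
	metadata    map[string]string
}

// newSubVolume mimics the widened core.NewSubVolume constructor signature.
func newSubVolume(clusterID, clusterName string) *subVolumeClient {
	return &subVolumeClient{
		clusterID:   clusterID,
		clusterName: clusterName,
		metadata:    map[string]string{},
	}
}

func (s *subVolumeClient) setMetadata(key, value string) error {
	s.metadata[key] = value

	return nil
}

// SetAllMetadata applies the caller-provided parameters first and then, only
// when a cluster name was configured, the cluster-name key — the same
// ordering and emptiness check the patch uses.
func (s *subVolumeClient) SetAllMetadata(parameters map[string]string) error {
	for k, v := range parameters {
		if err := s.setMetadata(k, v); err != nil {
			return fmt.Errorf("failed to set metadata key %q, value %q: %w", k, v, err)
		}
	}

	if s.clusterName != "" {
		if err := s.setMetadata(clusterNameKey, s.clusterName); err != nil {
			return fmt.Errorf("failed to set metadata key %q, value %q: %w",
				clusterNameKey, s.clusterName, err)
		}
	}

	return nil
}

func main() {
	// The controller receives the cluster name from configuration, as
	// driver.go does with conf.ClusterName; both arguments are made up here.
	vc := newSubVolume("ceph-cluster-id", "my-k8s-cluster")
	if err := vc.SetAllMetadata(map[string]string{"example-key": "example-value"}); err != nil {
		fmt.Println("error:", err)

		return
	}
	fmt.Println(vc.metadata) // includes csi.ceph.com/cluster/name=my-k8s-cluster
}
```

On Ceph releases that support subvolume user metadata, the stored value should be readable back from the cluster; something along the lines of `ceph fs subvolume metadata get <vol_name> <subvol_name> csi.ceph.com/cluster/name --group_name csi` (command shape assumed from the `fs subvolume metadata` interface; `csi` is ceph-csi's default subvolume group) can be used to verify a provisioned subvolume.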