cephfs: subvolumes honor --setmetadata option

`--setmetadata` is false by default, honoring it
will keep the metadata disabled by default

Signed-off-by: Prasanna Kumar Kalever <prasanna.kalever@redhat.com>
This commit is contained in:
Prasanna Kumar Kalever 2022-07-28 17:35:33 +05:30 committed by mergify[bot]
parent cf97e377fa
commit 14d6211d6d
8 changed files with 81 additions and 46 deletions

View File

@ -55,6 +55,9 @@ type ControllerServer struct {
// Cluster name // Cluster name
ClusterName string ClusterName string
// Set metadata on volume
SetMetadata bool
} }
// createBackingVolume creates the backing subvolume and on any error cleans up any created entities. // createBackingVolume creates the backing subvolume and on any error cleans up any created entities.
@ -67,7 +70,7 @@ func (cs *ControllerServer) createBackingVolume(
) error { ) error {
var err error var err error
volClient := core.NewSubVolume(volOptions.GetConnection(), volClient := core.NewSubVolume(volOptions.GetConnection(),
&volOptions.SubVolume, volOptions.ClusterID, cs.ClusterName) &volOptions.SubVolume, volOptions.ClusterID, cs.ClusterName, cs.SetMetadata)
if sID != nil { if sID != nil {
return cs.createBackingVolumeFromSnapshotSource(ctx, volOptions, parentVolOpt, volClient, sID) return cs.createBackingVolumeFromSnapshotSource(ctx, volOptions, parentVolOpt, volClient, sID)
@ -101,7 +104,7 @@ func (cs *ControllerServer) createBackingVolumeFromSnapshotSource(
defer cs.OperationLocks.ReleaseRestoreLock(sID.SnapshotID) defer cs.OperationLocks.ReleaseRestoreLock(sID.SnapshotID)
if volOptions.BackingSnapshot { if volOptions.BackingSnapshot {
if err := store.AddSnapshotBackedVolumeRef(ctx, volOptions, cs.ClusterName); err != nil { if err := store.AddSnapshotBackedVolumeRef(ctx, volOptions, cs.ClusterName, cs.SetMetadata); err != nil {
log.ErrorLog(ctx, "failed to create snapshot-backed volume from snapshot %s: %v", log.ErrorLog(ctx, "failed to create snapshot-backed volume from snapshot %s: %v",
sID.FsSnapshotName, err) sID.FsSnapshotName, err)
@ -146,11 +149,10 @@ func (cs *ControllerServer) createBackingVolumeFromVolumeSource(
return nil return nil
} }
func checkContentSource( func (cs *ControllerServer) checkContentSource(
ctx context.Context, ctx context.Context,
req *csi.CreateVolumeRequest, req *csi.CreateVolumeRequest,
cr *util.Credentials, cr *util.Credentials,
clusterName string,
) (*store.VolumeOptions, *store.VolumeIdentifier, *store.SnapshotIdentifier, error) { ) (*store.VolumeOptions, *store.VolumeIdentifier, *store.SnapshotIdentifier, error) {
if req.VolumeContentSource == nil { if req.VolumeContentSource == nil {
return nil, nil, nil, nil return nil, nil, nil, nil
@ -159,7 +161,7 @@ func checkContentSource(
switch volumeSource.Type.(type) { switch volumeSource.Type.(type) {
case *csi.VolumeContentSource_Snapshot: case *csi.VolumeContentSource_Snapshot:
snapshotID := req.VolumeContentSource.GetSnapshot().GetSnapshotId() snapshotID := req.VolumeContentSource.GetSnapshot().GetSnapshotId()
volOpt, _, sid, err := store.NewSnapshotOptionsFromID(ctx, snapshotID, cr, clusterName) volOpt, _, sid, err := store.NewSnapshotOptionsFromID(ctx, snapshotID, cr, cs.ClusterName, cs.SetMetadata)
if err != nil { if err != nil {
if errors.Is(err, cerrors.ErrSnapNotFound) { if errors.Is(err, cerrors.ErrSnapNotFound) {
return nil, nil, nil, status.Error(codes.NotFound, err.Error()) return nil, nil, nil, status.Error(codes.NotFound, err.Error())
@ -172,7 +174,8 @@ func checkContentSource(
case *csi.VolumeContentSource_Volume: case *csi.VolumeContentSource_Volume:
// Find the volume using the provided VolumeID // Find the volume using the provided VolumeID
volID := req.VolumeContentSource.GetVolume().GetVolumeId() volID := req.VolumeContentSource.GetVolume().GetVolumeId()
parentVol, pvID, err := store.NewVolumeOptionsFromVolID(ctx, volID, nil, req.Secrets, clusterName) parentVol, pvID, err := store.NewVolumeOptionsFromVolID(ctx,
volID, nil, req.Secrets, cs.ClusterName, cs.SetMetadata)
if err != nil { if err != nil {
if !errors.Is(err, cerrors.ErrVolumeNotFound) { if !errors.Is(err, cerrors.ErrVolumeNotFound) {
return nil, nil, nil, status.Error(codes.NotFound, err.Error()) return nil, nil, nil, status.Error(codes.NotFound, err.Error())
@ -257,7 +260,7 @@ func (cs *ControllerServer) CreateVolume(
} }
defer cs.VolumeLocks.Release(requestName) defer cs.VolumeLocks.Release(requestName)
volOptions, err := store.NewVolumeOptions(ctx, requestName, cs.ClusterName, req, cr) volOptions, err := store.NewVolumeOptions(ctx, requestName, cs.ClusterName, cs.SetMetadata, req, cr)
if err != nil { if err != nil {
log.ErrorLog(ctx, "validation and extraction of volume options failed: %v", err) log.ErrorLog(ctx, "validation and extraction of volume options failed: %v", err)
@ -269,7 +272,7 @@ func (cs *ControllerServer) CreateVolume(
volOptions.Size = util.RoundOffCephFSVolSize(req.GetCapacityRange().GetRequiredBytes()) volOptions.Size = util.RoundOffCephFSVolSize(req.GetCapacityRange().GetRequiredBytes())
} }
parentVol, pvID, sID, err := checkContentSource(ctx, req, cr, cs.ClusterName) parentVol, pvID, sID, err := cs.checkContentSource(ctx, req, cr)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -282,7 +285,7 @@ func (cs *ControllerServer) CreateVolume(
return nil, status.Error(codes.InvalidArgument, err.Error()) return nil, status.Error(codes.InvalidArgument, err.Error())
} }
vID, err := store.CheckVolExists(ctx, volOptions, parentVol, pvID, sID, cr, cs.ClusterName) vID, err := store.CheckVolExists(ctx, volOptions, parentVol, pvID, sID, cr, cs.ClusterName, cs.SetMetadata)
if err != nil { if err != nil {
if cerrors.IsCloneRetryError(err) { if cerrors.IsCloneRetryError(err) {
return nil, status.Error(codes.Aborted, err.Error()) return nil, status.Error(codes.Aborted, err.Error())
@ -295,7 +298,7 @@ func (cs *ControllerServer) CreateVolume(
metadata := k8s.GetVolumeMetadata(req.GetParameters()) metadata := k8s.GetVolumeMetadata(req.GetParameters())
if vID != nil { if vID != nil {
volClient := core.NewSubVolume(volOptions.GetConnection(), &volOptions.SubVolume, volClient := core.NewSubVolume(volOptions.GetConnection(), &volOptions.SubVolume,
volOptions.ClusterID, cs.ClusterName) volOptions.ClusterID, cs.ClusterName, cs.SetMetadata)
if sID != nil || pvID != nil && !volOptions.BackingSnapshot { if sID != nil || pvID != nil && !volOptions.BackingSnapshot {
err = volClient.ExpandVolume(ctx, volOptions.Size) err = volClient.ExpandVolume(ctx, volOptions.Size)
if err != nil { if err != nil {
@ -376,7 +379,7 @@ func (cs *ControllerServer) CreateVolume(
} }
volClient := core.NewSubVolume(volOptions.GetConnection(), volClient := core.NewSubVolume(volOptions.GetConnection(),
&volOptions.SubVolume, volOptions.ClusterID, cs.ClusterName) &volOptions.SubVolume, volOptions.ClusterID, cs.ClusterName, cs.SetMetadata)
if !volOptions.BackingSnapshot { if !volOptions.BackingSnapshot {
// Get root path for the created subvolume. // Get root path for the created subvolume.
// Note that root path for snapshot-backed volumes has been already set when // Note that root path for snapshot-backed volumes has been already set when
@ -464,7 +467,8 @@ func (cs *ControllerServer) DeleteVolume(
defer cs.OperationLocks.ReleaseDeleteLock(req.GetVolumeId()) defer cs.OperationLocks.ReleaseDeleteLock(req.GetVolumeId())
// Find the volume using the provided VolumeID // Find the volume using the provided VolumeID
volOptions, vID, err := store.NewVolumeOptionsFromVolID(ctx, string(volID), nil, secrets, cs.ClusterName) volOptions, vID, err := store.NewVolumeOptionsFromVolID(ctx, string(volID), nil, secrets,
cs.ClusterName, cs.SetMetadata)
if err != nil { if err != nil {
// if error is ErrPoolNotFound, the pool is already deleted we dont // if error is ErrPoolNotFound, the pool is already deleted we dont
// need to worry about deleting subvolume or omap data, return success // need to worry about deleting subvolume or omap data, return success
@ -519,7 +523,7 @@ func (cs *ControllerServer) DeleteVolume(
} }
defer cr.DeleteCredentials() defer cr.DeleteCredentials()
if err := cleanUpBackingVolume(ctx, volOptions, vID, cr, cs.ClusterName); err != nil { if err := cs.cleanUpBackingVolume(ctx, volOptions, vID, cr); err != nil {
return nil, err return nil, err
} }
@ -532,18 +536,17 @@ func (cs *ControllerServer) DeleteVolume(
return &csi.DeleteVolumeResponse{}, nil return &csi.DeleteVolumeResponse{}, nil
} }
func cleanUpBackingVolume( func (cs *ControllerServer) cleanUpBackingVolume(
ctx context.Context, ctx context.Context,
volOptions *store.VolumeOptions, volOptions *store.VolumeOptions,
volID *store.VolumeIdentifier, volID *store.VolumeIdentifier,
cr *util.Credentials, cr *util.Credentials,
clusterName string,
) error { ) error {
if !volOptions.BackingSnapshot { if !volOptions.BackingSnapshot {
// Regular volumes need to be purged. // Regular volumes need to be purged.
volClient := core.NewSubVolume(volOptions.GetConnection(), volClient := core.NewSubVolume(volOptions.GetConnection(),
&volOptions.SubVolume, volOptions.ClusterID, clusterName) &volOptions.SubVolume, volOptions.ClusterID, cs.ClusterName, cs.SetMetadata)
if err := volClient.PurgeVolume(ctx, false); err != nil { if err := volClient.PurgeVolume(ctx, false); err != nil {
log.ErrorLog(ctx, "failed to delete volume %s: %v", volID, err) log.ErrorLog(ctx, "failed to delete volume %s: %v", volID, err)
if errors.Is(err, cerrors.ErrVolumeHasSnapshots) { if errors.Is(err, cerrors.ErrVolumeHasSnapshots) {
@ -576,7 +579,7 @@ func cleanUpBackingVolume(
} }
snapParentVolOptions, _, snapID, err := store.NewSnapshotOptionsFromID(ctx, snapParentVolOptions, _, snapID, err := store.NewSnapshotOptionsFromID(ctx,
volOptions.BackingSnapshotID, cr, clusterName) volOptions.BackingSnapshotID, cr, cs.ClusterName, cs.SetMetadata)
if err != nil { if err != nil {
absorbErrs := []error{ absorbErrs := []error{
util.ErrPoolNotFound, util.ErrPoolNotFound,
@ -602,7 +605,7 @@ func cleanUpBackingVolume(
snapParentVolOptions.GetConnection(), snapParentVolOptions.GetConnection(),
snapID.FsSnapshotName, snapID.FsSnapshotName,
volOptions.ClusterID, volOptions.ClusterID,
clusterName, cs.ClusterName,
&snapParentVolOptions.SubVolume, &snapParentVolOptions.SubVolume,
) )
@ -671,7 +674,8 @@ func (cs *ControllerServer) ControllerExpandVolume(
} }
defer cr.DeleteCredentials() defer cr.DeleteCredentials()
volOptions, volIdentifier, err := store.NewVolumeOptionsFromVolID(ctx, volID, nil, secret, cs.ClusterName) volOptions, volIdentifier, err := store.NewVolumeOptionsFromVolID(ctx, volID, nil, secret,
cs.ClusterName, cs.SetMetadata)
if err != nil { if err != nil {
log.ErrorLog(ctx, "validation and extraction of volume options failed: %v", err) log.ErrorLog(ctx, "validation and extraction of volume options failed: %v", err)
@ -686,7 +690,7 @@ func (cs *ControllerServer) ControllerExpandVolume(
RoundOffSize := util.RoundOffCephFSVolSize(req.GetCapacityRange().GetRequiredBytes()) RoundOffSize := util.RoundOffCephFSVolSize(req.GetCapacityRange().GetRequiredBytes())
volClient := core.NewSubVolume(volOptions.GetConnection(), volClient := core.NewSubVolume(volOptions.GetConnection(),
&volOptions.SubVolume, volOptions.ClusterID, cs.ClusterName) &volOptions.SubVolume, volOptions.ClusterID, cs.ClusterName, cs.SetMetadata)
if err = volClient.ResizeVolume(ctx, RoundOffSize); err != nil { if err = volClient.ResizeVolume(ctx, RoundOffSize); err != nil {
log.ErrorLog(ctx, "failed to expand volume %s: %v", fsutil.VolumeID(volIdentifier.FsSubvolName), err) log.ErrorLog(ctx, "failed to expand volume %s: %v", fsutil.VolumeID(volIdentifier.FsSubvolName), err)
@ -740,7 +744,7 @@ func (cs *ControllerServer) CreateSnapshot(
// Find the volume using the provided VolumeID // Find the volume using the provided VolumeID
parentVolOptions, vid, err := store.NewVolumeOptionsFromVolID(ctx, parentVolOptions, vid, err := store.NewVolumeOptionsFromVolID(ctx,
sourceVolID, nil, req.GetSecrets(), cs.ClusterName) sourceVolID, nil, req.GetSecrets(), cs.ClusterName, cs.SetMetadata)
if err != nil { if err != nil {
if errors.Is(err, util.ErrPoolNotFound) { if errors.Is(err, util.ErrPoolNotFound) {
log.WarningLog(ctx, "failed to get backend volume for %s: %v", sourceVolID, err) log.WarningLog(ctx, "failed to get backend volume for %s: %v", sourceVolID, err)
@ -792,10 +796,8 @@ func (cs *ControllerServer) CreateSnapshot(
// request to create snapshot. // request to create snapshot.
// TODO: For this purpose we could make use of cached clusterAdditionalInfo // TODO: For this purpose we could make use of cached clusterAdditionalInfo
// too. // too.
volClient := core.NewSubVolume(parentVolOptions.GetConnection(), volClient := core.NewSubVolume(parentVolOptions.GetConnection(), &parentVolOptions.SubVolume,
&parentVolOptions.SubVolume, parentVolOptions.ClusterID, cs.ClusterName, cs.SetMetadata)
parentVolOptions.ClusterID,
cs.ClusterName)
info, err := volClient.GetSubVolumeInfo(ctx) info, err := volClient.GetSubVolumeInfo(ctx)
if err != nil { if err != nil {
// Check error code value against ErrInvalidCommand to understand the cluster // Check error code value against ErrInvalidCommand to understand the cluster
@ -998,7 +1000,7 @@ func (cs *ControllerServer) DeleteSnapshot(
} }
defer cs.OperationLocks.ReleaseDeleteLock(snapshotID) defer cs.OperationLocks.ReleaseDeleteLock(snapshotID)
volOpt, snapInfo, sid, err := store.NewSnapshotOptionsFromID(ctx, snapshotID, cr, cs.ClusterName) volOpt, snapInfo, sid, err := store.NewSnapshotOptionsFromID(ctx, snapshotID, cr, cs.ClusterName, cs.SetMetadata)
if err != nil { if err != nil {
switch { switch {
case errors.Is(err, util.ErrPoolNotFound): case errors.Is(err, util.ErrPoolNotFound):

View File

@ -93,6 +93,10 @@ func (s *subVolumeClient) removeMetadata(key string) error {
// SetAllMetadata set all the metadata from arg parameters on Ssubvolume. // SetAllMetadata set all the metadata from arg parameters on Ssubvolume.
func (s *subVolumeClient) SetAllMetadata(parameters map[string]string) error { func (s *subVolumeClient) SetAllMetadata(parameters map[string]string) error {
if !s.enableMetadata {
return nil
}
for k, v := range parameters { for k, v := range parameters {
err := s.setMetadata(k, v) err := s.setMetadata(k, v)
if err != nil { if err != nil {
@ -113,6 +117,10 @@ func (s *subVolumeClient) SetAllMetadata(parameters map[string]string) error {
// UnsetAllMetadata unset all the metadata from arg keys on subvolume. // UnsetAllMetadata unset all the metadata from arg keys on subvolume.
func (s *subVolumeClient) UnsetAllMetadata(keys []string) error { func (s *subVolumeClient) UnsetAllMetadata(keys []string) error {
if !s.enableMetadata {
return nil
}
for _, key := range keys { for _, key := range keys {
err := s.removeMetadata(key) err := s.removeMetadata(key)
// TODO: replace string comparison with errno. // TODO: replace string comparison with errno.

View File

@ -84,6 +84,7 @@ type subVolumeClient struct {
*SubVolume // Embedded SubVolume struct. *SubVolume // Embedded SubVolume struct.
clusterID string // Cluster ID to check subvolumegroup and resize functionality. clusterID string // Cluster ID to check subvolumegroup and resize functionality.
clusterName string // Cluster name clusterName string // Cluster name
enableMetadata bool // Set metadata on volume
conn *util.ClusterConnection // Cluster connection. conn *util.ClusterConnection // Cluster connection.
} }
@ -98,11 +99,18 @@ type SubVolume struct {
} }
// NewSubVolume returns a new subvolume client. // NewSubVolume returns a new subvolume client.
func NewSubVolume(conn *util.ClusterConnection, vol *SubVolume, clusterID, clusterName string) SubVolumeClient { func NewSubVolume(
conn *util.ClusterConnection,
vol *SubVolume,
clusterID,
clusterName string,
setMetadata bool,
) SubVolumeClient {
return &subVolumeClient{ return &subVolumeClient{
SubVolume: vol, SubVolume: vol,
clusterID: clusterID, clusterID: clusterID,
clusterName: clusterName, clusterName: clusterName,
enableMetadata: setMetadata,
conn: conn, conn: conn,
} }
} }

View File

@ -136,6 +136,7 @@ func (fs *Driver) Run(conf *util.Config) {
if conf.IsControllerServer { if conf.IsControllerServer {
fs.cs = NewControllerServer(fs.cd) fs.cs = NewControllerServer(fs.cd)
fs.cs.ClusterName = conf.ClusterName fs.cs.ClusterName = conf.ClusterName
fs.cs.SetMetadata = conf.SetMetadata
} }
if !conf.IsControllerServer && !conf.IsNodeServer { if !conf.IsControllerServer && !conf.IsNodeServer {
topology, err = util.GetTopologyFromDomainLabels(conf.DomainLabels, conf.NodeID, conf.DriverName) topology, err = util.GetTopologyFromDomainLabels(conf.DomainLabels, conf.NodeID, conf.DriverName)

View File

@ -82,7 +82,7 @@ func (ns *NodeServer) getVolumeOptions(
volContext, volContext,
volSecrets map[string]string, volSecrets map[string]string,
) (*store.VolumeOptions, error) { ) (*store.VolumeOptions, error) {
volOptions, _, err := store.NewVolumeOptionsFromVolID(ctx, string(volID), volContext, volSecrets, "") volOptions, _, err := store.NewVolumeOptionsFromVolID(ctx, string(volID), volContext, volSecrets, "", false)
if err != nil { if err != nil {
if !errors.Is(err, cerrors.ErrInvalidVolID) { if !errors.Is(err, cerrors.ErrInvalidVolID) {
return nil, status.Error(codes.Internal, err.Error()) return nil, status.Error(codes.Internal, err.Error())

View File

@ -35,6 +35,7 @@ func AddSnapshotBackedVolumeRef(
ctx context.Context, ctx context.Context,
volOptions *VolumeOptions, volOptions *VolumeOptions,
clusterName string, clusterName string,
setMetadata bool,
) error { ) error {
ioctx, err := volOptions.conn.GetIoctx(volOptions.MetadataPool) ioctx, err := volOptions.conn.GetIoctx(volOptions.MetadataPool)
if err != nil { if err != nil {
@ -96,7 +97,8 @@ func AddSnapshotBackedVolumeRef(
// There may have been a race between adding a ref to the reftracker and // There may have been a race between adding a ref to the reftracker and
// deleting the backing snapshot. Make sure the snapshot still exists by // deleting the backing snapshot. Make sure the snapshot still exists by
// trying to retrieve it again. // trying to retrieve it again.
_, _, _, err = NewSnapshotOptionsFromID(ctx, volOptions.BackingSnapshotID, volOptions.conn.Creds, clusterName) _, _, _, err = NewSnapshotOptionsFromID(ctx,
volOptions.BackingSnapshotID, volOptions.conn.Creds, clusterName, setMetadata)
if err != nil { if err != nil {
log.ErrorLog(ctx, "failed to get backing snapshot %s: %v", volOptions.BackingSnapshotID, err) log.ErrorLog(ctx, "failed to get backing snapshot %s: %v", volOptions.BackingSnapshotID, err)
} }

View File

@ -79,6 +79,7 @@ func CheckVolExists(ctx context.Context,
sID *SnapshotIdentifier, sID *SnapshotIdentifier,
cr *util.Credentials, cr *util.Credentials,
clusterName string, clusterName string,
setMetadata bool,
) (*VolumeIdentifier, error) { ) (*VolumeIdentifier, error) {
var vid VolumeIdentifier var vid VolumeIdentifier
// Connect to cephfs' default radosNamespace (csi) // Connect to cephfs' default radosNamespace (csi)
@ -100,7 +101,7 @@ func CheckVolExists(ctx context.Context,
vid.FsSubvolName = imageData.ImageAttributes.ImageName vid.FsSubvolName = imageData.ImageAttributes.ImageName
volOptions.VolID = vid.FsSubvolName volOptions.VolID = vid.FsSubvolName
vol := core.NewSubVolume(volOptions.conn, &volOptions.SubVolume, volOptions.ClusterID, clusterName) vol := core.NewSubVolume(volOptions.conn, &volOptions.SubVolume, volOptions.ClusterID, clusterName, setMetadata)
if (sID != nil || pvID != nil) && imageData.ImageAttributes.BackingSnapshotID == "" { if (sID != nil || pvID != nil) && imageData.ImageAttributes.BackingSnapshotID == "" {
cloneState, cloneStateErr := vol.GetCloneState(ctx) cloneState, cloneStateErr := vol.GetCloneState(ctx)
if cloneStateErr != nil { if cloneStateErr != nil {

View File

@ -196,7 +196,12 @@ func fmtBackingSnapshotOptionMismatch(optName, expected, actual string) error {
// NewVolumeOptions generates a new instance of volumeOptions from the provided // NewVolumeOptions generates a new instance of volumeOptions from the provided
// CSI request parameters. // CSI request parameters.
func NewVolumeOptions(ctx context.Context, requestName, clusterName string, req *csi.CreateVolumeRequest, func NewVolumeOptions(
ctx context.Context,
requestName,
clusterName string,
setMetadata bool,
req *csi.CreateVolumeRequest,
cr *util.Credentials, cr *util.Credentials,
) (*VolumeOptions, error) { ) (*VolumeOptions, error) {
var ( var (
@ -289,7 +294,7 @@ func NewVolumeOptions(ctx context.Context, requestName, clusterName string, req
opts.BackingSnapshotID = req.GetVolumeContentSource().GetSnapshot().GetSnapshotId() opts.BackingSnapshotID = req.GetVolumeContentSource().GetSnapshot().GetSnapshotId()
err = opts.populateVolumeOptionsFromBackingSnapshot(ctx, cr, clusterName) err = opts.populateVolumeOptionsFromBackingSnapshot(ctx, cr, clusterName, setMetadata)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -305,6 +310,7 @@ func NewVolumeOptionsFromVolID(
volID string, volID string,
volOpt, secrets map[string]string, volOpt, secrets map[string]string,
clusterName string, clusterName string,
setMetadata bool,
) (*VolumeOptions, *VolumeIdentifier, error) { ) (*VolumeOptions, *VolumeIdentifier, error) {
var ( var (
vi util.CSIIdentifier vi util.CSIIdentifier
@ -408,16 +414,20 @@ func NewVolumeOptionsFromVolID(
volOptions.SubVolume.VolID = vid.FsSubvolName volOptions.SubVolume.VolID = vid.FsSubvolName
if volOptions.BackingSnapshot { if volOptions.BackingSnapshot {
err = volOptions.populateVolumeOptionsFromBackingSnapshot(ctx, cr, clusterName) err = volOptions.populateVolumeOptionsFromBackingSnapshot(ctx, cr, clusterName, setMetadata)
} else { } else {
err = volOptions.populateVolumeOptionsFromSubvolume(ctx, clusterName) err = volOptions.populateVolumeOptionsFromSubvolume(ctx, clusterName, setMetadata)
} }
return &volOptions, &vid, err return &volOptions, &vid, err
} }
func (vo *VolumeOptions) populateVolumeOptionsFromSubvolume(ctx context.Context, clusterName string) error { func (vo *VolumeOptions) populateVolumeOptionsFromSubvolume(
vol := core.NewSubVolume(vo.conn, &vo.SubVolume, vo.ClusterID, clusterName) ctx context.Context,
clusterName string,
setMetadata bool,
) error {
vol := core.NewSubVolume(vo.conn, &vo.SubVolume, vo.ClusterID, clusterName, setMetadata)
var info *core.Subvolume var info *core.Subvolume
info, err := vol.GetSubVolumeInfo(ctx) info, err := vol.GetSubVolumeInfo(ctx)
@ -438,6 +448,7 @@ func (vo *VolumeOptions) populateVolumeOptionsFromBackingSnapshot(
ctx context.Context, ctx context.Context,
cr *util.Credentials, cr *util.Credentials,
clusterName string, clusterName string,
setMetadata bool,
) error { ) error {
// As of CephFS snapshot v2 API, snapshots may be found in two locations: // As of CephFS snapshot v2 API, snapshots may be found in two locations:
// //
@ -459,7 +470,8 @@ func (vo *VolumeOptions) populateVolumeOptionsFromBackingSnapshot(
return nil return nil
} }
parentBackingSnapVolOpts, _, snapID, err := NewSnapshotOptionsFromID(ctx, vo.BackingSnapshotID, cr, clusterName) parentBackingSnapVolOpts, _, snapID, err := NewSnapshotOptionsFromID(ctx,
vo.BackingSnapshotID, cr, clusterName, setMetadata)
if err != nil { if err != nil {
return fmt.Errorf("failed to retrieve backing snapshot %s: %w", vo.BackingSnapshotID, err) return fmt.Errorf("failed to retrieve backing snapshot %s: %w", vo.BackingSnapshotID, err)
} }
@ -655,6 +667,7 @@ func NewSnapshotOptionsFromID(
snapID string, snapID string,
cr *util.Credentials, cr *util.Credentials,
clusterName string, clusterName string,
setMetadata bool,
) (*VolumeOptions, *core.SnapshotInfo, *SnapshotIdentifier, error) { ) (*VolumeOptions, *core.SnapshotInfo, *SnapshotIdentifier, error) {
var ( var (
vi util.CSIIdentifier vi util.CSIIdentifier
@ -726,7 +739,7 @@ func NewSnapshotOptionsFromID(
sid.FsSubvolName = imageAttributes.SourceName sid.FsSubvolName = imageAttributes.SourceName
volOptions.SubVolume.VolID = sid.FsSubvolName volOptions.SubVolume.VolID = sid.FsSubvolName
vol := core.NewSubVolume(volOptions.conn, &volOptions.SubVolume, volOptions.ClusterID, clusterName) vol := core.NewSubVolume(volOptions.conn, &volOptions.SubVolume, volOptions.ClusterID, clusterName, setMetadata)
subvolInfo, err := vol.GetSubVolumeInfo(ctx) subvolInfo, err := vol.GetSubVolumeInfo(ctx)
if err != nil { if err != nil {