cephfs: set cluster Name as metadata on the subvolume

This change helps read the cluster name from the cmdline args,
the provisioner will set the same on the subvolume.

Signed-off-by: Prasanna Kumar Kalever <prasanna.kalever@redhat.com>
This commit is contained in:
Prasanna Kumar Kalever
2022-06-14 18:53:29 +05:30
committed by mergify[bot]
parent 25ce21f496
commit 21d811096b
8 changed files with 72 additions and 37 deletions

View File

@ -52,6 +52,9 @@ type ControllerServer struct {
// A map storing all volumes/snapshots with ongoing operations.
OperationLocks *util.OperationLock
// Cluster name
ClusterName string
}
// createBackingVolume creates the backing subvolume and on any error cleans up any created entities.
@ -63,7 +66,8 @@ func (cs *ControllerServer) createBackingVolume(
sID *store.SnapshotIdentifier,
) error {
var err error
volClient := core.NewSubVolume(volOptions.GetConnection(), &volOptions.SubVolume, volOptions.ClusterID)
volClient := core.NewSubVolume(volOptions.GetConnection(),
&volOptions.SubVolume, volOptions.ClusterID, cs.ClusterName)
if sID != nil {
return cs.createBackingVolumeFromSnapshotSource(ctx, volOptions, parentVolOpt, volClient, sID)
@ -97,7 +101,7 @@ func (cs *ControllerServer) createBackingVolumeFromSnapshotSource(
defer cs.OperationLocks.ReleaseRestoreLock(sID.SnapshotID)
if volOptions.BackingSnapshot {
if err := store.AddSnapshotBackedVolumeRef(ctx, volOptions); err != nil {
if err := store.AddSnapshotBackedVolumeRef(ctx, volOptions, cs.ClusterName); err != nil {
log.ErrorLog(ctx, "failed to create snapshot-backed volume from snapshot %s: %v",
sID.FsSnapshotName, err)
@ -146,6 +150,7 @@ func checkContentSource(
ctx context.Context,
req *csi.CreateVolumeRequest,
cr *util.Credentials,
clusterName string,
) (*store.VolumeOptions, *store.VolumeIdentifier, *store.SnapshotIdentifier, error) {
if req.VolumeContentSource == nil {
return nil, nil, nil, nil
@ -154,7 +159,7 @@ func checkContentSource(
switch volumeSource.Type.(type) {
case *csi.VolumeContentSource_Snapshot:
snapshotID := req.VolumeContentSource.GetSnapshot().GetSnapshotId()
volOpt, _, sid, err := store.NewSnapshotOptionsFromID(ctx, snapshotID, cr)
volOpt, _, sid, err := store.NewSnapshotOptionsFromID(ctx, snapshotID, cr, clusterName)
if err != nil {
if errors.Is(err, cerrors.ErrSnapNotFound) {
return nil, nil, nil, status.Error(codes.NotFound, err.Error())
@ -167,7 +172,7 @@ func checkContentSource(
case *csi.VolumeContentSource_Volume:
// Find the volume using the provided VolumeID
volID := req.VolumeContentSource.GetVolume().GetVolumeId()
parentVol, pvID, err := store.NewVolumeOptionsFromVolID(ctx, volID, nil, req.Secrets)
parentVol, pvID, err := store.NewVolumeOptionsFromVolID(ctx, volID, nil, req.Secrets, clusterName)
if err != nil {
if !errors.Is(err, cerrors.ErrVolumeNotFound) {
return nil, nil, nil, status.Error(codes.NotFound, err.Error())
@ -252,7 +257,7 @@ func (cs *ControllerServer) CreateVolume(
}
defer cs.VolumeLocks.Release(requestName)
volOptions, err := store.NewVolumeOptions(ctx, requestName, req, cr)
volOptions, err := store.NewVolumeOptions(ctx, requestName, cs.ClusterName, req, cr)
if err != nil {
log.ErrorLog(ctx, "validation and extraction of volume options failed: %v", err)
@ -264,7 +269,7 @@ func (cs *ControllerServer) CreateVolume(
volOptions.Size = util.RoundOffCephFSVolSize(req.GetCapacityRange().GetRequiredBytes())
}
parentVol, pvID, sID, err := checkContentSource(ctx, req, cr)
parentVol, pvID, sID, err := checkContentSource(ctx, req, cr, cs.ClusterName)
if err != nil {
return nil, err
}
@ -277,7 +282,7 @@ func (cs *ControllerServer) CreateVolume(
return nil, status.Error(codes.InvalidArgument, err.Error())
}
vID, err := store.CheckVolExists(ctx, volOptions, parentVol, pvID, sID, cr)
vID, err := store.CheckVolExists(ctx, volOptions, parentVol, pvID, sID, cr, cs.ClusterName)
if err != nil {
if cerrors.IsCloneRetryError(err) {
return nil, status.Error(codes.Aborted, err.Error())
@ -289,7 +294,8 @@ func (cs *ControllerServer) CreateVolume(
metadata := k8s.GetVolumeMetadata(req.GetParameters())
if vID != nil {
volClient := core.NewSubVolume(volOptions.GetConnection(), &volOptions.SubVolume, volOptions.ClusterID)
volClient := core.NewSubVolume(volOptions.GetConnection(), &volOptions.SubVolume,
volOptions.ClusterID, cs.ClusterName)
if sID != nil || pvID != nil && !volOptions.BackingSnapshot {
err = volClient.ExpandVolume(ctx, volOptions.Size)
if err != nil {
@ -369,12 +375,13 @@ func (cs *ControllerServer) CreateVolume(
return nil, err
}
volClient := core.NewSubVolume(volOptions.GetConnection(),
&volOptions.SubVolume, volOptions.ClusterID, cs.ClusterName)
if !volOptions.BackingSnapshot {
// Get root path for the created subvolume.
// Note that root path for snapshot-backed volumes has been already set when
// building VolumeOptions.
volClient := core.NewSubVolume(volOptions.GetConnection(), &volOptions.SubVolume, volOptions.ClusterID)
volOptions.RootPath, err = volClient.GetVolumeRootPathCeph(ctx)
if err != nil {
purgeErr := volClient.PurgeVolume(ctx, true)
@ -457,7 +464,7 @@ func (cs *ControllerServer) DeleteVolume(
defer cs.OperationLocks.ReleaseDeleteLock(req.GetVolumeId())
// Find the volume using the provided VolumeID
volOptions, vID, err := store.NewVolumeOptionsFromVolID(ctx, string(volID), nil, secrets)
volOptions, vID, err := store.NewVolumeOptionsFromVolID(ctx, string(volID), nil, secrets, cs.ClusterName)
if err != nil {
// if error is ErrPoolNotFound, the pool is already deleted we dont
// need to worry about deleting subvolume or omap data, return success
@ -512,7 +519,7 @@ func (cs *ControllerServer) DeleteVolume(
}
defer cr.DeleteCredentials()
if err := cleanUpBackingVolume(ctx, volOptions, vID, cr); err != nil {
if err := cleanUpBackingVolume(ctx, volOptions, vID, cr, cs.ClusterName); err != nil {
return nil, err
}
@ -530,11 +537,13 @@ func cleanUpBackingVolume(
volOptions *store.VolumeOptions,
volID *store.VolumeIdentifier,
cr *util.Credentials,
clusterName string,
) error {
if !volOptions.BackingSnapshot {
// Regular volumes need to be purged.
volClient := core.NewSubVolume(volOptions.GetConnection(), &volOptions.SubVolume, volOptions.ClusterID)
volClient := core.NewSubVolume(volOptions.GetConnection(),
&volOptions.SubVolume, volOptions.ClusterID, clusterName)
if err := volClient.PurgeVolume(ctx, false); err != nil {
log.ErrorLog(ctx, "failed to delete volume %s: %v", volID, err)
if errors.Is(err, cerrors.ErrVolumeHasSnapshots) {
@ -566,7 +575,8 @@ func cleanUpBackingVolume(
return nil
}
snapParentVolOptions, _, snapID, err := store.NewSnapshotOptionsFromID(ctx, volOptions.BackingSnapshotID, cr)
snapParentVolOptions, _, snapID, err := store.NewSnapshotOptionsFromID(ctx,
volOptions.BackingSnapshotID, cr, clusterName)
if err != nil {
absorbErrs := []error{
util.ErrPoolNotFound,
@ -659,7 +669,7 @@ func (cs *ControllerServer) ControllerExpandVolume(
}
defer cr.DeleteCredentials()
volOptions, volIdentifier, err := store.NewVolumeOptionsFromVolID(ctx, volID, nil, secret)
volOptions, volIdentifier, err := store.NewVolumeOptionsFromVolID(ctx, volID, nil, secret, cs.ClusterName)
if err != nil {
log.ErrorLog(ctx, "validation and extraction of volume options failed: %v", err)
@ -673,7 +683,8 @@ func (cs *ControllerServer) ControllerExpandVolume(
RoundOffSize := util.RoundOffCephFSVolSize(req.GetCapacityRange().GetRequiredBytes())
volClient := core.NewSubVolume(volOptions.GetConnection(), &volOptions.SubVolume, volOptions.ClusterID)
volClient := core.NewSubVolume(volOptions.GetConnection(),
&volOptions.SubVolume, volOptions.ClusterID, cs.ClusterName)
if err = volClient.ResizeVolume(ctx, RoundOffSize); err != nil {
log.ErrorLog(ctx, "failed to expand volume %s: %v", fsutil.VolumeID(volIdentifier.FsSubvolName), err)
@ -726,7 +737,8 @@ func (cs *ControllerServer) CreateSnapshot(
defer cs.OperationLocks.ReleaseSnapshotCreateLock(sourceVolID)
// Find the volume using the provided VolumeID
parentVolOptions, vid, err := store.NewVolumeOptionsFromVolID(ctx, sourceVolID, nil, req.GetSecrets())
parentVolOptions, vid, err := store.NewVolumeOptionsFromVolID(ctx,
sourceVolID, nil, req.GetSecrets(), cs.ClusterName)
if err != nil {
if errors.Is(err, util.ErrPoolNotFound) {
log.WarningLog(ctx, "failed to get backend volume for %s: %v", sourceVolID, err)
@ -780,7 +792,8 @@ func (cs *ControllerServer) CreateSnapshot(
// too.
volClient := core.NewSubVolume(parentVolOptions.GetConnection(),
&parentVolOptions.SubVolume,
parentVolOptions.ClusterID)
parentVolOptions.ClusterID,
cs.ClusterName)
info, err := volClient.GetSubVolumeInfo(ctx)
if err != nil {
// Check error code value against ErrInvalidCommand to understand the cluster
@ -806,7 +819,8 @@ func (cs *ControllerServer) CreateSnapshot(
// check snapshot is protected
protected := true
if !(snapInfo.Protected == core.SnapshotIsProtected) {
snapClient := core.NewSnapshot(parentVolOptions.GetConnection(), sid.FsSnapshotName, &parentVolOptions.SubVolume)
snapClient := core.NewSnapshot(parentVolOptions.GetConnection(),
sid.FsSnapshotName, &parentVolOptions.SubVolume)
err = snapClient.ProtectSnapshot(ctx)
if err != nil {
protected = false
@ -956,7 +970,7 @@ func (cs *ControllerServer) DeleteSnapshot(
}
defer cs.OperationLocks.ReleaseDeleteLock(snapshotID)
volOpt, snapInfo, sid, err := store.NewSnapshotOptionsFromID(ctx, snapshotID, cr)
volOpt, snapInfo, sid, err := store.NewSnapshotOptionsFromID(ctx, snapshotID, cr, cs.ClusterName)
if err != nil {
switch {
case errors.Is(err, util.ErrPoolNotFound):