rbd: check for volume group existence

Signed-off-by: Praveen M <m.praveen@ibm.com>
This commit is contained in:
Praveen M 2025-03-03 13:49:08 +05:30
parent c1b3325677
commit c429894704
2 changed files with 50 additions and 10 deletions

View File

@ -91,11 +91,18 @@ func (cvg *commonVolumeGroup) generateVolumeGroup(csiID util.CSIIdentifier) erro
// generateVolumeGroupFromMapping checks the clusterID and poolID mapping and
// generates commonVolumeGroup structure for the mapped clusterID and poolID.
// If the mapping is not found, it returns ErrRBDGroupNotFound.
func (cvg *commonVolumeGroup) generateVolumeGroupFromMapping(
ctx context.Context,
csiID util.CSIIdentifier,
mapping *[]util.ClusterMappingInfo,
) error {
var (
volumeGroupGenerationError error
volumeGroupExistsError error
volumeGroupExists bool
)
mcsiID := csiID
existingClusterID := csiID.ClusterID
existingPoolID := strconv.FormatInt(csiID.LocationID, 10)
@ -126,18 +133,27 @@ func (cvg *commonVolumeGroup) generateVolumeGroupFromMapping(
return err
}
mcsiID.LocationID = mPID
err = cvg.generateVolumeGroup(mcsiID)
if ShouldRetryVolumeGroupGeneration(err) {
volumeGroupGenerationError = cvg.generateVolumeGroup(mcsiID)
if volumeGroupGenerationError == nil {
// Check if the volume group exists in the journal.
volumeGroupExists, volumeGroupExistsError = cvg.Exists(ctx)
if !errors.Is(volumeGroupExistsError, rbd_errors.ErrRBDGroupNotFound) {
return volumeGroupExistsError
}
}
// If Volume Group doesn't exists or the error is a retryable error,
// we should try to get the cluster mapping and generate the volume group from the mapping.
if !volumeGroupExists || ShouldRetryVolumeGroupGeneration(volumeGroupGenerationError) {
continue
}
return err
return volumeGroupGenerationError
}
}
}
}
return util.ErrPoolNotFound
return volumeGroupGenerationError
}
func (cvg *commonVolumeGroup) initCommonVolumeGroup(
@ -146,6 +162,11 @@ func (cvg *commonVolumeGroup) initCommonVolumeGroup(
csiDriver string,
creds *util.Credentials,
) error {
var (
volumeGroupGenerationError error
volumeGroupExistsError error
volumeGroupExists bool
)
csiID := util.CSIIdentifier{}
err := csiID.DecomposeCSIID(id)
@ -160,15 +181,21 @@ func (cvg *commonVolumeGroup) initCommonVolumeGroup(
cvg.objectUUID = csiID.ObjectUUID
// cvg.monitors, cvg.namespace, cvg.pool are set in generateVolumeGroup
err = cvg.generateVolumeGroup(csiID)
volumeGroupGenerationError = cvg.generateVolumeGroup(csiID)
// If the error is not a retryable error, return from here.
if err != nil && !ShouldRetryVolumeGroupGeneration(err) {
return err
if volumeGroupGenerationError != nil && !ShouldRetryVolumeGroupGeneration(volumeGroupGenerationError) {
return volumeGroupGenerationError
} else {
// Check if the volume group exists in the journal.
volumeGroupExists, volumeGroupExistsError = cvg.Exists(ctx)
if !errors.Is(volumeGroupExistsError, rbd_errors.ErrRBDGroupNotFound) {
return volumeGroupExistsError
}
}
// If the error is a retryable error, we should try to get the cluster mapping
// and generate the volume group from the mapping.
if ShouldRetryVolumeGroupGeneration(err) {
// If Volume Group doesn't exists or the error is a retryable error,
// we should try to get the cluster mapping and generate the volume group from the mapping.
if !volumeGroupExists || ShouldRetryVolumeGroupGeneration(volumeGroupGenerationError) {
mapping, err := util.GetClusterMappingInfo(csiID.ClusterID)
if err != nil {
return err
@ -443,3 +470,13 @@ func ShouldRetryVolumeGroupGeneration(err error) bool {
errors.Is(err, rbd_errors.ErrRBDGroupNotFound) ||
errors.Is(err, rados.ErrPermissionDenied))
}
// Exists checks if the volume group exists in the journal.
func (cvg *commonVolumeGroup) Exists(ctx context.Context) (bool, error) {
_, err := cvg.getVolumeGroupAttributes(ctx)
if err != nil {
return false, err
}
return true, nil
}

View File

@ -73,4 +73,7 @@ type VolumeGroup interface {
// The Snapshots are crash consistent, and created as a consistency
// group.
CreateSnapshots(ctx context.Context, cr *util.Credentials, name string) ([]Snapshot, error)
// Exists checks if the volume group exists in the journal.
Exists(ctx context.Context) (bool, error)
}