diff --git a/internal/rbd/group/util.go b/internal/rbd/group/util.go index 6c4e56d5d..bbdec576e 100644 --- a/internal/rbd/group/util.go +++ b/internal/rbd/group/util.go @@ -20,6 +20,7 @@ import ( "context" "errors" "fmt" + "strconv" "time" "github.com/ceph/go-ceph/rados" @@ -63,18 +64,8 @@ type commonVolumeGroup struct { journal journal.VolumeGroupJournal } -func (cvg *commonVolumeGroup) initCommonVolumeGroup( - ctx context.Context, - id string, - csiDriver string, - creds *util.Credentials, -) error { - csiID := util.CSIIdentifier{} - err := csiID.DecomposeCSIID(id) - if err != nil { - return fmt.Errorf("failed to decompose volume group id %q: %w", id, err) - } - +// generateVolumeGroup generates a commonVolumeGroup structure from the volumeGroup identifier. +func (cvg *commonVolumeGroup) generateVolumeGroup(csiID util.CSIIdentifier) error { mons, err := util.Mons(util.CsiConfigFile, csiID.ClusterID) if err != nil { return fmt.Errorf("failed to get MONs for cluster id %q: %w", csiID.ClusterID, err) @@ -85,9 +76,80 @@ func (cvg *commonVolumeGroup) initCommonVolumeGroup( return fmt.Errorf("failed to get RADOS namespace for cluster id %q: %w", csiID.ClusterID, err) } - pool, err := util.GetPoolName(mons, creds, csiID.LocationID) + pool, err := util.GetPoolName(mons, cvg.credentials, csiID.LocationID) if err != nil { - return fmt.Errorf("failed to get pool for volume group id %q: %w", id, err) + return fmt.Errorf("failed to get pool for volume group id %q: %w", cvg.id, err) + } + + cvg.monitors = mons + cvg.namespace = namespace + cvg.pool = pool + + return nil +} + +// generateVolumeGroupFromMapping checks the clusterID and poolID mapping and +// generates commonVolumeGroup structure for the mapped clusterID and poolID. +func (cvg *commonVolumeGroup) generateVolumeGroupFromMapping( + ctx context.Context, + csiID util.CSIIdentifier, + mapping *[]util.ClusterMappingInfo, +) error { + mcsiID := csiID + existingClusterID := csiID.ClusterID + existingPoolID := strconv.FormatInt(csiID.LocationID, 10) + + for _, cm := range *mapping { + for key, val := range cm.ClusterIDMapping { + mappedClusterID := util.GetMappedID(key, val, csiID.ClusterID) + if mappedClusterID == "" { + continue + } + + log.DebugLog(ctx, + "found new clusterID mapping %s for existing clusterID %s", mappedClusterID, existingClusterID) + + // Add mapped clusterID to Identifier + mcsiID.ClusterID = mappedClusterID + for _, pools := range cm.RBDpoolIDMappingInfo { + for key, val := range pools { + mappedPoolID := util.GetMappedID(key, val, existingPoolID) + if mappedPoolID == "" { + continue + } + log.DebugLog(ctx, + "found new poolID mapping %s for existing poolID %s", mappedPoolID, existingPoolID) + + mPID, err := strconv.ParseInt(mappedPoolID, 10, 64) + if err != nil { + return err + } + mcsiID.LocationID = mPID + err = cvg.generateVolumeGroup(mcsiID) + if util.ShouldRetryVolumeGeneration(err) { + continue + } + + return err + } + } + } + } + + return util.ErrPoolNotFound +} + +func (cvg *commonVolumeGroup) initCommonVolumeGroup( + ctx context.Context, + id string, + csiDriver string, + creds *util.Credentials, +) error { + csiID := util.CSIIdentifier{} + + err := csiID.DecomposeCSIID(id) + if err != nil { + return fmt.Errorf("failed to decompose volume group id %q: %w", id, err) } cvg.csiDriver = csiDriver @@ -95,9 +157,28 @@ func (cvg *commonVolumeGroup) initCommonVolumeGroup( cvg.id = id cvg.clusterID = csiID.ClusterID cvg.objectUUID = csiID.ObjectUUID - cvg.monitors = mons - cvg.pool = pool - cvg.namespace = namespace + // cvg.monitors, cvg.namespace, cvg.pool are set in generateVolumeGroup + + err = cvg.generateVolumeGroup(csiID) + // If the error is not a retryable error, return from here. + if err != nil && !util.ShouldRetryVolumeGeneration(err) { + return err + } + + // If the error is a retryable error, we should try to get the cluster mapping + // and generate the volume group from the mapping. + if util.ShouldRetryVolumeGeneration(err) { + mapping, err := util.GetClusterMappingInfo(csiID.ClusterID) + if err != nil { + return err + } + if mapping != nil { + err = cvg.generateVolumeGroupFromMapping(ctx, csiID, mapping) + if err != nil { + return err + } + } + } log.DebugLog(ctx, "object for volume group %q has been initialized", cvg.id)