mirror of
https://github.com/ceph/ceph-csi.git
synced 2025-02-21 09:59:29 +00:00
rbd: get volumegroup in secondary cluster
Currently, `GetVolumeGroup()` fetches the RBD group from the pool using the clusterID & poolID encoded in the VolumeGroupHandle. However, this approach may fail in a secondary mirrored cluster, where the clusterID & poolID could differ. This commit ensures that `GetVolumeGroup` leverages the clusterIDMapping and RBDPoolIDMapping to locate the RBD group in the appropriate pool if it is not found in the pool corresponding to the poolID encoded in the VolumeGroupHandle. Signed-off-by: Praveen M <m.praveen@ibm.com>
This commit is contained in:
parent
cbd73f296d
commit
e4d41c42d6
@ -20,6 +20,7 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/ceph/go-ceph/rados"
|
||||
@ -63,18 +64,8 @@ type commonVolumeGroup struct {
|
||||
journal journal.VolumeGroupJournal
|
||||
}
|
||||
|
||||
func (cvg *commonVolumeGroup) initCommonVolumeGroup(
|
||||
ctx context.Context,
|
||||
id string,
|
||||
csiDriver string,
|
||||
creds *util.Credentials,
|
||||
) error {
|
||||
csiID := util.CSIIdentifier{}
|
||||
err := csiID.DecomposeCSIID(id)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to decompose volume group id %q: %w", id, err)
|
||||
}
|
||||
|
||||
// generateVolumeGroup generates a commonVolumeGroup structure from the volumeGroup identifier.
|
||||
func (cvg *commonVolumeGroup) generateVolumeGroup(csiID util.CSIIdentifier) error {
|
||||
mons, err := util.Mons(util.CsiConfigFile, csiID.ClusterID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get MONs for cluster id %q: %w", csiID.ClusterID, err)
|
||||
@ -85,9 +76,80 @@ func (cvg *commonVolumeGroup) initCommonVolumeGroup(
|
||||
return fmt.Errorf("failed to get RADOS namespace for cluster id %q: %w", csiID.ClusterID, err)
|
||||
}
|
||||
|
||||
pool, err := util.GetPoolName(mons, creds, csiID.LocationID)
|
||||
pool, err := util.GetPoolName(mons, cvg.credentials, csiID.LocationID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get pool for volume group id %q: %w", id, err)
|
||||
return fmt.Errorf("failed to get pool for volume group id %q: %w", cvg.id, err)
|
||||
}
|
||||
|
||||
cvg.monitors = mons
|
||||
cvg.namespace = namespace
|
||||
cvg.pool = pool
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// generateVolumeGroupFromMapping checks the clusterID and poolID mapping and
|
||||
// generates commonVolumeGroup structure for the mapped clusterID and poolID.
|
||||
func (cvg *commonVolumeGroup) generateVolumeGroupFromMapping(
|
||||
ctx context.Context,
|
||||
csiID util.CSIIdentifier,
|
||||
mapping *[]util.ClusterMappingInfo,
|
||||
) error {
|
||||
mcsiID := csiID
|
||||
existingClusterID := csiID.ClusterID
|
||||
existingPoolID := strconv.FormatInt(csiID.LocationID, 10)
|
||||
|
||||
for _, cm := range *mapping {
|
||||
for key, val := range cm.ClusterIDMapping {
|
||||
mappedClusterID := util.GetMappedID(key, val, csiID.ClusterID)
|
||||
if mappedClusterID == "" {
|
||||
continue
|
||||
}
|
||||
|
||||
log.DebugLog(ctx,
|
||||
"found new clusterID mapping %s for existing clusterID %s", mappedClusterID, existingClusterID)
|
||||
|
||||
// Add mapped clusterID to Identifier
|
||||
mcsiID.ClusterID = mappedClusterID
|
||||
for _, pools := range cm.RBDpoolIDMappingInfo {
|
||||
for key, val := range pools {
|
||||
mappedPoolID := util.GetMappedID(key, val, existingPoolID)
|
||||
if mappedPoolID == "" {
|
||||
continue
|
||||
}
|
||||
log.DebugLog(ctx,
|
||||
"found new poolID mapping %s for existing poolID %s", mappedPoolID, existingPoolID)
|
||||
|
||||
mPID, err := strconv.ParseInt(mappedPoolID, 10, 64)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
mcsiID.LocationID = mPID
|
||||
err = cvg.generateVolumeGroup(mcsiID)
|
||||
if util.ShouldRetryVolumeGeneration(err) {
|
||||
continue
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return util.ErrPoolNotFound
|
||||
}
|
||||
|
||||
func (cvg *commonVolumeGroup) initCommonVolumeGroup(
|
||||
ctx context.Context,
|
||||
id string,
|
||||
csiDriver string,
|
||||
creds *util.Credentials,
|
||||
) error {
|
||||
csiID := util.CSIIdentifier{}
|
||||
|
||||
err := csiID.DecomposeCSIID(id)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to decompose volume group id %q: %w", id, err)
|
||||
}
|
||||
|
||||
cvg.csiDriver = csiDriver
|
||||
@ -95,9 +157,28 @@ func (cvg *commonVolumeGroup) initCommonVolumeGroup(
|
||||
cvg.id = id
|
||||
cvg.clusterID = csiID.ClusterID
|
||||
cvg.objectUUID = csiID.ObjectUUID
|
||||
cvg.monitors = mons
|
||||
cvg.pool = pool
|
||||
cvg.namespace = namespace
|
||||
// cvg.monitors, cvg.namespace, cvg.pool are set in generateVolumeGroup
|
||||
|
||||
err = cvg.generateVolumeGroup(csiID)
|
||||
// If the error is not a retryable error, return from here.
|
||||
if err != nil && !util.ShouldRetryVolumeGeneration(err) {
|
||||
return err
|
||||
}
|
||||
|
||||
// If the error is a retryable error, we should try to get the cluster mapping
|
||||
// and generate the volume group from the mapping.
|
||||
if util.ShouldRetryVolumeGeneration(err) {
|
||||
mapping, err := util.GetClusterMappingInfo(csiID.ClusterID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if mapping != nil {
|
||||
err = cvg.generateVolumeGroupFromMapping(ctx, csiID, mapping)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
log.DebugLog(ctx, "object for volume group %q has been initialized", cvg.id)
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user