rbd: read clusterID and PoolID from mapping

Whenever Ceph-CSI receives a CSI/Replication
request it will first decode the
volumeHandle and try to get the required
OMAP details if it is not able to
retrieve, receives a `Not Found` error
message and Ceph-CSI will check for the
clusterID mapping. If the old volumeID
`0001-00013-site1-storage-0000000000000001
-b0285c97-a0ce-11eb-8c66-0242ac110002`
contains the `site1-storage` as the clusterID,
now Ceph-CSI will look for the corresponding
clusterID `site2-storage` from the above configmap.
If the clusterID mapping is found now Ceph-CSI
will look for the poolID mapping ie mapping between
`1` and `2`. Example:- pool with name exists on
both the clusters with different ID's Replicapool
with ID `1` on site1 and Replicapool with ID `2`
on site2. After getting the required mapping Ceph-CSI
has the required information to get more details
from the rados OMAP. If we have multiple clusterID mapping
it will loop through all the mapping and checks the
corresponding pool to get the OMAP data. If the clusterID
mapping does not exist Ceph-CSI will return an `Not Found`
error message to the caller.

Signed-off-by: Madhu Rajanna <madhupr007@gmail.com>
This commit is contained in:
Madhu Rajanna 2021-07-22 16:06:39 +05:30 committed by mergify[bot]
parent ac11d71e19
commit 92ad2ceec9
2 changed files with 155 additions and 10 deletions

View File

@ -932,11 +932,11 @@ func genSnapFromSnapID(
func generateVolumeFromVolumeID( func generateVolumeFromVolumeID(
ctx context.Context, ctx context.Context,
volumeID string, volumeID string,
vi util.CSIIdentifier,
cr *util.Credentials, cr *util.Credentials,
secrets map[string]string) (*rbdVolume, error) { secrets map[string]string) (*rbdVolume, error) {
var ( var (
options map[string]string options map[string]string
vi util.CSIIdentifier
rbdVol *rbdVolume rbdVol *rbdVolume
err error err error
) )
@ -946,13 +946,6 @@ func generateVolumeFromVolumeID(
// Mounter, MultiNodeWritable // Mounter, MultiNodeWritable
rbdVol = &rbdVolume{} rbdVol = &rbdVolume{}
rbdVol.VolID = volumeID rbdVol.VolID = volumeID
err = vi.DecomposeCSIID(rbdVol.VolID)
if err != nil {
return rbdVol, fmt.Errorf("%w: error decoding volume ID (%s) (%s)",
ErrInvalidVolID, err, rbdVol.VolID)
}
// TODO check clusterID mapping exists // TODO check clusterID mapping exists
rbdVol.ClusterID = vi.ClusterID rbdVol.ClusterID = vi.ClusterID
@ -1031,12 +1024,37 @@ func genVolFromVolID(
volumeID string, volumeID string,
cr *util.Credentials, cr *util.Credentials,
secrets map[string]string) (*rbdVolume, error) { secrets map[string]string) (*rbdVolume, error) {
vol, err := generateVolumeFromVolumeID(ctx, volumeID, cr, secrets) var (
vi util.CSIIdentifier
vol *rbdVolume
)
err := vi.DecomposeCSIID(volumeID)
if err != nil {
return vol, fmt.Errorf("%w: error decoding volume ID (%s) (%s)",
ErrInvalidVolID, err, volumeID)
}
vol, err = generateVolumeFromVolumeID(ctx, volumeID, vi, cr, secrets)
if !errors.Is(err, util.ErrKeyNotFound) && !errors.Is(err, util.ErrPoolNotFound) && if !errors.Is(err, util.ErrKeyNotFound) && !errors.Is(err, util.ErrPoolNotFound) &&
!errors.Is(err, ErrImageNotFound) { !errors.Is(err, ErrImageNotFound) {
return vol, err return vol, err
} }
// Check clusterID mapping exists
mapping, mErr := util.GetClusterMappingInfo(vi.ClusterID)
if mErr != nil {
return vol, mErr
}
if mapping != nil {
rbdVol, vErr := generateVolumeFromMapping(ctx, mapping, volumeID, vi, cr, secrets)
if !errors.Is(vErr, util.ErrKeyNotFound) && !errors.Is(vErr, util.ErrPoolNotFound) &&
!errors.Is(vErr, ErrImageNotFound) {
return rbdVol, vErr
}
}
// TODO: remove extracting volumeID from PV annotations.
// If the volume details are not found in the OMAP it can be a mirrored RBD // If the volume details are not found in the OMAP it can be a mirrored RBD
// image and the OMAP is already generated and the volumeHandle might not // image and the OMAP is already generated and the volumeHandle might not
// be the same in the PV.Spec.CSI.VolumeHandle. Check the PV annotation for // be the same in the PV.Spec.CSI.VolumeHandle. Check the PV annotation for
@ -1054,8 +1072,13 @@ func genVolFromVolID(
if pvlist.Items[i].Spec.CSI != nil && pvlist.Items[i].Spec.CSI.VolumeHandle == volumeID { if pvlist.Items[i].Spec.CSI != nil && pvlist.Items[i].Spec.CSI.VolumeHandle == volumeID {
if v, ok := pvlist.Items[i].Annotations[PVVolumeHandleAnnotationKey]; ok { if v, ok := pvlist.Items[i].Annotations[PVVolumeHandleAnnotationKey]; ok {
util.UsefulLog(ctx, "found new volumeID %s for existing volumeID %s", v, volumeID) util.UsefulLog(ctx, "found new volumeID %s for existing volumeID %s", v, volumeID)
err = vi.DecomposeCSIID(v)
if err != nil {
return vol, fmt.Errorf("%w: error decoding volume ID (%s) (%s)",
ErrInvalidVolID, err, v)
}
return generateVolumeFromVolumeID(ctx, v, cr, secrets) return generateVolumeFromVolumeID(ctx, v, vi, cr, secrets)
} }
} }
} }
@ -1063,6 +1086,76 @@ func genVolFromVolID(
return vol, err return vol, err
} }
// generateVolumeFromMapping checks the clusterID and poolID mapping and
// generates retrieves the OMAP information from the poolID got from the
// mapping.
func generateVolumeFromMapping(
ctx context.Context,
mapping *[]util.ClusterMappingInfo,
volumeID string,
vi util.CSIIdentifier,
cr *util.Credentials,
secrets map[string]string) (*rbdVolume, error) {
nvi := vi
vol := &rbdVolume{}
// extract clusterID mapping
for _, cm := range *mapping {
for key, val := range cm.ClusterIDMapping {
mappedClusterID := getMappedID(key, val, vi.ClusterID)
if mappedClusterID == "" {
continue
}
util.DebugLog(ctx,
"found new clusterID mapping %s for existing clusterID %s",
mappedClusterID,
vi.ClusterID)
// Add mapping clusterID to Identifier
nvi.ClusterID = mappedClusterID
poolID := fmt.Sprintf("%d", (vi.LocationID))
for _, pools := range cm.RBDpoolIDMappingInfo {
for key, val := range pools {
mappedPoolID := getMappedID(key, val, poolID)
if mappedPoolID == "" {
continue
}
util.DebugLog(ctx,
"found new poolID mapping %s for existing pooID %s",
mappedPoolID,
poolID)
pID, err := strconv.ParseInt(mappedPoolID, 10, 64)
if err != nil {
return vol, err
}
// Add mapping poolID to Identifier
nvi.LocationID = pID
vol, err = generateVolumeFromVolumeID(ctx, volumeID, nvi, cr, secrets)
if !errors.Is(err, util.ErrKeyNotFound) && !errors.Is(err, util.ErrPoolNotFound) &&
!errors.Is(err, ErrImageNotFound) {
return vol, err
}
}
}
}
}
return vol, util.ErrPoolNotFound
}
// getMappedID check the input id is matching key or value.
// If key==id the value will be returned.
// If value==id the key will be returned.
func getMappedID(key, value, id string) string {
if key == id {
return value
}
if value == id {
return key
}
return ""
}
func genVolFromVolumeOptions( func genVolFromVolumeOptions(
ctx context.Context, ctx context.Context,
volOptions, credentials map[string]string, volOptions, credentials map[string]string,

View File

@ -137,3 +137,55 @@ func TestValidateImageFeatures(t *testing.T) {
assert.Nil(t, err) assert.Nil(t, err)
} }
} }
func TestGetMappedID(t *testing.T) {
t.Parallel()
type args struct {
key string
value string
id string
}
tests := []struct {
name string
args args
expected string
}{
{
name: "test for matching key",
args: args{
key: "cluster1",
value: "cluster2",
id: "cluster1",
},
expected: "cluster2",
},
{
name: "test for matching value",
args: args{
key: "cluster1",
value: "cluster2",
id: "cluster2",
},
expected: "cluster1",
},
{
name: "test for invalid match",
args: args{
key: "cluster1",
value: "cluster2",
id: "cluster3",
},
expected: "",
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
val := getMappedID(tt.args.key, tt.args.value, tt.args.id)
if val != tt.expected {
t.Errorf("getMappedID() got = %v, expected %v", val, tt.expected)
}
})
}
}