From 829fc5ed957fd977b69f4c603b97a5b44ad50838 Mon Sep 17 00:00:00 2001 From: Madhu Rajanna Date: Thu, 22 Jul 2021 16:06:39 +0530 Subject: [PATCH] rbd: read clusterID and PoolID from mapping Whenever Ceph-CSI receives a CSI/Replication request it will first decode the volumeHandle and try to get the required OMAP details if it is not able to retrieve, receives a `Not Found` error message and Ceph-CSI will check for the clusterID mapping. If the old volumeID `0001-00013-site1-storage-0000000000000001 -b0285c97-a0ce-11eb-8c66-0242ac110002` contains the `site1-storage` as the clusterID, now Ceph-CSI will look for the corresponding clusterID `site2-storage` from the above configmap. If the clusterID mapping is found now Ceph-CSI will look for the poolID mapping ie mapping between `1` and `2`. Example:- pool with name exists on both the clusters with different ID's Replicapool with ID `1` on site1 and Replicapool with ID `2` on site2. After getting the required mapping Ceph-CSI has the required information to get more details from the rados OMAP. If we have multiple clusterID mapping it will loop through all the mapping and checks the corresponding pool to get the OMAP data. If the clusterID mapping does not exist Ceph-CSI will return an `Not Found` error message to the caller. Signed-off-by: Madhu Rajanna (cherry picked from commit 92ad2ceec977f482060544c94f9228cfdcf586cb) --- internal/rbd/rbd_util.go | 113 +++++++++++++++++++++++++++++++--- internal/rbd/rbd_util_test.go | 52 ++++++++++++++++ 2 files changed, 155 insertions(+), 10 deletions(-) diff --git a/internal/rbd/rbd_util.go b/internal/rbd/rbd_util.go index 858cce1bd..8a6a24352 100644 --- a/internal/rbd/rbd_util.go +++ b/internal/rbd/rbd_util.go @@ -932,11 +932,11 @@ func genSnapFromSnapID( func generateVolumeFromVolumeID( ctx context.Context, volumeID string, + vi util.CSIIdentifier, cr *util.Credentials, secrets map[string]string) (*rbdVolume, error) { var ( options map[string]string - vi util.CSIIdentifier rbdVol *rbdVolume err error ) @@ -946,13 +946,6 @@ func generateVolumeFromVolumeID( // Mounter, MultiNodeWritable rbdVol = &rbdVolume{} rbdVol.VolID = volumeID - - err = vi.DecomposeCSIID(rbdVol.VolID) - if err != nil { - return rbdVol, fmt.Errorf("%w: error decoding volume ID (%s) (%s)", - ErrInvalidVolID, err, rbdVol.VolID) - } - // TODO check clusterID mapping exists rbdVol.ClusterID = vi.ClusterID @@ -1031,12 +1024,37 @@ func genVolFromVolID( volumeID string, cr *util.Credentials, secrets map[string]string) (*rbdVolume, error) { - vol, err := generateVolumeFromVolumeID(ctx, volumeID, cr, secrets) + var ( + vi util.CSIIdentifier + vol *rbdVolume + ) + + err := vi.DecomposeCSIID(volumeID) + if err != nil { + return vol, fmt.Errorf("%w: error decoding volume ID (%s) (%s)", + ErrInvalidVolID, err, volumeID) + } + + vol, err = generateVolumeFromVolumeID(ctx, volumeID, vi, cr, secrets) if !errors.Is(err, util.ErrKeyNotFound) && !errors.Is(err, util.ErrPoolNotFound) && !errors.Is(err, ErrImageNotFound) { return vol, err } + // Check clusterID mapping exists + mapping, mErr := util.GetClusterMappingInfo(vi.ClusterID) + if mErr != nil { + return vol, mErr + } + if mapping != nil { + rbdVol, vErr := generateVolumeFromMapping(ctx, mapping, volumeID, vi, cr, secrets) + if !errors.Is(vErr, util.ErrKeyNotFound) && !errors.Is(vErr, util.ErrPoolNotFound) && + !errors.Is(vErr, ErrImageNotFound) { + return rbdVol, vErr + } + } + // TODO: remove extracting volumeID from PV annotations. + // If the volume details are not found in the OMAP it can be a mirrored RBD // image and the OMAP is already generated and the volumeHandle might not // be the same in the PV.Spec.CSI.VolumeHandle. Check the PV annotation for @@ -1054,8 +1072,13 @@ func genVolFromVolID( if pvlist.Items[i].Spec.CSI != nil && pvlist.Items[i].Spec.CSI.VolumeHandle == volumeID { if v, ok := pvlist.Items[i].Annotations[PVVolumeHandleAnnotationKey]; ok { util.UsefulLog(ctx, "found new volumeID %s for existing volumeID %s", v, volumeID) + err = vi.DecomposeCSIID(v) + if err != nil { + return vol, fmt.Errorf("%w: error decoding volume ID (%s) (%s)", + ErrInvalidVolID, err, v) + } - return generateVolumeFromVolumeID(ctx, v, cr, secrets) + return generateVolumeFromVolumeID(ctx, v, vi, cr, secrets) } } } @@ -1063,6 +1086,76 @@ func genVolFromVolID( return vol, err } +// generateVolumeFromMapping checks the clusterID and poolID mapping and +// generates retrieves the OMAP information from the poolID got from the +// mapping. +func generateVolumeFromMapping( + ctx context.Context, + mapping *[]util.ClusterMappingInfo, + volumeID string, + vi util.CSIIdentifier, + cr *util.Credentials, + secrets map[string]string) (*rbdVolume, error) { + nvi := vi + vol := &rbdVolume{} + // extract clusterID mapping + for _, cm := range *mapping { + for key, val := range cm.ClusterIDMapping { + mappedClusterID := getMappedID(key, val, vi.ClusterID) + if mappedClusterID == "" { + continue + } + + util.DebugLog(ctx, + "found new clusterID mapping %s for existing clusterID %s", + mappedClusterID, + vi.ClusterID) + // Add mapping clusterID to Identifier + nvi.ClusterID = mappedClusterID + poolID := fmt.Sprintf("%d", (vi.LocationID)) + for _, pools := range cm.RBDpoolIDMappingInfo { + for key, val := range pools { + mappedPoolID := getMappedID(key, val, poolID) + if mappedPoolID == "" { + continue + } + util.DebugLog(ctx, + "found new poolID mapping %s for existing pooID %s", + mappedPoolID, + poolID) + pID, err := strconv.ParseInt(mappedPoolID, 10, 64) + if err != nil { + return vol, err + } + // Add mapping poolID to Identifier + nvi.LocationID = pID + vol, err = generateVolumeFromVolumeID(ctx, volumeID, nvi, cr, secrets) + if !errors.Is(err, util.ErrKeyNotFound) && !errors.Is(err, util.ErrPoolNotFound) && + !errors.Is(err, ErrImageNotFound) { + return vol, err + } + } + } + } + } + + return vol, util.ErrPoolNotFound +} + +// getMappedID check the input id is matching key or value. +// If key==id the value will be returned. +// If value==id the key will be returned. +func getMappedID(key, value, id string) string { + if key == id { + return value + } + if value == id { + return key + } + + return "" +} + func genVolFromVolumeOptions( ctx context.Context, volOptions, credentials map[string]string, diff --git a/internal/rbd/rbd_util_test.go b/internal/rbd/rbd_util_test.go index 705463bba..1dfa35d7d 100644 --- a/internal/rbd/rbd_util_test.go +++ b/internal/rbd/rbd_util_test.go @@ -137,3 +137,55 @@ func TestValidateImageFeatures(t *testing.T) { assert.Nil(t, err) } } + +func TestGetMappedID(t *testing.T) { + t.Parallel() + type args struct { + key string + value string + id string + } + tests := []struct { + name string + args args + expected string + }{ + { + name: "test for matching key", + args: args{ + key: "cluster1", + value: "cluster2", + id: "cluster1", + }, + expected: "cluster2", + }, + { + name: "test for matching value", + args: args{ + key: "cluster1", + value: "cluster2", + id: "cluster2", + }, + expected: "cluster1", + }, + { + name: "test for invalid match", + args: args{ + key: "cluster1", + value: "cluster2", + id: "cluster3", + }, + expected: "", + }, + } + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + val := getMappedID(tt.args.key, tt.args.value, tt.args.id) + if val != tt.expected { + t.Errorf("getMappedID() got = %v, expected %v", val, tt.expected) + } + }) + } +}