mirror of
https://github.com/ceph/ceph-csi.git
synced 2024-11-27 16:50:23 +00:00
rbd: read clusterID and PoolID from mapping
Whenever Ceph-CSI receives a CSI/Replication
request it will first decode the
volumeHandle and try to get the required
OMAP details if it is not able to
retrieve, receives a `Not Found` error
message and Ceph-CSI will check for the
clusterID mapping. If the old volumeID
`0001-00013-site1-storage-0000000000000001
-b0285c97-a0ce-11eb-8c66-0242ac110002`
contains the `site1-storage` as the clusterID,
now Ceph-CSI will look for the corresponding
clusterID `site2-storage` from the above configmap.
If the clusterID mapping is found now Ceph-CSI
will look for the poolID mapping ie mapping between
`1` and `2`. Example:- pool with name exists on
both the clusters with different ID's Replicapool
with ID `1` on site1 and Replicapool with ID `2`
on site2. After getting the required mapping Ceph-CSI
has the required information to get more details
from the rados OMAP. If we have multiple clusterID mapping
it will loop through all the mapping and checks the
corresponding pool to get the OMAP data. If the clusterID
mapping does not exist Ceph-CSI will return an `Not Found`
error message to the caller.
Signed-off-by: Madhu Rajanna <madhupr007@gmail.com>
(cherry picked from commit 92ad2ceec9
)
This commit is contained in:
parent
daea5177e5
commit
829fc5ed95
@ -932,11 +932,11 @@ func genSnapFromSnapID(
|
||||
func generateVolumeFromVolumeID(
|
||||
ctx context.Context,
|
||||
volumeID string,
|
||||
vi util.CSIIdentifier,
|
||||
cr *util.Credentials,
|
||||
secrets map[string]string) (*rbdVolume, error) {
|
||||
var (
|
||||
options map[string]string
|
||||
vi util.CSIIdentifier
|
||||
rbdVol *rbdVolume
|
||||
err error
|
||||
)
|
||||
@ -946,13 +946,6 @@ func generateVolumeFromVolumeID(
|
||||
// Mounter, MultiNodeWritable
|
||||
rbdVol = &rbdVolume{}
|
||||
rbdVol.VolID = volumeID
|
||||
|
||||
err = vi.DecomposeCSIID(rbdVol.VolID)
|
||||
if err != nil {
|
||||
return rbdVol, fmt.Errorf("%w: error decoding volume ID (%s) (%s)",
|
||||
ErrInvalidVolID, err, rbdVol.VolID)
|
||||
}
|
||||
|
||||
// TODO check clusterID mapping exists
|
||||
|
||||
rbdVol.ClusterID = vi.ClusterID
|
||||
@ -1031,12 +1024,37 @@ func genVolFromVolID(
|
||||
volumeID string,
|
||||
cr *util.Credentials,
|
||||
secrets map[string]string) (*rbdVolume, error) {
|
||||
vol, err := generateVolumeFromVolumeID(ctx, volumeID, cr, secrets)
|
||||
var (
|
||||
vi util.CSIIdentifier
|
||||
vol *rbdVolume
|
||||
)
|
||||
|
||||
err := vi.DecomposeCSIID(volumeID)
|
||||
if err != nil {
|
||||
return vol, fmt.Errorf("%w: error decoding volume ID (%s) (%s)",
|
||||
ErrInvalidVolID, err, volumeID)
|
||||
}
|
||||
|
||||
vol, err = generateVolumeFromVolumeID(ctx, volumeID, vi, cr, secrets)
|
||||
if !errors.Is(err, util.ErrKeyNotFound) && !errors.Is(err, util.ErrPoolNotFound) &&
|
||||
!errors.Is(err, ErrImageNotFound) {
|
||||
return vol, err
|
||||
}
|
||||
|
||||
// Check clusterID mapping exists
|
||||
mapping, mErr := util.GetClusterMappingInfo(vi.ClusterID)
|
||||
if mErr != nil {
|
||||
return vol, mErr
|
||||
}
|
||||
if mapping != nil {
|
||||
rbdVol, vErr := generateVolumeFromMapping(ctx, mapping, volumeID, vi, cr, secrets)
|
||||
if !errors.Is(vErr, util.ErrKeyNotFound) && !errors.Is(vErr, util.ErrPoolNotFound) &&
|
||||
!errors.Is(vErr, ErrImageNotFound) {
|
||||
return rbdVol, vErr
|
||||
}
|
||||
}
|
||||
// TODO: remove extracting volumeID from PV annotations.
|
||||
|
||||
// If the volume details are not found in the OMAP it can be a mirrored RBD
|
||||
// image and the OMAP is already generated and the volumeHandle might not
|
||||
// be the same in the PV.Spec.CSI.VolumeHandle. Check the PV annotation for
|
||||
@ -1054,8 +1072,13 @@ func genVolFromVolID(
|
||||
if pvlist.Items[i].Spec.CSI != nil && pvlist.Items[i].Spec.CSI.VolumeHandle == volumeID {
|
||||
if v, ok := pvlist.Items[i].Annotations[PVVolumeHandleAnnotationKey]; ok {
|
||||
util.UsefulLog(ctx, "found new volumeID %s for existing volumeID %s", v, volumeID)
|
||||
err = vi.DecomposeCSIID(v)
|
||||
if err != nil {
|
||||
return vol, fmt.Errorf("%w: error decoding volume ID (%s) (%s)",
|
||||
ErrInvalidVolID, err, v)
|
||||
}
|
||||
|
||||
return generateVolumeFromVolumeID(ctx, v, cr, secrets)
|
||||
return generateVolumeFromVolumeID(ctx, v, vi, cr, secrets)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1063,6 +1086,76 @@ func genVolFromVolID(
|
||||
return vol, err
|
||||
}
|
||||
|
||||
// generateVolumeFromMapping checks the clusterID and poolID mapping and
|
||||
// generates retrieves the OMAP information from the poolID got from the
|
||||
// mapping.
|
||||
func generateVolumeFromMapping(
|
||||
ctx context.Context,
|
||||
mapping *[]util.ClusterMappingInfo,
|
||||
volumeID string,
|
||||
vi util.CSIIdentifier,
|
||||
cr *util.Credentials,
|
||||
secrets map[string]string) (*rbdVolume, error) {
|
||||
nvi := vi
|
||||
vol := &rbdVolume{}
|
||||
// extract clusterID mapping
|
||||
for _, cm := range *mapping {
|
||||
for key, val := range cm.ClusterIDMapping {
|
||||
mappedClusterID := getMappedID(key, val, vi.ClusterID)
|
||||
if mappedClusterID == "" {
|
||||
continue
|
||||
}
|
||||
|
||||
util.DebugLog(ctx,
|
||||
"found new clusterID mapping %s for existing clusterID %s",
|
||||
mappedClusterID,
|
||||
vi.ClusterID)
|
||||
// Add mapping clusterID to Identifier
|
||||
nvi.ClusterID = mappedClusterID
|
||||
poolID := fmt.Sprintf("%d", (vi.LocationID))
|
||||
for _, pools := range cm.RBDpoolIDMappingInfo {
|
||||
for key, val := range pools {
|
||||
mappedPoolID := getMappedID(key, val, poolID)
|
||||
if mappedPoolID == "" {
|
||||
continue
|
||||
}
|
||||
util.DebugLog(ctx,
|
||||
"found new poolID mapping %s for existing pooID %s",
|
||||
mappedPoolID,
|
||||
poolID)
|
||||
pID, err := strconv.ParseInt(mappedPoolID, 10, 64)
|
||||
if err != nil {
|
||||
return vol, err
|
||||
}
|
||||
// Add mapping poolID to Identifier
|
||||
nvi.LocationID = pID
|
||||
vol, err = generateVolumeFromVolumeID(ctx, volumeID, nvi, cr, secrets)
|
||||
if !errors.Is(err, util.ErrKeyNotFound) && !errors.Is(err, util.ErrPoolNotFound) &&
|
||||
!errors.Is(err, ErrImageNotFound) {
|
||||
return vol, err
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return vol, util.ErrPoolNotFound
|
||||
}
|
||||
|
||||
// getMappedID check the input id is matching key or value.
|
||||
// If key==id the value will be returned.
|
||||
// If value==id the key will be returned.
|
||||
func getMappedID(key, value, id string) string {
|
||||
if key == id {
|
||||
return value
|
||||
}
|
||||
if value == id {
|
||||
return key
|
||||
}
|
||||
|
||||
return ""
|
||||
}
|
||||
|
||||
func genVolFromVolumeOptions(
|
||||
ctx context.Context,
|
||||
volOptions, credentials map[string]string,
|
||||
|
@ -137,3 +137,55 @@ func TestValidateImageFeatures(t *testing.T) {
|
||||
assert.Nil(t, err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetMappedID(t *testing.T) {
|
||||
t.Parallel()
|
||||
type args struct {
|
||||
key string
|
||||
value string
|
||||
id string
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
args args
|
||||
expected string
|
||||
}{
|
||||
{
|
||||
name: "test for matching key",
|
||||
args: args{
|
||||
key: "cluster1",
|
||||
value: "cluster2",
|
||||
id: "cluster1",
|
||||
},
|
||||
expected: "cluster2",
|
||||
},
|
||||
{
|
||||
name: "test for matching value",
|
||||
args: args{
|
||||
key: "cluster1",
|
||||
value: "cluster2",
|
||||
id: "cluster2",
|
||||
},
|
||||
expected: "cluster1",
|
||||
},
|
||||
{
|
||||
name: "test for invalid match",
|
||||
args: args{
|
||||
key: "cluster1",
|
||||
value: "cluster2",
|
||||
id: "cluster3",
|
||||
},
|
||||
expected: "",
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
tt := tt
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
val := getMappedID(tt.args.key, tt.args.value, tt.args.id)
|
||||
if val != tt.expected {
|
||||
t.Errorf("getMappedID() got = %v, expected %v", val, tt.expected)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user