util: add helper function to read clusterID mapping

added helper function to read the clusterID mapping
from the mounted file.

The clusterID mapping contains below mappings
* ClusterID mappings (to cluster to which we are failingover
and from which cluster failover happened)
* RBD PoolID mapping of between the clusters.
* CephFS FscID mapping between the clusters.

Signed-off-by: Madhu Rajanna <madhupr007@gmail.com>
This commit is contained in:
Madhu Rajanna 2021-07-22 16:03:48 +05:30 committed by mergify[bot]
parent fce5a181d0
commit ac11d71e19
2 changed files with 365 additions and 0 deletions

View File

@ -0,0 +1,116 @@
/*
Copyright 2021 The Ceph-CSI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"os"
)
// clusterMappingConfigFile is the location of the cluster mapping config file.
var clusterMappingConfigFile = "/etc/ceph-csi-config/cluster-mapping.json"
// ClusterMappingInfo holds the details of clusterID mapping and poolID mapping.
type ClusterMappingInfo struct {
// ClusterIDMapping holds the details of clusterID mapping
ClusterIDMapping map[string]string `json:"clusterIDMapping"`
// rbdpoolIDMappingInfo holds the details of RBD poolID mapping.
RBDpoolIDMappingInfo []map[string]string `json:"RBDPoolIDMapping"`
// cephFSpoolIDMappingInfo holds the details of CephFS Fscid mapping.
CephFSFscIDMappingInfo []map[string]string `json:"CephFSFscIDMapping"`
}
// Expected JSON structure in the passed in config file is,
// [{
// "clusterIDMapping": {
// "site1-storage": "site2-storage"
// },
// "RBDPoolIDMapping": [{
// "1": "2",
// "11": "12"
// }],
// "CephFSFscIDMapping": [{
// "13": "34",
// "3": "4"
// }]
// }, {
// "clusterIDMapping": {
// "site3-storage": "site2-storage"
// },
// "RBDPoolIDMapping": [{
// "5": "2",
// "16": "12"
// }],
// "CephFSFscIDMapping": [{
// "3": "34",
// "4": "4"
// }]
// ...
// }]
func readClusterMappingInfo() (*[]ClusterMappingInfo, error) {
var info []ClusterMappingInfo
content, err := ioutil.ReadFile(clusterMappingConfigFile)
if err != nil {
err = fmt.Errorf("error fetching clusterID mapping %w", err)
return nil, err
}
err = json.Unmarshal(content, &info)
if err != nil {
return nil, fmt.Errorf("unmarshal failed (%w), raw buffer response: %s",
err, string(content))
}
return &info, nil
}
// GetClusterMappingInfo returns corresponding cluster details like clusterID's
// poolID,fscID lists read from configfile.
func GetClusterMappingInfo(clusterID string) (*[]ClusterMappingInfo, error) {
var mappingInfo []ClusterMappingInfo
info, err := readClusterMappingInfo()
if err != nil {
// discard not found error as this file is expected to be created by
// the admin in case of failover.
if errors.Is(err, os.ErrNotExist) {
return nil, nil
}
return nil, fmt.Errorf("failed to fetch cluster mapping: %w", err)
}
for _, i := range *info {
for key, val := range i.ClusterIDMapping {
// Same file will be copied to the failover cluster check for both
// key and value to check clusterID mapping exists
if key == clusterID || val == clusterID {
mappingInfo = append(mappingInfo, i)
}
}
}
// if the mapping is not found return response as nil
if len(mappingInfo) == 0 {
return nil, nil
}
return &mappingInfo, nil
}

View File

@ -0,0 +1,249 @@
/*
Copyright 2021 The Ceph-CSI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"encoding/json"
"fmt"
"io/ioutil"
"reflect"
"testing"
)
func TestGetClusterMappingInfo(t *testing.T) {
t.Parallel()
mappingBasePath := t.TempDir()
// clusterID,poolID on site1
clusterIDOfSite1 := "site1-storage"
rbdPoolIDOfSite1 := "1"
cephfsFscIDOfSite1 := "11"
// clusterID,poolID on site2
clusterIDOfSite2 := "site2-storage"
rbdPoolIDOfSite2 := "3"
cephfsFscIDOfSite2 := "5"
// clusterID,poolID on site3
clusterIDOfSite3 := "site3-storage"
rbdPoolIDOfSite3 := "8"
cephfsFscIDOfSite3 := "10"
clusterMappingInfos := make([]ClusterMappingInfo, 2)
// create mapping between site1 and site2
clusterIDmappingOfSite1To2 := make(map[string]string)
clusterIDmappingOfSite1To2[clusterIDOfSite1] = clusterIDOfSite2
rbdMappingOfSite1To2 := make([]map[string]string, 1)
rbdMappingOfSite1To2[0] = map[string]string{rbdPoolIDOfSite1: rbdPoolIDOfSite2}
cephFSMappingOfSite1To2 := make([]map[string]string, 1)
cephFSMappingOfSite1To2[0] = map[string]string{cephfsFscIDOfSite1: cephfsFscIDOfSite2}
mappingOfSite1To2 := ClusterMappingInfo{
clusterIDmappingOfSite1To2,
rbdMappingOfSite1To2,
cephFSMappingOfSite1To2,
}
// create mapping between site3 and site2
clusterIDmappingOfSite3To2 := make(map[string]string)
clusterIDmappingOfSite3To2[clusterIDOfSite3] = clusterIDOfSite2
rbdMappingOfSite3To2 := make([]map[string]string, 1)
rbdMappingOfSite3To2[0] = map[string]string{rbdPoolIDOfSite3: rbdPoolIDOfSite2}
cephFSMappingOfSite3To2 := make([]map[string]string, 1)
cephFSMappingOfSite3To2[0] = map[string]string{cephfsFscIDOfSite3: cephfsFscIDOfSite2}
mappingOfSite3To2 := ClusterMappingInfo{
clusterIDmappingOfSite3To2,
rbdMappingOfSite3To2,
cephFSMappingOfSite3To2,
}
clusterMappingInfos[0] = mappingOfSite1To2
clusterMappingInfos[1] = mappingOfSite3To2
mappingFileContent, err := json.Marshal(clusterMappingInfos)
if err != nil {
t.Errorf("failed to marshal mapping info %v", err)
}
// expected output of mapping
expectedSite2Data := clusterMappingInfos
expectedSite1To2Data := clusterMappingInfos[:1]
expectedSite3To2Data := clusterMappingInfos[1:]
tests := []struct {
name string
clusterID string
mappingFilecontent []byte
expectedData *[]ClusterMappingInfo
createMappingConfigFile bool
expectErr bool
}{
{
name: "mapping file not found",
clusterID: "site-a-clusterid",
createMappingConfigFile: false,
mappingFilecontent: []byte{},
expectedData: nil,
expectErr: false,
},
{
name: "mapping file found with empty data",
clusterID: "site-a-clusterid",
createMappingConfigFile: true,
mappingFilecontent: []byte{},
expectedData: nil,
expectErr: false,
},
{
name: "cluster-id mapping not found",
clusterID: "site-a-clusterid",
createMappingConfigFile: true,
mappingFilecontent: mappingFileContent,
expectedData: nil,
expectErr: false,
},
{
name: "site2-storage cluster-id mapping",
clusterID: clusterIDOfSite2,
createMappingConfigFile: true,
mappingFilecontent: mappingFileContent,
expectedData: &expectedSite2Data,
expectErr: false,
},
{
name: "site1-storage cluster-id mapping",
clusterID: clusterIDOfSite1,
createMappingConfigFile: true,
mappingFilecontent: mappingFileContent,
expectedData: &expectedSite1To2Data,
expectErr: false,
},
{
name: "site3-storage cluster-id mapping",
clusterID: clusterIDOfSite3,
createMappingConfigFile: true,
mappingFilecontent: mappingFileContent,
expectedData: &expectedSite3To2Data,
expectErr: false,
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
if tt.createMappingConfigFile {
clusterMappingConfigFile = fmt.Sprintf("%s/mapping.json", mappingBasePath)
}
if len(tt.mappingFilecontent) != 0 {
err = ioutil.WriteFile(clusterMappingConfigFile, tt.mappingFilecontent, 0o600)
if err != nil {
t.Errorf("GetClusterMappingInfo() error = %v", err)
}
}
data, mErr := GetClusterMappingInfo(tt.clusterID)
if (mErr != nil) != tt.expectErr {
t.Errorf("GetClusterMappingInfo() error = %v, expected Error %v", mErr, tt.expectErr)
}
if !reflect.DeepEqual(data, tt.expectedData) {
t.Errorf("GetClusterMappingInfo() = %v, expected data %v", data, tt.expectedData)
}
})
}
clusterMappingConfigFile = fmt.Sprintf("%s/mapping.json", mappingBasePath)
err = ioutil.WriteFile(clusterMappingConfigFile, mappingFileContent, 0o600)
if err != nil {
t.Errorf("failed to write mapping content error = %v", err)
}
// validate site-3 to site-2 and site-1 to site-2 mappings when failover to
// site-2.
// The volumeId's from site-1 looks like `0001-0013-site1-storage-xyz` we
// need to have validate `site1-storage` to `site2-storage` mapping exists
// The volumeId's from site-3 looks like `0001-0013-site3-storage-xyz` we
// need to have validate `site3-storage` to `site2-storage` mapping exists
mappedClusterCount := 2
err = validateMapping(t, clusterIDOfSite2, rbdPoolIDOfSite2, cephfsFscIDOfSite2, mappedClusterCount)
if err != nil {
t.Error(err)
}
// validate site-2 and site-1 mappings when failback to site-1.
// The volumeId's from site-2 looks like `0001-0013-site2-storage-xyz` we
// need to have validate `site2-storage` to `site3-storage` mapping exists
mappedClusterCount = 1
err = validateMapping(t, clusterIDOfSite1, rbdPoolIDOfSite1, cephfsFscIDOfSite1, mappedClusterCount)
if err != nil {
t.Error(err)
}
// validate site-2 and site-3 mappings when failback to site-3
// The volumeId's from site-2 looks like `0001-0013-site2-storage-xyz` we
// need to have validate `site2-storage` to `site3-storage` mapping exists
mappedClusterCount = 1
err = validateMapping(t, clusterIDOfSite3, rbdPoolIDOfSite3, cephfsFscIDOfSite3, mappedClusterCount)
if err != nil {
t.Error(err)
}
}
func validateMapping(t *testing.T, clusterID, rbdPoolID, cephFSPoolID string, mappingCount int) error {
t.Helper()
mapping, err := GetClusterMappingInfo(clusterID)
if err != nil {
return fmt.Errorf("failed to retrieve mapping %w", err)
}
// verify we are able to retrieve both site-1:site2 and site-2:site3 mapping
if mapping == nil || len(*mapping) != mappingCount {
return fmt.Errorf(
"clusterID mapping got length=%v, expected length=%v",
len(*mapping),
mappingCount)
}
// check mapping rbd pool mapping exists in mapping
foundRBDPoolMappingCount := 0
foundCephFSPoolMappingCount := 0
for _, c := range *mapping {
for _, rp := range c.RBDpoolIDMappingInfo {
for k, v := range rp {
if k == rbdPoolID || v == rbdPoolID {
foundRBDPoolMappingCount++
}
}
}
for _, cp := range c.CephFSFscIDMappingInfo {
for k, v := range cp {
if k == cephFSPoolID || v == cephFSPoolID {
foundCephFSPoolMappingCount++
}
}
}
}
if foundRBDPoolMappingCount != mappingCount {
return fmt.Errorf(
"rbd pool mapping got length= %v, expected length=%v",
foundRBDPoolMappingCount,
mappingCount)
}
if foundCephFSPoolMappingCount != mappingCount {
return fmt.Errorf(
"cephFS filesystem mapping got length= %v, expected length=%v",
foundCephFSPoolMappingCount,
mappingCount)
}
return nil
}