rbd: check for clusterid mapping in RegenerateJournal()

This commit adds fetchMappedClusterIDAndMons() which returns
monitors and clusterID info after checking cluster mapping info.
This is required for regenerating omap entries in mirrored cluster
with different clusterID.

Signed-off-by: Rakshith R <rar@redhat.com>
This commit is contained in:
Rakshith R 2021-08-26 15:51:29 +05:30 committed by mergify[bot]
parent 496bcba85c
commit 99168dc822
3 changed files with 192 additions and 14 deletions

View File

@ -523,6 +523,7 @@ func undoVolReservation(ctx context.Context, rbdVol *rbdVolume, cr *util.Credent
// complete omap mapping between imageName and volumeID. // complete omap mapping between imageName and volumeID.
// RegenerateJournal performs below operations // RegenerateJournal performs below operations
// Extract clusterID, Mons after checkig clusterID mapping
// Extract parameters journalPool, pool from volumeAttributes // Extract parameters journalPool, pool from volumeAttributes
// Extract optional parameters volumeNamePrefix, kmsID, owner from volumeAttributes // Extract optional parameters volumeNamePrefix, kmsID, owner from volumeAttributes
// Extract information from volumeID // Extract information from volumeID
@ -538,15 +539,13 @@ func RegenerateJournal(
cr *util.Credentials) (string, error) { cr *util.Credentials) (string, error) {
ctx := context.Background() ctx := context.Background()
var ( var (
options map[string]string vi util.CSIIdentifier
vi util.CSIIdentifier rbdVol *rbdVolume
rbdVol *rbdVolume kmsID string
kmsID string err error
err error ok bool
ok bool
) )
options = make(map[string]string)
rbdVol = &rbdVolume{} rbdVol = &rbdVolume{}
rbdVol.VolID = volumeID rbdVol.VolID = volumeID
@ -561,14 +560,8 @@ func RegenerateJournal(
return "", err return "", err
} }
// TODO check clusterID mapping exists rbdVol.Monitors, rbdVol.ClusterID, err = util.FetchMappedClusterIDAndMons(ctx, vi.ClusterID)
rbdVol.ClusterID = vi.ClusterID
options["clusterID"] = rbdVol.ClusterID
rbdVol.Monitors, _, err = util.GetMonsAndClusterID(options)
if err != nil { if err != nil {
log.ErrorLog(ctx, "failed getting mons (%s)", err)
return "", err return "", err
} }

View File

@ -17,11 +17,14 @@ limitations under the License.
package util package util
import ( import (
"context"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"os" "os"
"github.com/ceph/ceph-csi/internal/util/log"
) )
// clusterMappingConfigFile is the location of the cluster mapping config file. // clusterMappingConfigFile is the location of the cluster mapping config file.
@ -134,3 +137,53 @@ func GetMappedID(key, value, id string) string {
return "" return ""
} }
// fetchMappedClusterIDAndMons returns monitors and clusterID info after checking cluster mapping.
func fetchMappedClusterIDAndMons(ctx context.Context,
clusterID, clusterMappingConfigFile, csiConfigFile string) (string, string, error) {
var mons string
clusterMappingInfo, err := getClusterMappingInfo(clusterID, clusterMappingConfigFile)
if err != nil {
return "", "", err
}
if clusterMappingInfo != nil {
for _, cm := range *clusterMappingInfo {
for key, val := range cm.ClusterIDMapping {
mappedClusterID := GetMappedID(key, val, clusterID)
if mappedClusterID == "" {
continue
}
log.DebugLog(ctx,
"found new clusterID mapping %q for existing clusterID %q",
mappedClusterID,
clusterID)
mons, err = Mons(csiConfigFile, mappedClusterID)
if err != nil {
log.DebugLog(ctx, "failed getting mons with mapped cluster id %q: %v",
mappedClusterID, err)
continue
}
return mons, mappedClusterID, nil
}
}
}
// check original clusterID for backward compatibility when cluster ids were expected to be same.
mons, err = Mons(csiConfigFile, clusterID)
if err != nil {
log.ErrorLog(ctx, "failed getting mons with cluster id %q: %v", clusterID, err)
return "", "", err
}
return mons, clusterID, err
}
// FetchMappedClusterIDAndMons returns monitors and clusterID info after checking cluster mapping.
func FetchMappedClusterIDAndMons(ctx context.Context, clusterID string) (string, string, error) {
return fetchMappedClusterIDAndMons(ctx, clusterID, clusterMappingConfigFile, CsiConfigFile)
}

View File

@ -17,10 +17,12 @@ limitations under the License.
package util package util
import ( import (
"context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"reflect" "reflect"
"strings"
"testing" "testing"
) )
@ -291,3 +293,133 @@ func TestGetMappedID(t *testing.T) {
}) })
} }
} }
func TestFetchMappedClusterIDAndMons(t *testing.T) {
t.Parallel()
ctx := context.TODO()
type args struct {
ctx context.Context
clusterID string
}
mappingBasePath := t.TempDir()
csiConfigFile := mappingBasePath + "/config.json"
clusterMappingConfigFile := mappingBasePath + "/cluster-mapping.json"
csiConfig := []ClusterInfo{
{
ClusterID: "cluster-1",
Monitors: []string{"ip-1", "ip-2"},
},
{
ClusterID: "cluster-2",
Monitors: []string{"ip-3", "ip-4"},
},
}
csiConfigFileContent, err := json.Marshal(csiConfig)
if err != nil {
t.Errorf("failed to marshal csi config info %v", err)
}
err = ioutil.WriteFile(csiConfigFile, csiConfigFileContent, 0o600)
if err != nil {
t.Errorf("failed to write %s file content: %v", CsiConfigFile, err)
}
t.Run("cluster-mapping.json does not exist", func(t *testing.T) {
_, _, err = fetchMappedClusterIDAndMons(ctx, "cluster-2", clusterMappingConfigFile, csiConfigFile)
if err != nil {
t.Errorf("FetchMappedClusterIDAndMons() error = %v, wantErr %v", err, nil)
}
})
clusterMapping := []ClusterMappingInfo{
{
ClusterIDMapping: map[string]string{
"cluster-1": "cluster-3",
},
},
{
ClusterIDMapping: map[string]string{
"cluster-1": "cluster-4",
},
},
{
ClusterIDMapping: map[string]string{
"cluster-4": "cluster-3",
},
},
}
clusterMappingFileContent, err := json.Marshal(clusterMapping)
if err != nil {
t.Errorf("failed to marshal mapping info %v", err)
}
err = ioutil.WriteFile(clusterMappingConfigFile, clusterMappingFileContent, 0o600)
if err != nil {
t.Errorf("failed to write %s file content: %v", clusterMappingFileContent, err)
}
tests := []struct {
name string
args args
want string
want1 string
wantErr bool
}{
{
name: "test cluster id=cluster-1",
args: args{
ctx: ctx,
clusterID: "cluster-1",
},
want: strings.Join(csiConfig[0].Monitors, ","),
want1: "cluster-1",
wantErr: false,
},
{
name: "test cluster id=cluster-3",
args: args{
ctx: ctx,
clusterID: "cluster-3",
},
want: strings.Join(csiConfig[0].Monitors, ","),
want1: "cluster-1",
wantErr: false,
},
{
name: "test cluster id=cluster-4",
args: args{
ctx: ctx,
clusterID: "cluster-4",
},
want: strings.Join(csiConfig[0].Monitors, ","),
want1: "cluster-1",
wantErr: false,
},
{
name: "test missing cluster id=cluster-6",
args: args{
ctx: ctx,
clusterID: "cluster-6",
},
want: "",
want1: "",
wantErr: true,
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
got, got1, err := fetchMappedClusterIDAndMons(ctx, tt.args.clusterID, clusterMappingConfigFile, csiConfigFile)
if (err != nil) != tt.wantErr {
t.Errorf("FetchMappedClusterIDAndMons() error = %v, wantErr %v", err, tt.wantErr)
return
}
if got != tt.want {
t.Errorf("FetchMappedClusterIDAndMons() got = %v, want %v", got, tt.want)
}
if got1 != tt.want1 {
t.Errorf("FetchMappedClusterIDAndMons() got1 = %v, want %v", got1, tt.want1)
}
})
}
}