rbd: check for clusterid mapping in genVolFromVolumeOptions()

This commit adds capability to genVolFromVolumeOptions() to fetch
mapped clusted-id & mon ips for mirrored PVC on secondary cluster
which may have different cluster-id.

This is required for NodeStageVolume().

We also don't need to check for mapping during volume create requests,
so it can be disabled by passing a bool checkClusterIDMapping as false.

GetMonsAndClusterID() is modified to accept bool checkClusterIDMapping
based on which clustermapping is checked to fetch mapped cluster-id and
mon-ips.

Signed-off-by: Rakshith R <rar@redhat.com>
(cherry picked from commit 9d1e98ca60)
This commit is contained in:
Rakshith R 2021-09-06 10:57:50 +05:30 committed by mergify[bot]
parent f77e1a9e27
commit b7505c29e2
5 changed files with 21 additions and 10 deletions

View File

@ -128,7 +128,7 @@ func genSnapFromOptions(ctx context.Context, req *csi.CreateSnapshotRequest) (sn
cephfsSnap.RequestName = req.GetName() cephfsSnap.RequestName = req.GetName()
snapOptions := req.GetParameters() snapOptions := req.GetParameters()
cephfsSnap.Monitors, cephfsSnap.ClusterID, err = util.GetMonsAndClusterID(snapOptions) cephfsSnap.Monitors, cephfsSnap.ClusterID, err = util.GetMonsAndClusterID(ctx, snapOptions, false)
if err != nil { if err != nil {
util.ErrorLog(ctx, "failed getting mons (%s)", err) util.ErrorLog(ctx, "failed getting mons (%s)", err)

View File

@ -122,7 +122,10 @@ func (cs *ControllerServer) parseVolCreateRequest(
} }
// if it's NOT SINGLE_NODE_WRITER and it's BLOCK we'll set the parameter to ignore the in-use checks // if it's NOT SINGLE_NODE_WRITER and it's BLOCK we'll set the parameter to ignore the in-use checks
rbdVol, err := genVolFromVolumeOptions(ctx, req.GetParameters(), req.GetSecrets(), (isMultiNode && isBlock)) rbdVol, err := genVolFromVolumeOptions(
ctx,
req.GetParameters(), req.GetSecrets(),
(isMultiNode && isBlock), false)
if err != nil { if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error()) return nil, status.Error(codes.InvalidArgument, err.Error())
} }

View File

@ -231,7 +231,7 @@ func (ns *NodeServer) NodeStageVolume(
return nil, status.Error(codes.InvalidArgument, "missing required parameter imageFeatures") return nil, status.Error(codes.InvalidArgument, "missing required parameter imageFeatures")
} }
volOptions, err := genVolFromVolumeOptions(ctx, req.GetVolumeContext(), req.GetSecrets(), disableInUseChecks) volOptions, err := genVolFromVolumeOptions(ctx, req.GetVolumeContext(), req.GetSecrets(), disableInUseChecks, true)
if err != nil { if err != nil {
return nil, status.Error(codes.Internal, err.Error()) return nil, status.Error(codes.Internal, err.Error())
} }

View File

@ -862,7 +862,7 @@ func genSnapFromSnapID(
rbdSnap.ClusterID = vi.ClusterID rbdSnap.ClusterID = vi.ClusterID
options["clusterID"] = rbdSnap.ClusterID options["clusterID"] = rbdSnap.ClusterID
rbdSnap.Monitors, _, err = util.GetMonsAndClusterID(options) rbdSnap.Monitors, _, err = util.GetMonsAndClusterID(ctx, options, false)
if err != nil { if err != nil {
util.ErrorLog(ctx, "failed getting mons (%s)", err) util.ErrorLog(ctx, "failed getting mons (%s)", err)
@ -946,12 +946,11 @@ func generateVolumeFromVolumeID(
// Mounter, MultiNodeWritable // Mounter, MultiNodeWritable
rbdVol = &rbdVolume{} rbdVol = &rbdVolume{}
rbdVol.VolID = volumeID rbdVol.VolID = volumeID
// TODO check clusterID mapping exists
rbdVol.ClusterID = vi.ClusterID rbdVol.ClusterID = vi.ClusterID
options["clusterID"] = rbdVol.ClusterID options["clusterID"] = rbdVol.ClusterID
rbdVol.Monitors, _, err = util.GetMonsAndClusterID(options) rbdVol.Monitors, _, err = util.GetMonsAndClusterID(ctx, options, false)
if err != nil { if err != nil {
util.ErrorLog(ctx, "failed getting mons (%s)", err) util.ErrorLog(ctx, "failed getting mons (%s)", err)
@ -1149,7 +1148,7 @@ func generateVolumeFromMapping(
func genVolFromVolumeOptions( func genVolFromVolumeOptions(
ctx context.Context, ctx context.Context,
volOptions, credentials map[string]string, volOptions, credentials map[string]string,
disableInUseChecks bool) (*rbdVolume, error) { disableInUseChecks, checkClusterIDMapping bool) (*rbdVolume, error) {
var ( var (
ok bool ok bool
err error err error
@ -1167,7 +1166,7 @@ func genVolFromVolumeOptions(
rbdVol.NamePrefix = namePrefix rbdVol.NamePrefix = namePrefix
} }
rbdVol.Monitors, rbdVol.ClusterID, err = util.GetMonsAndClusterID(volOptions) rbdVol.Monitors, rbdVol.ClusterID, err = util.GetMonsAndClusterID(ctx, volOptions, checkClusterIDMapping)
if err != nil { if err != nil {
util.ErrorLog(ctx, "failed getting mons (%s)", err) util.ErrorLog(ctx, "failed getting mons (%s)", err)
@ -1244,7 +1243,7 @@ func genSnapFromOptions(ctx context.Context, rbdVol *rbdVolume, snapOptions map[
rbdSnap.JournalPool = rbdVol.JournalPool rbdSnap.JournalPool = rbdVol.JournalPool
rbdSnap.RadosNamespace = rbdVol.RadosNamespace rbdSnap.RadosNamespace = rbdVol.RadosNamespace
rbdSnap.Monitors, rbdSnap.ClusterID, err = util.GetMonsAndClusterID(snapOptions) rbdSnap.Monitors, rbdSnap.ClusterID, err = util.GetMonsAndClusterID(ctx, snapOptions, false)
if err != nil { if err != nil {
util.ErrorLog(ctx, "failed getting mons (%s)", err) util.ErrorLog(ctx, "failed getting mons (%s)", err)

View File

@ -17,6 +17,7 @@ limitations under the License.
package util package util
import ( import (
"context"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
@ -131,11 +132,19 @@ func CephFSSubvolumeGroup(pathToConfig, clusterID string) (string, error) {
// GetMonsAndClusterID returns monitors and clusterID information read from // GetMonsAndClusterID returns monitors and clusterID information read from
// configfile. // configfile.
func GetMonsAndClusterID(options map[string]string) (string, string, error) { func GetMonsAndClusterID(ctx context.Context, options map[string]string, checkClusterIDMapping bool) (string, string, error) {
clusterID, ok := options["clusterID"] clusterID, ok := options["clusterID"]
if !ok { if !ok {
return "", "", errors.New("clusterID must be set") return "", "", errors.New("clusterID must be set")
} }
if checkClusterIDMapping {
monitors, mappedClusterID, err := FetchMappedClusterIDAndMons(ctx, clusterID)
if err != nil {
return "", "", err
}
return monitors, mappedClusterID, nil
}
monitors, err := Mons(CsiConfigFile, clusterID) monitors, err := Mons(CsiConfigFile, clusterID)
if err != nil { if err != nil {