rbd: fill clusterID if its a migration nodestage request

the migration nodestage request does not carry the 'clusterID' in it
and only monitors are available with the volumeContext. The volume
context flag 'migration=true' and 'static=true' flags allow us to
fill 'clusterID' from the passed in monitors to the volume Context,so
that rest of the static operations on nodestage can be proceeded as we
do treat static volumes today.

Signed-off-by: Humble Chirammal <hchiramm@redhat.com>
This commit is contained in:
Humble Chirammal 2021-09-16 13:56:06 +05:30 committed by mergify[bot]
parent 1f5963919f
commit 2e8e8f5e64
4 changed files with 49 additions and 7 deletions

View File

@ -149,6 +149,25 @@ func healerStageTransaction(ctx context.Context, cr *util.Credentials, volOps *r
return nil
}
// getClusterIDFromMigrationVolume fills the clusterID for the passed in monitors.
func getClusterIDFromMigrationVolume(parameters map[string]string) (string, error) {
var err error
var rclusterID string
mons := parameters["monitors"]
for _, m := range strings.Split(mons, ",") {
rclusterID, err = util.GetClusterIDFromMon(m)
if err != nil && !errors.Is(err, util.ErrMissingConfigForMonitor) {
return "", err
}
if rclusterID != "" {
return rclusterID, nil
}
}
return "", err
}
// populateRbdVol update the fields in rbdVolume struct based on the request it received.
func populateRbdVol(
ctx context.Context,
@ -188,6 +207,10 @@ func populateRbdVol(
// get rbd image name from the volume journal
// for static volumes, the image name is actually the volume ID itself
if isStaticVol {
if req.GetVolumeContext()[intreeMigrationKey] == intreeMigrationLabel {
// if migration static volume, use imageName as volID
volID = req.GetVolumeContext()["imageName"]
}
rv.RbdImageName = volID
} else {
var vi util.CSIIdentifier
@ -268,6 +291,16 @@ func (ns *NodeServer) NodeStageVolume(
}
defer ns.VolumeLocks.Release(volID)
// Check this is a migration request because in that case, unlike other node stage requests
// it will be missing the clusterID, so fill it by fetching it from config file using mon.
if req.GetVolumeContext()[intreeMigrationKey] == intreeMigrationLabel && req.VolumeContext[util.ClusterIDKey] == "" {
cID, cErr := getClusterIDFromMigrationVolume(req.GetVolumeContext())
if cErr != nil {
return nil, status.Error(codes.Internal, cErr.Error())
}
req.VolumeContext[util.ClusterIDKey] = cID
}
stagingParentPath := req.GetStagingTargetPath()
stagingTargetPath := stagingParentPath + "/" + volID

View File

@ -73,6 +73,10 @@ const (
// thick provisioned or thin provisioned.
thickProvisionMetaData = "true"
thinProvisionMetaData = "false"
// these are the migration label key and value for parameters in volume context.
intreeMigrationKey = "migration"
intreeMigrationLabel = "true"
)
// rbdImage contains common attributes and methods for the rbdVolume and

View File

@ -33,7 +33,7 @@ const (
CsiConfigFile = "/etc/ceph-csi-config/config.json"
// ClusterIDKey is the name of the key containing clusterID.
clusterIDKey = "clusterID"
ClusterIDKey = "clusterID"
)
// ClusterInfo strongly typed JSON spec for the below JSON structure.
@ -154,7 +154,7 @@ func GetMonsAndClusterID(ctx context.Context, clusterID string, checkClusterIDMa
// GetClusterID fetches clusterID from given options map.
func GetClusterID(options map[string]string) (string, error) {
clusterID, ok := options[clusterIDKey]
clusterID, ok := options[ClusterIDKey]
if !ok {
return "", ErrClusterIDNotSet
}
@ -168,11 +168,8 @@ func GetClusterID(options map[string]string) (string, error) {
// else error.
func GetClusterIDFromMon(mon string) (string, error) {
clusterID, err := readClusterInfoWithMon(CsiConfigFile, mon)
if err != nil {
return "", err
}
return clusterID, nil
return clusterID, err
}
func readClusterInfoWithMon(pathToConfig, mon string) (string, error) {
@ -193,6 +190,12 @@ func readClusterInfoWithMon(pathToConfig, mon string) (string, error) {
}
for _, cluster := range config {
// as the same mons can fall into different clusterIDs with
// different radosnamespace configurations, we are bailing out
// if radosnamespace configuration is found for this cluster
if cluster.RadosNamespace != "" {
continue
}
for _, m := range cluster.Monitors {
if m == mon {
return cluster.ClusterID, nil
@ -200,5 +203,5 @@ func readClusterInfoWithMon(pathToConfig, mon string) (string, error) {
}
}
return "", fmt.Errorf("missing configuration of cluster ID for mon %q", mon)
return "", ErrMissingConfigForMonitor
}

View File

@ -35,6 +35,8 @@ var (
ErrPoolNotFound = errors.New("pool not found")
// ErrClusterIDNotSet is returned when cluster id is not set.
ErrClusterIDNotSet = errors.New("clusterID must be set")
// ErrMissingConfigForMonitor is returned when clusterID is not found for the mon.
ErrMissingConfigForMonitor = errors.New("missing configuration of cluster ID for monitor")
)
type errorPair struct {