cephfs: Add support for multiple subvolumegroups

With the current code base, the subvolumegroup will
be created once, and even for a different cluster,
subvolumegroup creation is not allowed again.

Added support multiple subvolumegroups creation by
validating one subvolumegroup creation per cluster.

Fixes: #1123

Signed-off-by: Yug Gupta <ygupta@redhat.com>
This commit is contained in:
Yug Gupta
2020-06-25 19:18:48 +05:30
committed by mergify[bot]
parent efd9b31043
commit c9ad904331
3 changed files with 126 additions and 17 deletions

View File

@ -30,14 +30,12 @@ import (
)
var (
// cephfsInit is used to create "csi" subvolume group for the first time the csi plugin loads.
// Subvolume group create gets called every time the plugin loads, though it doesn't result in error
// its unnecessary
cephfsInit = false
// resizeSupportedList stores the mapping for clusterID and resize is
// supported in the particular cluster
resizeSupportedList = make(map[string]bool)
// clusterAdditionalInfo contains information regarding if resize is
// supported in the particular cluster and subvolumegroup is
// created or not.
// Subvolumegroup creation and volume resize decisions are
// taken through this additional cluster information.
clusterAdditionalInfo = make(map[string]*localClusterState)
inValidCommmand = "no valid command found"
)
@ -85,9 +83,23 @@ func getVolumeRootPathCeph(ctx context.Context, volOptions *volumeOptions, cr *u
return strings.TrimSuffix(string(stdout), "\n"), nil
}
type localClusterState struct {
// set true if cluster supports resize functionality.
resizeSupported bool
// set true once a subvolumegroup is created
// for corresponding cluster.
subVolumeGroupCreated bool
}
func createVolume(ctx context.Context, volOptions *volumeOptions, cr *util.Credentials, volID volumeID, bytesQuota int64) error {
//TODO: When we support multiple fs, need to hande subvolume group create for all fs's
if !cephfsInit {
// verify if corresponding ClusterID key is present in the map,
// and if not, initialize with default values(false).
if _, keyPresent := clusterAdditionalInfo[volOptions.ClusterID]; !keyPresent {
clusterAdditionalInfo[volOptions.ClusterID] = &localClusterState{}
}
// create subvolumegroup if not already created for the cluster.
if !clusterAdditionalInfo[volOptions.ClusterID].subVolumeGroupCreated {
err := execCommandErr(
ctx,
"ceph",
@ -105,7 +117,7 @@ func createVolume(ctx context.Context, volOptions *volumeOptions, cr *util.Crede
return err
}
klog.V(4).Infof(util.Log(ctx, "cephfs: created subvolume group %s"), volOptions.SubvolumeGroup)
cephfsInit = true
clusterAdditionalInfo[volOptions.ClusterID].subVolumeGroupCreated = true
}
args := []string{
@ -144,7 +156,17 @@ func createVolume(ctx context.Context, volOptions *volumeOptions, cr *util.Crede
// subvolume. If the command is not available as a fallback it will use
// CreateVolume to resize the subvolume.
func resizeVolume(ctx context.Context, volOptions *volumeOptions, cr *util.Credentials, volID volumeID, bytesQuota int64) error {
if supported, ok := resizeSupportedList[volOptions.ClusterID]; supported || !ok {
// keyPresent checks whether corresponding clusterID key is present in clusterAdditionalInfo
var keyPresent bool
// verify if corresponding ClusterID key is present in the map,
// and if not, initialize with default values(false).
if _, keyPresent = clusterAdditionalInfo[volOptions.ClusterID]; !keyPresent {
clusterAdditionalInfo[volOptions.ClusterID] = &localClusterState{}
}
// resize subvolume when either it's supported, or when corresponding
// clusterID key was not present.
if clusterAdditionalInfo[volOptions.ClusterID].resizeSupported || !keyPresent {
args := []string{
"fs",
"subvolume",
@ -166,7 +188,7 @@ func resizeVolume(ctx context.Context, volOptions *volumeOptions, cr *util.Crede
args[:]...)
if err == nil {
resizeSupportedList[volOptions.ClusterID] = true
clusterAdditionalInfo[volOptions.ClusterID].resizeSupported = true
return nil
}
// Incase the error is other than invalid command return error to the caller.
@ -175,7 +197,7 @@ func resizeVolume(ctx context.Context, volOptions *volumeOptions, cr *util.Crede
return err
}
}
resizeSupportedList[volOptions.ClusterID] = false
clusterAdditionalInfo[volOptions.ClusterID].resizeSupported = false
return createVolume(ctx, volOptions, cr, volID, bytesQuota)
}