cephfs: fix subvolumegroup creation for multiple fs

In a cluster we can have multiple filesystem
for that we need to have a map of
subvolumegroups to check filesystem is created
nor not.

Signed-off-by: Madhu Rajanna <madhupr007@gmail.com>
This commit is contained in:
Madhu Rajanna 2022-09-07 11:46:22 +05:30 committed by mergify[bot]
parent 9d46478794
commit e56621cd66
3 changed files with 19 additions and 22 deletions

View File

@ -33,9 +33,7 @@ const (
var ErrSubVolMetadataNotSupported = errors.New("subvolume metadata operations are not supported") var ErrSubVolMetadataNotSupported = errors.New("subvolume metadata operations are not supported")
func (s *subVolumeClient) supportsSubVolMetadata() bool { func (s *subVolumeClient) supportsSubVolMetadata() bool {
if _, keyPresent := clusterAdditionalInfo[s.clusterID]; !keyPresent { newLocalClusterState(s.clusterID)
clusterAdditionalInfo[s.clusterID] = &localClusterState{}
}
return clusterAdditionalInfo[s.clusterID].subVolMetadataState != unsupported return clusterAdditionalInfo[s.clusterID].subVolMetadataState != unsupported
} }

View File

@ -29,9 +29,7 @@ import (
var ErrSubVolSnapMetadataNotSupported = errors.New("subvolume snapshot metadata operations are not supported") var ErrSubVolSnapMetadataNotSupported = errors.New("subvolume snapshot metadata operations are not supported")
func (s *snapshotClient) supportsSubVolSnapMetadata() bool { func (s *snapshotClient) supportsSubVolSnapMetadata() bool {
if _, keyPresent := clusterAdditionalInfo[s.clusterID]; !keyPresent { newLocalClusterState(s.clusterID)
clusterAdditionalInfo[s.clusterID] = &localClusterState{}
}
return clusterAdditionalInfo[s.clusterID].subVolSnapshotMetadataState != unsupported return clusterAdditionalInfo[s.clusterID].subVolSnapshotMetadataState != unsupported
} }

View File

@ -202,18 +202,25 @@ type localClusterState struct {
resizeState operationState resizeState operationState
subVolMetadataState operationState subVolMetadataState operationState
subVolSnapshotMetadataState operationState subVolSnapshotMetadataState operationState
// A cluster can have multiple filesystem for that we need to have a map of
// subvolumegroups to check filesystem is created nor not.
// set true once a subvolumegroup is created // set true once a subvolumegroup is created
// for corresponding cluster. // for corresponding filesystem in a cluster.
subVolumeGroupCreated bool subVolumeGroupsCreated map[string]bool
}
func newLocalClusterState(clusterID string) {
// verify if corresponding clusterID key is present in the map,
// and if not, initialize with default values(false).
if _, keyPresent := clusterAdditionalInfo[clusterID]; !keyPresent {
clusterAdditionalInfo[clusterID] = &localClusterState{}
clusterAdditionalInfo[clusterID].subVolumeGroupsCreated = make(map[string]bool)
}
} }
// CreateVolume creates a subvolume. // CreateVolume creates a subvolume.
func (s *subVolumeClient) CreateVolume(ctx context.Context) error { func (s *subVolumeClient) CreateVolume(ctx context.Context) error {
// verify if corresponding clusterID key is present in the map, newLocalClusterState(s.clusterID)
// and if not, initialize with default values(false).
if _, keyPresent := clusterAdditionalInfo[s.clusterID]; !keyPresent {
clusterAdditionalInfo[s.clusterID] = &localClusterState{}
}
ca, err := s.conn.GetFSAdmin() ca, err := s.conn.GetFSAdmin()
if err != nil { if err != nil {
@ -223,7 +230,7 @@ func (s *subVolumeClient) CreateVolume(ctx context.Context) error {
} }
// create subvolumegroup if not already created for the cluster. // create subvolumegroup if not already created for the cluster.
if !clusterAdditionalInfo[s.clusterID].subVolumeGroupCreated { if !clusterAdditionalInfo[s.clusterID].subVolumeGroupsCreated[s.FsName] {
opts := fsAdmin.SubVolumeGroupOptions{} opts := fsAdmin.SubVolumeGroupOptions{}
err = ca.CreateSubVolumeGroup(s.FsName, s.SubvolumeGroup, &opts) err = ca.CreateSubVolumeGroup(s.FsName, s.SubvolumeGroup, &opts)
if err != nil { if err != nil {
@ -237,7 +244,7 @@ func (s *subVolumeClient) CreateVolume(ctx context.Context) error {
return err return err
} }
log.DebugLog(ctx, "cephfs: created subvolume group %s", s.SubvolumeGroup) log.DebugLog(ctx, "cephfs: created subvolume group %s", s.SubvolumeGroup)
clusterAdditionalInfo[s.clusterID].subVolumeGroupCreated = true clusterAdditionalInfo[s.clusterID].subVolumeGroupsCreated[s.FsName] = true
} }
opts := fsAdmin.SubVolumeOptions{ opts := fsAdmin.SubVolumeOptions{
@ -279,13 +286,7 @@ func (s *subVolumeClient) ExpandVolume(ctx context.Context, bytesQuota int64) er
// subvolume. If the command is not available as a fallback it will use // subvolume. If the command is not available as a fallback it will use
// CreateVolume to resize the subvolume. // CreateVolume to resize the subvolume.
func (s *subVolumeClient) ResizeVolume(ctx context.Context, bytesQuota int64) error { func (s *subVolumeClient) ResizeVolume(ctx context.Context, bytesQuota int64) error {
// keyPresent checks whether corresponding clusterID key is present in clusterAdditionalInfo newLocalClusterState(s.clusterID)
var keyPresent bool
// verify if corresponding clusterID key is present in the map,
// and if not, initialize with default values(false).
if _, keyPresent = clusterAdditionalInfo[s.clusterID]; !keyPresent {
clusterAdditionalInfo[s.clusterID] = &localClusterState{}
}
// resize subvolume when either it's supported, or when corresponding // resize subvolume when either it's supported, or when corresponding
// clusterID key was not present. // clusterID key was not present.
if clusterAdditionalInfo[s.clusterID].resizeState == unknown || if clusterAdditionalInfo[s.clusterID].resizeState == unknown ||