From 113eebfd4c4a278a868cdc5d31c95400567ee6dd Mon Sep 17 00:00:00 2001 From: Rakshith R Date: Tue, 10 Oct 2023 20:32:17 +0530 Subject: [PATCH] cephfs: safeguard subVolumeGroupCreated map from race condition Multiple go-routines may simultaneously create the subVolumeGroupCreated map or write into it for a particular group. This commit safeguards subVolumeGroupCreated map from concurrent creation/writes while allowing for multiple readers. Signed-off-by: Rakshith R (cherry picked from commit d516a1d66d84e9fa62fc3e58d429f89205851741) --- internal/cephfs/core/metadata.go | 26 ++++++++++++++++++++++++++ internal/cephfs/core/volume.go | 10 ++++++---- 2 files changed, 32 insertions(+), 4 deletions(-) diff --git a/internal/cephfs/core/metadata.go b/internal/cephfs/core/metadata.go index 9e2b90d5f..1dfd1b5c4 100644 --- a/internal/cephfs/core/metadata.go +++ b/internal/cephfs/core/metadata.go @@ -51,6 +51,32 @@ func (s *subVolumeClient) isUnsupportedSubVolMetadata(err error) bool { return true } +// isSubVolumeGroupCreated returns true if subvolume group is created. +func (s *subVolumeClient) isSubVolumeGroupCreated() bool { + newLocalClusterState(s.clusterID) + clusterAdditionalInfo[s.clusterID].subVolumeGroupsRWMutex.RLock() + defer clusterAdditionalInfo[s.clusterID].subVolumeGroupsRWMutex.RUnlock() + + if clusterAdditionalInfo[s.clusterID].subVolumeGroupsCreated == nil { + return false + } + + return clusterAdditionalInfo[s.clusterID].subVolumeGroupsCreated[s.SubvolumeGroup] +} + +// updateSubVolumeGroupCreated updates subvolume group created map. +// If the map is nil, it creates a new map and updates it. +func (s *subVolumeClient) updateSubVolumeGroupCreated(state bool) { + clusterAdditionalInfo[s.clusterID].subVolumeGroupsRWMutex.Lock() + defer clusterAdditionalInfo[s.clusterID].subVolumeGroupsRWMutex.Unlock() + + if clusterAdditionalInfo[s.clusterID].subVolumeGroupsCreated == nil { + clusterAdditionalInfo[s.clusterID].subVolumeGroupsCreated = make(map[string]bool) + } + + clusterAdditionalInfo[s.clusterID].subVolumeGroupsCreated[s.SubvolumeGroup] = state +} + // setMetadata sets custom metadata on the subvolume in a volume as a // key-value pair. func (s *subVolumeClient) setMetadata(key, value string) error { diff --git a/internal/cephfs/core/volume.go b/internal/cephfs/core/volume.go index 54c3dd4eb..0754b8a23 100644 --- a/internal/cephfs/core/volume.go +++ b/internal/cephfs/core/volume.go @@ -215,6 +215,9 @@ type localClusterState struct { // set true once a subvolumegroup is created // for corresponding filesystem in a cluster. subVolumeGroupsCreated map[string]bool + // subVolumeGroupsRWMutex is used to protect subVolumeGroupsCreated map + // against concurrent writes while allowing multiple readers. + subVolumeGroupsRWMutex sync.RWMutex } func newLocalClusterState(clusterID string) { @@ -224,7 +227,6 @@ func newLocalClusterState(clusterID string) { defer clusterAdditionalInfoMutex.Unlock() if _, keyPresent := clusterAdditionalInfo[clusterID]; !keyPresent { clusterAdditionalInfo[clusterID] = &localClusterState{} - clusterAdditionalInfo[clusterID].subVolumeGroupsCreated = make(map[string]bool) } } @@ -240,7 +242,7 @@ func (s *subVolumeClient) CreateVolume(ctx context.Context) error { } // create subvolumegroup if not already created for the cluster. - if !clusterAdditionalInfo[s.clusterID].subVolumeGroupsCreated[s.FsName] { + if !s.isSubVolumeGroupCreated() { opts := fsAdmin.SubVolumeGroupOptions{} err = ca.CreateSubVolumeGroup(s.FsName, s.SubvolumeGroup, &opts) if err != nil { @@ -254,7 +256,7 @@ func (s *subVolumeClient) CreateVolume(ctx context.Context) error { return err } log.DebugLog(ctx, "cephfs: created subvolume group %s", s.SubvolumeGroup) - clusterAdditionalInfo[s.clusterID].subVolumeGroupsCreated[s.FsName] = true + s.updateSubVolumeGroupCreated(true) } opts := fsAdmin.SubVolumeOptions{ @@ -272,7 +274,7 @@ func (s *subVolumeClient) CreateVolume(ctx context.Context) error { if errors.Is(err, rados.ErrNotFound) { // Reset the subVolumeGroupsCreated so that we can try again to create the // subvolumegroup in next request if the error is Not Found. - clusterAdditionalInfo[s.clusterID].subVolumeGroupsCreated[s.FsName] = false + s.updateSubVolumeGroupCreated(false) } return err