diff --git a/internal/cephfs/core/metadata.go b/internal/cephfs/core/metadata.go index 9e2b90d5f..1dfd1b5c4 100644 --- a/internal/cephfs/core/metadata.go +++ b/internal/cephfs/core/metadata.go @@ -51,6 +51,32 @@ func (s *subVolumeClient) isUnsupportedSubVolMetadata(err error) bool { return true } +// isSubVolumeGroupCreated returns true if subvolume group is created. +func (s *subVolumeClient) isSubVolumeGroupCreated() bool { + newLocalClusterState(s.clusterID) + clusterAdditionalInfo[s.clusterID].subVolumeGroupsRWMutex.RLock() + defer clusterAdditionalInfo[s.clusterID].subVolumeGroupsRWMutex.RUnlock() + + if clusterAdditionalInfo[s.clusterID].subVolumeGroupsCreated == nil { + return false + } + + return clusterAdditionalInfo[s.clusterID].subVolumeGroupsCreated[s.SubvolumeGroup] +} + +// updateSubVolumeGroupCreated updates subvolume group created map. +// If the map is nil, it creates a new map and updates it. +func (s *subVolumeClient) updateSubVolumeGroupCreated(state bool) { + clusterAdditionalInfo[s.clusterID].subVolumeGroupsRWMutex.Lock() + defer clusterAdditionalInfo[s.clusterID].subVolumeGroupsRWMutex.Unlock() + + if clusterAdditionalInfo[s.clusterID].subVolumeGroupsCreated == nil { + clusterAdditionalInfo[s.clusterID].subVolumeGroupsCreated = make(map[string]bool) + } + + clusterAdditionalInfo[s.clusterID].subVolumeGroupsCreated[s.SubvolumeGroup] = state +} + // setMetadata sets custom metadata on the subvolume in a volume as a // key-value pair. func (s *subVolumeClient) setMetadata(key, value string) error { diff --git a/internal/cephfs/core/volume.go b/internal/cephfs/core/volume.go index c95a7cc86..0754b8a23 100644 --- a/internal/cephfs/core/volume.go +++ b/internal/cephfs/core/volume.go @@ -22,6 +22,7 @@ import ( "fmt" "path" "strings" + "sync" cerrors "github.com/ceph/ceph-csi/internal/cephfs/errors" fsutil "github.com/ceph/ceph-csi/internal/cephfs/util" @@ -32,12 +33,17 @@ import ( "github.com/ceph/go-ceph/rados" ) -// clusterAdditionalInfo contains information regarding if resize is -// supported in the particular cluster and subvolumegroup is -// created or not. -// Subvolumegroup creation and volume resize decisions are -// taken through this additional cluster information. -var clusterAdditionalInfo = make(map[string]*localClusterState) +var ( + // clusterAdditionalInfo contains information regarding if resize is + // supported in the particular cluster and subvolumegroup is + // created or not. + // Subvolumegroup creation and volume resize decisions are + // taken through this additional cluster information. + clusterAdditionalInfo = make(map[string]*localClusterState) + // clusterAdditionalInfoMutex is used to protect against + // concurrent writes. + clusterAdditionalInfoMutex = sync.Mutex{} +) // Subvolume holds subvolume information. This includes only the needed members // from fsAdmin.SubVolumeInfo. @@ -209,14 +215,18 @@ type localClusterState struct { // set true once a subvolumegroup is created // for corresponding filesystem in a cluster. subVolumeGroupsCreated map[string]bool + // subVolumeGroupsRWMutex is used to protect subVolumeGroupsCreated map + // against concurrent writes while allowing multiple readers. + subVolumeGroupsRWMutex sync.RWMutex } func newLocalClusterState(clusterID string) { // verify if corresponding clusterID key is present in the map, // and if not, initialize with default values(false). + clusterAdditionalInfoMutex.Lock() + defer clusterAdditionalInfoMutex.Unlock() if _, keyPresent := clusterAdditionalInfo[clusterID]; !keyPresent { clusterAdditionalInfo[clusterID] = &localClusterState{} - clusterAdditionalInfo[clusterID].subVolumeGroupsCreated = make(map[string]bool) } } @@ -232,7 +242,7 @@ func (s *subVolumeClient) CreateVolume(ctx context.Context) error { } // create subvolumegroup if not already created for the cluster. - if !clusterAdditionalInfo[s.clusterID].subVolumeGroupsCreated[s.FsName] { + if !s.isSubVolumeGroupCreated() { opts := fsAdmin.SubVolumeGroupOptions{} err = ca.CreateSubVolumeGroup(s.FsName, s.SubvolumeGroup, &opts) if err != nil { @@ -246,7 +256,7 @@ func (s *subVolumeClient) CreateVolume(ctx context.Context) error { return err } log.DebugLog(ctx, "cephfs: created subvolume group %s", s.SubvolumeGroup) - clusterAdditionalInfo[s.clusterID].subVolumeGroupsCreated[s.FsName] = true + s.updateSubVolumeGroupCreated(true) } opts := fsAdmin.SubVolumeOptions{ @@ -264,7 +274,7 @@ func (s *subVolumeClient) CreateVolume(ctx context.Context) error { if errors.Is(err, rados.ErrNotFound) { // Reset the subVolumeGroupsCreated so that we can try again to create the // subvolumegroup in next request if the error is Not Found. - clusterAdditionalInfo[s.clusterID].subVolumeGroupsCreated[s.FsName] = false + s.updateSubVolumeGroupCreated(false) } return err