cephfs: safeguard localClusterState struct from race conditions

Multiple go-routines may simultaneously check for a clusterID's
presence in clusterAdditionalInfo and create an entry if it is
absent. This set of operation needs to be serialized.

Therefore, this commit safeguards clusterAdditionalInfo map
from concurrent writes with a mutex to prevent the above problem.

Signed-off-by: Rakshith R <rar@redhat.com>
This commit is contained in:
Rakshith R 2023-10-09 11:38:04 +05:30 committed by mergify[bot]
parent 5abb0dae8d
commit 82f1323af4

View File

@ -22,6 +22,7 @@ import (
"fmt" "fmt"
"path" "path"
"strings" "strings"
"sync"
cerrors "github.com/ceph/ceph-csi/internal/cephfs/errors" cerrors "github.com/ceph/ceph-csi/internal/cephfs/errors"
fsutil "github.com/ceph/ceph-csi/internal/cephfs/util" fsutil "github.com/ceph/ceph-csi/internal/cephfs/util"
@ -32,12 +33,17 @@ import (
"github.com/ceph/go-ceph/rados" "github.com/ceph/go-ceph/rados"
) )
var (
// clusterAdditionalInfo contains information regarding if resize is // clusterAdditionalInfo contains information regarding if resize is
// supported in the particular cluster and subvolumegroup is // supported in the particular cluster and subvolumegroup is
// created or not. // created or not.
// Subvolumegroup creation and volume resize decisions are // Subvolumegroup creation and volume resize decisions are
// taken through this additional cluster information. // taken through this additional cluster information.
var clusterAdditionalInfo = make(map[string]*localClusterState) clusterAdditionalInfo = make(map[string]*localClusterState)
// clusterAdditionalInfoMutex is used to protect against
// concurrent writes.
clusterAdditionalInfoMutex = sync.Mutex{}
)
// Subvolume holds subvolume information. This includes only the needed members // Subvolume holds subvolume information. This includes only the needed members
// from fsAdmin.SubVolumeInfo. // from fsAdmin.SubVolumeInfo.
@ -214,6 +220,8 @@ type localClusterState struct {
func newLocalClusterState(clusterID string) { func newLocalClusterState(clusterID string) {
// verify if corresponding clusterID key is present in the map, // verify if corresponding clusterID key is present in the map,
// and if not, initialize with default values(false). // and if not, initialize with default values(false).
clusterAdditionalInfoMutex.Lock()
defer clusterAdditionalInfoMutex.Unlock()
if _, keyPresent := clusterAdditionalInfo[clusterID]; !keyPresent { if _, keyPresent := clusterAdditionalInfo[clusterID]; !keyPresent {
clusterAdditionalInfo[clusterID] = &localClusterState{} clusterAdditionalInfo[clusterID] = &localClusterState{}
clusterAdditionalInfo[clusterID].subVolumeGroupsCreated = make(map[string]bool) clusterAdditionalInfo[clusterID].subVolumeGroupsCreated = make(map[string]bool)