mirror of
https://github.com/ceph/ceph-csi.git
synced 2025-01-18 02:39:30 +00:00
Merge pull request #191 from Rakshith-R/BZ-2242121
BUG 2242121: cephfs: safeguard localClusterState struct from race conditions
This commit is contained in:
commit
88078ee510
@ -51,6 +51,32 @@ func (s *subVolumeClient) isUnsupportedSubVolMetadata(err error) bool {
|
|||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// isSubVolumeGroupCreated returns true if subvolume group is created.
|
||||||
|
func (s *subVolumeClient) isSubVolumeGroupCreated() bool {
|
||||||
|
newLocalClusterState(s.clusterID)
|
||||||
|
clusterAdditionalInfo[s.clusterID].subVolumeGroupsRWMutex.RLock()
|
||||||
|
defer clusterAdditionalInfo[s.clusterID].subVolumeGroupsRWMutex.RUnlock()
|
||||||
|
|
||||||
|
if clusterAdditionalInfo[s.clusterID].subVolumeGroupsCreated == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
return clusterAdditionalInfo[s.clusterID].subVolumeGroupsCreated[s.SubvolumeGroup]
|
||||||
|
}
|
||||||
|
|
||||||
|
// updateSubVolumeGroupCreated updates subvolume group created map.
|
||||||
|
// If the map is nil, it creates a new map and updates it.
|
||||||
|
func (s *subVolumeClient) updateSubVolumeGroupCreated(state bool) {
|
||||||
|
clusterAdditionalInfo[s.clusterID].subVolumeGroupsRWMutex.Lock()
|
||||||
|
defer clusterAdditionalInfo[s.clusterID].subVolumeGroupsRWMutex.Unlock()
|
||||||
|
|
||||||
|
if clusterAdditionalInfo[s.clusterID].subVolumeGroupsCreated == nil {
|
||||||
|
clusterAdditionalInfo[s.clusterID].subVolumeGroupsCreated = make(map[string]bool)
|
||||||
|
}
|
||||||
|
|
||||||
|
clusterAdditionalInfo[s.clusterID].subVolumeGroupsCreated[s.SubvolumeGroup] = state
|
||||||
|
}
|
||||||
|
|
||||||
// setMetadata sets custom metadata on the subvolume in a volume as a
|
// setMetadata sets custom metadata on the subvolume in a volume as a
|
||||||
// key-value pair.
|
// key-value pair.
|
||||||
func (s *subVolumeClient) setMetadata(key, value string) error {
|
func (s *subVolumeClient) setMetadata(key, value string) error {
|
||||||
|
@ -22,6 +22,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"path"
|
"path"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
|
|
||||||
cerrors "github.com/ceph/ceph-csi/internal/cephfs/errors"
|
cerrors "github.com/ceph/ceph-csi/internal/cephfs/errors"
|
||||||
fsutil "github.com/ceph/ceph-csi/internal/cephfs/util"
|
fsutil "github.com/ceph/ceph-csi/internal/cephfs/util"
|
||||||
@ -32,12 +33,17 @@ import (
|
|||||||
"github.com/ceph/go-ceph/rados"
|
"github.com/ceph/go-ceph/rados"
|
||||||
)
|
)
|
||||||
|
|
||||||
// clusterAdditionalInfo contains information regarding if resize is
|
var (
|
||||||
// supported in the particular cluster and subvolumegroup is
|
// clusterAdditionalInfo contains information regarding if resize is
|
||||||
// created or not.
|
// supported in the particular cluster and subvolumegroup is
|
||||||
// Subvolumegroup creation and volume resize decisions are
|
// created or not.
|
||||||
// taken through this additional cluster information.
|
// Subvolumegroup creation and volume resize decisions are
|
||||||
var clusterAdditionalInfo = make(map[string]*localClusterState)
|
// taken through this additional cluster information.
|
||||||
|
clusterAdditionalInfo = make(map[string]*localClusterState)
|
||||||
|
// clusterAdditionalInfoMutex is used to protect against
|
||||||
|
// concurrent writes.
|
||||||
|
clusterAdditionalInfoMutex = sync.Mutex{}
|
||||||
|
)
|
||||||
|
|
||||||
// Subvolume holds subvolume information. This includes only the needed members
|
// Subvolume holds subvolume information. This includes only the needed members
|
||||||
// from fsAdmin.SubVolumeInfo.
|
// from fsAdmin.SubVolumeInfo.
|
||||||
@ -209,14 +215,18 @@ type localClusterState struct {
|
|||||||
// set true once a subvolumegroup is created
|
// set true once a subvolumegroup is created
|
||||||
// for corresponding filesystem in a cluster.
|
// for corresponding filesystem in a cluster.
|
||||||
subVolumeGroupsCreated map[string]bool
|
subVolumeGroupsCreated map[string]bool
|
||||||
|
// subVolumeGroupsRWMutex is used to protect subVolumeGroupsCreated map
|
||||||
|
// against concurrent writes while allowing multiple readers.
|
||||||
|
subVolumeGroupsRWMutex sync.RWMutex
|
||||||
}
|
}
|
||||||
|
|
||||||
func newLocalClusterState(clusterID string) {
|
func newLocalClusterState(clusterID string) {
|
||||||
// verify if corresponding clusterID key is present in the map,
|
// verify if corresponding clusterID key is present in the map,
|
||||||
// and if not, initialize with default values(false).
|
// and if not, initialize with default values(false).
|
||||||
|
clusterAdditionalInfoMutex.Lock()
|
||||||
|
defer clusterAdditionalInfoMutex.Unlock()
|
||||||
if _, keyPresent := clusterAdditionalInfo[clusterID]; !keyPresent {
|
if _, keyPresent := clusterAdditionalInfo[clusterID]; !keyPresent {
|
||||||
clusterAdditionalInfo[clusterID] = &localClusterState{}
|
clusterAdditionalInfo[clusterID] = &localClusterState{}
|
||||||
clusterAdditionalInfo[clusterID].subVolumeGroupsCreated = make(map[string]bool)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -232,7 +242,7 @@ func (s *subVolumeClient) CreateVolume(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// create subvolumegroup if not already created for the cluster.
|
// create subvolumegroup if not already created for the cluster.
|
||||||
if !clusterAdditionalInfo[s.clusterID].subVolumeGroupsCreated[s.FsName] {
|
if !s.isSubVolumeGroupCreated() {
|
||||||
opts := fsAdmin.SubVolumeGroupOptions{}
|
opts := fsAdmin.SubVolumeGroupOptions{}
|
||||||
err = ca.CreateSubVolumeGroup(s.FsName, s.SubvolumeGroup, &opts)
|
err = ca.CreateSubVolumeGroup(s.FsName, s.SubvolumeGroup, &opts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -246,7 +256,7 @@ func (s *subVolumeClient) CreateVolume(ctx context.Context) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
log.DebugLog(ctx, "cephfs: created subvolume group %s", s.SubvolumeGroup)
|
log.DebugLog(ctx, "cephfs: created subvolume group %s", s.SubvolumeGroup)
|
||||||
clusterAdditionalInfo[s.clusterID].subVolumeGroupsCreated[s.FsName] = true
|
s.updateSubVolumeGroupCreated(true)
|
||||||
}
|
}
|
||||||
|
|
||||||
opts := fsAdmin.SubVolumeOptions{
|
opts := fsAdmin.SubVolumeOptions{
|
||||||
@ -264,7 +274,7 @@ func (s *subVolumeClient) CreateVolume(ctx context.Context) error {
|
|||||||
if errors.Is(err, rados.ErrNotFound) {
|
if errors.Is(err, rados.ErrNotFound) {
|
||||||
// Reset the subVolumeGroupsCreated so that we can try again to create the
|
// Reset the subVolumeGroupsCreated so that we can try again to create the
|
||||||
// subvolumegroup in next request if the error is Not Found.
|
// subvolumegroup in next request if the error is Not Found.
|
||||||
clusterAdditionalInfo[s.clusterID].subVolumeGroupsCreated[s.FsName] = false
|
s.updateSubVolumeGroupCreated(false)
|
||||||
}
|
}
|
||||||
|
|
||||||
return err
|
return err
|
||||||
|
Loading…
Reference in New Issue
Block a user