mirror of
https://github.com/ceph/ceph-csi.git
synced 2025-06-13 02:33:34 +00:00
deploy: support for read affinity options per cluster
Implemented the capability to include read affinity options for individual clusters within the ceph-csi-config ConfigMap. This allows users to configure the crush location for each cluster separately. The read affinity options specified in the ConfigMap will supersede those provided via command line arguments. Signed-off-by: Praveen M <m.praveen@ibm.com>
This commit is contained in:
@ -26,6 +26,7 @@ import (
|
||||
csicommon "github.com/ceph/ceph-csi/internal/csi-common"
|
||||
"github.com/ceph/ceph-csi/internal/rbd"
|
||||
"github.com/ceph/ceph-csi/internal/util"
|
||||
"github.com/ceph/ceph-csi/internal/util/k8s"
|
||||
"github.com/ceph/ceph-csi/internal/util/log"
|
||||
|
||||
"github.com/container-storage-interface/spec/lib/go/csi"
|
||||
@ -68,14 +69,14 @@ func NewControllerServer(d *csicommon.CSIDriver) *rbd.ControllerServer {
|
||||
func NewNodeServer(
|
||||
d *csicommon.CSIDriver,
|
||||
t string,
|
||||
topology map[string]string,
|
||||
crushLocationMap map[string]string,
|
||||
nodeLabels, topology, crushLocationMap map[string]string,
|
||||
) (*rbd.NodeServer, error) {
|
||||
ns := rbd.NodeServer{
|
||||
DefaultNodeServer: csicommon.NewDefaultNodeServer(d, t, topology),
|
||||
VolumeLocks: util.NewVolumeLocks(),
|
||||
DefaultNodeServer: csicommon.NewDefaultNodeServer(d, t, topology),
|
||||
VolumeLocks: util.NewVolumeLocks(),
|
||||
NodeLabels: nodeLabels,
|
||||
CLIReadAffinityMapOptions: util.ConstructReadAffinityMapOption(crushLocationMap),
|
||||
}
|
||||
ns.SetReadAffinityMapOptions(crushLocationMap)
|
||||
|
||||
return &ns, nil
|
||||
}
|
||||
@ -87,8 +88,8 @@ func NewNodeServer(
|
||||
// setupCSIAddonsServer().
|
||||
func (r *Driver) Run(conf *util.Config) {
|
||||
var (
|
||||
err error
|
||||
topology, crushLocationMap map[string]string
|
||||
err error
|
||||
nodeLabels, topology, crushLocationMap map[string]string
|
||||
)
|
||||
// update clone soft and hard limit
|
||||
rbd.SetGlobalInt("rbdHardMaxCloneDepth", conf.RbdHardMaxCloneDepth)
|
||||
@ -125,13 +126,17 @@ func (r *Driver) Run(conf *util.Config) {
|
||||
})
|
||||
}
|
||||
|
||||
if conf.EnableReadAffinity {
|
||||
crushLocationMap, err = util.GetCrushLocationMap(conf.CrushLocationLabels, conf.NodeID)
|
||||
if k8s.RunsOnKubernetes() {
|
||||
nodeLabels, err = k8s.GetNodeLabels(conf.NodeID)
|
||||
if err != nil {
|
||||
log.FatalLogMsg(err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
if conf.EnableReadAffinity {
|
||||
crushLocationMap = util.GetCrushLocationMap(conf.CrushLocationLabels, nodeLabels)
|
||||
}
|
||||
|
||||
// Create GRPC servers
|
||||
r.ids = NewIdentityServer(r.cd)
|
||||
|
||||
@ -140,7 +145,7 @@ func (r *Driver) Run(conf *util.Config) {
|
||||
if err != nil {
|
||||
log.FatalLogMsg(err.Error())
|
||||
}
|
||||
r.ns, err = NewNodeServer(r.cd, conf.Vtype, topology, crushLocationMap)
|
||||
r.ns, err = NewNodeServer(r.cd, conf.Vtype, nodeLabels, topology, crushLocationMap)
|
||||
if err != nil {
|
||||
log.FatalLogMsg("failed to start node server, err %v\n", err)
|
||||
}
|
||||
|
@ -45,8 +45,10 @@ type NodeServer struct {
|
||||
// A map storing all volumes with ongoing operations so that additional operations
|
||||
// for that same volume (as defined by VolumeID) return an Aborted error
|
||||
VolumeLocks *util.VolumeLocks
|
||||
// readAffinityMapOptions contains map options to enable read affinity.
|
||||
readAffinityMapOptions string
|
||||
// NodeLabels stores the node labels
|
||||
NodeLabels map[string]string
|
||||
// CLIReadAffinityMapOptions contains map options passed through command line to enable read affinity.
|
||||
CLIReadAffinityMapOptions string
|
||||
}
|
||||
|
||||
// stageTransaction struct represents the state a transaction was when it either completed
|
||||
@ -258,11 +260,10 @@ func (ns *NodeServer) populateRbdVol(
|
||||
rv.Mounter = rbdNbdMounter
|
||||
}
|
||||
|
||||
err = getMapOptions(req, rv)
|
||||
err = ns.getMapOptions(req, rv)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ns.appendReadAffinityMapOptions(rv)
|
||||
|
||||
rv.VolID = volID
|
||||
|
||||
@ -280,14 +281,14 @@ func (ns *NodeServer) populateRbdVol(
|
||||
|
||||
// appendReadAffinityMapOptions appends readAffinityMapOptions to mapOptions
|
||||
// if mounter is rbdDefaultMounter and readAffinityMapOptions is not empty.
|
||||
func (ns NodeServer) appendReadAffinityMapOptions(rv *rbdVolume) {
|
||||
func (rv *rbdVolume) appendReadAffinityMapOptions(readAffinityMapOptions string) {
|
||||
switch {
|
||||
case ns.readAffinityMapOptions == "" || rv.Mounter != rbdDefaultMounter:
|
||||
case readAffinityMapOptions == "" || rv.Mounter != rbdDefaultMounter:
|
||||
return
|
||||
case rv.MapOptions != "":
|
||||
rv.MapOptions += "," + ns.readAffinityMapOptions
|
||||
rv.MapOptions += "," + readAffinityMapOptions
|
||||
default:
|
||||
rv.MapOptions = ns.readAffinityMapOptions
|
||||
rv.MapOptions = readAffinityMapOptions
|
||||
}
|
||||
}
|
||||
|
||||
@ -1378,22 +1379,3 @@ func getDeviceSize(ctx context.Context, devicePath string) (uint64, error) {
|
||||
|
||||
return size, nil
|
||||
}
|
||||
|
||||
func (ns *NodeServer) SetReadAffinityMapOptions(crushLocationMap map[string]string) {
|
||||
if len(crushLocationMap) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
var b strings.Builder
|
||||
b.WriteString("read_from_replica=localize,crush_location=")
|
||||
first := true
|
||||
for key, val := range crushLocationMap {
|
||||
if first {
|
||||
b.WriteString(fmt.Sprintf("%s:%s", key, val))
|
||||
first = false
|
||||
} else {
|
||||
b.WriteString(fmt.Sprintf("|%s:%s", key, val))
|
||||
}
|
||||
}
|
||||
ns.readAffinityMapOptions = b.String()
|
||||
}
|
||||
|
@ -18,8 +18,12 @@ package rbd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/ceph/ceph-csi/internal/util"
|
||||
|
||||
"github.com/container-storage-interface/spec/lib/go/csi"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
@ -107,53 +111,6 @@ func TestParseBoolOption(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestNodeServer_SetReadAffinityMapOptions(t *testing.T) {
|
||||
t.Parallel()
|
||||
tests := []struct {
|
||||
name string
|
||||
crushLocationmap map[string]string
|
||||
wantAny []string
|
||||
}{
|
||||
{
|
||||
name: "nil crushLocationmap",
|
||||
crushLocationmap: nil,
|
||||
wantAny: []string{""},
|
||||
},
|
||||
{
|
||||
name: "empty crushLocationmap",
|
||||
crushLocationmap: map[string]string{},
|
||||
wantAny: []string{""},
|
||||
},
|
||||
{
|
||||
name: "single entry in crushLocationmap",
|
||||
crushLocationmap: map[string]string{
|
||||
"region": "east",
|
||||
},
|
||||
wantAny: []string{"read_from_replica=localize,crush_location=region:east"},
|
||||
},
|
||||
{
|
||||
name: "multiple entries in crushLocationmap",
|
||||
crushLocationmap: map[string]string{
|
||||
"region": "east",
|
||||
"zone": "east-1",
|
||||
},
|
||||
wantAny: []string{
|
||||
"read_from_replica=localize,crush_location=region:east|zone:east-1",
|
||||
"read_from_replica=localize,crush_location=zone:east-1|region:east",
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
currentTT := tt
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
ns := &NodeServer{}
|
||||
ns.SetReadAffinityMapOptions(currentTT.crushLocationmap)
|
||||
assert.Contains(t, currentTT.wantAny, ns.readAffinityMapOptions)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestNodeServer_appendReadAffinityMapOptions(t *testing.T) {
|
||||
t.Parallel()
|
||||
type input struct {
|
||||
@ -236,11 +193,128 @@ func TestNodeServer_appendReadAffinityMapOptions(t *testing.T) {
|
||||
MapOptions: currentTT.args.mapOptions,
|
||||
Mounter: currentTT.args.mounter,
|
||||
}
|
||||
ns := &NodeServer{
|
||||
readAffinityMapOptions: currentTT.args.readAffinityMapOptions,
|
||||
}
|
||||
ns.appendReadAffinityMapOptions(rv)
|
||||
rv.appendReadAffinityMapOptions(currentTT.args.readAffinityMapOptions)
|
||||
assert.Equal(t, currentTT.want, rv.MapOptions)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestReadAffinity_GetReadAffinityMapOptions(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
nodeLabels := map[string]string{
|
||||
"topology.kubernetes.io/zone": "east-1",
|
||||
"topology.kubernetes.io/region": "east",
|
||||
}
|
||||
|
||||
csiConfig := []util.ClusterInfo{
|
||||
{
|
||||
ClusterID: "cluster-1",
|
||||
ReadAffinity: struct {
|
||||
Enabled bool `json:"enabled"`
|
||||
CrushLocationLabels []string `json:"crushLocationLabels"`
|
||||
}{
|
||||
Enabled: true,
|
||||
CrushLocationLabels: []string{
|
||||
"topology.kubernetes.io/region",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
ClusterID: "cluster-2",
|
||||
ReadAffinity: struct {
|
||||
Enabled bool `json:"enabled"`
|
||||
CrushLocationLabels []string `json:"crushLocationLabels"`
|
||||
}{
|
||||
Enabled: false,
|
||||
CrushLocationLabels: []string{
|
||||
"topology.kubernetes.io/region",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
ClusterID: "cluster-3",
|
||||
ReadAffinity: struct {
|
||||
Enabled bool `json:"enabled"`
|
||||
CrushLocationLabels []string `json:"crushLocationLabels"`
|
||||
}{
|
||||
Enabled: true,
|
||||
CrushLocationLabels: []string{},
|
||||
},
|
||||
},
|
||||
{
|
||||
ClusterID: "cluster-4",
|
||||
},
|
||||
}
|
||||
|
||||
csiConfigFileContent, err := json.Marshal(csiConfig)
|
||||
if err != nil {
|
||||
t.Errorf("failed to marshal csi config info %v", err)
|
||||
}
|
||||
tmpConfPath := util.CsiConfigFile
|
||||
err = os.Mkdir("/etc/ceph-csi-config", 0o600)
|
||||
if err != nil {
|
||||
t.Errorf("failed to create directory %s: %v", "/etc/ceph-csi-config", err)
|
||||
}
|
||||
err = os.WriteFile(tmpConfPath, csiConfigFileContent, 0o600)
|
||||
if err != nil {
|
||||
t.Errorf("failed to write %s file content: %v", util.CsiConfigFile, err)
|
||||
}
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
clusterID string
|
||||
CLICrushLocationLabels string
|
||||
want string
|
||||
}{
|
||||
{
|
||||
name: "Enabled in cluster-1 and Enabled in CLI",
|
||||
clusterID: "cluster-1",
|
||||
CLICrushLocationLabels: "topology.kubernetes.io/region",
|
||||
want: "read_from_replica=localize,crush_location=region:east",
|
||||
},
|
||||
{
|
||||
name: "Disabled in cluster-2 and Enabled in CLI",
|
||||
clusterID: "cluster-2",
|
||||
CLICrushLocationLabels: "topology.kubernetes.io/zone",
|
||||
want: "",
|
||||
},
|
||||
{
|
||||
name: "Enabled in cluster-3 with empty crush labels and Enabled in CLI",
|
||||
clusterID: "cluster-3",
|
||||
CLICrushLocationLabels: "topology.kubernetes.io/zone",
|
||||
want: "read_from_replica=localize,crush_location=zone:east-1",
|
||||
},
|
||||
{
|
||||
name: "Enabled in cluster-3 with empty crush labels and Disabled in CLI",
|
||||
clusterID: "cluster-3",
|
||||
CLICrushLocationLabels: "",
|
||||
want: "",
|
||||
},
|
||||
{
|
||||
name: "Absent in cluster-4 and Enabled in CLI",
|
||||
clusterID: "cluster-4",
|
||||
CLICrushLocationLabels: "topology.kubernetes.io/zone",
|
||||
want: "",
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
tc := tt
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
crushLocationMap := util.GetCrushLocationMap(tc.CLICrushLocationLabels, nodeLabels)
|
||||
ns := &NodeServer{
|
||||
CLIReadAffinityMapOptions: util.ConstructReadAffinityMapOption(crushLocationMap),
|
||||
}
|
||||
readAffinityMapOptions, err := util.GetReadAffinityMapOptions(
|
||||
tc.clusterID, ns.CLIReadAffinityMapOptions, nodeLabels,
|
||||
)
|
||||
if err != nil {
|
||||
assert.Fail(t, err.Error())
|
||||
}
|
||||
|
||||
assert.Equal(t, tc.want, readAffinityMapOptions)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
@ -295,7 +295,7 @@ func parseMapOptions(mapOptions string) (string, string, error) {
|
||||
|
||||
// getMapOptions is a wrapper func, calls parse map/unmap funcs and feeds the
|
||||
// rbdVolume object.
|
||||
func getMapOptions(req *csi.NodeStageVolumeRequest, rv *rbdVolume) error {
|
||||
func (ns *NodeServer) getMapOptions(req *csi.NodeStageVolumeRequest, rv *rbdVolume) error {
|
||||
krbdMapOptions, nbdMapOptions, err := parseMapOptions(req.GetVolumeContext()["mapOptions"])
|
||||
if err != nil {
|
||||
return err
|
||||
@ -312,6 +312,14 @@ func getMapOptions(req *csi.NodeStageVolumeRequest, rv *rbdVolume) error {
|
||||
rv.UnmapOptions = nbdUnmapOptions
|
||||
}
|
||||
|
||||
readAffinityMapOptions, err := util.GetReadAffinityMapOptions(
|
||||
rv.ClusterID, ns.CLIReadAffinityMapOptions, ns.NodeLabels,
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
rv.appendReadAffinityMapOptions(readAffinityMapOptions)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -23,20 +23,15 @@ import (
|
||||
)
|
||||
|
||||
// GetCrushLocationMap returns the crush location map, determined from
|
||||
// the crush location labels and their values from the CO system.
|
||||
// the crush location labels and their values from the node labels passed in arg.
|
||||
// Expects crushLocationLabels in arg to be in the format "[prefix/]<name>,[prefix/]<name>,...",.
|
||||
// Returns map of crush location types with its array of associated values.
|
||||
func GetCrushLocationMap(crushLocationLabels, nodeName string) (map[string]string, error) {
|
||||
func GetCrushLocationMap(crushLocationLabels string, nodeLabels map[string]string) map[string]string {
|
||||
if crushLocationLabels == "" {
|
||||
return nil, nil
|
||||
return nil
|
||||
}
|
||||
|
||||
nodeLabels, err := k8sGetNodeLabels(nodeName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return getCrushLocationMap(crushLocationLabels, nodeLabels), nil
|
||||
return getCrushLocationMap(crushLocationLabels, nodeLabels)
|
||||
}
|
||||
|
||||
// getCrushLocationMap returns the crush location map, determined from
|
||||
|
@ -62,6 +62,11 @@ type ClusterInfo struct {
|
||||
// symlink filepath for the network namespace where we need to execute commands.
|
||||
NetNamespaceFilePath string `json:"netNamespaceFilePath"`
|
||||
} `json:"nfs"`
|
||||
// Read affinity map options
|
||||
ReadAffinity struct {
|
||||
Enabled bool `json:"enabled"`
|
||||
CrushLocationLabels []string `json:"crushLocationLabels"`
|
||||
} `json:"readAffinity"`
|
||||
}
|
||||
|
||||
// Expected JSON structure in the passed in config file is,
|
||||
@ -203,3 +208,21 @@ func GetNFSNetNamespaceFilePath(pathToConfig, clusterID string) (string, error)
|
||||
|
||||
return cluster.NFS.NetNamespaceFilePath, nil
|
||||
}
|
||||
|
||||
// GetCrushLocationLabels returns the `readAffinity.enabled` and `readAffinity.crushLocationLabels`
|
||||
// values from the CSI config for the given `clusterID`. If `readAffinity.enabled` is set to true
|
||||
// it returns `true` and `crushLocationLabels`, else returns `false` and an empty string.
|
||||
func GetCrushLocationLabels(pathToConfig, clusterID string) (bool, string, error) {
|
||||
cluster, err := readClusterInfo(pathToConfig, clusterID)
|
||||
if err != nil {
|
||||
return false, "", err
|
||||
}
|
||||
|
||||
if !cluster.ReadAffinity.Enabled {
|
||||
return false, "", nil
|
||||
}
|
||||
|
||||
crushLocationLabels := strings.Join(cluster.ReadAffinity.CrushLocationLabels, ",")
|
||||
|
||||
return true, crushLocationLabels, nil
|
||||
}
|
||||
|
@ -365,3 +365,116 @@ func TestGetNFSNetNamespaceFilePath(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestGetReadAffinityOptions(t *testing.T) {
|
||||
t.Parallel()
|
||||
tests := []struct {
|
||||
name string
|
||||
clusterID string
|
||||
want struct {
|
||||
enabled bool
|
||||
labels string
|
||||
}
|
||||
}{
|
||||
{
|
||||
name: "ReadAffinity enabled set to true for cluster-1",
|
||||
clusterID: "cluster-1",
|
||||
want: struct {
|
||||
enabled bool
|
||||
labels string
|
||||
}{true, "topology.kubernetes.io/region,topology.kubernetes.io/zone,topology.io/rack"},
|
||||
},
|
||||
{
|
||||
name: "ReadAffinity enabled set to true for cluster-2",
|
||||
clusterID: "cluster-2",
|
||||
want: struct {
|
||||
enabled bool
|
||||
labels string
|
||||
}{true, "topology.kubernetes.io/region"},
|
||||
},
|
||||
{
|
||||
name: "ReadAffinity enabled set to false for cluster-3",
|
||||
clusterID: "cluster-3",
|
||||
want: struct {
|
||||
enabled bool
|
||||
labels string
|
||||
}{false, ""},
|
||||
},
|
||||
{
|
||||
name: "ReadAffinity option not set in cluster-4",
|
||||
clusterID: "cluster-4",
|
||||
want: struct {
|
||||
enabled bool
|
||||
labels string
|
||||
}{false, ""},
|
||||
},
|
||||
}
|
||||
|
||||
csiConfig := []ClusterInfo{
|
||||
{
|
||||
ClusterID: "cluster-1",
|
||||
ReadAffinity: struct {
|
||||
Enabled bool `json:"enabled"`
|
||||
CrushLocationLabels []string `json:"crushLocationLabels"`
|
||||
}{
|
||||
Enabled: true,
|
||||
CrushLocationLabels: []string{
|
||||
"topology.kubernetes.io/region",
|
||||
"topology.kubernetes.io/zone",
|
||||
"topology.io/rack",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
ClusterID: "cluster-2",
|
||||
ReadAffinity: struct {
|
||||
Enabled bool `json:"enabled"`
|
||||
CrushLocationLabels []string `json:"crushLocationLabels"`
|
||||
}{
|
||||
Enabled: true,
|
||||
CrushLocationLabels: []string{
|
||||
"topology.kubernetes.io/region",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
ClusterID: "cluster-3",
|
||||
ReadAffinity: struct {
|
||||
Enabled bool `json:"enabled"`
|
||||
CrushLocationLabels []string `json:"crushLocationLabels"`
|
||||
}{
|
||||
Enabled: false,
|
||||
CrushLocationLabels: []string{
|
||||
"topology.io/rack",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
ClusterID: "cluster-4",
|
||||
},
|
||||
}
|
||||
csiConfigFileContent, err := json.Marshal(csiConfig)
|
||||
if err != nil {
|
||||
t.Errorf("failed to marshal csi config info %v", err)
|
||||
}
|
||||
tmpConfPath := t.TempDir() + "/ceph-csi.json"
|
||||
err = os.WriteFile(tmpConfPath, csiConfigFileContent, 0o600)
|
||||
if err != nil {
|
||||
t.Errorf("failed to write %s file content: %v", CsiConfigFile, err)
|
||||
}
|
||||
for _, tt := range tests {
|
||||
tc := tt
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
enabled, labels, err := GetCrushLocationLabels(tmpConfPath, tc.clusterID)
|
||||
if err != nil {
|
||||
t.Errorf("GetCrushLocationLabels() error = %v", err)
|
||||
|
||||
return
|
||||
}
|
||||
if enabled != tc.want.enabled || labels != tc.want.labels {
|
||||
t.Errorf("GetCrushLocationLabels() = {%v %v} want %v", enabled, labels, tc.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user