mirror of
https://github.com/ceph/ceph-csi.git
synced 2025-01-17 10:19:30 +00:00
rbd: add capability to automatically enable read affinity
This commit makes use of crush location labels from node labels to supply `crush_location` and `read_from_replica=localize` options during rbd map cmd. Using these options, ceph will be able to redirect reads to the closest OSD, improving performance. Signed-off-by: Rakshith R <rar@redhat.com>
This commit is contained in:
parent
6e6cddb096
commit
95682522ee
@ -79,8 +79,15 @@ func init() {
|
||||
&conf.DomainLabels,
|
||||
"domainlabels",
|
||||
"",
|
||||
"list of kubernetes node labels, that determines the topology"+
|
||||
"list of Kubernetes node labels, that determines the topology"+
|
||||
" domain the node belongs to, separated by ','")
|
||||
flag.BoolVar(&conf.EnableReadAffinity, "enable-read-affinity", false, "enable read affinity")
|
||||
flag.StringVar(
|
||||
&conf.CrushLocationLabels,
|
||||
"crush-location-labels",
|
||||
"",
|
||||
"list of Kubernetes node labels, that determines the"+
|
||||
" CRUSH location the node belongs to, separated by ','")
|
||||
|
||||
// cephfs related flags
|
||||
flag.BoolVar(
|
||||
|
@ -71,11 +71,19 @@ func NewReplicationServer(c *rbd.ControllerServer) *rbd.ReplicationServer {
|
||||
}
|
||||
|
||||
// NewNodeServer initialize a node server for rbd CSI driver.
|
||||
func NewNodeServer(d *csicommon.CSIDriver, t string, topology map[string]string) (*rbd.NodeServer, error) {
|
||||
return &rbd.NodeServer{
|
||||
func NewNodeServer(
|
||||
d *csicommon.CSIDriver,
|
||||
t string,
|
||||
topology map[string]string,
|
||||
crushLocationMap map[string]string,
|
||||
) (*rbd.NodeServer, error) {
|
||||
ns := rbd.NodeServer{
|
||||
DefaultNodeServer: csicommon.NewDefaultNodeServer(d, t, topology),
|
||||
VolumeLocks: util.NewVolumeLocks(),
|
||||
}, nil
|
||||
}
|
||||
ns.SetReadAffinityMapOptions(crushLocationMap)
|
||||
|
||||
return &ns, nil
|
||||
}
|
||||
|
||||
// Run start a non-blocking grpc controller,node and identityserver for
|
||||
@ -84,9 +92,10 @@ func NewNodeServer(d *csicommon.CSIDriver, t string, topology map[string]string)
|
||||
// This also configures and starts a new CSI-Addons service, by calling
|
||||
// setupCSIAddonsServer().
|
||||
func (r *Driver) Run(conf *util.Config) {
|
||||
var err error
|
||||
var topology map[string]string
|
||||
|
||||
var (
|
||||
err error
|
||||
topology, crushLocationMap map[string]string
|
||||
)
|
||||
// update clone soft and hard limit
|
||||
rbd.SetGlobalInt("rbdHardMaxCloneDepth", conf.RbdHardMaxCloneDepth)
|
||||
rbd.SetGlobalInt("rbdSoftMaxCloneDepth", conf.RbdSoftMaxCloneDepth)
|
||||
@ -128,6 +137,13 @@ func (r *Driver) Run(conf *util.Config) {
|
||||
})
|
||||
}
|
||||
|
||||
if conf.EnableReadAffinity {
|
||||
crushLocationMap, err = util.GetCrushLocationMap(conf.CrushLocationLabels, conf.NodeID)
|
||||
if err != nil {
|
||||
log.FatalLogMsg(err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
// Create GRPC servers
|
||||
r.ids = NewIdentityServer(r.cd)
|
||||
|
||||
@ -136,7 +152,7 @@ func (r *Driver) Run(conf *util.Config) {
|
||||
if err != nil {
|
||||
log.FatalLogMsg(err.Error())
|
||||
}
|
||||
r.ns, err = NewNodeServer(r.cd, conf.Vtype, topology)
|
||||
r.ns, err = NewNodeServer(r.cd, conf.Vtype, topology, crushLocationMap)
|
||||
if err != nil {
|
||||
log.FatalLogMsg("failed to start node server, err %v\n", err)
|
||||
}
|
||||
@ -163,17 +179,6 @@ func (r *Driver) Run(conf *util.Config) {
|
||||
"and replaced by CSI-Addons, see https://github.com/ceph/ceph-csi/issues/3314 for more details")
|
||||
r.rs = NewReplicationServer(r.cs)
|
||||
}
|
||||
if !conf.IsControllerServer && !conf.IsNodeServer {
|
||||
topology, err = util.GetTopologyFromDomainLabels(conf.DomainLabels, conf.NodeID, conf.DriverName)
|
||||
if err != nil {
|
||||
log.FatalLogMsg(err.Error())
|
||||
}
|
||||
r.ns, err = NewNodeServer(r.cd, conf.Vtype, topology)
|
||||
if err != nil {
|
||||
log.FatalLogMsg("failed to start node server, err %v\n", err)
|
||||
}
|
||||
r.cs = NewControllerServer(r.cd)
|
||||
}
|
||||
|
||||
s := csicommon.NewNonBlockingGRPCServer()
|
||||
srv := csicommon.Servers{
|
||||
|
@ -45,6 +45,8 @@ type NodeServer struct {
|
||||
// A map storing all volumes with ongoing operations so that additional operations
|
||||
// for that same volume (as defined by VolumeID) return an Aborted error
|
||||
VolumeLocks *util.VolumeLocks
|
||||
// readAffinityMapOptions contains map options to enable read affinity.
|
||||
readAffinityMapOptions string
|
||||
}
|
||||
|
||||
// stageTransaction struct represents the state a transaction was when it either completed
|
||||
@ -143,7 +145,7 @@ func healerStageTransaction(ctx context.Context, cr *util.Credentials, volOps *r
|
||||
// this function also receive the credentials and secrets args as it differs in its data.
|
||||
// The credentials are used directly by functions like voljournal.Connect() and other functions
|
||||
// like genVolFromVolumeOptions() make use of secrets.
|
||||
func populateRbdVol(
|
||||
func (ns *NodeServer) populateRbdVol(
|
||||
ctx context.Context,
|
||||
req *csi.NodeStageVolumeRequest,
|
||||
cr *util.Credentials,
|
||||
@ -250,6 +252,7 @@ func populateRbdVol(
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ns.appendReadAffinityMapOptions(rv)
|
||||
|
||||
rv.VolID = volID
|
||||
|
||||
@ -265,6 +268,19 @@ func populateRbdVol(
|
||||
return rv, err
|
||||
}
|
||||
|
||||
// appendReadAffinityMapOptions appends readAffinityMapOptions to mapOptions
|
||||
// if mounter is rbdDefaultMounter and readAffinityMapOptions is not empty.
|
||||
func (ns NodeServer) appendReadAffinityMapOptions(rv *rbdVolume) {
|
||||
switch {
|
||||
case ns.readAffinityMapOptions == "" || rv.Mounter != rbdDefaultMounter:
|
||||
return
|
||||
case rv.MapOptions != "":
|
||||
rv.MapOptions += "," + ns.readAffinityMapOptions
|
||||
default:
|
||||
rv.MapOptions = ns.readAffinityMapOptions
|
||||
}
|
||||
}
|
||||
|
||||
// NodeStageVolume mounts the volume to a staging path on the node.
|
||||
// Implementation notes:
|
||||
// - stagingTargetPath is the directory passed in the request where the volume needs to be staged
|
||||
@ -318,7 +334,7 @@ func (ns *NodeServer) NodeStageVolume(
|
||||
}
|
||||
|
||||
isStaticVol := parseBoolOption(ctx, req.GetVolumeContext(), staticVol, false)
|
||||
rv, err := populateRbdVol(ctx, req, cr)
|
||||
rv, err := ns.populateRbdVol(ctx, req, cr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -1349,3 +1365,22 @@ func getDeviceSize(ctx context.Context, devicePath string) (uint64, error) {
|
||||
|
||||
return size, nil
|
||||
}
|
||||
|
||||
func (ns *NodeServer) SetReadAffinityMapOptions(crushLocationMap map[string]string) {
|
||||
if len(crushLocationMap) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
var b strings.Builder
|
||||
b.WriteString("read_from_replica=localize,crush_location=")
|
||||
first := true
|
||||
for key, val := range crushLocationMap {
|
||||
if first {
|
||||
b.WriteString(fmt.Sprintf("%s:%s", key, val))
|
||||
first = false
|
||||
} else {
|
||||
b.WriteString(fmt.Sprintf("|%s:%s", key, val))
|
||||
}
|
||||
}
|
||||
ns.readAffinityMapOptions = b.String()
|
||||
}
|
||||
|
@ -21,6 +21,7 @@ import (
|
||||
"testing"
|
||||
|
||||
"github.com/container-storage-interface/spec/lib/go/csi"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestGetStagingPath(t *testing.T) {
|
||||
@ -105,3 +106,141 @@ func TestParseBoolOption(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestNodeServer_SetReadAffinityMapOptions(t *testing.T) {
|
||||
t.Parallel()
|
||||
tests := []struct {
|
||||
name string
|
||||
crushLocationmap map[string]string
|
||||
wantAny []string
|
||||
}{
|
||||
{
|
||||
name: "nil crushLocationmap",
|
||||
crushLocationmap: nil,
|
||||
wantAny: []string{""},
|
||||
},
|
||||
{
|
||||
name: "empty crushLocationmap",
|
||||
crushLocationmap: map[string]string{},
|
||||
wantAny: []string{""},
|
||||
},
|
||||
{
|
||||
name: "single entry in crushLocationmap",
|
||||
crushLocationmap: map[string]string{
|
||||
"region": "east",
|
||||
},
|
||||
wantAny: []string{"read_from_replica=localize,crush_location=region:east"},
|
||||
},
|
||||
{
|
||||
name: "multiple entries in crushLocationmap",
|
||||
crushLocationmap: map[string]string{
|
||||
"region": "east",
|
||||
"zone": "east-1",
|
||||
},
|
||||
wantAny: []string{
|
||||
"read_from_replica=localize,crush_location=region:east|zone:east-1",
|
||||
"read_from_replica=localize,crush_location=zone:east-1|region:east",
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
currentTT := tt
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
ns := &NodeServer{}
|
||||
ns.SetReadAffinityMapOptions(currentTT.crushLocationmap)
|
||||
assert.Contains(t, currentTT.wantAny, ns.readAffinityMapOptions)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestNodeServer_appendReadAffinityMapOptions(t *testing.T) {
|
||||
t.Parallel()
|
||||
type input struct {
|
||||
mapOptions, readAffinityMapOptions, mounter string
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
args input
|
||||
want string
|
||||
}{
|
||||
{
|
||||
name: "both empty mapOptions and crushLocationMap",
|
||||
args: input{
|
||||
mapOptions: "",
|
||||
readAffinityMapOptions: "",
|
||||
mounter: rbdDefaultMounter,
|
||||
},
|
||||
want: "",
|
||||
},
|
||||
{
|
||||
name: "empty mapOptions, filled crushLocationMap & default mounter",
|
||||
args: input{
|
||||
mapOptions: "",
|
||||
readAffinityMapOptions: "read_from_replica=localize,crush_location=region:west",
|
||||
mounter: rbdDefaultMounter,
|
||||
},
|
||||
want: "read_from_replica=localize,crush_location=region:west",
|
||||
},
|
||||
{
|
||||
name: "empty mapOptions, filled crushLocationMap & non-default mounter",
|
||||
args: input{
|
||||
mapOptions: "",
|
||||
readAffinityMapOptions: "read_from_replica=localize,crush_location=region:west",
|
||||
mounter: rbdNbdMounter,
|
||||
},
|
||||
want: "",
|
||||
},
|
||||
{
|
||||
name: "filled mapOptions, filled crushLocationMap & default mounter",
|
||||
args: input{
|
||||
mapOptions: "notrim",
|
||||
readAffinityMapOptions: "read_from_replica=localize,crush_location=region:west",
|
||||
mounter: rbdDefaultMounter,
|
||||
},
|
||||
want: "notrim,read_from_replica=localize,crush_location=region:west",
|
||||
},
|
||||
{
|
||||
name: "filled mapOptions, filled crushLocationMap & non-default mounter",
|
||||
args: input{
|
||||
mapOptions: "notrim",
|
||||
readAffinityMapOptions: "read_from_replica=localize,crush_location=region:west",
|
||||
mounter: rbdNbdMounter,
|
||||
},
|
||||
want: "notrim",
|
||||
},
|
||||
{
|
||||
name: "filled mapOptions, empty readAffinityMapOptions & default mounter",
|
||||
args: input{
|
||||
mapOptions: "notrim",
|
||||
readAffinityMapOptions: "",
|
||||
mounter: rbdDefaultMounter,
|
||||
},
|
||||
want: "notrim",
|
||||
},
|
||||
{
|
||||
name: "filled mapOptions, empty readAffinityMapOptions & non-default mounter",
|
||||
args: input{
|
||||
mapOptions: "notrim",
|
||||
readAffinityMapOptions: "",
|
||||
mounter: rbdNbdMounter,
|
||||
},
|
||||
want: "notrim",
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
currentTT := tt
|
||||
t.Run(currentTT.name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
rv := &rbdVolume{
|
||||
MapOptions: currentTT.args.mapOptions,
|
||||
Mounter: currentTT.args.mounter,
|
||||
}
|
||||
ns := &NodeServer{
|
||||
readAffinityMapOptions: currentTT.args.readAffinityMapOptions,
|
||||
}
|
||||
ns.appendReadAffinityMapOptions(rv)
|
||||
assert.Equal(t, currentTT.want, rv.MapOptions)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
77
internal/util/crushlocation.go
Normal file
77
internal/util/crushlocation.go
Normal file
@ -0,0 +1,77 @@
|
||||
/*
|
||||
Copyright 2023 The Ceph-CSI Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package util
|
||||
|
||||
import (
|
||||
"strings"
|
||||
|
||||
"github.com/ceph/ceph-csi/internal/util/log"
|
||||
)
|
||||
|
||||
// GetCrushLocationMap returns the crush location map, determined from
|
||||
// the crush location labels and their values from the CO system.
|
||||
// Expects crushLocationLabels in arg to be in the format "[prefix/]<name>,[prefix/]<name>,...",.
|
||||
// Returns map of crush location types with its array of associated values.
|
||||
func GetCrushLocationMap(crushLocationLabels, nodeName string) (map[string]string, error) {
|
||||
if crushLocationLabels == "" {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
nodeLabels, err := k8sGetNodeLabels(nodeName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return getCrushLocationMap(crushLocationLabels, nodeLabels), nil
|
||||
}
|
||||
|
||||
// getCrushLocationMap returns the crush location map, determined from
|
||||
// the crush location labels and node labels.
|
||||
func getCrushLocationMap(crushLocationLabels string, nodeLabels map[string]string) map[string]string {
|
||||
labelsToRead := strings.Split(crushLocationLabels, labelSeparator)
|
||||
log.DefaultLog("CRUSH location labels passed for processing: %+v", labelsToRead)
|
||||
|
||||
labelsIn := make(map[string]bool, len(labelsToRead))
|
||||
for _, label := range labelsToRead {
|
||||
labelsIn[label] = true
|
||||
}
|
||||
|
||||
// Determine values for requested labels from node labels
|
||||
crushLocationMap := make(map[string]string, len(labelsIn))
|
||||
for key, value := range nodeLabels {
|
||||
if _, ok := labelsIn[key]; !ok {
|
||||
continue
|
||||
}
|
||||
// label found split name component and store value
|
||||
nameIdx := strings.IndexRune(key, keySeparator)
|
||||
crushLocationType := strings.TrimSpace(key[nameIdx+1:])
|
||||
if crushLocationType == "hostname" {
|
||||
// ceph defaults to "host" while Kubernetes uses "hostname" as key.
|
||||
crushLocationType = "host"
|
||||
}
|
||||
// replace "." with "-" to satisfy ceph crush map.
|
||||
value = strings.Replace(strings.TrimSpace(value), ".", "-", -1)
|
||||
crushLocationMap[crushLocationType] = value
|
||||
}
|
||||
|
||||
if len(crushLocationMap) == 0 {
|
||||
return nil
|
||||
}
|
||||
log.DefaultLog("list of CRUSH location processed: %+v", crushLocationMap)
|
||||
|
||||
return crushLocationMap
|
||||
}
|
113
internal/util/crushlocation_test.go
Normal file
113
internal/util/crushlocation_test.go
Normal file
@ -0,0 +1,113 @@
|
||||
/*
|
||||
Copyright 2023 The Ceph-CSI Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package util
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func Test_getCrushLocationMap(t *testing.T) {
|
||||
t.Parallel()
|
||||
type input struct {
|
||||
crushLocationLabels string
|
||||
nodeLabels map[string]string
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
args input
|
||||
want map[string]string
|
||||
}{
|
||||
{
|
||||
name: "empty crushLocationLabels",
|
||||
args: input{
|
||||
crushLocationLabels: "",
|
||||
nodeLabels: map[string]string{},
|
||||
},
|
||||
want: nil,
|
||||
},
|
||||
{
|
||||
name: "empty nodeLabels",
|
||||
args: input{
|
||||
crushLocationLabels: "topology.io/zone,topology.io/rack",
|
||||
nodeLabels: map[string]string{},
|
||||
},
|
||||
want: nil,
|
||||
},
|
||||
{
|
||||
name: "matching crushlocation and node labels",
|
||||
args: input{
|
||||
crushLocationLabels: "topology.io/zone,topology.io/rack",
|
||||
nodeLabels: map[string]string{
|
||||
"topology.io/zone": "zone1",
|
||||
},
|
||||
},
|
||||
want: map[string]string{"zone": "zone1"},
|
||||
},
|
||||
{
|
||||
name: "multuple matching crushlocation and node labels",
|
||||
args: input{
|
||||
crushLocationLabels: "topology.io/zone,topology.io/rack",
|
||||
nodeLabels: map[string]string{
|
||||
"topology.io/zone": "zone1",
|
||||
"topology.io/rack": "rack1",
|
||||
},
|
||||
},
|
||||
want: map[string]string{"zone": "zone1", "rack": "rack1"},
|
||||
},
|
||||
{
|
||||
name: "no match between crushlocation and node labels",
|
||||
args: input{
|
||||
crushLocationLabels: "topology.io/zone,topology.io/rack",
|
||||
nodeLabels: map[string]string{
|
||||
"topology.io/region": "region1",
|
||||
},
|
||||
},
|
||||
want: nil,
|
||||
},
|
||||
{
|
||||
name: "check crushlocation value replacement to satisfy ceph requirement",
|
||||
args: input{
|
||||
crushLocationLabels: "topology.io/zone,topology.io/rack",
|
||||
nodeLabels: map[string]string{
|
||||
"topology.io/zone": "south.east.1",
|
||||
},
|
||||
},
|
||||
want: map[string]string{"zone": "south-east-1"},
|
||||
},
|
||||
{
|
||||
name: "hostname key should be replaced with host",
|
||||
args: input{
|
||||
crushLocationLabels: "topology.io/zone,topology.io/hostname",
|
||||
nodeLabels: map[string]string{
|
||||
"topology.io/hostname": "worker-1",
|
||||
},
|
||||
},
|
||||
want: map[string]string{"host": "worker-1"},
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
currentTT := tt
|
||||
t.Run(currentTT.name, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
assert.Equal(t,
|
||||
currentTT.want,
|
||||
getCrushLocationMap(currentTT.args.crushLocationLabels, currentTT.args.nodeLabels))
|
||||
})
|
||||
}
|
||||
}
|
@ -147,6 +147,10 @@ type Config struct {
|
||||
|
||||
// Cluster name
|
||||
ClusterName string
|
||||
|
||||
// Read affinity related options
|
||||
EnableReadAffinity bool // enable OSD read affinity.
|
||||
CrushLocationLabels string // list of CRUSH location labels to read from the node.
|
||||
}
|
||||
|
||||
// ValidateDriverName validates the driver name.
|
||||
|
Loading…
Reference in New Issue
Block a user