diff --git a/cmd/cephcsi.go b/cmd/cephcsi.go index c0a657a0c..12a09259a 100644 --- a/cmd/cephcsi.go +++ b/cmd/cephcsi.go @@ -79,8 +79,15 @@ func init() { &conf.DomainLabels, "domainlabels", "", - "list of kubernetes node labels, that determines the topology"+ + "list of Kubernetes node labels, that determines the topology"+ " domain the node belongs to, separated by ','") + flag.BoolVar(&conf.EnableReadAffinity, "enable-read-affinity", false, "enable read affinity") + flag.StringVar( + &conf.CrushLocationLabels, + "crush-location-labels", + "", + "list of Kubernetes node labels, that determines the"+ + " CRUSH location the node belongs to, separated by ','") // cephfs related flags flag.BoolVar( diff --git a/internal/rbd/driver/driver.go b/internal/rbd/driver/driver.go index 76c2dcc84..5ae7c45ac 100644 --- a/internal/rbd/driver/driver.go +++ b/internal/rbd/driver/driver.go @@ -71,11 +71,19 @@ func NewReplicationServer(c *rbd.ControllerServer) *rbd.ReplicationServer { } // NewNodeServer initialize a node server for rbd CSI driver. -func NewNodeServer(d *csicommon.CSIDriver, t string, topology map[string]string) (*rbd.NodeServer, error) { - return &rbd.NodeServer{ +func NewNodeServer( + d *csicommon.CSIDriver, + t string, + topology map[string]string, + crushLocationMap map[string]string, +) (*rbd.NodeServer, error) { + ns := rbd.NodeServer{ DefaultNodeServer: csicommon.NewDefaultNodeServer(d, t, topology), VolumeLocks: util.NewVolumeLocks(), - }, nil + } + ns.SetReadAffinityMapOptions(crushLocationMap) + + return &ns, nil } // Run start a non-blocking grpc controller,node and identityserver for @@ -84,9 +92,10 @@ func NewNodeServer(d *csicommon.CSIDriver, t string, topology map[string]string) // This also configures and starts a new CSI-Addons service, by calling // setupCSIAddonsServer(). 
func (r *Driver) Run(conf *util.Config) { - var err error - var topology map[string]string - + var ( + err error + topology, crushLocationMap map[string]string + ) // update clone soft and hard limit rbd.SetGlobalInt("rbdHardMaxCloneDepth", conf.RbdHardMaxCloneDepth) rbd.SetGlobalInt("rbdSoftMaxCloneDepth", conf.RbdSoftMaxCloneDepth) @@ -128,6 +137,13 @@ func (r *Driver) Run(conf *util.Config) { }) } + if conf.EnableReadAffinity { + crushLocationMap, err = util.GetCrushLocationMap(conf.CrushLocationLabels, conf.NodeID) + if err != nil { + log.FatalLogMsg(err.Error()) + } + } + // Create GRPC servers r.ids = NewIdentityServer(r.cd) @@ -136,7 +152,7 @@ func (r *Driver) Run(conf *util.Config) { if err != nil { log.FatalLogMsg(err.Error()) } - r.ns, err = NewNodeServer(r.cd, conf.Vtype, topology) + r.ns, err = NewNodeServer(r.cd, conf.Vtype, topology, crushLocationMap) if err != nil { log.FatalLogMsg("failed to start node server, err %v\n", err) } @@ -163,17 +179,6 @@ func (r *Driver) Run(conf *util.Config) { "and replaced by CSI-Addons, see https://github.com/ceph/ceph-csi/issues/3314 for more details") r.rs = NewReplicationServer(r.cs) } - if !conf.IsControllerServer && !conf.IsNodeServer { - topology, err = util.GetTopologyFromDomainLabels(conf.DomainLabels, conf.NodeID, conf.DriverName) - if err != nil { - log.FatalLogMsg(err.Error()) - } - r.ns, err = NewNodeServer(r.cd, conf.Vtype, topology) - if err != nil { - log.FatalLogMsg("failed to start node server, err %v\n", err) - } - r.cs = NewControllerServer(r.cd) - } s := csicommon.NewNonBlockingGRPCServer() srv := csicommon.Servers{ diff --git a/internal/rbd/nodeserver.go b/internal/rbd/nodeserver.go index b390f5c01..359b3fdf0 100644 --- a/internal/rbd/nodeserver.go +++ b/internal/rbd/nodeserver.go @@ -45,6 +45,8 @@ type NodeServer struct { // A map storing all volumes with ongoing operations so that additional operations // for that same volume (as defined by VolumeID) return an Aborted error VolumeLocks 
*util.VolumeLocks + // readAffinityMapOptions contains map options to enable read affinity. + readAffinityMapOptions string } // stageTransaction struct represents the state a transaction was when it either completed @@ -143,7 +145,7 @@ func healerStageTransaction(ctx context.Context, cr *util.Credentials, volOps *r // this function also receive the credentials and secrets args as it differs in its data. // The credentials are used directly by functions like voljournal.Connect() and other functions // like genVolFromVolumeOptions() make use of secrets. -func populateRbdVol( +func (ns *NodeServer) populateRbdVol( ctx context.Context, req *csi.NodeStageVolumeRequest, cr *util.Credentials, @@ -250,6 +252,7 @@ func populateRbdVol( if err != nil { return nil, err } + ns.appendReadAffinityMapOptions(rv) rv.VolID = volID @@ -265,6 +268,19 @@ func populateRbdVol( return rv, err } +// appendReadAffinityMapOptions appends readAffinityMapOptions to mapOptions +// if mounter is rbdDefaultMounter and readAffinityMapOptions is not empty. +func (ns *NodeServer) appendReadAffinityMapOptions(rv *rbdVolume) { + switch { + case ns.readAffinityMapOptions == "" || rv.Mounter != rbdDefaultMounter: + return + case rv.MapOptions != "": + rv.MapOptions += "," + ns.readAffinityMapOptions + default: + rv.MapOptions = ns.readAffinityMapOptions + } +} + // NodeStageVolume mounts the volume to a staging path on the node. 
// Implementation notes: // - stagingTargetPath is the directory passed in the request where the volume needs to be staged @@ -318,7 +334,7 @@ func (ns *NodeServer) NodeStageVolume( } isStaticVol := parseBoolOption(ctx, req.GetVolumeContext(), staticVol, false) - rv, err := populateRbdVol(ctx, req, cr) + rv, err := ns.populateRbdVol(ctx, req, cr) if err != nil { return nil, err } @@ -1349,3 +1365,22 @@ func getDeviceSize(ctx context.Context, devicePath string) (uint64, error) { return size, nil } + +func (ns *NodeServer) SetReadAffinityMapOptions(crushLocationMap map[string]string) { + if len(crushLocationMap) == 0 { + return + } + + var b strings.Builder + b.WriteString("read_from_replica=localize,crush_location=") + first := true + for key, val := range crushLocationMap { + if first { + b.WriteString(fmt.Sprintf("%s:%s", key, val)) + first = false + } else { + b.WriteString(fmt.Sprintf("|%s:%s", key, val)) + } + } + ns.readAffinityMapOptions = b.String() +} diff --git a/internal/rbd/nodeserver_test.go b/internal/rbd/nodeserver_test.go index 871a6ec76..822232c6f 100644 --- a/internal/rbd/nodeserver_test.go +++ b/internal/rbd/nodeserver_test.go @@ -21,6 +21,7 @@ import ( "testing" "github.com/container-storage-interface/spec/lib/go/csi" + "github.com/stretchr/testify/assert" ) func TestGetStagingPath(t *testing.T) { @@ -105,3 +106,141 @@ func TestParseBoolOption(t *testing.T) { } } } + +func TestNodeServer_SetReadAffinityMapOptions(t *testing.T) { + t.Parallel() + tests := []struct { + name string + crushLocationmap map[string]string + wantAny []string + }{ + { + name: "nil crushLocationmap", + crushLocationmap: nil, + wantAny: []string{""}, + }, + { + name: "empty crushLocationmap", + crushLocationmap: map[string]string{}, + wantAny: []string{""}, + }, + { + name: "single entry in crushLocationmap", + crushLocationmap: map[string]string{ + "region": "east", + }, + wantAny: []string{"read_from_replica=localize,crush_location=region:east"}, + }, + { + name: 
"multiple entries in crushLocationmap", + crushLocationmap: map[string]string{ + "region": "east", + "zone": "east-1", + }, + wantAny: []string{ + "read_from_replica=localize,crush_location=region:east|zone:east-1", + "read_from_replica=localize,crush_location=zone:east-1|region:east", + }, + }, + } + for _, tt := range tests { + currentTT := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + ns := &NodeServer{} + ns.SetReadAffinityMapOptions(currentTT.crushLocationmap) + assert.Contains(t, currentTT.wantAny, ns.readAffinityMapOptions) + }) + } +} + +func TestNodeServer_appendReadAffinityMapOptions(t *testing.T) { + t.Parallel() + type input struct { + mapOptions, readAffinityMapOptions, mounter string + } + tests := []struct { + name string + args input + want string + }{ + { + name: "both empty mapOptions and crushLocationMap", + args: input{ + mapOptions: "", + readAffinityMapOptions: "", + mounter: rbdDefaultMounter, + }, + want: "", + }, + { + name: "empty mapOptions, filled crushLocationMap & default mounter", + args: input{ + mapOptions: "", + readAffinityMapOptions: "read_from_replica=localize,crush_location=region:west", + mounter: rbdDefaultMounter, + }, + want: "read_from_replica=localize,crush_location=region:west", + }, + { + name: "empty mapOptions, filled crushLocationMap & non-default mounter", + args: input{ + mapOptions: "", + readAffinityMapOptions: "read_from_replica=localize,crush_location=region:west", + mounter: rbdNbdMounter, + }, + want: "", + }, + { + name: "filled mapOptions, filled crushLocationMap & default mounter", + args: input{ + mapOptions: "notrim", + readAffinityMapOptions: "read_from_replica=localize,crush_location=region:west", + mounter: rbdDefaultMounter, + }, + want: "notrim,read_from_replica=localize,crush_location=region:west", + }, + { + name: "filled mapOptions, filled crushLocationMap & non-default mounter", + args: input{ + mapOptions: "notrim", + readAffinityMapOptions: 
"read_from_replica=localize,crush_location=region:west", + mounter: rbdNbdMounter, + }, + want: "notrim", + }, + { + name: "filled mapOptions, empty readAffinityMapOptions & default mounter", + args: input{ + mapOptions: "notrim", + readAffinityMapOptions: "", + mounter: rbdDefaultMounter, + }, + want: "notrim", + }, + { + name: "filled mapOptions, empty readAffinityMapOptions & non-default mounter", + args: input{ + mapOptions: "notrim", + readAffinityMapOptions: "", + mounter: rbdNbdMounter, + }, + want: "notrim", + }, + } + for _, tt := range tests { + currentTT := tt + t.Run(currentTT.name, func(t *testing.T) { + t.Parallel() + rv := &rbdVolume{ + MapOptions: currentTT.args.mapOptions, + Mounter: currentTT.args.mounter, + } + ns := &NodeServer{ + readAffinityMapOptions: currentTT.args.readAffinityMapOptions, + } + ns.appendReadAffinityMapOptions(rv) + assert.Equal(t, currentTT.want, rv.MapOptions) + }) + } +} diff --git a/internal/util/crushlocation.go b/internal/util/crushlocation.go new file mode 100644 index 000000000..f7cfc6302 --- /dev/null +++ b/internal/util/crushlocation.go @@ -0,0 +1,77 @@ +/* +Copyright 2023 The Ceph-CSI Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "strings" + + "github.com/ceph/ceph-csi/internal/util/log" +) + +// GetCrushLocationMap returns the crush location map, determined from +// the crush location labels and their values from the CO system. 
+// Expects crushLocationLabels in arg to be in the format "[prefix/]name,[prefix/]name,...". +// Returns a map of CRUSH location type to its associated value. +func GetCrushLocationMap(crushLocationLabels, nodeName string) (map[string]string, error) { + if crushLocationLabels == "" { + return nil, nil + } + + nodeLabels, err := k8sGetNodeLabels(nodeName) + if err != nil { + return nil, err + } + + return getCrushLocationMap(crushLocationLabels, nodeLabels), nil +} + +// getCrushLocationMap returns the crush location map, determined from +// the crush location labels and node labels. +func getCrushLocationMap(crushLocationLabels string, nodeLabels map[string]string) map[string]string { + labelsToRead := strings.Split(crushLocationLabels, labelSeparator) + log.DefaultLog("CRUSH location labels passed for processing: %+v", labelsToRead) + + labelsIn := make(map[string]bool, len(labelsToRead)) + for _, label := range labelsToRead { + labelsIn[label] = true + } + + // Determine values for requested labels from node labels + crushLocationMap := make(map[string]string, len(labelsIn)) + for key, value := range nodeLabels { + if _, ok := labelsIn[key]; !ok { + continue + } + // label found: split off the name component and store the value + nameIdx := strings.IndexRune(key, keySeparator) + crushLocationType := strings.TrimSpace(key[nameIdx+1:]) + if crushLocationType == "hostname" { + // ceph defaults to "host" while Kubernetes uses "hostname" as key. + crushLocationType = "host" + } + // replace "." with "-" to satisfy ceph crush map. 
+ value = strings.Replace(strings.TrimSpace(value), ".", "-", -1) + crushLocationMap[crushLocationType] = value + } + + if len(crushLocationMap) == 0 { + return nil + } + log.DefaultLog("list of CRUSH location processed: %+v", crushLocationMap) + + return crushLocationMap +} diff --git a/internal/util/crushlocation_test.go b/internal/util/crushlocation_test.go new file mode 100644 index 000000000..0e06cefd1 --- /dev/null +++ b/internal/util/crushlocation_test.go @@ -0,0 +1,113 @@ +/* +Copyright 2023 The Ceph-CSI Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package util + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func Test_getCrushLocationMap(t *testing.T) { + t.Parallel() + type input struct { + crushLocationLabels string + nodeLabels map[string]string + } + tests := []struct { + name string + args input + want map[string]string + }{ + { + name: "empty crushLocationLabels", + args: input{ + crushLocationLabels: "", + nodeLabels: map[string]string{}, + }, + want: nil, + }, + { + name: "empty nodeLabels", + args: input{ + crushLocationLabels: "topology.io/zone,topology.io/rack", + nodeLabels: map[string]string{}, + }, + want: nil, + }, + { + name: "matching crushlocation and node labels", + args: input{ + crushLocationLabels: "topology.io/zone,topology.io/rack", + nodeLabels: map[string]string{ + "topology.io/zone": "zone1", + }, + }, + want: map[string]string{"zone": "zone1"}, + }, + { + name: "multuple matching crushlocation and node labels", + args: input{ + crushLocationLabels: "topology.io/zone,topology.io/rack", + nodeLabels: map[string]string{ + "topology.io/zone": "zone1", + "topology.io/rack": "rack1", + }, + }, + want: map[string]string{"zone": "zone1", "rack": "rack1"}, + }, + { + name: "no match between crushlocation and node labels", + args: input{ + crushLocationLabels: "topology.io/zone,topology.io/rack", + nodeLabels: map[string]string{ + "topology.io/region": "region1", + }, + }, + want: nil, + }, + { + name: "check crushlocation value replacement to satisfy ceph requirement", + args: input{ + crushLocationLabels: "topology.io/zone,topology.io/rack", + nodeLabels: map[string]string{ + "topology.io/zone": "south.east.1", + }, + }, + want: map[string]string{"zone": "south-east-1"}, + }, + { + name: "hostname key should be replaced with host", + args: input{ + crushLocationLabels: "topology.io/zone,topology.io/hostname", + nodeLabels: map[string]string{ + "topology.io/hostname": "worker-1", + }, + }, + want: map[string]string{"host": "worker-1"}, + }, + } + for _, tt := 
range tests { + currentTT := tt + t.Run(currentTT.name, func(t *testing.T) { + t.Parallel() + assert.Equal(t, + currentTT.want, + getCrushLocationMap(currentTT.args.crushLocationLabels, currentTT.args.nodeLabels)) + }) + } +} diff --git a/internal/util/util.go b/internal/util/util.go index 528b3f1e1..659b40320 100644 --- a/internal/util/util.go +++ b/internal/util/util.go @@ -147,6 +147,10 @@ type Config struct { // Cluster name ClusterName string + + // Read affinity related options + EnableReadAffinity bool // enable OSD read affinity. + CrushLocationLabels string // list of CRUSH location labels to read from the node. } // ValidateDriverName validates the driver name.