cephfs: add read affinity mount option

This commit makes use of crush location labels from node
labels to supply `crush_location` and `read_from_replica=localize`
options during mount. Using these options, cephfs
will be able to redirect reads to the closest OSD,
improving performance.

Signed-off-by: Praveen M <m.praveen@ibm.com>
This commit is contained in:
Praveen M 2023-11-17 11:59:00 +05:30 committed by mergify[bot]
parent 6446150e67
commit 4d466843b9
13 changed files with 104 additions and 38 deletions

View File

@ -63,6 +63,15 @@ spec:
# and pass the label names below, for CSI to consume and advertise # and pass the label names below, for CSI to consume and advertise
# its equivalent topology domain # its equivalent topology domain
# - "--domainlabels=failure-domain/region,failure-domain/zone" # - "--domainlabels=failure-domain/region,failure-domain/zone"
#
# Options to enable read affinity.
# If enabled Ceph CSI will fetch labels from kubernetes node and
# pass `read_from_replica=localize,crush_location=type:value` during
# CephFS mount command. refer:
# https://docs.ceph.com/en/latest/man/8/rbd/#kernel-rbd-krbd-options
# for more details.
# - "--enable-read-affinity=true"
# - "--crush-location-labels=topology.io/zone,topology.io/rack"
env: env:
- name: POD_IP - name: POD_IP
valueFrom: valueFrom:

View File

@ -10,6 +10,9 @@ apiVersion: rbac.authorization.k8s.io/v1
metadata: metadata:
name: cephfs-csi-nodeplugin name: cephfs-csi-nodeplugin
rules: rules:
- apiGroups: [""]
resources: ["nodes"]
verbs: ["get"]
- apiGroups: [""] - apiGroups: [""]
resources: ["secrets"] resources: ["secrets"]
verbs: ["get"] verbs: ["get"]

View File

@ -28,6 +28,7 @@ import (
hc "github.com/ceph/ceph-csi/internal/health-checker" hc "github.com/ceph/ceph-csi/internal/health-checker"
"github.com/ceph/ceph-csi/internal/journal" "github.com/ceph/ceph-csi/internal/journal"
"github.com/ceph/ceph-csi/internal/util" "github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/k8s"
"github.com/ceph/ceph-csi/internal/util/log" "github.com/ceph/ceph-csi/internal/util/log"
"github.com/container-storage-interface/spec/lib/go/csi" "github.com/container-storage-interface/spec/lib/go/csi"
@ -74,24 +75,29 @@ func NewControllerServer(d *csicommon.CSIDriver) *ControllerServer {
func NewNodeServer( func NewNodeServer(
d *csicommon.CSIDriver, d *csicommon.CSIDriver,
t string, t string,
topology map[string]string,
kernelMountOptions string, kernelMountOptions string,
fuseMountOptions string, fuseMountOptions string,
nodeLabels, topology, crushLocationMap map[string]string,
) *NodeServer { ) *NodeServer {
return &NodeServer{ cliReadAffinityMapOptions := util.ConstructReadAffinityMapOption(crushLocationMap)
DefaultNodeServer: csicommon.NewDefaultNodeServer(d, t, topology), ns := &NodeServer{
DefaultNodeServer: csicommon.NewDefaultNodeServer(d, t, cliReadAffinityMapOptions, topology, nodeLabels),
VolumeLocks: util.NewVolumeLocks(), VolumeLocks: util.NewVolumeLocks(),
kernelMountOptions: kernelMountOptions, kernelMountOptions: kernelMountOptions,
fuseMountOptions: fuseMountOptions, fuseMountOptions: fuseMountOptions,
healthChecker: hc.NewHealthCheckManager(), healthChecker: hc.NewHealthCheckManager(),
} }
return ns
} }
// Run start a non-blocking grpc controller,node and identityserver for // Run start a non-blocking grpc controller,node and identityserver for
// ceph CSI driver which can serve multiple parallel requests. // ceph CSI driver which can serve multiple parallel requests.
func (fs *Driver) Run(conf *util.Config) { func (fs *Driver) Run(conf *util.Config) {
var err error var (
var topology map[string]string err error
nodeLabels, topology, crushLocationMap map[string]string
)
// Configuration // Configuration
if err = mounter.LoadAvailableMounters(conf); err != nil { if err = mounter.LoadAvailableMounters(conf); err != nil {
@ -102,6 +108,18 @@ func (fs *Driver) Run(conf *util.Config) {
if conf.InstanceID != "" { if conf.InstanceID != "" {
CSIInstanceID = conf.InstanceID CSIInstanceID = conf.InstanceID
} }
if conf.IsNodeServer && k8s.RunsOnKubernetes() {
nodeLabels, err = k8s.GetNodeLabels(conf.NodeID)
if err != nil {
log.FatalLogMsg(err.Error())
}
}
if conf.EnableReadAffinity {
crushLocationMap = util.GetCrushLocationMap(conf.CrushLocationLabels, nodeLabels)
}
// Create an instance of the volume journal // Create an instance of the volume journal
store.VolJournal = journal.NewCSIVolumeJournalWithNamespace(CSIInstanceID, fsutil.RadosNamespace) store.VolJournal = journal.NewCSIVolumeJournalWithNamespace(CSIInstanceID, fsutil.RadosNamespace)
@ -138,7 +156,11 @@ func (fs *Driver) Run(conf *util.Config) {
if err != nil { if err != nil {
log.FatalLogMsg(err.Error()) log.FatalLogMsg(err.Error())
} }
fs.ns = NewNodeServer(fs.cd, conf.Vtype, topology, conf.KernelMountOptions, conf.FuseMountOptions) fs.ns = NewNodeServer(
fs.cd, conf.Vtype,
conf.KernelMountOptions, conf.FuseMountOptions,
nodeLabels, topology, crushLocationMap,
)
} }
if conf.IsControllerServer { if conf.IsControllerServer {
@ -151,7 +173,11 @@ func (fs *Driver) Run(conf *util.Config) {
if err != nil { if err != nil {
log.FatalLogMsg(err.Error()) log.FatalLogMsg(err.Error())
} }
fs.ns = NewNodeServer(fs.cd, conf.Vtype, topology, conf.KernelMountOptions, conf.FuseMountOptions) fs.ns = NewNodeServer(
fs.cd, conf.Vtype,
conf.KernelMountOptions, conf.FuseMountOptions,
nodeLabels, topology, crushLocationMap,
)
fs.cs = NewControllerServer(fs.cd) fs.cs = NewControllerServer(fs.cd)
} }

View File

@ -766,11 +766,12 @@ func (ns *NodeServer) setMountOptions(
csiConfigFile string, csiConfigFile string,
) error { ) error {
var ( var (
configuredMountOptions string configuredMountOptions string
kernelMountOptions string readAffinityMountOptions string
fuseMountOptions string kernelMountOptions string
mountOptions []string fuseMountOptions string
err error mountOptions []string
err error
) )
if m := volCap.GetMount(); m != nil { if m := volCap.GetMount(); m != nil {
mountOptions = m.GetMountFlags() mountOptions = m.GetMountFlags()
@ -781,6 +782,14 @@ func (ns *NodeServer) setMountOptions(
if err != nil { if err != nil {
return err return err
} }
// read affinity mount options
readAffinityMountOptions, err = util.GetReadAffinityMapOptions(
csiConfigFile, volOptions.ClusterID, ns.CLIReadAffinityOptions, ns.NodeLabels,
)
if err != nil {
return err
}
} }
switch mnt.(type) { switch mnt.(type) {
@ -799,6 +808,7 @@ func (ns *NodeServer) setMountOptions(
configuredMountOptions = kernelMountOptions configuredMountOptions = kernelMountOptions
} }
volOptions.KernelMountOptions = util.MountOptionsAdd(volOptions.KernelMountOptions, configuredMountOptions) volOptions.KernelMountOptions = util.MountOptionsAdd(volOptions.KernelMountOptions, configuredMountOptions)
volOptions.KernelMountOptions = util.MountOptionsAdd(volOptions.KernelMountOptions, readAffinityMountOptions)
volOptions.KernelMountOptions = util.MountOptionsAdd(volOptions.KernelMountOptions, mountOptions...) volOptions.KernelMountOptions = util.MountOptionsAdd(volOptions.KernelMountOptions, mountOptions...)
} }

View File

@ -26,6 +26,7 @@ import (
"github.com/ceph/ceph-csi/internal/cephfs/mounter" "github.com/ceph/ceph-csi/internal/cephfs/mounter"
"github.com/ceph/ceph-csi/internal/cephfs/store" "github.com/ceph/ceph-csi/internal/cephfs/store"
csicommon "github.com/ceph/ceph-csi/internal/csi-common"
"github.com/ceph/ceph-csi/internal/util" "github.com/ceph/ceph-csi/internal/util"
) )
@ -63,19 +64,19 @@ func Test_setMountOptions(t *testing.T) {
t.Logf("path = %s", tmpConfPath) t.Logf("path = %s", tmpConfPath)
err = os.WriteFile(tmpConfPath, csiConfigFileContent, 0o600) err = os.WriteFile(tmpConfPath, csiConfigFileContent, 0o600)
if err != nil { if err != nil {
t.Errorf("failed to write %s file content: %v", util.CsiConfigFile, err) t.Errorf("failed to write %s file content: %v", tmpConfPath, err)
} }
tests := []struct { tests := []struct {
name string name string
ns NodeServer ns *NodeServer
mnt mounter.VolumeMounter mnt mounter.VolumeMounter
volOptions *store.VolumeOptions volOptions *store.VolumeOptions
want string want string
}{ }{
{ {
name: "KernelMountOptions set in cluster-1 config and not set in CLI", name: "KernelMountOptions set in cluster-1 config and not set in CLI",
ns: NodeServer{}, ns: &NodeServer{},
mnt: mounter.VolumeMounter(&mounter.KernelMounter{}), mnt: mounter.VolumeMounter(&mounter.KernelMounter{}),
volOptions: &store.VolumeOptions{ volOptions: &store.VolumeOptions{
ClusterID: "cluster-1", ClusterID: "cluster-1",
@ -84,7 +85,7 @@ func Test_setMountOptions(t *testing.T) {
}, },
{ {
name: "FuseMountOptions set in cluster-1 config and not set in CLI", name: "FuseMountOptions set in cluster-1 config and not set in CLI",
ns: NodeServer{}, ns: &NodeServer{},
mnt: mounter.VolumeMounter(&mounter.FuseMounter{}), mnt: mounter.VolumeMounter(&mounter.FuseMounter{}),
volOptions: &store.VolumeOptions{ volOptions: &store.VolumeOptions{
ClusterID: "cluster-1", ClusterID: "cluster-1",
@ -93,7 +94,7 @@ func Test_setMountOptions(t *testing.T) {
}, },
{ {
name: "KernelMountOptions set in cluster-1 config and set in CLI", name: "KernelMountOptions set in cluster-1 config and set in CLI",
ns: NodeServer{ ns: &NodeServer{
kernelMountOptions: cliKernelMountOptions, kernelMountOptions: cliKernelMountOptions,
}, },
mnt: mounter.VolumeMounter(&mounter.KernelMounter{}), mnt: mounter.VolumeMounter(&mounter.KernelMounter{}),
@ -104,7 +105,7 @@ func Test_setMountOptions(t *testing.T) {
}, },
{ {
name: "FuseMountOptions not set in cluster-2 config and set in CLI", name: "FuseMountOptions not set in cluster-2 config and set in CLI",
ns: NodeServer{ ns: &NodeServer{
fuseMountOptions: cliFuseMountOptions, fuseMountOptions: cliFuseMountOptions,
}, },
mnt: mounter.VolumeMounter(&mounter.FuseMounter{}), mnt: mounter.VolumeMounter(&mounter.FuseMounter{}),
@ -115,7 +116,7 @@ func Test_setMountOptions(t *testing.T) {
}, },
{ {
name: "KernelMountOptions not set in cluster-2 config and set in CLI", name: "KernelMountOptions not set in cluster-2 config and set in CLI",
ns: NodeServer{ ns: &NodeServer{
kernelMountOptions: cliKernelMountOptions, kernelMountOptions: cliKernelMountOptions,
}, },
mnt: mounter.VolumeMounter(&mounter.KernelMounter{}), mnt: mounter.VolumeMounter(&mounter.KernelMounter{}),
@ -126,7 +127,7 @@ func Test_setMountOptions(t *testing.T) {
}, },
{ {
name: "FuseMountOptions not set in cluster-1 config and set in CLI", name: "FuseMountOptions not set in cluster-1 config and set in CLI",
ns: NodeServer{ ns: &NodeServer{
fuseMountOptions: cliFuseMountOptions, fuseMountOptions: cliFuseMountOptions,
}, },
mnt: mounter.VolumeMounter(&mounter.FuseMounter{}), mnt: mounter.VolumeMounter(&mounter.FuseMounter{}),
@ -146,6 +147,11 @@ func Test_setMountOptions(t *testing.T) {
t.Run(tc.name, func(t *testing.T) { t.Run(tc.name, func(t *testing.T) {
t.Parallel() t.Parallel()
driver := &csicommon.CSIDriver{}
tc.ns.DefaultNodeServer = csicommon.NewDefaultNodeServer(
driver, "cephfs", "", map[string]string{}, map[string]string{},
)
err := tc.ns.setMountOptions(tc.mnt, tc.volOptions, volCap, tmpConfPath) err := tc.ns.setMountOptions(tc.mnt, tc.volOptions, volCap, tmpConfPath)
if err != nil { if err != nil {
t.Errorf("setMountOptions() = %v", err) t.Errorf("setMountOptions() = %v", err)

View File

@ -31,6 +31,10 @@ type DefaultNodeServer struct {
Driver *CSIDriver Driver *CSIDriver
Type string Type string
Mounter mount.Interface Mounter mount.Interface
// NodeLabels stores the node labels
NodeLabels map[string]string
// CLIReadAffinityOptions contains map options passed through command line to enable read affinity.
CLIReadAffinityOptions string
} }
// NodeGetInfo returns node ID. // NodeGetInfo returns node ID.

View File

@ -55,13 +55,18 @@ func NewVolumeCapabilityAccessMode(mode csi.VolumeCapability_AccessMode_Mode) *c
} }
// NewDefaultNodeServer initializes default node server. // NewDefaultNodeServer initializes default node server.
func NewDefaultNodeServer(d *CSIDriver, t string, topology map[string]string) *DefaultNodeServer { func NewDefaultNodeServer(
d *CSIDriver, t, cliReadAffinityMapOptions string,
topology, nodeLabels map[string]string,
) *DefaultNodeServer {
d.topology = topology d.topology = topology
return &DefaultNodeServer{ return &DefaultNodeServer{
Driver: d, Driver: d,
Type: t, Type: t,
Mounter: mount.NewWithoutSystemd(""), Mounter: mount.NewWithoutSystemd(""),
NodeLabels: nodeLabels,
CLIReadAffinityOptions: cliReadAffinityMapOptions,
} }
} }

View File

@ -54,7 +54,7 @@ func NewNodeServer(
t string, t string,
) *NodeServer { ) *NodeServer {
return &NodeServer{ return &NodeServer{
DefaultNodeServer: *csicommon.NewDefaultNodeServer(d, t, map[string]string{}), DefaultNodeServer: *csicommon.NewDefaultNodeServer(d, t, "", map[string]string{}, map[string]string{}),
} }
} }

View File

@ -71,11 +71,10 @@ func NewNodeServer(
t string, t string,
nodeLabels, topology, crushLocationMap map[string]string, nodeLabels, topology, crushLocationMap map[string]string,
) (*rbd.NodeServer, error) { ) (*rbd.NodeServer, error) {
cliReadAffinityMapOptions := util.ConstructReadAffinityMapOption(crushLocationMap)
ns := rbd.NodeServer{ ns := rbd.NodeServer{
DefaultNodeServer: csicommon.NewDefaultNodeServer(d, t, topology), DefaultNodeServer: csicommon.NewDefaultNodeServer(d, t, cliReadAffinityMapOptions, topology, nodeLabels),
VolumeLocks: util.NewVolumeLocks(), VolumeLocks: util.NewVolumeLocks(),
NodeLabels: nodeLabels,
CLIReadAffinityMapOptions: util.ConstructReadAffinityMapOption(crushLocationMap),
} }
return &ns, nil return &ns, nil

View File

@ -45,10 +45,6 @@ type NodeServer struct {
// A map storing all volumes with ongoing operations so that additional operations // A map storing all volumes with ongoing operations so that additional operations
// for that same volume (as defined by VolumeID) return an Aborted error // for that same volume (as defined by VolumeID) return an Aborted error
VolumeLocks *util.VolumeLocks VolumeLocks *util.VolumeLocks
// NodeLabels stores the node labels
NodeLabels map[string]string
// CLIReadAffinityMapOptions contains map options passed through command line to enable read affinity.
CLIReadAffinityMapOptions string
} }
// stageTransaction struct represents the state a transaction was when it either completed // stageTransaction struct represents the state a transaction was when it either completed

View File

@ -22,6 +22,7 @@ import (
"os" "os"
"testing" "testing"
csicommon "github.com/ceph/ceph-csi/internal/csi-common"
"github.com/ceph/ceph-csi/internal/util" "github.com/ceph/ceph-csi/internal/util"
"github.com/container-storage-interface/spec/lib/go/csi" "github.com/container-storage-interface/spec/lib/go/csi"
@ -206,6 +207,7 @@ func TestReadAffinity_GetReadAffinityMapOptions(t *testing.T) {
"topology.kubernetes.io/zone": "east-1", "topology.kubernetes.io/zone": "east-1",
"topology.kubernetes.io/region": "east", "topology.kubernetes.io/region": "east",
} }
topology := map[string]string{}
csiConfig := []util.ClusterInfo{ csiConfig := []util.ClusterInfo{
{ {
@ -304,11 +306,16 @@ func TestReadAffinity_GetReadAffinityMapOptions(t *testing.T) {
t.Run(tc.name, func(t *testing.T) { t.Run(tc.name, func(t *testing.T) {
t.Parallel() t.Parallel()
crushLocationMap := util.GetCrushLocationMap(tc.CLICrushLocationLabels, nodeLabels) crushLocationMap := util.GetCrushLocationMap(tc.CLICrushLocationLabels, nodeLabels)
cliReadAffinityMapOptions := util.ConstructReadAffinityMapOption(crushLocationMap)
driver := &csicommon.CSIDriver{}
ns := &NodeServer{ ns := &NodeServer{
CLIReadAffinityMapOptions: util.ConstructReadAffinityMapOption(crushLocationMap), DefaultNodeServer: csicommon.NewDefaultNodeServer(
driver, "rbd", cliReadAffinityMapOptions, topology, nodeLabels,
),
} }
readAffinityMapOptions, err := util.GetReadAffinityMapOptions( readAffinityMapOptions, err := util.GetReadAffinityMapOptions(
tc.clusterID, ns.CLIReadAffinityMapOptions, nodeLabels, tmpConfPath, tc.clusterID, ns.CLIReadAffinityOptions, nodeLabels,
) )
if err != nil { if err != nil {
assert.Fail(t, err.Error()) assert.Fail(t, err.Error())

View File

@ -313,7 +313,7 @@ func (ns *NodeServer) getMapOptions(req *csi.NodeStageVolumeRequest, rv *rbdVolu
} }
readAffinityMapOptions, err := util.GetReadAffinityMapOptions( readAffinityMapOptions, err := util.GetReadAffinityMapOptions(
rv.ClusterID, ns.CLIReadAffinityMapOptions, ns.NodeLabels, util.CsiConfigFile, rv.ClusterID, ns.CLIReadAffinityOptions, ns.NodeLabels,
) )
if err != nil { if err != nil {
return err return err

View File

@ -48,7 +48,8 @@ func ConstructReadAffinityMapOption(crushLocationMap map[string]string) string {
// If not, it falls back to returning the `cliReadAffinityMapOptions` from the command line. // If not, it falls back to returning the `cliReadAffinityMapOptions` from the command line.
// If neither of these options is available, it returns an empty string. // If neither of these options is available, it returns an empty string.
func GetReadAffinityMapOptions( func GetReadAffinityMapOptions(
clusterID, cliReadAffinityMapOptions string, nodeLabels map[string]string, csiConfigFile, clusterID, cliReadAffinityMapOptions string,
nodeLabels map[string]string,
) (string, error) { ) (string, error) {
var ( var (
err error err error
@ -56,7 +57,7 @@ func GetReadAffinityMapOptions(
configCrushLocationLabels string configCrushLocationLabels string
) )
configReadAffinityEnabled, configCrushLocationLabels, err = GetCrushLocationLabels(CsiConfigFile, clusterID) configReadAffinityEnabled, configCrushLocationLabels, err = GetCrushLocationLabels(csiConfigFile, clusterID)
if err != nil { if err != nil {
return "", err return "", err
} }