From 4d466843b94d88403df695e8d6daaa0c1516c05a Mon Sep 17 00:00:00 2001 From: Praveen M Date: Fri, 17 Nov 2023 11:59:00 +0530 Subject: [PATCH] cephfs: add read affinity mount option This commit makes use of crush location labels from node labels to supply `crush_location` and `read_from_replica=localize` options during mount. Using these options, cephfs will be able to redirect reads to the closest OSD, improving performance. Signed-off-by: Praveen M --- .../cephfs/kubernetes/csi-cephfsplugin.yaml | 9 +++++ .../kubernetes/csi-nodeplugin-rbac.yaml | 3 ++ internal/cephfs/driver.go | 40 +++++++++++++++---- internal/cephfs/nodeserver.go | 20 +++++++--- internal/cephfs/nodeserver_test.go | 22 ++++++---- internal/csi-common/nodeserver-default.go | 4 ++ internal/csi-common/utils.go | 13 ++++-- internal/nfs/nodeserver/nodeserver.go | 2 +- internal/rbd/driver/driver.go | 7 ++-- internal/rbd/nodeserver.go | 4 -- internal/rbd/nodeserver_test.go | 11 ++++- internal/rbd/rbd_attach.go | 2 +- internal/util/read_affinity.go | 5 ++- 13 files changed, 104 insertions(+), 38 deletions(-) diff --git a/deploy/cephfs/kubernetes/csi-cephfsplugin.yaml b/deploy/cephfs/kubernetes/csi-cephfsplugin.yaml index 8aa526ba4..a38ace99f 100644 --- a/deploy/cephfs/kubernetes/csi-cephfsplugin.yaml +++ b/deploy/cephfs/kubernetes/csi-cephfsplugin.yaml @@ -63,6 +63,15 @@ spec: # and pass the label names below, for CSI to consume and advertise # its equivalent topology domain # - "--domainlabels=failure-domain/region,failure-domain/zone" + # + # Options to enable read affinity. + # If enabled Ceph CSI will fetch labels from kubernetes node and + # pass `read_from_replica=localize,crush_location=type:value` during + # CephFS mount command. refer: + # https://docs.ceph.com/en/latest/man/8/rbd/#kernel-rbd-krbd-options + # for more details. + # - "--enable-read-affinity=true" + # - "--crush-location-labels=topology.io/zone,topology.io/rack" env: - name: POD_IP valueFrom: diff --git a/deploy/cephfs/kubernetes/csi-nodeplugin-rbac.yaml b/deploy/cephfs/kubernetes/csi-nodeplugin-rbac.yaml index c1833d044..a7d4b6bc6 100644 --- a/deploy/cephfs/kubernetes/csi-nodeplugin-rbac.yaml +++ b/deploy/cephfs/kubernetes/csi-nodeplugin-rbac.yaml @@ -10,6 +10,9 @@ apiVersion: rbac.authorization.k8s.io/v1 metadata: name: cephfs-csi-nodeplugin rules: + - apiGroups: [""] + resources: ["nodes"] + verbs: ["get"] - apiGroups: [""] resources: ["secrets"] verbs: ["get"] diff --git a/internal/cephfs/driver.go b/internal/cephfs/driver.go index 951b71456..9f1957fd6 100644 --- a/internal/cephfs/driver.go +++ b/internal/cephfs/driver.go @@ -28,6 +28,7 @@ import ( hc "github.com/ceph/ceph-csi/internal/health-checker" "github.com/ceph/ceph-csi/internal/journal" "github.com/ceph/ceph-csi/internal/util" + "github.com/ceph/ceph-csi/internal/util/k8s" "github.com/ceph/ceph-csi/internal/util/log" "github.com/container-storage-interface/spec/lib/go/csi" @@ -74,24 +75,29 @@ func NewControllerServer(d *csicommon.CSIDriver) *ControllerServer { func NewNodeServer( d *csicommon.CSIDriver, t string, - topology map[string]string, kernelMountOptions string, fuseMountOptions string, + nodeLabels, topology, crushLocationMap map[string]string, ) *NodeServer { - return &NodeServer{ - DefaultNodeServer: csicommon.NewDefaultNodeServer(d, t, topology), + cliReadAffinityMapOptions := util.ConstructReadAffinityMapOption(crushLocationMap) + ns := &NodeServer{ + DefaultNodeServer: csicommon.NewDefaultNodeServer(d, t, cliReadAffinityMapOptions, topology, nodeLabels), VolumeLocks: util.NewVolumeLocks(), kernelMountOptions: kernelMountOptions, fuseMountOptions: fuseMountOptions, healthChecker: hc.NewHealthCheckManager(), } + + return ns } // Run start a non-blocking grpc controller,node and identityserver for // ceph CSI driver which can serve multiple parallel requests. func (fs *Driver) Run(conf *util.Config) { - var err error - var topology map[string]string + var ( + err error + nodeLabels, topology, crushLocationMap map[string]string + ) // Configuration if err = mounter.LoadAvailableMounters(conf); err != nil { @@ -102,6 +108,18 @@ func (fs *Driver) Run(conf *util.Config) { if conf.InstanceID != "" { CSIInstanceID = conf.InstanceID } + + if conf.IsNodeServer && k8s.RunsOnKubernetes() { + nodeLabels, err = k8s.GetNodeLabels(conf.NodeID) + if err != nil { + log.FatalLogMsg(err.Error()) + } + } + + if conf.EnableReadAffinity { + crushLocationMap = util.GetCrushLocationMap(conf.CrushLocationLabels, nodeLabels) + } + // Create an instance of the volume journal store.VolJournal = journal.NewCSIVolumeJournalWithNamespace(CSIInstanceID, fsutil.RadosNamespace) @@ -138,7 +156,11 @@ func (fs *Driver) Run(conf *util.Config) { if err != nil { log.FatalLogMsg(err.Error()) } - fs.ns = NewNodeServer(fs.cd, conf.Vtype, topology, conf.KernelMountOptions, conf.FuseMountOptions) + fs.ns = NewNodeServer( + fs.cd, conf.Vtype, + conf.KernelMountOptions, conf.FuseMountOptions, + nodeLabels, topology, crushLocationMap, + ) } if conf.IsControllerServer { @@ -151,7 +173,11 @@ func (fs *Driver) Run(conf *util.Config) { if err != nil { log.FatalLogMsg(err.Error()) } - fs.ns = NewNodeServer(fs.cd, conf.Vtype, topology, conf.KernelMountOptions, conf.FuseMountOptions) + fs.ns = NewNodeServer( + fs.cd, conf.Vtype, + conf.KernelMountOptions, conf.FuseMountOptions, + nodeLabels, topology, crushLocationMap, + ) fs.cs = NewControllerServer(fs.cd) } diff --git a/internal/cephfs/nodeserver.go b/internal/cephfs/nodeserver.go index 5d353cc66..58b2e3deb 100644 --- a/internal/cephfs/nodeserver.go +++ b/internal/cephfs/nodeserver.go @@ -766,11 +766,12 @@ func (ns *NodeServer) setMountOptions( csiConfigFile string, ) error { var ( - configuredMountOptions string - kernelMountOptions string - fuseMountOptions string - mountOptions []string - err error + configuredMountOptions string + readAffinityMountOptions string + kernelMountOptions string + fuseMountOptions string + mountOptions []string + err error ) if m := volCap.GetMount(); m != nil { mountOptions = m.GetMountFlags() @@ -781,6 +782,14 @@ func (ns *NodeServer) setMountOptions( if err != nil { return err } + + // read affinity mount options + readAffinityMountOptions, err = util.GetReadAffinityMapOptions( + csiConfigFile, volOptions.ClusterID, ns.CLIReadAffinityOptions, ns.NodeLabels, + ) + if err != nil { + return err + } } switch mnt.(type) { @@ -799,6 +808,7 @@ func (ns *NodeServer) setMountOptions( configuredMountOptions = kernelMountOptions } volOptions.KernelMountOptions = util.MountOptionsAdd(volOptions.KernelMountOptions, configuredMountOptions) + volOptions.KernelMountOptions = util.MountOptionsAdd(volOptions.KernelMountOptions, readAffinityMountOptions) volOptions.KernelMountOptions = util.MountOptionsAdd(volOptions.KernelMountOptions, mountOptions...) } diff --git a/internal/cephfs/nodeserver_test.go b/internal/cephfs/nodeserver_test.go index e610d95a6..34095529e 100644 --- a/internal/cephfs/nodeserver_test.go +++ b/internal/cephfs/nodeserver_test.go @@ -26,6 +26,7 @@ import ( "github.com/ceph/ceph-csi/internal/cephfs/mounter" "github.com/ceph/ceph-csi/internal/cephfs/store" + csicommon "github.com/ceph/ceph-csi/internal/csi-common" "github.com/ceph/ceph-csi/internal/util" ) @@ -63,19 +64,19 @@ func Test_setMountOptions(t *testing.T) { t.Logf("path = %s", tmpConfPath) err = os.WriteFile(tmpConfPath, csiConfigFileContent, 0o600) if err != nil { - t.Errorf("failed to write %s file content: %v", util.CsiConfigFile, err) + t.Errorf("failed to write %s file content: %v", tmpConfPath, err) } tests := []struct { name string - ns NodeServer + ns *NodeServer mnt mounter.VolumeMounter volOptions *store.VolumeOptions want string }{ { name: "KernelMountOptions set in cluster-1 config and not set in CLI", - ns: NodeServer{}, + ns: &NodeServer{}, mnt: mounter.VolumeMounter(&mounter.KernelMounter{}), volOptions: &store.VolumeOptions{ ClusterID: "cluster-1", @@ -84,7 +85,7 @@ func Test_setMountOptions(t *testing.T) { }, { name: "FuseMountOptions set in cluster-1 config and not set in CLI", - ns: NodeServer{}, + ns: &NodeServer{}, mnt: mounter.VolumeMounter(&mounter.FuseMounter{}), volOptions: &store.VolumeOptions{ ClusterID: "cluster-1", @@ -93,7 +94,7 @@ func Test_setMountOptions(t *testing.T) { }, { name: "KernelMountOptions set in cluster-1 config and set in CLI", - ns: NodeServer{ + ns: &NodeServer{ kernelMountOptions: cliKernelMountOptions, }, mnt: mounter.VolumeMounter(&mounter.KernelMounter{}), @@ -104,7 +105,7 @@ func Test_setMountOptions(t *testing.T) { }, { name: "FuseMountOptions not set in cluster-2 config and set in CLI", - ns: NodeServer{ + ns: &NodeServer{ fuseMountOptions: cliFuseMountOptions, }, mnt: mounter.VolumeMounter(&mounter.FuseMounter{}), @@ -115,7 +116,7 @@ func Test_setMountOptions(t *testing.T) { }, { name: "KernelMountOptions not set in cluster-2 config and set in CLI", - ns: NodeServer{ + ns: &NodeServer{ kernelMountOptions: cliKernelMountOptions, }, mnt: mounter.VolumeMounter(&mounter.KernelMounter{}), @@ -126,7 +127,7 @@ func Test_setMountOptions(t *testing.T) { }, { name: "FuseMountOptions not set in cluster-1 config and set in CLI", - ns: NodeServer{ + ns: &NodeServer{ fuseMountOptions: cliFuseMountOptions, }, mnt: mounter.VolumeMounter(&mounter.FuseMounter{}), @@ -146,6 +147,11 @@ func Test_setMountOptions(t *testing.T) { t.Run(tc.name, func(t *testing.T) { t.Parallel() + driver := &csicommon.CSIDriver{} + tc.ns.DefaultNodeServer = csicommon.NewDefaultNodeServer( + driver, "cephfs", "", map[string]string{}, map[string]string{}, + ) + err := tc.ns.setMountOptions(tc.mnt, tc.volOptions, volCap, tmpConfPath) if err != nil { t.Errorf("setMountOptions() = %v", err) diff --git a/internal/csi-common/nodeserver-default.go b/internal/csi-common/nodeserver-default.go index 4e6be24b2..3d0bae93a 100644 --- a/internal/csi-common/nodeserver-default.go +++ b/internal/csi-common/nodeserver-default.go @@ -31,6 +31,10 @@ type DefaultNodeServer struct { Driver *CSIDriver Type string Mounter mount.Interface + // NodeLabels stores the node labels + NodeLabels map[string]string + // CLIReadAffinityOptions contains map options passed through command line to enable read affinity. + CLIReadAffinityOptions string } // NodeGetInfo returns node ID. diff --git a/internal/csi-common/utils.go b/internal/csi-common/utils.go index 03323af04..080f9df93 100644 --- a/internal/csi-common/utils.go +++ b/internal/csi-common/utils.go @@ -55,13 +55,18 @@ func NewVolumeCapabilityAccessMode(mode csi.VolumeCapability_AccessMode_Mode) *c } // NewDefaultNodeServer initializes default node server. -func NewDefaultNodeServer(d *CSIDriver, t string, topology map[string]string) *DefaultNodeServer { +func NewDefaultNodeServer( + d *CSIDriver, t, cliReadAffinityMapOptions string, + topology, nodeLabels map[string]string, +) *DefaultNodeServer { d.topology = topology return &DefaultNodeServer{ - Driver: d, - Type: t, - Mounter: mount.NewWithoutSystemd(""), + Driver: d, + Type: t, + Mounter: mount.NewWithoutSystemd(""), + NodeLabels: nodeLabels, + CLIReadAffinityOptions: cliReadAffinityMapOptions, } } diff --git a/internal/nfs/nodeserver/nodeserver.go b/internal/nfs/nodeserver/nodeserver.go index 7d49fdc39..19baaa80a 100644 --- a/internal/nfs/nodeserver/nodeserver.go +++ b/internal/nfs/nodeserver/nodeserver.go @@ -54,7 +54,7 @@ func NewNodeServer( t string, ) *NodeServer { return &NodeServer{ - DefaultNodeServer: *csicommon.NewDefaultNodeServer(d, t, map[string]string{}), + DefaultNodeServer: *csicommon.NewDefaultNodeServer(d, t, "", map[string]string{}, map[string]string{}), } } diff --git a/internal/rbd/driver/driver.go b/internal/rbd/driver/driver.go index eda9f03be..4d7061b8e 100644 --- a/internal/rbd/driver/driver.go +++ b/internal/rbd/driver/driver.go @@ -71,11 +71,10 @@ func NewNodeServer( t string, nodeLabels, topology, crushLocationMap map[string]string, ) (*rbd.NodeServer, error) { + cliReadAffinityMapOptions := util.ConstructReadAffinityMapOption(crushLocationMap) ns := rbd.NodeServer{ - DefaultNodeServer: csicommon.NewDefaultNodeServer(d, t, topology), - VolumeLocks: util.NewVolumeLocks(), - NodeLabels: nodeLabels, - CLIReadAffinityMapOptions: util.ConstructReadAffinityMapOption(crushLocationMap), + DefaultNodeServer: csicommon.NewDefaultNodeServer(d, t, cliReadAffinityMapOptions, topology, nodeLabels), + VolumeLocks: util.NewVolumeLocks(), } return &ns, nil diff --git a/internal/rbd/nodeserver.go b/internal/rbd/nodeserver.go index d7961e2bb..773557e0c 100644 --- a/internal/rbd/nodeserver.go +++ b/internal/rbd/nodeserver.go @@ -45,10 +45,6 @@ type NodeServer struct { // A map storing all volumes with ongoing operations so that additional operations // for that same volume (as defined by VolumeID) return an Aborted error VolumeLocks *util.VolumeLocks - // NodeLabels stores the node labels - NodeLabels map[string]string - // CLIReadAffinityMapOptions contains map options passed through command line to enable read affinity. - CLIReadAffinityMapOptions string } // stageTransaction struct represents the state a transaction was when it either completed diff --git a/internal/rbd/nodeserver_test.go b/internal/rbd/nodeserver_test.go index bffc114dc..59c2c274c 100644 --- a/internal/rbd/nodeserver_test.go +++ b/internal/rbd/nodeserver_test.go @@ -22,6 +22,7 @@ import ( "os" "testing" + csicommon "github.com/ceph/ceph-csi/internal/csi-common" "github.com/ceph/ceph-csi/internal/util" "github.com/container-storage-interface/spec/lib/go/csi" @@ -206,6 +207,7 @@ func TestReadAffinity_GetReadAffinityMapOptions(t *testing.T) { "topology.kubernetes.io/zone": "east-1", "topology.kubernetes.io/region": "east", } + topology := map[string]string{} csiConfig := []util.ClusterInfo{ { @@ -304,11 +306,16 @@ func TestReadAffinity_GetReadAffinityMapOptions(t *testing.T) { t.Run(tc.name, func(t *testing.T) { t.Parallel() crushLocationMap := util.GetCrushLocationMap(tc.CLICrushLocationLabels, nodeLabels) + cliReadAffinityMapOptions := util.ConstructReadAffinityMapOption(crushLocationMap) + driver := &csicommon.CSIDriver{} + ns := &NodeServer{ - CLIReadAffinityMapOptions: util.ConstructReadAffinityMapOption(crushLocationMap), + DefaultNodeServer: csicommon.NewDefaultNodeServer( + driver, "rbd", cliReadAffinityMapOptions, topology, nodeLabels, + ), } readAffinityMapOptions, err := util.GetReadAffinityMapOptions( - tc.clusterID, ns.CLIReadAffinityMapOptions, nodeLabels, + tmpConfPath, tc.clusterID, ns.CLIReadAffinityOptions, nodeLabels, ) if err != nil { assert.Fail(t, err.Error()) diff --git a/internal/rbd/rbd_attach.go b/internal/rbd/rbd_attach.go index 37d121d79..dde6a24d3 100644 --- a/internal/rbd/rbd_attach.go +++ b/internal/rbd/rbd_attach.go @@ -313,7 +313,7 @@ func (ns *NodeServer) getMapOptions(req *csi.NodeStageVolumeRequest, rv *rbdVolu } readAffinityMapOptions, err := util.GetReadAffinityMapOptions( - rv.ClusterID, ns.CLIReadAffinityMapOptions, ns.NodeLabels, + util.CsiConfigFile, rv.ClusterID, ns.CLIReadAffinityOptions, ns.NodeLabels, ) if err != nil { return err diff --git a/internal/util/read_affinity.go b/internal/util/read_affinity.go index a62620892..cf802b462 100644 --- a/internal/util/read_affinity.go +++ b/internal/util/read_affinity.go @@ -48,7 +48,8 @@ func ConstructReadAffinityMapOption(crushLocationMap map[string]string) string { // If not, it falls back to returning the `cliReadAffinityMapOptions` from the command line. // If neither of these options is available, it returns an empty string. func GetReadAffinityMapOptions( - clusterID, cliReadAffinityMapOptions string, nodeLabels map[string]string, + csiConfigFile, clusterID, cliReadAffinityMapOptions string, + nodeLabels map[string]string, ) (string, error) { var ( err error @@ -56,7 +57,7 @@ func GetReadAffinityMapOptions( configCrushLocationLabels string ) - configReadAffinityEnabled, configCrushLocationLabels, err = GetCrushLocationLabels(CsiConfigFile, clusterID) + configReadAffinityEnabled, configCrushLocationLabels, err = GetCrushLocationLabels(csiConfigFile, clusterID) if err != nil { return "", err }