Mirror of https://github.com/ceph/ceph-csi.git
Merge pull request #211 from red-hat-storage/sync_us--devel
Syncing latest changes from devel for ceph-csi
Commit 9f9c3fe375
.github/workflows/tickgit.yaml (new file, vendored, 18 lines)
@@ -0,0 +1,18 @@
---
name: List TODO's
# yamllint disable-line rule:truthy
on:
  push:
    branches:
      - devel

permissions:
  contents: read

jobs:
  tickgit:
    name: tickgit
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - run: make containerized-test TARGET=tickgit
Makefile (4 lines changed)
@@ -143,6 +143,10 @@ check-env:

codespell:
	codespell --config scripts/codespell.conf

+tickgit:
+	tickgit $(CURDIR)
+
#
# commitlint will do a rebase on top of GIT_SINCE when REBASE=1 is passed.
#
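For context on the new target: tickgit walks the git tree and lists TODO-style comments with blame data. A minimal, hypothetical Go fragment (not part of this change) showing the kind of marker the `make containerized-test TARGET=tickgit` job above surfaces:

```go
package example

// TODO(someone): illustrative marker only — comments in this form are what
// the tickgit run configured above finds and reports.
func placeholder() {}
```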
@@ -5,3 +5,8 @@

- Removed the deprecated grpc metrics flags in [PR](https://github.com/ceph/ceph-csi/pull/4225)

+## Features
+
+RBD
+
+- Support for configuring read affinity for individual clusters within the ceph-csi-config
+  ConfigMap in [PR](https://github.com/ceph/ceph-csi/pull/4165)
@@ -41,7 +41,7 @@ type SecurityContextConstraintsValues struct {
}

// SecurityContextConstraintsDefaults can be used for generating deployment
-// artifacts with defails values.
+// artifacts with details values.
var SecurityContextConstraintsDefaults = SecurityContextConstraintsValues{
	Namespace: "ceph-csi",
	Deployer:  "",
@@ -27,6 +27,11 @@ serviceAccounts:
#       - "<MONValue2>"
#     rbd:
#       netNamespaceFilePath: "{{ .kubeletDir }}/plugins/{{ .driverName }}/net"
+#     readAffinity:
+#       enabled: true
+#       crushLocationLabels:
+#         - topology.kubernetes.io/region
+#         - topology.kubernetes.io/zone
csiConfig: []

# Configuration details of clusterID, PoolID and FscID mapping
@@ -32,6 +32,10 @@ kind: ConfigMap
# path for the Ceph cluster identified by the <cluster-id>. This will be used
# by the RBD CSI plugin to execute the rbd map/unmap in the
# network namespace specified by the "rbd.netNamespaceFilePath".
+# The "readAffinity" fields are used to enable read affinity and pass the crush
+# location map for the Ceph cluster identified by the cluster <cluster-id>;
+# enabling this will add
+# "read_from_replica=localize,crush_location=<label:value>" to the map option.
# If a CSI plugin is using more than one Ceph cluster, repeat the section for
# each such cluster in use.
# NOTE: Changes to the configmap are automatically updated in the running pods,
@@ -66,6 +70,15 @@ data:
        }
        "nfs": {
          "netNamespaceFilePath": "<kubeletRootPath>/plugins/nfs.csi.ceph.com/net",
        },
+       "readAffinity": {
+         "enabled": "false",
+         "crushLocationLabels": [
+           "<Label1>",
+           "<Label2>"
+           ...
+           "<Label3>"
+         ]
+       }
      }
    ]
@@ -47,7 +47,7 @@ make image-cephcsi

| `--maxsnapshotsonimage` | `450` | Maximum number of snapshots allowed on rbd image without flattening |
| `--setmetadata` | `false` | Set metadata on volume |
| `--enable-read-affinity` | `false` | enable read affinity |
-| `--crush-location-labels`| _empty_ | Kubernetes node labels that determine the CRUSH location the node belongs to, separated by ',' |
+| `--crush-location-labels`| _empty_ | Kubernetes node labels that determine the CRUSH location the node belongs to, separated by ','.<br>`Note: These labels will be replaced if crush location labels are defined in the ceph-csi-config ConfigMap for the specific cluster.` |

**Available volume parameters:**
@@ -222,6 +222,12 @@ If enabled, this option will be added to all RBD volumes mapped by Ceph CSI.
Well known labels can be found
[here](https://kubernetes.io/docs/reference/labels-annotations-taints/).

+Read affinity can be configured for individual clusters within the
+`ceph-csi-config` ConfigMap. This allows configuring the crush location labels
+for each ceph cluster separately. The crush location labels specified in the
+ConfigMap will supersede those provided via the command line argument
+`--crush-location-labels`.
+
>Note: Label values will have all their dots `"."` normalized with dashes `"-"`
in order to work with the ceph CRUSH map.
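As an aside on the note above, a minimal standalone sketch of the dot-to-dash normalization it describes; `normalizeCrushValue` is a hypothetical helper, not the driver's code:

```go
package main

import (
	"fmt"
	"strings"
)

// normalizeCrushValue illustrates the documented behavior: dots in a label
// value are replaced with dashes so the value is usable in a Ceph CRUSH
// location map.
func normalizeCrushValue(v string) string {
	return strings.ReplaceAll(v, ".", "-")
}

func main() {
	fmt.Println(normalizeCrushValue("east.1")) // prints "east-1"
}
```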
@@ -63,6 +63,16 @@ func createConfigMap(pluginPath string, c kubernetes.Interface, f *framework.Framework
		}{
			RadosNamespace: radosNamespace,
		},
+		ReadAffinity: struct {
+			Enabled             bool     `json:"enabled"`
+			CrushLocationLabels []string `json:"crushLocationLabels"`
+		}{
+			Enabled: true,
+			CrushLocationLabels: []string{
+				crushLocationRegionLabel,
+				crushLocationZoneLabel,
+			},
+		},
	}}
	if upgradeTesting {
		subvolumegroup = "csi"
@@ -1226,7 +1226,7 @@ func validatePVCSnapshot(
			checkSumClone, chErrs[n] = calculateSHA512sum(f, &a, filePath, &opt)
			framework.Logf("checksum value for the clone is %s with pod name %s", checkSumClone, name)
			if chErrs[n] != nil {
-				framework.Logf("failed to calculte checksum for clone: %s", chErrs[n])
+				framework.Logf("failed to calculate checksum for clone: %s", chErrs[n])
			}
			if checkSumClone != checkSum {
				framework.Logf(
@@ -68,11 +68,10 @@ func (s *subVolumeClient) CreateCloneFromSubvolume(
		return err
	}

-	// if cloneErr is not nil we will delete the snapshot
-	var cloneErr error
-
	defer func() {
-		if cloneErr != nil {
+		// if any error occurs while cloning, resizing or deleting the snapshot
+		// fails then we need to delete the clone and snapshot.
+		if err != nil && !cerrors.IsCloneRetryError(err) {
			if err = s.PurgeVolume(ctx, true); err != nil {
				log.ErrorLog(ctx, "failed to delete volume %s: %v", s.VolID, err)
			}
@@ -81,18 +80,19 @@ func (s *subVolumeClient) CreateCloneFromSubvolume(
			}
		}
	}()
-	cloneErr = snapClient.CloneSnapshot(ctx, s.SubVolume)
-	if cloneErr != nil {
-		log.ErrorLog(ctx, "failed to clone snapshot %s %s to %s %v", parentvolOpt.VolID, snapshotID, s.VolID, cloneErr)
+	err = snapClient.CloneSnapshot(ctx, s.SubVolume)
+	if err != nil {
+		log.ErrorLog(ctx, "failed to clone snapshot %s %s to %s %v", parentvolOpt.VolID, snapshotID, s.VolID, err)

-		return cloneErr
+		return err
	}

-	cloneState, cloneErr := s.GetCloneState(ctx)
-	if cloneErr != nil {
-		log.ErrorLog(ctx, "failed to get clone state: %v", cloneErr)
+	var cloneState cephFSCloneState
+	cloneState, err = s.GetCloneState(ctx)
+	if err != nil {
+		log.ErrorLog(ctx, "failed to get clone state: %v", err)

-		return cloneErr
+		return err
	}

	err = cloneState.ToError()
@@ -157,8 +157,9 @@ func (s *subVolumeClient) CreateCloneFromSnapshot(
			}
		}
	}()

-	cloneState, err := s.GetCloneState(ctx)
+	var cloneState cephFSCloneState
+	// avoid err variable shadowing
+	cloneState, err = s.GetCloneState(ctx)
	if err != nil {
		log.ErrorLog(ctx, "failed to get clone state: %v", err)
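The `// avoid err variable shadowing` comment above is the crux of both hunks. A self-contained sketch of the pitfall (hypothetical `fetchState`, not driver code): inside a nested block, `state, err := fetchState()` would declare a new `err` that shadows the one a deferred cleanup watches, so the cleanup would never see the failure. Declaring the variable first and using plain assignment keeps a single `err`:

```go
package main

import "fmt"

func fetchState() (string, error) { return "", fmt.Errorf("boom") }

// do keeps one err variable; the deferred cleanup can always observe it.
func do() (err error) {
	defer func() {
		if err != nil {
			fmt.Println("cleanup sees the error:", err)
		}
	}()

	var state string
	state, err = fetchState() // assignment, not :=, so err is not shadowed
	if err != nil {
		return err
	}
	fmt.Println("state:", state)

	return nil
}

func main() { _ = do() }
```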
@@ -155,7 +155,7 @@ func (fs *Driver) Run(conf *util.Config) {
		fs.cs = NewControllerServer(fs.cd)
	}

-	// configre CSI-Addons server and components
+	// configure CSI-Addons server and components
	err = fs.setupCSIAddonsServer(conf)
	if err != nil {
		log.FatalLogMsg(err.Error())
@@ -68,9 +68,9 @@ func (ns *NodeServer) getMountState(path string) (mountState, error) {
	return msNotMounted, nil
}

-func findMountinfo(mountpoint string, mis []mountutil.MountInfo) int {
-	for i := range mis {
-		if mis[i].MountPoint == mountpoint {
+func findMountinfo(mountpoint string, minfo []mountutil.MountInfo) int {
+	for i := range minfo {
+		if minfo[i].MountPoint == mountpoint {
			return i
		}
	}
@@ -80,9 +80,9 @@ func findMountinfo(mountpoint string, mis []mountutil.MountInfo) int {

// Ensures that given mountpoint is of specified fstype.
// Returns true if fstype matches, or if no such mountpoint exists.
-func validateFsType(mountpoint, fsType string, mis []mountutil.MountInfo) bool {
-	if idx := findMountinfo(mountpoint, mis); idx > 0 {
-		mi := mis[idx]
+func validateFsType(mountpoint, fsType string, minfo []mountutil.MountInfo) bool {
+	if idx := findMountinfo(mountpoint, minfo); idx > 0 {
+		mi := minfo[idx]

		if mi.FsType != fsType {
			return false
@@ -212,9 +212,12 @@ func (ac *activeClient) fetchIP() (string, error) {
	clientInfo := ac.Inst
	parts := strings.Fields(clientInfo)
	if len(parts) >= 2 {
-		ip := strings.Split(parts[1], ":")[0]
-
-		return ip, nil
+		lastColonIndex := strings.LastIndex(parts[1], ":")
+		firstPart := parts[1][:lastColonIndex]
+		ip := net.ParseIP(firstPart)
+		if ip != nil {
+			return ip.String(), nil
+		}
	}

	return "", fmt.Errorf("failed to extract IP address, incorrect format: %s", clientInfo)
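A standalone sketch of the parsing change above (same logic, outside the driver): splitting on the last colon keeps IPv6 addresses intact where a split on the first colon would truncate them, and `net.ParseIP` validates and canonicalizes the result:

```go
package main

import (
	"fmt"
	"net"
	"strings"
)

// extractIP mirrors the new logic: take everything before the final colon
// (the port/nonce separator) and validate it as an IP. A naive
// strings.Split(addr, ":")[0] would return just "2001" for an IPv6 client.
func extractIP(addr string) (string, error) {
	i := strings.LastIndex(addr, ":")
	if i < 0 {
		return "", fmt.Errorf("incorrect format: %s", addr)
	}
	if ip := net.ParseIP(addr[:i]); ip != nil {
		return ip.String(), nil
	}

	return "", fmt.Errorf("incorrect format: %s", addr)
}

func main() {
	fmt.Println(extractIP("172.21.9.34:0/422650892"))
	// IPv6: prints the canonical form 2001:db8:85a3::8a2e:370:7334
	fmt.Println(extractIP("2001:0db8:85a3:0000:0000:8a2e:0370:7334:0/422650892"))
}
```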
@@ -68,6 +68,11 @@ func TestFetchIP(t *testing.T) {
			expectedIP:  "172.21.9.34",
			expectedErr: false,
		},
+		{
+			clientInfo:  "client.4305 2001:0db8:85a3:0000:0000:8a2e:0370:7334:0/422650892",
+			expectedIP:  "2001:db8:85a3::8a2e:370:7334",
+			expectedErr: false,
+		},
		{
			clientInfo:  "",
			expectedIP:  "",
@@ -26,6 +26,7 @@ import (
	csicommon "github.com/ceph/ceph-csi/internal/csi-common"
	"github.com/ceph/ceph-csi/internal/rbd"
	"github.com/ceph/ceph-csi/internal/util"
+	"github.com/ceph/ceph-csi/internal/util/k8s"
	"github.com/ceph/ceph-csi/internal/util/log"

	"github.com/container-storage-interface/spec/lib/go/csi"
@@ -68,14 +69,14 @@ func NewControllerServer(d *csicommon.CSIDriver) *rbd.ControllerServer {
func NewNodeServer(
	d *csicommon.CSIDriver,
	t string,
-	topology map[string]string,
-	crushLocationMap map[string]string,
+	nodeLabels, topology, crushLocationMap map[string]string,
) (*rbd.NodeServer, error) {
	ns := rbd.NodeServer{
-		DefaultNodeServer: csicommon.NewDefaultNodeServer(d, t, topology),
-		VolumeLocks:       util.NewVolumeLocks(),
+		DefaultNodeServer:         csicommon.NewDefaultNodeServer(d, t, topology),
+		VolumeLocks:               util.NewVolumeLocks(),
+		NodeLabels:                nodeLabels,
+		CLIReadAffinityMapOptions: util.ConstructReadAffinityMapOption(crushLocationMap),
	}
-	ns.SetReadAffinityMapOptions(crushLocationMap)

	return &ns, nil
}
@@ -87,8 +88,8 @@ func NewNodeServer(
// setupCSIAddonsServer().
func (r *Driver) Run(conf *util.Config) {
	var (
-		err                        error
-		topology, crushLocationMap map[string]string
+		err                                    error
+		nodeLabels, topology, crushLocationMap map[string]string
	)
	// update clone soft and hard limit
	rbd.SetGlobalInt("rbdHardMaxCloneDepth", conf.RbdHardMaxCloneDepth)
@@ -125,13 +126,17 @@ func (r *Driver) Run(conf *util.Config) {
		})
	}

-	if conf.EnableReadAffinity {
-		crushLocationMap, err = util.GetCrushLocationMap(conf.CrushLocationLabels, conf.NodeID)
+	if k8s.RunsOnKubernetes() {
+		nodeLabels, err = k8s.GetNodeLabels(conf.NodeID)
		if err != nil {
			log.FatalLogMsg(err.Error())
		}
	}

+	if conf.EnableReadAffinity {
+		crushLocationMap = util.GetCrushLocationMap(conf.CrushLocationLabels, nodeLabels)
+	}
+
	// Create GRPC servers
	r.ids = NewIdentityServer(r.cd)
@@ -140,7 +145,7 @@ func (r *Driver) Run(conf *util.Config) {
	if err != nil {
		log.FatalLogMsg(err.Error())
	}
-	r.ns, err = NewNodeServer(r.cd, conf.Vtype, topology, crushLocationMap)
+	r.ns, err = NewNodeServer(r.cd, conf.Vtype, nodeLabels, topology, crushLocationMap)
	if err != nil {
		log.FatalLogMsg("failed to start node server, err %v\n", err)
	}
@@ -165,7 +170,7 @@ func (r *Driver) Run(conf *util.Config) {
		r.cs.SetMetadata = conf.SetMetadata
	}

-	// configre CSI-Addons server and components
+	// configure CSI-Addons server and components
	err = r.setupCSIAddonsServer(conf)
	if err != nil {
		log.FatalLogMsg(err.Error())
@@ -45,8 +45,10 @@ type NodeServer struct {
	// A map storing all volumes with ongoing operations so that additional operations
	// for that same volume (as defined by VolumeID) return an Aborted error
	VolumeLocks *util.VolumeLocks
-	// readAffinityMapOptions contains map options to enable read affinity.
-	readAffinityMapOptions string
+	// NodeLabels stores the node labels
+	NodeLabels map[string]string
+	// CLIReadAffinityMapOptions contains map options passed through command line to enable read affinity.
+	CLIReadAffinityMapOptions string
}

// stageTransaction struct represents the state a transaction was when it either completed
@@ -258,11 +260,10 @@ func (ns *NodeServer) populateRbdVol(
		rv.Mounter = rbdNbdMounter
	}

-	err = getMapOptions(req, rv)
+	err = ns.getMapOptions(req, rv)
	if err != nil {
		return nil, err
	}
-	ns.appendReadAffinityMapOptions(rv)

	rv.VolID = volID
@@ -280,14 +281,14 @@ func (ns *NodeServer) populateRbdVol(

// appendReadAffinityMapOptions appends readAffinityMapOptions to mapOptions
// if mounter is rbdDefaultMounter and readAffinityMapOptions is not empty.
-func (ns NodeServer) appendReadAffinityMapOptions(rv *rbdVolume) {
+func (rv *rbdVolume) appendReadAffinityMapOptions(readAffinityMapOptions string) {
	switch {
-	case ns.readAffinityMapOptions == "" || rv.Mounter != rbdDefaultMounter:
+	case readAffinityMapOptions == "" || rv.Mounter != rbdDefaultMounter:
		return
	case rv.MapOptions != "":
-		rv.MapOptions += "," + ns.readAffinityMapOptions
+		rv.MapOptions += "," + readAffinityMapOptions
	default:
-		rv.MapOptions = ns.readAffinityMapOptions
+		rv.MapOptions = readAffinityMapOptions
	}
}
@@ -1378,22 +1379,3 @@ func getDeviceSize(ctx context.Context, devicePath string) (uint64, error) {

	return size, nil
}
-
-func (ns *NodeServer) SetReadAffinityMapOptions(crushLocationMap map[string]string) {
-	if len(crushLocationMap) == 0 {
-		return
-	}
-
-	var b strings.Builder
-	b.WriteString("read_from_replica=localize,crush_location=")
-	first := true
-	for key, val := range crushLocationMap {
-		if first {
-			b.WriteString(fmt.Sprintf("%s:%s", key, val))
-			first = false
-		} else {
-			b.WriteString(fmt.Sprintf("|%s:%s", key, val))
-		}
-	}
-	ns.readAffinityMapOptions = b.String()
-}
@@ -18,8 +18,12 @@ package rbd

import (
	"context"
+	"encoding/json"
+	"os"
	"testing"

+	"github.com/ceph/ceph-csi/internal/util"
+
	"github.com/container-storage-interface/spec/lib/go/csi"
	"github.com/stretchr/testify/assert"
)
@@ -107,53 +111,6 @@ func TestParseBoolOption(t *testing.T) {
	}
}

-func TestNodeServer_SetReadAffinityMapOptions(t *testing.T) {
-	t.Parallel()
-	tests := []struct {
-		name             string
-		crushLocationmap map[string]string
-		wantAny          []string
-	}{
-		{
-			name:             "nil crushLocationmap",
-			crushLocationmap: nil,
-			wantAny:          []string{""},
-		},
-		{
-			name:             "empty crushLocationmap",
-			crushLocationmap: map[string]string{},
-			wantAny:          []string{""},
-		},
-		{
-			name: "single entry in crushLocationmap",
-			crushLocationmap: map[string]string{
-				"region": "east",
-			},
-			wantAny: []string{"read_from_replica=localize,crush_location=region:east"},
-		},
-		{
-			name: "multiple entries in crushLocationmap",
-			crushLocationmap: map[string]string{
-				"region": "east",
-				"zone":   "east-1",
-			},
-			wantAny: []string{
-				"read_from_replica=localize,crush_location=region:east|zone:east-1",
-				"read_from_replica=localize,crush_location=zone:east-1|region:east",
-			},
-		},
-	}
-	for _, tt := range tests {
-		currentTT := tt
-		t.Run(tt.name, func(t *testing.T) {
-			t.Parallel()
-			ns := &NodeServer{}
-			ns.SetReadAffinityMapOptions(currentTT.crushLocationmap)
-			assert.Contains(t, currentTT.wantAny, ns.readAffinityMapOptions)
-		})
-	}
-}

func TestNodeServer_appendReadAffinityMapOptions(t *testing.T) {
	t.Parallel()
	type input struct {
@@ -236,11 +193,128 @@ func TestNodeServer_appendReadAffinityMapOptions(t *testing.T) {
				MapOptions: currentTT.args.mapOptions,
				Mounter:    currentTT.args.mounter,
			}
-			ns := &NodeServer{
-				readAffinityMapOptions: currentTT.args.readAffinityMapOptions,
-			}
-			ns.appendReadAffinityMapOptions(rv)
+			rv.appendReadAffinityMapOptions(currentTT.args.readAffinityMapOptions)
			assert.Equal(t, currentTT.want, rv.MapOptions)
		})
	}
}
+
+func TestReadAffinity_GetReadAffinityMapOptions(t *testing.T) {
+	t.Parallel()
+
+	nodeLabels := map[string]string{
+		"topology.kubernetes.io/zone":   "east-1",
+		"topology.kubernetes.io/region": "east",
+	}
+
+	csiConfig := []util.ClusterInfo{
+		{
+			ClusterID: "cluster-1",
+			ReadAffinity: struct {
+				Enabled             bool     `json:"enabled"`
+				CrushLocationLabels []string `json:"crushLocationLabels"`
+			}{
+				Enabled: true,
+				CrushLocationLabels: []string{
+					"topology.kubernetes.io/region",
+				},
+			},
+		},
+		{
+			ClusterID: "cluster-2",
+			ReadAffinity: struct {
+				Enabled             bool     `json:"enabled"`
+				CrushLocationLabels []string `json:"crushLocationLabels"`
+			}{
+				Enabled: false,
+				CrushLocationLabels: []string{
+					"topology.kubernetes.io/region",
+				},
+			},
+		},
+		{
+			ClusterID: "cluster-3",
+			ReadAffinity: struct {
+				Enabled             bool     `json:"enabled"`
+				CrushLocationLabels []string `json:"crushLocationLabels"`
+			}{
+				Enabled:             true,
+				CrushLocationLabels: []string{},
+			},
+		},
+		{
+			ClusterID: "cluster-4",
+		},
+	}
+
+	csiConfigFileContent, err := json.Marshal(csiConfig)
+	if err != nil {
+		t.Errorf("failed to marshal csi config info %v", err)
+	}
+	tmpConfPath := util.CsiConfigFile
+	err = os.Mkdir("/etc/ceph-csi-config", 0o600)
+	if err != nil {
+		t.Errorf("failed to create directory %s: %v", "/etc/ceph-csi-config", err)
+	}
+	err = os.WriteFile(tmpConfPath, csiConfigFileContent, 0o600)
+	if err != nil {
+		t.Errorf("failed to write %s file content: %v", util.CsiConfigFile, err)
+	}
+
+	tests := []struct {
+		name                   string
+		clusterID              string
+		CLICrushLocationLabels string
+		want                   string
+	}{
+		{
+			name:                   "Enabled in cluster-1 and Enabled in CLI",
+			clusterID:              "cluster-1",
+			CLICrushLocationLabels: "topology.kubernetes.io/region",
+			want:                   "read_from_replica=localize,crush_location=region:east",
+		},
+		{
+			name:                   "Disabled in cluster-2 and Enabled in CLI",
+			clusterID:              "cluster-2",
+			CLICrushLocationLabels: "topology.kubernetes.io/zone",
+			want:                   "",
+		},
+		{
+			name:                   "Enabled in cluster-3 with empty crush labels and Enabled in CLI",
+			clusterID:              "cluster-3",
+			CLICrushLocationLabels: "topology.kubernetes.io/zone",
+			want:                   "read_from_replica=localize,crush_location=zone:east-1",
+		},
+		{
+			name:                   "Enabled in cluster-3 with empty crush labels and Disabled in CLI",
+			clusterID:              "cluster-3",
+			CLICrushLocationLabels: "",
+			want:                   "",
+		},
+		{
+			name:                   "Absent in cluster-4 and Enabled in CLI",
+			clusterID:              "cluster-4",
+			CLICrushLocationLabels: "topology.kubernetes.io/zone",
+			want:                   "",
+		},
+	}
+
+	for _, tt := range tests {
+		tc := tt
+		t.Run(tc.name, func(t *testing.T) {
+			t.Parallel()
+			crushLocationMap := util.GetCrushLocationMap(tc.CLICrushLocationLabels, nodeLabels)
+			ns := &NodeServer{
+				CLIReadAffinityMapOptions: util.ConstructReadAffinityMapOption(crushLocationMap),
+			}
+			readAffinityMapOptions, err := util.GetReadAffinityMapOptions(
+				tc.clusterID, ns.CLIReadAffinityMapOptions, nodeLabels,
+			)
+			if err != nil {
+				assert.Fail(t, err.Error())
+			}
+
+			assert.Equal(t, tc.want, readAffinityMapOptions)
+		})
+	}
+}
@@ -295,7 +295,7 @@ func parseMapOptions(mapOptions string) (string, string, error) {

// getMapOptions is a wrapper func, calls parse map/unmap funcs and feeds the
// rbdVolume object.
-func getMapOptions(req *csi.NodeStageVolumeRequest, rv *rbdVolume) error {
+func (ns *NodeServer) getMapOptions(req *csi.NodeStageVolumeRequest, rv *rbdVolume) error {
	krbdMapOptions, nbdMapOptions, err := parseMapOptions(req.GetVolumeContext()["mapOptions"])
	if err != nil {
		return err

@@ -312,6 +312,14 @@ func getMapOptions(req *csi.NodeStageVolumeRequest, rv *rbdVolume) error {
		rv.UnmapOptions = nbdUnmapOptions
	}

+	readAffinityMapOptions, err := util.GetReadAffinityMapOptions(
+		rv.ClusterID, ns.CLIReadAffinityMapOptions, ns.NodeLabels,
+	)
+	if err != nil {
+		return err
+	}
+	rv.appendReadAffinityMapOptions(readAffinityMapOptions)
+
	return nil
}
@@ -23,20 +23,15 @@ import (
)

// GetCrushLocationMap returns the crush location map, determined from
-// the crush location labels and their values from the CO system.
+// the crush location labels and their values from the node labels passed in arg.
// Expects crushLocationLabels in arg to be in the format "[prefix/]<name>,[prefix/]<name>,...".
// Returns map of crush location types with its array of associated values.
-func GetCrushLocationMap(crushLocationLabels, nodeName string) (map[string]string, error) {
+func GetCrushLocationMap(crushLocationLabels string, nodeLabels map[string]string) map[string]string {
	if crushLocationLabels == "" {
-		return nil, nil
+		return nil
	}

-	nodeLabels, err := k8sGetNodeLabels(nodeName)
-	if err != nil {
-		return nil, err
-	}
-
-	return getCrushLocationMap(crushLocationLabels, nodeLabels), nil
+	return getCrushLocationMap(crushLocationLabels, nodeLabels)
}

// getCrushLocationMap returns the crush location map, determined from
@@ -62,6 +62,11 @@ type ClusterInfo struct {
		// symlink filepath for the network namespace where we need to execute commands.
		NetNamespaceFilePath string `json:"netNamespaceFilePath"`
	} `json:"nfs"`
+	// Read affinity map options
+	ReadAffinity struct {
+		Enabled             bool     `json:"enabled"`
+		CrushLocationLabels []string `json:"crushLocationLabels"`
+	} `json:"readAffinity"`
}

// Expected JSON structure in the passed in config file is,
@@ -203,3 +208,21 @@ func GetNFSNetNamespaceFilePath(pathToConfig, clusterID string) (string, error)

	return cluster.NFS.NetNamespaceFilePath, nil
}
+
+// GetCrushLocationLabels returns the `readAffinity.enabled` and `readAffinity.crushLocationLabels`
+// values from the CSI config for the given `clusterID`. If `readAffinity.enabled` is set to true
+// it returns `true` and `crushLocationLabels`, else returns `false` and an empty string.
+func GetCrushLocationLabels(pathToConfig, clusterID string) (bool, string, error) {
+	cluster, err := readClusterInfo(pathToConfig, clusterID)
+	if err != nil {
+		return false, "", err
+	}
+
+	if !cluster.ReadAffinity.Enabled {
+		return false, "", nil
+	}
+
+	crushLocationLabels := strings.Join(cluster.ReadAffinity.CrushLocationLabels, ",")
+
+	return true, crushLocationLabels, nil
+}
@@ -365,3 +365,116 @@ func TestGetNFSNetNamespaceFilePath(t *testing.T) {
		})
	}
}
+
+func TestGetReadAffinityOptions(t *testing.T) {
+	t.Parallel()
+	tests := []struct {
+		name      string
+		clusterID string
+		want      struct {
+			enabled bool
+			labels  string
+		}
+	}{
+		{
+			name:      "ReadAffinity enabled set to true for cluster-1",
+			clusterID: "cluster-1",
+			want: struct {
+				enabled bool
+				labels  string
+			}{true, "topology.kubernetes.io/region,topology.kubernetes.io/zone,topology.io/rack"},
+		},
+		{
+			name:      "ReadAffinity enabled set to true for cluster-2",
+			clusterID: "cluster-2",
+			want: struct {
+				enabled bool
+				labels  string
+			}{true, "topology.kubernetes.io/region"},
+		},
+		{
+			name:      "ReadAffinity enabled set to false for cluster-3",
+			clusterID: "cluster-3",
+			want: struct {
+				enabled bool
+				labels  string
+			}{false, ""},
+		},
+		{
+			name:      "ReadAffinity option not set in cluster-4",
+			clusterID: "cluster-4",
+			want: struct {
+				enabled bool
+				labels  string
+			}{false, ""},
+		},
+	}
+
+	csiConfig := []ClusterInfo{
+		{
+			ClusterID: "cluster-1",
+			ReadAffinity: struct {
+				Enabled             bool     `json:"enabled"`
+				CrushLocationLabels []string `json:"crushLocationLabels"`
+			}{
+				Enabled: true,
+				CrushLocationLabels: []string{
+					"topology.kubernetes.io/region",
+					"topology.kubernetes.io/zone",
+					"topology.io/rack",
+				},
+			},
+		},
+		{
+			ClusterID: "cluster-2",
+			ReadAffinity: struct {
+				Enabled             bool     `json:"enabled"`
+				CrushLocationLabels []string `json:"crushLocationLabels"`
+			}{
+				Enabled: true,
+				CrushLocationLabels: []string{
+					"topology.kubernetes.io/region",
+				},
+			},
+		},
+		{
+			ClusterID: "cluster-3",
+			ReadAffinity: struct {
+				Enabled             bool     `json:"enabled"`
+				CrushLocationLabels []string `json:"crushLocationLabels"`
+			}{
+				Enabled: false,
+				CrushLocationLabels: []string{
+					"topology.io/rack",
+				},
+			},
+		},
+		{
+			ClusterID: "cluster-4",
+		},
+	}
+	csiConfigFileContent, err := json.Marshal(csiConfig)
+	if err != nil {
+		t.Errorf("failed to marshal csi config info %v", err)
+	}
+	tmpConfPath := t.TempDir() + "/ceph-csi.json"
+	err = os.WriteFile(tmpConfPath, csiConfigFileContent, 0o600)
+	if err != nil {
+		t.Errorf("failed to write %s file content: %v", CsiConfigFile, err)
+	}
+	for _, tt := range tests {
+		tc := tt
+		t.Run(tc.name, func(t *testing.T) {
+			t.Parallel()
+			enabled, labels, err := GetCrushLocationLabels(tmpConfPath, tc.clusterID)
+			if err != nil {
+				t.Errorf("GetCrushLocationLabels() error = %v", err)

+				return
+			}
+			if enabled != tc.want.enabled || labels != tc.want.labels {
+				t.Errorf("GetCrushLocationLabels() = {%v %v} want %v", enabled, labels, tc.want)
+			}
+		})
+	}
+}
@@ -337,7 +337,7 @@ func InitializeNode(ctx context.Context) error {
	return nil
}

-// FscryptUnlock unlocks possilby creating fresh fscrypt metadata
+// FscryptUnlock unlocks possibly creating fresh fscrypt metadata
// iff a volume is encrypted. Otherwise return immediately Calling
// this function requires that InitializeFscrypt ran once on this node.
func Unlock(
@@ -25,8 +25,14 @@ import (
	"k8s.io/client-go/tools/clientcmd"
)

+var kubeclient *kubernetes.Clientset
+
// NewK8sClient create kubernetes client.
func NewK8sClient() (*kubernetes.Clientset, error) {
+	if kubeclient != nil {
+		return kubeclient, nil
+	}
+
	var cfg *rest.Config
	var err error
	cPath := os.Getenv("KUBERNETES_CONFIG_PATH")

@@ -46,5 +52,15 @@ func NewK8sClient() (*kubernetes.Clientset, error) {
		return nil, fmt.Errorf("failed to create client: %w", err)
	}

+	kubeclient = client
+
	return client, nil
}
+
+// RunsOnKubernetes checks if the application is running within a Kubernetes cluster
+// by inspecting the presence of the KUBERNETES_SERVICE_HOST environment variable.
+func RunsOnKubernetes() bool {
+	kubernetesServiceHost := os.Getenv("KUBERNETES_SERVICE_HOST")
+
+	return kubernetesServiceHost != ""
+}
internal/util/k8s/node.go (new file, 39 lines)
@@ -0,0 +1,39 @@
/*
Copyright 2023 The CephCSI Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package k8s

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func GetNodeLabels(nodeName string) (map[string]string, error) {
	client, err := NewK8sClient()
	if err != nil {
		return nil, fmt.Errorf("can not get node %q information, failed "+
			"to connect to Kubernetes: %w", nodeName, err)
	}

	node, err := client.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
	if err != nil {
		return nil, fmt.Errorf("failed to get node %q information: %w", nodeName, err)
	}

	return node.GetLabels(), nil
}
internal/util/read_affinity.go (new file, 76 lines)
@@ -0,0 +1,76 @@
/*
Copyright 2023 The Ceph-CSI Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package util

import (
	"fmt"
	"strings"
)

// ConstructReadAffinityMapOption constructs a read affinity map option based on the provided crushLocationMap.
// It appends crush location labels in the format
// "read_from_replica=localize,crush_location=label1:value1|label2:value2|...".
func ConstructReadAffinityMapOption(crushLocationMap map[string]string) string {
	if len(crushLocationMap) == 0 {
		return ""
	}

	var b strings.Builder
	b.WriteString("read_from_replica=localize,crush_location=")
	first := true
	for key, val := range crushLocationMap {
		if first {
			b.WriteString(fmt.Sprintf("%s:%s", key, val))
			first = false
		} else {
			b.WriteString(fmt.Sprintf("|%s:%s", key, val))
		}
	}

	return b.String()
}

// GetReadAffinityMapOptions retrieves the readAffinityMapOptions from the CSI config file if it exists.
// If not, it falls back to returning the `cliReadAffinityMapOptions` from the command line.
// If neither of these options is available, it returns an empty string.
func GetReadAffinityMapOptions(
	clusterID, cliReadAffinityMapOptions string, nodeLabels map[string]string,
) (string, error) {
	var (
		err                       error
		configReadAffinityEnabled bool
		configCrushLocationLabels string
	)

	configReadAffinityEnabled, configCrushLocationLabels, err = GetCrushLocationLabels(CsiConfigFile, clusterID)
	if err != nil {
		return "", err
	}

	if !configReadAffinityEnabled {
		return "", nil
	}

	if configCrushLocationLabels == "" {
		return cliReadAffinityMapOptions, nil
	}

	crushLocationMap := GetCrushLocationMap(configCrushLocationLabels, nodeLabels)
	readAffinityMapOptions := ConstructReadAffinityMapOption(crushLocationMap)

	return readAffinityMapOptions, nil
}
internal/util/read_affinity_test.go (new file, 68 lines)
@@ -0,0 +1,68 @@
/*
Copyright 2023 The Ceph-CSI Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package util

import (
	"testing"

	"github.com/stretchr/testify/assert"
)

func TestReadAffinity_ConstructReadAffinityMapOption(t *testing.T) {
	t.Parallel()
	tests := []struct {
		name             string
		crushLocationmap map[string]string
		wantAny          []string
	}{
		{
			name:             "nil crushLocationmap",
			crushLocationmap: nil,
			wantAny:          []string{""},
		},
		{
			name:             "empty crushLocationmap",
			crushLocationmap: map[string]string{},
			wantAny:          []string{""},
		},
		{
			name: "single entry in crushLocationmap",
			crushLocationmap: map[string]string{
				"region": "east",
			},
			wantAny: []string{"read_from_replica=localize,crush_location=region:east"},
		},
		{
			name: "multiple entries in crushLocationmap",
			crushLocationmap: map[string]string{
				"region": "east",
				"zone":   "east-1",
			},
			wantAny: []string{
				"read_from_replica=localize,crush_location=region:east|zone:east-1",
				"read_from_replica=localize,crush_location=zone:east-1|region:east",
			},
		},
	}
	for _, tt := range tests {
		currentTT := tt
		t.Run(tt.name, func(t *testing.T) {
			t.Parallel()
			assert.Contains(t, currentTT.wantAny, ConstructReadAffinityMapOption(currentTT.crushLocationmap))
		})
	}
}
@@ -17,16 +17,14 @@ limitations under the License.
package util

import (
	"context"
	"encoding/json"
	"fmt"
	"strings"

-	"github.com/container-storage-interface/spec/lib/go/csi"
-
	"github.com/ceph/ceph-csi/internal/util/k8s"
	"github.com/ceph/ceph-csi/internal/util/log"

+	"github.com/container-storage-interface/spec/lib/go/csi"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const (

@@ -34,21 +32,6 @@ const (
	labelSeparator string = ","
)

-func k8sGetNodeLabels(nodeName string) (map[string]string, error) {
-	client, err := k8s.NewK8sClient()
-	if err != nil {
-		return nil, fmt.Errorf("can not get node %q information, failed "+
-			"to connect to Kubernetes: %w", nodeName, err)
-	}
-
-	node, err := client.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
-	if err != nil {
-		return nil, fmt.Errorf("failed to get node %q information: %w", nodeName, err)
-	}
-
-	return node.GetLabels(), nil
-}
-
// GetTopologyFromDomainLabels returns the CSI topology map, determined from
// the domain labels and their values from the CO system
// Expects domainLabels in arg to be in the format "[prefix/]<name>,[prefix/]<name>,...".

@@ -82,7 +65,7 @@ func GetTopologyFromDomainLabels(domainLabels, nodeName, driverName string) (map
		labelCount++
	}

-	nodeLabels, err := k8sGetNodeLabels(nodeName)
+	nodeLabels, err := k8s.GetNodeLabels(nodeName)
	if err != nil {
		return nil, err
	}
@@ -56,6 +56,7 @@ RUN source /build.env \
    && npm install @commitlint/cli@"${COMMITLINT_VERSION}" \
    && popd \
    && git config --global --add safe.directory ${CEPHCSIPATH} \
+   && go install github.com/augmentable-dev/tickgit/cmd/tickgit@latest \
    && true

WORKDIR ${CEPHCSIPATH}