diff --git a/.github/workflows/tickgit.yaml b/.github/workflows/tickgit.yaml new file mode 100644 index 000000000..2b49b48eb --- /dev/null +++ b/.github/workflows/tickgit.yaml @@ -0,0 +1,18 @@ +--- +name: List TODO's +# yamllint disable-line rule:truthy +on: + push: + branches: + - devel + +permissions: + contents: read + +jobs: + tickgit: + name: tickgit + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - run: make containerized-test TARGET=tickgit diff --git a/Makefile b/Makefile index 11d6a2d3e..0fd0406e4 100644 --- a/Makefile +++ b/Makefile @@ -143,6 +143,10 @@ check-env: codespell: codespell --config scripts/codespell.conf + +tickgit: + tickgit $(CURDIR) + # # commitlint will do a rebase on top of GIT_SINCE when REBASE=1 is passed. # diff --git a/PendingReleaseNotes.md b/PendingReleaseNotes.md index 8a11ae3c8..3ee8d31d7 100644 --- a/PendingReleaseNotes.md +++ b/PendingReleaseNotes.md @@ -5,3 +5,8 @@ - Removed the deprecated grpc metrics flag's in [PR](https://github.com/ceph/ceph-csi/pull/4225) ## Features + +RBD + +- Support for configuring read affinity for individuals cluster within the ceph-csi-config + ConfigMap in [PR](https://github.com/ceph/ceph-csi/pull/4165) diff --git a/api/deploy/ocp/scc.go b/api/deploy/ocp/scc.go index 9bd6aa3ec..b0f277da4 100644 --- a/api/deploy/ocp/scc.go +++ b/api/deploy/ocp/scc.go @@ -41,7 +41,7 @@ type SecurityContextConstraintsValues struct { } // SecurityContextConstraintsDefaults can be used for generating deployment -// artifacts with defails values. +// artifacts with details values. var SecurityContextConstraintsDefaults = SecurityContextConstraintsValues{ Namespace: "ceph-csi", Deployer: "", diff --git a/charts/ceph-csi-rbd/values.yaml b/charts/ceph-csi-rbd/values.yaml index 0d43b1671..64b74e2ab 100644 --- a/charts/ceph-csi-rbd/values.yaml +++ b/charts/ceph-csi-rbd/values.yaml @@ -27,6 +27,11 @@ serviceAccounts: # - "" # rbd: # netNamespaceFilePath: "{{ .kubeletDir }}/plugins/{{ .driverName }}/net" +# readAffinity: +# enabled: true +# crushLocationLabels: +# - topology.kubernetes.io/region +# - topology.kubernetes.io/zone csiConfig: [] # Configuration details of clusterID,PoolID and FscID mapping diff --git a/deploy/csi-config-map-sample.yaml b/deploy/csi-config-map-sample.yaml index b48e834a5..7f8653a58 100644 --- a/deploy/csi-config-map-sample.yaml +++ b/deploy/csi-config-map-sample.yaml @@ -32,6 +32,10 @@ kind: ConfigMap # path for the Ceph cluster identified by the , This will be used # by the RBD CSI plugin to execute the rbd map/unmap in the # network namespace specified by the "rbd.netNamespaceFilePath". +# The "readAffinity" fields are used to enable read affinity and pass the crush +# location map for the Ceph cluster identified by the cluster , +# enabling this will add +# "read_from_replica=localize,crush_location=" to the map option. # If a CSI plugin is using more than one Ceph cluster, repeat the section for # each such cluster in use. # NOTE: Changes to the configmap is automatically updated in the running pods, @@ -66,6 +70,15 @@ data: } "nfs": { "netNamespaceFilePath": "/plugins/nfs.csi.ceph.com/net", + }, + "readAffinity": { + "enabled": "false", + "crushLocationLabels": [ + "", + "" + ... + "" + ] } } ] diff --git a/docs/deploy-rbd.md b/docs/deploy-rbd.md index 9f22c1315..de76d5a1a 100644 --- a/docs/deploy-rbd.md +++ b/docs/deploy-rbd.md @@ -47,7 +47,7 @@ make image-cephcsi | `--maxsnapshotsonimage` | `450` | Maximum number of snapshots allowed on rbd image without flattening | | `--setmetadata` | `false` | Set metadata on volume | | `--enable-read-affinity` | `false` | enable read affinity | -| `--crush-location-labels`| _empty_ | Kubernetes node labels that determine the CRUSH location the node belongs to, separated by ',' | +| `--crush-location-labels`| _empty_ | Kubernetes node labels that determine the CRUSH location the node belongs to, separated by ','.
`Note: These labels will be replaced if crush location labels are defined in the ceph-csi-config ConfigMap for the specific cluster.` | **Available volume parameters:** @@ -222,6 +222,12 @@ If enabled, this option will be added to all RBD volumes mapped by Ceph CSI. Well known labels can be found [here](https://kubernetes.io/docs/reference/labels-annotations-taints/). +Read affinity can be configured for individual clusters within the +`ceph-csi-config` ConfigMap. This allows configuring the crush location labels +for each ceph cluster separately. The crush location labels specified in the +ConfigMap will supersede those provided via command line argument +`--crush-location-labels`. + >Note: Label values will have all its dots `"."` normalized with dashes `"-"` in order for it to work with ceph CRUSH map. diff --git a/e2e/configmap.go b/e2e/configmap.go index 6eb366065..a52626af7 100644 --- a/e2e/configmap.go +++ b/e2e/configmap.go @@ -63,6 +63,16 @@ func createConfigMap(pluginPath string, c kubernetes.Interface, f *framework.Fra }{ RadosNamespace: radosNamespace, }, + ReadAffinity: struct { + Enabled bool `json:"enabled"` + CrushLocationLabels []string `json:"crushLocationLabels"` + }{ + Enabled: true, + CrushLocationLabels: []string{ + crushLocationRegionLabel, + crushLocationZoneLabel, + }, + }, }} if upgradeTesting { subvolumegroup = "csi" diff --git a/e2e/utils.go b/e2e/utils.go index 67e413f5e..ff1618bf0 100644 --- a/e2e/utils.go +++ b/e2e/utils.go @@ -1226,7 +1226,7 @@ func validatePVCSnapshot( checkSumClone, chErrs[n] = calculateSHA512sum(f, &a, filePath, &opt) framework.Logf("checksum value for the clone is %s with pod name %s", checkSumClone, name) if chErrs[n] != nil { - framework.Logf("failed to calculte checksum for clone: %s", chErrs[n]) + framework.Logf("failed to calculate checksum for clone: %s", chErrs[n]) } if checkSumClone != checkSum { framework.Logf( diff --git a/internal/cephfs/core/clone.go b/internal/cephfs/core/clone.go index b36951439..6ec18db94 100644 --- a/internal/cephfs/core/clone.go +++ b/internal/cephfs/core/clone.go @@ -68,11 +68,10 @@ func (s *subVolumeClient) CreateCloneFromSubvolume( return err } - // if cloneErr is not nil we will delete the snapshot - var cloneErr error - defer func() { - if cloneErr != nil { + // if any error occurs while cloning, resizing or deleting the snapshot + // fails then we need to delete the clone and snapshot. + if err != nil && !cerrors.IsCloneRetryError(err) { if err = s.PurgeVolume(ctx, true); err != nil { log.ErrorLog(ctx, "failed to delete volume %s: %v", s.VolID, err) } @@ -81,18 +80,19 @@ func (s *subVolumeClient) CreateCloneFromSubvolume( } } }() - cloneErr = snapClient.CloneSnapshot(ctx, s.SubVolume) - if cloneErr != nil { - log.ErrorLog(ctx, "failed to clone snapshot %s %s to %s %v", parentvolOpt.VolID, snapshotID, s.VolID, cloneErr) + err = snapClient.CloneSnapshot(ctx, s.SubVolume) + if err != nil { + log.ErrorLog(ctx, "failed to clone snapshot %s %s to %s %v", parentvolOpt.VolID, snapshotID, s.VolID, err) - return cloneErr + return err } - cloneState, cloneErr := s.GetCloneState(ctx) - if cloneErr != nil { - log.ErrorLog(ctx, "failed to get clone state: %v", cloneErr) + var cloneState cephFSCloneState + cloneState, err = s.GetCloneState(ctx) + if err != nil { + log.ErrorLog(ctx, "failed to get clone state: %v", err) - return cloneErr + return err } err = cloneState.ToError() @@ -157,8 +157,9 @@ func (s *subVolumeClient) CreateCloneFromSnapshot( } } }() - - cloneState, err := s.GetCloneState(ctx) + var cloneState cephFSCloneState + // avoid err variable shadowing + cloneState, err = s.GetCloneState(ctx) if err != nil { log.ErrorLog(ctx, "failed to get clone state: %v", err) diff --git a/internal/cephfs/driver.go b/internal/cephfs/driver.go index 60e0dec65..951b71456 100644 --- a/internal/cephfs/driver.go +++ b/internal/cephfs/driver.go @@ -155,7 +155,7 @@ func (fs *Driver) Run(conf *util.Config) { fs.cs = NewControllerServer(fs.cd) } - // configre CSI-Addons server and components + // configure CSI-Addons server and components err = fs.setupCSIAddonsServer(conf) if err != nil { log.FatalLogMsg(err.Error()) diff --git a/internal/cephfs/fuserecovery.go b/internal/cephfs/fuserecovery.go index 0b249b017..279b0f79b 100644 --- a/internal/cephfs/fuserecovery.go +++ b/internal/cephfs/fuserecovery.go @@ -68,9 +68,9 @@ func (ns *NodeServer) getMountState(path string) (mountState, error) { return msNotMounted, nil } -func findMountinfo(mountpoint string, mis []mountutil.MountInfo) int { - for i := range mis { - if mis[i].MountPoint == mountpoint { +func findMountinfo(mountpoint string, minfo []mountutil.MountInfo) int { + for i := range minfo { + if minfo[i].MountPoint == mountpoint { return i } } @@ -80,9 +80,9 @@ func findMountinfo(mountpoint string, mis []mountutil.MountInfo) int { // Ensures that given mountpoint is of specified fstype. // Returns true if fstype matches, or if no such mountpoint exists. -func validateFsType(mountpoint, fsType string, mis []mountutil.MountInfo) bool { - if idx := findMountinfo(mountpoint, mis); idx > 0 { - mi := mis[idx] +func validateFsType(mountpoint, fsType string, minfo []mountutil.MountInfo) bool { + if idx := findMountinfo(mountpoint, minfo); idx > 0 { + mi := minfo[idx] if mi.FsType != fsType { return false diff --git a/internal/csi-addons/networkfence/fencing.go b/internal/csi-addons/networkfence/fencing.go index 3f7725cab..d8361e55c 100644 --- a/internal/csi-addons/networkfence/fencing.go +++ b/internal/csi-addons/networkfence/fencing.go @@ -212,9 +212,12 @@ func (ac *activeClient) fetchIP() (string, error) { clientInfo := ac.Inst parts := strings.Fields(clientInfo) if len(parts) >= 2 { - ip := strings.Split(parts[1], ":")[0] - - return ip, nil + lastColonIndex := strings.LastIndex(parts[1], ":") + firstPart := parts[1][:lastColonIndex] + ip := net.ParseIP(firstPart) + if ip != nil { + return ip.String(), nil + } } return "", fmt.Errorf("failed to extract IP address, incorrect format: %s", clientInfo) diff --git a/internal/csi-addons/networkfence/fencing_test.go b/internal/csi-addons/networkfence/fencing_test.go index bbe82120d..40242cf79 100644 --- a/internal/csi-addons/networkfence/fencing_test.go +++ b/internal/csi-addons/networkfence/fencing_test.go @@ -68,6 +68,11 @@ func TestFetchIP(t *testing.T) { expectedIP: "172.21.9.34", expectedErr: false, }, + { + clientInfo: "client.4305 2001:0db8:85a3:0000:0000:8a2e:0370:7334:0/422650892", + expectedIP: "2001:db8:85a3::8a2e:370:7334", + expectedErr: false, + }, { clientInfo: "", expectedIP: "", diff --git a/internal/rbd/driver/driver.go b/internal/rbd/driver/driver.go index f70a76a80..eda9f03be 100644 --- a/internal/rbd/driver/driver.go +++ b/internal/rbd/driver/driver.go @@ -26,6 +26,7 @@ import ( csicommon "github.com/ceph/ceph-csi/internal/csi-common" "github.com/ceph/ceph-csi/internal/rbd" "github.com/ceph/ceph-csi/internal/util" + "github.com/ceph/ceph-csi/internal/util/k8s" "github.com/ceph/ceph-csi/internal/util/log" "github.com/container-storage-interface/spec/lib/go/csi" @@ -68,14 +69,14 @@ func NewControllerServer(d *csicommon.CSIDriver) *rbd.ControllerServer { func NewNodeServer( d *csicommon.CSIDriver, t string, - topology map[string]string, - crushLocationMap map[string]string, + nodeLabels, topology, crushLocationMap map[string]string, ) (*rbd.NodeServer, error) { ns := rbd.NodeServer{ - DefaultNodeServer: csicommon.NewDefaultNodeServer(d, t, topology), - VolumeLocks: util.NewVolumeLocks(), + DefaultNodeServer: csicommon.NewDefaultNodeServer(d, t, topology), + VolumeLocks: util.NewVolumeLocks(), + NodeLabels: nodeLabels, + CLIReadAffinityMapOptions: util.ConstructReadAffinityMapOption(crushLocationMap), } - ns.SetReadAffinityMapOptions(crushLocationMap) return &ns, nil } @@ -87,8 +88,8 @@ func NewNodeServer( // setupCSIAddonsServer(). func (r *Driver) Run(conf *util.Config) { var ( - err error - topology, crushLocationMap map[string]string + err error + nodeLabels, topology, crushLocationMap map[string]string ) // update clone soft and hard limit rbd.SetGlobalInt("rbdHardMaxCloneDepth", conf.RbdHardMaxCloneDepth) @@ -125,13 +126,17 @@ func (r *Driver) Run(conf *util.Config) { }) } - if conf.EnableReadAffinity { - crushLocationMap, err = util.GetCrushLocationMap(conf.CrushLocationLabels, conf.NodeID) + if k8s.RunsOnKubernetes() { + nodeLabels, err = k8s.GetNodeLabels(conf.NodeID) if err != nil { log.FatalLogMsg(err.Error()) } } + if conf.EnableReadAffinity { + crushLocationMap = util.GetCrushLocationMap(conf.CrushLocationLabels, nodeLabels) + } + // Create GRPC servers r.ids = NewIdentityServer(r.cd) @@ -140,7 +145,7 @@ func (r *Driver) Run(conf *util.Config) { if err != nil { log.FatalLogMsg(err.Error()) } - r.ns, err = NewNodeServer(r.cd, conf.Vtype, topology, crushLocationMap) + r.ns, err = NewNodeServer(r.cd, conf.Vtype, nodeLabels, topology, crushLocationMap) if err != nil { log.FatalLogMsg("failed to start node server, err %v\n", err) } @@ -165,7 +170,7 @@ func (r *Driver) Run(conf *util.Config) { r.cs.SetMetadata = conf.SetMetadata } - // configre CSI-Addons server and components + // configure CSI-Addons server and components err = r.setupCSIAddonsServer(conf) if err != nil { log.FatalLogMsg(err.Error()) diff --git a/internal/rbd/nodeserver.go b/internal/rbd/nodeserver.go index 1009fd25f..d7961e2bb 100644 --- a/internal/rbd/nodeserver.go +++ b/internal/rbd/nodeserver.go @@ -45,8 +45,10 @@ type NodeServer struct { // A map storing all volumes with ongoing operations so that additional operations // for that same volume (as defined by VolumeID) return an Aborted error VolumeLocks *util.VolumeLocks - // readAffinityMapOptions contains map options to enable read affinity. - readAffinityMapOptions string + // NodeLabels stores the node labels + NodeLabels map[string]string + // CLIReadAffinityMapOptions contains map options passed through command line to enable read affinity. + CLIReadAffinityMapOptions string } // stageTransaction struct represents the state a transaction was when it either completed @@ -258,11 +260,10 @@ func (ns *NodeServer) populateRbdVol( rv.Mounter = rbdNbdMounter } - err = getMapOptions(req, rv) + err = ns.getMapOptions(req, rv) if err != nil { return nil, err } - ns.appendReadAffinityMapOptions(rv) rv.VolID = volID @@ -280,14 +281,14 @@ func (ns *NodeServer) populateRbdVol( // appendReadAffinityMapOptions appends readAffinityMapOptions to mapOptions // if mounter is rbdDefaultMounter and readAffinityMapOptions is not empty. -func (ns NodeServer) appendReadAffinityMapOptions(rv *rbdVolume) { +func (rv *rbdVolume) appendReadAffinityMapOptions(readAffinityMapOptions string) { switch { - case ns.readAffinityMapOptions == "" || rv.Mounter != rbdDefaultMounter: + case readAffinityMapOptions == "" || rv.Mounter != rbdDefaultMounter: return case rv.MapOptions != "": - rv.MapOptions += "," + ns.readAffinityMapOptions + rv.MapOptions += "," + readAffinityMapOptions default: - rv.MapOptions = ns.readAffinityMapOptions + rv.MapOptions = readAffinityMapOptions } } @@ -1378,22 +1379,3 @@ func getDeviceSize(ctx context.Context, devicePath string) (uint64, error) { return size, nil } - -func (ns *NodeServer) SetReadAffinityMapOptions(crushLocationMap map[string]string) { - if len(crushLocationMap) == 0 { - return - } - - var b strings.Builder - b.WriteString("read_from_replica=localize,crush_location=") - first := true - for key, val := range crushLocationMap { - if first { - b.WriteString(fmt.Sprintf("%s:%s", key, val)) - first = false - } else { - b.WriteString(fmt.Sprintf("|%s:%s", key, val)) - } - } - ns.readAffinityMapOptions = b.String() -} diff --git a/internal/rbd/nodeserver_test.go b/internal/rbd/nodeserver_test.go index 822232c6f..bffc114dc 100644 --- a/internal/rbd/nodeserver_test.go +++ b/internal/rbd/nodeserver_test.go @@ -18,8 +18,12 @@ package rbd import ( "context" + "encoding/json" + "os" "testing" + "github.com/ceph/ceph-csi/internal/util" + "github.com/container-storage-interface/spec/lib/go/csi" "github.com/stretchr/testify/assert" ) @@ -107,53 +111,6 @@ func TestParseBoolOption(t *testing.T) { } } -func TestNodeServer_SetReadAffinityMapOptions(t *testing.T) { - t.Parallel() - tests := []struct { - name string - crushLocationmap map[string]string - wantAny []string - }{ - { - name: "nil crushLocationmap", - crushLocationmap: nil, - wantAny: []string{""}, - }, - { - name: "empty crushLocationmap", - crushLocationmap: map[string]string{}, - wantAny: []string{""}, - }, - { - name: "single entry in crushLocationmap", - crushLocationmap: map[string]string{ - "region": "east", - }, - wantAny: []string{"read_from_replica=localize,crush_location=region:east"}, - }, - { - name: "multiple entries in crushLocationmap", - crushLocationmap: map[string]string{ - "region": "east", - "zone": "east-1", - }, - wantAny: []string{ - "read_from_replica=localize,crush_location=region:east|zone:east-1", - "read_from_replica=localize,crush_location=zone:east-1|region:east", - }, - }, - } - for _, tt := range tests { - currentTT := tt - t.Run(tt.name, func(t *testing.T) { - t.Parallel() - ns := &NodeServer{} - ns.SetReadAffinityMapOptions(currentTT.crushLocationmap) - assert.Contains(t, currentTT.wantAny, ns.readAffinityMapOptions) - }) - } -} - func TestNodeServer_appendReadAffinityMapOptions(t *testing.T) { t.Parallel() type input struct { @@ -236,11 +193,128 @@ func TestNodeServer_appendReadAffinityMapOptions(t *testing.T) { MapOptions: currentTT.args.mapOptions, Mounter: currentTT.args.mounter, } - ns := &NodeServer{ - readAffinityMapOptions: currentTT.args.readAffinityMapOptions, - } - ns.appendReadAffinityMapOptions(rv) + rv.appendReadAffinityMapOptions(currentTT.args.readAffinityMapOptions) assert.Equal(t, currentTT.want, rv.MapOptions) }) } } + +func TestReadAffinity_GetReadAffinityMapOptions(t *testing.T) { + t.Parallel() + + nodeLabels := map[string]string{ + "topology.kubernetes.io/zone": "east-1", + "topology.kubernetes.io/region": "east", + } + + csiConfig := []util.ClusterInfo{ + { + ClusterID: "cluster-1", + ReadAffinity: struct { + Enabled bool `json:"enabled"` + CrushLocationLabels []string `json:"crushLocationLabels"` + }{ + Enabled: true, + CrushLocationLabels: []string{ + "topology.kubernetes.io/region", + }, + }, + }, + { + ClusterID: "cluster-2", + ReadAffinity: struct { + Enabled bool `json:"enabled"` + CrushLocationLabels []string `json:"crushLocationLabels"` + }{ + Enabled: false, + CrushLocationLabels: []string{ + "topology.kubernetes.io/region", + }, + }, + }, + { + ClusterID: "cluster-3", + ReadAffinity: struct { + Enabled bool `json:"enabled"` + CrushLocationLabels []string `json:"crushLocationLabels"` + }{ + Enabled: true, + CrushLocationLabels: []string{}, + }, + }, + { + ClusterID: "cluster-4", + }, + } + + csiConfigFileContent, err := json.Marshal(csiConfig) + if err != nil { + t.Errorf("failed to marshal csi config info %v", err) + } + tmpConfPath := util.CsiConfigFile + err = os.Mkdir("/etc/ceph-csi-config", 0o600) + if err != nil { + t.Errorf("failed to create directory %s: %v", "/etc/ceph-csi-config", err) + } + err = os.WriteFile(tmpConfPath, csiConfigFileContent, 0o600) + if err != nil { + t.Errorf("failed to write %s file content: %v", util.CsiConfigFile, err) + } + + tests := []struct { + name string + clusterID string + CLICrushLocationLabels string + want string + }{ + { + name: "Enabled in cluster-1 and Enabled in CLI", + clusterID: "cluster-1", + CLICrushLocationLabels: "topology.kubernetes.io/region", + want: "read_from_replica=localize,crush_location=region:east", + }, + { + name: "Disabled in cluster-2 and Enabled in CLI", + clusterID: "cluster-2", + CLICrushLocationLabels: "topology.kubernetes.io/zone", + want: "", + }, + { + name: "Enabled in cluster-3 with empty crush labels and Enabled in CLI", + clusterID: "cluster-3", + CLICrushLocationLabels: "topology.kubernetes.io/zone", + want: "read_from_replica=localize,crush_location=zone:east-1", + }, + { + name: "Enabled in cluster-3 with empty crush labels and Disabled in CLI", + clusterID: "cluster-3", + CLICrushLocationLabels: "", + want: "", + }, + { + name: "Absent in cluster-4 and Enabled in CLI", + clusterID: "cluster-4", + CLICrushLocationLabels: "topology.kubernetes.io/zone", + want: "", + }, + } + + for _, tt := range tests { + tc := tt + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + crushLocationMap := util.GetCrushLocationMap(tc.CLICrushLocationLabels, nodeLabels) + ns := &NodeServer{ + CLIReadAffinityMapOptions: util.ConstructReadAffinityMapOption(crushLocationMap), + } + readAffinityMapOptions, err := util.GetReadAffinityMapOptions( + tc.clusterID, ns.CLIReadAffinityMapOptions, nodeLabels, + ) + if err != nil { + assert.Fail(t, err.Error()) + } + + assert.Equal(t, tc.want, readAffinityMapOptions) + }) + } +} diff --git a/internal/rbd/rbd_attach.go b/internal/rbd/rbd_attach.go index c8d4701e2..37d121d79 100644 --- a/internal/rbd/rbd_attach.go +++ b/internal/rbd/rbd_attach.go @@ -295,7 +295,7 @@ func parseMapOptions(mapOptions string) (string, string, error) { // getMapOptions is a wrapper func, calls parse map/unmap funcs and feeds the // rbdVolume object. -func getMapOptions(req *csi.NodeStageVolumeRequest, rv *rbdVolume) error { +func (ns *NodeServer) getMapOptions(req *csi.NodeStageVolumeRequest, rv *rbdVolume) error { krbdMapOptions, nbdMapOptions, err := parseMapOptions(req.GetVolumeContext()["mapOptions"]) if err != nil { return err @@ -312,6 +312,14 @@ func getMapOptions(req *csi.NodeStageVolumeRequest, rv *rbdVolume) error { rv.UnmapOptions = nbdUnmapOptions } + readAffinityMapOptions, err := util.GetReadAffinityMapOptions( + rv.ClusterID, ns.CLIReadAffinityMapOptions, ns.NodeLabels, + ) + if err != nil { + return err + } + rv.appendReadAffinityMapOptions(readAffinityMapOptions) + return nil } diff --git a/internal/util/crushlocation.go b/internal/util/crushlocation.go index 5f3751c33..bc68bf4e6 100644 --- a/internal/util/crushlocation.go +++ b/internal/util/crushlocation.go @@ -23,20 +23,15 @@ import ( ) // GetCrushLocationMap returns the crush location map, determined from -// the crush location labels and their values from the CO system. +// the crush location labels and their values from the node labels passed in arg. // Expects crushLocationLabels in arg to be in the format "[prefix/],[prefix/],...",. // Returns map of crush location types with its array of associated values. -func GetCrushLocationMap(crushLocationLabels, nodeName string) (map[string]string, error) { +func GetCrushLocationMap(crushLocationLabels string, nodeLabels map[string]string) map[string]string { if crushLocationLabels == "" { - return nil, nil + return nil } - nodeLabels, err := k8sGetNodeLabels(nodeName) - if err != nil { - return nil, err - } - - return getCrushLocationMap(crushLocationLabels, nodeLabels), nil + return getCrushLocationMap(crushLocationLabels, nodeLabels) } // getCrushLocationMap returns the crush location map, determined from diff --git a/internal/util/csiconfig.go b/internal/util/csiconfig.go index b39ce241d..abacab329 100644 --- a/internal/util/csiconfig.go +++ b/internal/util/csiconfig.go @@ -62,6 +62,11 @@ type ClusterInfo struct { // symlink filepath for the network namespace where we need to execute commands. NetNamespaceFilePath string `json:"netNamespaceFilePath"` } `json:"nfs"` + // Read affinity map options + ReadAffinity struct { + Enabled bool `json:"enabled"` + CrushLocationLabels []string `json:"crushLocationLabels"` + } `json:"readAffinity"` } // Expected JSON structure in the passed in config file is, @@ -203,3 +208,21 @@ func GetNFSNetNamespaceFilePath(pathToConfig, clusterID string) (string, error) return cluster.NFS.NetNamespaceFilePath, nil } + +// GetCrushLocationLabels returns the `readAffinity.enabled` and `readAffinity.crushLocationLabels` +// values from the CSI config for the given `clusterID`. If `readAffinity.enabled` is set to true +// it returns `true` and `crushLocationLabels`, else returns `false` and an empty string. +func GetCrushLocationLabels(pathToConfig, clusterID string) (bool, string, error) { + cluster, err := readClusterInfo(pathToConfig, clusterID) + if err != nil { + return false, "", err + } + + if !cluster.ReadAffinity.Enabled { + return false, "", nil + } + + crushLocationLabels := strings.Join(cluster.ReadAffinity.CrushLocationLabels, ",") + + return true, crushLocationLabels, nil +} diff --git a/internal/util/csiconfig_test.go b/internal/util/csiconfig_test.go index 66b5c927d..40e1b4d5e 100644 --- a/internal/util/csiconfig_test.go +++ b/internal/util/csiconfig_test.go @@ -365,3 +365,116 @@ func TestGetNFSNetNamespaceFilePath(t *testing.T) { }) } } + +func TestGetReadAffinityOptions(t *testing.T) { + t.Parallel() + tests := []struct { + name string + clusterID string + want struct { + enabled bool + labels string + } + }{ + { + name: "ReadAffinity enabled set to true for cluster-1", + clusterID: "cluster-1", + want: struct { + enabled bool + labels string + }{true, "topology.kubernetes.io/region,topology.kubernetes.io/zone,topology.io/rack"}, + }, + { + name: "ReadAffinity enabled set to true for cluster-2", + clusterID: "cluster-2", + want: struct { + enabled bool + labels string + }{true, "topology.kubernetes.io/region"}, + }, + { + name: "ReadAffinity enabled set to false for cluster-3", + clusterID: "cluster-3", + want: struct { + enabled bool + labels string + }{false, ""}, + }, + { + name: "ReadAffinity option not set in cluster-4", + clusterID: "cluster-4", + want: struct { + enabled bool + labels string + }{false, ""}, + }, + } + + csiConfig := []ClusterInfo{ + { + ClusterID: "cluster-1", + ReadAffinity: struct { + Enabled bool `json:"enabled"` + CrushLocationLabels []string `json:"crushLocationLabels"` + }{ + Enabled: true, + CrushLocationLabels: []string{ + "topology.kubernetes.io/region", + "topology.kubernetes.io/zone", + "topology.io/rack", + }, + }, + }, + { + ClusterID: "cluster-2", + ReadAffinity: struct { + Enabled bool `json:"enabled"` + CrushLocationLabels []string `json:"crushLocationLabels"` + }{ + Enabled: true, + CrushLocationLabels: []string{ + "topology.kubernetes.io/region", + }, + }, + }, + { + ClusterID: "cluster-3", + ReadAffinity: struct { + Enabled bool `json:"enabled"` + CrushLocationLabels []string `json:"crushLocationLabels"` + }{ + Enabled: false, + CrushLocationLabels: []string{ + "topology.io/rack", + }, + }, + }, + { + ClusterID: "cluster-4", + }, + } + csiConfigFileContent, err := json.Marshal(csiConfig) + if err != nil { + t.Errorf("failed to marshal csi config info %v", err) + } + tmpConfPath := t.TempDir() + "/ceph-csi.json" + err = os.WriteFile(tmpConfPath, csiConfigFileContent, 0o600) + if err != nil { + t.Errorf("failed to write %s file content: %v", CsiConfigFile, err) + } + for _, tt := range tests { + tc := tt + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + enabled, labels, err := GetCrushLocationLabels(tmpConfPath, tc.clusterID) + if err != nil { + t.Errorf("GetCrushLocationLabels() error = %v", err) + + return + } + if enabled != tc.want.enabled || labels != tc.want.labels { + t.Errorf("GetCrushLocationLabels() = {%v %v} want %v", enabled, labels, tc.want) + } + }) + } +} diff --git a/internal/util/fscrypt/fscrypt.go b/internal/util/fscrypt/fscrypt.go index 6f87b69d6..3b1d46b97 100644 --- a/internal/util/fscrypt/fscrypt.go +++ b/internal/util/fscrypt/fscrypt.go @@ -337,7 +337,7 @@ func InitializeNode(ctx context.Context) error { return nil } -// FscryptUnlock unlocks possilby creating fresh fscrypt metadata +// FscryptUnlock unlocks possibly creating fresh fscrypt metadata // iff a volume is encrypted. Otherwise return immediately Calling // this function requires that InitializeFscrypt ran once on this node. func Unlock( diff --git a/internal/util/k8s/client.go b/internal/util/k8s/client.go index 684fd7090..d5629fee5 100644 --- a/internal/util/k8s/client.go +++ b/internal/util/k8s/client.go @@ -25,8 +25,14 @@ import ( "k8s.io/client-go/tools/clientcmd" ) +var kubeclient *kubernetes.Clientset + // NewK8sClient create kubernetes client. func NewK8sClient() (*kubernetes.Clientset, error) { + if kubeclient != nil { + return kubeclient, nil + } + var cfg *rest.Config var err error cPath := os.Getenv("KUBERNETES_CONFIG_PATH") @@ -46,5 +52,15 @@ func NewK8sClient() (*kubernetes.Clientset, error) { return nil, fmt.Errorf("failed to create client: %w", err) } + kubeclient = client + return client, nil } + +// RunsOnKubernetes checks if the application is running within a Kubernetes cluster +// by inspecting the presence of the KUBERNETES_SERVICE_HOST environment variable. +func RunsOnKubernetes() bool { + kubernetesServiceHost := os.Getenv("KUBERNETES_SERVICE_HOST") + + return kubernetesServiceHost != "" +} diff --git a/internal/util/k8s/node.go b/internal/util/k8s/node.go new file mode 100644 index 000000000..ad7760cd8 --- /dev/null +++ b/internal/util/k8s/node.go @@ -0,0 +1,39 @@ +/* +Copyright 2023 The CephCSI Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package k8s + +import ( + "context" + "fmt" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func GetNodeLabels(nodeName string) (map[string]string, error) { + client, err := NewK8sClient() + if err != nil { + return nil, fmt.Errorf("can not get node %q information, failed "+ + "to connect to Kubernetes: %w", nodeName, err) + } + + node, err := client.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{}) + if err != nil { + return nil, fmt.Errorf("failed to get node %q information: %w", nodeName, err) + } + + return node.GetLabels(), nil +} diff --git a/internal/util/read_affinity.go b/internal/util/read_affinity.go new file mode 100644 index 000000000..a62620892 --- /dev/null +++ b/internal/util/read_affinity.go @@ -0,0 +1,76 @@ +/* +Copyright 2023 The Ceph-CSI Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "fmt" + "strings" +) + +// ConstructReadAffinityMapOption constructs a read affinity map option based on the provided crushLocationMap. +// It appends crush location labels in the format +// "read_from_replica=localize,crush_location=label1:value1|label2:value2|...". +func ConstructReadAffinityMapOption(crushLocationMap map[string]string) string { + if len(crushLocationMap) == 0 { + return "" + } + + var b strings.Builder + b.WriteString("read_from_replica=localize,crush_location=") + first := true + for key, val := range crushLocationMap { + if first { + b.WriteString(fmt.Sprintf("%s:%s", key, val)) + first = false + } else { + b.WriteString(fmt.Sprintf("|%s:%s", key, val)) + } + } + + return b.String() +} + +// GetReadAffinityMapOptions retrieves the readAffinityMapOptions from the CSI config file if it exists. +// If not, it falls back to returning the `cliReadAffinityMapOptions` from the command line. +// If neither of these options is available, it returns an empty string. +func GetReadAffinityMapOptions( + clusterID, cliReadAffinityMapOptions string, nodeLabels map[string]string, +) (string, error) { + var ( + err error + configReadAffinityEnabled bool + configCrushLocationLabels string + ) + + configReadAffinityEnabled, configCrushLocationLabels, err = GetCrushLocationLabels(CsiConfigFile, clusterID) + if err != nil { + return "", err + } + + if !configReadAffinityEnabled { + return "", nil + } + + if configCrushLocationLabels == "" { + return cliReadAffinityMapOptions, nil + } + + crushLocationMap := GetCrushLocationMap(configCrushLocationLabels, nodeLabels) + readAffinityMapOptions := ConstructReadAffinityMapOption(crushLocationMap) + + return readAffinityMapOptions, nil +} diff --git a/internal/util/read_affinity_test.go b/internal/util/read_affinity_test.go new file mode 100644 index 000000000..89da72741 --- /dev/null +++ b/internal/util/read_affinity_test.go @@ -0,0 +1,68 @@ +/* +Copyright 2023 The Ceph-CSI Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestReadAffinity_ConstructReadAffinityMapOption(t *testing.T) { + t.Parallel() + tests := []struct { + name string + crushLocationmap map[string]string + wantAny []string + }{ + { + name: "nil crushLocationmap", + crushLocationmap: nil, + wantAny: []string{""}, + }, + { + name: "empty crushLocationmap", + crushLocationmap: map[string]string{}, + wantAny: []string{""}, + }, + { + name: "single entry in crushLocationmap", + crushLocationmap: map[string]string{ + "region": "east", + }, + wantAny: []string{"read_from_replica=localize,crush_location=region:east"}, + }, + { + name: "multiple entries in crushLocationmap", + crushLocationmap: map[string]string{ + "region": "east", + "zone": "east-1", + }, + wantAny: []string{ + "read_from_replica=localize,crush_location=region:east|zone:east-1", + "read_from_replica=localize,crush_location=zone:east-1|region:east", + }, + }, + } + for _, tt := range tests { + currentTT := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + assert.Contains(t, currentTT.wantAny, ConstructReadAffinityMapOption(currentTT.crushLocationmap)) + }) + } +} diff --git a/internal/util/topology.go b/internal/util/topology.go index be99dbbe1..1f08ca6ff 100644 --- a/internal/util/topology.go +++ b/internal/util/topology.go @@ -17,16 +17,14 @@ limitations under the License. package util import ( - "context" "encoding/json" "fmt" "strings" + "github.com/container-storage-interface/spec/lib/go/csi" + "github.com/ceph/ceph-csi/internal/util/k8s" "github.com/ceph/ceph-csi/internal/util/log" - - "github.com/container-storage-interface/spec/lib/go/csi" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) const ( @@ -34,21 +32,6 @@ const ( labelSeparator string = "," ) -func k8sGetNodeLabels(nodeName string) (map[string]string, error) { - client, err := k8s.NewK8sClient() - if err != nil { - return nil, fmt.Errorf("can not get node %q information, failed "+ - "to connect to Kubernetes: %w", nodeName, err) - } - - node, err := client.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{}) - if err != nil { - return nil, fmt.Errorf("failed to get node %q information: %w", nodeName, err) - } - - return node.GetLabels(), nil -} - // GetTopologyFromDomainLabels returns the CSI topology map, determined from // the domain labels and their values from the CO system // Expects domainLabels in arg to be in the format "[prefix/],[prefix/],...",. @@ -82,7 +65,7 @@ func GetTopologyFromDomainLabels(domainLabels, nodeName, driverName string) (map labelCount++ } - nodeLabels, err := k8sGetNodeLabels(nodeName) + nodeLabels, err := k8s.GetNodeLabels(nodeName) if err != nil { return nil, err } diff --git a/scripts/Dockerfile.test b/scripts/Dockerfile.test index 4efb764b9..d9d5a7e20 100644 --- a/scripts/Dockerfile.test +++ b/scripts/Dockerfile.test @@ -56,6 +56,7 @@ RUN source /build.env \ && npm install @commitlint/cli@"${COMMITLINT_VERSION}" \ && popd \ && git config --global --add safe.directory ${CEPHCSIPATH} \ + && go install github.com/augmentable-dev/tickgit/cmd/tickgit@latest \ && true WORKDIR ${CEPHCSIPATH}