util: add support for the nsenter

add support to run rbd map and mount -t
commands with the nsenter.

complete design of pod/multus network
is added here https://github.com/rook/rook/
blob/master/design/ceph/multus-network.md#csi-pods

Signed-off-by: Madhu Rajanna <madhupr007@gmail.com>
(cherry picked from commit 7b2aef0d81)
This commit is contained in:
Madhu Rajanna 2022-03-31 18:59:33 +05:30 committed by mergify[bot]
parent 2790daac39
commit 3161a6b060
16 changed files with 211 additions and 5 deletions

View File

@ -31,6 +31,7 @@ spec:
priorityClassName: {{ .Values.nodeplugin.priorityClassName }}
{{- end }}
hostNetwork: true
hostPID: true
# to use e.g. Rook orchestrated cluster, and mons' FQDN is
# resolved through k8s service, set dns policy to cluster first
dnsPolicy: ClusterFirstWithHostNet

View File

@ -27,6 +27,7 @@ serviceAccounts:
# - "<MONValue2>"
# cephFS:
# subvolumeGroup: "csi"
# netNamespaceFilePath: "{{ .kubeletDir }}/plugins/{{ .driverName }}/net"
csiConfig: []
# Set logging level for csi containers.

View File

@ -25,6 +25,7 @@ serviceAccounts:
# monitors:
# - "<MONValue1>"
# - "<MONValue2>"
# netNamespaceFilePath: "{{ .kubeletDir }}/plugins/{{ .driverName }}/net"
csiConfig: []
# Configuration details of clusterID,PoolID and FscID mapping

View File

@ -15,6 +15,7 @@ spec:
serviceAccountName: cephfs-csi-nodeplugin
priorityClassName: system-node-critical
hostNetwork: true
hostPID: true
# to use e.g. Rook orchestrated cluster, and mons' FQDN is
# resolved through k8s service, set dns policy to cluster first
dnsPolicy: ClusterFirstWithHostNet

View File

@ -50,6 +50,42 @@ kubectl replace -f ./csi-config-map-sample.yaml
Storage class and snapshot class, using `<cluster-id>` as the value for the
option `clusterID`, can now be created on the cluster.
## Running CephCSI with pod networking
The current problem with Pod Networking, is when a CephFS/RBD volume is mounted
in a pod using Ceph CSI and then the CSI CephFS/RBD plugin is restarted or
terminated (e.g. by restarting or deleting its DaemonSet), all operations on
the volume become blocked, even after restarting the CSI pods.
The only workaround is to restart the node where the Ceph CSI plugin pod was
restarted. This can be mitigated by running the `rbd map`/`mount -t` commands
in a different network namespace which does not get deleted when the CSI
CephFS/RBD plugin is restarted or terminated.
If someone wants to run the CephCSI with the pod networking they can still do
by setting the `netNamespaceFilePath`. If this path is set CephCSI will execute
the `rbd map`/`mount -t` commands after entering the [network
namespace](https://man7.org/linux/man-pages/man7/network_namespaces.7.html)
specified by `netNamespaceFilePath` with the
[nsenter](https://man7.org/linux/man-pages/man1/nsenter.1.html) command.
`netNamespaceFilePath` should point to the network namespace of some
long-running process, typically it would be a symlink to
`/proc/<long running process id>/ns/net`.
The long-running process can also be another pod which is a Daemonset which
never restarts. This Pod should only be stopped and restarted when a node is
stopped so that volume operations do not become blocked. The new DaemonSet pod
can contain a single container, responsible for holding its pod network alive.
It is used as a passthrough by the CephCSI plugin pod which when mounting or
mapping will use the network namespace of this pod.
Once the pod is created get its PID and create a symlink to
`/proc/<PID>/ns/net` in the hostPath volume shared with the csi-plugin pod and
specify the path in the `netNamespaceFilePath` option.
*Note* This Pod should have `hostPID: true` in the Pod Spec.
## Deploying the storage class
Once the plugin is successfully deployed, you'll need to customize

View File

@ -20,6 +20,12 @@ kind: ConfigMap
# NOTE: Make sure you don't add radosNamespace option to a currently in use
# configuration as it will cause issues.
# The field "cephFS.subvolumeGroup" is optional and defaults to "csi".
# The <netNamespaceFilePath#> fields are the various network namespace
# path for the Ceph cluster identified by the <cluster-id>, This will be used
# by the CSI plugin to execute the rbd map/unmap and mount -t commands in the
# network namespace specified by the <netNamespaceFilePath#>.
# If a CSI plugin is using more than one Ceph cluster, repeat the section for
# each such cluster in use.
# NOTE: Changes to the configmap is automatically updated in the running pods,
# thus restarting existing pods using the configmap is NOT required on edits
# to the configmap.
@ -37,6 +43,7 @@ data:
{
"clusterID": "<cluster-id>",
"radosNamespace": "<rados-namespace>",
"netNamespaceFilePath": "<kubeletRootPath>/plugins/rbd.csi.ceph.com/net",
"monitors": [
"<MONValue1>",
"<MONValue2>",

View File

@ -65,12 +65,20 @@ func mountFuse(ctx context.Context, mountPoint string, cr *util.Credentials, vol
if volOptions.FsName != "" {
args = append(args, "--client_mds_namespace="+volOptions.FsName)
}
var (
stderr string
err error
)
if volOptions.NetNamespaceFilePath != "" {
_, stderr, err = util.ExecuteCommandWithNSEnter(ctx, volOptions.NetNamespaceFilePath, "ceph-fuse", args[:]...)
} else {
_, stderr, err = util.ExecCommand(ctx, "ceph-fuse", args[:]...)
}
_, stderr, err := util.ExecCommand(ctx, "ceph-fuse", args[:]...)
if err != nil {
return fmt.Errorf("%w stderr: %s", err, stderr)
}
// Parse the output:
// We need "starting fuse" meaning the mount is ok
// and PID of the ceph-fuse daemon for unmount

View File

@ -51,7 +51,16 @@ func mountKernel(ctx context.Context, mountPoint string, cr *util.Credentials, v
args = append(args, "-o", optionsStr)
_, stderr, err := util.ExecCommand(ctx, "mount", args[:]...)
var (
stderr string
err error
)
if volOptions.NetNamespaceFilePath != "" {
_, stderr, err = util.ExecuteCommandWithNSEnter(ctx, volOptions.NetNamespaceFilePath, "mount", args[:]...)
} else {
_, stderr, err = util.ExecCommand(ctx, "mount", args[:]...)
}
if err != nil {
return fmt.Errorf("%w stderr: %s", err, stderr)
}

View File

@ -126,6 +126,11 @@ func (ns *NodeServer) NodeStageVolume(
}
defer volOptions.Destroy()
volOptions.NetNamespaceFilePath, err = util.GetNetNamespaceFilePath(util.CsiConfigFile, volOptions.ClusterID)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
mnt, err := mounter.New(volOptions)
if err != nil {
log.ErrorLog(ctx, "failed to create mounter for volume %s: %v", volID, err)

View File

@ -50,6 +50,8 @@ type VolumeOptions struct {
ProvisionVolume bool `json:"provisionVolume"`
KernelMountOptions string `json:"kernelMountOptions"`
FuseMountOptions string `json:"fuseMountOptions"`
// Network namespace file path to execute nsenter command
NetNamespaceFilePath string
// conn is a connection to the Ceph cluster obtained from a ConnPool
conn *util.ClusterConnection

View File

@ -332,6 +332,10 @@ func (ns *NodeServer) NodeStageVolume(
}
defer rv.Destroy()
rv.NetNamespaceFilePath, err = util.GetNetNamespaceFilePath(util.CsiConfigFile, rv.ClusterID)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
if isHealer {
err = healerStageTransaction(ctx, cr, rv, stagingParentPath)
if err != nil {

View File

@ -457,8 +457,17 @@ func createPath(ctx context.Context, volOpt *rbdVolume, device string, cr *util.
mapArgs = append(mapArgs, "--read-only")
}
// Execute map
stdout, stderr, err := util.ExecCommand(ctx, cli, mapArgs...)
var (
stdout string
stderr string
err error
)
if volOpt.NetNamespaceFilePath != "" {
stdout, stderr, err = util.ExecuteCommandWithNSEnter(ctx, volOpt.NetNamespaceFilePath, cli, mapArgs...)
} else {
stdout, stderr, err = util.ExecCommand(ctx, cli, mapArgs...)
}
if err != nil {
log.WarningLog(ctx, "rbd: map error %v, rbd output: %s", err, stderr)
// unmap rbd image if connection timeout

View File

@ -156,6 +156,8 @@ type rbdVolume struct {
LogStrategy string
VolName string
MonValueFromSecret string
// Network namespace file path to execute nsenter command
NetNamespaceFilePath string
// RequestedVolSize has the size of the volume requested by the user and
// this value will not be updated when doing getImageInfo() on rbdVolume.
RequestedVolSize int64

View File

@ -21,6 +21,7 @@ import (
"context"
"errors"
"fmt"
"os"
"os/exec"
"time"
@ -32,6 +33,47 @@ import (
// InvalidPoolID used to denote an invalid pool.
const InvalidPoolID int64 = -1
// ExecuteCommandWithNSEnter executes passed in program with args with nsenter
// and returns separate stdout and stderr streams. In case ctx is not set to
// context.TODO(), the command will be logged after it was executed.
func ExecuteCommandWithNSEnter(ctx context.Context, netPath, program string, args ...string) (string, string, error) {
var (
sanitizedArgs = StripSecretInArgs(args)
stdoutBuf bytes.Buffer
stderrBuf bytes.Buffer
)
// check netPath exists
if _, err := os.Stat(netPath); err != nil {
return "", "", fmt.Errorf("failed to get stat for %s %w", netPath, err)
}
// nsenter --net=%s -- <program> <args>
args = append([]string{fmt.Sprintf("--net=%s", netPath), "--", program}, args...)
cmd := exec.Command("nsenter", args...) // #nosec:G204, commands executing not vulnerable.
cmd.Stdout = &stdoutBuf
cmd.Stderr = &stderrBuf
err := cmd.Run()
stdout := stdoutBuf.String()
stderr := stderrBuf.String()
if err != nil {
err = fmt.Errorf("an error (%w) occurred while running %s args: %v", err, program, sanitizedArgs)
if ctx != context.TODO() {
log.UsefulLog(ctx, "%s", err)
}
return stdout, stderr, err
}
if ctx != context.TODO() {
log.UsefulLog(ctx, "command succeeded: %s %v", program, sanitizedArgs)
}
return stdout, stderr, nil
}
// ExecCommand executes passed in program with args and returns separate stdout
// and stderr streams. In case ctx is not set to context.TODO(), the command
// will be logged after it was executed.

View File

@ -49,6 +49,8 @@ type ClusterInfo struct {
// SubvolumeGroup contains the name of the SubvolumeGroup for CSI volumes
SubvolumeGroup string `json:"subvolumeGroup"`
} `json:"cephFS"`
// symlink filepath for the network namespace where we need to execute commands.
NetNamespaceFilePath string `json:"netNamespaceFilePath"`
}
// Expected JSON structure in the passed in config file is,
@ -161,3 +163,12 @@ func GetClusterID(options map[string]string) (string, error) {
return clusterID, nil
}
func GetNetNamespaceFilePath(pathToConfig, clusterID string) (string, error) {
cluster, err := readClusterInfo(pathToConfig, clusterID)
if err != nil {
return "", err
}
return cluster.NetNamespaceFilePath, nil
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package util
import (
"encoding/json"
"os"
"testing"
)
@ -138,3 +139,69 @@ func TestCSIConfig(t *testing.T) {
t.Errorf("Test setup error %s", err)
}
}
func TestGetNetNamespaceFilePath(t *testing.T) {
t.Parallel()
tests := []struct {
name string
clusterID string
want string
}{
{
name: "get NetNamespaceFilePath for cluster-1",
clusterID: "cluster-1",
want: "/var/lib/kubelet/plugins/rbd.ceph.csi.com/cluster1-net",
},
{
name: "get NetNamespaceFilePath for cluster-2",
clusterID: "cluster-2",
want: "/var/lib/kubelet/plugins/rbd.ceph.csi.com/cluster2-net",
},
{
name: "when NetNamespaceFilePath is empty",
clusterID: "cluster-3",
want: "",
},
}
csiConfig := []ClusterInfo{
{
ClusterID: "cluster-1",
Monitors: []string{"ip-1", "ip-2"},
NetNamespaceFilePath: "/var/lib/kubelet/plugins/rbd.ceph.csi.com/cluster1-net",
},
{
ClusterID: "cluster-2",
Monitors: []string{"ip-3", "ip-4"},
NetNamespaceFilePath: "/var/lib/kubelet/plugins/rbd.ceph.csi.com/cluster2-net",
},
{
ClusterID: "cluster-3",
Monitors: []string{"ip-5", "ip-6"},
},
}
csiConfigFileContent, err := json.Marshal(csiConfig)
if err != nil {
t.Errorf("failed to marshal csi config info %v", err)
}
tmpConfPath := t.TempDir() + "/ceph-csi.json"
err = os.WriteFile(tmpConfPath, csiConfigFileContent, 0o600)
if err != nil {
t.Errorf("failed to write %s file content: %v", CsiConfigFile, err)
}
for _, tt := range tests {
ts := tt
t.Run(ts.name, func(t *testing.T) {
t.Parallel()
got, err := GetNetNamespaceFilePath(tmpConfPath, ts.clusterID)
if err != nil {
t.Errorf("GetNetNamespaceFilePath() error = %v", err)
return
}
if got != ts.want {
t.Errorf("GetNetNamespaceFilePath() = %v, want %v", got, ts.want)
}
})
}
}