diff --git a/internal/csi-common/nodeserver-default.go b/internal/csi-common/nodeserver-default.go index d2f57421a..40206980d 100644 --- a/internal/csi-common/nodeserver-default.go +++ b/internal/csi-common/nodeserver-default.go @@ -29,6 +29,7 @@ import ( // DefaultNodeServer stores driver object. type DefaultNodeServer struct { + csi.UnimplementedNodeServer Driver *CSIDriver Type string Mounter mount.Interface diff --git a/internal/nfs/driver/driver.go b/internal/nfs/driver/driver.go index 8454ec664..cf7593d6e 100644 --- a/internal/nfs/driver/driver.go +++ b/internal/nfs/driver/driver.go @@ -20,6 +20,7 @@ import ( csicommon "github.com/ceph/ceph-csi/internal/csi-common" "github.com/ceph/ceph-csi/internal/nfs/controller" "github.com/ceph/ceph-csi/internal/nfs/identity" + "github.com/ceph/ceph-csi/internal/nfs/nodeserver" "github.com/ceph/ceph-csi/internal/util" "github.com/ceph/ceph-csi/internal/util/log" @@ -43,27 +44,39 @@ func (fs *Driver) Run(conf *util.Config) { log.FatalLogMsg("failed to initialize CSI driver") } - cd.AddControllerServiceCapabilities([]csi.ControllerServiceCapability_RPC_Type{ - csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME, - csi.ControllerServiceCapability_RPC_SINGLE_NODE_MULTI_WRITER, - csi.ControllerServiceCapability_RPC_EXPAND_VOLUME, - csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT, - csi.ControllerServiceCapability_RPC_CLONE_VOLUME, - }) - // VolumeCapabilities are validated by the CephFS Controller - cd.AddVolumeCapabilityAccessModes([]csi.VolumeCapability_AccessMode_Mode{ - csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER, - csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER, - csi.VolumeCapability_AccessMode_SINGLE_NODE_MULTI_WRITER, - csi.VolumeCapability_AccessMode_SINGLE_NODE_SINGLE_WRITER, - }) + if conf.IsControllerServer || !conf.IsNodeServer { + cd.AddControllerServiceCapabilities([]csi.ControllerServiceCapability_RPC_Type{ + csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME, + csi.ControllerServiceCapability_RPC_SINGLE_NODE_MULTI_WRITER, + csi.ControllerServiceCapability_RPC_EXPAND_VOLUME, + csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT, + csi.ControllerServiceCapability_RPC_CLONE_VOLUME, + }) + // VolumeCapabilities are validated by the CephFS Controller + cd.AddVolumeCapabilityAccessModes([]csi.VolumeCapability_AccessMode_Mode{ + csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER, + csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER, + csi.VolumeCapability_AccessMode_SINGLE_NODE_MULTI_WRITER, + csi.VolumeCapability_AccessMode_SINGLE_NODE_SINGLE_WRITER, + }) + } // Create gRPC servers server := csicommon.NewNonBlockingGRPCServer() srv := csicommon.Servers{ IS: identity.NewIdentityServer(cd), - CS: controller.NewControllerServer(cd), } + + switch { + case conf.IsNodeServer: + srv.NS = nodeserver.NewNodeServer(cd, conf.Vtype) + case conf.IsControllerServer: + srv.CS = controller.NewControllerServer(cd) + default: + srv.NS = nodeserver.NewNodeServer(cd, conf.Vtype) + srv.CS = controller.NewControllerServer(cd) + } + server.Start(conf.Endpoint, conf.HistogramOption, srv, conf.EnableGRPCMetrics) if conf.EnableGRPCMetrics { log.WarningLogMsg("EnableGRPCMetrics is deprecated") diff --git a/internal/nfs/nodeserver/nodeserver.go b/internal/nfs/nodeserver/nodeserver.go new file mode 100644 index 000000000..cb0c70275 --- /dev/null +++ b/internal/nfs/nodeserver/nodeserver.go @@ -0,0 +1,284 @@ +/* +Copyright 2022 The Ceph-CSI Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package nodeserver + +import ( + "errors" + "fmt" + "os" + "strings" + + csicommon "github.com/ceph/ceph-csi/internal/csi-common" + "github.com/ceph/ceph-csi/internal/util" + "github.com/ceph/ceph-csi/internal/util/log" + + "github.com/container-storage-interface/spec/lib/go/csi" + "golang.org/x/net/context" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + mount "k8s.io/mount-utils" + netutil "k8s.io/utils/net" +) + +const ( + defaultMountPermission = os.FileMode(0o777) + // Address of the NFS server. + paramServer = "server" + paramShare = "share" + paramClusterID = "clusterID" +) + +// NodeServer struct of ceph CSI driver with supported methods of CSI +// node server spec. +type NodeServer struct { + csicommon.DefaultNodeServer +} + +// NewNodeServer initialize a node server for ceph CSI driver. +func NewNodeServer( + d *csicommon.CSIDriver, + t string, +) *NodeServer { + return &NodeServer{ + DefaultNodeServer: *csicommon.NewDefaultNodeServer(d, t, map[string]string{}), + } +} + +// NodePublishVolume mount the volume. +func (ns *NodeServer) NodePublishVolume( + ctx context.Context, + req *csi.NodePublishVolumeRequest, +) (*csi.NodePublishVolumeResponse, error) { + err := validateNodePublishVolumeRequest(req) + if err != nil { + return nil, status.Error(codes.InvalidArgument, err.Error()) + } + + volumeID := req.GetVolumeId() + volCap := req.GetVolumeCapability() + targetPath := req.GetTargetPath() + mountOptions := volCap.GetMount().GetMountFlags() + if req.GetReadonly() { + mountOptions = append(mountOptions, "ro") + } + + source, err := getSource(req.GetVolumeContext()) + if err != nil { + return nil, status.Error(codes.InvalidArgument, err.Error()) + } + + clusterID := req.GetVolumeContext()[paramClusterID] + netNamespaceFilePath := "" + if clusterID != "" { + netNamespaceFilePath, err = util.GetNFSNetNamespaceFilePath( + util.CsiConfigFile, + clusterID) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + } + + err = ns.mountNFS(ctx, + volumeID, + source, + targetPath, + netNamespaceFilePath, + mountOptions) + if err != nil { + if os.IsPermission(err) { + return nil, status.Error(codes.PermissionDenied, err.Error()) + } + if strings.Contains(err.Error(), "invalid argument") { + return nil, status.Error(codes.InvalidArgument, err.Error()) + } + + return nil, status.Error(codes.Internal, err.Error()) + } + log.DebugLog(ctx, "nfs: successfully mounted volume %q mount %q to %q succeeded", + volumeID, source, targetPath) + + return &csi.NodePublishVolumeResponse{}, nil +} + +// NodeUnpublishVolume unmount the volume. +func (ns *NodeServer) NodeUnpublishVolume( + ctx context.Context, + req *csi.NodeUnpublishVolumeRequest, +) (*csi.NodeUnpublishVolumeResponse, error) { + err := util.ValidateNodeUnpublishVolumeRequest(req) + if err != nil { + return nil, err + } + + volumeID := req.GetVolumeId() + targetPath := req.GetTargetPath() + log.DebugLog(ctx, "nfs: unmounting volume %s on %s", volumeID, targetPath) + err = mount.CleanupMountPoint(targetPath, ns.Mounter, true) + if err != nil { + return nil, status.Errorf(codes.Internal, "failed to unmount target %q: %v", + targetPath, err) + } + log.DebugLog(ctx, "nfs: successfully unbounded volume %q from %q", + volumeID, targetPath) + + return &csi.NodeUnpublishVolumeResponse{}, nil +} + +// NodeGetCapabilities returns the supported capabilities of the node server. +func (ns *NodeServer) NodeGetCapabilities( + ctx context.Context, + req *csi.NodeGetCapabilitiesRequest, +) (*csi.NodeGetCapabilitiesResponse, error) { + return &csi.NodeGetCapabilitiesResponse{ + Capabilities: []*csi.NodeServiceCapability{ + { + Type: &csi.NodeServiceCapability_Rpc{ + Rpc: &csi.NodeServiceCapability_RPC{ + Type: csi.NodeServiceCapability_RPC_GET_VOLUME_STATS, + }, + }, + }, + { + Type: &csi.NodeServiceCapability_Rpc{ + Rpc: &csi.NodeServiceCapability_RPC{ + Type: csi.NodeServiceCapability_RPC_SINGLE_NODE_MULTI_WRITER, + }, + }, + }, + }, + }, nil +} + +// NodeGetVolumeStats get volume stats. +func (ns *NodeServer) NodeGetVolumeStats( + ctx context.Context, + req *csi.NodeGetVolumeStatsRequest, +) (*csi.NodeGetVolumeStatsResponse, error) { + var err error + targetPath := req.GetVolumePath() + if targetPath == "" { + return nil, status.Error(codes.InvalidArgument, + fmt.Sprintf("targetpath %v is empty", targetPath)) + } + + stat, err := os.Stat(targetPath) + if err != nil { + return nil, status.Errorf(codes.InvalidArgument, + "failed to get stat for targetpath %q: %v", targetPath, err) + } + + if stat.Mode().IsDir() { + return csicommon.FilesystemNodeGetVolumeStats(ctx, ns.Mounter, targetPath) + } + + return nil, status.Errorf(codes.InvalidArgument, + "targetpath %q is not a directory or device", targetPath) +} + +// mountNFS mounts nfs volumes. +func (ns *NodeServer) mountNFS( + ctx context.Context, + volumeID, source, mountPoint, netNamespaceFilePath string, + mountOptions []string, +) error { + var ( + stderr string + err error + ) + + notMnt, err := ns.Mounter.IsLikelyNotMountPoint(mountPoint) + if err != nil { + if os.IsNotExist(err) { + err = os.MkdirAll(mountPoint, defaultMountPermission) + if err != nil { + return err + } + notMnt = true + } else { + return err + } + } + if !notMnt { + log.DebugLog(ctx, "nfs: volume is already mounted to %s", mountPoint) + + return nil + } + + args := []string{ + "-t", "nfs", + source, + mountPoint, + } + + if len(mountOptions) > 0 { + args = append(append(args, "-o"), mountOptions...) + } + + log.DefaultLog("nfs: mounting volumeID(%v) source(%s) targetPath(%s) mountflags(%v)", + volumeID, source, mountPoint, mountOptions) + if netNamespaceFilePath != "" { + _, stderr, err = util.ExecuteCommandWithNSEnter( + ctx, netNamespaceFilePath, "mount", args[:]...) + } else { + err = ns.Mounter.Mount(source, mountPoint, "nfs", mountOptions) + } + if err != nil { + return fmt.Errorf("nfs: failed to mount %q to %q : %w stderr: %q", + source, mountPoint, err, stderr) + } + if stderr != "" { + return fmt.Errorf("nfs: failed to mount %q to %q : stderr %q", + source, mountPoint, stderr) + } + + return err +} + +// validateNodePublishVolumeRequest validates node publish volume request. +func validateNodePublishVolumeRequest(req *csi.NodePublishVolumeRequest) error { + switch { + case req.GetVolumeId() == "": + return errors.New("volume ID missing in request") + case req.GetVolumeCapability() == nil: + return errors.New("volume capability missing in request") + case req.GetTargetPath() == "": + return errors.New("target path missing in request") + } + + return nil +} + +// getSource validates volume context, extracts and returns source. +// This function expects `server` and `share` parameters to be set +// and validates for the same. +func getSource(volContext map[string]string) (string, error) { + server := volContext[paramServer] + if server == "" { + return "", fmt.Errorf("%v missing in request", paramServer) + } + baseDir := volContext[paramShare] + if baseDir == "" { + return "", fmt.Errorf("%v missing in request", paramShare) + } + + if netutil.IsIPv6String(server) { + // if server is IPv6, format to [IPv6]. + server = fmt.Sprintf("[%s]", server) + } + + return fmt.Sprintf("%s:%s", server, baseDir), nil +} diff --git a/internal/nfs/nodeserver/nodeserver_test.go b/internal/nfs/nodeserver/nodeserver_test.go new file mode 100644 index 000000000..e568dac6a --- /dev/null +++ b/internal/nfs/nodeserver/nodeserver_test.go @@ -0,0 +1,174 @@ +/* +Copyright 2022 The Ceph-CSI Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package nodeserver + +import ( + "testing" + + "github.com/container-storage-interface/spec/lib/go/csi" +) + +func Test_validateNodePublishVolumeRequest(t *testing.T) { + t.Parallel() + type args struct { + req *csi.NodePublishVolumeRequest + } + tests := []struct { + name string + args args + wantErr bool + }{ + { + name: "passing testcase", + args: args{ + req: &csi.NodePublishVolumeRequest{ + VolumeId: "123", + TargetPath: "/target", + VolumeCapability: &csi.VolumeCapability{}, + }, + }, + wantErr: false, + }, + { + name: "missing VolumeId", + args: args{ + req: &csi.NodePublishVolumeRequest{ + VolumeId: "", + TargetPath: "/target", + VolumeCapability: &csi.VolumeCapability{}, + }, + }, + wantErr: true, + }, + { + name: "missing TargetPath", + args: args{ + req: &csi.NodePublishVolumeRequest{ + VolumeId: "123", + TargetPath: "", + VolumeCapability: &csi.VolumeCapability{}, + }, + }, + wantErr: true, + }, + { + name: "missing VolumeCapability", + args: args{ + req: &csi.NodePublishVolumeRequest{ + VolumeId: "123", + TargetPath: "/target", + VolumeCapability: nil, + }, + }, + wantErr: true, + }, + } + for _, tt := range tests { + currentTT := tt + t.Run(currentTT.name, func(t *testing.T) { + t.Parallel() + err := validateNodePublishVolumeRequest(currentTT.args.req) + if (err != nil) != currentTT.wantErr { + t.Errorf("validateNodePublishVoluemRequest() error = %v, wantErr %v", err, currentTT.wantErr) + } + }) + } +} + +func Test_getSource(t *testing.T) { + t.Parallel() + type args struct { + volContext map[string]string + } + tests := []struct { + name string + args args + want string + wantErr bool + }{ + { + name: "hostname as address", + args: args{ + volContext: map[string]string{ + paramServer: "example.io", + paramShare: "/a", + }, + }, + want: "example.io:/a", + wantErr: false, + }, + { + name: "ipv4 address", + args: args{ + volContext: map[string]string{ + paramServer: "10.12.1.0", + paramShare: "/a", + }, + }, + want: "10.12.1.0:/a", + wantErr: false, + }, + { + name: "ipv6 address", + args: args{ + volContext: map[string]string{ + paramServer: "2001:0db8:3c4d:0015:0000:0000:1a2f:1a2b", + paramShare: "/a", + }, + }, + want: "[2001:0db8:3c4d:0015:0000:0000:1a2f:1a2b]:/a", + wantErr: false, + }, + { + name: "missing server parameter", + args: args{ + volContext: map[string]string{ + paramServer: "", + paramShare: "/a", + }, + }, + want: "", + wantErr: true, + }, + { + name: "missing share parameter", + args: args{ + volContext: map[string]string{ + paramServer: "10.12.1.0", + paramShare: "", + }, + }, + want: "", + wantErr: true, + }, + } + for _, tt := range tests { + currentTT := tt + t.Run(currentTT.name, func(t *testing.T) { + t.Parallel() + got, err := getSource(currentTT.args.volContext) + if (err != nil) != currentTT.wantErr { + t.Errorf("getSource() error = %v, wantErr %v", err, currentTT.wantErr) + + return + } + if got != currentTT.want { + t.Errorf("getSource() = %v, want %v", got, currentTT.want) + } + }) + } +} diff --git a/internal/util/csiconfig.go b/internal/util/csiconfig.go index bdfe6d267..96c8915df 100644 --- a/internal/util/csiconfig.go +++ b/internal/util/csiconfig.go @@ -59,6 +59,11 @@ type ClusterInfo struct { // RadosNamespace is a rados namespace in the pool RadosNamespace string `json:"radosNamespace"` } `json:"rbd"` + // NFS contains NFS specific options + NFS struct { + // symlink filepath for the network namespace where we need to execute commands. + NetNamespaceFilePath string `json:"netNamespaceFilePath"` + } `json:"nfs"` } // Expected JSON structure in the passed in config file is, @@ -194,3 +199,13 @@ func GetCephFSNetNamespaceFilePath(pathToConfig, clusterID string) (string, erro return cluster.CephFS.NetNamespaceFilePath, nil } + +// GetNFSNetNamespaceFilePath returns the netNamespaceFilePath for NFS volumes. +func GetNFSNetNamespaceFilePath(pathToConfig, clusterID string) (string, error) { + cluster, err := readClusterInfo(pathToConfig, clusterID) + if err != nil { + return "", err + } + + return cluster.NFS.NetNamespaceFilePath, nil +} diff --git a/internal/util/csiconfig_test.go b/internal/util/csiconfig_test.go index 4a37e588b..66b5c927d 100644 --- a/internal/util/csiconfig_test.go +++ b/internal/util/csiconfig_test.go @@ -291,3 +291,77 @@ func TestGetCephFSNetNamespaceFilePath(t *testing.T) { }) } } + +func TestGetNFSNetNamespaceFilePath(t *testing.T) { + t.Parallel() + tests := []struct { + name string + clusterID string + want string + }{ + { + name: "get NFS specific NetNamespaceFilePath for cluster-1", + clusterID: "cluster-1", + want: "/var/lib/kubelet/plugins/nfs.ceph.csi.com/cluster1-net", + }, + { + name: "get NFS specific NetNamespaceFilePath for cluster-2", + clusterID: "cluster-2", + want: "/var/lib/kubelet/plugins/nfs.ceph.csi.com/cluster2-net", + }, + { + name: "when NFS specific NetNamespaceFilePath is empty", + clusterID: "cluster-3", + want: "", + }, + } + + csiConfig := []ClusterInfo{ + { + ClusterID: "cluster-1", + Monitors: []string{"ip-1", "ip-2"}, + NFS: struct { + NetNamespaceFilePath string `json:"netNamespaceFilePath"` + }{ + NetNamespaceFilePath: "/var/lib/kubelet/plugins/nfs.ceph.csi.com/cluster1-net", + }, + }, + { + ClusterID: "cluster-2", + Monitors: []string{"ip-3", "ip-4"}, + NFS: struct { + NetNamespaceFilePath string `json:"netNamespaceFilePath"` + }{ + NetNamespaceFilePath: "/var/lib/kubelet/plugins/nfs.ceph.csi.com/cluster2-net", + }, + }, + { + ClusterID: "cluster-3", + Monitors: []string{"ip-5", "ip-6"}, + }, + } + csiConfigFileContent, err := json.Marshal(csiConfig) + if err != nil { + t.Errorf("failed to marshal csi config info %v", err) + } + tmpConfPath := t.TempDir() + "/ceph-csi.json" + err = os.WriteFile(tmpConfPath, csiConfigFileContent, 0o600) + if err != nil { + t.Errorf("failed to write %s file content: %v", CsiConfigFile, err) + } + for _, tt := range tests { + ts := tt + t.Run(ts.name, func(t *testing.T) { + t.Parallel() + got, err := GetNFSNetNamespaceFilePath(tmpConfPath, ts.clusterID) + if err != nil { + t.Errorf("GetNFSNetNamespaceFilePath() error = %v", err) + + return + } + if got != ts.want { + t.Errorf("GetNFSNetNamespaceFilePath() = %v, want %v", got, ts.want) + } + }) + } +}