nfs: add basic provisioner with create/delete procedures

These NFS Controller and Identity servers are the base for the new
provisioner. The functionality is currently extremely limited, follow-up
PRs will implement various CSI procedures.

CreateVolume is implemented with the bare minimum. This makes it
possible to create a volume, and mount it with the
kubernetes-csi/csi-driver-nfs NodePlugin.

DeleteVolume unexports the volume from the Ceph managed NFS-Ganesha
service. In case the Ceph cluster provides multiple NFS-Ganesha
deployments, things might not work as expected. This is going to be
addressed in follow-up improvements.

Lots of TODO comments need to be resolved before this can be declared
"production ready". Unit- and e2e-tests are missing as well.

Signed-off-by: Niels de Vos <ndevos@redhat.com>
This commit is contained in:
Niels de Vos 2022-02-25 13:16:15 +01:00 committed by mergify[bot]
parent 87f87141be
commit 6d83df9cc9
4 changed files with 465 additions and 0 deletions

View File

@ -0,0 +1,150 @@
/*
Copyright 2022 The Ceph-CSI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"context"
"fmt"
"github.com/ceph/ceph-csi/internal/cephfs"
"github.com/ceph/ceph-csi/internal/cephfs/store"
fsutil "github.com/ceph/ceph-csi/internal/cephfs/util"
csicommon "github.com/ceph/ceph-csi/internal/csi-common"
"github.com/ceph/ceph-csi/internal/journal"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/log"
"github.com/container-storage-interface/spec/lib/go/csi"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// Server struct of CEPH CSI driver with supported methods of CSI controller
// server spec.
type Server struct {
csi.UnimplementedControllerServer
// backendServer handles the CephFS requests
backendServer *cephfs.ControllerServer
}
// NewControllerServer initialize a controller server for ceph CSI driver.
func NewControllerServer(d *csicommon.CSIDriver) *Server {
// global instance of the volume journal, yuck
store.VolJournal = journal.NewCSIVolumeJournalWithNamespace(cephfs.CSIInstanceID, fsutil.RadosNamespace)
return &Server{
backendServer: cephfs.NewControllerServer(d),
}
}
// ControllerGetCapabilities uses the CephFS backendServer to return the
// capabilities that were set in the Driver.Run() function.
func (cs *Server) ControllerGetCapabilities(
ctx context.Context,
req *csi.ControllerGetCapabilitiesRequest) (*csi.ControllerGetCapabilitiesResponse, error) {
return cs.backendServer.ControllerGetCapabilities(ctx, req)
}
// ValidateVolumeCapabilities checks whether the volume capabilities requested
// are supported.
func (cs *Server) ValidateVolumeCapabilities(
ctx context.Context,
req *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) {
return cs.backendServer.ValidateVolumeCapabilities(ctx, req)
}
// CreateVolume creates the backing subvolume and on any error cleans up any
// created entities.
func (cs *Server) CreateVolume(
ctx context.Context,
req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
res, err := cs.backendServer.CreateVolume(ctx, req)
if err != nil {
return res, fmt.Errorf("failed to create CephFS volume: %w", err)
}
backend := res.Volume
log.DebugLog(ctx, "CephFS volume created: %s", backend.VolumeId)
secret := req.GetSecrets()
cr, err := util.NewAdminCredentials(secret)
if err != nil {
log.ErrorLog(ctx, "failed to retrieve admin credentials: %v", err)
return nil, status.Error(codes.InvalidArgument, err.Error())
}
defer cr.DeleteCredentials()
nfsVolume, err := NewNFSVolume(backend.VolumeId)
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
err = nfsVolume.Connect(cr)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "failed to connect: %v", err)
}
err = nfsVolume.CreateExport(backend)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "failed to create export: %v", err)
}
log.DebugLog(ctx, "published NFS-export: %s", nfsVolume)
// volume has been exported over NFS, set the "share" parameter to
// allow mounting
backend.VolumeContext["share"] = nfsVolume.GetExportPath()
return &csi.CreateVolumeResponse{Volume: backend}, nil
}
// DeleteVolume deletes the volume in backend and its reservation.
func (cs *Server) DeleteVolume(
ctx context.Context,
req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
secret := req.GetSecrets()
cr, err := util.NewAdminCredentials(secret)
if err != nil {
log.ErrorLog(ctx, "failed to retrieve admin credentials: %v", err)
return nil, status.Error(codes.InvalidArgument, err.Error())
}
defer cr.DeleteCredentials()
nfsVolume, err := NewNFSVolume(req.GetVolumeId())
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
err = nfsVolume.Connect(cr)
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "failed to connect: %v", err)
}
err = nfsVolume.DeleteExport()
// TODO: if the export does not exist, but the backend does, delete the backend
if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "failed to delete export: %v", err)
}
log.DebugLog(ctx, "deleted NFS-export: %s", nfsVolume)
return cs.backendServer.DeleteVolume(ctx, req)
}

View File

@ -0,0 +1,183 @@
/*
Copyright 2022 The Ceph-CSI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"context"
"fmt"
"strings"
"github.com/ceph/ceph-csi/internal/util"
"github.com/container-storage-interface/spec/lib/go/csi"
)
// NFSVolume presents the API for consumption by the CSI-controller to create,
// modify and delete the NFS-exported CephFS volume. Instances of this struct
// are short lived, they only exist as long as a CSI-procedure is active.
type NFSVolume struct {
volumeID string
clusterID string
mons string
// TODO: drop in favor of a go-ceph connection
connected bool
cr *util.Credentials
}
// NewNFSVolume create a new NFSVolume instance for the currently executing
// CSI-procedure.
func NewNFSVolume(volumeID string) (*NFSVolume, error) {
// TODO: validate volume.VolumeContext parameters
vi := util.CSIIdentifier{}
err := vi.DecomposeCSIID(volumeID)
if err != nil {
return nil, fmt.Errorf("error decoding volume ID (%s): %w", volumeID, err)
}
return &NFSVolume{
volumeID: volumeID,
}, nil
}
// String returns a simple/short representation of the NFSVolume.
func (nv *NFSVolume) String() string {
return nv.volumeID
}
// Connect fetches cluster connection details (like MONs) and connects to the
// Ceph cluster. This uses go-ceph, so after Connect(), Destroy() should be
// called to cleanup resources.
func (nv *NFSVolume) Connect(cr *util.Credentials) error {
nv.cr = cr
vi := util.CSIIdentifier{}
err := vi.DecomposeCSIID(nv.volumeID)
if err != nil {
return fmt.Errorf("error decoding volume ID (%s): %w", nv.volumeID, err)
}
nv.clusterID = vi.ClusterID
nv.mons, err = util.Mons(util.CsiConfigFile, vi.ClusterID)
if err != nil {
return fmt.Errorf("failed to get MONs for cluster (%s): %w", vi.ClusterID, err)
}
nv.connected = true
return nil
}
// GetExportPath returns the path on the NFS-server that can be used for
// mounting.
func (nv *NFSVolume) GetExportPath() string {
return "/" + nv.volumeID
}
// CreateExport takes the (CephFS) CSI-volume and instructs Ceph Mgr to create
// a new NFS-export for the volume on the Ceph managed NFS-server.
func (nv *NFSVolume) CreateExport(backend *csi.Volume) error {
if !nv.connected {
return fmt.Errorf("can not created export for %q: not connected", nv)
}
fs := backend.VolumeContext["fsName"]
nfsCluster := backend.VolumeContext["nfsCluster"]
path := backend.VolumeContext["subvolumePath"]
// ceph nfs export create cephfs ${FS} ${NFS} /${EXPORT} ${SUBVOL_PATH}
args := []string{
"--id", nv.cr.ID,
"--keyfile=" + nv.cr.KeyFile,
"-m", nv.mons,
"nfs",
"export",
"create",
"cephfs",
fs,
nfsCluster,
nv.GetExportPath(),
path,
}
// TODO: use new go-ceph API
_, stderr, err := util.ExecCommand(context.TODO(), "ceph", args...)
if err != nil {
return fmt.Errorf("executing ceph export command failed (%w): %s", err, stderr)
}
return nil
}
// TODO: store the NFSCluster ("CephNFS" name) in the journal?
func (nv *NFSVolume) getNFSCluster() (string, error) {
if !nv.connected {
return "", fmt.Errorf("can not get the NFSCluster for %q: not connected", nv)
}
// ceph nfs cluster ls
// FIXME: with a single CephNFS, it only returns a single like
args := []string{
"--id", nv.cr.ID,
"--keyfile=" + nv.cr.KeyFile,
"-m", nv.mons,
"nfs",
"cluster",
"ls",
}
nfsCluster, _, err := util.ExecCommand(context.TODO(), "ceph", args...)
if err != nil {
return "", fmt.Errorf("executing ceph export command failed: %w", err)
}
return strings.TrimSpace(nfsCluster), nil
}
// DeleteExport removes the NFS-export from the Ceph managed NFS-server.
func (nv *NFSVolume) DeleteExport() error {
if !nv.connected {
return fmt.Errorf("can not delete export for %q: not connected", nv)
}
nfsCluster, err := nv.getNFSCluster()
if err != nil {
return fmt.Errorf("failed to identify NFS cluster: %w", err)
}
// ceph nfs export rm <cluster_id> <pseudo_path>
args := []string{
"--id", nv.cr.ID,
"--keyfile=" + nv.cr.KeyFile,
"-m", nv.mons,
"nfs",
"export",
"delete",
nfsCluster,
nv.GetExportPath(),
}
// TODO: use new go-ceph API
_, stderr, err := util.ExecCommand(context.TODO(), "ceph", args...)
if err != nil {
return fmt.Errorf("executing ceph export command failed (%w): %s", err, stderr)
}
return nil
}

View File

@ -0,0 +1,77 @@
/*
Copyright 2022 The Ceph-CSI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package driver
import (
csicommon "github.com/ceph/ceph-csi/internal/csi-common"
"github.com/ceph/ceph-csi/internal/nfs/controller"
"github.com/ceph/ceph-csi/internal/nfs/identity"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/log"
"github.com/container-storage-interface/spec/lib/go/csi"
)
// Driver contains the default identity and controller struct.
type Driver struct{}
// NewDriver returns new ceph driver.
func NewDriver() *Driver {
return &Driver{}
}
// Run start a non-blocking grpc controller,node and identityserver for
// ceph CSI driver which can serve multiple parallel requests.
func (fs *Driver) Run(conf *util.Config) {
// Initialize default library driver
cd := csicommon.NewCSIDriver(conf.DriverName, util.DriverVersion, conf.NodeID)
if cd == nil {
log.FatalLogMsg("failed to initialize CSI driver")
}
cd.AddControllerServiceCapabilities([]csi.ControllerServiceCapability_RPC_Type{
csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME,
csi.ControllerServiceCapability_RPC_SINGLE_NODE_MULTI_WRITER,
})
// VolumeCapabilities are validated by the CephFS Controller
cd.AddVolumeCapabilityAccessModes([]csi.VolumeCapability_AccessMode_Mode{
csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER,
csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER,
csi.VolumeCapability_AccessMode_SINGLE_NODE_MULTI_WRITER,
csi.VolumeCapability_AccessMode_SINGLE_NODE_SINGLE_WRITER,
})
// Create gRPC servers
server := csicommon.NewNonBlockingGRPCServer()
srv := csicommon.Servers{
IS: identity.NewIdentityServer(cd),
CS: controller.NewControllerServer(cd),
}
server.Start(conf.Endpoint, conf.HistogramOption, srv, conf.EnableGRPCMetrics)
if conf.EnableGRPCMetrics {
log.WarningLogMsg("EnableGRPCMetrics is deprecated")
go util.StartMetricsServer(conf)
}
if conf.EnableProfiling {
if !conf.EnableGRPCMetrics {
go util.StartMetricsServer(conf)
}
log.DebugLogMsg("Registering profiling handler")
go util.EnableProfiling()
}
server.Wait()
}

View File

@ -0,0 +1,55 @@
/*
Copyright 2022 The Ceph-CSI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package identity
import (
"context"
csicommon "github.com/ceph/ceph-csi/internal/csi-common"
"github.com/container-storage-interface/spec/lib/go/csi"
)
// Server struct of ceph CSI driver with supported methods of CSI identity
// server spec.
type Server struct {
*csicommon.DefaultIdentityServer
}
// NewIdentityServer initialize a identity server for ceph CSI driver.
func NewIdentityServer(d *csicommon.CSIDriver) *Server {
return &Server{
DefaultIdentityServer: csicommon.NewDefaultIdentityServer(d),
}
}
// GetPluginCapabilities returns available capabilities of the ceph driver.
func (is *Server) GetPluginCapabilities(
ctx context.Context,
req *csi.GetPluginCapabilitiesRequest) (*csi.GetPluginCapabilitiesResponse, error) {
return &csi.GetPluginCapabilitiesResponse{
Capabilities: []*csi.PluginCapability{
{
Type: &csi.PluginCapability_Service_{
Service: &csi.PluginCapability_Service{
Type: csi.PluginCapability_Service_CONTROLLER_SERVICE,
},
},
},
},
}, nil
}