diff --git a/internal/nfs/controller/controllerserver.go b/internal/nfs/controller/controllerserver.go new file mode 100644 index 000000000..ed31dde7c --- /dev/null +++ b/internal/nfs/controller/controllerserver.go @@ -0,0 +1,150 @@ +/* +Copyright 2022 The Ceph-CSI Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "context" + "fmt" + + "github.com/ceph/ceph-csi/internal/cephfs" + "github.com/ceph/ceph-csi/internal/cephfs/store" + fsutil "github.com/ceph/ceph-csi/internal/cephfs/util" + csicommon "github.com/ceph/ceph-csi/internal/csi-common" + "github.com/ceph/ceph-csi/internal/journal" + "github.com/ceph/ceph-csi/internal/util" + "github.com/ceph/ceph-csi/internal/util/log" + + "github.com/container-storage-interface/spec/lib/go/csi" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +// Server struct of CEPH CSI driver with supported methods of CSI controller +// server spec. +type Server struct { + csi.UnimplementedControllerServer + + // backendServer handles the CephFS requests + backendServer *cephfs.ControllerServer +} + +// NewControllerServer initialize a controller server for ceph CSI driver. +func NewControllerServer(d *csicommon.CSIDriver) *Server { + // global instance of the volume journal, yuck + store.VolJournal = journal.NewCSIVolumeJournalWithNamespace(cephfs.CSIInstanceID, fsutil.RadosNamespace) + + return &Server{ + backendServer: cephfs.NewControllerServer(d), + } +} + +// ControllerGetCapabilities uses the CephFS backendServer to return the +// capabilities that were set in the Driver.Run() function. +func (cs *Server) ControllerGetCapabilities( + ctx context.Context, + req *csi.ControllerGetCapabilitiesRequest) (*csi.ControllerGetCapabilitiesResponse, error) { + return cs.backendServer.ControllerGetCapabilities(ctx, req) +} + +// ValidateVolumeCapabilities checks whether the volume capabilities requested +// are supported. +func (cs *Server) ValidateVolumeCapabilities( + ctx context.Context, + req *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) { + return cs.backendServer.ValidateVolumeCapabilities(ctx, req) +} + +// CreateVolume creates the backing subvolume and on any error cleans up any +// created entities. +func (cs *Server) CreateVolume( + ctx context.Context, + req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) { + res, err := cs.backendServer.CreateVolume(ctx, req) + if err != nil { + return res, fmt.Errorf("failed to create CephFS volume: %w", err) + } + + backend := res.Volume + + log.DebugLog(ctx, "CephFS volume created: %s", backend.VolumeId) + + secret := req.GetSecrets() + cr, err := util.NewAdminCredentials(secret) + if err != nil { + log.ErrorLog(ctx, "failed to retrieve admin credentials: %v", err) + + return nil, status.Error(codes.InvalidArgument, err.Error()) + } + defer cr.DeleteCredentials() + + nfsVolume, err := NewNFSVolume(backend.VolumeId) + if err != nil { + return nil, status.Error(codes.InvalidArgument, err.Error()) + } + + err = nfsVolume.Connect(cr) + if err != nil { + return nil, status.Errorf(codes.InvalidArgument, "failed to connect: %v", err) + } + + err = nfsVolume.CreateExport(backend) + if err != nil { + return nil, status.Errorf(codes.InvalidArgument, "failed to create export: %v", err) + } + + log.DebugLog(ctx, "published NFS-export: %s", nfsVolume) + + // volume has been exported over NFS, set the "share" parameter to + // allow mounting + backend.VolumeContext["share"] = nfsVolume.GetExportPath() + + return &csi.CreateVolumeResponse{Volume: backend}, nil +} + +// DeleteVolume deletes the volume in backend and its reservation. +func (cs *Server) DeleteVolume( + ctx context.Context, + req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) { + secret := req.GetSecrets() + cr, err := util.NewAdminCredentials(secret) + if err != nil { + log.ErrorLog(ctx, "failed to retrieve admin credentials: %v", err) + + return nil, status.Error(codes.InvalidArgument, err.Error()) + } + defer cr.DeleteCredentials() + + nfsVolume, err := NewNFSVolume(req.GetVolumeId()) + if err != nil { + return nil, status.Error(codes.InvalidArgument, err.Error()) + } + + err = nfsVolume.Connect(cr) + if err != nil { + return nil, status.Errorf(codes.InvalidArgument, "failed to connect: %v", err) + } + + err = nfsVolume.DeleteExport() + // TODO: if the export does not exist, but the backend does, delete the backend + if err != nil { + return nil, status.Errorf(codes.InvalidArgument, "failed to delete export: %v", err) + } + + log.DebugLog(ctx, "deleted NFS-export: %s", nfsVolume) + + return cs.backendServer.DeleteVolume(ctx, req) +} diff --git a/internal/nfs/controller/volume.go b/internal/nfs/controller/volume.go new file mode 100644 index 000000000..8b22a7a28 --- /dev/null +++ b/internal/nfs/controller/volume.go @@ -0,0 +1,183 @@ +/* +Copyright 2022 The Ceph-CSI Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "context" + "fmt" + "strings" + + "github.com/ceph/ceph-csi/internal/util" + + "github.com/container-storage-interface/spec/lib/go/csi" +) + +// NFSVolume presents the API for consumption by the CSI-controller to create, +// modify and delete the NFS-exported CephFS volume. Instances of this struct +// are short lived, they only exist as long as a CSI-procedure is active. +type NFSVolume struct { + volumeID string + clusterID string + mons string + + // TODO: drop in favor of a go-ceph connection + connected bool + cr *util.Credentials +} + +// NewNFSVolume create a new NFSVolume instance for the currently executing +// CSI-procedure. +func NewNFSVolume(volumeID string) (*NFSVolume, error) { + // TODO: validate volume.VolumeContext parameters + vi := util.CSIIdentifier{} + + err := vi.DecomposeCSIID(volumeID) + if err != nil { + return nil, fmt.Errorf("error decoding volume ID (%s): %w", volumeID, err) + } + + return &NFSVolume{ + volumeID: volumeID, + }, nil +} + +// String returns a simple/short representation of the NFSVolume. +func (nv *NFSVolume) String() string { + return nv.volumeID +} + +// Connect fetches cluster connection details (like MONs) and connects to the +// Ceph cluster. This uses go-ceph, so after Connect(), Destroy() should be +// called to cleanup resources. +func (nv *NFSVolume) Connect(cr *util.Credentials) error { + nv.cr = cr + + vi := util.CSIIdentifier{} + + err := vi.DecomposeCSIID(nv.volumeID) + if err != nil { + return fmt.Errorf("error decoding volume ID (%s): %w", nv.volumeID, err) + } + + nv.clusterID = vi.ClusterID + nv.mons, err = util.Mons(util.CsiConfigFile, vi.ClusterID) + if err != nil { + return fmt.Errorf("failed to get MONs for cluster (%s): %w", vi.ClusterID, err) + } + + nv.connected = true + + return nil +} + +// GetExportPath returns the path on the NFS-server that can be used for +// mounting. +func (nv *NFSVolume) GetExportPath() string { + return "/" + nv.volumeID +} + +// CreateExport takes the (CephFS) CSI-volume and instructs Ceph Mgr to create +// a new NFS-export for the volume on the Ceph managed NFS-server. +func (nv *NFSVolume) CreateExport(backend *csi.Volume) error { + if !nv.connected { + return fmt.Errorf("can not created export for %q: not connected", nv) + } + + fs := backend.VolumeContext["fsName"] + nfsCluster := backend.VolumeContext["nfsCluster"] + path := backend.VolumeContext["subvolumePath"] + + // ceph nfs export create cephfs ${FS} ${NFS} /${EXPORT} ${SUBVOL_PATH} + args := []string{ + "--id", nv.cr.ID, + "--keyfile=" + nv.cr.KeyFile, + "-m", nv.mons, + "nfs", + "export", + "create", + "cephfs", + fs, + nfsCluster, + nv.GetExportPath(), + path, + } + + // TODO: use new go-ceph API + _, stderr, err := util.ExecCommand(context.TODO(), "ceph", args...) + if err != nil { + return fmt.Errorf("executing ceph export command failed (%w): %s", err, stderr) + } + + return nil +} + +// TODO: store the NFSCluster ("CephNFS" name) in the journal? +func (nv *NFSVolume) getNFSCluster() (string, error) { + if !nv.connected { + return "", fmt.Errorf("can not get the NFSCluster for %q: not connected", nv) + } + + // ceph nfs cluster ls + // FIXME: with a single CephNFS, it only returns a single like + args := []string{ + "--id", nv.cr.ID, + "--keyfile=" + nv.cr.KeyFile, + "-m", nv.mons, + "nfs", + "cluster", + "ls", + } + + nfsCluster, _, err := util.ExecCommand(context.TODO(), "ceph", args...) + if err != nil { + return "", fmt.Errorf("executing ceph export command failed: %w", err) + } + + return strings.TrimSpace(nfsCluster), nil +} + +// DeleteExport removes the NFS-export from the Ceph managed NFS-server. +func (nv *NFSVolume) DeleteExport() error { + if !nv.connected { + return fmt.Errorf("can not delete export for %q: not connected", nv) + } + + nfsCluster, err := nv.getNFSCluster() + if err != nil { + return fmt.Errorf("failed to identify NFS cluster: %w", err) + } + + // ceph nfs export rm + args := []string{ + "--id", nv.cr.ID, + "--keyfile=" + nv.cr.KeyFile, + "-m", nv.mons, + "nfs", + "export", + "delete", + nfsCluster, + nv.GetExportPath(), + } + + // TODO: use new go-ceph API + _, stderr, err := util.ExecCommand(context.TODO(), "ceph", args...) + if err != nil { + return fmt.Errorf("executing ceph export command failed (%w): %s", err, stderr) + } + + return nil +} diff --git a/internal/nfs/driver/driver.go b/internal/nfs/driver/driver.go new file mode 100644 index 000000000..6fae5ef10 --- /dev/null +++ b/internal/nfs/driver/driver.go @@ -0,0 +1,77 @@ +/* +Copyright 2022 The Ceph-CSI Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package driver + +import ( + csicommon "github.com/ceph/ceph-csi/internal/csi-common" + "github.com/ceph/ceph-csi/internal/nfs/controller" + "github.com/ceph/ceph-csi/internal/nfs/identity" + "github.com/ceph/ceph-csi/internal/util" + "github.com/ceph/ceph-csi/internal/util/log" + + "github.com/container-storage-interface/spec/lib/go/csi" +) + +// Driver contains the default identity and controller struct. +type Driver struct{} + +// NewDriver returns new ceph driver. +func NewDriver() *Driver { + return &Driver{} +} + +// Run start a non-blocking grpc controller,node and identityserver for +// ceph CSI driver which can serve multiple parallel requests. +func (fs *Driver) Run(conf *util.Config) { + // Initialize default library driver + cd := csicommon.NewCSIDriver(conf.DriverName, util.DriverVersion, conf.NodeID) + if cd == nil { + log.FatalLogMsg("failed to initialize CSI driver") + } + + cd.AddControllerServiceCapabilities([]csi.ControllerServiceCapability_RPC_Type{ + csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME, + csi.ControllerServiceCapability_RPC_SINGLE_NODE_MULTI_WRITER, + }) + // VolumeCapabilities are validated by the CephFS Controller + cd.AddVolumeCapabilityAccessModes([]csi.VolumeCapability_AccessMode_Mode{ + csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER, + csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER, + csi.VolumeCapability_AccessMode_SINGLE_NODE_MULTI_WRITER, + csi.VolumeCapability_AccessMode_SINGLE_NODE_SINGLE_WRITER, + }) + + // Create gRPC servers + server := csicommon.NewNonBlockingGRPCServer() + srv := csicommon.Servers{ + IS: identity.NewIdentityServer(cd), + CS: controller.NewControllerServer(cd), + } + server.Start(conf.Endpoint, conf.HistogramOption, srv, conf.EnableGRPCMetrics) + if conf.EnableGRPCMetrics { + log.WarningLogMsg("EnableGRPCMetrics is deprecated") + go util.StartMetricsServer(conf) + } + if conf.EnableProfiling { + if !conf.EnableGRPCMetrics { + go util.StartMetricsServer(conf) + } + log.DebugLogMsg("Registering profiling handler") + go util.EnableProfiling() + } + server.Wait() +} diff --git a/internal/nfs/identity/identityserver.go b/internal/nfs/identity/identityserver.go new file mode 100644 index 000000000..89bb0f508 --- /dev/null +++ b/internal/nfs/identity/identityserver.go @@ -0,0 +1,55 @@ +/* +Copyright 2022 The Ceph-CSI Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package identity + +import ( + "context" + + csicommon "github.com/ceph/ceph-csi/internal/csi-common" + + "github.com/container-storage-interface/spec/lib/go/csi" +) + +// Server struct of ceph CSI driver with supported methods of CSI identity +// server spec. +type Server struct { + *csicommon.DefaultIdentityServer +} + +// NewIdentityServer initialize a identity server for ceph CSI driver. +func NewIdentityServer(d *csicommon.CSIDriver) *Server { + return &Server{ + DefaultIdentityServer: csicommon.NewDefaultIdentityServer(d), + } +} + +// GetPluginCapabilities returns available capabilities of the ceph driver. +func (is *Server) GetPluginCapabilities( + ctx context.Context, + req *csi.GetPluginCapabilitiesRequest) (*csi.GetPluginCapabilitiesResponse, error) { + return &csi.GetPluginCapabilitiesResponse{ + Capabilities: []*csi.PluginCapability{ + { + Type: &csi.PluginCapability_Service_{ + Service: &csi.PluginCapability_Service{ + Type: csi.PluginCapability_Service_CONTROLLER_SERVICE, + }, + }, + }, + }, + }, nil +}