diff --git a/internal/nfs/controller/controllerserver.go b/internal/nfs/controller/controllerserver.go index 65b2c3b02..c49c95d7a 100644 --- a/internal/nfs/controller/controllerserver.go +++ b/internal/nfs/controller/controllerserver.go @@ -100,6 +100,7 @@ func (cs *Server) CreateVolume( if err != nil { return nil, status.Errorf(codes.InvalidArgument, "failed to connect: %v", err) } + defer nfsVolume.Destroy() err = nfsVolume.CreateExport(backend) if err != nil { @@ -137,6 +138,7 @@ func (cs *Server) DeleteVolume( if err != nil { return nil, status.Errorf(codes.InvalidArgument, "failed to connect: %v", err) } + defer nfsVolume.Destroy() err = nfsVolume.DeleteExport() // TODO: if the export does not exist, but the backend does, delete the backend diff --git a/internal/nfs/controller/volume.go b/internal/nfs/controller/volume.go index 95e744b8b..076cbb42b 100644 --- a/internal/nfs/controller/volume.go +++ b/internal/nfs/controller/volume.go @@ -19,13 +19,21 @@ package controller import ( "context" "fmt" - "strings" + fscore "github.com/ceph/ceph-csi/internal/cephfs/core" + "github.com/ceph/ceph-csi/internal/cephfs/store" + fsutil "github.com/ceph/ceph-csi/internal/cephfs/util" "github.com/ceph/ceph-csi/internal/util" "github.com/container-storage-interface/spec/lib/go/csi" ) +const ( + // clusterNameKey is the key in OMAP that contains the name of the + // NFS-cluster. It will be prefixed with the journal configuration. + clusterNameKey = "nfs.cluster" +) + // NFSVolume presents the API for consumption by the CSI-controller to create, // modify and delete the NFS-exported CephFS volume. Instances of this struct // are short lived, they only exist as long as a CSI-procedure is active. @@ -33,19 +41,21 @@ type NFSVolume struct { // ctx is the context for this short living volume object ctx context.Context - volumeID string - clusterID string - mons string + volumeID string + clusterID string + mons string + fscID int64 + objectUUID string // TODO: drop in favor of a go-ceph connection - connected bool cr *util.Credentials + connected bool + conn *util.ClusterConnection } // NewNFSVolume create a new NFSVolume instance for the currently executing // CSI-procedure. func NewNFSVolume(ctx context.Context, volumeID string) (*NFSVolume, error) { - // TODO: validate volume.VolumeContext parameters vi := util.CSIIdentifier{} err := vi.DecomposeCSIID(volumeID) @@ -54,8 +64,12 @@ func NewNFSVolume(ctx context.Context, volumeID string) (*NFSVolume, error) { } return &NFSVolume{ - ctx: ctx, - volumeID: volumeID, + ctx: ctx, + volumeID: volumeID, + clusterID: vi.ClusterID, + fscID: vi.LocationID, + objectUUID: vi.ObjectUUID, + conn: &util.ClusterConnection{}, }, nil } @@ -68,26 +82,36 @@ func (nv *NFSVolume) String() string { // Ceph cluster. This uses go-ceph, so after Connect(), Destroy() should be // called to cleanup resources. func (nv *NFSVolume) Connect(cr *util.Credentials) error { + if nv.connected { + return nil + } + + var err error + nv.mons, err = util.Mons(util.CsiConfigFile, nv.clusterID) + if err != nil { + return fmt.Errorf("failed to get MONs for cluster (%s): %w", nv.clusterID, err) + } + + err = nv.conn.Connect(nv.mons, cr) + if err != nil { + return fmt.Errorf("failed to connect to cluster: %w", err) + } + nv.cr = cr - - vi := util.CSIIdentifier{} - - err := vi.DecomposeCSIID(nv.volumeID) - if err != nil { - return fmt.Errorf("error decoding volume ID (%s): %w", nv.volumeID, err) - } - - nv.clusterID = vi.ClusterID - nv.mons, err = util.Mons(util.CsiConfigFile, vi.ClusterID) - if err != nil { - return fmt.Errorf("failed to get MONs for cluster (%s): %w", vi.ClusterID, err) - } - nv.connected = true return nil } +// Destroy cleans up resources once the NFSVolume instance is not needed +// anymore. +func (nv *NFSVolume) Destroy() { + if nv.connected { + nv.conn.Destroy() + nv.connected = false + } +} + // GetExportPath returns the path on the NFS-server that can be used for // mounting. func (nv *NFSVolume) GetExportPath() string { @@ -105,6 +129,11 @@ func (nv *NFSVolume) CreateExport(backend *csi.Volume) error { nfsCluster := backend.VolumeContext["nfsCluster"] path := backend.VolumeContext["subvolumePath"] + err := nv.setNFSCluster(nfsCluster) + if err != nil { + return fmt.Errorf("failed to set NFS-cluster: %w", err) + } + // ceph nfs export create cephfs ${FS} ${NFS} /${EXPORT} ${SUBVOL_PATH} args := []string{ "--id", nv.cr.ID, @@ -129,31 +158,6 @@ func (nv *NFSVolume) CreateExport(backend *csi.Volume) error { return nil } -// TODO: store the NFSCluster ("CephNFS" name) in the journal? -func (nv *NFSVolume) getNFSCluster() (string, error) { - if !nv.connected { - return "", fmt.Errorf("can not get the NFSCluster for %q: not connected", nv) - } - - // ceph nfs cluster ls - // FIXME: with a single CephNFS, it only returns a single like - args := []string{ - "--id", nv.cr.ID, - "--keyfile=" + nv.cr.KeyFile, - "-m", nv.mons, - "nfs", - "cluster", - "ls", - } - - nfsCluster, _, err := util.ExecCommand(nv.ctx, "ceph", args...) - if err != nil { - return "", fmt.Errorf("executing ceph export command failed: %w", err) - } - - return strings.TrimSpace(nfsCluster), nil -} - // DeleteExport removes the NFS-export from the Ceph managed NFS-server. func (nv *NFSVolume) DeleteExport() error { if !nv.connected { @@ -185,3 +189,67 @@ func (nv *NFSVolume) DeleteExport() error { return nil } + +// getNFSCluster fetches the NFS-cluster name from the CephFS journal. +func (nv *NFSVolume) getNFSCluster() (string, error) { + if !nv.connected { + return "", fmt.Errorf("can not get NFS-cluster for %q: not connected", nv) + } + + fs := fscore.NewFileSystem(nv.conn) + fsName, err := fs.GetFsName(nv.ctx, nv.fscID) + if err != nil { + return "", fmt.Errorf("failed to get filesystem name for ID %x: %w", nv.fscID, err) + } + + mdPool, err := fs.GetMetadataPool(nv.ctx, fsName) + if err != nil { + return "", fmt.Errorf("failed to get metadata pool for %q: %w", fsName, err) + } + + // Connect to cephfs' default radosNamespace (csi) + j, err := store.VolJournal.Connect(nv.mons, fsutil.RadosNamespace, nv.cr) + if err != nil { + return "", fmt.Errorf("failed to connect to journal: %w", err) + } + defer j.Destroy() + + clusterName, err := j.FetchAttribute(nv.ctx, mdPool, nv.objectUUID, clusterNameKey) + if err != nil { + return "", fmt.Errorf("failed to get cluster name: %w", err) + } + + return clusterName, nil +} + +// setNFSCluster stores the NFS-cluster name in the CephFS journal. +func (nv *NFSVolume) setNFSCluster(clusterName string) error { + if !nv.connected { + return fmt.Errorf("can not set NFS-cluster for %q: not connected", nv) + } + + fs := fscore.NewFileSystem(nv.conn) + fsName, err := fs.GetFsName(nv.ctx, nv.fscID) + if err != nil { + return fmt.Errorf("failed to get filesystem name for ID %x: %w", nv.fscID, err) + } + + mdPool, err := fs.GetMetadataPool(nv.ctx, fsName) + if err != nil { + return fmt.Errorf("failed to get metadata pool for %q: %w", fsName, err) + } + + // Connect to cephfs' default radosNamespace (csi) + j, err := store.VolJournal.Connect(nv.mons, fsutil.RadosNamespace, nv.cr) + if err != nil { + return fmt.Errorf("failed to connect to journal: %w", err) + } + defer j.Destroy() + + err = j.StoreAttribute(nv.ctx, mdPool, nv.objectUUID, clusterNameKey, clusterName) + if err != nil { + return fmt.Errorf("failed to store cluster name: %w", err) + } + + return nil +}