nfs: store the NFS-cluster name in the journal

Signed-off-by: Niels de Vos <ndevos@redhat.com>
This commit is contained in:
Niels de Vos 2022-03-18 10:51:37 +01:00 committed by mergify[bot]
parent 3b4d193ca8
commit 885295fcc9
2 changed files with 117 additions and 47 deletions

View File

@ -100,6 +100,7 @@ func (cs *Server) CreateVolume(
if err != nil { if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "failed to connect: %v", err) return nil, status.Errorf(codes.InvalidArgument, "failed to connect: %v", err)
} }
defer nfsVolume.Destroy()
err = nfsVolume.CreateExport(backend) err = nfsVolume.CreateExport(backend)
if err != nil { if err != nil {
@ -137,6 +138,7 @@ func (cs *Server) DeleteVolume(
if err != nil { if err != nil {
return nil, status.Errorf(codes.InvalidArgument, "failed to connect: %v", err) return nil, status.Errorf(codes.InvalidArgument, "failed to connect: %v", err)
} }
defer nfsVolume.Destroy()
err = nfsVolume.DeleteExport() err = nfsVolume.DeleteExport()
// TODO: if the export does not exist, but the backend does, delete the backend // TODO: if the export does not exist, but the backend does, delete the backend

View File

@ -19,13 +19,21 @@ package controller
import ( import (
"context" "context"
"fmt" "fmt"
"strings"
fscore "github.com/ceph/ceph-csi/internal/cephfs/core"
"github.com/ceph/ceph-csi/internal/cephfs/store"
fsutil "github.com/ceph/ceph-csi/internal/cephfs/util"
"github.com/ceph/ceph-csi/internal/util" "github.com/ceph/ceph-csi/internal/util"
"github.com/container-storage-interface/spec/lib/go/csi" "github.com/container-storage-interface/spec/lib/go/csi"
) )
const (
// clusterNameKey is the key in OMAP that contains the name of the
// NFS-cluster. It will be prefixed with the journal configuration.
clusterNameKey = "nfs.cluster"
)
// NFSVolume presents the API for consumption by the CSI-controller to create, // NFSVolume presents the API for consumption by the CSI-controller to create,
// modify and delete the NFS-exported CephFS volume. Instances of this struct // modify and delete the NFS-exported CephFS volume. Instances of this struct
// are short lived, they only exist as long as a CSI-procedure is active. // are short lived, they only exist as long as a CSI-procedure is active.
@ -36,16 +44,18 @@ type NFSVolume struct {
volumeID string volumeID string
clusterID string clusterID string
mons string mons string
fscID int64
objectUUID string
// TODO: drop in favor of a go-ceph connection // TODO: drop in favor of a go-ceph connection
connected bool
cr *util.Credentials cr *util.Credentials
connected bool
conn *util.ClusterConnection
} }
// NewNFSVolume create a new NFSVolume instance for the currently executing // NewNFSVolume create a new NFSVolume instance for the currently executing
// CSI-procedure. // CSI-procedure.
func NewNFSVolume(ctx context.Context, volumeID string) (*NFSVolume, error) { func NewNFSVolume(ctx context.Context, volumeID string) (*NFSVolume, error) {
// TODO: validate volume.VolumeContext parameters
vi := util.CSIIdentifier{} vi := util.CSIIdentifier{}
err := vi.DecomposeCSIID(volumeID) err := vi.DecomposeCSIID(volumeID)
@ -56,6 +66,10 @@ func NewNFSVolume(ctx context.Context, volumeID string) (*NFSVolume, error) {
return &NFSVolume{ return &NFSVolume{
ctx: ctx, ctx: ctx,
volumeID: volumeID, volumeID: volumeID,
clusterID: vi.ClusterID,
fscID: vi.LocationID,
objectUUID: vi.ObjectUUID,
conn: &util.ClusterConnection{},
}, nil }, nil
} }
@ -68,26 +82,36 @@ func (nv *NFSVolume) String() string {
// Ceph cluster. This uses go-ceph, so after Connect(), Destroy() should be // Ceph cluster. This uses go-ceph, so after Connect(), Destroy() should be
// called to cleanup resources. // called to cleanup resources.
func (nv *NFSVolume) Connect(cr *util.Credentials) error { func (nv *NFSVolume) Connect(cr *util.Credentials) error {
if nv.connected {
return nil
}
var err error
nv.mons, err = util.Mons(util.CsiConfigFile, nv.clusterID)
if err != nil {
return fmt.Errorf("failed to get MONs for cluster (%s): %w", nv.clusterID, err)
}
err = nv.conn.Connect(nv.mons, cr)
if err != nil {
return fmt.Errorf("failed to connect to cluster: %w", err)
}
nv.cr = cr nv.cr = cr
vi := util.CSIIdentifier{}
err := vi.DecomposeCSIID(nv.volumeID)
if err != nil {
return fmt.Errorf("error decoding volume ID (%s): %w", nv.volumeID, err)
}
nv.clusterID = vi.ClusterID
nv.mons, err = util.Mons(util.CsiConfigFile, vi.ClusterID)
if err != nil {
return fmt.Errorf("failed to get MONs for cluster (%s): %w", vi.ClusterID, err)
}
nv.connected = true nv.connected = true
return nil return nil
} }
// Destroy cleans up resources once the NFSVolume instance is not needed
// anymore.
func (nv *NFSVolume) Destroy() {
if nv.connected {
nv.conn.Destroy()
nv.connected = false
}
}
// GetExportPath returns the path on the NFS-server that can be used for // GetExportPath returns the path on the NFS-server that can be used for
// mounting. // mounting.
func (nv *NFSVolume) GetExportPath() string { func (nv *NFSVolume) GetExportPath() string {
@ -105,6 +129,11 @@ func (nv *NFSVolume) CreateExport(backend *csi.Volume) error {
nfsCluster := backend.VolumeContext["nfsCluster"] nfsCluster := backend.VolumeContext["nfsCluster"]
path := backend.VolumeContext["subvolumePath"] path := backend.VolumeContext["subvolumePath"]
err := nv.setNFSCluster(nfsCluster)
if err != nil {
return fmt.Errorf("failed to set NFS-cluster: %w", err)
}
// ceph nfs export create cephfs ${FS} ${NFS} /${EXPORT} ${SUBVOL_PATH} // ceph nfs export create cephfs ${FS} ${NFS} /${EXPORT} ${SUBVOL_PATH}
args := []string{ args := []string{
"--id", nv.cr.ID, "--id", nv.cr.ID,
@ -129,31 +158,6 @@ func (nv *NFSVolume) CreateExport(backend *csi.Volume) error {
return nil return nil
} }
// TODO: store the NFSCluster ("CephNFS" name) in the journal?
func (nv *NFSVolume) getNFSCluster() (string, error) {
if !nv.connected {
return "", fmt.Errorf("can not get the NFSCluster for %q: not connected", nv)
}
// ceph nfs cluster ls
// FIXME: with a single CephNFS, it only returns a single like
args := []string{
"--id", nv.cr.ID,
"--keyfile=" + nv.cr.KeyFile,
"-m", nv.mons,
"nfs",
"cluster",
"ls",
}
nfsCluster, _, err := util.ExecCommand(nv.ctx, "ceph", args...)
if err != nil {
return "", fmt.Errorf("executing ceph export command failed: %w", err)
}
return strings.TrimSpace(nfsCluster), nil
}
// DeleteExport removes the NFS-export from the Ceph managed NFS-server. // DeleteExport removes the NFS-export from the Ceph managed NFS-server.
func (nv *NFSVolume) DeleteExport() error { func (nv *NFSVolume) DeleteExport() error {
if !nv.connected { if !nv.connected {
@ -185,3 +189,67 @@ func (nv *NFSVolume) DeleteExport() error {
return nil return nil
} }
// getNFSCluster fetches the NFS-cluster name from the CephFS journal.
func (nv *NFSVolume) getNFSCluster() (string, error) {
if !nv.connected {
return "", fmt.Errorf("can not get NFS-cluster for %q: not connected", nv)
}
fs := fscore.NewFileSystem(nv.conn)
fsName, err := fs.GetFsName(nv.ctx, nv.fscID)
if err != nil {
return "", fmt.Errorf("failed to get filesystem name for ID %x: %w", nv.fscID, err)
}
mdPool, err := fs.GetMetadataPool(nv.ctx, fsName)
if err != nil {
return "", fmt.Errorf("failed to get metadata pool for %q: %w", fsName, err)
}
// Connect to cephfs' default radosNamespace (csi)
j, err := store.VolJournal.Connect(nv.mons, fsutil.RadosNamespace, nv.cr)
if err != nil {
return "", fmt.Errorf("failed to connect to journal: %w", err)
}
defer j.Destroy()
clusterName, err := j.FetchAttribute(nv.ctx, mdPool, nv.objectUUID, clusterNameKey)
if err != nil {
return "", fmt.Errorf("failed to get cluster name: %w", err)
}
return clusterName, nil
}
// setNFSCluster stores the NFS-cluster name in the CephFS journal.
func (nv *NFSVolume) setNFSCluster(clusterName string) error {
if !nv.connected {
return fmt.Errorf("can not set NFS-cluster for %q: not connected", nv)
}
fs := fscore.NewFileSystem(nv.conn)
fsName, err := fs.GetFsName(nv.ctx, nv.fscID)
if err != nil {
return fmt.Errorf("failed to get filesystem name for ID %x: %w", nv.fscID, err)
}
mdPool, err := fs.GetMetadataPool(nv.ctx, fsName)
if err != nil {
return fmt.Errorf("failed to get metadata pool for %q: %w", fsName, err)
}
// Connect to cephfs' default radosNamespace (csi)
j, err := store.VolJournal.Connect(nv.mons, fsutil.RadosNamespace, nv.cr)
if err != nil {
return fmt.Errorf("failed to connect to journal: %w", err)
}
defer j.Destroy()
err = j.StoreAttribute(nv.ctx, mdPool, nv.objectUUID, clusterNameKey, clusterName)
if err != nil {
return fmt.Errorf("failed to store cluster name: %w", err)
}
return nil
}