From a82ae15f1abca7801ccf8e222259b42fc608f47f Mon Sep 17 00:00:00 2001 From: Niels de Vos Date: Mon, 22 Jul 2024 17:51:17 +0200 Subject: [PATCH] rbd: use the Manager to handle CSI-Addons VolumeGroup requests Signed-off-by: Niels de Vos --- internal/csi-addons/rbd/volumegroup.go | 15 +- internal/rbd/group/volume_group.go | 61 ++++++-- internal/rbd/manager.go | 203 ++++++++++++++++++++++--- internal/rbd/types/manager.go | 12 +- 4 files changed, 258 insertions(+), 33 deletions(-) diff --git a/internal/csi-addons/rbd/volumegroup.go b/internal/csi-addons/rbd/volumegroup.go index 261bc6778..beda20d5b 100644 --- a/internal/csi-addons/rbd/volumegroup.go +++ b/internal/csi-addons/rbd/volumegroup.go @@ -82,7 +82,7 @@ func (vs *VolumeGroupServer) CreateVolumeGroup( ctx context.Context, req *volumegroup.CreateVolumeGroupRequest, ) (*volumegroup.CreateVolumeGroupResponse, error) { - mgr := rbd.NewManager(req.GetParameters(), req.GetSecrets()) + mgr := rbd.NewManager(vs.csiID, req.GetParameters(), req.GetSecrets()) defer mgr.Destroy(ctx) // resolve all volumes @@ -132,8 +132,17 @@ func (vs *VolumeGroupServer) CreateVolumeGroup( log.DebugLog(ctx, fmt.Sprintf("all %d Volumes have been added to for VolumeGroup %q", len(volumes), req.GetName())) + csiVG, err := vg.ToCSI(ctx) + if err != nil { + return nil, status.Errorf( + codes.Internal, + "failed to convert volume group %q to CSI type: %s", + req.GetName(), + err.Error()) + } + return &volumegroup.CreateVolumeGroupResponse{ - VolumeGroup: vg.ToCSI(ctx), + VolumeGroup: csiVG, }, nil } @@ -159,7 +168,7 @@ func (vs *VolumeGroupServer) DeleteVolumeGroup( ctx context.Context, req *volumegroup.DeleteVolumeGroupRequest, ) (*volumegroup.DeleteVolumeGroupResponse, error) { - mgr := rbd.NewManager(nil, req.GetSecrets()) + mgr := rbd.NewManager(vs.csiID, nil, req.GetSecrets()) defer mgr.Destroy(ctx) // resolve the volume group diff --git a/internal/rbd/group/volume_group.go b/internal/rbd/group/volume_group.go index 4d0a05d58..1a9e1963c 100644 --- a/internal/rbd/group/volume_group.go +++ b/internal/rbd/group/volume_group.go @@ -14,12 +14,13 @@ See the License for the specific language governing permissions and limitations under the License. */ -package group +package rbd_group import ( "context" "errors" "fmt" + "strings" "github.com/ceph/go-ceph/rados" librbd "github.com/ceph/go-ceph/rbd" @@ -56,11 +57,17 @@ type volumeGroup struct { pool string namespace string + journal journal.VolumeGroupJournal + // volumes is a list of rbd-images that are part of the group. The ID // of each volume is stored in the journal. volumes []types.Volume - journal journal.VolumeGroupJournal + // volumeToFree contains Volumes that were resolved during + // GetVolumeGroup. The volumes slice can be updated independently of + // this by calling AddVolume (Volumes are allocated elsewhere), and + // RemoveVolume (need to keep track of the allocated Volume). + volumesToFree []types.Volume } // verify that volumeGroup implements the VolumeGroup and Stringer interfaces. @@ -71,11 +78,14 @@ var ( // GetVolumeGroup initializes a new VolumeGroup object that can be used // to manage an `rbd group`. +// If the .GetName() function returns an error, the VolumeGroup does not exist +// yet. It is needed to call .Create() in that case first. func GetVolumeGroup( ctx context.Context, id string, j journal.VolumeGroupJournal, creds *util.Credentials, + volumeResolver types.VolumeResolver, ) (types.VolumeGroup, error) { csiID := util.CSIIdentifier{} err := csiID.DecomposeCSIID(id) @@ -100,10 +110,27 @@ func GetVolumeGroup( attrs, err := j.GetVolumeGroupAttributes(ctx, pool, csiID.ObjectUUID) if err != nil { - return nil, fmt.Errorf("failed to get attributes for volume group id %q: %w", id, err) + if !errors.Is(err, util.ErrKeyNotFound) && !errors.Is(err, util.ErrPoolNotFound) { + return nil, fmt.Errorf("failed to get attributes for volume group id %q: %w", id, err) + } + + attrs = &journal.VolumeGroupAttributes{} } - // TODO: get the volumes that are part of this volume group + var volumes []types.Volume + for volID := range attrs.VolumeMap { + vol, err := volumeResolver.GetVolumeByID(ctx, volID) + if err != nil { + // free the previously allocated volumes + for _, v := range volumes { + v.Destroy(ctx) + } + + return nil, fmt.Errorf("failed to get attributes for volume group id %q: %w", id, err) + } + + volumes = append(volumes, vol) + } return &volumeGroup{ journal: j, @@ -114,6 +141,9 @@ func GetVolumeGroup( monitors: mons, pool: pool, namespace: namespace, + volumes: volumes, + // all allocated volumes need to be free'd at Destroy() time + volumesToFree: volumes, }, nil } @@ -229,6 +259,14 @@ func (vg *volumeGroup) GetIOContext(ctx context.Context) (*rados.IOContext, erro // Destroy frees the resources used by the volumeGroup. func (vg *volumeGroup) Destroy(ctx context.Context) { + // free the volumes that were allocated in GetVolumeGroup() + if len(vg.volumesToFree) > 0 { + for _, volume := range vg.volumesToFree { + volume.Destroy(ctx) + } + vg.volumesToFree = make([]types.Volume, 0) + } + if vg.ioctx != nil { vg.ioctx.Destroy() vg.ioctx = nil @@ -244,7 +282,6 @@ func (vg *volumeGroup) Destroy(ctx context.Context) { vg.credentials = nil } - // FIXME: maybe need to .Destroy() all vg.volumes? log.DebugLog(ctx, "destroyed volume group instance with id %q", vg.id) } @@ -257,26 +294,32 @@ func (vg *volumeGroup) Create(ctx context.Context, name string) error { err = librbd.GroupCreate(ioctx, name) if err != nil { - if !errors.Is(rados.ErrObjectExists, err) { + if !errors.Is(rados.ErrObjectExists, err) && !strings.Contains(err.Error(), "rbd: ret=-17, File exists") { return fmt.Errorf("failed to create volume group %q: %w", name, err) } log.DebugLog(ctx, "ignoring error while creating volume group %q: %v", vg, err) } - log.DebugLog(ctx, "volume group %q has been created", name) + vg.name = name + log.DebugLog(ctx, "volume group %q has been created", vg) return nil } func (vg *volumeGroup) Delete(ctx context.Context) error { + name, err := vg.GetName(ctx) + if err != nil { + return err + } + ioctx, err := vg.GetIOContext(ctx) if err != nil { return err } - err = librbd.GroupRemove(ioctx, vg.name) - if err != nil { + err = librbd.GroupRemove(ioctx, name) + if err != nil && !errors.Is(rados.ErrNotFound, err) { return fmt.Errorf("failed to remove volume group %q: %w", vg, err) } diff --git a/internal/rbd/manager.go b/internal/rbd/manager.go index 518a914d0..cf19e0b01 100644 --- a/internal/rbd/manager.go +++ b/internal/rbd/manager.go @@ -19,24 +19,36 @@ package rbd import ( "context" "errors" + "fmt" + "github.com/ceph/ceph-csi/internal/journal" + "github.com/ceph/ceph-csi/internal/rbd/group" "github.com/ceph/ceph-csi/internal/rbd/types" "github.com/ceph/ceph-csi/internal/util" + "github.com/ceph/ceph-csi/internal/util/log" ) var _ types.Manager = &rbdManager{} type rbdManager struct { + // csiID is the instance id of the CSI-driver (driver name). + csiID string + // parameters can contain the parameters of a create request. parameters map[string]string - secrets map[string]string + // secrets contain the credentials to connect to the Ceph cluster. + secrets map[string]string + // creds are the cached credentials, will be freed on Destroy() creds *util.Credentials + // vgJournal is the journal that is used during opetations, it will be freed on Destroy(). + vgJournal journal.VolumeGroupJournal } // NewManager returns a new manager for handling Volume and Volume Group // operations, combining the requests for RBD and the journalling in RADOS. -func NewManager(parameters, secrets map[string]string) types.Manager { +func NewManager(csiID string, parameters, secrets map[string]string) types.Manager { return &rbdManager{ + csiID: csiID, parameters: parameters, secrets: secrets, } @@ -47,43 +59,198 @@ func (mgr *rbdManager) Destroy(ctx context.Context) { mgr.creds.DeleteCredentials() mgr.creds = nil } + + if mgr.vgJournal != nil { + mgr.vgJournal.Destroy() + mgr.vgJournal = nil + } } -// connect sets up credentials and connects to the journal. -func (mgr *rbdManager) connect() error { - if mgr.creds == nil { - creds, err := util.NewUserCredentials(mgr.secrets) - if err != nil { - return err - } - - mgr.creds = creds +// getCredentials sets up credentials and connects to the journal. +func (mgr *rbdManager) getCredentials() (*util.Credentials, error) { + if mgr.creds != nil { + return mgr.creds, nil } - return nil + creds, err := util.NewUserCredentials(mgr.secrets) + if err != nil { + return nil, fmt.Errorf("failed to get credentials: %w", err) + } + + mgr.creds = creds + + return creds, nil +} + +func (mgr *rbdManager) getVolumeGroupJournal(clusterID string) (journal.VolumeGroupJournal, error) { + if mgr.vgJournal != nil { + return mgr.vgJournal, nil + } + + creds, err := mgr.getCredentials() + if err != nil { + return nil, err + } + + monitors, err := util.Mons(util.CsiConfigFile, clusterID) + if err != nil { + return nil, fmt.Errorf("failed to find MONs for cluster %q: %w", clusterID, err) + } + + ns, err := util.GetRadosNamespace(util.CsiConfigFile, clusterID) + if err != nil { + return nil, fmt.Errorf("failed to find the RADOS namespace for cluster %q: %w", clusterID, err) + } + + vgJournalConfig := journal.NewCSIVolumeGroupJournalWithNamespace(mgr.csiID, ns) + + vgJournal, err := vgJournalConfig.Connect(monitors, ns, creds) + if err != nil { + return nil, fmt.Errorf("failed to connect to journal: %w", err) + } + + mgr.vgJournal = vgJournal + + return vgJournal, nil } func (mgr *rbdManager) GetVolumeByID(ctx context.Context, id string) (types.Volume, error) { - if err := mgr.connect(); err != nil { + creds, err := mgr.getCredentials() + if err != nil { return nil, err } - volume, err := GenVolFromVolID(ctx, id, mgr.creds, mgr.secrets) + volume, err := GenVolFromVolID(ctx, id, creds, mgr.secrets) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to get volume from id %q: %w", id, err) } return volume, nil } func (mgr *rbdManager) GetVolumeGroupByID(ctx context.Context, id string) (types.VolumeGroup, error) { - return nil, errors.New("rbdManager.GetVolumeGroupByID() is not implemented yet") + vi := &util.CSIIdentifier{} + if err := vi.DecomposeCSIID(id); err != nil { + return nil, fmt.Errorf("failed to parse volume group id %q: %w", id, err) + } + + vgJournal, err := mgr.getVolumeGroupJournal(vi.ClusterID) + if err != nil { + return nil, err + } + + creds, err := mgr.getCredentials() + if err != nil { + return nil, err + } + + vg, err := rbd_group.GetVolumeGroup(ctx, id, vgJournal, creds, mgr) + if err != nil { + return nil, fmt.Errorf("failed to get volume group with id %q: %w", id, err) + } + + return vg, nil } func (mgr *rbdManager) CreateVolumeGroup(ctx context.Context, name string) (types.VolumeGroup, error) { - return nil, errors.New("rbdManager.CreateVolumeGroup() is not implemented yet") + creds, err := mgr.getCredentials() + if err != nil { + return nil, err + } + + clusterID, err := util.GetClusterID(mgr.parameters) + if err != nil { + return nil, fmt.Errorf("failed to get cluster-id: %w", err) + } + + vgJournal, err := mgr.getVolumeGroupJournal(clusterID) + if err != nil { + return nil, err + } + + // pool is a required parameter + pool, ok := mgr.parameters["pool"] + if !ok || pool == "" { + return nil, errors.New("required 'pool' option missing in volume group parameters") + } + + // journalPool is an optional parameter, use pool if it is not set + journalPool, ok := mgr.parameters["journalPool"] + if !ok || journalPool == "" { + journalPool = pool + } + + // volumeNamePrefix is an optional parameter, can be an empty string + prefix := mgr.parameters["volumeNamePrefix"] + + // check if the journal contains a generated name for the group already + vgData, err := vgJournal.CheckReservation(ctx, journalPool, name, prefix) + if err != nil { + return nil, fmt.Errorf("failed to reserve volume group for name %q: %w", name, err) + } + + var uuid string + if vgData != nil && vgData.GroupName != "" { + uuid = vgData.GroupUUID + } else { + log.DebugLog(ctx, "the journal does not contain a reservation for a volume group with name %q yet", name) + + var vgName string + uuid, vgName, err = vgJournal.ReserveName(ctx, journalPool, name, prefix) + if err != nil { + return nil, fmt.Errorf("failed to reserve volume group for name %q: %w", name, err) + } + defer func() { + if err != nil { + err = vgJournal.UndoReservation(ctx, pool, vgName, name) + if err != nil { + log.ErrorLog(ctx, "failed to undo the reservation for volume group %q: %w", name, err) + } + } + }() + } + + monitors, err := util.Mons(util.CsiConfigFile, clusterID) + if err != nil { + return nil, fmt.Errorf("failed to find MONs for cluster %q: %w", clusterID, err) + } + + _ /*journalPoolID*/, poolID, err := util.GetPoolIDs(ctx, monitors, journalPool, pool, creds) + if err != nil { + return nil, fmt.Errorf("failed to generate a unique CSI volume group with uuid for %q: %w", uuid, err) + } + + csiID, err := util.GenerateVolID(ctx, monitors, creds, poolID, pool, clusterID, uuid) + if err != nil { + return nil, fmt.Errorf("failed to generate a unique CSI volume group with uuid for %q: %w", uuid, err) + } + + vg, err := rbd_group.GetVolumeGroup(ctx, csiID, vgJournal, creds, mgr) + if err != nil { + return nil, fmt.Errorf("failed to get volume group %q at cluster %q: %w", name, clusterID, err) + } + defer func() { + if err != nil { + vg.Destroy(ctx) + } + }() + + // check if the volume group exists in the backend + existingName, err := vg.GetName(ctx) + if err != nil { + // the volume group does not exist yet + err = vg.Create(ctx, vgName) + if err != nil { + return nil, fmt.Errorf("failed to create volume group %q: %w", name, err) + } + } else if existingName != vgName { + return nil, fmt.Errorf("volume group id %q has a name mismatch, expected %q, not %q", name, vgName, existingName) + } + + return vg, nil } func (mgr *rbdManager) DeleteVolumeGroup(ctx context.Context, vg types.VolumeGroup) error { - return errors.New("rbdManager.CreateVolumeGroup() is not implemented yet") + // TODO: remove from journal + return vg.Delete(ctx) } diff --git a/internal/rbd/types/manager.go b/internal/rbd/types/manager.go index 7744eb18a..334974932 100644 --- a/internal/rbd/types/manager.go +++ b/internal/rbd/types/manager.go @@ -20,16 +20,22 @@ import ( "context" ) +// VolumeResolver can be used to construct a Volume from a CSI VolumeId. +type VolumeResolver interface { + // GetVolumeByID uses the CSI VolumeId to resolve the returned Volume. + GetVolumeByID(ctx context.Context, id string) (Volume, error) +} + // Manager provides a way for other packages to get Volumes and VolumeGroups. // It handles the operations on the backend, and makes sure the journal // reflects the expected state. type Manager interface { + // VolumeResolver is fully implemented by the Manager. + VolumeResolver + // Destroy frees all resources that the Manager allocated. Destroy(ctx context.Context) - // GetVolumeByID uses the CSI VolumeId to resolve the returned Volume. - GetVolumeByID(ctx context.Context, id string) (Volume, error) - // GetVolumeGroupByID uses the CSI-Addons VolumeGroupId to resolve the // returned VolumeGroup. GetVolumeGroupByID(ctx context.Context, id string) (VolumeGroup, error)