rbd: use the Manager to handle CSI-Addons VolumeGroup requests

Signed-off-by: Niels de Vos <ndevos@ibm.com>
This commit is contained in:
Niels de Vos 2024-07-22 17:51:17 +02:00 committed by mergify[bot]
parent 40b0526f64
commit a82ae15f1a
4 changed files with 258 additions and 33 deletions

View File

@ -82,7 +82,7 @@ func (vs *VolumeGroupServer) CreateVolumeGroup(
ctx context.Context,
req *volumegroup.CreateVolumeGroupRequest,
) (*volumegroup.CreateVolumeGroupResponse, error) {
mgr := rbd.NewManager(req.GetParameters(), req.GetSecrets())
mgr := rbd.NewManager(vs.csiID, req.GetParameters(), req.GetSecrets())
defer mgr.Destroy(ctx)
// resolve all volumes
@ -132,8 +132,17 @@ func (vs *VolumeGroupServer) CreateVolumeGroup(
log.DebugLog(ctx, fmt.Sprintf("all %d Volumes have been added to for VolumeGroup %q", len(volumes), req.GetName()))
csiVG, err := vg.ToCSI(ctx)
if err != nil {
return nil, status.Errorf(
codes.Internal,
"failed to convert volume group %q to CSI type: %s",
req.GetName(),
err.Error())
}
return &volumegroup.CreateVolumeGroupResponse{
VolumeGroup: vg.ToCSI(ctx),
VolumeGroup: csiVG,
}, nil
}
@ -159,7 +168,7 @@ func (vs *VolumeGroupServer) DeleteVolumeGroup(
ctx context.Context,
req *volumegroup.DeleteVolumeGroupRequest,
) (*volumegroup.DeleteVolumeGroupResponse, error) {
mgr := rbd.NewManager(nil, req.GetSecrets())
mgr := rbd.NewManager(vs.csiID, nil, req.GetSecrets())
defer mgr.Destroy(ctx)
// resolve the volume group

View File

@ -14,12 +14,13 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package group
package rbd_group
import (
"context"
"errors"
"fmt"
"strings"
"github.com/ceph/go-ceph/rados"
librbd "github.com/ceph/go-ceph/rbd"
@ -56,11 +57,17 @@ type volumeGroup struct {
pool string
namespace string
journal journal.VolumeGroupJournal
// volumes is a list of rbd-images that are part of the group. The ID
// of each volume is stored in the journal.
volumes []types.Volume
journal journal.VolumeGroupJournal
// volumeToFree contains Volumes that were resolved during
// GetVolumeGroup. The volumes slice can be updated independently of
// this by calling AddVolume (Volumes are allocated elsewhere), and
// RemoveVolume (need to keep track of the allocated Volume).
volumesToFree []types.Volume
}
// verify that volumeGroup implements the VolumeGroup and Stringer interfaces.
@ -71,11 +78,14 @@ var (
// GetVolumeGroup initializes a new VolumeGroup object that can be used
// to manage an `rbd group`.
// If the .GetName() function returns an error, the VolumeGroup does not exist
// yet. It is needed to call .Create() in that case first.
func GetVolumeGroup(
ctx context.Context,
id string,
j journal.VolumeGroupJournal,
creds *util.Credentials,
volumeResolver types.VolumeResolver,
) (types.VolumeGroup, error) {
csiID := util.CSIIdentifier{}
err := csiID.DecomposeCSIID(id)
@ -100,10 +110,27 @@ func GetVolumeGroup(
attrs, err := j.GetVolumeGroupAttributes(ctx, pool, csiID.ObjectUUID)
if err != nil {
return nil, fmt.Errorf("failed to get attributes for volume group id %q: %w", id, err)
if !errors.Is(err, util.ErrKeyNotFound) && !errors.Is(err, util.ErrPoolNotFound) {
return nil, fmt.Errorf("failed to get attributes for volume group id %q: %w", id, err)
}
attrs = &journal.VolumeGroupAttributes{}
}
// TODO: get the volumes that are part of this volume group
var volumes []types.Volume
for volID := range attrs.VolumeMap {
vol, err := volumeResolver.GetVolumeByID(ctx, volID)
if err != nil {
// free the previously allocated volumes
for _, v := range volumes {
v.Destroy(ctx)
}
return nil, fmt.Errorf("failed to get attributes for volume group id %q: %w", id, err)
}
volumes = append(volumes, vol)
}
return &volumeGroup{
journal: j,
@ -114,6 +141,9 @@ func GetVolumeGroup(
monitors: mons,
pool: pool,
namespace: namespace,
volumes: volumes,
// all allocated volumes need to be free'd at Destroy() time
volumesToFree: volumes,
}, nil
}
@ -229,6 +259,14 @@ func (vg *volumeGroup) GetIOContext(ctx context.Context) (*rados.IOContext, erro
// Destroy frees the resources used by the volumeGroup.
func (vg *volumeGroup) Destroy(ctx context.Context) {
// free the volumes that were allocated in GetVolumeGroup()
if len(vg.volumesToFree) > 0 {
for _, volume := range vg.volumesToFree {
volume.Destroy(ctx)
}
vg.volumesToFree = make([]types.Volume, 0)
}
if vg.ioctx != nil {
vg.ioctx.Destroy()
vg.ioctx = nil
@ -244,7 +282,6 @@ func (vg *volumeGroup) Destroy(ctx context.Context) {
vg.credentials = nil
}
// FIXME: maybe need to .Destroy() all vg.volumes?
log.DebugLog(ctx, "destroyed volume group instance with id %q", vg.id)
}
@ -257,26 +294,32 @@ func (vg *volumeGroup) Create(ctx context.Context, name string) error {
err = librbd.GroupCreate(ioctx, name)
if err != nil {
if !errors.Is(rados.ErrObjectExists, err) {
if !errors.Is(rados.ErrObjectExists, err) && !strings.Contains(err.Error(), "rbd: ret=-17, File exists") {
return fmt.Errorf("failed to create volume group %q: %w", name, err)
}
log.DebugLog(ctx, "ignoring error while creating volume group %q: %v", vg, err)
}
log.DebugLog(ctx, "volume group %q has been created", name)
vg.name = name
log.DebugLog(ctx, "volume group %q has been created", vg)
return nil
}
func (vg *volumeGroup) Delete(ctx context.Context) error {
name, err := vg.GetName(ctx)
if err != nil {
return err
}
ioctx, err := vg.GetIOContext(ctx)
if err != nil {
return err
}
err = librbd.GroupRemove(ioctx, vg.name)
if err != nil {
err = librbd.GroupRemove(ioctx, name)
if err != nil && !errors.Is(rados.ErrNotFound, err) {
return fmt.Errorf("failed to remove volume group %q: %w", vg, err)
}

View File

@ -19,24 +19,36 @@ package rbd
import (
"context"
"errors"
"fmt"
"github.com/ceph/ceph-csi/internal/journal"
"github.com/ceph/ceph-csi/internal/rbd/group"
"github.com/ceph/ceph-csi/internal/rbd/types"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/log"
)
var _ types.Manager = &rbdManager{}
type rbdManager struct {
// csiID is the instance id of the CSI-driver (driver name).
csiID string
// parameters can contain the parameters of a create request.
parameters map[string]string
secrets map[string]string
// secrets contain the credentials to connect to the Ceph cluster.
secrets map[string]string
// creds are the cached credentials, will be freed on Destroy()
creds *util.Credentials
// vgJournal is the journal that is used during opetations, it will be freed on Destroy().
vgJournal journal.VolumeGroupJournal
}
// NewManager returns a new manager for handling Volume and Volume Group
// operations, combining the requests for RBD and the journalling in RADOS.
func NewManager(parameters, secrets map[string]string) types.Manager {
func NewManager(csiID string, parameters, secrets map[string]string) types.Manager {
return &rbdManager{
csiID: csiID,
parameters: parameters,
secrets: secrets,
}
@ -47,43 +59,198 @@ func (mgr *rbdManager) Destroy(ctx context.Context) {
mgr.creds.DeleteCredentials()
mgr.creds = nil
}
if mgr.vgJournal != nil {
mgr.vgJournal.Destroy()
mgr.vgJournal = nil
}
}
// connect sets up credentials and connects to the journal.
func (mgr *rbdManager) connect() error {
if mgr.creds == nil {
creds, err := util.NewUserCredentials(mgr.secrets)
if err != nil {
return err
}
mgr.creds = creds
// getCredentials sets up credentials and connects to the journal.
func (mgr *rbdManager) getCredentials() (*util.Credentials, error) {
if mgr.creds != nil {
return mgr.creds, nil
}
return nil
creds, err := util.NewUserCredentials(mgr.secrets)
if err != nil {
return nil, fmt.Errorf("failed to get credentials: %w", err)
}
mgr.creds = creds
return creds, nil
}
func (mgr *rbdManager) getVolumeGroupJournal(clusterID string) (journal.VolumeGroupJournal, error) {
if mgr.vgJournal != nil {
return mgr.vgJournal, nil
}
creds, err := mgr.getCredentials()
if err != nil {
return nil, err
}
monitors, err := util.Mons(util.CsiConfigFile, clusterID)
if err != nil {
return nil, fmt.Errorf("failed to find MONs for cluster %q: %w", clusterID, err)
}
ns, err := util.GetRadosNamespace(util.CsiConfigFile, clusterID)
if err != nil {
return nil, fmt.Errorf("failed to find the RADOS namespace for cluster %q: %w", clusterID, err)
}
vgJournalConfig := journal.NewCSIVolumeGroupJournalWithNamespace(mgr.csiID, ns)
vgJournal, err := vgJournalConfig.Connect(monitors, ns, creds)
if err != nil {
return nil, fmt.Errorf("failed to connect to journal: %w", err)
}
mgr.vgJournal = vgJournal
return vgJournal, nil
}
func (mgr *rbdManager) GetVolumeByID(ctx context.Context, id string) (types.Volume, error) {
if err := mgr.connect(); err != nil {
creds, err := mgr.getCredentials()
if err != nil {
return nil, err
}
volume, err := GenVolFromVolID(ctx, id, mgr.creds, mgr.secrets)
volume, err := GenVolFromVolID(ctx, id, creds, mgr.secrets)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to get volume from id %q: %w", id, err)
}
return volume, nil
}
func (mgr *rbdManager) GetVolumeGroupByID(ctx context.Context, id string) (types.VolumeGroup, error) {
return nil, errors.New("rbdManager.GetVolumeGroupByID() is not implemented yet")
vi := &util.CSIIdentifier{}
if err := vi.DecomposeCSIID(id); err != nil {
return nil, fmt.Errorf("failed to parse volume group id %q: %w", id, err)
}
vgJournal, err := mgr.getVolumeGroupJournal(vi.ClusterID)
if err != nil {
return nil, err
}
creds, err := mgr.getCredentials()
if err != nil {
return nil, err
}
vg, err := rbd_group.GetVolumeGroup(ctx, id, vgJournal, creds, mgr)
if err != nil {
return nil, fmt.Errorf("failed to get volume group with id %q: %w", id, err)
}
return vg, nil
}
func (mgr *rbdManager) CreateVolumeGroup(ctx context.Context, name string) (types.VolumeGroup, error) {
return nil, errors.New("rbdManager.CreateVolumeGroup() is not implemented yet")
creds, err := mgr.getCredentials()
if err != nil {
return nil, err
}
clusterID, err := util.GetClusterID(mgr.parameters)
if err != nil {
return nil, fmt.Errorf("failed to get cluster-id: %w", err)
}
vgJournal, err := mgr.getVolumeGroupJournal(clusterID)
if err != nil {
return nil, err
}
// pool is a required parameter
pool, ok := mgr.parameters["pool"]
if !ok || pool == "" {
return nil, errors.New("required 'pool' option missing in volume group parameters")
}
// journalPool is an optional parameter, use pool if it is not set
journalPool, ok := mgr.parameters["journalPool"]
if !ok || journalPool == "" {
journalPool = pool
}
// volumeNamePrefix is an optional parameter, can be an empty string
prefix := mgr.parameters["volumeNamePrefix"]
// check if the journal contains a generated name for the group already
vgData, err := vgJournal.CheckReservation(ctx, journalPool, name, prefix)
if err != nil {
return nil, fmt.Errorf("failed to reserve volume group for name %q: %w", name, err)
}
var uuid string
if vgData != nil && vgData.GroupName != "" {
uuid = vgData.GroupUUID
} else {
log.DebugLog(ctx, "the journal does not contain a reservation for a volume group with name %q yet", name)
var vgName string
uuid, vgName, err = vgJournal.ReserveName(ctx, journalPool, name, prefix)
if err != nil {
return nil, fmt.Errorf("failed to reserve volume group for name %q: %w", name, err)
}
defer func() {
if err != nil {
err = vgJournal.UndoReservation(ctx, pool, vgName, name)
if err != nil {
log.ErrorLog(ctx, "failed to undo the reservation for volume group %q: %w", name, err)
}
}
}()
}
monitors, err := util.Mons(util.CsiConfigFile, clusterID)
if err != nil {
return nil, fmt.Errorf("failed to find MONs for cluster %q: %w", clusterID, err)
}
_ /*journalPoolID*/, poolID, err := util.GetPoolIDs(ctx, monitors, journalPool, pool, creds)
if err != nil {
return nil, fmt.Errorf("failed to generate a unique CSI volume group with uuid for %q: %w", uuid, err)
}
csiID, err := util.GenerateVolID(ctx, monitors, creds, poolID, pool, clusterID, uuid)
if err != nil {
return nil, fmt.Errorf("failed to generate a unique CSI volume group with uuid for %q: %w", uuid, err)
}
vg, err := rbd_group.GetVolumeGroup(ctx, csiID, vgJournal, creds, mgr)
if err != nil {
return nil, fmt.Errorf("failed to get volume group %q at cluster %q: %w", name, clusterID, err)
}
defer func() {
if err != nil {
vg.Destroy(ctx)
}
}()
// check if the volume group exists in the backend
existingName, err := vg.GetName(ctx)
if err != nil {
// the volume group does not exist yet
err = vg.Create(ctx, vgName)
if err != nil {
return nil, fmt.Errorf("failed to create volume group %q: %w", name, err)
}
} else if existingName != vgName {
return nil, fmt.Errorf("volume group id %q has a name mismatch, expected %q, not %q", name, vgName, existingName)
}
return vg, nil
}
func (mgr *rbdManager) DeleteVolumeGroup(ctx context.Context, vg types.VolumeGroup) error {
return errors.New("rbdManager.CreateVolumeGroup() is not implemented yet")
// TODO: remove from journal
return vg.Delete(ctx)
}

View File

@ -20,16 +20,22 @@ import (
"context"
)
// VolumeResolver can be used to construct a Volume from a CSI VolumeId.
type VolumeResolver interface {
// GetVolumeByID uses the CSI VolumeId to resolve the returned Volume.
GetVolumeByID(ctx context.Context, id string) (Volume, error)
}
// Manager provides a way for other packages to get Volumes and VolumeGroups.
// It handles the operations on the backend, and makes sure the journal
// reflects the expected state.
type Manager interface {
// VolumeResolver is fully implemented by the Manager.
VolumeResolver
// Destroy frees all resources that the Manager allocated.
Destroy(ctx context.Context)
// GetVolumeByID uses the CSI VolumeId to resolve the returned Volume.
GetVolumeByID(ctx context.Context, id string) (Volume, error)
// GetVolumeGroupByID uses the CSI-Addons VolumeGroupId to resolve the
// returned VolumeGroup.
GetVolumeGroupByID(ctx context.Context, id string) (VolumeGroup, error)