rbd: implement the VolumeGroup interface

Signed-off-by: Niels de Vos <ndevos@ibm.com>
This commit is contained in:
Niels de Vos 2024-07-19 16:25:21 +02:00 committed by mergify[bot]
parent fbf9ffcac4
commit 40b0526f64
2 changed files with 415 additions and 1 deletions

View File

@ -0,0 +1,402 @@
/*
Copyright 2024 The Ceph-CSI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package group
import (
"context"
"errors"
"fmt"
"github.com/ceph/go-ceph/rados"
librbd "github.com/ceph/go-ceph/rbd"
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/csi-addons/spec/lib/go/volumegroup"
"github.com/ceph/ceph-csi/internal/journal"
"github.com/ceph/ceph-csi/internal/rbd/types"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/log"
)
var ErrRBDGroupNotConnected = errors.New("RBD group is not connected")
// volumeGroup handles all requests for 'rbd group' operations.
type volumeGroup struct {
// id is a unique value for this volume group in the Ceph cluster, it
// is used to find the group in the journal.
id string
// name is used in RBD API calls as the name of this object
name string
clusterID string
credentials *util.Credentials
// temporary connection attributes
conn *util.ClusterConnection
ioctx *rados.IOContext
// required details to perform operations on the group
monitors string
pool string
namespace string
// volumes is a list of rbd-images that are part of the group. The ID
// of each volume is stored in the journal.
volumes []types.Volume
journal journal.VolumeGroupJournal
}
// verify that volumeGroup implements the VolumeGroup and Stringer interfaces.
var (
_ types.VolumeGroup = &volumeGroup{}
_ fmt.Stringer = &volumeGroup{}
)
// GetVolumeGroup initializes a new VolumeGroup object that can be used
// to manage an `rbd group`.
func GetVolumeGroup(
ctx context.Context,
id string,
j journal.VolumeGroupJournal,
creds *util.Credentials,
) (types.VolumeGroup, error) {
csiID := util.CSIIdentifier{}
err := csiID.DecomposeCSIID(id)
if err != nil {
return nil, fmt.Errorf("failed to decompose volume group id %q: %w", id, err)
}
mons, err := util.Mons(util.CsiConfigFile, csiID.ClusterID)
if err != nil {
return nil, fmt.Errorf("failed to get MONs for cluster id %q: %w", csiID.ClusterID, err)
}
namespace, err := util.GetRadosNamespace(util.CsiConfigFile, csiID.ClusterID)
if err != nil {
return nil, fmt.Errorf("failed to get RADOS namespace for cluster id %q: %w", csiID.ClusterID, err)
}
pool, err := util.GetPoolName(mons, creds, csiID.LocationID)
if err != nil {
return nil, fmt.Errorf("failed to get pool for volume group id %q: %w", id, err)
}
attrs, err := j.GetVolumeGroupAttributes(ctx, pool, csiID.ObjectUUID)
if err != nil {
return nil, fmt.Errorf("failed to get attributes for volume group id %q: %w", id, err)
}
// TODO: get the volumes that are part of this volume group
return &volumeGroup{
journal: j,
credentials: creds,
id: id,
name: attrs.GroupName,
clusterID: csiID.ClusterID,
monitors: mons,
pool: pool,
namespace: namespace,
}, nil
}
// String makes it easy to include the volumeGroup object in log and error
// messages.
func (vg *volumeGroup) String() string {
if vg.name != "" {
return vg.name
}
if vg.id != "" {
return vg.id
}
return fmt.Sprintf("<unidentified volume %v>", *vg)
}
// GetID returns the CSI-Addons VolumeGroupId of the VolumeGroup.
func (vg *volumeGroup) GetID(ctx context.Context) (string, error) {
if vg.id == "" {
return "", errors.New("BUG: ID is not set")
}
return vg.id, nil
}
// GetName returns the name in the backend storage for the VolumeGroup.
func (vg *volumeGroup) GetName(ctx context.Context) (string, error) {
if vg.name == "" {
return "", errors.New("BUG: name is not set")
}
return vg.name, nil
}
// ToCSI creates a CSI-Addons type for the VolumeGroup.
func (vg *volumeGroup) ToCSI(ctx context.Context) (*volumegroup.VolumeGroup, error) {
volumes, err := vg.ListVolumes(ctx)
if err != nil {
return nil, fmt.Errorf("failed to list volumes for volume group %q: %w", vg, err)
}
csiVolumes := make([]*csi.Volume, len(volumes))
for i, vol := range volumes {
csiVolumes[i], err = vol.ToCSI(ctx)
if err != nil {
return nil, fmt.Errorf("failed to convert volume %q to CSI type: %w", vol, err)
}
}
id, err := vg.GetID(ctx)
if err != nil {
return nil, fmt.Errorf("failed to get id for volume group %q: %w", vg, err)
}
// TODO: maybe store the VolumeContext in the journal?
vgContext := map[string]string{}
return &volumegroup.VolumeGroup{
VolumeGroupId: id,
VolumeGroupContext: vgContext,
Volumes: csiVolumes,
}, nil
}
// getConnection returns the ClusterConnection for the volume group if it
// exists, otherwise it will open a new one.
// Destroy should be used to close the ClusterConnection.
func (vg *volumeGroup) getConnection(ctx context.Context) (*util.ClusterConnection, error) {
if vg.conn != nil {
return vg.conn, nil
}
conn := &util.ClusterConnection{}
err := conn.Connect(vg.monitors, vg.credentials)
if err != nil {
return nil, fmt.Errorf("failed to connect to MONs %q: %w", vg.monitors, err)
}
vg.conn = conn
log.DebugLog(ctx, "connection established for volume group %q", vg.id)
return conn, nil
}
// GetIOContext returns the IOContext for the volume group if it exists,
// otherwise it will allocate a new one.
// Destroy should be used to free the IOContext.
func (vg *volumeGroup) GetIOContext(ctx context.Context) (*rados.IOContext, error) {
if vg.ioctx != nil {
return vg.ioctx, nil
}
conn, err := vg.getConnection(ctx)
if err != nil {
return nil, fmt.Errorf("%w: failed to connect: %w", ErrRBDGroupNotConnected, err)
}
ioctx, err := conn.GetIoctx(vg.pool)
if err != nil {
return nil, fmt.Errorf("%w: failed to get IOContext: %w", ErrRBDGroupNotConnected, err)
}
if vg.namespace != "" {
ioctx.SetNamespace(vg.namespace)
}
vg.ioctx = ioctx
log.DebugLog(ctx, "iocontext created for volume group %q in pool %q", vg.id, vg.pool)
return ioctx, nil
}
// Destroy frees the resources used by the volumeGroup.
func (vg *volumeGroup) Destroy(ctx context.Context) {
if vg.ioctx != nil {
vg.ioctx.Destroy()
vg.ioctx = nil
}
if vg.conn != nil {
vg.conn.Destroy()
vg.conn = nil
}
if vg.credentials != nil {
vg.credentials.DeleteCredentials()
vg.credentials = nil
}
// FIXME: maybe need to .Destroy() all vg.volumes?
log.DebugLog(ctx, "destroyed volume group instance with id %q", vg.id)
}
func (vg *volumeGroup) Create(ctx context.Context, name string) error {
// TODO: if the group already exists, resolve details and use that
ioctx, err := vg.GetIOContext(ctx)
if err != nil {
return err
}
err = librbd.GroupCreate(ioctx, name)
if err != nil {
if !errors.Is(rados.ErrObjectExists, err) {
return fmt.Errorf("failed to create volume group %q: %w", name, err)
}
log.DebugLog(ctx, "ignoring error while creating volume group %q: %v", vg, err)
}
log.DebugLog(ctx, "volume group %q has been created", name)
return nil
}
func (vg *volumeGroup) Delete(ctx context.Context) error {
ioctx, err := vg.GetIOContext(ctx)
if err != nil {
return err
}
err = librbd.GroupRemove(ioctx, vg.name)
if err != nil {
return fmt.Errorf("failed to remove volume group %q: %w", vg, err)
}
log.DebugLog(ctx, "volume group %q has been removed", vg)
return nil
}
func (vg *volumeGroup) AddVolume(ctx context.Context, vol types.Volume) error {
err := vol.AddToGroup(ctx, vg)
if err != nil {
// rados.ErrObjectExists does not match the rbd error (there is no EEXISTS error)
if errors.Is(rados.ErrObjectExists, err) || strings.Contains(err.Error(), "ret=-17") {
log.DebugLog(ctx, "volume %q is already part of volume group %q: %v", vol, vg, err)
return nil
}
return fmt.Errorf("failed to add volume %q to volume group %q: %w", vol, vg, err)
}
vg.volumes = append(vg.volumes, vol)
volID, err := vol.GetID(ctx)
if err != nil {
return err
}
pool, err := vg.GetPool(ctx)
if err != nil {
return err
}
id, err := vg.GetID(ctx)
if err != nil {
return err
}
csiID := util.CSIIdentifier{}
err = csiID.DecomposeCSIID(id)
if err != nil {
return fmt.Errorf("failed to decompose volume group id %q: %w", id, err)
}
toAdd := map[string]string{
volID: "",
}
err = vg.journal.AddVolumesMapping(ctx, pool, csiID.ObjectUUID, toAdd)
if err != nil {
return fmt.Errorf("failed to add mapping for volume %q to volume group id %q: %w", volID, id, err)
}
return nil
}
func (vg *volumeGroup) RemoveVolume(ctx context.Context, vol types.Volume) error {
// volume was already removed from the group
if len(vg.volumes) == 0 {
return nil
}
err := vol.RemoveFromGroup(ctx, vg)
if err != nil {
return fmt.Errorf("failed to remove volume %q from volume group %q: %w", vol, vg, err)
}
// toRemove contain the ID of the volume that is removed from the group
toRemove, err := vol.GetID(ctx)
if err != nil {
return fmt.Errorf("failed to get volume id for %q: %w", vol, err)
}
// volumes is the updated list, without the volume that was removed
volumes := make([]types.Volume, 0)
var id string
for _, v := range vg.volumes {
id, err = v.GetID(ctx)
if err != nil {
return err
}
if id == toRemove {
// do not add the volume to the list
continue
}
volumes = append(volumes, v)
}
// update the list of volumes
vg.volumes = volumes
pool, err := vg.GetPool(ctx)
if err != nil {
return err
}
id, err = vg.GetID(ctx)
if err != nil {
return err
}
csiID := util.CSIIdentifier{}
err = csiID.DecomposeCSIID(id)
if err != nil {
return fmt.Errorf("failed to decompose volume group id %q: %w", id, err)
}
mapping := []string{
toRemove,
}
err = vg.journal.RemoveVolumesMapping(ctx, pool, csiID.ObjectUUID, mapping)
if err != nil {
return fmt.Errorf("failed to remove mapping for volume %q to volume group id %q: %w", toRemove, id, err)
}
return nil
}
func (vg *volumeGroup) ListVolumes(ctx context.Context) ([]types.Volume, error) {
return vg.volumes, nil
}

View File

@ -19,6 +19,7 @@ package types
import ( import (
"context" "context"
"github.com/ceph/go-ceph/rados"
"github.com/csi-addons/spec/lib/go/volumegroup" "github.com/csi-addons/spec/lib/go/volumegroup"
) )
@ -28,11 +29,22 @@ type VolumeGroup interface {
// Destroy frees the resources used by the VolumeGroup. // Destroy frees the resources used by the VolumeGroup.
Destroy(ctx context.Context) Destroy(ctx context.Context)
// GetIOContext returns the IOContext for performing librbd operations
// on the VolumeGroup. This is used by the rbdVolume struct when it
// needs to add/remove itself from the VolumeGroup.
GetIOContext(ctx context.Context) (*rados.IOContext, error)
// GetID returns the CSI-Addons VolumeGroupId of the VolumeGroup. // GetID returns the CSI-Addons VolumeGroupId of the VolumeGroup.
GetID(ctx context.Context) (string, error) GetID(ctx context.Context) (string, error)
// GetName returns the name in the backend storage for the VolumeGroup.
GetName(ctx context.Context) (string, error)
// ToCSI creates a CSI-Addons type for the VolumeGroup. // ToCSI creates a CSI-Addons type for the VolumeGroup.
ToCSI(ctx context.Context) *volumegroup.VolumeGroup ToCSI(ctx context.Context) (*volumegroup.VolumeGroup, error)
// Create makes a new group in the backend storage.
Create(ctx context.Context, name string) error
// Delete removes the VolumeGroup from the backend storage. // Delete removes the VolumeGroup from the backend storage.
Delete(ctx context.Context) error Delete(ctx context.Context) error