diff --git a/internal/rbd/group/volume_group.go b/internal/rbd/group/volume_group.go new file mode 100644 index 000000000..4d0a05d58 --- /dev/null +++ b/internal/rbd/group/volume_group.go @@ -0,0 +1,402 @@ +/* +Copyright 2024 The Ceph-CSI Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package group + +import ( + "context" + "errors" + "fmt" + + "github.com/ceph/go-ceph/rados" + librbd "github.com/ceph/go-ceph/rbd" + "github.com/container-storage-interface/spec/lib/go/csi" + "github.com/csi-addons/spec/lib/go/volumegroup" + + "github.com/ceph/ceph-csi/internal/journal" + "github.com/ceph/ceph-csi/internal/rbd/types" + "github.com/ceph/ceph-csi/internal/util" + "github.com/ceph/ceph-csi/internal/util/log" +) + +var ErrRBDGroupNotConnected = errors.New("RBD group is not connected") + +// volumeGroup handles all requests for 'rbd group' operations. +type volumeGroup struct { + // id is a unique value for this volume group in the Ceph cluster, it + // is used to find the group in the journal. + id string + + // name is used in RBD API calls as the name of this object + name string + + clusterID string + + credentials *util.Credentials + + // temporary connection attributes + conn *util.ClusterConnection + ioctx *rados.IOContext + + // required details to perform operations on the group + monitors string + pool string + namespace string + + // volumes is a list of rbd-images that are part of the group. The ID + // of each volume is stored in the journal. + volumes []types.Volume + + journal journal.VolumeGroupJournal +} + +// verify that volumeGroup implements the VolumeGroup and Stringer interfaces. +var ( + _ types.VolumeGroup = &volumeGroup{} + _ fmt.Stringer = &volumeGroup{} +) + +// GetVolumeGroup initializes a new VolumeGroup object that can be used +// to manage an `rbd group`. +func GetVolumeGroup( + ctx context.Context, + id string, + j journal.VolumeGroupJournal, + creds *util.Credentials, +) (types.VolumeGroup, error) { + csiID := util.CSIIdentifier{} + err := csiID.DecomposeCSIID(id) + if err != nil { + return nil, fmt.Errorf("failed to decompose volume group id %q: %w", id, err) + } + + mons, err := util.Mons(util.CsiConfigFile, csiID.ClusterID) + if err != nil { + return nil, fmt.Errorf("failed to get MONs for cluster id %q: %w", csiID.ClusterID, err) + } + + namespace, err := util.GetRadosNamespace(util.CsiConfigFile, csiID.ClusterID) + if err != nil { + return nil, fmt.Errorf("failed to get RADOS namespace for cluster id %q: %w", csiID.ClusterID, err) + } + + pool, err := util.GetPoolName(mons, creds, csiID.LocationID) + if err != nil { + return nil, fmt.Errorf("failed to get pool for volume group id %q: %w", id, err) + } + + attrs, err := j.GetVolumeGroupAttributes(ctx, pool, csiID.ObjectUUID) + if err != nil { + return nil, fmt.Errorf("failed to get attributes for volume group id %q: %w", id, err) + } + + // TODO: get the volumes that are part of this volume group + + return &volumeGroup{ + journal: j, + credentials: creds, + id: id, + name: attrs.GroupName, + clusterID: csiID.ClusterID, + monitors: mons, + pool: pool, + namespace: namespace, + }, nil +} + +// String makes it easy to include the volumeGroup object in log and error +// messages. +func (vg *volumeGroup) String() string { + if vg.name != "" { + return vg.name + } + + if vg.id != "" { + return vg.id + } + + return fmt.Sprintf("", *vg) +} + +// GetID returns the CSI-Addons VolumeGroupId of the VolumeGroup. +func (vg *volumeGroup) GetID(ctx context.Context) (string, error) { + if vg.id == "" { + return "", errors.New("BUG: ID is not set") + } + + return vg.id, nil +} + +// GetName returns the name in the backend storage for the VolumeGroup. +func (vg *volumeGroup) GetName(ctx context.Context) (string, error) { + if vg.name == "" { + return "", errors.New("BUG: name is not set") + } + + return vg.name, nil +} + +// ToCSI creates a CSI-Addons type for the VolumeGroup. +func (vg *volumeGroup) ToCSI(ctx context.Context) (*volumegroup.VolumeGroup, error) { + volumes, err := vg.ListVolumes(ctx) + if err != nil { + return nil, fmt.Errorf("failed to list volumes for volume group %q: %w", vg, err) + } + + csiVolumes := make([]*csi.Volume, len(volumes)) + for i, vol := range volumes { + csiVolumes[i], err = vol.ToCSI(ctx) + if err != nil { + return nil, fmt.Errorf("failed to convert volume %q to CSI type: %w", vol, err) + } + } + + id, err := vg.GetID(ctx) + if err != nil { + return nil, fmt.Errorf("failed to get id for volume group %q: %w", vg, err) + } + + // TODO: maybe store the VolumeContext in the journal? + vgContext := map[string]string{} + + return &volumegroup.VolumeGroup{ + VolumeGroupId: id, + VolumeGroupContext: vgContext, + Volumes: csiVolumes, + }, nil +} + +// getConnection returns the ClusterConnection for the volume group if it +// exists, otherwise it will open a new one. +// Destroy should be used to close the ClusterConnection. +func (vg *volumeGroup) getConnection(ctx context.Context) (*util.ClusterConnection, error) { + if vg.conn != nil { + return vg.conn, nil + } + + conn := &util.ClusterConnection{} + err := conn.Connect(vg.monitors, vg.credentials) + if err != nil { + return nil, fmt.Errorf("failed to connect to MONs %q: %w", vg.monitors, err) + } + + vg.conn = conn + log.DebugLog(ctx, "connection established for volume group %q", vg.id) + + return conn, nil +} + +// GetIOContext returns the IOContext for the volume group if it exists, +// otherwise it will allocate a new one. +// Destroy should be used to free the IOContext. +func (vg *volumeGroup) GetIOContext(ctx context.Context) (*rados.IOContext, error) { + if vg.ioctx != nil { + return vg.ioctx, nil + } + + conn, err := vg.getConnection(ctx) + if err != nil { + return nil, fmt.Errorf("%w: failed to connect: %w", ErrRBDGroupNotConnected, err) + } + + ioctx, err := conn.GetIoctx(vg.pool) + if err != nil { + return nil, fmt.Errorf("%w: failed to get IOContext: %w", ErrRBDGroupNotConnected, err) + } + + if vg.namespace != "" { + ioctx.SetNamespace(vg.namespace) + } + + vg.ioctx = ioctx + log.DebugLog(ctx, "iocontext created for volume group %q in pool %q", vg.id, vg.pool) + + return ioctx, nil +} + +// Destroy frees the resources used by the volumeGroup. +func (vg *volumeGroup) Destroy(ctx context.Context) { + if vg.ioctx != nil { + vg.ioctx.Destroy() + vg.ioctx = nil + } + + if vg.conn != nil { + vg.conn.Destroy() + vg.conn = nil + } + + if vg.credentials != nil { + vg.credentials.DeleteCredentials() + vg.credentials = nil + } + + // FIXME: maybe need to .Destroy() all vg.volumes? + log.DebugLog(ctx, "destroyed volume group instance with id %q", vg.id) +} + +func (vg *volumeGroup) Create(ctx context.Context, name string) error { + // TODO: if the group already exists, resolve details and use that + ioctx, err := vg.GetIOContext(ctx) + if err != nil { + return err + } + + err = librbd.GroupCreate(ioctx, name) + if err != nil { + if !errors.Is(rados.ErrObjectExists, err) { + return fmt.Errorf("failed to create volume group %q: %w", name, err) + } + + log.DebugLog(ctx, "ignoring error while creating volume group %q: %v", vg, err) + } + + log.DebugLog(ctx, "volume group %q has been created", name) + + return nil +} + +func (vg *volumeGroup) Delete(ctx context.Context) error { + ioctx, err := vg.GetIOContext(ctx) + if err != nil { + return err + } + + err = librbd.GroupRemove(ioctx, vg.name) + if err != nil { + return fmt.Errorf("failed to remove volume group %q: %w", vg, err) + } + + log.DebugLog(ctx, "volume group %q has been removed", vg) + + return nil +} + +func (vg *volumeGroup) AddVolume(ctx context.Context, vol types.Volume) error { + err := vol.AddToGroup(ctx, vg) + if err != nil { + // rados.ErrObjectExists does not match the rbd error (there is no EEXISTS error) + if errors.Is(rados.ErrObjectExists, err) || strings.Contains(err.Error(), "ret=-17") { + log.DebugLog(ctx, "volume %q is already part of volume group %q: %v", vol, vg, err) + return nil + } + + return fmt.Errorf("failed to add volume %q to volume group %q: %w", vol, vg, err) + } + + vg.volumes = append(vg.volumes, vol) + + volID, err := vol.GetID(ctx) + if err != nil { + return err + } + + pool, err := vg.GetPool(ctx) + if err != nil { + return err + } + + id, err := vg.GetID(ctx) + if err != nil { + return err + } + + csiID := util.CSIIdentifier{} + err = csiID.DecomposeCSIID(id) + if err != nil { + return fmt.Errorf("failed to decompose volume group id %q: %w", id, err) + } + + toAdd := map[string]string{ + volID: "", + } + + err = vg.journal.AddVolumesMapping(ctx, pool, csiID.ObjectUUID, toAdd) + if err != nil { + return fmt.Errorf("failed to add mapping for volume %q to volume group id %q: %w", volID, id, err) + } + + return nil +} + +func (vg *volumeGroup) RemoveVolume(ctx context.Context, vol types.Volume) error { + // volume was already removed from the group + if len(vg.volumes) == 0 { + return nil + } + + err := vol.RemoveFromGroup(ctx, vg) + if err != nil { + return fmt.Errorf("failed to remove volume %q from volume group %q: %w", vol, vg, err) + } + + // toRemove contain the ID of the volume that is removed from the group + toRemove, err := vol.GetID(ctx) + if err != nil { + return fmt.Errorf("failed to get volume id for %q: %w", vol, err) + } + + // volumes is the updated list, without the volume that was removed + volumes := make([]types.Volume, 0) + var id string + for _, v := range vg.volumes { + id, err = v.GetID(ctx) + if err != nil { + return err + } + + if id == toRemove { + // do not add the volume to the list + continue + } + + volumes = append(volumes, v) + } + + // update the list of volumes + vg.volumes = volumes + + pool, err := vg.GetPool(ctx) + if err != nil { + return err + } + + id, err = vg.GetID(ctx) + if err != nil { + return err + } + + csiID := util.CSIIdentifier{} + err = csiID.DecomposeCSIID(id) + if err != nil { + return fmt.Errorf("failed to decompose volume group id %q: %w", id, err) + } + + mapping := []string{ + toRemove, + } + + err = vg.journal.RemoveVolumesMapping(ctx, pool, csiID.ObjectUUID, mapping) + if err != nil { + return fmt.Errorf("failed to remove mapping for volume %q to volume group id %q: %w", toRemove, id, err) + } + + return nil +} + +func (vg *volumeGroup) ListVolumes(ctx context.Context) ([]types.Volume, error) { + return vg.volumes, nil +} diff --git a/internal/rbd/types/group.go b/internal/rbd/types/group.go index 3beb4e953..d3c806a3b 100644 --- a/internal/rbd/types/group.go +++ b/internal/rbd/types/group.go @@ -19,6 +19,7 @@ package types import ( "context" + "github.com/ceph/go-ceph/rados" "github.com/csi-addons/spec/lib/go/volumegroup" ) @@ -28,11 +29,22 @@ type VolumeGroup interface { // Destroy frees the resources used by the VolumeGroup. Destroy(ctx context.Context) + // GetIOContext returns the IOContext for performing librbd operations + // on the VolumeGroup. This is used by the rbdVolume struct when it + // needs to add/remove itself from the VolumeGroup. + GetIOContext(ctx context.Context) (*rados.IOContext, error) + // GetID returns the CSI-Addons VolumeGroupId of the VolumeGroup. GetID(ctx context.Context) (string, error) + // GetName returns the name in the backend storage for the VolumeGroup. + GetName(ctx context.Context) (string, error) + // ToCSI creates a CSI-Addons type for the VolumeGroup. - ToCSI(ctx context.Context) *volumegroup.VolumeGroup + ToCSI(ctx context.Context) (*volumegroup.VolumeGroup, error) + + // Create makes a new group in the backend storage. + Create(ctx context.Context, name string) error // Delete removes the VolumeGroup from the backend storage. Delete(ctx context.Context) error