mirror of
https://github.com/ceph/ceph-csi.git
synced 2024-11-21 22:00:19 +00:00
rbd: pass CSI-drivername to volume group instead of journal instance
Each object is responsible for maintaining a connection to the journal. By sharing a single journal, cleanup of objects becomes more complex as the journal is used in deferred functions and only the last should destroy the journal connection resources. Signed-off-by: Niels de Vos <ndevos@ibm.com>
This commit is contained in:
parent
29bf5797b0
commit
9808408340
@ -57,13 +57,16 @@ type commonVolumeGroup struct {
|
|||||||
pool string
|
pool string
|
||||||
namespace string
|
namespace string
|
||||||
|
|
||||||
|
// csiDriver is the CSI drivername that is required to connect the journal
|
||||||
|
csiDriver string
|
||||||
|
// use getJournal() to make sure the journal is connected
|
||||||
journal journal.VolumeGroupJournal
|
journal journal.VolumeGroupJournal
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cvg *commonVolumeGroup) initCommonVolumeGroup(
|
func (cvg *commonVolumeGroup) initCommonVolumeGroup(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
id string,
|
id string,
|
||||||
j journal.VolumeGroupJournal,
|
csiDriver string,
|
||||||
creds *util.Credentials,
|
creds *util.Credentials,
|
||||||
) error {
|
) error {
|
||||||
csiID := util.CSIIdentifier{}
|
csiID := util.CSIIdentifier{}
|
||||||
@ -87,7 +90,7 @@ func (cvg *commonVolumeGroup) initCommonVolumeGroup(
|
|||||||
return fmt.Errorf("failed to get pool for volume group id %q: %w", id, err)
|
return fmt.Errorf("failed to get pool for volume group id %q: %w", id, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
cvg.journal = j
|
cvg.csiDriver = csiDriver
|
||||||
cvg.credentials = creds
|
cvg.credentials = creds
|
||||||
cvg.id = id
|
cvg.id = id
|
||||||
cvg.clusterID = csiID.ClusterID
|
cvg.clusterID = csiID.ClusterID
|
||||||
@ -113,18 +116,27 @@ func (cvg *commonVolumeGroup) Destroy(ctx context.Context) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if cvg.credentials != nil {
|
if cvg.credentials != nil {
|
||||||
cvg.credentials.DeleteCredentials()
|
// credentials should only be removed with DeleteCredentials()
|
||||||
|
// by the caller that allocated them
|
||||||
cvg.credentials = nil
|
cvg.credentials = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
log.DebugLog(ctx, "destroyed volume group instance with id %q", cvg.id)
|
if cvg.journal != nil {
|
||||||
|
cvg.journal.Destroy()
|
||||||
|
cvg.journal = nil
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// getVolumeGroupAttributes fetches the attributes from the journal, sets some
|
// getVolumeGroupAttributes fetches the attributes from the journal, sets some
|
||||||
// of the common values for the VolumeGroup and returns the attributes struct
|
// of the common values for the VolumeGroup and returns the attributes struct
|
||||||
// for further consumption (like checking the VolumeMap).
|
// for further consumption (like checking the VolumeMap).
|
||||||
func (cvg *commonVolumeGroup) getVolumeGroupAttributes(ctx context.Context) (*journal.VolumeGroupAttributes, error) {
|
func (cvg *commonVolumeGroup) getVolumeGroupAttributes(ctx context.Context) (*journal.VolumeGroupAttributes, error) {
|
||||||
attrs, err := cvg.journal.GetVolumeGroupAttributes(ctx, cvg.pool, cvg.objectUUID)
|
j, err := cvg.getJournal(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
attrs, err := j.GetVolumeGroupAttributes(ctx, cvg.pool, cvg.objectUUID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if !errors.Is(err, util.ErrKeyNotFound) && !errors.Is(err, util.ErrPoolNotFound) {
|
if !errors.Is(err, util.ErrKeyNotFound) && !errors.Is(err, util.ErrPoolNotFound) {
|
||||||
return nil, fmt.Errorf("failed to get attributes for volume group id %q: %w", cvg.id, err)
|
return nil, fmt.Errorf("failed to get attributes for volume group id %q: %w", cvg.id, err)
|
||||||
@ -197,6 +209,12 @@ func (cvg *commonVolumeGroup) getConnection(ctx context.Context) (*util.ClusterC
|
|||||||
return cvg.conn, nil
|
return cvg.conn, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if cvg.credentials == nil {
|
||||||
|
log.DebugLog(ctx, "missing credentials for common volume group %q: %s", cvg, util.CallStack())
|
||||||
|
|
||||||
|
return nil, errors.New("can not connect to cluster without credentials")
|
||||||
|
}
|
||||||
|
|
||||||
conn := &util.ClusterConnection{}
|
conn := &util.ClusterConnection{}
|
||||||
err := conn.Connect(cvg.monitors, cvg.credentials)
|
err := conn.Connect(cvg.monitors, cvg.credentials)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -209,6 +227,29 @@ func (cvg *commonVolumeGroup) getConnection(ctx context.Context) (*util.ClusterC
|
|||||||
return conn, nil
|
return conn, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (cvg *commonVolumeGroup) getJournal(ctx context.Context) (journal.VolumeGroupJournal, error) {
|
||||||
|
if cvg.journal != nil {
|
||||||
|
return cvg.journal, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if cvg.credentials == nil {
|
||||||
|
log.DebugLog(ctx, "missing credentials for common volume group %q: %s", cvg, util.CallStack())
|
||||||
|
|
||||||
|
return nil, errors.New("can not connect the journal without credentials")
|
||||||
|
}
|
||||||
|
|
||||||
|
journalConfig := journal.NewCSIVolumeGroupJournalWithNamespace(cvg.csiDriver, cvg.namespace)
|
||||||
|
|
||||||
|
j, err := journalConfig.Connect(cvg.monitors, cvg.namespace, cvg.credentials)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to connect to journal: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
cvg.journal = j
|
||||||
|
|
||||||
|
return j, nil
|
||||||
|
}
|
||||||
|
|
||||||
// GetIOContext returns the IOContext for the volume group if it exists,
|
// GetIOContext returns the IOContext for the volume group if it exists,
|
||||||
// otherwise it will allocate a new one.
|
// otherwise it will allocate a new one.
|
||||||
// Destroy should be used to free the IOContext.
|
// Destroy should be used to free the IOContext.
|
||||||
@ -254,7 +295,12 @@ func (cvg *commonVolumeGroup) Delete(ctx context.Context) error {
|
|||||||
return fmt.Errorf("failed to get pool for volume group %q: %w", cvg, err)
|
return fmt.Errorf("failed to get pool for volume group %q: %w", cvg, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = cvg.journal.UndoReservation(ctx, pool, name, csiID)
|
j, err := cvg.getJournal(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = j.UndoReservation(ctx, pool, name, csiID)
|
||||||
if err != nil /* TODO? !errors.Is(..., err) */ {
|
if err != nil /* TODO? !errors.Is(..., err) */ {
|
||||||
return fmt.Errorf("failed to undo the reservation for volume group %q: %w", cvg, err)
|
return fmt.Errorf("failed to undo the reservation for volume group %q: %w", cvg, err)
|
||||||
}
|
}
|
||||||
|
@ -26,7 +26,6 @@ import (
|
|||||||
"github.com/container-storage-interface/spec/lib/go/csi"
|
"github.com/container-storage-interface/spec/lib/go/csi"
|
||||||
"github.com/csi-addons/spec/lib/go/volumegroup"
|
"github.com/csi-addons/spec/lib/go/volumegroup"
|
||||||
|
|
||||||
"github.com/ceph/ceph-csi/internal/journal"
|
|
||||||
"github.com/ceph/ceph-csi/internal/rbd/types"
|
"github.com/ceph/ceph-csi/internal/rbd/types"
|
||||||
"github.com/ceph/ceph-csi/internal/util"
|
"github.com/ceph/ceph-csi/internal/util"
|
||||||
"github.com/ceph/ceph-csi/internal/util/log"
|
"github.com/ceph/ceph-csi/internal/util/log"
|
||||||
@ -36,7 +35,7 @@ var ErrRBDGroupNotConnected = errors.New("RBD group is not connected")
|
|||||||
|
|
||||||
// volumeGroup handles all requests for 'rbd group' operations.
|
// volumeGroup handles all requests for 'rbd group' operations.
|
||||||
type volumeGroup struct {
|
type volumeGroup struct {
|
||||||
*commonVolumeGroup
|
commonVolumeGroup
|
||||||
|
|
||||||
// volumes is a list of rbd-images that are part of the group. The ID
|
// volumes is a list of rbd-images that are part of the group. The ID
|
||||||
// of each volume is stored in the journal.
|
// of each volume is stored in the journal.
|
||||||
@ -62,12 +61,12 @@ var (
|
|||||||
func GetVolumeGroup(
|
func GetVolumeGroup(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
id string,
|
id string,
|
||||||
j journal.VolumeGroupJournal,
|
csiDriver string,
|
||||||
creds *util.Credentials,
|
creds *util.Credentials,
|
||||||
volumeResolver types.VolumeResolver,
|
volumeResolver types.VolumeResolver,
|
||||||
) (types.VolumeGroup, error) {
|
) (types.VolumeGroup, error) {
|
||||||
vg := &volumeGroup{}
|
vg := &volumeGroup{}
|
||||||
err := vg.initCommonVolumeGroup(ctx, id, j, creds)
|
err := vg.initCommonVolumeGroup(ctx, id, csiDriver, creds)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to initialize volume group with id %q: %w", id, err)
|
return nil, fmt.Errorf("failed to initialize volume group with id %q: %w", id, err)
|
||||||
}
|
}
|
||||||
@ -235,7 +234,12 @@ func (vg *volumeGroup) AddVolume(ctx context.Context, vol types.Volume) error {
|
|||||||
volID: "",
|
volID: "",
|
||||||
}
|
}
|
||||||
|
|
||||||
err = vg.journal.AddVolumesMapping(ctx, pool, csiID.ObjectUUID, toAdd)
|
j, err := vg.getJournal(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = j.AddVolumesMapping(ctx, pool, csiID.ObjectUUID, toAdd)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to add mapping for volume %q to volume group id %q: %w", volID, id, err)
|
return fmt.Errorf("failed to add mapping for volume %q to volume group id %q: %w", volID, id, err)
|
||||||
}
|
}
|
||||||
@ -304,7 +308,12 @@ func (vg *volumeGroup) RemoveVolume(ctx context.Context, vol types.Volume) error
|
|||||||
toRemove,
|
toRemove,
|
||||||
}
|
}
|
||||||
|
|
||||||
err = vg.journal.RemoveVolumesMapping(ctx, pool, csiID.ObjectUUID, mapping)
|
j, err := vg.getJournal(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = j.RemoveVolumesMapping(ctx, pool, csiID.ObjectUUID, mapping)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to remove mapping for volume %q to volume group id %q: %w", toRemove, id, err)
|
return fmt.Errorf("failed to remove mapping for volume %q to volume group id %q: %w", toRemove, id, err)
|
||||||
}
|
}
|
||||||
|
@ -140,22 +140,12 @@ func (mgr *rbdManager) GetVolumeByID(ctx context.Context, id string) (types.Volu
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (mgr *rbdManager) GetVolumeGroupByID(ctx context.Context, id string) (types.VolumeGroup, error) {
|
func (mgr *rbdManager) GetVolumeGroupByID(ctx context.Context, id string) (types.VolumeGroup, error) {
|
||||||
vi := &util.CSIIdentifier{}
|
|
||||||
if err := vi.DecomposeCSIID(id); err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to parse volume group id %q: %w", id, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
vgJournal, err := mgr.getVolumeGroupJournal(vi.ClusterID)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
creds, err := mgr.getCredentials()
|
creds, err := mgr.getCredentials()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
vg, err := rbd_group.GetVolumeGroup(ctx, id, vgJournal, creds, mgr)
|
vg, err := rbd_group.GetVolumeGroup(ctx, id, mgr.csiID, creds, mgr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to get volume group with id %q: %w", id, err)
|
return nil, fmt.Errorf("failed to get volume group with id %q: %w", id, err)
|
||||||
}
|
}
|
||||||
@ -236,7 +226,7 @@ func (mgr *rbdManager) CreateVolumeGroup(ctx context.Context, name string) (type
|
|||||||
return nil, fmt.Errorf("failed to generate a unique CSI volume group with uuid for %q: %w", uuid, err)
|
return nil, fmt.Errorf("failed to generate a unique CSI volume group with uuid for %q: %w", uuid, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
vg, err := rbd_group.GetVolumeGroup(ctx, csiID, vgJournal, creds, mgr)
|
vg, err := rbd_group.GetVolumeGroup(ctx, csiID, mgr.csiID, creds, mgr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to get volume group %q at cluster %q: %w", name, clusterID, err)
|
return nil, fmt.Errorf("failed to get volume group %q at cluster %q: %w", name, clusterID, err)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user