journal: fix connection problem with groupjournal

Same group jounral config need to be reused
for multiple connection where different monitors
and users are used, for that reason create a unique
connection each time.

Signed-off-by: Madhu Rajanna <madhupr007@gmail.com>
This commit is contained in:
Madhu Rajanna 2024-03-19 14:48:27 +01:00 committed by mergify[bot]
parent f17ea38736
commit 68e93a31cc

View File

@ -32,15 +32,7 @@ const (
) )
type VolumeGroupJournal interface { type VolumeGroupJournal interface {
// Connect establishes a new connection to a ceph cluster for journal metadata.
Connect(
monitors,
namespace string,
cr *util.Credentials) error
// Destroy frees any resources and invalidates the journal connection.
Destroy() Destroy()
// SetNamespace sets the namespace for the journal.
SetNamespace(ns string)
CheckReservation( CheckReservation(
ctx context.Context, ctx context.Context,
journalPool, journalPool,
@ -78,16 +70,20 @@ type VolumeGroupJournal interface {
volumeID string) error volumeID string) error
} }
// volumeGroupJournalConfig contains the configuration and connection details. // VolumeGroupJournalConfig contains the configuration.
type volumeGroupJournalConfig struct { type VolumeGroupJournalConfig struct {
*Config Config
*Connection }
type VolumeGroupJournalConnection struct {
config *VolumeGroupJournalConfig
connection *Connection
} }
// NewCSIVolumeGroupJournal returns an instance of VolumeGroupJournal for groups. // NewCSIVolumeGroupJournal returns an instance of VolumeGroupJournal for groups.
func NewCSIVolumeGroupJournal(suffix string) VolumeGroupJournal { func NewCSIVolumeGroupJournal(suffix string) VolumeGroupJournalConfig {
return &volumeGroupJournalConfig{ return VolumeGroupJournalConfig{
Config: &Config{ Config: Config{
csiDirectory: "csi.groups." + suffix, csiDirectory: "csi.groups." + suffix,
csiNameKeyPrefix: "csi.volume.group.", csiNameKeyPrefix: "csi.volume.group.",
cephUUIDDirectoryPrefix: "csi.volume.group.", cephUUIDDirectoryPrefix: "csi.volume.group.",
@ -98,35 +94,42 @@ func NewCSIVolumeGroupJournal(suffix string) VolumeGroupJournal {
} }
} }
func (sgj *volumeGroupJournalConfig) SetNamespace(ns string) { // SetNamespace sets the namespace for the journal.
sgj.Config.namespace = ns func (vgc *VolumeGroupJournalConfig) SetNamespace(ns string) {
vgc.Config.namespace = ns
} }
// NewCSIVolumeGroupJournalWithNamespace returns an instance of VolumeGroupJournal for // NewCSIVolumeGroupJournalWithNamespace returns an instance of VolumeGroupJournal for
// volume groups using a predetermined namespace value. // volume groups using a predetermined namespace value.
func NewCSIVolumeGroupJournalWithNamespace(suffix, ns string) VolumeGroupJournal { func NewCSIVolumeGroupJournalWithNamespace(suffix, ns string) VolumeGroupJournalConfig {
j := NewCSIVolumeGroupJournal(suffix) j := NewCSIVolumeGroupJournal(suffix)
j.SetNamespace(ns) j.SetNamespace(ns)
return j return j
} }
func (sgj *volumeGroupJournalConfig) Connect( // Connect establishes a new connection to a ceph cluster for journal metadata.
func (vgc *VolumeGroupJournalConfig) Connect(
monitors, monitors,
namespace string, namespace string,
cr *util.Credentials, cr *util.Credentials,
) error { ) (VolumeGroupJournal, error) {
conn, err := sgj.Config.Connect(monitors, namespace, cr) vgjc := &VolumeGroupJournalConnection{}
if err != nil { vgjc.config = &VolumeGroupJournalConfig{
return err Config: vgc.Config,
} }
sgj.Connection = conn conn, err := vgc.Config.Connect(monitors, namespace, cr)
if err != nil {
return nil, err
}
vgjc.connection = conn
return nil return vgjc, nil
} }
func (sgj *volumeGroupJournalConfig) Destroy() { // Destroy frees any resources and invalidates the journal connection.
sgj.Connection.Destroy() func (vgjc *VolumeGroupJournalConnection) Destroy() {
vgjc.connection.Destroy()
} }
// VolumeGroupData contains the GroupUUID and VolumeGroupAttributes for a // VolumeGroupData contains the GroupUUID and VolumeGroupAttributes for a
@ -162,11 +165,11 @@ Return values:
reservation found. reservation found.
- error: non-nil in case of any errors. - error: non-nil in case of any errors.
*/ */
func (sgj *volumeGroupJournalConfig) CheckReservation(ctx context.Context, func (vgjc *VolumeGroupJournalConnection) CheckReservation(ctx context.Context,
journalPool, reqName, namePrefix string, journalPool, reqName, namePrefix string,
) (*VolumeGroupData, error) { ) (*VolumeGroupData, error) {
var ( var (
cj = sgj.Config cj = vgjc.config
volGroupData = &VolumeGroupData{} volGroupData = &VolumeGroupData{}
) )
@ -175,7 +178,7 @@ func (sgj *volumeGroupJournalConfig) CheckReservation(ctx context.Context,
cj.csiNameKeyPrefix + reqName, cj.csiNameKeyPrefix + reqName,
} }
values, err := getOMapValues( values, err := getOMapValues(
ctx, sgj.Connection, journalPool, cj.namespace, cj.csiDirectory, ctx, vgjc.connection, journalPool, cj.namespace, cj.csiDirectory,
cj.commonPrefix, fetchKeys) cj.commonPrefix, fetchKeys)
if err != nil { if err != nil {
if errors.Is(err, util.ErrKeyNotFound) || errors.Is(err, util.ErrPoolNotFound) { if errors.Is(err, util.ErrKeyNotFound) || errors.Is(err, util.ErrPoolNotFound) {
@ -195,13 +198,13 @@ func (sgj *volumeGroupJournalConfig) CheckReservation(ctx context.Context,
} }
volGroupData.GroupUUID = objUUID volGroupData.GroupUUID = objUUID
savedVolumeGroupAttributes, err := sgj.GetVolumeGroupAttributes(ctx, journalPool, savedVolumeGroupAttributes, err := vgjc.GetVolumeGroupAttributes(ctx, journalPool,
objUUID) objUUID)
if err != nil { if err != nil {
// error should specifically be not found, for image to be absent, any other error // error should specifically be not found, for image to be absent, any other error
// is not conclusive, and we should not proceed // is not conclusive, and we should not proceed
if errors.Is(err, util.ErrKeyNotFound) { if errors.Is(err, util.ErrKeyNotFound) {
err = sgj.UndoReservation(ctx, journalPool, err = vgjc.UndoReservation(ctx, journalPool,
generateVolumeGroupName(namePrefix, objUUID), reqName) generateVolumeGroupName(namePrefix, objUUID), reqName)
} }
@ -239,11 +242,11 @@ Input arguments:
- groupID: ID of the volume group, generated from the UUID - groupID: ID of the volume group, generated from the UUID
- reqName: Request name for the volume group - reqName: Request name for the volume group
*/ */
func (sgj *volumeGroupJournalConfig) UndoReservation(ctx context.Context, func (vgjc *VolumeGroupJournalConnection) UndoReservation(ctx context.Context,
csiJournalPool, groupID, reqName string, csiJournalPool, groupID, reqName string,
) error { ) error {
// delete volume UUID omap (first, inverse of create order) // delete volume UUID omap (first, inverse of create order)
cj := sgj.Config cj := vgjc.config
if groupID != "" { if groupID != "" {
if len(groupID) < uuidEncodedLength { if len(groupID) < uuidEncodedLength {
return fmt.Errorf("unable to parse UUID from %s, too short", groupID) return fmt.Errorf("unable to parse UUID from %s, too short", groupID)
@ -256,8 +259,8 @@ func (sgj *volumeGroupJournalConfig) UndoReservation(ctx context.Context,
err := util.RemoveObject( err := util.RemoveObject(
ctx, ctx,
sgj.Connection.monitors, vgjc.connection.monitors,
sgj.Connection.cr, vgjc.connection.cr,
csiJournalPool, csiJournalPool,
cj.namespace, cj.namespace,
cj.cephUUIDDirectoryPrefix+groupUUID) cj.cephUUIDDirectoryPrefix+groupUUID)
@ -271,7 +274,7 @@ func (sgj *volumeGroupJournalConfig) UndoReservation(ctx context.Context,
} }
// delete the request name key (last, inverse of create order) // delete the request name key (last, inverse of create order)
err := removeMapKeys(ctx, sgj.Connection, csiJournalPool, cj.namespace, cj.csiDirectory, err := removeMapKeys(ctx, vgjc.connection, csiJournalPool, cj.namespace, cj.csiDirectory,
[]string{cj.csiNameKeyPrefix + reqName}) []string{cj.csiNameKeyPrefix + reqName})
if err != nil { if err != nil {
log.ErrorLog(ctx, "failed removing oMap key %s (%s)", cj.csiNameKeyPrefix+reqName, err) log.ErrorLog(ctx, "failed removing oMap key %s (%s)", cj.csiNameKeyPrefix+reqName, err)
@ -299,11 +302,11 @@ Return values:
- string: Contains the VolumeGroup name that was reserved for the passed in reqName - string: Contains the VolumeGroup name that was reserved for the passed in reqName
- error: non-nil in case of any errors - error: non-nil in case of any errors
*/ */
func (sgj *volumeGroupJournalConfig) ReserveName(ctx context.Context, func (vgjc *VolumeGroupJournalConnection) ReserveName(ctx context.Context,
journalPool string, journalPoolID int64, journalPool string, journalPoolID int64,
reqName, namePrefix string, reqName, namePrefix string,
) (string, string, error) { ) (string, string, error) {
cj := sgj.Config cj := vgjc.config
// Create the UUID based omap first, to reserve the same and avoid conflicts // Create the UUID based omap first, to reserve the same and avoid conflicts
// NOTE: If any service loss occurs post creation of the UUID directory, and before // NOTE: If any service loss occurs post creation of the UUID directory, and before
@ -311,8 +314,8 @@ func (sgj *volumeGroupJournalConfig) ReserveName(ctx context.Context,
// UUID directory key will be leaked // UUID directory key will be leaked
objUUID, err := reserveOMapName( objUUID, err := reserveOMapName(
ctx, ctx,
sgj.Connection.monitors, vgjc.connection.monitors,
sgj.Connection.cr, vgjc.connection.cr,
journalPool, journalPool,
cj.namespace, cj.namespace,
cj.cephUUIDDirectoryPrefix, cj.cephUUIDDirectoryPrefix,
@ -325,7 +328,7 @@ func (sgj *volumeGroupJournalConfig) ReserveName(ctx context.Context,
// After generating the UUID Directory omap, we populate the csiDirectory // After generating the UUID Directory omap, we populate the csiDirectory
// omap with a key-value entry to map the request to the backend volume group: // omap with a key-value entry to map the request to the backend volume group:
// `csiNameKeyPrefix + reqName: nameKeyVal` // `csiNameKeyPrefix + reqName: nameKeyVal`
err = setOMapKeys(ctx, sgj.Connection, journalPool, cj.namespace, cj.csiDirectory, err = setOMapKeys(ctx, vgjc.connection, journalPool, cj.namespace, cj.csiDirectory,
map[string]string{cj.csiNameKeyPrefix + reqName: nameKeyVal}) map[string]string{cj.csiNameKeyPrefix + reqName: nameKeyVal})
if err != nil { if err != nil {
return "", "", err return "", "", err
@ -333,7 +336,7 @@ func (sgj *volumeGroupJournalConfig) ReserveName(ctx context.Context,
defer func() { defer func() {
if err != nil { if err != nil {
log.WarningLog(ctx, "reservation failed for volume group: %s", reqName) log.WarningLog(ctx, "reservation failed for volume group: %s", reqName)
errDefer := sgj.UndoReservation(ctx, journalPool, groupName, reqName) errDefer := vgjc.UndoReservation(ctx, journalPool, groupName, reqName)
if errDefer != nil { if errDefer != nil {
log.WarningLog(ctx, "failed undoing reservation of volume group: %s (%v)", reqName, errDefer) log.WarningLog(ctx, "failed undoing reservation of volume group: %s (%v)", reqName, errDefer)
} }
@ -347,7 +350,7 @@ func (sgj *volumeGroupJournalConfig) ReserveName(ctx context.Context,
omapValues[cj.csiNameKey] = reqName omapValues[cj.csiNameKey] = reqName
omapValues[cj.csiImageKey] = groupName omapValues[cj.csiImageKey] = groupName
err = setOMapKeys(ctx, sgj.Connection, journalPool, cj.namespace, oid, omapValues) err = setOMapKeys(ctx, vgjc.connection, journalPool, cj.namespace, oid, omapValues)
if err != nil { if err != nil {
return "", "", err return "", "", err
} }
@ -363,18 +366,18 @@ type VolumeGroupAttributes struct {
VolumeSnapshotMap map[string]string // Contains the volumeID and the corresponding snapshotID mapping VolumeSnapshotMap map[string]string // Contains the volumeID and the corresponding snapshotID mapping
} }
func (sgj *volumeGroupJournalConfig) GetVolumeGroupAttributes( func (vgjc *VolumeGroupJournalConnection) GetVolumeGroupAttributes(
ctx context.Context, ctx context.Context,
pool, objectUUID string, pool, objectUUID string,
) (*VolumeGroupAttributes, error) { ) (*VolumeGroupAttributes, error) {
var ( var (
err error err error
groupAttributes = &VolumeGroupAttributes{} groupAttributes = &VolumeGroupAttributes{}
cj = sgj.Config cj = vgjc.config
) )
values, err := listOMapValues( values, err := listOMapValues(
ctx, sgj.Connection, pool, cj.namespace, cj.cephUUIDDirectoryPrefix+objectUUID, ctx, vgjc.connection, pool, cj.namespace, cj.cephUUIDDirectoryPrefix+objectUUID,
cj.commonPrefix) cj.commonPrefix)
if err != nil { if err != nil {
if !errors.Is(err, util.ErrKeyNotFound) && !errors.Is(err, util.ErrPoolNotFound) { if !errors.Is(err, util.ErrKeyNotFound) && !errors.Is(err, util.ErrPoolNotFound) {
@ -398,14 +401,14 @@ func (sgj *volumeGroupJournalConfig) GetVolumeGroupAttributes(
return groupAttributes, nil return groupAttributes, nil
} }
func (sgj *volumeGroupJournalConfig) AddVolumeSnapshotMapping( func (vgjc *VolumeGroupJournalConnection) AddVolumeSnapshotMapping(
ctx context.Context, ctx context.Context,
pool, pool,
reservedUUID, reservedUUID,
volumeID, volumeID,
snapshotID string, snapshotID string,
) error { ) error {
err := setOMapKeys(ctx, sgj.Connection, pool, sgj.Config.namespace, sgj.Config.cephUUIDDirectoryPrefix+reservedUUID, err := setOMapKeys(ctx, vgjc.connection, pool, vgjc.config.namespace, vgjc.config.cephUUIDDirectoryPrefix+reservedUUID,
map[string]string{volumeID: snapshotID}) map[string]string{volumeID: snapshotID})
if err != nil { if err != nil {
log.ErrorLog(ctx, "failed adding volume snapshot mapping: %v", err) log.ErrorLog(ctx, "failed adding volume snapshot mapping: %v", err)
@ -416,13 +419,14 @@ func (sgj *volumeGroupJournalConfig) AddVolumeSnapshotMapping(
return nil return nil
} }
func (sgj *volumeGroupJournalConfig) RemoveVolumeSnapshotMapping( func (vgjc *VolumeGroupJournalConnection) RemoveVolumeSnapshotMapping(
ctx context.Context, ctx context.Context,
pool, pool,
reservedUUID, reservedUUID,
volumeID string, volumeID string,
) error { ) error {
err := removeMapKeys(ctx, sgj.Connection, pool, sgj.Config.namespace, sgj.Config.cephUUIDDirectoryPrefix+reservedUUID, err := removeMapKeys(ctx, vgjc.connection, pool, vgjc.config.namespace,
vgjc.config.cephUUIDDirectoryPrefix+reservedUUID,
[]string{volumeID}) []string{volumeID})
if err != nil { if err != nil {
log.ErrorLog(ctx, "failed removing volume snapshot mapping: %v", err) log.ErrorLog(ctx, "failed removing volume snapshot mapping: %v", err)