mirror of
https://github.com/ceph/ceph-csi.git
synced 2024-12-18 11:00:25 +00:00
journal: omap implementation for volumegroup
Implement the required function to store/retrieve the details from the omap for the volumegroup. This adds a new omap object that contains the mapping of the RequestName and all the volumeID and its corresponding snapshotID belonging to a group. Signed-off-by: Madhu Rajanna <madhupr007@gmail.com>
This commit is contained in:
parent
9e08a67b36
commit
dd235d88e0
434
internal/journal/volumegroupjournal.go
Normal file
434
internal/journal/volumegroupjournal.go
Normal file
@ -0,0 +1,434 @@
|
||||
/*
|
||||
Copyright 2024 The Ceph-CSI Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package journal
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/ceph/ceph-csi/internal/util"
|
||||
"github.com/ceph/ceph-csi/internal/util/log"
|
||||
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultVolumeGroupNamingPrefix string = "csi-vol-group-"
|
||||
)
|
||||
|
||||
type VolumeGroupJournal interface {
|
||||
// Connect establishes a new connection to a ceph cluster for journal metadata.
|
||||
Connect(
|
||||
monitors,
|
||||
namespace string,
|
||||
cr *util.Credentials) (*volumeGroupJournalConfig, error)
|
||||
// Destroy frees any resources and invalidates the journal connection.
|
||||
Destroy()
|
||||
// SetNamespace sets the namespace for the journal.
|
||||
SetNamespace(ns string)
|
||||
CheckReservation(
|
||||
ctx context.Context,
|
||||
journalPool,
|
||||
reqName,
|
||||
namePrefix string) (*VolumeGroupData, error)
|
||||
UndoReservation(
|
||||
ctx context.Context,
|
||||
csiJournalPool,
|
||||
snapshotGroupName,
|
||||
reqName string) error
|
||||
// GetGroupAttributes fetches all keys and their values, from a UUID directory,
|
||||
// returning VolumeGroupAttributes structure.
|
||||
GetVolumeGroupAttributes(
|
||||
ctx context.Context,
|
||||
pool,
|
||||
objectUUID string) (*VolumeGroupAttributes, error)
|
||||
ReserveName(
|
||||
ctx context.Context,
|
||||
journalPool string,
|
||||
journalPoolID int64,
|
||||
reqName,
|
||||
namePrefix string) (string, string, error)
|
||||
// AddVolumeSnapshotMapping adds a volumeID and snapshotID mapping to the UUID directory.
|
||||
AddVolumeSnapshotMapping(
|
||||
ctx context.Context,
|
||||
pool,
|
||||
reservedUUID,
|
||||
volumeID,
|
||||
snapshotID string) error
|
||||
// RemoveVolumeSnapshotMapping removes a volumeID and snapshotID mapping from the UUID directory.
|
||||
RemoveVolumeSnapshotMapping(
|
||||
ctx context.Context,
|
||||
pool,
|
||||
reservedUUID,
|
||||
volumeID string) error
|
||||
}
|
||||
|
||||
// volumeGroupJournalConfig contains the configuration and connection details.
|
||||
type volumeGroupJournalConfig struct {
|
||||
*Config
|
||||
*Connection
|
||||
}
|
||||
|
||||
// NewCSIVolumeroupJournal returns an instance of VolumeGroupJournal for groups.
|
||||
func NewCSIVolumeroupJournal(suffix string) VolumeGroupJournal {
|
||||
return &volumeGroupJournalConfig{
|
||||
Config: &Config{
|
||||
csiDirectory: "csi.groups." + suffix,
|
||||
csiNameKeyPrefix: "csi.volume.group.",
|
||||
cephUUIDDirectoryPrefix: "csi.volume.group.",
|
||||
csiImageKey: "csi.groupname",
|
||||
csiNameKey: "csi.volname",
|
||||
namespace: "",
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func (sgj *volumeGroupJournalConfig) SetNamespace(ns string) {
|
||||
sgj.Config.namespace = ns
|
||||
}
|
||||
|
||||
// NewCSIVolumeGroupJournalWithNamespace returns an instance of VolumeGroupJournal for
|
||||
// volume groups using a predetermined namespace value.
|
||||
func NewCSIVolumeGroupJournalWithNamespace(suffix, ns string) VolumeGroupJournal {
|
||||
j := NewCSIVolumeroupJournal(suffix)
|
||||
j.SetNamespace(ns)
|
||||
|
||||
return j
|
||||
}
|
||||
|
||||
func (sgj *volumeGroupJournalConfig) Connect(
|
||||
monitors,
|
||||
namespace string,
|
||||
cr *util.Credentials,
|
||||
) (*volumeGroupJournalConfig, error) {
|
||||
conn, err := sgj.Config.Connect(monitors, namespace, cr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
sgj.Connection = conn
|
||||
|
||||
return sgj, nil
|
||||
}
|
||||
|
||||
func (sgj *volumeGroupJournalConfig) Destroy() {
|
||||
sgj.Connection.Destroy()
|
||||
}
|
||||
|
||||
// VolumeGroupData contains the GroupUUID and VolumeGroupAttributes for a
|
||||
// volume group.
|
||||
type VolumeGroupData struct {
|
||||
GroupUUID string
|
||||
GroupName string
|
||||
VolumeGroupAttributes *VolumeGroupAttributes
|
||||
}
|
||||
|
||||
func generateVolumeGroupName(namePrefix, groupUUID string) string {
|
||||
if namePrefix == "" {
|
||||
namePrefix = defaultVolumeGroupNamingPrefix
|
||||
}
|
||||
|
||||
return namePrefix + groupUUID
|
||||
}
|
||||
|
||||
/*
|
||||
CheckReservation checks if given request name contains a valid reservation
|
||||
- If there is a valid reservation, then the corresponding VolumeGroupData for
|
||||
the snapshot group is returned
|
||||
- If there is a reservation that is stale (or not fully cleaned up), it is
|
||||
garbage collected using the UndoReservation call, as appropriate
|
||||
|
||||
NOTE: As the function manipulates omaps, it should be called with a lock
|
||||
against the request name held, to prevent parallel operations from modifying
|
||||
the state of the omaps for this request name.
|
||||
|
||||
Return values:
|
||||
- VolumeGroupData: which contains the GroupUUID and GroupSnapshotAttributes
|
||||
that were reserved for the passed in reqName, empty if there was no
|
||||
reservation found.
|
||||
- error: non-nil in case of any errors.
|
||||
*/
|
||||
func (sgj *volumeGroupJournalConfig) CheckReservation(ctx context.Context,
|
||||
journalPool, reqName, namePrefix string,
|
||||
) (*VolumeGroupData, error) {
|
||||
var (
|
||||
cj = sgj.Config
|
||||
volGroupData = &VolumeGroupData{}
|
||||
)
|
||||
|
||||
// check if request name is already part of the directory omap
|
||||
fetchKeys := []string{
|
||||
cj.csiNameKeyPrefix + reqName,
|
||||
}
|
||||
values, err := getOMapValues(
|
||||
ctx, sgj.Connection, journalPool, cj.namespace, cj.csiDirectory,
|
||||
cj.commonPrefix, fetchKeys)
|
||||
if err != nil {
|
||||
if errors.Is(err, util.ErrKeyNotFound) || errors.Is(err, util.ErrPoolNotFound) {
|
||||
// pool or omap (oid) was not present
|
||||
// stop processing but without an error for no reservation exists
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
return nil, err
|
||||
}
|
||||
|
||||
objUUID, found := values[cj.csiNameKeyPrefix+reqName]
|
||||
if !found {
|
||||
// omap was read but was missing the desired key-value pair
|
||||
// stop processing but without an error for no reservation exists
|
||||
return nil, nil
|
||||
}
|
||||
volGroupData.GroupUUID = objUUID
|
||||
|
||||
savedVolumeGroupAttributes, err := sgj.GetVolumeGroupAttributes(ctx, journalPool,
|
||||
objUUID)
|
||||
if err != nil {
|
||||
// error should specifically be not found, for image to be absent, any other error
|
||||
// is not conclusive, and we should not proceed
|
||||
if errors.Is(err, util.ErrKeyNotFound) {
|
||||
err = sgj.UndoReservation(ctx, journalPool,
|
||||
generateVolumeGroupName(namePrefix, objUUID), reqName)
|
||||
}
|
||||
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// check if the request name in the omap matches the passed in request name
|
||||
if savedVolumeGroupAttributes.RequestName != reqName {
|
||||
// NOTE: This should never be possible, hence no cleanup, but log error
|
||||
// and return, as cleanup may need to occur manually!
|
||||
return nil, fmt.Errorf("internal state inconsistent, omap names mismatch,"+
|
||||
" request name (%s) volume group UUID (%s) volume group omap name (%s)",
|
||||
reqName, objUUID, savedVolumeGroupAttributes.RequestName)
|
||||
}
|
||||
volGroupData.GroupName = savedVolumeGroupAttributes.GroupName
|
||||
volGroupData.VolumeGroupAttributes = &VolumeGroupAttributes{}
|
||||
volGroupData.VolumeGroupAttributes.RequestName = savedVolumeGroupAttributes.RequestName
|
||||
volGroupData.VolumeGroupAttributes.VolumeSnapshotMap = savedVolumeGroupAttributes.VolumeSnapshotMap
|
||||
|
||||
return volGroupData, nil
|
||||
}
|
||||
|
||||
/*
|
||||
UndoReservation undoes a reservation, in the reverse order of ReserveName
|
||||
- The UUID directory is cleaned up before the GroupName key in the csiDirectory is cleaned up
|
||||
|
||||
NOTE: Ensure that the Ceph volume snapshots backing the reservation is cleaned up
|
||||
prior to cleaning up the reservation
|
||||
|
||||
NOTE: As the function manipulates omaps, it should be called with a lock against the request name
|
||||
held, to prevent parallel operations from modifying the state of the omaps for this request name.
|
||||
|
||||
Input arguments:
|
||||
- csiJournalPool: Pool name that holds the CSI request name based journal
|
||||
- groupID: ID of the volume group, generated from the UUID
|
||||
- reqName: Request name for the volume group
|
||||
*/
|
||||
func (sgj *volumeGroupJournalConfig) UndoReservation(ctx context.Context,
|
||||
csiJournalPool, groupID, reqName string,
|
||||
) error {
|
||||
// delete volume UUID omap (first, inverse of create order)
|
||||
cj := sgj.Config
|
||||
if groupID != "" {
|
||||
if len(groupID) < uuidEncodedLength {
|
||||
return fmt.Errorf("unable to parse UUID from %s, too short", groupID)
|
||||
}
|
||||
|
||||
groupUUID := groupID[len(groupID)-36:]
|
||||
if _, err := uuid.Parse(groupUUID); err != nil {
|
||||
return fmt.Errorf("failed parsing UUID in %s: %w", groupUUID, err)
|
||||
}
|
||||
|
||||
err := util.RemoveObject(
|
||||
ctx,
|
||||
sgj.Connection.monitors,
|
||||
sgj.Connection.cr,
|
||||
csiJournalPool,
|
||||
cj.namespace,
|
||||
cj.cephUUIDDirectoryPrefix+groupUUID)
|
||||
if err != nil {
|
||||
if !errors.Is(err, util.ErrObjectNotFound) {
|
||||
log.ErrorLog(ctx, "failed removing oMap %s (%s)", cj.cephUUIDDirectoryPrefix+groupUUID, err)
|
||||
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// delete the request name key (last, inverse of create order)
|
||||
err := removeMapKeys(ctx, sgj.Connection, csiJournalPool, cj.namespace, cj.csiDirectory,
|
||||
[]string{cj.csiNameKeyPrefix + reqName})
|
||||
if err != nil {
|
||||
log.ErrorLog(ctx, "failed removing oMap key %s (%s)", cj.csiNameKeyPrefix+reqName, err)
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
/*
|
||||
ReserveName adds respective entries to the csiDirectory omaps, post generating a target
|
||||
UUIDDirectory for use. Further, these functions update the UUIDDirectory omaps, to store back
|
||||
pointers to the CSI generated request names.
|
||||
|
||||
NOTE: As the function manipulates omaps, it should be called with a lock against the request name
|
||||
held, to prevent parallel operations from modifying the state of the omaps for this request name.
|
||||
|
||||
Input arguments:
|
||||
- journalPool: Pool where the CSI journal is stored
|
||||
- journalPoolID: pool ID of the journalPool
|
||||
- reqName: Name of the volumeGroupSnapshot request received
|
||||
- namePrefix: Prefix to use when generating the volumeGroupName name (suffix is an auto-generated UUID)
|
||||
|
||||
Return values:
|
||||
- string: Contains the UUID that was reserved for the passed in reqName
|
||||
- string: Contains the VolumeGroup name that was reserved for the passed in reqName
|
||||
- error: non-nil in case of any errors
|
||||
*/
|
||||
func (sgj *volumeGroupJournalConfig) ReserveName(ctx context.Context,
|
||||
journalPool string, journalPoolID int64,
|
||||
reqName, namePrefix string,
|
||||
) (string, string, error) {
|
||||
cj := sgj.Config
|
||||
|
||||
// Create the UUID based omap first, to reserve the same and avoid conflicts
|
||||
// NOTE: If any service loss occurs post creation of the UUID directory, and before
|
||||
// setting the request name key to point back to the UUID directory, the
|
||||
// UUID directory key will be leaked
|
||||
objUUID, err := reserveOMapName(
|
||||
ctx,
|
||||
sgj.Connection.monitors,
|
||||
sgj.Connection.cr,
|
||||
journalPool,
|
||||
cj.namespace,
|
||||
cj.cephUUIDDirectoryPrefix,
|
||||
"")
|
||||
if err != nil {
|
||||
return "", "", err
|
||||
}
|
||||
groupName := generateVolumeGroupName(namePrefix, objUUID)
|
||||
nameKeyVal := objUUID
|
||||
// After generating the UUID Directory omap, we populate the csiDirectory
|
||||
// omap with a key-value entry to map the request to the backend volume group:
|
||||
// `csiNameKeyPrefix + reqName: nameKeyVal`
|
||||
err = setOMapKeys(ctx, sgj.Connection, journalPool, cj.namespace, cj.csiDirectory,
|
||||
map[string]string{cj.csiNameKeyPrefix + reqName: nameKeyVal})
|
||||
if err != nil {
|
||||
return "", "", err
|
||||
}
|
||||
defer func() {
|
||||
if err != nil {
|
||||
log.WarningLog(ctx, "reservation failed for volume group: %s", reqName)
|
||||
errDefer := sgj.UndoReservation(ctx, journalPool, groupName, reqName)
|
||||
if errDefer != nil {
|
||||
log.WarningLog(ctx, "failed undoing reservation of volume group: %s (%v)", reqName, errDefer)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
oid := cj.cephUUIDDirectoryPrefix + objUUID
|
||||
omapValues := map[string]string{}
|
||||
|
||||
// Update UUID directory to store CSI request name
|
||||
omapValues[cj.csiNameKey] = reqName
|
||||
omapValues[cj.csiImageKey] = groupName
|
||||
|
||||
err = setOMapKeys(ctx, sgj.Connection, journalPool, cj.namespace, oid, omapValues)
|
||||
if err != nil {
|
||||
return "", "", err
|
||||
}
|
||||
|
||||
return objUUID, groupName, nil
|
||||
}
|
||||
|
||||
// VolumeGroupAttributes contains the request name and the volumeID's and
|
||||
// the corresponding snapshotID's.
|
||||
type VolumeGroupAttributes struct {
|
||||
RequestName string // Contains the request name for the passed in UUID
|
||||
GroupName string // Contains the group name
|
||||
VolumeSnapshotMap map[string]string // Contains the volumeID and the corresponding snapshotID mapping
|
||||
}
|
||||
|
||||
func (sgj *volumeGroupJournalConfig) GetVolumeGroupAttributes(
|
||||
ctx context.Context,
|
||||
pool, objectUUID string,
|
||||
) (*VolumeGroupAttributes, error) {
|
||||
var (
|
||||
err error
|
||||
groupAttributes = &VolumeGroupAttributes{}
|
||||
cj = sgj.Config
|
||||
)
|
||||
|
||||
values, err := listOMapValues(
|
||||
ctx, sgj.Connection, pool, cj.namespace, cj.cephUUIDDirectoryPrefix+objectUUID,
|
||||
cj.commonPrefix)
|
||||
if err != nil {
|
||||
if !errors.Is(err, util.ErrKeyNotFound) && !errors.Is(err, util.ErrPoolNotFound) {
|
||||
return nil, err
|
||||
}
|
||||
log.WarningLog(ctx, "unable to read omap values: pool missing: %v", err)
|
||||
}
|
||||
|
||||
groupAttributes.RequestName = values[cj.csiNameKey]
|
||||
groupAttributes.GroupName = values[cj.csiImageKey]
|
||||
|
||||
// Remove request name key and group name key from the omap, as we are
|
||||
// looking for volumeID/snapshotID mapping
|
||||
delete(values, cj.csiNameKey)
|
||||
delete(values, cj.csiImageKey)
|
||||
groupAttributes.VolumeSnapshotMap = map[string]string{}
|
||||
for k, v := range values {
|
||||
groupAttributes.VolumeSnapshotMap[k] = v
|
||||
}
|
||||
|
||||
return groupAttributes, nil
|
||||
}
|
||||
|
||||
func (sgj *volumeGroupJournalConfig) AddVolumeSnapshotMapping(
|
||||
ctx context.Context,
|
||||
pool,
|
||||
reservedUUID,
|
||||
volumeID,
|
||||
snapshotID string,
|
||||
) error {
|
||||
err := setOMapKeys(ctx, sgj.Connection, pool, sgj.Config.namespace, sgj.Config.cephUUIDDirectoryPrefix+reservedUUID,
|
||||
map[string]string{volumeID: snapshotID})
|
||||
if err != nil {
|
||||
log.ErrorLog(ctx, "failed adding volume snapshot mapping: %v", err)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (sgj *volumeGroupJournalConfig) RemoveVolumeSnapshotMapping(
|
||||
ctx context.Context,
|
||||
pool,
|
||||
reservedUUID,
|
||||
volumeID string,
|
||||
) error {
|
||||
err := removeMapKeys(ctx, sgj.Connection, pool, sgj.Config.namespace, sgj.Config.cephUUIDDirectoryPrefix+reservedUUID,
|
||||
[]string{volumeID})
|
||||
if err != nil {
|
||||
log.ErrorLog(ctx, "failed removing volume snapshot mapping: %v", err)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
Loading…
Reference in New Issue
Block a user