journal: convert journal to use new omap functions

Convert the business-logic of the journal to use the new go-ceph based
omap manipulation functions.

Signed-off-by: John Mulligan <jmulligan@redhat.com>
This commit is contained in:
John Mulligan 2020-05-21 15:58:08 -04:00 committed by mergify[bot]
parent 0b99bdaa82
commit cd24bb3f5c

View File

@ -211,14 +211,21 @@ type Connection struct {
// connection metadata
monitors string
cr *util.Credentials
// cached cluster connection (required by go-ceph)
conn *util.ClusterConnection
}
// Connect establishes a new connection to a ceph cluster for journal metadata.
func (cj *Config) Connect(monitors string, cr *util.Credentials) (*Connection, error) {
cc := &util.ClusterConnection{}
if err := cc.Connect(monitors, cr); err != nil {
return nil, err
}
conn := &Connection{
config: cj,
monitors: monitors,
cr: cr,
conn: cc,
}
return conn, nil
}
@ -257,7 +264,7 @@ func (conn *Connection) CheckReservation(ctx context.Context,
}
// check if request name is already part of the directory omap
objUUIDAndPool, err := util.GetOMapValue(ctx, conn.monitors, conn.cr, journalPool, cj.namespace, cj.csiDirectory,
objUUIDAndPool, err := getOneOMapValue(ctx, conn, journalPool, cj.namespace, cj.csiDirectory,
cj.csiNameKeyPrefix+reqName)
if err != nil {
// error should specifically be not found, for volume to be absent, any other error
@ -389,7 +396,7 @@ func (conn *Connection) UndoReservation(ctx context.Context,
}
// delete the request name key (last, inverse of create order)
err := util.RemoveOMapKey(ctx, conn.monitors, conn.cr, csiJournalPool, cj.namespace, cj.csiDirectory,
err := removeOneOMapKey(ctx, conn, csiJournalPool, cj.namespace, cj.csiDirectory,
cj.csiNameKeyPrefix+reqName)
if err != nil {
klog.Errorf(util.Log(ctx, "failed removing oMap key %s (%s)"), cj.csiNameKeyPrefix+reqName, err)
@ -495,7 +502,7 @@ func (conn *Connection) ReserveName(ctx context.Context,
nameKeyVal = volUUID
}
err = util.SetOMapKeyValue(ctx, conn.monitors, conn.cr, journalPool, cj.namespace, cj.csiDirectory,
err = setOneOMapKey(ctx, conn, journalPool, cj.namespace, cj.csiDirectory,
cj.csiNameKeyPrefix+reqName, nameKeyVal)
if err != nil {
return "", "", err
@ -514,14 +521,14 @@ func (conn *Connection) ReserveName(ctx context.Context,
// and also CSI journal pool, when only the VolumeID is passed in (e.g DeleteVolume/DeleteSnapshot,
// VolID during CreateSnapshot).
// Update UUID directory to store CSI request name
err = util.SetOMapKeyValue(ctx, conn.monitors, conn.cr, imagePool, cj.namespace, cj.cephUUIDDirectoryPrefix+volUUID,
err = setOneOMapKey(ctx, conn, imagePool, cj.namespace, cj.cephUUIDDirectoryPrefix+volUUID,
cj.csiNameKey, reqName)
if err != nil {
return "", "", err
}
// Update UUID directory to store image name
err = util.SetOMapKeyValue(ctx, conn.monitors, conn.cr, imagePool, cj.namespace, cj.cephUUIDDirectoryPrefix+volUUID,
err = setOneOMapKey(ctx, conn, imagePool, cj.namespace, cj.cephUUIDDirectoryPrefix+volUUID,
cj.csiImageKey, imageName)
if err != nil {
return "", "", err
@ -529,7 +536,7 @@ func (conn *Connection) ReserveName(ctx context.Context,
// Update UUID directory to store encryption values
if kmsConf != "" {
err = util.SetOMapKeyValue(ctx, conn.monitors, conn.cr, imagePool, cj.namespace, cj.cephUUIDDirectoryPrefix+volUUID,
err = setOneOMapKey(ctx, conn, imagePool, cj.namespace, cj.cephUUIDDirectoryPrefix+volUUID,
cj.encryptKMSKey, kmsConf)
if err != nil {
return "", "", err
@ -542,7 +549,7 @@ func (conn *Connection) ReserveName(ctx context.Context,
journalPoolIDStr := hex.EncodeToString(buf64)
// Update UUID directory to store CSI journal pool name (prefer ID instead of name to be pool rename proof)
err = util.SetOMapKeyValue(ctx, conn.monitors, conn.cr, imagePool, cj.namespace, cj.cephUUIDDirectoryPrefix+volUUID,
err = setOneOMapKey(ctx, conn, imagePool, cj.namespace, cj.cephUUIDDirectoryPrefix+volUUID,
cj.csiJournalPool, journalPoolIDStr)
if err != nil {
return "", "", err
@ -551,7 +558,7 @@ func (conn *Connection) ReserveName(ctx context.Context,
if snapSource {
// Update UUID directory to store source volume UUID in case of snapshots
err = util.SetOMapKeyValue(ctx, conn.monitors, conn.cr, imagePool, cj.namespace, cj.cephUUIDDirectoryPrefix+volUUID,
err = setOneOMapKey(ctx, conn, imagePool, cj.namespace, cj.cephUUIDDirectoryPrefix+volUUID,
cj.cephSnapSourceKey, parentName)
if err != nil {
return "", "", err
@ -584,7 +591,7 @@ func (conn *Connection) GetImageAttributes(ctx context.Context, pool, objectUUID
}
// TODO: fetch all omap vals in one call, than make multiple listomapvals
imageAttributes.RequestName, err = util.GetOMapValue(ctx, conn.monitors, conn.cr, pool, cj.namespace,
imageAttributes.RequestName, err = getOneOMapValue(ctx, conn, pool, cj.namespace,
cj.cephUUIDDirectoryPrefix+objectUUID, cj.csiNameKey)
if err != nil {
return nil, err
@ -592,7 +599,7 @@ func (conn *Connection) GetImageAttributes(ctx context.Context, pool, objectUUID
// image key was added at some point, so not all volumes will have this key set
// when ceph-csi was upgraded
imageAttributes.ImageName, err = util.GetOMapValue(ctx, conn.monitors, conn.cr, pool, cj.namespace,
imageAttributes.ImageName, err = getOneOMapValue(ctx, conn, pool, cj.namespace,
cj.cephUUIDDirectoryPrefix+objectUUID, cj.csiImageKey)
if err != nil {
// if the key was not found, assume the default key + UUID
@ -610,7 +617,7 @@ func (conn *Connection) GetImageAttributes(ctx context.Context, pool, objectUUID
}
}
imageAttributes.KmsID, err = util.GetOMapValue(ctx, conn.monitors, conn.cr, pool, cj.namespace,
imageAttributes.KmsID, err = getOneOMapValue(ctx, conn, pool, cj.namespace,
cj.cephUUIDDirectoryPrefix+objectUUID, cj.encryptKMSKey)
if err != nil {
// ErrKeyNotFound means no encryption KMS was used
@ -622,7 +629,7 @@ func (conn *Connection) GetImageAttributes(ctx context.Context, pool, objectUUID
}
}
journalPoolIDStr, err := util.GetOMapValue(ctx, conn.monitors, conn.cr, pool, cj.namespace,
journalPoolIDStr, err := getOneOMapValue(ctx, conn, pool, cj.namespace,
cj.cephUUIDDirectoryPrefix+objectUUID, cj.csiJournalPool)
if err != nil {
if _, ok := err.(util.ErrKeyNotFound); !ok {
@ -639,7 +646,7 @@ func (conn *Connection) GetImageAttributes(ctx context.Context, pool, objectUUID
}
if snapSource {
imageAttributes.SourceName, err = util.GetOMapValue(ctx, conn.monitors, conn.cr, pool, cj.namespace,
imageAttributes.SourceName, err = getOneOMapValue(ctx, conn, pool, cj.namespace,
cj.cephUUIDDirectoryPrefix+objectUUID, cj.cephSnapSourceKey)
if err != nil {
return nil, err