mirror of
https://github.com/ceph/ceph-csi.git
synced 2025-01-17 18:29:30 +00:00
journal: change omap get func to handle multiple keys at once
Taking this appraoch means that any function that must get more than one key's value from the same oid can be more efficient by calling out to ceph only once. To be cautious and avoid missing things we always request ceph return more keys than we actually expect to be set on the oid. If there are unexpected keys there, we will not miss the keys we want if we first hit an unexpected key if we were to limit ourselves to iterating only over the number of keys we're expecting to be on the object. Signed-off-by: John Mulligan <jmulligan@redhat.com>
This commit is contained in:
parent
cd24bb3f5c
commit
0ac5f40d09
@ -25,14 +25,18 @@ import (
|
||||
"k8s.io/klog"
|
||||
)
|
||||
|
||||
func getOneOMapValue(
|
||||
// listExcess is the number of false-positive key-value pairs we will
|
||||
// accept from ceph when getting omap values.
|
||||
var listExcess = 32
|
||||
|
||||
func getOMapValues(
|
||||
ctx context.Context,
|
||||
conn *Connection,
|
||||
poolName, namespace, oMapName, oMapKey string) (string, error) {
|
||||
poolName, namespace, oid, prefix string, keys []string) (map[string]string, error) {
|
||||
// fetch and configure the rados ioctx
|
||||
ioctx, err := conn.conn.GetIoctx(poolName)
|
||||
if err != nil {
|
||||
return "", omapPoolError(poolName, err)
|
||||
return nil, omapPoolError(poolName, err)
|
||||
}
|
||||
defer ioctx.Destroy()
|
||||
|
||||
@ -40,34 +44,37 @@ func getOneOMapValue(
|
||||
ioctx.SetNamespace(namespace)
|
||||
}
|
||||
|
||||
pairs, err := ioctx.GetOmapValues(
|
||||
oMapName, // oid (name of object)
|
||||
"", // startAfter (ignored)
|
||||
oMapKey, // filterPrefix - match only keys with this prefix
|
||||
1, // maxReturn - fetch no more than N values
|
||||
results := map[string]string{}
|
||||
// want is our "lookup map" that ensures O(1) checks for keys
|
||||
// while iterating, without needing to complicate the caller.
|
||||
want := make(map[string]bool, len(keys))
|
||||
for i := range keys {
|
||||
want[keys[i]] = true
|
||||
}
|
||||
|
||||
err = ioctx.ListOmapValues(
|
||||
oid, "", prefix, int64(len(want)+listExcess),
|
||||
func(key string, value []byte) {
|
||||
if want[key] {
|
||||
results[key] = string(value)
|
||||
}
|
||||
},
|
||||
)
|
||||
switch err {
|
||||
case nil:
|
||||
case rados.ErrNotFound:
|
||||
klog.Errorf(
|
||||
util.Log(ctx, "omap not found (pool=%q, namespace=%q, name=%q, key=%q): %v"),
|
||||
poolName, namespace, oMapName, oMapKey, err)
|
||||
return "", util.NewErrKeyNotFound(oMapKey, err)
|
||||
util.Log(ctx, "omap not found (pool=%q, namespace=%q, name=%q): %v"),
|
||||
poolName, namespace, oid, err)
|
||||
return nil, util.NewErrKeyNotFound(oid, err)
|
||||
default:
|
||||
return "", err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
result, found := pairs[oMapKey]
|
||||
if !found {
|
||||
klog.Errorf(
|
||||
util.Log(ctx, "key not found in omap (pool=%q, namespace=%q, name=%q, key=%q): %v"),
|
||||
poolName, namespace, oMapName, oMapKey, err)
|
||||
return "", util.NewErrKeyNotFound(oMapKey, nil)
|
||||
}
|
||||
klog.Infof(
|
||||
util.Log(ctx, "XXX key found in omap! (pool=%q, namespace=%q, name=%q, key=%q): %v"),
|
||||
poolName, namespace, oMapName, oMapKey, result)
|
||||
return string(result), nil
|
||||
klog.V(4).Infof(
|
||||
util.Log(ctx, "got omap values: (pool=%q, namespace=%q, name=%q): %+v"),
|
||||
poolName, namespace, oid, results)
|
||||
return results, nil
|
||||
}
|
||||
|
||||
func removeOneOMapKey(
|
||||
|
@ -144,6 +144,9 @@ type Config struct {
|
||||
|
||||
// encryptKMS in which encryption passphrase was saved, default is no encryption
|
||||
encryptKMSKey string
|
||||
|
||||
// commonPrefix is the prefix common to all omap keys for this Config
|
||||
commonPrefix string
|
||||
}
|
||||
|
||||
// NewCSIVolumeJournal returns an instance of CSIJournal for volumes
|
||||
@ -158,6 +161,7 @@ func NewCSIVolumeJournal(suffix string) *Config {
|
||||
cephSnapSourceKey: "",
|
||||
namespace: "",
|
||||
encryptKMSKey: "csi.volume.encryptKMS",
|
||||
commonPrefix: "csi.",
|
||||
}
|
||||
}
|
||||
|
||||
@ -173,6 +177,7 @@ func NewCSISnapshotJournal(suffix string) *Config {
|
||||
cephSnapSourceKey: "csi.source",
|
||||
namespace: "",
|
||||
encryptKMSKey: "csi.volume.encryptKMS",
|
||||
commonPrefix: "csi.",
|
||||
}
|
||||
}
|
||||
|
||||
@ -264,17 +269,27 @@ func (conn *Connection) CheckReservation(ctx context.Context,
|
||||
}
|
||||
|
||||
// check if request name is already part of the directory omap
|
||||
objUUIDAndPool, err := getOneOMapValue(ctx, conn, journalPool, cj.namespace, cj.csiDirectory,
|
||||
cj.csiNameKeyPrefix+reqName)
|
||||
if err != nil {
|
||||
// error should specifically be not found, for volume to be absent, any other error
|
||||
// is not conclusive, and we should not proceed
|
||||
switch err.(type) {
|
||||
case util.ErrKeyNotFound, util.ErrPoolNotFound:
|
||||
return nil, nil
|
||||
}
|
||||
fetchKeys := []string{
|
||||
cj.csiNameKeyPrefix + reqName,
|
||||
}
|
||||
values, err := getOMapValues(
|
||||
ctx, conn, journalPool, cj.namespace, cj.csiDirectory,
|
||||
cj.commonPrefix, fetchKeys)
|
||||
switch err.(type) {
|
||||
case nil:
|
||||
case util.ErrKeyNotFound, util.ErrPoolNotFound:
|
||||
// pool or omap (oid) was not present
|
||||
// stop processing but without an error for no reservation exists
|
||||
return nil, nil
|
||||
default:
|
||||
return nil, err
|
||||
}
|
||||
objUUIDAndPool, found := values[cj.csiNameKeyPrefix+reqName]
|
||||
if !found {
|
||||
// oamp was read but was missing the desired key-value pair
|
||||
// stop processing but without an error for no reservation exists
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// check UUID only encoded value
|
||||
if len(objUUIDAndPool) == uuidEncodedLength {
|
||||
@ -590,26 +605,33 @@ func (conn *Connection) GetImageAttributes(ctx context.Context, pool, objectUUID
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// TODO: fetch all omap vals in one call, than make multiple listomapvals
|
||||
imageAttributes.RequestName, err = getOneOMapValue(ctx, conn, pool, cj.namespace,
|
||||
cj.cephUUIDDirectoryPrefix+objectUUID, cj.csiNameKey)
|
||||
if err != nil {
|
||||
fetchKeys := []string{
|
||||
cj.csiNameKey,
|
||||
cj.csiImageKey,
|
||||
cj.encryptKMSKey,
|
||||
cj.csiJournalPool,
|
||||
cj.cephSnapSourceKey,
|
||||
}
|
||||
values, err := getOMapValues(
|
||||
ctx, conn, pool, cj.namespace, cj.cephUUIDDirectoryPrefix+objectUUID,
|
||||
cj.commonPrefix, fetchKeys)
|
||||
switch err.(type) {
|
||||
case nil:
|
||||
case util.ErrPoolNotFound, util.ErrKeyNotFound:
|
||||
klog.Warningf(util.Log(ctx, "unable to read omap keys: pool or key missing: %v"), err)
|
||||
default:
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// image key was added at some point, so not all volumes will have this key set
|
||||
// when ceph-csi was upgraded
|
||||
imageAttributes.ImageName, err = getOneOMapValue(ctx, conn, pool, cj.namespace,
|
||||
cj.cephUUIDDirectoryPrefix+objectUUID, cj.csiImageKey)
|
||||
if err != nil {
|
||||
// if the key was not found, assume the default key + UUID
|
||||
// otherwise return error
|
||||
switch err.(type) {
|
||||
default:
|
||||
return nil, err
|
||||
case util.ErrKeyNotFound, util.ErrPoolNotFound:
|
||||
}
|
||||
var found bool
|
||||
imageAttributes.RequestName = values[cj.csiNameKey]
|
||||
imageAttributes.KmsID = values[cj.encryptKMSKey]
|
||||
|
||||
// image key was added at a later point, so not all volumes will have this
|
||||
// key set when ceph-csi was upgraded
|
||||
imageAttributes.ImageName, found = values[cj.csiImageKey]
|
||||
if !found {
|
||||
// if the key was not found, assume the default key + UUID
|
||||
if snapSource {
|
||||
imageAttributes.ImageName = defaultSnapshotNamingPrefix + objectUUID
|
||||
} else {
|
||||
@ -617,24 +639,8 @@ func (conn *Connection) GetImageAttributes(ctx context.Context, pool, objectUUID
|
||||
}
|
||||
}
|
||||
|
||||
imageAttributes.KmsID, err = getOneOMapValue(ctx, conn, pool, cj.namespace,
|
||||
cj.cephUUIDDirectoryPrefix+objectUUID, cj.encryptKMSKey)
|
||||
if err != nil {
|
||||
// ErrKeyNotFound means no encryption KMS was used
|
||||
switch err.(type) {
|
||||
default:
|
||||
return nil, fmt.Errorf("OMapVal for %s/%s failed to get encryption KMS value: %s",
|
||||
pool, cj.cephUUIDDirectoryPrefix+objectUUID, err)
|
||||
case util.ErrKeyNotFound, util.ErrPoolNotFound:
|
||||
}
|
||||
}
|
||||
|
||||
journalPoolIDStr, err := getOneOMapValue(ctx, conn, pool, cj.namespace,
|
||||
cj.cephUUIDDirectoryPrefix+objectUUID, cj.csiJournalPool)
|
||||
if err != nil {
|
||||
if _, ok := err.(util.ErrKeyNotFound); !ok {
|
||||
return nil, err
|
||||
}
|
||||
journalPoolIDStr, found := values[cj.csiJournalPool]
|
||||
if !found {
|
||||
imageAttributes.JournalPoolID = util.InvalidPoolID
|
||||
} else {
|
||||
var buf64 []byte
|
||||
@ -646,10 +652,11 @@ func (conn *Connection) GetImageAttributes(ctx context.Context, pool, objectUUID
|
||||
}
|
||||
|
||||
if snapSource {
|
||||
imageAttributes.SourceName, err = getOneOMapValue(ctx, conn, pool, cj.namespace,
|
||||
cj.cephUUIDDirectoryPrefix+objectUUID, cj.cephSnapSourceKey)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
imageAttributes.SourceName, found = values[cj.cephSnapSourceKey]
|
||||
if !found {
|
||||
return nil, util.NewErrKeyNotFound(
|
||||
cj.cephSnapSourceKey,
|
||||
fmt.Errorf("no snap source in omap for %q", cj.cephUUIDDirectoryPrefix+objectUUID))
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user