mirror of
https://github.com/ceph/ceph-csi.git
synced 2024-11-10 00:10:20 +00:00
journal: fix reading omaps from objects with large key counts
The implementation of getOMapValues assumed that the number of key-value pairs assigned to the object would be close to the number of keys being requested. When the number of keys on the object exceeded the "listExcess" value the function would fail to read additional keys even if they existed in the omap. This change sets a large fixed "chunk size" value and keeps reading key-value pairs as long as the callback gets called and increments the numKeys counter. Signed-off-by: John Mulligan <jmulligan@redhat.com>
This commit is contained in:
parent
7caca1137f
commit
8a41cd03a5
@ -25,9 +25,10 @@ import (
|
|||||||
"github.com/ceph/go-ceph/rados"
|
"github.com/ceph/go-ceph/rados"
|
||||||
)
|
)
|
||||||
|
|
||||||
// listExcess is the number of false-positive key-value pairs we will
|
// chunkSize is the number of key-value pairs that will be fetched in
|
||||||
// accept from ceph when getting omap values.
|
// one call. This is set fairly large to avoid calling into ceph APIs
|
||||||
var listExcess = 32
|
// over and over.
|
||||||
|
const chunkSize int64 = 512
|
||||||
|
|
||||||
func getOMapValues(
|
func getOMapValues(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
@ -51,15 +52,26 @@ func getOMapValues(
|
|||||||
for i := range keys {
|
for i := range keys {
|
||||||
want[keys[i]] = true
|
want[keys[i]] = true
|
||||||
}
|
}
|
||||||
|
numKeys := uint64(0)
|
||||||
|
startAfter := ""
|
||||||
|
for {
|
||||||
|
prevNumKeys := numKeys
|
||||||
|
err = ioctx.ListOmapValues(
|
||||||
|
oid, startAfter, prefix, chunkSize,
|
||||||
|
func(key string, value []byte) {
|
||||||
|
numKeys++
|
||||||
|
startAfter = key
|
||||||
|
if want[key] {
|
||||||
|
results[key] = string(value)
|
||||||
|
}
|
||||||
|
},
|
||||||
|
)
|
||||||
|
// if we hit an error, or no new keys were seen, exit the loop
|
||||||
|
if err != nil || numKeys == prevNumKeys {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
err = ioctx.ListOmapValues(
|
|
||||||
oid, "", prefix, int64(len(want)+listExcess),
|
|
||||||
func(key string, value []byte) {
|
|
||||||
if want[key] {
|
|
||||||
results[key] = string(value)
|
|
||||||
}
|
|
||||||
},
|
|
||||||
)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.Is(err, rados.ErrNotFound) {
|
if errors.Is(err, rados.ErrNotFound) {
|
||||||
util.ErrorLog(ctx, "omap not found (pool=%q, namespace=%q, name=%q): %v",
|
util.ErrorLog(ctx, "omap not found (pool=%q, namespace=%q, name=%q): %v",
|
||||||
|
Loading…
Reference in New Issue
Block a user