util: added reference tracker

RT, reference tracker, is key-based implementation of a reference counter.
Unlike an integer-based counter, RT counts references by tracking unique
keys. This allows accounting in situations where idempotency must be
preserved. It guarantees there will be no duplicit increments or decrements
of the counter.

Signed-off-by: Robert Vasek <robert.vasek@cern.ch>
This commit is contained in:
Robert Vasek 2022-02-21 15:42:51 +01:00 committed by mergify[bot]
parent 8fb5739f21
commit f6ae612003
14 changed files with 2750 additions and 0 deletions

View File

@ -0,0 +1,85 @@
/*
Copyright 2022 The Ceph-CSI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package errors
import (
goerrors "errors"
"fmt"
"github.com/ceph/go-ceph/rados"
"golang.org/x/sys/unix"
)
// ErrObjectOutOfDate is an error returned by RADOS read/write ops whose
// rados_*_op_assert_version failed.
var ErrObjectOutOfDate = goerrors.New("object is out of date since the last time it was read, try again later")
// UnexpectedReadSize formats an error message for a failure due to bad read
// size.
func UnexpectedReadSize(expectedBytes, actualBytes int) error {
return fmt.Errorf("unexpected size read: expected %d bytes, got %d",
expectedBytes, actualBytes)
}
// UnknownObjectVersion formats an error message for a failure due to unknown
// reftracker object version.
func UnknownObjectVersion(unknownVersion uint32) error {
return fmt.Errorf("unknown reftracker version %d", unknownVersion)
}
// FailedObjectRead formats an error message for a failed RADOS read op.
func FailedObjectRead(cause error) error {
if cause != nil {
return fmt.Errorf("failed to read object: %w", TryRADOSAborted(cause))
}
return nil
}
// FailedObjectRead formats an error message for a failed RADOS read op.
func FailedObjectWrite(cause error) error {
if cause != nil {
return fmt.Errorf("failed to write object: %w", TryRADOSAborted(cause))
}
return nil
}
// TryRADOSAborted tries to extract rados_*_op_assert_version from opErr.
func TryRADOSAborted(opErr error) error {
if opErr == nil {
return nil
}
var radosOpErr rados.OperationError
if !goerrors.As(opErr, &radosOpErr) {
return opErr
}
// nolint:errorlint // Can't use errors.As() because rados.radosError is private.
errnoErr, ok := radosOpErr.OpError.(interface{ ErrorCode() int })
if !ok {
return opErr
}
errno := errnoErr.ErrorCode()
if errno == -int(unix.EOVERFLOW) || errno == -int(unix.ERANGE) {
return ErrObjectOutOfDate
}
return nil
}

View File

@ -0,0 +1,551 @@
/*
Copyright 2022 The Ceph-CSI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package radoswrapper
import (
"fmt"
"github.com/ceph/go-ceph/rados"
"golang.org/x/sys/unix"
)
type (
FakeObj struct {
Oid string
Ver uint64
Xattrs map[string][]byte
Omap map[string][]byte
Data []byte
}
FakeRados struct {
Objs map[string]*FakeObj
}
FakeIOContext struct {
LastObjVersion uint64
Rados *FakeRados
}
FakeWriteOp struct {
IoCtx *FakeIOContext
steps map[fakeWriteOpStepExecutorIdx]fakeWriteOpStepExecutor
oid string
}
FakeReadOp struct {
IoCtx *FakeIOContext
steps map[fakeReadOpStepExecutorIdx]fakeReadOpStepExecutor
oid string
}
fakeWriteOpStepExecutorIdx int
fakeReadOpStepExecutorIdx int
fakeWriteOpStepExecutor interface {
operate(w *FakeWriteOp) error
}
fakeReadOpStepExecutor interface {
operate(r *FakeReadOp) error
}
fakeRadosError int
)
const (
fakeWriteOpAssertVersionExecutorIdx fakeWriteOpStepExecutorIdx = iota
fakeWriteOpRemoveExecutorIdx
fakeWriteOpCreateExecutorIdx
fakeWriteOpSetXattrExecutorIdx
fakeWriteOpWriteFullExecutorIdx
fakeWriteOpRmOmapKeysExecutorIdx
fakeWriteOpSetOmapExecutorIdx
fakeReadOpAssertVersionExecutorIdx fakeReadOpStepExecutorIdx = iota
fakeReadOpReadExecutorIdx
fakeReadOpGetOmapValuesByKeysExecutorIdx
)
var (
_ IOContextW = &FakeIOContext{}
// fakeWriteOpStepExecutorOrder defines fixed order in which the write ops are performed.
fakeWriteOpStepExecutorOrder = []fakeWriteOpStepExecutorIdx{
fakeWriteOpAssertVersionExecutorIdx,
fakeWriteOpRemoveExecutorIdx,
fakeWriteOpCreateExecutorIdx,
fakeWriteOpSetXattrExecutorIdx,
fakeWriteOpWriteFullExecutorIdx,
fakeWriteOpRmOmapKeysExecutorIdx,
fakeWriteOpSetOmapExecutorIdx,
}
// fakeReadOpStepExecutorOrder defines fixed order in which the read ops are performed.
fakeReadOpStepExecutorOrder = []fakeReadOpStepExecutorIdx{
fakeReadOpAssertVersionExecutorIdx,
fakeReadOpReadExecutorIdx,
fakeReadOpGetOmapValuesByKeysExecutorIdx,
}
)
func NewFakeRados() *FakeRados {
return &FakeRados{
Objs: make(map[string]*FakeObj),
}
}
func NewFakeIOContext(fakeRados *FakeRados) *FakeIOContext {
return &FakeIOContext{
Rados: fakeRados,
}
}
func (e fakeRadosError) Error() string {
return fmt.Sprintf("FakeRados errno=%d", int(e))
}
func (e fakeRadosError) ErrorCode() int {
return int(e)
}
func (o *FakeObj) String() string {
return fmt.Sprintf("%s{Ver=%d, Xattrs(%d)=%+v, OMap(%d)=%+v, Data(%d)=%+v}",
o.Oid, o.Ver, len(o.Xattrs), o.Xattrs, len(o.Omap), o.Omap, len(o.Data), o.Data)
}
func (c *FakeIOContext) GetLastVersion() (uint64, error) {
return c.LastObjVersion, nil
}
func (c *FakeIOContext) getObj(oid string) (*FakeObj, error) {
obj, ok := c.Rados.Objs[oid]
if !ok {
return nil, rados.ErrNotFound
}
return obj, nil
}
func (c *FakeIOContext) GetXattr(oid, key string, data []byte) (int, error) {
obj, ok := c.Rados.Objs[oid]
if !ok {
return 0, rados.ErrNotFound
}
xattr, ok := obj.Xattrs[key]
if !ok {
return 0, fakeRadosError(-int(unix.ENODATA))
}
copy(data, xattr)
return len(xattr), nil
}
func (c *FakeIOContext) CreateWriteOp() WriteOpW {
return &FakeWriteOp{
IoCtx: c,
steps: make(map[fakeWriteOpStepExecutorIdx]fakeWriteOpStepExecutor),
}
}
func (w *FakeWriteOp) Operate(oid string) error {
if len(w.steps) == 0 {
return nil
}
w.oid = oid
for _, writeOpExecutorIdx := range fakeWriteOpStepExecutorOrder {
e, ok := w.steps[writeOpExecutorIdx]
if !ok {
continue
}
if err := e.operate(w); err != nil {
return err
}
}
if obj, err := w.IoCtx.getObj(oid); err == nil {
obj.Ver++
w.IoCtx.LastObjVersion = obj.Ver
}
return nil
}
func (w *FakeWriteOp) Release() {}
func (c *FakeIOContext) CreateReadOp() ReadOpW {
return &FakeReadOp{
IoCtx: c,
steps: make(map[fakeReadOpStepExecutorIdx]fakeReadOpStepExecutor),
}
}
func (r *FakeReadOp) Operate(oid string) error {
r.oid = oid
for _, readOpExecutorIdx := range fakeReadOpStepExecutorOrder {
e, ok := r.steps[readOpExecutorIdx]
if !ok {
continue
}
if err := e.operate(r); err != nil {
return err
}
}
if obj, err := r.IoCtx.getObj(oid); err == nil {
r.IoCtx.LastObjVersion = obj.Ver
}
return nil
}
func (r *FakeReadOp) Release() {}
// WriteOp Create
type fakeWriteOpCreateExecutor struct {
exclusive rados.CreateOption
}
func (e *fakeWriteOpCreateExecutor) operate(w *FakeWriteOp) error {
if e.exclusive == rados.CreateExclusive {
if _, exists := w.IoCtx.Rados.Objs[w.oid]; exists {
return rados.ErrObjectExists
}
}
w.IoCtx.Rados.Objs[w.oid] = &FakeObj{
Oid: w.oid,
Omap: make(map[string][]byte),
Xattrs: make(map[string][]byte),
}
return nil
}
func (w *FakeWriteOp) Create(exclusive rados.CreateOption) {
w.steps[fakeWriteOpCreateExecutorIdx] = &fakeWriteOpCreateExecutor{
exclusive: exclusive,
}
}
// WriteOp Remove
type fakeWriteOpRemoveExecutor struct{}
func (e *fakeWriteOpRemoveExecutor) operate(w *FakeWriteOp) error {
if _, err := w.IoCtx.getObj(w.oid); err != nil {
return err
}
delete(w.IoCtx.Rados.Objs, w.oid)
return nil
}
func (w *FakeWriteOp) Remove() {
w.steps[fakeWriteOpRemoveExecutorIdx] = &fakeWriteOpRemoveExecutor{}
}
// WriteOp SetXattr
type fakeWriteOpSetXattrExecutor struct {
name string
value []byte
}
func (e *fakeWriteOpSetXattrExecutor) operate(w *FakeWriteOp) error {
obj, err := w.IoCtx.getObj(w.oid)
if err != nil {
return err
}
obj.Xattrs[e.name] = e.value
return nil
}
func (w *FakeWriteOp) SetXattr(name string, value []byte) {
valueCopy := append([]byte(nil), value...)
w.steps[fakeWriteOpSetXattrExecutorIdx] = &fakeWriteOpSetXattrExecutor{
name: name,
value: valueCopy,
}
}
// WriteOp WriteFull
type fakeWriteOpWriteFullExecutor struct {
data []byte
}
func (e *fakeWriteOpWriteFullExecutor) operate(w *FakeWriteOp) error {
obj, err := w.IoCtx.getObj(w.oid)
if err != nil {
return err
}
obj.Data = e.data
return nil
}
func (w *FakeWriteOp) WriteFull(b []byte) {
bCopy := append([]byte(nil), b...)
w.steps[fakeWriteOpWriteFullExecutorIdx] = &fakeWriteOpWriteFullExecutor{
data: bCopy,
}
}
// WriteOp SetOmap
type fakeWriteOpSetOmapExecutor struct {
pairs map[string][]byte
}
func (e *fakeWriteOpSetOmapExecutor) operate(w *FakeWriteOp) error {
obj, err := w.IoCtx.getObj(w.oid)
if err != nil {
return err
}
for k, v := range e.pairs {
obj.Omap[k] = v
}
return nil
}
func (w *FakeWriteOp) SetOmap(pairs map[string][]byte) {
pairsCopy := make(map[string][]byte, len(pairs))
for k, v := range pairs {
vCopy := append([]byte(nil), v...)
pairsCopy[k] = vCopy
}
w.steps[fakeWriteOpSetOmapExecutorIdx] = &fakeWriteOpSetOmapExecutor{
pairs: pairsCopy,
}
}
// WriteOp RmOmapKeys
type fakeWriteOpRmOmapKeysExecutor struct {
keys []string
}
func (e *fakeWriteOpRmOmapKeysExecutor) operate(w *FakeWriteOp) error {
obj, err := w.IoCtx.getObj(w.oid)
if err != nil {
return err
}
for _, k := range e.keys {
delete(obj.Omap, k)
}
return nil
}
func (w *FakeWriteOp) RmOmapKeys(keys []string) {
keysCopy := append([]string(nil), keys...)
w.steps[fakeWriteOpRmOmapKeysExecutorIdx] = &fakeWriteOpRmOmapKeysExecutor{
keys: keysCopy,
}
}
// WriteOp AssertVersion
type fakeWriteOpAssertVersionExecutor struct {
version uint64
}
func (e *fakeWriteOpAssertVersionExecutor) operate(w *FakeWriteOp) error {
obj, err := w.IoCtx.getObj(w.oid)
if err != nil {
return err
}
return validateObjVersion(obj.Ver, e.version)
}
func (w *FakeWriteOp) AssertVersion(v uint64) {
w.steps[fakeWriteOpAssertVersionExecutorIdx] = &fakeWriteOpAssertVersionExecutor{
version: v,
}
}
// ReadOp Read
type fakeReadOpReadExecutor struct {
offset int
buffer []byte
step *rados.ReadOpReadStep
}
func (e *fakeReadOpReadExecutor) operate(r *FakeReadOp) error {
obj, err := r.IoCtx.getObj(r.oid)
if err != nil {
return err
}
if e.offset > len(obj.Data) {
// RADOS just returns zero bytes read.
return nil
}
end := e.offset + len(e.buffer)
if end > len(obj.Data) {
end = len(obj.Data)
}
nbytes := end - e.offset
e.step.BytesRead = int64(nbytes)
copy(e.buffer, obj.Data[e.offset:])
return nil
}
func (r *FakeReadOp) Read(offset uint64, buffer []byte) *rados.ReadOpReadStep {
s := &rados.ReadOpReadStep{}
r.steps[fakeReadOpReadExecutorIdx] = &fakeReadOpReadExecutor{
offset: int(offset),
buffer: buffer,
step: s,
}
return s
}
// ReadOp GetOmapValuesByKeys
type (
fakeReadOpGetOmapValuesByKeysExecutor struct {
keys []string
step *FakeReadOpOmapGetValsByKeysStep
}
FakeReadOpOmapGetValsByKeysStep struct {
pairs []rados.OmapKeyValue
idx int
canIterate bool
}
)
func (e *fakeReadOpGetOmapValuesByKeysExecutor) operate(r *FakeReadOp) error {
obj, err := r.IoCtx.getObj(r.oid)
if err != nil {
return err
}
var pairs []rados.OmapKeyValue
for _, key := range e.keys {
val, ok := obj.Omap[key]
if !ok {
continue
}
pairs = append(pairs, rados.OmapKeyValue{
Key: key,
Value: val,
})
}
e.step.pairs = pairs
e.step.canIterate = true
return nil
}
func (s *FakeReadOpOmapGetValsByKeysStep) Next() (*rados.OmapKeyValue, error) {
if !s.canIterate {
return nil, rados.ErrOperationIncomplete
}
if s.idx >= len(s.pairs) {
return nil, nil
}
omapKeyValue := &s.pairs[s.idx]
s.idx++
return omapKeyValue, nil
}
func (r *FakeReadOp) GetOmapValuesByKeys(keys []string) ReadOpOmapGetValsByKeysStepW {
keysCopy := append([]string(nil), keys...)
s := &FakeReadOpOmapGetValsByKeysStep{}
r.steps[fakeReadOpGetOmapValuesByKeysExecutorIdx] = &fakeReadOpGetOmapValuesByKeysExecutor{
keys: keysCopy,
step: s,
}
return s
}
// ReadOp AssertVersion
type fakeReadOpAssertVersionExecutor struct {
version uint64
}
func (e *fakeReadOpAssertVersionExecutor) operate(r *FakeReadOp) error {
obj, err := r.IoCtx.getObj(r.oid)
if err != nil {
return err
}
return validateObjVersion(obj.Ver, e.version)
}
func (r *FakeReadOp) AssertVersion(v uint64) {
r.steps[fakeReadOpAssertVersionExecutorIdx] = &fakeReadOpAssertVersionExecutor{
version: v,
}
}
func validateObjVersion(expected, actual uint64) error {
// See librados docs for returning error codes in rados_*_op_assert_version:
// https://docs.ceph.com/en/latest/rados/api/librados/?#c.rados_write_op_assert_version
// https://docs.ceph.com/en/latest/rados/api/librados/?#c.rados_read_op_assert_version
if expected > actual {
return rados.OperationError{
OpError: fakeRadosError(-int(unix.ERANGE)),
}
}
if expected < actual {
return rados.OperationError{
OpError: fakeRadosError(-int(unix.EOVERFLOW)),
}
}
return nil
}

View File

@ -0,0 +1,106 @@
/*
Copyright 2022 The Ceph-CSI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package radoswrapper
import (
"github.com/ceph/go-ceph/rados"
)
// These interfaces are just wrappers around some of go-ceph's rados pkg
// structures and functions. They have two implementations: the "real" one
// (that simply uses go-ceph), and a fake one, used in unit tests.
// IOContextW is a wrapper around rados.IOContext.
type IOContextW interface {
// GetLastVersion will return the version number of the last object read or
// written to.
GetLastVersion() (uint64, error)
// GetXattr gets an xattr with key `name`, it returns the length of
// the key read or an error if not successful
GetXattr(oid string, key string, data []byte) (int, error)
// CreateWriteOp returns a newly constructed write operation.
CreateWriteOp() WriteOpW
// CreateReadOp returns a newly constructed read operation.
CreateReadOp() ReadOpW
}
// WriteOpW is a wrapper around rados.WriteOp interface.
type WriteOpW interface {
// Create a rados object.
Create(exclusive rados.CreateOption)
// Remove object.
Remove()
// SetXattr sets an xattr.
SetXattr(name string, value []byte)
// WriteFull writes a given byte slice as the whole object,
// atomically replacing it.
WriteFull(b []byte)
// SetOmap appends the map `pairs` to the omap `oid`.
SetOmap(pairs map[string][]byte)
// RmOmapKeys removes the specified `keys` from the omap `oid`.
RmOmapKeys(keys []string)
// AssertVersion ensures that the object exists and that its internal version
// number is equal to "ver" before writing. "ver" should be a version number
// previously obtained with IOContext.GetLastVersion().
AssertVersion(ver uint64)
// Operate will perform the operation(s).
Operate(oid string) error
// Release the resources associated with this write operation.
Release()
}
// ReadOpW is a wrapper around rados.ReadOp.
type ReadOpW interface {
// Read bytes from offset into buffer.
// len(buffer) is the maximum number of bytes read from the object.
// buffer[:ReadOpReadStep.BytesRead] then contains object data.
Read(offset uint64, buffer []byte) *rados.ReadOpReadStep
// GetOmapValuesByKeys starts iterating over specific key/value pairs.
GetOmapValuesByKeys(keys []string) ReadOpOmapGetValsByKeysStepW
// AssertVersion ensures that the object exists and that its internal version
// number is equal to "ver" before reading. "ver" should be a version number
// previously obtained with IOContext.GetLastVersion().
AssertVersion(ver uint64)
// Operate will perform the operation(s).
Operate(oid string) error
// Release the resources associated with this read operation.
Release()
}
// ReadOpOmapGetValsByKeysStepW is a wrapper around rados.ReadOpOmapGetValsByKeysStep.
type ReadOpOmapGetValsByKeysStepW interface {
// Next gets the next omap key/value pair referenced by
// ReadOpOmapGetValsByKeysStep's internal iterator.
// If there are no more elements to retrieve, (nil, nil) is returned.
// May be called only after Operate() finished.
Next() (*rados.OmapKeyValue, error)
}

View File

@ -0,0 +1,133 @@
/*
Copyright 2022 The Ceph-CSI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package radoswrapper
import (
"github.com/ceph/go-ceph/rados"
)
type (
IOContext struct {
*rados.IOContext
}
WriteOp struct {
IoCtx *rados.IOContext
*rados.WriteOp
}
ReadOp struct {
IoCtx *rados.IOContext
*rados.ReadOp
}
ReadOpOmapGetValsByKeysStep struct {
*rados.ReadOpOmapGetValsByKeysStep
}
)
var _ IOContextW = &IOContext{}
func NewIOContext(ioctx *rados.IOContext) IOContextW {
return &IOContext{
IOContext: ioctx,
}
}
func (c *IOContext) GetLastVersion() (uint64, error) {
return c.IOContext.GetLastVersion()
}
func (c *IOContext) GetXattr(oid, key string, data []byte) (int, error) {
return c.IOContext.GetXattr(oid, key, data)
}
func (c *IOContext) CreateWriteOp() WriteOpW {
return &WriteOp{
IoCtx: c.IOContext,
WriteOp: rados.CreateWriteOp(),
}
}
func (c *IOContext) CreateReadOp() ReadOpW {
return &ReadOp{
IoCtx: c.IOContext,
ReadOp: rados.CreateReadOp(),
}
}
func (w *WriteOp) Create(exclusive rados.CreateOption) {
w.WriteOp.Create(exclusive)
}
func (w *WriteOp) Remove() {
w.WriteOp.Remove()
}
func (w *WriteOp) SetXattr(name string, value []byte) {
w.WriteOp.SetXattr(name, value)
}
func (w *WriteOp) WriteFull(b []byte) {
w.WriteOp.WriteFull(b)
}
func (w *WriteOp) SetOmap(pairs map[string][]byte) {
w.WriteOp.SetOmap(pairs)
}
func (w *WriteOp) RmOmapKeys(keys []string) {
w.WriteOp.RmOmapKeys(keys)
}
func (w *WriteOp) AssertVersion(v uint64) {
w.WriteOp.AssertVersion(v)
}
func (w *WriteOp) Operate(oid string) error {
return w.WriteOp.Operate(w.IoCtx, oid, rados.OperationNoFlag)
}
func (w *WriteOp) Release() {
w.WriteOp.Release()
}
func (r *ReadOp) Read(offset uint64, buffer []byte) *rados.ReadOpReadStep {
return r.ReadOp.Read(offset, buffer)
}
func (r *ReadOp) GetOmapValuesByKeys(keys []string) ReadOpOmapGetValsByKeysStepW {
return &ReadOpOmapGetValsByKeysStep{
ReadOpOmapGetValsByKeysStep: r.ReadOp.GetOmapValuesByKeys(keys),
}
}
func (r *ReadOp) AssertVersion(v uint64) {
r.ReadOp.AssertVersion(v)
}
func (r *ReadOp) Operate(oid string) error {
return r.ReadOp.Operate(r.IoCtx, oid, rados.OperationNoFlag)
}
func (r *ReadOp) Release() {
r.ReadOp.Release()
}
func (s *ReadOpOmapGetValsByKeysStep) Next() (*rados.OmapKeyValue, error) {
return s.ReadOpOmapGetValsByKeysStep.Next()
}

View File

@ -0,0 +1,248 @@
/*
Copyright 2022 The Ceph-CSI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package reftracker
import (
goerrors "errors"
"fmt"
"github.com/ceph/ceph-csi/internal/util/reftracker/errors"
"github.com/ceph/ceph-csi/internal/util/reftracker/radoswrapper"
"github.com/ceph/ceph-csi/internal/util/reftracker/reftype"
v1 "github.com/ceph/ceph-csi/internal/util/reftracker/v1"
"github.com/ceph/ceph-csi/internal/util/reftracker/version"
"github.com/ceph/go-ceph/rados"
)
// reftracker is key-based implementation of a reference counter.
//
// Unlike integer-based counter, reftracker counts references by tracking
// unique keys. This allows accounting in situations where idempotency must be
// preserved. It guarantees there will be no duplicit increments or decrements
// of the counter.
//
// It is stored persistently as a RADOS object, and is safe to use with
// multiple concurrent writers, and across different nodes of a cluster.
//
// Example:
//
// created, err := Add(
// ioctx,
// "my-reftracker",
// map[string]struct{}{
// "ref-key-1": {},
// "ref-key-2": {},
// },
// )
//
// Since this is a new reftracker object, `created` is `true`.
//
// "my-reftracker" now holds:
// ["ref-key-1":reftype.Normal, "ref-key-2":reftype.Normal]
// The reference count is 2.
//
// created, err := Add(
// ioctx,
// "my-reftracker",
// map[string]struct{}{
// "ref-key-1": {},
// "ref-key-2": {},
// "ref-key-3": {},
// },
// )
//
// Reftracker named "my-reftracker" already exists, so `created` is now
// `false`. Since "ref-key-1" and "ref-key-2" keys are already tracked,
// only "ref-key-3" is added.
//
// "my-reftracker" now holds:
// ["ref-key-1":reftype.Normal, "ref-key-2":reftype.Normal,
// "ref-key-3":reftype.Normal]
// The reference count is 3.
//
// deleted, err := Remove(
// ioctx,
// "my-reftracker",
// map[string]reftype.RefType{
// "ref-key-1": reftype.Normal,
// "ref-key-2": reftype.Mask,
// },
// )
//
// "my-reftracker" now holds:
// ["ref-key-2":reftype.Mask, "ref-key-3":reftype.Normal]
// The reference count is 1.
//
// Since the reference count is greater than zero, `deleted` is `false`.
// "ref-key-1" was removed, and so is not listed among tracked references.
// "ref-key-2" was only masked, so it's been kept. However, masked references
// don't contribute to overall reference count, so the resulting refcount
// after this Remove() call is 1.
//
// created, err := Add(
// ioctx,
// "my-reftracker",
// map[string]struct{}{
// "ref-key-2": {},
// },
// )
//
// "my-reftracker" now holds:
// ["ref-key-2":reftype.Mask, "ref-key-3":reftype.Normal]
// The reference count is 1.
//
// "ref-key-2" is already tracked, so it will not be added again. Since it
// remains masked, it won't contribute to the reference count.
//
// deleted, err := Remove(
// ioctx,
// "my-reftracker",
// map[string]reftype.RefType{
// "ref-key-3": reftype.Normal,
// },
// )
//
// "ref-key-3" was the only tracked key that contributed to reference count.
// After this Remove() call it's now removed. As a result, the reference count
// dropped down to zero, and the whole object has been deleted too.
// `deleted` is `true`.
// Add atomically adds references to `rtName` reference tracker.
// If the reftracker object doesn't exist yet, it is created and `true` is
// returned. If some keys in `refs` map are already tracked by this reftracker
// object, they will not be added again.
func Add(
ioctx radoswrapper.IOContextW,
rtName string,
refs map[string]struct{},
) (bool, error) {
if err := validateAddInput(rtName, refs); err != nil {
return false, err
}
// Read reftracker version.
rtVer, err := version.Read(ioctx, rtName)
if err != nil {
if goerrors.Is(err, rados.ErrNotFound) {
// This is a new reftracker. Initialize it with `refs`.
if err = v1.Init(ioctx, rtName, refs); err != nil {
return false, fmt.Errorf("failed to initialize reftracker: %w", err)
}
return true, nil
}
return false, fmt.Errorf("failed to read reftracker version: %w", err)
}
// Add references to reftracker object.
gen, err := ioctx.GetLastVersion()
if err != nil {
return false, fmt.Errorf("failed to get RADOS object version: %w", err)
}
switch rtVer {
case v1.Version:
err = v1.Add(ioctx, rtName, gen, refs)
if err != nil {
err = fmt.Errorf("failed to add refs: %w", err)
}
default:
err = errors.UnknownObjectVersion(rtVer)
}
return false, err
}
// Remove atomically removes references from `rtName` reference tracker.
// If the reftracker object holds no references after this removal, the whole
// object is deleted too, and `true` is returned. If the reftracker object
// doesn't exist, (true, nil) is returned.
func Remove(
ioctx radoswrapper.IOContextW,
rtName string,
refs map[string]reftype.RefType,
) (bool, error) {
if err := validateRemoveInput(rtName, refs); err != nil {
return false, err
}
// Read reftracker version.
rtVer, err := version.Read(ioctx, rtName)
if err != nil {
if goerrors.Is(err, rados.ErrNotFound) {
// This reftracker doesn't exist. Assume it was already deleted.
return true, nil
}
return false, fmt.Errorf("failed to read reftracker version: %w", err)
}
// Remove references from reftracker.
gen, err := ioctx.GetLastVersion()
if err != nil {
return false, fmt.Errorf("failed to get RADOS object version: %w", err)
}
var deleted bool
switch rtVer {
case v1.Version:
deleted, err = v1.Remove(ioctx, rtName, gen, refs)
if err != nil {
err = fmt.Errorf("failed to remove refs: %w", err)
}
default:
err = errors.UnknownObjectVersion(rtVer)
}
return deleted, err
}
var (
errNoRTName = goerrors.New("missing reftracker name")
errNoRefs = goerrors.New("missing refs")
)
func validateAddInput(rtName string, refs map[string]struct{}) error {
if rtName == "" {
return errNoRTName
}
if len(refs) == 0 {
return errNoRefs
}
return nil
}
func validateRemoveInput(rtName string, refs map[string]reftype.RefType) error {
if rtName == "" {
return errNoRTName
}
if len(refs) == 0 {
return errNoRefs
}
return nil
}

View File

@ -0,0 +1,491 @@
/*
Copyright 2022 The Ceph-CSI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package reftracker
import (
"testing"
"github.com/ceph/ceph-csi/internal/util/reftracker/radoswrapper"
"github.com/ceph/ceph-csi/internal/util/reftracker/reftype"
"github.com/stretchr/testify/assert"
)
const rtName = "hello-rt"
func TestRTAdd(t *testing.T) {
t.Parallel()
// Verify input validation for reftracker name.
t.Run("AddNoName", func(ts *testing.T) {
ts.Parallel()
ioctx := radoswrapper.NewFakeIOContext(radoswrapper.NewFakeRados())
created, err := Add(ioctx, "", nil)
assert.Error(ts, err)
assert.False(ts, created)
})
// Verify input validation for nil and empty refs.
t.Run("AddNoRefs", func(ts *testing.T) {
ts.Parallel()
ioctx := radoswrapper.NewFakeIOContext(radoswrapper.NewFakeRados())
refs := []map[string]struct{}{
nil,
make(map[string]struct{}),
}
for _, ref := range refs {
created, err := Add(ioctx, rtName, ref)
assert.Error(ts, err)
assert.False(ts, created)
}
})
// Add multiple refs in a single Add().
t.Run("AddBulk", func(ts *testing.T) {
ts.Parallel()
ioctx := radoswrapper.NewFakeIOContext(radoswrapper.NewFakeRados())
created, err := Add(ioctx, rtName, map[string]struct{}{
"ref1": {},
"ref2": {},
"ref3": {},
})
assert.NoError(ts, err)
assert.True(ts, created)
})
// Add refs where each Add() has some of the refs overlapping
// with the previous call.
t.Run("AddOverlapping", func(ts *testing.T) {
ts.Parallel()
ioctx := radoswrapper.NewFakeIOContext(radoswrapper.NewFakeRados())
created, err := Add(ioctx, rtName, map[string]struct{}{
"ref1": {},
"ref2": {},
})
assert.NoError(ts, err)
assert.True(ts, created)
refsTable := []map[string]struct{}{
{"ref2": {}, "ref3": {}},
{"ref3": {}, "ref4": {}},
{"ref4": {}, "ref5": {}},
}
for _, refs := range refsTable {
created, err = Add(ioctx, rtName, refs)
assert.NoError(ts, err)
assert.False(ts, created)
}
})
}
func TestRTRemove(t *testing.T) {
t.Parallel()
// Verify input validation for nil and empty refs.
t.Run("RemoveNoRefs", func(ts *testing.T) {
ts.Parallel()
ioctx := radoswrapper.NewFakeIOContext(radoswrapper.NewFakeRados())
refs := []map[string]reftype.RefType{
nil,
make(map[string]reftype.RefType),
}
for _, ref := range refs {
created, err := Remove(ioctx, rtName, ref)
assert.Error(ts, err)
assert.False(ts, created)
}
})
// Attempt to remove refs in a non-existent reftracker object should result
// in success, with deleted=true,err=nil.
t.Run("RemoveNotExists", func(ts *testing.T) {
ts.Parallel()
ioctx := radoswrapper.NewFakeIOContext(radoswrapper.NewFakeRados())
deleted, err := Remove(ioctx, "xxx", map[string]reftype.RefType{
"ref1": reftype.Normal,
})
assert.NoError(ts, err)
assert.True(ts, deleted)
})
// Removing only non-existent refs should not result in reftracker object
// deletion.
t.Run("RemoveNonExistentRefs", func(ts *testing.T) {
ts.Parallel()
ioctx := radoswrapper.NewFakeIOContext(radoswrapper.NewFakeRados())
created, err := Add(ioctx, rtName, map[string]struct{}{
"ref1": {},
"ref2": {},
"ref3": {},
})
assert.NoError(ts, err)
assert.True(ts, created)
deleted, err := Remove(ioctx, rtName, map[string]reftype.RefType{
"refX": reftype.Normal,
"refY": reftype.Normal,
"refZ": reftype.Normal,
})
assert.NoError(ts, err)
assert.False(ts, deleted)
})
// Removing all refs plus some surplus should result in reftracker object
// deletion.
t.Run("RemoveNonExistentRefs", func(ts *testing.T) {
ts.Parallel()
ioctx := radoswrapper.NewFakeIOContext(radoswrapper.NewFakeRados())
created, err := Add(ioctx, rtName, map[string]struct{}{
"ref": {},
})
assert.NoError(ts, err)
assert.True(ts, created)
deleted, err := Remove(ioctx, rtName, map[string]reftype.RefType{
"refX": reftype.Normal,
"refY": reftype.Normal,
"ref": reftype.Normal,
"refZ": reftype.Normal,
})
assert.NoError(ts, err)
assert.True(ts, deleted)
})
// Bulk removal of all refs should result in reftracker object deletion.
t.Run("RemoveBulk", func(ts *testing.T) {
ts.Parallel()
ioctx := radoswrapper.NewFakeIOContext(radoswrapper.NewFakeRados())
keys := []string{"ref1", "ref2", "ref3"}
refsToAdd := make(map[string]struct{})
refsToRemove := make(map[string]reftype.RefType)
for _, k := range keys {
refsToAdd[k] = struct{}{}
refsToRemove[k] = reftype.Normal
}
created, err := Add(ioctx, rtName, refsToAdd)
assert.NoError(ts, err)
assert.True(ts, created)
deleted, err := Remove(ioctx, rtName, refsToRemove)
assert.NoError(ts, err)
assert.True(ts, deleted)
})
// Removal of all refs one-by-one should result in reftracker object deletion
// in the last Remove() call.
t.Run("RemoveSingle", func(ts *testing.T) {
ts.Parallel()
ioctx := radoswrapper.NewFakeIOContext(radoswrapper.NewFakeRados())
created, err := Add(ioctx, rtName, map[string]struct{}{
"ref1": {},
"ref2": {},
"ref3": {},
})
assert.NoError(ts, err)
assert.True(ts, created)
for _, k := range []string{"ref3", "ref2"} {
deleted, errRemove := Remove(ioctx, rtName, map[string]reftype.RefType{
k: reftype.Normal,
})
assert.NoError(ts, errRemove)
assert.False(ts, deleted)
}
// Remove the last reference. It should remove the whole reftracker object too.
deleted, err := Remove(ioctx, rtName, map[string]reftype.RefType{
"ref1": reftype.Normal,
})
assert.NoError(ts, err)
assert.True(ts, deleted)
})
// Cycle through reftracker object twice.
t.Run("AddRemoveAddRemove", func(ts *testing.T) {
ts.Parallel()
ioctx := radoswrapper.NewFakeIOContext(radoswrapper.NewFakeRados())
refsToAdd := map[string]struct{}{
"ref1": {},
"ref2": {},
"ref3": {},
}
refsToRemove := map[string]reftype.RefType{
"ref1": reftype.Normal,
"ref2": reftype.Normal,
"ref3": reftype.Normal,
}
for i := 0; i < 2; i++ {
created, err := Add(ioctx, rtName, refsToAdd)
assert.NoError(ts, err)
assert.True(ts, created)
deleted, err := Remove(ioctx, rtName, refsToRemove)
assert.NoError(ts, err)
assert.True(ts, deleted)
}
})
// Check for respecting idempotency by making multiple additions with overlapping keys
// and removing only ref keys that were distinct.
t.Run("AddOverlappingRemoveBulk", func(ts *testing.T) {
ts.Parallel()
ioctx := radoswrapper.NewFakeIOContext(radoswrapper.NewFakeRados())
created, err := Add(ioctx, rtName, map[string]struct{}{
"ref1": {},
"ref2": {},
})
assert.True(ts, created)
assert.NoError(ts, err)
refsTable := []map[string]struct{}{
{"ref2": {}, "ref3": {}},
{"ref3": {}, "ref4": {}},
{"ref4": {}, "ref5": {}},
}
for _, refs := range refsTable {
created, err = Add(ioctx, rtName, refs)
assert.False(ts, created)
assert.NoError(ts, err)
}
deleted, err := Remove(ioctx, rtName, map[string]reftype.RefType{
"ref1": reftype.Normal,
"ref2": reftype.Normal,
"ref3": reftype.Normal,
"ref4": reftype.Normal,
"ref5": reftype.Normal,
})
assert.NoError(ts, err)
assert.True(ts, deleted)
})
}
func TestRTMask(t *testing.T) {
t.Parallel()
// Bulk masking all refs should result in reftracker object deletion.
t.Run("MaskAllBulk", func(ts *testing.T) {
ts.Parallel()
ioctx := radoswrapper.NewFakeIOContext(radoswrapper.NewFakeRados())
keys := []string{"ref1", "ref2", "ref3"}
refsToAdd := make(map[string]struct{})
refsToRemove := make(map[string]reftype.RefType)
for _, k := range keys {
refsToAdd[k] = struct{}{}
refsToRemove[k] = reftype.Mask
}
created, err := Add(ioctx, rtName, refsToAdd)
assert.NoError(ts, err)
assert.True(ts, created)
deleted, err := Remove(ioctx, rtName, refsToRemove)
assert.NoError(ts, err)
assert.True(ts, deleted)
})
// Masking all refs one-by-one should result in reftracker object deletion in
// the last Remove() call.
t.Run("RemoveSingle", func(ts *testing.T) {
ts.Parallel()
ioctx := radoswrapper.NewFakeIOContext(radoswrapper.NewFakeRados())
created, err := Add(ioctx, rtName, map[string]struct{}{
"ref1": {},
"ref2": {},
"ref3": {},
})
assert.NoError(ts, err)
assert.True(ts, created)
for _, k := range []string{"ref3", "ref2"} {
deleted, errRemove := Remove(ioctx, rtName, map[string]reftype.RefType{
k: reftype.Mask,
})
assert.NoError(ts, errRemove)
assert.False(ts, deleted)
}
// Remove the last reference. It should delete the whole reftracker object
// too.
deleted, err := Remove(ioctx, rtName, map[string]reftype.RefType{
"ref1": reftype.Mask,
})
assert.NoError(ts, err)
assert.True(ts, deleted)
})
// Bulk removing two (out of 3) refs and then masking the ref that's left
// should result in reftracker object deletion in the last Remove() call.
t.Run("RemoveBulkMaskSingle", func(ts *testing.T) {
ts.Parallel()
ioctx := radoswrapper.NewFakeIOContext(radoswrapper.NewFakeRados())
created, err := Add(ioctx, rtName, map[string]struct{}{
"ref1": {},
"ref2": {},
"ref3": {},
})
assert.NoError(ts, err)
assert.True(ts, created)
deleted, err := Remove(ioctx, rtName, map[string]reftype.RefType{
"ref1": reftype.Normal,
"ref2": reftype.Normal,
})
assert.NoError(ts, err)
assert.False(ts, deleted)
deleted, err = Remove(ioctx, rtName, map[string]reftype.RefType{
"ref3": reftype.Mask,
})
assert.NoError(ts, err)
assert.True(ts, deleted)
})
// Bulk masking two (out of 3) refs and then removing the ref that's left
// should result in reftracker object deletion in the last Remove() call.
t.Run("MaskSingleRemoveBulk", func(ts *testing.T) {
ts.Parallel()
ioctx := radoswrapper.NewFakeIOContext(radoswrapper.NewFakeRados())
created, err := Add(ioctx, rtName, map[string]struct{}{
"ref1": {},
"ref2": {},
"ref3": {},
})
assert.NoError(ts, err)
assert.True(ts, created)
deleted, err := Remove(ioctx, rtName, map[string]reftype.RefType{
"ref1": reftype.Mask,
"ref2": reftype.Mask,
})
assert.NoError(ts, err)
assert.False(ts, deleted)
deleted, err = Remove(ioctx, rtName, map[string]reftype.RefType{
"ref3": reftype.Normal,
})
assert.NoError(ts, err)
assert.True(ts, deleted)
})
// Verify that masking refs hides them from future Add()s.
t.Run("MaskAndAdd", func(ts *testing.T) {
ts.Parallel()
ioctx := radoswrapper.NewFakeIOContext(radoswrapper.NewFakeRados())
created, err := Add(ioctx, rtName, map[string]struct{}{
"ref1": {},
"ref2": {},
"ref3": {},
})
assert.NoError(ts, err)
assert.True(ts, created)
deleted, err := Remove(ioctx, rtName, map[string]reftype.RefType{
"ref1": reftype.Mask,
"ref2": reftype.Mask,
})
assert.NoError(ts, err)
assert.False(ts, deleted)
created, err = Add(ioctx, rtName, map[string]struct{}{
"ref1": {},
"ref2": {},
})
assert.NoError(ts, err)
assert.False(ts, created)
deleted, err = Remove(ioctx, rtName, map[string]reftype.RefType{
"ref3": reftype.Normal,
})
assert.NoError(ts, err)
assert.True(ts, deleted)
})
// Verify that masked refs may be removed with reftype.Normal and re-added.
t.Run("MaskRemoveAdd", func(ts *testing.T) {
ts.Parallel()
ioctx := radoswrapper.NewFakeIOContext(radoswrapper.NewFakeRados())
created, err := Add(ioctx, rtName, map[string]struct{}{
"ref1": {},
"ref2": {},
"ref3": {},
})
assert.NoError(ts, err)
assert.True(ts, created)
deleted, err := Remove(ioctx, rtName, map[string]reftype.RefType{
"ref1": reftype.Mask,
"ref2": reftype.Mask,
})
assert.NoError(ts, err)
assert.False(ts, deleted)
deleted, err = Remove(ioctx, rtName, map[string]reftype.RefType{
"ref1": reftype.Normal,
"ref2": reftype.Normal,
})
assert.NoError(ts, err)
assert.False(ts, deleted)
created, err = Add(ioctx, rtName, map[string]struct{}{
"ref1": {},
"ref2": {},
})
assert.NoError(ts, err)
assert.False(ts, created)
deleted, err = Remove(ioctx, rtName, map[string]reftype.RefType{
"ref3": reftype.Normal,
})
assert.NoError(ts, err)
assert.False(ts, deleted)
deleted, err = Remove(ioctx, rtName, map[string]reftype.RefType{
"ref1": reftype.Normal,
"ref2": reftype.Normal,
})
assert.NoError(ts, err)
assert.True(ts, deleted)
})
}

View File

@ -0,0 +1,63 @@
/*
Copyright 2022 The Ceph-CSI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package reftype
import (
"fmt"
"github.com/ceph/ceph-csi/internal/util/reftracker/errors"
)
// RefType describes type of the reftracker reference.
type RefType int8
const (
refTypeSize = 1
// Unknown reftype used to signal error state.
Unknown RefType = 0
// Normal type tags the reference to have normal effect on the reference
// count. Adding Normal reference increments the reference count. Removing
// Normal reference decrements the reference count.
//
// It may be converted to a Mask if it is removed with Mask reftype.
Normal RefType = 1
// Mask type tags the reference to be masked, making it not contribute to the
// overall reference count. The reference will be ignored by all future Add()
// calls until it is removed with Normal reftype.
Mask RefType = 2
)
func ToBytes(t RefType) []byte {
return []byte{byte(t)}
}
func FromBytes(bs []byte) (RefType, error) {
if len(bs) != refTypeSize {
return Unknown, errors.UnexpectedReadSize(refTypeSize, len(bs))
}
num := RefType(bs[0])
switch num { // nolint:exhaustive // reftype.Unknown is handled in default case.
case Normal, Mask:
return num, nil
default:
return Unknown, fmt.Errorf("unknown reftype %d", num)
}
}

View File

@ -0,0 +1,63 @@
/*
Copyright 2022 The Ceph-CSI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package reftype
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestRefTypeBytes(t *testing.T) {
t.Parallel()
var (
refTypeNormalBytes = []byte{1}
refTypeMaskBytes = []byte{2}
expectedBytes = [][]byte{refTypeNormalBytes, refTypeMaskBytes}
refTypes = []RefType{Normal, Mask}
refTypeInvalidBytes = []byte{0xFF}
refTypeWrongSizeBytes = []byte{0, 0, 0, 0, 1}
)
t.Run("ToBytes", func(ts *testing.T) {
ts.Parallel()
for i := range expectedBytes {
bs := ToBytes(refTypes[i])
assert.Equal(ts, expectedBytes[i], bs)
}
})
t.Run("FromBytes", func(ts *testing.T) {
ts.Parallel()
for i := range refTypes {
refType, err := FromBytes(expectedBytes[i])
assert.NoError(ts, err)
assert.Equal(ts, refTypes[i], refType)
}
_, err := FromBytes(refTypeInvalidBytes)
assert.Error(ts, err)
_, err = FromBytes(refTypeWrongSizeBytes)
assert.Error(ts, err)
})
}

View File

@ -0,0 +1,47 @@
/*
Copyright 2022 The Ceph-CSI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package v1
import (
"encoding/binary"
"github.com/ceph/ceph-csi/internal/util/reftracker/errors"
)
// Represents the number of references a reftracker object holds.
type refCount uint32
const (
Version = 1
refCountSize = 4
)
func (rc refCount) toBytes() []byte {
bs := make([]byte, refCountSize)
binary.BigEndian.PutUint32(bs, uint32(rc))
return bs
}
func refCountFromBytes(bs []byte) (refCount, error) {
if len(bs) != refCountSize {
return 0, errors.UnexpectedReadSize(refCountSize, len(bs))
}
return refCount(binary.BigEndian.Uint32(bs)), nil
}

View File

@ -0,0 +1,51 @@
/*
Copyright 2022 The Ceph-CSI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package v1
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestV1RefCountBytes(t *testing.T) {
t.Parallel()
var (
refCountBytes = []byte{0x0, 0x0, 0x0, 0x7B}
refCountValue = refCount(123)
wrongSizeRefCountBytes = []byte{0, 0, 1}
)
t.Run("ToBytes", func(ts *testing.T) {
ts.Parallel()
bs := refCountValue.toBytes()
assert.Equal(ts, refCountBytes, bs)
})
t.Run("FromBytes", func(ts *testing.T) {
ts.Parallel()
rc, err := refCountFromBytes(refCountBytes)
assert.NoError(ts, err)
assert.Equal(ts, refCountValue, rc)
_, err = refCountFromBytes(wrongSizeRefCountBytes)
assert.Error(ts, err)
})
}

View File

@ -0,0 +1,314 @@
/*
Copyright 2022 The Ceph-CSI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package v1
import (
goerrors "errors"
"fmt"
"github.com/ceph/ceph-csi/internal/util/reftracker/errors"
"github.com/ceph/ceph-csi/internal/util/reftracker/radoswrapper"
"github.com/ceph/ceph-csi/internal/util/reftracker/reftype"
"github.com/ceph/ceph-csi/internal/util/reftracker/version"
"github.com/ceph/go-ceph/rados"
)
/*
Version 1 layout:
-----------------
If not specified otherwise, all values are stored in big-endian order.
byte idx type name
-------- ------ ------
0 .. 3 uint32 refcount
`refcount`: Number of references held by the reftracker object. The actual
reference keys are stored in an OMap of the RADOS object.
OMap entry layout:
Key:
reftracker key.
Value:
byte idx type name
-------- ------ ------
0 .. 3 uint32 type
`type`: reference type defined in reftracker/reftype.
*/
type readResult struct {
// Total number of references held by the reftracker object.
total refCount
// Refs whose keys matched the request.
foundRefs map[string]reftype.RefType
}
// Atomically initializes a new reftracker object.
func Init(
ioctx radoswrapper.IOContextW,
rtName string,
refs map[string]struct{},
) error {
// Prepare refcount and OMap key-value pairs.
refsToAddBytes := make(map[string][]byte, len(refs))
for ref := range refs {
refsToAddBytes[ref] = reftype.ToBytes(reftype.Normal)
}
// Perform the write.
w := ioctx.CreateWriteOp()
defer w.Release()
w.Create(rados.CreateExclusive)
w.SetXattr(version.XattrName, version.ToBytes(Version))
w.SetOmap(refsToAddBytes)
w.WriteFull(refCount(len(refsToAddBytes)).toBytes())
return errors.FailedObjectWrite(w.Operate(rtName))
}
// Atomically adds refs to an existing reftracker object.
func Add(
ioctx radoswrapper.IOContextW,
rtName string,
gen uint64,
refs map[string]struct{},
) error {
// Read the reftracker object to figure out which refs to add.
readRes, err := readObjectByKeys(ioctx, rtName, gen, refsMapToKeysSlice(refs))
if err != nil {
return errors.FailedObjectRead(err)
}
// Build list of refs to add.
// Add only refs that are missing in the reftracker object.
refsToAdd := make(map[string][]byte)
for ref := range refs {
if _, found := readRes.foundRefs[ref]; !found {
refsToAdd[ref] = reftype.ToBytes(reftype.Normal)
}
}
if len(refsToAdd) == 0 {
// Nothing to do.
return nil
}
// Calculate new refcount.
rcToAdd := refCount(len(refsToAdd))
newRC := readRes.total + rcToAdd
if newRC < readRes.total {
return goerrors.New("addition would overflow uint32 refcount")
}
// Write the data.
w := ioctx.CreateWriteOp()
defer w.Release()
w.AssertVersion(gen)
w.WriteFull(newRC.toBytes())
w.SetOmap(refsToAdd)
return errors.FailedObjectWrite(w.Operate(rtName))
}
// Atomically removes refs from reftracker object. If the object wouldn't hold
// any references after the removal, the whole object is deleted instead.
func Remove(
ioctx radoswrapper.IOContextW,
rtName string,
gen uint64,
refs map[string]reftype.RefType,
) (bool, error) {
// Read the reftracker object to figure out which refs to remove.
readRes, err := readObjectByKeys(ioctx, rtName, gen, typedRefsMapToKeysSlice(refs))
if err != nil {
return false, errors.FailedObjectRead(err)
}
// Build lists of refs to remove, replace, and add.
// There are three cases that need to be handled:
// (1) removing reftype.Normal refs,
// (2) converting refs that were reftype.Normal into reftype.Mask,
// (3) adding a new reftype.Mask key.
var (
refsToRemove []string
refsToSet = make(map[string][]byte)
rcToSubtract refCount
)
for ref, refType := range refs {
if matchedRefType, found := readRes.foundRefs[ref]; found {
if refType == reftype.Normal {
// Case (1): regular removal of Normal ref.
refsToRemove = append(refsToRemove, ref)
if matchedRefType == reftype.Normal {
// If matchedRef was reftype.Mask, it would have already been
// subtracted from the refcount.
rcToSubtract++
}
} else if refType == reftype.Mask && matchedRefType == reftype.Normal {
// Case (2): convert Normal ref to Mask.
// Since this ref is now reftype.Mask, rcToSubtract needs to be adjusted
// too -- so that this ref is not counted in.
refsToSet[ref] = reftype.ToBytes(reftype.Mask)
rcToSubtract++
}
} else {
if refType == reftype.Mask {
// Case (3): add a new Mask ref.
// reftype.Mask doesn't contribute refcount so no change to rcToSubtract.
refsToSet[ref] = reftype.ToBytes(reftype.Mask)
} // else: No such ref was found, so there's nothing to remove.
}
}
if len(refsToRemove) == 0 && len(refsToSet) == 0 {
// Nothing to do.
return false, nil
}
// Calculate new refcount.
if rcToSubtract > readRes.total {
// BUG: this should never happen!
return false, fmt.Errorf("refcount underflow, reftracker object corrupted")
}
newRC := readRes.total - rcToSubtract
// If newRC is zero, it means all refs that the reftracker object held will be
// now gone, and the object must be deleted.
deleted := newRC == 0
// Write the data.
w := ioctx.CreateWriteOp()
defer w.Release()
w.AssertVersion(gen)
if deleted {
w.Remove()
} else {
w.WriteFull(newRC.toBytes())
w.RmOmapKeys(refsToRemove)
w.SetOmap(refsToSet)
}
if err := w.Operate(rtName); err != nil {
return false, errors.FailedObjectWrite(err)
}
return deleted, nil
}
// Tries to find `keys` in reftracker object and returns the result. Failing to
// find any particular key does not result in an error.
func readObjectByKeys(
ioctx radoswrapper.IOContextW,
rtName string,
gen uint64,
keys []string,
) (*readResult, error) {
// Read data from object.
rcBytes := make([]byte, refCountSize)
r := ioctx.CreateReadOp()
defer r.Release()
r.AssertVersion(gen)
r.Read(0, rcBytes)
s := r.GetOmapValuesByKeys(keys)
if err := r.Operate(rtName); err != nil {
return nil, errors.TryRADOSAborted(err)
}
// Convert it from byte slices to type-safe values.
var (
rc refCount
refs = make(map[string]reftype.RefType)
err error
)
rc, err = refCountFromBytes(rcBytes)
if err != nil {
return nil, fmt.Errorf("failed to parse refcount: %w", err)
}
for {
kvPair, err := s.Next()
if err != nil {
return nil, fmt.Errorf("failed to iterate over OMap: %w", err)
}
if kvPair == nil {
break
}
refType, err := reftype.FromBytes(kvPair.Value)
if err != nil {
return nil, fmt.Errorf("failed to parse reftype: %w", err)
}
refs[kvPair.Key] = refType
}
return &readResult{
total: rc,
foundRefs: refs,
}, nil
}
func refsMapToKeysSlice(m map[string]struct{}) []string {
s := make([]string, 0, len(m))
for k := range m {
s = append(s, k)
}
return s
}
func typedRefsMapToKeysSlice(m map[string]reftype.RefType) []string {
s := make([]string, 0, len(m))
for k := range m {
s = append(s, k)
}
return s
}

View File

@ -0,0 +1,423 @@
/*
Copyright 2022 The Ceph-CSI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package v1
import (
goerrors "errors"
"testing"
"github.com/ceph/ceph-csi/internal/util/reftracker/errors"
"github.com/ceph/ceph-csi/internal/util/reftracker/radoswrapper"
"github.com/ceph/ceph-csi/internal/util/reftracker/reftype"
"github.com/stretchr/testify/assert"
)
func TestV1Read(t *testing.T) {
t.Parallel()
const rtName = "hello-rt"
var (
gen = uint64(0)
validObj = radoswrapper.NewFakeIOContext(&radoswrapper.FakeRados{
Objs: map[string]*radoswrapper.FakeObj{
rtName: {
Oid: rtName,
Data: []byte{0, 0, 0, 0},
Omap: make(map[string][]byte),
},
},
})
invalidObjs = []*radoswrapper.FakeIOContext{
// Missing object.
radoswrapper.NewFakeIOContext(radoswrapper.NewFakeRados()),
// Bad generation number.
radoswrapper.NewFakeIOContext(&radoswrapper.FakeRados{
Objs: map[string]*radoswrapper.FakeObj{
rtName: {
Ver: 123,
Oid: rtName,
Data: []byte{0, 0, 0, 0},
},
},
}),
// Refcount overflow.
radoswrapper.NewFakeIOContext(&radoswrapper.FakeRados{
Objs: map[string]*radoswrapper.FakeObj{
rtName: {
Oid: rtName,
Data: []byte{0xFF, 0xFF, 0xFF, 0xFF},
},
},
}),
}
refsToAdd = map[string]struct{}{"ref1": {}}
)
err := Add(validObj, rtName, gen, refsToAdd)
assert.NoError(t, err)
for i := range invalidObjs {
err = Add(invalidObjs[i], rtName, gen, refsToAdd)
assert.Error(t, err)
}
// Check for correct error type for wrong gen num.
err = Add(invalidObjs[1], rtName, gen, refsToAdd)
assert.Error(t, err)
assert.True(t, goerrors.Is(err, errors.ErrObjectOutOfDate))
}
func TestV1Init(t *testing.T) {
t.Parallel()
const rtName = "hello-rt"
var (
emptyRados = radoswrapper.NewFakeIOContext(&radoswrapper.FakeRados{
Objs: map[string]*radoswrapper.FakeObj{},
})
alreadyExists = radoswrapper.NewFakeIOContext(&radoswrapper.FakeRados{
Objs: map[string]*radoswrapper.FakeObj{
rtName: {},
},
})
refsToInit = map[string]struct{}{"ref1": {}}
)
err := Init(emptyRados, rtName, refsToInit)
assert.NoError(t, err)
err = Init(alreadyExists, rtName, refsToInit)
assert.Error(t, err)
}
func TestV1Add(t *testing.T) {
t.Parallel()
const rtName = "hello-rt"
var (
shouldSucceed = []struct {
before *radoswrapper.FakeObj
refsToAdd map[string]struct{}
after *radoswrapper.FakeObj
}{
// Add a new ref.
{
before: &radoswrapper.FakeObj{
Oid: rtName,
Ver: 0,
Omap: map[string][]byte{
"ref1": reftype.ToBytes(reftype.Normal),
},
Data: refCount(1).toBytes(),
},
refsToAdd: map[string]struct{}{
"ref2": {},
},
after: &radoswrapper.FakeObj{
Oid: rtName,
Ver: 1,
Omap: map[string][]byte{
"ref1": reftype.ToBytes(reftype.Normal),
"ref2": reftype.ToBytes(reftype.Normal),
},
Data: refCount(2).toBytes(),
},
},
// Try to add a ref that's already tracked.
{
before: &radoswrapper.FakeObj{
Oid: rtName,
Ver: 0,
Omap: map[string][]byte{
"ref1": reftype.ToBytes(reftype.Normal),
},
Data: refCount(1).toBytes(),
},
refsToAdd: map[string]struct{}{
"ref1": {},
},
after: &radoswrapper.FakeObj{
Oid: rtName,
Ver: 0,
Omap: map[string][]byte{
"ref1": reftype.ToBytes(reftype.Normal),
},
Data: refCount(1).toBytes(),
},
},
// Try to add a ref that's masked.
{
before: &radoswrapper.FakeObj{
Oid: rtName,
Ver: 0,
Omap: map[string][]byte{
"ref1": reftype.ToBytes(reftype.Normal),
"ref2": reftype.ToBytes(reftype.Mask),
},
Data: refCount(1).toBytes(),
},
refsToAdd: map[string]struct{}{
"ref1": {},
},
after: &radoswrapper.FakeObj{
Oid: rtName,
Ver: 0,
Omap: map[string][]byte{
"ref1": reftype.ToBytes(reftype.Normal),
"ref2": reftype.ToBytes(reftype.Mask),
},
Data: refCount(1).toBytes(),
},
},
}
shouldFail = []*radoswrapper.FakeIOContext{
// Missing object.
radoswrapper.NewFakeIOContext(radoswrapper.NewFakeRados()),
// Bad generation number.
radoswrapper.NewFakeIOContext(&radoswrapper.FakeRados{
Objs: map[string]*radoswrapper.FakeObj{
rtName: {
Ver: 123,
Oid: rtName,
Data: []byte{0, 0, 0, 0},
},
},
}),
// Refcount overflow.
radoswrapper.NewFakeIOContext(&radoswrapper.FakeRados{
Objs: map[string]*radoswrapper.FakeObj{
rtName: {
Oid: rtName,
Data: []byte{0xFF, 0xFF, 0xFF, 0xFF},
},
},
}),
}
)
for i := range shouldSucceed {
ioctx := radoswrapper.NewFakeIOContext(radoswrapper.NewFakeRados())
ioctx.Rados.Objs[rtName] = shouldSucceed[i].before
err := Add(ioctx, rtName, 0, shouldSucceed[i].refsToAdd)
assert.NoError(t, err)
assert.Equal(t, shouldSucceed[i].after, ioctx.Rados.Objs[rtName])
}
for i := range shouldFail {
err := Add(shouldFail[i], rtName, 0, map[string]struct{}{"ref1": {}})
assert.Error(t, err)
}
// Check for correct error type for wrong gen num.
err := Add(shouldFail[1], rtName, 0, map[string]struct{}{"ref1": {}})
assert.Error(t, err)
assert.True(t, goerrors.Is(err, errors.ErrObjectOutOfDate))
}
func TestV1Remove(t *testing.T) {
t.Parallel()
const rtName = "hello-rt"
var (
shouldSucceed = []struct {
before *radoswrapper.FakeObj
refsToRemove map[string]reftype.RefType
after *radoswrapper.FakeObj
deleted bool
}{
// Remove without deleting the reftracker object.
{
before: &radoswrapper.FakeObj{
Oid: rtName,
Ver: 0,
Omap: map[string][]byte{
"ref1": reftype.ToBytes(reftype.Normal),
"ref2": reftype.ToBytes(reftype.Normal),
},
Data: refCount(2).toBytes(),
},
refsToRemove: map[string]reftype.RefType{
"ref1": reftype.Normal,
},
after: &radoswrapper.FakeObj{
Oid: rtName,
Ver: 1,
Omap: map[string][]byte{
"ref2": reftype.ToBytes(reftype.Normal),
},
Data: refCount(1).toBytes(),
},
deleted: false,
},
// Remove and delete the reftracker object.
{
before: &radoswrapper.FakeObj{
Oid: rtName,
Ver: 0,
Omap: map[string][]byte{
"ref1": reftype.ToBytes(reftype.Normal),
},
Data: refCount(1).toBytes(),
},
refsToRemove: map[string]reftype.RefType{
"ref1": reftype.Normal,
},
after: nil,
deleted: true,
},
// Remove and delete the reftracker object.
{
before: &radoswrapper.FakeObj{
Oid: rtName,
Ver: 0,
Omap: map[string][]byte{
"ref1": reftype.ToBytes(reftype.Normal),
},
Data: refCount(1).toBytes(),
},
refsToRemove: map[string]reftype.RefType{
"ref1": reftype.Normal,
},
after: nil,
deleted: true,
},
// Mask a ref without deleting reftracker object.
{
before: &radoswrapper.FakeObj{
Oid: rtName,
Ver: 0,
Omap: map[string][]byte{
"ref1": reftype.ToBytes(reftype.Normal),
"ref2": reftype.ToBytes(reftype.Normal),
},
Data: refCount(2).toBytes(),
},
refsToRemove: map[string]reftype.RefType{
"ref2": reftype.Mask,
},
after: &radoswrapper.FakeObj{
Oid: rtName,
Ver: 1,
Omap: map[string][]byte{
"ref1": reftype.ToBytes(reftype.Normal),
"ref2": reftype.ToBytes(reftype.Mask),
},
Data: refCount(1).toBytes(),
},
deleted: false,
},
// Mask a ref and delete reftracker object.
{
before: &radoswrapper.FakeObj{
Oid: rtName,
Ver: 0,
Omap: map[string][]byte{
"ref1": reftype.ToBytes(reftype.Normal),
},
Data: refCount(1).toBytes(),
},
refsToRemove: map[string]reftype.RefType{
"ref1": reftype.Mask,
},
after: nil,
deleted: true,
},
// Add a masking ref.
{
before: &radoswrapper.FakeObj{
Oid: rtName,
Ver: 0,
Omap: map[string][]byte{
"ref1": reftype.ToBytes(reftype.Normal),
},
Data: refCount(1).toBytes(),
},
refsToRemove: map[string]reftype.RefType{
"ref2": reftype.Mask,
},
after: &radoswrapper.FakeObj{
Oid: rtName,
Ver: 1,
Omap: map[string][]byte{
"ref1": reftype.ToBytes(reftype.Normal),
"ref2": reftype.ToBytes(reftype.Mask),
},
Data: refCount(1).toBytes(),
},
deleted: false,
},
// Try to remove non-existent ref.
{
before: &radoswrapper.FakeObj{
Oid: rtName,
Ver: 0,
Omap: map[string][]byte{
"ref1": reftype.ToBytes(reftype.Normal),
},
Data: refCount(1).toBytes(),
},
refsToRemove: map[string]reftype.RefType{
"ref2": reftype.Normal,
},
after: &radoswrapper.FakeObj{
Oid: rtName,
Ver: 0,
Omap: map[string][]byte{
"ref1": reftype.ToBytes(reftype.Normal),
},
Data: refCount(1).toBytes(),
},
deleted: false,
},
}
// Bad generation number.
badGen = radoswrapper.NewFakeIOContext(&radoswrapper.FakeRados{
Objs: map[string]*radoswrapper.FakeObj{
rtName: {
Ver: 123,
},
},
})
)
for i := range shouldSucceed {
ioctx := radoswrapper.NewFakeIOContext(radoswrapper.NewFakeRados())
ioctx.Rados.Objs[rtName] = shouldSucceed[i].before
deleted, err := Remove(ioctx, rtName, 0, shouldSucceed[i].refsToRemove)
assert.NoError(t, err)
assert.Equal(t, shouldSucceed[i].deleted, deleted)
assert.Equal(t, shouldSucceed[i].after, ioctx.Rados.Objs[rtName])
}
_, err := Remove(badGen, rtName, 0, map[string]reftype.RefType{"ref": reftype.Normal})
assert.Error(t, err)
assert.True(t, goerrors.Is(err, errors.ErrObjectOutOfDate))
}

View File

@ -0,0 +1,64 @@
/*
Copyright 2022 The Ceph-CSI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package version
import (
"encoding/binary"
"github.com/ceph/ceph-csi/internal/util/reftracker/errors"
"github.com/ceph/ceph-csi/internal/util/reftracker/radoswrapper"
)
// reftracker objects are versioned, should the object layout need to change.
// Version is stored in its underlying RADOS object xattr as uint32.
const (
// Name of the xattr entry in the RADOS object.
XattrName = "csi.ceph.com/rt-version"
// SizeBytes is the size of version in bytes.
SizeBytes = 4
)
func ToBytes(v uint32) []byte {
bs := make([]byte, SizeBytes)
binary.BigEndian.PutUint32(bs, v)
return bs
}
func FromBytes(bs []byte) (uint32, error) {
if len(bs) != SizeBytes {
return 0, errors.UnexpectedReadSize(SizeBytes, len(bs))
}
return binary.BigEndian.Uint32(bs), nil
}
func Read(ioctx radoswrapper.IOContextW, rtName string) (uint32, error) {
verBytes := make([]byte, SizeBytes)
readSize, err := ioctx.GetXattr(rtName, XattrName, verBytes)
if err != nil {
return 0, err
}
if readSize != SizeBytes {
return 0, errors.UnexpectedReadSize(SizeBytes, readSize)
}
return FromBytes(verBytes)
}

View File

@ -0,0 +1,111 @@
/*
Copyright 2022 The Ceph-CSI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package version
import (
"testing"
"github.com/ceph/ceph-csi/internal/util/reftracker/radoswrapper"
"github.com/stretchr/testify/assert"
)
var (
v1Bytes = []byte{0, 0, 0, 1}
v1Value = uint32(1)
wrongSizeVersionBytes = []byte{0, 0, 1}
)
func TestVersionBytes(t *testing.T) {
t.Parallel()
t.Run("ToBytes", func(ts *testing.T) {
ts.Parallel()
bs := ToBytes(v1Value)
assert.Equal(ts, v1Bytes, bs)
})
t.Run("FromBytes", func(ts *testing.T) {
ts.Parallel()
ver, err := FromBytes(v1Bytes)
assert.NoError(ts, err)
assert.Equal(ts, v1Value, ver)
_, err = FromBytes(wrongSizeVersionBytes)
assert.Error(ts, err)
})
}
func TestVersionRead(t *testing.T) {
t.Parallel()
const rtName = "hello-rt"
var (
validObj = radoswrapper.NewFakeIOContext(&radoswrapper.FakeRados{
Objs: map[string]*radoswrapper.FakeObj{
rtName: {
Oid: rtName,
Xattrs: map[string][]byte{
XattrName: v1Bytes,
},
},
},
})
invalidObjs = []*radoswrapper.FakeIOContext{
// Missing object.
radoswrapper.NewFakeIOContext(&radoswrapper.FakeRados{
Objs: map[string]*radoswrapper.FakeObj{},
}),
// Missing xattr.
radoswrapper.NewFakeIOContext(&radoswrapper.FakeRados{
Objs: map[string]*radoswrapper.FakeObj{
rtName: {
Oid: rtName,
Xattrs: map[string][]byte{
"some-other-xattr": v1Bytes,
},
},
},
}),
// Wrongly sized version value.
radoswrapper.NewFakeIOContext(&radoswrapper.FakeRados{
Objs: map[string]*radoswrapper.FakeObj{
rtName: {
Oid: rtName,
Xattrs: map[string][]byte{
XattrName: wrongSizeVersionBytes,
},
},
},
}),
}
)
ver, err := Read(validObj, rtName)
assert.NoError(t, err)
assert.Equal(t, v1Value, ver)
for i := range invalidObjs {
_, err = Read(invalidObjs[i], rtName)
assert.Error(t, err)
}
}