diff --git a/internal/util/reftracker/errors/errors.go b/internal/util/reftracker/errors/errors.go
new file mode 100644
index 000000000..33f1a2740
--- /dev/null
+++ b/internal/util/reftracker/errors/errors.go
@@ -0,0 +1,92 @@
+/*
+Copyright 2022 The Ceph-CSI Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package errors
+
+import (
+	goerrors "errors"
+	"fmt"
+
+	"github.com/ceph/go-ceph/rados"
+	"golang.org/x/sys/unix"
+)
+
+// ErrObjectOutOfDate is an error returned by RADOS read/write ops whose
+// rados_*_op_assert_version failed.
+var ErrObjectOutOfDate = goerrors.New("object is out of date since the last time it was read, try again later")
+
+// UnexpectedReadSize formats an error message for a failure due to bad read
+// size.
+func UnexpectedReadSize(expectedBytes, actualBytes int) error {
+	return fmt.Errorf("unexpected size read: expected %d bytes, got %d",
+		expectedBytes, actualBytes)
+}
+
+// UnknownObjectVersion formats an error message for a failure due to unknown
+// reftracker object version.
+func UnknownObjectVersion(unknownVersion uint32) error {
+	return fmt.Errorf("unknown reftracker version %d", unknownVersion)
+}
+
+// FailedObjectRead formats an error message for a failed RADOS read op.
+// It returns nil when cause is nil.
+func FailedObjectRead(cause error) error {
+	if cause != nil {
+		return fmt.Errorf("failed to read object: %w", TryRADOSAborted(cause))
+	}
+
+	return nil
+}
+
+// FailedObjectWrite formats an error message for a failed RADOS write op.
+// It returns nil when cause is nil.
+func FailedObjectWrite(cause error) error {
+	if cause != nil {
+		return fmt.Errorf("failed to write object: %w", TryRADOSAborted(cause))
+	}
+
+	return nil
+}
+
+// TryRADOSAborted translates a failed rados_*_op_assert_version check into
+// ErrObjectOutOfDate: if opErr is a rados.OperationError whose OpError reports
+// errno -EOVERFLOW or -ERANGE, ErrObjectOutOfDate is returned. Any other
+// error, as well as nil, is returned unchanged.
+func TryRADOSAborted(opErr error) error {
+	if opErr == nil {
+		return nil
+	}
+
+	var radosOpErr rados.OperationError
+	if !goerrors.As(opErr, &radosOpErr) {
+		return opErr
+	}
+
+	// nolint:errorlint // Can't use errors.As() because rados.radosError is private.
+	errnoErr, ok := radosOpErr.OpError.(interface{ ErrorCode() int })
+	if !ok {
+		return opErr
+	}
+
+	errno := errnoErr.ErrorCode()
+	if errno == -int(unix.EOVERFLOW) || errno == -int(unix.ERANGE) {
+		return ErrObjectOutOfDate
+	}
+
+	// Not a version assertion failure. Return the original error rather than
+	// nil, so that FailedObjectRead/FailedObjectWrite never wrap a nil cause.
+	return opErr
+}
diff --git a/internal/util/reftracker/radoswrapper/fakerados.go b/internal/util/reftracker/radoswrapper/fakerados.go
new file mode 100644
index 000000000..c7ff59862
--- /dev/null
+++ b/internal/util/reftracker/radoswrapper/fakerados.go
@@ -0,0 +1,551 @@
+/*
+Copyright 2022 The Ceph-CSI Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/ + +package radoswrapper + +import ( + "fmt" + + "github.com/ceph/go-ceph/rados" + "golang.org/x/sys/unix" +) + +type ( + FakeObj struct { + Oid string + Ver uint64 + Xattrs map[string][]byte + Omap map[string][]byte + Data []byte + } + + FakeRados struct { + Objs map[string]*FakeObj + } + + FakeIOContext struct { + LastObjVersion uint64 + Rados *FakeRados + } + + FakeWriteOp struct { + IoCtx *FakeIOContext + + steps map[fakeWriteOpStepExecutorIdx]fakeWriteOpStepExecutor + oid string + } + + FakeReadOp struct { + IoCtx *FakeIOContext + + steps map[fakeReadOpStepExecutorIdx]fakeReadOpStepExecutor + oid string + } + + fakeWriteOpStepExecutorIdx int + fakeReadOpStepExecutorIdx int + + fakeWriteOpStepExecutor interface { + operate(w *FakeWriteOp) error + } + + fakeReadOpStepExecutor interface { + operate(r *FakeReadOp) error + } + + fakeRadosError int +) + +const ( + fakeWriteOpAssertVersionExecutorIdx fakeWriteOpStepExecutorIdx = iota + fakeWriteOpRemoveExecutorIdx + fakeWriteOpCreateExecutorIdx + fakeWriteOpSetXattrExecutorIdx + fakeWriteOpWriteFullExecutorIdx + fakeWriteOpRmOmapKeysExecutorIdx + fakeWriteOpSetOmapExecutorIdx + + fakeReadOpAssertVersionExecutorIdx fakeReadOpStepExecutorIdx = iota + fakeReadOpReadExecutorIdx + fakeReadOpGetOmapValuesByKeysExecutorIdx +) + +var ( + _ IOContextW = &FakeIOContext{} + + // fakeWriteOpStepExecutorOrder defines fixed order in which the write ops are performed. + fakeWriteOpStepExecutorOrder = []fakeWriteOpStepExecutorIdx{ + fakeWriteOpAssertVersionExecutorIdx, + fakeWriteOpRemoveExecutorIdx, + fakeWriteOpCreateExecutorIdx, + fakeWriteOpSetXattrExecutorIdx, + fakeWriteOpWriteFullExecutorIdx, + fakeWriteOpRmOmapKeysExecutorIdx, + fakeWriteOpSetOmapExecutorIdx, + } + + // fakeReadOpStepExecutorOrder defines fixed order in which the read ops are performed. 
+ fakeReadOpStepExecutorOrder = []fakeReadOpStepExecutorIdx{ + fakeReadOpAssertVersionExecutorIdx, + fakeReadOpReadExecutorIdx, + fakeReadOpGetOmapValuesByKeysExecutorIdx, + } +) + +func NewFakeRados() *FakeRados { + return &FakeRados{ + Objs: make(map[string]*FakeObj), + } +} + +func NewFakeIOContext(fakeRados *FakeRados) *FakeIOContext { + return &FakeIOContext{ + Rados: fakeRados, + } +} + +func (e fakeRadosError) Error() string { + return fmt.Sprintf("FakeRados errno=%d", int(e)) +} + +func (e fakeRadosError) ErrorCode() int { + return int(e) +} + +func (o *FakeObj) String() string { + return fmt.Sprintf("%s{Ver=%d, Xattrs(%d)=%+v, OMap(%d)=%+v, Data(%d)=%+v}", + o.Oid, o.Ver, len(o.Xattrs), o.Xattrs, len(o.Omap), o.Omap, len(o.Data), o.Data) +} + +func (c *FakeIOContext) GetLastVersion() (uint64, error) { + return c.LastObjVersion, nil +} + +func (c *FakeIOContext) getObj(oid string) (*FakeObj, error) { + obj, ok := c.Rados.Objs[oid] + if !ok { + return nil, rados.ErrNotFound + } + + return obj, nil +} + +func (c *FakeIOContext) GetXattr(oid, key string, data []byte) (int, error) { + obj, ok := c.Rados.Objs[oid] + if !ok { + return 0, rados.ErrNotFound + } + + xattr, ok := obj.Xattrs[key] + if !ok { + return 0, fakeRadosError(-int(unix.ENODATA)) + } + copy(data, xattr) + + return len(xattr), nil +} + +func (c *FakeIOContext) CreateWriteOp() WriteOpW { + return &FakeWriteOp{ + IoCtx: c, + steps: make(map[fakeWriteOpStepExecutorIdx]fakeWriteOpStepExecutor), + } +} + +func (w *FakeWriteOp) Operate(oid string) error { + if len(w.steps) == 0 { + return nil + } + + w.oid = oid + + for _, writeOpExecutorIdx := range fakeWriteOpStepExecutorOrder { + e, ok := w.steps[writeOpExecutorIdx] + if !ok { + continue + } + + if err := e.operate(w); err != nil { + return err + } + } + + if obj, err := w.IoCtx.getObj(oid); err == nil { + obj.Ver++ + w.IoCtx.LastObjVersion = obj.Ver + } + + return nil +} + +func (w *FakeWriteOp) Release() {} + +func (c *FakeIOContext) 
CreateReadOp() ReadOpW { + return &FakeReadOp{ + IoCtx: c, + steps: make(map[fakeReadOpStepExecutorIdx]fakeReadOpStepExecutor), + } +} + +func (r *FakeReadOp) Operate(oid string) error { + r.oid = oid + + for _, readOpExecutorIdx := range fakeReadOpStepExecutorOrder { + e, ok := r.steps[readOpExecutorIdx] + if !ok { + continue + } + + if err := e.operate(r); err != nil { + return err + } + } + + if obj, err := r.IoCtx.getObj(oid); err == nil { + r.IoCtx.LastObjVersion = obj.Ver + } + + return nil +} + +func (r *FakeReadOp) Release() {} + +// WriteOp Create + +type fakeWriteOpCreateExecutor struct { + exclusive rados.CreateOption +} + +func (e *fakeWriteOpCreateExecutor) operate(w *FakeWriteOp) error { + if e.exclusive == rados.CreateExclusive { + if _, exists := w.IoCtx.Rados.Objs[w.oid]; exists { + return rados.ErrObjectExists + } + } + + w.IoCtx.Rados.Objs[w.oid] = &FakeObj{ + Oid: w.oid, + Omap: make(map[string][]byte), + Xattrs: make(map[string][]byte), + } + + return nil +} + +func (w *FakeWriteOp) Create(exclusive rados.CreateOption) { + w.steps[fakeWriteOpCreateExecutorIdx] = &fakeWriteOpCreateExecutor{ + exclusive: exclusive, + } +} + +// WriteOp Remove + +type fakeWriteOpRemoveExecutor struct{} + +func (e *fakeWriteOpRemoveExecutor) operate(w *FakeWriteOp) error { + if _, err := w.IoCtx.getObj(w.oid); err != nil { + return err + } + + delete(w.IoCtx.Rados.Objs, w.oid) + + return nil +} + +func (w *FakeWriteOp) Remove() { + w.steps[fakeWriteOpRemoveExecutorIdx] = &fakeWriteOpRemoveExecutor{} +} + +// WriteOp SetXattr + +type fakeWriteOpSetXattrExecutor struct { + name string + value []byte +} + +func (e *fakeWriteOpSetXattrExecutor) operate(w *FakeWriteOp) error { + obj, err := w.IoCtx.getObj(w.oid) + if err != nil { + return err + } + + obj.Xattrs[e.name] = e.value + + return nil +} + +func (w *FakeWriteOp) SetXattr(name string, value []byte) { + valueCopy := append([]byte(nil), value...) 
+ + w.steps[fakeWriteOpSetXattrExecutorIdx] = &fakeWriteOpSetXattrExecutor{ + name: name, + value: valueCopy, + } +} + +// WriteOp WriteFull + +type fakeWriteOpWriteFullExecutor struct { + data []byte +} + +func (e *fakeWriteOpWriteFullExecutor) operate(w *FakeWriteOp) error { + obj, err := w.IoCtx.getObj(w.oid) + if err != nil { + return err + } + + obj.Data = e.data + + return nil +} + +func (w *FakeWriteOp) WriteFull(b []byte) { + bCopy := append([]byte(nil), b...) + + w.steps[fakeWriteOpWriteFullExecutorIdx] = &fakeWriteOpWriteFullExecutor{ + data: bCopy, + } +} + +// WriteOp SetOmap + +type fakeWriteOpSetOmapExecutor struct { + pairs map[string][]byte +} + +func (e *fakeWriteOpSetOmapExecutor) operate(w *FakeWriteOp) error { + obj, err := w.IoCtx.getObj(w.oid) + if err != nil { + return err + } + + for k, v := range e.pairs { + obj.Omap[k] = v + } + + return nil +} + +func (w *FakeWriteOp) SetOmap(pairs map[string][]byte) { + pairsCopy := make(map[string][]byte, len(pairs)) + for k, v := range pairs { + vCopy := append([]byte(nil), v...) + pairsCopy[k] = vCopy + } + + w.steps[fakeWriteOpSetOmapExecutorIdx] = &fakeWriteOpSetOmapExecutor{ + pairs: pairsCopy, + } +} + +// WriteOp RmOmapKeys + +type fakeWriteOpRmOmapKeysExecutor struct { + keys []string +} + +func (e *fakeWriteOpRmOmapKeysExecutor) operate(w *FakeWriteOp) error { + obj, err := w.IoCtx.getObj(w.oid) + if err != nil { + return err + } + + for _, k := range e.keys { + delete(obj.Omap, k) + } + + return nil +} + +func (w *FakeWriteOp) RmOmapKeys(keys []string) { + keysCopy := append([]string(nil), keys...) 
+ + w.steps[fakeWriteOpRmOmapKeysExecutorIdx] = &fakeWriteOpRmOmapKeysExecutor{ + keys: keysCopy, + } +} + +// WriteOp AssertVersion + +type fakeWriteOpAssertVersionExecutor struct { + version uint64 +} + +func (e *fakeWriteOpAssertVersionExecutor) operate(w *FakeWriteOp) error { + obj, err := w.IoCtx.getObj(w.oid) + if err != nil { + return err + } + + return validateObjVersion(obj.Ver, e.version) +} + +func (w *FakeWriteOp) AssertVersion(v uint64) { + w.steps[fakeWriteOpAssertVersionExecutorIdx] = &fakeWriteOpAssertVersionExecutor{ + version: v, + } +} + +// ReadOp Read + +type fakeReadOpReadExecutor struct { + offset int + buffer []byte + step *rados.ReadOpReadStep +} + +func (e *fakeReadOpReadExecutor) operate(r *FakeReadOp) error { + obj, err := r.IoCtx.getObj(r.oid) + if err != nil { + return err + } + + if e.offset > len(obj.Data) { + // RADOS just returns zero bytes read. + return nil + } + + end := e.offset + len(e.buffer) + if end > len(obj.Data) { + end = len(obj.Data) + } + + nbytes := end - e.offset + e.step.BytesRead = int64(nbytes) + copy(e.buffer, obj.Data[e.offset:]) + + return nil +} + +func (r *FakeReadOp) Read(offset uint64, buffer []byte) *rados.ReadOpReadStep { + s := &rados.ReadOpReadStep{} + r.steps[fakeReadOpReadExecutorIdx] = &fakeReadOpReadExecutor{ + offset: int(offset), + buffer: buffer, + step: s, + } + + return s +} + +// ReadOp GetOmapValuesByKeys + +type ( + fakeReadOpGetOmapValuesByKeysExecutor struct { + keys []string + step *FakeReadOpOmapGetValsByKeysStep + } + + FakeReadOpOmapGetValsByKeysStep struct { + pairs []rados.OmapKeyValue + idx int + canIterate bool + } +) + +func (e *fakeReadOpGetOmapValuesByKeysExecutor) operate(r *FakeReadOp) error { + obj, err := r.IoCtx.getObj(r.oid) + if err != nil { + return err + } + + var pairs []rados.OmapKeyValue + for _, key := range e.keys { + val, ok := obj.Omap[key] + if !ok { + continue + } + + pairs = append(pairs, rados.OmapKeyValue{ + Key: key, + Value: val, + }) + } + + e.step.pairs 
= pairs + e.step.canIterate = true + + return nil +} + +func (s *FakeReadOpOmapGetValsByKeysStep) Next() (*rados.OmapKeyValue, error) { + if !s.canIterate { + return nil, rados.ErrOperationIncomplete + } + + if s.idx >= len(s.pairs) { + return nil, nil + } + + omapKeyValue := &s.pairs[s.idx] + s.idx++ + + return omapKeyValue, nil +} + +func (r *FakeReadOp) GetOmapValuesByKeys(keys []string) ReadOpOmapGetValsByKeysStepW { + keysCopy := append([]string(nil), keys...) + + s := &FakeReadOpOmapGetValsByKeysStep{} + r.steps[fakeReadOpGetOmapValuesByKeysExecutorIdx] = &fakeReadOpGetOmapValuesByKeysExecutor{ + keys: keysCopy, + step: s, + } + + return s +} + +// ReadOp AssertVersion + +type fakeReadOpAssertVersionExecutor struct { + version uint64 +} + +func (e *fakeReadOpAssertVersionExecutor) operate(r *FakeReadOp) error { + obj, err := r.IoCtx.getObj(r.oid) + if err != nil { + return err + } + + return validateObjVersion(obj.Ver, e.version) +} + +func (r *FakeReadOp) AssertVersion(v uint64) { + r.steps[fakeReadOpAssertVersionExecutorIdx] = &fakeReadOpAssertVersionExecutor{ + version: v, + } +} + +func validateObjVersion(expected, actual uint64) error { + // See librados docs for returning error codes in rados_*_op_assert_version: + // https://docs.ceph.com/en/latest/rados/api/librados/?#c.rados_write_op_assert_version + // https://docs.ceph.com/en/latest/rados/api/librados/?#c.rados_read_op_assert_version + + if expected > actual { + return rados.OperationError{ + OpError: fakeRadosError(-int(unix.ERANGE)), + } + } + + if expected < actual { + return rados.OperationError{ + OpError: fakeRadosError(-int(unix.EOVERFLOW)), + } + } + + return nil +} diff --git a/internal/util/reftracker/radoswrapper/interface.go b/internal/util/reftracker/radoswrapper/interface.go new file mode 100644 index 000000000..23e21dc91 --- /dev/null +++ b/internal/util/reftracker/radoswrapper/interface.go @@ -0,0 +1,106 @@ +/* +Copyright 2022 The Ceph-CSI Authors. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package radoswrapper + +import ( + "github.com/ceph/go-ceph/rados" +) + +// These interfaces are just wrappers around some of go-ceph's rados pkg +// structures and functions. They have two implementations: the "real" one +// (that simply uses go-ceph), and a fake one, used in unit tests. + +// IOContextW is a wrapper around rados.IOContext. +type IOContextW interface { + // GetLastVersion will return the version number of the last object read or + // written to. + GetLastVersion() (uint64, error) + + // GetXattr reads the xattr `key` of object `oid` into `data`. It returns the + // length of the xattr value, or an error if not successful. + GetXattr(oid string, key string, data []byte) (int, error) + + // CreateWriteOp returns a newly constructed write operation. + CreateWriteOp() WriteOpW + + // CreateReadOp returns a newly constructed read operation. + CreateReadOp() ReadOpW +} + +// WriteOpW is a wrapper around rados.WriteOp interface. +type WriteOpW interface { + // Create a rados object. + Create(exclusive rados.CreateOption) + + // Remove object. + Remove() + + // SetXattr sets an xattr. + SetXattr(name string, value []byte) + + // WriteFull writes a given byte slice as the whole object, + // atomically replacing it. + WriteFull(b []byte) + + // SetOmap appends the map `pairs` to the omap `oid`. + SetOmap(pairs map[string][]byte) + + // RmOmapKeys removes the specified `keys` from the omap `oid`.
+ RmOmapKeys(keys []string) + + // AssertVersion ensures that the object exists and that its internal version + // number is equal to "ver" before writing. "ver" should be a version number + // previously obtained with IOContext.GetLastVersion(). + AssertVersion(ver uint64) + + // Operate will perform the operation(s). + Operate(oid string) error + + // Release the resources associated with this write operation. + Release() +} + +// ReadOpW is a wrapper around rados.ReadOp. +type ReadOpW interface { + // Read bytes from offset into buffer. + // len(buffer) is the maximum number of bytes read from the object. + // buffer[:ReadOpReadStep.BytesRead] then contains object data. + Read(offset uint64, buffer []byte) *rados.ReadOpReadStep + + // GetOmapValuesByKeys starts iterating over specific key/value pairs. + GetOmapValuesByKeys(keys []string) ReadOpOmapGetValsByKeysStepW + + // AssertVersion ensures that the object exists and that its internal version + // number is equal to "ver" before reading. "ver" should be a version number + // previously obtained with IOContext.GetLastVersion(). + AssertVersion(ver uint64) + + // Operate will perform the operation(s). + Operate(oid string) error + + // Release the resources associated with this read operation. + Release() +} + +// ReadOpOmapGetValsByKeysStepW is a wrapper around rados.ReadOpOmapGetValsByKeysStep. +type ReadOpOmapGetValsByKeysStepW interface { + // Next gets the next omap key/value pair referenced by + // ReadOpOmapGetValsByKeysStep's internal iterator. + // If there are no more elements to retrieve, (nil, nil) is returned. + // May be called only after Operate() finished. 
+ Next() (*rados.OmapKeyValue, error) +} diff --git a/internal/util/reftracker/radoswrapper/radoswrapper.go b/internal/util/reftracker/radoswrapper/radoswrapper.go new file mode 100644 index 000000000..133ccfa1c --- /dev/null +++ b/internal/util/reftracker/radoswrapper/radoswrapper.go @@ -0,0 +1,133 @@ +/* +Copyright 2022 The Ceph-CSI Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package radoswrapper + +import ( + "github.com/ceph/go-ceph/rados" +) + +type ( + IOContext struct { + *rados.IOContext + } + + WriteOp struct { + IoCtx *rados.IOContext + *rados.WriteOp + } + + ReadOp struct { + IoCtx *rados.IOContext + *rados.ReadOp + } + + ReadOpOmapGetValsByKeysStep struct { + *rados.ReadOpOmapGetValsByKeysStep + } +) + +var _ IOContextW = &IOContext{} + +func NewIOContext(ioctx *rados.IOContext) IOContextW { + return &IOContext{ + IOContext: ioctx, + } +} + +func (c *IOContext) GetLastVersion() (uint64, error) { + return c.IOContext.GetLastVersion() +} + +func (c *IOContext) GetXattr(oid, key string, data []byte) (int, error) { + return c.IOContext.GetXattr(oid, key, data) +} + +func (c *IOContext) CreateWriteOp() WriteOpW { + return &WriteOp{ + IoCtx: c.IOContext, + WriteOp: rados.CreateWriteOp(), + } +} + +func (c *IOContext) CreateReadOp() ReadOpW { + return &ReadOp{ + IoCtx: c.IOContext, + ReadOp: rados.CreateReadOp(), + } +} + +func (w *WriteOp) Create(exclusive rados.CreateOption) { + w.WriteOp.Create(exclusive) +} + +func (w *WriteOp) Remove() { + w.WriteOp.Remove() 
+} + +func (w *WriteOp) SetXattr(name string, value []byte) { + w.WriteOp.SetXattr(name, value) +} + +func (w *WriteOp) WriteFull(b []byte) { + w.WriteOp.WriteFull(b) +} + +func (w *WriteOp) SetOmap(pairs map[string][]byte) { + w.WriteOp.SetOmap(pairs) +} + +func (w *WriteOp) RmOmapKeys(keys []string) { + w.WriteOp.RmOmapKeys(keys) +} + +func (w *WriteOp) AssertVersion(v uint64) { + w.WriteOp.AssertVersion(v) +} + +func (w *WriteOp) Operate(oid string) error { + return w.WriteOp.Operate(w.IoCtx, oid, rados.OperationNoFlag) +} + +func (w *WriteOp) Release() { + w.WriteOp.Release() +} + +func (r *ReadOp) Read(offset uint64, buffer []byte) *rados.ReadOpReadStep { + return r.ReadOp.Read(offset, buffer) +} + +func (r *ReadOp) GetOmapValuesByKeys(keys []string) ReadOpOmapGetValsByKeysStepW { + return &ReadOpOmapGetValsByKeysStep{ + ReadOpOmapGetValsByKeysStep: r.ReadOp.GetOmapValuesByKeys(keys), + } +} + +func (r *ReadOp) AssertVersion(v uint64) { + r.ReadOp.AssertVersion(v) +} + +func (r *ReadOp) Operate(oid string) error { + return r.ReadOp.Operate(r.IoCtx, oid, rados.OperationNoFlag) +} + +func (r *ReadOp) Release() { + r.ReadOp.Release() +} + +func (s *ReadOpOmapGetValsByKeysStep) Next() (*rados.OmapKeyValue, error) { + return s.ReadOpOmapGetValsByKeysStep.Next() +} diff --git a/internal/util/reftracker/reftracker.go b/internal/util/reftracker/reftracker.go new file mode 100644 index 000000000..ce1863220 --- /dev/null +++ b/internal/util/reftracker/reftracker.go @@ -0,0 +1,248 @@ +/* +Copyright 2022 The Ceph-CSI Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +package reftracker + +import ( + goerrors "errors" + "fmt" + + "github.com/ceph/ceph-csi/internal/util/reftracker/errors" + "github.com/ceph/ceph-csi/internal/util/reftracker/radoswrapper" + "github.com/ceph/ceph-csi/internal/util/reftracker/reftype" + v1 "github.com/ceph/ceph-csi/internal/util/reftracker/v1" + "github.com/ceph/ceph-csi/internal/util/reftracker/version" + + "github.com/ceph/go-ceph/rados" +) + +// reftracker is a key-based implementation of a reference counter. +// +// Unlike an integer-based counter, reftracker counts references by tracking +// unique keys. This allows accounting in situations where idempotency must be +// preserved. It guarantees there will be no duplicate increments or decrements +// of the counter. +// +// It is stored persistently as a RADOS object, and is safe to use with +// multiple concurrent writers, and across different nodes of a cluster. +// +// Example: +// +// created, err := Add( +// ioctx, +// "my-reftracker", +// map[string]struct{}{ +// "ref-key-1": {}, +// "ref-key-2": {}, +// }, +// ) +// +// Since this is a new reftracker object, `created` is `true`. +// +// "my-reftracker" now holds: +// ["ref-key-1":reftype.Normal, "ref-key-2":reftype.Normal] +// The reference count is 2. +// +// created, err := Add( +// ioctx, +// "my-reftracker", +// map[string]struct{}{ +// "ref-key-1": {}, +// "ref-key-2": {}, +// "ref-key-3": {}, +// }, +// ) +// +// Reftracker named "my-reftracker" already exists, so `created` is now +// `false`. Since "ref-key-1" and "ref-key-2" keys are already tracked, +// only "ref-key-3" is added. +// +// "my-reftracker" now holds: +// ["ref-key-1":reftype.Normal, "ref-key-2":reftype.Normal, +// "ref-key-3":reftype.Normal] +// The reference count is 3.
+// +// deleted, err := Remove( +// ioctx, +// "my-reftracker", +// map[string]reftype.RefType{ +// "ref-key-1": reftype.Normal, +// "ref-key-2": reftype.Mask, +// }, +// ) +// +// "my-reftracker" now holds: +// ["ref-key-2":reftype.Mask, "ref-key-3":reftype.Normal] +// The reference count is 1. +// +// Since the reference count is greater than zero, `deleted` is `false`. +// "ref-key-1" was removed, and so is not listed among tracked references. +// "ref-key-2" was only masked, so it's been kept. However, masked references +// don't contribute to overall reference count, so the resulting refcount +// after this Remove() call is 1. +// +// created, err := Add( +// ioctx, +// "my-reftracker", +// map[string]struct{}{ +// "ref-key-2": {}, +// }, +// ) +// +// "my-reftracker" now holds: +// ["ref-key-2":reftype.Mask, "ref-key-3":reftype.Normal] +// The reference count is 1. +// +// "ref-key-2" is already tracked, so it will not be added again. Since it +// remains masked, it won't contribute to the reference count. +// +// deleted, err := Remove( +// ioctx, +// "my-reftracker", +// map[string]reftype.RefType{ +// "ref-key-3": reftype.Normal, +// }, +// ) +// +// "ref-key-3" was the only tracked key that contributed to reference count. +// After this Remove() call it's now removed. As a result, the reference count +// dropped down to zero, and the whole object has been deleted too. +// `deleted` is `true`. + +// Add atomically adds references to `rtName` reference tracker. +// If the reftracker object doesn't exist yet, it is created and `true` is +// returned. If some keys in `refs` map are already tracked by this reftracker +// object, they will not be added again. +func Add( + ioctx radoswrapper.IOContextW, + rtName string, + refs map[string]struct{}, +) (bool, error) { + if err := validateAddInput(rtName, refs); err != nil { + return false, err + } + + // Read reftracker version. 
+ + rtVer, err := version.Read(ioctx, rtName) + if err != nil { + if goerrors.Is(err, rados.ErrNotFound) { + // This is a new reftracker. Initialize it with `refs`. + if err = v1.Init(ioctx, rtName, refs); err != nil { + return false, fmt.Errorf("failed to initialize reftracker: %w", err) + } + + return true, nil + } + + return false, fmt.Errorf("failed to read reftracker version: %w", err) + } + + // Add references to reftracker object. + + gen, err := ioctx.GetLastVersion() + if err != nil { + return false, fmt.Errorf("failed to get RADOS object version: %w", err) + } + + switch rtVer { + case v1.Version: + err = v1.Add(ioctx, rtName, gen, refs) + if err != nil { + err = fmt.Errorf("failed to add refs: %w", err) + } + default: + err = errors.UnknownObjectVersion(rtVer) + } + + return false, err +} + +// Remove atomically removes references from `rtName` reference tracker. +// If the reftracker object holds no references after this removal, the whole +// object is deleted too, and `true` is returned. If the reftracker object +// doesn't exist, (true, nil) is returned. +func Remove( + ioctx radoswrapper.IOContextW, + rtName string, + refs map[string]reftype.RefType, +) (bool, error) { + if err := validateRemoveInput(rtName, refs); err != nil { + return false, err + } + + // Read reftracker version. + + rtVer, err := version.Read(ioctx, rtName) + if err != nil { + if goerrors.Is(err, rados.ErrNotFound) { + // This reftracker doesn't exist. Assume it was already deleted. + return true, nil + } + + return false, fmt.Errorf("failed to read reftracker version: %w", err) + } + + // Remove references from reftracker. 
+ + gen, err := ioctx.GetLastVersion() + if err != nil { + return false, fmt.Errorf("failed to get RADOS object version: %w", err) + } + + var deleted bool + + switch rtVer { + case v1.Version: + deleted, err = v1.Remove(ioctx, rtName, gen, refs) + if err != nil { + err = fmt.Errorf("failed to remove refs: %w", err) + } + default: + err = errors.UnknownObjectVersion(rtVer) + } + + return deleted, err +} + +var ( + errNoRTName = goerrors.New("missing reftracker name") + errNoRefs = goerrors.New("missing refs") +) + +func validateAddInput(rtName string, refs map[string]struct{}) error { + if rtName == "" { + return errNoRTName + } + + if len(refs) == 0 { + return errNoRefs + } + + return nil +} + +func validateRemoveInput(rtName string, refs map[string]reftype.RefType) error { + if rtName == "" { + return errNoRTName + } + + if len(refs) == 0 { + return errNoRefs + } + + return nil +} diff --git a/internal/util/reftracker/reftracker_test.go b/internal/util/reftracker/reftracker_test.go new file mode 100644 index 000000000..58a121d6d --- /dev/null +++ b/internal/util/reftracker/reftracker_test.go @@ -0,0 +1,491 @@ +/* +Copyright 2022 The Ceph-CSI Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/
+
+package reftracker
+
+import (
+	"testing"
+
+	"github.com/ceph/ceph-csi/internal/util/reftracker/radoswrapper"
+	"github.com/ceph/ceph-csi/internal/util/reftracker/reftype"
+
+	"github.com/stretchr/testify/assert"
+)
+
+// rtName is the reftracker object name shared by the subtests below. Every
+// subtest creates its own fake RADOS instance, so they do not interfere.
+const rtName = "hello-rt"
+
+// TestRTAdd exercises Add() against a fake RADOS backend: input validation,
+// bulk additions, and additions whose refs overlap with earlier calls.
+func TestRTAdd(t *testing.T) {
+	t.Parallel()
+
+	// Verify input validation for reftracker name.
+	t.Run("AddNoName", func(ts *testing.T) {
+		ts.Parallel()
+
+		ioctx := radoswrapper.NewFakeIOContext(radoswrapper.NewFakeRados())
+		created, err := Add(ioctx, "", nil)
+		assert.Error(ts, err)
+		assert.False(ts, created)
+	})
+
+	// Verify input validation for nil and empty refs.
+	t.Run("AddNoRefs", func(ts *testing.T) {
+		ts.Parallel()
+
+		ioctx := radoswrapper.NewFakeIOContext(radoswrapper.NewFakeRados())
+		refs := []map[string]struct{}{
+			nil,
+			make(map[string]struct{}),
+		}
+		for _, ref := range refs {
+			created, err := Add(ioctx, rtName, ref)
+			assert.Error(ts, err)
+			assert.False(ts, created)
+		}
+	})
+
+	// Add multiple refs in a single Add().
+	t.Run("AddBulk", func(ts *testing.T) {
+		ts.Parallel()
+
+		ioctx := radoswrapper.NewFakeIOContext(radoswrapper.NewFakeRados())
+		created, err := Add(ioctx, rtName, map[string]struct{}{
+			"ref1": {},
+			"ref2": {},
+			"ref3": {},
+		})
+		assert.NoError(ts, err)
+		assert.True(ts, created)
+	})
+
+	// Add refs where each Add() has some of the refs overlapping
+	// with the previous call.
+	t.Run("AddOverlapping", func(ts *testing.T) {
+		ts.Parallel()
+
+		ioctx := radoswrapper.NewFakeIOContext(radoswrapper.NewFakeRados())
+		created, err := Add(ioctx, rtName, map[string]struct{}{
+			"ref1": {},
+			"ref2": {},
+		})
+		assert.NoError(ts, err)
+		assert.True(ts, created)
+
+		refsTable := []map[string]struct{}{
+			{"ref2": {}, "ref3": {}},
+			{"ref3": {}, "ref4": {}},
+			{"ref4": {}, "ref5": {}},
+		}
+		for _, refs := range refsTable {
+			created, err = Add(ioctx, rtName, refs)
+			assert.NoError(ts, err)
+			assert.False(ts, created)
+		}
+	})
+}
+
+// TestRTRemove exercises Remove() with reftype.Normal refs: input validation,
+// removal from a missing object, partial and complete removals, and repeated
+// add/remove cycles.
+func TestRTRemove(t *testing.T) {
+	t.Parallel()
+
+	// Verify input validation for nil and empty refs.
+	t.Run("RemoveNoRefs", func(ts *testing.T) {
+		ts.Parallel()
+
+		ioctx := radoswrapper.NewFakeIOContext(radoswrapper.NewFakeRados())
+		refs := []map[string]reftype.RefType{
+			nil,
+			make(map[string]reftype.RefType),
+		}
+		for _, ref := range refs {
+			deleted, err := Remove(ioctx, rtName, ref)
+			assert.Error(ts, err)
+			assert.False(ts, deleted)
+		}
+	})
+
+	// Attempt to remove refs in a non-existent reftracker object should result
+	// in success, with deleted=true,err=nil.
+	t.Run("RemoveNotExists", func(ts *testing.T) {
+		ts.Parallel()
+
+		ioctx := radoswrapper.NewFakeIOContext(radoswrapper.NewFakeRados())
+		deleted, err := Remove(ioctx, "xxx", map[string]reftype.RefType{
+			"ref1": reftype.Normal,
+		})
+		assert.NoError(ts, err)
+		assert.True(ts, deleted)
+	})
+
+	// Removing only non-existent refs should not result in reftracker object
+	// deletion.
+	t.Run("RemoveNonExistentRefs", func(ts *testing.T) {
+		ts.Parallel()
+
+		ioctx := radoswrapper.NewFakeIOContext(radoswrapper.NewFakeRados())
+
+		created, err := Add(ioctx, rtName, map[string]struct{}{
+			"ref1": {},
+			"ref2": {},
+			"ref3": {},
+		})
+		assert.NoError(ts, err)
+		assert.True(ts, created)
+
+		deleted, err := Remove(ioctx, rtName, map[string]reftype.RefType{
+			"refX": reftype.Normal,
+			"refY": reftype.Normal,
+			"refZ": reftype.Normal,
+		})
+		assert.NoError(ts, err)
+		assert.False(ts, deleted)
+	})
+
+	// Removing all refs plus some surplus should result in reftracker object
+	// deletion. (Subtest names must be unique within a parent test, otherwise
+	// the go tool disambiguates them with a "#NN" suffix.)
+	t.Run("RemoveAllPlusSurplusRefs", func(ts *testing.T) {
+		ts.Parallel()
+
+		ioctx := radoswrapper.NewFakeIOContext(radoswrapper.NewFakeRados())
+
+		created, err := Add(ioctx, rtName, map[string]struct{}{
+			"ref": {},
+		})
+		assert.NoError(ts, err)
+		assert.True(ts, created)
+
+		deleted, err := Remove(ioctx, rtName, map[string]reftype.RefType{
+			"refX": reftype.Normal,
+			"refY": reftype.Normal,
+			"ref":  reftype.Normal,
+			"refZ": reftype.Normal,
+		})
+		assert.NoError(ts, err)
+		assert.True(ts, deleted)
+	})
+
+	// Bulk removal of all refs should result in reftracker object deletion.
+	t.Run("RemoveBulk", func(ts *testing.T) {
+		ts.Parallel()
+
+		ioctx := radoswrapper.NewFakeIOContext(radoswrapper.NewFakeRados())
+		keys := []string{"ref1", "ref2", "ref3"}
+		refsToAdd := make(map[string]struct{})
+		refsToRemove := make(map[string]reftype.RefType)
+		for _, k := range keys {
+			refsToAdd[k] = struct{}{}
+			refsToRemove[k] = reftype.Normal
+		}
+
+		created, err := Add(ioctx, rtName, refsToAdd)
+		assert.NoError(ts, err)
+		assert.True(ts, created)
+
+		deleted, err := Remove(ioctx, rtName, refsToRemove)
+		assert.NoError(ts, err)
+		assert.True(ts, deleted)
+	})
+
+	// Removal of all refs one-by-one should result in reftracker object deletion
+	// in the last Remove() call.
+	t.Run("RemoveSingle", func(ts *testing.T) {
+		ts.Parallel()
+
+		ioctx := radoswrapper.NewFakeIOContext(radoswrapper.NewFakeRados())
+
+		created, err := Add(ioctx, rtName, map[string]struct{}{
+			"ref1": {},
+			"ref2": {},
+			"ref3": {},
+		})
+		assert.NoError(ts, err)
+		assert.True(ts, created)
+
+		for _, k := range []string{"ref3", "ref2"} {
+			deleted, errRemove := Remove(ioctx, rtName, map[string]reftype.RefType{
+				k: reftype.Normal,
+			})
+			assert.NoError(ts, errRemove)
+			assert.False(ts, deleted)
+		}
+
+		// Remove the last reference. It should remove the whole reftracker object too.
+		deleted, err := Remove(ioctx, rtName, map[string]reftype.RefType{
+			"ref1": reftype.Normal,
+		})
+		assert.NoError(ts, err)
+		assert.True(ts, deleted)
+	})
+
+	// Cycle through reftracker object twice.
+	t.Run("AddRemoveAddRemove", func(ts *testing.T) {
+		ts.Parallel()
+
+		ioctx := radoswrapper.NewFakeIOContext(radoswrapper.NewFakeRados())
+		refsToAdd := map[string]struct{}{
+			"ref1": {},
+			"ref2": {},
+			"ref3": {},
+		}
+		refsToRemove := map[string]reftype.RefType{
+			"ref1": reftype.Normal,
+			"ref2": reftype.Normal,
+			"ref3": reftype.Normal,
+		}
+
+		for i := 0; i < 2; i++ {
+			created, err := Add(ioctx, rtName, refsToAdd)
+			assert.NoError(ts, err)
+			assert.True(ts, created)
+
+			deleted, err := Remove(ioctx, rtName, refsToRemove)
+			assert.NoError(ts, err)
+			assert.True(ts, deleted)
+		}
+	})
+
+	// Check for respecting idempotency by making multiple additions with overlapping keys
+	// and removing only ref keys that were distinct.
+	t.Run("AddOverlappingRemoveBulk", func(ts *testing.T) {
+		ts.Parallel()
+
+		ioctx := radoswrapper.NewFakeIOContext(radoswrapper.NewFakeRados())
+		created, err := Add(ioctx, rtName, map[string]struct{}{
+			"ref1": {},
+			"ref2": {},
+		})
+		assert.True(ts, created)
+		assert.NoError(ts, err)
+		refsTable := []map[string]struct{}{
+			{"ref2": {}, "ref3": {}},
+			{"ref3": {}, "ref4": {}},
+			{"ref4": {}, "ref5": {}},
+		}
+		for _, refs := range refsTable {
+			created, err = Add(ioctx, rtName, refs)
+			assert.False(ts, created)
+			assert.NoError(ts, err)
+		}
+
+		deleted, err := Remove(ioctx, rtName, map[string]reftype.RefType{
+			"ref1": reftype.Normal,
+			"ref2": reftype.Normal,
+			"ref3": reftype.Normal,
+			"ref4": reftype.Normal,
+			"ref5": reftype.Normal,
+		})
+		assert.NoError(ts, err)
+		assert.True(ts, deleted)
+	})
+}
+
+// TestRTMask exercises Remove() with reftype.Mask refs, on their own and mixed
+// with reftype.Normal removals and subsequent Add() calls.
+func TestRTMask(t *testing.T) {
+	t.Parallel()
+
+	// Bulk masking all refs should result in reftracker object deletion.
+	t.Run("MaskAllBulk", func(ts *testing.T) {
+		ts.Parallel()
+
+		ioctx := radoswrapper.NewFakeIOContext(radoswrapper.NewFakeRados())
+		keys := []string{"ref1", "ref2", "ref3"}
+		refsToAdd := make(map[string]struct{})
+		refsToRemove := make(map[string]reftype.RefType)
+		for _, k := range keys {
+			refsToAdd[k] = struct{}{}
+			refsToRemove[k] = reftype.Mask
+		}
+
+		created, err := Add(ioctx, rtName, refsToAdd)
+		assert.NoError(ts, err)
+		assert.True(ts, created)
+
+		deleted, err := Remove(ioctx, rtName, refsToRemove)
+		assert.NoError(ts, err)
+		assert.True(ts, deleted)
+	})
+
+	// Masking all refs one-by-one should result in reftracker object deletion in
+	// the last Remove() call.
+	t.Run("MaskSingle", func(ts *testing.T) {
+		ts.Parallel()
+
+		ioctx := radoswrapper.NewFakeIOContext(radoswrapper.NewFakeRados())
+
+		created, err := Add(ioctx, rtName, map[string]struct{}{
+			"ref1": {},
+			"ref2": {},
+			"ref3": {},
+		})
+		assert.NoError(ts, err)
+		assert.True(ts, created)
+
+		for _, k := range []string{"ref3", "ref2"} {
+			deleted, errRemove := Remove(ioctx, rtName, map[string]reftype.RefType{
+				k: reftype.Mask,
+			})
+			assert.NoError(ts, errRemove)
+			assert.False(ts, deleted)
+		}
+
+		// Mask the last reference. It should delete the whole reftracker object
+		// too.
+		deleted, err := Remove(ioctx, rtName, map[string]reftype.RefType{
+			"ref1": reftype.Mask,
+		})
+		assert.NoError(ts, err)
+		assert.True(ts, deleted)
+	})
+
+	// Bulk removing two (out of 3) refs and then masking the ref that's left
+	// should result in reftracker object deletion in the last Remove() call.
+	t.Run("RemoveBulkMaskSingle", func(ts *testing.T) {
+		ts.Parallel()
+
+		ioctx := radoswrapper.NewFakeIOContext(radoswrapper.NewFakeRados())
+
+		created, err := Add(ioctx, rtName, map[string]struct{}{
+			"ref1": {},
+			"ref2": {},
+			"ref3": {},
+		})
+		assert.NoError(ts, err)
+		assert.True(ts, created)
+
+		deleted, err := Remove(ioctx, rtName, map[string]reftype.RefType{
+			"ref1": reftype.Normal,
+			"ref2": reftype.Normal,
+		})
+		assert.NoError(ts, err)
+		assert.False(ts, deleted)
+
+		deleted, err = Remove(ioctx, rtName, map[string]reftype.RefType{
+			"ref3": reftype.Mask,
+		})
+		assert.NoError(ts, err)
+		assert.True(ts, deleted)
+	})
+
+	// Bulk masking two (out of 3) refs and then removing the ref that's left
+	// should result in reftracker object deletion in the last Remove() call.
+	t.Run("MaskBulkRemoveSingle", func(ts *testing.T) {
+		ts.Parallel()
+
+		ioctx := radoswrapper.NewFakeIOContext(radoswrapper.NewFakeRados())
+
+		created, err := Add(ioctx, rtName, map[string]struct{}{
+			"ref1": {},
+			"ref2": {},
+			"ref3": {},
+		})
+		assert.NoError(ts, err)
+		assert.True(ts, created)
+
+		deleted, err := Remove(ioctx, rtName, map[string]reftype.RefType{
+			"ref1": reftype.Mask,
+			"ref2": reftype.Mask,
+		})
+		assert.NoError(ts, err)
+		assert.False(ts, deleted)
+
+		deleted, err = Remove(ioctx, rtName, map[string]reftype.RefType{
+			"ref3": reftype.Normal,
+		})
+		assert.NoError(ts, err)
+		assert.True(ts, deleted)
+	})
+
+	// Verify that masking refs hides them from future Add()s.
+	t.Run("MaskAndAdd", func(ts *testing.T) {
+		ts.Parallel()
+
+		ioctx := radoswrapper.NewFakeIOContext(radoswrapper.NewFakeRados())
+
+		created, err := Add(ioctx, rtName, map[string]struct{}{
+			"ref1": {},
+			"ref2": {},
+			"ref3": {},
+		})
+		assert.NoError(ts, err)
+		assert.True(ts, created)
+
+		deleted, err := Remove(ioctx, rtName, map[string]reftype.RefType{
+			"ref1": reftype.Mask,
+			"ref2": reftype.Mask,
+		})
+		assert.NoError(ts, err)
+		assert.False(ts, deleted)
+
+		created, err = Add(ioctx, rtName, map[string]struct{}{
+			"ref1": {},
+			"ref2": {},
+		})
+		assert.NoError(ts, err)
+		assert.False(ts, created)
+
+		deleted, err = Remove(ioctx, rtName, map[string]reftype.RefType{
+			"ref3": reftype.Normal,
+		})
+		assert.NoError(ts, err)
+		assert.True(ts, deleted)
+	})
+
+	// Verify that masked refs may be removed with reftype.Normal and re-added.
+	t.Run("MaskRemoveAdd", func(ts *testing.T) {
+		ts.Parallel()
+
+		ioctx := radoswrapper.NewFakeIOContext(radoswrapper.NewFakeRados())
+
+		created, err := Add(ioctx, rtName, map[string]struct{}{
+			"ref1": {},
+			"ref2": {},
+			"ref3": {},
+		})
+		assert.NoError(ts, err)
+		assert.True(ts, created)
+
+		deleted, err := Remove(ioctx, rtName, map[string]reftype.RefType{
+			"ref1": reftype.Mask,
+			"ref2": reftype.Mask,
+		})
+		assert.NoError(ts, err)
+		assert.False(ts, deleted)
+
+		deleted, err = Remove(ioctx, rtName, map[string]reftype.RefType{
+			"ref1": reftype.Normal,
+			"ref2": reftype.Normal,
+		})
+		assert.NoError(ts, err)
+		assert.False(ts, deleted)
+
+		created, err = Add(ioctx, rtName, map[string]struct{}{
+			"ref1": {},
+			"ref2": {},
+		})
+		assert.NoError(ts, err)
+		assert.False(ts, created)
+
+		deleted, err = Remove(ioctx, rtName, map[string]reftype.RefType{
+			"ref3": reftype.Normal,
+		})
+		assert.NoError(ts, err)
+		assert.False(ts, deleted)
+
+		deleted, err = Remove(ioctx, rtName, map[string]reftype.RefType{
+			"ref1": reftype.Normal,
+			"ref2": reftype.Normal,
+		})
+		assert.NoError(ts, err)
+		assert.True(ts, deleted)
+	})
+}
diff --git a/internal/util/reftracker/reftype/reftype.go b/internal/util/reftracker/reftype/reftype.go
new file mode 100644
index 000000000..e2d6d1587
--- /dev/null
+++ b/internal/util/reftracker/reftype/reftype.go
@@ -0,0 +1,63 @@
+/*
+Copyright 2022 The Ceph-CSI Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package reftype
+
+import (
+	"fmt"
+
+	"github.com/ceph/ceph-csi/internal/util/reftracker/errors"
+)
+
+// RefType describes type of the reftracker reference.
+type RefType int8
+
+const (
+	// refTypeSize is the length, in bytes, of a serialized RefType.
+	refTypeSize = 1
+
+	// Unknown reftype used to signal error state.
+	Unknown RefType = 0
+
+	// Normal type tags the reference to have normal effect on the reference
+	// count. Adding Normal reference increments the reference count. Removing
+	// Normal reference decrements the reference count.
+	//
+	// It may be converted to a Mask if it is removed with Mask reftype.
+	Normal RefType = 1
+
+	// Mask type tags the reference to be masked, making it not contribute to the
+	// overall reference count. The reference will be ignored by all future Add()
+	// calls until it is removed with Normal reftype.
+	Mask RefType = 2
+)
+
+// ToBytes serializes t into a slice of refTypeSize bytes.
+func ToBytes(t RefType) []byte {
+	encoded := make([]byte, refTypeSize)
+	encoded[0] = byte(t)
+
+	return encoded
+}
+
+// FromBytes parses a RefType out of bs. It returns Unknown together with an
+// error when bs is not exactly refTypeSize bytes long, or when the stored
+// value is neither Normal nor Mask.
+func FromBytes(bs []byte) (RefType, error) {
+	if got := len(bs); got != refTypeSize {
+		return Unknown, errors.UnexpectedReadSize(refTypeSize, got)
+	}
+
+	// Only Normal and Mask are valid on-disk values; anything else, including
+	// Unknown itself, is rejected.
+	parsed := RefType(bs[0])
+	if parsed == Normal || parsed == Mask {
+		return parsed, nil
+	}
+
+	return Unknown, fmt.Errorf("unknown reftype %d", parsed)
+}
diff --git a/internal/util/reftracker/reftype/reftype_test.go b/internal/util/reftracker/reftype/reftype_test.go
new file mode 100644
index 000000000..88d25d3a2
--- /dev/null
+++ b/internal/util/reftracker/reftype/reftype_test.go
@@ -0,0 +1,63 @@
+/*
+Copyright 2022 The Ceph-CSI Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package reftype
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+// TestRefTypeBytes checks that ToBytes and FromBytes agree on the encoding of
+// the valid reftypes, and that FromBytes rejects malformed input.
+func TestRefTypeBytes(t *testing.T) {
+	t.Parallel()
+
+	// Valid reftypes paired with their expected serialized form.
+	validCases := []struct {
+		refType RefType
+		encoded []byte
+	}{
+		{refType: Normal, encoded: []byte{1}},
+		{refType: Mask, encoded: []byte{2}},
+	}
+
+	// Inputs FromBytes must reject: a value that is not a known reftype,
+	// followed by a buffer of the wrong size.
+	rejectedInputs := [][]byte{
+		{0xFF},
+		{0, 0, 0, 0, 1},
+	}
+
+	t.Run("ToBytes", func(ts *testing.T) {
+		ts.Parallel()
+
+		for _, tc := range validCases {
+			assert.Equal(ts, tc.encoded, ToBytes(tc.refType))
+		}
+	})
+
+	t.Run("FromBytes", func(ts *testing.T) {
+		ts.Parallel()
+
+		for _, tc := range validCases {
+			decoded, err := FromBytes(tc.encoded)
+			assert.NoError(ts, err)
+			assert.Equal(ts, tc.refType, decoded)
+		}
+
+		for _, input := range rejectedInputs {
+			_, err := FromBytes(input)
+			assert.Error(ts, err)
+		}
+	})
+}
diff --git a/internal/util/reftracker/v1/refcount.go b/internal/util/reftracker/v1/refcount.go
new file mode 100644
index 000000000..9fb5bd1e1
--- /dev/null
+++ b/internal/util/reftracker/v1/refcount.go
@@ -0,0 +1,47 @@
+/*
+Copyright 2022 The Ceph-CSI Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package v1
+
+import (
+	"encoding/binary"
+
+	"github.com/ceph/ceph-csi/internal/util/reftracker/errors"
+)
+
+// refCount represents the number of references a reftracker object holds.
+type refCount uint32
+
+const (
+	// Version is the reftracker object layout version implemented by this
+	// package.
+	Version = 1
+
+	// refCountSize is the length, in bytes, of a serialized refCount.
+	refCountSize = 4
+)
+
+// toBytes serializes rc as a big-endian uint32.
+func (rc refCount) toBytes() []byte {
+	var encoded [refCountSize]byte
+	binary.BigEndian.PutUint32(encoded[:], uint32(rc))
+
+	return encoded[:]
+}
+
+// refCountFromBytes parses a big-endian uint32 refcount out of bs. An error is
+// returned when bs is not exactly refCountSize bytes long.
+func refCountFromBytes(bs []byte) (refCount, error) {
+	if got := len(bs); got != refCountSize {
+		return 0, errors.UnexpectedReadSize(refCountSize, got)
+	}
+
+	decoded := binary.BigEndian.Uint32(bs)
+
+	return refCount(decoded), nil
+}
diff --git a/internal/util/reftracker/v1/refcount_test.go b/internal/util/reftracker/v1/refcount_test.go
new file mode 100644
index 000000000..b3e7252db
--- /dev/null
+++ b/internal/util/reftracker/v1/refcount_test.go
@@ -0,0 +1,51 @@
+/*
+Copyright 2022 The Ceph-CSI Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package v1
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+// TestV1RefCountBytes checks the big-endian serialization of refCount in both
+// directions, including rejection of a wrongly sized buffer.
+func TestV1RefCountBytes(t *testing.T) {
+	t.Parallel()
+
+	var (
+		refCountBytes          = []byte{0x0, 0x0, 0x0, 0x7B}
+		refCountValue          = refCount(123)
+		wrongSizeRefCountBytes = []byte{0, 0, 1}
+	)
+
+	t.Run("ToBytes", func(ts *testing.T) {
+		ts.Parallel()
+
+		bs := refCountValue.toBytes()
+		assert.Equal(ts, refCountBytes, bs)
+	})
+
+	t.Run("FromBytes", func(ts *testing.T) {
+		ts.Parallel()
+
+		rc, err := refCountFromBytes(refCountBytes)
+		assert.NoError(ts, err)
+		assert.Equal(ts, refCountValue, rc)
+
+		_, err = refCountFromBytes(wrongSizeRefCountBytes)
+		assert.Error(ts, err)
+	})
+}
diff --git a/internal/util/reftracker/v1/v1.go b/internal/util/reftracker/v1/v1.go
new file mode 100644
index 000000000..bebeadbff
--- /dev/null
+++ b/internal/util/reftracker/v1/v1.go
@@ -0,0 +1,314 @@
+/*
+Copyright 2022 The Ceph-CSI Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package v1
+
+import (
+	goerrors "errors"
+	"fmt"
+
+	"github.com/ceph/ceph-csi/internal/util/reftracker/errors"
+	"github.com/ceph/ceph-csi/internal/util/reftracker/radoswrapper"
+	"github.com/ceph/ceph-csi/internal/util/reftracker/reftype"
+	"github.com/ceph/ceph-csi/internal/util/reftracker/version"
+
+	"github.com/ceph/go-ceph/rados"
+)
+
+/*
+
+Version 1 layout:
+-----------------
+
+If not specified otherwise, all values are stored in big-endian order.
+
+	byte idx      type       name
+	--------     ------     ------
+	 0 ..  3     uint32     refcount
+
+	`refcount`: Number of references held by the reftracker object. The actual
+	            reference keys are stored in an OMap of the RADOS object.
+
+	OMap entry layout:
+
+		Key:
+
+			reftracker key.
+
+		Value:
+
+			byte idx      type       name
+			--------     ------     ------
+			 0            int8       type
+
+			`type`: reference type defined in reftracker/reftype, serialized
+			        as a single byte by reftype.ToBytes().
+
+*/
+
+// readResult holds the data parsed from a reftracker object by
+// readObjectByKeys.
+type readResult struct {
+	// Total number of references held by the reftracker object.
+	total refCount
+	// Refs whose keys matched the request.
+	foundRefs map[string]reftype.RefType
+}
+
+// Init atomically initializes a new reftracker object named rtName. All refs
+// are stored as reftype.Normal and the refcount is set to len(refs). The write
+// uses an exclusive create, so it fails if the object already exists.
+func Init(
+	ioctx radoswrapper.IOContextW,
+	rtName string,
+	refs map[string]struct{},
+) error {
+	// Prepare refcount and OMap key-value pairs.
+
+	refsToAddBytes := make(map[string][]byte, len(refs))
+
+	for ref := range refs {
+		refsToAddBytes[ref] = reftype.ToBytes(reftype.Normal)
+	}
+
+	// Perform the write.
+
+	w := ioctx.CreateWriteOp()
+	defer w.Release()
+
+	w.Create(rados.CreateExclusive)
+	w.SetXattr(version.XattrName, version.ToBytes(Version))
+	w.SetOmap(refsToAddBytes)
+	w.WriteFull(refCount(len(refsToAddBytes)).toBytes())
+
+	return errors.FailedObjectWrite(w.Operate(rtName))
+}
+
+// Add atomically adds refs to an existing reftracker object. Refs that are
+// already present in the object (regardless of their reftype) are skipped.
+// Both the read and the write assert that the object is still at version gen.
+func Add(
+	ioctx radoswrapper.IOContextW,
+	rtName string,
+	gen uint64,
+	refs map[string]struct{},
+) error {
+	// Read the reftracker object to figure out which refs to add.
+
+	readRes, err := readObjectByKeys(ioctx, rtName, gen, refsMapToKeysSlice(refs))
+	if err != nil {
+		return errors.FailedObjectRead(err)
+	}
+
+	// Build list of refs to add.
+	// Add only refs that are missing in the reftracker object.
+
+	refsToAdd := make(map[string][]byte)
+
+	for ref := range refs {
+		if _, found := readRes.foundRefs[ref]; !found {
+			refsToAdd[ref] = reftype.ToBytes(reftype.Normal)
+		}
+	}
+
+	if len(refsToAdd) == 0 {
+		// Nothing to do.
+		return nil
+	}
+
+	// Calculate new refcount.
+
+	rcToAdd := refCount(len(refsToAdd))
+	newRC := readRes.total + rcToAdd
+
+	// Unsigned addition wraps around on overflow, in which case the sum ends
+	// up smaller than the original value.
+	if newRC < readRes.total {
+		return goerrors.New("addition would overflow uint32 refcount")
+	}
+
+	// Write the data.
+
+	w := ioctx.CreateWriteOp()
+	defer w.Release()
+
+	w.AssertVersion(gen)
+	w.WriteFull(newRC.toBytes())
+	w.SetOmap(refsToAdd)
+
+	return errors.FailedObjectWrite(w.Operate(rtName))
+}
+
+// Remove atomically removes refs from reftracker object. If the object wouldn't
+// hold any references after the removal, the whole object is deleted instead.
+// The returned bool reports whether the object was deleted. Both the read and
+// the write assert that the object is still at version gen.
+func Remove(
+	ioctx radoswrapper.IOContextW,
+	rtName string,
+	gen uint64,
+	refs map[string]reftype.RefType,
+) (bool, error) {
+	// Read the reftracker object to figure out which refs to remove.
+
+	readRes, err := readObjectByKeys(ioctx, rtName, gen, typedRefsMapToKeysSlice(refs))
+	if err != nil {
+		return false, errors.FailedObjectRead(err)
+	}
+
+	// Build lists of refs to remove, replace, and add.
+	// There are three cases that need to be handled:
+	// (1) removing reftype.Normal refs,
+	// (2) converting refs that were reftype.Normal into reftype.Mask,
+	// (3) adding a new reftype.Mask key.
+
+	var (
+		refsToRemove []string
+		refsToSet    = make(map[string][]byte)
+		rcToSubtract refCount
+	)
+
+	for ref, refType := range refs {
+		if matchedRefType, found := readRes.foundRefs[ref]; found {
+			if refType == reftype.Normal {
+				// Case (1): regular removal of Normal ref.
+				refsToRemove = append(refsToRemove, ref)
+				if matchedRefType == reftype.Normal {
+					// If matchedRef was reftype.Mask, it would have already been
+					// subtracted from the refcount.
+					rcToSubtract++
+				}
+			} else if refType == reftype.Mask && matchedRefType == reftype.Normal {
+				// Case (2): convert Normal ref to Mask.
+				// Since this ref is now reftype.Mask, rcToSubtract needs to be adjusted
+				// too -- so that this ref is not counted in.
+				refsToSet[ref] = reftype.ToBytes(reftype.Mask)
+				rcToSubtract++
+			}
+		} else if refType == reftype.Mask {
+			// Case (3): add a new Mask ref.
+			// reftype.Mask doesn't contribute refcount so no change to rcToSubtract.
+			refsToSet[ref] = reftype.ToBytes(reftype.Mask)
+		}
+		// Otherwise no such ref was found, so there's nothing to remove.
+	}
+
+	if len(refsToRemove) == 0 && len(refsToSet) == 0 {
+		// Nothing to do.
+		return false, nil
+	}
+
+	// Calculate new refcount.
+
+	if rcToSubtract > readRes.total {
+		// BUG: this should never happen!
+		return false, goerrors.New("refcount underflow, reftracker object corrupted")
+	}
+
+	newRC := readRes.total - rcToSubtract
+	// If newRC is zero, it means all refs that the reftracker object held will be
+	// now gone, and the object must be deleted.
+	deleted := newRC == 0
+
+	// Write the data.
+
+	w := ioctx.CreateWriteOp()
+	defer w.Release()
+
+	w.AssertVersion(gen)
+
+	if deleted {
+		w.Remove()
+	} else {
+		w.WriteFull(newRC.toBytes())
+		w.RmOmapKeys(refsToRemove)
+		w.SetOmap(refsToSet)
+	}
+
+	if err := w.Operate(rtName); err != nil {
+		return false, errors.FailedObjectWrite(err)
+	}
+
+	return deleted, nil
+}
+
+// readObjectByKeys tries to find `keys` in reftracker object and returns the
+// result. Failing to find any particular key does not result in an error. The
+// read asserts that the object is still at version gen.
+func readObjectByKeys(
+	ioctx radoswrapper.IOContextW,
+	rtName string,
+	gen uint64,
+	keys []string,
+) (*readResult, error) {
+	// Read data from object.
+
+	rcBytes := make([]byte, refCountSize)
+
+	r := ioctx.CreateReadOp()
+	defer r.Release()
+
+	r.AssertVersion(gen)
+	r.Read(0, rcBytes)
+	s := r.GetOmapValuesByKeys(keys)
+
+	if err := r.Operate(rtName); err != nil {
+		return nil, errors.TryRADOSAborted(err)
+	}
+
+	// Convert it from byte slices to type-safe values.
+
+	rc, err := refCountFromBytes(rcBytes)
+	if err != nil {
+		return nil, fmt.Errorf("failed to parse refcount: %w", err)
+	}
+
+	refs := make(map[string]reftype.RefType)
+
+	for {
+		kvPair, err := s.Next()
+		if err != nil {
+			return nil, fmt.Errorf("failed to iterate over OMap: %w", err)
+		}
+
+		// A nil pair without an error marks the end of the iteration.
+		if kvPair == nil {
+			break
+		}
+
+		refType, err := reftype.FromBytes(kvPair.Value)
+		if err != nil {
+			return nil, fmt.Errorf("failed to parse reftype: %w", err)
+		}
+
+		refs[kvPair.Key] = refType
+	}
+
+	return &readResult{
+		total:     rc,
+		foundRefs: refs,
+	}, nil
+}
+
+// refsMapToKeysSlice returns the keys of m in unspecified order.
+func refsMapToKeysSlice(m map[string]struct{}) []string {
+	s := make([]string, 0, len(m))
+	for k := range m {
+		s = append(s, k)
+	}
+
+	return s
+}
+
+// typedRefsMapToKeysSlice returns the keys of m in unspecified order.
+func typedRefsMapToKeysSlice(m map[string]reftype.RefType) []string {
+	s := make([]string, 0, len(m))
+	for k := range m {
+		s = append(s, k)
+	}
+
+	return s
+}
diff --git a/internal/util/reftracker/v1/v1_test.go b/internal/util/reftracker/v1/v1_test.go
new file mode 100644
index 000000000..4466ea101
--- /dev/null
+++ b/internal/util/reftracker/v1/v1_test.go
@@ -0,0 +1,423 @@
+/*
+Copyright 2022 The Ceph-CSI Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/ + +package v1 + +import ( + goerrors "errors" + "testing" + + "github.com/ceph/ceph-csi/internal/util/reftracker/errors" + "github.com/ceph/ceph-csi/internal/util/reftracker/radoswrapper" + "github.com/ceph/ceph-csi/internal/util/reftracker/reftype" + + "github.com/stretchr/testify/assert" +) + +func TestV1Read(t *testing.T) { + t.Parallel() + + const rtName = "hello-rt" + + var ( + gen = uint64(0) + + validObj = radoswrapper.NewFakeIOContext(&radoswrapper.FakeRados{ + Objs: map[string]*radoswrapper.FakeObj{ + rtName: { + Oid: rtName, + Data: []byte{0, 0, 0, 0}, + Omap: make(map[string][]byte), + }, + }, + }) + + invalidObjs = []*radoswrapper.FakeIOContext{ + // Missing object. + radoswrapper.NewFakeIOContext(radoswrapper.NewFakeRados()), + // Bad generation number. + radoswrapper.NewFakeIOContext(&radoswrapper.FakeRados{ + Objs: map[string]*radoswrapper.FakeObj{ + rtName: { + Ver: 123, + Oid: rtName, + Data: []byte{0, 0, 0, 0}, + }, + }, + }), + // Refcount overflow. + radoswrapper.NewFakeIOContext(&radoswrapper.FakeRados{ + Objs: map[string]*radoswrapper.FakeObj{ + rtName: { + Oid: rtName, + Data: []byte{0xFF, 0xFF, 0xFF, 0xFF}, + }, + }, + }), + } + + refsToAdd = map[string]struct{}{"ref1": {}} + ) + + err := Add(validObj, rtName, gen, refsToAdd) + assert.NoError(t, err) + + for i := range invalidObjs { + err = Add(invalidObjs[i], rtName, gen, refsToAdd) + assert.Error(t, err) + } + + // Check for correct error type for wrong gen num. 
+ err = Add(invalidObjs[1], rtName, gen, refsToAdd) + assert.Error(t, err) + assert.True(t, goerrors.Is(err, errors.ErrObjectOutOfDate)) +} + +func TestV1Init(t *testing.T) { + t.Parallel() + + const rtName = "hello-rt" + + var ( + emptyRados = radoswrapper.NewFakeIOContext(&radoswrapper.FakeRados{ + Objs: map[string]*radoswrapper.FakeObj{}, + }) + + alreadyExists = radoswrapper.NewFakeIOContext(&radoswrapper.FakeRados{ + Objs: map[string]*radoswrapper.FakeObj{ + rtName: {}, + }, + }) + + refsToInit = map[string]struct{}{"ref1": {}} + ) + + err := Init(emptyRados, rtName, refsToInit) + assert.NoError(t, err) + + err = Init(alreadyExists, rtName, refsToInit) + assert.Error(t, err) +} + +func TestV1Add(t *testing.T) { + t.Parallel() + + const rtName = "hello-rt" + + var ( + shouldSucceed = []struct { + before *radoswrapper.FakeObj + refsToAdd map[string]struct{} + after *radoswrapper.FakeObj + }{ + // Add a new ref. + { + before: &radoswrapper.FakeObj{ + Oid: rtName, + Ver: 0, + Omap: map[string][]byte{ + "ref1": reftype.ToBytes(reftype.Normal), + }, + Data: refCount(1).toBytes(), + }, + refsToAdd: map[string]struct{}{ + "ref2": {}, + }, + after: &radoswrapper.FakeObj{ + Oid: rtName, + Ver: 1, + Omap: map[string][]byte{ + "ref1": reftype.ToBytes(reftype.Normal), + "ref2": reftype.ToBytes(reftype.Normal), + }, + Data: refCount(2).toBytes(), + }, + }, + // Try to add a ref that's already tracked. + { + before: &radoswrapper.FakeObj{ + Oid: rtName, + Ver: 0, + Omap: map[string][]byte{ + "ref1": reftype.ToBytes(reftype.Normal), + }, + Data: refCount(1).toBytes(), + }, + refsToAdd: map[string]struct{}{ + "ref1": {}, + }, + after: &radoswrapper.FakeObj{ + Oid: rtName, + Ver: 0, + Omap: map[string][]byte{ + "ref1": reftype.ToBytes(reftype.Normal), + }, + Data: refCount(1).toBytes(), + }, + }, + // Try to add a ref that's masked. 
+ { + before: &radoswrapper.FakeObj{ + Oid: rtName, + Ver: 0, + Omap: map[string][]byte{ + "ref1": reftype.ToBytes(reftype.Normal), + "ref2": reftype.ToBytes(reftype.Mask), + }, + Data: refCount(1).toBytes(), + }, + refsToAdd: map[string]struct{}{ + "ref1": {}, + }, + after: &radoswrapper.FakeObj{ + Oid: rtName, + Ver: 0, + Omap: map[string][]byte{ + "ref1": reftype.ToBytes(reftype.Normal), + "ref2": reftype.ToBytes(reftype.Mask), + }, + Data: refCount(1).toBytes(), + }, + }, + } + + shouldFail = []*radoswrapper.FakeIOContext{ + // Missing object. + radoswrapper.NewFakeIOContext(radoswrapper.NewFakeRados()), + // Bad generation number. + radoswrapper.NewFakeIOContext(&radoswrapper.FakeRados{ + Objs: map[string]*radoswrapper.FakeObj{ + rtName: { + Ver: 123, + Oid: rtName, + Data: []byte{0, 0, 0, 0}, + }, + }, + }), + // Refcount overflow. + radoswrapper.NewFakeIOContext(&radoswrapper.FakeRados{ + Objs: map[string]*radoswrapper.FakeObj{ + rtName: { + Oid: rtName, + Data: []byte{0xFF, 0xFF, 0xFF, 0xFF}, + }, + }, + }), + } + ) + + for i := range shouldSucceed { + ioctx := radoswrapper.NewFakeIOContext(radoswrapper.NewFakeRados()) + ioctx.Rados.Objs[rtName] = shouldSucceed[i].before + + err := Add(ioctx, rtName, 0, shouldSucceed[i].refsToAdd) + assert.NoError(t, err) + assert.Equal(t, shouldSucceed[i].after, ioctx.Rados.Objs[rtName]) + } + + for i := range shouldFail { + err := Add(shouldFail[i], rtName, 0, map[string]struct{}{"ref1": {}}) + assert.Error(t, err) + } + + // Check for correct error type for wrong gen num. + err := Add(shouldFail[1], rtName, 0, map[string]struct{}{"ref1": {}}) + assert.Error(t, err) + assert.True(t, goerrors.Is(err, errors.ErrObjectOutOfDate)) +} + +func TestV1Remove(t *testing.T) { + t.Parallel() + + const rtName = "hello-rt" + + var ( + shouldSucceed = []struct { + before *radoswrapper.FakeObj + refsToRemove map[string]reftype.RefType + after *radoswrapper.FakeObj + deleted bool + }{ + // Remove without deleting the reftracker object. 
+ { + before: &radoswrapper.FakeObj{ + Oid: rtName, + Ver: 0, + Omap: map[string][]byte{ + "ref1": reftype.ToBytes(reftype.Normal), + "ref2": reftype.ToBytes(reftype.Normal), + }, + Data: refCount(2).toBytes(), + }, + refsToRemove: map[string]reftype.RefType{ + "ref1": reftype.Normal, + }, + after: &radoswrapper.FakeObj{ + Oid: rtName, + Ver: 1, + Omap: map[string][]byte{ + "ref2": reftype.ToBytes(reftype.Normal), + }, + Data: refCount(1).toBytes(), + }, + deleted: false, + }, + // Remove and delete the reftracker object. + { + before: &radoswrapper.FakeObj{ + Oid: rtName, + Ver: 0, + Omap: map[string][]byte{ + "ref1": reftype.ToBytes(reftype.Normal), + }, + Data: refCount(1).toBytes(), + }, + refsToRemove: map[string]reftype.RefType{ + "ref1": reftype.Normal, + }, + after: nil, + deleted: true, + }, + // Remove and delete the reftracker object. + { + before: &radoswrapper.FakeObj{ + Oid: rtName, + Ver: 0, + Omap: map[string][]byte{ + "ref1": reftype.ToBytes(reftype.Normal), + }, + Data: refCount(1).toBytes(), + }, + refsToRemove: map[string]reftype.RefType{ + "ref1": reftype.Normal, + }, + after: nil, + deleted: true, + }, + // Mask a ref without deleting reftracker object. + { + before: &radoswrapper.FakeObj{ + Oid: rtName, + Ver: 0, + Omap: map[string][]byte{ + "ref1": reftype.ToBytes(reftype.Normal), + "ref2": reftype.ToBytes(reftype.Normal), + }, + Data: refCount(2).toBytes(), + }, + refsToRemove: map[string]reftype.RefType{ + "ref2": reftype.Mask, + }, + after: &radoswrapper.FakeObj{ + Oid: rtName, + Ver: 1, + Omap: map[string][]byte{ + "ref1": reftype.ToBytes(reftype.Normal), + "ref2": reftype.ToBytes(reftype.Mask), + }, + Data: refCount(1).toBytes(), + }, + deleted: false, + }, + // Mask a ref and delete reftracker object. 
+ { + before: &radoswrapper.FakeObj{ + Oid: rtName, + Ver: 0, + Omap: map[string][]byte{ + "ref1": reftype.ToBytes(reftype.Normal), + }, + Data: refCount(1).toBytes(), + }, + refsToRemove: map[string]reftype.RefType{ + "ref1": reftype.Mask, + }, + after: nil, + deleted: true, + }, + // Add a masking ref. + { + before: &radoswrapper.FakeObj{ + Oid: rtName, + Ver: 0, + Omap: map[string][]byte{ + "ref1": reftype.ToBytes(reftype.Normal), + }, + Data: refCount(1).toBytes(), + }, + refsToRemove: map[string]reftype.RefType{ + "ref2": reftype.Mask, + }, + after: &radoswrapper.FakeObj{ + Oid: rtName, + Ver: 1, + Omap: map[string][]byte{ + "ref1": reftype.ToBytes(reftype.Normal), + "ref2": reftype.ToBytes(reftype.Mask), + }, + Data: refCount(1).toBytes(), + }, + deleted: false, + }, + // Try to remove non-existent ref. + { + before: &radoswrapper.FakeObj{ + Oid: rtName, + Ver: 0, + Omap: map[string][]byte{ + "ref1": reftype.ToBytes(reftype.Normal), + }, + Data: refCount(1).toBytes(), + }, + refsToRemove: map[string]reftype.RefType{ + "ref2": reftype.Normal, + }, + after: &radoswrapper.FakeObj{ + Oid: rtName, + Ver: 0, + Omap: map[string][]byte{ + "ref1": reftype.ToBytes(reftype.Normal), + }, + Data: refCount(1).toBytes(), + }, + deleted: false, + }, + } + + // Bad generation number. 
+ badGen = radoswrapper.NewFakeIOContext(&radoswrapper.FakeRados{ + Objs: map[string]*radoswrapper.FakeObj{ + rtName: { + Ver: 123, + }, + }, + }) + ) + + for i := range shouldSucceed { + ioctx := radoswrapper.NewFakeIOContext(radoswrapper.NewFakeRados()) + ioctx.Rados.Objs[rtName] = shouldSucceed[i].before + + deleted, err := Remove(ioctx, rtName, 0, shouldSucceed[i].refsToRemove) + assert.NoError(t, err) + assert.Equal(t, shouldSucceed[i].deleted, deleted) + assert.Equal(t, shouldSucceed[i].after, ioctx.Rados.Objs[rtName]) + } + + _, err := Remove(badGen, rtName, 0, map[string]reftype.RefType{"ref": reftype.Normal}) + assert.Error(t, err) + assert.True(t, goerrors.Is(err, errors.ErrObjectOutOfDate)) +} diff --git a/internal/util/reftracker/version/version.go b/internal/util/reftracker/version/version.go new file mode 100644 index 000000000..600e9f8a3 --- /dev/null +++ b/internal/util/reftracker/version/version.go @@ -0,0 +1,64 @@ +/* +Copyright 2022 The Ceph-CSI Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package version + +import ( + "encoding/binary" + + "github.com/ceph/ceph-csi/internal/util/reftracker/errors" + "github.com/ceph/ceph-csi/internal/util/reftracker/radoswrapper" +) + +// reftracker objects are versioned, should the object layout need to change. +// Version is stored in its underlying RADOS object xattr as uint32. + +const ( + // Name of the xattr entry in the RADOS object. 
+ XattrName = "csi.ceph.com/rt-version" + + // SizeBytes is the size of version in bytes. + SizeBytes = 4 +) + +// ToBytes encodes v as a big-endian byte slice of length SizeBytes. +func ToBytes(v uint32) []byte { + bs := make([]byte, SizeBytes) + binary.BigEndian.PutUint32(bs, v) + + return bs +} + +// FromBytes decodes bs as a big-endian uint32 version. It returns +// errors.UnexpectedReadSize when bs is not exactly SizeBytes long. +func FromBytes(bs []byte) (uint32, error) { + if len(bs) != SizeBytes { + return 0, errors.UnexpectedReadSize(SizeBytes, len(bs)) + } + + return binary.BigEndian.Uint32(bs), nil +} + +// Read fetches the XattrName xattr of object rtName through ioctx and decodes +// it with FromBytes. An error from GetXattr is returned unchanged, and a read +// size other than SizeBytes yields errors.UnexpectedReadSize. +func Read(ioctx radoswrapper.IOContextW, rtName string) (uint32, error) { + verBytes := make([]byte, SizeBytes) + readSize, err := ioctx.GetXattr(rtName, XattrName, verBytes) + if err != nil { + return 0, err + } + + // GetXattr must report exactly SizeBytes, otherwise the value is rejected. + if readSize != SizeBytes { + return 0, errors.UnexpectedReadSize(SizeBytes, readSize) + } + + return FromBytes(verBytes) +} diff --git a/internal/util/reftracker/version/version_test.go b/internal/util/reftracker/version/version_test.go new file mode 100644 index 000000000..d48e10182 --- /dev/null +++ b/internal/util/reftracker/version/version_test.go @@ -0,0 +1,111 @@ +/* +Copyright 2022 The Ceph-CSI Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License.
+*/ + +package version + +import ( + "testing" + + "github.com/ceph/ceph-csi/internal/util/reftracker/radoswrapper" + + "github.com/stretchr/testify/assert" +) + +// Shared fixtures: v1Bytes is the byte encoding the tests expect for v1Value, +// and wrongSizeVersionBytes holds three bytes, so FromBytes and Read must +// reject it. +var ( + v1Bytes = []byte{0, 0, 0, 1} + v1Value = uint32(1) + + wrongSizeVersionBytes = []byte{0, 0, 1} +) + +// TestVersionBytes checks that ToBytes turns v1Value into v1Bytes, that +// FromBytes turns v1Bytes back into v1Value, and that FromBytes returns an +// error for wrongSizeVersionBytes. +func TestVersionBytes(t *testing.T) { + t.Parallel() + + t.Run("ToBytes", func(ts *testing.T) { + ts.Parallel() + + bs := ToBytes(v1Value) + assert.Equal(ts, v1Bytes, bs) + }) + + t.Run("FromBytes", func(ts *testing.T) { + ts.Parallel() + + ver, err := FromBytes(v1Bytes) + assert.NoError(ts, err) + assert.Equal(ts, v1Value, ver) + + _, err = FromBytes(wrongSizeVersionBytes) + assert.Error(ts, err) + }) +} + +// TestVersionRead runs Read against fake IO contexts: the fixture whose +// XattrName xattr holds v1Bytes must yield v1Value without error, and every +// fixture in invalidObjs must make Read return an error. +func TestVersionRead(t *testing.T) { + t.Parallel() + + const rtName = "hello-rt" + + var ( + // Object rtName carrying a well-formed version xattr. + validObj = radoswrapper.NewFakeIOContext(&radoswrapper.FakeRados{ + Objs: map[string]*radoswrapper.FakeObj{ + rtName: { + Oid: rtName, + Xattrs: map[string][]byte{ + XattrName: v1Bytes, + }, + }, + }, + }) + + // Fixtures for which Read is expected to fail. + invalidObjs = []*radoswrapper.FakeIOContext{ + // Missing object. + radoswrapper.NewFakeIOContext(&radoswrapper.FakeRados{ + Objs: map[string]*radoswrapper.FakeObj{}, + }), + // Missing xattr. + radoswrapper.NewFakeIOContext(&radoswrapper.FakeRados{ + Objs: map[string]*radoswrapper.FakeObj{ + rtName: { + Oid: rtName, + Xattrs: map[string][]byte{ + "some-other-xattr": v1Bytes, + }, + }, + }, + }), + // Wrongly sized version value. + radoswrapper.NewFakeIOContext(&radoswrapper.FakeRados{ + Objs: map[string]*radoswrapper.FakeObj{ + rtName: { + Oid: rtName, + Xattrs: map[string][]byte{ + XattrName: wrongSizeVersionBytes, + }, + }, + }, + }), + } + ) + + ver, err := Read(validObj, rtName) + assert.NoError(t, err) + assert.Equal(t, v1Value, ver) + + for i := range invalidObjs { + _, err = Read(invalidObjs[i], rtName) + assert.Error(t, err) + } +}