rebase: bump github.com/ceph/go-ceph from 0.13.0 to 0.14.0

Bumps [github.com/ceph/go-ceph](https://github.com/ceph/go-ceph) from 0.13.0 to 0.14.0.
- [Release notes](https://github.com/ceph/go-ceph/releases)
- [Changelog](https://github.com/ceph/go-ceph/blob/master/docs/release-process.md)
- [Commits](https://github.com/ceph/go-ceph/compare/v0.13.0...v0.14.0)

---
updated-dependencies:
- dependency-name: github.com/ceph/go-ceph
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
This commit is contained in:
dependabot[bot] 2022-02-21 20:13:09 +00:00 committed by mergify[bot]
parent 3922cfc860
commit cd83c7be48
18 changed files with 832 additions and 68 deletions

2
go.mod
View File

@ -6,7 +6,7 @@ require (
github.com/IBM/keyprotect-go-client v0.7.0
github.com/aws/aws-sdk-go v1.42.53
github.com/ceph/ceph-csi/api v0.0.0-00010101000000-000000000000
github.com/ceph/go-ceph v0.13.0
github.com/ceph/go-ceph v0.14.0
github.com/container-storage-interface/spec v1.5.0
github.com/csi-addons/replication-lib-utils v0.2.0
github.com/csi-addons/spec v0.1.2-0.20211220115741-32fa508dadbe

4
go.sum
View File

@ -168,8 +168,8 @@ github.com/cenkalti/backoff/v3 v3.0.0 h1:ske+9nBpD9qZsTBoF41nW5L+AIuFBKMeze18XQ3
github.com/cenkalti/backoff/v3 v3.0.0/go.mod h1:cIeZDE3IrqwwJl6VUwCN6trj1oXrTS4rc0ij+ULvLYs=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/centrify/cloud-golang-sdk v0.0.0-20190214225812-119110094d0f/go.mod h1:C0rtzmGXgN78pYR0tGJFhtHgkbAs0lIbHwkB81VxDQE=
github.com/ceph/go-ceph v0.13.0 h1:69dgIPlNHD2OCz98T0benI4++vcnShGcpQK4RIALjw4=
github.com/ceph/go-ceph v0.13.0/go.mod h1:mafFpf5Vg8Ai8Bd+FAMvKBHLmtdpTXdRP/TNq8XWegY=
github.com/ceph/go-ceph v0.14.0 h1:sJoT0au7NT3TPmDWf5W9w6tZy0U/5xZrIXVVauZR+Xo=
github.com/ceph/go-ceph v0.14.0/go.mod h1:mafFpf5Vg8Ai8Bd+FAMvKBHLmtdpTXdRP/TNq8XWegY=
github.com/certifi/gocertifi v0.0.0-20191021191039-0944d244cd40/go.mod h1:sGbDF6GwGcLpkNXPUTkMRoywsNa/ol15pxFe6ERfguA=
github.com/certifi/gocertifi v0.0.0-20200922220541-2c3bb06c6054/go.mod h1:sGbDF6GwGcLpkNXPUTkMRoywsNa/ol15pxFe6ERfguA=
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=

View File

@ -1,40 +1,31 @@
package admin
import (
"github.com/ceph/go-ceph/internal/commands"
"github.com/ceph/go-ceph/common/admin/manager"
)
const mirroring = "mirroring"
// EnableModule will enable the specified manager module.
//
// Deprecated: use the equivalent function in cluster/admin/manager.
//
// Similar To:
// ceph mgr module enable <module> [--force]
func (fsa *FSAdmin) EnableModule(module string, force bool) error {
m := map[string]string{
"prefix": "mgr module enable",
"module": module,
"format": "json",
}
if force {
m["force"] = "--force"
}
// Why is this _only_ part of the mon command json? You'd think a mgr
// command would be available as a MgrCommand but I couldn't figure it out.
return commands.MarshalMonCommand(fsa.conn, m).NoData().End()
mgradmin := manager.NewFromConn(fsa.conn)
return mgradmin.EnableModule(module, force)
}
// DisableModule will disable the specified manager module.
//
// Deprecated: use the equivalent function in cluster/admin/manager.
//
// Similar To:
// ceph mgr module disable <module>
func (fsa *FSAdmin) DisableModule(module string) error {
m := map[string]string{
"prefix": "mgr module disable",
"module": module,
"format": "json",
}
return commands.MarshalMonCommand(fsa.conn, m).NoData().End()
mgradmin := manager.NewFromConn(fsa.conn)
return mgradmin.DisableModule(module)
}
// EnableMirroringModule will enable the mirroring module for cephfs.
@ -42,7 +33,8 @@ func (fsa *FSAdmin) DisableModule(module string) error {
// Similar To:
// ceph mgr module enable mirroring [--force]
func (fsa *FSAdmin) EnableMirroringModule(force bool) error {
return fsa.EnableModule(mirroring, force)
mgradmin := manager.NewFromConn(fsa.conn)
return mgradmin.EnableModule(mirroring, force)
}
// DisableMirroringModule will disable the mirroring module for cephfs.
@ -50,34 +42,6 @@ func (fsa *FSAdmin) EnableMirroringModule(force bool) error {
// Similar To:
// ceph mgr module disable mirroring
func (fsa *FSAdmin) DisableMirroringModule() error {
return fsa.DisableModule(mirroring)
}
type moduleInfo struct {
EnabledModules []string `json:"enabled_modules"`
//DisabledModules []string `json:"disabled_modules"`
// DisabledModules is documented in ceph as a list of string
// but that's not what comes back from the server (on pacific).
// Since we don't need this today, we're just going to ignore
// it, but if we ever want to support this for external consumers
// we'll need to figure out the real structure of this.
}
func parseModuleInfo(res response) (*moduleInfo, error) {
m := &moduleInfo{}
if err := res.NoStatus().Unmarshal(m).End(); err != nil {
return nil, err
}
return m, nil
}
// listModules returns moduleInfo or error. it is not exported because
// this is really not a cephfs specific thing but we needed it
// for cephfs tests. maybe lift it somewhere else someday.
func (fsa *FSAdmin) listModules() (*moduleInfo, error) {
m := map[string]string{
"prefix": "mgr module ls",
"format": "json",
}
return parseModuleInfo(commands.MarshalMonCommand(fsa.conn, m))
mgradmin := manager.NewFromConn(fsa.conn)
return mgradmin.DisableModule(mirroring)
}

View File

@ -0,0 +1,17 @@
package manager
import (
ccom "github.com/ceph/go-ceph/common/commands"
)
// MgrAdmin is used to administrate ceph's manager (mgr).
type MgrAdmin struct {
conn ccom.RadosCommander
}
// NewFromConn creates an new management object from a preexisting
// rados connection. The existing connection can be rados.Conn or any
// type implementing the RadosCommander interface.
func NewFromConn(conn ccom.RadosCommander) *MgrAdmin {
return &MgrAdmin{conn}
}

View File

@ -0,0 +1,5 @@
/*
Package manager from common/admin contains a set of APIs used to interact
with and administer the Ceph manager (mgr).
*/
package manager

View File

@ -0,0 +1,75 @@
package manager
import (
"github.com/ceph/go-ceph/internal/commands"
)
// EnableModule will enable the specified manager module.
//
// Similar To:
// ceph mgr module enable <module> [--force]
func (fsa *MgrAdmin) EnableModule(module string, force bool) error {
m := map[string]string{
"prefix": "mgr module enable",
"module": module,
"format": "json",
}
if force {
m["force"] = "--force"
}
// Why is this _only_ part of the mon command json? You'd think a mgr
// command would be available as a MgrCommand but I couldn't figure it out.
return commands.MarshalMonCommand(fsa.conn, m).NoData().End()
}
// DisableModule will disable the specified manager module.
//
// Similar To:
// ceph mgr module disable <module>
func (fsa *MgrAdmin) DisableModule(module string) error {
m := map[string]string{
"prefix": "mgr module disable",
"module": module,
"format": "json",
}
return commands.MarshalMonCommand(fsa.conn, m).NoData().End()
}
// DisabledModule describes a disabled Ceph mgr module.
// The Ceph JSON structure contains a complex module_options
// substructure that go-ceph does not currently implement.
type DisabledModule struct {
Name string `json:"name"`
CanRun bool `json:"can_run"`
ErrorString string `json:"error_string"`
}
// ModuleInfo contains fields that report the status of modules within the
// ceph mgr.
type ModuleInfo struct {
// EnabledModules lists the names of the enabled modules.
EnabledModules []string `json:"enabled_modules"`
// AlwaysOnModules lists the names of the always-on modules.
AlwaysOnModules []string `json:"always_on_modules"`
// DisabledModules lists structures describing modules that are
// not currently enabled.
DisabledModules []DisabledModule `json:"disabled_modules"`
}
func parseModuleInfo(res commands.Response) (*ModuleInfo, error) {
m := &ModuleInfo{}
if err := res.NoStatus().Unmarshal(m).End(); err != nil {
return nil, err
}
return m, nil
}
// ListModules returns a module info struct reporting the lists of
// enabled, disabled, and always-on modules in the Ceph mgr.
func (fsa *MgrAdmin) ListModules() (*ModuleInfo, error) {
m := map[string]string{
"prefix": "mgr module ls",
"format": "json",
}
return parseModuleInfo(commands.MarshalMonCommand(fsa.conn, m))
}

View File

@ -26,4 +26,4 @@ func (v *SyncBuffer) Release() {
// Sync asserts that changes in the C buffer are available in the data
// slice
func (v *SyncBuffer) Sync() {}
func (*SyncBuffer) Sync() {}

View File

@ -0,0 +1,22 @@
//go:build ceph_preview
// +build ceph_preview
package rados
// #cgo LDFLAGS: -lrados
// #include <rados/librados.h>
// #include <stdlib.h>
//
import "C"
// AssertVersion ensures that the object exists and that its internal version
// number is equal to "ver" before reading. "ver" should be a version number
// previously obtained with IOContext.GetLastVersion().
// PREVIEW
//
// Implements:
// void rados_read_op_assert_version(rados_read_op_t read_op,
// uint64_t ver)
func (r *ReadOp) AssertVersion(ver uint64) {
C.rados_read_op_assert_version(r.op, C.uint64_t(ver))
}

View File

@ -0,0 +1,22 @@
//go:build ceph_preview
// +build ceph_preview
package rados
// #cgo LDFLAGS: -lrados
// #include <rados/librados.h>
// #include <stdlib.h>
//
import "C"
// AssertVersion ensures that the object exists and that its internal version
// number is equal to "ver" before writing. "ver" should be a version number
// previously obtained with IOContext.GetLastVersion().
// PREVIEW
//
// Implements:
// void rados_read_op_assert_version(rados_read_op_t read_op,
// uint64_t ver)
func (w *WriteOp) AssertVersion(ver uint64) {
C.rados_write_op_assert_version(w.op, C.uint64_t(ver))
}

View File

@ -0,0 +1,19 @@
//go:build ceph_preview
// +build ceph_preview
package rados
// #cgo LDFLAGS: -lrados
// #include <rados/librados.h>
// #include <stdlib.h>
//
import "C"
// Remove object.
// PREVIEW
//
// Implements:
// void rados_write_op_remove(rados_write_op_t write_op)
func (w *WriteOp) Remove() {
C.rados_write_op_remove(w.op)
}

View File

@ -0,0 +1,34 @@
//go:build ceph_preview
// +build ceph_preview
package rados
// #cgo LDFLAGS: -lrados
// #include <rados/librados.h>
// #include <stdlib.h>
//
import "C"
import (
"unsafe"
)
// SetXattr sets an xattr.
// PREVIEW
//
// Implements:
// void rados_write_op_setxattr(rados_write_op_t write_op,
// const char * name,
// const char * value,
// size_t value_len)
func (w *WriteOp) SetXattr(name string, value []byte) {
cName := C.CString(name)
defer C.free(unsafe.Pointer(cName))
C.rados_write_op_setxattr(
w.op,
cName,
(*C.char)(unsafe.Pointer(&value[0])),
C.size_t(len(value)),
)
}

View File

@ -0,0 +1,120 @@
//go:build ceph_preview
// +build ceph_preview
package rados
// #cgo LDFLAGS: -lrados
// #include <rados/librados.h>
// #include <stdlib.h>
//
import "C"
import (
"unsafe"
)
// ReadOpOmapGetValsByKeysStep holds the result of the
// GetOmapValuesByKeys read operation.
// Result is valid only after Operate() was called.
type ReadOpOmapGetValsByKeysStep struct {
// C arguments
iter C.rados_omap_iter_t
prval *C.int
// Internal state
// canIterate is only set after the operation is performed and is
// intended to prevent premature fetching of data.
canIterate bool
}
func newReadOpOmapGetValsByKeysStep() *ReadOpOmapGetValsByKeysStep {
s := &ReadOpOmapGetValsByKeysStep{
prval: (*C.int)(C.malloc(C.sizeof_int)),
}
return s
}
func (s *ReadOpOmapGetValsByKeysStep) free() {
s.canIterate = false
C.rados_omap_get_end(s.iter)
C.free(unsafe.Pointer(s.prval))
s.prval = nil
}
func (s *ReadOpOmapGetValsByKeysStep) update() error {
err := getError(*s.prval)
s.canIterate = (err == nil)
return err
}
// Next gets the next omap key/value pair referenced by
// ReadOpOmapGetValsByKeysStep's internal iterator.
// If there are no more elements to retrieve, (nil, nil) is returned.
// May be called only after Operate() finished.
// PREVIEW
func (s *ReadOpOmapGetValsByKeysStep) Next() (*OmapKeyValue, error) {
if !s.canIterate {
return nil, ErrOperationIncomplete
}
var (
cKey *C.char
cVal *C.char
cValLen C.size_t
)
ret := C.rados_omap_get_next(s.iter, &cKey, &cVal, &cValLen)
if ret != 0 {
return nil, getError(ret)
}
if cKey == nil {
// Iterator has reached the end of the list.
return nil, nil
}
return &OmapKeyValue{
Key: C.GoString(cKey),
Value: C.GoBytes(unsafe.Pointer(cVal), C.int(cValLen)),
}, nil
}
// GetOmapValuesByKeys starts iterating over specific key/value pairs.
// PREVIEW
//
// Implements:
// void rados_read_op_omap_get_vals_by_keys(rados_read_op_t read_op,
// char const * const * keys,
// size_t keys_len,
// rados_omap_iter_t * iter,
// int * prval)
func (r *ReadOp) GetOmapValuesByKeys(keys []string) *ReadOpOmapGetValsByKeysStep {
s := newReadOpOmapGetValsByKeysStep()
r.steps = append(r.steps, s)
cKeys := make([]*C.char, len(keys))
defer func() {
for _, cKeyPtr := range cKeys {
C.free(unsafe.Pointer(cKeyPtr))
}
}()
for i, key := range keys {
cKeys[i] = C.CString(key)
}
C.rados_read_op_omap_get_vals_by_keys(
r.op,
&cKeys[0],
C.size_t(len(keys)),
&s.iter,
s.prval,
)
return s
}

75
vendor/github.com/ceph/go-ceph/rados/read_op_read.go generated vendored Normal file
View File

@ -0,0 +1,75 @@
//go:build ceph_preview
// +build ceph_preview
package rados
// #cgo LDFLAGS: -lrados
// #include <rados/librados.h>
// #include <stdlib.h>
//
import "C"
import (
"unsafe"
)
// ReadOpReadStep holds the result of the Read read operation.
// Result is valid only after Operate() was called.
type ReadOpReadStep struct {
// C returned data:
bytesRead *C.size_t
prval *C.int
BytesRead int64 // Bytes read by this action.
Result int // Result of this action.
}
func (s *ReadOpReadStep) update() error {
s.BytesRead = (int64)(*s.bytesRead)
s.Result = (int)(*s.prval)
return nil
}
func (s *ReadOpReadStep) free() {
C.free(unsafe.Pointer(s.bytesRead))
C.free(unsafe.Pointer(s.prval))
s.bytesRead = nil
s.prval = nil
}
func newReadOpReadStep() *ReadOpReadStep {
return &ReadOpReadStep{
bytesRead: (*C.size_t)(C.malloc(C.sizeof_size_t)),
prval: (*C.int)(C.malloc(C.sizeof_int)),
}
}
// Read bytes from offset into buffer.
// len(buffer) is the maximum number of bytes read from the object.
// buffer[:ReadOpReadStep.BytesRead] then contains object data.
// PREVIEW
//
// Implements:
// void rados_read_op_read(rados_read_op_t read_op,
// uint64_t offset,
// size_t len,
// char * buffer,
// size_t * bytes_read,
// int * prval)
func (r *ReadOp) Read(offset uint64, buffer []byte) *ReadOpReadStep {
oe := newReadStep(buffer, offset)
readStep := newReadOpReadStep()
r.steps = append(r.steps, oe, readStep)
C.rados_read_op_read(
r.op,
oe.cOffset,
oe.cReadLen,
oe.cBuffer,
readStep.bytesRead,
readStep.prval,
)
return readStep
}

31
vendor/github.com/ceph/go-ceph/rados/read_step.go generated vendored Normal file
View File

@ -0,0 +1,31 @@
package rados
// #include <stdint.h>
import "C"
import (
"unsafe"
)
type readStep struct {
withoutUpdate
withoutFree
// the c pointer utilizes the Go byteslice data and no free is needed
// inputs:
b []byte
// arguments:
cBuffer *C.char
cReadLen C.size_t
cOffset C.uint64_t
}
func newReadStep(b []byte, offset uint64) *readStep {
return &readStep{
b: b,
cBuffer: (*C.char)(unsafe.Pointer(&b[0])), // TODO: must be pinned
cReadLen: C.size_t(len(b)),
cOffset: C.uint64_t(offset),
}
}

379
vendor/github.com/ceph/go-ceph/rados/watcher.go generated vendored Normal file
View File

@ -0,0 +1,379 @@
//go:build ceph_preview
// +build ceph_preview
package rados
/*
#cgo LDFLAGS: -lrados
#include <stdlib.h>
#include <rados/librados.h>
extern void watchNotifyCb(void*, uint64_t, uint64_t, uint64_t, void*, size_t);
extern void watchErrorCb(void*, uint64_t, int);
*/
import "C"
import (
"encoding/binary"
"fmt"
"math"
"sync"
"time"
"unsafe"
)
type (
// WatcherID is the unique id of a Watcher.
WatcherID uint64
// NotifyID is the unique id of a NotifyEvent.
NotifyID uint64
// NotifierID is the unique id of a notifying client.
NotifierID uint64
)
// NotifyEvent is received by a watcher for each notification.
type NotifyEvent struct {
ID NotifyID
WatcherID WatcherID
NotifierID NotifierID
Data []byte
}
// NotifyAck represents an acknowleged notification.
type NotifyAck struct {
WatcherID WatcherID
NotifierID NotifierID
Response []byte
}
// NotifyTimeout represents an unacknowleged notification.
type NotifyTimeout struct {
WatcherID WatcherID
NotifierID NotifierID
}
// Watcher receives all notifications for certain object.
type Watcher struct {
id WatcherID
oid string
ioctx *IOContext
events chan NotifyEvent
errors chan error
done chan struct{}
}
var (
watchers = map[WatcherID]*Watcher{}
watchersMtx sync.RWMutex
)
// Watch creates a Watcher for the specified object.
// PREVIEW
//
// A Watcher receives all notifications that are sent to the object on which it
// has been created. It exposes two read-only channels: Events() receives all
// the NotifyEvents and Errors() receives all occuring errors. A typical code
// creating a Watcher could look like this:
//
// watcher, err := ioctx.Watch(oid)
// go func() { // event handler
// for ne := range watcher.Events() {
// ...
// ne.Ack([]byte("response data..."))
// ...
// }
// }()
// go func() { // error handler
// for err := range watcher.Errors() {
// ... handle err ...
// }
// }()
//
// CAUTION: the Watcher references the IOContext in which it has been created.
// Therefore all watchers must be deleted with the Delete() method before the
// IOContext is being destroyed.
//
// Implements:
// int rados_watch2(rados_ioctx_t io, const char* o, uint64_t* cookie,
// rados_watchcb2_t watchcb, rados_watcherrcb_t watcherrcb, void* arg)
func (ioctx *IOContext) Watch(obj string) (*Watcher, error) {
return ioctx.WatchWithTimeout(obj, 0)
}
// WatchWithTimeout creates a watcher on an object. Same as Watcher(), but
// different timeout than the default can be specified.
// PREVIEW
//
// Implements:
// int rados_watch3(rados_ioctx_t io, const char *o, uint64_t *cookie,
// rados_watchcb2_t watchcb, rados_watcherrcb_t watcherrcb, uint32_t timeout,
// void *arg);
func (ioctx *IOContext) WatchWithTimeout(oid string, timeout time.Duration) (*Watcher, error) {
cObj := C.CString(oid)
defer C.free(unsafe.Pointer(cObj))
var id C.uint64_t
watchersMtx.Lock()
defer watchersMtx.Unlock()
ret := C.rados_watch3(
ioctx.ioctx,
cObj,
&id,
(C.rados_watchcb2_t)(C.watchNotifyCb),
(C.rados_watcherrcb_t)(C.watchErrorCb),
C.uint32_t(timeout.Milliseconds()/1000),
nil,
)
if err := getError(ret); err != nil {
return nil, err
}
evCh := make(chan NotifyEvent)
errCh := make(chan error)
w := &Watcher{
id: WatcherID(id),
ioctx: ioctx,
oid: oid,
events: evCh,
errors: errCh,
done: make(chan struct{}),
}
watchers[WatcherID(id)] = w
return w, nil
}
// ID returns the WatcherId of the Watcher
// PREVIEW
func (w *Watcher) ID() WatcherID {
return w.id
}
// Events returns a read-only channel, that receives all notifications that are
// sent to the object of the Watcher.
// PREVIEW
func (w *Watcher) Events() <-chan NotifyEvent {
return w.events
}
// Errors returns a read-only channel, that receives all errors for the Watcher.
// PREVIEW
func (w *Watcher) Errors() <-chan error {
return w.errors
}
// Check on the status of a Watcher.
// PREVIEW
//
// Returns the time since it was last confirmed. If there is an error, the
// Watcher is no longer valid, and should be destroyed with the Delete() method.
//
// Implements:
// int rados_watch_check(rados_ioctx_t io, uint64_t cookie)
func (w *Watcher) Check() (time.Duration, error) {
ret := C.rados_watch_check(w.ioctx.ioctx, C.uint64_t(w.id))
if ret < 0 {
return 0, getError(ret)
}
return time.Millisecond * time.Duration(ret), nil
}
// Delete the watcher. This closes both the event and error channel.
// PREVIEW
//
// Implements:
// int rados_unwatch2(rados_ioctx_t io, uint64_t cookie)
func (w *Watcher) Delete() error {
watchersMtx.Lock()
_, ok := watchers[w.id]
if ok {
delete(watchers, w.id)
}
watchersMtx.Unlock()
if !ok {
return nil
}
ret := C.rados_unwatch2(w.ioctx.ioctx, C.uint64_t(w.id))
if ret != 0 {
return getError(ret)
}
close(w.done) // unblock blocked callbacks
close(w.events)
close(w.errors)
return nil
}
// Notify sends a notification with the provided data to all Watchers of the
// specified object.
// PREVIEW
//
// CAUTION: even if the error is not nil. the returned slices
// might still contain data.
func (ioctx *IOContext) Notify(obj string, data []byte) ([]NotifyAck, []NotifyTimeout, error) {
return ioctx.NotifyWithTimeout(obj, data, 0)
}
// NotifyWithTimeout is like Notify() but with a different timeout than the
// default.
// PREVIEW
//
// Implements:
// int rados_notify2(rados_ioctx_t io, const char* o, const char* buf, int buf_len,
// uint64_t timeout_ms, char** reply_buffer, size_t* reply_buffer_len)
func (ioctx *IOContext) NotifyWithTimeout(obj string, data []byte, timeout time.Duration) ([]NotifyAck,
[]NotifyTimeout, error) {
cObj := C.CString(obj)
defer C.free(unsafe.Pointer(cObj))
var cResponse *C.char
defer C.rados_buffer_free(cResponse)
var responseLen C.size_t
var dataPtr *C.char
if len(data) > 0 {
dataPtr = (*C.char)(unsafe.Pointer(&data[0]))
}
ret := C.rados_notify2(
ioctx.ioctx,
cObj,
dataPtr,
C.int(len(data)),
C.uint64_t(timeout.Milliseconds()),
&cResponse,
&responseLen,
)
// cResponse has been set even if an error is returned, so we decode it anyway
acks, timeouts := decodeNotifyResponse(cResponse, responseLen)
return acks, timeouts, getError(ret)
}
// Ack sends an acknowledgement with the specified response data to the notfier
// of the NotifyEvent. If a notify is not ack'ed, the originating Notify() call
// blocks and eventiually times out.
// PREVIEW
//
// Implements:
// int rados_notify_ack(rados_ioctx_t io, const char *o, uint64_t notify_id,
// uint64_t cookie, const char *buf, int buf_len)
func (ne *NotifyEvent) Ack(response []byte) error {
watchersMtx.RLock()
w, ok := watchers[ne.WatcherID]
watchersMtx.RUnlock()
if !ok {
return fmt.Errorf("can't ack on deleted watcher %v", ne.WatcherID)
}
cOID := C.CString(w.oid)
defer C.free(unsafe.Pointer(cOID))
var respPtr *C.char
if len(response) > 0 {
respPtr = (*C.char)(unsafe.Pointer(&response[0]))
}
ret := C.rados_notify_ack(
w.ioctx.ioctx,
cOID,
C.uint64_t(ne.ID),
C.uint64_t(ne.WatcherID),
respPtr,
C.int(len(response)),
)
return getError(ret)
}
// WatcherFlush flushes all pending notifications of the cluster.
// PREVIEW
//
// Implements:
// int rados_watch_flush(rados_t cluster)
func (c *Conn) WatcherFlush() error {
if !c.connected {
return ErrNotConnected
}
ret := C.rados_watch_flush(c.cluster)
return getError(ret)
}
// decoder for this notify response format:
// le32 num_acks
// {
// le64 gid global id for the client (for client.1234 that's 1234)
// le64 cookie cookie for the client
// le32 buflen length of reply message buffer
// u8 buflen payload
// } num_acks
// le32 num_timeouts
// {
// le64 gid global id for the client
// le64 cookie cookie for the client
// } num_timeouts
//
// NOTE: starting with pacific this is implemented as a C function and this can
// be replaced later
func decodeNotifyResponse(response *C.char, len C.size_t) ([]NotifyAck, []NotifyTimeout) {
if len == 0 || response == nil {
return nil, nil
}
b := (*[math.MaxInt32]byte)(unsafe.Pointer(response))[:len:len]
pos := 0
num := binary.LittleEndian.Uint32(b[pos:])
pos += 4
acks := make([]NotifyAck, num)
for i := range acks {
acks[i].NotifierID = NotifierID(binary.LittleEndian.Uint64(b[pos:]))
pos += 8
acks[i].WatcherID = WatcherID(binary.LittleEndian.Uint64(b[pos:]))
pos += 8
dataLen := binary.LittleEndian.Uint32(b[pos:])
pos += 4
if dataLen > 0 {
acks[i].Response = C.GoBytes(unsafe.Pointer(&b[pos]), C.int(dataLen))
pos += int(dataLen)
}
}
num = binary.LittleEndian.Uint32(b[pos:])
pos += 4
timeouts := make([]NotifyTimeout, num)
for i := range timeouts {
timeouts[i].NotifierID = NotifierID(binary.LittleEndian.Uint64(b[pos:]))
pos += 8
timeouts[i].WatcherID = WatcherID(binary.LittleEndian.Uint64(b[pos:]))
pos += 8
}
return acks, timeouts
}
//export watchNotifyCb
func watchNotifyCb(_ unsafe.Pointer, notifyID C.uint64_t, id C.uint64_t,
notifierID C.uint64_t, cData unsafe.Pointer, dataLen C.size_t) {
watchersMtx.RLock()
w, ok := watchers[WatcherID(id)]
watchersMtx.RUnlock()
if !ok {
// usually this should not happen, but who knows
// TODO: some log message (once we have logging)
return
}
ev := NotifyEvent{
ID: NotifyID(notifyID),
WatcherID: WatcherID(id),
NotifierID: NotifierID(notifierID),
}
if dataLen > 0 {
ev.Data = C.GoBytes(cData, C.int(dataLen))
}
select {
case <-w.done: // unblock when deleted
case w.events <- ev:
}
}
//export watchErrorCb
func watchErrorCb(_ unsafe.Pointer, id C.uint64_t, err C.int) {
watchersMtx.RLock()
w, ok := watchers[WatcherID(id)]
watchersMtx.RUnlock()
if !ok {
// usually this should not happen, but who knows
// TODO: some log message (once we have logging)
return
}
select {
case <-w.done: // unblock when deleted
case w.errors <- getError(err):
}
}

View File

@ -1,6 +1,3 @@
//go:build ceph_preview
// +build ceph_preview
package rados
// #cgo LDFLAGS: -lrados
@ -40,7 +37,6 @@ func newWriteOpCmpExtStep() *WriteOpCmpExtStep {
}
// CmpExt ensures that given object range (extent) satisfies comparison.
// PREVIEW
//
// Implements:
// void rados_write_op_cmpext(rados_write_op_t write_op,

View File

@ -706,15 +706,13 @@ func (image *Image) BreakLock(client string, cookie string) error {
return getError(C.rbd_break_lock(image.image, cClient, cCookie))
}
// ssize_t rbd_read(rbd_image_t image, uint64_t ofs, size_t len, char *buf);
// TODO: int64_t rbd_read_iterate(rbd_image_t image, uint64_t ofs, size_t len,
// int (*cb)(uint64_t, size_t, const char *, void *), void *arg);
// TODO: int rbd_read_iterate2(rbd_image_t image, uint64_t ofs, uint64_t len,
// int (*cb)(uint64_t, size_t, const char *, void *), void *arg);
// TODO: int rbd_diff_iterate(rbd_image_t image,
// const char *fromsnapname,
// uint64_t ofs, uint64_t len,
// int (*cb)(uint64_t, size_t, int, void *), void *arg);
// Read data from the image. The length of the read is determined by the length
// of the buffer slice. The position of the read is determined by an internal
// offset which is not safe in concurrent code. Prefer ReadAt when possible.
//
// Implements:
// ssize_t rbd_read(rbd_image_t image, uint64_t ofs, size_t len,
// char *buf);
func (image *Image) Read(data []byte) (int, error) {
if err := image.validate(imageIsOpen); err != nil {
return 0, err
@ -742,7 +740,13 @@ func (image *Image) Read(data []byte) (int, error) {
return ret, nil
}
// ssize_t rbd_write(rbd_image_t image, uint64_t ofs, size_t len, const char *buf);
// Write data to an image. The length of the write is determined by the length of
// the buffer slice. The position of the write is determined by an internal
// offset which is not safe in concurrent code. Prefer WriteAt when possible.
//
// Implements:
// ssize_t rbd_write(rbd_image_t image, uint64_t ofs, size_t len,
// const char *buf);
func (image *Image) Write(data []byte) (n int, err error) {
if err := image.validate(imageIsOpen); err != nil {
return 0, err

3
vendor/modules.txt vendored
View File

@ -69,9 +69,10 @@ github.com/cenkalti/backoff/v3
## explicit; go 1.16
github.com/ceph/ceph-csi/api/deploy/kubernetes/rbd
github.com/ceph/ceph-csi/api/deploy/ocp
# github.com/ceph/go-ceph v0.13.0
# github.com/ceph/go-ceph v0.14.0
## explicit; go 1.12
github.com/ceph/go-ceph/cephfs/admin
github.com/ceph/go-ceph/common/admin/manager
github.com/ceph/go-ceph/common/commands
github.com/ceph/go-ceph/internal/callbacks
github.com/ceph/go-ceph/internal/commands