rebase: use go-ceph version with NFS-Admin API

The NFS-Admin API has been added to go-ceph v0.15.0. As the API can not
be tested in the go-ceph CI, it requires build-tag `ceph_ci_untested`.
This additional build-tag has been added to the `Makefile` and should be
removed when the API does not require the build-tag anymore.

See-also: ceph/go-ceph#655
Signed-off-by: Niels de Vos <ndevos@redhat.com>
This commit is contained in:
Niels de Vos 2022-03-30 10:46:01 +02:00 committed by mergify[bot]
parent 28369702d2
commit 282c33cb58
16 changed files with 455 additions and 171 deletions

View File

@ -48,7 +48,8 @@ GO_PROJECT=github.com/ceph/ceph-csi
CEPH_VERSION ?= $(shell . $(CURDIR)/build.env ; echo $${CEPH_VERSION})
# TODO: ceph_preview tag may be removed with go-ceph 0.16.0
GO_TAGS_LIST ?= $(CEPH_VERSION) ceph_preview
# TODO: ceph_ci_untested is added for NFS-export management (go-ceph#655)
GO_TAGS_LIST ?= $(CEPH_VERSION) ceph_preview ceph_ci_untested
# go build flags
LDFLAGS ?=

3
go.mod
View File

@ -7,7 +7,8 @@ require (
github.com/aws/aws-sdk-go v1.43.32
github.com/aws/aws-sdk-go-v2/service/sts v1.16.3
github.com/ceph/ceph-csi/api v0.0.0-00010101000000-000000000000
github.com/ceph/go-ceph v0.14.0
// TODO: API for managing NFS-exports requires `ceph_ci_untested` build-tag
github.com/ceph/go-ceph v0.15.0
github.com/container-storage-interface/spec v1.5.0
github.com/csi-addons/replication-lib-utils v0.2.0
github.com/csi-addons/spec v0.1.2-0.20211220115741-32fa508dadbe

4
go.sum
View File

@ -180,8 +180,8 @@ github.com/cenkalti/backoff/v3 v3.0.0 h1:ske+9nBpD9qZsTBoF41nW5L+AIuFBKMeze18XQ3
github.com/cenkalti/backoff/v3 v3.0.0/go.mod h1:cIeZDE3IrqwwJl6VUwCN6trj1oXrTS4rc0ij+ULvLYs=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/centrify/cloud-golang-sdk v0.0.0-20190214225812-119110094d0f/go.mod h1:C0rtzmGXgN78pYR0tGJFhtHgkbAs0lIbHwkB81VxDQE=
github.com/ceph/go-ceph v0.14.0 h1:sJoT0au7NT3TPmDWf5W9w6tZy0U/5xZrIXVVauZR+Xo=
github.com/ceph/go-ceph v0.14.0/go.mod h1:mafFpf5Vg8Ai8Bd+FAMvKBHLmtdpTXdRP/TNq8XWegY=
github.com/ceph/go-ceph v0.15.0 h1:ILB3NaLWOtt4u/2d8I8HZTC4Ycm1PsOYVar3IFU1xlo=
github.com/ceph/go-ceph v0.15.0/go.mod h1:mafFpf5Vg8Ai8Bd+FAMvKBHLmtdpTXdRP/TNq8XWegY=
github.com/certifi/gocertifi v0.0.0-20191021191039-0944d244cd40/go.mod h1:sGbDF6GwGcLpkNXPUTkMRoywsNa/ol15pxFe6ERfguA=
github.com/certifi/gocertifi v0.0.0-20200922220541-2c3bb06c6054/go.mod h1:sGbDF6GwGcLpkNXPUTkMRoywsNa/ol15pxFe6ERfguA=
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=

View File

@ -0,0 +1,21 @@
//go:build !(nautilus || octopus) && ceph_preview && ceph_ci_untested
// +build !nautilus,!octopus,ceph_preview,ceph_ci_untested
package nfs
import (
ccom "github.com/ceph/go-ceph/common/commands"
)
// Admin is used to administer ceph nfs features.
type Admin struct {
conn ccom.RadosCommander
}
// NewFromConn creates an new management object from a preexisting
// rados connection. The existing connection can be rados.Conn or any
// type implementing the RadosCommander interface.
// PREVIEW
func NewFromConn(conn ccom.RadosCommander) *Admin {
return &Admin{conn}
}

View File

@ -0,0 +1,5 @@
/*
Package nfs from common/admin contains a set of APIs used to interact
with and administer NFS support for ceph clusters.
*/
package nfs

View File

@ -0,0 +1,198 @@
//go:build !(nautilus || octopus) && ceph_preview && ceph_ci_untested
// +build !nautilus,!octopus,ceph_preview,ceph_ci_untested
package nfs
import (
"github.com/ceph/go-ceph/internal/commands"
)
// SquashMode indicates the kind of user-id squashing performed on an export.
type SquashMode string
// src: https://github.com/nfs-ganesha/nfs-ganesha/blob/next/src/config_samples/export.txt
const (
// NoneSquash performs no id squashing.
NoneSquash SquashMode = "None"
// RootSquash performs squashing of root user (with any gid).
RootSquash SquashMode = "Root"
// AllSquash performs squashing of all users.
AllSquash SquashMode = "All"
// RootIDSquash performs squashing of root uid/gid.
RootIDSquash SquashMode = "RootId"
// NoRootSquash is equivalent to NoneSquash
NoRootSquash = NoneSquash
// Unspecifiedquash
Unspecifiedquash SquashMode = ""
)
// CephFSExportSpec is used to specify the parameters used to create a new
// CephFS based export.
type CephFSExportSpec struct {
FileSystemName string `json:"fsname"`
ClusterID string `json:"cluster_id"`
PseudoPath string `json:"pseudo_path"`
Path string `json:"path,omitempty"`
ReadOnly bool `json:"readonly"`
ClientAddr []string `json:"client_addr,omitempty"`
Squash SquashMode `json:"squash,omitempty"`
}
// ExportResult is returned along with newly created exports.
type ExportResult struct {
Bind string `json:"bind"`
FileSystemName string `json:"fs"`
Path string `json:"path"`
ClusterID string `json:"cluster"`
Mode string `json:"mode"`
}
type cephFSExportFields struct {
Prefix string `json:"prefix"`
Format string `json:"format"`
CephFSExportSpec
}
// FSALInfo describes NFS-Ganesha specific FSAL properties of an export.
type FSALInfo struct {
Name string `json:"name"`
UserID string `json:"user_id"`
FileSystemName string `json:"fs_name"`
}
// ClientInfo describes per-client parameters of an export.
type ClientInfo struct {
Addresses []string `json:"addresses"`
AccessType string `json:"access_type"`
Squash SquashMode `json:"squash"`
}
// ExportInfo describes an NFS export.
type ExportInfo struct {
ExportID int64 `json:"export_id"`
Path string `json:"path"`
ClusterID string `json:"cluster_id"`
PseudoPath string `json:"pseudo"`
AccessType string `json:"access_type"`
Squash SquashMode `json:"squash"`
SecurityLabel bool `json:"security_label"`
Protocols []int `json:"protocols"`
Transports []string `json:"transports"`
FSAL FSALInfo `json:"fsal"`
Clients []ClientInfo `json:"clients"`
}
func parseExportResult(res commands.Response) (*ExportResult, error) {
r := &ExportResult{}
if err := res.NoStatus().Unmarshal(r).End(); err != nil {
return nil, err
}
return r, nil
}
func parseExportsList(res commands.Response) ([]ExportInfo, error) {
l := []ExportInfo{}
if err := res.NoStatus().Unmarshal(&l).End(); err != nil {
return nil, err
}
return l, nil
}
func parseExportInfo(res commands.Response) (ExportInfo, error) {
i := ExportInfo{}
if err := res.NoStatus().Unmarshal(&i).End(); err != nil {
return i, err
}
return i, nil
}
// CreateCephFSExport will create a new NFS export for a CephFS file system.
// PREVIEW
//
// Similar To:
// ceph nfs export create cephfs
func (nfsa *Admin) CreateCephFSExport(spec CephFSExportSpec) (
*ExportResult, error) {
// ---
f := &cephFSExportFields{
Prefix: "nfs export create cephfs",
Format: "json",
CephFSExportSpec: spec,
}
return parseExportResult(commands.MarshalMgrCommand(nfsa.conn, f))
}
const delSucc = "Successfully deleted export"
// RemoveExport will remove an NFS export based on the pseudo-path of the export.
// PREVIEW
//
// Similar To:
// ceph nfs export rm
func (nfsa *Admin) RemoveExport(clusterID, pseudoPath string) error {
m := map[string]string{
"prefix": "nfs export rm",
"format": "json",
"cluster_id": clusterID,
"pseudo_path": pseudoPath,
}
return (commands.MarshalMgrCommand(nfsa.conn, m).
FilterBodyPrefix(delSucc).NoData().End())
}
// ListDetailedExports will return a list of exports with details.
// PREVIEW
//
// Similar To:
// ceph nfs export ls --detailed
func (nfsa *Admin) ListDetailedExports(clusterID string) ([]ExportInfo, error) {
/*
NOTE: there is no simple list because based on a quick reading of the code
in ceph, the details fetching should not be significantly slower with
details than without, and since this is an API call not a CLI its easy
enough to ignore the details you don't care about. If I'm wrong, and
we discover a major perf. difference in the future we can always add a new
simpler list-without-details function.
*/
m := map[string]string{
"prefix": "nfs export ls",
"detailed": "true",
"format": "json",
"cluster_id": clusterID,
}
return parseExportsList(commands.MarshalMgrCommand(nfsa.conn, m))
}
// ExportInfo will return a structure describing the export specified by it's
// pseudo-path.
// PREVIEW
//
// Similar To:
// ceph nfs export info
func (nfsa *Admin) ExportInfo(clusterID, pseudoPath string) (ExportInfo, error) {
m := map[string]string{
"prefix": "nfs export info",
"format": "json",
"cluster_id": clusterID,
"pseudo_path": pseudoPath,
}
return parseExportInfo(commands.MarshalMgrCommand(nfsa.conn, m))
}
/*
TODO?
'nfs export apply': cluster_id: str, inbuf: str
"""Create or update an export by `-i <json_or_ganesha_export_file>`"""
'nfs export create rgw':
bucket: str,
cluster_id: str,
pseudo_path: str,
readonly: Optional[bool] = False,
client_addr: Optional[List[str]] = None,
squash: str = 'none',
"""Create an RGW export"""
*/

View File

@ -1,6 +1,7 @@
package commands
import (
"bytes"
"encoding/json"
"errors"
"fmt"
@ -156,6 +157,18 @@ func (r Response) FilterSuffix(s string) Response {
return r
}
// FilterBodyPrefix sets the body value equivalent to an empty string if the
// body value contains the given prefix string.
func (r Response) FilterBodyPrefix(p string) Response {
if !r.Ok() {
return r
}
if bytes.HasPrefix(r.body, []byte(p)) {
return Response{[]byte(""), r.status, r.err}
}
return r
}
// FilterDeprecated removes deprecation warnings from the response status.
// Use it when checking the response from calls that may be deprecated in ceph
// if you want those calls to continue working if the warning is present.

View File

@ -0,0 +1,89 @@
package cutil
// #include <stdlib.h>
import "C"
import (
"unsafe"
)
// BufferGroup is a helper structure that holds Go-allocated slices of
// C-allocated strings and their respective lengths. Useful for C functions
// that consume byte buffers with explicit length instead of null-terminated
// strings. When used as input arguments in C functions, caller must make sure
// the C code will not hold any pointers to either of the struct's attributes
// after that C function returns.
type BufferGroup struct {
// C-allocated buffers.
Buffers []CharPtr
// Lengths of C buffers, where Lengths[i] = length(Buffers[i]).
Lengths []SizeT
}
// TODO: should BufferGroup implementation change and the slices would contain
// nested Go pointers, they must be pinned with PtrGuard.
// NewBufferGroupStrings returns new BufferGroup constructed from strings.
func NewBufferGroupStrings(strs []string) *BufferGroup {
s := &BufferGroup{
Buffers: make([]CharPtr, len(strs)),
Lengths: make([]SizeT, len(strs)),
}
for i, str := range strs {
bs := []byte(str)
s.Buffers[i] = CharPtr(C.CBytes(bs))
s.Lengths[i] = SizeT(len(bs))
}
return s
}
// NewBufferGroupBytes returns new BufferGroup constructed
// from slice of byte slices.
func NewBufferGroupBytes(bss [][]byte) *BufferGroup {
s := &BufferGroup{
Buffers: make([]CharPtr, len(bss)),
Lengths: make([]SizeT, len(bss)),
}
for i, bs := range bss {
s.Buffers[i] = CharPtr(C.CBytes(bs))
s.Lengths[i] = SizeT(len(bs))
}
return s
}
// Free free()s the C-allocated memory.
func (s *BufferGroup) Free() {
for _, ptr := range s.Buffers {
C.free(unsafe.Pointer(ptr))
}
s.Buffers = nil
s.Lengths = nil
}
// BuffersPtr returns a pointer to the beginning of the Buffers slice.
func (s *BufferGroup) BuffersPtr() CharPtrPtr {
if len(s.Buffers) == 0 {
return nil
}
return CharPtrPtr(&s.Buffers[0])
}
// LengthsPtr returns a pointer to the beginning of the Lengths slice.
func (s *BufferGroup) LengthsPtr() SizeTPtr {
if len(s.Lengths) == 0 {
return nil
}
return SizeTPtr(&s.Lengths[0])
}
func testBufferGroupGet(s *BufferGroup, index int) (str string, length int) {
bs := C.GoBytes(unsafe.Pointer(s.Buffers[index]), C.int(s.Lengths[index]))
return string(bs), int(s.Lengths[index])
}

14
vendor/github.com/ceph/go-ceph/internal/log/log.go generated vendored Normal file
View File

@ -0,0 +1,14 @@
// Package log is the internal package for go-ceph logging. This package is only
// used from go-ceph code, not from consumers of go-ceph. go-ceph code uses the
// functions in this package to log information that can't be returned as
// errors. The functions default to no-ops and can be set with the external log
// package common/log by the go-ceph consumers.
package log
func noop(string, ...interface{}) {}
// These variables are set by the common log package.
var (
Warnf = noop
Debugf = noop
)

View File

@ -10,69 +10,8 @@ import "C"
import (
"runtime"
"unsafe"
"github.com/ceph/go-ceph/internal/cutil"
)
// setOmapStep is a write op step. It holds C memory used in the operation.
type setOmapStep struct {
withRefs
withoutUpdate
// C arguments
cKeys cutil.CPtrCSlice
cValues cutil.CPtrCSlice
cLengths cutil.SizeTCSlice
cNum C.size_t
}
func newSetOmapStep(pairs map[string][]byte) *setOmapStep {
maplen := len(pairs)
cKeys := cutil.NewCPtrCSlice(maplen)
cValues := cutil.NewCPtrCSlice(maplen)
cLengths := cutil.NewSizeTCSlice(maplen)
sos := &setOmapStep{
cKeys: cKeys,
cValues: cValues,
cLengths: cLengths,
cNum: C.size_t(maplen),
}
var i uintptr
for key, value := range pairs {
// key
ck := C.CString(key)
sos.add(unsafe.Pointer(ck))
cKeys[i] = cutil.CPtr(ck)
// value and its length
vlen := cutil.SizeT(len(value))
if vlen > 0 {
cv := C.CBytes(value)
sos.add(cv)
cValues[i] = cutil.CPtr(cv)
} else {
cValues[i] = nil
}
cLengths[i] = vlen
i++
}
runtime.SetFinalizer(sos, opStepFinalizer)
return sos
}
func (sos *setOmapStep) free() {
sos.cKeys.Free()
sos.cValues.Free()
sos.cLengths.Free()
sos.withRefs.free()
}
// OmapKeyValue items are returned by the GetOmapStep's Next call.
type OmapKeyValue struct {
Key string
@ -88,15 +27,6 @@ type OmapKeyValue struct {
// Release method is called the public methods of the step must no longer be
// used and may return errors.
type GetOmapStep struct {
// inputs:
startAfter string
filterPrefix string
maxReturn uint64
// arguments:
cStartAfter *C.char
cFilterPrefix *C.char
// C returned data:
iter C.rados_omap_iter_t
more *C.uchar
@ -109,15 +39,10 @@ type GetOmapStep struct {
canIterate bool
}
func newGetOmapStep(startAfter, filterPrefix string, maxReturn uint64) *GetOmapStep {
func newGetOmapStep() *GetOmapStep {
gos := &GetOmapStep{
startAfter: startAfter,
filterPrefix: filterPrefix,
maxReturn: maxReturn,
cStartAfter: C.CString(startAfter),
cFilterPrefix: C.CString(filterPrefix),
more: (*C.uchar)(C.malloc(C.sizeof_uchar)),
rval: (*C.int)(C.malloc(C.sizeof_int)),
more: (*C.uchar)(C.malloc(C.sizeof_uchar)),
rval: (*C.int)(C.malloc(C.sizeof_int)),
}
runtime.SetFinalizer(gos, opStepFinalizer)
return gos
@ -133,10 +58,6 @@ func (gos *GetOmapStep) free() {
gos.more = nil
C.free(unsafe.Pointer(gos.rval))
gos.rval = nil
C.free(unsafe.Pointer(gos.cStartAfter))
gos.cStartAfter = nil
C.free(unsafe.Pointer(gos.cFilterPrefix))
gos.cFilterPrefix = nil
}
func (gos *GetOmapStep) update() error {
@ -151,11 +72,12 @@ func (gos *GetOmapStep) Next() (*OmapKeyValue, error) {
return nil, ErrOperationIncomplete
}
var (
cKey *C.char
cVal *C.char
cLen C.size_t
cKey *C.char
cVal *C.char
cKeyLen C.size_t
cValLen C.size_t
)
ret := C.rados_omap_get_next(gos.iter, &cKey, &cVal, &cLen)
ret := C.rados_omap_get_next2(gos.iter, &cKey, &cVal, &cKeyLen, &cValLen)
if ret != 0 {
return nil, getError(ret)
}
@ -163,8 +85,8 @@ func (gos *GetOmapStep) Next() (*OmapKeyValue, error) {
return nil, nil
}
return &OmapKeyValue{
Key: C.GoString(cKey),
Value: C.GoBytes(unsafe.Pointer(cVal), C.int(cLen)),
Key: string(C.GoBytes(unsafe.Pointer(cKey), C.int(cKeyLen))),
Value: C.GoBytes(unsafe.Pointer(cVal), C.int(cValLen)),
}, nil
}
@ -175,40 +97,6 @@ func (gos *GetOmapStep) More() bool {
return *gos.more != 0
}
// removeOmapKeysStep is a write operation step used to track state, especially
// C memory, across the setup and use of a WriteOp.
type removeOmapKeysStep struct {
withRefs
withoutUpdate
// arguments:
cKeys cutil.CPtrCSlice
cNum C.size_t
}
func newRemoveOmapKeysStep(keys []string) *removeOmapKeysStep {
cKeys := cutil.NewCPtrCSlice(len(keys))
roks := &removeOmapKeysStep{
cKeys: cKeys,
cNum: C.size_t(len(keys)),
}
i := 0
for _, key := range keys {
cKeys[i] = cutil.CPtr(C.CString(key))
roks.add(unsafe.Pointer(cKeys[i]))
i++
}
runtime.SetFinalizer(roks, opStepFinalizer)
return roks
}
func (roks *removeOmapKeysStep) free() {
roks.cKeys.Free()
roks.withRefs.free()
}
// SetOmap appends the map `pairs` to the omap `oid`
func (ioctx *IOContext) SetOmap(oid string, pairs map[string][]byte) error {
op := CreateWriteOp()

View File

@ -0,0 +1,31 @@
//go:build ceph_preview
// +build ceph_preview
package rados
// #cgo LDFLAGS: -lrados
// #include <rados/librados.h>
// #include <stdlib.h>
//
import "C"
import (
"unsafe"
)
// SetLocator sets the key for mapping objects to pgs within an io context.
// Until a different locator key is set, all objects in this io context will be placed in the same pg.
// To reset the locator, an empty string must be set.
// PREVIEW
//
// Implements:
// void rados_ioctx_locator_set_key(rados_ioctx_t io, const char *key);
func (ioctx *IOContext) SetLocator(locator string) {
if locator == "" {
C.rados_ioctx_locator_set_key(ioctx.ioctx, nil)
} else {
var cLoc *C.char = C.CString(locator)
defer C.free(unsafe.Pointer(cLoc))
C.rados_ioctx_locator_set_key(ioctx.ioctx, cLoc)
}
}

View File

@ -69,13 +69,19 @@ func (r *ReadOp) AssertExists() {
// function. The GetOmapStep may be used to iterate over the key-value
// pairs after the Operate call has been performed.
func (r *ReadOp) GetOmapValues(startAfter, filterPrefix string, maxReturn uint64) *GetOmapStep {
gos := newGetOmapStep(startAfter, filterPrefix, maxReturn)
gos := newGetOmapStep()
r.steps = append(r.steps, gos)
cStartAfter := C.CString(startAfter)
cFilterPrefix := C.CString(filterPrefix)
defer C.free(unsafe.Pointer(cStartAfter))
defer C.free(unsafe.Pointer(cFilterPrefix))
C.rados_read_op_omap_get_vals2(
r.op,
gos.cStartAfter,
gos.cFilterPrefix,
C.uint64_t(gos.maxReturn),
cStartAfter,
cFilterPrefix,
C.uint64_t(maxReturn),
&gos.iter,
gos.more,
gos.rval,

View File

@ -11,6 +11,8 @@ import "C"
import (
"unsafe"
"github.com/ceph/go-ceph/internal/cutil"
)
// ReadOpOmapGetValsByKeysStep holds the result of the
@ -65,10 +67,11 @@ func (s *ReadOpOmapGetValsByKeysStep) Next() (*OmapKeyValue, error) {
var (
cKey *C.char
cVal *C.char
cKeyLen C.size_t
cValLen C.size_t
)
ret := C.rados_omap_get_next(s.iter, &cKey, &cVal, &cValLen)
ret := C.rados_omap_get_next2(s.iter, &cKey, &cVal, &cKeyLen, &cValLen)
if ret != 0 {
return nil, getError(ret)
}
@ -79,7 +82,7 @@ func (s *ReadOpOmapGetValsByKeysStep) Next() (*OmapKeyValue, error) {
}
return &OmapKeyValue{
Key: C.GoString(cKey),
Key: string(C.GoBytes(unsafe.Pointer(cKey), C.int(cKeyLen))),
Value: C.GoBytes(unsafe.Pointer(cVal), C.int(cValLen)),
}, nil
}
@ -88,30 +91,24 @@ func (s *ReadOpOmapGetValsByKeysStep) Next() (*OmapKeyValue, error) {
// PREVIEW
//
// Implements:
// void rados_read_op_omap_get_vals_by_keys(rados_read_op_t read_op,
// char const * const * keys,
// size_t keys_len,
// rados_omap_iter_t * iter,
// int * prval)
// void rados_read_op_omap_get_vals_by_keys2(rados_read_op_t read_op,
// char const * const * keys,
// size_t num_keys,
// const size_t * key_lens,
// rados_omap_iter_t * iter,
// int * prval)
func (r *ReadOp) GetOmapValuesByKeys(keys []string) *ReadOpOmapGetValsByKeysStep {
s := newReadOpOmapGetValsByKeysStep()
r.steps = append(r.steps, s)
cKeys := make([]*C.char, len(keys))
defer func() {
for _, cKeyPtr := range cKeys {
C.free(unsafe.Pointer(cKeyPtr))
}
}()
cKeys := cutil.NewBufferGroupStrings(keys)
defer cKeys.Free()
for i, key := range keys {
cKeys[i] = C.CString(key)
}
C.rados_read_op_omap_get_vals_by_keys(
C.rados_read_op_omap_get_vals_by_keys2(
r.op,
&cKeys[0],
(**C.char)(cKeys.BuffersPtr()),
C.size_t(len(keys)),
(*C.size_t)(cKeys.LengthsPtr()),
&s.iter,
s.prval,
)

View File

@ -19,6 +19,8 @@ import (
"sync"
"time"
"unsafe"
"github.com/ceph/go-ceph/internal/log"
)
type (
@ -340,14 +342,6 @@ func decodeNotifyResponse(response *C.char, len C.size_t) ([]NotifyAck, []Notify
//export watchNotifyCb
func watchNotifyCb(_ unsafe.Pointer, notifyID C.uint64_t, id C.uint64_t,
notifierID C.uint64_t, cData unsafe.Pointer, dataLen C.size_t) {
watchersMtx.RLock()
w, ok := watchers[WatcherID(id)]
watchersMtx.RUnlock()
if !ok {
// usually this should not happen, but who knows
// TODO: some log message (once we have logging)
return
}
ev := NotifyEvent{
ID: NotifyID(notifyID),
WatcherID: WatcherID(id),
@ -356,6 +350,14 @@ func watchNotifyCb(_ unsafe.Pointer, notifyID C.uint64_t, id C.uint64_t,
if dataLen > 0 {
ev.Data = C.GoBytes(cData, C.int(dataLen))
}
watchersMtx.RLock()
w, ok := watchers[WatcherID(id)]
watchersMtx.RUnlock()
if !ok {
// usually this should not happen, but who knows
log.Warnf("received notification for unknown watcher ID: %#v", ev)
return
}
select {
case <-w.done: // unblock when deleted
case w.events <- ev:
@ -369,7 +371,7 @@ func watchErrorCb(_ unsafe.Pointer, id C.uint64_t, err C.int) {
watchersMtx.RUnlock()
if !ok {
// usually this should not happen, but who knows
// TODO: some log message (once we have logging)
log.Warnf("received error for unknown watcher ID: id=%d err=%#v", id, err)
return
}
select {

View File

@ -10,6 +10,7 @@ import "C"
import (
"unsafe"
"github.com/ceph/go-ceph/internal/cutil"
ts "github.com/ceph/go-ceph/internal/timespec"
)
@ -92,24 +93,39 @@ func (w *WriteOp) Create(exclusive CreateOption) {
// SetOmap appends the map `pairs` to the omap `oid`.
func (w *WriteOp) SetOmap(pairs map[string][]byte) {
sos := newSetOmapStep(pairs)
w.steps = append(w.steps, sos)
C.rados_write_op_omap_set(
keys := make([]string, len(pairs))
values := make([][]byte, len(pairs))
idx := 0
for k, v := range pairs {
keys[idx] = k
values[idx] = v
idx++
}
cKeys := cutil.NewBufferGroupStrings(keys)
cValues := cutil.NewBufferGroupBytes(values)
defer cKeys.Free()
defer cValues.Free()
C.rados_write_op_omap_set2(
w.op,
(**C.char)(sos.cKeys.Ptr()),
(**C.char)(sos.cValues.Ptr()),
(*C.size_t)(sos.cLengths.Ptr()),
sos.cNum)
(**C.char)(cKeys.BuffersPtr()),
(**C.char)(cValues.BuffersPtr()),
(*C.size_t)(cKeys.LengthsPtr()),
(*C.size_t)(cValues.LengthsPtr()),
(C.size_t)(len(pairs)))
}
// RmOmapKeys removes the specified `keys` from the omap `oid`.
func (w *WriteOp) RmOmapKeys(keys []string) {
roks := newRemoveOmapKeysStep(keys)
w.steps = append(w.steps, roks)
C.rados_write_op_omap_rm_keys(
cKeys := cutil.NewBufferGroupStrings(keys)
defer cKeys.Free()
C.rados_write_op_omap_rm_keys2(
w.op,
(**C.char)(roks.cKeys.Ptr()),
roks.cNum)
(**C.char)(cKeys.BuffersPtr()),
(*C.size_t)(cKeys.LengthsPtr()),
(C.size_t)(len(keys)))
}
// CleanOmap clears the omap `oid`.

4
vendor/modules.txt vendored
View File

@ -116,15 +116,17 @@ github.com/cenkalti/backoff/v3
github.com/ceph/ceph-csi/api/deploy/kubernetes/nfs
github.com/ceph/ceph-csi/api/deploy/kubernetes/rbd
github.com/ceph/ceph-csi/api/deploy/ocp
# github.com/ceph/go-ceph v0.14.0
# github.com/ceph/go-ceph v0.15.0
## explicit; go 1.12
github.com/ceph/go-ceph/cephfs/admin
github.com/ceph/go-ceph/common/admin/manager
github.com/ceph/go-ceph/common/admin/nfs
github.com/ceph/go-ceph/common/commands
github.com/ceph/go-ceph/internal/callbacks
github.com/ceph/go-ceph/internal/commands
github.com/ceph/go-ceph/internal/cutil
github.com/ceph/go-ceph/internal/errutil
github.com/ceph/go-ceph/internal/log
github.com/ceph/go-ceph/internal/retry
github.com/ceph/go-ceph/internal/timespec
github.com/ceph/go-ceph/rados