rebase: update go-ceph to v0.10.0

This commit updates the go-ceph to latest
release. More details about release at
https://github.com/ceph/go-ceph/releases/tag/v0.10.0

Signed-off-by: Madhu Rajanna <madhupr007@gmail.com>
This commit is contained in:
Madhu Rajanna 2021-06-09 10:24:52 +05:30 committed by mergify[bot]
parent 17b0091cba
commit 5b7b5f1e3a
27 changed files with 1099 additions and 272 deletions

2
go.mod
View File

@ -4,7 +4,7 @@ go 1.16
require ( require (
github.com/aws/aws-sdk-go v1.38.54 github.com/aws/aws-sdk-go v1.38.54
github.com/ceph/go-ceph v0.8.0 github.com/ceph/go-ceph v0.10.0
github.com/container-storage-interface/spec v1.3.0 github.com/container-storage-interface/spec v1.3.0
github.com/csi-addons/replication-lib-utils v0.2.0 github.com/csi-addons/replication-lib-utils v0.2.0
github.com/csi-addons/spec v0.1.0 github.com/csi-addons/spec v0.1.0

4
go.sum
View File

@ -202,8 +202,8 @@ github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QH
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/centrify/cloud-golang-sdk v0.0.0-20190214225812-119110094d0f h1:gJzxrodnNd/CtPXjO3WYiakyNzHg3rtAi7rO74ejHYU= github.com/centrify/cloud-golang-sdk v0.0.0-20190214225812-119110094d0f h1:gJzxrodnNd/CtPXjO3WYiakyNzHg3rtAi7rO74ejHYU=
github.com/centrify/cloud-golang-sdk v0.0.0-20190214225812-119110094d0f/go.mod h1:C0rtzmGXgN78pYR0tGJFhtHgkbAs0lIbHwkB81VxDQE= github.com/centrify/cloud-golang-sdk v0.0.0-20190214225812-119110094d0f/go.mod h1:C0rtzmGXgN78pYR0tGJFhtHgkbAs0lIbHwkB81VxDQE=
github.com/ceph/go-ceph v0.8.0 h1:d+VP0eyconBl9RrvKVUq7S0npyK969ErLkCt5pg2fp0= github.com/ceph/go-ceph v0.10.0 h1:c9Bvqdt2ccv7aSyNBFtN9fm4dwMyfj8IX2or15tb4ag=
github.com/ceph/go-ceph v0.8.0/go.mod h1:wd+keAOqrcsN//20VQnHBGtnBnY0KHl0PA024Ng8HfQ= github.com/ceph/go-ceph v0.10.0/go.mod h1:mafFpf5Vg8Ai8Bd+FAMvKBHLmtdpTXdRP/TNq8XWegY=
github.com/cespare/prettybench v0.0.0-20150116022406-03b8cfe5406c/go.mod h1:Xe6ZsFhtM8HrDku0pxJ3/Lr51rwykrzgFwpmTzleatY= github.com/cespare/prettybench v0.0.0-20150116022406-03b8cfe5406c/go.mod h1:Xe6ZsFhtM8HrDku0pxJ3/Lr51rwykrzgFwpmTzleatY=
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=

View File

@ -53,10 +53,10 @@ func (fsa *FSAdmin) CloneSubVolumeSnapshot(volume, group, subvolume, snapshot, n
} }
func checkCloneResponse(res response) error { func checkCloneResponse(res response) error {
if strings.HasSuffix(res.status, notProtectedSuffix) { if strings.HasSuffix(res.Status(), notProtectedSuffix) {
return NotProtectedError{response: res} return NotProtectedError{response: res}
} }
return res.noData().End() return res.NoData().End()
} }
// CloneState is used to define constant values used to determine the state of // CloneState is used to define constant values used to determine the state of
@ -94,7 +94,7 @@ type cloneStatusWrapper struct {
func parseCloneStatus(res response) (*CloneStatus, error) { func parseCloneStatus(res response) (*CloneStatus, error) {
var status cloneStatusWrapper var status cloneStatusWrapper
if err := res.noStatus().unmarshal(&status).End(); err != nil { if err := res.NoStatus().Unmarshal(&status).End(); err != nil {
return nil, err return nil, err
} }
return &status.Status, nil return &status.Status, nil
@ -132,5 +132,5 @@ func (fsa *FSAdmin) CancelClone(volume, group, clone string) error {
if group != NoGroup { if group != NoGroup {
m["group_name"] = group m["group_name"] = group
} }
return fsa.marshalMgrCommand(m).noData().End() return fsa.marshalMgrCommand(m).NoData().End()
} }

View File

@ -3,18 +3,16 @@
package admin package admin
import ( import (
"encoding/json"
"strconv" "strconv"
ccom "github.com/ceph/go-ceph/common/commands"
"github.com/ceph/go-ceph/internal/commands"
"github.com/ceph/go-ceph/rados" "github.com/ceph/go-ceph/rados"
) )
// RadosCommander provides an interface to execute JSON-formatted commands that // RadosCommander provides an interface to execute JSON-formatted commands that
// allow the cephfs administrative functions to interact with the Ceph cluster. // allow the cephfs administrative functions to interact with the Ceph cluster.
type RadosCommander interface { type RadosCommander = ccom.RadosCommander
MgrCommand(buf [][]byte) ([]byte, string, error)
MonCommand(buf []byte) ([]byte, string, error)
}
// FSAdmin is used to administrate CephFS within a ceph cluster. // FSAdmin is used to administrate CephFS within a ceph cluster.
type FSAdmin struct { type FSAdmin struct {
@ -60,39 +58,25 @@ func (fsa *FSAdmin) validate() error {
// rawMgrCommand takes a byte buffer and sends it to the MGR as a command. // rawMgrCommand takes a byte buffer and sends it to the MGR as a command.
// The buffer is expected to contain preformatted JSON. // The buffer is expected to contain preformatted JSON.
func (fsa *FSAdmin) rawMgrCommand(buf []byte) response { func (fsa *FSAdmin) rawMgrCommand(buf []byte) response {
if err := fsa.validate(); err != nil { return commands.RawMgrCommand(fsa.conn, buf)
return response{err: err}
}
return newResponse(fsa.conn.MgrCommand([][]byte{buf}))
} }
// marshalMgrCommand takes an generic interface{} value, converts it to JSON and // marshalMgrCommand takes an generic interface{} value, converts it to JSON and
// sends the json to the MGR as a command. // sends the json to the MGR as a command.
func (fsa *FSAdmin) marshalMgrCommand(v interface{}) response { func (fsa *FSAdmin) marshalMgrCommand(v interface{}) response {
b, err := json.Marshal(v) return commands.MarshalMgrCommand(fsa.conn, v)
if err != nil {
return response{err: err}
}
return fsa.rawMgrCommand(b)
} }
// rawMonCommand takes a byte buffer and sends it to the MON as a command. // rawMonCommand takes a byte buffer and sends it to the MON as a command.
// The buffer is expected to contain preformatted JSON. // The buffer is expected to contain preformatted JSON.
func (fsa *FSAdmin) rawMonCommand(buf []byte) response { func (fsa *FSAdmin) rawMonCommand(buf []byte) response {
if err := fsa.validate(); err != nil { return commands.RawMonCommand(fsa.conn, buf)
return response{err: err}
}
return newResponse(fsa.conn.MonCommand(buf))
} }
// marshalMonCommand takes an generic interface{} value, converts it to JSON and // marshalMonCommand takes an generic interface{} value, converts it to JSON and
// sends the json to the MGR as a command. // sends the json to the MGR as a command.
func (fsa *FSAdmin) marshalMonCommand(v interface{}) response { func (fsa *FSAdmin) marshalMonCommand(v interface{}) response {
b, err := json.Marshal(v) return commands.MarshalMonCommand(fsa.conn, v)
if err != nil {
return response{err: err}
}
return fsa.rawMonCommand(b)
} }
type listNamedResult struct { type listNamedResult struct {
@ -101,7 +85,7 @@ type listNamedResult struct {
func parseListNames(res response) ([]string, error) { func parseListNames(res response) ([]string, error) {
var r []listNamedResult var r []listNamedResult
if err := res.noStatus().unmarshal(&r).End(); err != nil { if err := res.NoStatus().Unmarshal(&r).End(); err != nil {
return nil, err return nil, err
} }
vl := make([]string, len(r)) vl := make([]string, len(r))
@ -114,10 +98,10 @@ func parseListNames(res response) ([]string, error) {
// parsePathResponse returns a cleaned up path from requests that get a path // parsePathResponse returns a cleaned up path from requests that get a path
// unless an error is encountered, then an error is returned. // unless an error is encountered, then an error is returned.
func parsePathResponse(res response) (string, error) { func parsePathResponse(res response) (string, error) {
if res2 := res.noStatus(); !res2.Ok() { if res2 := res.NoStatus(); !res2.Ok() {
return "", res.End() return "", res.End()
} }
b := res.body b := res.Body()
// if there's a trailing newline in the buffer strip it. // if there's a trailing newline in the buffer strip it.
// ceph assumes a CLI wants the output of the buffer and there's // ceph assumes a CLI wants the output of the buffer and there's
// no format=json mode available currently. // no format=json mode available currently.

View File

@ -3,140 +3,22 @@
package admin package admin
import ( import (
"encoding/json" "github.com/ceph/go-ceph/internal/commands"
"errors"
"fmt"
"strings"
) )
var ( var (
// ErrStatusNotEmpty may be returned if a call should not have a status // ErrStatusNotEmpty is an alias for commands.ErrStatusNotEmpty
// string set but one is. ErrStatusNotEmpty = commands.ErrStatusNotEmpty
ErrStatusNotEmpty = errors.New("response status not empty") // ErrBodyNotEmpty is an alias for commands.ErrBodyNotEmpty
// ErrBodyNotEmpty may be returned if a call should have an empty body but ErrBodyNotEmpty = commands.ErrBodyNotEmpty
// a body value is present.
ErrBodyNotEmpty = errors.New("response body not empty")
) )
const ( type response = commands.Response
deprecatedSuffix = "call is deprecated and will be removed in a future release"
missingPrefix = "No handler found"
einval = -22
)
type cephError interface { // NotImplementedError is an alias for commands.NotImplementedError.
ErrorCode() int type NotImplementedError = commands.NotImplementedError
}
// NotImplementedError error values will be returned in the case that an API
// call is not available in the version of Ceph that is running in the target
// cluster.
type NotImplementedError struct {
response
}
// Error implements the error interface.
func (e NotImplementedError) Error() string {
return fmt.Sprintf("API call not implemented server-side: %s", e.status)
}
// response encapsulates the data returned by ceph and supports easy processing
// pipelines.
type response struct {
body []byte
status string
err error
}
// Ok returns true if the response contains no error.
func (r response) Ok() bool {
return r.err == nil
}
// Error implements the error interface.
func (r response) Error() string {
if r.status == "" {
return r.err.Error()
}
return fmt.Sprintf("%s: %q", r.err, r.status)
}
// Unwrap returns the error this response contains.
func (r response) Unwrap() error {
return r.err
}
// Status returns the status string value.
func (r response) Status() string {
return r.status
}
// End returns an error if the response contains an error or nil, indicating
// that response is no longer needed for processing.
func (r response) End() error {
if !r.Ok() {
if ce, ok := r.err.(cephError); ok {
if ce.ErrorCode() == einval && strings.HasPrefix(r.status, missingPrefix) {
return NotImplementedError{response: r}
}
}
return r
}
return nil
}
// noStatus asserts that the input response has no status value.
func (r response) noStatus() response {
if !r.Ok() {
return r
}
if r.status != "" {
return response{r.body, r.status, ErrStatusNotEmpty}
}
return r
}
// noBody asserts that the input response has no body value.
func (r response) noBody() response {
if !r.Ok() {
return r
}
if len(r.body) != 0 {
return response{r.body, r.status, ErrBodyNotEmpty}
}
return r
}
// noData asserts that the input response has no status or body values.
func (r response) noData() response {
return r.noStatus().noBody()
}
// filterDeprecated removes deprecation warnings from the response status.
// Use it when checking the response from calls that may be deprecated in ceph
// if you want those calls to continue working if the warning is present.
func (r response) filterDeprecated() response {
if !r.Ok() {
return r
}
if strings.HasSuffix(r.status, deprecatedSuffix) {
return response{r.body, "", r.err}
}
return r
}
// unmarshal data from the response body into v.
func (r response) unmarshal(v interface{}) response {
if !r.Ok() {
return r
}
if err := json.Unmarshal(r.body, v); err != nil {
return response{body: r.body, err: err}
}
return r
}
// newResponse returns a response. // newResponse returns a response.
func newResponse(b []byte, s string, e error) response { func newResponse(b []byte, s string, e error) response {
return response{b, s, e} return commands.NewResponse(b, s, e)
} }

View File

@ -61,7 +61,7 @@ func (fsa *FSAdmin) CreateSubVolume(volume, group, name string, o *SubVolumeOpti
o = &SubVolumeOptions{} o = &SubVolumeOptions{}
} }
f := o.toFields(volume, group, name) f := o.toFields(volume, group, name)
return fsa.marshalMgrCommand(f).noData().End() return fsa.marshalMgrCommand(f).NoData().End()
} }
// ListSubVolumes returns a list of subvolumes belonging to the volume and // ListSubVolumes returns a list of subvolumes belonging to the volume and
@ -117,7 +117,7 @@ func (fsa *FSAdmin) RemoveSubVolumeWithFlags(volume, group, name string, o SubVo
if group != NoGroup { if group != NoGroup {
m["group_name"] = group m["group_name"] = group
} }
return fsa.marshalMgrCommand(mergeFlags(m, o)).noData().End() return fsa.marshalMgrCommand(mergeFlags(m, o)).NoData().End()
} }
type subVolumeResizeFields struct { type subVolumeResizeFields struct {
@ -159,7 +159,7 @@ func (fsa *FSAdmin) ResizeSubVolume(
} }
var result []*SubVolumeResizeResult var result []*SubVolumeResizeResult
res := fsa.marshalMgrCommand(f) res := fsa.marshalMgrCommand(f)
if err := res.noStatus().unmarshal(&result).End(); err != nil { if err := res.NoStatus().Unmarshal(&result).End(); err != nil {
return nil, err return nil, err
} }
return result[0], nil return result[0], nil
@ -248,7 +248,7 @@ type subVolumeInfoWrapper struct {
func parseSubVolumeInfo(res response) (*SubVolumeInfo, error) { func parseSubVolumeInfo(res response) (*SubVolumeInfo, error) {
var info subVolumeInfoWrapper var info subVolumeInfoWrapper
if err := res.noStatus().unmarshal(&info).End(); err != nil { if err := res.NoStatus().Unmarshal(&info).End(); err != nil {
return nil, err return nil, err
} }
if info.VBytesQuota != nil { if info.VBytesQuota != nil {
@ -289,7 +289,7 @@ func (fsa *FSAdmin) CreateSubVolumeSnapshot(volume, group, source, name string)
if group != NoGroup { if group != NoGroup {
m["group_name"] = group m["group_name"] = group
} }
return fsa.marshalMgrCommand(m).noData().End() return fsa.marshalMgrCommand(m).NoData().End()
} }
// RemoveSubVolumeSnapshot removes the specified snapshot from the subvolume. // RemoveSubVolumeSnapshot removes the specified snapshot from the subvolume.
@ -320,7 +320,7 @@ func (fsa *FSAdmin) rmSubVolumeSnapshot(volume, group, subvolume, name string, o
if group != NoGroup { if group != NoGroup {
m["group_name"] = group m["group_name"] = group
} }
return fsa.marshalMgrCommand(mergeFlags(m, o)).noData().End() return fsa.marshalMgrCommand(mergeFlags(m, o)).NoData().End()
} }
// ListSubVolumeSnapshots returns a listing of snapshots for a given subvolume. // ListSubVolumeSnapshots returns a listing of snapshots for a given subvolume.
@ -351,7 +351,7 @@ type SubVolumeSnapshotInfo struct {
func parseSubVolumeSnapshotInfo(res response) (*SubVolumeSnapshotInfo, error) { func parseSubVolumeSnapshotInfo(res response) (*SubVolumeSnapshotInfo, error) {
var info SubVolumeSnapshotInfo var info SubVolumeSnapshotInfo
if err := res.noStatus().unmarshal(&info).End(); err != nil { if err := res.NoStatus().Unmarshal(&info).End(); err != nil {
return nil, err return nil, err
} }
return &info, nil return &info, nil
@ -390,7 +390,7 @@ func (fsa *FSAdmin) ProtectSubVolumeSnapshot(volume, group, subvolume, name stri
if group != NoGroup { if group != NoGroup {
m["group_name"] = group m["group_name"] = group
} }
return fsa.marshalMgrCommand(m).filterDeprecated().noData().End() return fsa.marshalMgrCommand(m).FilterDeprecated().NoData().End()
} }
// UnprotectSubVolumeSnapshot removes protection from the specified snapshot. // UnprotectSubVolumeSnapshot removes protection from the specified snapshot.
@ -408,5 +408,5 @@ func (fsa *FSAdmin) UnprotectSubVolumeSnapshot(volume, group, subvolume, name st
if group != NoGroup { if group != NoGroup {
m["group_name"] = group m["group_name"] = group
} }
return fsa.marshalMgrCommand(m).filterDeprecated().noData().End() return fsa.marshalMgrCommand(m).FilterDeprecated().NoData().End()
} }

View File

@ -48,7 +48,7 @@ func (fsa *FSAdmin) CreateSubVolumeGroup(volume, name string, o *SubVolumeGroupO
o = &SubVolumeGroupOptions{} o = &SubVolumeGroupOptions{}
} }
res := fsa.marshalMgrCommand(o.toFields(volume, name)) res := fsa.marshalMgrCommand(o.toFields(volume, name))
return res.noData().End() return res.NoData().End()
} }
// ListSubVolumeGroups returns a list of subvolume groups belonging to the // ListSubVolumeGroups returns a list of subvolume groups belonging to the
@ -86,7 +86,7 @@ func (fsa *FSAdmin) rmSubVolumeGroup(volume, name string, o commonRmFlags) error
"group_name": name, "group_name": name,
"format": "json", "format": "json",
}, o)) }, o))
return res.noData().End() return res.NoData().End()
} }
// SubVolumeGroupPath returns the path to the subvolume from the root of the // SubVolumeGroupPath returns the path to the subvolume from the root of the

View File

@ -43,7 +43,7 @@ func (fsa *FSAdmin) ListFileSystems() ([]FSPoolInfo, error) {
func parseFsList(res response) ([]FSPoolInfo, error) { func parseFsList(res response) ([]FSPoolInfo, error) {
var listing []FSPoolInfo var listing []FSPoolInfo
if err := res.noStatus().unmarshal(&listing).End(); err != nil { if err := res.NoStatus().Unmarshal(&listing).End(); err != nil {
return nil, err return nil, err
} }
return listing, nil return listing, nil
@ -78,13 +78,8 @@ func parseDumpToIdents(res response) ([]VolumeIdent, error) {
if !res.Ok() { if !res.Ok() {
return nil, res.End() return nil, res.End()
} }
if len(res.status) >= dumpOkLen && res.status[:dumpOkLen] == dumpOkPrefix {
// Unhelpfully, ceph drops a status string on success responses for this
// call. this hacks around that by ignoring its typical prefix
res.status = ""
}
var dump fsDump var dump fsDump
if err := res.noStatus().unmarshal(&dump).End(); err != nil { if err := res.FilterPrefix(dumpOkPrefix).NoStatus().Unmarshal(&dump).End(); err != nil {
return nil, err return nil, err
} }
// copy the dump json into the simpler enumeration list // copy the dump json into the simpler enumeration list
@ -123,15 +118,16 @@ type VolumeStatus struct {
func parseVolumeStatus(res response) (*VolumeStatus, error) { func parseVolumeStatus(res response) (*VolumeStatus, error) {
var vs VolumeStatus var vs VolumeStatus
res = res.noStatus() res = res.NoStatus()
if !res.Ok() { if !res.Ok() {
return nil, res.End() return nil, res.End()
} }
res = res.unmarshal(&vs) res = res.Unmarshal(&vs)
if !res.Ok() { if !res.Ok() {
if bytes.HasPrefix(res.body, []byte("ceph")) { if bytes.HasPrefix(res.Body(), []byte("ceph")) {
res.status = invalidTextualResponse return nil, NotImplementedError{
return nil, NotImplementedError{response: res} Response: newResponse(res.Body(), invalidTextualResponse, res.Unwrap()),
}
} }
return nil, res.End() return nil, res.End()
} }

View File

@ -0,0 +1,7 @@
/*
Package commands provides types and utility functions that are used for
interfacing with the JSON based command infrastructure in Ceph.
The *rados.Conn type implements many of the interfaces found in this package.
*/
package commands

View File

@ -0,0 +1,20 @@
package commands
// MgrCommander in an interface for the API needed to execute JSON formatted
// commands on the ceph mgr.
type MgrCommander interface {
MgrCommand(buf [][]byte) ([]byte, string, error)
}
// MonCommander is an interface for the API needed to execute JSON formatted
// commands on the ceph mon(s).
type MonCommander interface {
MonCommand(buf []byte) ([]byte, string, error)
}
// RadosCommander provides an interface for APIs needed to execute JSON
// formatted commands on the Ceph cluster.
type RadosCommander interface {
MgrCommander
MonCommander
}

View File

@ -0,0 +1,53 @@
package commands
import (
"encoding/json"
ccom "github.com/ceph/go-ceph/common/commands"
"github.com/ceph/go-ceph/rados"
)
func validate(m interface{}) error {
if m == nil {
return rados.ErrNotConnected
}
return nil
}
// RawMgrCommand takes a byte buffer and sends it to the MGR as a command.
// The buffer is expected to contain preformatted JSON.
func RawMgrCommand(m ccom.MgrCommander, buf []byte) Response {
if err := validate(m); err != nil {
return Response{err: err}
}
return NewResponse(m.MgrCommand([][]byte{buf}))
}
// MarshalMgrCommand takes an generic interface{} value, converts it to JSON
// and sends the json to the MGR as a command.
func MarshalMgrCommand(m ccom.MgrCommander, v interface{}) Response {
b, err := json.Marshal(v)
if err != nil {
return Response{err: err}
}
return RawMgrCommand(m, b)
}
// RawMonCommand takes a byte buffer and sends it to the MON as a command.
// The buffer is expected to contain preformatted JSON.
func RawMonCommand(m ccom.MonCommander, buf []byte) Response {
if err := validate(m); err != nil {
return Response{err: err}
}
return NewResponse(m.MonCommand(buf))
}
// MarshalMonCommand takes an generic interface{} value, converts it to JSON
// and sends the json to the MGR as a command.
func MarshalMonCommand(m ccom.MonCommander, v interface{}) Response {
b, err := json.Marshal(v)
if err != nil {
return Response{err: err}
}
return RawMonCommand(m, b)
}

View File

@ -0,0 +1,163 @@
package commands
import (
"encoding/json"
"errors"
"fmt"
"strings"
)
var (
// ErrStatusNotEmpty may be returned if a call should not have a status
// string set but one is.
ErrStatusNotEmpty = errors.New("response status not empty")
// ErrBodyNotEmpty may be returned if a call should have an empty body but
// a body value is present.
ErrBodyNotEmpty = errors.New("response body not empty")
)
const (
deprecatedSuffix = "call is deprecated and will be removed in a future release"
missingPrefix = "No handler found"
einval = -22
)
type cephError interface {
ErrorCode() int
}
// NotImplementedError error values will be returned in the case that an API
// call is not available in the version of Ceph that is running in the target
// cluster.
type NotImplementedError struct {
Response
}
// Error implements the error interface.
func (e NotImplementedError) Error() string {
return fmt.Sprintf("API call not implemented server-side: %s", e.status)
}
// Response encapsulates the data returned by ceph and supports easy processing
// pipelines.
type Response struct {
body []byte
status string
err error
}
// Ok returns true if the response contains no error.
func (r Response) Ok() bool {
return r.err == nil
}
// Error implements the error interface.
func (r Response) Error() string {
if r.status == "" {
return r.err.Error()
}
return fmt.Sprintf("%s: %q", r.err, r.status)
}
// Unwrap returns the error this response contains.
func (r Response) Unwrap() error {
return r.err
}
// Status returns the status string value.
func (r Response) Status() string {
return r.status
}
// Body returns the response body as a raw byte-slice.
func (r Response) Body() []byte {
return r.body
}
// End returns an error if the response contains an error or nil, indicating
// that response is no longer needed for processing.
func (r Response) End() error {
if !r.Ok() {
if ce, ok := r.err.(cephError); ok {
if ce.ErrorCode() == einval && strings.HasPrefix(r.status, missingPrefix) {
return NotImplementedError{Response: r}
}
}
return r
}
return nil
}
// NoStatus asserts that the input response has no status value.
func (r Response) NoStatus() Response {
if !r.Ok() {
return r
}
if r.status != "" {
return Response{r.body, r.status, ErrStatusNotEmpty}
}
return r
}
// NoBody asserts that the input response has no body value.
func (r Response) NoBody() Response {
if !r.Ok() {
return r
}
if len(r.body) != 0 {
return Response{r.body, r.status, ErrBodyNotEmpty}
}
return r
}
// NoData asserts that the input response has no status or body values.
func (r Response) NoData() Response {
return r.NoStatus().NoBody()
}
// FilterPrefix sets the status value to an empty string if the status
// value contains the given prefix string.
func (r Response) FilterPrefix(p string) Response {
if !r.Ok() {
return r
}
if strings.HasPrefix(r.status, p) {
return Response{r.body, "", r.err}
}
return r
}
// FilterSuffix sets the status value to an empty string if the status
// value contains the given suffix string.
func (r Response) FilterSuffix(s string) Response {
if !r.Ok() {
return r
}
if strings.HasSuffix(r.status, s) {
return Response{r.body, "", r.err}
}
return r
}
// FilterDeprecated removes deprecation warnings from the response status.
// Use it when checking the response from calls that may be deprecated in ceph
// if you want those calls to continue working if the warning is present.
func (r Response) FilterDeprecated() Response {
return r.FilterSuffix(deprecatedSuffix)
}
// Unmarshal data from the response body into v.
func (r Response) Unmarshal(v interface{}) Response {
if !r.Ok() {
return r
}
if err := json.Unmarshal(r.body, v); err != nil {
return Response{body: r.body, err: err}
}
return r
}
// NewResponse returns a response.
func NewResponse(b []byte, s string, e error) Response {
return Response{b, s, e}
}

View File

@ -0,0 +1,53 @@
package commands
import (
"fmt"
ccom "github.com/ceph/go-ceph/common/commands"
)
// NewTraceCommander is a RadosCommander that wraps a given RadosCommander
// and when commands are executes prints debug level "traces" to the
// standard output.
func NewTraceCommander(c ccom.RadosCommander) ccom.RadosCommander {
return &tracingCommander{c}
}
// tracingCommander serves two purposes: first, it allows one to trace the
// input and output json when running the tests. It can help with actually
// debugging the tests. Second, it demonstrates the rationale for using an
// interface in FSAdmin. You can layer any sort of debugging, error injection,
// or whatnot between the FSAdmin layer and the RADOS layer.
type tracingCommander struct {
conn ccom.RadosCommander
}
func (t *tracingCommander) MgrCommand(buf [][]byte) ([]byte, string, error) {
fmt.Println("(MGR Command)")
for i := range buf {
fmt.Println("IN:", string(buf[i]))
}
r, s, err := t.conn.MgrCommand(buf)
fmt.Println("OUT(result):", string(r))
if s != "" {
fmt.Println("OUT(status):", s)
}
if err != nil {
fmt.Println("OUT(error):", err.Error())
}
return r, s, err
}
func (t *tracingCommander) MonCommand(buf []byte) ([]byte, string, error) {
fmt.Println("(MON Command)")
fmt.Println("IN:", string(buf))
r, s, err := t.conn.MonCommand(buf)
fmt.Println("OUT(result):", string(r))
if s != "" {
fmt.Println("OUT(status):", s)
}
if err != nil {
fmt.Println("OUT(error):", err.Error())
}
return r, s, err
}

View File

@ -8,12 +8,13 @@ typedef void* voidptr;
import "C" import "C"
import ( import (
"math"
"unsafe" "unsafe"
) )
const ( const (
// MaxIdx is the maximum index on 32 bit systems // MaxIdx is the maximum index on 32 bit systems
MaxIdx = 1<<31 - 1 // 2GB, max int32 value, should be safe MaxIdx = math.MaxInt32 // 2GB, max int32 value, should be safe
// PtrSize is the size of a pointer // PtrSize is the size of a pointer
PtrSize = C.sizeof_voidptr PtrSize = C.sizeof_voidptr

View File

@ -0,0 +1,76 @@
package cutil
// The following code needs some explanation:
// This creates slices on top of the C memory buffers allocated before in
// order to safely and comfortably use them as arrays. First the void pointer
// is cast to a pointer to an array of the type that will be stored in the
// array. Because the size of an array is a constant, but the real array size
// is dynamic, we just use the biggest possible index value MaxIdx, to make
// sure it's always big enough. (Nothing is allocated by casting, so the size
// can be arbitrarily big.) So, if the array should store items of myType, the
// cast would be (*[MaxIdx]myItem)(myCMemPtr).
// From that array pointer a slice is created with the [start:end:capacity]
// syntax. The capacity must be set explicitly here, because by default it
// would be set to the size of the original array, which is MaxIdx, which
// doesn't reflect reality in this case. This results in definitions like:
// cSlice := (*[MaxIdx]myItem)(myCMemPtr)[:numOfItems:numOfItems]
////////// CPtr //////////
// CPtrCSlice is a C allocated slice of C pointers.
type CPtrCSlice []CPtr
// NewCPtrCSlice returns a CPtrSlice.
// Similar to CString it must be freed with slice.Free()
func NewCPtrCSlice(size int) CPtrCSlice {
if size == 0 {
return nil
}
cMem := Malloc(SizeT(size) * PtrSize)
cSlice := (*[MaxIdx]CPtr)(cMem)[:size:size]
return cSlice
}
// Ptr returns a pointer to CPtrSlice
func (v *CPtrCSlice) Ptr() CPtr {
if len(*v) == 0 {
return nil
}
return CPtr(&(*v)[0])
}
// Free frees a CPtrSlice
func (v *CPtrCSlice) Free() {
Free(v.Ptr())
*v = nil
}
////////// SizeT //////////
// SizeTCSlice is a C allocated slice of C.size_t.
type SizeTCSlice []SizeT
// NewSizeTCSlice returns a SizeTCSlice.
// Similar to CString it must be freed with slice.Free()
func NewSizeTCSlice(size int) SizeTCSlice {
if size == 0 {
return nil
}
cMem := Malloc(SizeT(size) * SizeTSize)
cSlice := (*[MaxIdx]SizeT)(cMem)[:size:size]
return cSlice
}
// Ptr returns a pointer to SizeTCSlice
func (v *SizeTCSlice) Ptr() CPtr {
if len(*v) == 0 {
return nil
}
return CPtr(&(*v)[0])
}
// Free frees a SizeTCSlice
func (v *SizeTCSlice) Free() {
Free(v.Ptr())
*v = nil
}

View File

@ -57,6 +57,17 @@ func (v *PtrGuard) Release() {
} }
} }
// The uintptrPtr() helper function below assumes that uintptr has the same size
// as a pointer, although in theory it could be larger. Therefore we use this
// constant expression to assert size equality as a safeguard at compile time.
// How it works: the difference of both sizes is converted into an 8 bit value
// and left-bit-shifted by 8. This always creates an overflow error at compile
// time, if the difference of the sizes is not 0.
const _ = uint8(unsafe.Sizeof(uintptr(0))-PtrSize) << 8 // size assert
func uintptrPtr(p *CPtr) *uintptr {
return (*uintptr)(unsafe.Pointer(p))
}
//go:uintptrescapes //go:uintptrescapes
// From https://golang.org/src/cmd/compile/internal/gc/lex.go: // From https://golang.org/src/cmd/compile/internal/gc/lex.go:
@ -69,7 +80,7 @@ func (v *PtrGuard) Release() {
// Also see https://golang.org/cmd/compile/#hdr-Compiler_Directives // Also see https://golang.org/cmd/compile/#hdr-Compiler_Directives
func storeUntilRelease(v *PtrGuard, cPtr *CPtr, goPtr uintptr) { func storeUntilRelease(v *PtrGuard, cPtr *CPtr, goPtr uintptr) {
uip := (*uintptr)(unsafe.Pointer(cPtr)) uip := uintptrPtr(cPtr)
*uip = goPtr // store Go pointer in C memory at c_ptr *uip = goPtr // store Go pointer in C memory at c_ptr
v.stored.Unlock() // send "stored" signal to main thread -->(1) v.stored.Unlock() // send "stored" signal to main thread -->(1)
v.release.Lock() // wait for "release" signal from main thread when v.release.Lock() // wait for "release" signal from main thread when

View File

@ -658,3 +658,32 @@ func (ioctx *IOContext) GetLastVersion() (uint64, error) {
v := C.rados_get_last_version(ioctx.ioctx) v := C.rados_get_last_version(ioctx.ioctx)
return uint64(v), nil return uint64(v), nil
} }
// GetNamespace gets the namespace used for objects within this IO context.
//
// Implements:
// int rados_ioctx_get_namespace(rados_ioctx_t io, char *buf,
// unsigned maxlen);
func (ioctx *IOContext) GetNamespace() (string, error) {
if err := ioctx.validate(); err != nil {
return "", err
}
var (
err error
buf []byte
ret C.int
)
retry.WithSizes(128, 8192, func(size int) retry.Hint {
buf = make([]byte, size)
ret = C.rados_ioctx_get_namespace(
ioctx.ioctx,
(*C.char)(unsafe.Pointer(&buf[0])),
C.unsigned(len(buf)))
err = getErrorIfNegative(ret)
return retry.DoubleSize.If(err == errRange)
})
if err != nil {
return "", err
}
return string(buf[:ret]), nil
}

View File

@ -1,4 +1,4 @@
// +build !luminous,!mimic // +build nautilus
package rados package rados
@ -7,37 +7,28 @@ package rados
// //
import "C" import "C"
import ( // SetPoolFullTry makes sure to send requests to the cluster despite
"unsafe" // the cluster or pool being marked full; ops will either succeed(e.g., delete)
// or return EDQUOT or ENOSPC.
"github.com/ceph/go-ceph/internal/retry"
)
// GetNamespace gets the namespace used for objects within this IO context.
// //
// Implements: // Implements:
// int rados_ioctx_get_namespace(rados_ioctx_t io, char *buf, // void rados_set_osdmap_full_try(rados_ioctx_t io);
// unsigned maxlen); func (ioctx *IOContext) SetPoolFullTry() error {
func (ioctx *IOContext) GetNamespace() (string, error) {
if err := ioctx.validate(); err != nil { if err := ioctx.validate(); err != nil {
return "", err return err
} }
var ( C.rados_set_osdmap_full_try(ioctx.ioctx)
err error return nil
buf []byte
ret C.int
)
retry.WithSizes(128, 8192, func(size int) retry.Hint {
buf = make([]byte, size)
ret = C.rados_ioctx_get_namespace(
ioctx.ioctx,
(*C.char)(unsafe.Pointer(&buf[0])),
C.unsigned(len(buf)))
err = getErrorIfNegative(ret)
return retry.DoubleSize.If(err == errRange)
})
if err != nil {
return "", err
} }
return string(buf[:ret]), nil
// UnsetPoolFullTry unsets the flag set by SetPoolFullTry()
//
// Implements:
// void rados_unset_osdmap_full_try(rados_ioctx_t io);
func (ioctx *IOContext) UnsetPoolFullTry() error {
if err := ioctx.validate(); err != nil {
return err
}
C.rados_unset_osdmap_full_try(ioctx.ioctx)
return nil
} }

37
vendor/github.com/ceph/go-ceph/rados/ioctx_octopus.go generated vendored Normal file
View File

@ -0,0 +1,37 @@
// +build !nautilus
package rados
// #cgo LDFLAGS: -lrados
// #include <rados/librados.h>
//
import "C"
// Ceph octopus deprecates rados_set_osdmap_full_try() and implements rados_set_pool_full_try()
// Ceph octopus deprecates rados_unset_osdmap_full_try() and implements rados_unset_pool_full_try()
// SetPoolFullTry makes sure to send requests to the cluster despite
// the cluster or pool being marked full; ops will either succeed(e.g., delete)
// or return EDQUOT or ENOSPC.
//
// Implements:
// void rados_set_pool_full_try(rados_ioctx_t io);
func (ioctx *IOContext) SetPoolFullTry() error {
if err := ioctx.validate(); err != nil {
return err
}
C.rados_set_pool_full_try(ioctx.ioctx)
return nil
}
// UnsetPoolFullTry unsets the flag set by SetPoolFullTry()
//
// Implements:
// void rados_unset_pool_full_try(rados_ioctx_t io);
func (ioctx *IOContext) UnsetPoolFullTry() error {
if err := ioctx.validate(); err != nil {
return err
}
C.rados_unset_pool_full_try(ioctx.ioctx)
return nil
}

View File

@ -4,20 +4,14 @@ package rados
#cgo LDFLAGS: -lrados #cgo LDFLAGS: -lrados
#include <stdlib.h> #include <stdlib.h>
#include <rados/librados.h> #include <rados/librados.h>
typedef void* voidptr;
*/ */
import "C" import "C"
import ( import (
"runtime" "runtime"
"unsafe" "unsafe"
)
const ( "github.com/ceph/go-ceph/internal/cutil"
ptrSize = C.sizeof_voidptr
sizeTSize = C.sizeof_size_t
) )
// setOmapStep is a write op step. It holds C memory used in the operation. // setOmapStep is a write op step. It holds C memory used in the operation.
@ -26,50 +20,44 @@ type setOmapStep struct {
withoutUpdate withoutUpdate
// C arguments // C arguments
cKeys **C.char cKeys cutil.CPtrCSlice
cValues **C.char cValues cutil.CPtrCSlice
cLengths *C.size_t cLengths cutil.SizeTCSlice
cNum C.size_t cNum C.size_t
} }
func newSetOmapStep(pairs map[string][]byte) *setOmapStep { func newSetOmapStep(pairs map[string][]byte) *setOmapStep {
maplen := C.size_t(len(pairs)) maplen := len(pairs)
cKeys := C.malloc(maplen * ptrSize) cKeys := cutil.NewCPtrCSlice(maplen)
cValues := C.malloc(maplen * ptrSize) cValues := cutil.NewCPtrCSlice(maplen)
cLengths := C.malloc(maplen * sizeTSize) cLengths := cutil.NewSizeTCSlice(maplen)
sos := &setOmapStep{ sos := &setOmapStep{
cKeys: (**C.char)(cKeys), cKeys: cKeys,
cValues: (**C.char)(cValues), cValues: cValues,
cLengths: (*C.size_t)(cLengths), cLengths: cLengths,
cNum: C.size_t(len(pairs)), cNum: C.size_t(maplen),
} }
sos.add(cKeys)
sos.add(cValues)
sos.add(cLengths)
var i uintptr var i uintptr
for key, value := range pairs { for key, value := range pairs {
// key // key
ck := C.CString(key) ck := C.CString(key)
sos.add(unsafe.Pointer(ck)) sos.add(unsafe.Pointer(ck))
ckp := (**C.char)(unsafe.Pointer(uintptr(cKeys) + i*ptrSize)) cKeys[i] = cutil.CPtr(ck)
*ckp = ck
// value and its length // value and its length
cvp := (**C.char)(unsafe.Pointer(uintptr(cValues) + i*ptrSize)) vlen := cutil.SizeT(len(value))
vlen := C.size_t(len(value))
if vlen > 0 { if vlen > 0 {
cv := C.CBytes(value) cv := C.CBytes(value)
sos.add(cv) sos.add(cv)
*cvp = (*C.char)(cv) cValues[i] = cutil.CPtr(cv)
} else { } else {
*cvp = nil cValues[i] = nil
} }
clp := (*C.size_t)(unsafe.Pointer(uintptr(cLengths) + i*ptrSize)) cLengths[i] = vlen
*clp = vlen
i++ i++
} }
@ -79,9 +67,9 @@ func newSetOmapStep(pairs map[string][]byte) *setOmapStep {
} }
func (sos *setOmapStep) free() { func (sos *setOmapStep) free() {
sos.cKeys = nil sos.cKeys.Free()
sos.cValues = nil sos.cValues.Free()
sos.cLengths = nil sos.cLengths.Free()
sos.withRefs.free() sos.withRefs.free()
} }
@ -190,23 +178,21 @@ type removeOmapKeysStep struct {
withoutUpdate withoutUpdate
// arguments: // arguments:
cKeys **C.char cKeys cutil.CPtrCSlice
cNum C.size_t cNum C.size_t
} }
func newRemoveOmapKeysStep(keys []string) *removeOmapKeysStep { func newRemoveOmapKeysStep(keys []string) *removeOmapKeysStep {
cKeys := C.malloc(C.size_t(len(keys)) * ptrSize) cKeys := cutil.NewCPtrCSlice(len(keys))
roks := &removeOmapKeysStep{ roks := &removeOmapKeysStep{
cKeys: (**C.char)(cKeys), cKeys: cKeys,
cNum: C.size_t(len(keys)), cNum: C.size_t(len(keys)),
} }
roks.add(cKeys)
i := 0 i := 0
for _, key := range keys { for _, key := range keys {
ckp := (**C.char)(unsafe.Pointer(uintptr(cKeys) + uintptr(i)*ptrSize)) cKeys[i] = cutil.CPtr(C.CString(key))
*ckp = C.CString(key) roks.add(unsafe.Pointer(cKeys[i]))
roks.add(unsafe.Pointer(*ckp))
i++ i++
} }
@ -215,7 +201,7 @@ func newRemoveOmapKeysStep(keys []string) *removeOmapKeysStep {
} }
func (roks *removeOmapKeysStep) free() { func (roks *removeOmapKeysStep) free() {
roks.cKeys = nil roks.cKeys.Free()
roks.withRefs.free() roks.withRefs.free()
} }

View File

@ -96,9 +96,9 @@ func (w *WriteOp) SetOmap(pairs map[string][]byte) {
w.steps = append(w.steps, sos) w.steps = append(w.steps, sos)
C.rados_write_op_omap_set( C.rados_write_op_omap_set(
w.op, w.op,
sos.cKeys, (**C.char)(sos.cKeys.Ptr()),
sos.cValues, (**C.char)(sos.cValues.Ptr()),
sos.cLengths, (*C.size_t)(sos.cLengths.Ptr()),
sos.cNum) sos.cNum)
} }
@ -108,7 +108,7 @@ func (w *WriteOp) RmOmapKeys(keys []string) {
w.steps = append(w.steps, roks) w.steps = append(w.steps, roks)
C.rados_write_op_omap_rm_keys( C.rados_write_op_omap_rm_keys(
w.op, w.op,
roks.cKeys, (**C.char)(roks.cKeys.Ptr()),
roks.cNum) roks.cNum)
} }

View File

@ -71,6 +71,12 @@ var (
// revive:enable:exported // revive:enable:exported
) )
// Public general error
const (
// ErrNotExist indicates a non-specific missing resource.
ErrNotExist = rbdError(-C.ENOENT)
)
// Private errors: // Private errors:
const ( const (

View File

@ -13,8 +13,10 @@ package rbd
import "C" import "C"
import ( import (
"fmt"
"unsafe" "unsafe"
"github.com/ceph/go-ceph/internal/cutil"
"github.com/ceph/go-ceph/internal/retry" "github.com/ceph/go-ceph/internal/retry"
"github.com/ceph/go-ceph/rados" "github.com/ceph/go-ceph/rados"
) )
@ -31,6 +33,20 @@ const (
MirrorModePool = MirrorMode(C.RBD_MIRROR_MODE_POOL) MirrorModePool = MirrorMode(C.RBD_MIRROR_MODE_POOL)
) )
// String representation of MirrorMode.
func (m MirrorMode) String() string {
switch m {
case MirrorModeDisabled:
return "disabled"
case MirrorModeImage:
return "image"
case MirrorModePool:
return "pool"
default:
return "<unknown>"
}
}
// ImageMirrorMode is used to indicate the mirroring approach for an RBD image. // ImageMirrorMode is used to indicate the mirroring approach for an RBD image.
type ImageMirrorMode int64 type ImageMirrorMode int64
@ -43,6 +59,46 @@ const (
ImageMirrorModeSnapshot = ImageMirrorMode(C.RBD_MIRROR_IMAGE_MODE_SNAPSHOT) ImageMirrorModeSnapshot = ImageMirrorMode(C.RBD_MIRROR_IMAGE_MODE_SNAPSHOT)
) )
// String representation of ImageMirrorMode.
func (imm ImageMirrorMode) String() string {
switch imm {
case ImageMirrorModeJournal:
return "journal"
case ImageMirrorModeSnapshot:
return "snapshot"
default:
return "<unknown>"
}
}
// GetMirrorUUID returns a string naming the mirroring uuid for the pool
// associated with the ioctx.
//
// Implements:
// int rbd_mirror_uuid_get(rados_ioctx_t io_ctx,
// char *uuid, size_t *max_len);
func GetMirrorUUID(ioctx *rados.IOContext) (string, error) {
var (
err error
buf []byte
cSize C.size_t
)
retry.WithSizes(1024, 1<<16, func(size int) retry.Hint {
cSize = C.size_t(size)
buf = make([]byte, cSize)
ret := C.rbd_mirror_uuid_get(
cephIoctx(ioctx),
(*C.char)(unsafe.Pointer(&buf[0])),
&cSize)
err = getErrorIfNegative(ret)
return retry.Size(int(cSize)).If(err == errRange)
})
if err != nil {
return "", err
}
return string(buf[:cSize]), nil
}
// SetMirrorMode is used to enable or disable pool level mirroring with either // SetMirrorMode is used to enable or disable pool level mirroring with either
// an automatic or per-image behavior. // an automatic or per-image behavior.
// //
@ -181,6 +237,20 @@ const (
MirrorImageDisabled = MirrorImageState(C.RBD_MIRROR_IMAGE_DISABLED) MirrorImageDisabled = MirrorImageState(C.RBD_MIRROR_IMAGE_DISABLED)
) )
// String representation of MirrorImageState.
func (mis MirrorImageState) String() string {
switch mis {
case MirrorImageDisabling:
return "disabling"
case MirrorImageEnabled:
return "enabled"
case MirrorImageDisabled:
return "disabled"
default:
return "<unknown>"
}
}
// MirrorImageInfo represents the mirroring status information of a RBD image. // MirrorImageInfo represents the mirroring status information of a RBD image.
type MirrorImageInfo struct { type MirrorImageInfo struct {
GlobalID string GlobalID string
@ -188,6 +258,14 @@ type MirrorImageInfo struct {
Primary bool Primary bool
} }
func convertMirrorImageInfo(cInfo *C.rbd_mirror_image_info_t) MirrorImageInfo {
return MirrorImageInfo{
GlobalID: C.GoString(cInfo.global_id),
State: MirrorImageState(cInfo.state),
Primary: bool(cInfo.primary),
}
}
// GetMirrorImageInfo fetches the mirroring status information of a RBD image. // GetMirrorImageInfo fetches the mirroring status information of a RBD image.
// //
// Implements: // Implements:
@ -209,11 +287,7 @@ func (image *Image) GetMirrorImageInfo() (*MirrorImageInfo, error) {
return nil, getError(ret) return nil, getError(ret)
} }
mii := MirrorImageInfo{ mii := convertMirrorImageInfo(&cInfo)
GlobalID: C.GoString(cInfo.global_id),
State: MirrorImageState(cInfo.state),
Primary: bool(cInfo.primary),
}
// free C memory allocated by C.rbd_mirror_image_get_info call // free C memory allocated by C.rbd_mirror_image_get_info call
C.rbd_mirror_image_get_info_cleanup(&cInfo) C.rbd_mirror_image_get_info_cleanup(&cInfo)
@ -233,3 +307,418 @@ func (image *Image) GetImageMirrorMode() (ImageMirrorMode, error) {
ret := C.rbd_mirror_image_get_mode(image.image, &mode) ret := C.rbd_mirror_image_get_mode(image.image, &mode)
return ImageMirrorMode(mode), getError(ret) return ImageMirrorMode(mode), getError(ret)
} }
// MirrorImageStatusState is used to indicate the state of a mirrored image
// within the site status info.
type MirrorImageStatusState int64
const (
// MirrorImageStatusStateUnknown is equivalent to MIRROR_IMAGE_STATUS_STATE_UNKNOWN
MirrorImageStatusStateUnknown = MirrorImageStatusState(C.MIRROR_IMAGE_STATUS_STATE_UNKNOWN)
// MirrorImageStatusStateError is equivalent to MIRROR_IMAGE_STATUS_STATE_ERROR
MirrorImageStatusStateError = MirrorImageStatusState(C.MIRROR_IMAGE_STATUS_STATE_ERROR)
// MirrorImageStatusStateSyncing is equivalent to MIRROR_IMAGE_STATUS_STATE_SYNCING
MirrorImageStatusStateSyncing = MirrorImageStatusState(C.MIRROR_IMAGE_STATUS_STATE_SYNCING)
// MirrorImageStatusStateStartingReplay is equivalent to MIRROR_IMAGE_STATUS_STATE_STARTING_REPLAY
MirrorImageStatusStateStartingReplay = MirrorImageStatusState(C.MIRROR_IMAGE_STATUS_STATE_STARTING_REPLAY)
// MirrorImageStatusStateReplaying is equivalent to MIRROR_IMAGE_STATUS_STATE_REPLAYING
MirrorImageStatusStateReplaying = MirrorImageStatusState(C.MIRROR_IMAGE_STATUS_STATE_REPLAYING)
// MirrorImageStatusStateStoppingReplay is equivalent to MIRROR_IMAGE_STATUS_STATE_STOPPING_REPLAY
MirrorImageStatusStateStoppingReplay = MirrorImageStatusState(C.MIRROR_IMAGE_STATUS_STATE_STOPPING_REPLAY)
// MirrorImageStatusStateStopped is equivalent to MIRROR_IMAGE_STATUS_STATE_STOPPED
MirrorImageStatusStateStopped = MirrorImageStatusState(C.MIRROR_IMAGE_STATUS_STATE_STOPPED)
)
// String represents the MirrorImageStatusState as a short string.
func (state MirrorImageStatusState) String() (s string) {
switch state {
case MirrorImageStatusStateUnknown:
s = "unknown"
case MirrorImageStatusStateError:
s = "error"
case MirrorImageStatusStateSyncing:
s = "syncing"
case MirrorImageStatusStateStartingReplay:
s = "starting_replay"
case MirrorImageStatusStateReplaying:
s = "replaying"
case MirrorImageStatusStateStoppingReplay:
s = "stopping_replay"
case MirrorImageStatusStateStopped:
s = "stopped"
default:
s = fmt.Sprintf("unknown(%d)", state)
}
return s
}
// SiteMirrorImageStatus contains information pertaining to the status of
// a mirrored image within a site.
type SiteMirrorImageStatus struct {
MirrorUUID string
State MirrorImageStatusState
Description string
LastUpdate int64
Up bool
}
// GlobalMirrorImageStatus contains information pertaining to the global
// status of a mirrored image. It contains general information as well
// as per-site information stored in the SiteStatuses slice.
type GlobalMirrorImageStatus struct {
Name string
Info MirrorImageInfo
SiteStatuses []SiteMirrorImageStatus
}
// LocalStatus returns one SiteMirrorImageStatus item from the SiteStatuses
// slice that corresponds to the local site's status. If the local status
// is not found than the error ErrNotExist will be returned.
func (gmis GlobalMirrorImageStatus) LocalStatus() (SiteMirrorImageStatus, error) {
var (
ss SiteMirrorImageStatus
err error = ErrNotExist
)
for i := range gmis.SiteStatuses {
// I couldn't find it explicitly documented, but a site mirror uuid
// of an empty string indicates that this is the local site.
// This pattern occurs in both the pybind code and ceph c++.
if gmis.SiteStatuses[i].MirrorUUID == "" {
ss = gmis.SiteStatuses[i]
err = nil
break
}
}
return ss, err
}
type siteArray [cutil.MaxIdx]C.rbd_mirror_image_site_status_t
// GetGlobalMirrorStatus returns status information pertaining to the state
// of the images's mirroring.
//
// Implements:
// int rbd_mirror_image_get_global_status(
// rbd_image_t image,
// rbd_mirror_image_global_status_t *mirror_image_global_status,
// size_t status_size);
func (image *Image) GetGlobalMirrorStatus() (GlobalMirrorImageStatus, error) {
if err := image.validate(imageIsOpen); err != nil {
return GlobalMirrorImageStatus{}, err
}
s := C.rbd_mirror_image_global_status_t{}
ret := C.rbd_mirror_image_get_global_status(
image.image,
&s,
C.sizeof_rbd_mirror_image_global_status_t)
if err := getError(ret); err != nil {
return GlobalMirrorImageStatus{}, err
}
defer C.rbd_mirror_image_global_status_cleanup(&s)
status := newGlobalMirrorImageStatus(&s)
return status, nil
}
func newGlobalMirrorImageStatus(
s *C.rbd_mirror_image_global_status_t) GlobalMirrorImageStatus {
status := GlobalMirrorImageStatus{
Name: C.GoString(s.name),
Info: convertMirrorImageInfo(&s.info),
SiteStatuses: make([]SiteMirrorImageStatus, s.site_statuses_count),
}
// use the "Sven Technique" to treat the C pointer as a go slice temporarily
sscs := (*siteArray)(unsafe.Pointer(s.site_statuses))[:s.site_statuses_count:s.site_statuses_count]
for i := C.uint32_t(0); i < s.site_statuses_count; i++ {
ss := sscs[i]
status.SiteStatuses[i] = SiteMirrorImageStatus{
MirrorUUID: C.GoString(ss.mirror_uuid),
State: MirrorImageStatusState(ss.state),
Description: C.GoString(ss.description),
LastUpdate: int64(ss.last_update),
Up: bool(ss.up),
}
}
return status
}
// CreateMirrorSnapshot creates a snapshot for image propagation to mirrors.
//
// Implements:
// int rbd_mirror_image_create_snapshot(rbd_image_t image,
// uint64_t *snap_id);
func (image *Image) CreateMirrorSnapshot() (uint64, error) {
var snapID C.uint64_t
ret := C.rbd_mirror_image_create_snapshot(
image.image,
&snapID)
return uint64(snapID), getError(ret)
}
// MirrorImageStatusSummary returns a map of images statuses and the count
// of images with said status.
//
// Implements:
// int rbd_mirror_image_status_summary(
// rados_ioctx_t io_ctx, rbd_mirror_image_status_state_t *states, int *counts,
// size_t *maxlen);
func MirrorImageStatusSummary(
ioctx *rados.IOContext) (map[MirrorImageStatusState]uint, error) {
// ideally, we already know the size of the arrays - they should be
// the size of all the values of the rbd_mirror_image_status_state_t
// enum. But the C api doesn't enforce this so we give a little
// wiggle room in case the server returns values outside the enum
// we know about. This is the only case I can think of that we'd
// be able to get -ERANGE.
var (
cioctx = cephIoctx(ioctx)
err error
cStates []C.rbd_mirror_image_status_state_t
cCounts []C.int
cSize C.size_t
)
retry.WithSizes(16, 1<<16, func(size int) retry.Hint {
cSize = C.size_t(size)
cStates = make([]C.rbd_mirror_image_status_state_t, cSize)
cCounts = make([]C.int, cSize)
ret := C.rbd_mirror_image_status_summary(
cioctx,
(*C.rbd_mirror_image_status_state_t)(&cStates[0]),
(*C.int)(&cCounts[0]),
&cSize)
err = getErrorIfNegative(ret)
return retry.Size(int(cSize)).If(err == errRange)
})
if err != nil {
return nil, err
}
m := map[MirrorImageStatusState]uint{}
for i := 0; i < int(cSize); i++ {
s := MirrorImageStatusState(cStates[i])
m[s] = uint(cCounts[i])
}
return m, nil
}
// SetMirrorSiteName sets the site name, used for rbd mirroring, for the ceph
// cluster associated with the provided rados connection.
//
// Implements:
// int rbd_mirror_site_name_set(rados_t cluster,
// const char *name);
func SetMirrorSiteName(conn *rados.Conn, name string) error {
cName := C.CString(name)
defer C.free(unsafe.Pointer(cName))
ret := C.rbd_mirror_site_name_set(
C.rados_t(conn.Cluster()),
cName)
return getError(ret)
}
// GetMirrorSiteName gets the site name, used for rbd mirroring, for the ceph
// cluster associated with the provided rados connection.
//
// Implements:
// int rbd_mirror_site_name_get(rados_t cluster,
// char *name, size_t *max_len);
func GetMirrorSiteName(conn *rados.Conn) (string, error) {
var (
cluster = C.rados_t(conn.Cluster())
err error
buf []byte
cSize C.size_t
)
retry.WithSizes(1024, 1<<16, func(size int) retry.Hint {
cSize = C.size_t(size)
buf = make([]byte, cSize)
ret := C.rbd_mirror_site_name_get(
cluster,
(*C.char)(unsafe.Pointer(&buf[0])),
&cSize)
err = getErrorIfNegative(ret)
return retry.Size(int(cSize)).If(err == errRange)
})
if err != nil {
return "", err
}
// the C code sets the size including null byte
return string(buf[:cSize-1]), nil
}
// CreateMirrorPeerBootstrapToken returns a token value, representing the
// cluster and pool associated with the given IO context, that can be provided
// to ImportMirrorPeerBootstrapToken in order to set up mirroring between
// pools.
//
// Implements:
// int rbd_mirror_peer_bootstrap_create(
// rados_ioctx_t io_ctx, char *token, size_t *max_len);
func CreateMirrorPeerBootstrapToken(ioctx *rados.IOContext) (string, error) {
var (
cioctx = cephIoctx(ioctx)
err error
buf []byte
cSize C.size_t
)
retry.WithSizes(1024, 1<<16, func(size int) retry.Hint {
cSize = C.size_t(size)
buf = make([]byte, cSize)
ret := C.rbd_mirror_peer_bootstrap_create(
cioctx,
(*C.char)(unsafe.Pointer(&buf[0])),
&cSize)
err = getErrorIfNegative(ret)
return retry.Size(int(cSize)).If(err == errRange)
})
if err != nil {
return "", err
}
// the C code sets the size including null byte
return string(buf[:cSize-1]), nil
}
// MirrorPeerDirection is used to indicate what direction data is mirrored.
type MirrorPeerDirection int
const (
// MirrorPeerDirectionRx is equivalent to RBD_MIRROR_PEER_DIRECTION_RX
MirrorPeerDirectionRx = MirrorPeerDirection(C.RBD_MIRROR_PEER_DIRECTION_RX)
// MirrorPeerDirectionTx is equivalent to RBD_MIRROR_PEER_DIRECTION_TX
MirrorPeerDirectionTx = MirrorPeerDirection(C.RBD_MIRROR_PEER_DIRECTION_TX)
// MirrorPeerDirectionRxTx is equivalent to RBD_MIRROR_PEER_DIRECTION_RX_TX
MirrorPeerDirectionRxTx = MirrorPeerDirection(C.RBD_MIRROR_PEER_DIRECTION_RX_TX)
)
// ImportMirrorPeerBootstrapToken applies the provided bootstrap token to the
// pool associated with the IO context to create a mirroring relationship
// between pools. The direction parameter controls if data in the pool is a
// source, destination, or both.
//
// Implements:
// int rbd_mirror_peer_bootstrap_import(
// rados_ioctx_t io_ctx, rbd_mirror_peer_direction_t direction,
// const char *token);
func ImportMirrorPeerBootstrapToken(
ioctx *rados.IOContext, direction MirrorPeerDirection, token string) error {
// instead of taking a length, rbd_mirror_peer_bootstrap_import assumes a
// null terminated "c string". We don't use CString because we don't use
// Go's string type as we don't want to treat the token as something users
// should interpret. If we were doing CString we'd be doing a copy anyway.
cToken := C.CString(token)
defer C.free(unsafe.Pointer(cToken))
ret := C.rbd_mirror_peer_bootstrap_import(
cephIoctx(ioctx),
C.rbd_mirror_peer_direction_t(direction),
cToken)
return getError(ret)
}
// GlobalMirrorImageIDAndStatus values contain an ID string for a RBD image
// and that image's GlobalMirrorImageStatus.
type GlobalMirrorImageIDAndStatus struct {
ID string
Status GlobalMirrorImageStatus
}
func mirrorImageGlobalStatusList(
ioctx *rados.IOContext, start string,
results []GlobalMirrorImageIDAndStatus) (int, error) {
// this C function is treated like a "batch" iterator. Based on it's
// design it appears expected to call it multiple times to get
// the entire result.
cStart := C.CString(start)
defer C.free(unsafe.Pointer(cStart))
var (
max = C.size_t(len(results))
length = C.size_t(0)
ids = make([]*C.char, len(results))
images = make([]C.rbd_mirror_image_global_status_t, len(results))
)
ret := C.rbd_mirror_image_global_status_list(
cephIoctx(ioctx),
cStart,
max,
(**C.char)(unsafe.Pointer(&ids[0])),
(*C.rbd_mirror_image_global_status_t)(unsafe.Pointer(&images[0])),
&length)
for i := 0; i < int(length); i++ {
results[i].ID = C.GoString(ids[i])
results[i].Status = newGlobalMirrorImageStatus(&images[0])
}
C.rbd_mirror_image_global_status_list_cleanup(
(**C.char)(unsafe.Pointer(&ids[0])),
(*C.rbd_mirror_image_global_status_t)(unsafe.Pointer(&images[0])),
length)
return int(length), getError(ret)
}
// statusIterBufSize is intentionally not a constant. The unit tests alter
// this value in order to get more code coverage w/o needing to create
// very many images.
var statusIterBufSize = 64
// MirrorImageGlobalStatusIter provide methods for iterating over all
// the GlobalMirrorImageIdAndStatus values in a pool.
type MirrorImageGlobalStatusIter struct {
ioctx *rados.IOContext
buf []GlobalMirrorImageIDAndStatus
lastID string
}
// NewMirrorImageGlobalStatusIter creates a new iterator type ready for use.
func NewMirrorImageGlobalStatusIter(ioctx *rados.IOContext) *MirrorImageGlobalStatusIter {
return &MirrorImageGlobalStatusIter{
ioctx: ioctx,
}
}
// Next fetches one GlobalMirrorImageIDAndStatus value or a nil value if
// iteration is exhausted. The error return will be non-nil if an underlying
// error fetching more values occurred.
func (iter *MirrorImageGlobalStatusIter) Next() (*GlobalMirrorImageIDAndStatus, error) {
if len(iter.buf) == 0 {
if err := iter.fetch(); err != nil {
return nil, err
}
}
if len(iter.buf) == 0 {
return nil, nil
}
item := iter.buf[0]
iter.lastID = item.ID
iter.buf = iter.buf[1:]
return &item, nil
}
// Close terminates iteration regardless if iteration was completed and
// frees any associated resources.
func (iter *MirrorImageGlobalStatusIter) Close() error {
iter.buf = nil
iter.lastID = ""
return nil
}
func (iter *MirrorImageGlobalStatusIter) fetch() error {
iter.buf = nil
items := make([]GlobalMirrorImageIDAndStatus, statusIterBufSize)
n, err := mirrorImageGlobalStatusList(
iter.ioctx,
iter.lastID,
items)
if err != nil {
return err
}
if n > 0 {
iter.buf = items[:n]
}
return nil
}

View File

@ -531,6 +531,33 @@ func (image *Image) Copy2(dest *Image) error {
return getError(C.rbd_copy2(image.image, dest.image)) return getError(C.rbd_copy2(image.image, dest.image))
} }
// DeepCopy an rbd image to a new image with specific options.
//
// Implements:
// int rbd_deep_copy(rbd_image_t src, rados_ioctx_t dest_io_ctx,
// const char *destname, rbd_image_options_t dest_opts);
func (image *Image) DeepCopy(ioctx *rados.IOContext, destname string, rio *ImageOptions) error {
if err := image.validate(imageIsOpen); err != nil {
return err
}
if ioctx == nil {
return ErrNoIOContext
}
if destname == "" {
return ErrNoName
}
if rio == nil {
return rbdError(C.EINVAL)
}
cDestname := C.CString(destname)
defer C.free(unsafe.Pointer(cDestname))
ret := C.rbd_deep_copy(image.image, cephIoctx(ioctx), cDestname,
C.rbd_image_options_t(rio.options))
return getError(ret)
}
// Flatten removes snapshot references from the image. // Flatten removes snapshot references from the image.
// //
// Implements: // Implements:
@ -831,7 +858,7 @@ func (image *Image) WriteSame(ofs, n uint64, data []byte, flags rados.OpFlags) (
ret := C.rbd_writesame(image.image, ret := C.rbd_writesame(image.image,
C.uint64_t(ofs), C.uint64_t(ofs),
C.uint64_t(n), C.size_t(n),
(*C.char)(unsafe.Pointer(&data[0])), (*C.char)(unsafe.Pointer(&data[0])),
C.size_t(len(data)), C.size_t(len(data)),
C.int(flags)) C.int(flags))
@ -916,6 +943,22 @@ func (image *Image) GetId() (string, error) {
} }
// SetSnapshot updates the rbd image (not the Snapshot) such that the snapshot
// is the source of readable data.
//
// Implements:
// int rbd_snap_set(rbd_image_t image, const char *snapname);
func (image *Image) SetSnapshot(snapname string) error {
if err := image.validate(imageIsOpen); err != nil {
return err
}
c_snapname := C.CString(snapname)
defer C.free(unsafe.Pointer(c_snapname))
return getError(C.rbd_snap_set(image.image, c_snapname))
}
// GetTrashList returns a slice of TrashInfo structs, containing information about all RBD images // GetTrashList returns a slice of TrashInfo structs, containing information about all RBD images
// currently residing in the trash. // currently residing in the trash.
func GetTrashList(ioctx *rados.IOContext) ([]TrashInfo, error) { func GetTrashList(ioctx *rados.IOContext) ([]TrashInfo, error) {

View File

@ -149,6 +149,7 @@ func (snapshot *Snapshot) IsProtected() (bool, error) {
// Set updates the rbd image (not the Snapshot) such that the snapshot // Set updates the rbd image (not the Snapshot) such that the snapshot
// is the source of readable data. // is the source of readable data.
// This method is deprecated. Refer the SetSnapshot method of the Image type instead.
// //
// Implements: // Implements:
// int rbd_snap_set(rbd_image_t image, const char *snapname); // int rbd_snap_set(rbd_image_t image, const char *snapname);
@ -157,10 +158,7 @@ func (snapshot *Snapshot) Set() error {
return err return err
} }
c_snapname := C.CString(snapshot.name) return snapshot.image.SetSnapshot(snapshot.name)
defer C.free(unsafe.Pointer(c_snapname))
return getError(C.rbd_snap_set(snapshot.image.image, c_snapname))
} }
// GetSnapTimestamp returns the timestamp of a snapshot for an image. // GetSnapTimestamp returns the timestamp of a snapshot for an image.

View File

@ -1,4 +1,4 @@
// +build octopus // +build !nautilus
package rbd package rbd

4
vendor/modules.txt vendored
View File

@ -47,10 +47,12 @@ github.com/aws/aws-sdk-go/service/sts/stsiface
github.com/beorn7/perks/quantile github.com/beorn7/perks/quantile
# github.com/blang/semver v3.5.1+incompatible # github.com/blang/semver v3.5.1+incompatible
github.com/blang/semver github.com/blang/semver
# github.com/ceph/go-ceph v0.8.0 # github.com/ceph/go-ceph v0.10.0
## explicit ## explicit
github.com/ceph/go-ceph/cephfs/admin github.com/ceph/go-ceph/cephfs/admin
github.com/ceph/go-ceph/common/commands
github.com/ceph/go-ceph/internal/callbacks github.com/ceph/go-ceph/internal/callbacks
github.com/ceph/go-ceph/internal/commands
github.com/ceph/go-ceph/internal/cutil github.com/ceph/go-ceph/internal/cutil
github.com/ceph/go-ceph/internal/errutil github.com/ceph/go-ceph/internal/errutil
github.com/ceph/go-ceph/internal/retry github.com/ceph/go-ceph/internal/retry