Merge pull request #91 from ceph/devel

sync devel branch with upstream
This commit is contained in:
OpenShift Merge Robot 2022-04-14 04:36:51 -04:00 committed by GitHub
commit b74a6c082a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
32 changed files with 811 additions and 504 deletions

View File

@ -48,7 +48,8 @@ GO_PROJECT=github.com/ceph/ceph-csi
CEPH_VERSION ?= $(shell . $(CURDIR)/build.env ; echo $${CEPH_VERSION})
# TODO: ceph_preview tag may be removed with go-ceph 0.16.0
GO_TAGS_LIST ?= $(CEPH_VERSION) ceph_preview
# TODO: ceph_ci_untested is added for NFS-export management (go-ceph#655)
GO_TAGS_LIST ?= $(CEPH_VERSION) ceph_preview ceph_ci_untested
# go build flags
LDFLAGS ?=

View File

@ -41,3 +41,5 @@ users:
- "system:serviceaccount:{{ .Namespace }}:{{ .Prefix }}csi-cephfs-plugin-sa"
# yamllint disable-line rule:line-length
- "system:serviceaccount:{{ .Namespace }}:{{ .Prefix }}csi-cephfs-provisioner-sa"
- "system:serviceaccount:{{ .Namespace }}:{{ .Prefix }}csi-nfs-plugin-sa"
- "system:serviceaccount:{{ .Namespace }}:{{ .Prefix }}csi-nfs-provisioner-sa"

View File

@ -48,3 +48,5 @@ users:
- "system:serviceaccount:ceph-csi:csi-cephfs-plugin-sa"
# yamllint disable-line rule:line-length
- "system:serviceaccount:ceph-csi:csi-cephfs-provisioner-sa"
- "system:serviceaccount:ceph-csi:csi-nfs-plugin-sa"
- "system:serviceaccount:ceph-csi:csi-nfs-provisioner-sa"

View File

@ -101,6 +101,8 @@ are available while running tests:
| kubeconfig | Path to kubeconfig containing embedded authinfo (default: $HOME/.kube/config) |
| timeout | Panic test binary after duration d (default 0, timeout disabled) |
| v | Verbose: print additional output |
| filesystem | Name of the CephFS filesystem (default: "myfs") |
| is-openshift | Run in OpenShift compatibility mode, skips certain new feature tests |
## E2E for snapshot

View File

@ -40,7 +40,7 @@ const (
// validateSubvolumegroup validates whether subvolumegroup is present.
func validateSubvolumegroup(f *framework.Framework, subvolgrp string) error {
cmd := fmt.Sprintf("ceph fs subvolumegroup getpath myfs %s", subvolgrp)
cmd := fmt.Sprintf("ceph fs subvolumegroup getpath %s %s", fileSystemName, subvolgrp)
stdOut, stdErr, err := execCommandInToolBoxPod(f, cmd, rookNamespace)
if err != nil {
return fmt.Errorf("failed to exec command in toolbox: %w", err)
@ -67,7 +67,7 @@ func createCephfsStorageClass(
if err != nil {
return err
}
sc.Parameters["fsName"] = "myfs"
sc.Parameters["fsName"] = fileSystemName
sc.Parameters["csi.storage.k8s.io/provisioner-secret-namespace"] = cephCSINamespace
sc.Parameters["csi.storage.k8s.io/provisioner-secret-name"] = cephFSProvisionerSecretName

View File

@ -47,7 +47,7 @@ func createConfigMap(pluginPath string, c kubernetes.Interface, f *framework.Fra
fsID, stdErr, err := execCommandInToolBoxPod(f, "ceph fsid", rookNamespace)
if err != nil {
return err
return fmt.Errorf("failed to exec command in toolbox: %w", err)
}
if stdErr != "" {
return fmt.Errorf("error getting fsid %v", stdErr)

View File

@ -43,6 +43,8 @@ func init() {
flag.StringVar(&upgradeVersion, "upgrade-version", "v3.5.1", "target version for upgrade testing")
flag.StringVar(&cephCSINamespace, "cephcsi-namespace", defaultNs, "namespace in which cephcsi deployed")
flag.StringVar(&rookNamespace, "rook-namespace", "rook-ceph", "namespace in which rook is deployed")
flag.StringVar(&fileSystemName, "filesystem", "myfs", "CephFS filesystem to use")
flag.BoolVar(&isOpenShift, "is-openshift", false, "disables certain checks on OpenShift")
setDefaultKubeconfig()
// Register framework flags, then handle flags

View File

@ -525,7 +525,8 @@ var _ = Describe("RBD", func() {
e2elog.Logf("pv name is empty %q in namespace %q: %v", pvc.Name, pvc.Namespace, err)
}
patchBytes := []byte(`{"spec":{"persistentVolumeReclaimPolicy": "Retain", "claimRef": null}}`)
// patch PV to Retain it after deleting the PVC.
patchBytes := []byte(`{"spec":{"persistentVolumeReclaimPolicy": "Retain"}}`)
_, err = c.CoreV1().PersistentVolumes().Patch(
context.TODO(),
pvcObj.Spec.VolumeName,
@ -533,7 +534,7 @@ var _ = Describe("RBD", func() {
patchBytes,
metav1.PatchOptions{})
if err != nil {
e2elog.Logf("error Patching PV %q for persistentVolumeReclaimPolicy and claimRef: %v",
e2elog.Logf("error Patching PV %q for persistentVolumeReclaimPolicy: %v",
pvcObj.Spec.VolumeName, err)
}
@ -545,6 +546,19 @@ var _ = Describe("RBD", func() {
e2elog.Logf("failed to delete pvc: %w", err)
}
// Remove the claimRef to bind this PV to a new PVC.
patchBytes = []byte(`{"spec":{"claimRef": null}}`)
_, err = c.CoreV1().PersistentVolumes().Patch(
context.TODO(),
pvcObj.Spec.VolumeName,
types.StrategicMergePatchType,
patchBytes,
metav1.PatchOptions{})
if err != nil {
e2elog.Logf("error Patching PV %q for claimRef: %v",
pvcObj.Spec.VolumeName, err)
}
// validate created backend rbd images
validateRBDImageCount(f, 1, defaultRBDPool)

View File

@ -331,7 +331,6 @@ func validateCephFsStaticPV(f *framework.Framework, appPath, scPath string) erro
var (
cephFsVolName = "testSubVol"
groupName = "testGroup"
fsName = "myfs"
pvName = "pv-name"
pvcName = "pvc-name"
namespace = f.UniqueName
@ -361,7 +360,7 @@ func validateCephFsStaticPV(f *framework.Framework, appPath, scPath string) erro
size := "4294967296"
// create subvolumegroup, command will work even if group is already present.
cmd := fmt.Sprintf("ceph fs subvolumegroup create %s %s", fsName, groupName)
cmd := fmt.Sprintf("ceph fs subvolumegroup create %s %s", fileSystemName, groupName)
_, e, err = execCommandInPod(f, cmd, rookNamespace, &listOpt)
if err != nil {
@ -372,7 +371,7 @@ func validateCephFsStaticPV(f *framework.Framework, appPath, scPath string) erro
}
// create subvolume
cmd = fmt.Sprintf("ceph fs subvolume create %s %s %s --size %s", fsName, cephFsVolName, groupName, size)
cmd = fmt.Sprintf("ceph fs subvolume create %s %s %s --size %s", fileSystemName, cephFsVolName, groupName, size)
_, e, err = execCommandInPod(f, cmd, rookNamespace, &listOpt)
if err != nil {
return err
@ -382,7 +381,7 @@ func validateCephFsStaticPV(f *framework.Framework, appPath, scPath string) erro
}
// get rootpath
cmd = fmt.Sprintf("ceph fs subvolume getpath %s %s %s", fsName, cephFsVolName, groupName)
cmd = fmt.Sprintf("ceph fs subvolume getpath %s %s %s", fileSystemName, cephFsVolName, groupName)
rootPath, e, err := execCommandInPod(f, cmd, rookNamespace, &listOpt)
if err != nil {
return err
@ -415,7 +414,7 @@ func validateCephFsStaticPV(f *framework.Framework, appPath, scPath string) erro
}
opt["clusterID"] = fsID
opt["fsName"] = fsName
opt["fsName"] = fileSystemName
opt["staticVolume"] = strconv.FormatBool(true)
opt["rootPath"] = rootPath
pv := getStaticPV(
@ -474,7 +473,7 @@ func validateCephFsStaticPV(f *framework.Framework, appPath, scPath string) erro
}
// delete subvolume
cmd = fmt.Sprintf("ceph fs subvolume rm %s %s %s", fsName, cephFsVolName, groupName)
cmd = fmt.Sprintf("ceph fs subvolume rm %s %s %s", fileSystemName, cephFsVolName, groupName)
_, e, err = execCommandInPod(f, cmd, rookNamespace, &listOpt)
if err != nil {
return err
@ -484,7 +483,7 @@ func validateCephFsStaticPV(f *framework.Framework, appPath, scPath string) erro
}
// delete subvolume group
cmd = fmt.Sprintf("ceph fs subvolumegroup rm %s %s", fsName, groupName)
cmd = fmt.Sprintf("ceph fs subvolumegroup rm %s %s", fileSystemName, groupName)
_, e, err = execCommandInPod(f, cmd, rookNamespace, &listOpt)
if err != nil {
return err

View File

@ -230,9 +230,6 @@ var _ = Describe("CephFS Upgrade Testing", func() {
if err != nil {
e2elog.Failf("failed to calculate checksum: %v", err)
}
// pvc clone is only supported from v1.16+
if k8sVersionGreaterEquals(f.ClientSet, 1, 17) {
// Create snapshot of the pvc
snapshotPath := cephFSExamplePath + "snapshot.yaml"
snap := getSnapshot(snapshotPath)
@ -243,7 +240,6 @@ var _ = Describe("CephFS Upgrade Testing", func() {
if err != nil {
e2elog.Failf("failed to create snapshot %v", err)
}
}
err = deletePod(app.Name, app.Namespace, f.ClientSet, deployTimeout)
if err != nil {
e2elog.Failf("failed to delete application: %v", err)
@ -280,9 +276,6 @@ var _ = Describe("CephFS Upgrade Testing", func() {
pvcClonePath := cephFSExamplePath + "pvc-restore.yaml"
appClonePath := cephFSExamplePath + "pod-restore.yaml"
label := make(map[string]string)
// pvc clone is only supported from v1.16+
if k8sVersionGreaterEquals(f.ClientSet, 1, 17) {
pvcClone, err = loadPVC(pvcClonePath)
if err != nil {
e2elog.Failf("failed to load pvc: %v", err)
@ -310,7 +303,6 @@ var _ = Describe("CephFS Upgrade Testing", func() {
if err != nil {
e2elog.Failf("failed to calculate checksum: %v", err)
}
if strings.Compare(newCheckSum, checkSum) != 0 {
e2elog.Failf(
"The checksum of files did not match, expected %s received %s ",
@ -335,8 +327,6 @@ var _ = Describe("CephFS Upgrade Testing", func() {
if err != nil {
e2elog.Failf("failed to delete snapshot %v", err)
}
}
})
By("Create clone from existing PVC", func() {
@ -344,8 +334,6 @@ var _ = Describe("CephFS Upgrade Testing", func() {
appSmartClonePath := cephFSExamplePath + "pod-clone.yaml"
label := make(map[string]string)
// pvc clone is only supported from v1.16+
if k8sVersionGreaterEquals(f.ClientSet, 1, 16) {
pvcClone, err = loadPVC(pvcSmartClonePath)
if err != nil {
e2elog.Failf("failed to load pvc: %v", err)
@ -388,16 +376,12 @@ var _ = Describe("CephFS Upgrade Testing", func() {
if err != nil {
e2elog.Failf("failed to delete pvc and application: %v", err)
}
}
})
By("Resize pvc and verify expansion", func() {
pvcExpandSize := "5Gi"
label := make(map[string]string)
// Resize 0.3.0 is only supported from v1.15+
if k8sVersionGreaterEquals(f.ClientSet, 1, 15) {
label[appKey] = appLabel
opt := metav1.ListOptions{
LabelSelector: fmt.Sprintf("%s=%s", appKey, label[appKey]),
@ -424,7 +408,6 @@ var _ = Describe("CephFS Upgrade Testing", func() {
if err != nil {
e2elog.Failf("failed to check directory size: %v", err)
}
}
})
By("delete pvc and app")

View File

@ -250,8 +250,6 @@ var _ = Describe("RBD Upgrade Testing", func() {
e2elog.Failf("failed to calculate checksum: %v", err)
}
// pvc clone is only supported from v1.16+
if k8sVersionGreaterEquals(f.ClientSet, 1, 16) {
// Create snapshot of the pvc
snapshotPath := rbdExamplePath + "snapshot.yaml"
snap := getSnapshot(snapshotPath)
@ -262,7 +260,7 @@ var _ = Describe("RBD Upgrade Testing", func() {
if err != nil {
e2elog.Failf("failed to create snapshot %v", err)
}
}
err = deletePod(app.Name, app.Namespace, f.ClientSet, deployTimeout)
if err != nil {
e2elog.Failf("failed to delete application: %v", err)
@ -299,9 +297,6 @@ var _ = Describe("RBD Upgrade Testing", func() {
pvcClonePath := rbdExamplePath + "pvc-restore.yaml"
appClonePath := rbdExamplePath + "pod-restore.yaml"
label := make(map[string]string)
// pvc clone is only supported from v1.16+
if k8sVersionGreaterEquals(f.ClientSet, 1, 16) {
pvcClone, err := loadPVC(pvcClonePath)
if err != nil {
e2elog.Failf("failed to load pvc: %v", err)
@ -343,8 +338,6 @@ var _ = Describe("RBD Upgrade Testing", func() {
if err != nil {
e2elog.Failf("failed to delete pvc and application: %v", err)
}
}
})
By("Create clone from existing PVC", func() {
@ -352,8 +345,6 @@ var _ = Describe("RBD Upgrade Testing", func() {
appSmartClonePath := rbdExamplePath + "pod-clone.yaml"
label := make(map[string]string)
// pvc clone is only supported from v1.16+
if k8sVersionGreaterEquals(f.ClientSet, 1, 16) {
pvcClone, err := loadPVC(pvcSmartClonePath)
if err != nil {
e2elog.Failf("failed to load pvc: %v", err)
@ -395,16 +386,12 @@ var _ = Describe("RBD Upgrade Testing", func() {
if err != nil {
e2elog.Failf("failed to delete pvc and application: %v", err)
}
}
})
By("Resize pvc and verify expansion", func() {
pvcExpandSize := "5Gi"
label := make(map[string]string)
// Resize 0.3.0 is only supported from v1.15+
if k8sVersionGreaterEquals(f.ClientSet, 1, 15) {
label[appKey] = appLabel
opt := metav1.ListOptions{
LabelSelector: fmt.Sprintf("%s=%s", appKey, label[appKey]),
@ -432,7 +419,6 @@ var _ = Describe("RBD Upgrade Testing", func() {
if err != nil {
e2elog.Failf("failed to check directory size: %v", err)
}
}
})
By("delete pvc and app", func() {

View File

@ -78,6 +78,7 @@ var (
rookNamespace string
radosNamespace string
poll = 2 * time.Second
isOpenShift bool
)
func getMons(ns string, c kubernetes.Interface) ([]string, error) {
@ -404,7 +405,7 @@ func validateNormalUserPVCAccess(pvcPath string, f *framework.Framework) error {
if pvc.Spec.VolumeMode != nil {
isBlockMode = (*pvc.Spec.VolumeMode == v1.PersistentVolumeBlock)
}
if !isBlockMode || k8sVersionGreaterEquals(f.ClientSet, 1, 22) {
if (!isBlockMode || k8sVersionGreaterEquals(f.ClientSet, 1, 22)) && !isOpenShift {
err = getMetricsForPVC(f, pvc, deployTimeout)
if err != nil {
return err
@ -526,7 +527,7 @@ func pvcDeleteWhenPoolNotFound(pvcPath string, cephFS bool, f *framework.Framewo
return err
}
// delete cephFS filesystem
err = deletePool("myfs", cephFS, f)
err = deletePool(fileSystemName, cephFS, f)
if err != nil {
return err
}

3
go.mod
View File

@ -7,7 +7,8 @@ require (
github.com/aws/aws-sdk-go v1.43.32
github.com/aws/aws-sdk-go-v2/service/sts v1.16.3
github.com/ceph/ceph-csi/api v0.0.0-00010101000000-000000000000
github.com/ceph/go-ceph v0.14.0
// TODO: API for managing NFS-exports requires `ceph_ci_untested` build-tag
github.com/ceph/go-ceph v0.15.0
github.com/container-storage-interface/spec v1.5.0
github.com/csi-addons/replication-lib-utils v0.2.0
github.com/csi-addons/spec v0.1.2-0.20211220115741-32fa508dadbe

4
go.sum
View File

@ -180,8 +180,8 @@ github.com/cenkalti/backoff/v3 v3.0.0 h1:ske+9nBpD9qZsTBoF41nW5L+AIuFBKMeze18XQ3
github.com/cenkalti/backoff/v3 v3.0.0/go.mod h1:cIeZDE3IrqwwJl6VUwCN6trj1oXrTS4rc0ij+ULvLYs=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/centrify/cloud-golang-sdk v0.0.0-20190214225812-119110094d0f/go.mod h1:C0rtzmGXgN78pYR0tGJFhtHgkbAs0lIbHwkB81VxDQE=
github.com/ceph/go-ceph v0.14.0 h1:sJoT0au7NT3TPmDWf5W9w6tZy0U/5xZrIXVVauZR+Xo=
github.com/ceph/go-ceph v0.14.0/go.mod h1:mafFpf5Vg8Ai8Bd+FAMvKBHLmtdpTXdRP/TNq8XWegY=
github.com/ceph/go-ceph v0.15.0 h1:ILB3NaLWOtt4u/2d8I8HZTC4Ycm1PsOYVar3IFU1xlo=
github.com/ceph/go-ceph v0.15.0/go.mod h1:mafFpf5Vg8Ai8Bd+FAMvKBHLmtdpTXdRP/TNq8XWegY=
github.com/certifi/gocertifi v0.0.0-20191021191039-0944d244cd40/go.mod h1:sGbDF6GwGcLpkNXPUTkMRoywsNa/ol15pxFe6ERfguA=
github.com/certifi/gocertifi v0.0.0-20200922220541-2c3bb06c6054/go.mod h1:sGbDF6GwGcLpkNXPUTkMRoywsNa/ol15pxFe6ERfguA=
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=

View File

@ -20,6 +20,7 @@ import (
"github.com/ceph/ceph-csi/internal/util/log"
"k8s.io/client-go/tools/leaderelection/resourcelock"
clientConfig "sigs.k8s.io/controller-runtime/pkg/client/config"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/manager/signals"
@ -61,6 +62,7 @@ func Start(config Config) error {
// disable metrics
MetricsBindAddress: "0",
LeaderElectionNamespace: config.Namespace,
LeaderElectionResourceLock: resourcelock.LeasesResourceLock,
LeaderElectionID: electionID,
}
mgr, err := manager.New(clientConfig.GetConfigOrDie(), opts)

View File

@ -26,6 +26,7 @@ import (
fsutil "github.com/ceph/ceph-csi/internal/cephfs/util"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/go-ceph/common/admin/nfs"
"github.com/container-storage-interface/spec/lib/go/csi"
)
@ -135,24 +136,33 @@ func (nv *NFSVolume) CreateExport(backend *csi.Volume) error {
return fmt.Errorf("failed to set NFS-cluster: %w", err)
}
// TODO: use new go-ceph API, see ceph/ceph-csi#2977
// new versions of Ceph use a different command, and the go-ceph API
// also seems to be different :-/
//
// run the new command, but fall back to the previous one in case of an
// error
cmds := [][]string{
// ceph nfs export create cephfs --cluster-id <cluster_id>
// --pseudo-path <pseudo_path> --fsname <fsname>
// [--readonly] [--path=/path/in/cephfs]
nv.createExportCommand("--cluster-id="+nfsCluster,
"--fsname="+fs, "--pseudo-path="+nv.GetExportPath(),
"--path="+path),
// ceph nfs export create cephfs ${FS} ${NFS} /${EXPORT} ${SUBVOL_PATH}
nv.createExportCommand(nfsCluster, fs, nv.GetExportPath(), path),
nfsa, err := nv.conn.GetNFSAdmin()
if err != nil {
return fmt.Errorf("failed to get NFSAdmin: %w", err)
}
stderr, err := nv.retryIfInvalid(cmds)
_, err = nfsa.CreateCephFSExport(nfs.CephFSExportSpec{
FileSystemName: fs,
ClusterID: nfsCluster,
PseudoPath: nv.GetExportPath(),
Path: path,
})
switch {
case err == nil:
return nil
case strings.Contains(err.Error(), "rados: ret=-2"): // try with the old command
break
default: // any other error
return fmt.Errorf("exporting %q on NFS-cluster %q failed: %w",
nv, nfsCluster, err)
}
// if we get here, the API call failed, fallback to the old command
// ceph nfs export create cephfs ${FS} ${NFS} /${EXPORT} ${SUBVOL_PATH}
cmd := nv.createExportCommand(nfsCluster, fs, nv.GetExportPath(), path)
_, stderr, err := util.ExecCommand(nv.ctx, "ceph", cmd...)
if err != nil {
return fmt.Errorf("failed to create export %q in NFS-cluster %q"+
"(%v): %s", nv, nfsCluster, err, stderr)
@ -161,28 +171,6 @@ func (nv *NFSVolume) CreateExport(backend *csi.Volume) error {
return nil
}
// retryIfInvalid executes the "ceph" command, and falls back to the next cmd
// in case the error is EINVAL.
func (nv *NFSVolume) retryIfInvalid(cmds [][]string) (string, error) {
var (
stderr string
err error
)
for _, cmd := range cmds {
_, stderr, err = util.ExecCommand(nv.ctx, "ceph", cmd...)
// in case of an invalid command, fallback to the next one
if strings.Contains(stderr, "Error EINVAL: invalid command") {
continue
}
// If we get here, either no error, or an unexpected error
// happened. There is no need to retry an other command.
break
}
return stderr, err
}
// createExportCommand returns the "ceph nfs export create ..." command
// arguments (without "ceph"). The order of the parameters matches old Ceph
// releases, new Ceph releases added --option formats, which can be added when
@ -214,20 +202,28 @@ func (nv *NFSVolume) DeleteExport() error {
return fmt.Errorf("failed to identify NFS cluster: %w", err)
}
// TODO: use new go-ceph API, see ceph/ceph-csi#2977
// new versions of Ceph use a different command, and the go-ceph API
// also seems to be different :-/
//
// run the new command, but fall back to the previous one in case of an
// error
cmds := [][]string{
// ceph nfs export rm <cluster_id> <pseudo_path>
nv.deleteExportCommand("rm", nfsCluster),
// ceph nfs export delete <cluster_id> <pseudo_path>
nv.deleteExportCommand("delete", nfsCluster),
nfsa, err := nv.conn.GetNFSAdmin()
if err != nil {
return fmt.Errorf("failed to get NFSAdmin: %w", err)
}
stderr, err := nv.retryIfInvalid(cmds)
err = nfsa.RemoveExport(nfsCluster, nv.GetExportPath())
switch {
case err == nil:
return nil
case strings.Contains(err.Error(), "API call not implemented"): // try with the old command
break
default: // any other error
return fmt.Errorf("failed to remove %q from NFS-cluster %q: "+
"%w", nv, nfsCluster, err)
}
// if we get here, the API call failed, fallback to the old command
// ceph nfs export delete <cluster_id> <pseudo_path>
cmd := nv.deleteExportCommand("delete", nfsCluster)
_, stderr, err := util.ExecCommand(nv.ctx, "ceph", cmd...)
if err != nil {
return fmt.Errorf("failed to delete export %q from NFS-cluster"+
"%q (%v): %s", nv, nfsCluster, err, stderr)

View File

@ -615,8 +615,9 @@ func (rs *ReplicationServer) PromoteVolume(ctx context.Context,
}
// checkHealthyPrimary checks if the image is a healhty primary or not.
// healthy primary image will be in up+stopped state, for states other
// than this it returns an error message.
// healthy primary image will be in up+stopped state in local cluster and
// up+replaying in the remote clusters, for states other than this it returns
// an error message.
func checkHealthyPrimary(ctx context.Context, rbdVol *rbdVolume) error {
mirrorStatus, err := rbdVol.getImageMirroringStatus()
if err != nil {
@ -640,6 +641,26 @@ func checkHealthyPrimary(ctx context.Context, rbdVol *rbdVolume) error {
localStatus.State)
}
// Remote image should be in up+replaying state.
for _, s := range mirrorStatus.SiteStatuses {
log.UsefulLog(
ctx,
"peer site mirrorUUID=%q, daemon up=%t, mirroring state=%q, description=%q and lastUpdate=%d",
s.MirrorUUID,
s.Up,
s.State,
s.Description,
s.LastUpdate)
if s.MirrorUUID != "" {
if !s.Up && s.State != librbd.MirrorImageStatusStateReplaying {
return fmt.Errorf("remote image %s is not healthy. State is up=%t, state=%q",
rbdVol,
s.Up,
s.State)
}
}
}
return nil
}

View File

@ -22,6 +22,7 @@ import (
"time"
ca "github.com/ceph/go-ceph/cephfs/admin"
"github.com/ceph/go-ceph/common/admin/nfs"
"github.com/ceph/go-ceph/rados"
ra "github.com/ceph/go-ceph/rbd/admin"
)
@ -140,3 +141,13 @@ func (cc *ClusterConnection) GetTaskAdmin() (*ra.TaskAdmin, error) {
return rbdAdmin.Task(), nil
}
// GetNFSAdmin returns an Admin type that can be used to interact with the
// NFS-cluster that is managed by Ceph.
func (cc *ClusterConnection) GetNFSAdmin() (*nfs.Admin, error) {
if cc.conn == nil {
return nil, errors.New("cluster is not connected yet")
}
return nfs.NewFromConn(cc.conn), nil
}

View File

@ -41,3 +41,5 @@ users:
- "system:serviceaccount:{{ .Namespace }}:{{ .Prefix }}csi-cephfs-plugin-sa"
# yamllint disable-line rule:line-length
- "system:serviceaccount:{{ .Namespace }}:{{ .Prefix }}csi-cephfs-provisioner-sa"
- "system:serviceaccount:{{ .Namespace }}:{{ .Prefix }}csi-nfs-plugin-sa"
- "system:serviceaccount:{{ .Namespace }}:{{ .Prefix }}csi-nfs-provisioner-sa"

View File

@ -0,0 +1,21 @@
//go:build !(nautilus || octopus) && ceph_preview && ceph_ci_untested
// +build !nautilus,!octopus,ceph_preview,ceph_ci_untested
package nfs
import (
ccom "github.com/ceph/go-ceph/common/commands"
)
// Admin is used to administer ceph nfs features.
type Admin struct {
conn ccom.RadosCommander
}
// NewFromConn creates an new management object from a preexisting
// rados connection. The existing connection can be rados.Conn or any
// type implementing the RadosCommander interface.
// PREVIEW
func NewFromConn(conn ccom.RadosCommander) *Admin {
return &Admin{conn}
}

View File

@ -0,0 +1,5 @@
/*
Package nfs from common/admin contains a set of APIs used to interact
with and administer NFS support for ceph clusters.
*/
package nfs

View File

@ -0,0 +1,198 @@
//go:build !(nautilus || octopus) && ceph_preview && ceph_ci_untested
// +build !nautilus,!octopus,ceph_preview,ceph_ci_untested
package nfs
import (
"github.com/ceph/go-ceph/internal/commands"
)
// SquashMode indicates the kind of user-id squashing performed on an export.
type SquashMode string
// src: https://github.com/nfs-ganesha/nfs-ganesha/blob/next/src/config_samples/export.txt
const (
// NoneSquash performs no id squashing.
NoneSquash SquashMode = "None"
// RootSquash performs squashing of root user (with any gid).
RootSquash SquashMode = "Root"
// AllSquash performs squashing of all users.
AllSquash SquashMode = "All"
// RootIDSquash performs squashing of root uid/gid.
RootIDSquash SquashMode = "RootId"
// NoRootSquash is equivalent to NoneSquash
NoRootSquash = NoneSquash
// Unspecifiedquash
Unspecifiedquash SquashMode = ""
)
// CephFSExportSpec is used to specify the parameters used to create a new
// CephFS based export.
type CephFSExportSpec struct {
FileSystemName string `json:"fsname"`
ClusterID string `json:"cluster_id"`
PseudoPath string `json:"pseudo_path"`
Path string `json:"path,omitempty"`
ReadOnly bool `json:"readonly"`
ClientAddr []string `json:"client_addr,omitempty"`
Squash SquashMode `json:"squash,omitempty"`
}
// ExportResult is returned along with newly created exports.
type ExportResult struct {
Bind string `json:"bind"`
FileSystemName string `json:"fs"`
Path string `json:"path"`
ClusterID string `json:"cluster"`
Mode string `json:"mode"`
}
type cephFSExportFields struct {
Prefix string `json:"prefix"`
Format string `json:"format"`
CephFSExportSpec
}
// FSALInfo describes NFS-Ganesha specific FSAL properties of an export.
type FSALInfo struct {
Name string `json:"name"`
UserID string `json:"user_id"`
FileSystemName string `json:"fs_name"`
}
// ClientInfo describes per-client parameters of an export.
type ClientInfo struct {
Addresses []string `json:"addresses"`
AccessType string `json:"access_type"`
Squash SquashMode `json:"squash"`
}
// ExportInfo describes an NFS export.
type ExportInfo struct {
ExportID int64 `json:"export_id"`
Path string `json:"path"`
ClusterID string `json:"cluster_id"`
PseudoPath string `json:"pseudo"`
AccessType string `json:"access_type"`
Squash SquashMode `json:"squash"`
SecurityLabel bool `json:"security_label"`
Protocols []int `json:"protocols"`
Transports []string `json:"transports"`
FSAL FSALInfo `json:"fsal"`
Clients []ClientInfo `json:"clients"`
}
func parseExportResult(res commands.Response) (*ExportResult, error) {
r := &ExportResult{}
if err := res.NoStatus().Unmarshal(r).End(); err != nil {
return nil, err
}
return r, nil
}
func parseExportsList(res commands.Response) ([]ExportInfo, error) {
l := []ExportInfo{}
if err := res.NoStatus().Unmarshal(&l).End(); err != nil {
return nil, err
}
return l, nil
}
func parseExportInfo(res commands.Response) (ExportInfo, error) {
i := ExportInfo{}
if err := res.NoStatus().Unmarshal(&i).End(); err != nil {
return i, err
}
return i, nil
}
// CreateCephFSExport will create a new NFS export for a CephFS file system.
// PREVIEW
//
// Similar To:
// ceph nfs export create cephfs
func (nfsa *Admin) CreateCephFSExport(spec CephFSExportSpec) (
*ExportResult, error) {
// ---
f := &cephFSExportFields{
Prefix: "nfs export create cephfs",
Format: "json",
CephFSExportSpec: spec,
}
return parseExportResult(commands.MarshalMgrCommand(nfsa.conn, f))
}
const delSucc = "Successfully deleted export"
// RemoveExport will remove an NFS export based on the pseudo-path of the export.
// PREVIEW
//
// Similar To:
// ceph nfs export rm
func (nfsa *Admin) RemoveExport(clusterID, pseudoPath string) error {
m := map[string]string{
"prefix": "nfs export rm",
"format": "json",
"cluster_id": clusterID,
"pseudo_path": pseudoPath,
}
return (commands.MarshalMgrCommand(nfsa.conn, m).
FilterBodyPrefix(delSucc).NoData().End())
}
// ListDetailedExports will return a list of exports with details.
// PREVIEW
//
// Similar To:
// ceph nfs export ls --detailed
func (nfsa *Admin) ListDetailedExports(clusterID string) ([]ExportInfo, error) {
/*
NOTE: there is no simple list because based on a quick reading of the code
in ceph, the details fetching should not be significantly slower with
details than without, and since this is an API call not a CLI its easy
enough to ignore the details you don't care about. If I'm wrong, and
we discover a major perf. difference in the future we can always add a new
simpler list-without-details function.
*/
m := map[string]string{
"prefix": "nfs export ls",
"detailed": "true",
"format": "json",
"cluster_id": clusterID,
}
return parseExportsList(commands.MarshalMgrCommand(nfsa.conn, m))
}
// ExportInfo will return a structure describing the export specified by it's
// pseudo-path.
// PREVIEW
//
// Similar To:
// ceph nfs export info
func (nfsa *Admin) ExportInfo(clusterID, pseudoPath string) (ExportInfo, error) {
m := map[string]string{
"prefix": "nfs export info",
"format": "json",
"cluster_id": clusterID,
"pseudo_path": pseudoPath,
}
return parseExportInfo(commands.MarshalMgrCommand(nfsa.conn, m))
}
/*
TODO?
'nfs export apply': cluster_id: str, inbuf: str
"""Create or update an export by `-i <json_or_ganesha_export_file>`"""
'nfs export create rgw':
bucket: str,
cluster_id: str,
pseudo_path: str,
readonly: Optional[bool] = False,
client_addr: Optional[List[str]] = None,
squash: str = 'none',
"""Create an RGW export"""
*/

View File

@ -1,6 +1,7 @@
package commands
import (
"bytes"
"encoding/json"
"errors"
"fmt"
@ -156,6 +157,18 @@ func (r Response) FilterSuffix(s string) Response {
return r
}
// FilterBodyPrefix sets the body value equivalent to an empty string if the
// body value contains the given prefix string.
func (r Response) FilterBodyPrefix(p string) Response {
if !r.Ok() {
return r
}
if bytes.HasPrefix(r.body, []byte(p)) {
return Response{[]byte(""), r.status, r.err}
}
return r
}
// FilterDeprecated removes deprecation warnings from the response status.
// Use it when checking the response from calls that may be deprecated in ceph
// if you want those calls to continue working if the warning is present.

View File

@ -0,0 +1,89 @@
package cutil
// #include <stdlib.h>
import "C"
import (
"unsafe"
)
// BufferGroup is a helper structure that holds Go-allocated slices of
// C-allocated strings and their respective lengths. Useful for C functions
// that consume byte buffers with explicit length instead of null-terminated
// strings. When used as input arguments in C functions, caller must make sure
// the C code will not hold any pointers to either of the struct's attributes
// after that C function returns.
type BufferGroup struct {
// C-allocated buffers.
Buffers []CharPtr
// Lengths of C buffers, where Lengths[i] = length(Buffers[i]).
Lengths []SizeT
}
// TODO: should BufferGroup implementation change and the slices would contain
// nested Go pointers, they must be pinned with PtrGuard.
// NewBufferGroupStrings returns new BufferGroup constructed from strings.
func NewBufferGroupStrings(strs []string) *BufferGroup {
s := &BufferGroup{
Buffers: make([]CharPtr, len(strs)),
Lengths: make([]SizeT, len(strs)),
}
for i, str := range strs {
bs := []byte(str)
s.Buffers[i] = CharPtr(C.CBytes(bs))
s.Lengths[i] = SizeT(len(bs))
}
return s
}
// NewBufferGroupBytes returns new BufferGroup constructed
// from slice of byte slices.
func NewBufferGroupBytes(bss [][]byte) *BufferGroup {
s := &BufferGroup{
Buffers: make([]CharPtr, len(bss)),
Lengths: make([]SizeT, len(bss)),
}
for i, bs := range bss {
s.Buffers[i] = CharPtr(C.CBytes(bs))
s.Lengths[i] = SizeT(len(bs))
}
return s
}
// Free free()s the C-allocated memory.
func (s *BufferGroup) Free() {
for _, ptr := range s.Buffers {
C.free(unsafe.Pointer(ptr))
}
s.Buffers = nil
s.Lengths = nil
}
// BuffersPtr returns a pointer to the beginning of the Buffers slice.
func (s *BufferGroup) BuffersPtr() CharPtrPtr {
if len(s.Buffers) == 0 {
return nil
}
return CharPtrPtr(&s.Buffers[0])
}
// LengthsPtr returns a pointer to the beginning of the Lengths slice.
func (s *BufferGroup) LengthsPtr() SizeTPtr {
if len(s.Lengths) == 0 {
return nil
}
return SizeTPtr(&s.Lengths[0])
}
func testBufferGroupGet(s *BufferGroup, index int) (str string, length int) {
bs := C.GoBytes(unsafe.Pointer(s.Buffers[index]), C.int(s.Lengths[index]))
return string(bs), int(s.Lengths[index])
}

14
vendor/github.com/ceph/go-ceph/internal/log/log.go generated vendored Normal file
View File

@ -0,0 +1,14 @@
// Package log is the internal package for go-ceph logging. This package is only
// used from go-ceph code, not from consumers of go-ceph. go-ceph code uses the
// functions in this package to log information that can't be returned as
// errors. The functions default to no-ops and can be set with the external log
// package common/log by the go-ceph consumers.
package log
func noop(string, ...interface{}) {}
// These variables are set by the common log package.
var (
Warnf = noop
Debugf = noop
)

View File

@ -10,69 +10,8 @@ import "C"
import (
"runtime"
"unsafe"
"github.com/ceph/go-ceph/internal/cutil"
)
// setOmapStep is a write op step. It holds C memory used in the operation.
type setOmapStep struct {
withRefs
withoutUpdate
// C arguments
cKeys cutil.CPtrCSlice
cValues cutil.CPtrCSlice
cLengths cutil.SizeTCSlice
cNum C.size_t
}
func newSetOmapStep(pairs map[string][]byte) *setOmapStep {
maplen := len(pairs)
cKeys := cutil.NewCPtrCSlice(maplen)
cValues := cutil.NewCPtrCSlice(maplen)
cLengths := cutil.NewSizeTCSlice(maplen)
sos := &setOmapStep{
cKeys: cKeys,
cValues: cValues,
cLengths: cLengths,
cNum: C.size_t(maplen),
}
var i uintptr
for key, value := range pairs {
// key
ck := C.CString(key)
sos.add(unsafe.Pointer(ck))
cKeys[i] = cutil.CPtr(ck)
// value and its length
vlen := cutil.SizeT(len(value))
if vlen > 0 {
cv := C.CBytes(value)
sos.add(cv)
cValues[i] = cutil.CPtr(cv)
} else {
cValues[i] = nil
}
cLengths[i] = vlen
i++
}
runtime.SetFinalizer(sos, opStepFinalizer)
return sos
}
func (sos *setOmapStep) free() {
sos.cKeys.Free()
sos.cValues.Free()
sos.cLengths.Free()
sos.withRefs.free()
}
// OmapKeyValue items are returned by the GetOmapStep's Next call.
type OmapKeyValue struct {
Key string
@ -88,15 +27,6 @@ type OmapKeyValue struct {
// Release method is called the public methods of the step must no longer be
// used and may return errors.
type GetOmapStep struct {
// inputs:
startAfter string
filterPrefix string
maxReturn uint64
// arguments:
cStartAfter *C.char
cFilterPrefix *C.char
// C returned data:
iter C.rados_omap_iter_t
more *C.uchar
@ -109,13 +39,8 @@ type GetOmapStep struct {
canIterate bool
}
func newGetOmapStep(startAfter, filterPrefix string, maxReturn uint64) *GetOmapStep {
func newGetOmapStep() *GetOmapStep {
gos := &GetOmapStep{
startAfter: startAfter,
filterPrefix: filterPrefix,
maxReturn: maxReturn,
cStartAfter: C.CString(startAfter),
cFilterPrefix: C.CString(filterPrefix),
more: (*C.uchar)(C.malloc(C.sizeof_uchar)),
rval: (*C.int)(C.malloc(C.sizeof_int)),
}
@ -133,10 +58,6 @@ func (gos *GetOmapStep) free() {
gos.more = nil
C.free(unsafe.Pointer(gos.rval))
gos.rval = nil
C.free(unsafe.Pointer(gos.cStartAfter))
gos.cStartAfter = nil
C.free(unsafe.Pointer(gos.cFilterPrefix))
gos.cFilterPrefix = nil
}
func (gos *GetOmapStep) update() error {
@ -153,9 +74,10 @@ func (gos *GetOmapStep) Next() (*OmapKeyValue, error) {
var (
cKey *C.char
cVal *C.char
cLen C.size_t
cKeyLen C.size_t
cValLen C.size_t
)
ret := C.rados_omap_get_next(gos.iter, &cKey, &cVal, &cLen)
ret := C.rados_omap_get_next2(gos.iter, &cKey, &cVal, &cKeyLen, &cValLen)
if ret != 0 {
return nil, getError(ret)
}
@ -163,8 +85,8 @@ func (gos *GetOmapStep) Next() (*OmapKeyValue, error) {
return nil, nil
}
return &OmapKeyValue{
Key: C.GoString(cKey),
Value: C.GoBytes(unsafe.Pointer(cVal), C.int(cLen)),
Key: string(C.GoBytes(unsafe.Pointer(cKey), C.int(cKeyLen))),
Value: C.GoBytes(unsafe.Pointer(cVal), C.int(cValLen)),
}, nil
}
@ -175,40 +97,6 @@ func (gos *GetOmapStep) More() bool {
return *gos.more != 0
}
// removeOmapKeysStep is a write operation step used to track state, especially
// C memory, across the setup and use of a WriteOp.
type removeOmapKeysStep struct {
withRefs
withoutUpdate
// arguments:
cKeys cutil.CPtrCSlice
cNum C.size_t
}
func newRemoveOmapKeysStep(keys []string) *removeOmapKeysStep {
cKeys := cutil.NewCPtrCSlice(len(keys))
roks := &removeOmapKeysStep{
cKeys: cKeys,
cNum: C.size_t(len(keys)),
}
i := 0
for _, key := range keys {
cKeys[i] = cutil.CPtr(C.CString(key))
roks.add(unsafe.Pointer(cKeys[i]))
i++
}
runtime.SetFinalizer(roks, opStepFinalizer)
return roks
}
func (roks *removeOmapKeysStep) free() {
roks.cKeys.Free()
roks.withRefs.free()
}
// SetOmap appends the map `pairs` to the omap `oid`
func (ioctx *IOContext) SetOmap(oid string, pairs map[string][]byte) error {
op := CreateWriteOp()

View File

@ -0,0 +1,31 @@
//go:build ceph_preview
// +build ceph_preview
package rados
// #cgo LDFLAGS: -lrados
// #include <rados/librados.h>
// #include <stdlib.h>
//
import "C"
import (
"unsafe"
)
// SetLocator sets the key for mapping objects to pgs within an io context.
// Until a different locator key is set, all objects in this io context will be placed in the same pg.
// To reset the locator, an empty string must be set.
// PREVIEW
//
// Implements:
// void rados_ioctx_locator_set_key(rados_ioctx_t io, const char *key);
func (ioctx *IOContext) SetLocator(locator string) {
if locator == "" {
C.rados_ioctx_locator_set_key(ioctx.ioctx, nil)
} else {
var cLoc *C.char = C.CString(locator)
defer C.free(unsafe.Pointer(cLoc))
C.rados_ioctx_locator_set_key(ioctx.ioctx, cLoc)
}
}

View File

@ -69,13 +69,19 @@ func (r *ReadOp) AssertExists() {
// function. The GetOmapStep may be used to iterate over the key-value
// pairs after the Operate call has been performed.
func (r *ReadOp) GetOmapValues(startAfter, filterPrefix string, maxReturn uint64) *GetOmapStep {
gos := newGetOmapStep(startAfter, filterPrefix, maxReturn)
gos := newGetOmapStep()
r.steps = append(r.steps, gos)
cStartAfter := C.CString(startAfter)
cFilterPrefix := C.CString(filterPrefix)
defer C.free(unsafe.Pointer(cStartAfter))
defer C.free(unsafe.Pointer(cFilterPrefix))
C.rados_read_op_omap_get_vals2(
r.op,
gos.cStartAfter,
gos.cFilterPrefix,
C.uint64_t(gos.maxReturn),
cStartAfter,
cFilterPrefix,
C.uint64_t(maxReturn),
&gos.iter,
gos.more,
gos.rval,

View File

@ -11,6 +11,8 @@ import "C"
import (
"unsafe"
"github.com/ceph/go-ceph/internal/cutil"
)
// ReadOpOmapGetValsByKeysStep holds the result of the
@ -65,10 +67,11 @@ func (s *ReadOpOmapGetValsByKeysStep) Next() (*OmapKeyValue, error) {
var (
cKey *C.char
cVal *C.char
cKeyLen C.size_t
cValLen C.size_t
)
ret := C.rados_omap_get_next(s.iter, &cKey, &cVal, &cValLen)
ret := C.rados_omap_get_next2(s.iter, &cKey, &cVal, &cKeyLen, &cValLen)
if ret != 0 {
return nil, getError(ret)
}
@ -79,7 +82,7 @@ func (s *ReadOpOmapGetValsByKeysStep) Next() (*OmapKeyValue, error) {
}
return &OmapKeyValue{
Key: C.GoString(cKey),
Key: string(C.GoBytes(unsafe.Pointer(cKey), C.int(cKeyLen))),
Value: C.GoBytes(unsafe.Pointer(cVal), C.int(cValLen)),
}, nil
}
@ -88,30 +91,24 @@ func (s *ReadOpOmapGetValsByKeysStep) Next() (*OmapKeyValue, error) {
// PREVIEW
//
// Implements:
// void rados_read_op_omap_get_vals_by_keys(rados_read_op_t read_op,
// void rados_read_op_omap_get_vals_by_keys2(rados_read_op_t read_op,
// char const * const * keys,
// size_t keys_len,
// size_t num_keys,
// const size_t * key_lens,
// rados_omap_iter_t * iter,
// int * prval)
func (r *ReadOp) GetOmapValuesByKeys(keys []string) *ReadOpOmapGetValsByKeysStep {
s := newReadOpOmapGetValsByKeysStep()
r.steps = append(r.steps, s)
cKeys := make([]*C.char, len(keys))
defer func() {
for _, cKeyPtr := range cKeys {
C.free(unsafe.Pointer(cKeyPtr))
}
}()
cKeys := cutil.NewBufferGroupStrings(keys)
defer cKeys.Free()
for i, key := range keys {
cKeys[i] = C.CString(key)
}
C.rados_read_op_omap_get_vals_by_keys(
C.rados_read_op_omap_get_vals_by_keys2(
r.op,
&cKeys[0],
(**C.char)(cKeys.BuffersPtr()),
C.size_t(len(keys)),
(*C.size_t)(cKeys.LengthsPtr()),
&s.iter,
s.prval,
)

View File

@ -19,6 +19,8 @@ import (
"sync"
"time"
"unsafe"
"github.com/ceph/go-ceph/internal/log"
)
type (
@ -340,14 +342,6 @@ func decodeNotifyResponse(response *C.char, len C.size_t) ([]NotifyAck, []Notify
//export watchNotifyCb
func watchNotifyCb(_ unsafe.Pointer, notifyID C.uint64_t, id C.uint64_t,
notifierID C.uint64_t, cData unsafe.Pointer, dataLen C.size_t) {
watchersMtx.RLock()
w, ok := watchers[WatcherID(id)]
watchersMtx.RUnlock()
if !ok {
// usually this should not happen, but who knows
// TODO: some log message (once we have logging)
return
}
ev := NotifyEvent{
ID: NotifyID(notifyID),
WatcherID: WatcherID(id),
@ -356,6 +350,14 @@ func watchNotifyCb(_ unsafe.Pointer, notifyID C.uint64_t, id C.uint64_t,
if dataLen > 0 {
ev.Data = C.GoBytes(cData, C.int(dataLen))
}
watchersMtx.RLock()
w, ok := watchers[WatcherID(id)]
watchersMtx.RUnlock()
if !ok {
// usually this should not happen, but who knows
log.Warnf("received notification for unknown watcher ID: %#v", ev)
return
}
select {
case <-w.done: // unblock when deleted
case w.events <- ev:
@ -369,7 +371,7 @@ func watchErrorCb(_ unsafe.Pointer, id C.uint64_t, err C.int) {
watchersMtx.RUnlock()
if !ok {
// usually this should not happen, but who knows
// TODO: some log message (once we have logging)
log.Warnf("received error for unknown watcher ID: id=%d err=%#v", id, err)
return
}
select {

View File

@ -10,6 +10,7 @@ import "C"
import (
"unsafe"
"github.com/ceph/go-ceph/internal/cutil"
ts "github.com/ceph/go-ceph/internal/timespec"
)
@ -92,24 +93,39 @@ func (w *WriteOp) Create(exclusive CreateOption) {
// SetOmap appends the map `pairs` to the omap `oid`.
func (w *WriteOp) SetOmap(pairs map[string][]byte) {
sos := newSetOmapStep(pairs)
w.steps = append(w.steps, sos)
C.rados_write_op_omap_set(
keys := make([]string, len(pairs))
values := make([][]byte, len(pairs))
idx := 0
for k, v := range pairs {
keys[idx] = k
values[idx] = v
idx++
}
cKeys := cutil.NewBufferGroupStrings(keys)
cValues := cutil.NewBufferGroupBytes(values)
defer cKeys.Free()
defer cValues.Free()
C.rados_write_op_omap_set2(
w.op,
(**C.char)(sos.cKeys.Ptr()),
(**C.char)(sos.cValues.Ptr()),
(*C.size_t)(sos.cLengths.Ptr()),
sos.cNum)
(**C.char)(cKeys.BuffersPtr()),
(**C.char)(cValues.BuffersPtr()),
(*C.size_t)(cKeys.LengthsPtr()),
(*C.size_t)(cValues.LengthsPtr()),
(C.size_t)(len(pairs)))
}
// RmOmapKeys removes the specified `keys` from the omap `oid`.
func (w *WriteOp) RmOmapKeys(keys []string) {
roks := newRemoveOmapKeysStep(keys)
w.steps = append(w.steps, roks)
C.rados_write_op_omap_rm_keys(
cKeys := cutil.NewBufferGroupStrings(keys)
defer cKeys.Free()
C.rados_write_op_omap_rm_keys2(
w.op,
(**C.char)(roks.cKeys.Ptr()),
roks.cNum)
(**C.char)(cKeys.BuffersPtr()),
(*C.size_t)(cKeys.LengthsPtr()),
(C.size_t)(len(keys)))
}
// CleanOmap clears the omap `oid`.

4
vendor/modules.txt vendored
View File

@ -116,15 +116,17 @@ github.com/cenkalti/backoff/v3
github.com/ceph/ceph-csi/api/deploy/kubernetes/nfs
github.com/ceph/ceph-csi/api/deploy/kubernetes/rbd
github.com/ceph/ceph-csi/api/deploy/ocp
# github.com/ceph/go-ceph v0.14.0
# github.com/ceph/go-ceph v0.15.0
## explicit; go 1.12
github.com/ceph/go-ceph/cephfs/admin
github.com/ceph/go-ceph/common/admin/manager
github.com/ceph/go-ceph/common/admin/nfs
github.com/ceph/go-ceph/common/commands
github.com/ceph/go-ceph/internal/callbacks
github.com/ceph/go-ceph/internal/commands
github.com/ceph/go-ceph/internal/cutil
github.com/ceph/go-ceph/internal/errutil
github.com/ceph/go-ceph/internal/log
github.com/ceph/go-ceph/internal/retry
github.com/ceph/go-ceph/internal/timespec
github.com/ceph/go-ceph/rados