Merge pull request #55 from ceph/devel

Sync rhs/ceph-csi:devel with ceph/ceph-csi:devel
This commit is contained in:
OpenShift Merge Robot 2021-12-13 06:31:29 +00:00 committed by GitHub
commit 38c200a51c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 407 additions and 176 deletions

View File

@ -50,12 +50,13 @@ pull_request_rules:
- "status-success=lint-extras"
- "status-success=ci/centos/k8s-e2e-external-storage/1.21"
- "status-success=ci/centos/k8s-e2e-external-storage/1.22"
- "status-success=ci/centos/mini-e2e-helm/k8s-1.20"
- "status-success=ci/centos/k8s-e2e-external-storage/1.23"
- "status-success=ci/centos/mini-e2e-helm/k8s-1.21"
- "status-success=ci/centos/mini-e2e-helm/k8s-1.22"
- "status-success=ci/centos/mini-e2e/k8s-1.20"
- "status-success=ci/centos/mini-e2e-helm/k8s-1.23"
- "status-success=ci/centos/mini-e2e/k8s-1.21"
- "status-success=ci/centos/mini-e2e/k8s-1.22"
- "status-success=ci/centos/mini-e2e/k8s-1.23"
- "status-success=ci/centos/upgrade-tests-cephfs"
- "status-success=ci/centos/upgrade-tests-rbd"
- "status-success=DCO"
@ -80,12 +81,13 @@ pull_request_rules:
- "status-success=lint-extras"
- "status-success=ci/centos/k8s-e2e-external-storage/1.21"
- "status-success=ci/centos/k8s-e2e-external-storage/1.22"
- "status-success=ci/centos/mini-e2e-helm/k8s-1.20"
- "status-success=ci/centos/k8s-e2e-external-storage/1.23"
- "status-success=ci/centos/mini-e2e-helm/k8s-1.21"
- "status-success=ci/centos/mini-e2e-helm/k8s-1.22"
- "status-success=ci/centos/mini-e2e/k8s-1.20"
- "status-success=ci/centos/mini-e2e-helm/k8s-1.23"
- "status-success=ci/centos/mini-e2e/k8s-1.21"
- "status-success=ci/centos/mini-e2e/k8s-1.22"
- "status-success=ci/centos/mini-e2e/k8s-1.23"
- "status-success=ci/centos/upgrade-tests-cephfs"
- "status-success=ci/centos/upgrade-tests-rbd"
- "status-success=DCO"
@ -109,50 +111,13 @@ pull_request_rules:
- "#changes-requested-reviews-by=0"
- "status-success=ci/centos/k8s-e2e-external-storage/1.21"
- "status-success=ci/centos/k8s-e2e-external-storage/1.22"
- "status-success=ci/centos/mini-e2e-helm/k8s-1.20"
- "status-success=ci/centos/k8s-e2e-external-storage/1.23"
- "status-success=ci/centos/mini-e2e-helm/k8s-1.21"
- "status-success=ci/centos/mini-e2e-helm/k8s-1.22"
- "status-success=ci/centos/mini-e2e/k8s-1.20"
- "status-success=ci/centos/mini-e2e/k8s-1.21"
- "status-success=ci/centos/mini-e2e/k8s-1.22"
- "status-success=ci/centos/upgrade-tests-cephfs"
- "status-success=ci/centos/upgrade-tests-rbd"
- "status-success=DCO"
actions:
merge: {}
dismiss_reviews: {}
delete_head_branch: {}
- name: backport patches to release-v3.3 branch
conditions:
- base=devel
- label=backport-to-release-v3.3
actions:
backport:
branches:
- release-v3.3
# automerge backports if CI successfully ran
- name: automerge backport release-v3.3
conditions:
- author=mergify[bot]
- base=release-v3.3
- label!=DNM
- "#approved-reviews-by>=2"
- "approved-reviews-by=@ceph/ceph-csi-contributors"
- "approved-reviews-by=@ceph/ceph-csi-maintainers"
- "status-success=codespell"
- "status-success=multi-arch-build"
- "status-success=go-test"
- "status-success=commitlint"
- "status-success=golangci-lint"
- "status-success=mod-check"
- "status-success=lint-extras"
- "#changes-requested-reviews-by=0"
- "status-success=ci/centos/mini-e2e-helm/k8s-1.20"
- "status-success=ci/centos/mini-e2e-helm/k8s-1.21"
- "status-success=ci/centos/mini-e2e-helm/k8s-1.22"
- "status-success=ci/centos/mini-e2e/k8s-1.20"
- "status-success=ci/centos/mini-e2e-helm/k8s-1.23"
- "status-success=ci/centos/mini-e2e/k8s-1.21"
- "status-success=ci/centos/mini-e2e/k8s-1.22"
- "status-success=ci/centos/mini-e2e/k8s-1.23"
- "status-success=ci/centos/upgrade-tests-cephfs"
- "status-success=ci/centos/upgrade-tests-rbd"
- "status-success=DCO"
@ -185,12 +150,12 @@ pull_request_rules:
- "status-success=mod-check"
- "status-success=lint-extras"
- "#changes-requested-reviews-by=0"
- "status-success=ci/centos/mini-e2e-helm/k8s-1.20"
- "status-success=ci/centos/mini-e2e-helm/k8s-1.21"
- "status-success=ci/centos/mini-e2e-helm/k8s-1.22"
- "status-success=ci/centos/mini-e2e/k8s-1.20"
- "status-success=ci/centos/mini-e2e-helm/k8s-1.23"
- "status-success=ci/centos/mini-e2e/k8s-1.21"
- "status-success=ci/centos/mini-e2e/k8s-1.22"
- "status-success=ci/centos/mini-e2e/k8s-1.23"
- "status-success=ci/centos/upgrade-tests-cephfs"
- "status-success=ci/centos/upgrade-tests-rbd"
- "status-success=DCO"

View File

@ -16,7 +16,7 @@ BASE_IMAGE=docker.io/ceph/ceph:v16
CEPH_VERSION=octopus
# standard Golang options
GOLANG_VERSION=1.16.4
GOLANG_VERSION=1.17.5
GO111MODULE=on
# commitlint version

View File

@ -27,7 +27,7 @@ import (
"github.com/ceph/ceph-csi/internal/controller"
"github.com/ceph/ceph-csi/internal/controller/persistentvolume"
"github.com/ceph/ceph-csi/internal/liveness"
"github.com/ceph/ceph-csi/internal/rbd"
rbddriver "github.com/ceph/ceph-csi/internal/rbd/driver"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/log"
@ -222,7 +222,7 @@ func main() {
case rbdType:
validateCloneDepthFlag(&conf)
validateMaxSnaphostFlag(&conf)
driver := rbd.NewDriver()
driver := rbddriver.NewDriver()
driver.Run(&conf)
case cephFSType:

View File

@ -37,6 +37,8 @@ var (
rbdDeploymentName = "csi-rbdplugin-provisioner"
rbdDaemonsetName = "csi-rbdplugin"
defaultRBDPool = "replicapool"
erasureCodedPool = "ec-pool"
noDataPool = ""
// Topology related variables.
nodeRegionLabel = "test.failure-domain/region"
regionValue = "testregion"
@ -465,6 +467,75 @@ var _ = Describe("RBD", func() {
validateRBDImageCount(f, 0, defaultRBDPool)
})
By("create an erasure coded PVC and bind it to an app", func() {
err := deleteResource(rbdExamplePath + "storageclass.yaml")
if err != nil {
e2elog.Failf("failed to delete storageclass: %v", err)
}
err = createRBDStorageClass(
f.ClientSet,
f,
defaultSCName,
nil,
map[string]string{
"dataPool": erasureCodedPool,
"pool": defaultRBDPool,
},
deletePolicy)
if err != nil {
e2elog.Failf("failed to create storageclass: %v", err)
}
pvc, app, err := createPVCAndAppBinding(pvcPath, appPath, f, deployTimeout)
if err != nil {
e2elog.Failf("failed to create pvc and application binding: %v", err)
}
err = checkPVCDataPoolForImageInPool(f, pvc, defaultRBDPool, "ec-pool")
if err != nil {
e2elog.Failf("failed to check data pool for image: %v", err)
}
err = deletePVCAndApp("", f, pvc, app)
if err != nil {
e2elog.Failf("failed to delete pvc and application : %v", err)
}
// validate created backend rbd images
validateRBDImageCount(f, 0, defaultRBDPool)
})
By("create an erasure coded PVC and validate snapshot restore", func() {
validatePVCSnapshot(
defaultCloneCount,
pvcPath,
appPath,
snapshotPath,
pvcClonePath,
appClonePath,
noKMS, noKMS,
defaultSCName,
erasureCodedPool,
f)
})
By("create an erasure coded PVC and validate PVC-PVC clone", func() {
validatePVCClone(
defaultCloneCount,
pvcPath,
appPath,
pvcSmartClonePath,
appSmartClonePath,
erasureCodedPool,
noKMS,
noPVCValidation,
f)
err := deleteResource(rbdExamplePath + "storageclass.yaml")
if err != nil {
e2elog.Failf("failed to delete storageclass: %v", err)
}
err = createRBDStorageClass(f.ClientSet, f, defaultSCName, nil, nil, deletePolicy)
if err != nil {
e2elog.Failf("failed to create storageclass: %v", err)
}
})
By("create a PVC and bind it to an app with ext4 as the FS ", func() {
err := deleteResource(rbdExamplePath + "storageclass.yaml")
if err != nil {
@ -578,6 +649,80 @@ var _ = Describe("RBD", func() {
}
})
By("create PVC with layering,fast-diff image-features and bind it to an app",
func() {
if util.CheckKernelSupport(kernelRelease, fastDiffSupport) {
err := deleteResource(rbdExamplePath + "storageclass.yaml")
if err != nil {
e2elog.Failf("failed to delete storageclass: %v", err)
}
err = createRBDStorageClass(
f.ClientSet,
f,
defaultSCName,
nil,
map[string]string{
"imageFeatures": "layering,exclusive-lock,object-map,fast-diff",
},
deletePolicy)
if err != nil {
e2elog.Failf("failed to create storageclass: %v", err)
}
err = validatePVCAndAppBinding(pvcPath, appPath, f)
if err != nil {
e2elog.Failf("failed to validate CephFS pvc and application binding: %v", err)
}
// validate created backend rbd images
validateRBDImageCount(f, 0, defaultRBDPool)
err = deleteResource(rbdExamplePath + "storageclass.yaml")
if err != nil {
e2elog.Failf("failed to delete storageclass: %v", err)
}
err = createRBDStorageClass(f.ClientSet, f, defaultSCName, nil, nil, deletePolicy)
if err != nil {
e2elog.Failf("failed to create storageclass: %v", err)
}
}
})
By("create PVC with journaling,fast-diff image-features and bind it to an app using rbd-nbd mounter",
func() {
if util.CheckKernelSupport(kernelRelease, fastDiffSupport) {
err := deleteResource(rbdExamplePath + "storageclass.yaml")
if err != nil {
e2elog.Failf("failed to delete storageclass: %v", err)
}
// Storage class with rbd-nbd mounter
err = createRBDStorageClass(
f.ClientSet,
f,
defaultSCName,
nil,
map[string]string{
"mounter": "rbd-nbd",
"imageFeatures": "layering,journaling,exclusive-lock,object-map,fast-diff",
},
deletePolicy)
if err != nil {
e2elog.Failf("failed to create storageclass: %v", err)
}
err = validatePVCAndAppBinding(pvcPath, appPath, f)
if err != nil {
e2elog.Failf("failed to validate CephFS pvc and application binding: %v", err)
}
// validate created backend rbd images
validateRBDImageCount(f, 0, defaultRBDPool)
err = deleteResource(rbdExamplePath + "storageclass.yaml")
if err != nil {
e2elog.Failf("failed to delete storageclass: %v", err)
}
err = createRBDStorageClass(f.ClientSet, f, defaultSCName, nil, nil, deletePolicy)
if err != nil {
e2elog.Failf("failed to create storageclass: %v", err)
}
}
})
// NOTE: RWX is restricted for FileSystem VolumeMode at ceph-csi,
// see pull#261 for more details.
By("Create RWX+Block Mode PVC and bind to multiple pods via deployment using rbd-nbd mounter", func() {
@ -1475,6 +1620,7 @@ var _ = Describe("RBD", func() {
appClonePath,
noKMS, noKMS,
defaultSCName,
noDataPool,
f)
})
@ -1485,6 +1631,7 @@ var _ = Describe("RBD", func() {
appPath,
pvcSmartClonePath,
appSmartClonePath,
noDataPool,
noKMS,
noPVCValidation,
f)
@ -1502,7 +1649,15 @@ var _ = Describe("RBD", func() {
e2elog.Failf("failed to create storageclass: %v", err)
}
validatePVCClone(1, pvcPath, appPath, pvcSmartClonePath, appSmartClonePath, noKMS, isThickPVC, f)
validatePVCClone(1,
pvcPath,
appPath,
pvcSmartClonePath,
appSmartClonePath,
noDataPool,
noKMS,
isThickPVC,
f)
err = deleteResource(rbdExamplePath + "storageclass.yaml")
if err != nil {
@ -1531,7 +1686,7 @@ var _ = Describe("RBD", func() {
validatePVCSnapshot(1,
pvcPath, appPath, snapshotPath, pvcClonePath, appClonePath,
vaultKMS, vaultKMS,
defaultSCName,
defaultSCName, noDataPool,
f)
err = deleteResource(rbdExamplePath + "storageclass.yaml")
@ -1577,7 +1732,7 @@ var _ = Describe("RBD", func() {
validatePVCSnapshot(1,
pvcPath, appPath, snapshotPath, pvcClonePath, appClonePath,
vaultKMS, vaultTenantSAKMS,
restoreSCName, f)
restoreSCName, noDataPool, f)
err = retryKubectlArgs(cephCSINamespace, kubectlDelete, deployTimeout, "storageclass", restoreSCName)
if err != nil {
@ -1636,7 +1791,7 @@ var _ = Describe("RBD", func() {
validatePVCSnapshot(1,
pvcPath, appPath, snapshotPath, pvcClonePath, appClonePath,
vaultKMS, secretsMetadataKMS,
restoreSCName, f)
restoreSCName, noDataPool, f)
// delete user secret
err = retryKubectlFile(namespace,
@ -1681,7 +1836,15 @@ var _ = Describe("RBD", func() {
e2elog.Failf("failed to create storageclass: %v", err)
}
validatePVCClone(1, pvcPath, appPath, pvcSmartClonePath, appSmartClonePath, secretsMetadataKMS, isEncryptedPVC, f)
validatePVCClone(1,
pvcPath,
appPath,
pvcSmartClonePath,
appSmartClonePath,
noDataPool,
secretsMetadataKMS,
isEncryptedPVC,
f)
err = deleteResource(rbdExamplePath + "storageclass.yaml")
if err != nil {
@ -1707,7 +1870,15 @@ var _ = Describe("RBD", func() {
e2elog.Failf("failed to create storageclass: %v", err)
}
validatePVCClone(1, pvcPath, appPath, pvcSmartClonePath, appSmartClonePath, vaultKMS, isEncryptedPVC, f)
validatePVCClone(1,
pvcPath,
appPath,
pvcSmartClonePath,
appSmartClonePath,
noDataPool,
vaultKMS,
isEncryptedPVC,
f)
err = deleteResource(rbdExamplePath + "storageclass.yaml")
if err != nil {
@ -1736,6 +1907,7 @@ var _ = Describe("RBD", func() {
rawAppPath,
pvcBlockSmartClonePath,
appBlockSmartClonePath,
noDataPool,
noKMS,
noPVCValidation,
f)

View File

@ -40,6 +40,18 @@ var nbdResizeSupport = []util.KernelVersion{
}, // standard 5.3+ versions
}
// nolint:gomnd // numbers specify Kernel versions.
var fastDiffSupport = []util.KernelVersion{
{
Version: 5,
PatchLevel: 3,
SubLevel: 0,
ExtraVersion: 0,
Distribution: "",
Backport: false,
}, // standard 5.3+ versions
}
// To use `io-timeout=0` we need
// www.mail-archive.com/linux-block@vger.kernel.org/msg38060.html
// nolint:gomnd // numbers specify Kernel versions.

View File

@ -594,7 +594,8 @@ func writeDataAndCalChecksum(app *v1.Pod, opt *metav1.ListOptions, f *framework.
// nolint:gocyclo,gocognit,nestif,cyclop // reduce complexity
func validatePVCClone(
totalCount int,
sourcePvcPath, sourceAppPath, clonePvcPath, clonePvcAppPath string,
sourcePvcPath, sourceAppPath, clonePvcPath, clonePvcAppPath,
dataPool string,
kms kmsConfig,
validatePVC validateFunc,
f *framework.Framework) {
@ -662,6 +663,9 @@ func validatePVCClone(
LabelSelector: fmt.Sprintf("%s=%s", appKey, label[appKey]),
}
wgErrs[n] = createPVCAndApp(name, f, &p, &a, deployTimeout)
if wgErrs[n] == nil && dataPool != noDataPool {
wgErrs[n] = checkPVCDataPoolForImageInPool(f, &p, defaultRBDPool, dataPool)
}
if wgErrs[n] == nil && kms != noKMS {
if kms.canGetPassphrase() {
imageData, sErr := getImageInfoFromPVC(p.Namespace, name, f)
@ -802,9 +806,8 @@ func validatePVCClone(
func validatePVCSnapshot(
totalCount int,
pvcPath, appPath, snapshotPath, pvcClonePath, appClonePath string,
kms, restoreKMS kmsConfig,
restoreSCName string,
f *framework.Framework) {
kms, restoreKMS kmsConfig, restoreSCName,
dataPool string, f *framework.Framework) {
var wg sync.WaitGroup
wgErrs := make([]error, totalCount)
chErrs := make([]error, totalCount)
@ -1020,6 +1023,10 @@ func validatePVCSnapshot(
name := fmt.Sprintf("%s%d", f.UniqueName, n)
p.Spec.DataSource.Name = name
wgErrs[n] = createPVCAndApp(name, f, &p, &a, deployTimeout)
if wgErrs[n] == nil && dataPool != noDataPool {
wgErrs[n] = checkPVCDataPoolForImageInPool(f, &p, defaultRBDPool, dataPool)
}
wg.Done()
}(i, *pvcClone, *appClone)
}

1
go.mod
View File

@ -25,6 +25,7 @@ require (
golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97
golang.org/x/sys v0.0.0-20210817190340-bfb29a6856f2
google.golang.org/grpc v1.42.0
google.golang.org/protobuf v1.26.0
k8s.io/api v0.22.4
k8s.io/apimachinery v0.22.4
k8s.io/client-go v12.0.0+incompatible

View File

@ -734,7 +734,7 @@ func checkContentSource(
if volID == "" {
return nil, nil, status.Errorf(codes.NotFound, "volume ID cannot be empty")
}
rbdvol, err := genVolFromVolID(ctx, volID, cr, req.GetSecrets())
rbdvol, err := GenVolFromVolID(ctx, volID, cr, req.GetSecrets())
if err != nil {
log.ErrorLog(ctx, "failed to get backend image for %s: %v", volID, err)
if !errors.Is(err, ErrImageNotFound) {
@ -750,9 +750,9 @@ func checkContentSource(
return nil, nil, status.Errorf(codes.InvalidArgument, "not a proper volume source")
}
// checkErrAndUndoReserve work on error from genVolFromVolID() and undo omap reserve.
// checkErrAndUndoReserve work on error from GenVolFromVolID() and undo omap reserve.
// Even-though volumeID is part of rbdVolume struct we take it as an arg here, the main reason
// being, the volume id is getting filled from `genVolFromVolID->generateVolumeFromVolumeID` call path,
// being, the volume id is getting filled from `GenVolFromVolID->generateVolumeFromVolumeID` call path,
// and this function is operating on the error case/scenario of above call chain, so we can not rely
// on the 'rbdvol->rbdimage->voldID' field.
@ -865,7 +865,7 @@ func (cs *ControllerServer) DeleteVolume(
return &csi.DeleteVolumeResponse{}, nil
}
rbdVol, err := genVolFromVolID(ctx, volumeID, cr, secrets)
rbdVol, err := GenVolFromVolID(ctx, volumeID, cr, secrets)
defer rbdVol.Destroy()
if err != nil {
return cs.checkErrAndUndoReserve(ctx, err, volumeID, rbdVol, cr)
@ -1016,7 +1016,7 @@ func (cs *ControllerServer) CreateSnapshot(
defer cr.DeleteCredentials()
// Fetch source volume information
rbdVol, err := genVolFromVolID(ctx, req.GetSourceVolumeId(), cr, req.GetSecrets())
rbdVol, err := GenVolFromVolID(ctx, req.GetSourceVolumeId(), cr, req.GetSecrets())
defer rbdVol.Destroy()
if err != nil {
switch {
@ -1462,7 +1462,7 @@ func (cs *ControllerServer) ControllerExpandVolume(
}
defer cr.DeleteCredentials()
rbdVol, err := genVolFromVolID(ctx, volID, cr, req.GetSecrets())
rbdVol, err := GenVolFromVolID(ctx, volID, cr, req.GetSecrets())
defer rbdVol.Destroy()
if err != nil {
switch {

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package rbd
package rbddriver
import (
"fmt"
@ -22,7 +22,7 @@ import (
casrbd "github.com/ceph/ceph-csi/internal/csi-addons/rbd"
csiaddons "github.com/ceph/ceph-csi/internal/csi-addons/server"
csicommon "github.com/ceph/ceph-csi/internal/csi-common"
"github.com/ceph/ceph-csi/internal/journal"
"github.com/ceph/ceph-csi/internal/rbd"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/log"
@ -30,61 +30,34 @@ import (
mount "k8s.io/mount-utils"
)
const (
// volIDVersion is the version number of volume ID encoding scheme.
volIDVersion uint16 = 1
)
// Driver contains the default identity,node and controller struct.
type Driver struct {
cd *csicommon.CSIDriver
ids *IdentityServer
ns *NodeServer
cs *ControllerServer
rs *ReplicationServer
ids *rbd.IdentityServer
ns *rbd.NodeServer
cs *rbd.ControllerServer
rs *rbd.ReplicationServer
// cas is the CSIAddonsServer where CSI-Addons services are handled
cas *csiaddons.CSIAddonsServer
}
var (
// CSIInstanceID is the instance ID that is unique to an instance of CSI, used when sharing
// ceph clusters across CSI instances, to differentiate omap names per CSI instance.
CSIInstanceID = "default"
// volJournal and snapJournal are used to maintain RADOS based journals for CO generated
// VolumeName to backing RBD images.
volJournal *journal.Config
snapJournal *journal.Config
// rbdHardMaxCloneDepth is the hard limit for maximum number of nested volume clones that are taken before flatten
// occurs.
rbdHardMaxCloneDepth uint
// rbdSoftMaxCloneDepth is the soft limit for maximum number of nested volume clones that are taken before flatten
// occurs.
rbdSoftMaxCloneDepth uint
maxSnapshotsOnImage uint
minSnapshotsOnImageToStartFlatten uint
skipForceFlatten bool
)
// NewDriver returns new rbd driver.
func NewDriver() *Driver {
return &Driver{}
}
// NewIdentityServer initialize a identity server for rbd CSI driver.
func NewIdentityServer(d *csicommon.CSIDriver) *IdentityServer {
return &IdentityServer{
func NewIdentityServer(d *csicommon.CSIDriver) *rbd.IdentityServer {
return &rbd.IdentityServer{
DefaultIdentityServer: csicommon.NewDefaultIdentityServer(d),
}
}
// NewControllerServer initialize a controller server for rbd CSI driver.
func NewControllerServer(d *csicommon.CSIDriver) *ControllerServer {
return &ControllerServer{
func NewControllerServer(d *csicommon.CSIDriver) *rbd.ControllerServer {
return &rbd.ControllerServer{
DefaultControllerServer: csicommon.NewDefaultControllerServer(d),
VolumeLocks: util.NewVolumeLocks(),
SnapshotLocks: util.NewVolumeLocks(),
@ -92,17 +65,17 @@ func NewControllerServer(d *csicommon.CSIDriver) *ControllerServer {
}
}
func NewReplicationServer(c *ControllerServer) *ReplicationServer {
return &ReplicationServer{ControllerServer: c}
func NewReplicationServer(c *rbd.ControllerServer) *rbd.ReplicationServer {
return &rbd.ReplicationServer{ControllerServer: c}
}
// NewNodeServer initialize a node server for rbd CSI driver.
func NewNodeServer(d *csicommon.CSIDriver, t string, topology map[string]string) (*NodeServer, error) {
func NewNodeServer(d *csicommon.CSIDriver, t string, topology map[string]string) (*rbd.NodeServer, error) {
mounter := mount.New("")
return &NodeServer{
return &rbd.NodeServer{
DefaultNodeServer: csicommon.NewDefaultNodeServer(d, t, topology),
mounter: mounter,
Mounter: mounter,
VolumeLocks: util.NewVolumeLocks(),
}, nil
}
@ -116,20 +89,14 @@ func (r *Driver) Run(conf *util.Config) {
var err error
var topology map[string]string
// Use passed in instance ID, if provided for omap suffix naming
if conf.InstanceID != "" {
CSIInstanceID = conf.InstanceID
}
// update clone soft and hard limit
rbdHardMaxCloneDepth = conf.RbdHardMaxCloneDepth
rbdSoftMaxCloneDepth = conf.RbdSoftMaxCloneDepth
skipForceFlatten = conf.SkipForceFlatten
maxSnapshotsOnImage = conf.MaxSnapshotsOnImage
minSnapshotsOnImageToStartFlatten = conf.MinSnapshotsOnImage
rbd.SetGlobalInt("rbdHardMaxCloneDepth", conf.RbdHardMaxCloneDepth)
rbd.SetGlobalInt("rbdSoftMaxCloneDepth", conf.RbdSoftMaxCloneDepth)
rbd.SetGlobalBool("skipForceFlatten", conf.SkipForceFlatten)
rbd.SetGlobalInt("maxSnapshotsOnImage", conf.MaxSnapshotsOnImage)
rbd.SetGlobalInt("minSnapshotsOnImageToStartFlatten", conf.MinSnapshotsOnImage)
// Create instances of the volume and snapshot journal
volJournal = journal.NewCSIVolumeJournal(CSIInstanceID)
snapJournal = journal.NewCSISnapshotJournal(CSIInstanceID)
rbd.InitJournals(conf.InstanceID)
// configre CSI-Addons server and components
err = r.setupCSIAddonsServer(conf)
@ -174,14 +141,16 @@ func (r *Driver) Run(conf *util.Config) {
log.FatalLogMsg("failed to start node server, err %v\n", err)
}
var attr string
attr, err = getKrbdSupportedFeatures()
attr, err = rbd.GetKrbdSupportedFeatures()
if err != nil {
log.FatalLogMsg(err.Error())
}
krbdFeatures, err = hexStringToInteger(attr)
var krbdFeatures uint
krbdFeatures, err = rbd.HexStringToInteger(attr)
if err != nil {
log.FatalLogMsg(err.Error())
}
rbd.SetGlobalInt("krbdFeatures", krbdFeatures)
}
if conf.IsControllerServer {
@ -220,7 +189,7 @@ func (r *Driver) Run(conf *util.Config) {
if conf.IsNodeServer {
go func() {
// TODO: move the healer to csi-addons
err := runVolumeHealer(r.ns, conf)
err := rbd.RunVolumeHealer(r.ns, conf)
if err != nil {
log.ErrorLogMsg("healer had failures, err %v\n", err)
}

View File

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package rbd
package rbddriver
import (
"os"

106
internal/rbd/globals.go Normal file
View File

@ -0,0 +1,106 @@
/*
Copyright 2021 The Ceph-CSI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package rbd
import (
"fmt"
"github.com/ceph/ceph-csi/internal/journal"
)
const (
// volIDVersion is the version number of volume ID encoding scheme.
volIDVersion uint16 = 1
)
var (
// CSIInstanceID is the instance ID that is unique to an instance of CSI, used when sharing
// ceph clusters across CSI instances, to differentiate omap names per CSI instance.
CSIInstanceID = "default"
// volJournal and snapJournal are used to maintain RADOS based journals for CO generated
// VolumeName to backing RBD images.
volJournal *journal.Config
snapJournal *journal.Config
// rbdHardMaxCloneDepth is the hard limit for maximum number of nested volume clones that are taken before flatten
// occurs.
rbdHardMaxCloneDepth uint
// rbdSoftMaxCloneDepth is the soft limit for maximum number of nested volume clones that are taken before flatten
// occurs.
rbdSoftMaxCloneDepth uint
maxSnapshotsOnImage uint
minSnapshotsOnImageToStartFlatten uint
skipForceFlatten bool
// krbd features supported by the loaded driver.
krbdFeatures uint
)
// SetGlobalInt provides a way for the rbd-driver to configure global variables
// in the rbd package.
//
// TODO: these global variables should be set in the ControllerService and
// NodeService where appropriate. Using global variables limits the ability to
// configure these options based on the Ceph cluster or StorageClass.
func SetGlobalInt(name string, value uint) {
switch name {
case "rbdHardMaxCloneDepth":
rbdHardMaxCloneDepth = value
case "rbdSoftMaxCloneDepth":
rbdSoftMaxCloneDepth = value
case "maxSnapshotsOnImage":
maxSnapshotsOnImage = value
case "minSnapshotsOnImageToStartFlatten":
minSnapshotsOnImageToStartFlatten = value
case "krbdFeatures":
krbdFeatures = value
default:
panic(fmt.Sprintf("BUG: can not set unknown variable %q", name))
}
}
// SetGlobalBool provides a way for the rbd-driver to configure global
// variables in the rbd package.
//
// TODO: these global variables should be set in the ControllerService and
// NodeService where appropriate. Using global variables limits the ability to
// configure these options based on the Ceph cluster or StorageClass.
func SetGlobalBool(name string, value bool) {
switch name {
case "skipForceFlatten":
skipForceFlatten = value
default:
panic(fmt.Sprintf("BUG: can not set unknown variable %q", name))
}
}
// InitJournals initializes the global journals that are used by the rbd
// package. This is called from the rbd-driver on startup.
//
// TODO: these global journals should be set in the ControllerService and
// NodeService where appropriate. Using global journals limits the ability to
// configure these options based on the Ceph cluster or StorageClass.
func InitJournals(instance string) {
// Use passed in instance ID, if provided for omap suffix naming
if instance != "" {
CSIInstanceID = instance
}
volJournal = journal.NewCSIVolumeJournal(CSIInstanceID)
snapJournal = journal.NewCSISnapshotJournal(CSIInstanceID)
}

View File

@ -42,7 +42,7 @@ import (
// node server spec.
type NodeServer struct {
*csicommon.DefaultNodeServer
mounter mount.Interface
Mounter mount.Interface
// A map storing all volumes with ongoing operations so that additional operations
// for that same volume (as defined by VolumeID) return an Aborted error
VolumeLocks *util.VolumeLocks
@ -297,7 +297,7 @@ func (ns *NodeServer) NodeStageVolume(
if !isHealer {
var isNotMnt bool
// check if stagingPath is already mounted
isNotMnt, err = isNotMountPoint(ns.mounter, stagingTargetPath)
isNotMnt, err = isNotMountPoint(ns.Mounter, stagingTargetPath)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
} else if !isNotMnt {
@ -507,7 +507,7 @@ func (ns *NodeServer) undoStagingTransaction(
stagingTargetPath := getStagingTargetPath(req)
if transaction.isMounted {
err = ns.mounter.Unmount(stagingTargetPath)
err = ns.Mounter.Unmount(stagingTargetPath)
if err != nil {
log.ErrorLog(ctx, "failed to unmount stagingtargetPath: %s with error: %v", stagingTargetPath, err)
@ -626,7 +626,7 @@ func (ns *NodeServer) mountVolumeToStagePath(
stagingPath, devicePath string) (bool, error) {
readOnly := false
fsType := req.GetVolumeCapability().GetMount().GetFsType()
diskMounter := &mount.SafeFormatAndMount{Interface: ns.mounter, Exec: utilexec.New()}
diskMounter := &mount.SafeFormatAndMount{Interface: ns.Mounter, Exec: utilexec.New()}
// rbd images are thin-provisioned and return zeros for unwritten areas. A freshly created
// image will not benefit from discard and we also want to avoid as much unnecessary zeroing
// as possible. Open-code mkfs here because FormatAndMount() doesn't accept custom mkfs
@ -730,7 +730,7 @@ func (ns *NodeServer) mountVolume(ctx context.Context, stagingPath string, req *
func (ns *NodeServer) createTargetMountPath(ctx context.Context, mountPath string, isBlock bool) (bool, error) {
// Check if that mount path exists properly
notMnt, err := mount.IsNotMountPoint(ns.mounter, mountPath)
notMnt, err := mount.IsNotMountPoint(ns.Mounter, mountPath)
if err == nil {
return notMnt, nil
}
@ -773,7 +773,7 @@ func (ns *NodeServer) NodeUnpublishVolume(
targetPath := req.GetTargetPath()
// considering kubelet make sure node operations like unpublish/unstage...etc can not be called
// at same time, an explicit locking at time of nodeunpublish is not required.
notMnt, err := mount.IsNotMountPoint(ns.mounter, targetPath)
notMnt, err := mount.IsNotMountPoint(ns.Mounter, targetPath)
if err != nil {
if os.IsNotExist(err) {
// targetPath has already been deleted
@ -792,7 +792,7 @@ func (ns *NodeServer) NodeUnpublishVolume(
return &csi.NodeUnpublishVolumeResponse{}, nil
}
if err = ns.mounter.Unmount(targetPath); err != nil {
if err = ns.Mounter.Unmount(targetPath); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
@ -839,7 +839,7 @@ func (ns *NodeServer) NodeUnstageVolume(
stagingParentPath := req.GetStagingTargetPath()
stagingTargetPath := getStagingTargetPath(req)
notMnt, err := mount.IsNotMountPoint(ns.mounter, stagingTargetPath)
notMnt, err := mount.IsNotMountPoint(ns.Mounter, stagingTargetPath)
if err != nil {
if !os.IsNotExist(err) {
return nil, status.Error(codes.NotFound, err.Error())
@ -849,7 +849,7 @@ func (ns *NodeServer) NodeUnstageVolume(
}
if !notMnt {
// Unmounting the image
err = ns.mounter.Unmount(stagingTargetPath)
err = ns.Mounter.Unmount(stagingTargetPath)
if err != nil {
log.ExtendedLog(ctx, "failed to unmount targetPath: %s with error: %v", stagingTargetPath, err)
@ -1061,7 +1061,7 @@ func (ns *NodeServer) processEncryptedDevice(
// make sure we continue with the encrypting of the device
fallthrough
case encrypted == rbdImageEncryptionPrepared:
diskMounter := &mount.SafeFormatAndMount{Interface: ns.mounter, Exec: utilexec.New()}
diskMounter := &mount.SafeFormatAndMount{Interface: ns.Mounter, Exec: utilexec.New()}
// TODO: update this when adding support for static (pre-provisioned) PVs
var existingFormat string
existingFormat, err = diskMounter.GetDiskFormat(devicePath)
@ -1109,7 +1109,7 @@ func (ns *NodeServer) xfsSupportsReflink() bool {
// run mkfs.xfs in the same namespace as formatting would be done in
// mountVolumeToStagePath()
diskMounter := &mount.SafeFormatAndMount{Interface: ns.mounter, Exec: utilexec.New()}
diskMounter := &mount.SafeFormatAndMount{Interface: ns.Mounter, Exec: utilexec.New()}
out, err := diskMounter.Exec.Command("mkfs.xfs").CombinedOutput()
if err != nil {
// mkfs.xfs should fail with an error message (and help text)

View File

@ -127,8 +127,8 @@ func callNodeStageVolume(ns *NodeServer, c *k8s.Clientset, pv *v1.PersistentVolu
return nil
}
// runVolumeHealer heal the volumes attached on a node.
func runVolumeHealer(ns *NodeServer, conf *util.Config) error {
// RunVolumeHealer heal the volumes attached on a node.
func RunVolumeHealer(ns *NodeServer, conf *util.Config) error {
c, err := kubeclient.NewK8sClient()
if err != nil {
log.ErrorLogMsg("failed to connect to Kubernetes: %v", err)

View File

@ -200,8 +200,7 @@ type migrationVolID struct {
clusterID string
}
var (
supportedFeatures = map[string]imageFeature{
var supportedFeatures = map[string]imageFeature{
librbd.FeatureNameLayering: {
needRbdNbd: false,
},
@ -220,14 +219,11 @@ var (
needRbdNbd: true,
dependsOn: []string{librbd.FeatureNameExclusiveLock},
},
}
}
krbdFeatures uint64 // krbd features supported by the loaded driver.
)
// getKrbdSupportedFeatures load the module if needed and return supported
// GetKrbdSupportedFeatures load the module if needed and return supported
// features attribute as a string.
func getKrbdSupportedFeatures() (string, error) {
func GetKrbdSupportedFeatures() (string, error) {
// check if the module is loaded or compiled in
_, err := os.Stat(krbdSupportedFeaturesFile)
if err != nil {
@ -254,8 +250,8 @@ func getKrbdSupportedFeatures() (string, error) {
return strings.TrimSuffix(string(val), "\n"), nil
}
// hexStringToInteger convert hex value to uint.
func hexStringToInteger(hexString string) (uint64, error) {
// HexStringToInteger convert hex value to uint.
func HexStringToInteger(hexString string) (uint, error) {
// trim 0x prefix
numberStr := strings.TrimPrefix(strings.ToLower(hexString), "0x")
@ -266,7 +262,7 @@ func hexStringToInteger(hexString string) (uint64, error) {
return 0, err
}
return output, nil
return uint(output), nil
}
// isKrbdFeatureSupported checks if a given Image Feature is supported by krbd
@ -278,7 +274,7 @@ func isKrbdFeatureSupported(ctx context.Context, imageFeatures string) bool {
supported := true
for _, featureName := range imageFeatureSet.Names() {
if (uint64(librbd.FeatureSetFromNames(strings.Split(featureName, " "))) & krbdFeatures) == 0 {
if (uint(librbd.FeatureSetFromNames(strings.Split(featureName, " "))) & krbdFeatures) == 0 {
supported = false
log.ErrorLog(ctx, "krbd feature %q not supported", featureName)
@ -1145,9 +1141,11 @@ func generateVolumeFromVolumeID(
return rbdVol, err
}
// genVolFromVolID generates a rbdVolume structure from the provided identifier, updating
// GenVolFromVolID generates a rbdVolume structure from the provided identifier, updating
// the structure with elements from on-disk image metadata as well.
func genVolFromVolID(
//
// nolint // returns non-exported *rbdVolume, which is fine
func GenVolFromVolID(
ctx context.Context,
volumeID string,
cr *util.Credentials,

View File

@ -334,9 +334,9 @@ func TestIsKrbdFeatureSupported(t *testing.T) {
t.Parallel()
var err error
krbdSupportedFeaturesAttr := "0x1"
krbdFeatures, err = hexStringToInteger(krbdSupportedFeaturesAttr) // initialize krbdFeatures
krbdFeatures, err = HexStringToInteger(krbdSupportedFeaturesAttr) // initialize krbdFeatures
if err != nil {
t.Errorf("hexStringToInteger(%s) failed", krbdSupportedFeaturesAttr)
t.Errorf("HexStringToInteger(%s) failed", krbdSupportedFeaturesAttr)
}
supported := isKrbdFeatureSupported(ctx, tc.featureName)
if supported != tc.isSupported {

View File

@ -235,7 +235,7 @@ func (rs *ReplicationServer) EnableVolumeReplication(ctx context.Context,
}
defer rs.VolumeLocks.Release(volumeID)
rbdVol, err := genVolFromVolID(ctx, volumeID, cr, req.GetSecrets())
rbdVol, err := GenVolFromVolID(ctx, volumeID, cr, req.GetSecrets())
defer rbdVol.Destroy()
if err != nil {
switch {
@ -410,7 +410,7 @@ func (rs *ReplicationServer) DisableVolumeReplication(ctx context.Context,
}
defer rs.VolumeLocks.Release(volumeID)
rbdVol, err := genVolFromVolID(ctx, volumeID, cr, req.GetSecrets())
rbdVol, err := GenVolFromVolID(ctx, volumeID, cr, req.GetSecrets())
defer rbdVol.Destroy()
if err != nil {
switch {
@ -522,7 +522,7 @@ func (rs *ReplicationServer) PromoteVolume(ctx context.Context,
}
defer rs.VolumeLocks.Release(volumeID)
rbdVol, err := genVolFromVolID(ctx, volumeID, cr, req.GetSecrets())
rbdVol, err := GenVolFromVolID(ctx, volumeID, cr, req.GetSecrets())
defer rbdVol.Destroy()
if err != nil {
switch {
@ -658,7 +658,7 @@ func (rs *ReplicationServer) DemoteVolume(ctx context.Context,
}
defer rs.VolumeLocks.Release(volumeID)
rbdVol, err := genVolFromVolID(ctx, volumeID, cr, req.GetSecrets())
rbdVol, err := GenVolFromVolID(ctx, volumeID, cr, req.GetSecrets())
defer rbdVol.Destroy()
if err != nil {
switch {
@ -745,7 +745,7 @@ func (rs *ReplicationServer) ResyncVolume(ctx context.Context,
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, volumeID)
}
defer rs.VolumeLocks.Release(volumeID)
rbdVol, err := genVolFromVolID(ctx, volumeID, cr, req.GetSecrets())
rbdVol, err := GenVolFromVolID(ctx, volumeID, cr, req.GetSecrets())
defer rbdVol.Destroy()
if err != nil {
switch {

View File

@ -31,7 +31,7 @@ import (
"golang.org/x/sys/unix"
"k8s.io/apimachinery/pkg/util/validation"
"k8s.io/cloud-provider/volume/helpers"
"k8s.io/utils/mount"
mount "k8s.io/mount-utils"
)
// RoundOffVolSize rounds up given quantity up to chunks of MiB/GiB.

1
vendor/modules.txt vendored
View File

@ -543,6 +543,7 @@ google.golang.org/grpc/stats
google.golang.org/grpc/status
google.golang.org/grpc/tap
# google.golang.org/protobuf v1.26.0
## explicit
google.golang.org/protobuf/encoding/protojson
google.golang.org/protobuf/encoding/prototext
google.golang.org/protobuf/encoding/protowire