cephfs: use shallow volumes for the ROX accessMode

This commit makes shallow volumes the default for ROX volumes provisioned from a snapshot data source.

Signed-off-by: riya-singhal31 <rsinghal@redhat.com>
riya-singhal31 2023-02-07 14:36:36 +05:30 committed by mergify[bot]
parent 8854c8523d
commit b28b5e6c84
7 changed files with 389 additions and 21 deletions


@@ -70,7 +70,7 @@ you're running it inside a k8s cluster and find the config itself).
 | `pool` | no | Ceph pool into which volume data shall be stored |
 | `volumeNamePrefix` | no | Prefix to use for naming subvolumes (defaults to `csi-vol-`). |
 | `snapshotNamePrefix` | no | Prefix to use for naming snapshots (defaults to `csi-snap-`) |
-| `backingSnapshot` | no | Boolean value. The PVC shall be backed by the CephFS snapshot specified in its data source. `pool` parameter must not be specified. (defaults to `false`) |
+| `backingSnapshot` | no | Boolean value. The PVC shall be backed by the CephFS snapshot specified in its data source. `pool` parameter must not be specified. (defaults to `true`) |
 | `kernelMountOptions` | no | Comma separated string of mount options accepted by cephfs kernel mounter, by default no options are passed. Check man mount.ceph for options. |
 | `fuseMountOptions` | no | Comma separated string of mount options accepted by ceph-fuse mounter, by default no options are passed. |
 | `csi.storage.k8s.io/provisioner-secret-name`, `csi.storage.k8s.io/node-stage-secret-name` | for Kubernetes | Name of the Kubernetes Secret object containing Ceph client credentials. Both parameters should have the same value |
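
With this default flip, a PVC that requests only read-only access and names a snapshot as its data source is provisioned as a shallow, snapshot-backed volume without any StorageClass change; setting backingSnapshot to "false" in the StorageClass (see the updated example manifest below) restores the old behaviour. A minimal sketch of the decision, mirroring the IsShallowVolumeSupported helper added later in this commit (wantsShallow is a hypothetical name; only the csi spec package is assumed):

package main

import (
    "fmt"

    "github.com/container-storage-interface/spec/lib/go/csi"
)

// wantsShallow mirrors the commit's IsShallowVolumeSupported: shallow
// provisioning applies when at least one requested access mode is
// read-only and the data source is a snapshot.
func wantsShallow(req *csi.CreateVolumeRequest) bool {
    ro := false
    for _, c := range req.GetVolumeCapabilities() {
        m := c.GetAccessMode().GetMode()
        if m == csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY ||
            m == csi.VolumeCapability_AccessMode_SINGLE_NODE_READER_ONLY {
            ro = true
        }
    }

    // Generated getters are nil-safe, so a request without a content
    // source simply yields false here.
    return ro && req.GetVolumeContentSource().GetSnapshot() != nil
}

func main() {
    req := &csi.CreateVolumeRequest{
        VolumeCapabilities: []*csi.VolumeCapability{{
            AccessMode: &csi.VolumeCapability_AccessMode{
                Mode: csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY,
            },
        }},
        VolumeContentSource: &csi.VolumeContentSource{
            Type: &csi.VolumeContentSource_Snapshot{
                Snapshot: &csi.VolumeContentSource_SnapshotSource{SnapshotId: "snap"},
            },
        },
    }
    fmt.Println(wantsShallow(req)) // true
}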


@@ -1617,8 +1617,8 @@ var _ = Describe(cephfsType, func() {
                 if err != nil {
                     framework.Failf("failed to delete CephFS storageclass: %v", err)
                 }
                 err = createCephfsStorageClass(f.ClientSet, f, false, map[string]string{
-                    "backingSnapshot": "true",
                     "encrypted":       "true",
                     "encryptionKMSID": kmsID,
                 })
@@ -1759,17 +1759,6 @@ var _ = Describe(cephfsType, func() {
                     framework.Failf("failed to get SHA512 sum for file: %v", err)
                 }
-                err = deleteResource(cephFSExamplePath + "storageclass.yaml")
-                if err != nil {
-                    framework.Failf("failed to delete CephFS storageclass: %v", err)
-                }
-                err = createCephfsStorageClass(f.ClientSet, f, false, map[string]string{
-                    "backingSnapshot": "true",
-                })
-                if err != nil {
-                    framework.Failf("failed to create CephFS storageclass: %v", err)
-                }
                 pvcClone, err := loadPVC(pvcClonePath)
                 if err != nil {
                     framework.Failf("failed to load PVC: %v", err)
@@ -1838,6 +1827,141 @@ var _ = Describe(cephfsType, func() {
                 }
             })

+            By("checking snapshot-backed volume by backing snapshot as false", func() {
+                pvc, err := loadPVC(pvcPath)
+                if err != nil {
+                    framework.Failf("failed to load PVC: %v", err)
+                }
+                pvc.Namespace = f.UniqueName
+                err = createPVCAndvalidatePV(f.ClientSet, pvc, deployTimeout)
+                if err != nil {
+                    framework.Failf("failed to create PVC: %v", err)
+                }
+
+                _, pv, err := getPVCAndPV(f.ClientSet, pvc.Name, pvc.Namespace)
+                if err != nil {
+                    framework.Failf("failed to get PV object for %s: %v", pvc.Name, err)
+                }
+
+                app, err := loadApp(appPath)
+                if err != nil {
+                    framework.Failf("failed to load application: %v", err)
+                }
+                app.Namespace = f.UniqueName
+                app.Spec.Volumes[0].PersistentVolumeClaim.ClaimName = pvc.Name
+                appLabels := map[string]string{
+                    appKey: appLabel,
+                }
+                app.Labels = appLabels
+                optApp := metav1.ListOptions{
+                    LabelSelector: fmt.Sprintf("%s=%s", appKey, appLabels[appKey]),
+                }
+                err = writeDataInPod(app, &optApp, f)
+                if err != nil {
+                    framework.Failf("failed to write data: %v", err)
+                }
+                appTestFilePath := app.Spec.Containers[0].VolumeMounts[0].MountPath + "/test"
+
+                snap := getSnapshot(snapshotPath)
+                snap.Namespace = f.UniqueName
+                snap.Spec.Source.PersistentVolumeClaimName = &pvc.Name
+                err = createSnapshot(&snap, deployTimeout)
+                if err != nil {
+                    framework.Failf("failed to create snapshot: %v", err)
+                }
+                validateCephFSSnapshotCount(f, 1, subvolumegroup, pv)
+
+                err = appendToFileInContainer(f, app, appTestFilePath, "hello", &optApp)
+                if err != nil {
+                    framework.Failf("failed to append data: %v", err)
+                }
+
+                parentFileSum, err := calculateSHA512sum(f, app, appTestFilePath, &optApp)
+                if err != nil {
+                    framework.Failf("failed to get SHA512 sum for file: %v", err)
+                }
+
+                err = deleteResource(cephFSExamplePath + "storageclass.yaml")
+                if err != nil {
+                    framework.Failf("failed to delete CephFS storageclass: %v", err)
+                }
+                err = createCephfsStorageClass(f.ClientSet, f, false, map[string]string{
+                    "backingSnapshot": "false",
+                })
+                if err != nil {
+                    framework.Failf("failed to create CephFS storageclass: %v", err)
+                }
+
+                pvcClone, err := loadPVC(pvcClonePath)
+                if err != nil {
+                    framework.Failf("failed to load PVC: %v", err)
+                }
+                // Snapshot-backed volumes support read-only access modes only.
+                pvcClone.Spec.AccessModes = []v1.PersistentVolumeAccessMode{v1.ReadOnlyMany}
+                appClone, err := loadApp(appClonePath)
+                if err != nil {
+                    framework.Failf("failed to load application: %v", err)
+                }
+                appCloneLabels := map[string]string{
+                    appKey: appCloneLabel,
+                }
+                appClone.Labels = appCloneLabels
+                optAppClone := metav1.ListOptions{
+                    LabelSelector: fmt.Sprintf("%s=%s", appKey, appCloneLabels[appKey]),
+                }
+                pvcClone.Namespace = f.UniqueName
+                appClone.Namespace = f.UniqueName
+                err = createPVCAndApp("", f, pvcClone, appClone, deployTimeout)
+                if err != nil {
+                    framework.Failf("failed to create PVC and app: %v", err)
+                }
+                validateSubvolumeCount(f, 2, fileSystemName, subvolumegroup)
+
+                // Deleting snapshot before deleting pvcClone should succeed. It will be
+                // deleted once all volumes that are backed by this snapshot are gone.
+                err = deleteSnapshot(&snap, deployTimeout)
+                if err != nil {
+                    framework.Failf("failed to delete snapshot: %v", err)
+                }
+                validateCephFSSnapshotCount(f, 0, subvolumegroup, pv)
+
+                appCloneTestFilePath := appClone.Spec.Containers[0].VolumeMounts[0].MountPath + "/test"
+                snapFileSum, err := calculateSHA512sum(f, appClone, appCloneTestFilePath, &optAppClone)
+                if err != nil {
+                    framework.Failf("failed to get SHA512 sum for file: %v", err)
+                }
+                if parentFileSum == snapFileSum {
+                    framework.Failf("SHA512 sums of files in parent subvol and snapshot should differ")
+                }
+
+                err = deletePVCAndApp("", f, pvcClone, appClone)
+                if err != nil {
+                    framework.Failf("failed to delete PVC or application: %v", err)
+                }
+                validateCephFSSnapshotCount(f, 0, subvolumegroup, pv)
+
+                err = deletePVCAndApp("", f, pvc, app)
+                if err != nil {
+                    framework.Failf("failed to delete PVC or application: %v", err)
+                }
+
+                err = deleteResource(cephFSExamplePath + "storageclass.yaml")
+                if err != nil {
+                    framework.Failf("failed to delete CephFS storageclass: %v", err)
+                }
+                err = createCephfsStorageClass(f.ClientSet, f, false, nil)
+                if err != nil {
+                    framework.Failf("failed to create CephFS storageclass: %v", err)
+                }
+            })
+
             if testCephFSFscrypt {
                 kmsToTest := map[string]kmsConfig{
                     "secrets-metadata-test": secretsMetadataKMS,


@@ -49,8 +49,8 @@ parameters:
   # (optional) Boolean value. The PVC shall be backed by the CephFS snapshot
   # specified in its data source. `pool` parameter must not be specified.
-  # (defaults to `false`)
-  # backingSnapshot: "true"
+  # (defaults to `true`)
+  # backingSnapshot: "false"
   # (optional) Instruct the plugin it has to encrypt the volume
   # By default it is disabled. Valid values are "true" or "false".


@@ -230,15 +230,12 @@ func checkValidCreateVolumeRequest(
     case sID != nil:
         if vol.BackingSnapshot {
             volCaps := req.GetVolumeCapabilities()
-            for _, volCap := range volCaps {
-                mode := volCap.AccessMode.Mode
-                if mode != csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY &&
-                    mode != csi.VolumeCapability_AccessMode_SINGLE_NODE_READER_ONLY {
-                    return errors.New("backingSnapshot may be used only with read-only access modes")
-                }
+            isRO := store.IsVolumeCreateRO(volCaps)
+            if !isRO {
+                return errors.New("backingSnapshot may be used only with read-only access modes")
             }
         }
     }

     return nil
 }
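
A subtle semantic change is visible in this hunk: the old loop rejected the request as soon as any capability carried a writable mode, whereas IsVolumeCreateRO (introduced in the next hunk) approves the request if at least one capability is read-only. For a request that mixes read-only and writable capabilities the two disagree. A standalone illustration (only the csi spec package is assumed; the helper names are hypothetical):

package main

import (
    "fmt"

    "github.com/container-storage-interface/spec/lib/go/csi"
)

func readOnly(m csi.VolumeCapability_AccessMode_Mode) bool {
    return m == csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY ||
        m == csi.VolumeCapability_AccessMode_SINGLE_NODE_READER_ONLY
}

// allRO reproduces the old loop: every access mode must be read-only.
func allRO(caps []*csi.VolumeCapability) bool {
    for _, c := range caps {
        if !readOnly(c.GetAccessMode().GetMode()) {
            return false
        }
    }

    return true
}

// anyRO reproduces IsVolumeCreateRO: one read-only access mode suffices.
func anyRO(caps []*csi.VolumeCapability) bool {
    for _, c := range caps {
        if readOnly(c.GetAccessMode().GetMode()) {
            return true
        }
    }

    return false
}

func main() {
    // One read-only and one writable capability in the same request.
    mixed := []*csi.VolumeCapability{
        {AccessMode: &csi.VolumeCapability_AccessMode{
            Mode: csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY,
        }},
        {AccessMode: &csi.VolumeCapability_AccessMode{
            Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_WRITER,
        }},
    }
    fmt.Println(allRO(mixed), anyRO(mixed)) // false true
}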


@@ -235,6 +235,7 @@ func NewVolumeOptions(
     opts.Monitors = strings.Join(clusterData.Monitors, ",")
     opts.SubvolumeGroup = clusterData.CephFS.SubvolumeGroup
     opts.Owner = k8s.GetOwner(volOptions)
+    opts.BackingSnapshot = IsShallowVolumeSupported(req)

     if err = extractOptionalOption(&opts.Pool, "pool", volOptions); err != nil {
         return nil, err

@@ -323,6 +324,28 @@ func NewVolumeOptions(
     return &opts, nil
 }

+// IsShallowVolumeSupported returns true only for ReadOnly volume requests
+// with datasource as snapshot.
+func IsShallowVolumeSupported(req *csi.CreateVolumeRequest) bool {
+    isRO := IsVolumeCreateRO(req.VolumeCapabilities)
+
+    return isRO && (req.GetVolumeContentSource() != nil && req.GetVolumeContentSource().GetSnapshot() != nil)
+}
+
+func IsVolumeCreateRO(caps []*csi.VolumeCapability) bool {
+    for _, cap := range caps {
+        if cap.AccessMode != nil {
+            switch cap.AccessMode.Mode { //nolint:exhaustive // only check what we want
+            case csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY,
+                csi.VolumeCapability_AccessMode_SINGLE_NODE_READER_ONLY:
+                return true
+            }
+        }
+    }
+
+    return false
+}
+
 // newVolumeOptionsFromVolID generates a new instance of volumeOptions and VolumeIdentifier
 // from the provided CSI VolumeID.
 // nolint:gocyclo,cyclop // TODO: reduce complexity
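
For completeness, a testable usage sketch of the new helper; as an Example function it would have to live in a _test.go file within this (internal) package, and the request shape matches the unit tests below:

package store

import (
    "fmt"

    "github.com/container-storage-interface/spec/lib/go/csi"
)

// ExampleIsShallowVolumeSupported shows the positive case: a read-only
// request with a snapshot data source selects shallow provisioning.
func ExampleIsShallowVolumeSupported() {
    req := &csi.CreateVolumeRequest{
        VolumeCapabilities: []*csi.VolumeCapability{{
            AccessMode: &csi.VolumeCapability_AccessMode{
                Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_READER_ONLY,
            },
        }},
        VolumeContentSource: &csi.VolumeContentSource{
            Type: &csi.VolumeContentSource_Snapshot{
                Snapshot: &csi.VolumeContentSource_SnapshotSource{SnapshotId: "snap"},
            },
        },
    }
    fmt.Println(IsShallowVolumeSupported(req))
    // Output: true
}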


@@ -0,0 +1,222 @@
+/*
+Copyright 2023 The Ceph-CSI Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package store
+
+import (
+    "testing"
+
+    "github.com/container-storage-interface/spec/lib/go/csi"
+)
+
+func TestIsVolumeCreateRO(t *testing.T) {
+    t.Parallel()
+    tests := []struct {
+        name string
+        caps []*csi.VolumeCapability
+        isRO bool
+    }{
+        {
+            name: "valid access mode: multi node reader only",
+            caps: []*csi.VolumeCapability{
+                {
+                    AccessMode: &csi.VolumeCapability_AccessMode{
+                        Mode: csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY,
+                    },
+                },
+            },
+            isRO: true,
+        },
+        {
+            name: "invalid access mode: single node multi writer",
+            caps: []*csi.VolumeCapability{
+                {
+                    AccessMode: &csi.VolumeCapability_AccessMode{
+                        Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_MULTI_WRITER,
+                    },
+                },
+            },
+            isRO: false,
+        },
+        {
+            name: "valid access mode: single node reader only",
+            caps: []*csi.VolumeCapability{
+                {
+                    AccessMode: &csi.VolumeCapability_AccessMode{
+                        Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_READER_ONLY,
+                    },
+                },
+            },
+            isRO: true,
+        },
+        {
+            name: "invalid access mode: multi node multi writer",
+            caps: []*csi.VolumeCapability{
+                {
+                    AccessMode: &csi.VolumeCapability_AccessMode{
+                        Mode: csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER,
+                    },
+                },
+            },
+            isRO: false,
+        },
+        {
+            name: "invalid access mode: single node single writer",
+            caps: []*csi.VolumeCapability{
+                {
+                    AccessMode: &csi.VolumeCapability_AccessMode{
+                        Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_SINGLE_WRITER,
+                    },
+                },
+            },
+            isRO: false,
+        },
+    }
+    for _, tt := range tests {
+        newtt := tt
+        t.Run(newtt.name, func(t *testing.T) {
+            t.Parallel()
+            got := IsVolumeCreateRO(newtt.caps)
+            if got != newtt.isRO {
+                t.Errorf("IsVolumeCreateRO() = %v, want %v", got, newtt.isRO)
+            }
+        })
+    }
+}
+
+func TestIsShallowVolumeSupported(t *testing.T) {
+    t.Parallel()
+    type args struct {
+        req *csi.CreateVolumeRequest
+    }
+    tests := []struct {
+        name string
+        args args
+        want bool
+    }{
+        {
+            name: "invalid request: writer with volume source",
+            args: args{
+                req: &csi.CreateVolumeRequest{
+                    Name: "",
+                    VolumeCapabilities: []*csi.VolumeCapability{
+                        {
+                            AccessType: &csi.VolumeCapability_Block{},
+                            AccessMode: &csi.VolumeCapability_AccessMode{
+                                Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_MULTI_WRITER,
+                            },
+                        },
+                    },
+                    VolumeContentSource: &csi.VolumeContentSource{
+                        Type: &csi.VolumeContentSource_Volume{
+                            Volume: &csi.VolumeContentSource_VolumeSource{
+                                VolumeId: "vol",
+                            },
+                        },
+                    },
+                },
+            },
+            want: false,
+        },
+        {
+            name: "invalid request: reader with volume source",
+            args: args{
+                req: &csi.CreateVolumeRequest{
+                    Name: "",
+                    VolumeCapabilities: []*csi.VolumeCapability{
+                        {
+                            AccessType: &csi.VolumeCapability_Block{},
+                            AccessMode: &csi.VolumeCapability_AccessMode{
+                                Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_READER_ONLY,
+                            },
+                        },
+                    },
+                    VolumeContentSource: &csi.VolumeContentSource{
+                        Type: &csi.VolumeContentSource_Volume{
+                            Volume: &csi.VolumeContentSource_VolumeSource{
+                                VolumeId: "vol",
+                            },
+                        },
+                    },
+                },
+            },
+            want: false,
+        },
+        {
+            name: "invalid request: writer with snapshot source",
+            args: args{
+                req: &csi.CreateVolumeRequest{
+                    Name: "",
+                    VolumeCapabilities: []*csi.VolumeCapability{
+                        {
+                            AccessType: &csi.VolumeCapability_Block{},
+                            AccessMode: &csi.VolumeCapability_AccessMode{
+                                Mode: csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER,
+                            },
+                        },
+                    },
+                    VolumeContentSource: &csi.VolumeContentSource{
+                        Type: &csi.VolumeContentSource_Snapshot{
+                            Snapshot: &csi.VolumeContentSource_SnapshotSource{
+                                SnapshotId: "snap",
+                            },
+                        },
+                    },
+                },
+            },
+            want: false,
+        },
+        {
+            name: "valid request: reader with snapshot source",
+            args: args{
+                req: &csi.CreateVolumeRequest{
+                    Name: "",
+                    VolumeCapabilities: []*csi.VolumeCapability{
+                        {
+                            AccessType: &csi.VolumeCapability_Block{},
+                            AccessMode: &csi.VolumeCapability_AccessMode{
+                                Mode: csi.VolumeCapability_AccessMode_SINGLE_NODE_READER_ONLY,
+                            },
+                        },
+                    },
+                    VolumeContentSource: &csi.VolumeContentSource{
+                        Type: &csi.VolumeContentSource_Snapshot{
+                            Snapshot: &csi.VolumeContentSource_SnapshotSource{
+                                SnapshotId: "snap",
+                            },
+                        },
+                    },
+                },
+            },
+            want: true,
+        },
+    }
+    for _, tt := range tests {
+        newtt := tt
+        t.Run(newtt.name, func(t *testing.T) {
+            t.Parallel()
+            t.Log(newtt.args.req.GetVolumeContentSource().GetSnapshot())
+            t.Log(IsVolumeCreateRO(newtt.args.req.GetVolumeCapabilities()))
+            if got := IsShallowVolumeSupported(newtt.args.req); got != newtt.want {
+                t.Errorf("IsShallowVolumeSupported() = %v, want %v", got, newtt.want)
+            }
+        })
+    }
+}
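
Assuming the standard ceph-csi layout for this new test file, the two tests can be run in isolation with `go test -run 'TestIsVolumeCreateRO|TestIsShallowVolumeSupported' ./internal/cephfs/store/`; both are table-driven and run their cases in parallel.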


@@ -77,6 +77,8 @@ func (cs *Server) CreateVolume(
     ctx context.Context,
     req *csi.CreateVolumeRequest,
 ) (*csi.CreateVolumeResponse, error) {
+    // NFS does not support shallow, snapshot-backed volumes.
+    req.Parameters["backingSnapshot"] = "false"
     res, err := cs.backendServer.CreateVolume(ctx, req)
     if err != nil {
         return nil, err
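
The added line assigns into req.Parameters directly, and in Go an assignment into a nil map panics, so this assumes the CSI client always sends a non-nil parameters map. A defensive variant, in case that assumption does not hold (ensureNoBackingSnapshot and the package name are hypothetical, not part of this commit):

package controller

import "github.com/container-storage-interface/spec/lib/go/csi"

// ensureNoBackingSnapshot force-disables shallow, snapshot-backed volumes
// for the NFS driver; the nil check guards against a request that carries
// no parameters at all (an assumption, not something this commit handles).
func ensureNoBackingSnapshot(req *csi.CreateVolumeRequest) {
    if req.Parameters == nil {
        req.Parameters = map[string]string{}
    }
    req.Parameters["backingSnapshot"] = "false"
}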