added volumeNamePrefix and snapshotNamePrefix as parameters for storageClass

this allows administrators to override the naming prefix for both volumes and snapshots
created by the rbd plugin.

Signed-off-by: Reinier Schoof <reinier@skoef.nl>
This commit is contained in:
Reinier Schoof 2020-02-24 14:19:42 +01:00 committed by mergify[bot]
parent 8163552b81
commit a4532fafd0
13 changed files with 465 additions and 165 deletions

View File

@ -82,6 +82,8 @@ is used to define in which namespace you want the configmaps to be stored
| `fsName` | yes | CephFS filesystem name into which the volume shall be created |
| `mounter` | no | Mount method to be used for this volume. Available options are `kernel` for Ceph kernel client and `fuse` for Ceph FUSE driver. Defaults to "default mounter". |
| `pool` | no | Ceph pool into which volume data shall be stored |
| `volumeNamePrefix` | no | Prefix to use for naming volumes (defaults to `csi-vol-`). |
| `snapshotNamePrefix` | no | Prefix to use for naming snapshots (defaults to `csi-snap-`)
| `kernelMountOptions` | no | Comma separated string of mount options accepted by cephfs kernel mounter, by default no options are passed. Check man mount.ceph for options. |
| `fuseMountOptions` | no | Comma separated string of mount options accepted by ceph-fuse mounter, by default no options are passed. |
| `csi.storage.k8s.io/provisioner-secret-name`, `csi.storage.k8s.io/node-stage-secret-name` | for Kubernetes | Name of the Kubernetes Secret object containing Ceph client credentials. Both parameters should have the same value |

View File

@ -49,6 +49,8 @@ make image-cephcsi
| `clusterID` | yes | String representing a Ceph cluster, must be unique across all Ceph clusters in use for provisioning, cannot be greater than 36 bytes in length, and should remain immutable for the lifetime of the Ceph cluster in use |
| `pool` | yes | Ceph pool into which the RBD image shall be created |
| `dataPool` | no | Ceph pool used for the data of the RBD images. |
| `volumeNamePrefix` | no | Prefix to use for naming RBD volume images (defaults to `csi-vol-`). |
| `snapshotNamePrefix` | no | Prefix to use for naming RBD snapshot images (defaults to `csi-snap-`). |
| `imageFeatures` | no | RBD image features. CSI RBD currently supports only `layering` feature. See [man pages](http://docs.ceph.com/docs/mimic/man/8/rbd/#cmdoption-rbd-image-feature) |
| `csi.storage.k8s.io/provisioner-secret-name`, `csi.storage.k8s.io/node-stage-secret-name` | yes (for Kubernetes) | name of the Kubernetes Secret object containing Ceph client credentials. Both parameters should have the same value |
| `csi.storage.k8s.io/provisioner-secret-namespace`, `csi.storage.k8s.io/node-stage-secret-namespace` | yes (for Kubernetes) | namespaces of the above Secret objects |

View File

@ -2,6 +2,7 @@ package e2e
import (
"fmt"
"strings"
. "github.com/onsi/ginkgo" // nolint
@ -339,6 +340,45 @@ var _ = Describe("RBD", func() {
}
})
By("create PVC in storageClass with volumeNamePrefix", func() {
volumeNamePrefix := "foo-bar-"
deleteResource(rbdExamplePath + "storageclass.yaml")
createRBDStorageClass(f.ClientSet, f, map[string]string{"volumeNamePrefix": volumeNamePrefix})
// set up PVC
pvc, err := loadPVC(pvcPath)
if err != nil {
Fail(err.Error())
}
pvc.Namespace = f.UniqueName
err = createPVCAndvalidatePV(f.ClientSet, pvc, deployTimeout)
if err != nil {
Fail(err.Error())
}
// list RBD images and check if one of them has the same prefix
foundIt := false
for _, imgName := range listRBDImages(f) {
fmt.Printf("Checking prefix on %s\n", imgName)
if strings.HasPrefix(imgName, volumeNamePrefix) {
foundIt = true
break
}
}
// clean up after ourselves
err = deletePVCAndValidatePV(f.ClientSet, pvc, deployTimeout)
if err != nil {
Fail(err.Error())
}
deleteResource(rbdExamplePath + "storageclass.yaml")
createRBDStorageClass(f.ClientSet, f, make(map[string]string))
if !foundIt {
Fail(fmt.Sprintf("could not find image with prefix %s", volumeNamePrefix))
}
})
By("validate RBD static FileSystem PVC", func() {
err := validateRBDStaticPV(f, appPath, false)
if err != nil {

View File

@ -58,14 +58,21 @@ func checkVolExists(ctx context.Context, volOptions *volumeOptions, secret map[s
defer cr.DeleteCredentials()
imageUUID, err := volJournal.CheckReservation(ctx, volOptions.Monitors, cr,
volOptions.MetadataPool, volOptions.RequestName, "", "")
volOptions.MetadataPool, volOptions.RequestName, volOptions.NamePrefix, "", "")
if err != nil {
return nil, err
}
if imageUUID == "" {
return nil, nil
}
vid.FsSubvolName = volJournal.NamingPrefix() + imageUUID
// now that we now that the reservation exists, let's get the volume name from
// the omap
_, vid.FsSubvolName, _, _, err = volJournal.GetObjectUUIDData(ctx, volOptions.Monitors, cr,
volOptions.MetadataPool, imageUUID, false)
if err != nil {
return nil, err
}
// TODO: size checks
@ -107,6 +114,8 @@ func reserveVol(ctx context.Context, volOptions *volumeOptions, secret map[strin
var (
vi util.CSIIdentifier
vid volumeIdentifier
imageUUID string
err error
)
cr, err := util.NewAdminCredentials(secret)
@ -115,12 +124,11 @@ func reserveVol(ctx context.Context, volOptions *volumeOptions, secret map[strin
}
defer cr.DeleteCredentials()
imageUUID, err := volJournal.ReserveName(ctx, volOptions.Monitors, cr,
volOptions.MetadataPool, volOptions.RequestName, "", "")
imageUUID, vid.FsSubvolName, err = volJournal.ReserveName(ctx, volOptions.Monitors, cr,
volOptions.MetadataPool, volOptions.RequestName, volOptions.NamePrefix, "", "")
if err != nil {
return nil, err
}
vid.FsSubvolName = volJournal.NamingPrefix() + imageUUID
// generate the volume ID to return to the CO system
vi = util.CSIIdentifier{

View File

@ -28,6 +28,7 @@ import (
type volumeOptions struct {
RequestName string
NamePrefix string
Size int64
ClusterID string
FsName string
@ -197,7 +198,6 @@ func newVolumeOptionsFromVolID(ctx context.Context, volID string, volOpt, secret
return nil, nil, ErrInvalidVolID{err}
}
volOptions.ClusterID = vi.ClusterID
vid.FsSubvolName = volJournal.NamingPrefix() + vi.ObjectUUID
vid.VolumeID = volID
volOptions.FscID = vi.LocationID
@ -221,7 +221,7 @@ func newVolumeOptionsFromVolID(ctx context.Context, volID string, volOpt, secret
return nil, nil, err
}
volOptions.RequestName, _, _, err = volJournal.GetObjectUUIDData(ctx, volOptions.Monitors, cr,
volOptions.RequestName, vid.FsSubvolName, _, _, err = volJournal.GetObjectUUIDData(ctx, volOptions.Monitors, cr,
volOptions.MetadataPool, vi.ObjectUUID, false)
if err != nil {
return nil, nil, err

View File

@ -71,6 +71,9 @@ func (cs *ControllerServer) validateVolumeReq(ctx context.Context, req *csi.Crea
if value, ok := options["dataPool"]; ok && value == "" {
return status.Error(codes.InvalidArgument, "empty datapool name to provision volume from")
}
if value, ok := options["volumeNamePrefix"]; ok && value == "" {
return status.Error(codes.InvalidArgument, "empty volume name prefix to provision volume from")
}
return nil
}
@ -567,6 +570,11 @@ func (cs *ControllerServer) validateSnapshotReq(ctx context.Context, req *csi.Cr
return status.Error(codes.InvalidArgument, "source Volume ID cannot be empty")
}
options := req.GetParameters()
if value, ok := options["snapshotNamePrefix"]; ok && value == "" {
return status.Error(codes.InvalidArgument, "empty snapshot name prefix to provision snapshot from")
}
return nil
}

View File

@ -68,12 +68,12 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
disableInUseChecks := false
// MULTI_NODE_MULTI_WRITER is supported by default for Block access type volumes
if req.VolumeCapability.AccessMode.Mode == csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER {
if isBlock {
disableInUseChecks = true
} else {
if !isBlock {
klog.Warningf(util.Log(ctx, "MULTI_NODE_MULTI_WRITER currently only supported with volumes of access type `block`, invalid AccessMode for volume: %v"), req.GetVolumeId())
return nil, status.Error(codes.InvalidArgument, "rbd: RWX access mode request is only valid for volumes with access type `block`")
}
disableInUseChecks = true
}
volID := req.GetVolumeId()
@ -102,11 +102,6 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
}
}
isLegacyVolume, volName, err := getVolumeNameByID(volID, stagingParentPath, staticVol)
if err != nil {
return nil, err
}
var isNotMnt bool
// check if stagingPath is already mounted
isNotMnt, err = mount.IsNotMountPoint(ns.mounter, stagingTargetPath)
@ -115,16 +110,39 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
}
if !isNotMnt {
klog.Infof(util.Log(ctx, "rbd: volume %s is already mounted to %s, skipping"), req.GetVolumeId(), stagingTargetPath)
klog.Infof(util.Log(ctx, "rbd: volume %s is already mounted to %s, skipping"), volID, stagingTargetPath)
return &csi.NodeStageVolumeResponse{}, nil
}
isLegacyVolume := isLegacyVolumeID(volID)
volOptions, err := genVolFromVolumeOptions(ctx, req.GetVolumeContext(), req.GetSecrets(), disableInUseChecks, isLegacyVolume)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
volOptions.RbdImageName = volName
volOptions.VolID = req.GetVolumeId()
// get rbd image name from the volume journal
// for static volumes, the image name is actually the volume ID itself
// for legacy volumes (v1.0.0), the image name can be found in the staging path
switch {
case staticVol:
volOptions.RbdImageName = volID
case isLegacyVolume:
volOptions.RbdImageName, err = getLegacyVolumeName(stagingTargetPath)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
default:
var vi util.CSIIdentifier
err = vi.DecomposeCSIID(volID)
if err != nil {
err = fmt.Errorf("error decoding volume ID (%s) (%s)", err, volID)
return nil, status.Error(codes.Internal, err.Error())
}
_, volOptions.RbdImageName, _, _, err = volJournal.GetObjectUUIDData(ctx, volOptions.Monitors, cr, volOptions.Pool, vi.ObjectUUID, false)
}
volOptions.VolID = volID
isMounted := false
isEncrypted := false
@ -139,41 +157,13 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
}
defer func() {
if err != nil {
ns.undoStagingTransaction(ctx, stagingParentPath, devicePath, volID, isStagePathCreated, isMounted, isEncrypted)
ns.undoStagingTransaction(ctx, req, devicePath, isStagePathCreated, isMounted, isEncrypted)
}
}()
// Mapping RBD image
devicePath, err = attachRBDImage(ctx, volOptions, cr)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
klog.V(4).Infof(util.Log(ctx, "rbd image: %s/%s was successfully mapped at %s\n"),
req.GetVolumeId(), volOptions.Pool, devicePath)
if volOptions.Encrypted {
devicePath, err = ns.processEncryptedDevice(ctx, volOptions, devicePath, cr)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
isEncrypted = true
}
err = ns.createStageMountPoint(ctx, stagingTargetPath, isBlock)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
isStagePathCreated = true
// nodeStage Path
err = ns.mountVolumeToStagePath(ctx, req, staticVol, stagingTargetPath, devicePath)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
isMounted = true
// #nosec - allow anyone to write inside the target path
err = os.Chmod(stagingTargetPath, 0777)
// perform the actual staging and if this fails, have undoStagingTransaction
// cleans up for us
isStagePathCreated, isMounted, isEncrypted, err = ns.stageTransaction(ctx, req, volOptions, staticVol)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
@ -183,10 +173,65 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
return &csi.NodeStageVolumeResponse{}, nil
}
func (ns *NodeServer) undoStagingTransaction(ctx context.Context, stagingParentPath, devicePath, volID string, isStagePathCreated, isMounted, isEncrypted bool) {
func (ns *NodeServer) stageTransaction(ctx context.Context, req *csi.NodeStageVolumeRequest, volOptions *rbdVolume, staticVol bool) (bool, bool, bool, error) {
isStagePathCreated := false
isMounted := false
isEncrypted := false
var err error
stagingTargetPath := stagingParentPath + "/" + volID
var cr *util.Credentials
cr, err = util.NewUserCredentials(req.GetSecrets())
if err != nil {
return isStagePathCreated, isMounted, isEncrypted, err
}
defer cr.DeleteCredentials()
// Mapping RBD image
var devicePath string
devicePath, err = attachRBDImage(ctx, volOptions, cr)
if err != nil {
return isStagePathCreated, isMounted, isEncrypted, err
}
klog.V(4).Infof(util.Log(ctx, "rbd image: %s/%s was successfully mapped at %s\n"),
req.GetVolumeId(), volOptions.Pool, devicePath)
if volOptions.Encrypted {
devicePath, err = ns.processEncryptedDevice(ctx, volOptions, devicePath, cr)
if err != nil {
return isStagePathCreated, isMounted, isEncrypted, err
}
isEncrypted = true
}
stagingTargetPath := getStagingTargetPath(req)
isBlock := req.GetVolumeCapability().GetBlock() != nil
err = ns.createStageMountPoint(ctx, stagingTargetPath, isBlock)
if err != nil {
return isStagePathCreated, isMounted, isEncrypted, err
}
isStagePathCreated = true
// nodeStage Path
err = ns.mountVolumeToStagePath(ctx, req, staticVol, stagingTargetPath, devicePath)
if err != nil {
return isStagePathCreated, isMounted, isEncrypted, err
}
isMounted = true
// #nosec - allow anyone to write inside the target path
err = os.Chmod(stagingTargetPath, 0777)
return isStagePathCreated, isMounted, isEncrypted, err
}
func (ns *NodeServer) undoStagingTransaction(ctx context.Context, req *csi.NodeStageVolumeRequest, devicePath string, isStagePathCreated, isMounted, isEncrypted bool) {
var err error
stagingTargetPath := getStagingTargetPath(req)
if isMounted {
err = ns.mounter.Unmount(stagingTargetPath)
if err != nil {
@ -204,6 +249,8 @@ func (ns *NodeServer) undoStagingTransaction(ctx context.Context, stagingParentP
}
}
volID := req.GetVolumeId()
// Unmapping rbd device
if devicePath != "" {
err = detachRBDDevice(ctx, devicePath, volID, isEncrypted)
@ -214,7 +261,7 @@ func (ns *NodeServer) undoStagingTransaction(ctx context.Context, stagingParentP
}
// Cleanup the stashed image metadata
if err = cleanupRBDImageMetadataStash(stagingParentPath); err != nil {
if err = cleanupRBDImageMetadataStash(req.GetStagingTargetPath()); err != nil {
klog.Errorf(util.Log(ctx, "failed to cleanup image metadata stash (%v)"), err)
return
}
@ -457,6 +504,19 @@ func (ns *NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
return &csi.NodeUnpublishVolumeResponse{}, nil
}
// getStagingTargetPath concats either NodeStageVolumeRequest's or
// NodeUnstageVolumeRequest's target path with the volumeID
func getStagingTargetPath(req interface{}) string {
switch vr := req.(type) {
case *csi.NodeStageVolumeRequest:
return vr.GetStagingTargetPath() + "/" + vr.GetVolumeId()
case *csi.NodeUnstageVolumeRequest:
return vr.GetStagingTargetPath() + "/" + vr.GetVolumeId()
}
return ""
}
// NodeUnstageVolume unstages the volume from the staging path
func (ns *NodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
var err error
@ -473,7 +533,7 @@ func (ns *NodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag
defer ns.VolumeLocks.Release(volID)
stagingParentPath := req.GetStagingTargetPath()
stagingTargetPath := stagingParentPath + "/" + req.GetVolumeId()
stagingTargetPath := getStagingTargetPath(req)
notMnt, err := mount.IsNotMountPoint(ns.mounter, stagingTargetPath)
if err != nil {
@ -556,10 +616,6 @@ func (ns *NodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandV
}
defer ns.VolumeLocks.Release(volumeID)
volName, err := getVolumeName(volumeID)
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
// volumePath is targetPath for block PVC and stagingPath for filesystem.
// check the path is mountpoint or not, if it is
// mountpoint treat this as block PVC or else it is filesystem PVC
@ -574,14 +630,12 @@ func (ns *NodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandV
if !notMnt {
return &csi.NodeExpandVolumeResponse{}, nil
}
imgInfo, devicePath, err := getDevicePathAndImageInfo(ctx, volumePath)
devicePath, err := getDevicePath(ctx, volumePath)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
if volName != imgInfo.ImageName {
return nil, status.Errorf(codes.InvalidArgument, "volume name missmatch between request (%s) and stored metadata (%s)", volName, imgInfo.ImageName)
}
diskMounter := &mount.SafeFormatAndMount{Interface: ns.mounter, Exec: utilexec.New()}
// TODO check size and return success or error
volumePath += "/" + volumeID
@ -593,16 +647,16 @@ func (ns *NodeServer) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandV
return &csi.NodeExpandVolumeResponse{}, nil
}
func getDevicePathAndImageInfo(ctx context.Context, volumePath string) (rbdImageMetadataStash, string, error) {
func getDevicePath(ctx context.Context, volumePath string) (string, error) {
imgInfo, err := lookupRBDImageMetadataStash(volumePath)
if err != nil {
klog.Errorf(util.Log(ctx, "failed to find image metadata: %v"), err)
}
device, found := findDeviceMappingImage(ctx, imgInfo.Pool, imgInfo.ImageName, imgInfo.NbdAccess)
if found {
return imgInfo, device, nil
return device, nil
}
return rbdImageMetadataStash{}, "", fmt.Errorf("failed to get device for stagingtarget path %v", volumePath)
return "", fmt.Errorf("failed to get device for stagingtarget path %v", volumePath)
}
// NodeGetCapabilities returns the supported capabilities of the node server
@ -731,28 +785,3 @@ func openEncryptedDevice(ctx context.Context, volOptions *rbdVolume, devicePath
return mapperFilePath, nil
}
func getVolumeNameByID(volID, stagingTargetPath string, staticVol bool) (bool, string, error) {
volName, err := getVolumeName(volID)
if err != nil {
if staticVol {
return false, volID, nil
}
// error ErrInvalidVolID may mean this is an 1.0.0 version volume, check for name
// pattern match in addition to error to ensure this is a likely v1.0.0 volume
if _, ok := err.(ErrInvalidVolID); !ok || !isLegacyVolumeID(volID) {
return false, "", status.Error(codes.InvalidArgument, err.Error())
}
volName, err = getLegacyVolumeName(stagingTargetPath)
if err != nil {
return false, "", status.Error(codes.InvalidArgument, err.Error())
}
return true, volName, nil
}
return false, volName, nil
}

View File

@ -0,0 +1,76 @@
/*
Copyright 2020 The Ceph-CSI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package rbd
import (
"testing"
"github.com/container-storage-interface/spec/lib/go/csi"
)
func TestGetLegacyVolumeName(t *testing.T) {
tests := []struct {
mountPath string
volName string
}{
{"csi/vol/56a0cc34-a5c9-44ab-ad33-ed53dd2bd5ea/globalmount", "56a0cc34-a5c9-44ab-ad33-ed53dd2bd5ea"},
{"csi/vol/9fdb7491-3469-4414-8fe2-ea96be6f7f72/mount", "9fdb7491-3469-4414-8fe2-ea96be6f7f72"},
{"csi/vol/82cd91c4-4582-47b3-bb08-a84f8c5716d6", "82cd91c4-4582-47b3-bb08-a84f8c5716d6"},
}
for _, test := range tests {
if got, err := getLegacyVolumeName(test.mountPath); err != nil {
t.Errorf("getLegacyVolumeName(%s) returned error when it shouldn't: %s", test.mountPath, err.Error())
} else if got != test.volName {
t.Errorf("getLegacyVolumeName(%s) = %s, want %s", test.mountPath, got, test.volName)
}
}
}
func TestGetStagingPath(t *testing.T) {
var stagingPath string
// test with nodestagevolumerequest
nsvr := &csi.NodeStageVolumeRequest{
VolumeId: "758978be-6331-4925-b25e-e490fe99c9eb",
StagingTargetPath: "/path/to/stage",
}
expect := "/path/to/stage/758978be-6331-4925-b25e-e490fe99c9eb"
stagingPath = getStagingTargetPath(nsvr)
if stagingPath != expect {
t.Errorf("getStagingTargetPath() = %s, got %s", stagingPath, expect)
}
// test with nodestagevolumerequest
nuvr := &csi.NodeUnstageVolumeRequest{
VolumeId: "622cfdeb-69bf-4de6-9bd7-5fa0b71a603e",
StagingTargetPath: "/path/to/unstage",
}
expect = "/path/to/unstage/622cfdeb-69bf-4de6-9bd7-5fa0b71a603e"
stagingPath = getStagingTargetPath(nuvr)
if stagingPath != expect {
t.Errorf("getStagingTargetPath() = %s, got %s", stagingPath, expect)
}
// test with non-handled interface
expect = ""
stagingPath = getStagingTargetPath("")
if stagingPath != expect {
t.Errorf("getStagingTargetPath() = %s, got %s", stagingPath, expect)
}
}

View File

@ -115,14 +115,21 @@ func checkSnapExists(ctx context.Context, rbdSnap *rbdSnapshot, cr *util.Credent
}
snapUUID, err := snapJournal.CheckReservation(ctx, rbdSnap.Monitors, cr, rbdSnap.Pool,
rbdSnap.RequestName, rbdSnap.RbdImageName, "")
rbdSnap.RequestName, rbdSnap.NamePrefix, rbdSnap.RbdImageName, "")
if err != nil {
return false, err
}
if snapUUID == "" {
return false, nil
}
rbdSnap.RbdSnapName = snapJournal.NamingPrefix() + snapUUID
// now that we now that the reservation exists, let's get the image name from
// the omap
_, rbdSnap.RbdSnapName, _, _, err = volJournal.GetObjectUUIDData(ctx, rbdSnap.Monitors, cr,
rbdSnap.Pool, snapUUID, false)
if err != nil {
return false, err
}
// Fetch on-disk image attributes
err = updateSnapWithImageInfo(ctx, rbdSnap, cr)
@ -167,14 +174,21 @@ func checkVolExists(ctx context.Context, rbdVol *rbdVolume, cr *util.Credentials
kmsID = rbdVol.KMS.GetID()
}
imageUUID, err := volJournal.CheckReservation(ctx, rbdVol.Monitors, cr, rbdVol.Pool,
rbdVol.RequestName, "", kmsID)
rbdVol.RequestName, rbdVol.NamePrefix, "", kmsID)
if err != nil {
return false, err
}
if imageUUID == "" {
return false, nil
}
rbdVol.RbdImageName = volJournal.NamingPrefix() + imageUUID
// now that we now that the reservation exists, let's get the image name from
// the omap
_, rbdVol.RbdImageName, _, _, err = volJournal.GetObjectUUIDData(ctx, rbdVol.Monitors, cr,
rbdVol.Pool, imageUUID, false)
if err != nil {
return false, err
}
// NOTE: Return volsize should be on-disk volsize, not request vol size, so
// save it for size checks before fetching image data
@ -214,8 +228,13 @@ func checkVolExists(ctx context.Context, rbdVol *rbdVolume, cr *util.Credentials
// reserveSnap is a helper routine to request a rbdSnapshot name reservation and generate the
// volume ID for the generated name
func reserveSnap(ctx context.Context, rbdSnap *rbdSnapshot, cr *util.Credentials) error {
snapUUID, err := snapJournal.ReserveName(ctx, rbdSnap.Monitors, cr, rbdSnap.Pool,
rbdSnap.RequestName, rbdSnap.RbdImageName, "")
var (
snapUUID string
err error
)
snapUUID, rbdSnap.RbdSnapName, err = snapJournal.ReserveName(ctx, rbdSnap.Monitors, cr, rbdSnap.Pool,
rbdSnap.RequestName, rbdSnap.NamePrefix, rbdSnap.RbdImageName, "")
if err != nil {
return err
}
@ -226,10 +245,8 @@ func reserveSnap(ctx context.Context, rbdSnap *rbdSnapshot, cr *util.Credentials
return err
}
rbdSnap.RbdSnapName = snapJournal.NamingPrefix() + snapUUID
klog.V(4).Infof(util.Log(ctx, "generated Volume ID (%s) and image name (%s) for request name (%s)"),
rbdSnap.SnapID, rbdSnap.RbdImageName, rbdSnap.RequestName)
rbdSnap.SnapID, rbdSnap.RbdSnapName, rbdSnap.RequestName)
return nil
}
@ -237,12 +254,18 @@ func reserveSnap(ctx context.Context, rbdSnap *rbdSnapshot, cr *util.Credentials
// reserveVol is a helper routine to request a rbdVolume name reservation and generate the
// volume ID for the generated name
func reserveVol(ctx context.Context, rbdVol *rbdVolume, cr *util.Credentials) error {
var (
imageUUID string
err error
)
kmsID := ""
if rbdVol.Encrypted {
kmsID = rbdVol.KMS.GetID()
}
imageUUID, err := volJournal.ReserveName(ctx, rbdVol.Monitors, cr, rbdVol.Pool,
rbdVol.RequestName, "", kmsID)
imageUUID, rbdVol.RbdImageName, err = volJournal.ReserveName(ctx, rbdVol.Monitors, cr, rbdVol.Pool,
rbdVol.RequestName, rbdVol.NamePrefix, "", kmsID)
if err != nil {
return err
}
@ -253,8 +276,6 @@ func reserveVol(ctx context.Context, rbdVol *rbdVolume, cr *util.Credentials) er
return err
}
rbdVol.RbdImageName = volJournal.NamingPrefix() + imageUUID
klog.V(4).Infof(util.Log(ctx, "generated Volume ID (%s) and image name (%s) for request name (%s)"),
rbdVol.VolID, rbdVol.RbdImageName, rbdVol.RequestName)

View File

@ -70,6 +70,7 @@ type rbdVolume struct {
// VolName and MonValueFromSecret are retained from older plugin versions (<= 1.0.0)
// for backward compatibility reasons
RbdImageName string
NamePrefix string
VolID string `json:"volID"`
Monitors string `json:"monitors"`
Pool string `json:"pool"`
@ -98,6 +99,7 @@ type rbdSnapshot struct {
// RequestName is the CSI generated snapshot name for the rbdSnapshot
SourceVolumeID string
RbdImageName string
NamePrefix string
RbdSnapName string
SnapID string
Monitors string
@ -295,7 +297,6 @@ func genSnapFromSnapID(ctx context.Context, rbdSnap *rbdSnapshot, snapshotID str
rbdSnap.ClusterID = vi.ClusterID
options["clusterID"] = rbdSnap.ClusterID
rbdSnap.RbdSnapName = snapJournal.NamingPrefix() + vi.ObjectUUID
rbdSnap.Monitors, _, err = getMonsAndClusterID(ctx, options)
if err != nil {
@ -307,7 +308,7 @@ func genSnapFromSnapID(ctx context.Context, rbdSnap *rbdSnapshot, snapshotID str
return err
}
rbdSnap.RequestName, rbdSnap.RbdImageName, _, err = snapJournal.GetObjectUUIDData(ctx, rbdSnap.Monitors,
rbdSnap.RequestName, rbdSnap.RbdSnapName, rbdSnap.RbdImageName, _, err = snapJournal.GetObjectUUIDData(ctx, rbdSnap.Monitors,
cr, rbdSnap.Pool, vi.ObjectUUID, true)
if err != nil {
return err
@ -339,7 +340,6 @@ func genVolFromVolID(ctx context.Context, rbdVol *rbdVolume, volumeID string, cr
rbdVol.ClusterID = vi.ClusterID
options["clusterID"] = rbdVol.ClusterID
rbdVol.RbdImageName = volJournal.NamingPrefix() + vi.ObjectUUID
rbdVol.Monitors, _, err = getMonsAndClusterID(ctx, options)
if err != nil {
@ -352,8 +352,8 @@ func genVolFromVolID(ctx context.Context, rbdVol *rbdVolume, volumeID string, cr
}
kmsID := ""
rbdVol.RequestName, _, kmsID, err = volJournal.GetObjectUUIDData(
ctx, rbdVol.Monitors, cr, rbdVol.Pool, vi.ObjectUUID, false)
rbdVol.RequestName, rbdVol.RbdImageName, _, kmsID, err = volJournal.GetObjectUUIDData(ctx, rbdVol.Monitors, cr,
rbdVol.Pool, vi.ObjectUUID, false)
if err != nil {
return err
}
@ -456,6 +456,7 @@ func genVolFromVolumeOptions(ctx context.Context, volOptions, credentials map[st
var (
ok bool
err error
namePrefix string
encrypted string
)
@ -466,6 +467,9 @@ func genVolFromVolumeOptions(ctx context.Context, volOptions, credentials map[st
}
rbdVol.DataPool = volOptions["dataPool"]
if namePrefix, ok = volOptions["volumeNamePrefix"]; ok {
rbdVol.NamePrefix = namePrefix
}
if isLegacyVolume {
err = updateMons(rbdVol, volOptions, credentials)
@ -537,6 +541,10 @@ func genSnapFromOptions(ctx context.Context, rbdVol *rbdVolume, snapOptions map[
rbdSnap.ClusterID = rbdVol.ClusterID
}
if namePrefix, ok := snapOptions["snapshotNamePrefix"]; ok {
rbdSnap.NamePrefix = namePrefix
}
return rbdSnap
}
@ -870,23 +878,16 @@ func resizeRBDImage(rbdVol *rbdVolume, newSize int64, cr *util.Credentials) erro
return nil
}
func getVolumeName(volID string) (string, error) {
func ensureEncryptionMetadataSet(ctx context.Context, cr *util.Credentials, rbdVol *rbdVolume) error {
var vi util.CSIIdentifier
err := vi.DecomposeCSIID(volID)
err := vi.DecomposeCSIID(rbdVol.VolID)
if err != nil {
err = fmt.Errorf("error decoding volume ID (%s) (%s)", err, volID)
return "", ErrInvalidVolID{err}
err = fmt.Errorf("error decoding volume ID (%s) (%s)", rbdVol.VolID, err)
return ErrInvalidVolID{err}
}
return volJournal.NamingPrefix() + vi.ObjectUUID, nil
}
func ensureEncryptionMetadataSet(ctx context.Context, cr *util.Credentials, rbdVol *rbdVolume) error {
rbdImageName, err := getVolumeName(rbdVol.VolID)
if err != nil {
return err
}
rbdImageName := volJournal.GetNameForUUID(rbdVol.NamePrefix, vi.ObjectUUID, false)
imageSpec := rbdVol.Pool + "/" + rbdImageName
err = util.SaveRbdImageEncryptionStatus(ctx, cr, rbdVol.Monitors, imageSpec, rbdImageRequiresEncryption)

56
pkg/rbd/rbd_util_test.go Normal file
View File

@ -0,0 +1,56 @@
/*
Copyright 2020 The Ceph-CSI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package rbd
import (
"testing"
)
func TestIsLegacyVolumeID(t *testing.T) {
tests := []struct {
volID string
isLegacy bool
}{
{"prefix-bda37d42-9979-420f-9389-74362f3f98f6", false},
{"csi-rbd-vo-f997e783-ff00-48b0-8cc7-30cb36c3df3d", false},
{"csi-rbd-vol-this-is-clearly-not-a-valid-UUID----", false},
{"csi-rbd-vol-b82f27de-3b3a-43f2-b5e7-9f8d0aad04e9", true},
}
for _, test := range tests {
if got := isLegacyVolumeID(test.volID); got != test.isLegacy {
t.Errorf("isLegacyVolumeID(%s) = %t, want %t", test.volID, got, test.isLegacy)
}
}
}
func TestHasSnapshotFeature(t *testing.T) {
tests := []struct {
features string
hasFeature bool
}{
{"foo", false},
{"foo,bar", false},
{"foo,layering,bar", true},
}
for _, test := range tests {
if got := hasSnapshotFeature(test.features); got != test.hasFeature {
t.Errorf("hasSnapshotFeature(%s) = %t, want %t", test.features, got, test.hasFeature)
}
}
}

View File

@ -19,7 +19,6 @@ package util
import (
"context"
"fmt"
"strings"
"github.com/pborman/uuid"
"github.com/pkg/errors"
@ -47,13 +46,17 @@ generated volume (or snapshot) name. There are 4 types of omaps in use,
- Key value contains the snapshot uuid that is created, for the CO provided name
- A per volume omap named "csi.volume."+[volume uuid], (referred to as CephUUIDDirectory)
- stores a single key named "csi.volname", that has the value of the CO generated VolName that
- stores the key named "csi.volname", that has the value of the CO generated VolName that
this volume refers to (referred to using csiNameKey value)
- stores the key named "csi.imagename", that has the value of the Ceph RBD image name
this volume refers to (referred to using csiImageKey value)
- A per snapshot omap named "rbd.csi.snap."+[RBD snapshot uuid], (referred to as CephUUIDDirectory)
- stores a key named "csi.snapname", that has the value of the CO generated SnapName that this
snapshot refers to (referred to using csiNameKey value)
- also stores another key named "csi.source", that has the value of the volume name that is the
- stores the key named "csi.imagename", that has the value of the Ceph RBD image name
this snapshot refers to (referred to using csiImageKey value)
- stores a key named "csi.source", that has the value of the volume name that is the
source of the snapshot (referred to using cephSnapSourceKey value)
Creation of omaps:
@ -72,7 +75,8 @@ that we do not use a uuid that is already in use
- Next, a key with the VolName is created in the csiDirectory, and its value is updated to store the
generated uuid
- This is followed by updating the CephUUIDDirectory with the VolName in the csiNameKey
- This is followed by updating the CephUUIDDirectory with the VolName in the csiNameKey and the RBD image
name in the csiImageKey
- Finally, the volume is created (or promoted from a snapshot, if content source was provided),
using the uuid and a corresponding name prefix (namingPrefix) as the volume name
@ -94,6 +98,11 @@ proceeding with deleting the volume and the related omap entries, to ensure ther
single entity modifying the related omaps for a given VolName.
*/
const (
defaultVolumeNamingPrefix string = "csi-vol-"
defaultSnapshotNamingPrefix string = "csi-snap-"
)
type CSIJournal struct {
// csiDirectory is the name of the CSI volumes object map that contains CSI volume-name (or
// snapshot name) based keys
@ -109,13 +118,14 @@ type CSIJournal struct {
// Ceph volume was created
csiNameKey string
// CSI image-name key in per Ceph volume object map, containing RBD image-name
// of this Ceph volume
csiImageKey string
// source volume name key in per Ceph snapshot object map, containing Ceph source volume uuid
// for which the snapshot was created
cephSnapSourceKey string
// volume name prefix for naming on Ceph rbd or FS, suffix is a uuid generated per volume
namingPrefix string
// namespace in which the RADOS objects are stored, default is no namespace
namespace string
@ -123,37 +133,44 @@ type CSIJournal struct {
encryptKMSKey string
}
// CSIVolumeJournal returns an instance of volume keys
// NewCSIVolumeJournal returns an instance of CSIJournal for volumes
func NewCSIVolumeJournal() *CSIJournal {
return &CSIJournal{
csiDirectory: "csi.volumes",
csiNameKeyPrefix: "csi.volume.",
cephUUIDDirectoryPrefix: "csi.volume.",
csiNameKey: "csi.volname",
namingPrefix: "csi-vol-",
csiImageKey: "csi.imagename",
cephSnapSourceKey: "",
namespace: "",
encryptKMSKey: "csi.volume.encryptKMS",
}
}
// CSISnapshotSnapshot returns an instance of snapshot keys
// NewCSISnapshotJournal returns an instance of CSIJournal for snapshots
func NewCSISnapshotJournal() *CSIJournal {
return &CSIJournal{
csiDirectory: "csi.snaps",
csiNameKeyPrefix: "csi.snap.",
cephUUIDDirectoryPrefix: "csi.snap.",
csiNameKey: "csi.snapname",
namingPrefix: "csi-snap-",
csiImageKey: "csi.imagename",
cephSnapSourceKey: "csi.source",
namespace: "",
encryptKMSKey: "csi.volume.encryptKMS",
}
}
// NamingPrefix returns the value of naming prefix from the journal keys
func (cj *CSIJournal) NamingPrefix() string {
return cj.namingPrefix
// GetNameForUUID returns volume name
func (cj *CSIJournal) GetNameForUUID(prefix, uid string, isSnapshot bool) string {
if prefix == "" {
if isSnapshot {
prefix = defaultSnapshotNamingPrefix
} else {
prefix = defaultVolumeNamingPrefix
}
}
return prefix + uid
}
// SetCSIDirectorySuffix sets the given suffix for the csiDirectory omap
@ -181,7 +198,7 @@ Return values:
there was no reservation found
- error: non-nil in case of any errors
*/
func (cj *CSIJournal) CheckReservation(ctx context.Context, monitors string, cr *Credentials, pool, reqName, parentName, encryptionKmsConfig string) (string, error) {
func (cj *CSIJournal) CheckReservation(ctx context.Context, monitors string, cr *Credentials, pool, reqName, namePrefix, parentName, kmsConf string) (string, error) {
var snapSource bool
if parentName != "" {
@ -204,13 +221,13 @@ func (cj *CSIJournal) CheckReservation(ctx context.Context, monitors string, cr
return "", err
}
savedReqName, savedReqParentName, savedKms, err := cj.GetObjectUUIDData(ctx, monitors, cr, pool,
savedReqName, _, savedReqParentName, savedKms, err := cj.GetObjectUUIDData(ctx, monitors, cr, pool,
objUUID, snapSource)
if err != nil {
// error should specifically be not found, for image to be absent, any other error
// is not conclusive, and we should not proceed
if _, ok := err.(ErrKeyNotFound); ok {
err = cj.UndoReservation(ctx, monitors, cr, pool, cj.namingPrefix+objUUID, reqName)
err = cj.UndoReservation(ctx, monitors, cr, pool, cj.GetNameForUUID(namePrefix, objUUID, snapSource), reqName)
}
return "", err
}
@ -224,11 +241,11 @@ func (cj *CSIJournal) CheckReservation(ctx context.Context, monitors string, cr
reqName, objUUID, savedReqName)
}
if encryptionKmsConfig != "" {
if savedKms != encryptionKmsConfig {
if kmsConf != "" {
if savedKms != kmsConf {
return "", fmt.Errorf("internal state inconsistent, omap encryption KMS"+
" mismatch, request KMS (%s) volume UUID (%s) volume omap KMS (%s)",
encryptionKmsConfig, objUUID, savedKms)
kmsConf, objUUID, savedKms)
}
}
@ -260,7 +277,16 @@ held, to prevent parallel operations from modifying the state of the omaps for t
func (cj *CSIJournal) UndoReservation(ctx context.Context, monitors string, cr *Credentials, pool, volName, reqName string) error {
// delete volume UUID omap (first, inverse of create order)
// TODO: Check cases where volName can be empty, and we need to just cleanup the reqName
imageUUID := strings.TrimPrefix(volName, cj.namingPrefix)
if len(volName) < 36 {
return fmt.Errorf("unable to parse UUID from %s, too short", volName)
}
imageUUID := volName[len(volName)-36:]
if valid := uuid.Parse(imageUUID); valid == nil {
return fmt.Errorf("failed parsing UUID in %s", volName)
}
err := RemoveObject(ctx, monitors, cr, pool, cj.namespace, cj.cephUUIDDirectoryPrefix+imageUUID)
if err != nil {
if _, ok := err.(ErrObjectNotFound); !ok {
@ -321,15 +347,16 @@ held, to prevent parallel operations from modifying the state of the omaps for t
Return values:
- string: Contains the UUID that was reserved for the passed in reqName
- string: Contains the image name that was reserved for the passed in reqName
- error: non-nil in case of any errors
*/
func (cj *CSIJournal) ReserveName(ctx context.Context, monitors string, cr *Credentials, pool, reqName, parentName, encryptionKmsConfig string) (string, error) {
func (cj *CSIJournal) ReserveName(ctx context.Context, monitors string, cr *Credentials, pool, reqName, namePrefix, parentName, kmsConf string) (string, string, error) {
var snapSource bool
if parentName != "" {
if cj.cephSnapSourceKey == "" {
err := errors.New("invalid request, cephSnapSourceKey is nil")
return "", err
return "", "", err
}
snapSource = true
}
@ -340,39 +367,46 @@ func (cj *CSIJournal) ReserveName(ctx context.Context, monitors string, cr *Cred
// UUID directory key will be leaked
volUUID, err := reserveOMapName(ctx, monitors, cr, pool, cj.namespace, cj.cephUUIDDirectoryPrefix)
if err != nil {
return "", err
return "", "", err
}
imageName := cj.GetNameForUUID(namePrefix, volUUID, snapSource)
// Create request name (csiNameKey) key in csiDirectory and store the UUId based
// volume name into it
err = SetOMapKeyValue(ctx, monitors, cr, pool, cj.namespace, cj.csiDirectory,
cj.csiNameKeyPrefix+reqName, volUUID)
if err != nil {
return "", err
return "", "", err
}
defer func() {
if err != nil {
klog.Warningf(Log(ctx, "reservation failed for volume: %s"), reqName)
errDefer := cj.UndoReservation(ctx, monitors, cr, pool, cj.namingPrefix+volUUID,
reqName)
errDefer := cj.UndoReservation(ctx, monitors, cr, pool, imageName, reqName)
if errDefer != nil {
klog.Warningf(Log(ctx, "failed undoing reservation of volume: %s (%v)"), reqName, errDefer)
}
}
}()
// Update UUID directory to store CSI request name
// Update UUID directory to store CSI request name and image name
err = SetOMapKeyValue(ctx, monitors, cr, pool, cj.namespace, cj.cephUUIDDirectoryPrefix+volUUID,
cj.csiNameKey, reqName)
if err != nil {
return "", err
return "", "", err
}
if encryptionKmsConfig != "" {
err = SetOMapKeyValue(ctx, monitors, cr, pool, cj.namespace, cj.cephUUIDDirectoryPrefix+volUUID,
cj.encryptKMSKey, encryptionKmsConfig)
cj.csiImageKey, imageName)
if err != nil {
return "", err
return "", "", err
}
if kmsConf != "" {
err = SetOMapKeyValue(ctx, monitors, cr, pool, cj.namespace, cj.cephUUIDDirectoryPrefix+volUUID,
cj.encryptKMSKey, kmsConf)
if err != nil {
return "", "", err
}
}
@ -381,34 +415,53 @@ func (cj *CSIJournal) ReserveName(ctx context.Context, monitors string, cr *Cred
err = SetOMapKeyValue(ctx, monitors, cr, pool, cj.namespace, cj.cephUUIDDirectoryPrefix+volUUID,
cj.cephSnapSourceKey, parentName)
if err != nil {
return "", err
return "", "", err
}
}
return volUUID, nil
return volUUID, imageName, nil
}
/*
GetObjectUUIDData fetches all keys from a UUID directory
Return values:
- string: Contains the request name for the passed in UUID
- string: Contains the rbd image name for the passed in UUID
- string: Contains the parent image name for the passed in UUID, if it is a snapshot
- string: Contains encryption KMS, if it is an encrypted image
- error: non-nil in case of any errors
*/
func (cj *CSIJournal) GetObjectUUIDData(ctx context.Context, monitors string, cr *Credentials, pool, objectUUID string, snapSource bool) (string, string, string, error) {
func (cj *CSIJournal) GetObjectUUIDData(ctx context.Context, monitors string, cr *Credentials, pool, objectUUID string, snapSource bool) (string, string, string, string, error) {
var sourceName string
if snapSource && cj.cephSnapSourceKey == "" {
err := errors.New("invalid request, cephSnapSourceKey is nil")
return "", "", "", err
return "", "", "", "", err
}
// TODO: fetch all omap vals in one call, than make multiple listomapvals
requestName, err := GetOMapValue(ctx, monitors, cr, pool, cj.namespace,
cj.cephUUIDDirectoryPrefix+objectUUID, cj.csiNameKey)
if err != nil {
return "", "", "", err
return "", "", "", "", err
}
// image key was added at some point, so not all volumes will have this key set
// when ceph-csi was upgraded
imageName, err := GetOMapValue(ctx, monitors, cr, pool, cj.namespace,
cj.cephUUIDDirectoryPrefix+objectUUID, cj.csiImageKey)
if err != nil {
// if the key was not found, assume the default key + UUID
// otherwise return error
if _, ok := err.(ErrKeyNotFound); !ok {
return "", "", "", "", err
}
if snapSource {
imageName = defaultSnapshotNamingPrefix + objectUUID
} else {
imageName = defaultVolumeNamingPrefix + objectUUID
}
}
encryptionKmsConfig := ""
@ -416,7 +469,7 @@ func (cj *CSIJournal) GetObjectUUIDData(ctx context.Context, monitors string, cr
cj.cephUUIDDirectoryPrefix+objectUUID, cj.encryptKMSKey)
if err != nil {
if _, ok := err.(ErrKeyNotFound); !ok {
return "", "", "", fmt.Errorf("OMapVal for %s/%s failed to get encryption KMS value: %s",
return "", "", "", "", fmt.Errorf("OMapVal for %s/%s failed to get encryption KMS value: %s",
pool, cj.cephUUIDDirectoryPrefix+objectUUID, err)
}
// ErrKeyNotFound means no encryption KMS was used
@ -426,9 +479,9 @@ func (cj *CSIJournal) GetObjectUUIDData(ctx context.Context, monitors string, cr
sourceName, err = GetOMapValue(ctx, monitors, cr, pool, cj.namespace,
cj.cephUUIDDirectoryPrefix+objectUUID, cj.cephSnapSourceKey)
if err != nil {
return "", "", "", err
return "", "", "", "", err
}
}
return requestName, sourceName, encryptionKmsConfig, nil
return requestName, imageName, sourceName, encryptionKmsConfig, nil
}

View File

@ -145,6 +145,10 @@ linters-settings:
# minimal code complexity to report, 30 by default (but we recommend 10-20)
# TODO: decrease complexity with refacoring the code
min-complexity: 40
dogsled:
# voljournal.GetObjectUUIDData currently returns 4 values of which some may
# not always be useful
max-blank-identifiers: 3
linters:
enable-all: true