Add topology support to ceph-csi

Signed-off-by: ShyamsundarR <srangana@redhat.com>
Authored by ShyamsundarR on 2020-01-24 11:26:56 -05:00, committed by mergify[bot]
parent 5475022bc3
commit 5c4abf8347
31 changed files with 1017 additions and 273 deletions

View File

@ -58,6 +58,8 @@ func init() {
flag.IntVar(&conf.PidLimit, "pidlimit", 0, "the PID limit to configure through cgroups")
flag.BoolVar(&conf.IsControllerServer, "controllerserver", false, "start cephcsi controller server")
flag.BoolVar(&conf.IsNodeServer, "nodeserver", false, "start cephcsi node server")
flag.StringVar(&conf.DomainLabels, "domainlabels", "", "list of kubernetes node labels, that determines the topology"+
" domain the node belongs to, separated by ','")
// cephfs related flags
// marking this as deprecated, remove it in next major release

View File

@ -40,6 +40,7 @@ spec:
- "--enable-leader-election=true"
- "--leader-election-type=leases"
- "--retry-interval-start=500ms"
- "--feature-gates=Topology=true"
env:
- name: ADDRESS
value: unix:///csi/csi-provisioner.sock

View File

@ -63,6 +63,11 @@ spec:
- "--v=5"
- "--drivername=cephfs.csi.ceph.com"
- "--metadatastorage=k8s_configmap"
# If topology based provisioning is desired, configure required
# node labels representing the node's topology domain
# and pass the label names below, for CSI to consume and advertise
# its equivalent topology domain
# - "--domainlabels=failure-domain/region,failure-domain/zone"
env:
- name: POD_IP
valueFrom:

View File

@ -3,3 +3,37 @@ apiVersion: v1
kind: ServiceAccount
metadata:
name: cephfs-csi-nodeplugin
---
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: cephfs-csi-nodeplugin
aggregationRule:
clusterRoleSelectors:
- matchLabels:
rbac.cephfs.csi.ceph.com/aggregate-to-cephfs-csi-nodeplugin: "true"
rules: []
---
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: cephfs-csi-nodeplugin-rules
labels:
rbac.cephfs.csi.ceph.com/aggregate-to-cephfs-csi-nodeplugin: "true"
rules:
- apiGroups: [""]
resources: ["nodes"]
verbs: ["get"]
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: cephfs-csi-nodeplugin
subjects:
- kind: ServiceAccount
name: cephfs-csi-nodeplugin
namespace: default
roleRef:
kind: ClusterRole
name: cephfs-csi-nodeplugin
apiGroup: rbac.authorization.k8s.io

View File

@ -22,6 +22,9 @@ metadata:
labels:
rbac.cephfs.csi.ceph.com/aggregate-to-cephfs-external-provisioner-runner: "true"
rules:
- apiGroups: [""]
resources: ["nodes"]
verbs: ["get", "list", watch"]
- apiGroups: [""]
resources: ["secrets"]
verbs: ["get", "list"]
@ -43,6 +46,9 @@ rules:
- apiGroups: [""]
resources: ["persistentvolumeclaims/status"]
verbs: ["update", "patch"]
- apiGroups: ["storage.k8s.io"]
resources: ["csinodes"]
verbs: ["get", "list", "watch"]
---
kind: ClusterRoleBinding

View File

@ -3,3 +3,37 @@ apiVersion: v1
kind: ServiceAccount
metadata:
name: rbd-csi-nodeplugin
---
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: rbd-csi-nodeplugin
aggregationRule:
clusterRoleSelectors:
- matchLabels:
rbac.rbd.csi.ceph.com/aggregate-to-rbd-csi-nodeplugin: "true"
rules: []
---
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: rbd-csi-nodeplugin-rules
labels:
rbac.rbd.csi.ceph.com/aggregate-to-rbd-csi-nodeplugin: "true"
rules:
- apiGroups: [""]
resources: ["nodes"]
verbs: ["get"]
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: rbd-csi-nodeplugin
subjects:
- kind: ServiceAccount
name: rbd-csi-nodeplugin
namespace: default
roleRef:
kind: ClusterRole
name: rbd-csi-nodeplugin
apiGroup: rbac.authorization.k8s.io

View File

@ -22,6 +22,9 @@ metadata:
labels:
rbac.rbd.csi.ceph.com/aggregate-to-rbd-external-provisioner-runner: "true"
rules:
- apiGroups: [""]
resources: ["nodes"]
verbs: ["get", "list", "watch"]
- apiGroups: [""]
resources: ["secrets"]
verbs: ["get", "list"]
@ -52,6 +55,9 @@ rules:
- apiGroups: ["storage.k8s.io"]
resources: ["volumeattachments"]
verbs: ["get", "list", "watch", "update", "patch"]
- apiGroups: ["storage.k8s.io"]
resources: ["csinodes"]
verbs: ["get", "list", "watch"]
- apiGroups: ["snapshot.storage.k8s.io"]
resources: ["volumesnapshots/status"]
verbs: ["update"]

View File

@ -40,6 +40,7 @@ spec:
- "--retry-interval-start=500ms"
- "--enable-leader-election=true"
- "--leader-election-type=leases"
- "--feature-gates=Topology=true"
env:
- name: ADDRESS
value: unix:///csi/csi-provisioner.sock

View File

@ -63,6 +63,11 @@ spec:
- "--endpoint=$(CSI_ENDPOINT)"
- "--v=5"
- "--drivername=rbd.csi.ceph.com"
# If topology based provisioning is desired, configure required
# node labels representing the node's topology domain
# and pass the label names below, for CSI to consume and advertise
# its equivalent topology domain
# - "--domainlabels=failure-domain/region,failure-domain/zone"
env:
- name: POD_IP
valueFrom:

View File

@ -44,7 +44,7 @@ that should be resolved in v14.2.3.
**Available command line arguments:**
| Option | Default value | Description |
| ------------------------- | --------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
|---------------------------|-----------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `--endpoint` | `unix://tmp/csi.sock` | CSI endpoint, must be a UNIX socket |
| `--drivername` | `cephfs.csi.ceph.com` | Name of the driver (Kubernetes: `provisioner` field in StorageClass must correspond to this value) |
| `--nodeid` | _empty_ | This node's ID |
@ -53,13 +53,14 @@ that should be resolved in v14.2.3.
| `--pluginpath` | "/var/lib/kubelet/plugins/" | The location of cephcsi plugin on host |
| `--metadatastorage` | _empty_ | Points to where older (1.0.0 or older plugin versions) metadata about provisioned volumes are kept, as file or in as k8s configmap (`node` or `k8s_configmap` respectively) |
| `--pidlimit` | _0_ | Configure the PID limit in cgroups. The container runtime can restrict the number of processes/tasks which can cause problems while provisioning (or deleting) a large number of volumes. A value of `-1` configures the limit to the maximum, `0` does not configure limits at all. |
| `--metricsport` | `8080` | TCP port for liveness metrics requests |
| `--metricsport` | `8080` | TCP port for liveness metrics requests |
| `--metricspath` | `/metrics` | Path of prometheus endpoint where metrics will be available |
| `--enablegrpcmetrics` | `false` | [Deprecated] Enable grpc metrics collection and start prometheus server |
| `--enablegrpcmetrics` | `false` | [Deprecated] Enable grpc metrics collection and start prometheus server |
| `--polltime` | `60s` | Time interval in between each poll |
| `--timeout` | `3s` | Probe timeout in seconds |
| `--histogramoption` | `0.5,2,6` | [Deprecated] Histogram option for grpc metrics, should be comma separated value (ex:= "0.5,2,6" where start=0.5 factor=2, count=6) |
| `--histogramoption` | `0.5,2,6` | [Deprecated] Histogram option for grpc metrics, should be comma separated value (ex:= "0.5,2,6" where start=0.5 factor=2, count=6) |
| `--forcecephkernelclient` | `false` | Force enabling Ceph Kernel clients for mounting on kernels < 4.17 |
| `--domainlabels` | _empty_ | Kubernetes node labels to use as CSI domain labels for topology aware provisioning, should be a comma separated value (ex:= "failure-domain/region,failure-domain/zone") |
**NOTE:** The parameter `-forcecephkernelclient` enables the Kernel
CephFS mounter on kernels < 4.17.
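For illustration of the `--domainlabels` option above, here is a minimal sketch (not part of this commit) of how the configured label names combine with a node's labels to produce the advertised topology segments. The label names, node values and output are examples only; the actual logic is in `GetTopologyFromDomainLabels`, added to `pkg/util/topology.go` by this commit.

package main

import (
	"fmt"
	"strings"
)

func main() {
	driverName := "cephfs.csi.ceph.com"                         // --drivername
	domainLabels := "failure-domain/region,failure-domain/zone" // --domainlabels
	// hypothetical labels present on the node
	nodeLabels := map[string]string{
		"failure-domain/region": "east",
		"failure-domain/zone":   "zone1",
	}

	topology := map[string]string{}
	for _, label := range strings.Split(domainLabels, ",") {
		// keep only the name after the label prefix, re-key it under the driver's topology prefix
		name := label[strings.IndexRune(label, '/')+1:]
		topology["topology."+strings.ToLower(driverName)+"/"+name] = nodeLabels[label]
	}
	fmt.Println(topology)
	// map[topology.cephfs.csi.ceph.com/region:east topology.cephfs.csi.ceph.com/zone:zone1]
}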

View File

@ -27,7 +27,7 @@ make image-cephcsi
**Available command line arguments:**
| Option | Default value | Description |
| --------------------- | --------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ |
|-----------------------|-----------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `--endpoint` | `unix://tmp/csi.sock` | CSI endpoint, must be a UNIX socket |
| `--drivername` | `rbd.csi.ceph.com` | Name of the driver (Kubernetes: `provisioner` field in StorageClass must correspond to this value) |
| `--nodeid` | _empty_ | This node's ID |
@ -35,12 +35,13 @@ make image-cephcsi
| `--instanceid` | "default" | Unique ID distinguishing this instance of Ceph CSI among other instances, when sharing Ceph clusters across CSI instances for provisioning |
| `--metadatastorage` | _empty_ | Points to where legacy (1.0.0 or older plugin versions) metadata about provisioned volumes are kept, as file or in as k8s configmap (`node` or `k8s_configmap` respectively) |
| `--pidlimit` | _0_ | Configure the PID limit in cgroups. The container runtime can restrict the number of processes/tasks which can cause problems while provisioning (or deleting) a large number of volumes. A value of `-1` configures the limit to the maximum, `0` does not configure limits at all. |
| `--metricsport` | `8080` | TCP port for liveness metrics requests |
| `--metricsport` | `8080` | TCP port for liveness metrics requests |
| `--metricspath` | `"/metrics"` | Path of prometheus endpoint where metrics will be available |
| `--enablegrpcmetrics` | `false` | [Deprecated] Enable grpc metrics collection and start prometheus server |
| `--enablegrpcmetrics` | `false` | [Deprecated] Enable grpc metrics collection and start prometheus server |
| `--polltime` | `"60s"` | Time interval in between each poll |
| `--timeout` | `"3s"` | Probe timeout in seconds |
| `--histogramoption` | `0.5,2,6` | [Deprecated] Histogram option for grpc metrics, should be comma separated value (ex:= "0.5,2,6" where start=0.5 factor=2, count=6) |
| `--histogramoption` | `0.5,2,6` | [Deprecated] Histogram option for grpc metrics, should be comma separated value (ex:= "0.5,2,6" where start=0.5 factor=2, count=6) |
| `--domainlabels` | _empty_ | Kubernetes node labels to use as CSI domain labels for topology aware provisioning, should be a comma separated value (ex:= "failure-domain/region,failure-domain/zone") |
**Available volume parameters:**

View File

@ -293,7 +293,7 @@ var _ = Describe("RBD", func() {
// validate created backend rbd images
images := listRBDImages(f)
if len(images) != totalCount {
e2elog.Logf("backend image creation not matching pvc count, image count = % pvc count %d", len(images), totalCount)
e2elog.Logf("backend image creation not matching pvc count, image count = %d pvc count %d images found = %+v", len(images), totalCount, images)
Fail("validate multiple pvc failed")
}

View File

@ -4,6 +4,10 @@ kind: StorageClass
metadata:
name: csi-rbd-sc
provisioner: rbd.csi.ceph.com
# If topology based provisioning is desired, delayed provisioning of
# PV is required and is enabled using the following attribute
# For further information read TODO<doc>
# volumeBindingMode: WaitForFirstConsumer
parameters:
# String representing a Ceph cluster to provision storage from.
# Should be unique across all Ceph clusters in use for provisioning,
@ -48,6 +52,25 @@ parameters:
# a unique ID matching KMS ConfigMap. The ID is only used for correlation to
# config map entry.
# encryptionKMSID: <kms-config-id>
# Add topology constrained pools configuration, if topology based pools
# are setup, and topology constrained provisioning is required.
# For further information read TODO<doc>
# topologyConstrainedPools: |
# [{"poolName":"pool0",
# "domainSegments":[
# {"domainLabel":"region","value":"east"},
# {"domainLabel":"zone","value":"zone1"}]},
# {"poolName":"pool1",
# "domainSegments":[
# {"domainLabel":"region","value":"east"},
# {"domainLabel":"zone","value":"zone2"}]},
# {"poolName":"pool2",
# "domainSegments":[
# {"domainLabel":"region","value":"west"},
# {"domainLabel":"zone","value":"zone1"}]}
# ]
reclaimPolicy: Delete
allowVolumeExpansion: true
mountOptions:
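As a reference for the expected JSON shape of `topologyConstrainedPools`, a small sketch (not part of this commit) that decodes such a value into the structure introduced in `pkg/util/topology.go`; the struct and field names mirror that file, and the pool and domain values are examples.

package main

import (
	"encoding/json"
	"fmt"
)

// Mirrors the structures added in pkg/util/topology.go by this commit.
type topologySegment struct {
	DomainLabel string `json:"domainLabel"`
	DomainValue string `json:"value"`
}

type topologyConstrainedPool struct {
	PoolName       string            `json:"poolName"`
	DomainSegments []topologySegment `json:"domainSegments"`
}

func main() {
	// Same shape as the commented-out StorageClass parameter above.
	param := `[{"poolName":"pool0",
	            "domainSegments":[
	              {"domainLabel":"region","value":"east"},
	              {"domainLabel":"zone","value":"zone1"}]}]`

	var pools []topologyConstrainedPool
	if err := json.Unmarshal([]byte(param), &pools); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", pools)
	// [{PoolName:pool0 DomainSegments:[{DomainLabel:region DomainValue:east} {DomainLabel:zone DomainValue:zone1}]}]
}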

View File

@ -76,7 +76,7 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
}
defer cs.VolumeLocks.Release(requestName)
volOptions, err := newVolumeOptions(ctx, requestName, req.GetParameters(), secret)
volOptions, err := newVolumeOptions(ctx, requestName, req, secret)
if err != nil {
klog.Errorf(util.Log(ctx, "validation and extraction of volume options failed: %v"), err)
return nil, status.Error(codes.InvalidArgument, err.Error())
@ -95,13 +95,20 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
// TODO return error message if requested vol size greater than found volume return error
if vID != nil {
return &csi.CreateVolumeResponse{
Volume: &csi.Volume{
VolumeId: vID.VolumeID,
CapacityBytes: volOptions.Size,
VolumeContext: req.GetParameters(),
},
}, nil
volume := &csi.Volume{
VolumeId: vID.VolumeID,
CapacityBytes: volOptions.Size,
VolumeContext: req.GetParameters(),
}
if volOptions.Topology != nil {
volume.AccessibleTopology =
[]*csi.Topology{
{
Segments: volOptions.Topology,
},
}
}
return &csi.CreateVolumeResponse{Volume: volume}, nil
}
// Reservation
@ -128,13 +135,20 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
klog.V(4).Infof(util.Log(ctx, "cephfs: successfully created backing volume named %s for request name %s"),
vID.FsSubvolName, requestName)
return &csi.CreateVolumeResponse{
Volume: &csi.Volume{
VolumeId: vID.VolumeID,
CapacityBytes: volOptions.Size,
VolumeContext: req.GetParameters(),
},
}, nil
volume := &csi.Volume{
VolumeId: vID.VolumeID,
CapacityBytes: volOptions.Size,
VolumeContext: req.GetParameters(),
}
if volOptions.Topology != nil {
volume.AccessibleTopology =
[]*csi.Topology{
{
Segments: volOptions.Topology,
},
}
}
return &csi.CreateVolumeResponse{Volume: volume}, nil
}
// deleteVolumeDeprecated is used to delete volumes created using version 1.0.0 of the plugin,

View File

@ -26,7 +26,6 @@ import (
)
const (
// volIDVersion is the version number of volume ID encoding scheme
volIDVersion uint16 = 1
@ -81,9 +80,9 @@ func NewControllerServer(d *csicommon.CSIDriver, cachePersister util.CachePersis
}
// NewNodeServer initialize a node server for ceph CSI driver.
func NewNodeServer(d *csicommon.CSIDriver, t string) *NodeServer {
func NewNodeServer(d *csicommon.CSIDriver, t string, topology map[string]string) *NodeServer {
return &NodeServer{
DefaultNodeServer: csicommon.NewDefaultNodeServer(d, t),
DefaultNodeServer: csicommon.NewDefaultNodeServer(d, t, topology),
VolumeLocks: util.NewVolumeLocks(),
}
}
@ -91,14 +90,17 @@ func NewNodeServer(d *csicommon.CSIDriver, t string) *NodeServer {
// Run start a non-blocking grpc controller,node and identityserver for
// ceph CSI driver which can serve multiple parallel requests
func (fs *Driver) Run(conf *util.Config, cachePersister util.CachePersister) {
var err error
var topology map[string]string
// Configuration
PluginFolder = conf.PluginPath
if err := loadAvailableMounters(conf); err != nil {
if err = loadAvailableMounters(conf); err != nil {
klog.Fatalf("cephfs: failed to load ceph mounters: %v", err)
}
if err := util.WriteCephConfig(); err != nil {
if err = util.WriteCephConfig(); err != nil {
klog.Fatalf("failed to write ceph configuration file: %v", err)
}
@ -137,14 +139,22 @@ func (fs *Driver) Run(conf *util.Config, cachePersister util.CachePersister) {
fs.is = NewIdentityServer(fs.cd)
if conf.IsNodeServer {
fs.ns = NewNodeServer(fs.cd, conf.Vtype)
topology, err = util.GetTopologyFromDomainLabels(conf.DomainLabels, conf.NodeID, conf.DriverName)
if err != nil {
klog.Fatalln(err)
}
fs.ns = NewNodeServer(fs.cd, conf.Vtype, topology)
}
if conf.IsControllerServer {
fs.cs = NewControllerServer(fs.cd, cachePersister)
}
if !conf.IsControllerServer && !conf.IsNodeServer {
fs.ns = NewNodeServer(fs.cd, conf.Vtype)
topology, err = util.GetTopologyFromDomainLabels(conf.DomainLabels, conf.NodeID, conf.DriverName)
if err != nil {
klog.Fatalln(err)
}
fs.ns = NewNodeServer(fs.cd, conf.Vtype, topology)
fs.cs = NewControllerServer(fs.cd, cachePersister)
}

View File

@ -46,10 +46,7 @@ request name lock, and hence any stale omaps are leftovers from incomplete trans
hence safe to garbage collect.
*/
func checkVolExists(ctx context.Context, volOptions *volumeOptions, secret map[string]string) (*volumeIdentifier, error) {
var (
vi util.CSIIdentifier
vid volumeIdentifier
)
var vid volumeIdentifier
cr, err := util.NewAdminCredentials(secret)
if err != nil {
@ -57,41 +54,36 @@ func checkVolExists(ctx context.Context, volOptions *volumeOptions, secret map[s
}
defer cr.DeleteCredentials()
imageUUID, err := volJournal.CheckReservation(ctx, volOptions.Monitors, cr,
imageData, err := volJournal.CheckReservation(ctx, volOptions.Monitors, cr,
volOptions.MetadataPool, volOptions.RequestName, volOptions.NamePrefix, "", "")
if err != nil {
return nil, err
}
if imageUUID == "" {
if imageData == nil {
return nil, nil
}
// now that we now that the reservation exists, let's get the volume name from
// the omap
_, vid.FsSubvolName, _, _, err = volJournal.GetObjectUUIDData(ctx, volOptions.Monitors, cr,
volOptions.MetadataPool, imageUUID, false)
if err != nil {
return nil, err
}
imageUUID := imageData.ImageUUID
vid.FsSubvolName = imageData.ImageAttributes.ImageName
_, err = getVolumeRootPathCeph(ctx, volOptions, cr, volumeID(vid.FsSubvolName))
if err != nil {
if _, ok := err.(ErrVolumeNotFound); ok {
err = volJournal.UndoReservation(ctx, volOptions.Monitors, cr, volOptions.MetadataPool, vid.FsSubvolName, volOptions.RequestName)
err = volJournal.UndoReservation(ctx, volOptions.Monitors, cr, volOptions.MetadataPool,
volOptions.MetadataPool, vid.FsSubvolName, volOptions.RequestName)
return nil, err
}
return nil, err
}
// check if topology constraints match what is found
// TODO: we need an API to fetch subvolume attributes (size/datapool and others), based
// on which we can evaluate which topology this belongs to.
// TODO: CephFS topology support is postponed till we get the same
// TODO: size checks
// found a volume already available, process and return it!
vi = util.CSIIdentifier{
LocationID: volOptions.FscID,
EncodingVersion: volIDVersion,
ClusterID: volOptions.ClusterID,
ObjectUUID: imageUUID,
}
vid.VolumeID, err = vi.ComposeCSIID()
vid.VolumeID, err = util.GenerateVolID(ctx, volOptions.Monitors, cr, volOptions.FscID,
"", volOptions.ClusterID, imageUUID, volIDVersion)
if err != nil {
return nil, err
}
@ -111,16 +103,29 @@ func undoVolReservation(ctx context.Context, volOptions *volumeOptions, vid volu
defer cr.DeleteCredentials()
err = volJournal.UndoReservation(ctx, volOptions.Monitors, cr, volOptions.MetadataPool,
vid.FsSubvolName, volOptions.RequestName)
volOptions.MetadataPool, vid.FsSubvolName, volOptions.RequestName)
return err
}
func updateTopologyConstraints(volOpts *volumeOptions) error {
// update request based on topology constrained parameters (if present)
poolName, topology, err := util.FindPoolAndTopology(volOpts.TopologyPools, volOpts.TopologyRequirement)
if err != nil {
return err
}
if poolName != "" {
volOpts.Pool = poolName
volOpts.Topology = topology
}
return nil
}
// reserveVol is a helper routine to request a UUID reservation for the CSI VolumeName and,
// to generate the volume identifier for the reserved UUID
func reserveVol(ctx context.Context, volOptions *volumeOptions, secret map[string]string) (*volumeIdentifier, error) {
var (
vi util.CSIIdentifier
vid volumeIdentifier
imageUUID string
err error
@ -132,20 +137,20 @@ func reserveVol(ctx context.Context, volOptions *volumeOptions, secret map[strin
}
defer cr.DeleteCredentials()
imageUUID, vid.FsSubvolName, err = volJournal.ReserveName(ctx, volOptions.Monitors, cr,
volOptions.MetadataPool, volOptions.RequestName, volOptions.NamePrefix, "", "")
err = updateTopologyConstraints(volOptions)
if err != nil {
return nil, err
}
imageUUID, vid.FsSubvolName, err = volJournal.ReserveName(ctx, volOptions.Monitors, cr, volOptions.MetadataPool, util.InvalidPoolID,
volOptions.MetadataPool, util.InvalidPoolID, volOptions.RequestName, volOptions.NamePrefix, "", "")
if err != nil {
return nil, err
}
// generate the volume ID to return to the CO system
vi = util.CSIIdentifier{
LocationID: volOptions.FscID,
EncodingVersion: volIDVersion,
ClusterID: volOptions.ClusterID,
ObjectUUID: imageUUID,
}
vid.VolumeID, err = vi.ComposeCSIID()
vid.VolumeID, err = util.GenerateVolID(ctx, volOptions.Monitors, cr, volOptions.FscID,
"", volOptions.ClusterID, imageUUID, volIDVersion)
if err != nil {
return nil, err
}

View File

@ -48,6 +48,13 @@ func (is *IdentityServer) GetPluginCapabilities(ctx context.Context, req *csi.Ge
},
},
},
{
Type: &csi.PluginCapability_Service_{
Service: &csi.PluginCapability_Service{
Type: csi.PluginCapability_Service_VOLUME_ACCESSIBILITY_CONSTRAINTS,
},
},
},
},
}, nil
}

View File

@ -21,26 +21,30 @@ import (
"fmt"
"strconv"
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/pkg/errors"
"github.com/ceph/ceph-csi/pkg/util"
)
type volumeOptions struct {
RequestName string
NamePrefix string
Size int64
ClusterID string
FsName string
FscID int64
MetadataPool string
Monitors string `json:"monitors"`
Pool string `json:"pool"`
RootPath string `json:"rootPath"`
Mounter string `json:"mounter"`
ProvisionVolume bool `json:"provisionVolume"`
KernelMountOptions string `json:"kernelMountOptions"`
FuseMountOptions string `json:"fuseMountOptions"`
TopologyPools *[]util.TopologyConstrainedPool
TopologyRequirement *csi.TopologyRequirement
Topology map[string]string
RequestName string
NamePrefix string
Size int64
ClusterID string
FsName string
FscID int64
MetadataPool string
Monitors string `json:"monitors"`
Pool string `json:"pool"`
RootPath string `json:"rootPath"`
Mounter string `json:"mounter"`
ProvisionVolume bool `json:"provisionVolume"`
KernelMountOptions string `json:"kernelMountOptions"`
FuseMountOptions string `json:"fuseMountOptions"`
}
func validateNonEmptyField(field, fieldName string) error {
@ -127,12 +131,15 @@ func getMonsAndClusterID(options map[string]string) (string, string, error) {
// newVolumeOptions generates a new instance of volumeOptions from the provided
// CSI request parameters
func newVolumeOptions(ctx context.Context, requestName string, volOptions, secret map[string]string) (*volumeOptions, error) {
func newVolumeOptions(ctx context.Context, requestName string, req *csi.CreateVolumeRequest,
secret map[string]string) (*volumeOptions, error) {
var (
opts volumeOptions
err error
)
volOptions := req.GetParameters()
opts.Monitors, opts.ClusterID, err = getMonsAndClusterID(volOptions)
if err != nil {
return nil, err
@ -176,6 +183,19 @@ func newVolumeOptions(ctx context.Context, requestName string, volOptions, secre
return nil, err
}
// store topology information from the request
opts.TopologyPools, opts.TopologyRequirement, err = util.GetTopologyFromRequest(req)
if err != nil {
return nil, err
}
// TODO: we need an API to fetch subvolume attributes (size/datapool and others), based
// on which we can evaluate which topology this belongs to.
// CephFS tracker: https://tracker.ceph.com/issues/44277
if opts.TopologyPools != nil {
return nil, errors.New("topology based provisioning is not supported for CephFS backed volumes")
}
opts.ProvisionVolume = true
return &opts, nil
@ -221,11 +241,13 @@ func newVolumeOptionsFromVolID(ctx context.Context, volID string, volOpt, secret
return nil, nil, err
}
volOptions.RequestName, vid.FsSubvolName, _, _, err = volJournal.GetObjectUUIDData(ctx, volOptions.Monitors, cr,
imageAttributes, err := volJournal.GetImageAttributes(ctx, volOptions.Monitors, cr,
volOptions.MetadataPool, vi.ObjectUUID, false)
if err != nil {
return nil, nil, err
}
volOptions.RequestName = imageAttributes.RequestName
vid.FsSubvolName = imageAttributes.ImageName
if volOpt != nil {
if err = extractOptionalOption(&volOptions.Pool, "pool", volOpt); err != nil {

View File

@ -30,8 +30,10 @@ type CSIDriver struct {
name string
nodeID string
version string
cap []*csi.ControllerServiceCapability
vc []*csi.VolumeCapability_AccessMode
// topology constraints that this nodeserver will advertise
topology map[string]string
cap []*csi.ControllerServiceCapability
vc []*csi.VolumeCapability_AccessMode
}
// NewCSIDriver Creates a NewCSIDriver object. Assumes vendor

View File

@ -57,8 +57,13 @@ func (ns *DefaultNodeServer) NodeExpandVolume(ctx context.Context, req *csi.Node
func (ns *DefaultNodeServer) NodeGetInfo(ctx context.Context, req *csi.NodeGetInfoRequest) (*csi.NodeGetInfoResponse, error) {
klog.V(5).Infof(util.Log(ctx, "Using default NodeGetInfo"))
csiTopology := &csi.Topology{
Segments: ns.Driver.topology,
}
return &csi.NodeGetInfoResponse{
NodeId: ns.Driver.nodeID,
NodeId: ns.Driver.nodeID,
AccessibleTopology: csiTopology,
}, nil
}
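For context, this is the shape of the NodeGetInfo response once the topology map is set; a sketch only, with an illustrative node ID and segment values, assuming the rbd driver was started with --domainlabels=failure-domain/region,failure-domain/zone.

package main

import (
	"fmt"

	"github.com/container-storage-interface/spec/lib/go/csi"
)

func main() {
	resp := &csi.NodeGetInfoResponse{
		NodeId: "worker-1", // example node ID
		AccessibleTopology: &csi.Topology{
			Segments: map[string]string{
				"topology.rbd.csi.ceph.com/region": "east",
				"topology.rbd.csi.ceph.com/zone":   "zone1",
			},
		},
	}
	fmt.Printf("%+v\n", resp)
}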

View File

@ -48,7 +48,8 @@ func NewVolumeCapabilityAccessMode(mode csi.VolumeCapability_AccessMode_Mode) *c
}
// NewDefaultNodeServer initializes default node server
func NewDefaultNodeServer(d *CSIDriver, t string) *DefaultNodeServer {
func NewDefaultNodeServer(d *CSIDriver, t string, topology map[string]string) *DefaultNodeServer {
d.topology = topology
return &DefaultNodeServer{
Driver: d,
Type: t,

View File

@ -83,7 +83,7 @@ func (cs *ControllerServer) parseVolCreateRequest(ctx context.Context, req *csi.
isMultiNode := false
isBlock := false
for _, cap := range req.VolumeCapabilities {
// RO modes need to be handled indepedently (ie right now even if access mode is RO, they'll be RW upon attach)
// RO modes need to be handled independently (ie right now even if access mode is RO, they'll be RW upon attach)
if cap.GetAccessMode().GetMode() == csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER {
isMultiNode = true
}
@ -114,6 +114,16 @@ func (cs *ControllerServer) parseVolCreateRequest(ctx context.Context, req *csi.
// always round up the request size in bytes to the nearest MiB/GiB
rbdVol.VolSize = util.RoundOffBytes(volSizeBytes)
// start with pool the same as journal pool, in case there is a topology
// based split, pool for the image will be updated subsequently
rbdVol.JournalPool = rbdVol.Pool
// store topology information from the request
rbdVol.TopologyPools, rbdVol.TopologyRequirement, err = util.GetTopologyFromRequest(req)
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
// NOTE: rbdVol does not contain VolID and RbdImageName populated, everything
// else is populated post create request parsing
return rbdVol, nil
@ -163,17 +173,32 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
}
}
return &csi.CreateVolumeResponse{
Volume: &csi.Volume{
VolumeId: rbdVol.VolID,
CapacityBytes: rbdVol.VolSize,
VolumeContext: req.GetParameters(),
ContentSource: req.GetVolumeContentSource(),
},
}, nil
volumeContext := req.GetParameters()
volumeContext["pool"] = rbdVol.Pool
volumeContext["journalPool"] = rbdVol.JournalPool
volume := &csi.Volume{
VolumeId: rbdVol.VolID,
CapacityBytes: rbdVol.VolSize,
VolumeContext: volumeContext,
ContentSource: req.GetVolumeContentSource(),
}
if rbdVol.Topology != nil {
volume.AccessibleTopology =
[]*csi.Topology{
{
Segments: rbdVol.Topology,
},
}
}
return &csi.CreateVolumeResponse{Volume: volume}, nil
}
err = reserveVol(ctx, rbdVol, cr)
rbdSnap, err := cs.checkSnapshotSource(ctx, req, cr)
if err != nil {
return nil, err
}
err = reserveVol(ctx, rbdVol, rbdSnap, cr)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
@ -186,7 +211,7 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
}
}()
err = cs.createBackingImage(ctx, rbdVol, req)
err = createBackingImage(ctx, cr, rbdVol, rbdSnap)
if err != nil {
return nil, err
}
@ -205,80 +230,81 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
}
}
return &csi.CreateVolumeResponse{
Volume: &csi.Volume{
VolumeId: rbdVol.VolID,
CapacityBytes: rbdVol.VolSize,
VolumeContext: req.GetParameters(),
ContentSource: req.GetVolumeContentSource(),
},
}, nil
volumeContext := req.GetParameters()
volumeContext["pool"] = rbdVol.Pool
volumeContext["journalPool"] = rbdVol.JournalPool
volume := &csi.Volume{
VolumeId: rbdVol.VolID,
CapacityBytes: rbdVol.VolSize,
VolumeContext: volumeContext,
ContentSource: req.GetVolumeContentSource(),
}
if rbdVol.Topology != nil {
volume.AccessibleTopology =
[]*csi.Topology{
{
Segments: rbdVol.Topology,
},
}
}
return &csi.CreateVolumeResponse{Volume: volume}, nil
}
func (cs *ControllerServer) createBackingImage(ctx context.Context, rbdVol *rbdVolume, req *csi.CreateVolumeRequest) error {
func createBackingImage(ctx context.Context, cr *util.Credentials, rbdVol *rbdVolume, rbdSnap *rbdSnapshot) error {
var err error
// if VolumeContentSource is not nil, this request is for snapshot
if req.VolumeContentSource != nil {
if err = cs.checkSnapshot(ctx, req, rbdVol); err != nil {
if rbdSnap != nil {
err = restoreSnapshot(ctx, rbdVol, rbdSnap, cr)
if err != nil {
return err
}
} else {
cr, err := util.NewUserCredentials(req.GetSecrets())
if err != nil {
return status.Error(codes.Internal, err.Error())
}
defer cr.DeleteCredentials()
err = createImage(ctx, rbdVol, cr)
if err != nil {
klog.Errorf(util.Log(ctx, "failed to create volume: %v"), err)
return status.Error(codes.Internal, err.Error())
}
klog.V(4).Infof(util.Log(ctx, "created image %s"), rbdVol.RbdImageName)
klog.V(4).Infof(util.Log(ctx, "created volume %s from snapshot %s"), rbdVol.RequestName, rbdSnap.RbdSnapName)
return nil
}
err = createImage(ctx, rbdVol, cr)
if err != nil {
klog.Errorf(util.Log(ctx, "failed to create volume: %v"), err)
return status.Error(codes.Internal, err.Error())
}
klog.V(4).Infof(util.Log(ctx, "created volume %s backed by image %s"), rbdVol.RequestName, rbdVol.RbdImageName)
return nil
}
func (cs *ControllerServer) checkSnapshot(ctx context.Context, req *csi.CreateVolumeRequest, rbdVol *rbdVolume) error {
func (cs *ControllerServer) checkSnapshotSource(ctx context.Context, req *csi.CreateVolumeRequest,
cr *util.Credentials) (*rbdSnapshot, error) {
if req.VolumeContentSource == nil {
return nil, nil
}
snapshot := req.VolumeContentSource.GetSnapshot()
if snapshot == nil {
return status.Error(codes.InvalidArgument, "volume Snapshot cannot be empty")
return nil, status.Error(codes.InvalidArgument, "volume Snapshot cannot be empty")
}
snapshotID := snapshot.GetSnapshotId()
if snapshotID == "" {
return status.Error(codes.InvalidArgument, "volume Snapshot ID cannot be empty")
return nil, status.Error(codes.InvalidArgument, "volume Snapshot ID cannot be empty")
}
cr, err := util.NewUserCredentials(req.GetSecrets())
if err != nil {
return status.Error(codes.Internal, err.Error())
}
defer cr.DeleteCredentials()
rbdSnap := &rbdSnapshot{}
if err = genSnapFromSnapID(ctx, rbdSnap, snapshotID, cr); err != nil {
if err := genSnapFromSnapID(ctx, rbdSnap, snapshotID, cr); err != nil {
if _, ok := err.(ErrSnapNotFound); !ok {
return status.Error(codes.Internal, err.Error())
return nil, status.Error(codes.Internal, err.Error())
}
if _, ok := err.(util.ErrPoolNotFound); ok {
klog.Errorf(util.Log(ctx, "failed to get backend snapshot for %s: %v"), snapshotID, err)
return status.Error(codes.InvalidArgument, err.Error())
return nil, status.Error(codes.InvalidArgument, err.Error())
}
return status.Error(codes.InvalidArgument, "missing requested Snapshot ID")
return nil, status.Error(codes.InvalidArgument, "missing requested Snapshot ID")
}
err = restoreSnapshot(ctx, rbdVol, rbdSnap, cr)
if err != nil {
return status.Error(codes.Internal, err.Error())
}
klog.V(4).Infof(util.Log(ctx, "create volume %s from snapshot %s"), req.GetName(), rbdSnap.RbdSnapName)
return nil
return rbdSnap, nil
}
// DeleteLegacyVolume deletes a volume provisioned using version 1.0.0 of the plugin

View File

@ -77,10 +77,10 @@ func NewControllerServer(d *csicommon.CSIDriver, cachePersister util.CachePersis
}
// NewNodeServer initialize a node server for rbd CSI driver.
func NewNodeServer(d *csicommon.CSIDriver, t string) (*NodeServer, error) {
func NewNodeServer(d *csicommon.CSIDriver, t string, topology map[string]string) (*NodeServer, error) {
mounter := mount.New("")
return &NodeServer{
DefaultNodeServer: csicommon.NewDefaultNodeServer(d, t),
DefaultNodeServer: csicommon.NewDefaultNodeServer(d, t, topology),
mounter: mounter,
VolumeLocks: util.NewVolumeLocks(),
}, nil
@ -90,6 +90,7 @@ func NewNodeServer(d *csicommon.CSIDriver, t string) (*NodeServer, error) {
// rbd CSI driver which can serve multiple parallel requests
func (r *Driver) Run(conf *util.Config, cachePersister util.CachePersister) {
var err error
var topology map[string]string
// Create ceph.conf for use with CLI commands
if err = util.WriteCephConfig(); err != nil {
@ -134,7 +135,11 @@ func (r *Driver) Run(conf *util.Config, cachePersister util.CachePersister) {
r.ids = NewIdentityServer(r.cd)
if conf.IsNodeServer {
r.ns, err = NewNodeServer(r.cd, conf.Vtype)
topology, err = util.GetTopologyFromDomainLabels(conf.DomainLabels, conf.NodeID, conf.DriverName)
if err != nil {
klog.Fatalln(err)
}
r.ns, err = NewNodeServer(r.cd, conf.Vtype, topology)
if err != nil {
klog.Fatalf("failed to start node server, err %v\n", err)
}
@ -144,7 +149,11 @@ func (r *Driver) Run(conf *util.Config, cachePersister util.CachePersister) {
r.cs = NewControllerServer(r.cd, cachePersister)
}
if !conf.IsControllerServer && !conf.IsNodeServer {
r.ns, err = NewNodeServer(r.cd, conf.Vtype)
topology, err = util.GetTopologyFromDomainLabels(conf.DomainLabels, conf.NodeID, conf.DriverName)
if err != nil {
klog.Fatalln(err)
}
r.ns, err = NewNodeServer(r.cd, conf.Vtype, topology)
if err != nil {
klog.Fatalf("failed to start node server, err %v\n", err)
}

View File

@ -48,6 +48,13 @@ func (is *IdentityServer) GetPluginCapabilities(ctx context.Context, req *csi.Ge
},
},
},
{
Type: &csi.PluginCapability_Service_{
Service: &csi.PluginCapability_Service{
Type: csi.PluginCapability_Service_VOLUME_ACCESSIBILITY_CONSTRAINTS,
},
},
},
},
}, nil
}

View File

@ -145,13 +145,20 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
}
default:
var vi util.CSIIdentifier
var imageAttributes *util.ImageAttributes
err = vi.DecomposeCSIID(volID)
if err != nil {
err = fmt.Errorf("error decoding volume ID (%s) (%s)", err, volID)
return nil, status.Error(codes.Internal, err.Error())
}
_, volOptions.RbdImageName, _, _, err = volJournal.GetObjectUUIDData(ctx, volOptions.Monitors, cr, volOptions.Pool, vi.ObjectUUID, false)
imageAttributes, err = volJournal.GetImageAttributes(ctx, volOptions.Monitors, cr,
volOptions.Pool, vi.ObjectUUID, false)
if err != nil {
err = fmt.Errorf("error fetching image attributes for volume ID (%s) (%s)", err, volID)
return nil, status.Error(codes.Internal, err.Error())
}
volOptions.RbdImageName = imageAttributes.ImageName
}
volOptions.VolID = volID

View File

@ -114,36 +114,36 @@ func checkSnapExists(ctx context.Context, rbdSnap *rbdSnapshot, cr *util.Credent
return false, err
}
snapUUID, err := snapJournal.CheckReservation(ctx, rbdSnap.Monitors, cr, rbdSnap.Pool,
snapData, err := snapJournal.CheckReservation(ctx, rbdSnap.Monitors, cr, rbdSnap.JournalPool,
rbdSnap.RequestName, rbdSnap.NamePrefix, rbdSnap.RbdImageName, "")
if err != nil {
return false, err
}
if snapUUID == "" {
if snapData == nil {
return false, nil
}
snapUUID := snapData.ImageUUID
rbdSnap.RbdSnapName = snapData.ImageAttributes.ImageName
// now that we now that the reservation exists, let's get the image name from
// the omap
_, rbdSnap.RbdSnapName, _, _, err = volJournal.GetObjectUUIDData(ctx, rbdSnap.Monitors, cr,
rbdSnap.Pool, snapUUID, false)
if err != nil {
return false, err
// it should never happen that this disagrees, but check
if rbdSnap.Pool != snapData.ImagePool {
return false, fmt.Errorf("stored snapshot pool (%s) and expected snapshot pool (%s) mismatch",
snapData.ImagePool, rbdSnap.Pool)
}
// Fetch on-disk image attributes
err = updateSnapWithImageInfo(ctx, rbdSnap, cr)
if err != nil {
if _, ok := err.(ErrSnapNotFound); ok {
err = snapJournal.UndoReservation(ctx, rbdSnap.Monitors, cr, rbdSnap.Pool,
rbdSnap.RbdSnapName, rbdSnap.RequestName)
err = snapJournal.UndoReservation(ctx, rbdSnap.Monitors, cr, rbdSnap.JournalPool,
rbdSnap.Pool, rbdSnap.RbdSnapName, rbdSnap.RequestName)
return false, err
}
return false, err
}
// found a snapshot already available, process and return its information
rbdSnap.SnapID, err = util.GenerateVolID(ctx, rbdSnap.Monitors, cr, rbdSnap.Pool,
rbdSnap.SnapID, err = util.GenerateVolID(ctx, rbdSnap.Monitors, cr, snapData.ImagePoolID, rbdSnap.Pool,
rbdSnap.ClusterID, snapUUID, volIDVersion)
if err != nil {
return false, err
@ -173,22 +173,30 @@ func checkVolExists(ctx context.Context, rbdVol *rbdVolume, cr *util.Credentials
if rbdVol.Encrypted {
kmsID = rbdVol.KMS.GetID()
}
imageUUID, err := volJournal.CheckReservation(ctx, rbdVol.Monitors, cr, rbdVol.Pool,
imageData, err := volJournal.CheckReservation(ctx, rbdVol.Monitors, cr, rbdVol.JournalPool,
rbdVol.RequestName, rbdVol.NamePrefix, "", kmsID)
if err != nil {
return false, err
}
if imageUUID == "" {
if imageData == nil {
return false, nil
}
// now that we now that the reservation exists, let's get the image name from
// the omap
_, rbdVol.RbdImageName, _, _, err = volJournal.GetObjectUUIDData(ctx, rbdVol.Monitors, cr,
rbdVol.Pool, imageUUID, false)
imageUUID := imageData.ImageUUID
rbdVol.RbdImageName = imageData.ImageAttributes.ImageName
// check if topology constraints match what is found
rbdVol.Topology, err = util.MatchTopologyForPool(rbdVol.TopologyPools,
rbdVol.TopologyRequirement, imageData.ImagePool)
if err != nil {
// TODO check if need any undo operation here, or ErrVolNameConflict
return false, err
}
// update Pool, if it was topology constrained
if rbdVol.Topology != nil {
rbdVol.Pool = imageData.ImagePool
}
// NOTE: Return volsize should be on-disk volsize, not request vol size, so
// save it for size checks before fetching image data
@ -197,7 +205,7 @@ func checkVolExists(ctx context.Context, rbdVol *rbdVolume, cr *util.Credentials
err = updateVolWithImageInfo(ctx, rbdVol, cr)
if err != nil {
if _, ok := err.(ErrImageNotFound); ok {
err = volJournal.UndoReservation(ctx, rbdVol.Monitors, cr, rbdVol.Pool,
err = volJournal.UndoReservation(ctx, rbdVol.Monitors, cr, rbdVol.JournalPool, rbdVol.Pool,
rbdVol.RbdImageName, rbdVol.RequestName)
return false, err
}
@ -213,7 +221,7 @@ func checkVolExists(ctx context.Context, rbdVol *rbdVolume, cr *util.Credentials
// TODO: We should also ensure image features and format is the same
// found a volume already available, process and return it!
rbdVol.VolID, err = util.GenerateVolID(ctx, rbdVol.Monitors, cr, rbdVol.Pool,
rbdVol.VolID, err = util.GenerateVolID(ctx, rbdVol.Monitors, cr, imageData.ImagePoolID, rbdVol.Pool,
rbdVol.ClusterID, imageUUID, volIDVersion)
if err != nil {
return false, err
@ -233,13 +241,18 @@ func reserveSnap(ctx context.Context, rbdSnap *rbdSnapshot, cr *util.Credentials
err error
)
snapUUID, rbdSnap.RbdSnapName, err = snapJournal.ReserveName(ctx, rbdSnap.Monitors, cr, rbdSnap.Pool,
rbdSnap.RequestName, rbdSnap.NamePrefix, rbdSnap.RbdImageName, "")
journalPoolID, imagePoolID, err := util.GetPoolIDs(ctx, rbdSnap.Monitors, rbdSnap.JournalPool, rbdSnap.Pool, cr)
if err != nil {
return err
}
rbdSnap.SnapID, err = util.GenerateVolID(ctx, rbdSnap.Monitors, cr, rbdSnap.Pool,
snapUUID, rbdSnap.RbdSnapName, err = snapJournal.ReserveName(ctx, rbdSnap.Monitors, cr, rbdSnap.JournalPool, journalPoolID,
rbdSnap.Pool, imagePoolID, rbdSnap.RequestName, rbdSnap.NamePrefix, rbdSnap.RbdImageName, "")
if err != nil {
return err
}
rbdSnap.SnapID, err = util.GenerateVolID(ctx, rbdSnap.Monitors, cr, imagePoolID, rbdSnap.Pool,
rbdSnap.ClusterID, snapUUID, volIDVersion)
if err != nil {
return err
@ -251,26 +264,66 @@ func reserveSnap(ctx context.Context, rbdSnap *rbdSnapshot, cr *util.Credentials
return nil
}
func updateTopologyConstraints(rbdVol *rbdVolume, rbdSnap *rbdSnapshot) error {
var err error
if rbdSnap != nil {
// check if topology constraints matches snapshot pool
rbdVol.Topology, err = util.MatchTopologyForPool(rbdVol.TopologyPools,
rbdVol.TopologyRequirement, rbdSnap.Pool)
if err != nil {
return err
}
// update Pool, if it was topology constrained
if rbdVol.Topology != nil {
rbdVol.Pool = rbdSnap.Pool
}
return nil
}
// update request based on topology constrained parameters (if present)
poolName, topology, err := util.FindPoolAndTopology(rbdVol.TopologyPools, rbdVol.TopologyRequirement)
if err != nil {
return err
}
if poolName != "" {
rbdVol.Pool = poolName
rbdVol.Topology = topology
}
return nil
}
// reserveVol is a helper routine to request a rbdVolume name reservation and generate the
// volume ID for the generated name
func reserveVol(ctx context.Context, rbdVol *rbdVolume, cr *util.Credentials) error {
func reserveVol(ctx context.Context, rbdVol *rbdVolume, rbdSnap *rbdSnapshot, cr *util.Credentials) error {
var (
imageUUID string
err error
)
err = updateTopologyConstraints(rbdVol, rbdSnap)
if err != nil {
return err
}
journalPoolID, imagePoolID, err := util.GetPoolIDs(ctx, rbdVol.Monitors, rbdVol.JournalPool, rbdVol.Pool, cr)
if err != nil {
return err
}
kmsID := ""
if rbdVol.Encrypted {
kmsID = rbdVol.KMS.GetID()
}
imageUUID, rbdVol.RbdImageName, err = volJournal.ReserveName(ctx, rbdVol.Monitors, cr, rbdVol.Pool,
rbdVol.RequestName, rbdVol.NamePrefix, "", kmsID)
imageUUID, rbdVol.RbdImageName, err = volJournal.ReserveName(ctx, rbdVol.Monitors, cr, rbdVol.JournalPool, journalPoolID,
rbdVol.Pool, imagePoolID, rbdVol.RequestName, rbdVol.NamePrefix, "", kmsID)
if err != nil {
return err
}
rbdVol.VolID, err = util.GenerateVolID(ctx, rbdVol.Monitors, cr, rbdVol.Pool,
rbdVol.VolID, err = util.GenerateVolID(ctx, rbdVol.Monitors, cr, imagePoolID, rbdVol.Pool,
rbdVol.ClusterID, imageUUID, volIDVersion)
if err != nil {
return err
@ -284,7 +337,7 @@ func reserveVol(ctx context.Context, rbdVol *rbdVolume, cr *util.Credentials) er
// undoSnapReservation is a helper routine to undo a name reservation for rbdSnapshot
func undoSnapReservation(ctx context.Context, rbdSnap *rbdSnapshot, cr *util.Credentials) error {
err := snapJournal.UndoReservation(ctx, rbdSnap.Monitors, cr, rbdSnap.Pool,
err := snapJournal.UndoReservation(ctx, rbdSnap.Monitors, cr, rbdSnap.JournalPool, rbdSnap.Pool,
rbdSnap.RbdSnapName, rbdSnap.RequestName)
return err
@ -292,7 +345,7 @@ func undoSnapReservation(ctx context.Context, rbdSnap *rbdSnapshot, cr *util.Cre
// undoVolReservation is a helper routine to undo a name reservation for rbdVolume
func undoVolReservation(ctx context.Context, rbdVol *rbdVolume, cr *util.Credentials) error {
err := volJournal.UndoReservation(ctx, rbdVol.Monitors, cr, rbdVol.Pool,
err := volJournal.UndoReservation(ctx, rbdVol.Monitors, cr, rbdVol.JournalPool, rbdVol.Pool,
rbdVol.RbdImageName, rbdVol.RequestName)
return err

View File

@ -31,6 +31,7 @@ import (
"github.com/ceph/ceph-csi/pkg/util"
"github.com/ceph/go-ceph/rados"
librbd "github.com/ceph/go-ceph/rbd"
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/timestamp"
"github.com/pborman/uuid"
@ -69,24 +70,33 @@ type rbdVolume struct {
// JSON tag as it is not stashed in JSON encoded config maps in v1.0.0
// VolName and MonValueFromSecret are retained from older plugin versions (<= 1.0.0)
// for backward compatibility reasons
RbdImageName string
NamePrefix string
VolID string `json:"volID"`
Monitors string `json:"monitors"`
Pool string `json:"pool"`
DataPool string
ImageFeatures string `json:"imageFeatures"`
AdminID string `json:"adminId"`
UserID string `json:"userId"`
Mounter string `json:"mounter"`
ClusterID string `json:"clusterId"`
RequestName string
VolName string `json:"volName"`
MonValueFromSecret string `json:"monValueFromSecret"`
VolSize int64 `json:"volSize"`
DisableInUseChecks bool `json:"disableInUseChecks"`
Encrypted bool
KMS util.EncryptionKMS
// JournalPool is the ceph pool in which the CSI Journal is stored
// Pool is where the image journal and image is stored, and could be the same as `JournalPool`
// (retained as Pool instead of renaming to ImagePool or such, as this is referenced in the code extensively)
// DataPool is where the data for images in `Pool` are stored, this is used as the `--data-pool`
// argument when the pool is created, and is not used anywhere else
TopologyPools *[]util.TopologyConstrainedPool
TopologyRequirement *csi.TopologyRequirement
Topology map[string]string
RbdImageName string
NamePrefix string
VolID string `json:"volID"`
Monitors string `json:"monitors"`
JournalPool string
Pool string `json:"pool"`
DataPool string
ImageFeatures string `json:"imageFeatures"`
AdminID string `json:"adminId"`
UserID string `json:"userId"`
Mounter string `json:"mounter"`
ClusterID string `json:"clusterId"`
RequestName string
VolName string `json:"volName"`
MonValueFromSecret string `json:"monValueFromSecret"`
VolSize int64 `json:"volSize"`
DisableInUseChecks bool `json:"disableInUseChecks"`
Encrypted bool
KMS util.EncryptionKMS
// connection
conn *rados.Conn
@ -99,12 +109,15 @@ type rbdSnapshot struct {
// RbdSnapName is the name of the RBD snapshot backing this rbdSnapshot
// SnapID is the snapshot ID that is exchanged with CSI drivers, identifying this rbdSnapshot
// RequestName is the CSI generated snapshot name for the rbdSnapshot
// JournalPool is the ceph pool in which the CSI snapshot Journal is stored
// Pool is where the image snapshot journal and snapshot is stored, and could be the same as `JournalPool`
SourceVolumeID string
RbdImageName string
NamePrefix string
RbdSnapName string
SnapID string
Monitors string
JournalPool string
Pool string
CreatedAt *timestamp.Timestamp
SizeBytes int64
@ -352,12 +365,25 @@ func genSnapFromSnapID(ctx context.Context, rbdSnap *rbdSnapshot, snapshotID str
if err != nil {
return err
}
rbdSnap.JournalPool = rbdSnap.Pool
rbdSnap.RequestName, rbdSnap.RbdSnapName, rbdSnap.RbdImageName, _, err = snapJournal.GetObjectUUIDData(ctx, rbdSnap.Monitors,
imageAttributes, err := snapJournal.GetImageAttributes(ctx, rbdSnap.Monitors,
cr, rbdSnap.Pool, vi.ObjectUUID, true)
if err != nil {
return err
}
rbdSnap.RequestName = imageAttributes.RequestName
rbdSnap.RbdImageName = imageAttributes.SourceName
rbdSnap.RbdSnapName = imageAttributes.ImageName
// convert the journal pool ID to name, for use in DeleteSnapshot cases
if imageAttributes.JournalPoolID != util.InvalidPoolID {
rbdSnap.JournalPool, err = util.GetPoolName(ctx, rbdSnap.Monitors, cr, imageAttributes.JournalPoolID)
if err != nil {
// TODO: If pool is not found we may leak the image (as DeleteSnapshot will return success)
return err
}
}
err = updateSnapWithImageInfo(ctx, rbdSnap, cr)
@ -395,20 +421,32 @@ func genVolFromVolID(ctx context.Context, rbdVol *rbdVolume, volumeID string, cr
if err != nil {
return err
}
rbdVol.JournalPool = rbdVol.Pool
kmsID := ""
rbdVol.RequestName, rbdVol.RbdImageName, _, kmsID, err = volJournal.GetObjectUUIDData(ctx, rbdVol.Monitors, cr,
imageAttributes, err := volJournal.GetImageAttributes(ctx, rbdVol.Monitors, cr,
rbdVol.Pool, vi.ObjectUUID, false)
if err != nil {
return err
}
if kmsID != "" {
if imageAttributes.KmsID != "" {
rbdVol.Encrypted = true
rbdVol.KMS, err = util.GetKMS(kmsID, secrets)
rbdVol.KMS, err = util.GetKMS(imageAttributes.KmsID, secrets)
if err != nil {
return err
}
}
rbdVol.RequestName = imageAttributes.RequestName
rbdVol.RbdImageName = imageAttributes.ImageName
// convert the journal pool ID to name, for use in DeleteVolume cases
if imageAttributes.JournalPoolID >= 0 {
rbdVol.JournalPool, err = util.GetPoolName(ctx, rbdVol.Monitors, cr, imageAttributes.JournalPoolID)
if err != nil {
// TODO: If pool is not found we may leak the image (as DeleteVolume will return success)
return err
}
}
err = updateVolWithImageInfo(ctx, rbdVol, cr)
@ -579,6 +617,7 @@ func genSnapFromOptions(ctx context.Context, rbdVol *rbdVolume, snapOptions map[
rbdSnap := &rbdSnapshot{}
rbdSnap.Pool = rbdVol.Pool
rbdSnap.JournalPool = rbdVol.JournalPool
rbdSnap.Monitors, rbdSnap.ClusterID, err = getMonsAndClusterID(ctx, snapOptions)
if err != nil {

View File

@ -29,6 +29,9 @@ import (
"k8s.io/klog"
)
// InvalidPoolID used to denote an invalid pool
const InvalidPoolID int64 = -1
// ExecCommand executes passed in program with args and returns separate stdout and stderr streams
func ExecCommand(program string, args ...string) (stdout, stderr []byte, err error) {
var (
@ -117,6 +120,26 @@ func GetPoolName(ctx context.Context, monitors string, cr *Credentials, poolID i
return "", ErrPoolNotFound{string(poolID), fmt.Errorf("pool ID (%d) not found in Ceph cluster", poolID)}
}
// GetPoolIDs searches a list of pools in a cluster and returns the IDs of the pools that matches
// the passed in pools
// TODO this should take in a list and return a map[string(poolname)]int64(poolID)
func GetPoolIDs(ctx context.Context, monitors, journalPool, imagePool string, cr *Credentials) (int64, int64, error) {
journalPoolID, err := GetPoolID(ctx, monitors, cr, journalPool)
if err != nil {
return InvalidPoolID, InvalidPoolID, err
}
imagePoolID := journalPoolID
if imagePool != journalPool {
imagePoolID, err = GetPoolID(ctx, monitors, cr, imagePool)
if err != nil {
return InvalidPoolID, InvalidPoolID, err
}
}
return journalPoolID, imagePoolID, nil
}
// SetOMapKeyValue sets the given key and value into the provided Ceph omap name
func SetOMapKeyValue(ctx context.Context, monitors string, cr *Credentials, poolName, namespace, oMapName, oMapKey, keyValue string) error {
// Command: "rados <options> setomapval oMapName oMapKey keyValue"

pkg/util/topology.go (new file, 255 lines)
View File

@ -0,0 +1,255 @@
/*
Copyright 2020 The Ceph-CSI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"context"
"encoding/json"
"fmt"
"strings"
"github.com/container-storage-interface/spec/lib/go/csi"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog"
)
const (
keySeparator rune = '/'
labelSeparator string = ","
)
func k8sGetNodeLabels(nodeName string) (map[string]string, error) {
client := NewK8sClient()
node, err := client.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("failed to get node (%s) information : %v", nodeName, err)
}
return node.GetLabels(), nil
}
// GetTopologyFromDomainLabels returns the CSI topology map, determined from
// the domain labels and their values from the CO system
// Expects domainLabels in arg to be in the format "[prefix/]<name>,[prefix/]<name>,...",
func GetTopologyFromDomainLabels(domainLabels, nodeName, driverName string) (map[string]string, error) {
if domainLabels == "" {
return nil, nil
}
// size checks on domain label prefix
topologyPrefix := strings.ToLower("topology." + driverName)
if len(topologyPrefix) > 63 {
return nil, fmt.Errorf("computed topology label prefix (%s) for node exceeds length limits", topologyPrefix)
}
// driverName is validated, and we are adding a lowercase "topology." to it, so no validation for conformance
// Convert passed in labels to a map, and check for uniqueness
labelsToRead := strings.SplitN(domainLabels, labelSeparator, -1)
klog.Infof("passed in node labels for processing : %+v", labelsToRead)
labelsIn := make(map[string]bool)
labelCount := 0
for _, label := range labelsToRead {
// as we read the labels from k8s, and check for missing labels,
// no label conformance checks here
if _, ok := labelsIn[label]; ok {
return nil, fmt.Errorf("duplicate label (%s) found in domain labels", label)
}
labelsIn[label] = true
labelCount++
}
nodeLabels, err := k8sGetNodeLabels(nodeName)
if err != nil {
return nil, err
}
// Determine values for requested labels from node labels
domainMap := make(map[string]string)
found := 0
for key, value := range nodeLabels {
if _, ok := labelsIn[key]; !ok {
continue
}
// label found, split the name component and store its value
nameIdx := strings.IndexRune(key, keySeparator)
domain := key[nameIdx+1:]
domainMap[domain] = value
labelsIn[key] = false
found++
}
// Ensure all labels are found
if found != labelCount {
missingLabels := []string{}
for key, missing := range labelsIn {
if missing {
missingLabels = append(missingLabels, key)
}
}
return nil, fmt.Errorf("missing domain labels %v on node (%s)", missingLabels, nodeName)
}
klog.Infof("list of domains processed : %+v", domainMap)
topology := make(map[string]string)
for domain, value := range domainMap {
topology[topologyPrefix+"/"+domain] = value
// TODO: when implementing domain takeover/giveback, enable a domain value that can remain pinned to the node
// topology["topology."+driverName+"/"+domain+"-pinned"] = value
}
return topology, nil
}
type topologySegment struct {
DomainLabel string `json:"domainLabel"`
DomainValue string `json:"value"`
}
// TopologyConstrainedPool stores the pool name and a list of its associated topology domain values
type TopologyConstrainedPool struct {
PoolName string `json:"poolName"`
DomainSegments []topologySegment `json:"domainSegments"`
}
// GetTopologyFromRequest extracts TopologyConstrainedPools and passed in accessibility constraints
// from a CSI CreateVolume request
func GetTopologyFromRequest(req *csi.CreateVolumeRequest) (*[]TopologyConstrainedPool, *csi.TopologyRequirement, error) {
var (
topologyPools []TopologyConstrainedPool
)
// check if parameters have pool configuration pertaining to topology
topologyPoolsStr := req.GetParameters()["topologyConstrainedPools"]
if topologyPoolsStr == "" {
return nil, nil, nil
}
// check if there are any accessibility requirements in the request
accessibilityRequirements := req.GetAccessibilityRequirements()
if accessibilityRequirements == nil {
return nil, nil, nil
}
// extract topology based pools configuration
err := json.Unmarshal([]byte(strings.Replace(topologyPoolsStr, "\n", " ", -1)), &topologyPools)
if err != nil {
return nil, nil, fmt.Errorf("failed to parse JSON encoded topology constrained pools parameter (%s): %v", topologyPoolsStr, err)
}
return &topologyPools, accessibilityRequirements, nil
}
// MatchTopologyForPool returns the topology map, if the passed in pool matches any
// passed in accessibility constraints
func MatchTopologyForPool(topologyPools *[]TopologyConstrainedPool,
accessibilityRequirements *csi.TopologyRequirement, poolName string) (map[string]string, error) {
var topologyPool []TopologyConstrainedPool
if topologyPools == nil || accessibilityRequirements == nil {
return nil, nil
}
// find the pool in the list of topology based pools
for _, value := range *topologyPools {
if value.PoolName == poolName {
topologyPool = append(topologyPool, value)
break
}
}
if len(topologyPool) == 0 {
return nil, fmt.Errorf("none of the configured topology pools (%+v) matched passed in pool name (%s)",
topologyPools, poolName)
}
_, topology, err := FindPoolAndTopology(&topologyPool, accessibilityRequirements)
return topology, err
}
// FindPoolAndTopology loops through passed in "topologyPools" and also related
// accessibility requirements, to determine which pool matches the requirement.
// The return variables are the image pool name and the topology map of the
// matched requirement
func FindPoolAndTopology(topologyPools *[]TopologyConstrainedPool,
accessibilityRequirements *csi.TopologyRequirement) (string, map[string]string, error) {
if topologyPools == nil || accessibilityRequirements == nil {
return "", nil, nil
}
// select pool that fits first topology constraint preferred requirements
for _, topology := range accessibilityRequirements.GetPreferred() {
poolName := matchPoolToTopology(topologyPools, topology)
if poolName != "" {
return poolName, topology.GetSegments(), nil
}
}
// If preferred mismatches, check requisite for a fit
for _, topology := range accessibilityRequirements.GetRequisite() {
poolName := matchPoolToTopology(topologyPools, topology)
if poolName != "" {
return poolName, topology.GetSegments(), nil
}
}
return "", nil, fmt.Errorf("none of the topology constrained pools matched requested "+
"topology constraints : pools (%+v) requested topology (%+v)",
*topologyPools, *accessibilityRequirements)
}
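A sketch of the selection order above (preferred first, requisite only as a fallback), written as if it lived in this package, for example in a unit test; it assumes the package's existing csi and fmt imports and uses made-up pool and zone names:

func ExampleFindPoolAndTopology() {
    pools := []TopologyConstrainedPool{
        {PoolName: "pool-zone-a", DomainSegments: []topologySegment{
            {DomainLabel: "zone", DomainValue: "zone-a"}}},
        {PoolName: "pool-zone-b", DomainSegments: []topologySegment{
            {DomainLabel: "zone", DomainValue: "zone-b"}}},
    }
    requirement := &csi.TopologyRequirement{
        // Preferred entries are checked first; requisite is consulted only if
        // nothing preferred matches.
        Preferred: []*csi.Topology{
            {Segments: map[string]string{"topology.cephfs.csi.ceph.com/zone": "zone-b"}},
        },
    }
    poolName, topology, err := FindPoolAndTopology(&pools, requirement)
    // Expected: poolName == "pool-zone-b", topology holds the matched segments.
    fmt.Println(poolName, topology, err)
}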
// matchPoolToTopology loops through the passed in pools, and for each pool checks if all
// requested topology segments are present and match the request, returning the name of the
// first pool that matches (or an empty string if none match)
func matchPoolToTopology(topologyPools *[]TopologyConstrainedPool, topology *csi.Topology) string {
domainMap := extractDomainsFromlabels(topology)
// check if any pool matches all the domain keys and values
for _, topologyPool := range *topologyPools {
mismatch := false
// match all pool topology labels to requested topology
for _, segment := range topologyPool.DomainSegments {
if domainValue, ok := domainMap[segment.DomainLabel]; !ok || domainValue != segment.DomainValue {
mismatch = true
break
}
}
if mismatch {
continue
}
return topologyPool.PoolName
}
return ""
}
// extractDomainsFromlabels returns a map of domain names to values, extracted from the passed
// in topology segments, whose keys are of the form [prefix/]<name>
func extractDomainsFromlabels(topology *csi.Topology) map[string]string {
domainMap := make(map[string]string)
for domainKey, value := range topology.GetSegments() {
domainIdx := strings.IndexRune(domainKey, keySeparator)
domain := domainKey[domainIdx+1:]
domainMap[domain] = value
}
return domainMap
}
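For example, assuming keySeparator is the '/' rune implied by the [prefix/]<name> form above, a prefixed segment key reduces to its bare domain name as in this small standalone sketch (values are illustrative):

package main

import (
    "fmt"
    "strings"
)

func main() {
    key := "topology.cephfs.csi.ceph.com/zone"
    fmt.Println(key[strings.IndexRune(key, '/')+1:]) // "zone"
    // An unprefixed key also works: IndexRune returns -1, so the slice starts at 0.
    fmt.Println("zone"[strings.IndexRune("zone", '/')+1:]) // "zone"
}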

View File

@ -81,6 +81,7 @@ type Config struct {
InstanceID string // unique ID distinguishing this instance of Ceph CSI
MetadataStorage string // metadata persistence method [node|k8s_configmap]
PluginPath string // location of cephcsi plugin
DomainLabels string // list of domain labels to read from the node
// cephfs related flags
MountCacheDir string // mount info cache save dir
@ -147,15 +148,19 @@ func ValidateDriverName(driverName string) error {
// GenerateVolID generates a volume ID based on passed in parameters and version, to be returned
// to the CO system
func GenerateVolID(ctx context.Context, monitors string, cr *Credentials, pool, clusterID, objUUID string, volIDVersion uint16) (string, error) {
poolID, err := GetPoolID(ctx, monitors, cr, pool)
if err != nil {
return "", err
func GenerateVolID(ctx context.Context, monitors string, cr *Credentials, locationID int64, pool, clusterID, objUUID string, volIDVersion uint16) (string, error) {
var err error
if locationID == InvalidPoolID {
locationID, err = GetPoolID(ctx, monitors, cr, pool)
if err != nil {
return "", err
}
}
// generate the volume ID to return to the CO system
vi := CSIIdentifier{
LocationID: poolID,
LocationID: locationID,
EncodingVersion: volIDVersion,
ClusterID: clusterID,
ObjectUUID: objUUID,

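A minimal sketch of the two ways a caller might use the changed GenerateVolID signature; it is written as if it imported this util package (import paths for "context" and the ceph-csi util package are assumed), and every argument is a placeholder supplied by the surrounding driver code:

func volIDSketch(ctx context.Context, monitors string, cr *util.Credentials,
    imagePool, clusterID, objUUID string, imagePoolID int64, volIDVersion uint16) (string, error) {
    if imagePoolID != util.InvalidPoolID {
        // Pool ID already known (for example, returned alongside a journal
        // reservation), so GenerateVolID can skip the name-to-ID lookup.
        return util.GenerateVolID(ctx, monitors, cr, imagePoolID, imagePool, clusterID, objUUID, volIDVersion)
    }
    // Pool ID unknown: pass the sentinel and let GenerateVolID resolve it from the pool name.
    return util.GenerateVolID(ctx, monitors, cr, util.InvalidPoolID, imagePool, clusterID, objUUID, volIDVersion)
}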
View File

@ -18,13 +18,19 @@ package util
import (
"context"
"encoding/binary"
"encoding/hex"
"fmt"
"strings"
"github.com/pborman/uuid"
"github.com/pkg/errors"
"k8s.io/klog"
)
// Length of string representation of uuid, xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx is 36 bytes
const uuidEncodedLength = 36
/*
RADOS omaps usage:
@ -103,6 +109,7 @@ const (
defaultSnapshotNamingPrefix string = "csi-snap-"
)
// CSIJournal defines the interface and the required key names for the above RADOS based OMaps
type CSIJournal struct {
// csiDirectory is the name of the CSI volumes object map that contains CSI volume-name (or
// snapshot name) based keys
@ -122,6 +129,10 @@ type CSIJournal struct {
// of this Ceph volume
csiImageKey string
// pool ID where csiDirectory is maintained, as it can be different from where the ceph volume
// object map is maintained, during topology based provisioning
csiJournalPool string
// source volume name key in per Ceph snapshot object map, containing Ceph source volume uuid
// for which the snapshot was created
cephSnapSourceKey string
@ -141,6 +152,7 @@ func NewCSIVolumeJournal() *CSIJournal {
cephUUIDDirectoryPrefix: "csi.volume.",
csiNameKey: "csi.volname",
csiImageKey: "csi.imagename",
csiJournalPool: "csi.journalpool",
cephSnapSourceKey: "",
namespace: "",
encryptKMSKey: "csi.volume.encryptKMS",
@ -155,6 +167,7 @@ func NewCSISnapshotJournal() *CSIJournal {
cephUUIDDirectoryPrefix: "csi.snap.",
csiNameKey: "csi.snapname",
csiImageKey: "csi.imagename",
csiJournalPool: "csi.journalpool",
cephSnapSourceKey: "csi.source",
namespace: "",
encryptKMSKey: "csi.volume.encryptKMS",
@ -183,6 +196,14 @@ func (cj *CSIJournal) SetNamespace(ns string) {
cj.namespace = ns
}
// ImageData contains image name and stored CSI properties
type ImageData struct {
ImageUUID string
ImagePool string
ImagePoolID int64
ImageAttributes *ImageAttributes
}
/*
CheckReservation checks if given request name contains a valid reservation
- If there is a valid reservation, then the corresponding UUID for the volume/snapshot is returned
@ -198,71 +219,114 @@ Return values:
there was no reservation found
- error: non-nil in case of any errors
*/
func (cj *CSIJournal) CheckReservation(ctx context.Context, monitors string, cr *Credentials, pool, reqName, namePrefix, parentName, kmsConf string) (string, error) {
var snapSource bool
func (cj *CSIJournal) CheckReservation(ctx context.Context, monitors string, cr *Credentials,
journalPool, reqName, namePrefix, parentName, kmsConfig string) (*ImageData, error) {
var (
snapSource bool
objUUID string
savedImagePool string
savedImagePoolID int64 = InvalidPoolID
)
if parentName != "" {
if cj.cephSnapSourceKey == "" {
err := errors.New("invalid request, cephSnapSourceKey is nil")
return "", err
return nil, err
}
snapSource = true
}
// check if request name is already part of the directory omap
objUUID, err := GetOMapValue(ctx, monitors, cr, pool, cj.namespace, cj.csiDirectory,
objUUIDAndPool, err := GetOMapValue(ctx, monitors, cr, journalPool, cj.namespace, cj.csiDirectory,
cj.csiNameKeyPrefix+reqName)
if err != nil {
// error should specifically be not found, for volume to be absent, any other error
// is not conclusive, and we should not proceed
switch err.(type) {
case ErrKeyNotFound, ErrPoolNotFound:
return "", nil
return nil, nil
}
return "", err
return nil, err
}
savedReqName, _, savedReqParentName, savedKms, err := cj.GetObjectUUIDData(ctx, monitors, cr, pool,
// check UUID only encoded value
if len(objUUIDAndPool) == uuidEncodedLength {
objUUID = objUUIDAndPool
savedImagePool = journalPool
} else { // check poolID/UUID encoding; extract the vol UUID and pool name
var buf64 []byte
components := strings.Split(objUUIDAndPool, "/")
objUUID = components[1]
savedImagePoolIDStr := components[0]
buf64, err = hex.DecodeString(savedImagePoolIDStr)
if err != nil {
return nil, err
}
savedImagePoolID = int64(binary.BigEndian.Uint64(buf64))
savedImagePool, err = GetPoolName(ctx, monitors, cr, savedImagePoolID)
if err != nil {
if _, ok := err.(ErrPoolNotFound); ok {
err = cj.UndoReservation(ctx, monitors, cr, journalPool, "", "", reqName)
}
return nil, err
}
}
savedImageAttributes, err := cj.GetImageAttributes(ctx, monitors, cr, savedImagePool,
objUUID, snapSource)
if err != nil {
// error should specifically be not found, for image to be absent, any other error
// is not conclusive, and we should not proceed
if _, ok := err.(ErrKeyNotFound); ok {
err = cj.UndoReservation(ctx, monitors, cr, pool, cj.GetNameForUUID(namePrefix, objUUID, snapSource), reqName)
err = cj.UndoReservation(ctx, monitors, cr, journalPool, savedImagePool,
cj.GetNameForUUID(namePrefix, objUUID, snapSource), reqName)
}
return "", err
return nil, err
}
// check if UUID key points back to the request name
if savedReqName != reqName {
if savedImageAttributes.RequestName != reqName {
// NOTE: This should never be possible, hence no cleanup, but log error
// and return, as cleanup may need to occur manually!
return "", fmt.Errorf("internal state inconsistent, omap names mismatch,"+
return nil, fmt.Errorf("internal state inconsistent, omap names mismatch,"+
" request name (%s) volume UUID (%s) volume omap name (%s)",
reqName, objUUID, savedReqName)
reqName, objUUID, savedImageAttributes.RequestName)
}
if kmsConf != "" {
if savedKms != kmsConf {
return "", fmt.Errorf("internal state inconsistent, omap encryption KMS"+
if kmsConfig != "" {
if savedImageAttributes.KmsID != kmsConfig {
return nil, fmt.Errorf("internal state inconsistent, omap encryption KMS"+
" mismatch, request KMS (%s) volume UUID (%s) volume omap KMS (%s)",
kmsConf, objUUID, savedKms)
kmsConfig, objUUID, savedImageAttributes.KmsID)
}
}
// TODO: skipping due to excessive poolID to poolname call, also this should never happen!
// check if journal pool points back to the passed in journal pool
// if savedJournalPoolID != journalPoolID {
if snapSource {
// check if source UUID key points back to the parent volume passed in
if savedReqParentName != parentName {
if savedImageAttributes.SourceName != parentName {
// NOTE: This can happen if there is a snapname conflict, and we already have a snapshot
// with the same name pointing to a different UUID as the source
err = fmt.Errorf("snapname points to different volume, request name (%s)"+
" source name (%s) saved source name (%s)",
reqName, parentName, savedReqParentName)
return "", ErrSnapNameConflict{reqName, err}
reqName, parentName, savedImageAttributes.SourceName)
return nil, ErrSnapNameConflict{reqName, err}
}
}
return objUUID, nil
imageData := &ImageData{
ImageUUID: objUUID,
ImagePool: savedImagePool,
ImagePoolID: savedImagePoolID,
ImageAttributes: savedImageAttributes,
}
return imageData, nil
}
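The name-key value decoded above is either a bare 36 character UUID or "<hex-encoded big-endian pool ID>/<UUID>"; a standalone round-trip sketch of that encoding, using made-up values:

package main

import (
    "encoding/binary"
    "encoding/hex"
    "fmt"
    "strings"
)

func main() {
    // Encoding side (mirrors ReserveName below): pool ID 42 plus an image UUID.
    buf := make([]byte, 8)
    binary.BigEndian.PutUint64(buf, 42)
    val := hex.EncodeToString(buf) + "/" + "8a2e7d5f-1b3c-4d6e-9f00-123456789abc"

    // Decoding side (mirrors CheckReservation above): split on '/', then decode
    // the big-endian pool ID back to an int64.
    parts := strings.Split(val, "/")
    idBytes, err := hex.DecodeString(parts[0])
    if err != nil {
        panic(err)
    }
    fmt.Println(int64(binary.BigEndian.Uint64(idBytes)), parts[1])
    // Output: 42 8a2e7d5f-1b3c-4d6e-9f00-123456789abc
}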
/*
@ -274,30 +338,37 @@ prior to cleaning up the reservation
NOTE: As the function manipulates omaps, it should be called with a lock against the request name
held, to prevent parallel operations from modifying the state of the omaps for this request name.
Input arguments:
- csiJournalPool: Pool name that holds the CSI request name based journal
- volJournalPool: Pool name that holds the image/subvolume and the per-image journal (may be
different if image is created in a topology constrained pool)
*/
func (cj *CSIJournal) UndoReservation(ctx context.Context, monitors string, cr *Credentials, pool, volName, reqName string) error {
func (cj *CSIJournal) UndoReservation(ctx context.Context, monitors string, cr *Credentials,
csiJournalPool, volJournalPool, volName, reqName string) error {
// delete volume UUID omap (first, inverse of create order)
// TODO: Check cases where volName can be empty, and we need to just cleanup the reqName
if len(volName) < 36 {
return fmt.Errorf("unable to parse UUID from %s, too short", volName)
}
if volName != "" {
if len(volName) < 36 {
return fmt.Errorf("unable to parse UUID from %s, too short", volName)
}
imageUUID := volName[len(volName)-36:]
if valid := uuid.Parse(imageUUID); valid == nil {
return fmt.Errorf("failed parsing UUID in %s", volName)
}
imageUUID := volName[len(volName)-36:]
if valid := uuid.Parse(imageUUID); valid == nil {
return fmt.Errorf("failed parsing UUID in %s", volName)
}
err := RemoveObject(ctx, monitors, cr, pool, cj.namespace, cj.cephUUIDDirectoryPrefix+imageUUID)
if err != nil {
if _, ok := err.(ErrObjectNotFound); !ok {
klog.Errorf(Log(ctx, "failed removing oMap %s (%s)"), cj.cephUUIDDirectoryPrefix+imageUUID, err)
return err
err := RemoveObject(ctx, monitors, cr, volJournalPool, cj.namespace, cj.cephUUIDDirectoryPrefix+imageUUID)
if err != nil {
if _, ok := err.(ErrObjectNotFound); !ok {
klog.Errorf(Log(ctx, "failed removing oMap %s (%s)"), cj.cephUUIDDirectoryPrefix+imageUUID, err)
return err
}
}
}
// delete the request name key (last, inverse of create order)
err = RemoveOMapKey(ctx, monitors, cr, pool, cj.namespace, cj.csiDirectory,
err := RemoveOMapKey(ctx, monitors, cr, csiJournalPool, cj.namespace, cj.csiDirectory,
cj.csiNameKeyPrefix+reqName)
if err != nil {
klog.Errorf(Log(ctx, "failed removing oMap key %s (%s)"), cj.csiNameKeyPrefix+reqName, err)
@ -346,13 +417,31 @@ pointers to the CSI generated request names.
NOTE: As the function manipulates omaps, it should be called with a lock against the request name
held, to prevent parallel operations from modifying the state of the omaps for this request name.
Input arguments:
- journalPool: Pool where the CSI journal is stored (may be different from the pool where the
image/subvolume is created due to topology constraints)
- journalPoolID: pool ID of the journalPool
- imagePool: Pool where the image/subvolume is created
- imagePoolID: pool ID of the imagePool
- reqName: Name of the volume request received
- namePrefix: Prefix to use when generating the image/subvolume name (suffix is an auto-generated UUID)
- parentName: Name of the parent image/subvolume if reservation is for a snapshot (optional)
- kmsConf: Name of the key management service used to encrypt the image (optional)
Return values:
- string: Contains the UUID that was reserved for the passed in reqName
- string: Contains the image name that was reserved for the passed in reqName
- error: non-nil in case of any errors
*/
func (cj *CSIJournal) ReserveName(ctx context.Context, monitors string, cr *Credentials, pool, reqName, namePrefix, parentName, kmsConf string) (string, string, error) {
var snapSource bool
func (cj *CSIJournal) ReserveName(ctx context.Context, monitors string, cr *Credentials,
journalPool string, journalPoolID int64,
imagePool string, imagePoolID int64,
reqName, namePrefix, parentName, kmsConf string) (string, string, error) {
// TODO: Take in-arg as ImageAttributes?
var (
snapSource bool
nameKeyVal string
)
if parentName != "" {
if cj.cephSnapSourceKey == "" {
@ -366,54 +455,81 @@ func (cj *CSIJournal) ReserveName(ctx context.Context, monitors string, cr *Cred
// NOTE: If any service loss occurs post creation of the UUID directory, and before
// setting the request name key (csiNameKey) to point back to the UUID directory, the
// UUID directory key will be leaked
volUUID, err := reserveOMapName(ctx, monitors, cr, pool, cj.namespace, cj.cephUUIDDirectoryPrefix)
volUUID, err := reserveOMapName(ctx, monitors, cr, imagePool, cj.namespace, cj.cephUUIDDirectoryPrefix)
if err != nil {
return "", "", err
}
imageName := cj.GetNameForUUID(namePrefix, volUUID, snapSource)
// Create request name (csiNameKey) key in csiDirectory and store the UUId based
// volume name into it
err = SetOMapKeyValue(ctx, monitors, cr, pool, cj.namespace, cj.csiDirectory,
cj.csiNameKeyPrefix+reqName, volUUID)
// Create request name (csiNameKey) key in csiDirectory and store the UUID based
// volume name and optionally the image pool location into it
if journalPool != imagePool && imagePoolID != InvalidPoolID {
buf64 := make([]byte, 8)
binary.BigEndian.PutUint64(buf64, uint64(imagePoolID))
poolIDEncodedHex := hex.EncodeToString(buf64)
nameKeyVal = poolIDEncodedHex + "/" + volUUID
} else {
nameKeyVal = volUUID
}
err = SetOMapKeyValue(ctx, monitors, cr, journalPool, cj.namespace, cj.csiDirectory,
cj.csiNameKeyPrefix+reqName, nameKeyVal)
if err != nil {
return "", "", err
}
defer func() {
if err != nil {
klog.Warningf(Log(ctx, "reservation failed for volume: %s"), reqName)
errDefer := cj.UndoReservation(ctx, monitors, cr, pool, imageName, reqName)
errDefer := cj.UndoReservation(ctx, monitors, cr, journalPool, imagePool, imageName, reqName)
if errDefer != nil {
klog.Warningf(Log(ctx, "failed undoing reservation of volume: %s (%v)"), reqName, errDefer)
}
}
}()
// Update UUID directory to store CSI request name and image name
err = SetOMapKeyValue(ctx, monitors, cr, pool, cj.namespace, cj.cephUUIDDirectoryPrefix+volUUID,
// NOTE: the UUID directory is stored in the same pool as the image, which helps determine the
// image attributes and also the CSI journal pool, when only the VolumeID is passed in
// (e.g. DeleteVolume/DeleteSnapshot, or VolID during CreateSnapshot).
// Update UUID directory to store CSI request name
err = SetOMapKeyValue(ctx, monitors, cr, imagePool, cj.namespace, cj.cephUUIDDirectoryPrefix+volUUID,
cj.csiNameKey, reqName)
if err != nil {
return "", "", err
}
err = SetOMapKeyValue(ctx, monitors, cr, pool, cj.namespace, cj.cephUUIDDirectoryPrefix+volUUID,
// Update UUID directory to store image name
err = SetOMapKeyValue(ctx, monitors, cr, imagePool, cj.namespace, cj.cephUUIDDirectoryPrefix+volUUID,
cj.csiImageKey, imageName)
if err != nil {
return "", "", err
}
// Update UUID directory to store encryption values
if kmsConf != "" {
err = SetOMapKeyValue(ctx, monitors, cr, pool, cj.namespace, cj.cephUUIDDirectoryPrefix+volUUID,
err = SetOMapKeyValue(ctx, monitors, cr, imagePool, cj.namespace, cj.cephUUIDDirectoryPrefix+volUUID,
cj.encryptKMSKey, kmsConf)
if err != nil {
return "", "", err
}
}
if journalPool != imagePool && journalPoolID != InvalidPoolID {
buf64 := make([]byte, 8)
binary.BigEndian.PutUint64(buf64, uint64(journalPoolID))
journalPoolIDStr := hex.EncodeToString(buf64)
// Update UUID directory to store the CSI journal pool (stored as a pool ID rather than a name, to be pool rename proof)
err = SetOMapKeyValue(ctx, monitors, cr, imagePool, cj.namespace, cj.cephUUIDDirectoryPrefix+volUUID,
cj.csiJournalPool, journalPoolIDStr)
if err != nil {
return "", "", err
}
}
if snapSource {
// Update UUID directory to store source volume UUID in case of snapshots
err = SetOMapKeyValue(ctx, monitors, cr, pool, cj.namespace, cj.cephUUIDDirectoryPrefix+volUUID,
err = SetOMapKeyValue(ctx, monitors, cr, imagePool, cj.namespace, cj.cephUUIDDirectoryPrefix+volUUID,
cj.cephSnapSourceKey, parentName)
if err != nil {
return "", "", err
@ -423,70 +539,89 @@ func (cj *CSIJournal) ReserveName(ctx context.Context, monitors string, cr *Cred
return volUUID, imageName, nil
}
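Given the number of positional arguments, a sketch of how a caller might invoke the new ReserveName signature, written as if it lived in this package; every identifier is a placeholder supplied by the surrounding driver code:

func reserveSketch(ctx context.Context, monitors string, cr *Credentials, cj *CSIJournal,
    journalPool string, journalPoolID int64, imagePool string, imagePoolID int64,
    reqName, namePrefix string) (string, string, error) {
    // No snapshot parent and no KMS configuration in this sketch.
    return cj.ReserveName(ctx, monitors, cr,
        journalPool, journalPoolID, // pool holding the csiDirectory request-name omap
        imagePool, imagePoolID, // pool where the image and its UUID omap are created
        reqName, namePrefix, "", "")
}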
/*
GetObjectUUIDData fetches all keys from a UUID directory
Return values:
- string: Contains the request name for the passed in UUID
- string: Contains the rbd image name for the passed in UUID
- string: Contains the parent image name for the passed in UUID, if it is a snapshot
- string: Contains encryption KMS, if it is an encrypted image
- error: non-nil in case of any errors
*/
func (cj *CSIJournal) GetObjectUUIDData(ctx context.Context, monitors string, cr *Credentials, pool, objectUUID string, snapSource bool) (string, string, string, string, error) {
var sourceName string
// ImageAttributes contains all CSI stored image attributes, typically as OMap keys
type ImageAttributes struct {
RequestName string // Contains the request name for the passed in UUID
SourceName string // Contains the parent image name for the passed in UUID, if it is a snapshot
ImageName string // Contains the image or subvolume name for the passed in UUID
KmsID string // Contains encryption KMS, if it is an encrypted image
JournalPoolID int64 // Pool ID of the CSI journal pool, stored in big endian format (on-disk data)
}
// GetImageAttributes fetches all keys and their values from a UUID directory, returning an ImageAttributes structure
func (cj *CSIJournal) GetImageAttributes(ctx context.Context, monitors string, cr *Credentials, pool, objectUUID string, snapSource bool) (*ImageAttributes, error) {
var (
err error
imageAttributes *ImageAttributes = &ImageAttributes{}
)
if snapSource && cj.cephSnapSourceKey == "" {
err := errors.New("invalid request, cephSnapSourceKey is nil")
return "", "", "", "", err
err = errors.New("invalid request, cephSnapSourceKey is nil")
return nil, err
}
// TODO: fetch all omap vals in one call (listomapvals), rather than making multiple GetOMapValue calls
requestName, err := GetOMapValue(ctx, monitors, cr, pool, cj.namespace,
imageAttributes.RequestName, err = GetOMapValue(ctx, monitors, cr, pool, cj.namespace,
cj.cephUUIDDirectoryPrefix+objectUUID, cj.csiNameKey)
if err != nil {
return "", "", "", "", err
return nil, err
}
// the image key was added after the initial release, so volumes created before a
// ceph-csi upgrade may not have this key set
imageName, err := GetOMapValue(ctx, monitors, cr, pool, cj.namespace,
imageAttributes.ImageName, err = GetOMapValue(ctx, monitors, cr, pool, cj.namespace,
cj.cephUUIDDirectoryPrefix+objectUUID, cj.csiImageKey)
if err != nil {
// if the key was not found, fall back to the default naming prefix + UUID,
// otherwise return the error
switch err.(type) {
default:
return "", "", "", "", err
return nil, err
case ErrKeyNotFound, ErrPoolNotFound:
}
if snapSource {
imageName = defaultSnapshotNamingPrefix + objectUUID
imageAttributes.ImageName = defaultSnapshotNamingPrefix + objectUUID
} else {
imageName = defaultVolumeNamingPrefix + objectUUID
imageAttributes.ImageName = defaultVolumeNamingPrefix + objectUUID
}
}
encryptionKmsConfig := ""
encryptionKmsConfig, err = GetOMapValue(ctx, monitors, cr, pool, cj.namespace,
imageAttributes.KmsID, err = GetOMapValue(ctx, monitors, cr, pool, cj.namespace,
cj.cephUUIDDirectoryPrefix+objectUUID, cj.encryptKMSKey)
if err != nil {
// ErrKeyNotFound means no encryption KMS was used
switch err.(type) {
default:
return "", "", "", "", fmt.Errorf("OMapVal for %s/%s failed to get encryption KMS value: %s",
return nil, fmt.Errorf("OMapVal for %s/%s failed to get encryption KMS value: %s",
pool, cj.cephUUIDDirectoryPrefix+objectUUID, err)
case ErrKeyNotFound, ErrPoolNotFound:
}
}
journalPoolIDStr, err := GetOMapValue(ctx, monitors, cr, pool, cj.namespace,
cj.cephUUIDDirectoryPrefix+objectUUID, cj.csiJournalPool)
if err != nil {
if _, ok := err.(ErrKeyNotFound); !ok {
return nil, err
}
imageAttributes.JournalPoolID = InvalidPoolID
} else {
var buf64 []byte
buf64, err = hex.DecodeString(journalPoolIDStr)
if err != nil {
return nil, err
}
imageAttributes.JournalPoolID = int64(binary.BigEndian.Uint64(buf64))
}
if snapSource {
sourceName, err = GetOMapValue(ctx, monitors, cr, pool, cj.namespace,
imageAttributes.SourceName, err = GetOMapValue(ctx, monitors, cr, pool, cj.namespace,
cj.cephUUIDDirectoryPrefix+objectUUID, cj.cephSnapSourceKey)
if err != nil {
return "", "", "", "", err
return nil, err
}
}
return requestName, imageName, sourceName, encryptionKmsConfig, nil
return imageAttributes, nil
}
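A sketch of one way the new JournalPoolID field might be consumed, written as if inside this package; the fallback is an assumption based on the fact that the csi.journalpool key is only written when the journal and image pools differ, so its absence is treated as "journal in the image pool":

func journalPoolIDSketch(ctx context.Context, monitors string, cr *Credentials, cj *CSIJournal,
    imagePool, imageUUID string) (int64, error) {
    attrs, err := cj.GetImageAttributes(ctx, monitors, cr, imagePool, imageUUID, false)
    if err != nil {
        return InvalidPoolID, err
    }
    if attrs.JournalPoolID == InvalidPoolID {
        // Key absent: either an older volume or a journal co-located with the
        // image; fall back to the image pool's own ID.
        return GetPoolID(ctx, monitors, cr, imagePool)
    }
    return attrs.JournalPoolID, nil
}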