From 5c4abf8347fde76376b6eb08741460aba28976b2 Mon Sep 17 00:00:00 2001 From: ShyamsundarR Date: Fri, 24 Jan 2020 11:26:56 -0500 Subject: [PATCH] Add topology support to ceph-csi Signed-off-by: ShyamsundarR --- cmd/cephcsi.go | 2 + .../csi-cephfsplugin-provisioner.yaml | 1 + .../cephfs/kubernetes/csi-cephfsplugin.yaml | 5 + .../kubernetes/csi-nodeplugin-rbac.yaml | 34 +++ .../kubernetes/csi-provisioner-rbac.yaml | 6 + .../rbd/kubernetes/csi-nodeplugin-rbac.yaml | 34 +++ .../rbd/kubernetes/csi-provisioner-rbac.yaml | 6 + .../kubernetes/csi-rbdplugin-provisioner.yaml | 1 + deploy/rbd/kubernetes/csi-rbdplugin.yaml | 5 + docs/deploy-cephfs.md | 9 +- docs/deploy-rbd.md | 9 +- e2e/rbd.go | 2 +- examples/rbd/storageclass.yaml | 23 ++ pkg/cephfs/controllerserver.go | 44 ++- pkg/cephfs/driver.go | 24 +- pkg/cephfs/fsjournal.go | 71 +++-- pkg/cephfs/identityserver.go | 7 + pkg/cephfs/volumeoptions.go | 54 +++- pkg/csi-common/driver.go | 6 +- pkg/csi-common/nodeserver-default.go | 7 +- pkg/csi-common/utils.go | 3 +- pkg/rbd/controllerserver.go | 136 +++++---- pkg/rbd/driver.go | 17 +- pkg/rbd/identityserver.go | 7 + pkg/rbd/nodeserver.go | 9 +- pkg/rbd/rbd_journal.go | 109 +++++-- pkg/rbd/rbd_util.go | 85 ++++-- pkg/util/cephcmds.go | 23 ++ pkg/util/topology.go | 255 ++++++++++++++++ pkg/util/util.go | 15 +- pkg/util/voljournal.go | 281 +++++++++++++----- 31 files changed, 1017 insertions(+), 273 deletions(-) create mode 100644 pkg/util/topology.go diff --git a/cmd/cephcsi.go b/cmd/cephcsi.go index 83e8986ed..90f50ff6d 100644 --- a/cmd/cephcsi.go +++ b/cmd/cephcsi.go @@ -58,6 +58,8 @@ func init() { flag.IntVar(&conf.PidLimit, "pidlimit", 0, "the PID limit to configure through cgroups") flag.BoolVar(&conf.IsControllerServer, "controllerserver", false, "start cephcsi controller server") flag.BoolVar(&conf.IsNodeServer, "nodeserver", false, "start cephcsi node server") + flag.StringVar(&conf.DomainLabels, "domainlabels", "", "list of kubernetes node labels, that determines the topology"+ + " domain the node belongs to, separated by ','") // cephfs related flags // marking this as deprecated, remove it in next major release diff --git a/deploy/cephfs/kubernetes/csi-cephfsplugin-provisioner.yaml b/deploy/cephfs/kubernetes/csi-cephfsplugin-provisioner.yaml index c9298beb8..70b6c1497 100644 --- a/deploy/cephfs/kubernetes/csi-cephfsplugin-provisioner.yaml +++ b/deploy/cephfs/kubernetes/csi-cephfsplugin-provisioner.yaml @@ -40,6 +40,7 @@ spec: - "--enable-leader-election=true" - "--leader-election-type=leases" - "--retry-interval-start=500ms" + - "--feature-gates=Topology=true" env: - name: ADDRESS value: unix:///csi/csi-provisioner.sock diff --git a/deploy/cephfs/kubernetes/csi-cephfsplugin.yaml b/deploy/cephfs/kubernetes/csi-cephfsplugin.yaml index cdc851038..adea0ff0c 100644 --- a/deploy/cephfs/kubernetes/csi-cephfsplugin.yaml +++ b/deploy/cephfs/kubernetes/csi-cephfsplugin.yaml @@ -63,6 +63,11 @@ spec: - "--v=5" - "--drivername=cephfs.csi.ceph.com" - "--metadatastorage=k8s_configmap" + # If topology based provisioning is desired, configure required + # node labels representing the nodes topology domain + # and pass the label names below, for CSI to consume and advertize + # its equivalent topology domain + # - "--domainlabels=failure-domain/region,failure-domain/zone" env: - name: POD_IP valueFrom: diff --git a/deploy/cephfs/kubernetes/csi-nodeplugin-rbac.yaml b/deploy/cephfs/kubernetes/csi-nodeplugin-rbac.yaml index a1ee7d1a0..b8d561763 100644 --- a/deploy/cephfs/kubernetes/csi-nodeplugin-rbac.yaml +++ 
b/deploy/cephfs/kubernetes/csi-nodeplugin-rbac.yaml
@@ -3,3 +3,37 @@ apiVersion: v1
 kind: ServiceAccount
 metadata:
   name: cephfs-csi-nodeplugin
+---
+kind: ClusterRole
+apiVersion: rbac.authorization.k8s.io/v1
+metadata:
+  name: cephfs-csi-nodeplugin
+aggregationRule:
+  clusterRoleSelectors:
+    - matchLabels:
+        rbac.cephfs.csi.ceph.com/aggregate-to-cephfs-csi-nodeplugin: "true"
+rules: []
+---
+kind: ClusterRole
+apiVersion: rbac.authorization.k8s.io/v1
+metadata:
+  name: cephfs-csi-nodeplugin-rules
+  labels:
+    rbac.cephfs.csi.ceph.com/aggregate-to-cephfs-csi-nodeplugin: "true"
+rules:
+  - apiGroups: [""]
+    resources: ["nodes"]
+    verbs: ["get"]
+---
+kind: ClusterRoleBinding
+apiVersion: rbac.authorization.k8s.io/v1
+metadata:
+  name: cephfs-csi-nodeplugin
+subjects:
+  - kind: ServiceAccount
+    name: cephfs-csi-nodeplugin
+    namespace: default
+roleRef:
+  kind: ClusterRole
+  name: cephfs-csi-nodeplugin
+  apiGroup: rbac.authorization.k8s.io
diff --git a/deploy/cephfs/kubernetes/csi-provisioner-rbac.yaml b/deploy/cephfs/kubernetes/csi-provisioner-rbac.yaml
index 3eeeb8493..536038297 100644
--- a/deploy/cephfs/kubernetes/csi-provisioner-rbac.yaml
+++ b/deploy/cephfs/kubernetes/csi-provisioner-rbac.yaml
@@ -22,6 +22,9 @@ metadata:
   labels:
     rbac.cephfs.csi.ceph.com/aggregate-to-cephfs-external-provisioner-runner: "true"
 rules:
+  - apiGroups: [""]
+    resources: ["nodes"]
+    verbs: ["get", "list", "watch"]
   - apiGroups: [""]
     resources: ["secrets"]
     verbs: ["get", "list"]
@@ -43,6 +46,9 @@ rules:
   - apiGroups: [""]
     resources: ["persistentvolumeclaims/status"]
     verbs: ["update", "patch"]
+  - apiGroups: ["storage.k8s.io"]
+    resources: ["csinodes"]
+    verbs: ["get", "list", "watch"]
 
 ---
 kind: ClusterRoleBinding
diff --git a/deploy/rbd/kubernetes/csi-nodeplugin-rbac.yaml b/deploy/rbd/kubernetes/csi-nodeplugin-rbac.yaml
index c36d11510..96a553d1b 100644
--- a/deploy/rbd/kubernetes/csi-nodeplugin-rbac.yaml
+++ b/deploy/rbd/kubernetes/csi-nodeplugin-rbac.yaml
@@ -3,3 +3,37 @@ apiVersion: v1
 kind: ServiceAccount
 metadata:
   name: rbd-csi-nodeplugin
+---
+kind: ClusterRole
+apiVersion: rbac.authorization.k8s.io/v1
+metadata:
+  name: rbd-csi-nodeplugin
+aggregationRule:
+  clusterRoleSelectors:
+    - matchLabels:
+        rbac.rbd.csi.ceph.com/aggregate-to-rbd-csi-nodeplugin: "true"
+rules: []
+---
+kind: ClusterRole
+apiVersion: rbac.authorization.k8s.io/v1
+metadata:
+  name: rbd-csi-nodeplugin-rules
+  labels:
+    rbac.rbd.csi.ceph.com/aggregate-to-rbd-csi-nodeplugin: "true"
+rules:
+  - apiGroups: [""]
+    resources: ["nodes"]
+    verbs: ["get"]
+---
+kind: ClusterRoleBinding
+apiVersion: rbac.authorization.k8s.io/v1
+metadata:
+  name: rbd-csi-nodeplugin
+subjects:
+  - kind: ServiceAccount
+    name: rbd-csi-nodeplugin
+    namespace: default
+roleRef:
+  kind: ClusterRole
+  name: rbd-csi-nodeplugin
+  apiGroup: rbac.authorization.k8s.io
diff --git a/deploy/rbd/kubernetes/csi-provisioner-rbac.yaml b/deploy/rbd/kubernetes/csi-provisioner-rbac.yaml
index b37fedbff..a0849681d 100644
--- a/deploy/rbd/kubernetes/csi-provisioner-rbac.yaml
+++ b/deploy/rbd/kubernetes/csi-provisioner-rbac.yaml
@@ -22,6 +22,9 @@ metadata:
   labels:
     rbac.rbd.csi.ceph.com/aggregate-to-rbd-external-provisioner-runner: "true"
 rules:
+  - apiGroups: [""]
+    resources: ["nodes"]
+    verbs: ["get", "list", "watch"]
  - apiGroups: [""]
    resources: ["secrets"]
    verbs: ["get", "list"]
@@ -52,6 +55,9 @@ rules:
  - apiGroups: ["storage.k8s.io"]
    resources: ["volumeattachments"]
    verbs: ["get", "list", "watch", "update", "patch"]
+  - apiGroups: ["storage.k8s.io"]
+    resources: ["csinodes"]
+    
verbs: ["get", "list", "watch"] - apiGroups: ["snapshot.storage.k8s.io"] resources: ["volumesnapshots/status"] verbs: ["update"] diff --git a/deploy/rbd/kubernetes/csi-rbdplugin-provisioner.yaml b/deploy/rbd/kubernetes/csi-rbdplugin-provisioner.yaml index 4f0f571f5..2dca77c43 100644 --- a/deploy/rbd/kubernetes/csi-rbdplugin-provisioner.yaml +++ b/deploy/rbd/kubernetes/csi-rbdplugin-provisioner.yaml @@ -40,6 +40,7 @@ spec: - "--retry-interval-start=500ms" - "--enable-leader-election=true" - "--leader-election-type=leases" + - "--feature-gates=Topology=true" env: - name: ADDRESS value: unix:///csi/csi-provisioner.sock diff --git a/deploy/rbd/kubernetes/csi-rbdplugin.yaml b/deploy/rbd/kubernetes/csi-rbdplugin.yaml index 71dc47fea..8a4ca5eae 100644 --- a/deploy/rbd/kubernetes/csi-rbdplugin.yaml +++ b/deploy/rbd/kubernetes/csi-rbdplugin.yaml @@ -63,6 +63,11 @@ spec: - "--endpoint=$(CSI_ENDPOINT)" - "--v=5" - "--drivername=rbd.csi.ceph.com" + # If topology based provisioning is desired, configure required + # node labels representing the nodes topology domain + # and pass the label names below, for CSI to consume and advertize + # its equivalent topology domain + # - "--domainlabels=failure-domain/region,failure-domain/zone" env: - name: POD_IP valueFrom: diff --git a/docs/deploy-cephfs.md b/docs/deploy-cephfs.md index 64c0ee825..43d6ea260 100644 --- a/docs/deploy-cephfs.md +++ b/docs/deploy-cephfs.md @@ -44,7 +44,7 @@ that should be resolved in v14.2.3. **Available command line arguments:** | Option | Default value | Description | -| ------------------------- | --------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ | +|---------------------------|-----------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | `--endpoint` | `unix://tmp/csi.sock` | CSI endpoint, must be a UNIX socket | | `--drivername` | `cephfs.csi.ceph.com` | Name of the driver (Kubernetes: `provisioner` field in StorageClass must correspond to this value) | | `--nodeid` | _empty_ | This node's ID | @@ -53,13 +53,14 @@ that should be resolved in v14.2.3. | `--pluginpath` | "/var/lib/kubelet/plugins/" | The location of cephcsi plugin on host | | `--metadatastorage` | _empty_ | Points to where older (1.0.0 or older plugin versions) metadata about provisioned volumes are kept, as file or in as k8s configmap (`node` or `k8s_configmap` respectively) | | `--pidlimit` | _0_ | Configure the PID limit in cgroups. The container runtime can restrict the number of processes/tasks which can cause problems while provisioning (or deleting) a large number of volumes. A value of `-1` configures the limit to the maximum, `0` does not configure limits at all. 
| -| `--metricsport` | `8080` | TCP port for liveness metrics requests | +| `--metricsport` | `8080` | TCP port for liveness metrics requests | | `--metricspath` | `/metrics` | Path of prometheus endpoint where metrics will be available | -| `--enablegrpcmetrics` | `false` | [Deprecated] Enable grpc metrics collection and start prometheus server | +| `--enablegrpcmetrics` | `false` | [Deprecated] Enable grpc metrics collection and start prometheus server | | `--polltime` | `60s` | Time interval in between each poll | | `--timeout` | `3s` | Probe timeout in seconds | -| `--histogramoption` | `0.5,2,6` | [Deprecated] Histogram option for grpc metrics, should be comma separated value (ex:= "0.5,2,6" where start=0.5 factor=2, count=6) | +| `--histogramoption` | `0.5,2,6` | [Deprecated] Histogram option for grpc metrics, should be comma separated value (ex:= "0.5,2,6" where start=0.5 factor=2, count=6) | | `--forcecephkernelclient` | `false` | Force enabling Ceph Kernel clients for mounting on kernels < 4.17 | +| `--domainlabels` | _empty_ | Kubernetes node labels to use as CSI domain labels for topology aware provisioning, should be a comma separated value (ex:= "failure-domain/region,failure-domain/zone") | **NOTE:** The parameter `-forcecephkernelclient` enables the Kernel CephFS mounter on kernels < 4.17. diff --git a/docs/deploy-rbd.md b/docs/deploy-rbd.md index 99426aff6..462a30f16 100644 --- a/docs/deploy-rbd.md +++ b/docs/deploy-rbd.md @@ -27,7 +27,7 @@ make image-cephcsi **Available command line arguments:** | Option | Default value | Description | -| --------------------- | --------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ | +|-----------------------|-----------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | `--endpoint` | `unix://tmp/csi.sock` | CSI endpoint, must be a UNIX socket | | `--drivername` | `rbd.csi.ceph.com` | Name of the driver (Kubernetes: `provisioner` field in StorageClass must correspond to this value) | | `--nodeid` | _empty_ | This node's ID | @@ -35,12 +35,13 @@ make image-cephcsi | `--instanceid` | "default" | Unique ID distinguishing this instance of Ceph CSI among other instances, when sharing Ceph clusters across CSI instances for provisioning | | `--metadatastorage` | _empty_ | Points to where legacy (1.0.0 or older plugin versions) metadata about provisioned volumes are kept, as file or in as k8s configmap (`node` or `k8s_configmap` respectively) | | `--pidlimit` | _0_ | Configure the PID limit in cgroups. The container runtime can restrict the number of processes/tasks which can cause problems while provisioning (or deleting) a large number of volumes. A value of `-1` configures the limit to the maximum, `0` does not configure limits at all. 
| -| `--metricsport` | `8080` | TCP port for liveness metrics requests | +| `--metricsport` | `8080` | TCP port for liveness metrics requests | | `--metricspath` | `"/metrics"` | Path of prometheus endpoint where metrics will be available | -| `--enablegrpcmetrics` | `false` | [Deprecated] Enable grpc metrics collection and start prometheus server | +| `--enablegrpcmetrics` | `false` | [Deprecated] Enable grpc metrics collection and start prometheus server | | `--polltime` | `"60s"` | Time interval in between each poll | | `--timeout` | `"3s"` | Probe timeout in seconds | -| `--histogramoption` | `0.5,2,6` | [Deprecated] Histogram option for grpc metrics, should be comma separated value (ex:= "0.5,2,6" where start=0.5 factor=2, count=6) | +| `--histogramoption` | `0.5,2,6` | [Deprecated] Histogram option for grpc metrics, should be comma separated value (ex:= "0.5,2,6" where start=0.5 factor=2, count=6) | +| `--domainlabels` | _empty_ | Kubernetes node labels to use as CSI domain labels for topology aware provisioning, should be a comma separated value (ex:= "failure-domain/region,failure-domain/zone") | **Available volume parameters:** diff --git a/e2e/rbd.go b/e2e/rbd.go index 281a2a307..7222e7e13 100644 --- a/e2e/rbd.go +++ b/e2e/rbd.go @@ -293,7 +293,7 @@ var _ = Describe("RBD", func() { // validate created backend rbd images images := listRBDImages(f) if len(images) != totalCount { - e2elog.Logf("backend image creation not matching pvc count, image count = % pvc count %d", len(images), totalCount) + e2elog.Logf("backend image creation not matching pvc count, image count = %d pvc count %d images found = %+v", len(images), totalCount, images) Fail("validate multiple pvc failed") } diff --git a/examples/rbd/storageclass.yaml b/examples/rbd/storageclass.yaml index b5c58f2ac..61b179a2b 100644 --- a/examples/rbd/storageclass.yaml +++ b/examples/rbd/storageclass.yaml @@ -4,6 +4,10 @@ kind: StorageClass metadata: name: csi-rbd-sc provisioner: rbd.csi.ceph.com +# If topology based provisioning is desired, delayed provisioning of +# PV is required and is enabled using the following attribute +# For further information read TODO +# volumeBindingMode: WaitForFirstConsumer parameters: # String representing a Ceph cluster to provision storage from. # Should be unique across all Ceph clusters in use for provisioning, @@ -48,6 +52,25 @@ parameters: # a unique ID matching KMS ConfigMap. The ID is only used for correlation to # config map entry. # encryptionKMSID: + + # Add topology constrained pools configuration, if topology based pools + # are setup, and topology constrained provisioning is required. 
+ # For further information read TODO + # topologyConstrainedPools: | + # [{"poolName":"pool0", + # "domainSegments":[ + # {"domainLabel":"region","value":"east"}, + # {"domainLabel":"zone","value":"zone1"}]}, + # {"poolName":"pool1", + # "domainSegments":[ + # {"domainLabel":"region","value":"east"}, + # {"domainLabel":"zone","value":"zone2"}]}, + # {"poolName":"pool2", + # "domainSegments":[ + # {"domainLabel":"region","value":"west"}, + # {"domainLabel":"zone","value":"zone1"}]} + # ] + reclaimPolicy: Delete allowVolumeExpansion: true mountOptions: diff --git a/pkg/cephfs/controllerserver.go b/pkg/cephfs/controllerserver.go index 71c825a41..d203a327c 100644 --- a/pkg/cephfs/controllerserver.go +++ b/pkg/cephfs/controllerserver.go @@ -76,7 +76,7 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol } defer cs.VolumeLocks.Release(requestName) - volOptions, err := newVolumeOptions(ctx, requestName, req.GetParameters(), secret) + volOptions, err := newVolumeOptions(ctx, requestName, req, secret) if err != nil { klog.Errorf(util.Log(ctx, "validation and extraction of volume options failed: %v"), err) return nil, status.Error(codes.InvalidArgument, err.Error()) @@ -95,13 +95,20 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol // TODO return error message if requested vol size greater than found volume return error if vID != nil { - return &csi.CreateVolumeResponse{ - Volume: &csi.Volume{ - VolumeId: vID.VolumeID, - CapacityBytes: volOptions.Size, - VolumeContext: req.GetParameters(), - }, - }, nil + volume := &csi.Volume{ + VolumeId: vID.VolumeID, + CapacityBytes: volOptions.Size, + VolumeContext: req.GetParameters(), + } + if volOptions.Topology != nil { + volume.AccessibleTopology = + []*csi.Topology{ + { + Segments: volOptions.Topology, + }, + } + } + return &csi.CreateVolumeResponse{Volume: volume}, nil } // Reservation @@ -128,13 +135,20 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol klog.V(4).Infof(util.Log(ctx, "cephfs: successfully created backing volume named %s for request name %s"), vID.FsSubvolName, requestName) - return &csi.CreateVolumeResponse{ - Volume: &csi.Volume{ - VolumeId: vID.VolumeID, - CapacityBytes: volOptions.Size, - VolumeContext: req.GetParameters(), - }, - }, nil + volume := &csi.Volume{ + VolumeId: vID.VolumeID, + CapacityBytes: volOptions.Size, + VolumeContext: req.GetParameters(), + } + if volOptions.Topology != nil { + volume.AccessibleTopology = + []*csi.Topology{ + { + Segments: volOptions.Topology, + }, + } + } + return &csi.CreateVolumeResponse{Volume: volume}, nil } // deleteVolumeDeprecated is used to delete volumes created using version 1.0.0 of the plugin, diff --git a/pkg/cephfs/driver.go b/pkg/cephfs/driver.go index 27995057d..406f39a37 100644 --- a/pkg/cephfs/driver.go +++ b/pkg/cephfs/driver.go @@ -26,7 +26,6 @@ import ( ) const ( - // volIDVersion is the version number of volume ID encoding scheme volIDVersion uint16 = 1 @@ -81,9 +80,9 @@ func NewControllerServer(d *csicommon.CSIDriver, cachePersister util.CachePersis } // NewNodeServer initialize a node server for ceph CSI driver. 
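// Illustrative sketch only (assumed example label names and values, not part
// of the change itself): the topology map handed to NewNodeServer below is
// built by util.GetTopologyFromDomainLabels from the node labels named in
// --domainlabels. Assuming --domainlabels=failure-domain/region,failure-domain/zone
// and a node labelled failure-domain/region=east, failure-domain/zone=zone1,
// it is expected to look like the map this standalone snippet prints.
package main

import (
	"fmt"
	"strings"
)

func main() {
	driverName := "cephfs.csi.ceph.com"
	// assumed example node labels, matching the --domainlabels value above
	nodeLabels := map[string]string{
		"failure-domain/region": "east",
		"failure-domain/zone":   "zone1",
	}
	topology := map[string]string{}
	for key, value := range nodeLabels {
		// keep only the label name and re-prefix it with "topology.<drivername>/"
		domain := key[strings.IndexRune(key, '/')+1:]
		topology["topology."+driverName+"/"+domain] = value
	}
	fmt.Println(topology)
	// map[topology.cephfs.csi.ceph.com/region:east topology.cephfs.csi.ceph.com/zone:zone1]
}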
-func NewNodeServer(d *csicommon.CSIDriver, t string) *NodeServer { +func NewNodeServer(d *csicommon.CSIDriver, t string, topology map[string]string) *NodeServer { return &NodeServer{ - DefaultNodeServer: csicommon.NewDefaultNodeServer(d, t), + DefaultNodeServer: csicommon.NewDefaultNodeServer(d, t, topology), VolumeLocks: util.NewVolumeLocks(), } } @@ -91,14 +90,17 @@ func NewNodeServer(d *csicommon.CSIDriver, t string) *NodeServer { // Run start a non-blocking grpc controller,node and identityserver for // ceph CSI driver which can serve multiple parallel requests func (fs *Driver) Run(conf *util.Config, cachePersister util.CachePersister) { + var err error + var topology map[string]string + // Configuration PluginFolder = conf.PluginPath - if err := loadAvailableMounters(conf); err != nil { + if err = loadAvailableMounters(conf); err != nil { klog.Fatalf("cephfs: failed to load ceph mounters: %v", err) } - if err := util.WriteCephConfig(); err != nil { + if err = util.WriteCephConfig(); err != nil { klog.Fatalf("failed to write ceph configuration file: %v", err) } @@ -137,14 +139,22 @@ func (fs *Driver) Run(conf *util.Config, cachePersister util.CachePersister) { fs.is = NewIdentityServer(fs.cd) if conf.IsNodeServer { - fs.ns = NewNodeServer(fs.cd, conf.Vtype) + topology, err = util.GetTopologyFromDomainLabels(conf.DomainLabels, conf.NodeID, conf.DriverName) + if err != nil { + klog.Fatalln(err) + } + fs.ns = NewNodeServer(fs.cd, conf.Vtype, topology) } if conf.IsControllerServer { fs.cs = NewControllerServer(fs.cd, cachePersister) } if !conf.IsControllerServer && !conf.IsNodeServer { - fs.ns = NewNodeServer(fs.cd, conf.Vtype) + topology, err = util.GetTopologyFromDomainLabels(conf.DomainLabels, conf.NodeID, conf.DriverName) + if err != nil { + klog.Fatalln(err) + } + fs.ns = NewNodeServer(fs.cd, conf.Vtype, topology) fs.cs = NewControllerServer(fs.cd, cachePersister) } diff --git a/pkg/cephfs/fsjournal.go b/pkg/cephfs/fsjournal.go index c8fd4340e..b462cd05d 100644 --- a/pkg/cephfs/fsjournal.go +++ b/pkg/cephfs/fsjournal.go @@ -46,10 +46,7 @@ request name lock, and hence any stale omaps are leftovers from incomplete trans hence safe to garbage collect. 
*/ func checkVolExists(ctx context.Context, volOptions *volumeOptions, secret map[string]string) (*volumeIdentifier, error) { - var ( - vi util.CSIIdentifier - vid volumeIdentifier - ) + var vid volumeIdentifier cr, err := util.NewAdminCredentials(secret) if err != nil { @@ -57,41 +54,36 @@ func checkVolExists(ctx context.Context, volOptions *volumeOptions, secret map[s } defer cr.DeleteCredentials() - imageUUID, err := volJournal.CheckReservation(ctx, volOptions.Monitors, cr, + imageData, err := volJournal.CheckReservation(ctx, volOptions.Monitors, cr, volOptions.MetadataPool, volOptions.RequestName, volOptions.NamePrefix, "", "") if err != nil { return nil, err } - if imageUUID == "" { + if imageData == nil { return nil, nil } - - // now that we now that the reservation exists, let's get the volume name from - // the omap - _, vid.FsSubvolName, _, _, err = volJournal.GetObjectUUIDData(ctx, volOptions.Monitors, cr, - volOptions.MetadataPool, imageUUID, false) - if err != nil { - return nil, err - } + imageUUID := imageData.ImageUUID + vid.FsSubvolName = imageData.ImageAttributes.ImageName _, err = getVolumeRootPathCeph(ctx, volOptions, cr, volumeID(vid.FsSubvolName)) if err != nil { if _, ok := err.(ErrVolumeNotFound); ok { - err = volJournal.UndoReservation(ctx, volOptions.Monitors, cr, volOptions.MetadataPool, vid.FsSubvolName, volOptions.RequestName) + err = volJournal.UndoReservation(ctx, volOptions.Monitors, cr, volOptions.MetadataPool, + volOptions.MetadataPool, vid.FsSubvolName, volOptions.RequestName) return nil, err } return nil, err } + + // check if topology constraints match what is found + // TODO: we need an API to fetch subvolume attributes (size/datapool and others), based + // on which we can evaluate which topology this belongs to. + // TODO: CephFS topology support is postponed till we get the same // TODO: size checks // found a volume already available, process and return it! 
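// Illustrative sketch only: a trimmed-down, self-contained version of the pool
// selection that the new util.FindPoolAndTopology / util.MatchTopologyForPool
// helpers (added in pkg/util/topology.go later in this patch) perform against
// the storage class topologyConstrainedPools parameter. Pool names, domain
// names and values below are assumed examples taken from the sample storage class.
package main

import (
	"fmt"
	"strings"
)

type segment struct{ DomainLabel, Value string }

type constrainedPool struct {
	PoolName string
	Segments []segment
}

// findPool returns the first pool whose domain segments all match the
// requested topology segments (keys arrive as "topology.<drivername>/<name>")
func findPool(pools []constrainedPool, requested map[string]string) string {
	domains := map[string]string{}
	for key, value := range requested {
		// strip the "topology.<drivername>/" prefix, as extractDomainsFromlabels does
		domains[key[strings.IndexRune(key, '/')+1:]] = value
	}
	for _, pool := range pools {
		match := true
		for _, s := range pool.Segments {
			if domains[s.DomainLabel] != s.Value {
				match = false
				break
			}
		}
		if match {
			return pool.PoolName
		}
	}
	return ""
}

func main() {
	pools := []constrainedPool{
		{"pool0", []segment{{"region", "east"}, {"zone", "zone1"}}},
		{"pool1", []segment{{"region", "east"}, {"zone", "zone2"}}},
	}
	// preferred topology segments as sent by the provisioner (assumed driver name)
	preferred := map[string]string{
		"topology.rbd.csi.ceph.com/region": "east",
		"topology.rbd.csi.ceph.com/zone":   "zone2",
	}
	fmt.Println(findPool(pools, preferred)) // pool1
}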
- vi = util.CSIIdentifier{ - LocationID: volOptions.FscID, - EncodingVersion: volIDVersion, - ClusterID: volOptions.ClusterID, - ObjectUUID: imageUUID, - } - vid.VolumeID, err = vi.ComposeCSIID() + vid.VolumeID, err = util.GenerateVolID(ctx, volOptions.Monitors, cr, volOptions.FscID, + "", volOptions.ClusterID, imageUUID, volIDVersion) if err != nil { return nil, err } @@ -111,16 +103,29 @@ func undoVolReservation(ctx context.Context, volOptions *volumeOptions, vid volu defer cr.DeleteCredentials() err = volJournal.UndoReservation(ctx, volOptions.Monitors, cr, volOptions.MetadataPool, - vid.FsSubvolName, volOptions.RequestName) + volOptions.MetadataPool, vid.FsSubvolName, volOptions.RequestName) return err } +func updateTopologyConstraints(volOpts *volumeOptions) error { + // update request based on topology constrained parameters (if present) + poolName, topology, err := util.FindPoolAndTopology(volOpts.TopologyPools, volOpts.TopologyRequirement) + if err != nil { + return err + } + if poolName != "" { + volOpts.Pool = poolName + volOpts.Topology = topology + } + + return nil +} + // reserveVol is a helper routine to request a UUID reservation for the CSI VolumeName and, // to generate the volume identifier for the reserved UUID func reserveVol(ctx context.Context, volOptions *volumeOptions, secret map[string]string) (*volumeIdentifier, error) { var ( - vi util.CSIIdentifier vid volumeIdentifier imageUUID string err error @@ -132,20 +137,20 @@ func reserveVol(ctx context.Context, volOptions *volumeOptions, secret map[strin } defer cr.DeleteCredentials() - imageUUID, vid.FsSubvolName, err = volJournal.ReserveName(ctx, volOptions.Monitors, cr, - volOptions.MetadataPool, volOptions.RequestName, volOptions.NamePrefix, "", "") + err = updateTopologyConstraints(volOptions) + if err != nil { + return nil, err + } + + imageUUID, vid.FsSubvolName, err = volJournal.ReserveName(ctx, volOptions.Monitors, cr, volOptions.MetadataPool, util.InvalidPoolID, + volOptions.MetadataPool, util.InvalidPoolID, volOptions.RequestName, volOptions.NamePrefix, "", "") if err != nil { return nil, err } // generate the volume ID to return to the CO system - vi = util.CSIIdentifier{ - LocationID: volOptions.FscID, - EncodingVersion: volIDVersion, - ClusterID: volOptions.ClusterID, - ObjectUUID: imageUUID, - } - vid.VolumeID, err = vi.ComposeCSIID() + vid.VolumeID, err = util.GenerateVolID(ctx, volOptions.Monitors, cr, volOptions.FscID, + "", volOptions.ClusterID, imageUUID, volIDVersion) if err != nil { return nil, err } diff --git a/pkg/cephfs/identityserver.go b/pkg/cephfs/identityserver.go index 3ee357350..ab560cbf6 100644 --- a/pkg/cephfs/identityserver.go +++ b/pkg/cephfs/identityserver.go @@ -48,6 +48,13 @@ func (is *IdentityServer) GetPluginCapabilities(ctx context.Context, req *csi.Ge }, }, }, + { + Type: &csi.PluginCapability_Service_{ + Service: &csi.PluginCapability_Service{ + Type: csi.PluginCapability_Service_VOLUME_ACCESSIBILITY_CONSTRAINTS, + }, + }, + }, }, }, nil } diff --git a/pkg/cephfs/volumeoptions.go b/pkg/cephfs/volumeoptions.go index bc717566e..93acbac5c 100644 --- a/pkg/cephfs/volumeoptions.go +++ b/pkg/cephfs/volumeoptions.go @@ -21,26 +21,30 @@ import ( "fmt" "strconv" + "github.com/container-storage-interface/spec/lib/go/csi" "github.com/pkg/errors" "github.com/ceph/ceph-csi/pkg/util" ) type volumeOptions struct { - RequestName string - NamePrefix string - Size int64 - ClusterID string - FsName string - FscID int64 - MetadataPool string - Monitors string `json:"monitors"` - Pool string 
`json:"pool"` - RootPath string `json:"rootPath"` - Mounter string `json:"mounter"` - ProvisionVolume bool `json:"provisionVolume"` - KernelMountOptions string `json:"kernelMountOptions"` - FuseMountOptions string `json:"fuseMountOptions"` + TopologyPools *[]util.TopologyConstrainedPool + TopologyRequirement *csi.TopologyRequirement + Topology map[string]string + RequestName string + NamePrefix string + Size int64 + ClusterID string + FsName string + FscID int64 + MetadataPool string + Monitors string `json:"monitors"` + Pool string `json:"pool"` + RootPath string `json:"rootPath"` + Mounter string `json:"mounter"` + ProvisionVolume bool `json:"provisionVolume"` + KernelMountOptions string `json:"kernelMountOptions"` + FuseMountOptions string `json:"fuseMountOptions"` } func validateNonEmptyField(field, fieldName string) error { @@ -127,12 +131,15 @@ func getMonsAndClusterID(options map[string]string) (string, string, error) { // newVolumeOptions generates a new instance of volumeOptions from the provided // CSI request parameters -func newVolumeOptions(ctx context.Context, requestName string, volOptions, secret map[string]string) (*volumeOptions, error) { +func newVolumeOptions(ctx context.Context, requestName string, req *csi.CreateVolumeRequest, + secret map[string]string) (*volumeOptions, error) { var ( opts volumeOptions err error ) + volOptions := req.GetParameters() + opts.Monitors, opts.ClusterID, err = getMonsAndClusterID(volOptions) if err != nil { return nil, err @@ -176,6 +183,19 @@ func newVolumeOptions(ctx context.Context, requestName string, volOptions, secre return nil, err } + // store topology information from the request + opts.TopologyPools, opts.TopologyRequirement, err = util.GetTopologyFromRequest(req) + if err != nil { + return nil, err + } + + // TODO: we need an API to fetch subvolume attributes (size/datapool and others), based + // on which we can evaluate which topology this belongs to. + // CephFS tracker: https://tracker.ceph.com/issues/44277 + if opts.TopologyPools != nil { + return nil, errors.New("topology based provisioning is not supported for CephFS backed volumes") + } + opts.ProvisionVolume = true return &opts, nil @@ -221,11 +241,13 @@ func newVolumeOptionsFromVolID(ctx context.Context, volID string, volOpt, secret return nil, nil, err } - volOptions.RequestName, vid.FsSubvolName, _, _, err = volJournal.GetObjectUUIDData(ctx, volOptions.Monitors, cr, + imageAttributes, err := volJournal.GetImageAttributes(ctx, volOptions.Monitors, cr, volOptions.MetadataPool, vi.ObjectUUID, false) if err != nil { return nil, nil, err } + volOptions.RequestName = imageAttributes.RequestName + vid.FsSubvolName = imageAttributes.ImageName if volOpt != nil { if err = extractOptionalOption(&volOptions.Pool, "pool", volOpt); err != nil { diff --git a/pkg/csi-common/driver.go b/pkg/csi-common/driver.go index 5a7b92842..db13bc530 100644 --- a/pkg/csi-common/driver.go +++ b/pkg/csi-common/driver.go @@ -30,8 +30,10 @@ type CSIDriver struct { name string nodeID string version string - cap []*csi.ControllerServiceCapability - vc []*csi.VolumeCapability_AccessMode + // topology constraints that this nodeserver will advertise + topology map[string]string + cap []*csi.ControllerServiceCapability + vc []*csi.VolumeCapability_AccessMode } // NewCSIDriver Creates a NewCSIDriver object. 
Assumes vendor diff --git a/pkg/csi-common/nodeserver-default.go b/pkg/csi-common/nodeserver-default.go index 0ebbbf9aa..031189360 100644 --- a/pkg/csi-common/nodeserver-default.go +++ b/pkg/csi-common/nodeserver-default.go @@ -57,8 +57,13 @@ func (ns *DefaultNodeServer) NodeExpandVolume(ctx context.Context, req *csi.Node func (ns *DefaultNodeServer) NodeGetInfo(ctx context.Context, req *csi.NodeGetInfoRequest) (*csi.NodeGetInfoResponse, error) { klog.V(5).Infof(util.Log(ctx, "Using default NodeGetInfo")) + csiTopology := &csi.Topology{ + Segments: ns.Driver.topology, + } + return &csi.NodeGetInfoResponse{ - NodeId: ns.Driver.nodeID, + NodeId: ns.Driver.nodeID, + AccessibleTopology: csiTopology, }, nil } diff --git a/pkg/csi-common/utils.go b/pkg/csi-common/utils.go index 510eafb5e..6eca28afb 100644 --- a/pkg/csi-common/utils.go +++ b/pkg/csi-common/utils.go @@ -48,7 +48,8 @@ func NewVolumeCapabilityAccessMode(mode csi.VolumeCapability_AccessMode_Mode) *c } // NewDefaultNodeServer initializes default node server -func NewDefaultNodeServer(d *CSIDriver, t string) *DefaultNodeServer { +func NewDefaultNodeServer(d *CSIDriver, t string, topology map[string]string) *DefaultNodeServer { + d.topology = topology return &DefaultNodeServer{ Driver: d, Type: t, diff --git a/pkg/rbd/controllerserver.go b/pkg/rbd/controllerserver.go index 4f9805f41..f08535d2a 100644 --- a/pkg/rbd/controllerserver.go +++ b/pkg/rbd/controllerserver.go @@ -83,7 +83,7 @@ func (cs *ControllerServer) parseVolCreateRequest(ctx context.Context, req *csi. isMultiNode := false isBlock := false for _, cap := range req.VolumeCapabilities { - // RO modes need to be handled indepedently (ie right now even if access mode is RO, they'll be RW upon attach) + // RO modes need to be handled independently (ie right now even if access mode is RO, they'll be RW upon attach) if cap.GetAccessMode().GetMode() == csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER { isMultiNode = true } @@ -114,6 +114,16 @@ func (cs *ControllerServer) parseVolCreateRequest(ctx context.Context, req *csi. 
// always round up the request size in bytes to the nearest MiB/GiB rbdVol.VolSize = util.RoundOffBytes(volSizeBytes) + // start with pool the same as journal pool, in case there is a topology + // based split, pool for the image will be updated subsequently + rbdVol.JournalPool = rbdVol.Pool + + // store topology information from the request + rbdVol.TopologyPools, rbdVol.TopologyRequirement, err = util.GetTopologyFromRequest(req) + if err != nil { + return nil, status.Error(codes.InvalidArgument, err.Error()) + } + // NOTE: rbdVol does not contain VolID and RbdImageName populated, everything // else is populated post create request parsing return rbdVol, nil @@ -163,17 +173,32 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol } } - return &csi.CreateVolumeResponse{ - Volume: &csi.Volume{ - VolumeId: rbdVol.VolID, - CapacityBytes: rbdVol.VolSize, - VolumeContext: req.GetParameters(), - ContentSource: req.GetVolumeContentSource(), - }, - }, nil + volumeContext := req.GetParameters() + volumeContext["pool"] = rbdVol.Pool + volumeContext["journalPool"] = rbdVol.JournalPool + volume := &csi.Volume{ + VolumeId: rbdVol.VolID, + CapacityBytes: rbdVol.VolSize, + VolumeContext: volumeContext, + ContentSource: req.GetVolumeContentSource(), + } + if rbdVol.Topology != nil { + volume.AccessibleTopology = + []*csi.Topology{ + { + Segments: rbdVol.Topology, + }, + } + } + return &csi.CreateVolumeResponse{Volume: volume}, nil } - err = reserveVol(ctx, rbdVol, cr) + rbdSnap, err := cs.checkSnapshotSource(ctx, req, cr) + if err != nil { + return nil, err + } + + err = reserveVol(ctx, rbdVol, rbdSnap, cr) if err != nil { return nil, status.Error(codes.Internal, err.Error()) } @@ -186,7 +211,7 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol } }() - err = cs.createBackingImage(ctx, rbdVol, req) + err = createBackingImage(ctx, cr, rbdVol, rbdSnap) if err != nil { return nil, err } @@ -205,80 +230,81 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol } } - return &csi.CreateVolumeResponse{ - Volume: &csi.Volume{ - VolumeId: rbdVol.VolID, - CapacityBytes: rbdVol.VolSize, - VolumeContext: req.GetParameters(), - ContentSource: req.GetVolumeContentSource(), - }, - }, nil + volumeContext := req.GetParameters() + volumeContext["pool"] = rbdVol.Pool + volumeContext["journalPool"] = rbdVol.JournalPool + volume := &csi.Volume{ + VolumeId: rbdVol.VolID, + CapacityBytes: rbdVol.VolSize, + VolumeContext: volumeContext, + ContentSource: req.GetVolumeContentSource(), + } + if rbdVol.Topology != nil { + volume.AccessibleTopology = + []*csi.Topology{ + { + Segments: rbdVol.Topology, + }, + } + } + return &csi.CreateVolumeResponse{Volume: volume}, nil } -func (cs *ControllerServer) createBackingImage(ctx context.Context, rbdVol *rbdVolume, req *csi.CreateVolumeRequest) error { +func createBackingImage(ctx context.Context, cr *util.Credentials, rbdVol *rbdVolume, rbdSnap *rbdSnapshot) error { var err error - // if VolumeContentSource is not nil, this request is for snapshot - if req.VolumeContentSource != nil { - if err = cs.checkSnapshot(ctx, req, rbdVol); err != nil { + if rbdSnap != nil { + err = restoreSnapshot(ctx, rbdVol, rbdSnap, cr) + if err != nil { return err } - } else { - cr, err := util.NewUserCredentials(req.GetSecrets()) - if err != nil { - return status.Error(codes.Internal, err.Error()) - } - defer cr.DeleteCredentials() - err = createImage(ctx, rbdVol, cr) - if err != nil { - klog.Errorf(util.Log(ctx, "failed 
to create volume: %v"), err) - return status.Error(codes.Internal, err.Error()) - } - - klog.V(4).Infof(util.Log(ctx, "created image %s"), rbdVol.RbdImageName) + klog.V(4).Infof(util.Log(ctx, "created volume %s from snapshot %s"), rbdVol.RequestName, rbdSnap.RbdSnapName) + return nil } + err = createImage(ctx, rbdVol, cr) + if err != nil { + klog.Errorf(util.Log(ctx, "failed to create volume: %v"), err) + return status.Error(codes.Internal, err.Error()) + } + + klog.V(4).Infof(util.Log(ctx, "created volume %s backed by image %s"), rbdVol.RequestName, rbdVol.RbdImageName) + return nil } -func (cs *ControllerServer) checkSnapshot(ctx context.Context, req *csi.CreateVolumeRequest, rbdVol *rbdVolume) error { +func (cs *ControllerServer) checkSnapshotSource(ctx context.Context, req *csi.CreateVolumeRequest, + cr *util.Credentials) (*rbdSnapshot, error) { + if req.VolumeContentSource == nil { + return nil, nil + } + snapshot := req.VolumeContentSource.GetSnapshot() if snapshot == nil { - return status.Error(codes.InvalidArgument, "volume Snapshot cannot be empty") + return nil, status.Error(codes.InvalidArgument, "volume Snapshot cannot be empty") } snapshotID := snapshot.GetSnapshotId() if snapshotID == "" { - return status.Error(codes.InvalidArgument, "volume Snapshot ID cannot be empty") + return nil, status.Error(codes.InvalidArgument, "volume Snapshot ID cannot be empty") } - cr, err := util.NewUserCredentials(req.GetSecrets()) - if err != nil { - return status.Error(codes.Internal, err.Error()) - } - defer cr.DeleteCredentials() - rbdSnap := &rbdSnapshot{} - if err = genSnapFromSnapID(ctx, rbdSnap, snapshotID, cr); err != nil { + if err := genSnapFromSnapID(ctx, rbdSnap, snapshotID, cr); err != nil { if _, ok := err.(ErrSnapNotFound); !ok { - return status.Error(codes.Internal, err.Error()) + return nil, status.Error(codes.Internal, err.Error()) } if _, ok := err.(util.ErrPoolNotFound); ok { klog.Errorf(util.Log(ctx, "failed to get backend snapshot for %s: %v"), snapshotID, err) - return status.Error(codes.InvalidArgument, err.Error()) + return nil, status.Error(codes.InvalidArgument, err.Error()) } - return status.Error(codes.InvalidArgument, "missing requested Snapshot ID") + return nil, status.Error(codes.InvalidArgument, "missing requested Snapshot ID") } - err = restoreSnapshot(ctx, rbdVol, rbdSnap, cr) - if err != nil { - return status.Error(codes.Internal, err.Error()) - } - klog.V(4).Infof(util.Log(ctx, "create volume %s from snapshot %s"), req.GetName(), rbdSnap.RbdSnapName) - return nil + return rbdSnap, nil } // DeleteLegacyVolume deletes a volume provisioned using version 1.0.0 of the plugin diff --git a/pkg/rbd/driver.go b/pkg/rbd/driver.go index 00952da56..7726ae730 100644 --- a/pkg/rbd/driver.go +++ b/pkg/rbd/driver.go @@ -77,10 +77,10 @@ func NewControllerServer(d *csicommon.CSIDriver, cachePersister util.CachePersis } // NewNodeServer initialize a node server for rbd CSI driver. 
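// Illustrative sketch only: the topology map passed to NewNodeServer below is
// what DefaultNodeServer.NodeGetInfo (see the csi-common/nodeserver-default.go
// change earlier in this patch) advertises back to the CO. A minimal sketch of
// the expected response, using an assumed node ID and the example
// failure-domain values.
package main

import (
	"fmt"

	"github.com/container-storage-interface/spec/lib/go/csi"
)

func main() {
	resp := &csi.NodeGetInfoResponse{
		NodeId: "worker-1", // assumed example --nodeid
		AccessibleTopology: &csi.Topology{
			Segments: map[string]string{
				"topology.rbd.csi.ceph.com/region": "east",
				"topology.rbd.csi.ceph.com/zone":   "zone1",
			},
		},
	}
	fmt.Printf("%+v\n", resp)
}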
-func NewNodeServer(d *csicommon.CSIDriver, t string) (*NodeServer, error) { +func NewNodeServer(d *csicommon.CSIDriver, t string, topology map[string]string) (*NodeServer, error) { mounter := mount.New("") return &NodeServer{ - DefaultNodeServer: csicommon.NewDefaultNodeServer(d, t), + DefaultNodeServer: csicommon.NewDefaultNodeServer(d, t, topology), mounter: mounter, VolumeLocks: util.NewVolumeLocks(), }, nil @@ -90,6 +90,7 @@ func NewNodeServer(d *csicommon.CSIDriver, t string) (*NodeServer, error) { // rbd CSI driver which can serve multiple parallel requests func (r *Driver) Run(conf *util.Config, cachePersister util.CachePersister) { var err error + var topology map[string]string // Create ceph.conf for use with CLI commands if err = util.WriteCephConfig(); err != nil { @@ -134,7 +135,11 @@ func (r *Driver) Run(conf *util.Config, cachePersister util.CachePersister) { r.ids = NewIdentityServer(r.cd) if conf.IsNodeServer { - r.ns, err = NewNodeServer(r.cd, conf.Vtype) + topology, err = util.GetTopologyFromDomainLabels(conf.DomainLabels, conf.NodeID, conf.DriverName) + if err != nil { + klog.Fatalln(err) + } + r.ns, err = NewNodeServer(r.cd, conf.Vtype, topology) if err != nil { klog.Fatalf("failed to start node server, err %v\n", err) } @@ -144,7 +149,11 @@ func (r *Driver) Run(conf *util.Config, cachePersister util.CachePersister) { r.cs = NewControllerServer(r.cd, cachePersister) } if !conf.IsControllerServer && !conf.IsNodeServer { - r.ns, err = NewNodeServer(r.cd, conf.Vtype) + topology, err = util.GetTopologyFromDomainLabels(conf.DomainLabels, conf.NodeID, conf.DriverName) + if err != nil { + klog.Fatalln(err) + } + r.ns, err = NewNodeServer(r.cd, conf.Vtype, topology) if err != nil { klog.Fatalf("failed to start node server, err %v\n", err) } diff --git a/pkg/rbd/identityserver.go b/pkg/rbd/identityserver.go index 70eaf5d0a..aecc41c83 100644 --- a/pkg/rbd/identityserver.go +++ b/pkg/rbd/identityserver.go @@ -48,6 +48,13 @@ func (is *IdentityServer) GetPluginCapabilities(ctx context.Context, req *csi.Ge }, }, }, + { + Type: &csi.PluginCapability_Service_{ + Service: &csi.PluginCapability_Service{ + Type: csi.PluginCapability_Service_VOLUME_ACCESSIBILITY_CONSTRAINTS, + }, + }, + }, }, }, nil } diff --git a/pkg/rbd/nodeserver.go b/pkg/rbd/nodeserver.go index 8bb2ac562..8359a93a4 100644 --- a/pkg/rbd/nodeserver.go +++ b/pkg/rbd/nodeserver.go @@ -145,13 +145,20 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol } default: var vi util.CSIIdentifier + var imageAttributes *util.ImageAttributes err = vi.DecomposeCSIID(volID) if err != nil { err = fmt.Errorf("error decoding volume ID (%s) (%s)", err, volID) return nil, status.Error(codes.Internal, err.Error()) } - _, volOptions.RbdImageName, _, _, err = volJournal.GetObjectUUIDData(ctx, volOptions.Monitors, cr, volOptions.Pool, vi.ObjectUUID, false) + imageAttributes, err = volJournal.GetImageAttributes(ctx, volOptions.Monitors, cr, + volOptions.Pool, vi.ObjectUUID, false) + if err != nil { + err = fmt.Errorf("error fetching image attributes for volume ID (%s) (%s)", err, volID) + return nil, status.Error(codes.Internal, err.Error()) + } + volOptions.RbdImageName = imageAttributes.ImageName } volOptions.VolID = volID diff --git a/pkg/rbd/rbd_journal.go b/pkg/rbd/rbd_journal.go index 809e37659..d202af5d0 100644 --- a/pkg/rbd/rbd_journal.go +++ b/pkg/rbd/rbd_journal.go @@ -114,36 +114,36 @@ func checkSnapExists(ctx context.Context, rbdSnap *rbdSnapshot, cr *util.Credent return false, err } - snapUUID, err := 
snapJournal.CheckReservation(ctx, rbdSnap.Monitors, cr, rbdSnap.Pool, + snapData, err := snapJournal.CheckReservation(ctx, rbdSnap.Monitors, cr, rbdSnap.JournalPool, rbdSnap.RequestName, rbdSnap.NamePrefix, rbdSnap.RbdImageName, "") if err != nil { return false, err } - if snapUUID == "" { + if snapData == nil { return false, nil } + snapUUID := snapData.ImageUUID + rbdSnap.RbdSnapName = snapData.ImageAttributes.ImageName - // now that we now that the reservation exists, let's get the image name from - // the omap - _, rbdSnap.RbdSnapName, _, _, err = volJournal.GetObjectUUIDData(ctx, rbdSnap.Monitors, cr, - rbdSnap.Pool, snapUUID, false) - if err != nil { - return false, err + // it should never happen that this disagrees, but check + if rbdSnap.Pool != snapData.ImagePool { + return false, fmt.Errorf("stored snapshot pool (%s) and expected snapshot pool (%s) mismatch", + snapData.ImagePool, rbdSnap.Pool) } // Fetch on-disk image attributes err = updateSnapWithImageInfo(ctx, rbdSnap, cr) if err != nil { if _, ok := err.(ErrSnapNotFound); ok { - err = snapJournal.UndoReservation(ctx, rbdSnap.Monitors, cr, rbdSnap.Pool, - rbdSnap.RbdSnapName, rbdSnap.RequestName) + err = snapJournal.UndoReservation(ctx, rbdSnap.Monitors, cr, rbdSnap.JournalPool, + rbdSnap.Pool, rbdSnap.RbdSnapName, rbdSnap.RequestName) return false, err } return false, err } // found a snapshot already available, process and return its information - rbdSnap.SnapID, err = util.GenerateVolID(ctx, rbdSnap.Monitors, cr, rbdSnap.Pool, + rbdSnap.SnapID, err = util.GenerateVolID(ctx, rbdSnap.Monitors, cr, snapData.ImagePoolID, rbdSnap.Pool, rbdSnap.ClusterID, snapUUID, volIDVersion) if err != nil { return false, err @@ -173,22 +173,30 @@ func checkVolExists(ctx context.Context, rbdVol *rbdVolume, cr *util.Credentials if rbdVol.Encrypted { kmsID = rbdVol.KMS.GetID() } - imageUUID, err := volJournal.CheckReservation(ctx, rbdVol.Monitors, cr, rbdVol.Pool, + + imageData, err := volJournal.CheckReservation(ctx, rbdVol.Monitors, cr, rbdVol.JournalPool, rbdVol.RequestName, rbdVol.NamePrefix, "", kmsID) if err != nil { return false, err } - if imageUUID == "" { + if imageData == nil { return false, nil } - // now that we now that the reservation exists, let's get the image name from - // the omap - _, rbdVol.RbdImageName, _, _, err = volJournal.GetObjectUUIDData(ctx, rbdVol.Monitors, cr, - rbdVol.Pool, imageUUID, false) + imageUUID := imageData.ImageUUID + rbdVol.RbdImageName = imageData.ImageAttributes.ImageName + + // check if topology constraints match what is found + rbdVol.Topology, err = util.MatchTopologyForPool(rbdVol.TopologyPools, + rbdVol.TopologyRequirement, imageData.ImagePool) if err != nil { + // TODO check if need any undo operation here, or ErrVolNameConflict return false, err } + // update Pool, if it was topology constrained + if rbdVol.Topology != nil { + rbdVol.Pool = imageData.ImagePool + } // NOTE: Return volsize should be on-disk volsize, not request vol size, so // save it for size checks before fetching image data @@ -197,7 +205,7 @@ func checkVolExists(ctx context.Context, rbdVol *rbdVolume, cr *util.Credentials err = updateVolWithImageInfo(ctx, rbdVol, cr) if err != nil { if _, ok := err.(ErrImageNotFound); ok { - err = volJournal.UndoReservation(ctx, rbdVol.Monitors, cr, rbdVol.Pool, + err = volJournal.UndoReservation(ctx, rbdVol.Monitors, cr, rbdVol.JournalPool, rbdVol.Pool, rbdVol.RbdImageName, rbdVol.RequestName) return false, err } @@ -213,7 +221,7 @@ func checkVolExists(ctx context.Context, rbdVol 
*rbdVolume, cr *util.Credentials // TODO: We should also ensure image features and format is the same // found a volume already available, process and return it! - rbdVol.VolID, err = util.GenerateVolID(ctx, rbdVol.Monitors, cr, rbdVol.Pool, + rbdVol.VolID, err = util.GenerateVolID(ctx, rbdVol.Monitors, cr, imageData.ImagePoolID, rbdVol.Pool, rbdVol.ClusterID, imageUUID, volIDVersion) if err != nil { return false, err @@ -233,13 +241,18 @@ func reserveSnap(ctx context.Context, rbdSnap *rbdSnapshot, cr *util.Credentials err error ) - snapUUID, rbdSnap.RbdSnapName, err = snapJournal.ReserveName(ctx, rbdSnap.Monitors, cr, rbdSnap.Pool, - rbdSnap.RequestName, rbdSnap.NamePrefix, rbdSnap.RbdImageName, "") + journalPoolID, imagePoolID, err := util.GetPoolIDs(ctx, rbdSnap.Monitors, rbdSnap.JournalPool, rbdSnap.Pool, cr) if err != nil { return err } - rbdSnap.SnapID, err = util.GenerateVolID(ctx, rbdSnap.Monitors, cr, rbdSnap.Pool, + snapUUID, rbdSnap.RbdSnapName, err = snapJournal.ReserveName(ctx, rbdSnap.Monitors, cr, rbdSnap.JournalPool, journalPoolID, + rbdSnap.Pool, imagePoolID, rbdSnap.RequestName, rbdSnap.NamePrefix, rbdSnap.RbdImageName, "") + if err != nil { + return err + } + + rbdSnap.SnapID, err = util.GenerateVolID(ctx, rbdSnap.Monitors, cr, imagePoolID, rbdSnap.Pool, rbdSnap.ClusterID, snapUUID, volIDVersion) if err != nil { return err @@ -251,26 +264,66 @@ func reserveSnap(ctx context.Context, rbdSnap *rbdSnapshot, cr *util.Credentials return nil } +func updateTopologyConstraints(rbdVol *rbdVolume, rbdSnap *rbdSnapshot) error { + var err error + if rbdSnap != nil { + // check if topology constraints matches snapshot pool + rbdVol.Topology, err = util.MatchTopologyForPool(rbdVol.TopologyPools, + rbdVol.TopologyRequirement, rbdSnap.Pool) + if err != nil { + return err + } + + // update Pool, if it was topology constrained + if rbdVol.Topology != nil { + rbdVol.Pool = rbdSnap.Pool + } + + return nil + } + // update request based on topology constrained parameters (if present) + poolName, topology, err := util.FindPoolAndTopology(rbdVol.TopologyPools, rbdVol.TopologyRequirement) + if err != nil { + return err + } + if poolName != "" { + rbdVol.Pool = poolName + rbdVol.Topology = topology + } + + return nil +} + // reserveVol is a helper routine to request a rbdVolume name reservation and generate the // volume ID for the generated name -func reserveVol(ctx context.Context, rbdVol *rbdVolume, cr *util.Credentials) error { +func reserveVol(ctx context.Context, rbdVol *rbdVolume, rbdSnap *rbdSnapshot, cr *util.Credentials) error { var ( imageUUID string err error ) + err = updateTopologyConstraints(rbdVol, rbdSnap) + if err != nil { + return err + } + + journalPoolID, imagePoolID, err := util.GetPoolIDs(ctx, rbdVol.Monitors, rbdVol.JournalPool, rbdVol.Pool, cr) + if err != nil { + return err + } + kmsID := "" if rbdVol.Encrypted { kmsID = rbdVol.KMS.GetID() } - imageUUID, rbdVol.RbdImageName, err = volJournal.ReserveName(ctx, rbdVol.Monitors, cr, rbdVol.Pool, - rbdVol.RequestName, rbdVol.NamePrefix, "", kmsID) + imageUUID, rbdVol.RbdImageName, err = volJournal.ReserveName(ctx, rbdVol.Monitors, cr, rbdVol.JournalPool, journalPoolID, + rbdVol.Pool, imagePoolID, rbdVol.RequestName, rbdVol.NamePrefix, "", kmsID) if err != nil { return err } - rbdVol.VolID, err = util.GenerateVolID(ctx, rbdVol.Monitors, cr, rbdVol.Pool, + rbdVol.VolID, err = util.GenerateVolID(ctx, rbdVol.Monitors, cr, imagePoolID, rbdVol.Pool, rbdVol.ClusterID, imageUUID, volIDVersion) if err != nil { return err @@ -284,7 
+337,7 @@ func reserveVol(ctx context.Context, rbdVol *rbdVolume, cr *util.Credentials) er // undoSnapReservation is a helper routine to undo a name reservation for rbdSnapshot func undoSnapReservation(ctx context.Context, rbdSnap *rbdSnapshot, cr *util.Credentials) error { - err := snapJournal.UndoReservation(ctx, rbdSnap.Monitors, cr, rbdSnap.Pool, + err := snapJournal.UndoReservation(ctx, rbdSnap.Monitors, cr, rbdSnap.JournalPool, rbdSnap.Pool, rbdSnap.RbdSnapName, rbdSnap.RequestName) return err @@ -292,7 +345,7 @@ func undoSnapReservation(ctx context.Context, rbdSnap *rbdSnapshot, cr *util.Cre // undoVolReservation is a helper routine to undo a name reservation for rbdVolume func undoVolReservation(ctx context.Context, rbdVol *rbdVolume, cr *util.Credentials) error { - err := volJournal.UndoReservation(ctx, rbdVol.Monitors, cr, rbdVol.Pool, + err := volJournal.UndoReservation(ctx, rbdVol.Monitors, cr, rbdVol.JournalPool, rbdVol.Pool, rbdVol.RbdImageName, rbdVol.RequestName) return err diff --git a/pkg/rbd/rbd_util.go b/pkg/rbd/rbd_util.go index c69218879..937b265a6 100644 --- a/pkg/rbd/rbd_util.go +++ b/pkg/rbd/rbd_util.go @@ -31,6 +31,7 @@ import ( "github.com/ceph/ceph-csi/pkg/util" "github.com/ceph/go-ceph/rados" librbd "github.com/ceph/go-ceph/rbd" + "github.com/container-storage-interface/spec/lib/go/csi" "github.com/golang/protobuf/ptypes" "github.com/golang/protobuf/ptypes/timestamp" "github.com/pborman/uuid" @@ -69,24 +70,33 @@ type rbdVolume struct { // JSON tag as it is not stashed in JSON encoded config maps in v1.0.0 // VolName and MonValueFromSecret are retained from older plugin versions (<= 1.0.0) // for backward compatibility reasons - RbdImageName string - NamePrefix string - VolID string `json:"volID"` - Monitors string `json:"monitors"` - Pool string `json:"pool"` - DataPool string - ImageFeatures string `json:"imageFeatures"` - AdminID string `json:"adminId"` - UserID string `json:"userId"` - Mounter string `json:"mounter"` - ClusterID string `json:"clusterId"` - RequestName string - VolName string `json:"volName"` - MonValueFromSecret string `json:"monValueFromSecret"` - VolSize int64 `json:"volSize"` - DisableInUseChecks bool `json:"disableInUseChecks"` - Encrypted bool - KMS util.EncryptionKMS + // JournalPool is the ceph pool in which the CSI Journal is stored + // Pool is where the image journal and image is stored, and could be the same as `JournalPool` + // (retained as Pool instead of renaming to ImagePool or such, as this is referenced in the code extensively) + // DataPool is where the data for images in `Pool` are stored, this is used as the `--data-pool` + // argument when the pool is created, and is not used anywhere else + TopologyPools *[]util.TopologyConstrainedPool + TopologyRequirement *csi.TopologyRequirement + Topology map[string]string + RbdImageName string + NamePrefix string + VolID string `json:"volID"` + Monitors string `json:"monitors"` + JournalPool string + Pool string `json:"pool"` + DataPool string + ImageFeatures string `json:"imageFeatures"` + AdminID string `json:"adminId"` + UserID string `json:"userId"` + Mounter string `json:"mounter"` + ClusterID string `json:"clusterId"` + RequestName string + VolName string `json:"volName"` + MonValueFromSecret string `json:"monValueFromSecret"` + VolSize int64 `json:"volSize"` + DisableInUseChecks bool `json:"disableInUseChecks"` + Encrypted bool + KMS util.EncryptionKMS // connection conn *rados.Conn @@ -99,12 +109,15 @@ type rbdSnapshot struct { // RbdSnapName is the name of the RBD 
snapshot backing this rbdSnapshot // SnapID is the snapshot ID that is exchanged with CSI drivers, identifying this rbdSnapshot // RequestName is the CSI generated snapshot name for the rbdSnapshot + // JournalPool is the ceph pool in which the CSI snapshot Journal is stored + // Pool is where the image snapshot journal and snapshot is stored, and could be the same as `JournalPool` SourceVolumeID string RbdImageName string NamePrefix string RbdSnapName string SnapID string Monitors string + JournalPool string Pool string CreatedAt *timestamp.Timestamp SizeBytes int64 @@ -352,12 +365,25 @@ func genSnapFromSnapID(ctx context.Context, rbdSnap *rbdSnapshot, snapshotID str if err != nil { return err } + rbdSnap.JournalPool = rbdSnap.Pool - rbdSnap.RequestName, rbdSnap.RbdSnapName, rbdSnap.RbdImageName, _, err = snapJournal.GetObjectUUIDData(ctx, rbdSnap.Monitors, + imageAttributes, err := snapJournal.GetImageAttributes(ctx, rbdSnap.Monitors, cr, rbdSnap.Pool, vi.ObjectUUID, true) if err != nil { return err } + rbdSnap.RequestName = imageAttributes.RequestName + rbdSnap.RbdImageName = imageAttributes.SourceName + rbdSnap.RbdSnapName = imageAttributes.ImageName + + // convert the journal pool ID to name, for use in DeleteSnapshot cases + if imageAttributes.JournalPoolID != util.InvalidPoolID { + rbdSnap.JournalPool, err = util.GetPoolName(ctx, rbdSnap.Monitors, cr, imageAttributes.JournalPoolID) + if err != nil { + // TODO: If pool is not found we may leak the image (as DeleteSnapshot will return success) + return err + } + } err = updateSnapWithImageInfo(ctx, rbdSnap, cr) @@ -395,20 +421,32 @@ func genVolFromVolID(ctx context.Context, rbdVol *rbdVolume, volumeID string, cr if err != nil { return err } + rbdVol.JournalPool = rbdVol.Pool - kmsID := "" - rbdVol.RequestName, rbdVol.RbdImageName, _, kmsID, err = volJournal.GetObjectUUIDData(ctx, rbdVol.Monitors, cr, + imageAttributes, err := volJournal.GetImageAttributes(ctx, rbdVol.Monitors, cr, rbdVol.Pool, vi.ObjectUUID, false) if err != nil { return err } - if kmsID != "" { + + if imageAttributes.KmsID != "" { rbdVol.Encrypted = true - rbdVol.KMS, err = util.GetKMS(kmsID, secrets) + rbdVol.KMS, err = util.GetKMS(imageAttributes.KmsID, secrets) if err != nil { return err } } + rbdVol.RequestName = imageAttributes.RequestName + rbdVol.RbdImageName = imageAttributes.ImageName + + // convert the journal pool ID to name, for use in DeleteVolume cases + if imageAttributes.JournalPoolID >= 0 { + rbdVol.JournalPool, err = util.GetPoolName(ctx, rbdVol.Monitors, cr, imageAttributes.JournalPoolID) + if err != nil { + // TODO: If pool is not found we may leak the image (as DeleteVolume will return success) + return err + } + } err = updateVolWithImageInfo(ctx, rbdVol, cr) @@ -579,6 +617,7 @@ func genSnapFromOptions(ctx context.Context, rbdVol *rbdVolume, snapOptions map[ rbdSnap := &rbdSnapshot{} rbdSnap.Pool = rbdVol.Pool + rbdSnap.JournalPool = rbdVol.JournalPool rbdSnap.Monitors, rbdSnap.ClusterID, err = getMonsAndClusterID(ctx, snapOptions) if err != nil { diff --git a/pkg/util/cephcmds.go b/pkg/util/cephcmds.go index f6704e66f..c971a2b83 100644 --- a/pkg/util/cephcmds.go +++ b/pkg/util/cephcmds.go @@ -29,6 +29,9 @@ import ( "k8s.io/klog" ) +// InvalidPoolID used to denote an invalid pool +const InvalidPoolID int64 = -1 + // ExecCommand executes passed in program with args and returns separate stdout and stderr streams func ExecCommand(program string, args ...string) (stdout, stderr []byte, err error) { var ( @@ -117,6 +120,26 @@ func GetPoolName(ctx 
context.Context, monitors string, cr *Credentials, poolID i return "", ErrPoolNotFound{string(poolID), fmt.Errorf("pool ID (%d) not found in Ceph cluster", poolID)} } +// GetPoolIDs searches a list of pools in a cluster and returns the IDs of the pools that matches +// the passed in pools +// TODO this should take in a list and return a map[string(poolname)]int64(poolID) +func GetPoolIDs(ctx context.Context, monitors, journalPool, imagePool string, cr *Credentials) (int64, int64, error) { + journalPoolID, err := GetPoolID(ctx, monitors, cr, journalPool) + if err != nil { + return InvalidPoolID, InvalidPoolID, err + } + + imagePoolID := journalPoolID + if imagePool != journalPool { + imagePoolID, err = GetPoolID(ctx, monitors, cr, imagePool) + if err != nil { + return InvalidPoolID, InvalidPoolID, err + } + } + + return journalPoolID, imagePoolID, nil +} + // SetOMapKeyValue sets the given key and value into the provided Ceph omap name func SetOMapKeyValue(ctx context.Context, monitors string, cr *Credentials, poolName, namespace, oMapName, oMapKey, keyValue string) error { // Command: "rados setomapval oMapName oMapKey keyValue" diff --git a/pkg/util/topology.go b/pkg/util/topology.go new file mode 100644 index 000000000..16f535674 --- /dev/null +++ b/pkg/util/topology.go @@ -0,0 +1,255 @@ +/* +Copyright 2020 The Ceph-CSI Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "context" + "encoding/json" + "fmt" + "strings" + + "github.com/container-storage-interface/spec/lib/go/csi" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/klog" +) + +const ( + keySeparator rune = '/' + labelSeparator string = "," +) + +func k8sGetNodeLabels(nodeName string) (map[string]string, error) { + client := NewK8sClient() + node, err := client.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{}) + if err != nil { + return nil, fmt.Errorf("failed to get node (%s) information : %v", nodeName, err) + } + + return node.GetLabels(), nil +} + +// GetTopologyFromDomainLabels returns the CSI topology map, determined from +// the domain labels and their values from the CO system +// Expects domainLabels in arg to be in the format "[prefix/],[prefix/],...", +func GetTopologyFromDomainLabels(domainLabels, nodeName, driverName string) (map[string]string, error) { + if domainLabels == "" { + return nil, nil + } + + // size checks on domain label prefix + topologyPrefix := strings.ToLower("topology." + driverName) + if len(topologyPrefix) > 63 { + return nil, fmt.Errorf("computed topology label prefix (%s) for node exceeds length limits", topologyPrefix) + } + // driverName is validated, and we are adding a lowercase "topology." 
to it, so no validation for conformance + + // Convert passed in labels to a map, and check for uniqueness + labelsToRead := strings.SplitN(domainLabels, labelSeparator, -1) + klog.Infof("passed in node labels for processing : %+v", labelsToRead) + + labelsIn := make(map[string]bool) + labelCount := 0 + for _, label := range labelsToRead { + // as we read the labels from k8s, and check for missing labels, + // no label conformance checks here + if _, ok := labelsIn[label]; ok { + return nil, fmt.Errorf("duplicate label (%s) found in domain labels", label) + } + + labelsIn[label] = true + labelCount++ + } + + nodeLabels, err := k8sGetNodeLabels(nodeName) + if err != nil { + return nil, err + } + + // Determine values for requested labels from node labels + domainMap := make(map[string]string) + found := 0 + for key, value := range nodeLabels { + if _, ok := labelsIn[key]; !ok { + continue + } + // label found split name component and store value + nameIdx := strings.IndexRune(key, keySeparator) + domain := key[nameIdx+1:] + domainMap[domain] = value + labelsIn[key] = false + found++ + } + + // Ensure all labels are found + if found != labelCount { + missingLabels := []string{} + for key, missing := range labelsIn { + if missing { + missingLabels = append(missingLabels, key) + } + } + return nil, fmt.Errorf("missing domain labels %v on node (%s)", missingLabels, nodeName) + } + + klog.Infof("list of domains processed : %+v", domainMap) + + topology := make(map[string]string) + for domain, value := range domainMap { + topology[topologyPrefix+"/"+domain] = value + // TODO: when implementing domain takeover/giveback, enable a domain value that can remain pinned to the node + // topology["topology."+driverName+"/"+domain+"-pinned"] = value + } + + return topology, nil +} + +type topologySegment struct { + DomainLabel string `json:"domainLabel"` + DomainValue string `json:"value"` +} + +// TopologyConstrainedPool stores the pool name and a list of its associated topology domain values +type TopologyConstrainedPool struct { + PoolName string `json:"poolName"` + DomainSegments []topologySegment `json:"domainSegments"` +} + +// GetTopologyFromRequest extracts TopologyConstrainedPools and passed in accessibility constraints +// from a CSI CreateVolume request +func GetTopologyFromRequest(req *csi.CreateVolumeRequest) (*[]TopologyConstrainedPool, *csi.TopologyRequirement, error) { + var ( + topologyPools []TopologyConstrainedPool + ) + + // check if parameters have pool configuration pertaining to topology + topologyPoolsStr := req.GetParameters()["topologyConstrainedPools"] + if topologyPoolsStr == "" { + return nil, nil, nil + } + + // check if there are any accessibility requirements in the request + accessibilityRequirements := req.GetAccessibilityRequirements() + if accessibilityRequirements == nil { + return nil, nil, nil + } + + // extract topology based pools configuration + err := json.Unmarshal([]byte(strings.Replace(topologyPoolsStr, "\n", " ", -1)), &topologyPools) + if err != nil { + return nil, nil, fmt.Errorf("failed to parse JSON encoded topology constrained pools parameter (%s): %v", topologyPoolsStr, err) + } + + return &topologyPools, accessibilityRequirements, nil +} + +// MatchTopologyForPool returns the topology map, if the passed in pool matches any +// passed in accessibility constraints +func MatchTopologyForPool(topologyPools *[]TopologyConstrainedPool, + accessibilityRequirements *csi.TopologyRequirement, poolName string) (map[string]string, error) { + var topologyPool 
[]TopologyConstrainedPool + + if topologyPools == nil || accessibilityRequirements == nil { + return nil, nil + } + + // find the pool in the list of topology based pools + for _, value := range *topologyPools { + if value.PoolName == poolName { + topologyPool = append(topologyPool, value) + break + } + } + if len(topologyPool) == 0 { + return nil, fmt.Errorf("none of the configured topology pools (%+v) matched passed in pool name (%s)", + topologyPools, poolName) + } + + _, topology, err := FindPoolAndTopology(&topologyPool, accessibilityRequirements) + + return topology, err +} + +// FindPoolAndTopology loops through passed in "topologyPools" and also related +// accessibility requirements, to determine which pool matches the requirement. +// The return variables are, image poolname, data poolname, and topology map of +// matched requirement +func FindPoolAndTopology(topologyPools *[]TopologyConstrainedPool, + accessibilityRequirements *csi.TopologyRequirement) (string, map[string]string, error) { + if topologyPools == nil || accessibilityRequirements == nil { + return "", nil, nil + } + + // select pool that fits first topology constraint preferred requirements + for _, topology := range accessibilityRequirements.GetPreferred() { + poolName := matchPoolToTopology(topologyPools, topology) + if poolName != "" { + return poolName, topology.GetSegments(), nil + } + } + + // If preferred mismatches, check requisite for a fit + for _, topology := range accessibilityRequirements.GetRequisite() { + poolName := matchPoolToTopology(topologyPools, topology) + if poolName != "" { + return poolName, topology.GetSegments(), nil + } + } + + return "", nil, fmt.Errorf("none of the topology constrained pools matched requested "+ + "topology constraints : pools (%+v) requested topology (%+v)", + *topologyPools, *accessibilityRequirements) +} + +// matchPoolToTopology loops through passed in pools, and for each pool checks if all +// requested topology segments are present and match the request, returning the first pool +// that hence matches (or an empty string if none match) +func matchPoolToTopology(topologyPools *[]TopologyConstrainedPool, topology *csi.Topology) string { + domainMap := extractDomainsFromlabels(topology) + + // check if any pool matches all the domain keys and values + for _, topologyPool := range *topologyPools { + mismatch := false + // match all pool topology labels to requested topology + for _, segment := range topologyPool.DomainSegments { + if domainValue, ok := domainMap[segment.DomainLabel]; !ok || domainValue != segment.DomainValue { + mismatch = true + break + } + } + + if mismatch { + continue + } + + return topologyPool.PoolName + } + + return "" +} + +// extractDomainsFromlabels returns the domain name map, from passed in domain segments, +// which is of the form [prefix/] +func extractDomainsFromlabels(topology *csi.Topology) map[string]string { + domainMap := make(map[string]string) + for domainKey, value := range topology.GetSegments() { + domainIdx := strings.IndexRune(domainKey, keySeparator) + domain := domainKey[domainIdx+1:] + domainMap[domain] = value + } + + return domainMap +} diff --git a/pkg/util/util.go b/pkg/util/util.go index 1931c0bd5..c4baff551 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -81,6 +81,7 @@ type Config struct { InstanceID string // unique ID distinguishing this instance of Ceph CSI MetadataStorage string // metadata persistence method [node|k8s_configmap] PluginPath string // location of cephcsi plugin + DomainLabels string // 
list of domain labels to read from the node // cephfs related flags MountCacheDir string // mount info cache save dir @@ -147,15 +148,19 @@ func ValidateDriverName(driverName string) error { // GenerateVolID generates a volume ID based on passed in parameters and version, to be returned // to the CO system -func GenerateVolID(ctx context.Context, monitors string, cr *Credentials, pool, clusterID, objUUID string, volIDVersion uint16) (string, error) { - poolID, err := GetPoolID(ctx, monitors, cr, pool) - if err != nil { - return "", err +func GenerateVolID(ctx context.Context, monitors string, cr *Credentials, locationID int64, pool, clusterID, objUUID string, volIDVersion uint16) (string, error) { + var err error + + if locationID == InvalidPoolID { + locationID, err = GetPoolID(ctx, monitors, cr, pool) + if err != nil { + return "", err + } } // generate the volume ID to return to the CO system vi := CSIIdentifier{ - LocationID: poolID, + LocationID: locationID, EncodingVersion: volIDVersion, ClusterID: clusterID, ObjectUUID: objUUID, diff --git a/pkg/util/voljournal.go b/pkg/util/voljournal.go index 5aa6e1eb3..8c0f4101d 100644 --- a/pkg/util/voljournal.go +++ b/pkg/util/voljournal.go @@ -18,13 +18,19 @@ package util import ( "context" + "encoding/binary" + "encoding/hex" "fmt" + "strings" "github.com/pborman/uuid" "github.com/pkg/errors" "k8s.io/klog" ) +// Length of string representation of uuid, xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx is 36 bytes +const uuidEncodedLength = 36 + /* RADOS omaps usage: @@ -103,6 +109,7 @@ const ( defaultSnapshotNamingPrefix string = "csi-snap-" ) +// CSIJournal defines the interface and the required key names for the above RADOS based OMaps type CSIJournal struct { // csiDirectory is the name of the CSI volumes object map that contains CSI volume-name (or // snapshot name) based keys @@ -122,6 +129,10 @@ type CSIJournal struct { // of this Ceph volume csiImageKey string + // pool ID where csiDirectory is maintained, as it can be different from where the ceph volume + // object map is maintained, during topology based provisioning + csiJournalPool string + // source volume name key in per Ceph snapshot object map, containing Ceph source volume uuid // for which the snapshot was created cephSnapSourceKey string @@ -141,6 +152,7 @@ func NewCSIVolumeJournal() *CSIJournal { cephUUIDDirectoryPrefix: "csi.volume.", csiNameKey: "csi.volname", csiImageKey: "csi.imagename", + csiJournalPool: "csi.journalpool", cephSnapSourceKey: "", namespace: "", encryptKMSKey: "csi.volume.encryptKMS", @@ -155,6 +167,7 @@ func NewCSISnapshotJournal() *CSIJournal { cephUUIDDirectoryPrefix: "csi.snap.", csiNameKey: "csi.snapname", csiImageKey: "csi.imagename", + csiJournalPool: "csi.journalpool", cephSnapSourceKey: "csi.source", namespace: "", encryptKMSKey: "csi.volume.encryptKMS", @@ -183,6 +196,14 @@ func (cj *CSIJournal) SetNamespace(ns string) { cj.namespace = ns } +// ImageData contains image name and stored CSI properties +type ImageData struct { + ImageUUID string + ImagePool string + ImagePoolID int64 + ImageAttributes *ImageAttributes +} + /* CheckReservation checks if given request name contains a valid reservation - If there is a valid reservation, then the corresponding UUID for the volume/snapshot is returned @@ -198,71 +219,114 @@ Return values: there was no reservation found - error: non-nil in case of any errors */ -func (cj *CSIJournal) CheckReservation(ctx context.Context, monitors string, cr *Credentials, pool, reqName, namePrefix, parentName, kmsConf string) 
(string, error) { - var snapSource bool +func (cj *CSIJournal) CheckReservation(ctx context.Context, monitors string, cr *Credentials, + journalPool, reqName, namePrefix, parentName, kmsConfig string) (*ImageData, error) { + var ( + snapSource bool + objUUID string + savedImagePool string + savedImagePoolID int64 = InvalidPoolID + ) if parentName != "" { if cj.cephSnapSourceKey == "" { err := errors.New("invalid request, cephSnapSourceKey is nil") - return "", err + return nil, err } snapSource = true } // check if request name is already part of the directory omap - objUUID, err := GetOMapValue(ctx, monitors, cr, pool, cj.namespace, cj.csiDirectory, + objUUIDAndPool, err := GetOMapValue(ctx, monitors, cr, journalPool, cj.namespace, cj.csiDirectory, cj.csiNameKeyPrefix+reqName) if err != nil { // error should specifically be not found, for volume to be absent, any other error // is not conclusive, and we should not proceed switch err.(type) { case ErrKeyNotFound, ErrPoolNotFound: - return "", nil + return nil, nil } - return "", err + return nil, err } - savedReqName, _, savedReqParentName, savedKms, err := cj.GetObjectUUIDData(ctx, monitors, cr, pool, + // check UUID only encoded value + if len(objUUIDAndPool) == uuidEncodedLength { + objUUID = objUUIDAndPool + savedImagePool = journalPool + } else { // check poolID/UUID encoding; extract the vol UUID and pool name + var buf64 []byte + components := strings.Split(objUUIDAndPool, "/") + objUUID = components[1] + savedImagePoolIDStr := components[0] + + buf64, err = hex.DecodeString(savedImagePoolIDStr) + if err != nil { + return nil, err + } + savedImagePoolID = int64(binary.BigEndian.Uint64(buf64)) + + savedImagePool, err = GetPoolName(ctx, monitors, cr, savedImagePoolID) + if err != nil { + if _, ok := err.(ErrPoolNotFound); ok { + err = cj.UndoReservation(ctx, monitors, cr, journalPool, "", "", reqName) + } + return nil, err + } + } + + savedImageAttributes, err := cj.GetImageAttributes(ctx, monitors, cr, savedImagePool, objUUID, snapSource) if err != nil { // error should specifically be not found, for image to be absent, any other error // is not conclusive, and we should not proceed if _, ok := err.(ErrKeyNotFound); ok { - err = cj.UndoReservation(ctx, monitors, cr, pool, cj.GetNameForUUID(namePrefix, objUUID, snapSource), reqName) + err = cj.UndoReservation(ctx, monitors, cr, journalPool, savedImagePool, + cj.GetNameForUUID(namePrefix, objUUID, snapSource), reqName) } - return "", err + return nil, err } // check if UUID key points back to the request name - if savedReqName != reqName { + if savedImageAttributes.RequestName != reqName { // NOTE: This should never be possible, hence no cleanup, but log error // and return, as cleanup may need to occur manually! 
- return "", fmt.Errorf("internal state inconsistent, omap names mismatch,"+ + return nil, fmt.Errorf("internal state inconsistent, omap names mismatch,"+ " request name (%s) volume UUID (%s) volume omap name (%s)", - reqName, objUUID, savedReqName) + reqName, objUUID, savedImageAttributes.RequestName) } - if kmsConf != "" { - if savedKms != kmsConf { - return "", fmt.Errorf("internal state inconsistent, omap encryption KMS"+ + if kmsConfig != "" { + if savedImageAttributes.KmsID != kmsConfig { + return nil, fmt.Errorf("internal state inconsistent, omap encryption KMS"+ " mismatch, request KMS (%s) volume UUID (%s) volume omap KMS (%s)", - kmsConf, objUUID, savedKms) + kmsConfig, objUUID, savedImageAttributes.KmsID) } } + // TODO: skipping due to excessive poolID to poolname call, also this should never happen! + // check if journal pool points back to the passed in journal pool + // if savedJournalPoolID != journalPoolID { + if snapSource { // check if source UUID key points back to the parent volume passed in - if savedReqParentName != parentName { + if savedImageAttributes.SourceName != parentName { // NOTE: This can happen if there is a snapname conflict, and we already have a snapshot // with the same name pointing to a different UUID as the source err = fmt.Errorf("snapname points to different volume, request name (%s)"+ " source name (%s) saved source name (%s)", - reqName, parentName, savedReqParentName) - return "", ErrSnapNameConflict{reqName, err} + reqName, parentName, savedImageAttributes.SourceName) + return nil, ErrSnapNameConflict{reqName, err} } } - return objUUID, nil + imageData := &ImageData{ + ImageUUID: objUUID, + ImagePool: savedImagePool, + ImagePoolID: savedImagePoolID, + ImageAttributes: savedImageAttributes, + } + + return imageData, nil } /* @@ -274,30 +338,37 @@ prior to cleaning up the reservation NOTE: As the function manipulates omaps, it should be called with a lock against the request name held, to prevent parallel operations from modifying the state of the omaps for this request name. 
+ +Input arguments: + - csiJournalPool: Pool name that holds the CSI request name based journal + - volJournalPool: Pool name that holds the image/subvolume and the per-image journal (may be + different if image is created in a topology constrained pool) */ -func (cj *CSIJournal) UndoReservation(ctx context.Context, monitors string, cr *Credentials, pool, volName, reqName string) error { +func (cj *CSIJournal) UndoReservation(ctx context.Context, monitors string, cr *Credentials, + csiJournalPool, volJournalPool, volName, reqName string) error { // delete volume UUID omap (first, inverse of create order) - // TODO: Check cases where volName can be empty, and we need to just cleanup the reqName - if len(volName) < 36 { - return fmt.Errorf("unable to parse UUID from %s, too short", volName) - } + if volName != "" { + if len(volName) < 36 { + return fmt.Errorf("unable to parse UUID from %s, too short", volName) + } - imageUUID := volName[len(volName)-36:] - if valid := uuid.Parse(imageUUID); valid == nil { - return fmt.Errorf("failed parsing UUID in %s", volName) - } + imageUUID := volName[len(volName)-36:] + if valid := uuid.Parse(imageUUID); valid == nil { + return fmt.Errorf("failed parsing UUID in %s", volName) + } - err := RemoveObject(ctx, monitors, cr, pool, cj.namespace, cj.cephUUIDDirectoryPrefix+imageUUID) - if err != nil { - if _, ok := err.(ErrObjectNotFound); !ok { - klog.Errorf(Log(ctx, "failed removing oMap %s (%s)"), cj.cephUUIDDirectoryPrefix+imageUUID, err) - return err + err := RemoveObject(ctx, monitors, cr, volJournalPool, cj.namespace, cj.cephUUIDDirectoryPrefix+imageUUID) + if err != nil { + if _, ok := err.(ErrObjectNotFound); !ok { + klog.Errorf(Log(ctx, "failed removing oMap %s (%s)"), cj.cephUUIDDirectoryPrefix+imageUUID, err) + return err + } } } // delete the request name key (last, inverse of create order) - err = RemoveOMapKey(ctx, monitors, cr, pool, cj.namespace, cj.csiDirectory, + err := RemoveOMapKey(ctx, monitors, cr, csiJournalPool, cj.namespace, cj.csiDirectory, cj.csiNameKeyPrefix+reqName) if err != nil { klog.Errorf(Log(ctx, "failed removing oMap key %s (%s)"), cj.csiNameKeyPrefix+reqName, err) @@ -346,13 +417,31 @@ pointers to the CSI generated request names. NOTE: As the function manipulates omaps, it should be called with a lock against the request name held, to prevent parallel operations from modifying the state of the omaps for this request name. 
+Input arguments: + - journalPool: Pool where the CSI journal is stored (may be different from the pool where the + image/subvolume is created due to topology constraints) + - journalPoolID: pool ID of the journalPool + - imagePool: Pool where the image/subvolume is created + - imagePoolID: pool ID of the imagePool + - reqName: Name of the volume request received + - namePrefix: Prefix to use when generating the image/subvolume name (suffix is an auto-generated UUID) + - parentName: Name of the parent image/subvolume if reservation is for a snapshot (optional) + - kmsConf: Name of the key management service used to encrypt the image (optional) + Return values: - string: Contains the UUID that was reserved for the passed in reqName - string: Contains the image name that was reserved for the passed in reqName - error: non-nil in case of any errors */ -func (cj *CSIJournal) ReserveName(ctx context.Context, monitors string, cr *Credentials, pool, reqName, namePrefix, parentName, kmsConf string) (string, string, error) { - var snapSource bool +func (cj *CSIJournal) ReserveName(ctx context.Context, monitors string, cr *Credentials, + journalPool string, journalPoolID int64, + imagePool string, imagePoolID int64, + reqName, namePrefix, parentName, kmsConf string) (string, string, error) { + // TODO: Take in-arg as ImageAttributes? + var ( + snapSource bool + nameKeyVal string + ) if parentName != "" { if cj.cephSnapSourceKey == "" { @@ -366,54 +455,81 @@ func (cj *CSIJournal) ReserveName(ctx context.Context, monitors string, cr *Cred // NOTE: If any service loss occurs post creation of the UUID directory, and before // setting the request name key (csiNameKey) to point back to the UUID directory, the // UUID directory key will be leaked - volUUID, err := reserveOMapName(ctx, monitors, cr, pool, cj.namespace, cj.cephUUIDDirectoryPrefix) + volUUID, err := reserveOMapName(ctx, monitors, cr, imagePool, cj.namespace, cj.cephUUIDDirectoryPrefix) if err != nil { return "", "", err } imageName := cj.GetNameForUUID(namePrefix, volUUID, snapSource) - // Create request name (csiNameKey) key in csiDirectory and store the UUId based - // volume name into it - err = SetOMapKeyValue(ctx, monitors, cr, pool, cj.namespace, cj.csiDirectory, - cj.csiNameKeyPrefix+reqName, volUUID) + // Create request name (csiNameKey) key in csiDirectory and store the UUID based + // volume name and optionally the image pool location into it + if journalPool != imagePool && imagePoolID != InvalidPoolID { + buf64 := make([]byte, 8) + binary.BigEndian.PutUint64(buf64, uint64(imagePoolID)) + poolIDEncodedHex := hex.EncodeToString(buf64) + nameKeyVal = poolIDEncodedHex + "/" + volUUID + } else { + nameKeyVal = volUUID + } + + err = SetOMapKeyValue(ctx, monitors, cr, journalPool, cj.namespace, cj.csiDirectory, + cj.csiNameKeyPrefix+reqName, nameKeyVal) if err != nil { return "", "", err } defer func() { if err != nil { klog.Warningf(Log(ctx, "reservation failed for volume: %s"), reqName) - errDefer := cj.UndoReservation(ctx, monitors, cr, pool, imageName, reqName) + errDefer := cj.UndoReservation(ctx, monitors, cr, imagePool, journalPool, imageName, reqName) if errDefer != nil { klog.Warningf(Log(ctx, "failed undoing reservation of volume: %s (%v)"), reqName, errDefer) } } }() - // Update UUID directory to store CSI request name and image name - err = SetOMapKeyValue(ctx, monitors, cr, pool, cj.namespace, cj.cephUUIDDirectoryPrefix+volUUID, + // NOTE: UUID directory is stored on the same pool as the image, helps determine image
attributes + // and also CSI journal pool, when only the VolumeID is passed in (e.g DeleteVolume/DeleteSnapshot, + // VolID during CreateSnapshot). + // Update UUID directory to store CSI request name + err = SetOMapKeyValue(ctx, monitors, cr, imagePool, cj.namespace, cj.cephUUIDDirectoryPrefix+volUUID, cj.csiNameKey, reqName) if err != nil { return "", "", err } - err = SetOMapKeyValue(ctx, monitors, cr, pool, cj.namespace, cj.cephUUIDDirectoryPrefix+volUUID, + // Update UUID directory to store image name + err = SetOMapKeyValue(ctx, monitors, cr, imagePool, cj.namespace, cj.cephUUIDDirectoryPrefix+volUUID, cj.csiImageKey, imageName) if err != nil { return "", "", err } + // Update UUID directory to store encryption values if kmsConf != "" { - err = SetOMapKeyValue(ctx, monitors, cr, pool, cj.namespace, cj.cephUUIDDirectoryPrefix+volUUID, + err = SetOMapKeyValue(ctx, monitors, cr, imagePool, cj.namespace, cj.cephUUIDDirectoryPrefix+volUUID, cj.encryptKMSKey, kmsConf) if err != nil { return "", "", err } } + if journalPool != imagePool && journalPoolID != InvalidPoolID { + buf64 := make([]byte, 8) + binary.BigEndian.PutUint64(buf64, uint64(journalPoolID)) + journalPoolIDStr := hex.EncodeToString(buf64) + + // Update UUID directory to store CSI journal pool name (prefer ID instead of name to be pool rename proof) + err = SetOMapKeyValue(ctx, monitors, cr, imagePool, cj.namespace, cj.cephUUIDDirectoryPrefix+volUUID, + cj.csiJournalPool, journalPoolIDStr) + if err != nil { + return "", "", err + } + } + if snapSource { // Update UUID directory to store source volume UUID in case of snapshots - err = SetOMapKeyValue(ctx, monitors, cr, pool, cj.namespace, cj.cephUUIDDirectoryPrefix+volUUID, + err = SetOMapKeyValue(ctx, monitors, cr, imagePool, cj.namespace, cj.cephUUIDDirectoryPrefix+volUUID, cj.cephSnapSourceKey, parentName) if err != nil { return "", "", err @@ -423,70 +539,89 @@ func (cj *CSIJournal) ReserveName(ctx context.Context, monitors string, cr *Cred return volUUID, imageName, nil } -/* -GetObjectUUIDData fetches all keys from a UUID directory -Return values: - - string: Contains the request name for the passed in UUID - - string: Contains the rbd image name for the passed in UUID - - string: Contains the parent image name for the passed in UUID, if it is a snapshot - - string: Contains encryption KMS, if it is an encrypted image - - error: non-nil in case of any errors -*/ -func (cj *CSIJournal) GetObjectUUIDData(ctx context.Context, monitors string, cr *Credentials, pool, objectUUID string, snapSource bool) (string, string, string, string, error) { - var sourceName string +// ImageAttributes contains all CSI stored image attributes, typically as OMap keys +type ImageAttributes struct { + RequestName string // Contains the request name for the passed in UUID + SourceName string // Contains the parent image name for the passed in UUID, if it is a snapshot + ImageName string // Contains the image or subvolume name for the passed in UUID + KmsID string // Contains encryption KMS, if it is an encrypted image + JournalPoolID int64 // Pool ID of the CSI journal pool, stored in big endian format (on-disk data) +} + +// GetImageAttributes fetches all keys and their values, from a UUID directory, returning ImageAttributes structure +func (cj *CSIJournal) GetImageAttributes(ctx context.Context, monitors string, cr *Credentials, pool, objectUUID string, snapSource bool) (*ImageAttributes, error) { + var ( + err error + imageAttributes *ImageAttributes = &ImageAttributes{} + ) if snapSource && 
cj.cephSnapSourceKey == "" { - err := errors.New("invalid request, cephSnapSourceKey is nil") - return "", "", "", "", err + err = errors.New("invalid request, cephSnapSourceKey is nil") + return nil, err } // TODO: fetch all omap vals in one call, than make multiple listomapvals - requestName, err := GetOMapValue(ctx, monitors, cr, pool, cj.namespace, + imageAttributes.RequestName, err = GetOMapValue(ctx, monitors, cr, pool, cj.namespace, cj.cephUUIDDirectoryPrefix+objectUUID, cj.csiNameKey) if err != nil { - return "", "", "", "", err + return nil, err } // image key was added at some point, so not all volumes will have this key set // when ceph-csi was upgraded - imageName, err := GetOMapValue(ctx, monitors, cr, pool, cj.namespace, + imageAttributes.ImageName, err = GetOMapValue(ctx, monitors, cr, pool, cj.namespace, cj.cephUUIDDirectoryPrefix+objectUUID, cj.csiImageKey) if err != nil { // if the key was not found, assume the default key + UUID // otherwise return error switch err.(type) { default: - return "", "", "", "", err + return nil, err case ErrKeyNotFound, ErrPoolNotFound: } if snapSource { - imageName = defaultSnapshotNamingPrefix + objectUUID + imageAttributes.ImageName = defaultSnapshotNamingPrefix + objectUUID } else { - imageName = defaultVolumeNamingPrefix + objectUUID + imageAttributes.ImageName = defaultVolumeNamingPrefix + objectUUID } } - encryptionKmsConfig := "" - encryptionKmsConfig, err = GetOMapValue(ctx, monitors, cr, pool, cj.namespace, + imageAttributes.KmsID, err = GetOMapValue(ctx, monitors, cr, pool, cj.namespace, cj.cephUUIDDirectoryPrefix+objectUUID, cj.encryptKMSKey) if err != nil { // ErrKeyNotFound means no encryption KMS was used switch err.(type) { default: - return "", "", "", "", fmt.Errorf("OMapVal for %s/%s failed to get encryption KMS value: %s", + return nil, fmt.Errorf("OMapVal for %s/%s failed to get encryption KMS value: %s", pool, cj.cephUUIDDirectoryPrefix+objectUUID, err) case ErrKeyNotFound, ErrPoolNotFound: } } + journalPoolIDStr, err := GetOMapValue(ctx, monitors, cr, pool, cj.namespace, + cj.cephUUIDDirectoryPrefix+objectUUID, cj.csiJournalPool) + if err != nil { + if _, ok := err.(ErrKeyNotFound); !ok { + return nil, err + } + imageAttributes.JournalPoolID = InvalidPoolID + } else { + var buf64 []byte + buf64, err = hex.DecodeString(journalPoolIDStr) + if err != nil { + return nil, err + } + imageAttributes.JournalPoolID = int64(binary.BigEndian.Uint64(buf64)) + } + if snapSource { - sourceName, err = GetOMapValue(ctx, monitors, cr, pool, cj.namespace, + imageAttributes.SourceName, err = GetOMapValue(ctx, monitors, cr, pool, cj.namespace, cj.cephUUIDDirectoryPrefix+objectUUID, cj.cephSnapSourceKey) if err != nil { - return "", "", "", "", err + return nil, err } } - return requestName, imageName, sourceName, encryptionKmsConfig, nil + return imageAttributes, nil }
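The two sketches below are illustrative additions, not part of the patch. The first shows the encoding written by ReserveName and decoded by CheckReservation above: when the image lands in a topology constrained pool other than the journal pool, the csiDirectory omap value is "<16 hex chars of the big-endian image pool ID>/<UUID>" instead of the bare 36-character UUID. The pool ID and UUID literals are made up for the example, and the samePool flag stands in for the journalPool != imagePool && imagePoolID != InvalidPoolID check in the patch.

package main

import (
	"encoding/binary"
	"encoding/hex"
	"fmt"
	"strings"
)

// uuidEncodedLength mirrors the constant introduced in voljournal.go.
const uuidEncodedLength = 36

// encodeNameKeyVal builds the csiDirectory omap value: the bare UUID when the image
// shares the journal pool, "<hex poolID>/<UUID>" otherwise.
func encodeNameKeyVal(imagePoolID int64, volUUID string, samePool bool) string {
	if samePool {
		return volUUID
	}
	buf64 := make([]byte, 8)
	binary.BigEndian.PutUint64(buf64, uint64(imagePoolID))
	return hex.EncodeToString(buf64) + "/" + volUUID
}

// decodeNameKeyVal reverses the encoding, returning -1 for the pool ID when the
// value is a bare UUID (image and journal share a pool).
func decodeNameKeyVal(val string) (int64, string, error) {
	if len(val) == uuidEncodedLength {
		return -1, val, nil
	}
	components := strings.Split(val, "/")
	buf64, err := hex.DecodeString(components[0])
	if err != nil {
		return -1, "", err
	}
	return int64(binary.BigEndian.Uint64(buf64)), components[1], nil
}

func main() {
	val := encodeNameKeyVal(24, "8a3c88e1-9f74-4f36-9a0a-7d2a8f2e8a11", false)
	fmt.Println(val) // 0000000000000018/8a3c88e1-9f74-4f36-9a0a-7d2a8f2e8a11
	poolID, uuid, err := decodeNameKeyVal(val)
	fmt.Println(poolID, uuid, err) // 24 8a3c88e1-9f74-4f36-9a0a-7d2a8f2e8a11 <nil>
}

The second sketch shows the shape of the topologyConstrainedPools StorageClass parameter that GetTopologyFromRequest unmarshals (field names follow the JSON tags on TopologyConstrainedPool and topologySegment above), and how a pool is matched against a requested topology segment whose key carries the "topology.<drivername>/" prefix advertised per GetTopologyFromDomainLabels. Pool and zone names are hypothetical.

package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

type topologySegment struct {
	DomainLabel string `json:"domainLabel"`
	DomainValue string `json:"value"`
}

type topologyConstrainedPool struct {
	PoolName       string            `json:"poolName"`
	DomainSegments []topologySegment `json:"domainSegments"`
}

// Example StorageClass parameter value (hypothetical pool and zone names).
const topologyConstrainedPools = `[
  {"poolName": "rbd-zone-a", "domainSegments": [{"domainLabel": "zone", "value": "zone-a"}]},
  {"poolName": "rbd-zone-b", "domainSegments": [{"domainLabel": "zone", "value": "zone-b"}]}
]`

func main() {
	var pools []topologyConstrainedPool
	if err := json.Unmarshal([]byte(topologyConstrainedPools), &pools); err != nil {
		panic(err)
	}

	// A preferred topology segment as it would arrive in a CreateVolume request;
	// the label prefix before '/' is stripped prior to matching, as in
	// extractDomainsFromlabels.
	requested := map[string]string{"topology.rbd.csi.ceph.com/zone": "zone-b"}
	domains := map[string]string{}
	for key, value := range requested {
		domains[key[strings.IndexRune(key, '/')+1:]] = value
	}

	// The first pool whose every domain segment matches the request wins,
	// as in matchPoolToTopology.
	for _, pool := range pools {
		matched := true
		for _, segment := range pool.DomainSegments {
			if domains[segment.DomainLabel] != segment.DomainValue {
				matched = false
				break
			}
		}
		if matched {
			fmt.Println("provision on pool:", pool.PoolName) // rbd-zone-b
			break
		}
	}
}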