Make CephFS plugin stateless reusing RADOS based journal scheme

This is a part of the stateless set of commits for CephCSI.

This commit removes the dependency on config maps to store cephFS provisioned
volumes, and instead relies on RADOS based objects and keys, and required
CSI VolumeID encoding to detect the provisioned volumes.

Changes:
- Provide backward compatibility to provisioned volumes by older plugin versions (1.0.0 or older)
- Remove Create/Delete support for statically provisioned volumes (fixes #382)
- Added namespace support to RADOS OMaps and used the same to store RADOS CSI objects and keys in the CephFS metadata pool
- Added support to mention fsname for CephFS provisioning (fixes #359)
- Changed field name in CSI Identifier to 'location', to denote a pool or fscid
- Updated mounter cache to use new scheme
- Required Helm manifests are updated
- Required documentation and other manifests are updated
- Made driver option 'metadatastorage' as optional, as fresh installs do not need to specify the same

Testing done:
- Create/Mount/Delete PVC
- Create/Delete 5 PVCs
- Mount version 1.0.0 PVC
- Delete version 1.0.0 PV
- Mount Statically defined PV/PVC/Pod
- Mount Statically defined version 1.0.0 PV/PVC/Pod
- Delete Statically defined version 1.0.0 PV/PVC/Pod
- Node restart when mounted to test mountcache
- Use InstanceID other than 'default'
- RBD basic round of tests, as namespace is added to OMaps
- csitest against ceph-fs plugin
  - NOTE: CephFS plugin still does not detect and address already created
  volumes but of a different size
- Test not providing any value to the metadata storage parameter

Signed-off-by: ShyamsundarR <srangana@redhat.com>
This commit is contained in:
ShyamsundarR 2019-05-28 15:03:18 -04:00
parent 1406f29dcd
commit b9cd0e18ad
35 changed files with 959 additions and 262 deletions

View File

@ -42,11 +42,11 @@ var (
endpoint = flag.String("endpoint", "unix://tmp/csi.sock", "CSI endpoint")
driverName = flag.String("drivername", "", "name of the driver")
nodeID = flag.String("nodeid", "", "node id")
instanceID = flag.String("instanceid", "", "Unique ID distinguishing this instance of Ceph CSI among other"+
" instances, when sharing Ceph clusters across CSI instances for provisioning")
// rbd related flags
containerized = flag.Bool("containerized", true, "whether run as containerized")
instanceID = flag.String("instanceid", "", "Unique ID distinguishing this instance of Ceph CSI among other"+
" instances, when sharing Ceph clusters across CSI instances for provisioning")
// cephfs related flags
volumeMounter = flag.String("volumemounter", "", "default volume mounter (possible options are 'kernel', 'fuse')")
@ -93,6 +93,8 @@ func getDriverName() string {
}
func main() {
var cp util.CachePersister
driverType := getType()
if len(driverType) == 0 {
klog.Fatalln("driver type not specified")
@ -112,13 +114,15 @@ func main() {
case cephfsType:
cephfs.PluginFolder = cephfs.PluginFolder + dname
cp, err := util.CreatePersistanceStorage(
cephfs.PluginFolder, *metadataStorage, dname)
if err != nil {
os.Exit(1)
if *metadataStorage != "" {
cp, err = util.CreatePersistanceStorage(
cephfs.PluginFolder, *metadataStorage, dname)
if err != nil {
os.Exit(1)
}
}
driver := cephfs.NewDriver()
driver.Run(dname, *nodeID, *endpoint, *volumeMounter, *mountCacheDir, cp)
driver.Run(dname, *nodeID, *endpoint, *volumeMounter, *mountCacheDir, *instanceID, cp)
default:
klog.Fatalln("invalid volume type", vtype) // calls exit

View File

@ -0,0 +1,14 @@
---
apiVersion: v1
kind: ConfigMap
metadata:
name: {{ .Values.configMapName | quote }}
labels:
app: {{ include "ceph-csi-cephfs.name" . }}
chart: {{ include "ceph-csi-cephfs.chart" . }}
component: {{ .Values.provisioner.name }}
release: {{ .Release.Name }}
heritage: {{ .Release.Service }}
data:
config.json: |-
[]

View File

@ -104,6 +104,8 @@ spec:
- mountPath: /lib/modules
name: lib-modules
readOnly: true
- name: ceph-csi-config
mountPath: /etc/ceph-csi-config/
resources:
{{ toYaml .Values.nodeplugin.plugin.resources | indent 12 }}
volumes:
@ -137,6 +139,9 @@ spec:
- name: lib-modules
hostPath:
path: /lib/modules
- name: ceph-csi-config
configMap:
name: {{ .Values.configMapName | quote }}
{{- if .Values.nodeplugin.affinity -}}
affinity:
{{ toYaml .Values.nodeplugin.affinity . | indent 8 }}

View File

@ -86,6 +86,8 @@ spec:
mountPath: {{ .Values.socketDir }}
- name: host-rootfs
mountPath: "/rootfs"
- name: ceph-csi-config
mountPath: /etc/ceph-csi-config/
resources:
{{ toYaml .Values.nodeplugin.plugin.resources | indent 12 }}
volumes:
@ -95,6 +97,9 @@ spec:
- name: host-rootfs
hostPath:
path: /
- name: ceph-csi-config
configMap:
name: {{ .Values.configMapName | quote }}
{{- if .Values.provisioner.affinity -}}
affinity:
{{ toYaml .Values.provisioner.affinity . | indent 8 }}

View File

@ -18,6 +18,7 @@ socketFile: csi.sock
registrationDir: /var/lib/kubelet/plugins_registry
volumeDevicesDir: /var/lib/kubelet/plugins/kubernetes.io/csi/volumeDevices
driverName: cephfs.csi.ceph.com
configMapName: ceph-csi-config
attacher:
name: attacher
enabled: true

View File

@ -87,6 +87,8 @@ spec:
readOnly: true
- name: host-dev
mountPath: /dev
- name: ceph-csi-config
mountPath: /etc/ceph-csi-config/
volumes:
- name: socket-dir
hostPath:
@ -101,3 +103,6 @@ spec:
- name: host-dev
hostPath:
path: /dev
- name: ceph-csi-config
configMap:
name: ceph-csi-config

View File

@ -88,6 +88,8 @@ spec:
readOnly: true
- name: host-dev
mountPath: /dev
- name: ceph-csi-config
mountPath: /etc/ceph-csi-config/
volumes:
- name: mount-cache-dir
emptyDir: {}
@ -116,3 +118,6 @@ spec:
- name: host-dev
hostPath:
path: /dev
- name: ceph-csi-config
configMap:
name: ceph-csi-config

View File

@ -0,0 +1,8 @@
---
apiVersion: v1
kind: ConfigMap
data:
config.json: |-
[]
metadata:
name: ceph-csi-config

View File

@ -28,15 +28,16 @@ make image-cephcsi
**Available command line arguments:**
| Option | Default value | Description |
| ------------------- | --------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `--endpoint` | `unix://tmp/csi.sock` | CSI endpoint, must be a UNIX socket |
| `--drivername` | `cephfs.csi.ceph.com` | name of the driver (Kubernetes: `provisioner` field in StorageClass must correspond to this value) |
| `--nodeid` | _empty_ | This node's ID |
| `--type` | _empty_ | driver type `[rbd | cephfs]` If the driver type is set to `rbd` it will act as a `rbd plugin` or if it's set to `cephfs` will act as a `cephfs plugin` |
| `--volumemounter` | _empty_ | default volume mounter. Available options are `kernel` and `fuse`. This is the mount method used if volume parameters don't specify otherwise. If left unspecified, the driver will first probe for `ceph-fuse` in system's path and will choose Ceph kernel client if probing failed. |
| `--metadatastorage` | _empty_ | Whether metadata should be kept on node as file or in a k8s configmap (`node` or `k8s_configmap`) |
| `--mountcachedir` | _empty_ | volume mount cache info save dir. If left unspecified, the dirver will not record mount info, or it will save mount info and when driver restart it will remount volume it cached. |
Option | Default value | Description
--------------------|-----------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
`--endpoint` | `unix://tmp/csi.sock` | CSI endpoint, must be a UNIX socket
`--drivername` | `cephfs.csi.ceph.com` | Name of the driver (Kubernetes: `provisioner` field in StorageClass must correspond to this value)
`--nodeid` | _empty_ | This node's ID
| `--type` | _empty_ | Driver type `[rbd | cephfs]` If the driver type is set to `rbd` it will act as a `rbd plugin` or if it's set to `cephfs` will act as a `cephfs plugin`
`--volumemounter` | _empty_ | Default volume mounter. Available options are `kernel` and `fuse`. This is the mount method used if volume parameters don't specify otherwise. If left unspecified, the driver will first probe for `ceph-fuse` in system's path and will choose Ceph kernel client if probing failed.
`--mountcachedir` | _empty_ | Volume mount cache info save dir. If left unspecified, the dirver will not record mount info, or it will save mount info and when driver restart it will remount volume it cached.
`--instanceid` | "default" | Unique ID distinguishing this instance of Ceph CSI among other instances, when sharing Ceph clusters across CSI instances for provisioning
`--metadatastorage` | _empty_ | Points to where older (1.0.0 or older plugin versions) metadata about provisioned volumes are kept, as file or in as k8s configmap (`node` or `k8s_configmap` respectively)
**Available environmental variables:**
@ -49,24 +50,30 @@ is used to define in which namespace you want the configmaps to be stored
**Available volume parameters:**
| Parameter | Required | Description |
| --------------------------------------------------------------------------------------------------- | ------------------------------------------------------ | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `monitors` | yes | Comma separated list of Ceph monitors (e.g. `192.168.100.1:6789,192.168.100.2:6789,192.168.100.3:6789`) |
| `monValueFromSecret` | one of `monitors` and `monValueFromSecret` must be set | a string pointing the key in the credential secret, whose value is the mon. This is used for the case when the monitors' IP or hostnames are changed, the secret can be updated to pick up the new monitors. If both `monitors` and `monValueFromSecret` are set and the monitors set in the secret exists, `monValueFromSecret` takes precedence. |
| `mounter` | no | Mount method to be used for this volume. Available options are `kernel` for Ceph kernel client and `fuse` for Ceph FUSE driver. Defaults to "default mounter", see command line arguments. |
| `provisionVolume` | yes | Mode of operation. BOOL value. If `true`, a new CephFS volume will be provisioned. If `false`, an existing volume will be used. |
| `pool` | for `provisionVolume=true` | Ceph pool into which the volume shall be created |
| `rootPath` | for `provisionVolume=false` | Root path of an existing CephFS volume |
| `csi.storage.k8s.io/provisioner-secret-name`, `csi.storage.k8s.io/node-stage-secret-name` | for Kubernetes | name of the Kubernetes Secret object containing Ceph client credentials. Both parameters should have the same value |
| `csi.storage.k8s.io/provisioner-secret-namespace`, `csi.storage.k8s.io/node-stage-secret-namespace` | for Kubernetes | namespaces of the above Secret objects |
Parameter | Required | Description
----------------------------------------------------------------------------------------------------|--------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
`clusterID` | yes | String representing a Ceph cluster, must be unique across all Ceph clusters in use for provisioning, cannot be greater than 36 bytes in length, and should remain immutable for the lifetime of the Ceph cluster in use
`fsName` | yes | CephFS filesystem name into which the volume shall be created
`mounter` | no | Mount method to be used for this volume. Available options are `kernel` for Ceph kernel client and `fuse` for Ceph FUSE driver. Defaults to "default mounter", see command line arguments.
`pool` | yes | Ceph pool into which the volume shall be created
`csi.storage.k8s.io/provisioner-secret-name`, `csi.storage.k8s.io/node-stage-secret-name` | for Kubernetes | Name of the Kubernetes Secret object containing Ceph client credentials. Both parameters should have the same value
`csi.storage.k8s.io/provisioner-secret-namespace`, `csi.storage.k8s.io/node-stage-secret-namespace` | for Kubernetes | Namespaces of the above Secret objects
**Required secrets for `provisionVolume=true`:**
**NOTE:** An accompanying CSI configuration file, needs to be provided to the
running pods. Refer to [Creating CSI configuration](../examples/README.md#creating-csi-configuration)
for more information.
**NOTE:** A suggested way to populate and retain uniqueness of the clusterID is
to use the output of `ceph fsid` of the Ceph cluster to be used for
provisioning.
**Required secrets for provisioning:**
Admin credentials are required for provisioning new volumes
* `adminID`: ID of an admin client
* `adminKey`: key of the admin client
**Required secrets for `provisionVolume=false`:**
**Required secrets for statically provisioned volumes:**
User credentials with access to an existing volume
* `userID`: ID of a user client
@ -133,11 +140,14 @@ service/csi-cephfsplugin-provisioner ClusterIP 10.101.78.75 <none>
...
```
You can try deploying a demo pod from `examples/cephfs` to test the deployment further.
Once the CSI plugin configuration is updated with details from a Ceph cluster of
choice, you can try deploying a demo pod from examples/cephfs using the
instructions [provided](../examples/README.md#deploying-the-storage-class) to
test the deployment further.
### Notes on volume deletion
Volumes that were provisioned dynamically (i.e. `provisionVolume=true`) are
allowed to be deleted by the driver as well, if the user chooses to do
so.Otherwise, the driver is forbidden to delete such volumes - attempting to
delete them is a no-op.
Dynamically povisioned volumes are deleted by the driver, when requested to
do so. Statically provisioned volumes, from plugin versions less than or
equal to 1.0.0, are a no-op when a delete operation is performed against the
same, and are expected to be deleted on the Ceph cluster by the user.

View File

@ -52,11 +52,10 @@ make image-cephcsi
| `mounter` | no | if set to `rbd-nbd`, use `rbd-nbd` on nodes that have `rbd-nbd` and `nbd` kernel modules to map rbd images |
**NOTE:** An accompanying CSI configuration file, needs to be provided to the
running pods. Refer to [Creating CSI configuration for RBD based
provisioning](../examples/README.md#creating-csi-configuration-for-rbd-based-provisioning)
running pods. Refer to [Creating CSI configuration](../examples/README.md#creating-csi-configuration)
for more information.
**NOTE:** A suggested way to populate and retain uniquness of the clusterID is
**NOTE:** A suggested way to populate and retain uniqueness of the clusterID is
to use the output of `ceph fsid` of the Ceph cluster to be used for
provisioning.

View File

@ -9,16 +9,13 @@ By default, they look for the YAML manifests in
`../../deploy/{rbd,cephfs}/kubernetes`.
You can override this path by running `$ ./plugin-deploy.sh /path/to/my/manifests`.
## Creating CSI configuration for RBD based provisioning
## Creating CSI configuration
**NOTE:** This section is not required for cephfs based provisioning, and SHOULD
be skipped.
For RBD based provisioning, the CSI plugin requires configuration information
regarding the Ceph cluster(s), that would host the RBD based block devices. This
The CSI plugin requires configuration information regarding the Ceph cluster(s),
that would host the dynamically or statically provisioned volumes. This
is provided by adding a per-cluster identifier (referred to as clusterID), and
the required monitor details for the same, as in the provided [sample config
map](./rbd/csi-config-map-sample.yaml).
map](./csi-config-map-sample.yaml).
Gather the following information from the Ceph cluster(s) of choice,
@ -31,12 +28,12 @@ Gather the following information from the Ceph cluster(s) of choice,
* Alternatively, choose a `<cluster-id>` value that is distinct per Ceph
cluster in use by this kubernetes cluster
Update the [sample config map](./rbd/csi-config-map-sample.yaml) with values
Update the [sample config map](./csi-config-map-sample.yaml) with values
from a Ceph cluster and replace `<cluster-id>` with the chosen clusterID, to
create the manifest for the config map which can be updated in the cluster
using the following command,
* `kubectl replace -f rbd/csi-config-map-sample.yaml`
* `kubectl replace -f ./csi-config-map-sample.yaml`
Storage class and snapshot class, using `<cluster-id>` as the value for the
option `clusterID`, can now be created on the cluster.

View File

@ -8,7 +8,7 @@ fi
cd "$deployment_base" || exit 1
objects=(csi-provisioner-rbac csi-nodeplugin-rbac csi-cephfsplugin-provisioner csi-cephfsplugin)
objects=(csi-provisioner-rbac csi-nodeplugin-rbac csi-config-map csi-cephfsplugin-provisioner csi-cephfsplugin)
for obj in "${objects[@]}"; do
kubectl create -f "./$obj.yaml"

View File

@ -8,7 +8,7 @@ fi
cd "$deployment_base" || exit 1
objects=(csi-cephfsplugin-provisioner csi-cephfsplugin csi-provisioner-rbac csi-nodeplugin-rbac)
objects=(csi-cephfsplugin-provisioner csi-cephfsplugin csi-config-map csi-provisioner-rbac csi-nodeplugin-rbac)
for obj in "${objects[@]}"; do
kubectl delete -f "./$obj.yaml"

View File

@ -5,10 +5,10 @@ metadata:
name: csi-cephfs-secret
namespace: default
data:
# Required if provisionVolume is set to false
# Required for statically provisioned volumes
userID: BASE64-ENCODED-VALUE
userKey: BASE64-ENCODED-VALUE
# Required if provisionVolume is set to true
# Required for dynamically provisioned volumes
adminID: BASE64-ENCODED-VALUE
adminKey: BASE64-ENCODED-VALUE

View File

@ -5,27 +5,21 @@ metadata:
name: csi-cephfs-sc
provisioner: cephfs.csi.ceph.com
parameters:
# Comma separated list of Ceph monitors
# if using FQDN, make sure csi plugin's dns policy is appropriate.
monitors: mon1:port,mon2:port,...
# String representing a Ceph cluster to provision storage from.
# Should be unique across all Ceph clusters in use for provisioning,
# cannot be greater than 36 bytes in length, and should remain immutable for
# the lifetime of the StorageClass in use.
# Ensure to create an entry in the config map named ceph-csi-config, based on
# csi-config-map-sample.yaml, to accompany the string chosen to
# represent the Ceph cluster in clusterID below
clusterID: <cluster-id>
# For provisionVolume: "true":
# A new volume will be created along with a new Ceph user.
# Requires admin credentials (adminID, adminKey).
# For provisionVolume: "false":
# It is assumed the volume already exists and the user is expected
# to provide path to that volume (rootPath) and user credentials
# (userID, userKey).
provisionVolume: "true"
# CephFS filesystem name into which the volume shall be created
fsName: myfs
# Ceph pool into which the volume shall be created
# Required for provisionVolume: "true"
pool: cephfs_data
# Root path of an existing CephFS volume
# Required for provisionVolume: "false"
# rootPath: /absolute/path
# The secrets have to contain user and/or Ceph admin credentials.
csi.storage.k8s.io/provisioner-secret-name: csi-cephfs-secret
csi.storage.k8s.io/provisioner-secret-namespace: default

View File

@ -8,7 +8,7 @@ fi
cd "$deployment_base" || exit 1
objects=(csi-provisioner-rbac csi-nodeplugin-rbac csi-rbdplugin-provisioner csi-rbdplugin)
objects=(csi-provisioner-rbac csi-nodeplugin-rbac csi-config-map csi-rbdplugin-provisioner csi-rbdplugin)
for obj in "${objects[@]}"; do
kubectl create -f "./$obj.yaml"

View File

@ -8,7 +8,7 @@ fi
cd "$deployment_base" || exit 1
objects=(csi-rbdplugin-provisioner csi-rbdplugin csi-provisioner-rbac csi-nodeplugin-rbac)
objects=(csi-rbdplugin-provisioner csi-rbdplugin csi-config-map csi-provisioner-rbac csi-nodeplugin-rbac)
for obj in "${objects[@]}"; do
kubectl delete -f "./$obj.yaml"

117
pkg/cephfs/cephfs_util.go Normal file
View File

@ -0,0 +1,117 @@
/*
Copyright 2019 The Ceph-CSI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cephfs
import (
"fmt"
"github.com/ceph/ceph-csi/pkg/util"
)
// MDSMap is a representation of the mds map sub-structure returned by 'ceph fs get'
type MDSMap struct {
FilesystemName string `json:"fs_name"`
}
// CephFilesystemDetails is a representation of the main json structure returned by 'ceph fs get'
type CephFilesystemDetails struct {
ID int64 `json:"id"`
MDSMap MDSMap `json:"mdsmap"`
}
func getFscID(monitors, id, key, fsName string) (int64, error) {
// ceph fs get myfs --format=json
// {"mdsmap":{...},"id":2}
var fsDetails CephFilesystemDetails
err := execCommandJSON(&fsDetails,
"ceph",
"-m", monitors,
"--id", id,
"--key="+key,
"-c", util.CephConfigPath,
"fs", "get", fsName, "--format=json",
)
if err != nil {
return 0, err
}
return fsDetails.ID, nil
}
// CephFilesystem is a representation of the json structure returned by 'ceph fs ls'
type CephFilesystem struct {
Name string `json:"name"`
MetadataPool string `json:"metadata_pool"`
MetadataPoolID int `json:"metadata_pool_id"`
DataPools []string `json:"data_pools"`
DataPoolIDs []int `json:"data_pool_ids"`
}
func getMetadataPool(monitors, id, key, fsName string) (string, error) {
// ./tbox ceph fs ls --format=json
// [{"name":"myfs","metadata_pool":"myfs-metadata","metadata_pool_id":4,...},...]
var filesystems []CephFilesystem
err := execCommandJSON(&filesystems,
"ceph",
"-m", monitors,
"--id", id,
"--key="+key,
"-c", util.CephConfigPath,
"fs", "ls", "--format=json",
)
if err != nil {
return "", err
}
for _, fs := range filesystems {
if fs.Name == fsName {
return fs.MetadataPool, nil
}
}
return "", fmt.Errorf("fsName (%s) not found in Ceph cluster", fsName)
}
// CephFilesystemDetails is a representation of the main json structure returned by 'ceph fs dump'
type CephFilesystemDump struct {
Filesystems []CephFilesystemDetails `json:"filesystems"`
}
func getFsName(monitors, id, key string, fscID int64) (string, error) {
// ./tbox ceph fs dump --format=json
// JSON: {...,"filesystems":[{"mdsmap":{},"id":<n>},...],...}
var fsDump CephFilesystemDump
err := execCommandJSON(&fsDump,
"ceph",
"-m", monitors,
"--id", id,
"--key="+key,
"-c", util.CephConfigPath,
"fs", "dump", "--format=json",
)
if err != nil {
return "", err
}
for _, fs := range fsDump.Filesystems {
if fs.ID == fscID {
return fs.MDSMap.FilesystemName, nil
}
}
return "", fmt.Errorf("fscID (%d) not found in Ceph cluster", fscID)
}

View File

@ -100,6 +100,7 @@ func createCephUser(volOptions *volumeOptions, adminCr *credentials, volID volum
func deleteCephUser(volOptions *volumeOptions, adminCr *credentials, volID volumeID) error {
adminID, userID := genUserIDs(adminCr, volID)
// TODO: Need to return success if userID is not found
return execCommandErr("ceph",
"-m", volOptions.Monitors,
"-n", adminID,

View File

@ -41,10 +41,39 @@ type controllerCacheEntry struct {
}
var (
mtxControllerVolumeID = keymutex.NewHashed(0)
mtxControllerVolumeID = keymutex.NewHashed(0)
mtxControllerVolumeName = keymutex.NewHashed(0)
)
// CreateVolume creates the volume in backend and store the volume metadata
// createBackingVolume creates the backing subvolume and user/key for the given volOptions and vID,
// and on any error cleans up any created entities
func (cs *ControllerServer) createBackingVolume(volOptions *volumeOptions, vID *volumeIdentifier, secret map[string]string) error {
cr, err := getAdminCredentials(secret)
if err != nil {
return status.Error(codes.InvalidArgument, err.Error())
}
if err = createVolume(volOptions, cr, volumeID(vID.FsSubvolName), volOptions.Size); err != nil {
klog.Errorf("failed to create volume %s: %v", volOptions.RequestName, err)
return status.Error(codes.Internal, err.Error())
}
defer func() {
if err != nil {
if errDefer := purgeVolume(volumeID(vID.FsSubvolName), cr, volOptions); errDefer != nil {
klog.Warningf("failed purging volume: %s (%s)", volOptions.RequestName, errDefer)
}
}
}()
if _, err = createCephUser(volOptions, cr, volumeID(vID.FsSubvolName)); err != nil {
klog.Errorf("failed to create ceph user for volume %s: %v", volOptions.RequestName, err)
return status.Error(codes.Internal, err.Error())
}
return nil
}
// CreateVolume creates a reservation and the volume in backend, if it is not already present
func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
if err := cs.validateCreateVolumeRequest(req); err != nil {
klog.Errorf("CreateVolumeRequest validation failed: %v", err)
@ -52,67 +81,69 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
}
// Configuration
secret := req.GetSecrets()
volOptions, err := newVolumeOptions(req.GetParameters(), secret)
requestName := req.GetName()
volOptions, err := newVolumeOptions(requestName, req.GetCapacityRange().GetRequiredBytes(),
req.GetParameters(), secret)
if err != nil {
klog.Errorf("validation of volume options failed: %v", err)
klog.Errorf("validation and extraction of volume options failed: %v", err)
return nil, status.Error(codes.InvalidArgument, err.Error())
}
volID := makeVolumeID(req.GetName())
// Existence and conflict checks
mtxControllerVolumeName.LockKey(requestName)
defer mustUnlock(mtxControllerVolumeName, requestName)
mtxControllerVolumeID.LockKey(string(volID))
defer mustUnlock(mtxControllerVolumeID, string(volID))
// Create a volume in case the user didn't provide one
if volOptions.ProvisionVolume {
// Admin credentials are required
cr, err := getAdminCredentials(secret)
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
if err = createVolume(volOptions, cr, volID, req.GetCapacityRange().GetRequiredBytes()); err != nil {
klog.Errorf("failed to create volume %s: %v", req.GetName(), err)
return nil, status.Error(codes.Internal, err.Error())
}
if _, err = createCephUser(volOptions, cr, volID); err != nil {
klog.Errorf("failed to create ceph user for volume %s: %v", req.GetName(), err)
return nil, status.Error(codes.Internal, err.Error())
}
klog.Infof("cephfs: successfully created volume %s", volID)
} else {
klog.Infof("cephfs: volume %s is provisioned statically", volID)
}
ce := &controllerCacheEntry{VolOptions: *volOptions, VolumeID: volID}
if err := cs.MetadataStore.Create(string(volID), ce); err != nil {
klog.Errorf("failed to store a cache entry for volume %s: %v", volID, err)
vID, err := checkVolExists(volOptions, secret)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
if vID != nil {
return &csi.CreateVolumeResponse{
Volume: &csi.Volume{
VolumeId: vID.VolumeID,
CapacityBytes: volOptions.Size,
VolumeContext: req.GetParameters(),
},
}, nil
}
// Reservation
vID, err = reserveVol(volOptions, secret)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
defer func() {
if err != nil {
errDefer := undoVolReservation(volOptions, *vID, secret)
if errDefer != nil {
klog.Warningf("failed undoing reservation of volume: %s (%s)",
requestName, errDefer)
}
}
}()
// Create a volume
err = cs.createBackingVolume(volOptions, vID, secret)
if err != nil {
return nil, err
}
klog.Infof("cephfs: successfully created backing volume named %s for request name %s",
vID.FsSubvolName, requestName)
return &csi.CreateVolumeResponse{
Volume: &csi.Volume{
VolumeId: string(volID),
VolumeId: vID.VolumeID,
CapacityBytes: req.GetCapacityRange().GetRequiredBytes(),
VolumeContext: req.GetParameters(),
},
}, nil
}
// DeleteVolume deletes the volume in backend
// and removes the volume metadata from store
// nolint: gocyclo
func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
if err := cs.validateDeleteVolumeRequest(); err != nil {
klog.Errorf("DeleteVolumeRequest validation failed: %v", err)
return nil, err
}
// deleteVolumeDeprecated is used to delete volumes created using version 1.0.0 of the plugin,
// that have state information stored in files or kubernetes config maps
func (cs *ControllerServer) deleteVolumeDeprecated(req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
var (
volID = volumeID(req.GetVolumeId())
secrets = req.GetSecrets()
@ -172,6 +203,65 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
return &csi.DeleteVolumeResponse{}, nil
}
// DeleteVolume deletes the volume in backend and its reservation
func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
if err := cs.validateDeleteVolumeRequest(); err != nil {
klog.Errorf("DeleteVolumeRequest validation failed: %v", err)
return nil, err
}
volID := volumeID(req.GetVolumeId())
secrets := req.GetSecrets()
// Find the volume using the provided VolumeID
volOptions, vID, err := newVolumeOptionsFromVolID(string(volID), nil, secrets)
if err != nil {
// if error is ErrKeyNotFound, then a previous attempt at deletion was complete
// or partially complete (subvolume and imageOMap are garbage collected already), hence
// return success as deletion is complete
if _, ok := err.(util.ErrKeyNotFound); ok {
return &csi.DeleteVolumeResponse{}, nil
}
// ErrInvalidVolID may mean this is an 1.0.0 version volume
if _, ok := err.(ErrInvalidVolID); ok && cs.MetadataStore != nil {
return cs.deleteVolumeDeprecated(req)
}
return nil, status.Error(codes.Internal, err.Error())
}
// Deleting a volume requires admin credentials
cr, err := getAdminCredentials(secrets)
if err != nil {
klog.Errorf("failed to retrieve admin credentials: %v", err)
return nil, status.Error(codes.InvalidArgument, err.Error())
}
// lock out parallel delete and create requests against the same volume name as we
// cleanup the subvolume and associated omaps for the same
mtxControllerVolumeName.LockKey(volOptions.RequestName)
defer mustUnlock(mtxControllerVolumeName, volOptions.RequestName)
if err = purgeVolume(volumeID(vID.FsSubvolName), cr, volOptions); err != nil {
klog.Errorf("failed to delete volume %s: %v", volID, err)
return nil, status.Error(codes.Internal, err.Error())
}
if err = deleteCephUser(volOptions, cr, volumeID(vID.FsSubvolName)); err != nil {
klog.Errorf("failed to delete ceph user for volume %s: %v", volID, err)
return nil, status.Error(codes.Internal, err.Error())
}
if err := undoVolReservation(volOptions, *vID, secrets); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
klog.Infof("cephfs: successfully deleted volume %s", volID)
return &csi.DeleteVolumeResponse{}, nil
}
// ValidateVolumeCapabilities checks whether the volume capabilities requested
// are supported.
func (cs *ControllerServer) ValidateVolumeCapabilities(

View File

@ -26,9 +26,17 @@ import (
)
const (
// version of ceph driver
version = "1.0.0"
// volIDVersion is the version number of volume ID encoding scheme
volIDVersion uint16 = 1
// csiConfigFile is the location of the CSI config file
csiConfigFile = "/etc/ceph-csi-config/config.json"
// RADOS namespace to store CSI specific objects and keys
radosNamespace = "csi"
)
// PluginFolder defines the location of ceph plugin
@ -46,6 +54,14 @@ type Driver struct {
var (
// DefaultVolumeMounter for mounting volumes
DefaultVolumeMounter string
// CSIInstanceID is the instance ID that is unique to an instance of CSI, used when sharing
// ceph clusters across CSI instances, to differentiate omap names per CSI instance
CSIInstanceID = "default"
// volJournal is used to maintain RADOS based journals for CO generated
// VolumeName to backing CephFS subvolumes
volJournal *util.CSIJournal
)
// NewDriver returns new ceph driver
@ -77,7 +93,7 @@ func NewNodeServer(d *csicommon.CSIDriver) *NodeServer {
// Run start a non-blocking grpc controller,node and identityserver for
// ceph CSI driver which can serve multiple parallel requests
func (fs *Driver) Run(driverName, nodeID, endpoint, volumeMounter, mountCacheDir string, cachePersister util.CachePersister) {
func (fs *Driver) Run(driverName, nodeID, endpoint, volumeMounter, mountCacheDir, instanceID string, cachePersister util.CachePersister) {
klog.Infof("Driver: %v version: %v", driverName, version)
// Configuration
@ -105,7 +121,21 @@ func (fs *Driver) Run(driverName, nodeID, endpoint, volumeMounter, mountCacheDir
klog.Fatalf("failed to write ceph configuration file: %v", err)
}
initVolumeMountCache(driverName, mountCacheDir, cachePersister)
// Use passed in instance ID, if provided for omap suffix naming
if instanceID != "" {
CSIInstanceID = instanceID
}
// Get an instance of the volume journal
volJournal = util.NewCSIVolumeJournal()
// Update keys with CSI instance suffix
volJournal.SetCSIDirectorySuffix(CSIInstanceID)
// Update namespace for storing keys into a specific namespace on RADOS, in the CephFS
// metadata pool
volJournal.SetNamespace(radosNamespace)
initVolumeMountCache(driverName, mountCacheDir)
if mountCacheDir != "" {
if err := remountCachedVolumes(); err != nil {
klog.Warningf("failed to remount cached volumes: %v", err)

37
pkg/cephfs/errors.go Normal file
View File

@ -0,0 +1,37 @@
/*
Copyright 2019 The Ceph-CSI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cephfs
// ErrInvalidVolID is returned when a CSI passed VolumeID is not conformant to any known volume ID
// formats
type ErrInvalidVolID struct {
err error
}
func (e ErrInvalidVolID) Error() string {
return e.err.Error()
}
// ErrNonStaticVolume is returned when a volume is detected as not being
// statically provisioned
type ErrNonStaticVolume struct {
err error
}
func (e ErrNonStaticVolume) Error() string {
return e.err.Error()
}

136
pkg/cephfs/fsjournal.go Normal file
View File

@ -0,0 +1,136 @@
/*
Copyright 2019 The Ceph-CSI Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cephfs
import (
"github.com/ceph/ceph-csi/pkg/util"
"k8s.io/klog"
)
// volumeIdentifier structure contains an association between the CSI VolumeID to its subvolume
// name on the backing CephFS instance
type volumeIdentifier struct {
FsSubvolName string
VolumeID string
}
/*
checkVolExists checks to determine if passed in RequestName in volOptions exists on the backend.
**NOTE:** These functions manipulate the rados omaps that hold information regarding
volume names as requested by the CSI drivers. Hence, these need to be invoked only when the
respective CSI driver generated volume name based locks are held, as otherwise racy
access to these omaps may end up leaving them in an inconsistent state.
These functions also cleanup omap reservations that are stale. I.e when omap entries exist and
backing subvolumes are missing, or one of the omaps exist and the next is missing. This is
because, the order of omap creation and deletion are inverse of each other, and protected by the
request name lock, and hence any stale omaps are leftovers from incomplete transactions and are
hence safe to garbage collect.
*/
func checkVolExists(volOptions *volumeOptions, secret map[string]string) (*volumeIdentifier, error) {
var (
vi util.CSIIdentifier
vid volumeIdentifier
)
cr, err := getAdminCredentials(secret)
if err != nil {
return nil, err
}
imageUUID, err := volJournal.CheckReservation(volOptions.Monitors, cr.id, cr.key,
volOptions.MetadataPool, volOptions.RequestName, "")
if err != nil {
return nil, err
}
if imageUUID == "" {
return nil, nil
}
vid.FsSubvolName = volJournal.NamingPrefix() + imageUUID
// TODO: size checks
// found a volume already available, process and return it!
vi = util.CSIIdentifier{
LocationID: volOptions.FscID,
EncodingVersion: volIDVersion,
ClusterID: volOptions.ClusterID,
ObjectUUID: imageUUID,
}
vid.VolumeID, err = vi.ComposeCSIID()
if err != nil {
return nil, err
}
klog.V(4).Infof("Found existing volume (%s) with subvolume name (%s) for request (%s)",
vid.VolumeID, vid.FsSubvolName, volOptions.RequestName)
return &vid, nil
}
// undoVolReservation is a helper routine to undo a name reservation for a CSI VolumeName
func undoVolReservation(volOptions *volumeOptions, vid volumeIdentifier, secret map[string]string) error {
cr, err := getAdminCredentials(secret)
if err != nil {
return err
}
err = volJournal.UndoReservation(volOptions.Monitors, cr.id, cr.key, volOptions.MetadataPool,
vid.FsSubvolName, volOptions.RequestName)
return err
}
// reserveVol is a helper routine to request a UUID reservation for the CSI VolumeName and,
// to generate the volume identifier for the reserved UUID
func reserveVol(volOptions *volumeOptions, secret map[string]string) (*volumeIdentifier, error) {
var (
vi util.CSIIdentifier
vid volumeIdentifier
)
cr, err := getAdminCredentials(secret)
if err != nil {
return nil, err
}
imageUUID, err := volJournal.ReserveName(volOptions.Monitors, cr.id, cr.key,
volOptions.MetadataPool, volOptions.RequestName, "")
if err != nil {
return nil, err
}
vid.FsSubvolName = volJournal.NamingPrefix() + imageUUID
// generate the volume ID to return to the CO system
vi = util.CSIIdentifier{
LocationID: volOptions.FscID,
EncodingVersion: volIDVersion,
ClusterID: volOptions.ClusterID,
ObjectUUID: imageUUID,
}
vid.VolumeID, err = vi.ComposeCSIID()
if err != nil {
return nil, err
}
klog.V(4).Infof("Generated Volume ID (%s) and subvolume name (%s) for request name (%s)",
vid.VolumeID, vid.FsSubvolName, volOptions.RequestName)
return &vid, nil
}

View File

@ -16,6 +16,7 @@ type volumeMountCacheEntry struct {
DriverVersion string `json:"driverVersion"`
VolumeID string `json:"volumeID"`
Mounter string `json:"mounter"`
Secrets map[string]string `json:"secrets"`
StagingPath string `json:"stagingPath"`
TargetPaths map[string]bool `json:"targetPaths"`
@ -25,7 +26,6 @@ type volumeMountCacheEntry struct {
type volumeMountCacheMap struct {
volumes map[string]volumeMountCacheEntry
nodeCacheStore util.NodeCache
metadataStore util.CachePersister
}
var (
@ -34,10 +34,9 @@ var (
volumeMountCacheMtx sync.Mutex
)
func initVolumeMountCache(driverName string, mountCacheDir string, cachePersister util.CachePersister) {
func initVolumeMountCache(driverName string, mountCacheDir string) {
volumeMountCache.volumes = make(map[string]volumeMountCacheEntry)
volumeMountCache.metadataStore = cachePersister
volumeMountCache.nodeCacheStore.BasePath = mountCacheDir
volumeMountCache.nodeCacheStore.CacheDir = driverName
klog.Infof("mount-cache: name: %s, version: %s, mountCacheDir: %s", driverName, version, mountCacheDir)
@ -50,18 +49,19 @@ func remountCachedVolumes() error {
}
var remountFailCount, remountSuccCount int64
me := &volumeMountCacheEntry{}
ce := &controllerCacheEntry{}
err := volumeMountCache.nodeCacheStore.ForAll(volumeMountCachePrefix, me, func(identifier string) error {
volID := me.VolumeID
if err := volumeMountCache.metadataStore.Get(volID, ce); err != nil {
if err, ok := err.(*util.CacheEntryNotFound); ok {
klog.Infof("mount-cache: metadata not found, assuming the volume %s to be already deleted (%v)", volID, err)
if volOpts, vid, err := newVolumeOptionsFromVolID(me.VolumeID, nil, decodeCredentials(me.Secrets)); err != nil {
if err, ok := err.(util.ErrKeyNotFound); ok {
klog.Infof("mount-cache: image key not found, assuming the volume %s to be already deleted (%v)", volID, err)
if err := volumeMountCache.nodeCacheStore.Delete(genVolumeMountCacheFileName(volID)); err == nil {
klog.Infof("mount-cache: metadata not found, delete volume cache entry for volume %s", volID)
}
}
} else {
if err := mountOneCacheEntry(ce, me); err == nil {
// update Mounter from mount cache
volOpts.Mounter = me.Mounter
if err := mountOneCacheEntry(volOpts, vid, me); err == nil {
remountSuccCount++
volumeMountCache.volumes[me.VolumeID] = *me
klog.Infof("mount-cache: successfully remounted volume %s", volID)
@ -84,7 +84,7 @@ func remountCachedVolumes() error {
return nil
}
func mountOneCacheEntry(ce *controllerCacheEntry, me *volumeMountCacheEntry) error {
func mountOneCacheEntry(volOptions *volumeOptions, vid *volumeIdentifier, me *volumeMountCacheEntry) error {
volumeMountCacheMtx.Lock()
defer volumeMountCacheMtx.Unlock()
@ -92,17 +92,16 @@ func mountOneCacheEntry(ce *controllerCacheEntry, me *volumeMountCacheEntry) err
err error
cr *credentials
)
volID := ce.VolumeID
volOptions := ce.VolOptions
volID := vid.VolumeID
if volOptions.ProvisionVolume {
volOptions.RootPath = getVolumeRootPathCeph(volID)
volOptions.RootPath = getVolumeRootPathCeph(volumeID(vid.FsSubvolName))
cr, err = getAdminCredentials(decodeCredentials(me.Secrets))
if err != nil {
return err
}
var entity *cephEntity
entity, err = getCephUser(&volOptions, cr, volID)
entity, err = getCephUser(volOptions, cr, volumeID(vid.FsSubvolName))
if err != nil {
return err
}
@ -127,12 +126,12 @@ func mountOneCacheEntry(ce *controllerCacheEntry, me *volumeMountCacheEntry) err
}
if !isMnt {
m, err := newMounter(&volOptions)
m, err := newMounter(volOptions)
if err != nil {
klog.Errorf("mount-cache: failed to create mounter for volume %s: %v", volID, err)
return err
}
if err := m.mount(me.StagingPath, cr, &volOptions); err != nil {
if err := m.mount(me.StagingPath, cr, volOptions); err != nil {
klog.Errorf("mount-cache: failed to mount volume %s: %v", volID, err)
return err
}
@ -204,7 +203,7 @@ func (mc *volumeMountCacheMap) isEnable() bool {
return mc.nodeCacheStore.BasePath != ""
}
func (mc *volumeMountCacheMap) nodeStageVolume(volID string, stagingTargetPath string, secrets map[string]string) error {
func (mc *volumeMountCacheMap) nodeStageVolume(volID, stagingTargetPath, mounter string, secrets map[string]string) error {
if !mc.isEnable() {
return nil
}
@ -228,6 +227,7 @@ func (mc *volumeMountCacheMap) nodeStageVolume(volID string, stagingTargetPath s
me.Secrets = encodeCredentials(secrets)
me.StagingPath = stagingTargetPath
me.TargetPaths = lastTargetPaths
me.Mounter = mounter
me.CreateTime = time.Now()
volumeMountCache.volumes[volID] = me

View File

@ -80,6 +80,10 @@ func getCredentialsForVolume(volOptions *volumeOptions, volID volumeID, req *csi
// NodeStageVolume mounts the volume to a staging path on the node.
func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
var (
volOptions *volumeOptions
vid *volumeIdentifier
)
if err := validateNodeStageVolumeRequest(req); err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
@ -89,15 +93,26 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
stagingTargetPath := req.GetStagingTargetPath()
volID := volumeID(req.GetVolumeId())
volOptions, err := newVolumeOptions(req.GetVolumeContext(), req.GetSecrets())
volOptions, vid, err := newVolumeOptionsFromVolID(string(volID), req.GetVolumeContext(), req.GetSecrets())
if err != nil {
klog.Errorf("error reading volume options for volume %s: %v", volID, err)
return nil, status.Error(codes.InvalidArgument, err.Error())
}
if _, ok := err.(ErrInvalidVolID); !ok {
return nil, status.Error(codes.Internal, err.Error())
}
if volOptions.ProvisionVolume {
// Dynamically provisioned volumes don't have their root path set, do it here
volOptions.RootPath = getVolumeRootPathCeph(volID)
// check for pre-provisioned volumes (plugin versions > 1.0.0)
volOptions, vid, err = newVolumeOptionsFromStaticVolume(string(volID), req.GetVolumeContext())
if err != nil {
if _, ok := err.(ErrNonStaticVolume); !ok {
return nil, status.Error(codes.Internal, err.Error())
}
// check for volumes from plugin versions <= 1.0.0
volOptions, vid, err = newVolumeOptionsFromVersion1Context(string(volID), req.GetVolumeContext(),
req.GetSecrets())
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
}
}
if err = createMountPoint(stagingTargetPath); err != nil {
@ -123,7 +138,7 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
}
// It's not, mount now
if err = ns.mount(volOptions, req); err != nil {
if err = ns.mount(volOptions, req, vid); err != nil {
return nil, err
}
@ -132,11 +147,11 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
return &csi.NodeStageVolumeResponse{}, nil
}
func (*NodeServer) mount(volOptions *volumeOptions, req *csi.NodeStageVolumeRequest) error {
func (*NodeServer) mount(volOptions *volumeOptions, req *csi.NodeStageVolumeRequest, vid *volumeIdentifier) error {
stagingTargetPath := req.GetStagingTargetPath()
volID := volumeID(req.GetVolumeId())
cr, err := getCredentialsForVolume(volOptions, volID, req)
cr, err := getCredentialsForVolume(volOptions, volumeID(vid.FsSubvolName), req)
if err != nil {
klog.Errorf("failed to get ceph credentials for volume %s: %v", volID, err)
return status.Error(codes.Internal, err.Error())
@ -154,7 +169,7 @@ func (*NodeServer) mount(volOptions *volumeOptions, req *csi.NodeStageVolumeRequ
klog.Errorf("failed to mount volume %s: %v", volID, err)
return status.Error(codes.Internal, err.Error())
}
if err := volumeMountCache.nodeStageVolume(req.GetVolumeId(), stagingTargetPath, req.GetSecrets()); err != nil {
if err := volumeMountCache.nodeStageVolume(req.GetVolumeId(), stagingTargetPath, volOptions.Mounter, req.GetSecrets()); err != nil {
klog.Warningf("mount-cache: failed to stage volume %s %s: %v", volID, stagingTargetPath, err)
}
return nil

View File

@ -42,10 +42,6 @@ func mustUnlock(m keymutex.KeyMutex, key string) {
}
}
func makeVolumeID(volName string) volumeID {
return volumeID("csi-cephfs-" + volName)
}
func execCommand(program string, args ...string) (stdout, stderr []byte, err error) {
var (
cmd = exec.Command(program, args...) // nolint: gosec
@ -72,6 +68,7 @@ func execCommandErr(program string, args ...string) error {
return err
}
//nolint: unparam
func execCommandJSON(v interface{}, program string, args ...string) error {
stdout, _, err := execCommand(program, args...)
if err != nil {

View File

@ -114,7 +114,7 @@ func newMounter(volOptions *volumeOptions) (volumeMounter, error) {
type fuseMounter struct{}
func mountFuse(mountPoint string, cr *credentials, volOptions *volumeOptions) error {
args := [...]string{
args := []string{
mountPoint,
"-m", volOptions.Monitors,
"-c", util.CephConfigPath,
@ -123,6 +123,10 @@ func mountFuse(mountPoint string, cr *credentials, volOptions *volumeOptions) er
"-o", "nonempty",
}
if volOptions.FsName != "" {
args = append(args, "--client_mds_namespace="+volOptions.FsName)
}
_, stderr, err := execCommand("ceph-fuse", args[:]...)
if err != nil {
return err
@ -166,12 +170,18 @@ func mountKernel(mountPoint string, cr *credentials, volOptions *volumeOptions)
return err
}
return execCommandErr("mount",
args := []string{
"-t", "ceph",
fmt.Sprintf("%s:%s", volOptions.Monitors, volOptions.RootPath),
mountPoint,
"-o", fmt.Sprintf("name=%s,secret=%s", cr.id, cr.key),
)
}
optionsStr := fmt.Sprintf("name=%s,secret=%s", cr.id, cr.key)
if volOptions.FsName != "" {
optionsStr = optionsStr + fmt.Sprintf(",mds_namespace=%s", volOptions.FsName)
}
args = append(args, "-o", optionsStr)
return execCommandErr("mount", args[:]...)
}
func (m *kernelMounter) mount(mountPoint string, cr *credentials, volOptions *volumeOptions) error {

View File

@ -19,17 +19,22 @@ package cephfs
import (
"fmt"
"strconv"
"github.com/ceph/ceph-csi/pkg/util"
)
type volumeOptions struct {
Monitors string `json:"monitors"`
Pool string `json:"pool"`
RootPath string `json:"rootPath"`
RequestName string
Size int64
ClusterID string
FsName string
FscID int64
MetadataPool string
Monitors string `json:"monitors"`
Pool string `json:"pool"`
RootPath string `json:"rootPath"`
Mounter string `json:"mounter"`
ProvisionVolume bool `json:"provisionVolume"`
MonValueFromSecret string `json:"monValueFromSecret"`
}
func validateNonEmptyField(field, fieldName string) error {
@ -40,35 +45,18 @@ func validateNonEmptyField(field, fieldName string) error {
return nil
}
func (o *volumeOptions) validate() error {
if err := validateNonEmptyField(o.Monitors, "monitors"); err != nil {
if err = validateNonEmptyField(o.MonValueFromSecret, "monValueFromSecret"); err != nil {
return err
}
func extractOptionalOption(dest *string, optionLabel string, options map[string]string) error {
opt, ok := options[optionLabel]
if !ok {
// Option not found, no error as it is optional
return nil
}
if err := validateNonEmptyField(o.RootPath, "rootPath"); err != nil {
if !o.ProvisionVolume {
return err
}
} else {
if o.ProvisionVolume {
return fmt.Errorf("non-empty field rootPath is in conflict with provisionVolume=true")
}
}
if o.ProvisionVolume {
if err := validateNonEmptyField(o.Pool, "pool"); err != nil {
return err
}
}
if o.Mounter != "" {
if err := validateMounter(o.Mounter); err != nil {
return err
}
if err := validateNonEmptyField(opt, optionLabel); err != nil {
return err
}
*dest = opt
return nil
}
@ -78,6 +66,10 @@ func extractOption(dest *string, optionLabel string, options map[string]string)
return fmt.Errorf("missing required field %s", optionLabel)
}
if err := validateNonEmptyField(opt, optionLabel); err != nil {
return err
}
*dest = opt
return nil
}
@ -93,63 +85,252 @@ func validateMounter(m string) error {
return nil
}
func newVolumeOptions(volOptions, secret map[string]string) (*volumeOptions, error) {
func extractMounter(dest *string, options map[string]string) error {
if err := extractOptionalOption(dest, "mounter", options); err != nil {
return err
}
if *dest != "" {
if err := validateMounter(*dest); err != nil {
return err
}
}
return nil
}
func getMonsAndClusterID(options map[string]string) (string, string, error) {
clusterID, ok := options["clusterID"]
if !ok {
err := fmt.Errorf("clusterID must be set")
return "", "", err
}
if err := validateNonEmptyField(clusterID, "clusterID"); err != nil {
return "", "", err
}
monitors, err := util.Mons(csiConfigFile, clusterID)
if err != nil {
err = fmt.Errorf("failed to fetch monitor list using clusterID (%s)", clusterID)
return "", "", err
}
return monitors, clusterID, err
}
// newVolumeOptions generates a new instance of volumeOptions from the provided
// CSI request parameters
func newVolumeOptions(requestName string, size int64, volOptions, secret map[string]string) (*volumeOptions, error) {
var (
opts volumeOptions
err error
)
// extract mon from secret first
if err = extractOption(&opts.MonValueFromSecret, "monValueFromSecret", volOptions); err == nil {
mon := ""
if mon, err = getMonValFromSecret(secret); err == nil && len(mon) > 0 {
opts.Monitors = mon
}
}
if len(opts.Monitors) == 0 {
// if not set in secret, get it from parameter
if err = extractOption(&opts.Monitors, "monitors", volOptions); err != nil {
return nil, fmt.Errorf("either monitors or monValueFromSecret should be set")
}
}
if err = extractNewVolOpt(&opts, volOptions); err != nil {
opts.Monitors, opts.ClusterID, err = getMonsAndClusterID(volOptions)
if err != nil {
return nil, err
}
if err = opts.validate(); err != nil {
if err = extractOption(&opts.Pool, "pool", volOptions); err != nil {
return nil, err
}
if err = extractMounter(&opts.Mounter, volOptions); err != nil {
return nil, err
}
if err = extractOption(&opts.FsName, "fsName", volOptions); err != nil {
return nil, err
}
opts.RequestName = requestName
opts.Size = size
cr, err := getAdminCredentials(secret)
if err != nil {
return nil, err
}
opts.FscID, err = getFscID(opts.Monitors, cr.id, cr.key, opts.FsName)
if err != nil {
return nil, err
}
opts.MetadataPool, err = getMetadataPool(opts.Monitors, cr.id, cr.key, opts.FsName)
if err != nil {
return nil, err
}
opts.ProvisionVolume = true
return &opts, nil
}
func extractNewVolOpt(opts *volumeOptions, volOpt map[string]string) error {
// newVolumeOptionsFromVolID generates a new instance of volumeOptions and volumeIdentifier
// from the provided CSI VolumeID
func newVolumeOptionsFromVolID(volID string, volOpt, secrets map[string]string) (*volumeOptions, *volumeIdentifier, error) {
var (
vi util.CSIIdentifier
volOptions volumeOptions
vid volumeIdentifier
)
// Decode the VolID first, to detect older volumes or pre-provisioned volumes
// before other errors
err := vi.DecomposeCSIID(volID)
if err != nil {
err = fmt.Errorf("error decoding volume ID (%s) (%s)", err, volID)
return nil, nil, ErrInvalidVolID{err}
}
volOptions.ClusterID = vi.ClusterID
vid.FsSubvolName = volJournal.NamingPrefix() + vi.ObjectUUID
vid.VolumeID = volID
volOptions.FscID = vi.LocationID
if volOptions.Monitors, err = util.Mons(csiConfigFile, vi.ClusterID); err != nil {
err = fmt.Errorf("failed to fetch monitor list using clusterID (%s)", vi.ClusterID)
return nil, nil, err
}
cr, err := getAdminCredentials(secrets)
if err != nil {
return nil, nil, err
}
volOptions.FsName, err = getFsName(volOptions.Monitors, cr.id, cr.key, volOptions.FscID)
if err != nil {
return nil, nil, err
}
volOptions.MetadataPool, err = getMetadataPool(volOptions.Monitors, cr.id, cr.key,
volOptions.FsName)
if err != nil {
return nil, nil, err
}
volOptions.RequestName, _, err = volJournal.GetObjectUUIDData(volOptions.Monitors, cr.id, cr.key,
volOptions.MetadataPool, vi.ObjectUUID, false)
if err != nil {
return nil, nil, err
}
if volOpt != nil {
if err = extractOption(&volOptions.Pool, "pool", volOpt); err != nil {
return nil, nil, err
}
if err = extractMounter(&volOptions.Mounter, volOpt); err != nil {
return nil, nil, err
}
}
volOptions.RootPath = getVolumeRootPathCeph(volumeID(vid.FsSubvolName))
volOptions.ProvisionVolume = true
return &volOptions, &vid, nil
}
// newVolumeOptionsFromVersion1Context generates a new instance of volumeOptions and
// volumeIdentifier from the provided CSI volume context, if the provided context was
// for a volume created by version 1.0.0 (or prior) of the CSI plugin
func newVolumeOptionsFromVersion1Context(volID string, options, secrets map[string]string) (*volumeOptions, *volumeIdentifier, error) {
var (
opts volumeOptions
vid volumeIdentifier
provisionVolumeBool string
err error
)
if err = extractOption(&provisionVolumeBool, "provisionVolume", volOpt); err != nil {
return err
// Check if monitors is part of the options, that is an indicator this is an 1.0.0 volume
if err = extractOption(&opts.Monitors, "monitors", options); err != nil {
return nil, nil, err
}
// check if there are mon values in secret and if so override option retrieved monitors from
// monitors in the secret
mon, err := getMonValFromSecret(secrets)
if err == nil && len(mon) > 0 {
opts.Monitors = mon
}
if err = extractOption(&provisionVolumeBool, "provisionVolume", options); err != nil {
return nil, nil, err
}
if opts.ProvisionVolume, err = strconv.ParseBool(provisionVolumeBool); err != nil {
return fmt.Errorf("failed to parse provisionVolume: %v", err)
return nil, nil, fmt.Errorf("failed to parse provisionVolume: %v", err)
}
if opts.ProvisionVolume {
if err = extractOption(&opts.Pool, "pool", volOpt); err != nil {
return err
if err = extractOption(&opts.Pool, "pool", options); err != nil {
return nil, nil, err
}
opts.RootPath = getVolumeRootPathCeph(volumeID(volID))
} else {
if err = extractOption(&opts.RootPath, "rootPath", volOpt); err != nil {
return err
if err = extractOption(&opts.RootPath, "rootPath", options); err != nil {
return nil, nil, err
}
}
// This field is optional, don't check for its presence
// nolint
// (skip errcheck and gosec as this is optional)
extractOption(&opts.Mounter, "mounter", volOpt)
return nil
if err = extractMounter(&opts.Mounter, options); err != nil {
return nil, nil, err
}
vid.FsSubvolName = volID
vid.VolumeID = volID
return &opts, &vid, nil
}
// newVolumeOptionsFromStaticVolume generates a new instance of volumeOptions and
// volumeIdentifier from the provided CSI volume context, if the provided context is
// detected to be a statically provisioned volume
func newVolumeOptionsFromStaticVolume(volID string, options map[string]string) (*volumeOptions, *volumeIdentifier, error) {
var (
opts volumeOptions
vid volumeIdentifier
staticVol bool
err error
)
val, ok := options["staticVolume"]
if !ok {
return nil, nil, ErrNonStaticVolume{err}
}
if staticVol, err = strconv.ParseBool(val); err != nil {
return nil, nil, fmt.Errorf("failed to parse preProvisionedVolume: %v", err)
}
if !staticVol {
return nil, nil, ErrNonStaticVolume{err}
}
// Volume is static, and ProvisionVolume carries bool stating if it was provisioned, hence
// store NOT of static boolean
opts.ProvisionVolume = !staticVol
opts.Monitors, opts.ClusterID, err = getMonsAndClusterID(options)
if err != nil {
return nil, nil, err
}
if err = extractOption(&opts.RootPath, "rootPath", options); err != nil {
return nil, nil, err
}
if err = extractOption(&opts.FsName, "fsName", options); err != nil {
return nil, nil, err
}
if err = extractMounter(&opts.Mounter, options); err != nil {
return nil, nil, err
}
vid.FsSubvolName = opts.RootPath
vid.VolumeID = volID
return &opts, &vid, nil
}

View File

@ -306,7 +306,7 @@ func genSnapFromSnapID(rbdSnap *rbdSnapshot, snapshotID string, credentials map[
return err
}
rbdSnap.Pool, err = util.GetPoolName(rbdSnap.Monitors, rbdSnap.AdminID, key, vi.PoolID)
rbdSnap.Pool, err = util.GetPoolName(rbdSnap.Monitors, rbdSnap.AdminID, key, vi.LocationID)
if err != nil {
return err
}
@ -359,7 +359,7 @@ func genVolFromVolID(rbdVol *rbdVolume, volumeID string, credentials map[string]
}
rbdVol.Pool, err = util.GetPoolName(rbdVol.Monitors, rbdVol.AdminID, key,
vi.PoolID)
vi.LocationID)
if err != nil {
return err
}

View File

@ -123,17 +123,22 @@ func GetPoolName(monitors string, adminID string, key string, poolID int64) (str
}
// SetOMapKeyValue sets the given key and value into the provided Ceph omap name
func SetOMapKeyValue(monitors, adminID, key, poolName, oMapName, oMapKey, keyValue string) error {
func SetOMapKeyValue(monitors, adminID, key, poolName, namespace, oMapName, oMapKey, keyValue string) error {
// Command: "rados <options> setomapval oMapName oMapKey keyValue"
_, _, err := ExecCommand(
"rados",
args := []string{
"-m", monitors,
"--id", adminID,
"--key="+key,
"--key=" + key,
"-c", CephConfigPath,
"-p", poolName,
"setomapval", oMapName, oMapKey, keyValue)
"setomapval", oMapName, oMapKey, keyValue,
}
if namespace != "" {
args = append(args, "--namespace="+namespace)
}
_, _, err := ExecCommand("rados", args[:]...)
if err != nil {
klog.Errorf("failed adding key (%s with value %s), to omap (%s) in "+
"pool (%s): (%v)", oMapKey, keyValue, oMapName, poolName, err)
@ -144,7 +149,7 @@ func SetOMapKeyValue(monitors, adminID, key, poolName, oMapName, oMapKey, keyVal
}
// GetOMapValue gets the value for the given key from the named omap
func GetOMapValue(monitors, adminID, key, poolName, oMapName, oMapKey string) (string, error) {
func GetOMapValue(monitors, adminID, key, poolName, namespace, oMapName, oMapKey string) (string, error) {
// Command: "rados <options> getomapval oMapName oMapKey <outfile>"
// No such key: replicapool/csi.volumes.directory.default/csi.volname
tmpFile, err := ioutil.TempFile("", "omap-get-")
@ -155,14 +160,20 @@ func GetOMapValue(monitors, adminID, key, poolName, oMapName, oMapKey string) (s
defer tmpFile.Close()
defer os.Remove(tmpFile.Name())
stdout, stderr, err := ExecCommand(
"rados",
args := []string{
"-m", monitors,
"--id", adminID,
"--key="+key,
"--key=" + key,
"-c", CephConfigPath,
"-p", poolName,
"getomapval", oMapName, oMapKey, tmpFile.Name())
"getomapval", oMapName, oMapKey, tmpFile.Name(),
}
if namespace != "" {
args = append(args, "--namespace="+namespace)
}
stdout, stderr, err := ExecCommand("rados", args[:]...)
if err != nil {
// no logs, as attempting to check for non-existent key/value is done even on
// regular call sequences
@ -189,17 +200,22 @@ func GetOMapValue(monitors, adminID, key, poolName, oMapName, oMapKey string) (s
}
// RemoveOMapKey removes the omap key from the given omap name
func RemoveOMapKey(monitors, adminID, key, poolName, oMapName, oMapKey string) error {
func RemoveOMapKey(monitors, adminID, key, poolName, namespace, oMapName, oMapKey string) error {
// Command: "rados <options> rmomapkey oMapName oMapKey"
_, _, err := ExecCommand(
"rados",
args := []string{
"-m", monitors,
"--id", adminID,
"--key="+key,
"--key=" + key,
"-c", CephConfigPath,
"-p", poolName,
"rmomapkey", oMapName, oMapKey)
"rmomapkey", oMapName, oMapKey,
}
if namespace != "" {
args = append(args, "--namespace="+namespace)
}
_, _, err := ExecCommand("rados", args[:]...)
if err != nil {
// NOTE: Missing omap key removal does not return an error
klog.Errorf("failed removing key (%s), from omap (%s) in "+
@ -212,17 +228,22 @@ func RemoveOMapKey(monitors, adminID, key, poolName, oMapName, oMapKey string) e
// CreateObject creates the object name passed in and returns ErrObjectExists if the provided object
// is already present in rados
func CreateObject(monitors, adminID, key, poolName, objectName string) error {
func CreateObject(monitors, adminID, key, poolName, namespace, objectName string) error {
// Command: "rados <options> create objectName"
stdout, _, err := ExecCommand(
"rados",
args := []string{
"-m", monitors,
"--id", adminID,
"--key="+key,
"--key=" + key,
"-c", CephConfigPath,
"-p", poolName,
"create", objectName)
"create", objectName,
}
if namespace != "" {
args = append(args, "--namespace="+namespace)
}
stdout, _, err := ExecCommand("rados", args[:]...)
if err != nil {
klog.Errorf("failed creating omap (%s) in pool (%s): (%v)", objectName, poolName, err)
if strings.Contains(string(stdout), "error creating "+poolName+"/"+objectName+
@ -237,17 +258,22 @@ func CreateObject(monitors, adminID, key, poolName, objectName string) error {
// RemoveObject removes the entire omap name passed in and returns ErrObjectNotFound is provided omap
// is not found in rados
func RemoveObject(monitors, adminID, key, poolName, oMapName string) error {
func RemoveObject(monitors, adminID, key, poolName, namespace, oMapName string) error {
// Command: "rados <options> rm oMapName"
stdout, _, err := ExecCommand(
"rados",
args := []string{
"-m", monitors,
"--id", adminID,
"--key="+key,
"--key=" + key,
"-c", CephConfigPath,
"-p", poolName,
"rm", oMapName)
"rm", oMapName,
}
if namespace != "" {
args = append(args, "--namespace="+namespace)
}
stdout, _, err := ExecCommand("rados", args[:]...)
if err != nil {
klog.Errorf("failed removing omap (%s) in pool (%s): (%v)", oMapName, poolName, err)
if strings.Contains(string(stdout), "error removing "+poolName+">"+oMapName+

View File

@ -102,7 +102,7 @@ func GenerateVolID(monitors, id, key, pool, clusterID, objUUID string, volIDVers
// generate the volume ID to return to the CO system
vi := CSIIdentifier{
PoolID: poolID,
LocationID: poolID,
EncodingVersion: volIDVersion,
ClusterID: clusterID,
ObjectUUID: objUUID,

View File

@ -32,8 +32,8 @@ The CSI identifier is composed as elaborated in the comment against ComposeCSIID
DecomposeCSIID is the inverse of the same function.
The CSIIdentifier structure carries the following fields,
- PoolID: 64 bit integer of the pool that the volume belongs to, where the ID comes from Ceph pool
identifier for the corresponding pool name.
- LocationID: 64 bit integer identifier determining the location of the volume on the Ceph cluster.
It is the ID of the poolname or fsname, for RBD or CephFS backed volumes respectively.
- EncodingVersion: Carries the version number of the encoding scheme used to encode the CSI ID,
and is preserved for any future proofing w.r.t changes in the encoding scheme, and to retain
ability to parse backward compatible encodings.
@ -43,7 +43,7 @@ The CSIIdentifier structure carries the following fields,
corresponds to this CSI ID.
*/
type CSIIdentifier struct {
PoolID int64 // TODO: Name appropriately when reused for CephFS
LocationID int64
EncodingVersion uint16
ClusterID string
ObjectUUID string
@ -87,7 +87,7 @@ func (ci CSIIdentifier) ComposeCSIID() (string, error) {
binary.BigEndian.PutUint16(buf16, uint16(len(ci.ClusterID)))
clusterIDLength := hex.EncodeToString(buf16)
binary.BigEndian.PutUint64(buf64, uint64(ci.PoolID))
binary.BigEndian.PutUint64(buf64, uint64(ci.LocationID))
poolIDEncodedHex := hex.EncodeToString(buf64)
return strings.Join([]string{versionEncodedHex, clusterIDLength, ci.ClusterID,
@ -136,7 +136,7 @@ func (ci *CSIIdentifier) DecomposeCSIID(composedCSIID string) (err error) {
if err != nil {
return err
}
ci.PoolID = int64(binary.BigEndian.Uint64(buf64))
ci.LocationID = int64(binary.BigEndian.Uint64(buf64))
// 16 for poolID encoding and 1 for '-' separator
bytesToProcess -= 17
nextFieldStartIdx = nextFieldStartIdx + 17

View File

@ -33,7 +33,7 @@ type testTuple struct {
var testData = []testTuple{
{
vID: CSIIdentifier{
PoolID: 0xffff,
LocationID: 0xffff,
EncodingVersion: 0xffff,
ClusterID: "01616094-9d93-4178-bf45-c7eac19e8b15",
ObjectUUID: "00000000-1111-2222-bbbb-cacacacacaca",

View File

@ -114,6 +114,9 @@ type CSIJournal struct {
// volume name prefix for naming on Ceph rbd or FS, suffix is a uuid generated per volume
namingPrefix string
// namespace in which the RADOS objects are stored, default is no namespace
namespace string
}
// CSIVolumeJournal returns an instance of volume keys
@ -125,6 +128,7 @@ func NewCSIVolumeJournal() *CSIJournal {
csiNameKey: "csi.volname",
namingPrefix: "csi-vol-",
cephSnapSourceKey: "",
namespace: "",
}
}
@ -137,6 +141,7 @@ func NewCSISnapshotJournal() *CSIJournal {
csiNameKey: "csi.snapname",
namingPrefix: "csi-snap-",
cephSnapSourceKey: "csi.source",
namespace: "",
}
}
@ -150,6 +155,11 @@ func (cj *CSIJournal) SetCSIDirectorySuffix(suffix string) {
cj.csiDirectory = cj.csiDirectory + "." + suffix
}
// SetNamespace sets the namespace in which all RADOS objects would be created
func (cj *CSIJournal) SetNamespace(ns string) {
cj.namespace = ns
}
/*
CheckReservation checks if given request name contains a valid reservation
- If there is a valid reservation, then the corresponding UUID for the volume/snapshot is returned
@ -177,7 +187,7 @@ func (cj *CSIJournal) CheckReservation(monitors, id, key, pool, reqName, parentN
}
// check if request name is already part of the directory omap
objUUID, err := GetOMapValue(monitors, id, key, pool, cj.csiDirectory,
objUUID, err := GetOMapValue(monitors, id, key, pool, cj.namespace, cj.csiDirectory,
cj.csiNameKeyPrefix+reqName)
if err != nil {
// error should specifically be not found, for volume to be absent, any other error
@ -237,7 +247,7 @@ func (cj *CSIJournal) UndoReservation(monitors, id, key, pool, volName, reqName
// delete volume UUID omap (first, inverse of create order)
// TODO: Check cases where volName can be empty, and we need to just cleanup the reqName
imageUUID := strings.TrimPrefix(volName, cj.namingPrefix)
err := RemoveObject(monitors, id, key, pool, cj.cephUUIDDirectoryPrefix+imageUUID)
err := RemoveObject(monitors, id, key, pool, cj.namespace, cj.cephUUIDDirectoryPrefix+imageUUID)
if err != nil {
if _, ok := err.(ErrObjectNotFound); !ok {
klog.Errorf("failed removing oMap %s (%s)", cj.cephUUIDDirectoryPrefix+imageUUID, err)
@ -246,7 +256,7 @@ func (cj *CSIJournal) UndoReservation(monitors, id, key, pool, volName, reqName
}
// delete the request name key (last, inverse of create order)
err = RemoveOMapKey(monitors, id, key, pool, cj.csiDirectory,
err = RemoveOMapKey(monitors, id, key, pool, cj.namespace, cj.csiDirectory,
cj.csiNameKeyPrefix+reqName)
if err != nil {
klog.Errorf("failed removing oMap key %s (%s)", cj.csiNameKeyPrefix+reqName, err)
@ -259,7 +269,7 @@ func (cj *CSIJournal) UndoReservation(monitors, id, key, pool, volName, reqName
// reserveOMapName creates an omap with passed in oMapNamePrefix and a generated <uuid>.
// It ensures generated omap name does not already exist and if conflicts are detected, a set
// number of retires with newer uuids are attempted before returning an error
func reserveOMapName(monitors, id, key, pool, oMapNamePrefix string) (string, error) {
func reserveOMapName(monitors, id, key, pool, namespace, oMapNamePrefix string) (string, error) {
var iterUUID string
maxAttempts := 5
@ -268,7 +278,7 @@ func reserveOMapName(monitors, id, key, pool, oMapNamePrefix string) (string, er
// generate a uuid for the image name
iterUUID = uuid.NewUUID().String()
err := CreateObject(monitors, id, key, pool, oMapNamePrefix+iterUUID)
err := CreateObject(monitors, id, key, pool, namespace, oMapNamePrefix+iterUUID)
if err != nil {
if _, ok := err.(ErrObjectExists); ok {
attempt++
@ -315,15 +325,15 @@ func (cj *CSIJournal) ReserveName(monitors, id, key, pool, reqName, parentName s
// NOTE: If any service loss occurs post creation of the UUID directory, and before
// setting the request name key (csiNameKey) to point back to the UUID directory, the
// UUID directory key will be leaked
volUUID, err := reserveOMapName(monitors, id, key, pool, cj.cephUUIDDirectoryPrefix)
volUUID, err := reserveOMapName(monitors, id, key, pool, cj.namespace, cj.cephUUIDDirectoryPrefix)
if err != nil {
return "", err
}
// Create request name (csiNameKey) key in csiDirectory and store the UUId based
// volume name into it
err = SetOMapKeyValue(monitors, id, key, pool, cj.csiDirectory, cj.csiNameKeyPrefix+reqName,
volUUID)
err = SetOMapKeyValue(monitors, id, key, pool, cj.namespace, cj.csiDirectory,
cj.csiNameKeyPrefix+reqName, volUUID)
if err != nil {
return "", err
}
@ -339,7 +349,7 @@ func (cj *CSIJournal) ReserveName(monitors, id, key, pool, reqName, parentName s
}()
// Update UUID directory to store CSI request name
err = SetOMapKeyValue(monitors, id, key, pool, cj.cephUUIDDirectoryPrefix+volUUID,
err = SetOMapKeyValue(monitors, id, key, pool, cj.namespace, cj.cephUUIDDirectoryPrefix+volUUID,
cj.csiNameKey, reqName)
if err != nil {
return "", err
@ -347,7 +357,7 @@ func (cj *CSIJournal) ReserveName(monitors, id, key, pool, reqName, parentName s
if snapSource {
// Update UUID directory to store source volume UUID in case of snapshots
err = SetOMapKeyValue(monitors, id, key, pool, cj.cephUUIDDirectoryPrefix+volUUID,
err = SetOMapKeyValue(monitors, id, key, pool, cj.namespace, cj.cephUUIDDirectoryPrefix+volUUID,
cj.cephSnapSourceKey, parentName)
if err != nil {
return "", err
@ -373,14 +383,14 @@ func (cj *CSIJournal) GetObjectUUIDData(monitors, id, key, pool, objectUUID stri
}
// TODO: fetch all omap vals in one call, than make multiple listomapvals
requestName, err := GetOMapValue(monitors, id, key, pool,
requestName, err := GetOMapValue(monitors, id, key, pool, cj.namespace,
cj.cephUUIDDirectoryPrefix+objectUUID, cj.csiNameKey)
if err != nil {
return "", "", err
}
if snapSource {
sourceName, err = GetOMapValue(monitors, id, key, pool,
sourceName, err = GetOMapValue(monitors, id, key, pool, cj.namespace,
cj.cephUUIDDirectoryPrefix+objectUUID, cj.cephSnapSourceKey)
if err != nil {
return "", "", err