Merge pull request #30 from gman0/wip-cephfs

cephfs CSI plugin
This commit is contained in:
Serguei Bezverkhi 2018-04-20 10:46:24 -04:00 committed by GitHub
commit 2da9522fe4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
41 changed files with 2301 additions and 26 deletions

3
.gitignore vendored
View File

@ -4,7 +4,8 @@
/_output
# docker build
/deploy/docker/rbdplugin
/deploy/rbd/docker/rbdplugin
/deploy/cephfs/docker/cephfsplugin
# rbdplugin executable
rbdplugin

View File

@ -14,10 +14,13 @@
.PHONY: all rbdplugin
IMAGE_NAME=quay.io/cephcsi/rbdplugin
IMAGE_VERSION=v0.2.0
RBD_IMAGE_NAME=quay.io/cephcsi/rbdplugin
RBD_IMAGE_VERSION=v0.2.0
all: rbdplugin
CEPHFS_IMAGE_NAME=quay.io/cephcsi/cephfsplugin
CEPHFS_IMAGE_VERSION=v0.2.0
all: rbdplugin cephfsplugin
test:
go test github.com/ceph/ceph-csi/pkg/... -cover
@ -27,11 +30,25 @@ rbdplugin:
if [ ! -d ./vendor ]; then dep ensure; fi
CGO_ENABLED=0 GOOS=linux go build -a -ldflags '-extldflags "-static"' -o _output/rbdplugin ./rbd
container: rbdplugin
docker build -t $(IMAGE_NAME):$(IMAGE_VERSION) .
rbdplugin-container: rbdplugin
cp _output/rbdplugin deploy/rbd/docker
docker build -t $(IMAGE_NAME):$(IMAGE_VERSION) deploy/rbd/docker
cephfsplugin:
if [ ! -d ./vendor ]; then dep ensure; fi
CGO_ENABLED=0 GOOS=linux go build -a -ldflags '-extldflags "-static"' -o _output/cephfsplugin ./cephfs
cephfsplugin-container: cephfsplugin
cp _output/cephfsplugin deploy/cephfs/docker
docker build -t $(CEPHFS_IMAGE_NAME):$(CEPHFS_IMAGE_VERSION) deploy/cephfs/docker
push-rbdplugin-container: rbdplugin-container
docker push $(RBD_IMAGE_NAME):$(RBD_IMAGE_VERSION)
push-cephfsplugin-container: cephfsplugin-container
docker push $(CEPHFS_IMAGE_NAME):$(CEPHFS_IMAGE_VERSION)
push-container: container
docker push $(IMAGE_NAME):$(IMAGE_VERSION)
clean:
go clean -r -x
-rm -rf _output
rm -f deploy/rbd/docker/rbdplugin
rm -f deploy/cephfs/docker/rbdplugin

174
README.md
View File

@ -2,21 +2,22 @@
## Overview
RBD CSI plugin implements an interface between CSI enabled Container
Ceph CSI plugins implement an interface between CSI enabled Container
Orchestrator and CEPH cluster. It allows dynamically provision CEPH
volumes and attach it to workloads.
Current implementation of CSI RBD plugin was tested in Kubernetes environment,
but its code does not rely on any Kubernetes specific calls (WIP to make it k8s agnostic)
Current implementation of Ceph CSI plugins was tested in Kubernetes environment (requires Kubernetes 1.10+),
but the code does not rely on any Kubernetes specific calls (WIP to make it k8s agnostic)
and should be able to run with any CSI enabled CO (Containers Orchestration).
[Container Storage Interface (CSI)](https://github.com/container-storage-interface/) driver, provisioner, and attacher for Ceph RBD and CephFS
## RBD Plugin
An RBD CSI plugin is available to help simplify storage management.
Once user creates PVC with the reference to a RBD storage class, rbd image and
corresponding PV object gets dynamically created and becomes ready to be used by
workloads.
[Container Storage Interface (CSI)](https://github.com/container-storage-interface/) driver, provisioner, and attacher for Ceph RBD and CephFS
## RBD Plugin
### Configuration Requirements
* Secret object with the authentication key for ceph cluster
@ -44,7 +45,7 @@ $ make rbdplugin
To build a container:
```
$ make container
$ make container-rbdplugin
```
By running:
```
@ -73,29 +74,29 @@ Enable features `MountPropagation=true,CSIPersistentVolume=true` and runtime con
#### Step 1: Create Secret
```
$ kubectl create -f ./deploy/kubernetes/rbd-secrets.yaml
$ kubectl create -f ./deploy/rbd/kubernetes/rbd-secrets.yaml
```
**Important:** rbd-secrets.yaml, must be customized to match your ceph environment.
#### Step 2: Create StorageClass
```
$ kubectl create -f ./deploy/kubernetes/rbd-storage-class.yaml
$ kubectl create -f ./deploy/rbd/kubernetes/rbd-storage-class.yaml
```
**Important:** rbd-storage-class.yaml, must be customized to match your ceph environment.
#### Step 3: Start CSI CEPH RBD plugin
```
$ kubectl create -f ./deploy/kubernetes/rbdplugin.yaml
$ kubectl create -f ./deploy/rbd/kubernetes/rbdplugin.yaml
```
#### Step 4: Start CSI External Attacher
```
$ kubectl create -f ./deploy/kubernetes/csi-attacher.yaml
$ kubectl create -f ./deploy/rbd/kubernetes/csi-attacher.yaml
```
#### Step 5: Start CSI External Provisioner
```
$ kubectl create -f ./deploy/kubernetes/csi-provisioner.yaml
$ kubectl create -f ./deploy/rbd/kubernetes/csi-provisioner.yaml
```
**Important:** Deployment yaml files includes required Service Account definitions and
required RBAC rules.
@ -116,7 +117,7 @@ default csi-provisioner-0 1/1 Runn
#### Step 7: Create PVC
```
$ kubectl create -f ./deploy/kubernetes/pvc.yaml
$ kubectl create -f ./deploy/rbd/kubernetes/pvc.yaml
```
#### Step 8: Check status of provisioner PV
@ -152,12 +153,155 @@ Source:
#### Step 9: Create a test pod
```bash
# kubectl create -f ./deploy/pod.yaml
# kubectl create -f ./deploy/rbd/pod.yaml
```
## CephFS plugin
TODO
A CephFS CSI plugin is available to help simplify storage management.
Once user creates PVC with the reference to a CephFS CSI storage class, corresponding
PV object gets dynamically created and becomes ready to be used by workloads.
### Configuration Requirements
* Secret object with the authentication user ID `userID` and key `userKey` for ceph cluster
* StorageClass with csi-cephfsplugin (default CSI CephFS plugin name) as a provisioner name
and information about ceph cluster (monitors, pool, rootPath, ...)
* Service Accounts with required RBAC permissions
Mounter options: specifies whether to use FUSE or ceph kernel client for mounting. By default, the plugin will probe for `ceph-fuse`. If this fails, the kernel client will be used instead. Command line argument `--volumemounter=[fuse|kernel]` overrides this behaviour.
StorageClass options:
* `provisionVolume: "bool"`: if set to true, the plugin will provision and mount a new volume. Admin credentials `adminID` and `adminKey` are required in the secret object, since this also creates a dedicated RADOS user used for mounting the volume.
* `rootPath: /path-in-cephfs`: required field if `provisionVolume=true`. CephFS is mounted from the specified path. User credentials `userID` and `userKey` are required in the secret object.
* `mounter: "kernel" or "fuse"`: (optional) per-StorageClass mounter configuration. Overrides the default mounter.
### Feature Status
### 1.10: Alpha
**Important:** `CSIPersistentVolume` and `MountPropagation`
[feature gates must be enabled starting in 1.9](#enabling-the-alpha-feature-gates).
Also API server must run with running config set to: `storage.k8s.io/v1alpha1`
* `kube-apiserver` must be launched with `--feature-gates=CSIPersistentVolume=true,MountPropagation=true`
and `--runtime-config=storage.k8s.io/v1alpha1=true`
* `kube-controller-manager` must be launched with `--feature-gates=CSIPersistentVolume=true`
* `kubelet` must be launched with `--feature-gates=CSIPersistentVolume=true,MountPropagation=true`
### Compiling
CSI CephFS plugin can be compiled in a form of a binary file or in a form of a container. When compiled
as a binary file, it gets stored in \_output folder with the name cephfsplugin. When compiled as a container,
the resulting image is stored in a local docker's image store.
To compile just a binary file:
```
$ make cephfsplugin
```
To build a container:
```
$ make cephfsplugin-container
```
By running:
```
$ docker images | grep cephfsplugin
```
You should see the following line in the output:
```
quay.io/cephcsi/cephfsplugin v0.2.0 79482e644593 4 minutes ago 305MB
```
### Testing
#### Prerequisite
##### Enable Mount Propagation in Docker
Comment out `MountFlags=slave` in docker systemd service then restart docker service.
```
# systemctl daemon-reload
# systemctl restart docker
```
##### Enable Kubernetes Feature Gates
Enable features `MountPropagation=true,CSIPersistentVolume=true` and runtime config `storage.k8s.io/v1alpha1=true`
#### Step 1: Create Secret
```
$ kubectl create -f ./deploy/cephfs/kubernetes/secret.yaml
```
**Important:** secret.yaml, must be customized to match your ceph environment.
#### Step 2: Create StorageClass
```
$ kubectl create -f ./deploy/cephfs/kubernetes/cephfs-storage-class.yaml
```
**Important:** cephfs-storage-class.yaml, must be customized to match your ceph environment.
#### Step 3: Start CSI CEPH CephFS plugin
```
$ kubectl create -f ./deploy/cephfs/kubernetes/cephfsplugin.yaml
```
#### Step 4: Start CSI External Attacher
```
$ kubectl create -f ./deploy/cephfs/kubernetes/csi-attacher.yaml
```
#### Step 5: Start CSI External Provisioner
```
$ kubectl create -f ./deploy/cephfs/kubernetes/csi-provisioner.yaml
```
**Important:** Deployment yaml files includes required Service Account definitions and
required RBAC rules.
#### Step 6: Check status of CSI CephFS plugin
```
$ kubectl get pods | grep csi
csi-attacher-0 1/1 Running 0 6m
csi-cephfsplugin-hmqpk 2/2 Running 0 6m
csi-provisioner-0 1/1 Running 0 6m
```
#### Step 7: Create PVC
```
$ kubectl create -f ./deploy/cephfs/kubernetes/pvc.yaml
```
#### Step 8: Check status of provisioner PV
```
$ kubectl get pv
NAME CAPACITY ACCESS MODES RECLAIM POLICY STATUS CLAIM STORAGECLASS REASON AGE
kubernetes-dynamic-pv-715cef0b30d811e8 5Gi RWX Delete Bound default/csi-cephfs-pvc csi-cephfs 5s
```
```
$ kubectl describe pv kubernetes-dynamic-pv-715cef0b30d811e8
Name: kubernetes-dynamic-pv-715cef0b30d811e8
Labels: <none>
Annotations: pv.kubernetes.io/provisioned-by=csi-cephfsplugin
StorageClass: csi-cephfs
Status: Bound
Claim: default/csi-cephfs-pvc
Reclaim Policy: Delete
Access Modes: RWX
Capacity: 5Gi
Message:
Source:
Type: CSI (a Container Storage Interface (CSI) volume source)
Driver: ReadOnly: %v
VolumeHandle: csi-cephfsplugin
%!(EXTRA string=csi-cephfs-7182b779-30d8-11e8-bf01-5254007d7491, bool=false)Events: <none>
```
#### Step 9: Create a test pod
```
$ kubectl create -f ./deploy/cephfs/kubernetes/pod.yaml
```
## Troubleshooting

61
cephfs/main.go Normal file
View File

@ -0,0 +1,61 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"flag"
"os"
"path"
// "github.com/ceph/ceph-csi/pkg/cephfs"
"github.com/gman0/ceph-csi/pkg/cephfs"
"github.com/golang/glog"
)
func init() {
flag.Set("logtostderr", "true")
}
var (
endpoint = flag.String("endpoint", "unix://tmp/csi.sock", "CSI endpoint")
driverName = flag.String("drivername", "csi-cephfsplugin", "name of the driver")
nodeId = flag.String("nodeid", "", "node id")
volumeMounter = flag.String("volumemounter", "", "default volume mounter (possible options are 'kernel', 'fuse')")
)
func main() {
flag.Parse()
if err := createPersistentStorage(path.Join(cephfs.PluginFolder, "controller")); err != nil {
glog.Errorf("failed to create persistent storage for controller: %v", err)
os.Exit(1)
}
if err := createPersistentStorage(path.Join(cephfs.PluginFolder, "node")); err != nil {
glog.Errorf("failed to create persistent storage for node: %v", err)
os.Exit(1)
}
driver := cephfs.NewCephFSDriver()
driver.Run(*driverName, *nodeId, *endpoint, *volumeMounter)
os.Exit(0)
}
func createPersistentStorage(persistentStoragePath string) error {
return os.MkdirAll(persistentStoragePath, os.FileMode(0755))
}

View File

@ -2,5 +2,5 @@
if [ "${TRAVIS_BRANCH}" == "master" ] && [ "${TRAVIS_PULL_REQUEST}" == "false" ]; then
docker login -u "${QUAY_IO_USERNAME}" -p "${QUAY_IO_PASSWORD}" quay.io
make push-container
make push-cephfs-container
fi

6
deploy-rbd.sh Executable file
View File

@ -0,0 +1,6 @@
#!/bin/bash
if [ "${TRAVIS_BRANCH}" == "master" ] && [ "${TRAVIS_PULL_REQUEST}" == "false" ]; then
docker login -u "${QUAY_IO_USERNAME}" -p "${QUAY_IO_PASSWORD}" quay.io
make push-rbdplugin-container
fi

View File

@ -0,0 +1,18 @@
FROM ubuntu:16.04
LABEL maintainers="Kubernetes Authors"
LABEL description="CephFS CSI Plugin"
ENV CEPH_VERSION "luminous"
RUN apt-get update && apt-get install -y wget && \
wget -q -O- 'https://download.ceph.com/keys/release.asc' | apt-key add - && \
echo "deb http://download.ceph.com/debian-$CEPH_VERSION/ xenial main" | tee /etc/apt/sources.list.d/ceph-$CEPH_VERSION.list && \
apt-get update && apt-get install -y kmod ceph-common ceph-fuse attr --no-install-recommends && \
rm -rf /var/lib/apt/lists/*
COPY cephfsplugin /cephfsplugin
RUN chmod +x /cephfsplugin && \
mkdir -p /var/log/ceph
ENTRYPOINT ["/cephfsplugin"]

View File

@ -0,0 +1,28 @@
apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
name: csi-cephfs
provisioner: csi-cephfsplugin
parameters:
monitors: mon1:port,mon2:port
# If set to true, a new volume will be created along with a RADOS user - this requires admin access.
# If set to false, it is assumed the volume already exists and the user is expected to provide
# a rootPath to a cephfs volume and user credentials.
provisionVolume: "true"
# Required if provisionVolume is set to false
# rootPath: /path-in-cephfs
# Required if provisionVolume is set to true
# pool: cephfs_data
# The secret has to contain user and/or admin credentials.
csiProvisionerSecretName: csi-cephfs-secret
csiProvisionerSecretNameSpace: default
# (optional) The driver can use either ceph-fuse (fuse) or ceph kernel client (kernel)
# If left out, default volume mounter will be used - this is determined by probing for ceph-fuse
# or by setting the default mounter explicitly via --volumemounter command-line argument.
# mounter: kernel
reclaimPolicy: Delete

View File

@ -0,0 +1,129 @@
# This YAML defines all API objects to create RBAC roles for csi node plugin.
apiVersion: v1
kind: ServiceAccount
metadata:
name: csi-cephfsplugin
---
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: csi-cephfsplugin
rules:
- apiGroups: [""]
resources: ["nodes"]
verbs: ["get", "list", "update"]
- apiGroups: [""]
resources: ["namespaces"]
verbs: ["get", "list"]
- apiGroups: [""]
resources: ["persistentvolumes"]
verbs: ["get", "list", "watch", "update"]
- apiGroups: ["storage.k8s.io"]
resources: ["volumeattachments"]
verbs: ["get", "list", "watch", "update"]
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: csi-cephfsplugin
subjects:
- kind: ServiceAccount
name: csi-cephfsplugin
namespace: default
roleRef:
kind: ClusterRole
name: csi-cephfsplugin
apiGroup: rbac.authorization.k8s.io
---
# This YAML file contains driver-registrar & csi driver nodeplugin API objects,
# which are necessary to run csi nodeplugin for cephfs.
kind: DaemonSet
apiVersion: apps/v1beta2
metadata:
name: csi-cephfsplugin
spec:
selector:
matchLabels:
app: csi-cephfsplugin
template:
metadata:
labels:
app: csi-cephfsplugin
spec:
serviceAccount: csi-cephfsplugin
hostNetwork: true
containers:
- name: driver-registrar
image: quay.io/k8scsi/driver-registrar:v0.2.0
args:
- "--v=5"
- "--csi-address=$(ADDRESS)"
env:
- name: ADDRESS
value: /var/lib/kubelet/plugins/csi-cephfsplugin/csi.sock
- name: KUBE_NODE_NAME
valueFrom:
fieldRef:
fieldPath: spec.nodeName
volumeMounts:
- name: socket-dir
mountPath: /var/lib/kubelet/plugins/csi-cephfsplugin
- name: csi-cephfsplugin
securityContext:
privileged: true
capabilities:
add: ["SYS_ADMIN"]
allowPrivilegeEscalation: true
image: quay.io/cephcsi/cephfsplugin:v0.2.0
args :
- "--nodeid=$(NODE_ID)"
- "--endpoint=$(CSI_ENDPOINT)"
- "--v=5"
- "--drivername=csi-cephfsplugin"
env:
- name: NODE_ID
valueFrom:
fieldRef:
fieldPath: spec.nodeName
- name: CSI_ENDPOINT
value: unix://var/lib/kubelet/plugins/csi-cephfsplugin/csi.sock
imagePullPolicy: "IfNotPresent"
volumeMounts:
- name: plugin-dir
mountPath: /var/lib/kubelet/plugins/csi-cephfsplugin
- name: pods-mount-dir
mountPath: /var/lib/kubelet/pods
mountPropagation: "Bidirectional"
- mountPath: /sys
name: host-sys
- name: lib-modules
mountPath: /lib/modules
readOnly: true
- name: host-dev
mountPath: /dev
volumes:
- name: plugin-dir
hostPath:
path: /var/lib/kubelet/plugins/csi-cephfsplugin
type: DirectoryOrCreate
- name: pods-mount-dir
hostPath:
path: /var/lib/kubelet/pods
type: Directory
- name: socket-dir
hostPath:
path: /var/lib/kubelet/plugins/csi-cephfsplugin
type: DirectoryOrCreate
- name: host-sys
hostPath:
path: /sys
- name: lib-modules
hostPath:
path: /lib/modules
- name: host-dev
hostPath:
path: /dev

View File

@ -0,0 +1,87 @@
# This YAML file contains RBAC API objects,
# which are necessary to run external csi attacher for cinder.
apiVersion: v1
kind: ServiceAccount
metadata:
name: csi-attacher
---
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: external-attacher-runner
rules:
- apiGroups: [""]
resources: ["events"]
verbs: ["get", "list", "watch", "update"]
- apiGroups: [""]
resources: ["persistentvolumes"]
verbs: ["get", "list", "watch", "update"]
- apiGroups: [""]
resources: ["nodes"]
verbs: ["get", "list", "watch"]
- apiGroups: ["storage.k8s.io"]
resources: ["volumeattachments"]
verbs: ["get", "list", "watch", "update"]
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: csi-attacher-role
subjects:
- kind: ServiceAccount
name: csi-attacher
namespace: default
roleRef:
kind: ClusterRole
name: external-attacher-runner
apiGroup: rbac.authorization.k8s.io
---
kind: Service
apiVersion: v1
metadata:
name: csi-attacher
labels:
app: csi-attacher
spec:
selector:
app: csi-attacher
ports:
- name: dummy
port: 12345
---
kind: StatefulSet
apiVersion: apps/v1beta1
metadata:
name: csi-attacher
spec:
serviceName: "csi-attacher"
replicas: 1
template:
metadata:
labels:
app: csi-attacher
spec:
serviceAccount: csi-attacher
containers:
- name: csi-attacher
image: quay.io/k8scsi/csi-attacher:v0.2.0
args:
- "--v=5"
- "--csi-address=$(ADDRESS)"
env:
- name: ADDRESS
value: /var/lib/kubelet/plugins/csi-cephfsplugin/csi.sock
imagePullPolicy: "IfNotPresent"
volumeMounts:
- name: socket-dir
mountPath: /var/lib/kubelet/plugins/csi-cephfsplugin
volumes:
- name: socket-dir
hostPath:
path: /var/lib/kubelet/plugins/csi-cephfsplugin
type: DirectoryOrCreate

View File

@ -0,0 +1,97 @@
# This YAML file contains all API objects that are necessary to run external
# CSI provisioner.
#
# In production, this needs to be in separate files, e.g. service account and
# role and role binding needs to be created once, while stateful set may
# require some tuning.
#
# In addition, mock CSI driver is hardcoded as the CSI driver.
apiVersion: v1
kind: ServiceAccount
metadata:
name: csi-provisioner
---
kind: ClusterRole
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: external-provisioner-runner
rules:
- apiGroups: [""]
resources: ["secrets"]
verbs: ["get", "list"]
- apiGroups: [""]
resources: ["persistentvolumes"]
verbs: ["get", "list", "watch", "create", "delete"]
- apiGroups: [""]
resources: ["persistentvolumeclaims"]
verbs: ["get", "list", "watch", "update"]
- apiGroups: ["storage.k8s.io"]
resources: ["storageclasses"]
verbs: ["get", "list", "watch"]
- apiGroups: [""]
resources: ["events"]
verbs: ["list", "watch", "create", "update", "patch"]
---
kind: ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
name: csi-provisioner-role
subjects:
- kind: ServiceAccount
name: csi-provisioner
namespace: default
roleRef:
kind: ClusterRole
name: external-provisioner-runner
apiGroup: rbac.authorization.k8s.io
---
kind: Service
apiVersion: v1
metadata:
name: csi-provisioner
labels:
app: csi-provisioner
spec:
selector:
app: csi-provisioner
ports:
- name: dummy
port: 12345
---
kind: StatefulSet
apiVersion: apps/v1beta1
metadata:
name: csi-provisioner
spec:
serviceName: "csi-provisioner"
replicas: 1
template:
metadata:
labels:
app: csi-provisioner
spec:
serviceAccount: csi-provisioner
containers:
- name: csi-provisioner
image: quay.io/k8scsi/csi-provisioner:v0.2.0
args:
- "--provisioner=csi-cephfsplugin"
- "--csi-address=$(ADDRESS)"
- "--v=5"
env:
- name: ADDRESS
value: /var/lib/kubelet/plugins/csi-cephfsplugin/csi.sock
imagePullPolicy: "IfNotPresent"
volumeMounts:
- name: socket-dir
mountPath: /var/lib/kubelet/plugins/csi-cephfsplugin
volumes:
- name: socket-dir
hostPath:
path: /var/lib/kubelet/plugins/csi-cephfsplugin
type: DirectoryOrCreate

View File

@ -0,0 +1,7 @@
#!/bin/bash
objects=(cephfs-storage-class cephfsplugin csi-attacher csi-provisioner)
for obj in ${objects[@]}; do
kubectl create -f "./$obj.yaml"
done

View File

@ -0,0 +1,4 @@
#!/bin/sh
kubectl create -f ./pvc.yaml
kubectl create -f ./pod.yaml

View File

@ -0,0 +1,3 @@
#!/bin/sh
kubectl exec -it $(kubectl get pods -l app=csi-cephfsplugin -o=name | head -n 1 | cut -f2 -d"/") -c csi-cephfsplugin bash

View File

@ -0,0 +1,3 @@
#!/bin/sh
kubectl logs $(kubectl get pods -l app=csi-cephfsplugin -o=name | head -n 1) -c csi-cephfsplugin

View File

@ -0,0 +1,17 @@
apiVersion: v1
kind: Pod
metadata:
name: web-server
spec:
containers:
- name: web-server
image: nginx
volumeMounts:
- mountPath: /var/lib/www/html
name: mypvc
volumes:
- name: mypvc
persistentVolumeClaim:
claimName: csi-cephfs-pvc
readOnly: false

View File

@ -0,0 +1,11 @@
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: csi-cephfs-pvc
spec:
accessModes:
- ReadWriteMany
resources:
requests:
storage: 5Gi
storageClassName: csi-cephfs

View File

@ -0,0 +1,13 @@
apiVersion: v1
kind: Secret
metadata:
name: csi-cephfs-secret
namespace: default
data:
# Required if provisionVolume is set to false
userID: userID-encoded-by-base64
userKey: userKey-encoded-by-base64
# Required if provisionVolume is set to true
adminID: adminID-encoded-by-base64
adminKey: adminKey-encoded-by-base64

View File

@ -0,0 +1,7 @@
#!/bin/bash
objects=(cephfsplugin csi-provisioner csi-attacher cephfs-storage-class)
for obj in ${objects[@]}; do
kubectl delete -f "./$obj.yaml"
done

View File

@ -0,0 +1,4 @@
#!/bin/sh
kubectl delete -f ./pod.yaml
kubectl delete -f ./pvc.yaml

View File

@ -7,6 +7,6 @@ RUN yum install -y centos-release-ceph && \
yum install -y ceph-common e2fsprogs && \
yum clean all
COPY _output/rbdplugin /rbdplugin
COPY rbdplugin /rbdplugin
RUN chmod +x /rbdplugin
ENTRYPOINT ["/rbdplugin"]

149
pkg/cephfs/cephconf.go Normal file
View File

@ -0,0 +1,149 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cephfs
import (
"fmt"
"os"
"path"
"text/template"
)
const cephConfig = `[global]
mon_host = {{.Monitors}}
auth_cluster_required = cephx
auth_service_required = cephx
auth_client_required = cephx
# Workaround for http://tracker.ceph.com/issues/23446
fuse_set_user_groups = false
`
const cephKeyring = `[client.{{.UserId}}]
key = {{.Key}}
caps mds = "allow rw path={{.RootPath}}"
caps mon = "allow r"
caps osd = "allow rw{{if .Pool}} pool={{.Pool}}{{end}}{{if .Namespace}} namespace={{.Namespace}}{{end}}"
`
const cephFullCapsKeyring = `[client.{{.UserId}}]
key = {{.Key}}
caps mds = "allow"
caps mon = "allow *"
caps osd = "allow *"
`
const cephSecret = `{{.Key}}`
const (
cephConfigRoot = "/etc/ceph"
cephConfigFileName = "ceph.conf"
cephKeyringFileNameFmt = "ceph.client.%s.keyring"
cephSecretFileNameFmt = "ceph.client.%s.secret"
)
var (
cephConfigTempl *template.Template
cephKeyringTempl *template.Template
cephFullCapsKeyringTempl *template.Template
cephSecretTempl *template.Template
)
func init() {
fm := map[string]interface{}{
"perms": func(readOnly bool) string {
if readOnly {
return "r"
}
return "rw"
},
}
cephConfigTempl = template.Must(template.New("config").Parse(cephConfig))
cephKeyringTempl = template.Must(template.New("keyring").Funcs(fm).Parse(cephKeyring))
cephFullCapsKeyringTempl = template.Must(template.New("keyringFullCaps").Parse(cephFullCapsKeyring))
cephSecretTempl = template.Must(template.New("secret").Parse(cephSecret))
}
type cephConfigWriter interface {
writeToFile() error
}
type cephConfigData struct {
Monitors string
}
func writeCephTemplate(fileName string, m os.FileMode, t *template.Template, data interface{}) error {
if err := os.MkdirAll(cephConfigRoot, 0755); err != nil {
return err
}
f, err := os.OpenFile(path.Join(cephConfigRoot, fileName), os.O_CREATE|os.O_EXCL|os.O_WRONLY, m)
if err != nil {
if os.IsExist(err) {
return nil
}
return err
}
defer f.Close()
return t.Execute(f, data)
}
func (d *cephConfigData) writeToFile() error {
return writeCephTemplate(cephConfigFileName, 0640, cephConfigTempl, d)
}
type cephKeyringData struct {
UserId, Key string
RootPath string
Pool, Namespace string
}
func (d *cephKeyringData) writeToFile() error {
return writeCephTemplate(fmt.Sprintf(cephKeyringFileNameFmt, d.UserId), 0600, cephKeyringTempl, d)
}
type cephFullCapsKeyringData struct {
UserId, Key string
}
func (d *cephFullCapsKeyringData) writeToFile() error {
return writeCephTemplate(fmt.Sprintf(cephKeyringFileNameFmt, d.UserId), 0600, cephFullCapsKeyringTempl, d)
}
type cephSecretData struct {
UserId, Key string
}
func (d *cephSecretData) writeToFile() error {
return writeCephTemplate(fmt.Sprintf(cephSecretFileNameFmt, d.UserId), 0600, cephSecretTempl, d)
}
func getCephSecretPath(userId string) string {
return path.Join(cephConfigRoot, fmt.Sprintf(cephSecretFileNameFmt, userId))
}
func getCephKeyringPath(userId string) string {
return path.Join(cephConfigRoot, fmt.Sprintf(cephKeyringFileNameFmt, userId))
}
func getCephConfPath() string {
return path.Join(cephConfigRoot, cephConfigFileName)
}

104
pkg/cephfs/cephuser.go Normal file
View File

@ -0,0 +1,104 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cephfs
import (
"fmt"
"os"
)
const (
cephUserPrefix = "csi-user-"
cephEntityClientPrefix = "client."
)
type cephEntityCaps struct {
Mds string `json:"mds"`
Mon string `json:"mon"`
Osd string `json:"osd"`
}
type cephEntity struct {
Entity string `json:"entity"`
Key string `json:"key"`
Caps cephEntityCaps `json:"caps"`
}
func getCephUserName(volUuid string) string {
return cephUserPrefix + volUuid
}
func getCephUser(userId string) (*cephEntity, error) {
entityName := cephEntityClientPrefix + userId
var ents []cephEntity
if err := execCommandJson(&ents, "ceph", "auth", "get", entityName); err != nil {
return nil, err
}
if len(ents) != 1 {
return nil, fmt.Errorf("error retrieving entity %s", entityName)
}
return &ents[0], nil
}
func (e *cephEntity) create() error {
return execCommandJson(e, "ceph", "auth", "get-or-create", e.Entity, "mds", e.Caps.Mds, "osd", e.Caps.Osd, "mon", e.Caps.Mon)
}
func createCephUser(volOptions *volumeOptions, volUuid string, readOnly bool) (*cephEntity, error) {
access := "rw"
if readOnly {
access = "r"
}
caps := cephEntityCaps{
Mds: fmt.Sprintf("allow %s path=%s", access, getVolumeRootPath_ceph(volUuid)),
Mon: "allow r",
Osd: fmt.Sprintf("allow %s pool=%s namespace=%s", access, volOptions.Pool, getVolumeNamespace(volUuid)),
}
var ents []cephEntity
args := [...]string{
"auth", "-f", "json",
"get-or-create", cephEntityClientPrefix + getCephUserName(volUuid),
"mds", caps.Mds,
"mon", caps.Mon,
"osd", caps.Osd,
}
if err := execCommandJson(&ents, "ceph", args[:]...); err != nil {
return nil, fmt.Errorf("error creating ceph user: %v", err)
}
return &ents[0], nil
}
func deleteCephUser(volUuid string) error {
userId := getCephUserName(volUuid)
if err := execCommandAndValidate("ceph", "auth", "rm", cephEntityClientPrefix+userId); err != nil {
return err
}
os.Remove(getCephKeyringPath(userId))
os.Remove(getCephSecretPath(userId))
return nil
}

View File

@ -0,0 +1,198 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cephfs
import (
"fmt"
"os"
"github.com/golang/glog"
"golang.org/x/net/context"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/container-storage-interface/spec/lib/go/csi/v0"
"github.com/kubernetes-csi/drivers/pkg/csi-common"
)
type controllerServer struct {
*csicommon.DefaultControllerServer
}
const (
oneGB = 1073741824
)
func (cs *controllerServer) validateCreateVolumeRequest(req *csi.CreateVolumeRequest) error {
if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
return fmt.Errorf("Invalid CreateVolumeRequest: %v", err)
}
if req.GetName() == "" {
return status.Error(codes.InvalidArgument, "Volume Name cannot be empty")
}
if req.GetVolumeCapabilities() == nil {
return status.Error(codes.InvalidArgument, "Volume Capabilities cannot be empty")
}
return nil
}
func (cs *controllerServer) validateDeleteVolumeRequest(req *csi.DeleteVolumeRequest) error {
if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
return fmt.Errorf("Invalid DeleteVolumeRequest: %v", err)
}
return nil
}
func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
if err := cs.validateCreateVolumeRequest(req); err != nil {
glog.Errorf("CreateVolumeRequest validation failed: %v", err)
return nil, err
}
// Configuration
volOptions, err := newVolumeOptions(req.GetParameters())
if err != nil {
glog.Errorf("validation of volume options failed: %v", err)
return nil, status.Error(codes.InvalidArgument, err.Error())
}
volId := newVolumeIdentifier(volOptions, req)
conf := cephConfigData{Monitors: volOptions.Monitors}
if err = conf.writeToFile(); err != nil {
glog.Errorf("couldn't generate ceph.conf: %v", err)
return nil, status.Error(codes.Internal, err.Error())
}
// Create a volume in case the user didn't provide one
if volOptions.ProvisionVolume {
// Admin access is required
cr, err := getAdminCredentials(req.GetControllerCreateSecrets())
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
if err = storeCephAdminCredentials(cr); err != nil {
glog.Errorf("failed to store admin credentials for '%s': %v", cr.id, err)
return nil, status.Error(codes.Internal, err.Error())
}
if err = createVolume(volOptions, cr, volId.uuid, req.GetCapacityRange().GetRequiredBytes()); err != nil {
glog.Errorf("failed to create volume %s: %v", volId.name, err)
return nil, status.Error(codes.Internal, err.Error())
}
glog.V(4).Infof("cephfs: volume %s successfuly created", volId.id)
} else {
glog.V(4).Infof("cephfs: volume %s is provisioned statically", volId.id)
}
if err = volCache.insert(&volumeCacheEntry{Identifier: *volId, VolOptions: *volOptions}); err != nil {
glog.Warningf("failed to store a volume cache entry: %v", err)
}
return &csi.CreateVolumeResponse{
Volume: &csi.Volume{
Id: volId.id,
CapacityBytes: req.GetCapacityRange().GetRequiredBytes(),
Attributes: req.GetParameters(),
},
}, nil
}
func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVolumeRequest) (*csi.DeleteVolumeResponse, error) {
if err := cs.validateDeleteVolumeRequest(req); err != nil {
glog.Errorf("DeleteVolumeRequest validation failed: %v", err)
return nil, err
}
var (
cr *credentials
err error
volId = req.GetVolumeId()
volUuid = uuidFromVolumeId(volId)
)
// Load volume info from cache
ent, found := volCache.get(volUuid)
if !found {
msg := fmt.Sprintf("failed to retrieve cache entry for volume %s", volId)
glog.Error(msg)
return nil, status.Error(codes.Internal, msg)
}
// Set the correct user for mounting
if ent.VolOptions.ProvisionVolume {
// Admin access is required
cr, err = getAdminCredentials(req.GetControllerDeleteSecrets())
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
} else {
cr, err = getUserCredentials(req.GetControllerDeleteSecrets())
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
}
// Delete the volume contents
if err := purgeVolume(volId, cr, &ent.VolOptions); err != nil {
glog.Error(err)
return nil, status.Error(codes.Internal, err.Error())
}
// Clean up remaining files
if ent.VolOptions.ProvisionVolume {
// The user is no longer needed
if err := deleteCephUser(volUuid); err != nil {
glog.Warningf("failed to delete ceph user '%s': %v", cr.id, err)
}
userId := getCephUserName(volUuid)
os.Remove(getCephKeyringPath(userId))
os.Remove(getCephSecretPath(userId))
} else {
os.Remove(getCephKeyringPath(cr.id))
os.Remove(getCephSecretPath(cr.id))
}
if err := volCache.erase(volUuid); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
glog.V(4).Infof("cephfs: volume %s successfuly deleted", volId)
return &csi.DeleteVolumeResponse{}, nil
}
func (cs *controllerServer) ValidateVolumeCapabilities(
ctx context.Context,
req *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) {
return &csi.ValidateVolumeCapabilitiesResponse{Supported: true}, nil
}

56
pkg/cephfs/credentials.go Normal file
View File

@ -0,0 +1,56 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cephfs
import "fmt"
const (
credUserId = "userID"
credUserKey = "userKey"
credAdminId = "adminID"
credAdminKey = "adminKey"
)
type credentials struct {
id string
key string
}
func getCredentials(idField, keyField string, secrets map[string]string) (*credentials, error) {
var (
c = &credentials{}
ok bool
)
if c.id, ok = secrets[idField]; !ok {
return nil, fmt.Errorf("missing ID field '%s' in secrets", idField)
}
if c.key, ok = secrets[keyField]; !ok {
return nil, fmt.Errorf("missing key field '%s' in secrets", keyField)
}
return c, nil
}
func getUserCredentials(secrets map[string]string) (*credentials, error) {
return getCredentials(credUserId, credUserKey, secrets)
}
func getAdminCredentials(secrets map[string]string) (*credentials, error) {
return getCredentials(credAdminId, credAdminKey, secrets)
}

129
pkg/cephfs/driver.go Normal file
View File

@ -0,0 +1,129 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cephfs
import (
"os"
"github.com/golang/glog"
"github.com/container-storage-interface/spec/lib/go/csi/v0"
"github.com/kubernetes-csi/drivers/pkg/csi-common"
)
const (
PluginFolder = "/var/lib/kubelet/plugins/csi-cephfsplugin"
Version = "0.2.0"
)
type cephfsDriver struct {
driver *csicommon.CSIDriver
is *identityServer
ns *nodeServer
cs *controllerServer
caps []*csi.VolumeCapability_AccessMode
cscaps []*csi.ControllerServiceCapability
}
var (
driver *cephfsDriver
DefaultVolumeMounter string
)
func getVolumeMounterByProbing() string {
if execCommandAndValidate("ceph-fuse", "--version") == nil {
return volumeMounter_fuse
} else {
return volumeMounter_kernel
}
}
func NewCephFSDriver() *cephfsDriver {
return &cephfsDriver{}
}
func NewIdentityServer(d *csicommon.CSIDriver) *identityServer {
return &identityServer{
DefaultIdentityServer: csicommon.NewDefaultIdentityServer(d),
}
}
func NewControllerServer(d *csicommon.CSIDriver) *controllerServer {
return &controllerServer{
DefaultControllerServer: csicommon.NewDefaultControllerServer(d),
}
}
func NewNodeServer(d *csicommon.CSIDriver) *nodeServer {
return &nodeServer{
DefaultNodeServer: csicommon.NewDefaultNodeServer(d),
}
}
func (fs *cephfsDriver) Run(driverName, nodeId, endpoint, volumeMounter string) {
glog.Infof("Driver: %v version: %v", driverName, Version)
// Configuration
if err := os.MkdirAll(volumeCacheRoot, 0755); err != nil {
glog.Fatalf("cephfs: failed to create %s: %v", volumeCacheRoot, err)
return
}
if err := loadVolumeCache(); err != nil {
glog.Errorf("cephfs: failed to read volume cache: %v", err)
}
if volumeMounter != "" {
if err := validateMounter(volumeMounter); err != nil {
glog.Fatalln(err)
} else {
DefaultVolumeMounter = volumeMounter
}
} else {
DefaultVolumeMounter = getVolumeMounterByProbing()
}
glog.Infof("cephfs: setting default volume mounter to %s", DefaultVolumeMounter)
// Initialize default library driver
fs.driver = csicommon.NewCSIDriver(driverName, Version, nodeId)
if fs.driver == nil {
glog.Fatalln("Failed to initialize CSI driver")
}
fs.driver.AddControllerServiceCapabilities([]csi.ControllerServiceCapability_RPC_Type{
csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME,
})
fs.driver.AddVolumeCapabilityAccessModes([]csi.VolumeCapability_AccessMode_Mode{
csi.VolumeCapability_AccessMode_MULTI_NODE_MULTI_WRITER,
})
// Create gRPC servers
fs.is = NewIdentityServer(fs.driver)
fs.ns = NewNodeServer(fs.driver)
fs.cs = NewControllerServer(fs.driver)
server := csicommon.NewNonBlockingGRPCServer()
server.Start(endpoint, fs.is, fs.cs, fs.ns)
server.Wait()
}

View File

@ -0,0 +1,25 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cephfs
import (
"github.com/kubernetes-csi/drivers/pkg/csi-common"
)
type identityServer struct {
*csicommon.DefaultIdentityServer
}

195
pkg/cephfs/nodeserver.go Normal file
View File

@ -0,0 +1,195 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cephfs
import (
"context"
"os"
"github.com/golang/glog"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/container-storage-interface/spec/lib/go/csi/v0"
"github.com/kubernetes-csi/drivers/pkg/csi-common"
)
type nodeServer struct {
*csicommon.DefaultNodeServer
}
func validateNodePublishVolumeRequest(req *csi.NodePublishVolumeRequest) error {
if req.GetVolumeCapability() == nil {
return status.Error(codes.InvalidArgument, "Volume capability missing in request")
}
if req.GetVolumeId() == "" {
return status.Error(codes.InvalidArgument, "Volume ID missing in request")
}
if req.GetTargetPath() == "" {
return status.Error(codes.InvalidArgument, "Target path missing in request")
}
return nil
}
func validateNodeUnpublishVolumeRequest(req *csi.NodeUnpublishVolumeRequest) error {
if req.GetVolumeId() == "" {
return status.Error(codes.InvalidArgument, "Volume ID missing in request")
}
if req.GetTargetPath() == "" {
return status.Error(codes.InvalidArgument, "Target path missing in request")
}
return nil
}
func handleUser(volOptions *volumeOptions, volUuid string, req *csi.NodePublishVolumeRequest) (*credentials, error) {
var (
cr = &credentials{}
err error
)
// Retrieve the credentials (possibly create a new user as well)
if volOptions.ProvisionVolume {
// The volume is provisioned dynamically, create a dedicated user
if ent, err := createCephUser(volOptions, volUuid, req.GetReadonly()); err != nil {
return nil, err
} else {
cr.id = ent.Entity[len(cephEntityClientPrefix):]
cr.key = ent.Key
}
// Set the correct volume root path
volOptions.RootPath = getVolumeRootPath_ceph(volUuid)
} else {
// The volume is pre-made, credentials are supplied by the user
cr, err = getUserCredentials(req.GetNodePublishSecrets())
if err != nil {
return nil, err
}
}
if err = storeCephUserCredentials(volUuid, cr, volOptions); err != nil {
return nil, err
}
return cr, nil
}
func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
if err := validateNodePublishVolumeRequest(req); err != nil {
return nil, err
}
// Configuration
targetPath := req.GetTargetPath()
volId := req.GetVolumeId()
volUuid := uuidFromVolumeId(volId)
volOptions, err := newVolumeOptions(req.GetVolumeAttributes())
if err != nil {
glog.Errorf("error reading volume options: %v", err)
return nil, status.Error(codes.InvalidArgument, err.Error())
}
if err = createMountPoint(targetPath); err != nil {
glog.Errorf("failed to create mount point at %s: %v", targetPath, err)
return nil, status.Error(codes.Internal, err.Error())
}
// Check if the volume is already mounted
isMnt, err := isMountPoint(targetPath)
if err != nil {
glog.Errorf("stat failed: %v", err)
return nil, status.Error(codes.Internal, err.Error())
}
if isMnt {
glog.V(4).Infof("cephfs: volume %s is already mounted to %s", volId, targetPath)
return &csi.NodePublishVolumeResponse{}, nil
}
// It's not, mount now
cr, err := handleUser(volOptions, volUuid, req)
if err != nil {
glog.Error(err)
return nil, status.Error(codes.Internal, err.Error())
}
m := newMounter(volOptions)
if err = m.mount(targetPath, cr, volOptions, volUuid, req.GetReadonly()); err != nil {
glog.Errorf("failed to mount volume %s: %v", volId, err)
return nil, status.Error(codes.Internal, err.Error())
}
glog.V(4).Infof("cephfs: volume %s successfuly mounted to %s", volId, targetPath)
return &csi.NodePublishVolumeResponse{}, nil
}
func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
if err := validateNodeUnpublishVolumeRequest(req); err != nil {
return nil, err
}
volId := req.GetVolumeId()
targetPath := req.GetTargetPath()
// Unmount the bind-mount
if err := unmountVolume(targetPath); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
localVolRoot := getVolumeRootPath_local(uuidFromVolumeId(volId))
// Unmount the volume root
if err := unmountVolume(localVolRoot); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
os.Remove(localVolRoot)
glog.V(4).Infof("cephfs: volume %s successfuly unmounted from %s", volId, req.GetTargetPath())
return &csi.NodeUnpublishVolumeResponse{}, nil
}
func (ns *nodeServer) NodeStageVolume(
ctx context.Context,
req *csi.NodeStageVolumeRequest) (
*csi.NodeStageVolumeResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}
func (ns *nodeServer) NodeUnstageVolume(
ctx context.Context,
req *csi.NodeUnstageVolumeRequest) (
*csi.NodeUnstageVolumeResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
}

131
pkg/cephfs/util.go Normal file
View File

@ -0,0 +1,131 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cephfs
import (
"bytes"
"encoding/json"
"fmt"
"os/exec"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/kubernetes/pkg/util/keymutex"
"k8s.io/kubernetes/pkg/util/mount"
)
func execCommand(command string, args ...string) ([]byte, error) {
cmd := exec.Command(command, args...)
return cmd.CombinedOutput()
}
func execCommandAndValidate(program string, args ...string) error {
out, err := execCommand(program, args...)
if err != nil {
return fmt.Errorf("cephfs: %s failed with following error: %s\ncephfs: %s output: %s", program, err, program, out)
}
return nil
}
func execCommandJson(v interface{}, program string, args ...string) error {
cmd := exec.Command(program, args...)
out, err := cmd.CombinedOutput()
if err != nil {
return err
}
return json.NewDecoder(bytes.NewReader(out)).Decode(v)
}
func isMountPoint(p string) (bool, error) {
notMnt, err := mount.New("").IsLikelyNotMountPoint(p)
if err != nil {
return false, status.Error(codes.Internal, err.Error())
}
return !notMnt, nil
}
func tryLock(id string, mtx keymutex.KeyMutex, name string) error {
// TODO uncomment this once TryLockKey gets into Kubernetes
/*
if !mtx.TryLockKey(id) {
msg := fmt.Sprintf("%s has a pending operation on %s", name, req.GetVolumeId())
glog.Infoln(msg)
return status.Error(codes.Aborted, msg)
}
*/
return nil
}
func storeCephUserCredentials(volUuid string, cr *credentials, volOptions *volumeOptions) error {
keyringData := cephKeyringData{
UserId: cr.id,
Key: cr.key,
RootPath: volOptions.RootPath,
}
if volOptions.ProvisionVolume {
keyringData.Pool = volOptions.Pool
keyringData.Namespace = getVolumeNamespace(volUuid)
}
return storeCephCredentials(cr, &keyringData)
}
func storeCephAdminCredentials(cr *credentials) error {
return storeCephCredentials(cr, &cephFullCapsKeyringData{UserId: cr.id, Key: cr.key})
}
func storeCephCredentials(cr *credentials, keyringData cephConfigWriter) error {
if err := keyringData.writeToFile(); err != nil {
return err
}
secret := cephSecretData{
UserId: cr.id,
Key: cr.key,
}
if err := secret.writeToFile(); err != nil {
return err
}
return nil
}
func newMounter(volOptions *volumeOptions) volumeMounter {
mounter := volOptions.Mounter
if mounter == "" {
mounter = DefaultVolumeMounter
}
switch mounter {
case volumeMounter_fuse:
return &fuseMounter{}
case volumeMounter_kernel:
return &kernelMounter{}
}
return nil
}

174
pkg/cephfs/volume.go Normal file
View File

@ -0,0 +1,174 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cephfs
import (
"fmt"
"os"
"path"
)
// Volumes are mounted in .../controller/volumes/vol-{UUID}
// The actual user data resides in .../vol-{UUID}/volume-data
// purgeVolume moves the user data to .../vol-{UUID}/volume-deleting and only then calls os.RemoveAll
const (
cephRootPrefix = PluginFolder + "/controller/volumes/root-"
cephVolumePrefix = PluginFolder + "/controller/volumes/vol-"
cephVolumesRoot = "csi-volumes"
volumeDataSuffix = "volume-data"
volumeDeletingSuffix = "volume-deleting"
namespacePrefix = "csi-ns-"
)
func getCephRootPath_local(volUuid string) string {
return cephRootPrefix + volUuid
}
func getCephRootVolumePath_local(volUuid string) string {
return path.Join(getCephRootPath_local(volUuid), cephVolumesRoot, volUuid)
}
func getCephRootVolumeDataPath_local(volUuid string) string {
return path.Join(getCephRootVolumePath_local(volUuid), volumeDataSuffix)
}
func getCephRootVolumeDeletingPath_local(volUuid string) string {
return path.Join(getCephRootVolumePath_local(volUuid), volumeDeletingSuffix)
}
func getVolumeRootPath_local(volUuid string) string {
return cephVolumePrefix + volUuid
}
func getVolumeRootPath_ceph(volUuid string) string {
return path.Join("/", cephVolumesRoot, volUuid)
}
func getVolumeDataPath_local(volUuid string) string {
return path.Join(getVolumeRootPath_local(volUuid), volumeDataSuffix)
}
func getVolumeDeletingPath_local(volUuid string) string {
return path.Join(getVolumeRootPath_local(volUuid), volumeDeletingSuffix)
}
func getVolumeNamespace(volUuid string) string {
return namespacePrefix + volUuid
}
func setVolumeAttribute(root, attrName, attrValue string) error {
return execCommandAndValidate("setfattr", "-n", attrName, "-v", attrValue, root)
}
func createVolume(volOptions *volumeOptions, adminCr *credentials, volUuid string, bytesQuota int64) error {
cephRoot := getCephRootPath_local(volUuid)
if err := createMountPoint(cephRoot); err != nil {
return err
}
// RootPath is not set for a dynamically provisioned volume
// Access to cephfs's / is required
volOptions.RootPath = "/"
if err := mountKernel(cephRoot, adminCr, volOptions); err != nil {
return fmt.Errorf("error mounting ceph root: %v", err)
}
defer func() {
unmountVolume(cephRoot)
os.Remove(cephRoot)
}()
volOptions.RootPath = getVolumeRootPath_ceph(volUuid)
localVolRoot := getCephRootVolumePath_local(volUuid)
if err := createMountPoint(localVolRoot); err != nil {
return err
}
if err := setVolumeAttribute(localVolRoot, "ceph.quota.max_bytes", fmt.Sprintf("%d", bytesQuota)); err != nil {
return err
}
if err := setVolumeAttribute(localVolRoot, "ceph.dir.layout.pool", volOptions.Pool); err != nil {
return fmt.Errorf("%v\ncephfs: Does pool '%s' exist?", err, volOptions.Pool)
}
if err := setVolumeAttribute(localVolRoot, "ceph.dir.layout.pool_namespace", getVolumeNamespace(volUuid)); err != nil {
return err
}
return nil
}
func purgeVolume(volId string, cr *credentials, volOptions *volumeOptions) error {
var (
volUuid = uuidFromVolumeId(volId)
volRoot string
dataPath string
delPath string
)
if volOptions.ProvisionVolume {
// RootPath is not set for a dynamically provisioned volume
volOptions.RootPath = "/"
volRoot = getCephRootPath_local(volUuid)
dataPath = getCephRootVolumeDataPath_local(volUuid)
delPath = getCephRootVolumeDeletingPath_local(volUuid)
} else {
volRoot = getVolumeRootPath_local(volUuid)
dataPath = getVolumeDataPath_local(volUuid)
delPath = getVolumeDeletingPath_local(volUuid)
}
if err := createMountPoint(volRoot); err != nil {
return err
}
if err := mountKernel(volRoot, cr, volOptions); err != nil {
return err
}
defer func() {
if volOptions.ProvisionVolume {
os.Remove(getCephRootVolumePath_local(volUuid))
}
unmountVolume(volRoot)
os.Remove(volRoot)
}()
if err := os.Rename(dataPath, delPath); err != nil {
if os.IsNotExist(err) {
// dataPath doesn't exist if NodePublishVolume wasn't called
return nil
} else {
return fmt.Errorf("couldn't mark volume %s for deletion: %v", volId, err)
}
}
if err := os.RemoveAll(delPath); err != nil {
return fmt.Errorf("couldn't delete volume %s: %v", volId, err)
}
return nil
}

131
pkg/cephfs/volumecache.go Normal file
View File

@ -0,0 +1,131 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cephfs
import (
"encoding/json"
"fmt"
"io/ioutil"
"os"
"path"
"strings"
"sync"
"github.com/golang/glog"
)
const (
volumeCacheRoot = PluginFolder + "/controller/volume-cache"
)
type volumeCacheEntry struct {
VolOptions volumeOptions
Identifier volumeIdentifier
}
type volumeCache struct {
entries map[string]*volumeCacheEntry
}
var (
volCache volumeCache
volCacheMtx sync.RWMutex
)
// Loads all .json files from volumeCacheRoot into volCache
// Called from driver.go's Run()
func loadVolumeCache() error {
cacheDir, err := ioutil.ReadDir(volumeCacheRoot)
if err != nil {
return fmt.Errorf("cannot read volume cache: %v", err)
}
volCacheMtx.Lock()
defer volCacheMtx.Unlock()
volCache.entries = make(map[string]*volumeCacheEntry)
for _, fi := range cacheDir {
if !strings.HasSuffix(fi.Name(), ".json") || !fi.Mode().IsRegular() {
continue
}
f, err := os.Open(path.Join(volumeCacheRoot, fi.Name()))
if err != nil {
glog.Errorf("cephfs: couldn't read '%s' from volume cache: %v", fi.Name(), err)
continue
}
d := json.NewDecoder(f)
ent := &volumeCacheEntry{}
if err = d.Decode(ent); err != nil {
glog.Errorf("cephfs: failed to parse '%s': %v", fi.Name(), err)
} else {
volCache.entries[ent.Identifier.uuid] = ent
}
f.Close()
}
return nil
}
func getVolumeCacheEntryPath(volUuid string) string {
return path.Join(volumeCacheRoot, fmt.Sprintf("vol-%s.json", volUuid))
}
func (vc *volumeCache) insert(ent *volumeCacheEntry) error {
filePath := getVolumeCacheEntryPath(ent.Identifier.uuid)
volCacheMtx.Lock()
defer volCacheMtx.Unlock()
f, err := os.Create(filePath)
if err != nil {
return fmt.Errorf("couldn't create cache entry file %s: %v", filePath, err)
}
defer f.Close()
e := json.NewEncoder(f)
if err = e.Encode(ent); err != nil {
return fmt.Errorf("failed to encode cache entry for volume %s: %v", ent.Identifier.id, err)
}
vc.entries[ent.Identifier.uuid] = ent
return nil
}
func (vc *volumeCache) erase(volUuid string) error {
volCacheMtx.Lock()
delete(vc.entries, volUuid)
volCacheMtx.Unlock()
return os.Remove(getVolumeCacheEntryPath(volUuid))
}
func (vc *volumeCache) get(volUuid string) (volumeCacheEntry, bool) {
volCacheMtx.RLock()
defer volCacheMtx.RUnlock()
if ent, ok := vc.entries[volUuid]; ok {
return *ent, true
} else {
return volumeCacheEntry{}, false
}
}

View File

@ -0,0 +1,60 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cephfs
import (
"github.com/container-storage-interface/spec/lib/go/csi/v0"
"github.com/pborman/uuid"
)
const (
dynamicallyProvisionedVolumePrefix = "csi-cephfs-dyn-"
staticallyProvisionedVolumePrefix = "csi-cephfs-sta-"
volumePrefixLen = len(dynamicallyProvisionedVolumePrefix)
)
type volumeIdentifier struct {
name, uuid, id string
}
func newVolumeIdentifier(volOptions *volumeOptions, req *csi.CreateVolumeRequest) *volumeIdentifier {
volId := volumeIdentifier{
name: req.GetName(),
uuid: uuid.NewUUID().String(),
}
prefix := staticallyProvisionedVolumePrefix
if volOptions.ProvisionVolume {
prefix = dynamicallyProvisionedVolumePrefix
}
volId.id = prefix + volId.uuid
return &volId
}
func uuidFromVolumeId(volId string) string {
return volId[volumePrefixLen:]
}
func isDynProvision(volId string) bool {
if len(volId) <= volumePrefixLen {
return false
}
return volId[:volumePrefixLen] == dynamicallyProvisionedVolumePrefix
}

137
pkg/cephfs/volumemounter.go Normal file
View File

@ -0,0 +1,137 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cephfs
import (
"bytes"
"fmt"
"os"
)
const (
volumeMounter_fuse = "fuse"
volumeMounter_kernel = "kernel"
)
type volumeMounter interface {
mount(mountPoint string, cr *credentials, volOptions *volumeOptions, volUuid string, readOnly bool) error
}
type fuseMounter struct{}
func mountFuse(mountPoint string, cr *credentials, volOptions *volumeOptions) error {
args := [...]string{
mountPoint,
"-n", cephEntityClientPrefix + cr.id,
"-r", volOptions.RootPath,
}
out, err := execCommand("ceph-fuse", args[:]...)
if err != nil {
return fmt.Errorf("cephfs: ceph-fuse failed with following error: %s\ncephfs: ceph-fuse output: %s", err, out)
}
if !bytes.Contains(out, []byte("starting fuse"[:])) {
return fmt.Errorf("cephfs: ceph-fuse failed:\ncephfs: ceph-fuse output: %s", out)
}
return nil
}
func (m *fuseMounter) mount(mountPoint string, cr *credentials, volOptions *volumeOptions, volUuid string, readOnly bool) error {
if err := createMountPoint(mountPoint); err != nil {
return err
}
localVolRoot := getVolumeRootPath_local(volUuid)
if err := createMountPoint(localVolRoot); err != nil {
return err
}
if err := mountFuse(localVolRoot, cr, volOptions); err != nil {
return err
}
return bindVolume(volUuid, mountPoint, readOnly)
}
type kernelMounter struct{}
func mountKernel(mountPoint string, cr *credentials, volOptions *volumeOptions) error {
if err := execCommandAndValidate("modprobe", "ceph"); err != nil {
return err
}
return execCommandAndValidate("mount",
"-t", "ceph",
fmt.Sprintf("%s:%s", volOptions.Monitors, volOptions.RootPath),
mountPoint,
"-o",
fmt.Sprintf("name=%s,secretfile=%s", cr.id, getCephSecretPath(cr.id)),
)
}
func (m *kernelMounter) mount(mountPoint string, cr *credentials, volOptions *volumeOptions, volUuid string, readOnly bool) error {
if err := createMountPoint(mountPoint); err != nil {
return err
}
localVolRoot := getVolumeRootPath_local(volUuid)
if err := createMountPoint(localVolRoot); err != nil {
return err
}
if err := mountKernel(localVolRoot, cr, volOptions); err != nil {
return err
}
return bindVolume(volUuid, mountPoint, readOnly)
}
func bindMount(from, to string, readOnly bool) error {
if err := execCommandAndValidate("mount", "--bind", from, to); err != nil {
return fmt.Errorf("failed bind-mount of %s to %s: %v", from, to, err)
}
if readOnly {
if err := execCommandAndValidate("mount", "-o", "remount,ro,bind", to); err != nil {
return err
}
}
return nil
}
func bindVolume(volUuid, target string, readOnly bool) error {
volDataRoot := getVolumeDataPath_local(volUuid)
if err := createMountPoint(volDataRoot); err != nil {
return err
}
return bindMount(volDataRoot, target, readOnly)
}
func unmountVolume(mountPoint string) error {
return execCommandAndValidate("umount", mountPoint)
}
func createMountPoint(root string) error {
return os.MkdirAll(root, 0750)
}

129
pkg/cephfs/volumeoptions.go Normal file
View File

@ -0,0 +1,129 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cephfs
import (
"errors"
"fmt"
"strconv"
)
type volumeOptions struct {
Monitors string `json:"monitors"`
Pool string `json:"pool"`
RootPath string `json:"rootPath"`
Mounter string `json:"mounter"`
ProvisionVolume bool `json:"provisionVolume"`
}
func validateNonEmptyField(field, fieldName string) error {
if field == "" {
return fmt.Errorf("Parameter '%s' cannot be empty", fieldName)
}
return nil
}
func (o *volumeOptions) validate() error {
if err := validateNonEmptyField(o.Monitors, "monitors"); err != nil {
return err
}
if err := validateNonEmptyField(o.RootPath, "rootPath"); err != nil {
if !o.ProvisionVolume {
return err
}
} else {
if o.ProvisionVolume {
return fmt.Errorf("Non-empty field rootPath is in conflict with provisionVolume=true")
}
}
if o.ProvisionVolume {
if err := validateNonEmptyField(o.Pool, "pool"); err != nil {
return err
}
}
if o.Mounter != "" {
if err := validateMounter(o.Mounter); err != nil {
return err
}
}
return nil
}
func extractOption(dest *string, optionLabel string, options map[string]string) error {
if opt, ok := options[optionLabel]; !ok {
return errors.New("Missing required field " + optionLabel)
} else {
*dest = opt
return nil
}
}
func validateMounter(m string) error {
switch m {
case volumeMounter_fuse:
case volumeMounter_kernel:
default:
return fmt.Errorf("Unknown mounter '%s'. Valid options are 'fuse' and 'kernel'", m)
}
return nil
}
func newVolumeOptions(volOptions map[string]string) (*volumeOptions, error) {
var (
opts volumeOptions
provisionVolumeBool string
err error
)
if err = extractOption(&opts.Monitors, "monitors", volOptions); err != nil {
return nil, err
}
if err = extractOption(&provisionVolumeBool, "provisionVolume", volOptions); err != nil {
return nil, err
}
if opts.ProvisionVolume, err = strconv.ParseBool(provisionVolumeBool); err != nil {
return nil, fmt.Errorf("Failed to parse provisionVolume: %v", err)
}
if opts.ProvisionVolume {
if err = extractOption(&opts.Pool, "pool", volOptions); err != nil {
return nil, err
}
} else {
if err = extractOption(&opts.RootPath, "rootPath", volOptions); err != nil {
return nil, err
}
}
// This field is optional, don't check for its presence
extractOption(&opts.Mounter, "mounter", volOptions)
if err = opts.validate(); err != nil {
return nil, err
}
return &opts, nil
}