mirror of
https://github.com/ceph/ceph-csi.git
synced 2024-11-22 22:30:23 +00:00
cephfs: cleaning/renaming
This commit is contained in:
parent
257a11780f
commit
0df8415067
@ -1,332 +0,0 @@
|
|||||||
#!/usr/bin/env python
|
|
||||||
|
|
||||||
# Copyright 2017 The Kubernetes Authors.
|
|
||||||
#
|
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
# you may not use this file except in compliance with the License.
|
|
||||||
# You may obtain a copy of the License at
|
|
||||||
#
|
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
#
|
|
||||||
# Unless required by applicable law or agreed to in writing, software
|
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
# See the License for the specific language governing permissions and
|
|
||||||
# limitations under the License.
|
|
||||||
|
|
||||||
import os
|
|
||||||
import rados
|
|
||||||
import getopt
|
|
||||||
import sys
|
|
||||||
import json
|
|
||||||
|
|
||||||
"""
|
|
||||||
CEPH_CLUSTER_NAME=test CEPH_MON=172.24.0.4 CEPH_AUTH_ID=admin CEPH_AUTH_KEY=AQCMpH9YM4Q1BhAAXGNQyyOne8ZsXqWGon/dIQ== cephfs_provisioner.py -n foo -u bar
|
|
||||||
"""
|
|
||||||
try:
|
|
||||||
import ceph_volume_client
|
|
||||||
ceph_module_found = True
|
|
||||||
except ImportError as e:
|
|
||||||
ceph_volume_client = None
|
|
||||||
ceph_module_found = False
|
|
||||||
|
|
||||||
VOlUME_GROUP="kubernetes"
|
|
||||||
CONF_PATH="/etc/ceph/"
|
|
||||||
|
|
||||||
class CephFSNativeDriver(object):
|
|
||||||
"""Driver for the Ceph Filesystem.
|
|
||||||
|
|
||||||
This driver is 'native' in the sense that it exposes a CephFS filesystem
|
|
||||||
for use directly by guests, with no intermediate layer like NFS.
|
|
||||||
"""
|
|
||||||
|
|
||||||
def __init__(self, *args, **kwargs):
|
|
||||||
self._volume_client = None
|
|
||||||
|
|
||||||
|
|
||||||
def _create_conf(self, cluster_name, mons):
|
|
||||||
""" Create conf using monitors
|
|
||||||
Create a minimal ceph conf with monitors and cephx
|
|
||||||
"""
|
|
||||||
conf_path = CONF_PATH + cluster_name + ".conf"
|
|
||||||
if not os.path.isfile(conf_path) or os.access(conf_path, os.W_OK):
|
|
||||||
conf = open(conf_path, 'w')
|
|
||||||
conf.write("[global]\n")
|
|
||||||
conf.write("mon_host = " + mons + "\n")
|
|
||||||
conf.write("auth_cluster_required = cephx\nauth_service_required = cephx\nauth_client_required = cephx\n")
|
|
||||||
conf.close()
|
|
||||||
return conf_path
|
|
||||||
|
|
||||||
def _create_keyring(self, cluster_name, id, key):
|
|
||||||
""" Create client keyring using id and key
|
|
||||||
"""
|
|
||||||
keyring_path = CONF_PATH + cluster_name + "." + "client." + id + ".keyring"
|
|
||||||
if not os.path.isfile(keyring_path) or os.access(keyring_path, os.W_OK):
|
|
||||||
keyring = open(keyring_path, 'w')
|
|
||||||
keyring.write("[client." + id + "]\n")
|
|
||||||
keyring.write("key = " + key + "\n")
|
|
||||||
keyring.write("caps mds = \"allow *\"\n")
|
|
||||||
keyring.write("caps mon = \"allow *\"\n")
|
|
||||||
keyring.write("caps osd = \"allow *\"\n")
|
|
||||||
keyring.close()
|
|
||||||
|
|
||||||
@property
|
|
||||||
def volume_client(self):
|
|
||||||
if self._volume_client:
|
|
||||||
return self._volume_client
|
|
||||||
|
|
||||||
if not ceph_module_found:
|
|
||||||
raise ValueError("Ceph client libraries not found.")
|
|
||||||
|
|
||||||
try:
|
|
||||||
cluster_name = os.environ["CEPH_CLUSTER_NAME"]
|
|
||||||
except KeyError:
|
|
||||||
cluster_name = "ceph"
|
|
||||||
try:
|
|
||||||
mons = os.environ["CEPH_MON"]
|
|
||||||
except KeyError:
|
|
||||||
raise ValueError("Missing CEPH_MON env")
|
|
||||||
try:
|
|
||||||
auth_id = os.environ["CEPH_AUTH_ID"]
|
|
||||||
except KeyError:
|
|
||||||
raise ValueError("Missing CEPH_AUTH_ID")
|
|
||||||
try:
|
|
||||||
auth_key = os.environ["CEPH_AUTH_KEY"]
|
|
||||||
except:
|
|
||||||
raise ValueError("Missing CEPH_AUTH_KEY")
|
|
||||||
|
|
||||||
conf_path = self._create_conf(cluster_name, mons)
|
|
||||||
self._create_keyring(cluster_name, auth_id, auth_key)
|
|
||||||
|
|
||||||
self._volume_client = ceph_volume_client.CephFSVolumeClient(
|
|
||||||
auth_id, conf_path, cluster_name)
|
|
||||||
try:
|
|
||||||
self._volume_client.connect(None)
|
|
||||||
except Exception:
|
|
||||||
self._volume_client = None
|
|
||||||
raise
|
|
||||||
|
|
||||||
return self._volume_client
|
|
||||||
|
|
||||||
def _authorize_ceph(self, volume_path, auth_id, readonly):
|
|
||||||
path = self._volume_client._get_path(volume_path)
|
|
||||||
|
|
||||||
# First I need to work out what the data pool is for this share:
|
|
||||||
# read the layout
|
|
||||||
pool_name = self._volume_client._get_ancestor_xattr(path, "ceph.dir.layout.pool")
|
|
||||||
namespace = self._volume_client.fs.getxattr(path, "ceph.dir.layout.pool_namespace")
|
|
||||||
|
|
||||||
# Now construct auth capabilities that give the guest just enough
|
|
||||||
# permissions to access the share
|
|
||||||
client_entity = "client.{0}".format(auth_id)
|
|
||||||
want_access_level = 'r' if readonly else 'rw'
|
|
||||||
want_mds_cap = 'allow r,allow {0} path={1}'.format(want_access_level, path)
|
|
||||||
want_osd_cap = 'allow {0} pool={1} namespace={2}'.format(
|
|
||||||
want_access_level, pool_name, namespace)
|
|
||||||
|
|
||||||
try:
|
|
||||||
existing = self._volume_client._rados_command(
|
|
||||||
'auth get',
|
|
||||||
{
|
|
||||||
'entity': client_entity
|
|
||||||
}
|
|
||||||
)
|
|
||||||
# FIXME: rados raising Error instead of ObjectNotFound in auth get failure
|
|
||||||
except rados.Error:
|
|
||||||
caps = self._volume_client._rados_command(
|
|
||||||
'auth get-or-create',
|
|
||||||
{
|
|
||||||
'entity': client_entity,
|
|
||||||
'caps': [
|
|
||||||
'mds', want_mds_cap,
|
|
||||||
'osd', want_osd_cap,
|
|
||||||
'mon', 'allow r']
|
|
||||||
})
|
|
||||||
else:
|
|
||||||
# entity exists, update it
|
|
||||||
cap = existing[0]
|
|
||||||
|
|
||||||
# Construct auth caps that if present might conflict with the desired
|
|
||||||
# auth caps.
|
|
||||||
unwanted_access_level = 'r' if want_access_level is 'rw' else 'rw'
|
|
||||||
unwanted_mds_cap = 'allow {0} path={1}'.format(unwanted_access_level, path)
|
|
||||||
unwanted_osd_cap = 'allow {0} pool={1} namespace={2}'.format(
|
|
||||||
unwanted_access_level, pool_name, namespace)
|
|
||||||
|
|
||||||
def cap_update(orig, want, unwanted):
|
|
||||||
# Updates the existing auth caps such that there is a single
|
|
||||||
# occurrence of wanted auth caps and no occurrence of
|
|
||||||
# conflicting auth caps.
|
|
||||||
|
|
||||||
cap_tokens = set(orig.split(","))
|
|
||||||
|
|
||||||
cap_tokens.discard(unwanted)
|
|
||||||
cap_tokens.add(want)
|
|
||||||
|
|
||||||
return ",".join(cap_tokens)
|
|
||||||
|
|
||||||
osd_cap_str = cap_update(cap['caps'].get('osd', ""), want_osd_cap, unwanted_osd_cap)
|
|
||||||
mds_cap_str = cap_update(cap['caps'].get('mds', ""), want_mds_cap, unwanted_mds_cap)
|
|
||||||
|
|
||||||
caps = self._volume_client._rados_command(
|
|
||||||
'auth caps',
|
|
||||||
{
|
|
||||||
'entity': client_entity,
|
|
||||||
'caps': [
|
|
||||||
'mds', mds_cap_str,
|
|
||||||
'osd', osd_cap_str,
|
|
||||||
'mon', cap['caps'].get('mon')]
|
|
||||||
})
|
|
||||||
caps = self._volume_client._rados_command(
|
|
||||||
'auth get',
|
|
||||||
{
|
|
||||||
'entity': client_entity
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
# Result expected like this:
|
|
||||||
# [
|
|
||||||
# {
|
|
||||||
# "entity": "client.foobar",
|
|
||||||
# "key": "AQBY0\/pViX\/wBBAAUpPs9swy7rey1qPhzmDVGQ==",
|
|
||||||
# "caps": {
|
|
||||||
# "mds": "allow *",
|
|
||||||
# "mon": "allow *"
|
|
||||||
# }
|
|
||||||
# }
|
|
||||||
# ]
|
|
||||||
assert len(caps) == 1
|
|
||||||
assert caps[0]['entity'] == client_entity
|
|
||||||
return caps[0]
|
|
||||||
|
|
||||||
def create_share(self, path, user_id, size=None):
|
|
||||||
"""Create a CephFS volume.
|
|
||||||
"""
|
|
||||||
volume_path = ceph_volume_client.VolumePath(VOlUME_GROUP, path)
|
|
||||||
|
|
||||||
# Create the CephFS volume
|
|
||||||
volume = self.volume_client.create_volume(volume_path, size=size)
|
|
||||||
|
|
||||||
# To mount this you need to know the mon IPs and the path to the volume
|
|
||||||
mon_addrs = self.volume_client.get_mon_addrs()
|
|
||||||
|
|
||||||
export_location = "{addrs}:{path}".format(
|
|
||||||
addrs=",".join(mon_addrs),
|
|
||||||
path=volume['mount_path'])
|
|
||||||
|
|
||||||
"""TODO
|
|
||||||
restrict to user_id
|
|
||||||
"""
|
|
||||||
auth_result = self._authorize_ceph(volume_path, user_id, False)
|
|
||||||
ret = {
|
|
||||||
'path': volume['mount_path'],
|
|
||||||
'user': auth_result['entity'],
|
|
||||||
'key': auth_result['key']
|
|
||||||
}
|
|
||||||
|
|
||||||
self._create_keyring(self.volume_client.cluster_name, user_id, auth_result['key'])
|
|
||||||
|
|
||||||
return json.dumps(ret)
|
|
||||||
|
|
||||||
def _deauthorize(self, volume_path, auth_id):
|
|
||||||
"""
|
|
||||||
The volume must still exist.
|
|
||||||
NOTE: In our `_authorize_ceph` method we give user extra mds `allow r`
|
|
||||||
cap to work around a kernel cephfs issue. So we need a customized
|
|
||||||
`_deauthorize` method to remove caps instead of using
|
|
||||||
`volume_client._deauthorize`.
|
|
||||||
This methid is modified from
|
|
||||||
https://github.com/ceph/ceph/blob/v13.0.0/src/pybind/ceph_volume_client.py#L1181.
|
|
||||||
"""
|
|
||||||
client_entity = "client.{0}".format(auth_id)
|
|
||||||
path = self.volume_client._get_path(volume_path)
|
|
||||||
path = self.volume_client._get_path(volume_path)
|
|
||||||
pool_name = self.volume_client._get_ancestor_xattr(path, "ceph.dir.layout.pool")
|
|
||||||
namespace = self.volume_client.fs.getxattr(path, "ceph.dir.layout.pool_namespace")
|
|
||||||
|
|
||||||
# The auth_id might have read-only or read-write mount access for the
|
|
||||||
# volume path.
|
|
||||||
access_levels = ('r', 'rw')
|
|
||||||
want_mds_caps = {'allow {0} path={1}'.format(access_level, path)
|
|
||||||
for access_level in access_levels}
|
|
||||||
want_osd_caps = {'allow {0} pool={1} namespace={2}'.format(
|
|
||||||
access_level, pool_name, namespace)
|
|
||||||
for access_level in access_levels}
|
|
||||||
|
|
||||||
try:
|
|
||||||
existing = self.volume_client._rados_command(
|
|
||||||
'auth get',
|
|
||||||
{
|
|
||||||
'entity': client_entity
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
def cap_remove(orig, want):
|
|
||||||
cap_tokens = set(orig.split(","))
|
|
||||||
return ",".join(cap_tokens.difference(want))
|
|
||||||
|
|
||||||
cap = existing[0]
|
|
||||||
osd_cap_str = cap_remove(cap['caps'].get('osd', ""), want_osd_caps)
|
|
||||||
mds_cap_str = cap_remove(cap['caps'].get('mds', ""), want_mds_caps)
|
|
||||||
|
|
||||||
if (not osd_cap_str) and (not osd_cap_str or mds_cap_str == "allow r"):
|
|
||||||
# If osd caps are removed and mds caps are removed or only have "allow r", we can remove entity safely.
|
|
||||||
self.volume_client._rados_command('auth del', {'entity': client_entity}, decode=False)
|
|
||||||
else:
|
|
||||||
self.volume_client._rados_command(
|
|
||||||
'auth caps',
|
|
||||||
{
|
|
||||||
'entity': client_entity,
|
|
||||||
'caps': [
|
|
||||||
'mds', mds_cap_str,
|
|
||||||
'osd', osd_cap_str,
|
|
||||||
'mon', cap['caps'].get('mon', 'allow r')]
|
|
||||||
})
|
|
||||||
|
|
||||||
# FIXME: rados raising Error instead of ObjectNotFound in auth get failure
|
|
||||||
except rados.Error:
|
|
||||||
# Already gone, great.
|
|
||||||
return
|
|
||||||
|
|
||||||
def delete_share(self, path, user_id):
|
|
||||||
volume_path = ceph_volume_client.VolumePath(VOlUME_GROUP, path)
|
|
||||||
self._deauthorize(volume_path, user_id)
|
|
||||||
self.volume_client.delete_volume(volume_path)
|
|
||||||
self.volume_client.purge_volume(volume_path)
|
|
||||||
|
|
||||||
def __del__(self):
|
|
||||||
if self._volume_client:
|
|
||||||
self._volume_client.disconnect()
|
|
||||||
self._volume_client = None
|
|
||||||
|
|
||||||
def main():
|
|
||||||
create = True
|
|
||||||
share = ""
|
|
||||||
user = ""
|
|
||||||
cephfs = CephFSNativeDriver()
|
|
||||||
try:
|
|
||||||
opts, args = getopt.getopt(sys.argv[1:], "rn:u:", ["remove"])
|
|
||||||
except getopt.GetoptError:
|
|
||||||
print "Usage: " + sys.argv[0] + " --remove -n share_name -u ceph_user_id"
|
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
for opt, arg in opts:
|
|
||||||
if opt == '-n':
|
|
||||||
share = arg
|
|
||||||
elif opt == '-u':
|
|
||||||
user = arg
|
|
||||||
elif opt in ("-r", "--remove"):
|
|
||||||
create = False
|
|
||||||
|
|
||||||
if share == "" or user == "":
|
|
||||||
print "Usage: " + sys.argv[0] + " --remove -n share_name -u ceph_user_id"
|
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
if create == True:
|
|
||||||
print cephfs.create_share(share, user)
|
|
||||||
else:
|
|
||||||
cephfs.delete_share(share, user)
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
main()
|
|
@ -19,12 +19,12 @@ package cephfs
|
|||||||
import (
|
import (
|
||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
|
|
||||||
"github.com/container-storage-interface/spec/lib/go/csi"
|
"github.com/container-storage-interface/spec/lib/go/csi/v0"
|
||||||
"github.com/kubernetes-csi/drivers/pkg/csi-common"
|
"github.com/kubernetes-csi/drivers/pkg/csi-common"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
PluginFolder = "/var/lib/kubelet/plugins/cephfsplugin"
|
PluginFolder = "/var/lib/kubelet/plugins/csi-cephfsplugin"
|
||||||
)
|
)
|
||||||
|
|
||||||
type cephfsDriver struct {
|
type cephfsDriver struct {
|
||||||
@ -40,15 +40,9 @@ type cephfsDriver struct {
|
|||||||
|
|
||||||
var (
|
var (
|
||||||
driver *cephfsDriver
|
driver *cephfsDriver
|
||||||
version = csi.Version{
|
version = "0.2.0"
|
||||||
Minor: 2,
|
|
||||||
}
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func GetSupportedVersions() []*csi.Version {
|
|
||||||
return []*csi.Version{&version}
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewCephFSDriver() *cephfsDriver {
|
func NewCephFSDriver() *cephfsDriver {
|
||||||
return &cephfsDriver{}
|
return &cephfsDriver{}
|
||||||
}
|
}
|
||||||
@ -72,11 +66,11 @@ func NewNodeServer(d *csicommon.CSIDriver) *nodeServer {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (fs *cephfsDriver) Run(driverName, nodeId, endpoint string) {
|
func (fs *cephfsDriver) Run(driverName, nodeId, endpoint string) {
|
||||||
glog.Infof("Driver: %v version: %v", driverName, GetVersionString(&version))
|
glog.Infof("Driver: %v version: %v", driverName, version)
|
||||||
|
|
||||||
// Initialize default library driver
|
// Initialize default library driver
|
||||||
|
|
||||||
fs.driver = csicommon.NewCSIDriver(driverName, &version, GetSupportedVersions(), nodeId)
|
fs.driver = csicommon.NewCSIDriver(driverName, version, nodeId)
|
||||||
if fs.driver == nil {
|
if fs.driver == nil {
|
||||||
glog.Fatalln("Failed to initialize CSI driver")
|
glog.Fatalln("Failed to initialize CSI driver")
|
||||||
}
|
}
|
Loading…
Reference in New Issue
Block a user