mirror of
https://github.com/ceph/ceph-csi.git
synced 2025-01-11 14:29:29 +00:00
cephfs: CSI 0.3.0; NodeStageVolume/NodeUnstageVolume; refactoring
This commit is contained in:
parent
4331960ab3
commit
1c38412e39
@ -35,16 +35,6 @@ fuse_set_user_groups = false
|
||||
|
||||
const cephKeyring = `[client.{{.UserId}}]
|
||||
key = {{.Key}}
|
||||
caps mds = "allow rw path={{.RootPath}}"
|
||||
caps mon = "allow r"
|
||||
caps osd = "allow rw{{if .Pool}} pool={{.Pool}}{{end}}{{if .Namespace}} namespace={{.Namespace}}{{end}}"
|
||||
`
|
||||
|
||||
const cephFullCapsKeyring = `[client.{{.UserId}}]
|
||||
key = {{.Key}}
|
||||
caps mds = "allow"
|
||||
caps mon = "allow *"
|
||||
caps osd = "allow *"
|
||||
`
|
||||
|
||||
const cephSecret = `{{.Key}}`
|
||||
@ -57,10 +47,9 @@ const (
|
||||
)
|
||||
|
||||
var (
|
||||
cephConfigTempl *template.Template
|
||||
cephKeyringTempl *template.Template
|
||||
cephFullCapsKeyringTempl *template.Template
|
||||
cephSecretTempl *template.Template
|
||||
cephConfigTempl *template.Template
|
||||
cephKeyringTempl *template.Template
|
||||
cephSecretTempl *template.Template
|
||||
)
|
||||
|
||||
func init() {
|
||||
@ -76,7 +65,6 @@ func init() {
|
||||
|
||||
cephConfigTempl = template.Must(template.New("config").Parse(cephConfig))
|
||||
cephKeyringTempl = template.Must(template.New("keyring").Funcs(fm).Parse(cephKeyring))
|
||||
cephFullCapsKeyringTempl = template.Must(template.New("keyringFullCaps").Parse(cephFullCapsKeyring))
|
||||
cephSecretTempl = template.Must(template.New("secret").Parse(cephSecret))
|
||||
}
|
||||
|
||||
@ -85,8 +73,8 @@ type cephConfigWriter interface {
|
||||
}
|
||||
|
||||
type cephConfigData struct {
|
||||
Monitors string
|
||||
VolumeUuid string
|
||||
Monitors string
|
||||
VolumeID volumeID
|
||||
}
|
||||
|
||||
func writeCephTemplate(fileName string, m os.FileMode, t *template.Template, data interface{}) error {
|
||||
@ -108,46 +96,35 @@ func writeCephTemplate(fileName string, m os.FileMode, t *template.Template, dat
|
||||
}
|
||||
|
||||
func (d *cephConfigData) writeToFile() error {
|
||||
return writeCephTemplate(fmt.Sprintf(cephConfigFileNameFmt, d.VolumeUuid), 0640, cephConfigTempl, d)
|
||||
return writeCephTemplate(fmt.Sprintf(cephConfigFileNameFmt, d.VolumeID), 0640, cephConfigTempl, d)
|
||||
}
|
||||
|
||||
type cephKeyringData struct {
|
||||
UserId, Key string
|
||||
RootPath string
|
||||
Pool, Namespace string
|
||||
VolumeUuid string
|
||||
UserId, Key string
|
||||
VolumeID volumeID
|
||||
}
|
||||
|
||||
func (d *cephKeyringData) writeToFile() error {
|
||||
return writeCephTemplate(fmt.Sprintf(cephKeyringFileNameFmt, d.VolumeUuid, d.UserId), 0600, cephKeyringTempl, d)
|
||||
}
|
||||
|
||||
type cephFullCapsKeyringData struct {
|
||||
UserId, Key string
|
||||
VolumeUuid string
|
||||
}
|
||||
|
||||
func (d *cephFullCapsKeyringData) writeToFile() error {
|
||||
return writeCephTemplate(fmt.Sprintf(cephKeyringFileNameFmt, d.VolumeUuid, d.UserId), 0600, cephFullCapsKeyringTempl, d)
|
||||
return writeCephTemplate(fmt.Sprintf(cephKeyringFileNameFmt, d.VolumeID, d.UserId), 0600, cephKeyringTempl, d)
|
||||
}
|
||||
|
||||
type cephSecretData struct {
|
||||
UserId, Key string
|
||||
VolumeUuid string
|
||||
VolumeID volumeID
|
||||
}
|
||||
|
||||
func (d *cephSecretData) writeToFile() error {
|
||||
return writeCephTemplate(fmt.Sprintf(cephSecretFileNameFmt, d.VolumeUuid, d.UserId), 0600, cephSecretTempl, d)
|
||||
return writeCephTemplate(fmt.Sprintf(cephSecretFileNameFmt, d.VolumeID, d.UserId), 0600, cephSecretTempl, d)
|
||||
}
|
||||
|
||||
func getCephSecretPath(volUuid, userId string) string {
|
||||
return path.Join(cephConfigRoot, fmt.Sprintf(cephSecretFileNameFmt, volUuid, userId))
|
||||
func getCephSecretPath(volId volumeID, userId string) string {
|
||||
return path.Join(cephConfigRoot, fmt.Sprintf(cephSecretFileNameFmt, volId, userId))
|
||||
}
|
||||
|
||||
func getCephKeyringPath(volUuid, userId string) string {
|
||||
return path.Join(cephConfigRoot, fmt.Sprintf(cephKeyringFileNameFmt, volUuid, userId))
|
||||
func getCephKeyringPath(volId volumeID, userId string) string {
|
||||
return path.Join(cephConfigRoot, fmt.Sprintf(cephKeyringFileNameFmt, volId, userId))
|
||||
}
|
||||
|
||||
func getCephConfPath(volUuid string) string {
|
||||
return path.Join(cephConfigRoot, fmt.Sprintf(cephConfigFileNameFmt, volUuid))
|
||||
func getCephConfPath(volId volumeID) string {
|
||||
return path.Join(cephConfigRoot, fmt.Sprintf(cephConfigFileNameFmt, volId))
|
||||
}
|
||||
|
@ -22,7 +22,7 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
cephUserPrefix = "csi-user-"
|
||||
cephUserPrefix = "user-"
|
||||
cephEntityClientPrefix = "client."
|
||||
)
|
||||
|
||||
@ -38,8 +38,8 @@ type cephEntity struct {
|
||||
Caps cephEntityCaps `json:"caps"`
|
||||
}
|
||||
|
||||
func getCephUserName(volUuid string) string {
|
||||
return cephUserPrefix + volUuid
|
||||
func getCephUserName(volId volumeID) string {
|
||||
return cephUserPrefix + string(volId)
|
||||
}
|
||||
|
||||
func getCephUser(userId string) (*cephEntity, error) {
|
||||
@ -57,17 +57,17 @@ func getCephUser(userId string) (*cephEntity, error) {
|
||||
return &ents[0], nil
|
||||
}
|
||||
|
||||
func createCephUser(volOptions *volumeOptions, cr *credentials, volUuid string) (*cephEntity, error) {
|
||||
func createCephUser(volOptions *volumeOptions, cr *credentials, volId volumeID) (*cephEntity, error) {
|
||||
caps := cephEntityCaps{
|
||||
Mds: fmt.Sprintf("allow rw path=%s", getVolumeRootPath_ceph(volUuid)),
|
||||
Mds: fmt.Sprintf("allow rw path=%s", getVolumeRootPath_ceph(volId)),
|
||||
Mon: "allow r",
|
||||
Osd: fmt.Sprintf("allow rw pool=%s namespace=%s", volOptions.Pool, getVolumeNamespace(volUuid)),
|
||||
Osd: fmt.Sprintf("allow rw pool=%s namespace=%s", volOptions.Pool, getVolumeNamespace(volId)),
|
||||
}
|
||||
|
||||
var ents []cephEntity
|
||||
args := [...]string{
|
||||
"auth", "-f", "json", "-c", getCephConfPath(volUuid), "-n", cephEntityClientPrefix + cr.id,
|
||||
"get-or-create", cephEntityClientPrefix + getCephUserName(volUuid),
|
||||
"auth", "-f", "json", "-c", getCephConfPath(volId), "-n", cephEntityClientPrefix + cr.id,
|
||||
"get-or-create", cephEntityClientPrefix + getCephUserName(volId),
|
||||
"mds", caps.Mds,
|
||||
"mon", caps.Mon,
|
||||
"osd", caps.Osd,
|
||||
@ -80,11 +80,11 @@ func createCephUser(volOptions *volumeOptions, cr *credentials, volUuid string)
|
||||
return &ents[0], nil
|
||||
}
|
||||
|
||||
func deleteCephUser(cr *credentials, volUuid string) error {
|
||||
userId := getCephUserName(volUuid)
|
||||
func deleteCephUser(adminCr *credentials, volId volumeID) error {
|
||||
userId := getCephUserName(volId)
|
||||
|
||||
args := [...]string{
|
||||
"-c", getCephConfPath(volUuid), "-n", cephEntityClientPrefix + cr.id,
|
||||
"-c", getCephConfPath(volId), "-n", cephEntityClientPrefix + adminCr.id,
|
||||
"auth", "rm", cephEntityClientPrefix + userId,
|
||||
}
|
||||
|
||||
@ -92,8 +92,8 @@ func deleteCephUser(cr *credentials, volUuid string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
os.Remove(getCephKeyringPath(volUuid, userId))
|
||||
os.Remove(getCephSecretPath(volUuid, userId))
|
||||
os.Remove(getCephKeyringPath(volId, userId))
|
||||
os.Remove(getCephSecretPath(volId, userId))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
128
pkg/cephfs/controllercache.go
Normal file
128
pkg/cephfs/controllercache.go
Normal file
@ -0,0 +1,128 @@
|
||||
/*
|
||||
Copyright 2018 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package cephfs
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/golang/glog"
|
||||
)
|
||||
|
||||
const (
|
||||
controllerCacheRoot = PluginFolder + "/controller/plugin-cache"
|
||||
)
|
||||
|
||||
type controllerCacheEntry struct {
|
||||
VolOptions volumeOptions
|
||||
VolumeID volumeID
|
||||
}
|
||||
|
||||
type controllerCacheMap map[volumeID]*controllerCacheEntry
|
||||
|
||||
var (
|
||||
ctrCache = make(controllerCacheMap)
|
||||
ctrCacheMtx sync.Mutex
|
||||
)
|
||||
|
||||
// Load all .json files from controllerCacheRoot into ctrCache
|
||||
// Called from driver.go's Run()
|
||||
func loadControllerCache() error {
|
||||
cacheDir, err := ioutil.ReadDir(controllerCacheRoot)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot read controller cache from %s: %v", controllerCacheRoot, err)
|
||||
}
|
||||
|
||||
ctrCacheMtx.Lock()
|
||||
defer ctrCacheMtx.Unlock()
|
||||
|
||||
for _, fi := range cacheDir {
|
||||
if !strings.HasSuffix(fi.Name(), ".json") || !fi.Mode().IsRegular() {
|
||||
continue
|
||||
}
|
||||
|
||||
f, err := os.Open(path.Join(controllerCacheRoot, fi.Name()))
|
||||
if err != nil {
|
||||
glog.Errorf("cephfs: cloudn't read '%s' from controller cache: %v", fi.Name(), err)
|
||||
continue
|
||||
}
|
||||
|
||||
d := json.NewDecoder(f)
|
||||
ent := &controllerCacheEntry{}
|
||||
|
||||
if err = d.Decode(ent); err != nil {
|
||||
glog.Errorf("cephfs: failed to parse '%s': %v", fi.Name(), err)
|
||||
} else {
|
||||
ctrCache[ent.VolumeID] = ent
|
||||
}
|
||||
|
||||
f.Close()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func getControllerCacheEntryPath(volId volumeID) string {
|
||||
return path.Join(controllerCacheRoot, string(volId)+".json")
|
||||
}
|
||||
|
||||
func (m controllerCacheMap) insert(ent *controllerCacheEntry) error {
|
||||
filePath := getControllerCacheEntryPath(ent.VolumeID)
|
||||
|
||||
ctrCacheMtx.Lock()
|
||||
defer ctrCacheMtx.Unlock()
|
||||
|
||||
f, err := os.Create(filePath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("couldn't create cache entry file '%s': %v", filePath, err)
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
enc := json.NewEncoder(f)
|
||||
if err = enc.Encode(ent); err != nil {
|
||||
return fmt.Errorf("failed to encode cache entry for volume %s: %v", ent.VolumeID, err)
|
||||
}
|
||||
|
||||
m[ent.VolumeID] = ent
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m controllerCacheMap) pop(volId volumeID) (*controllerCacheEntry, error) {
|
||||
ctrCacheMtx.Lock()
|
||||
defer ctrCacheMtx.Unlock()
|
||||
|
||||
ent, ok := m[volId]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("cache entry for volume %s does not exist", volId)
|
||||
}
|
||||
|
||||
filePath := getControllerCacheEntryPath(volId)
|
||||
|
||||
if err := os.Remove(filePath); err != nil {
|
||||
return nil, fmt.Errorf("failed to remove cache entry file '%s': %v", filePath, err)
|
||||
}
|
||||
|
||||
delete(m, volId)
|
||||
|
||||
return ent, nil
|
||||
}
|
@ -17,8 +17,6 @@ limitations under the License.
|
||||
package cephfs
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/golang/glog"
|
||||
"golang.org/x/net/context"
|
||||
"google.golang.org/grpc/codes"
|
||||
@ -36,30 +34,6 @@ const (
|
||||
oneGB = 1073741824
|
||||
)
|
||||
|
||||
func (cs *controllerServer) validateCreateVolumeRequest(req *csi.CreateVolumeRequest) error {
|
||||
if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
|
||||
return fmt.Errorf("invalid CreateVolumeRequest: %v", err)
|
||||
}
|
||||
|
||||
if req.GetName() == "" {
|
||||
return status.Error(codes.InvalidArgument, "Volume Name cannot be empty")
|
||||
}
|
||||
|
||||
if req.GetVolumeCapabilities() == nil {
|
||||
return status.Error(codes.InvalidArgument, "Volume Capabilities cannot be empty")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cs *controllerServer) validateDeleteVolumeRequest(req *csi.DeleteVolumeRequest) error {
|
||||
if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
|
||||
return fmt.Errorf("invalid DeleteVolumeRequest: %v", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
|
||||
if err := cs.validateCreateVolumeRequest(req); err != nil {
|
||||
glog.Errorf("CreateVolumeRequest validation failed: %v", err)
|
||||
@ -74,41 +48,37 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
|
||||
return nil, status.Error(codes.InvalidArgument, err.Error())
|
||||
}
|
||||
|
||||
volId := newVolumeIdentifier(volOptions, req)
|
||||
volId := newVolumeID()
|
||||
|
||||
conf := cephConfigData{Monitors: volOptions.Monitors, VolumeUuid: volId.uuid}
|
||||
conf := cephConfigData{Monitors: volOptions.Monitors, VolumeID: volId}
|
||||
if err = conf.writeToFile(); err != nil {
|
||||
glog.Errorf("failed to write ceph config file to %s: %v", getCephConfPath(volId.uuid), err)
|
||||
glog.Errorf("failed to write ceph config file to %s: %v", getCephConfPath(volId), err)
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
// Create a volume in case the user didn't provide one
|
||||
|
||||
if volOptions.ProvisionVolume {
|
||||
// Admin access is required
|
||||
// Admin credentials are required
|
||||
|
||||
cr, err := getAdminCredentials(req.GetControllerCreateSecrets())
|
||||
if err != nil {
|
||||
return nil, status.Error(codes.InvalidArgument, err.Error())
|
||||
}
|
||||
|
||||
if err = storeCephAdminCredentials(volId.uuid, cr); err != nil {
|
||||
if err = storeCephCredentials(volId, cr); err != nil {
|
||||
glog.Errorf("failed to store admin credentials for '%s': %v", cr.id, err)
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
if err = createVolume(volOptions, cr, volId.uuid, req.GetCapacityRange().GetRequiredBytes()); err != nil {
|
||||
glog.Errorf("failed to create volume %s: %v", volId.name, err)
|
||||
if err = createVolume(volOptions, cr, volId, req.GetCapacityRange().GetRequiredBytes()); err != nil {
|
||||
glog.Errorf("failed to create volume %s: %v", req.GetName(), err)
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
glog.V(4).Infof("cephfs: volume %s successfuly created", volId.id)
|
||||
glog.Infof("cephfs: successfuly created volume %s", volId)
|
||||
} else {
|
||||
glog.V(4).Infof("cephfs: volume %s is provisioned statically", volId.id)
|
||||
}
|
||||
|
||||
if err = volCache.insert(&volumeCacheEntry{Identifier: *volId, VolOptions: *volOptions}); err != nil {
|
||||
glog.Warningf("failed to store a volume cache entry: %v", err)
|
||||
glog.Infof("cephfs: volume %s is provisioned statically", volId)
|
||||
}
|
||||
|
||||
sz := req.GetCapacityRange().GetRequiredBytes()
|
||||
@ -116,9 +86,14 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
|
||||
sz = oneGB
|
||||
}
|
||||
|
||||
if err = ctrCache.insert(&controllerCacheEntry{VolOptions: *volOptions, VolumeID: volId}); err != nil {
|
||||
glog.Errorf("failed to store a cache entry for volume %s: %v", volId, err)
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
return &csi.CreateVolumeResponse{
|
||||
Volume: &csi.Volume{
|
||||
Id: volId.id,
|
||||
Id: string(volId),
|
||||
CapacityBytes: sz,
|
||||
Attributes: req.GetParameters(),
|
||||
},
|
||||
@ -132,27 +107,35 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
|
||||
}
|
||||
|
||||
var (
|
||||
volId = req.GetVolumeId()
|
||||
volUuid = uuidFromVolumeId(volId)
|
||||
volId = volumeID(req.GetVolumeId())
|
||||
err error
|
||||
)
|
||||
|
||||
// Load volume info from cache
|
||||
|
||||
ent, found := volCache.get(volUuid)
|
||||
if !found {
|
||||
msg := fmt.Sprintf("failed to retrieve cache entry for volume %s", volId)
|
||||
glog.Error(msg)
|
||||
return nil, status.Error(codes.Internal, msg)
|
||||
ent, err := ctrCache.pop(volId)
|
||||
if err != nil {
|
||||
glog.Error(err)
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
if !ent.VolOptions.ProvisionVolume {
|
||||
// DeleteVolume() is forbidden for statically provisioned volumes!
|
||||
msg := fmt.Sprintf("volume %s is provisioned statically, aborting delete", volId)
|
||||
glog.Warningf(msg)
|
||||
|
||||
glog.Warningf("volume %s is provisioned statically, aborting delete", volId)
|
||||
return &csi.DeleteVolumeResponse{}, nil
|
||||
}
|
||||
|
||||
// Requires admin credentials
|
||||
defer func() {
|
||||
if err != nil {
|
||||
// Reinsert cache entry for retry
|
||||
if insErr := ctrCache.insert(ent); insErr != nil {
|
||||
glog.Errorf("failed to reinsert volume cache entry in rollback procedure for volume %s: %v", volId, err)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Deleting a volume requires admin credentials
|
||||
|
||||
cr, err := getAdminCredentials(req.GetControllerDeleteSecrets())
|
||||
if err != nil {
|
||||
@ -160,24 +143,12 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
|
||||
return nil, status.Error(codes.InvalidArgument, err.Error())
|
||||
}
|
||||
|
||||
// Remove the volume, the user and the volume cache entry
|
||||
|
||||
if err = purgeVolume(volId, cr, &ent.VolOptions); err != nil {
|
||||
glog.Errorf("failed to delete volume %s: %v", volId, err)
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
if err = deleteCephUser(cr, volUuid); err != nil {
|
||||
glog.Errorf("failed to delete ceph user %s: %v", getCephUserName(volUuid), err)
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
if err = volCache.erase(volUuid); err != nil {
|
||||
glog.Errorf("failed to delete cache entry for volume %s: %v", volId, err)
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
glog.V(4).Infof("cephfs: volume %s successfuly deleted", volId)
|
||||
glog.Infof("cephfs: successfuly deleted volume %s", volId)
|
||||
|
||||
return &csi.DeleteVolumeResponse{}, nil
|
||||
}
|
||||
|
@ -27,7 +27,7 @@ import (
|
||||
|
||||
const (
|
||||
PluginFolder = "/var/lib/kubelet/plugins/csi-cephfsplugin"
|
||||
Version = "0.2.0"
|
||||
Version = "0.3.0"
|
||||
)
|
||||
|
||||
type cephfsDriver struct {
|
||||
@ -81,12 +81,12 @@ func (fs *cephfsDriver) Run(driverName, nodeId, endpoint, volumeMounter string)
|
||||
|
||||
// Configuration
|
||||
|
||||
if err := os.MkdirAll(volumeCacheRoot, 0755); err != nil {
|
||||
glog.Fatalf("cephfs: failed to create %s: %v", volumeCacheRoot, err)
|
||||
if err := os.MkdirAll(controllerCacheRoot, 0755); err != nil {
|
||||
glog.Fatalf("cephfs: failed to create %s: %v", controllerCacheRoot, err)
|
||||
return
|
||||
}
|
||||
|
||||
if err := loadVolumeCache(); err != nil {
|
||||
if err := loadControllerCache(); err != nil {
|
||||
glog.Errorf("cephfs: failed to read volume cache: %v", err)
|
||||
}
|
||||
|
||||
|
55
pkg/cephfs/nodecache.go
Normal file
55
pkg/cephfs/nodecache.go
Normal file
@ -0,0 +1,55 @@
|
||||
/*
|
||||
Copyright 2018 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package cephfs
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type nodeCacheEntry struct {
|
||||
volOptions *volumeOptions
|
||||
cephAdminID string
|
||||
}
|
||||
|
||||
type nodeCacheMap map[volumeID]*nodeCacheEntry
|
||||
|
||||
var (
|
||||
nodeCache = make(nodeCacheMap)
|
||||
nodeCacheMtx sync.Mutex
|
||||
)
|
||||
|
||||
func (m nodeCacheMap) insert(volId volumeID, ent *nodeCacheEntry) {
|
||||
nodeCacheMtx.Lock()
|
||||
defer nodeCacheMtx.Unlock()
|
||||
|
||||
m[volId] = ent
|
||||
}
|
||||
|
||||
func (m nodeCacheMap) pop(volId volumeID) (*nodeCacheEntry, error) {
|
||||
nodeCacheMtx.Lock()
|
||||
defer nodeCacheMtx.Unlock()
|
||||
|
||||
ent, ok := m[volId]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("node cache entry for volume %s not found", volId)
|
||||
}
|
||||
|
||||
delete(m, volId)
|
||||
|
||||
return ent, nil
|
||||
}
|
@ -32,38 +32,10 @@ type nodeServer struct {
|
||||
*csicommon.DefaultNodeServer
|
||||
}
|
||||
|
||||
func validateNodePublishVolumeRequest(req *csi.NodePublishVolumeRequest) error {
|
||||
if req.GetVolumeCapability() == nil {
|
||||
return status.Error(codes.InvalidArgument, "Volume capability missing in request")
|
||||
}
|
||||
|
||||
if req.GetVolumeId() == "" {
|
||||
return status.Error(codes.InvalidArgument, "Volume ID missing in request")
|
||||
}
|
||||
|
||||
if req.GetTargetPath() == "" {
|
||||
return status.Error(codes.InvalidArgument, "Target path missing in request")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func validateNodeUnpublishVolumeRequest(req *csi.NodeUnpublishVolumeRequest) error {
|
||||
if req.GetVolumeId() == "" {
|
||||
return status.Error(codes.InvalidArgument, "Volume ID missing in request")
|
||||
}
|
||||
|
||||
if req.GetTargetPath() == "" {
|
||||
return status.Error(codes.InvalidArgument, "Target path missing in request")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func handleUser(volOptions *volumeOptions, volUuid string, req *csi.NodePublishVolumeRequest) (*credentials, error) {
|
||||
func getOrCreateUser(volOptions *volumeOptions, volId volumeID, req *csi.NodeStageVolumeRequest) (*credentials, error) {
|
||||
var (
|
||||
cr = &credentials{}
|
||||
err error
|
||||
userCr = &credentials{}
|
||||
err error
|
||||
)
|
||||
|
||||
// Retrieve the credentials (possibly create a new user as well)
|
||||
@ -73,71 +45,123 @@ func handleUser(volOptions *volumeOptions, volUuid string, req *csi.NodePublishV
|
||||
|
||||
// First, store admin credentials - those are needed for creating a user
|
||||
|
||||
adminCr, err := getAdminCredentials(req.GetNodePublishSecrets())
|
||||
adminCr, err := getAdminCredentials(req.GetNodeStageSecrets())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err = storeCephAdminCredentials(volUuid, adminCr); err != nil {
|
||||
if err = storeCephCredentials(volId, adminCr); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
nodeCache.insert(volId, &nodeCacheEntry{volOptions: volOptions, cephAdminID: adminCr.id})
|
||||
|
||||
// Then create the user
|
||||
|
||||
if ent, err := createCephUser(volOptions, adminCr, volUuid); err != nil {
|
||||
if ent, err := createCephUser(volOptions, adminCr, volId); err != nil {
|
||||
return nil, err
|
||||
} else {
|
||||
cr.id = ent.Entity[len(cephEntityClientPrefix):]
|
||||
cr.key = ent.Key
|
||||
userCr.id = ent.Entity[len(cephEntityClientPrefix):]
|
||||
userCr.key = ent.Key
|
||||
}
|
||||
|
||||
// Set the correct volume root path
|
||||
volOptions.RootPath = getVolumeRootPath_ceph(volUuid)
|
||||
volOptions.RootPath = getVolumeRootPath_ceph(volId)
|
||||
} else {
|
||||
// The volume is pre-made, credentials are supplied by the user
|
||||
|
||||
cr, err = getUserCredentials(req.GetNodePublishSecrets())
|
||||
|
||||
userCr, err = getUserCredentials(req.GetNodeStageSecrets())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
nodeCache.insert(volId, &nodeCacheEntry{volOptions: volOptions})
|
||||
}
|
||||
|
||||
if err = storeCephUserCredentials(volUuid, cr, volOptions); err != nil {
|
||||
if err = storeCephCredentials(volId, userCr); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return cr, nil
|
||||
return userCr, nil
|
||||
}
|
||||
|
||||
func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
|
||||
if err := validateNodeStageVolumeRequest(req); err != nil {
|
||||
return nil, status.Error(codes.InvalidArgument, err.Error())
|
||||
}
|
||||
|
||||
// Configuration
|
||||
|
||||
stagingTargetPath := req.GetStagingTargetPath()
|
||||
volId := volumeID(req.GetVolumeId())
|
||||
|
||||
volOptions, err := newVolumeOptions(req.GetVolumeAttributes())
|
||||
if err != nil {
|
||||
glog.Errorf("error reading volume options for volume %s: %v", volId, err)
|
||||
return nil, status.Error(codes.InvalidArgument, err.Error())
|
||||
}
|
||||
|
||||
if err = createMountPoint(stagingTargetPath); err != nil {
|
||||
glog.Errorf("failed to create staging mount point at %s for volume %s: %v", stagingTargetPath, volId, err)
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
cephConf := cephConfigData{Monitors: volOptions.Monitors, VolumeID: volId}
|
||||
if err = cephConf.writeToFile(); err != nil {
|
||||
glog.Errorf("failed to write ceph config file to %s for volume %s: %v", getCephConfPath(volId), volId, err)
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
// Check if the volume is already mounted
|
||||
|
||||
isMnt, err := isMountPoint(stagingTargetPath)
|
||||
|
||||
if err != nil {
|
||||
glog.Errorf("stat failed: %v", err)
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
if isMnt {
|
||||
glog.Infof("cephfs: volume %s is already mounted to %s, skipping", volId, stagingTargetPath)
|
||||
return &csi.NodeStageVolumeResponse{}, nil
|
||||
}
|
||||
|
||||
// It's not, mount now
|
||||
|
||||
cr, err := getOrCreateUser(volOptions, volId, req)
|
||||
if err != nil {
|
||||
glog.Error(err)
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
m := newMounter(volOptions)
|
||||
glog.V(4).Infof("cephfs: mounting volume %s with %s", volId, m.name())
|
||||
|
||||
if err = m.mount(stagingTargetPath, cr, volOptions, volId); err != nil {
|
||||
glog.Errorf("failed to mount volume %s: %v", volId, err)
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
glog.Infof("cephfs: successfully mounted volume %s to %s", volId, stagingTargetPath)
|
||||
|
||||
return &csi.NodeStageVolumeResponse{}, nil
|
||||
}
|
||||
|
||||
func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
|
||||
if err := validateNodePublishVolumeRequest(req); err != nil {
|
||||
return nil, err
|
||||
return nil, status.Error(codes.InvalidArgument, err.Error())
|
||||
}
|
||||
|
||||
// Configuration
|
||||
|
||||
targetPath := req.GetTargetPath()
|
||||
volId := req.GetVolumeId()
|
||||
volUuid := uuidFromVolumeId(volId)
|
||||
|
||||
volOptions, err := newVolumeOptions(req.GetVolumeAttributes())
|
||||
if err != nil {
|
||||
glog.Errorf("error reading volume options: %v", err)
|
||||
return nil, status.Error(codes.InvalidArgument, err.Error())
|
||||
}
|
||||
|
||||
if err = createMountPoint(targetPath); err != nil {
|
||||
if err := createMountPoint(targetPath); err != nil {
|
||||
glog.Errorf("failed to create mount point at %s: %v", targetPath, err)
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
conf := cephConfigData{Monitors: volOptions.Monitors, VolumeUuid: volUuid}
|
||||
if err = conf.writeToFile(); err != nil {
|
||||
glog.Errorf("failed to write ceph config file to %s: %v", getCephConfPath(volUuid), err)
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
// Check if the volume is already mounted
|
||||
|
||||
isMnt, err := isMountPoint(targetPath)
|
||||
@ -148,36 +172,27 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
|
||||
}
|
||||
|
||||
if isMnt {
|
||||
glog.V(4).Infof("cephfs: volume %s is already mounted to %s", volId, targetPath)
|
||||
glog.Infof("cephfs: volume %s is already bind-mounted to %s", volId, targetPath)
|
||||
return &csi.NodePublishVolumeResponse{}, nil
|
||||
}
|
||||
|
||||
// It's not, mount now
|
||||
|
||||
cr, err := handleUser(volOptions, volUuid, req)
|
||||
|
||||
if err != nil {
|
||||
glog.Error(err)
|
||||
if err = bindMount(req.GetStagingTargetPath(), req.GetTargetPath(), req.GetReadonly()); err != nil {
|
||||
glog.Errorf("failed to bind-mount volume %s: %v", volId, err)
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
m := newMounter(volOptions)
|
||||
if err = m.mount(targetPath, cr, volOptions, volUuid, req.GetReadonly()); err != nil {
|
||||
glog.Errorf("failed to mount volume %s: %v", volId, err)
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
glog.V(4).Infof("cephfs: volume %s successfuly mounted to %s", volId, targetPath)
|
||||
glog.Infof("cephfs: successfuly bind-mounted volume %s to %s", volId, targetPath)
|
||||
|
||||
return &csi.NodePublishVolumeResponse{}, nil
|
||||
}
|
||||
|
||||
func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
|
||||
if err := validateNodeUnpublishVolumeRequest(req); err != nil {
|
||||
return nil, err
|
||||
return nil, status.Error(codes.InvalidArgument, err.Error())
|
||||
}
|
||||
|
||||
volId := req.GetVolumeId()
|
||||
targetPath := req.GetTargetPath()
|
||||
|
||||
// Unmount the bind-mount
|
||||
@ -185,30 +200,63 @@ func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
localVolRoot := getVolumeRootPath_local(uuidFromVolumeId(volId))
|
||||
os.Remove(targetPath)
|
||||
|
||||
// Unmount the volume root
|
||||
if err := unmountVolume(localVolRoot); err != nil {
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
os.Remove(localVolRoot)
|
||||
|
||||
glog.V(4).Infof("cephfs: volume %s successfuly unmounted from %s", volId, req.GetTargetPath())
|
||||
glog.Infof("cephfs: successfuly unbinded volume %s from %s", req.GetVolumeId(), targetPath)
|
||||
|
||||
return &csi.NodeUnpublishVolumeResponse{}, nil
|
||||
}
|
||||
|
||||
func (ns *nodeServer) NodeStageVolume(
|
||||
ctx context.Context,
|
||||
req *csi.NodeStageVolumeRequest) (
|
||||
*csi.NodeStageVolumeResponse, error) {
|
||||
return nil, status.Error(codes.Unimplemented, "")
|
||||
func (ns *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
|
||||
if err := validateNodeUnstageVolumeRequest(req); err != nil {
|
||||
return nil, status.Error(codes.InvalidArgument, err.Error())
|
||||
}
|
||||
|
||||
volId := volumeID(req.GetVolumeId())
|
||||
stagingTargetPath := req.GetStagingTargetPath()
|
||||
|
||||
// Unmount the volume
|
||||
if err := unmountVolume(stagingTargetPath); err != nil {
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
os.Remove(stagingTargetPath)
|
||||
|
||||
ent, err := nodeCache.pop(volId)
|
||||
if err != nil {
|
||||
glog.Error(err)
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
if ent.volOptions.ProvisionVolume {
|
||||
// We've created a dedicated Ceph user in NodeStageVolume,
|
||||
// it's about to be deleted here.
|
||||
|
||||
if err = deleteCephUser(&credentials{id: ent.cephAdminID}, volId); err != nil {
|
||||
glog.Errorf("failed to delete ceph user %s for volume %s: %v", getCephUserName(volId), volId, err)
|
||||
|
||||
// Reinsert cache entry for retry
|
||||
nodeCache.insert(volId, ent)
|
||||
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
glog.Infof("cephfs: successfuly umounted volume %s from %s", req.GetVolumeId(), stagingTargetPath)
|
||||
|
||||
return &csi.NodeUnstageVolumeResponse{}, nil
|
||||
}
|
||||
|
||||
func (ns *nodeServer) NodeUnstageVolume(
|
||||
ctx context.Context,
|
||||
req *csi.NodeUnstageVolumeRequest) (
|
||||
*csi.NodeUnstageVolumeResponse, error) {
|
||||
return nil, status.Error(codes.Unimplemented, "")
|
||||
func (ns *nodeServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) {
|
||||
return &csi.NodeGetCapabilitiesResponse{
|
||||
Capabilities: []*csi.NodeServiceCapability{
|
||||
{
|
||||
Type: &csi.NodeServiceCapability_Rpc{
|
||||
Rpc: &csi.NodeServiceCapability_RPC{
|
||||
Type: csi.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
@ -26,17 +26,25 @@ import (
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
|
||||
"k8s.io/kubernetes/pkg/util/keymutex"
|
||||
"github.com/container-storage-interface/spec/lib/go/csi/v0"
|
||||
"github.com/pborman/uuid"
|
||||
"k8s.io/kubernetes/pkg/util/mount"
|
||||
)
|
||||
|
||||
type volumeID string
|
||||
|
||||
func newVolumeID() volumeID {
|
||||
return volumeID("csi-cephfs-" + uuid.NewUUID().String())
|
||||
}
|
||||
|
||||
func execCommand(command string, args ...string) ([]byte, error) {
|
||||
glog.V(4).Infof("cephfs: EXEC %s %s", command, args)
|
||||
|
||||
cmd := exec.Command(command, args...)
|
||||
return cmd.CombinedOutput()
|
||||
}
|
||||
|
||||
func execCommandAndValidate(program string, args ...string) error {
|
||||
glog.V(4).Infof("cephfs: executing command: %s with args: %s", program, args)
|
||||
out, err := execCommand(program, args...)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cephfs: %s failed with following error: %s\ncephfs: %s output: %s", program, err, program, out)
|
||||
@ -65,49 +73,21 @@ func isMountPoint(p string) (bool, error) {
|
||||
return !notMnt, nil
|
||||
}
|
||||
|
||||
func tryLock(id string, mtx keymutex.KeyMutex, name string) error {
|
||||
// TODO uncomment this once TryLockKey gets into Kubernetes
|
||||
/*
|
||||
if !mtx.TryLockKey(id) {
|
||||
msg := fmt.Sprintf("%s has a pending operation on %s", name, req.GetVolumeId())
|
||||
glog.Infoln(msg)
|
||||
|
||||
return status.Error(codes.Aborted, msg)
|
||||
}
|
||||
*/
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func storeCephUserCredentials(volUuid string, cr *credentials, volOptions *volumeOptions) error {
|
||||
func storeCephCredentials(volId volumeID, cr *credentials) error {
|
||||
keyringData := cephKeyringData{
|
||||
UserId: cr.id,
|
||||
Key: cr.key,
|
||||
RootPath: volOptions.RootPath,
|
||||
VolumeUuid: volUuid,
|
||||
UserId: cr.id,
|
||||
Key: cr.key,
|
||||
VolumeID: volId,
|
||||
}
|
||||
|
||||
if volOptions.ProvisionVolume {
|
||||
keyringData.Pool = volOptions.Pool
|
||||
keyringData.Namespace = getVolumeNamespace(volUuid)
|
||||
}
|
||||
|
||||
return storeCephCredentials(volUuid, cr, &keyringData)
|
||||
}
|
||||
|
||||
func storeCephAdminCredentials(volUuid string, cr *credentials) error {
|
||||
return storeCephCredentials(volUuid, cr, &cephFullCapsKeyringData{UserId: cr.id, Key: cr.key, VolumeUuid: volUuid})
|
||||
}
|
||||
|
||||
func storeCephCredentials(volUuid string, cr *credentials, keyringData cephConfigWriter) error {
|
||||
if err := keyringData.writeToFile(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
secret := cephSecretData{
|
||||
UserId: cr.id,
|
||||
Key: cr.key,
|
||||
VolumeUuid: volUuid,
|
||||
UserId: cr.id,
|
||||
Key: cr.key,
|
||||
VolumeID: volId,
|
||||
}
|
||||
|
||||
if err := secret.writeToFile(); err != nil {
|
||||
@ -124,8 +104,6 @@ func newMounter(volOptions *volumeOptions) volumeMounter {
|
||||
mounter = DefaultVolumeMounter
|
||||
}
|
||||
|
||||
glog.V(4).Infof("cephfs: setting volume mounter to: %s", mounter)
|
||||
|
||||
switch mounter {
|
||||
case volumeMounter_fuse:
|
||||
return &fuseMounter{}
|
||||
@ -135,3 +113,95 @@ func newMounter(volOptions *volumeOptions) volumeMounter {
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
//
|
||||
// Controller service request validation
|
||||
//
|
||||
|
||||
func (cs *controllerServer) validateCreateVolumeRequest(req *csi.CreateVolumeRequest) error {
|
||||
if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
|
||||
return fmt.Errorf("invalid CreateVolumeRequest: %v", err)
|
||||
}
|
||||
|
||||
if req.GetName() == "" {
|
||||
return status.Error(codes.InvalidArgument, "Volume Name cannot be empty")
|
||||
}
|
||||
|
||||
if req.GetVolumeCapabilities() == nil {
|
||||
return status.Error(codes.InvalidArgument, "Volume Capabilities cannot be empty")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cs *controllerServer) validateDeleteVolumeRequest(req *csi.DeleteVolumeRequest) error {
|
||||
if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
|
||||
return fmt.Errorf("invalid DeleteVolumeRequest: %v", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
//
|
||||
// Node service request validation
|
||||
//
|
||||
|
||||
func validateNodeStageVolumeRequest(req *csi.NodeStageVolumeRequest) error {
|
||||
if req.GetVolumeCapability() == nil {
|
||||
return fmt.Errorf("volume capability missing in request")
|
||||
}
|
||||
|
||||
if req.GetVolumeId() == "" {
|
||||
return fmt.Errorf("volume ID missing in request")
|
||||
}
|
||||
|
||||
if req.GetStagingTargetPath() == "" {
|
||||
return fmt.Errorf("staging target path missing in request")
|
||||
}
|
||||
|
||||
if req.GetNodeStageSecrets() == nil || len(req.GetNodeStageSecrets()) == 0 {
|
||||
return fmt.Errorf("stage secrets cannot be nil or empty")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func validateNodeUnstageVolumeRequest(req *csi.NodeUnstageVolumeRequest) error {
|
||||
if req.GetVolumeId() == "" {
|
||||
return fmt.Errorf("volume ID missing in request")
|
||||
}
|
||||
|
||||
if req.GetStagingTargetPath() == "" {
|
||||
return fmt.Errorf("staging target path missing in request")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func validateNodePublishVolumeRequest(req *csi.NodePublishVolumeRequest) error {
|
||||
if req.GetVolumeCapability() == nil {
|
||||
return fmt.Errorf("volume capability missing in request")
|
||||
}
|
||||
|
||||
if req.GetVolumeId() == "" {
|
||||
return fmt.Errorf("volume ID missing in request")
|
||||
}
|
||||
|
||||
if req.GetTargetPath() == "" {
|
||||
return fmt.Errorf("varget path missing in request")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func validateNodeUnpublishVolumeRequest(req *csi.NodeUnpublishVolumeRequest) error {
|
||||
if req.GetVolumeId() == "" {
|
||||
return fmt.Errorf("volume ID missing in request")
|
||||
}
|
||||
|
||||
if req.GetTargetPath() == "" {
|
||||
return fmt.Errorf("target path missing in request")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -22,44 +22,35 @@ import (
|
||||
"path"
|
||||
)
|
||||
|
||||
// Volumes are mounted in .../controller/volumes/vol-{UUID}
|
||||
// The actual user data resides in .../vol-{UUID}/volume-data
|
||||
// purgeVolume moves the user data to .../vol-{UUID}/volume-deleting and only then calls os.RemoveAll
|
||||
|
||||
const (
|
||||
cephRootPrefix = PluginFolder + "/controller/volumes/root-"
|
||||
cephVolumePrefix = PluginFolder + "/controller/volumes/vol-"
|
||||
cephVolumesRoot = "csi-volumes"
|
||||
cephRootPrefix = PluginFolder + "/controller/volumes/root-"
|
||||
cephVolumesRoot = "csi-volumes"
|
||||
|
||||
namespacePrefix = "csi-ns-"
|
||||
namespacePrefix = "ns-"
|
||||
)
|
||||
|
||||
func getCephRootPath_local(volUuid string) string {
|
||||
return cephRootPrefix + volUuid
|
||||
func getCephRootPath_local(volId volumeID) string {
|
||||
return cephRootPrefix + string(volId)
|
||||
}
|
||||
|
||||
func getCephRootVolumePath_local(volUuid string) string {
|
||||
return path.Join(getCephRootPath_local(volUuid), cephVolumesRoot, volUuid)
|
||||
func getCephRootVolumePath_local(volId volumeID) string {
|
||||
return path.Join(getCephRootPath_local(volId), cephVolumesRoot, string(volId))
|
||||
}
|
||||
|
||||
func getVolumeRootPath_local(volUuid string) string {
|
||||
return cephVolumePrefix + volUuid
|
||||
func getVolumeRootPath_ceph(volId volumeID) string {
|
||||
return path.Join("/", cephVolumesRoot, string(volId))
|
||||
}
|
||||
|
||||
func getVolumeRootPath_ceph(volUuid string) string {
|
||||
return path.Join("/", cephVolumesRoot, volUuid)
|
||||
}
|
||||
|
||||
func getVolumeNamespace(volUuid string) string {
|
||||
return namespacePrefix + volUuid
|
||||
func getVolumeNamespace(volId volumeID) string {
|
||||
return namespacePrefix + string(volId)
|
||||
}
|
||||
|
||||
func setVolumeAttribute(root, attrName, attrValue string) error {
|
||||
return execCommandAndValidate("setfattr", "-n", attrName, "-v", attrValue, root)
|
||||
}
|
||||
|
||||
func createVolume(volOptions *volumeOptions, adminCr *credentials, volUuid string, bytesQuota int64) error {
|
||||
cephRoot := getCephRootPath_local(volUuid)
|
||||
func createVolume(volOptions *volumeOptions, adminCr *credentials, volId volumeID, bytesQuota int64) error {
|
||||
cephRoot := getCephRootPath_local(volId)
|
||||
|
||||
if err := createMountPoint(cephRoot); err != nil {
|
||||
return err
|
||||
@ -69,7 +60,7 @@ func createVolume(volOptions *volumeOptions, adminCr *credentials, volUuid strin
|
||||
// Access to cephfs's / is required
|
||||
volOptions.RootPath = "/"
|
||||
|
||||
if err := mountKernel(cephRoot, adminCr, volOptions, volUuid); err != nil {
|
||||
if err := mountKernel(cephRoot, adminCr, volOptions, volId); err != nil {
|
||||
return fmt.Errorf("error mounting ceph root: %v", err)
|
||||
}
|
||||
|
||||
@ -78,8 +69,8 @@ func createVolume(volOptions *volumeOptions, adminCr *credentials, volUuid strin
|
||||
os.Remove(cephRoot)
|
||||
}()
|
||||
|
||||
volOptions.RootPath = getVolumeRootPath_ceph(volUuid)
|
||||
localVolRoot := getCephRootVolumePath_local(volUuid)
|
||||
volOptions.RootPath = getVolumeRootPath_ceph(volId)
|
||||
localVolRoot := getCephRootVolumePath_local(volId)
|
||||
|
||||
if err := createMountPoint(localVolRoot); err != nil {
|
||||
return err
|
||||
@ -93,21 +84,20 @@ func createVolume(volOptions *volumeOptions, adminCr *credentials, volUuid strin
|
||||
return fmt.Errorf("%v\ncephfs: Does pool '%s' exist?", err, volOptions.Pool)
|
||||
}
|
||||
|
||||
if err := setVolumeAttribute(localVolRoot, "ceph.dir.layout.pool_namespace", getVolumeNamespace(volUuid)); err != nil {
|
||||
if err := setVolumeAttribute(localVolRoot, "ceph.dir.layout.pool_namespace", getVolumeNamespace(volId)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func purgeVolume(volId string, cr *credentials, volOptions *volumeOptions) error {
|
||||
func purgeVolume(volId volumeID, cr *credentials, volOptions *volumeOptions) error {
|
||||
// Root path is not set for dynamically provisioned volumes
|
||||
volOptions.RootPath = "/"
|
||||
|
||||
var (
|
||||
volUuid = uuidFromVolumeId(volId)
|
||||
root = getCephRootPath_local(volUuid)
|
||||
volRoot = getCephRootVolumePath_local(volUuid)
|
||||
root = getCephRootPath_local(volId)
|
||||
volRoot = getCephRootVolumePath_local(volId)
|
||||
volRootDeleting = volRoot + "-deleting"
|
||||
)
|
||||
|
||||
@ -115,7 +105,7 @@ func purgeVolume(volId string, cr *credentials, volOptions *volumeOptions) error
|
||||
return err
|
||||
}
|
||||
|
||||
if err := mountKernel(root, cr, volOptions, volUuid); err != nil {
|
||||
if err := mountKernel(root, cr, volOptions, volId); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -1,131 +0,0 @@
|
||||
/*
|
||||
Copyright 2018 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package cephfs
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/golang/glog"
|
||||
)
|
||||
|
||||
const (
|
||||
volumeCacheRoot = PluginFolder + "/controller/volume-cache"
|
||||
)
|
||||
|
||||
type volumeCacheEntry struct {
|
||||
VolOptions volumeOptions
|
||||
Identifier volumeIdentifier
|
||||
}
|
||||
|
||||
type volumeCache struct {
|
||||
entries map[string]*volumeCacheEntry
|
||||
}
|
||||
|
||||
var (
|
||||
volCache volumeCache
|
||||
volCacheMtx sync.RWMutex
|
||||
)
|
||||
|
||||
// Loads all .json files from volumeCacheRoot into volCache
|
||||
// Called from driver.go's Run()
|
||||
func loadVolumeCache() error {
|
||||
cacheDir, err := ioutil.ReadDir(volumeCacheRoot)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot read volume cache: %v", err)
|
||||
}
|
||||
|
||||
volCacheMtx.Lock()
|
||||
defer volCacheMtx.Unlock()
|
||||
|
||||
volCache.entries = make(map[string]*volumeCacheEntry)
|
||||
|
||||
for _, fi := range cacheDir {
|
||||
if !strings.HasSuffix(fi.Name(), ".json") || !fi.Mode().IsRegular() {
|
||||
continue
|
||||
}
|
||||
|
||||
f, err := os.Open(path.Join(volumeCacheRoot, fi.Name()))
|
||||
if err != nil {
|
||||
glog.Errorf("cephfs: couldn't read '%s' from volume cache: %v", fi.Name(), err)
|
||||
continue
|
||||
}
|
||||
|
||||
d := json.NewDecoder(f)
|
||||
ent := &volumeCacheEntry{}
|
||||
|
||||
if err = d.Decode(ent); err != nil {
|
||||
glog.Errorf("cephfs: failed to parse '%s': %v", fi.Name(), err)
|
||||
} else {
|
||||
volCache.entries[ent.Identifier.uuid] = ent
|
||||
}
|
||||
|
||||
f.Close()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func getVolumeCacheEntryPath(volUuid string) string {
|
||||
return path.Join(volumeCacheRoot, fmt.Sprintf("vol-%s.json", volUuid))
|
||||
}
|
||||
|
||||
func (vc *volumeCache) insert(ent *volumeCacheEntry) error {
|
||||
filePath := getVolumeCacheEntryPath(ent.Identifier.uuid)
|
||||
|
||||
volCacheMtx.Lock()
|
||||
defer volCacheMtx.Unlock()
|
||||
|
||||
f, err := os.Create(filePath)
|
||||
if err != nil {
|
||||
return fmt.Errorf("couldn't create cache entry file %s: %v", filePath, err)
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
e := json.NewEncoder(f)
|
||||
if err = e.Encode(ent); err != nil {
|
||||
return fmt.Errorf("failed to encode cache entry for volume %s: %v", ent.Identifier.id, err)
|
||||
}
|
||||
|
||||
vc.entries[ent.Identifier.uuid] = ent
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (vc *volumeCache) erase(volUuid string) error {
|
||||
volCacheMtx.Lock()
|
||||
delete(vc.entries, volUuid)
|
||||
volCacheMtx.Unlock()
|
||||
|
||||
return os.Remove(getVolumeCacheEntryPath(volUuid))
|
||||
}
|
||||
|
||||
func (vc *volumeCache) get(volUuid string) (volumeCacheEntry, bool) {
|
||||
volCacheMtx.RLock()
|
||||
defer volCacheMtx.RUnlock()
|
||||
|
||||
if ent, ok := vc.entries[volUuid]; ok {
|
||||
return *ent, true
|
||||
} else {
|
||||
return volumeCacheEntry{}, false
|
||||
}
|
||||
}
|
@ -1,60 +0,0 @@
|
||||
/*
|
||||
Copyright 2018 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package cephfs
|
||||
|
||||
import (
|
||||
"github.com/container-storage-interface/spec/lib/go/csi/v0"
|
||||
"github.com/pborman/uuid"
|
||||
)
|
||||
|
||||
const (
|
||||
dynamicallyProvisionedVolumePrefix = "csi-cephfs-dyn-"
|
||||
staticallyProvisionedVolumePrefix = "csi-cephfs-sta-"
|
||||
volumePrefixLen = len(dynamicallyProvisionedVolumePrefix)
|
||||
)
|
||||
|
||||
type volumeIdentifier struct {
|
||||
name, uuid, id string
|
||||
}
|
||||
|
||||
func newVolumeIdentifier(volOptions *volumeOptions, req *csi.CreateVolumeRequest) *volumeIdentifier {
|
||||
volId := volumeIdentifier{
|
||||
name: req.GetName(),
|
||||
uuid: uuid.NewUUID().String(),
|
||||
}
|
||||
|
||||
prefix := staticallyProvisionedVolumePrefix
|
||||
if volOptions.ProvisionVolume {
|
||||
prefix = dynamicallyProvisionedVolumePrefix
|
||||
}
|
||||
|
||||
volId.id = prefix + volId.uuid
|
||||
|
||||
return &volId
|
||||
}
|
||||
|
||||
func uuidFromVolumeId(volId string) string {
|
||||
return volId[volumePrefixLen:]
|
||||
}
|
||||
|
||||
func isDynProvision(volId string) bool {
|
||||
if len(volId) <= volumePrefixLen {
|
||||
return false
|
||||
}
|
||||
|
||||
return volId[:volumePrefixLen] == dynamicallyProvisionedVolumePrefix
|
||||
}
|
@ -28,17 +28,18 @@ const (
|
||||
)
|
||||
|
||||
type volumeMounter interface {
|
||||
mount(mountPoint string, cr *credentials, volOptions *volumeOptions, volUuid string, readOnly bool) error
|
||||
mount(mountPoint string, cr *credentials, volOptions *volumeOptions, volId volumeID) error
|
||||
name() string
|
||||
}
|
||||
|
||||
type fuseMounter struct{}
|
||||
|
||||
func mountFuse(mountPoint string, cr *credentials, volOptions *volumeOptions, volUuid string) error {
|
||||
func mountFuse(mountPoint string, cr *credentials, volOptions *volumeOptions, volId volumeID) error {
|
||||
args := [...]string{
|
||||
mountPoint,
|
||||
"-c", getCephConfPath(volUuid),
|
||||
"-c", getCephConfPath(volId),
|
||||
"-n", cephEntityClientPrefix + cr.id,
|
||||
"--keyring", getCephKeyringPath(volUuid, cr.id),
|
||||
"--keyring", getCephKeyringPath(volId, cr.id),
|
||||
"-r", volOptions.RootPath,
|
||||
"-o", "nonempty",
|
||||
}
|
||||
@ -55,27 +56,19 @@ func mountFuse(mountPoint string, cr *credentials, volOptions *volumeOptions, vo
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *fuseMounter) mount(mountPoint string, cr *credentials, volOptions *volumeOptions, volUuid string, readOnly bool) error {
|
||||
func (m *fuseMounter) mount(mountPoint string, cr *credentials, volOptions *volumeOptions, volId volumeID) error {
|
||||
if err := createMountPoint(mountPoint); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
localVolRoot := getVolumeRootPath_local(volUuid)
|
||||
|
||||
if err := createMountPoint(localVolRoot); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := mountFuse(localVolRoot, cr, volOptions, volUuid); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return bindVolume(volUuid, mountPoint, readOnly)
|
||||
return mountFuse(mountPoint, cr, volOptions, volId)
|
||||
}
|
||||
|
||||
func (m *fuseMounter) name() string { return "Ceph FUSE driver" }
|
||||
|
||||
type kernelMounter struct{}
|
||||
|
||||
func mountKernel(mountPoint string, cr *credentials, volOptions *volumeOptions, volUuid string) error {
|
||||
func mountKernel(mountPoint string, cr *credentials, volOptions *volumeOptions, volId volumeID) error {
|
||||
if err := execCommandAndValidate("modprobe", "ceph"); err != nil {
|
||||
return err
|
||||
}
|
||||
@ -85,52 +78,34 @@ func mountKernel(mountPoint string, cr *credentials, volOptions *volumeOptions,
|
||||
fmt.Sprintf("%s:%s", volOptions.Monitors, volOptions.RootPath),
|
||||
mountPoint,
|
||||
"-o",
|
||||
fmt.Sprintf("name=%s,secretfile=%s", cr.id, getCephSecretPath(volUuid, cr.id)),
|
||||
fmt.Sprintf("name=%s,secretfile=%s", cr.id, getCephSecretPath(volId, cr.id)),
|
||||
)
|
||||
}
|
||||
|
||||
func (m *kernelMounter) mount(mountPoint string, cr *credentials, volOptions *volumeOptions, volUuid string, readOnly bool) error {
|
||||
func (m *kernelMounter) mount(mountPoint string, cr *credentials, volOptions *volumeOptions, volId volumeID) error {
|
||||
if err := createMountPoint(mountPoint); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
localVolRoot := getVolumeRootPath_local(volUuid)
|
||||
|
||||
if err := createMountPoint(localVolRoot); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := mountKernel(localVolRoot, cr, volOptions, volUuid); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return bindVolume(volUuid, mountPoint, readOnly)
|
||||
return mountKernel(mountPoint, cr, volOptions, volId)
|
||||
}
|
||||
|
||||
func (m *kernelMounter) name() string { return "Ceph kernel client" }
|
||||
|
||||
func bindMount(from, to string, readOnly bool) error {
|
||||
if err := execCommandAndValidate("mount", "--bind", from, to); err != nil {
|
||||
return fmt.Errorf("failed bind-mount of %s to %s: %v", from, to, err)
|
||||
return fmt.Errorf("failed to bind-mount %s to %s: %v", from, to, err)
|
||||
}
|
||||
|
||||
if readOnly {
|
||||
if err := execCommandAndValidate("mount", "-o", "remount,ro,bind", to); err != nil {
|
||||
return err
|
||||
return fmt.Errorf("failed read-only remount of %s: %v", to, err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func bindVolume(volUuid, target string, readOnly bool) error {
|
||||
volDataRoot := getVolumeRootPath_local(volUuid)
|
||||
|
||||
if err := createMountPoint(volDataRoot); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return bindMount(volDataRoot, target, readOnly)
|
||||
}
|
||||
|
||||
func unmountVolume(mountPoint string) error {
|
||||
return execCommandAndValidate("umount", mountPoint)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user