Merge pull request #62 from gman0/v0.3.0-cephfs

[CSI 0.3.0 3/4] csi-cephfs 0.3.0
This commit is contained in:
Huamin Chen 2018-08-08 10:03:16 -04:00 committed by GitHub
commit 4fcd5eb651
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 542 additions and 516 deletions

View File

@ -7,9 +7,12 @@ parameters:
# Comma separated list of Ceph monitors # Comma separated list of Ceph monitors
monitors: mon1:port,mon2:port,... monitors: mon1:port,mon2:port,...
# If set to true, a new volume will be created along with a RADOS user - this requires admin access. # For provisionVolume: "true":
# If set to false, it is assumed the volume already exists and the user is expected to provide # A new volume will be created along with a new Ceph user.
# a rootPath to a cephfs volume and user credentials. # Requires admin credentials (adminID, adminKey).
# For provisionVolume: "false":
# It is assumed the volume already exists and the user is expected
# to provide path to that volume (rootPath) and user credentials (userID, userKey).
provisionVolume: "true" provisionVolume: "true"
# Ceph pool into which the volume shall be created # Ceph pool into which the volume shall be created
@ -27,7 +30,7 @@ parameters:
csiNodeStageSecretNamespace: default csiNodeStageSecretNamespace: default
# (optional) The driver can use either ceph-fuse (fuse) or ceph kernel client (kernel) # (optional) The driver can use either ceph-fuse (fuse) or ceph kernel client (kernel)
# If left out, default volume mounter will be used - this is determined by probing for ceph-fuse # If omitted, default volume mounter will be used - this is determined by probing for ceph-fuse
# or by setting the default mounter explicitly via --volumemounter command-line argument. # or by setting the default mounter explicitly via --volumemounter command-line argument.
# mounter: kernel # mounter: kernel
reclaimPolicy: Delete reclaimPolicy: Delete

View File

@ -35,16 +35,6 @@ fuse_set_user_groups = false
const cephKeyring = `[client.{{.UserId}}] const cephKeyring = `[client.{{.UserId}}]
key = {{.Key}} key = {{.Key}}
caps mds = "allow rw path={{.RootPath}}"
caps mon = "allow r"
caps osd = "allow rw{{if .Pool}} pool={{.Pool}}{{end}}{{if .Namespace}} namespace={{.Namespace}}{{end}}"
`
const cephFullCapsKeyring = `[client.{{.UserId}}]
key = {{.Key}}
caps mds = "allow"
caps mon = "allow *"
caps osd = "allow *"
` `
const cephSecret = `{{.Key}}` const cephSecret = `{{.Key}}`
@ -57,10 +47,9 @@ const (
) )
var ( var (
cephConfigTempl *template.Template cephConfigTempl *template.Template
cephKeyringTempl *template.Template cephKeyringTempl *template.Template
cephFullCapsKeyringTempl *template.Template cephSecretTempl *template.Template
cephSecretTempl *template.Template
) )
func init() { func init() {
@ -76,7 +65,6 @@ func init() {
cephConfigTempl = template.Must(template.New("config").Parse(cephConfig)) cephConfigTempl = template.Must(template.New("config").Parse(cephConfig))
cephKeyringTempl = template.Must(template.New("keyring").Funcs(fm).Parse(cephKeyring)) cephKeyringTempl = template.Must(template.New("keyring").Funcs(fm).Parse(cephKeyring))
cephFullCapsKeyringTempl = template.Must(template.New("keyringFullCaps").Parse(cephFullCapsKeyring))
cephSecretTempl = template.Must(template.New("secret").Parse(cephSecret)) cephSecretTempl = template.Must(template.New("secret").Parse(cephSecret))
} }
@ -85,8 +73,8 @@ type cephConfigWriter interface {
} }
type cephConfigData struct { type cephConfigData struct {
Monitors string Monitors string
VolumeUuid string VolumeID volumeID
} }
func writeCephTemplate(fileName string, m os.FileMode, t *template.Template, data interface{}) error { func writeCephTemplate(fileName string, m os.FileMode, t *template.Template, data interface{}) error {
@ -108,46 +96,35 @@ func writeCephTemplate(fileName string, m os.FileMode, t *template.Template, dat
} }
func (d *cephConfigData) writeToFile() error { func (d *cephConfigData) writeToFile() error {
return writeCephTemplate(fmt.Sprintf(cephConfigFileNameFmt, d.VolumeUuid), 0640, cephConfigTempl, d) return writeCephTemplate(fmt.Sprintf(cephConfigFileNameFmt, d.VolumeID), 0640, cephConfigTempl, d)
} }
type cephKeyringData struct { type cephKeyringData struct {
UserId, Key string UserId, Key string
RootPath string VolumeID volumeID
Pool, Namespace string
VolumeUuid string
} }
func (d *cephKeyringData) writeToFile() error { func (d *cephKeyringData) writeToFile() error {
return writeCephTemplate(fmt.Sprintf(cephKeyringFileNameFmt, d.VolumeUuid, d.UserId), 0600, cephKeyringTempl, d) return writeCephTemplate(fmt.Sprintf(cephKeyringFileNameFmt, d.VolumeID, d.UserId), 0600, cephKeyringTempl, d)
}
type cephFullCapsKeyringData struct {
UserId, Key string
VolumeUuid string
}
func (d *cephFullCapsKeyringData) writeToFile() error {
return writeCephTemplate(fmt.Sprintf(cephKeyringFileNameFmt, d.VolumeUuid, d.UserId), 0600, cephFullCapsKeyringTempl, d)
} }
type cephSecretData struct { type cephSecretData struct {
UserId, Key string UserId, Key string
VolumeUuid string VolumeID volumeID
} }
func (d *cephSecretData) writeToFile() error { func (d *cephSecretData) writeToFile() error {
return writeCephTemplate(fmt.Sprintf(cephSecretFileNameFmt, d.VolumeUuid, d.UserId), 0600, cephSecretTempl, d) return writeCephTemplate(fmt.Sprintf(cephSecretFileNameFmt, d.VolumeID, d.UserId), 0600, cephSecretTempl, d)
} }
func getCephSecretPath(volUuid, userId string) string { func getCephSecretPath(volId volumeID, userId string) string {
return path.Join(cephConfigRoot, fmt.Sprintf(cephSecretFileNameFmt, volUuid, userId)) return path.Join(cephConfigRoot, fmt.Sprintf(cephSecretFileNameFmt, volId, userId))
} }
func getCephKeyringPath(volUuid, userId string) string { func getCephKeyringPath(volId volumeID, userId string) string {
return path.Join(cephConfigRoot, fmt.Sprintf(cephKeyringFileNameFmt, volUuid, userId)) return path.Join(cephConfigRoot, fmt.Sprintf(cephKeyringFileNameFmt, volId, userId))
} }
func getCephConfPath(volUuid string) string { func getCephConfPath(volId volumeID) string {
return path.Join(cephConfigRoot, fmt.Sprintf(cephConfigFileNameFmt, volUuid)) return path.Join(cephConfigRoot, fmt.Sprintf(cephConfigFileNameFmt, volId))
} }

View File

@ -22,7 +22,7 @@ import (
) )
const ( const (
cephUserPrefix = "csi-user-" cephUserPrefix = "user-"
cephEntityClientPrefix = "client." cephEntityClientPrefix = "client."
) )
@ -38,8 +38,8 @@ type cephEntity struct {
Caps cephEntityCaps `json:"caps"` Caps cephEntityCaps `json:"caps"`
} }
func getCephUserName(volUuid string) string { func getCephUserName(volId volumeID) string {
return cephUserPrefix + volUuid return cephUserPrefix + string(volId)
} }
func getCephUser(userId string) (*cephEntity, error) { func getCephUser(userId string) (*cephEntity, error) {
@ -57,17 +57,17 @@ func getCephUser(userId string) (*cephEntity, error) {
return &ents[0], nil return &ents[0], nil
} }
func createCephUser(volOptions *volumeOptions, cr *credentials, volUuid string) (*cephEntity, error) { func createCephUser(volOptions *volumeOptions, cr *credentials, volId volumeID) (*cephEntity, error) {
caps := cephEntityCaps{ caps := cephEntityCaps{
Mds: fmt.Sprintf("allow rw path=%s", getVolumeRootPath_ceph(volUuid)), Mds: fmt.Sprintf("allow rw path=%s", getVolumeRootPath_ceph(volId)),
Mon: "allow r", Mon: "allow r",
Osd: fmt.Sprintf("allow rw pool=%s namespace=%s", volOptions.Pool, getVolumeNamespace(volUuid)), Osd: fmt.Sprintf("allow rw pool=%s namespace=%s", volOptions.Pool, getVolumeNamespace(volId)),
} }
var ents []cephEntity var ents []cephEntity
args := [...]string{ args := [...]string{
"auth", "-f", "json", "-c", getCephConfPath(volUuid), "-n", cephEntityClientPrefix + cr.id, "auth", "-f", "json", "-c", getCephConfPath(volId), "-n", cephEntityClientPrefix + cr.id,
"get-or-create", cephEntityClientPrefix + getCephUserName(volUuid), "get-or-create", cephEntityClientPrefix + getCephUserName(volId),
"mds", caps.Mds, "mds", caps.Mds,
"mon", caps.Mon, "mon", caps.Mon,
"osd", caps.Osd, "osd", caps.Osd,
@ -80,11 +80,11 @@ func createCephUser(volOptions *volumeOptions, cr *credentials, volUuid string)
return &ents[0], nil return &ents[0], nil
} }
func deleteCephUser(cr *credentials, volUuid string) error { func deleteCephUser(adminCr *credentials, volId volumeID) error {
userId := getCephUserName(volUuid) userId := getCephUserName(volId)
args := [...]string{ args := [...]string{
"-c", getCephConfPath(volUuid), "-n", cephEntityClientPrefix + cr.id, "-c", getCephConfPath(volId), "-n", cephEntityClientPrefix + adminCr.id,
"auth", "rm", cephEntityClientPrefix + userId, "auth", "rm", cephEntityClientPrefix + userId,
} }
@ -92,8 +92,8 @@ func deleteCephUser(cr *credentials, volUuid string) error {
return err return err
} }
os.Remove(getCephKeyringPath(volUuid, userId)) os.Remove(getCephKeyringPath(volId, userId))
os.Remove(getCephSecretPath(volUuid, userId)) os.Remove(getCephSecretPath(volId, userId))
return nil return nil
} }

View File

@ -0,0 +1,128 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cephfs
import (
"encoding/json"
"fmt"
"io/ioutil"
"os"
"path"
"strings"
"sync"
"github.com/golang/glog"
)
const (
controllerCacheRoot = PluginFolder + "/controller/plugin-cache"
)
type controllerCacheEntry struct {
VolOptions volumeOptions
VolumeID volumeID
}
type controllerCacheMap map[volumeID]*controllerCacheEntry
var (
ctrCache = make(controllerCacheMap)
ctrCacheMtx sync.Mutex
)
// Load all .json files from controllerCacheRoot into ctrCache
// Called from driver.go's Run()
func loadControllerCache() error {
cacheDir, err := ioutil.ReadDir(controllerCacheRoot)
if err != nil {
return fmt.Errorf("cannot read controller cache from %s: %v", controllerCacheRoot, err)
}
ctrCacheMtx.Lock()
defer ctrCacheMtx.Unlock()
for _, fi := range cacheDir {
if !strings.HasSuffix(fi.Name(), ".json") || !fi.Mode().IsRegular() {
continue
}
f, err := os.Open(path.Join(controllerCacheRoot, fi.Name()))
if err != nil {
glog.Errorf("cephfs: cloudn't read '%s' from controller cache: %v", fi.Name(), err)
continue
}
d := json.NewDecoder(f)
ent := &controllerCacheEntry{}
if err = d.Decode(ent); err != nil {
glog.Errorf("cephfs: failed to parse '%s': %v", fi.Name(), err)
} else {
ctrCache[ent.VolumeID] = ent
}
f.Close()
}
return nil
}
func getControllerCacheEntryPath(volId volumeID) string {
return path.Join(controllerCacheRoot, string(volId)+".json")
}
func (m controllerCacheMap) insert(ent *controllerCacheEntry) error {
filePath := getControllerCacheEntryPath(ent.VolumeID)
ctrCacheMtx.Lock()
defer ctrCacheMtx.Unlock()
f, err := os.Create(filePath)
if err != nil {
return fmt.Errorf("couldn't create cache entry file '%s': %v", filePath, err)
}
defer f.Close()
enc := json.NewEncoder(f)
if err = enc.Encode(ent); err != nil {
return fmt.Errorf("failed to encode cache entry for volume %s: %v", ent.VolumeID, err)
}
m[ent.VolumeID] = ent
return nil
}
func (m controllerCacheMap) pop(volId volumeID) (*controllerCacheEntry, error) {
ctrCacheMtx.Lock()
defer ctrCacheMtx.Unlock()
ent, ok := m[volId]
if !ok {
return nil, fmt.Errorf("cache entry for volume %s does not exist", volId)
}
filePath := getControllerCacheEntryPath(volId)
if err := os.Remove(filePath); err != nil {
return nil, fmt.Errorf("failed to remove cache entry file '%s': %v", filePath, err)
}
delete(m, volId)
return ent, nil
}

View File

@ -17,8 +17,6 @@ limitations under the License.
package cephfs package cephfs
import ( import (
"fmt"
"github.com/golang/glog" "github.com/golang/glog"
"golang.org/x/net/context" "golang.org/x/net/context"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
@ -36,30 +34,6 @@ const (
oneGB = 1073741824 oneGB = 1073741824
) )
func (cs *controllerServer) validateCreateVolumeRequest(req *csi.CreateVolumeRequest) error {
if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
return fmt.Errorf("invalid CreateVolumeRequest: %v", err)
}
if req.GetName() == "" {
return status.Error(codes.InvalidArgument, "Volume Name cannot be empty")
}
if req.GetVolumeCapabilities() == nil {
return status.Error(codes.InvalidArgument, "Volume Capabilities cannot be empty")
}
return nil
}
func (cs *controllerServer) validateDeleteVolumeRequest(req *csi.DeleteVolumeRequest) error {
if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
return fmt.Errorf("invalid DeleteVolumeRequest: %v", err)
}
return nil
}
func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) { func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
if err := cs.validateCreateVolumeRequest(req); err != nil { if err := cs.validateCreateVolumeRequest(req); err != nil {
glog.Errorf("CreateVolumeRequest validation failed: %v", err) glog.Errorf("CreateVolumeRequest validation failed: %v", err)
@ -74,41 +48,37 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
return nil, status.Error(codes.InvalidArgument, err.Error()) return nil, status.Error(codes.InvalidArgument, err.Error())
} }
volId := newVolumeIdentifier(volOptions, req) volId := newVolumeID()
conf := cephConfigData{Monitors: volOptions.Monitors, VolumeUuid: volId.uuid} conf := cephConfigData{Monitors: volOptions.Monitors, VolumeID: volId}
if err = conf.writeToFile(); err != nil { if err = conf.writeToFile(); err != nil {
glog.Errorf("failed to write ceph config file to %s: %v", getCephConfPath(volId.uuid), err) glog.Errorf("failed to write ceph config file to %s: %v", getCephConfPath(volId), err)
return nil, status.Error(codes.Internal, err.Error()) return nil, status.Error(codes.Internal, err.Error())
} }
// Create a volume in case the user didn't provide one // Create a volume in case the user didn't provide one
if volOptions.ProvisionVolume { if volOptions.ProvisionVolume {
// Admin access is required // Admin credentials are required
cr, err := getAdminCredentials(req.GetControllerCreateSecrets()) cr, err := getAdminCredentials(req.GetControllerCreateSecrets())
if err != nil { if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error()) return nil, status.Error(codes.InvalidArgument, err.Error())
} }
if err = storeCephAdminCredentials(volId.uuid, cr); err != nil { if err = storeCephCredentials(volId, cr); err != nil {
glog.Errorf("failed to store admin credentials for '%s': %v", cr.id, err) glog.Errorf("failed to store admin credentials for '%s': %v", cr.id, err)
return nil, status.Error(codes.Internal, err.Error()) return nil, status.Error(codes.Internal, err.Error())
} }
if err = createVolume(volOptions, cr, volId.uuid, req.GetCapacityRange().GetRequiredBytes()); err != nil { if err = createVolume(volOptions, cr, volId, req.GetCapacityRange().GetRequiredBytes()); err != nil {
glog.Errorf("failed to create volume %s: %v", volId.name, err) glog.Errorf("failed to create volume %s: %v", req.GetName(), err)
return nil, status.Error(codes.Internal, err.Error()) return nil, status.Error(codes.Internal, err.Error())
} }
glog.V(4).Infof("cephfs: volume %s successfuly created", volId.id) glog.Infof("cephfs: successfuly created volume %s", volId)
} else { } else {
glog.V(4).Infof("cephfs: volume %s is provisioned statically", volId.id) glog.Infof("cephfs: volume %s is provisioned statically", volId)
}
if err = volCache.insert(&volumeCacheEntry{Identifier: *volId, VolOptions: *volOptions}); err != nil {
glog.Warningf("failed to store a volume cache entry: %v", err)
} }
sz := req.GetCapacityRange().GetRequiredBytes() sz := req.GetCapacityRange().GetRequiredBytes()
@ -116,9 +86,14 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
sz = oneGB sz = oneGB
} }
if err = ctrCache.insert(&controllerCacheEntry{VolOptions: *volOptions, VolumeID: volId}); err != nil {
glog.Errorf("failed to store a cache entry for volume %s: %v", volId, err)
return nil, status.Error(codes.Internal, err.Error())
}
return &csi.CreateVolumeResponse{ return &csi.CreateVolumeResponse{
Volume: &csi.Volume{ Volume: &csi.Volume{
Id: volId.id, Id: string(volId),
CapacityBytes: sz, CapacityBytes: sz,
Attributes: req.GetParameters(), Attributes: req.GetParameters(),
}, },
@ -132,27 +107,35 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
} }
var ( var (
volId = req.GetVolumeId() volId = volumeID(req.GetVolumeId())
volUuid = uuidFromVolumeId(volId) err error
) )
// Load volume info from cache // Load volume info from cache
ent, found := volCache.get(volUuid) ent, err := ctrCache.pop(volId)
if !found { if err != nil {
msg := fmt.Sprintf("failed to retrieve cache entry for volume %s", volId) glog.Error(err)
glog.Error(msg) return nil, status.Error(codes.Internal, err.Error())
return nil, status.Error(codes.Internal, msg)
} }
if !ent.VolOptions.ProvisionVolume { if !ent.VolOptions.ProvisionVolume {
// DeleteVolume() is forbidden for statically provisioned volumes! // DeleteVolume() is forbidden for statically provisioned volumes!
msg := fmt.Sprintf("volume %s is provisioned statically, aborting delete", volId)
glog.Warningf(msg) glog.Warningf("volume %s is provisioned statically, aborting delete", volId)
return &csi.DeleteVolumeResponse{}, nil return &csi.DeleteVolumeResponse{}, nil
} }
// Requires admin credentials defer func() {
if err != nil {
// Reinsert cache entry for retry
if insErr := ctrCache.insert(ent); insErr != nil {
glog.Errorf("failed to reinsert volume cache entry in rollback procedure for volume %s: %v", volId, err)
}
}
}()
// Deleting a volume requires admin credentials
cr, err := getAdminCredentials(req.GetControllerDeleteSecrets()) cr, err := getAdminCredentials(req.GetControllerDeleteSecrets())
if err != nil { if err != nil {
@ -160,24 +143,12 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
return nil, status.Error(codes.InvalidArgument, err.Error()) return nil, status.Error(codes.InvalidArgument, err.Error())
} }
// Remove the volume, the user and the volume cache entry
if err = purgeVolume(volId, cr, &ent.VolOptions); err != nil { if err = purgeVolume(volId, cr, &ent.VolOptions); err != nil {
glog.Errorf("failed to delete volume %s: %v", volId, err) glog.Errorf("failed to delete volume %s: %v", volId, err)
return nil, status.Error(codes.Internal, err.Error()) return nil, status.Error(codes.Internal, err.Error())
} }
if err = deleteCephUser(cr, volUuid); err != nil { glog.Infof("cephfs: successfuly deleted volume %s", volId)
glog.Errorf("failed to delete ceph user %s: %v", getCephUserName(volUuid), err)
return nil, status.Error(codes.Internal, err.Error())
}
if err = volCache.erase(volUuid); err != nil {
glog.Errorf("failed to delete cache entry for volume %s: %v", volId, err)
return nil, status.Error(codes.Internal, err.Error())
}
glog.V(4).Infof("cephfs: volume %s successfuly deleted", volId)
return &csi.DeleteVolumeResponse{}, nil return &csi.DeleteVolumeResponse{}, nil
} }

View File

@ -27,7 +27,7 @@ import (
const ( const (
PluginFolder = "/var/lib/kubelet/plugins/csi-cephfsplugin" PluginFolder = "/var/lib/kubelet/plugins/csi-cephfsplugin"
Version = "0.2.0" Version = "0.3.0"
) )
type cephfsDriver struct { type cephfsDriver struct {
@ -81,12 +81,12 @@ func (fs *cephfsDriver) Run(driverName, nodeId, endpoint, volumeMounter string)
// Configuration // Configuration
if err := os.MkdirAll(volumeCacheRoot, 0755); err != nil { if err := os.MkdirAll(controllerCacheRoot, 0755); err != nil {
glog.Fatalf("cephfs: failed to create %s: %v", volumeCacheRoot, err) glog.Fatalf("cephfs: failed to create %s: %v", controllerCacheRoot, err)
return return
} }
if err := loadVolumeCache(); err != nil { if err := loadControllerCache(); err != nil {
glog.Errorf("cephfs: failed to read volume cache: %v", err) glog.Errorf("cephfs: failed to read volume cache: %v", err)
} }

55
pkg/cephfs/nodecache.go Normal file
View File

@ -0,0 +1,55 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cephfs
import (
"fmt"
"sync"
)
type nodeCacheEntry struct {
volOptions *volumeOptions
cephAdminID string
}
type nodeCacheMap map[volumeID]*nodeCacheEntry
var (
nodeCache = make(nodeCacheMap)
nodeCacheMtx sync.Mutex
)
func (m nodeCacheMap) insert(volId volumeID, ent *nodeCacheEntry) {
nodeCacheMtx.Lock()
defer nodeCacheMtx.Unlock()
m[volId] = ent
}
func (m nodeCacheMap) pop(volId volumeID) (*nodeCacheEntry, error) {
nodeCacheMtx.Lock()
defer nodeCacheMtx.Unlock()
ent, ok := m[volId]
if !ok {
return nil, fmt.Errorf("node cache entry for volume %s not found", volId)
}
delete(m, volId)
return ent, nil
}

View File

@ -32,38 +32,10 @@ type nodeServer struct {
*csicommon.DefaultNodeServer *csicommon.DefaultNodeServer
} }
func validateNodePublishVolumeRequest(req *csi.NodePublishVolumeRequest) error { func getOrCreateUser(volOptions *volumeOptions, volId volumeID, req *csi.NodeStageVolumeRequest) (*credentials, error) {
if req.GetVolumeCapability() == nil {
return status.Error(codes.InvalidArgument, "Volume capability missing in request")
}
if req.GetVolumeId() == "" {
return status.Error(codes.InvalidArgument, "Volume ID missing in request")
}
if req.GetTargetPath() == "" {
return status.Error(codes.InvalidArgument, "Target path missing in request")
}
return nil
}
func validateNodeUnpublishVolumeRequest(req *csi.NodeUnpublishVolumeRequest) error {
if req.GetVolumeId() == "" {
return status.Error(codes.InvalidArgument, "Volume ID missing in request")
}
if req.GetTargetPath() == "" {
return status.Error(codes.InvalidArgument, "Target path missing in request")
}
return nil
}
func handleUser(volOptions *volumeOptions, volUuid string, req *csi.NodePublishVolumeRequest) (*credentials, error) {
var ( var (
cr = &credentials{} userCr = &credentials{}
err error err error
) )
// Retrieve the credentials (possibly create a new user as well) // Retrieve the credentials (possibly create a new user as well)
@ -73,71 +45,123 @@ func handleUser(volOptions *volumeOptions, volUuid string, req *csi.NodePublishV
// First, store admin credentials - those are needed for creating a user // First, store admin credentials - those are needed for creating a user
adminCr, err := getAdminCredentials(req.GetNodePublishSecrets()) adminCr, err := getAdminCredentials(req.GetNodeStageSecrets())
if err != nil { if err != nil {
return nil, err return nil, err
} }
if err = storeCephAdminCredentials(volUuid, adminCr); err != nil { if err = storeCephCredentials(volId, adminCr); err != nil {
return nil, err return nil, err
} }
nodeCache.insert(volId, &nodeCacheEntry{volOptions: volOptions, cephAdminID: adminCr.id})
// Then create the user // Then create the user
if ent, err := createCephUser(volOptions, adminCr, volUuid); err != nil { if ent, err := createCephUser(volOptions, adminCr, volId); err != nil {
return nil, err return nil, err
} else { } else {
cr.id = ent.Entity[len(cephEntityClientPrefix):] userCr.id = ent.Entity[len(cephEntityClientPrefix):]
cr.key = ent.Key userCr.key = ent.Key
} }
// Set the correct volume root path // Set the correct volume root path
volOptions.RootPath = getVolumeRootPath_ceph(volUuid) volOptions.RootPath = getVolumeRootPath_ceph(volId)
} else { } else {
// The volume is pre-made, credentials are supplied by the user // The volume is pre-made, credentials are supplied by the user
cr, err = getUserCredentials(req.GetNodePublishSecrets()) userCr, err = getUserCredentials(req.GetNodeStageSecrets())
if err != nil { if err != nil {
return nil, err return nil, err
} }
nodeCache.insert(volId, &nodeCacheEntry{volOptions: volOptions})
} }
if err = storeCephUserCredentials(volUuid, cr, volOptions); err != nil { if err = storeCephCredentials(volId, userCr); err != nil {
return nil, err return nil, err
} }
return cr, nil return userCr, nil
}
func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
if err := validateNodeStageVolumeRequest(req); err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
// Configuration
stagingTargetPath := req.GetStagingTargetPath()
volId := volumeID(req.GetVolumeId())
volOptions, err := newVolumeOptions(req.GetVolumeAttributes())
if err != nil {
glog.Errorf("error reading volume options for volume %s: %v", volId, err)
return nil, status.Error(codes.InvalidArgument, err.Error())
}
if err = createMountPoint(stagingTargetPath); err != nil {
glog.Errorf("failed to create staging mount point at %s for volume %s: %v", stagingTargetPath, volId, err)
return nil, status.Error(codes.Internal, err.Error())
}
cephConf := cephConfigData{Monitors: volOptions.Monitors, VolumeID: volId}
if err = cephConf.writeToFile(); err != nil {
glog.Errorf("failed to write ceph config file to %s for volume %s: %v", getCephConfPath(volId), volId, err)
return nil, status.Error(codes.Internal, err.Error())
}
// Check if the volume is already mounted
isMnt, err := isMountPoint(stagingTargetPath)
if err != nil {
glog.Errorf("stat failed: %v", err)
return nil, status.Error(codes.Internal, err.Error())
}
if isMnt {
glog.Infof("cephfs: volume %s is already mounted to %s, skipping", volId, stagingTargetPath)
return &csi.NodeStageVolumeResponse{}, nil
}
// It's not, mount now
cr, err := getOrCreateUser(volOptions, volId, req)
if err != nil {
glog.Error(err)
return nil, status.Error(codes.Internal, err.Error())
}
m := newMounter(volOptions)
glog.V(4).Infof("cephfs: mounting volume %s with %s", volId, m.name())
if err = m.mount(stagingTargetPath, cr, volOptions, volId); err != nil {
glog.Errorf("failed to mount volume %s: %v", volId, err)
return nil, status.Error(codes.Internal, err.Error())
}
glog.Infof("cephfs: successfully mounted volume %s to %s", volId, stagingTargetPath)
return &csi.NodeStageVolumeResponse{}, nil
} }
func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) { func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) {
if err := validateNodePublishVolumeRequest(req); err != nil { if err := validateNodePublishVolumeRequest(req); err != nil {
return nil, err return nil, status.Error(codes.InvalidArgument, err.Error())
} }
// Configuration // Configuration
targetPath := req.GetTargetPath() targetPath := req.GetTargetPath()
volId := req.GetVolumeId() volId := req.GetVolumeId()
volUuid := uuidFromVolumeId(volId)
volOptions, err := newVolumeOptions(req.GetVolumeAttributes()) if err := createMountPoint(targetPath); err != nil {
if err != nil {
glog.Errorf("error reading volume options: %v", err)
return nil, status.Error(codes.InvalidArgument, err.Error())
}
if err = createMountPoint(targetPath); err != nil {
glog.Errorf("failed to create mount point at %s: %v", targetPath, err) glog.Errorf("failed to create mount point at %s: %v", targetPath, err)
return nil, status.Error(codes.Internal, err.Error()) return nil, status.Error(codes.Internal, err.Error())
} }
conf := cephConfigData{Monitors: volOptions.Monitors, VolumeUuid: volUuid}
if err = conf.writeToFile(); err != nil {
glog.Errorf("failed to write ceph config file to %s: %v", getCephConfPath(volUuid), err)
return nil, status.Error(codes.Internal, err.Error())
}
// Check if the volume is already mounted // Check if the volume is already mounted
isMnt, err := isMountPoint(targetPath) isMnt, err := isMountPoint(targetPath)
@ -148,36 +172,27 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis
} }
if isMnt { if isMnt {
glog.V(4).Infof("cephfs: volume %s is already mounted to %s", volId, targetPath) glog.Infof("cephfs: volume %s is already bind-mounted to %s", volId, targetPath)
return &csi.NodePublishVolumeResponse{}, nil return &csi.NodePublishVolumeResponse{}, nil
} }
// It's not, mount now // It's not, mount now
cr, err := handleUser(volOptions, volUuid, req) if err = bindMount(req.GetStagingTargetPath(), req.GetTargetPath(), req.GetReadonly()); err != nil {
glog.Errorf("failed to bind-mount volume %s: %v", volId, err)
if err != nil {
glog.Error(err)
return nil, status.Error(codes.Internal, err.Error()) return nil, status.Error(codes.Internal, err.Error())
} }
m := newMounter(volOptions) glog.Infof("cephfs: successfuly bind-mounted volume %s to %s", volId, targetPath)
if err = m.mount(targetPath, cr, volOptions, volUuid, req.GetReadonly()); err != nil {
glog.Errorf("failed to mount volume %s: %v", volId, err)
return nil, status.Error(codes.Internal, err.Error())
}
glog.V(4).Infof("cephfs: volume %s successfuly mounted to %s", volId, targetPath)
return &csi.NodePublishVolumeResponse{}, nil return &csi.NodePublishVolumeResponse{}, nil
} }
func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) { func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
if err := validateNodeUnpublishVolumeRequest(req); err != nil { if err := validateNodeUnpublishVolumeRequest(req); err != nil {
return nil, err return nil, status.Error(codes.InvalidArgument, err.Error())
} }
volId := req.GetVolumeId()
targetPath := req.GetTargetPath() targetPath := req.GetTargetPath()
// Unmount the bind-mount // Unmount the bind-mount
@ -185,30 +200,63 @@ func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu
return nil, status.Error(codes.Internal, err.Error()) return nil, status.Error(codes.Internal, err.Error())
} }
localVolRoot := getVolumeRootPath_local(uuidFromVolumeId(volId)) os.Remove(targetPath)
// Unmount the volume root glog.Infof("cephfs: successfuly unbinded volume %s from %s", req.GetVolumeId(), targetPath)
if err := unmountVolume(localVolRoot); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
os.Remove(localVolRoot)
glog.V(4).Infof("cephfs: volume %s successfuly unmounted from %s", volId, req.GetTargetPath())
return &csi.NodeUnpublishVolumeResponse{}, nil return &csi.NodeUnpublishVolumeResponse{}, nil
} }
func (ns *nodeServer) NodeStageVolume( func (ns *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
ctx context.Context, if err := validateNodeUnstageVolumeRequest(req); err != nil {
req *csi.NodeStageVolumeRequest) ( return nil, status.Error(codes.InvalidArgument, err.Error())
*csi.NodeStageVolumeResponse, error) { }
return nil, status.Error(codes.Unimplemented, "")
volId := volumeID(req.GetVolumeId())
stagingTargetPath := req.GetStagingTargetPath()
// Unmount the volume
if err := unmountVolume(stagingTargetPath); err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
os.Remove(stagingTargetPath)
ent, err := nodeCache.pop(volId)
if err != nil {
glog.Error(err)
return nil, status.Error(codes.Internal, err.Error())
}
if ent.volOptions.ProvisionVolume {
// We've created a dedicated Ceph user in NodeStageVolume,
// it's about to be deleted here.
if err = deleteCephUser(&credentials{id: ent.cephAdminID}, volId); err != nil {
glog.Errorf("failed to delete ceph user %s for volume %s: %v", getCephUserName(volId), volId, err)
// Reinsert cache entry for retry
nodeCache.insert(volId, ent)
return nil, status.Error(codes.Internal, err.Error())
}
}
glog.Infof("cephfs: successfuly umounted volume %s from %s", req.GetVolumeId(), stagingTargetPath)
return &csi.NodeUnstageVolumeResponse{}, nil
} }
func (ns *nodeServer) NodeUnstageVolume( func (ns *nodeServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) {
ctx context.Context, return &csi.NodeGetCapabilitiesResponse{
req *csi.NodeUnstageVolumeRequest) ( Capabilities: []*csi.NodeServiceCapability{
*csi.NodeUnstageVolumeResponse, error) { {
return nil, status.Error(codes.Unimplemented, "") Type: &csi.NodeServiceCapability_Rpc{
Rpc: &csi.NodeServiceCapability_RPC{
Type: csi.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME,
},
},
},
},
}, nil
} }

View File

@ -26,17 +26,25 @@ import (
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
"k8s.io/kubernetes/pkg/util/keymutex" "github.com/container-storage-interface/spec/lib/go/csi/v0"
"github.com/pborman/uuid"
"k8s.io/kubernetes/pkg/util/mount" "k8s.io/kubernetes/pkg/util/mount"
) )
type volumeID string
func newVolumeID() volumeID {
return volumeID("csi-cephfs-" + uuid.NewUUID().String())
}
func execCommand(command string, args ...string) ([]byte, error) { func execCommand(command string, args ...string) ([]byte, error) {
glog.V(4).Infof("cephfs: EXEC %s %s", command, args)
cmd := exec.Command(command, args...) cmd := exec.Command(command, args...)
return cmd.CombinedOutput() return cmd.CombinedOutput()
} }
func execCommandAndValidate(program string, args ...string) error { func execCommandAndValidate(program string, args ...string) error {
glog.V(4).Infof("cephfs: executing command: %s with args: %s", program, args)
out, err := execCommand(program, args...) out, err := execCommand(program, args...)
if err != nil { if err != nil {
return fmt.Errorf("cephfs: %s failed with following error: %s\ncephfs: %s output: %s", program, err, program, out) return fmt.Errorf("cephfs: %s failed with following error: %s\ncephfs: %s output: %s", program, err, program, out)
@ -65,49 +73,21 @@ func isMountPoint(p string) (bool, error) {
return !notMnt, nil return !notMnt, nil
} }
func tryLock(id string, mtx keymutex.KeyMutex, name string) error { func storeCephCredentials(volId volumeID, cr *credentials) error {
// TODO uncomment this once TryLockKey gets into Kubernetes
/*
if !mtx.TryLockKey(id) {
msg := fmt.Sprintf("%s has a pending operation on %s", name, req.GetVolumeId())
glog.Infoln(msg)
return status.Error(codes.Aborted, msg)
}
*/
return nil
}
func storeCephUserCredentials(volUuid string, cr *credentials, volOptions *volumeOptions) error {
keyringData := cephKeyringData{ keyringData := cephKeyringData{
UserId: cr.id, UserId: cr.id,
Key: cr.key, Key: cr.key,
RootPath: volOptions.RootPath, VolumeID: volId,
VolumeUuid: volUuid,
} }
if volOptions.ProvisionVolume {
keyringData.Pool = volOptions.Pool
keyringData.Namespace = getVolumeNamespace(volUuid)
}
return storeCephCredentials(volUuid, cr, &keyringData)
}
func storeCephAdminCredentials(volUuid string, cr *credentials) error {
return storeCephCredentials(volUuid, cr, &cephFullCapsKeyringData{UserId: cr.id, Key: cr.key, VolumeUuid: volUuid})
}
func storeCephCredentials(volUuid string, cr *credentials, keyringData cephConfigWriter) error {
if err := keyringData.writeToFile(); err != nil { if err := keyringData.writeToFile(); err != nil {
return err return err
} }
secret := cephSecretData{ secret := cephSecretData{
UserId: cr.id, UserId: cr.id,
Key: cr.key, Key: cr.key,
VolumeUuid: volUuid, VolumeID: volId,
} }
if err := secret.writeToFile(); err != nil { if err := secret.writeToFile(); err != nil {
@ -124,8 +104,6 @@ func newMounter(volOptions *volumeOptions) volumeMounter {
mounter = DefaultVolumeMounter mounter = DefaultVolumeMounter
} }
glog.V(4).Infof("cephfs: setting volume mounter to: %s", mounter)
switch mounter { switch mounter {
case volumeMounter_fuse: case volumeMounter_fuse:
return &fuseMounter{} return &fuseMounter{}
@ -135,3 +113,95 @@ func newMounter(volOptions *volumeOptions) volumeMounter {
return nil return nil
} }
//
// Controller service request validation
//
func (cs *controllerServer) validateCreateVolumeRequest(req *csi.CreateVolumeRequest) error {
if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
return fmt.Errorf("invalid CreateVolumeRequest: %v", err)
}
if req.GetName() == "" {
return status.Error(codes.InvalidArgument, "Volume Name cannot be empty")
}
if req.GetVolumeCapabilities() == nil {
return status.Error(codes.InvalidArgument, "Volume Capabilities cannot be empty")
}
return nil
}
func (cs *controllerServer) validateDeleteVolumeRequest(req *csi.DeleteVolumeRequest) error {
if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
return fmt.Errorf("invalid DeleteVolumeRequest: %v", err)
}
return nil
}
//
// Node service request validation
//
func validateNodeStageVolumeRequest(req *csi.NodeStageVolumeRequest) error {
if req.GetVolumeCapability() == nil {
return fmt.Errorf("volume capability missing in request")
}
if req.GetVolumeId() == "" {
return fmt.Errorf("volume ID missing in request")
}
if req.GetStagingTargetPath() == "" {
return fmt.Errorf("staging target path missing in request")
}
if req.GetNodeStageSecrets() == nil || len(req.GetNodeStageSecrets()) == 0 {
return fmt.Errorf("stage secrets cannot be nil or empty")
}
return nil
}
func validateNodeUnstageVolumeRequest(req *csi.NodeUnstageVolumeRequest) error {
if req.GetVolumeId() == "" {
return fmt.Errorf("volume ID missing in request")
}
if req.GetStagingTargetPath() == "" {
return fmt.Errorf("staging target path missing in request")
}
return nil
}
func validateNodePublishVolumeRequest(req *csi.NodePublishVolumeRequest) error {
if req.GetVolumeCapability() == nil {
return fmt.Errorf("volume capability missing in request")
}
if req.GetVolumeId() == "" {
return fmt.Errorf("volume ID missing in request")
}
if req.GetTargetPath() == "" {
return fmt.Errorf("varget path missing in request")
}
return nil
}
func validateNodeUnpublishVolumeRequest(req *csi.NodeUnpublishVolumeRequest) error {
if req.GetVolumeId() == "" {
return fmt.Errorf("volume ID missing in request")
}
if req.GetTargetPath() == "" {
return fmt.Errorf("target path missing in request")
}
return nil
}

View File

@ -22,44 +22,35 @@ import (
"path" "path"
) )
// Volumes are mounted in .../controller/volumes/vol-{UUID}
// The actual user data resides in .../vol-{UUID}/volume-data
// purgeVolume moves the user data to .../vol-{UUID}/volume-deleting and only then calls os.RemoveAll
const ( const (
cephRootPrefix = PluginFolder + "/controller/volumes/root-" cephRootPrefix = PluginFolder + "/controller/volumes/root-"
cephVolumePrefix = PluginFolder + "/controller/volumes/vol-" cephVolumesRoot = "csi-volumes"
cephVolumesRoot = "csi-volumes"
namespacePrefix = "csi-ns-" namespacePrefix = "ns-"
) )
func getCephRootPath_local(volUuid string) string { func getCephRootPath_local(volId volumeID) string {
return cephRootPrefix + volUuid return cephRootPrefix + string(volId)
} }
func getCephRootVolumePath_local(volUuid string) string { func getCephRootVolumePath_local(volId volumeID) string {
return path.Join(getCephRootPath_local(volUuid), cephVolumesRoot, volUuid) return path.Join(getCephRootPath_local(volId), cephVolumesRoot, string(volId))
} }
func getVolumeRootPath_local(volUuid string) string { func getVolumeRootPath_ceph(volId volumeID) string {
return cephVolumePrefix + volUuid return path.Join("/", cephVolumesRoot, string(volId))
} }
func getVolumeRootPath_ceph(volUuid string) string { func getVolumeNamespace(volId volumeID) string {
return path.Join("/", cephVolumesRoot, volUuid) return namespacePrefix + string(volId)
}
func getVolumeNamespace(volUuid string) string {
return namespacePrefix + volUuid
} }
func setVolumeAttribute(root, attrName, attrValue string) error { func setVolumeAttribute(root, attrName, attrValue string) error {
return execCommandAndValidate("setfattr", "-n", attrName, "-v", attrValue, root) return execCommandAndValidate("setfattr", "-n", attrName, "-v", attrValue, root)
} }
func createVolume(volOptions *volumeOptions, adminCr *credentials, volUuid string, bytesQuota int64) error { func createVolume(volOptions *volumeOptions, adminCr *credentials, volId volumeID, bytesQuota int64) error {
cephRoot := getCephRootPath_local(volUuid) cephRoot := getCephRootPath_local(volId)
if err := createMountPoint(cephRoot); err != nil { if err := createMountPoint(cephRoot); err != nil {
return err return err
@ -69,7 +60,7 @@ func createVolume(volOptions *volumeOptions, adminCr *credentials, volUuid strin
// Access to cephfs's / is required // Access to cephfs's / is required
volOptions.RootPath = "/" volOptions.RootPath = "/"
if err := mountKernel(cephRoot, adminCr, volOptions, volUuid); err != nil { if err := mountKernel(cephRoot, adminCr, volOptions, volId); err != nil {
return fmt.Errorf("error mounting ceph root: %v", err) return fmt.Errorf("error mounting ceph root: %v", err)
} }
@ -78,8 +69,8 @@ func createVolume(volOptions *volumeOptions, adminCr *credentials, volUuid strin
os.Remove(cephRoot) os.Remove(cephRoot)
}() }()
volOptions.RootPath = getVolumeRootPath_ceph(volUuid) volOptions.RootPath = getVolumeRootPath_ceph(volId)
localVolRoot := getCephRootVolumePath_local(volUuid) localVolRoot := getCephRootVolumePath_local(volId)
if err := createMountPoint(localVolRoot); err != nil { if err := createMountPoint(localVolRoot); err != nil {
return err return err
@ -93,21 +84,20 @@ func createVolume(volOptions *volumeOptions, adminCr *credentials, volUuid strin
return fmt.Errorf("%v\ncephfs: Does pool '%s' exist?", err, volOptions.Pool) return fmt.Errorf("%v\ncephfs: Does pool '%s' exist?", err, volOptions.Pool)
} }
if err := setVolumeAttribute(localVolRoot, "ceph.dir.layout.pool_namespace", getVolumeNamespace(volUuid)); err != nil { if err := setVolumeAttribute(localVolRoot, "ceph.dir.layout.pool_namespace", getVolumeNamespace(volId)); err != nil {
return err return err
} }
return nil return nil
} }
func purgeVolume(volId string, cr *credentials, volOptions *volumeOptions) error { func purgeVolume(volId volumeID, cr *credentials, volOptions *volumeOptions) error {
// Root path is not set for dynamically provisioned volumes // Root path is not set for dynamically provisioned volumes
volOptions.RootPath = "/" volOptions.RootPath = "/"
var ( var (
volUuid = uuidFromVolumeId(volId) root = getCephRootPath_local(volId)
root = getCephRootPath_local(volUuid) volRoot = getCephRootVolumePath_local(volId)
volRoot = getCephRootVolumePath_local(volUuid)
volRootDeleting = volRoot + "-deleting" volRootDeleting = volRoot + "-deleting"
) )
@ -115,7 +105,7 @@ func purgeVolume(volId string, cr *credentials, volOptions *volumeOptions) error
return err return err
} }
if err := mountKernel(root, cr, volOptions, volUuid); err != nil { if err := mountKernel(root, cr, volOptions, volId); err != nil {
return err return err
} }

View File

@ -1,131 +0,0 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cephfs
import (
"encoding/json"
"fmt"
"io/ioutil"
"os"
"path"
"strings"
"sync"
"github.com/golang/glog"
)
const (
volumeCacheRoot = PluginFolder + "/controller/volume-cache"
)
type volumeCacheEntry struct {
VolOptions volumeOptions
Identifier volumeIdentifier
}
type volumeCache struct {
entries map[string]*volumeCacheEntry
}
var (
volCache volumeCache
volCacheMtx sync.RWMutex
)
// Loads all .json files from volumeCacheRoot into volCache
// Called from driver.go's Run()
func loadVolumeCache() error {
cacheDir, err := ioutil.ReadDir(volumeCacheRoot)
if err != nil {
return fmt.Errorf("cannot read volume cache: %v", err)
}
volCacheMtx.Lock()
defer volCacheMtx.Unlock()
volCache.entries = make(map[string]*volumeCacheEntry)
for _, fi := range cacheDir {
if !strings.HasSuffix(fi.Name(), ".json") || !fi.Mode().IsRegular() {
continue
}
f, err := os.Open(path.Join(volumeCacheRoot, fi.Name()))
if err != nil {
glog.Errorf("cephfs: couldn't read '%s' from volume cache: %v", fi.Name(), err)
continue
}
d := json.NewDecoder(f)
ent := &volumeCacheEntry{}
if err = d.Decode(ent); err != nil {
glog.Errorf("cephfs: failed to parse '%s': %v", fi.Name(), err)
} else {
volCache.entries[ent.Identifier.uuid] = ent
}
f.Close()
}
return nil
}
func getVolumeCacheEntryPath(volUuid string) string {
return path.Join(volumeCacheRoot, fmt.Sprintf("vol-%s.json", volUuid))
}
func (vc *volumeCache) insert(ent *volumeCacheEntry) error {
filePath := getVolumeCacheEntryPath(ent.Identifier.uuid)
volCacheMtx.Lock()
defer volCacheMtx.Unlock()
f, err := os.Create(filePath)
if err != nil {
return fmt.Errorf("couldn't create cache entry file %s: %v", filePath, err)
}
defer f.Close()
e := json.NewEncoder(f)
if err = e.Encode(ent); err != nil {
return fmt.Errorf("failed to encode cache entry for volume %s: %v", ent.Identifier.id, err)
}
vc.entries[ent.Identifier.uuid] = ent
return nil
}
func (vc *volumeCache) erase(volUuid string) error {
volCacheMtx.Lock()
delete(vc.entries, volUuid)
volCacheMtx.Unlock()
return os.Remove(getVolumeCacheEntryPath(volUuid))
}
func (vc *volumeCache) get(volUuid string) (volumeCacheEntry, bool) {
volCacheMtx.RLock()
defer volCacheMtx.RUnlock()
if ent, ok := vc.entries[volUuid]; ok {
return *ent, true
} else {
return volumeCacheEntry{}, false
}
}

View File

@ -1,60 +0,0 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cephfs
import (
"github.com/container-storage-interface/spec/lib/go/csi/v0"
"github.com/pborman/uuid"
)
const (
dynamicallyProvisionedVolumePrefix = "csi-cephfs-dyn-"
staticallyProvisionedVolumePrefix = "csi-cephfs-sta-"
volumePrefixLen = len(dynamicallyProvisionedVolumePrefix)
)
type volumeIdentifier struct {
name, uuid, id string
}
func newVolumeIdentifier(volOptions *volumeOptions, req *csi.CreateVolumeRequest) *volumeIdentifier {
volId := volumeIdentifier{
name: req.GetName(),
uuid: uuid.NewUUID().String(),
}
prefix := staticallyProvisionedVolumePrefix
if volOptions.ProvisionVolume {
prefix = dynamicallyProvisionedVolumePrefix
}
volId.id = prefix + volId.uuid
return &volId
}
func uuidFromVolumeId(volId string) string {
return volId[volumePrefixLen:]
}
func isDynProvision(volId string) bool {
if len(volId) <= volumePrefixLen {
return false
}
return volId[:volumePrefixLen] == dynamicallyProvisionedVolumePrefix
}

View File

@ -28,17 +28,18 @@ const (
) )
type volumeMounter interface { type volumeMounter interface {
mount(mountPoint string, cr *credentials, volOptions *volumeOptions, volUuid string, readOnly bool) error mount(mountPoint string, cr *credentials, volOptions *volumeOptions, volId volumeID) error
name() string
} }
type fuseMounter struct{} type fuseMounter struct{}
func mountFuse(mountPoint string, cr *credentials, volOptions *volumeOptions, volUuid string) error { func mountFuse(mountPoint string, cr *credentials, volOptions *volumeOptions, volId volumeID) error {
args := [...]string{ args := [...]string{
mountPoint, mountPoint,
"-c", getCephConfPath(volUuid), "-c", getCephConfPath(volId),
"-n", cephEntityClientPrefix + cr.id, "-n", cephEntityClientPrefix + cr.id,
"--keyring", getCephKeyringPath(volUuid, cr.id), "--keyring", getCephKeyringPath(volId, cr.id),
"-r", volOptions.RootPath, "-r", volOptions.RootPath,
"-o", "nonempty", "-o", "nonempty",
} }
@ -55,27 +56,19 @@ func mountFuse(mountPoint string, cr *credentials, volOptions *volumeOptions, vo
return nil return nil
} }
func (m *fuseMounter) mount(mountPoint string, cr *credentials, volOptions *volumeOptions, volUuid string, readOnly bool) error { func (m *fuseMounter) mount(mountPoint string, cr *credentials, volOptions *volumeOptions, volId volumeID) error {
if err := createMountPoint(mountPoint); err != nil { if err := createMountPoint(mountPoint); err != nil {
return err return err
} }
localVolRoot := getVolumeRootPath_local(volUuid) return mountFuse(mountPoint, cr, volOptions, volId)
if err := createMountPoint(localVolRoot); err != nil {
return err
}
if err := mountFuse(localVolRoot, cr, volOptions, volUuid); err != nil {
return err
}
return bindVolume(volUuid, mountPoint, readOnly)
} }
func (m *fuseMounter) name() string { return "Ceph FUSE driver" }
type kernelMounter struct{} type kernelMounter struct{}
func mountKernel(mountPoint string, cr *credentials, volOptions *volumeOptions, volUuid string) error { func mountKernel(mountPoint string, cr *credentials, volOptions *volumeOptions, volId volumeID) error {
if err := execCommandAndValidate("modprobe", "ceph"); err != nil { if err := execCommandAndValidate("modprobe", "ceph"); err != nil {
return err return err
} }
@ -85,52 +78,34 @@ func mountKernel(mountPoint string, cr *credentials, volOptions *volumeOptions,
fmt.Sprintf("%s:%s", volOptions.Monitors, volOptions.RootPath), fmt.Sprintf("%s:%s", volOptions.Monitors, volOptions.RootPath),
mountPoint, mountPoint,
"-o", "-o",
fmt.Sprintf("name=%s,secretfile=%s", cr.id, getCephSecretPath(volUuid, cr.id)), fmt.Sprintf("name=%s,secretfile=%s", cr.id, getCephSecretPath(volId, cr.id)),
) )
} }
func (m *kernelMounter) mount(mountPoint string, cr *credentials, volOptions *volumeOptions, volUuid string, readOnly bool) error { func (m *kernelMounter) mount(mountPoint string, cr *credentials, volOptions *volumeOptions, volId volumeID) error {
if err := createMountPoint(mountPoint); err != nil { if err := createMountPoint(mountPoint); err != nil {
return err return err
} }
localVolRoot := getVolumeRootPath_local(volUuid) return mountKernel(mountPoint, cr, volOptions, volId)
if err := createMountPoint(localVolRoot); err != nil {
return err
}
if err := mountKernel(localVolRoot, cr, volOptions, volUuid); err != nil {
return err
}
return bindVolume(volUuid, mountPoint, readOnly)
} }
func (m *kernelMounter) name() string { return "Ceph kernel client" }
func bindMount(from, to string, readOnly bool) error { func bindMount(from, to string, readOnly bool) error {
if err := execCommandAndValidate("mount", "--bind", from, to); err != nil { if err := execCommandAndValidate("mount", "--bind", from, to); err != nil {
return fmt.Errorf("failed bind-mount of %s to %s: %v", from, to, err) return fmt.Errorf("failed to bind-mount %s to %s: %v", from, to, err)
} }
if readOnly { if readOnly {
if err := execCommandAndValidate("mount", "-o", "remount,ro,bind", to); err != nil { if err := execCommandAndValidate("mount", "-o", "remount,ro,bind", to); err != nil {
return err return fmt.Errorf("failed read-only remount of %s: %v", to, err)
} }
} }
return nil return nil
} }
func bindVolume(volUuid, target string, readOnly bool) error {
volDataRoot := getVolumeRootPath_local(volUuid)
if err := createMountPoint(volDataRoot); err != nil {
return err
}
return bindMount(volDataRoot, target, readOnly)
}
func unmountVolume(mountPoint string) error { func unmountVolume(mountPoint string) error {
return execCommandAndValidate("umount", mountPoint) return execCommandAndValidate("umount", mountPoint)
} }