diff --git a/pkg/cephfs/cephconf.go b/pkg/cephfs/cephconf.go index 8c74b4fdd..44df828f9 100644 --- a/pkg/cephfs/cephconf.go +++ b/pkg/cephfs/cephconf.go @@ -35,16 +35,6 @@ fuse_set_user_groups = false const cephKeyring = `[client.{{.UserId}}] key = {{.Key}} -caps mds = "allow rw path={{.RootPath}}" -caps mon = "allow r" -caps osd = "allow rw{{if .Pool}} pool={{.Pool}}{{end}}{{if .Namespace}} namespace={{.Namespace}}{{end}}" -` - -const cephFullCapsKeyring = `[client.{{.UserId}}] -key = {{.Key}} -caps mds = "allow" -caps mon = "allow *" -caps osd = "allow *" ` const cephSecret = `{{.Key}}` @@ -57,10 +47,9 @@ const ( ) var ( - cephConfigTempl *template.Template - cephKeyringTempl *template.Template - cephFullCapsKeyringTempl *template.Template - cephSecretTempl *template.Template + cephConfigTempl *template.Template + cephKeyringTempl *template.Template + cephSecretTempl *template.Template ) func init() { @@ -76,7 +65,6 @@ func init() { cephConfigTempl = template.Must(template.New("config").Parse(cephConfig)) cephKeyringTempl = template.Must(template.New("keyring").Funcs(fm).Parse(cephKeyring)) - cephFullCapsKeyringTempl = template.Must(template.New("keyringFullCaps").Parse(cephFullCapsKeyring)) cephSecretTempl = template.Must(template.New("secret").Parse(cephSecret)) } @@ -85,8 +73,8 @@ type cephConfigWriter interface { } type cephConfigData struct { - Monitors string - VolumeUuid string + Monitors string + VolumeID volumeID } func writeCephTemplate(fileName string, m os.FileMode, t *template.Template, data interface{}) error { @@ -108,46 +96,35 @@ func writeCephTemplate(fileName string, m os.FileMode, t *template.Template, dat } func (d *cephConfigData) writeToFile() error { - return writeCephTemplate(fmt.Sprintf(cephConfigFileNameFmt, d.VolumeUuid), 0640, cephConfigTempl, d) + return writeCephTemplate(fmt.Sprintf(cephConfigFileNameFmt, d.VolumeID), 0640, cephConfigTempl, d) } type cephKeyringData struct { - UserId, Key string - RootPath string - Pool, Namespace string - VolumeUuid string + UserId, Key string + VolumeID volumeID } func (d *cephKeyringData) writeToFile() error { - return writeCephTemplate(fmt.Sprintf(cephKeyringFileNameFmt, d.VolumeUuid, d.UserId), 0600, cephKeyringTempl, d) -} - -type cephFullCapsKeyringData struct { - UserId, Key string - VolumeUuid string -} - -func (d *cephFullCapsKeyringData) writeToFile() error { - return writeCephTemplate(fmt.Sprintf(cephKeyringFileNameFmt, d.VolumeUuid, d.UserId), 0600, cephFullCapsKeyringTempl, d) + return writeCephTemplate(fmt.Sprintf(cephKeyringFileNameFmt, d.VolumeID, d.UserId), 0600, cephKeyringTempl, d) } type cephSecretData struct { UserId, Key string - VolumeUuid string + VolumeID volumeID } func (d *cephSecretData) writeToFile() error { - return writeCephTemplate(fmt.Sprintf(cephSecretFileNameFmt, d.VolumeUuid, d.UserId), 0600, cephSecretTempl, d) + return writeCephTemplate(fmt.Sprintf(cephSecretFileNameFmt, d.VolumeID, d.UserId), 0600, cephSecretTempl, d) } -func getCephSecretPath(volUuid, userId string) string { - return path.Join(cephConfigRoot, fmt.Sprintf(cephSecretFileNameFmt, volUuid, userId)) +func getCephSecretPath(volId volumeID, userId string) string { + return path.Join(cephConfigRoot, fmt.Sprintf(cephSecretFileNameFmt, volId, userId)) } -func getCephKeyringPath(volUuid, userId string) string { - return path.Join(cephConfigRoot, fmt.Sprintf(cephKeyringFileNameFmt, volUuid, userId)) +func getCephKeyringPath(volId volumeID, userId string) string { + return path.Join(cephConfigRoot, fmt.Sprintf(cephKeyringFileNameFmt, volId, userId)) } -func getCephConfPath(volUuid string) string { - return path.Join(cephConfigRoot, fmt.Sprintf(cephConfigFileNameFmt, volUuid)) +func getCephConfPath(volId volumeID) string { + return path.Join(cephConfigRoot, fmt.Sprintf(cephConfigFileNameFmt, volId)) } diff --git a/pkg/cephfs/cephuser.go b/pkg/cephfs/cephuser.go index abd152368..1f1167b71 100644 --- a/pkg/cephfs/cephuser.go +++ b/pkg/cephfs/cephuser.go @@ -22,7 +22,7 @@ import ( ) const ( - cephUserPrefix = "csi-user-" + cephUserPrefix = "user-" cephEntityClientPrefix = "client." ) @@ -38,8 +38,8 @@ type cephEntity struct { Caps cephEntityCaps `json:"caps"` } -func getCephUserName(volUuid string) string { - return cephUserPrefix + volUuid +func getCephUserName(volId volumeID) string { + return cephUserPrefix + string(volId) } func getCephUser(userId string) (*cephEntity, error) { @@ -57,17 +57,17 @@ func getCephUser(userId string) (*cephEntity, error) { return &ents[0], nil } -func createCephUser(volOptions *volumeOptions, cr *credentials, volUuid string) (*cephEntity, error) { +func createCephUser(volOptions *volumeOptions, cr *credentials, volId volumeID) (*cephEntity, error) { caps := cephEntityCaps{ - Mds: fmt.Sprintf("allow rw path=%s", getVolumeRootPath_ceph(volUuid)), + Mds: fmt.Sprintf("allow rw path=%s", getVolumeRootPath_ceph(volId)), Mon: "allow r", - Osd: fmt.Sprintf("allow rw pool=%s namespace=%s", volOptions.Pool, getVolumeNamespace(volUuid)), + Osd: fmt.Sprintf("allow rw pool=%s namespace=%s", volOptions.Pool, getVolumeNamespace(volId)), } var ents []cephEntity args := [...]string{ - "auth", "-f", "json", "-c", getCephConfPath(volUuid), "-n", cephEntityClientPrefix + cr.id, - "get-or-create", cephEntityClientPrefix + getCephUserName(volUuid), + "auth", "-f", "json", "-c", getCephConfPath(volId), "-n", cephEntityClientPrefix + cr.id, + "get-or-create", cephEntityClientPrefix + getCephUserName(volId), "mds", caps.Mds, "mon", caps.Mon, "osd", caps.Osd, @@ -80,11 +80,11 @@ func createCephUser(volOptions *volumeOptions, cr *credentials, volUuid string) return &ents[0], nil } -func deleteCephUser(cr *credentials, volUuid string) error { - userId := getCephUserName(volUuid) +func deleteCephUser(adminCr *credentials, volId volumeID) error { + userId := getCephUserName(volId) args := [...]string{ - "-c", getCephConfPath(volUuid), "-n", cephEntityClientPrefix + cr.id, + "-c", getCephConfPath(volId), "-n", cephEntityClientPrefix + adminCr.id, "auth", "rm", cephEntityClientPrefix + userId, } @@ -92,8 +92,8 @@ func deleteCephUser(cr *credentials, volUuid string) error { return err } - os.Remove(getCephKeyringPath(volUuid, userId)) - os.Remove(getCephSecretPath(volUuid, userId)) + os.Remove(getCephKeyringPath(volId, userId)) + os.Remove(getCephSecretPath(volId, userId)) return nil } diff --git a/pkg/cephfs/controllercache.go b/pkg/cephfs/controllercache.go new file mode 100644 index 000000000..c757028dc --- /dev/null +++ b/pkg/cephfs/controllercache.go @@ -0,0 +1,128 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cephfs + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "os" + "path" + "strings" + "sync" + + "github.com/golang/glog" +) + +const ( + controllerCacheRoot = PluginFolder + "/controller/plugin-cache" +) + +type controllerCacheEntry struct { + VolOptions volumeOptions + VolumeID volumeID +} + +type controllerCacheMap map[volumeID]*controllerCacheEntry + +var ( + ctrCache = make(controllerCacheMap) + ctrCacheMtx sync.Mutex +) + +// Load all .json files from controllerCacheRoot into ctrCache +// Called from driver.go's Run() +func loadControllerCache() error { + cacheDir, err := ioutil.ReadDir(controllerCacheRoot) + if err != nil { + return fmt.Errorf("cannot read controller cache from %s: %v", controllerCacheRoot, err) + } + + ctrCacheMtx.Lock() + defer ctrCacheMtx.Unlock() + + for _, fi := range cacheDir { + if !strings.HasSuffix(fi.Name(), ".json") || !fi.Mode().IsRegular() { + continue + } + + f, err := os.Open(path.Join(controllerCacheRoot, fi.Name())) + if err != nil { + glog.Errorf("cephfs: cloudn't read '%s' from controller cache: %v", fi.Name(), err) + continue + } + + d := json.NewDecoder(f) + ent := &controllerCacheEntry{} + + if err = d.Decode(ent); err != nil { + glog.Errorf("cephfs: failed to parse '%s': %v", fi.Name(), err) + } else { + ctrCache[ent.VolumeID] = ent + } + + f.Close() + } + + return nil +} + +func getControllerCacheEntryPath(volId volumeID) string { + return path.Join(controllerCacheRoot, string(volId)+".json") +} + +func (m controllerCacheMap) insert(ent *controllerCacheEntry) error { + filePath := getControllerCacheEntryPath(ent.VolumeID) + + ctrCacheMtx.Lock() + defer ctrCacheMtx.Unlock() + + f, err := os.Create(filePath) + if err != nil { + return fmt.Errorf("couldn't create cache entry file '%s': %v", filePath, err) + } + defer f.Close() + + enc := json.NewEncoder(f) + if err = enc.Encode(ent); err != nil { + return fmt.Errorf("failed to encode cache entry for volume %s: %v", ent.VolumeID, err) + } + + m[ent.VolumeID] = ent + + return nil +} + +func (m controllerCacheMap) pop(volId volumeID) (*controllerCacheEntry, error) { + ctrCacheMtx.Lock() + defer ctrCacheMtx.Unlock() + + ent, ok := m[volId] + if !ok { + return nil, fmt.Errorf("cache entry for volume %s does not exist", volId) + } + + filePath := getControllerCacheEntryPath(volId) + + if err := os.Remove(filePath); err != nil { + return nil, fmt.Errorf("failed to remove cache entry file '%s': %v", filePath, err) + } + + delete(m, volId) + + return ent, nil +} diff --git a/pkg/cephfs/controllerserver.go b/pkg/cephfs/controllerserver.go index dd1122c22..144e8668f 100644 --- a/pkg/cephfs/controllerserver.go +++ b/pkg/cephfs/controllerserver.go @@ -17,8 +17,6 @@ limitations under the License. package cephfs import ( - "fmt" - "github.com/golang/glog" "golang.org/x/net/context" "google.golang.org/grpc/codes" @@ -36,30 +34,6 @@ const ( oneGB = 1073741824 ) -func (cs *controllerServer) validateCreateVolumeRequest(req *csi.CreateVolumeRequest) error { - if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil { - return fmt.Errorf("invalid CreateVolumeRequest: %v", err) - } - - if req.GetName() == "" { - return status.Error(codes.InvalidArgument, "Volume Name cannot be empty") - } - - if req.GetVolumeCapabilities() == nil { - return status.Error(codes.InvalidArgument, "Volume Capabilities cannot be empty") - } - - return nil -} - -func (cs *controllerServer) validateDeleteVolumeRequest(req *csi.DeleteVolumeRequest) error { - if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil { - return fmt.Errorf("invalid DeleteVolumeRequest: %v", err) - } - - return nil -} - func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) { if err := cs.validateCreateVolumeRequest(req); err != nil { glog.Errorf("CreateVolumeRequest validation failed: %v", err) @@ -74,41 +48,37 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol return nil, status.Error(codes.InvalidArgument, err.Error()) } - volId := newVolumeIdentifier(volOptions, req) + volId := newVolumeID() - conf := cephConfigData{Monitors: volOptions.Monitors, VolumeUuid: volId.uuid} + conf := cephConfigData{Monitors: volOptions.Monitors, VolumeID: volId} if err = conf.writeToFile(); err != nil { - glog.Errorf("failed to write ceph config file to %s: %v", getCephConfPath(volId.uuid), err) + glog.Errorf("failed to write ceph config file to %s: %v", getCephConfPath(volId), err) return nil, status.Error(codes.Internal, err.Error()) } // Create a volume in case the user didn't provide one if volOptions.ProvisionVolume { - // Admin access is required + // Admin credentials are required cr, err := getAdminCredentials(req.GetControllerCreateSecrets()) if err != nil { return nil, status.Error(codes.InvalidArgument, err.Error()) } - if err = storeCephAdminCredentials(volId.uuid, cr); err != nil { + if err = storeCephCredentials(volId, cr); err != nil { glog.Errorf("failed to store admin credentials for '%s': %v", cr.id, err) return nil, status.Error(codes.Internal, err.Error()) } - if err = createVolume(volOptions, cr, volId.uuid, req.GetCapacityRange().GetRequiredBytes()); err != nil { - glog.Errorf("failed to create volume %s: %v", volId.name, err) + if err = createVolume(volOptions, cr, volId, req.GetCapacityRange().GetRequiredBytes()); err != nil { + glog.Errorf("failed to create volume %s: %v", req.GetName(), err) return nil, status.Error(codes.Internal, err.Error()) } - glog.V(4).Infof("cephfs: volume %s successfuly created", volId.id) + glog.Infof("cephfs: successfuly created volume %s", volId) } else { - glog.V(4).Infof("cephfs: volume %s is provisioned statically", volId.id) - } - - if err = volCache.insert(&volumeCacheEntry{Identifier: *volId, VolOptions: *volOptions}); err != nil { - glog.Warningf("failed to store a volume cache entry: %v", err) + glog.Infof("cephfs: volume %s is provisioned statically", volId) } sz := req.GetCapacityRange().GetRequiredBytes() @@ -116,9 +86,14 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol sz = oneGB } + if err = ctrCache.insert(&controllerCacheEntry{VolOptions: *volOptions, VolumeID: volId}); err != nil { + glog.Errorf("failed to store a cache entry for volume %s: %v", volId, err) + return nil, status.Error(codes.Internal, err.Error()) + } + return &csi.CreateVolumeResponse{ Volume: &csi.Volume{ - Id: volId.id, + Id: string(volId), CapacityBytes: sz, Attributes: req.GetParameters(), }, @@ -132,27 +107,35 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol } var ( - volId = req.GetVolumeId() - volUuid = uuidFromVolumeId(volId) + volId = volumeID(req.GetVolumeId()) + err error ) // Load volume info from cache - ent, found := volCache.get(volUuid) - if !found { - msg := fmt.Sprintf("failed to retrieve cache entry for volume %s", volId) - glog.Error(msg) - return nil, status.Error(codes.Internal, msg) + ent, err := ctrCache.pop(volId) + if err != nil { + glog.Error(err) + return nil, status.Error(codes.Internal, err.Error()) } if !ent.VolOptions.ProvisionVolume { // DeleteVolume() is forbidden for statically provisioned volumes! - msg := fmt.Sprintf("volume %s is provisioned statically, aborting delete", volId) - glog.Warningf(msg) + + glog.Warningf("volume %s is provisioned statically, aborting delete", volId) return &csi.DeleteVolumeResponse{}, nil } - // Requires admin credentials + defer func() { + if err != nil { + // Reinsert cache entry for retry + if insErr := ctrCache.insert(ent); insErr != nil { + glog.Errorf("failed to reinsert volume cache entry in rollback procedure for volume %s: %v", volId, err) + } + } + }() + + // Deleting a volume requires admin credentials cr, err := getAdminCredentials(req.GetControllerDeleteSecrets()) if err != nil { @@ -160,24 +143,12 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol return nil, status.Error(codes.InvalidArgument, err.Error()) } - // Remove the volume, the user and the volume cache entry - if err = purgeVolume(volId, cr, &ent.VolOptions); err != nil { glog.Errorf("failed to delete volume %s: %v", volId, err) return nil, status.Error(codes.Internal, err.Error()) } - if err = deleteCephUser(cr, volUuid); err != nil { - glog.Errorf("failed to delete ceph user %s: %v", getCephUserName(volUuid), err) - return nil, status.Error(codes.Internal, err.Error()) - } - - if err = volCache.erase(volUuid); err != nil { - glog.Errorf("failed to delete cache entry for volume %s: %v", volId, err) - return nil, status.Error(codes.Internal, err.Error()) - } - - glog.V(4).Infof("cephfs: volume %s successfuly deleted", volId) + glog.Infof("cephfs: successfuly deleted volume %s", volId) return &csi.DeleteVolumeResponse{}, nil } diff --git a/pkg/cephfs/driver.go b/pkg/cephfs/driver.go index 153c6d900..ece2c5ccb 100644 --- a/pkg/cephfs/driver.go +++ b/pkg/cephfs/driver.go @@ -27,7 +27,7 @@ import ( const ( PluginFolder = "/var/lib/kubelet/plugins/csi-cephfsplugin" - Version = "0.2.0" + Version = "0.3.0" ) type cephfsDriver struct { @@ -81,12 +81,12 @@ func (fs *cephfsDriver) Run(driverName, nodeId, endpoint, volumeMounter string) // Configuration - if err := os.MkdirAll(volumeCacheRoot, 0755); err != nil { - glog.Fatalf("cephfs: failed to create %s: %v", volumeCacheRoot, err) + if err := os.MkdirAll(controllerCacheRoot, 0755); err != nil { + glog.Fatalf("cephfs: failed to create %s: %v", controllerCacheRoot, err) return } - if err := loadVolumeCache(); err != nil { + if err := loadControllerCache(); err != nil { glog.Errorf("cephfs: failed to read volume cache: %v", err) } diff --git a/pkg/cephfs/nodecache.go b/pkg/cephfs/nodecache.go new file mode 100644 index 000000000..e05b7d1b3 --- /dev/null +++ b/pkg/cephfs/nodecache.go @@ -0,0 +1,55 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cephfs + +import ( + "fmt" + "sync" +) + +type nodeCacheEntry struct { + volOptions *volumeOptions + cephAdminID string +} + +type nodeCacheMap map[volumeID]*nodeCacheEntry + +var ( + nodeCache = make(nodeCacheMap) + nodeCacheMtx sync.Mutex +) + +func (m nodeCacheMap) insert(volId volumeID, ent *nodeCacheEntry) { + nodeCacheMtx.Lock() + defer nodeCacheMtx.Unlock() + + m[volId] = ent +} + +func (m nodeCacheMap) pop(volId volumeID) (*nodeCacheEntry, error) { + nodeCacheMtx.Lock() + defer nodeCacheMtx.Unlock() + + ent, ok := m[volId] + if !ok { + return nil, fmt.Errorf("node cache entry for volume %s not found", volId) + } + + delete(m, volId) + + return ent, nil +} diff --git a/pkg/cephfs/nodeserver.go b/pkg/cephfs/nodeserver.go index d66b6a89a..f288a4390 100644 --- a/pkg/cephfs/nodeserver.go +++ b/pkg/cephfs/nodeserver.go @@ -32,38 +32,10 @@ type nodeServer struct { *csicommon.DefaultNodeServer } -func validateNodePublishVolumeRequest(req *csi.NodePublishVolumeRequest) error { - if req.GetVolumeCapability() == nil { - return status.Error(codes.InvalidArgument, "Volume capability missing in request") - } - - if req.GetVolumeId() == "" { - return status.Error(codes.InvalidArgument, "Volume ID missing in request") - } - - if req.GetTargetPath() == "" { - return status.Error(codes.InvalidArgument, "Target path missing in request") - } - - return nil -} - -func validateNodeUnpublishVolumeRequest(req *csi.NodeUnpublishVolumeRequest) error { - if req.GetVolumeId() == "" { - return status.Error(codes.InvalidArgument, "Volume ID missing in request") - } - - if req.GetTargetPath() == "" { - return status.Error(codes.InvalidArgument, "Target path missing in request") - } - - return nil -} - -func handleUser(volOptions *volumeOptions, volUuid string, req *csi.NodePublishVolumeRequest) (*credentials, error) { +func getOrCreateUser(volOptions *volumeOptions, volId volumeID, req *csi.NodeStageVolumeRequest) (*credentials, error) { var ( - cr = &credentials{} - err error + userCr = &credentials{} + err error ) // Retrieve the credentials (possibly create a new user as well) @@ -73,71 +45,123 @@ func handleUser(volOptions *volumeOptions, volUuid string, req *csi.NodePublishV // First, store admin credentials - those are needed for creating a user - adminCr, err := getAdminCredentials(req.GetNodePublishSecrets()) + adminCr, err := getAdminCredentials(req.GetNodeStageSecrets()) if err != nil { return nil, err } - if err = storeCephAdminCredentials(volUuid, adminCr); err != nil { + if err = storeCephCredentials(volId, adminCr); err != nil { return nil, err } + nodeCache.insert(volId, &nodeCacheEntry{volOptions: volOptions, cephAdminID: adminCr.id}) + // Then create the user - if ent, err := createCephUser(volOptions, adminCr, volUuid); err != nil { + if ent, err := createCephUser(volOptions, adminCr, volId); err != nil { return nil, err } else { - cr.id = ent.Entity[len(cephEntityClientPrefix):] - cr.key = ent.Key + userCr.id = ent.Entity[len(cephEntityClientPrefix):] + userCr.key = ent.Key } // Set the correct volume root path - volOptions.RootPath = getVolumeRootPath_ceph(volUuid) + volOptions.RootPath = getVolumeRootPath_ceph(volId) } else { // The volume is pre-made, credentials are supplied by the user - cr, err = getUserCredentials(req.GetNodePublishSecrets()) - + userCr, err = getUserCredentials(req.GetNodeStageSecrets()) if err != nil { return nil, err } + + nodeCache.insert(volId, &nodeCacheEntry{volOptions: volOptions}) } - if err = storeCephUserCredentials(volUuid, cr, volOptions); err != nil { + if err = storeCephCredentials(volId, userCr); err != nil { return nil, err } - return cr, nil + return userCr, nil +} + +func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) { + if err := validateNodeStageVolumeRequest(req); err != nil { + return nil, status.Error(codes.InvalidArgument, err.Error()) + } + + // Configuration + + stagingTargetPath := req.GetStagingTargetPath() + volId := volumeID(req.GetVolumeId()) + + volOptions, err := newVolumeOptions(req.GetVolumeAttributes()) + if err != nil { + glog.Errorf("error reading volume options for volume %s: %v", volId, err) + return nil, status.Error(codes.InvalidArgument, err.Error()) + } + + if err = createMountPoint(stagingTargetPath); err != nil { + glog.Errorf("failed to create staging mount point at %s for volume %s: %v", stagingTargetPath, volId, err) + return nil, status.Error(codes.Internal, err.Error()) + } + + cephConf := cephConfigData{Monitors: volOptions.Monitors, VolumeID: volId} + if err = cephConf.writeToFile(); err != nil { + glog.Errorf("failed to write ceph config file to %s for volume %s: %v", getCephConfPath(volId), volId, err) + return nil, status.Error(codes.Internal, err.Error()) + } + + // Check if the volume is already mounted + + isMnt, err := isMountPoint(stagingTargetPath) + + if err != nil { + glog.Errorf("stat failed: %v", err) + return nil, status.Error(codes.Internal, err.Error()) + } + + if isMnt { + glog.Infof("cephfs: volume %s is already mounted to %s, skipping", volId, stagingTargetPath) + return &csi.NodeStageVolumeResponse{}, nil + } + + // It's not, mount now + + cr, err := getOrCreateUser(volOptions, volId, req) + if err != nil { + glog.Error(err) + return nil, status.Error(codes.Internal, err.Error()) + } + + m := newMounter(volOptions) + glog.V(4).Infof("cephfs: mounting volume %s with %s", volId, m.name()) + + if err = m.mount(stagingTargetPath, cr, volOptions, volId); err != nil { + glog.Errorf("failed to mount volume %s: %v", volId, err) + return nil, status.Error(codes.Internal, err.Error()) + } + + glog.Infof("cephfs: successfully mounted volume %s to %s", volId, stagingTargetPath) + + return &csi.NodeStageVolumeResponse{}, nil } func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (*csi.NodePublishVolumeResponse, error) { if err := validateNodePublishVolumeRequest(req); err != nil { - return nil, err + return nil, status.Error(codes.InvalidArgument, err.Error()) } // Configuration targetPath := req.GetTargetPath() volId := req.GetVolumeId() - volUuid := uuidFromVolumeId(volId) - volOptions, err := newVolumeOptions(req.GetVolumeAttributes()) - if err != nil { - glog.Errorf("error reading volume options: %v", err) - return nil, status.Error(codes.InvalidArgument, err.Error()) - } - - if err = createMountPoint(targetPath); err != nil { + if err := createMountPoint(targetPath); err != nil { glog.Errorf("failed to create mount point at %s: %v", targetPath, err) return nil, status.Error(codes.Internal, err.Error()) } - conf := cephConfigData{Monitors: volOptions.Monitors, VolumeUuid: volUuid} - if err = conf.writeToFile(); err != nil { - glog.Errorf("failed to write ceph config file to %s: %v", getCephConfPath(volUuid), err) - return nil, status.Error(codes.Internal, err.Error()) - } - // Check if the volume is already mounted isMnt, err := isMountPoint(targetPath) @@ -148,36 +172,27 @@ func (ns *nodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublis } if isMnt { - glog.V(4).Infof("cephfs: volume %s is already mounted to %s", volId, targetPath) + glog.Infof("cephfs: volume %s is already bind-mounted to %s", volId, targetPath) return &csi.NodePublishVolumeResponse{}, nil } // It's not, mount now - cr, err := handleUser(volOptions, volUuid, req) - - if err != nil { - glog.Error(err) + if err = bindMount(req.GetStagingTargetPath(), req.GetTargetPath(), req.GetReadonly()); err != nil { + glog.Errorf("failed to bind-mount volume %s: %v", volId, err) return nil, status.Error(codes.Internal, err.Error()) } - m := newMounter(volOptions) - if err = m.mount(targetPath, cr, volOptions, volUuid, req.GetReadonly()); err != nil { - glog.Errorf("failed to mount volume %s: %v", volId, err) - return nil, status.Error(codes.Internal, err.Error()) - } - - glog.V(4).Infof("cephfs: volume %s successfuly mounted to %s", volId, targetPath) + glog.Infof("cephfs: successfuly bind-mounted volume %s to %s", volId, targetPath) return &csi.NodePublishVolumeResponse{}, nil } func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) { if err := validateNodeUnpublishVolumeRequest(req); err != nil { - return nil, err + return nil, status.Error(codes.InvalidArgument, err.Error()) } - volId := req.GetVolumeId() targetPath := req.GetTargetPath() // Unmount the bind-mount @@ -185,30 +200,63 @@ func (ns *nodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpu return nil, status.Error(codes.Internal, err.Error()) } - localVolRoot := getVolumeRootPath_local(uuidFromVolumeId(volId)) + os.Remove(targetPath) - // Unmount the volume root - if err := unmountVolume(localVolRoot); err != nil { - return nil, status.Error(codes.Internal, err.Error()) - } - - os.Remove(localVolRoot) - - glog.V(4).Infof("cephfs: volume %s successfuly unmounted from %s", volId, req.GetTargetPath()) + glog.Infof("cephfs: successfuly unbinded volume %s from %s", req.GetVolumeId(), targetPath) return &csi.NodeUnpublishVolumeResponse{}, nil } -func (ns *nodeServer) NodeStageVolume( - ctx context.Context, - req *csi.NodeStageVolumeRequest) ( - *csi.NodeStageVolumeResponse, error) { - return nil, status.Error(codes.Unimplemented, "") +func (ns *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) { + if err := validateNodeUnstageVolumeRequest(req); err != nil { + return nil, status.Error(codes.InvalidArgument, err.Error()) + } + + volId := volumeID(req.GetVolumeId()) + stagingTargetPath := req.GetStagingTargetPath() + + // Unmount the volume + if err := unmountVolume(stagingTargetPath); err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + + os.Remove(stagingTargetPath) + + ent, err := nodeCache.pop(volId) + if err != nil { + glog.Error(err) + return nil, status.Error(codes.Internal, err.Error()) + } + + if ent.volOptions.ProvisionVolume { + // We've created a dedicated Ceph user in NodeStageVolume, + // it's about to be deleted here. + + if err = deleteCephUser(&credentials{id: ent.cephAdminID}, volId); err != nil { + glog.Errorf("failed to delete ceph user %s for volume %s: %v", getCephUserName(volId), volId, err) + + // Reinsert cache entry for retry + nodeCache.insert(volId, ent) + + return nil, status.Error(codes.Internal, err.Error()) + } + } + + glog.Infof("cephfs: successfuly umounted volume %s from %s", req.GetVolumeId(), stagingTargetPath) + + return &csi.NodeUnstageVolumeResponse{}, nil } -func (ns *nodeServer) NodeUnstageVolume( - ctx context.Context, - req *csi.NodeUnstageVolumeRequest) ( - *csi.NodeUnstageVolumeResponse, error) { - return nil, status.Error(codes.Unimplemented, "") +func (ns *nodeServer) NodeGetCapabilities(ctx context.Context, req *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) { + return &csi.NodeGetCapabilitiesResponse{ + Capabilities: []*csi.NodeServiceCapability{ + { + Type: &csi.NodeServiceCapability_Rpc{ + Rpc: &csi.NodeServiceCapability_RPC{ + Type: csi.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME, + }, + }, + }, + }, + }, nil } diff --git a/pkg/cephfs/util.go b/pkg/cephfs/util.go index 96aa56c36..5e3102313 100644 --- a/pkg/cephfs/util.go +++ b/pkg/cephfs/util.go @@ -26,17 +26,25 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" - "k8s.io/kubernetes/pkg/util/keymutex" + "github.com/container-storage-interface/spec/lib/go/csi/v0" + "github.com/pborman/uuid" "k8s.io/kubernetes/pkg/util/mount" ) +type volumeID string + +func newVolumeID() volumeID { + return volumeID("csi-cephfs-" + uuid.NewUUID().String()) +} + func execCommand(command string, args ...string) ([]byte, error) { + glog.V(4).Infof("cephfs: EXEC %s %s", command, args) + cmd := exec.Command(command, args...) return cmd.CombinedOutput() } func execCommandAndValidate(program string, args ...string) error { - glog.V(4).Infof("cephfs: executing command: %s with args: %s", program, args) out, err := execCommand(program, args...) if err != nil { return fmt.Errorf("cephfs: %s failed with following error: %s\ncephfs: %s output: %s", program, err, program, out) @@ -65,49 +73,21 @@ func isMountPoint(p string) (bool, error) { return !notMnt, nil } -func tryLock(id string, mtx keymutex.KeyMutex, name string) error { - // TODO uncomment this once TryLockKey gets into Kubernetes - /* - if !mtx.TryLockKey(id) { - msg := fmt.Sprintf("%s has a pending operation on %s", name, req.GetVolumeId()) - glog.Infoln(msg) - - return status.Error(codes.Aborted, msg) - } - */ - - return nil -} - -func storeCephUserCredentials(volUuid string, cr *credentials, volOptions *volumeOptions) error { +func storeCephCredentials(volId volumeID, cr *credentials) error { keyringData := cephKeyringData{ - UserId: cr.id, - Key: cr.key, - RootPath: volOptions.RootPath, - VolumeUuid: volUuid, + UserId: cr.id, + Key: cr.key, + VolumeID: volId, } - if volOptions.ProvisionVolume { - keyringData.Pool = volOptions.Pool - keyringData.Namespace = getVolumeNamespace(volUuid) - } - - return storeCephCredentials(volUuid, cr, &keyringData) -} - -func storeCephAdminCredentials(volUuid string, cr *credentials) error { - return storeCephCredentials(volUuid, cr, &cephFullCapsKeyringData{UserId: cr.id, Key: cr.key, VolumeUuid: volUuid}) -} - -func storeCephCredentials(volUuid string, cr *credentials, keyringData cephConfigWriter) error { if err := keyringData.writeToFile(); err != nil { return err } secret := cephSecretData{ - UserId: cr.id, - Key: cr.key, - VolumeUuid: volUuid, + UserId: cr.id, + Key: cr.key, + VolumeID: volId, } if err := secret.writeToFile(); err != nil { @@ -124,8 +104,6 @@ func newMounter(volOptions *volumeOptions) volumeMounter { mounter = DefaultVolumeMounter } - glog.V(4).Infof("cephfs: setting volume mounter to: %s", mounter) - switch mounter { case volumeMounter_fuse: return &fuseMounter{} @@ -135,3 +113,95 @@ func newMounter(volOptions *volumeOptions) volumeMounter { return nil } + +// +// Controller service request validation +// + +func (cs *controllerServer) validateCreateVolumeRequest(req *csi.CreateVolumeRequest) error { + if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil { + return fmt.Errorf("invalid CreateVolumeRequest: %v", err) + } + + if req.GetName() == "" { + return status.Error(codes.InvalidArgument, "Volume Name cannot be empty") + } + + if req.GetVolumeCapabilities() == nil { + return status.Error(codes.InvalidArgument, "Volume Capabilities cannot be empty") + } + + return nil +} + +func (cs *controllerServer) validateDeleteVolumeRequest(req *csi.DeleteVolumeRequest) error { + if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil { + return fmt.Errorf("invalid DeleteVolumeRequest: %v", err) + } + + return nil +} + +// +// Node service request validation +// + +func validateNodeStageVolumeRequest(req *csi.NodeStageVolumeRequest) error { + if req.GetVolumeCapability() == nil { + return fmt.Errorf("volume capability missing in request") + } + + if req.GetVolumeId() == "" { + return fmt.Errorf("volume ID missing in request") + } + + if req.GetStagingTargetPath() == "" { + return fmt.Errorf("staging target path missing in request") + } + + if req.GetNodeStageSecrets() == nil || len(req.GetNodeStageSecrets()) == 0 { + return fmt.Errorf("stage secrets cannot be nil or empty") + } + + return nil +} + +func validateNodeUnstageVolumeRequest(req *csi.NodeUnstageVolumeRequest) error { + if req.GetVolumeId() == "" { + return fmt.Errorf("volume ID missing in request") + } + + if req.GetStagingTargetPath() == "" { + return fmt.Errorf("staging target path missing in request") + } + + return nil +} + +func validateNodePublishVolumeRequest(req *csi.NodePublishVolumeRequest) error { + if req.GetVolumeCapability() == nil { + return fmt.Errorf("volume capability missing in request") + } + + if req.GetVolumeId() == "" { + return fmt.Errorf("volume ID missing in request") + } + + if req.GetTargetPath() == "" { + return fmt.Errorf("varget path missing in request") + } + + return nil +} + +func validateNodeUnpublishVolumeRequest(req *csi.NodeUnpublishVolumeRequest) error { + if req.GetVolumeId() == "" { + return fmt.Errorf("volume ID missing in request") + } + + if req.GetTargetPath() == "" { + return fmt.Errorf("target path missing in request") + } + + return nil +} diff --git a/pkg/cephfs/volume.go b/pkg/cephfs/volume.go index 7e0f1e379..7c531055a 100644 --- a/pkg/cephfs/volume.go +++ b/pkg/cephfs/volume.go @@ -22,44 +22,35 @@ import ( "path" ) -// Volumes are mounted in .../controller/volumes/vol-{UUID} -// The actual user data resides in .../vol-{UUID}/volume-data -// purgeVolume moves the user data to .../vol-{UUID}/volume-deleting and only then calls os.RemoveAll - const ( - cephRootPrefix = PluginFolder + "/controller/volumes/root-" - cephVolumePrefix = PluginFolder + "/controller/volumes/vol-" - cephVolumesRoot = "csi-volumes" + cephRootPrefix = PluginFolder + "/controller/volumes/root-" + cephVolumesRoot = "csi-volumes" - namespacePrefix = "csi-ns-" + namespacePrefix = "ns-" ) -func getCephRootPath_local(volUuid string) string { - return cephRootPrefix + volUuid +func getCephRootPath_local(volId volumeID) string { + return cephRootPrefix + string(volId) } -func getCephRootVolumePath_local(volUuid string) string { - return path.Join(getCephRootPath_local(volUuid), cephVolumesRoot, volUuid) +func getCephRootVolumePath_local(volId volumeID) string { + return path.Join(getCephRootPath_local(volId), cephVolumesRoot, string(volId)) } -func getVolumeRootPath_local(volUuid string) string { - return cephVolumePrefix + volUuid +func getVolumeRootPath_ceph(volId volumeID) string { + return path.Join("/", cephVolumesRoot, string(volId)) } -func getVolumeRootPath_ceph(volUuid string) string { - return path.Join("/", cephVolumesRoot, volUuid) -} - -func getVolumeNamespace(volUuid string) string { - return namespacePrefix + volUuid +func getVolumeNamespace(volId volumeID) string { + return namespacePrefix + string(volId) } func setVolumeAttribute(root, attrName, attrValue string) error { return execCommandAndValidate("setfattr", "-n", attrName, "-v", attrValue, root) } -func createVolume(volOptions *volumeOptions, adminCr *credentials, volUuid string, bytesQuota int64) error { - cephRoot := getCephRootPath_local(volUuid) +func createVolume(volOptions *volumeOptions, adminCr *credentials, volId volumeID, bytesQuota int64) error { + cephRoot := getCephRootPath_local(volId) if err := createMountPoint(cephRoot); err != nil { return err @@ -69,7 +60,7 @@ func createVolume(volOptions *volumeOptions, adminCr *credentials, volUuid strin // Access to cephfs's / is required volOptions.RootPath = "/" - if err := mountKernel(cephRoot, adminCr, volOptions, volUuid); err != nil { + if err := mountKernel(cephRoot, adminCr, volOptions, volId); err != nil { return fmt.Errorf("error mounting ceph root: %v", err) } @@ -78,8 +69,8 @@ func createVolume(volOptions *volumeOptions, adminCr *credentials, volUuid strin os.Remove(cephRoot) }() - volOptions.RootPath = getVolumeRootPath_ceph(volUuid) - localVolRoot := getCephRootVolumePath_local(volUuid) + volOptions.RootPath = getVolumeRootPath_ceph(volId) + localVolRoot := getCephRootVolumePath_local(volId) if err := createMountPoint(localVolRoot); err != nil { return err @@ -93,21 +84,20 @@ func createVolume(volOptions *volumeOptions, adminCr *credentials, volUuid strin return fmt.Errorf("%v\ncephfs: Does pool '%s' exist?", err, volOptions.Pool) } - if err := setVolumeAttribute(localVolRoot, "ceph.dir.layout.pool_namespace", getVolumeNamespace(volUuid)); err != nil { + if err := setVolumeAttribute(localVolRoot, "ceph.dir.layout.pool_namespace", getVolumeNamespace(volId)); err != nil { return err } return nil } -func purgeVolume(volId string, cr *credentials, volOptions *volumeOptions) error { +func purgeVolume(volId volumeID, cr *credentials, volOptions *volumeOptions) error { // Root path is not set for dynamically provisioned volumes volOptions.RootPath = "/" var ( - volUuid = uuidFromVolumeId(volId) - root = getCephRootPath_local(volUuid) - volRoot = getCephRootVolumePath_local(volUuid) + root = getCephRootPath_local(volId) + volRoot = getCephRootVolumePath_local(volId) volRootDeleting = volRoot + "-deleting" ) @@ -115,7 +105,7 @@ func purgeVolume(volId string, cr *credentials, volOptions *volumeOptions) error return err } - if err := mountKernel(root, cr, volOptions, volUuid); err != nil { + if err := mountKernel(root, cr, volOptions, volId); err != nil { return err } diff --git a/pkg/cephfs/volumecache.go b/pkg/cephfs/volumecache.go deleted file mode 100644 index 475cb6471..000000000 --- a/pkg/cephfs/volumecache.go +++ /dev/null @@ -1,131 +0,0 @@ -/* -Copyright 2018 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package cephfs - -import ( - "encoding/json" - "fmt" - "io/ioutil" - "os" - "path" - "strings" - "sync" - - "github.com/golang/glog" -) - -const ( - volumeCacheRoot = PluginFolder + "/controller/volume-cache" -) - -type volumeCacheEntry struct { - VolOptions volumeOptions - Identifier volumeIdentifier -} - -type volumeCache struct { - entries map[string]*volumeCacheEntry -} - -var ( - volCache volumeCache - volCacheMtx sync.RWMutex -) - -// Loads all .json files from volumeCacheRoot into volCache -// Called from driver.go's Run() -func loadVolumeCache() error { - cacheDir, err := ioutil.ReadDir(volumeCacheRoot) - if err != nil { - return fmt.Errorf("cannot read volume cache: %v", err) - } - - volCacheMtx.Lock() - defer volCacheMtx.Unlock() - - volCache.entries = make(map[string]*volumeCacheEntry) - - for _, fi := range cacheDir { - if !strings.HasSuffix(fi.Name(), ".json") || !fi.Mode().IsRegular() { - continue - } - - f, err := os.Open(path.Join(volumeCacheRoot, fi.Name())) - if err != nil { - glog.Errorf("cephfs: couldn't read '%s' from volume cache: %v", fi.Name(), err) - continue - } - - d := json.NewDecoder(f) - ent := &volumeCacheEntry{} - - if err = d.Decode(ent); err != nil { - glog.Errorf("cephfs: failed to parse '%s': %v", fi.Name(), err) - } else { - volCache.entries[ent.Identifier.uuid] = ent - } - - f.Close() - } - - return nil -} - -func getVolumeCacheEntryPath(volUuid string) string { - return path.Join(volumeCacheRoot, fmt.Sprintf("vol-%s.json", volUuid)) -} - -func (vc *volumeCache) insert(ent *volumeCacheEntry) error { - filePath := getVolumeCacheEntryPath(ent.Identifier.uuid) - - volCacheMtx.Lock() - defer volCacheMtx.Unlock() - - f, err := os.Create(filePath) - if err != nil { - return fmt.Errorf("couldn't create cache entry file %s: %v", filePath, err) - } - defer f.Close() - - e := json.NewEncoder(f) - if err = e.Encode(ent); err != nil { - return fmt.Errorf("failed to encode cache entry for volume %s: %v", ent.Identifier.id, err) - } - - vc.entries[ent.Identifier.uuid] = ent - - return nil -} - -func (vc *volumeCache) erase(volUuid string) error { - volCacheMtx.Lock() - delete(vc.entries, volUuid) - volCacheMtx.Unlock() - - return os.Remove(getVolumeCacheEntryPath(volUuid)) -} - -func (vc *volumeCache) get(volUuid string) (volumeCacheEntry, bool) { - volCacheMtx.RLock() - defer volCacheMtx.RUnlock() - - if ent, ok := vc.entries[volUuid]; ok { - return *ent, true - } else { - return volumeCacheEntry{}, false - } -} diff --git a/pkg/cephfs/volumeidentifier.go b/pkg/cephfs/volumeidentifier.go deleted file mode 100644 index 3380e4030..000000000 --- a/pkg/cephfs/volumeidentifier.go +++ /dev/null @@ -1,60 +0,0 @@ -/* -Copyright 2018 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package cephfs - -import ( - "github.com/container-storage-interface/spec/lib/go/csi/v0" - "github.com/pborman/uuid" -) - -const ( - dynamicallyProvisionedVolumePrefix = "csi-cephfs-dyn-" - staticallyProvisionedVolumePrefix = "csi-cephfs-sta-" - volumePrefixLen = len(dynamicallyProvisionedVolumePrefix) -) - -type volumeIdentifier struct { - name, uuid, id string -} - -func newVolumeIdentifier(volOptions *volumeOptions, req *csi.CreateVolumeRequest) *volumeIdentifier { - volId := volumeIdentifier{ - name: req.GetName(), - uuid: uuid.NewUUID().String(), - } - - prefix := staticallyProvisionedVolumePrefix - if volOptions.ProvisionVolume { - prefix = dynamicallyProvisionedVolumePrefix - } - - volId.id = prefix + volId.uuid - - return &volId -} - -func uuidFromVolumeId(volId string) string { - return volId[volumePrefixLen:] -} - -func isDynProvision(volId string) bool { - if len(volId) <= volumePrefixLen { - return false - } - - return volId[:volumePrefixLen] == dynamicallyProvisionedVolumePrefix -} diff --git a/pkg/cephfs/volumemounter.go b/pkg/cephfs/volumemounter.go index 055690371..7cb5263b6 100644 --- a/pkg/cephfs/volumemounter.go +++ b/pkg/cephfs/volumemounter.go @@ -28,17 +28,18 @@ const ( ) type volumeMounter interface { - mount(mountPoint string, cr *credentials, volOptions *volumeOptions, volUuid string, readOnly bool) error + mount(mountPoint string, cr *credentials, volOptions *volumeOptions, volId volumeID) error + name() string } type fuseMounter struct{} -func mountFuse(mountPoint string, cr *credentials, volOptions *volumeOptions, volUuid string) error { +func mountFuse(mountPoint string, cr *credentials, volOptions *volumeOptions, volId volumeID) error { args := [...]string{ mountPoint, - "-c", getCephConfPath(volUuid), + "-c", getCephConfPath(volId), "-n", cephEntityClientPrefix + cr.id, - "--keyring", getCephKeyringPath(volUuid, cr.id), + "--keyring", getCephKeyringPath(volId, cr.id), "-r", volOptions.RootPath, "-o", "nonempty", } @@ -55,27 +56,19 @@ func mountFuse(mountPoint string, cr *credentials, volOptions *volumeOptions, vo return nil } -func (m *fuseMounter) mount(mountPoint string, cr *credentials, volOptions *volumeOptions, volUuid string, readOnly bool) error { +func (m *fuseMounter) mount(mountPoint string, cr *credentials, volOptions *volumeOptions, volId volumeID) error { if err := createMountPoint(mountPoint); err != nil { return err } - localVolRoot := getVolumeRootPath_local(volUuid) - - if err := createMountPoint(localVolRoot); err != nil { - return err - } - - if err := mountFuse(localVolRoot, cr, volOptions, volUuid); err != nil { - return err - } - - return bindVolume(volUuid, mountPoint, readOnly) + return mountFuse(mountPoint, cr, volOptions, volId) } +func (m *fuseMounter) name() string { return "Ceph FUSE driver" } + type kernelMounter struct{} -func mountKernel(mountPoint string, cr *credentials, volOptions *volumeOptions, volUuid string) error { +func mountKernel(mountPoint string, cr *credentials, volOptions *volumeOptions, volId volumeID) error { if err := execCommandAndValidate("modprobe", "ceph"); err != nil { return err } @@ -85,52 +78,34 @@ func mountKernel(mountPoint string, cr *credentials, volOptions *volumeOptions, fmt.Sprintf("%s:%s", volOptions.Monitors, volOptions.RootPath), mountPoint, "-o", - fmt.Sprintf("name=%s,secretfile=%s", cr.id, getCephSecretPath(volUuid, cr.id)), + fmt.Sprintf("name=%s,secretfile=%s", cr.id, getCephSecretPath(volId, cr.id)), ) } -func (m *kernelMounter) mount(mountPoint string, cr *credentials, volOptions *volumeOptions, volUuid string, readOnly bool) error { +func (m *kernelMounter) mount(mountPoint string, cr *credentials, volOptions *volumeOptions, volId volumeID) error { if err := createMountPoint(mountPoint); err != nil { return err } - localVolRoot := getVolumeRootPath_local(volUuid) - - if err := createMountPoint(localVolRoot); err != nil { - return err - } - - if err := mountKernel(localVolRoot, cr, volOptions, volUuid); err != nil { - return err - } - - return bindVolume(volUuid, mountPoint, readOnly) + return mountKernel(mountPoint, cr, volOptions, volId) } +func (m *kernelMounter) name() string { return "Ceph kernel client" } + func bindMount(from, to string, readOnly bool) error { if err := execCommandAndValidate("mount", "--bind", from, to); err != nil { - return fmt.Errorf("failed bind-mount of %s to %s: %v", from, to, err) + return fmt.Errorf("failed to bind-mount %s to %s: %v", from, to, err) } if readOnly { if err := execCommandAndValidate("mount", "-o", "remount,ro,bind", to); err != nil { - return err + return fmt.Errorf("failed read-only remount of %s: %v", to, err) } } return nil } -func bindVolume(volUuid, target string, readOnly bool) error { - volDataRoot := getVolumeRootPath_local(volUuid) - - if err := createMountPoint(volDataRoot); err != nil { - return err - } - - return bindMount(volDataRoot, target, readOnly) -} - func unmountVolume(mountPoint string) error { return execCommandAndValidate("umount", mountPoint) }