Merge pull request #71 from gman0/cephfs-ceph-user

csi-cephfs now creates/deletes dedicated ceph user in CreateVolume/DeleteVolume
This commit is contained in:
Huamin Chen 2018-09-04 09:35:06 -04:00 committed by GitHub
commit 06e6fed9ec
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 68 additions and 111 deletions

View File

@ -17,6 +17,8 @@ limitations under the License.
package cephfs package cephfs
import ( import (
"bytes"
"encoding/json"
"fmt" "fmt"
"os" "os"
) )
@ -38,26 +40,47 @@ type cephEntity struct {
Caps cephEntityCaps `json:"caps"` Caps cephEntityCaps `json:"caps"`
} }
func (ent *cephEntity) toCredentials() *credentials {
return &credentials{
id: ent.Entity[len(cephEntityClientPrefix):],
key: ent.Key,
}
}
func getCephUserName(volId volumeID) string { func getCephUserName(volId volumeID) string {
return cephUserPrefix + string(volId) return cephUserPrefix + string(volId)
} }
func getCephUser(userId string) (*cephEntity, error) { func getCephUser(adminCr *credentials, volId volumeID) (*cephEntity, error) {
entityName := cephEntityClientPrefix + userId entityName := cephEntityClientPrefix + getCephUserName(volId)
var ents []cephEntity
if err := execCommandJson(&ents, "ceph", "auth", "get", entityName); err != nil { var ents []cephEntity
return nil, err args := [...]string{
"auth", "-f", "json", "-c", getCephConfPath(volId), "-n", cephEntityClientPrefix + adminCr.id,
"get", entityName,
}
out, err := execCommand("ceph", args[:]...)
if err != nil {
return nil, fmt.Errorf("cephfs: ceph failed with following error: %s\ncephfs: ceph output: %s", err, out)
}
// Workaround for output from `ceph auth get`
// Contains non-json data: "exported keyring for ENTITY\n\n"
offset := bytes.Index(out, []byte("[{"))
if json.NewDecoder(bytes.NewReader(out[offset:])).Decode(&ents); err != nil {
return nil, fmt.Errorf("failed to decode json: %v", err)
} }
if len(ents) != 1 { if len(ents) != 1 {
return nil, fmt.Errorf("error retrieving entity %s", entityName) return nil, fmt.Errorf("got unexpected number of entities for %s: expected 1, got %d", entityName, len(ents))
} }
return &ents[0], nil return &ents[0], nil
} }
func createCephUser(volOptions *volumeOptions, cr *credentials, volId volumeID) (*cephEntity, error) { func createCephUser(volOptions *volumeOptions, adminCr *credentials, volId volumeID) (*cephEntity, error) {
caps := cephEntityCaps{ caps := cephEntityCaps{
Mds: fmt.Sprintf("allow rw path=%s", getVolumeRootPath_ceph(volId)), Mds: fmt.Sprintf("allow rw path=%s", getVolumeRootPath_ceph(volId)),
Mon: "allow r", Mon: "allow r",
@ -66,7 +89,7 @@ func createCephUser(volOptions *volumeOptions, cr *credentials, volId volumeID)
var ents []cephEntity var ents []cephEntity
args := [...]string{ args := [...]string{
"auth", "-f", "json", "-c", getCephConfPath(volId), "-n", cephEntityClientPrefix + cr.id, "auth", "-f", "json", "-c", getCephConfPath(volId), "-n", cephEntityClientPrefix + adminCr.id,
"get-or-create", cephEntityClientPrefix + getCephUserName(volId), "get-or-create", cephEntityClientPrefix + getCephUserName(volId),
"mds", caps.Mds, "mds", caps.Mds,
"mon", caps.Mon, "mon", caps.Mon,

View File

@ -76,6 +76,11 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
return nil, status.Error(codes.Internal, err.Error()) return nil, status.Error(codes.Internal, err.Error())
} }
if _, err = createCephUser(volOptions, cr, volId); err != nil {
glog.Errorf("failed to create ceph user for volume %s: %v", req.GetName(), err)
return nil, status.Error(codes.Internal, err.Error())
}
glog.Infof("cephfs: successfuly created volume %s", volId) glog.Infof("cephfs: successfuly created volume %s", volId)
} else { } else {
glog.Infof("cephfs: volume %s is provisioned statically", volId) glog.Infof("cephfs: volume %s is provisioned statically", volId)
@ -148,6 +153,11 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
return nil, status.Error(codes.Internal, err.Error()) return nil, status.Error(codes.Internal, err.Error())
} }
if err = deleteCephUser(cr, volId); err != nil {
glog.Errorf("failed to delete ceph user for volume %s: %v", volId, err)
return nil, status.Error(codes.Internal, err.Error())
}
glog.Infof("cephfs: successfuly deleted volume %s", volId) glog.Infof("cephfs: successfuly deleted volume %s", volId)
return &csi.DeleteVolumeResponse{}, nil return &csi.DeleteVolumeResponse{}, nil

View File

@ -1,55 +0,0 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cephfs
import (
"fmt"
"sync"
)
type nodeCacheEntry struct {
volOptions *volumeOptions
cephAdminID string
}
type nodeCacheMap map[volumeID]*nodeCacheEntry
var (
nodeCache = make(nodeCacheMap)
nodeCacheMtx sync.Mutex
)
func (m nodeCacheMap) insert(volId volumeID, ent *nodeCacheEntry) {
nodeCacheMtx.Lock()
defer nodeCacheMtx.Unlock()
m[volId] = ent
}
func (m nodeCacheMap) pop(volId volumeID) (*nodeCacheEntry, error) {
nodeCacheMtx.Lock()
defer nodeCacheMtx.Unlock()
ent, ok := m[volId]
if !ok {
return nil, fmt.Errorf("node cache entry for volume %s not found", volId)
}
delete(m, volId)
return ent, nil
}

View File

@ -18,6 +18,7 @@ package cephfs
import ( import (
"context" "context"
"fmt"
"os" "os"
"github.com/golang/glog" "github.com/golang/glog"
@ -32,54 +33,45 @@ type nodeServer struct {
*csicommon.DefaultNodeServer *csicommon.DefaultNodeServer
} }
func getOrCreateUser(volOptions *volumeOptions, volId volumeID, req *csi.NodeStageVolumeRequest) (*credentials, error) { func getCredentialsForVolume(volOptions *volumeOptions, volId volumeID, req *csi.NodeStageVolumeRequest) (*credentials, error) {
var ( var (
userCr = &credentials{} userCr = &credentials{}
err error err error
) )
// Retrieve the credentials (possibly create a new user as well)
if volOptions.ProvisionVolume { if volOptions.ProvisionVolume {
// The volume is provisioned dynamically, create a dedicated user // The volume is provisioned dynamically, get the credentials directly from Ceph
// First, store admin credentials - those are needed for creating a user // First, store admin credentials - those are needed for retrieving the user credentials
adminCr, err := getAdminCredentials(req.GetNodeStageSecrets()) adminCr, err := getAdminCredentials(req.GetNodeStageSecrets())
if err != nil { if err != nil {
return nil, err return nil, fmt.Errorf("failed to get admin credentials from node stage secrets: %v", err)
} }
if err = storeCephCredentials(volId, adminCr); err != nil { if err = storeCephCredentials(volId, adminCr); err != nil {
return nil, err return nil, fmt.Errorf("failed to store ceph admin credentials: %v", err)
} }
nodeCache.insert(volId, &nodeCacheEntry{volOptions: volOptions, cephAdminID: adminCr.id}) // Then get the ceph user
// Then create the user entity, err := getCephUser(adminCr, volId)
if err != nil {
if ent, err := createCephUser(volOptions, adminCr, volId); err != nil { return nil, fmt.Errorf("failed to get ceph user: %v", err)
return nil, err
} else {
userCr.id = ent.Entity[len(cephEntityClientPrefix):]
userCr.key = ent.Key
} }
// Set the correct volume root path userCr = entity.toCredentials()
volOptions.RootPath = getVolumeRootPath_ceph(volId)
} else { } else {
// The volume is pre-made, credentials are supplied by the user // The volume is pre-made, credentials are in node stage secrets
userCr, err = getUserCredentials(req.GetNodeStageSecrets()) userCr, err = getUserCredentials(req.GetNodeStageSecrets())
if err != nil { if err != nil {
return nil, err return nil, fmt.Errorf("failed to get user credentials from node stage secrets: %v", err)
} }
nodeCache.insert(volId, &nodeCacheEntry{volOptions: volOptions})
} }
if err = storeCephCredentials(volId, userCr); err != nil { if err = storeCephCredentials(volId, userCr); err != nil {
return nil, err return nil, fmt.Errorf("failed to store ceph user credentials: %v", err)
} }
return userCr, nil return userCr, nil
@ -101,6 +93,11 @@ func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
return nil, status.Error(codes.InvalidArgument, err.Error()) return nil, status.Error(codes.InvalidArgument, err.Error())
} }
if volOptions.ProvisionVolume {
// Dynamically provisioned volumes don't have their root path set, do it here
volOptions.RootPath = getVolumeRootPath_ceph(volId)
}
if err = createMountPoint(stagingTargetPath); err != nil { if err = createMountPoint(stagingTargetPath); err != nil {
glog.Errorf("failed to create staging mount point at %s for volume %s: %v", stagingTargetPath, volId, err) glog.Errorf("failed to create staging mount point at %s for volume %s: %v", stagingTargetPath, volId, err)
return nil, status.Error(codes.Internal, err.Error()) return nil, status.Error(codes.Internal, err.Error())
@ -128,9 +125,9 @@ func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
// It's not, mount now // It's not, mount now
cr, err := getOrCreateUser(volOptions, volId, req) cr, err := getCredentialsForVolume(volOptions, volId, req)
if err != nil { if err != nil {
glog.Error(err) glog.Errorf("failed to get ceph credentials for volume %s: %v", volId, err)
return nil, status.Error(codes.Internal, err.Error()) return nil, status.Error(codes.Internal, err.Error())
} }
@ -140,6 +137,7 @@ func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
} }
glog.V(4).Infof("cephfs: mounting volume %s with %s", volId, m.name()) glog.V(4).Infof("cephfs: mounting volume %s with %s", volId, m.name())
if err = m.mount(stagingTargetPath, cr, volOptions, volId); err != nil { if err = m.mount(stagingTargetPath, cr, volOptions, volId); err != nil {
glog.Errorf("failed to mount volume %s: %v", volId, err) glog.Errorf("failed to mount volume %s: %v", volId, err)
return nil, status.Error(codes.Internal, err.Error()) return nil, status.Error(codes.Internal, err.Error())
@ -215,7 +213,6 @@ func (ns *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag
return nil, status.Error(codes.InvalidArgument, err.Error()) return nil, status.Error(codes.InvalidArgument, err.Error())
} }
volId := volumeID(req.GetVolumeId())
stagingTargetPath := req.GetStagingTargetPath() stagingTargetPath := req.GetStagingTargetPath()
// Unmount the volume // Unmount the volume
@ -225,26 +222,6 @@ func (ns *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag
os.Remove(stagingTargetPath) os.Remove(stagingTargetPath)
ent, err := nodeCache.pop(volId)
if err != nil {
glog.Error(err)
return nil, status.Error(codes.Internal, err.Error())
}
if ent.volOptions.ProvisionVolume {
// We've created a dedicated Ceph user in NodeStageVolume,
// it's about to be deleted here.
if err = deleteCephUser(&credentials{id: ent.cephAdminID}, volId); err != nil {
glog.Errorf("failed to delete ceph user %s for volume %s: %v", getCephUserName(volId), volId, err)
// Reinsert cache entry for retry
nodeCache.insert(volId, ent)
return nil, status.Error(codes.Internal, err.Error())
}
}
glog.Infof("cephfs: successfuly umounted volume %s from %s", req.GetVolumeId(), stagingTargetPath) glog.Infof("cephfs: successfuly umounted volume %s from %s", req.GetVolumeId(), stagingTargetPath)
return &csi.NodeUnstageVolumeResponse{}, nil return &csi.NodeUnstageVolumeResponse{}, nil

View File

@ -54,8 +54,7 @@ func execCommandAndValidate(program string, args ...string) error {
} }
func execCommandJson(v interface{}, program string, args ...string) error { func execCommandJson(v interface{}, program string, args ...string) error {
cmd := exec.Command(program, args...) out, err := execCommand(program, args...)
out, err := cmd.CombinedOutput()
if err != nil { if err != nil {
return fmt.Errorf("cephfs: %s failed with following error: %s\ncephfs: %s output: %s", program, err, program, out) return fmt.Errorf("cephfs: %s failed with following error: %s\ncephfs: %s output: %s", program, err, program, out)
@ -64,8 +63,11 @@ func execCommandJson(v interface{}, program string, args ...string) error {
return json.NewDecoder(bytes.NewReader(out)).Decode(v) return json.NewDecoder(bytes.NewReader(out)).Decode(v)
} }
// Used in isMountPoint()
var dummyMount = mount.New("")
func isMountPoint(p string) (bool, error) { func isMountPoint(p string) (bool, error) {
notMnt, err := mount.New("").IsLikelyNotMountPoint(p) notMnt, err := dummyMount.IsLikelyNotMountPoint(p)
if err != nil { if err != nil {
return false, status.Error(codes.Internal, err.Error()) return false, status.Error(codes.Internal, err.Error())
} }