diff --git a/pkg/cephfs/cephuser.go b/pkg/cephfs/cephuser.go index 1f1167b71..9acf183fd 100644 --- a/pkg/cephfs/cephuser.go +++ b/pkg/cephfs/cephuser.go @@ -17,6 +17,8 @@ limitations under the License. package cephfs import ( + "bytes" + "encoding/json" "fmt" "os" ) @@ -38,26 +40,47 @@ type cephEntity struct { Caps cephEntityCaps `json:"caps"` } +func (ent *cephEntity) toCredentials() *credentials { + return &credentials{ + id: ent.Entity[len(cephEntityClientPrefix):], + key: ent.Key, + } +} + func getCephUserName(volId volumeID) string { return cephUserPrefix + string(volId) } -func getCephUser(userId string) (*cephEntity, error) { - entityName := cephEntityClientPrefix + userId - var ents []cephEntity +func getCephUser(adminCr *credentials, volId volumeID) (*cephEntity, error) { + entityName := cephEntityClientPrefix + getCephUserName(volId) - if err := execCommandJson(&ents, "ceph", "auth", "get", entityName); err != nil { - return nil, err + var ents []cephEntity + args := [...]string{ + "auth", "-f", "json", "-c", getCephConfPath(volId), "-n", cephEntityClientPrefix + adminCr.id, + "get", entityName, + } + + out, err := execCommand("ceph", args[:]...) + if err != nil { + return nil, fmt.Errorf("cephfs: ceph failed with following error: %s\ncephfs: ceph output: %s", err, out) + } + + // Workaround for output from `ceph auth get` + // Contains non-json data: "exported keyring for ENTITY\n\n" + offset := bytes.Index(out, []byte("[{")) + + if json.NewDecoder(bytes.NewReader(out[offset:])).Decode(&ents); err != nil { + return nil, fmt.Errorf("failed to decode json: %v", err) } if len(ents) != 1 { - return nil, fmt.Errorf("error retrieving entity %s", entityName) + return nil, fmt.Errorf("got unexpected number of entities for %s: expected 1, got %d", entityName, len(ents)) } return &ents[0], nil } -func createCephUser(volOptions *volumeOptions, cr *credentials, volId volumeID) (*cephEntity, error) { +func createCephUser(volOptions *volumeOptions, adminCr *credentials, volId volumeID) (*cephEntity, error) { caps := cephEntityCaps{ Mds: fmt.Sprintf("allow rw path=%s", getVolumeRootPath_ceph(volId)), Mon: "allow r", @@ -66,7 +89,7 @@ func createCephUser(volOptions *volumeOptions, cr *credentials, volId volumeID) var ents []cephEntity args := [...]string{ - "auth", "-f", "json", "-c", getCephConfPath(volId), "-n", cephEntityClientPrefix + cr.id, + "auth", "-f", "json", "-c", getCephConfPath(volId), "-n", cephEntityClientPrefix + adminCr.id, "get-or-create", cephEntityClientPrefix + getCephUserName(volId), "mds", caps.Mds, "mon", caps.Mon, diff --git a/pkg/cephfs/controllerserver.go b/pkg/cephfs/controllerserver.go index 144e8668f..eab0b1ca5 100644 --- a/pkg/cephfs/controllerserver.go +++ b/pkg/cephfs/controllerserver.go @@ -76,6 +76,11 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol return nil, status.Error(codes.Internal, err.Error()) } + if _, err = createCephUser(volOptions, cr, volId); err != nil { + glog.Errorf("failed to create ceph user for volume %s: %v", req.GetName(), err) + return nil, status.Error(codes.Internal, err.Error()) + } + glog.Infof("cephfs: successfuly created volume %s", volId) } else { glog.Infof("cephfs: volume %s is provisioned statically", volId) @@ -148,6 +153,11 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol return nil, status.Error(codes.Internal, err.Error()) } + if err = deleteCephUser(cr, volId); err != nil { + glog.Errorf("failed to delete ceph user for volume %s: %v", volId, err) + return nil, status.Error(codes.Internal, err.Error()) + } + glog.Infof("cephfs: successfuly deleted volume %s", volId) return &csi.DeleteVolumeResponse{}, nil diff --git a/pkg/cephfs/nodecache.go b/pkg/cephfs/nodecache.go deleted file mode 100644 index e05b7d1b3..000000000 --- a/pkg/cephfs/nodecache.go +++ /dev/null @@ -1,55 +0,0 @@ -/* -Copyright 2018 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package cephfs - -import ( - "fmt" - "sync" -) - -type nodeCacheEntry struct { - volOptions *volumeOptions - cephAdminID string -} - -type nodeCacheMap map[volumeID]*nodeCacheEntry - -var ( - nodeCache = make(nodeCacheMap) - nodeCacheMtx sync.Mutex -) - -func (m nodeCacheMap) insert(volId volumeID, ent *nodeCacheEntry) { - nodeCacheMtx.Lock() - defer nodeCacheMtx.Unlock() - - m[volId] = ent -} - -func (m nodeCacheMap) pop(volId volumeID) (*nodeCacheEntry, error) { - nodeCacheMtx.Lock() - defer nodeCacheMtx.Unlock() - - ent, ok := m[volId] - if !ok { - return nil, fmt.Errorf("node cache entry for volume %s not found", volId) - } - - delete(m, volId) - - return ent, nil -} diff --git a/pkg/cephfs/nodeserver.go b/pkg/cephfs/nodeserver.go index 2064634ab..84b428db3 100644 --- a/pkg/cephfs/nodeserver.go +++ b/pkg/cephfs/nodeserver.go @@ -18,6 +18,7 @@ package cephfs import ( "context" + "fmt" "os" "github.com/golang/glog" @@ -32,54 +33,45 @@ type nodeServer struct { *csicommon.DefaultNodeServer } -func getOrCreateUser(volOptions *volumeOptions, volId volumeID, req *csi.NodeStageVolumeRequest) (*credentials, error) { +func getCredentialsForVolume(volOptions *volumeOptions, volId volumeID, req *csi.NodeStageVolumeRequest) (*credentials, error) { var ( userCr = &credentials{} err error ) - // Retrieve the credentials (possibly create a new user as well) - if volOptions.ProvisionVolume { - // The volume is provisioned dynamically, create a dedicated user + // The volume is provisioned dynamically, get the credentials directly from Ceph - // First, store admin credentials - those are needed for creating a user + // First, store admin credentials - those are needed for retrieving the user credentials adminCr, err := getAdminCredentials(req.GetNodeStageSecrets()) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to get admin credentials from node stage secrets: %v", err) } if err = storeCephCredentials(volId, adminCr); err != nil { - return nil, err + return nil, fmt.Errorf("failed to store ceph admin credentials: %v", err) } - nodeCache.insert(volId, &nodeCacheEntry{volOptions: volOptions, cephAdminID: adminCr.id}) + // Then get the ceph user - // Then create the user - - if ent, err := createCephUser(volOptions, adminCr, volId); err != nil { - return nil, err - } else { - userCr.id = ent.Entity[len(cephEntityClientPrefix):] - userCr.key = ent.Key + entity, err := getCephUser(adminCr, volId) + if err != nil { + return nil, fmt.Errorf("failed to get ceph user: %v", err) } - // Set the correct volume root path - volOptions.RootPath = getVolumeRootPath_ceph(volId) + userCr = entity.toCredentials() } else { - // The volume is pre-made, credentials are supplied by the user + // The volume is pre-made, credentials are in node stage secrets userCr, err = getUserCredentials(req.GetNodeStageSecrets()) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to get user credentials from node stage secrets: %v", err) } - - nodeCache.insert(volId, &nodeCacheEntry{volOptions: volOptions}) } if err = storeCephCredentials(volId, userCr); err != nil { - return nil, err + return nil, fmt.Errorf("failed to store ceph user credentials: %v", err) } return userCr, nil @@ -101,6 +93,11 @@ func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol return nil, status.Error(codes.InvalidArgument, err.Error()) } + if volOptions.ProvisionVolume { + // Dynamically provisioned volumes don't have their root path set, do it here + volOptions.RootPath = getVolumeRootPath_ceph(volId) + } + if err = createMountPoint(stagingTargetPath); err != nil { glog.Errorf("failed to create staging mount point at %s for volume %s: %v", stagingTargetPath, volId, err) return nil, status.Error(codes.Internal, err.Error()) @@ -128,9 +125,9 @@ func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol // It's not, mount now - cr, err := getOrCreateUser(volOptions, volId, req) + cr, err := getCredentialsForVolume(volOptions, volId, req) if err != nil { - glog.Error(err) + glog.Errorf("failed to get ceph credentials for volume %s: %v", volId, err) return nil, status.Error(codes.Internal, err.Error()) } @@ -140,6 +137,7 @@ func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol } glog.V(4).Infof("cephfs: mounting volume %s with %s", volId, m.name()) + if err = m.mount(stagingTargetPath, cr, volOptions, volId); err != nil { glog.Errorf("failed to mount volume %s: %v", volId, err) return nil, status.Error(codes.Internal, err.Error()) @@ -215,7 +213,6 @@ func (ns *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag return nil, status.Error(codes.InvalidArgument, err.Error()) } - volId := volumeID(req.GetVolumeId()) stagingTargetPath := req.GetStagingTargetPath() // Unmount the volume @@ -225,26 +222,6 @@ func (ns *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag os.Remove(stagingTargetPath) - ent, err := nodeCache.pop(volId) - if err != nil { - glog.Error(err) - return nil, status.Error(codes.Internal, err.Error()) - } - - if ent.volOptions.ProvisionVolume { - // We've created a dedicated Ceph user in NodeStageVolume, - // it's about to be deleted here. - - if err = deleteCephUser(&credentials{id: ent.cephAdminID}, volId); err != nil { - glog.Errorf("failed to delete ceph user %s for volume %s: %v", getCephUserName(volId), volId, err) - - // Reinsert cache entry for retry - nodeCache.insert(volId, ent) - - return nil, status.Error(codes.Internal, err.Error()) - } - } - glog.Infof("cephfs: successfuly umounted volume %s from %s", req.GetVolumeId(), stagingTargetPath) return &csi.NodeUnstageVolumeResponse{}, nil diff --git a/pkg/cephfs/util.go b/pkg/cephfs/util.go index 7007d3ce8..ff2655254 100644 --- a/pkg/cephfs/util.go +++ b/pkg/cephfs/util.go @@ -54,8 +54,7 @@ func execCommandAndValidate(program string, args ...string) error { } func execCommandJson(v interface{}, program string, args ...string) error { - cmd := exec.Command(program, args...) - out, err := cmd.CombinedOutput() + out, err := execCommand(program, args...) if err != nil { return fmt.Errorf("cephfs: %s failed with following error: %s\ncephfs: %s output: %s", program, err, program, out) @@ -64,8 +63,11 @@ func execCommandJson(v interface{}, program string, args ...string) error { return json.NewDecoder(bytes.NewReader(out)).Decode(v) } +// Used in isMountPoint() +var dummyMount = mount.New("") + func isMountPoint(p string) (bool, error) { - notMnt, err := mount.New("").IsLikelyNotMountPoint(p) + notMnt, err := dummyMount.IsLikelyNotMountPoint(p) if err != nil { return false, status.Error(codes.Internal, err.Error()) }