diff --git a/pkg/cephfs/controllerserver.go b/pkg/cephfs/controllerserver.go index 144e8668f..eab0b1ca5 100644 --- a/pkg/cephfs/controllerserver.go +++ b/pkg/cephfs/controllerserver.go @@ -76,6 +76,11 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol return nil, status.Error(codes.Internal, err.Error()) } + if _, err = createCephUser(volOptions, cr, volId); err != nil { + glog.Errorf("failed to create ceph user for volume %s: %v", req.GetName(), err) + return nil, status.Error(codes.Internal, err.Error()) + } + glog.Infof("cephfs: successfuly created volume %s", volId) } else { glog.Infof("cephfs: volume %s is provisioned statically", volId) @@ -148,6 +153,11 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol return nil, status.Error(codes.Internal, err.Error()) } + if err = deleteCephUser(cr, volId); err != nil { + glog.Errorf("failed to delete ceph user for volume %s: %v", volId, err) + return nil, status.Error(codes.Internal, err.Error()) + } + glog.Infof("cephfs: successfuly deleted volume %s", volId) return &csi.DeleteVolumeResponse{}, nil diff --git a/pkg/cephfs/nodecache.go b/pkg/cephfs/nodecache.go deleted file mode 100644 index e05b7d1b3..000000000 --- a/pkg/cephfs/nodecache.go +++ /dev/null @@ -1,55 +0,0 @@ -/* -Copyright 2018 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package cephfs - -import ( - "fmt" - "sync" -) - -type nodeCacheEntry struct { - volOptions *volumeOptions - cephAdminID string -} - -type nodeCacheMap map[volumeID]*nodeCacheEntry - -var ( - nodeCache = make(nodeCacheMap) - nodeCacheMtx sync.Mutex -) - -func (m nodeCacheMap) insert(volId volumeID, ent *nodeCacheEntry) { - nodeCacheMtx.Lock() - defer nodeCacheMtx.Unlock() - - m[volId] = ent -} - -func (m nodeCacheMap) pop(volId volumeID) (*nodeCacheEntry, error) { - nodeCacheMtx.Lock() - defer nodeCacheMtx.Unlock() - - ent, ok := m[volId] - if !ok { - return nil, fmt.Errorf("node cache entry for volume %s not found", volId) - } - - delete(m, volId) - - return ent, nil -} diff --git a/pkg/cephfs/nodeserver.go b/pkg/cephfs/nodeserver.go index 2064634ab..84b428db3 100644 --- a/pkg/cephfs/nodeserver.go +++ b/pkg/cephfs/nodeserver.go @@ -18,6 +18,7 @@ package cephfs import ( "context" + "fmt" "os" "github.com/golang/glog" @@ -32,54 +33,45 @@ type nodeServer struct { *csicommon.DefaultNodeServer } -func getOrCreateUser(volOptions *volumeOptions, volId volumeID, req *csi.NodeStageVolumeRequest) (*credentials, error) { +func getCredentialsForVolume(volOptions *volumeOptions, volId volumeID, req *csi.NodeStageVolumeRequest) (*credentials, error) { var ( userCr = &credentials{} err error ) - // Retrieve the credentials (possibly create a new user as well) - if volOptions.ProvisionVolume { - // The volume is provisioned dynamically, create a dedicated user + // The volume is provisioned dynamically, get the credentials directly from Ceph - // First, store admin credentials - those are needed for creating a user + // First, store admin credentials - those are needed for retrieving the user credentials adminCr, err := getAdminCredentials(req.GetNodeStageSecrets()) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to get admin credentials from node stage secrets: %v", err) } if err = storeCephCredentials(volId, adminCr); err != nil { - return nil, err + return nil, fmt.Errorf("failed to store ceph admin credentials: %v", err) } - nodeCache.insert(volId, &nodeCacheEntry{volOptions: volOptions, cephAdminID: adminCr.id}) + // Then get the ceph user - // Then create the user - - if ent, err := createCephUser(volOptions, adminCr, volId); err != nil { - return nil, err - } else { - userCr.id = ent.Entity[len(cephEntityClientPrefix):] - userCr.key = ent.Key + entity, err := getCephUser(adminCr, volId) + if err != nil { + return nil, fmt.Errorf("failed to get ceph user: %v", err) } - // Set the correct volume root path - volOptions.RootPath = getVolumeRootPath_ceph(volId) + userCr = entity.toCredentials() } else { - // The volume is pre-made, credentials are supplied by the user + // The volume is pre-made, credentials are in node stage secrets userCr, err = getUserCredentials(req.GetNodeStageSecrets()) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to get user credentials from node stage secrets: %v", err) } - - nodeCache.insert(volId, &nodeCacheEntry{volOptions: volOptions}) } if err = storeCephCredentials(volId, userCr); err != nil { - return nil, err + return nil, fmt.Errorf("failed to store ceph user credentials: %v", err) } return userCr, nil @@ -101,6 +93,11 @@ func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol return nil, status.Error(codes.InvalidArgument, err.Error()) } + if volOptions.ProvisionVolume { + // Dynamically provisioned volumes don't have their root path set, do it here + volOptions.RootPath = getVolumeRootPath_ceph(volId) + } + if err = createMountPoint(stagingTargetPath); err != nil { glog.Errorf("failed to create staging mount point at %s for volume %s: %v", stagingTargetPath, volId, err) return nil, status.Error(codes.Internal, err.Error()) @@ -128,9 +125,9 @@ func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol // It's not, mount now - cr, err := getOrCreateUser(volOptions, volId, req) + cr, err := getCredentialsForVolume(volOptions, volId, req) if err != nil { - glog.Error(err) + glog.Errorf("failed to get ceph credentials for volume %s: %v", volId, err) return nil, status.Error(codes.Internal, err.Error()) } @@ -140,6 +137,7 @@ func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol } glog.V(4).Infof("cephfs: mounting volume %s with %s", volId, m.name()) + if err = m.mount(stagingTargetPath, cr, volOptions, volId); err != nil { glog.Errorf("failed to mount volume %s: %v", volId, err) return nil, status.Error(codes.Internal, err.Error()) @@ -215,7 +213,6 @@ func (ns *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag return nil, status.Error(codes.InvalidArgument, err.Error()) } - volId := volumeID(req.GetVolumeId()) stagingTargetPath := req.GetStagingTargetPath() // Unmount the volume @@ -225,26 +222,6 @@ func (ns *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag os.Remove(stagingTargetPath) - ent, err := nodeCache.pop(volId) - if err != nil { - glog.Error(err) - return nil, status.Error(codes.Internal, err.Error()) - } - - if ent.volOptions.ProvisionVolume { - // We've created a dedicated Ceph user in NodeStageVolume, - // it's about to be deleted here. - - if err = deleteCephUser(&credentials{id: ent.cephAdminID}, volId); err != nil { - glog.Errorf("failed to delete ceph user %s for volume %s: %v", getCephUserName(volId), volId, err) - - // Reinsert cache entry for retry - nodeCache.insert(volId, ent) - - return nil, status.Error(codes.Internal, err.Error()) - } - } - glog.Infof("cephfs: successfuly umounted volume %s from %s", req.GetVolumeId(), stagingTargetPath) return &csi.NodeUnstageVolumeResponse{}, nil