cephfs: ceph user is created in CreateVolume and deleted in DeleteVolume

This commit is contained in:
gman 2018-08-28 10:21:11 +02:00
parent 9c3389d784
commit 3c11129149
3 changed files with 32 additions and 100 deletions

View File

@ -76,6 +76,11 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
return nil, status.Error(codes.Internal, err.Error()) return nil, status.Error(codes.Internal, err.Error())
} }
if _, err = createCephUser(volOptions, cr, volId); err != nil {
glog.Errorf("failed to create ceph user for volume %s: %v", req.GetName(), err)
return nil, status.Error(codes.Internal, err.Error())
}
glog.Infof("cephfs: successfuly created volume %s", volId) glog.Infof("cephfs: successfuly created volume %s", volId)
} else { } else {
glog.Infof("cephfs: volume %s is provisioned statically", volId) glog.Infof("cephfs: volume %s is provisioned statically", volId)
@ -148,6 +153,11 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
return nil, status.Error(codes.Internal, err.Error()) return nil, status.Error(codes.Internal, err.Error())
} }
if err = deleteCephUser(cr, volId); err != nil {
glog.Errorf("failed to delete ceph user for volume %s: %v", volId, err)
return nil, status.Error(codes.Internal, err.Error())
}
glog.Infof("cephfs: successfuly deleted volume %s", volId) glog.Infof("cephfs: successfuly deleted volume %s", volId)
return &csi.DeleteVolumeResponse{}, nil return &csi.DeleteVolumeResponse{}, nil

View File

@ -1,55 +0,0 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cephfs
import (
"fmt"
"sync"
)
type nodeCacheEntry struct {
volOptions *volumeOptions
cephAdminID string
}
type nodeCacheMap map[volumeID]*nodeCacheEntry
var (
nodeCache = make(nodeCacheMap)
nodeCacheMtx sync.Mutex
)
func (m nodeCacheMap) insert(volId volumeID, ent *nodeCacheEntry) {
nodeCacheMtx.Lock()
defer nodeCacheMtx.Unlock()
m[volId] = ent
}
func (m nodeCacheMap) pop(volId volumeID) (*nodeCacheEntry, error) {
nodeCacheMtx.Lock()
defer nodeCacheMtx.Unlock()
ent, ok := m[volId]
if !ok {
return nil, fmt.Errorf("node cache entry for volume %s not found", volId)
}
delete(m, volId)
return ent, nil
}

View File

@ -18,6 +18,7 @@ package cephfs
import ( import (
"context" "context"
"fmt"
"os" "os"
"github.com/golang/glog" "github.com/golang/glog"
@ -32,54 +33,45 @@ type nodeServer struct {
*csicommon.DefaultNodeServer *csicommon.DefaultNodeServer
} }
func getOrCreateUser(volOptions *volumeOptions, volId volumeID, req *csi.NodeStageVolumeRequest) (*credentials, error) { func getCredentialsForVolume(volOptions *volumeOptions, volId volumeID, req *csi.NodeStageVolumeRequest) (*credentials, error) {
var ( var (
userCr = &credentials{} userCr = &credentials{}
err error err error
) )
// Retrieve the credentials (possibly create a new user as well)
if volOptions.ProvisionVolume { if volOptions.ProvisionVolume {
// The volume is provisioned dynamically, create a dedicated user // The volume is provisioned dynamically, get the credentials directly from Ceph
// First, store admin credentials - those are needed for creating a user // First, store admin credentials - those are needed for retrieving the user credentials
adminCr, err := getAdminCredentials(req.GetNodeStageSecrets()) adminCr, err := getAdminCredentials(req.GetNodeStageSecrets())
if err != nil { if err != nil {
return nil, err return nil, fmt.Errorf("failed to get admin credentials from node stage secrets: %v", err)
} }
if err = storeCephCredentials(volId, adminCr); err != nil { if err = storeCephCredentials(volId, adminCr); err != nil {
return nil, err return nil, fmt.Errorf("failed to store ceph admin credentials: %v", err)
} }
nodeCache.insert(volId, &nodeCacheEntry{volOptions: volOptions, cephAdminID: adminCr.id}) // Then get the ceph user
// Then create the user entity, err := getCephUser(adminCr, volId)
if err != nil {
if ent, err := createCephUser(volOptions, adminCr, volId); err != nil { return nil, fmt.Errorf("failed to get ceph user: %v", err)
return nil, err
} else {
userCr.id = ent.Entity[len(cephEntityClientPrefix):]
userCr.key = ent.Key
} }
// Set the correct volume root path userCr = entity.toCredentials()
volOptions.RootPath = getVolumeRootPath_ceph(volId)
} else { } else {
// The volume is pre-made, credentials are supplied by the user // The volume is pre-made, credentials are in node stage secrets
userCr, err = getUserCredentials(req.GetNodeStageSecrets()) userCr, err = getUserCredentials(req.GetNodeStageSecrets())
if err != nil { if err != nil {
return nil, err return nil, fmt.Errorf("failed to get user credentials from node stage secrets: %v", err)
} }
nodeCache.insert(volId, &nodeCacheEntry{volOptions: volOptions})
} }
if err = storeCephCredentials(volId, userCr); err != nil { if err = storeCephCredentials(volId, userCr); err != nil {
return nil, err return nil, fmt.Errorf("failed to store ceph user credentials: %v", err)
} }
return userCr, nil return userCr, nil
@ -101,6 +93,11 @@ func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
return nil, status.Error(codes.InvalidArgument, err.Error()) return nil, status.Error(codes.InvalidArgument, err.Error())
} }
if volOptions.ProvisionVolume {
// Dynamically provisioned volumes don't have their root path set, do it here
volOptions.RootPath = getVolumeRootPath_ceph(volId)
}
if err = createMountPoint(stagingTargetPath); err != nil { if err = createMountPoint(stagingTargetPath); err != nil {
glog.Errorf("failed to create staging mount point at %s for volume %s: %v", stagingTargetPath, volId, err) glog.Errorf("failed to create staging mount point at %s for volume %s: %v", stagingTargetPath, volId, err)
return nil, status.Error(codes.Internal, err.Error()) return nil, status.Error(codes.Internal, err.Error())
@ -128,9 +125,9 @@ func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
// It's not, mount now // It's not, mount now
cr, err := getOrCreateUser(volOptions, volId, req) cr, err := getCredentialsForVolume(volOptions, volId, req)
if err != nil { if err != nil {
glog.Error(err) glog.Errorf("failed to get ceph credentials for volume %s: %v", volId, err)
return nil, status.Error(codes.Internal, err.Error()) return nil, status.Error(codes.Internal, err.Error())
} }
@ -140,6 +137,7 @@ func (ns *nodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
} }
glog.V(4).Infof("cephfs: mounting volume %s with %s", volId, m.name()) glog.V(4).Infof("cephfs: mounting volume %s with %s", volId, m.name())
if err = m.mount(stagingTargetPath, cr, volOptions, volId); err != nil { if err = m.mount(stagingTargetPath, cr, volOptions, volId); err != nil {
glog.Errorf("failed to mount volume %s: %v", volId, err) glog.Errorf("failed to mount volume %s: %v", volId, err)
return nil, status.Error(codes.Internal, err.Error()) return nil, status.Error(codes.Internal, err.Error())
@ -215,7 +213,6 @@ func (ns *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag
return nil, status.Error(codes.InvalidArgument, err.Error()) return nil, status.Error(codes.InvalidArgument, err.Error())
} }
volId := volumeID(req.GetVolumeId())
stagingTargetPath := req.GetStagingTargetPath() stagingTargetPath := req.GetStagingTargetPath()
// Unmount the volume // Unmount the volume
@ -225,26 +222,6 @@ func (ns *nodeServer) NodeUnstageVolume(ctx context.Context, req *csi.NodeUnstag
os.Remove(stagingTargetPath) os.Remove(stagingTargetPath)
ent, err := nodeCache.pop(volId)
if err != nil {
glog.Error(err)
return nil, status.Error(codes.Internal, err.Error())
}
if ent.volOptions.ProvisionVolume {
// We've created a dedicated Ceph user in NodeStageVolume,
// it's about to be deleted here.
if err = deleteCephUser(&credentials{id: ent.cephAdminID}, volId); err != nil {
glog.Errorf("failed to delete ceph user %s for volume %s: %v", getCephUserName(volId), volId, err)
// Reinsert cache entry for retry
nodeCache.insert(volId, ent)
return nil, status.Error(codes.Internal, err.Error())
}
}
glog.Infof("cephfs: successfuly umounted volume %s from %s", req.GetVolumeId(), stagingTargetPath) glog.Infof("cephfs: successfuly umounted volume %s from %s", req.GetVolumeId(), stagingTargetPath)
return &csi.NodeUnstageVolumeResponse{}, nil return &csi.NodeUnstageVolumeResponse{}, nil