Remove user creation for every volume

Currently, provisioner creates user for every volume and nodeplugin
uses this user to mount that volume. But nodeplugin and provisioner
already have admin credentials, hence using the admin credentials
to mount the volume and getting rid of user creation for each volume.

Signed-off-by: Poornima G <pgurusid@redhat.com>
This commit is contained in:
Poornima G 2019-06-20 07:35:53 +00:00 committed by mergify[bot]
parent a5164cfa41
commit c2835183e5
5 changed files with 13 additions and 112 deletions

View File

@ -17,8 +17,6 @@ limitations under the License.
package cephfs package cephfs
import ( import (
"fmt"
"github.com/ceph/ceph-csi/pkg/util" "github.com/ceph/ceph-csi/pkg/util"
) )
@ -27,81 +25,15 @@ const (
cephEntityClientPrefix = "client." cephEntityClientPrefix = "client."
) )
type cephEntityCaps struct { func genUserIDs(adminCr *util.Credentials, volID volumeID) (adminID, userID string) {
Mds string `json:"mds"` return cephEntityClientPrefix + adminCr.ID, cephEntityClientPrefix + getCephUserName(volID)
Mon string `json:"mon"`
Osd string `json:"osd"`
}
type cephEntity struct {
Entity string `json:"entity"`
Key string `json:"key"`
Caps cephEntityCaps `json:"caps"`
}
func (ent *cephEntity) toCredentials() *util.Credentials {
return &util.Credentials{
ID: ent.Entity[len(cephEntityClientPrefix):],
Key: ent.Key,
}
} }
func getCephUserName(volID volumeID) string { func getCephUserName(volID volumeID) string {
return cephUserPrefix + string(volID) return cephUserPrefix + string(volID)
} }
func getSingleCephEntity(args ...string) (*cephEntity, error) { func deleteCephUserDeprecated(volOptions *volumeOptions, adminCr *util.Credentials, volID volumeID) error {
var ents []cephEntity
if err := execCommandJSON(&ents, "ceph", args...); err != nil {
return nil, err
}
if len(ents) != 1 {
return nil, fmt.Errorf("got unexpected number of entities: expected 1, got %d", len(ents))
}
return &ents[0], nil
}
func genUserIDs(adminCr *util.Credentials, volID volumeID) (adminID, userID string) {
return cephEntityClientPrefix + adminCr.ID, cephEntityClientPrefix + getCephUserName(volID)
}
func getCephUser(volOptions *volumeOptions, adminCr *util.Credentials, volID volumeID) (*cephEntity, error) {
adminID, userID := genUserIDs(adminCr, volID)
return getSingleCephEntity(
"-m", volOptions.Monitors,
"-n", adminID,
"--key="+adminCr.Key,
"-c", util.CephConfigPath,
"-f", "json",
"auth", "get", userID,
)
}
func createCephUser(volOptions *volumeOptions, adminCr *util.Credentials, volID volumeID) (*cephEntity, error) {
adminID, userID := genUserIDs(adminCr, volID)
volRootPath, err := getVolumeRootPathCeph(volOptions, adminCr, volID)
if err != nil {
return nil, err
}
return getSingleCephEntity(
"-m", volOptions.Monitors,
"-n", adminID,
"--key="+adminCr.Key,
"-c", util.CephConfigPath,
"-f", "json",
"auth", "get-or-create", userID,
// User capabilities
"mds", fmt.Sprintf("allow rw path=%s", volRootPath),
"mon", "allow r",
"osd", fmt.Sprintf("allow rw pool=%s namespace=%s", volOptions.Pool, getVolumeNamespace(volID)),
)
}
func deleteCephUser(volOptions *volumeOptions, adminCr *util.Credentials, volID volumeID) error {
adminID, userID := genUserIDs(adminCr, volID) adminID, userID := genUserIDs(adminCr, volID)
// TODO: Need to return success if userID is not found // TODO: Need to return success if userID is not found

View File

@ -44,8 +44,7 @@ var (
volumeNameLocker = util.NewIDLocker() volumeNameLocker = util.NewIDLocker()
) )
// createBackingVolume creates the backing subvolume and user/key for the given volOptions and vID, // createBackingVolume creates the backing subvolume and on any error cleans up any created entities
// and on any error cleans up any created entities
func (cs *ControllerServer) createBackingVolume(volOptions *volumeOptions, vID *volumeIdentifier, secret map[string]string) error { func (cs *ControllerServer) createBackingVolume(volOptions *volumeOptions, vID *volumeIdentifier, secret map[string]string) error {
cr, err := util.GetAdminCredentials(secret) cr, err := util.GetAdminCredentials(secret)
if err != nil { if err != nil {
@ -64,11 +63,6 @@ func (cs *ControllerServer) createBackingVolume(volOptions *volumeOptions, vID *
} }
}() }()
if _, err = createCephUser(volOptions, cr, volumeID(vID.FsSubvolName)); err != nil {
klog.Errorf("failed to create ceph user for volume %s: %v", volOptions.RequestName, err)
return status.Error(codes.Internal, err.Error())
}
return nil return nil
} }
@ -188,7 +182,7 @@ func (cs *ControllerServer) deleteVolumeDeprecated(req *csi.DeleteVolumeRequest)
return nil, status.Error(codes.Internal, err.Error()) return nil, status.Error(codes.Internal, err.Error())
} }
if err = deleteCephUser(&ce.VolOptions, cr, volID); err != nil { if err = deleteCephUserDeprecated(&ce.VolOptions, cr, volID); err != nil {
klog.Errorf("failed to delete ceph user for volume %s: %v", volID, err) klog.Errorf("failed to delete ceph user for volume %s: %v", volID, err)
return nil, status.Error(codes.Internal, err.Error()) return nil, status.Error(codes.Internal, err.Error())
} }
@ -247,11 +241,6 @@ func (cs *ControllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
return nil, status.Error(codes.Internal, err.Error()) return nil, status.Error(codes.Internal, err.Error())
} }
if err = deleteCephUser(volOptions, cr, volumeID(vID.FsSubvolName)); err != nil {
klog.Errorf("failed to delete ceph user for volume %s: %v", volID, err)
return nil, status.Error(codes.Internal, err.Error())
}
if err := undoVolReservation(volOptions, *vID, secrets); err != nil { if err := undoVolReservation(volOptions, *vID, secrets); err != nil {
return nil, status.Error(codes.Internal, err.Error()) return nil, status.Error(codes.Internal, err.Error())
} }

View File

@ -104,13 +104,6 @@ func mountOneCacheEntry(volOptions *volumeOptions, vid *volumeIdentifier, me *vo
if err != nil { if err != nil {
return err return err
} }
var entity *cephEntity
entity, err = getCephUser(volOptions, cr, volumeID(vid.FsSubvolName))
if err != nil {
return err
}
cr = entity.toCredentials()
} else { } else {
cr, err = util.GetUserCredentials(decodeCredentials(me.Secrets)) cr, err = util.GetUserCredentials(decodeCredentials(me.Secrets))
if err != nil { if err != nil {

View File

@ -42,7 +42,7 @@ var (
nodeVolumeIDLocker = util.NewIDLocker() nodeVolumeIDLocker = util.NewIDLocker()
) )
func getCredentialsForVolume(volOptions *volumeOptions, volID volumeID, req *csi.NodeStageVolumeRequest) (*util.Credentials, error) { func getCredentialsForVolume(volOptions *volumeOptions, req *csi.NodeStageVolumeRequest) (*util.Credentials, error) {
var ( var (
cr *util.Credentials cr *util.Credentials
secrets = req.GetSecrets() secrets = req.GetSecrets()
@ -58,14 +58,7 @@ func getCredentialsForVolume(volOptions *volumeOptions, volID volumeID, req *csi
return nil, fmt.Errorf("failed to get admin credentials from node stage secrets: %v", err) return nil, fmt.Errorf("failed to get admin credentials from node stage secrets: %v", err)
} }
// Then get the ceph user cr = adminCr
entity, err := getCephUser(volOptions, adminCr, volID)
if err != nil {
return nil, fmt.Errorf("failed to get ceph user: %v", err)
}
cr = entity.toCredentials()
} else { } else {
// The volume is pre-made, credentials are in node stage secrets // The volume is pre-made, credentials are in node stage secrets
@ -84,7 +77,6 @@ func getCredentialsForVolume(volOptions *volumeOptions, volID volumeID, req *csi
func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) { func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
var ( var (
volOptions *volumeOptions volOptions *volumeOptions
vid *volumeIdentifier
) )
if err := util.ValidateNodeStageVolumeRequest(req); err != nil { if err := util.ValidateNodeStageVolumeRequest(req); err != nil {
return nil, err return nil, err
@ -95,21 +87,21 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
stagingTargetPath := req.GetStagingTargetPath() stagingTargetPath := req.GetStagingTargetPath()
volID := volumeID(req.GetVolumeId()) volID := volumeID(req.GetVolumeId())
volOptions, vid, err := newVolumeOptionsFromVolID(string(volID), req.GetVolumeContext(), req.GetSecrets()) volOptions, _, err := newVolumeOptionsFromVolID(string(volID), req.GetVolumeContext(), req.GetSecrets())
if err != nil { if err != nil {
if _, ok := err.(ErrInvalidVolID); !ok { if _, ok := err.(ErrInvalidVolID); !ok {
return nil, status.Error(codes.Internal, err.Error()) return nil, status.Error(codes.Internal, err.Error())
} }
// check for pre-provisioned volumes (plugin versions > 1.0.0) // check for pre-provisioned volumes (plugin versions > 1.0.0)
volOptions, vid, err = newVolumeOptionsFromStaticVolume(string(volID), req.GetVolumeContext()) volOptions, _, err = newVolumeOptionsFromStaticVolume(string(volID), req.GetVolumeContext())
if err != nil { if err != nil {
if _, ok := err.(ErrNonStaticVolume); !ok { if _, ok := err.(ErrNonStaticVolume); !ok {
return nil, status.Error(codes.Internal, err.Error()) return nil, status.Error(codes.Internal, err.Error())
} }
// check for volumes from plugin versions <= 1.0.0 // check for volumes from plugin versions <= 1.0.0
volOptions, vid, err = newVolumeOptionsFromVersion1Context(string(volID), req.GetVolumeContext(), volOptions, _, err = newVolumeOptionsFromVersion1Context(string(volID), req.GetVolumeContext(),
req.GetSecrets()) req.GetSecrets())
if err != nil { if err != nil {
return nil, status.Error(codes.Internal, err.Error()) return nil, status.Error(codes.Internal, err.Error())
@ -140,7 +132,7 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
} }
// It's not, mount now // It's not, mount now
if err = ns.mount(volOptions, req, vid); err != nil { if err = ns.mount(volOptions, req); err != nil {
return nil, err return nil, err
} }
@ -149,11 +141,11 @@ func (ns *NodeServer) NodeStageVolume(ctx context.Context, req *csi.NodeStageVol
return &csi.NodeStageVolumeResponse{}, nil return &csi.NodeStageVolumeResponse{}, nil
} }
func (*NodeServer) mount(volOptions *volumeOptions, req *csi.NodeStageVolumeRequest, vid *volumeIdentifier) error { func (*NodeServer) mount(volOptions *volumeOptions, req *csi.NodeStageVolumeRequest) error {
stagingTargetPath := req.GetStagingTargetPath() stagingTargetPath := req.GetStagingTargetPath()
volID := volumeID(req.GetVolumeId()) volID := volumeID(req.GetVolumeId())
cr, err := getCredentialsForVolume(volOptions, volumeID(vid.FsSubvolName), req) cr, err := getCredentialsForVolume(volOptions, req)
if err != nil { if err != nil {
klog.Errorf("failed to get ceph credentials for volume %s: %v", volID, err) klog.Errorf("failed to get ceph credentials for volume %s: %v", volID, err)
return status.Error(codes.Internal, err.Error()) return status.Error(codes.Internal, err.Error())

View File

@ -29,7 +29,6 @@ import (
) )
const ( const (
namespacePrefix = "fsvolumens_"
csiSubvolumeGroup = "csi" csiSubvolumeGroup = "csi"
) )
@ -74,10 +73,6 @@ func getVolumeRootPathCeph(volOptions *volumeOptions, cr *util.Credentials, volI
return strings.TrimSuffix(string(stdout), "\n"), nil return strings.TrimSuffix(string(stdout), "\n"), nil
} }
func getVolumeNamespace(volID volumeID) string {
return namespacePrefix + string(volID)
}
func createVolume(volOptions *volumeOptions, cr *util.Credentials, volID volumeID, bytesQuota int64) error { func createVolume(volOptions *volumeOptions, cr *util.Credentials, volID volumeID, bytesQuota int64) error {
//TODO: When we support multiple fs, need to hande subvolume group create for all fs's //TODO: When we support multiple fs, need to hande subvolume group create for all fs's
if !cephfsInit { if !cephfsInit {