cephfs: DeleteVolume() calls are allowed only for volumes with provisionVolume=true parameter

This commit is contained in:
gman 2018-06-13 16:23:13 +02:00
parent f5af7d0a94
commit 675ee93e46
3 changed files with 43 additions and 90 deletions

View File

@ -18,7 +18,6 @@ package cephfs
import (
"fmt"
"os"
"github.com/golang/glog"
"golang.org/x/net/context"
@ -39,7 +38,7 @@ const (
func (cs *controllerServer) validateCreateVolumeRequest(req *csi.CreateVolumeRequest) error {
if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
return fmt.Errorf("Invalid CreateVolumeRequest: %v", err)
return fmt.Errorf("invalid CreateVolumeRequest: %v", err)
}
if req.GetName() == "" {
@ -55,7 +54,7 @@ func (cs *controllerServer) validateCreateVolumeRequest(req *csi.CreateVolumeReq
func (cs *controllerServer) validateDeleteVolumeRequest(req *csi.DeleteVolumeRequest) error {
if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME); err != nil {
return fmt.Errorf("Invalid DeleteVolumeRequest: %v", err)
return fmt.Errorf("invalid DeleteVolumeRequest: %v", err)
}
return nil
@ -112,10 +111,15 @@ func (cs *controllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
glog.Warningf("failed to store a volume cache entry: %v", err)
}
sz := req.GetCapacityRange().GetRequiredBytes()
if sz == 0 {
sz = oneGB
}
return &csi.CreateVolumeResponse{
Volume: &csi.Volume{
Id: volId.id,
CapacityBytes: req.GetCapacityRange().GetRequiredBytes(),
CapacityBytes: sz,
Attributes: req.GetParameters(),
},
}, nil
@ -128,8 +132,6 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
}
var (
cr *credentials
err error
volId = req.GetVolumeId()
volUuid = uuidFromVolumeId(volId)
)
@ -143,46 +145,35 @@ func (cs *controllerServer) DeleteVolume(ctx context.Context, req *csi.DeleteVol
return nil, status.Error(codes.Internal, msg)
}
// Set the correct user for mounting
if !ent.VolOptions.ProvisionVolume {
// DeleteVolume() is forbidden for statically provisioned volumes!
msg := fmt.Sprintf("volume %s is provisioned statically, aborting delete", volId)
glog.Warningf(msg)
return &csi.DeleteVolumeResponse{}, nil
}
if ent.VolOptions.ProvisionVolume {
// Admin access is required
// Requires admin credentials
cr, err = getAdminCredentials(req.GetControllerDeleteSecrets())
cr, err := getAdminCredentials(req.GetControllerDeleteSecrets())
if err != nil {
glog.Errorf("failed to retrieve admin credentials: %v", err)
return nil, status.Error(codes.InvalidArgument, err.Error())
}
} else {
cr, err = getUserCredentials(req.GetControllerDeleteSecrets())
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
}
// Delete the volume contents
// Remove the volume, the user and the volume cache entry
if err := purgeVolume(volId, cr, &ent.VolOptions); err != nil {
glog.Error(err)
if err = purgeVolume(volId, cr, &ent.VolOptions); err != nil {
glog.Errorf("failed to delete volume %s: %v", volId, err)
return nil, status.Error(codes.Internal, err.Error())
}
// Clean up remaining files
if ent.VolOptions.ProvisionVolume {
// The user is no longer needed
if err := deleteCephUser(cr, volUuid); err != nil {
glog.Warningf("failed to delete ceph user '%s': %v", cr.id, err)
if err = deleteCephUser(cr, volUuid); err != nil {
glog.Errorf("failed to delete ceph user %s: %v", getCephUserName(volUuid), err)
return nil, status.Error(codes.Internal, err.Error())
}
userId := getCephUserName(volUuid)
os.Remove(getCephKeyringPath(volUuid, userId))
os.Remove(getCephSecretPath(volUuid, userId))
} else {
os.Remove(getCephKeyringPath(volUuid, cr.id))
os.Remove(getCephSecretPath(volUuid, cr.id))
}
if err := volCache.erase(volUuid); err != nil {
if err = volCache.erase(volUuid); err != nil {
glog.Errorf("failed to delete cache entry for volume %s: %v", volId, err)
return nil, status.Error(codes.Internal, err.Error())
}

View File

@ -31,9 +31,6 @@ const (
cephVolumePrefix = PluginFolder + "/controller/volumes/vol-"
cephVolumesRoot = "csi-volumes"
volumeDataSuffix = "volume-data"
volumeDeletingSuffix = "volume-deleting"
namespacePrefix = "csi-ns-"
)
@ -45,14 +42,6 @@ func getCephRootVolumePath_local(volUuid string) string {
return path.Join(getCephRootPath_local(volUuid), cephVolumesRoot, volUuid)
}
func getCephRootVolumeDataPath_local(volUuid string) string {
return path.Join(getCephRootVolumePath_local(volUuid), volumeDataSuffix)
}
func getCephRootVolumeDeletingPath_local(volUuid string) string {
return path.Join(getCephRootVolumePath_local(volUuid), volumeDeletingSuffix)
}
func getVolumeRootPath_local(volUuid string) string {
return cephVolumePrefix + volUuid
}
@ -61,14 +50,6 @@ func getVolumeRootPath_ceph(volUuid string) string {
return path.Join("/", cephVolumesRoot, volUuid)
}
func getVolumeDataPath_local(volUuid string) string {
return path.Join(getVolumeRootPath_local(volUuid), volumeDataSuffix)
}
func getVolumeDeletingPath_local(volUuid string) string {
return path.Join(getVolumeRootPath_local(volUuid), volumeDeletingSuffix)
}
func getVolumeNamespace(volUuid string) string {
return namespacePrefix + volUuid
}
@ -120,54 +101,35 @@ func createVolume(volOptions *volumeOptions, adminCr *credentials, volUuid strin
}
func purgeVolume(volId string, cr *credentials, volOptions *volumeOptions) error {
var (
volUuid = uuidFromVolumeId(volId)
volRoot string
dataPath string
delPath string
)
if volOptions.ProvisionVolume {
// RootPath is not set for a dynamically provisioned volume
// Root path is not set for dynamically provisioned volumes
volOptions.RootPath = "/"
volRoot = getCephRootPath_local(volUuid)
dataPath = getCephRootVolumeDataPath_local(volUuid)
delPath = getCephRootVolumeDeletingPath_local(volUuid)
} else {
volRoot = getVolumeRootPath_local(volUuid)
dataPath = getVolumeDataPath_local(volUuid)
delPath = getVolumeDeletingPath_local(volUuid)
}
var (
volUuid = uuidFromVolumeId(volId)
root = getCephRootPath_local(volUuid)
volRoot = getCephRootVolumePath_local(volUuid)
volRootDeleting = volRoot + "-deleting"
)
if err := createMountPoint(volRoot); err != nil {
if err := createMountPoint(root); err != nil {
return err
}
if err := mountKernel(volRoot, cr, volOptions, volUuid); err != nil {
if err := mountKernel(root, cr, volOptions, volUuid); err != nil {
return err
}
defer func() {
if volOptions.ProvisionVolume {
os.Remove(getCephRootVolumePath_local(volUuid))
}
unmountVolume(volRoot)
os.Remove(volRoot)
}()
if err := os.Rename(dataPath, delPath); err != nil {
if os.IsNotExist(err) {
// dataPath doesn't exist if NodePublishVolume wasn't called
return nil
} else {
return fmt.Errorf("couldn't mark volume %s for deletion: %v", volId, err)
}
if err := os.Rename(volRoot, volRootDeleting); err != nil {
return fmt.Errorf("coudln't mark volume %s for deletion: %v", volId, err)
}
if err := os.RemoveAll(delPath); err != nil {
return fmt.Errorf("couldn't delete volume %s: %v", volId, err)
if err := os.RemoveAll(volRootDeleting); err != nil {
return fmt.Errorf("failed to delete volume %s: %v", volId, err)
}
return nil

View File

@ -121,7 +121,7 @@ func bindMount(from, to string, readOnly bool) error {
}
func bindVolume(volUuid, target string, readOnly bool) error {
volDataRoot := getVolumeDataPath_local(volUuid)
volDataRoot := getVolumeRootPath_local(volUuid)
if err := createMountPoint(volDataRoot); err != nil {
return err