cephfs: change createvolume for snapshot/clone workflow

Signed-off-by: Humble Chirammal <hchiramm@redhat.com>
This commit is contained in:
Humble Chirammal 2020-08-04 09:26:43 +05:30 committed by mergify[bot]
parent c31f349c6f
commit d1fe12b4f0

View File

@ -124,6 +124,7 @@ func checkContentSource(ctx context.Context, req *csi.CreateVolumeRequest, cr *u
}
// CreateVolume creates a reservation and the volume in backend, if it is not already present.
// nolint:gocognit:gocyclo // TODO: reduce complexity
func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
if err := cs.validateCreateVolumeRequest(req); err != nil {
klog.Errorf(util.Log(ctx, "CreateVolumeRequest validation failed: %v"), err)
@ -134,6 +135,13 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
secret := req.GetSecrets()
requestName := req.GetName()
cr, err := util.NewAdminCredentials(secret)
if err != nil {
klog.Errorf(util.Log(ctx, "failed to retrieve admin credentials: %v"), err)
return nil, status.Error(codes.InvalidArgument, err.Error())
}
defer cr.DeleteCredentials()
// Existence and conflict checks
if acquired := cs.VolumeLocks.TryAcquire(requestName); !acquired {
klog.Errorf(util.Log(ctx, util.VolumeOperationAlreadyExistsFmt), requestName)
@ -152,19 +160,49 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
}
// TODO need to add check for 0 volume size
vID, err := checkVolExists(ctx, volOptions, secret)
parentVol, pvID, sID, err := checkContentSource(ctx, req, cr)
if err != nil {
return nil, err
}
vID, err := checkVolExists(ctx, volOptions, parentVol, pvID, sID, cr)
if err != nil {
if errors.Is(err, ErrCloneInProgress) {
return nil, status.Error(codes.Aborted, err.Error())
}
return nil, status.Error(codes.Internal, err.Error())
}
// TODO return error message if requested vol size greater than found volume return error
if vID != nil {
if sID != nil || pvID != nil {
// while cloning the volume the size is not populated properly to the new volume now.
// it will be fixed in cephfs soon with the parentvolume size. Till then by below
// resize we are making sure we return or satisfy the requested size by setting the size
// explictly
err = resizeVolume(ctx, volOptions, cr, volumeID(vID.FsSubvolName), volOptions.Size)
if err != nil {
if purgeErr := purgeVolume(ctx, volumeID(vID.FsSubvolName), cr, volOptions, false); err != nil {
klog.Errorf(util.Log(ctx, "failed to delete volume %s: %v"), requestName, purgeErr)
// All errors other than ErrVolumeNotFound should return an error back to the caller
if !errors.Is(purgeErr, ErrVolumeNotFound) {
return nil, status.Error(codes.Internal, purgeErr.Error())
}
}
errUndo := undoVolReservation(ctx, volOptions, *vID, secret)
if errUndo != nil {
klog.Warningf(util.Log(ctx, "failed undoing reservation of volume: %s (%s)"),
requestName, errUndo)
}
klog.Errorf(util.Log(ctx, "failed to expand volume %s: %v"), volumeID(vID.FsSubvolName), err)
return nil, status.Error(codes.Internal, err.Error())
}
}
volumeContext := req.GetParameters()
volumeContext["subvolumeName"] = vID.FsSubvolName
volume := &csi.Volume{
VolumeId: vID.VolumeID,
CapacityBytes: volOptions.Size,
ContentSource: req.GetVolumeContentSource(),
VolumeContext: volumeContext,
}
if volOptions.Topology != nil {
@ -183,22 +221,27 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
defer func() {
if err != nil {
errDefer := undoVolReservation(ctx, volOptions, *vID, secret)
if errDefer != nil {
klog.Warningf(util.Log(ctx, "failed undoing reservation of volume: %s (%s)"),
requestName, errDefer)
if !errors.Is(err, ErrCloneInProgress) {
errDefer := undoVolReservation(ctx, volOptions, *vID, secret)
if errDefer != nil {
klog.Warningf(util.Log(ctx, "failed undoing reservation of volume: %s (%s)"),
requestName, errDefer)
}
}
}
}()
// Create a volume
err = cs.createBackingVolume(ctx, volOptions, vID, secret)
err = cs.createBackingVolume(ctx, volOptions, parentVol, vID, pvID, sID, cr)
if err != nil {
if errors.Is(err, ErrCloneInProgress) {
return nil, status.Error(codes.Aborted, err.Error())
}
return nil, err
}
util.DebugLog(ctx, "cephfs: successfully created backing volume named %s for request name %s",
vID.FsSubvolName, requestName)
volumeContext := req.GetParameters()
@ -206,6 +249,7 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
volume := &csi.Volume{
VolumeId: vID.VolumeID,
CapacityBytes: volOptions.Size,
ContentSource: req.GetVolumeContentSource(),
VolumeContext: volumeContext,
}
if volOptions.Topology != nil {