From d1fe12b4f00a0ca07c587dffe6219faead97b454 Mon Sep 17 00:00:00 2001 From: Humble Chirammal Date: Tue, 4 Aug 2020 09:26:43 +0530 Subject: [PATCH] cephfs: change createvolume for snapshot/clone workflow Signed-off-by: Humble Chirammal --- internal/cephfs/controllerserver.go | 60 +++++++++++++++++++++++++---- 1 file changed, 52 insertions(+), 8 deletions(-) diff --git a/internal/cephfs/controllerserver.go b/internal/cephfs/controllerserver.go index 4cab838fb..f40691c5b 100644 --- a/internal/cephfs/controllerserver.go +++ b/internal/cephfs/controllerserver.go @@ -124,6 +124,7 @@ func checkContentSource(ctx context.Context, req *csi.CreateVolumeRequest, cr *u } // CreateVolume creates a reservation and the volume in backend, if it is not already present. +// nolint:gocognit:gocyclo // TODO: reduce complexity func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) { if err := cs.validateCreateVolumeRequest(req); err != nil { klog.Errorf(util.Log(ctx, "CreateVolumeRequest validation failed: %v"), err) @@ -134,6 +135,13 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol secret := req.GetSecrets() requestName := req.GetName() + cr, err := util.NewAdminCredentials(secret) + if err != nil { + klog.Errorf(util.Log(ctx, "failed to retrieve admin credentials: %v"), err) + return nil, status.Error(codes.InvalidArgument, err.Error()) + } + defer cr.DeleteCredentials() + // Existence and conflict checks if acquired := cs.VolumeLocks.TryAcquire(requestName); !acquired { klog.Errorf(util.Log(ctx, util.VolumeOperationAlreadyExistsFmt), requestName) @@ -152,19 +160,49 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol } // TODO need to add check for 0 volume size - vID, err := checkVolExists(ctx, volOptions, secret) + parentVol, pvID, sID, err := checkContentSource(ctx, req, cr) if err != nil { + return nil, err + } + vID, err := checkVolExists(ctx, volOptions, parentVol, pvID, sID, cr) + if err != nil { + if errors.Is(err, ErrCloneInProgress) { + return nil, status.Error(codes.Aborted, err.Error()) + } return nil, status.Error(codes.Internal, err.Error()) } - // TODO return error message if requested vol size greater than found volume return error if vID != nil { + if sID != nil || pvID != nil { + // while cloning the volume the size is not populated properly to the new volume now. + // it will be fixed in cephfs soon with the parentvolume size. Till then by below + // resize we are making sure we return or satisfy the requested size by setting the size + // explictly + err = resizeVolume(ctx, volOptions, cr, volumeID(vID.FsSubvolName), volOptions.Size) + if err != nil { + if purgeErr := purgeVolume(ctx, volumeID(vID.FsSubvolName), cr, volOptions, false); err != nil { + klog.Errorf(util.Log(ctx, "failed to delete volume %s: %v"), requestName, purgeErr) + // All errors other than ErrVolumeNotFound should return an error back to the caller + if !errors.Is(purgeErr, ErrVolumeNotFound) { + return nil, status.Error(codes.Internal, purgeErr.Error()) + } + } + errUndo := undoVolReservation(ctx, volOptions, *vID, secret) + if errUndo != nil { + klog.Warningf(util.Log(ctx, "failed undoing reservation of volume: %s (%s)"), + requestName, errUndo) + } + klog.Errorf(util.Log(ctx, "failed to expand volume %s: %v"), volumeID(vID.FsSubvolName), err) + return nil, status.Error(codes.Internal, err.Error()) + } + } volumeContext := req.GetParameters() volumeContext["subvolumeName"] = vID.FsSubvolName volume := &csi.Volume{ VolumeId: vID.VolumeID, CapacityBytes: volOptions.Size, + ContentSource: req.GetVolumeContentSource(), VolumeContext: volumeContext, } if volOptions.Topology != nil { @@ -183,22 +221,27 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol if err != nil { return nil, status.Error(codes.Internal, err.Error()) } + defer func() { if err != nil { - errDefer := undoVolReservation(ctx, volOptions, *vID, secret) - if errDefer != nil { - klog.Warningf(util.Log(ctx, "failed undoing reservation of volume: %s (%s)"), - requestName, errDefer) + if !errors.Is(err, ErrCloneInProgress) { + errDefer := undoVolReservation(ctx, volOptions, *vID, secret) + if errDefer != nil { + klog.Warningf(util.Log(ctx, "failed undoing reservation of volume: %s (%s)"), + requestName, errDefer) + } } } }() // Create a volume - err = cs.createBackingVolume(ctx, volOptions, vID, secret) + err = cs.createBackingVolume(ctx, volOptions, parentVol, vID, pvID, sID, cr) if err != nil { + if errors.Is(err, ErrCloneInProgress) { + return nil, status.Error(codes.Aborted, err.Error()) + } return nil, err } - util.DebugLog(ctx, "cephfs: successfully created backing volume named %s for request name %s", vID.FsSubvolName, requestName) volumeContext := req.GetParameters() @@ -206,6 +249,7 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol volume := &csi.Volume{ VolumeId: vID.VolumeID, CapacityBytes: volOptions.Size, + ContentSource: req.GetVolumeContentSource(), VolumeContext: volumeContext, } if volOptions.Topology != nil {