cephfs: add snapshot create and delete functionalilies

It also add helper routines like parsetime,doSnapshot..etc

Signed-off-by: Humble Chirammal <hchiramm@redhat.com>
This commit is contained in:
Humble Chirammal 2020-08-04 09:55:28 +05:30 committed by mergify[bot]
parent 20c90ddfc4
commit c3400bfb97

View File

@ -19,11 +19,14 @@ package cephfs
import ( import (
"context" "context"
"errors" "errors"
"fmt"
csicommon "github.com/ceph/ceph-csi/internal/csi-common" csicommon "github.com/ceph/ceph-csi/internal/csi-common"
"github.com/ceph/ceph-csi/internal/util" "github.com/ceph/ceph-csi/internal/util"
"github.com/container-storage-interface/spec/lib/go/csi" "github.com/container-storage-interface/spec/lib/go/csi"
"github.com/golang/protobuf/ptypes/timestamp"
"github.com/kubernetes-csi/csi-lib-utils/protosanitizer"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
klog "k8s.io/klog/v2" klog "k8s.io/klog/v2"
@ -423,3 +426,290 @@ func (cs *ControllerServer) ControllerExpandVolume(ctx context.Context, req *csi
NodeExpansionRequired: false, NodeExpansionRequired: false,
}, nil }, nil
} }
// CreateSnapshot creates the snapshot in backend and stores metadata
// in store
// nolint:gocyclo // golangci-lint did not catch this earlier, needs to get fixed late
func (cs *ControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error) {
if err := cs.validateSnapshotReq(ctx, req); err != nil {
return nil, err
}
cr, err := util.NewAdminCredentials(req.GetSecrets())
if err != nil {
return nil, err
}
defer cr.DeleteCredentials()
clusterData, err := getClusterInformation(req.GetParameters())
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
requestName := req.GetName()
sourceVolID := req.GetSourceVolumeId()
// Existence and conflict checks
if acquired := cs.SnapshotLocks.TryAcquire(requestName); !acquired {
klog.Errorf(util.Log(ctx, util.SnapshotOperationAlreadyExistsFmt), requestName)
return nil, status.Errorf(codes.Aborted, util.SnapshotOperationAlreadyExistsFmt, requestName)
}
defer cs.SnapshotLocks.Release(requestName)
if err = cs.OperationLocks.GetSnapshotCreateLock(sourceVolID); err != nil {
klog.Error(util.Log(ctx, err.Error()))
return nil, status.Error(codes.Aborted, err.Error())
}
defer cs.OperationLocks.ReleaseSnapshotCreateLock(sourceVolID)
// Find the volume using the provided VolumeID
parentVolOptions, vid, err := newVolumeOptionsFromVolID(ctx, sourceVolID, nil, req.GetSecrets())
if err != nil {
if errors.Is(err, util.ErrPoolNotFound) {
klog.Warningf(util.Log(ctx, "failed to get backend volume for %s: %v"), sourceVolID, err)
return nil, status.Error(codes.NotFound, err.Error())
}
if errors.Is(err, ErrVolumeNotFound) {
return nil, status.Error(codes.NotFound, err.Error())
}
return nil, status.Error(codes.Internal, err.Error())
}
if clusterData.ClusterID != parentVolOptions.ClusterID {
return nil, status.Errorf(codes.InvalidArgument, "requested cluster id %s not matching subvolume cluster id %s", clusterData.ClusterID, parentVolOptions.ClusterID)
}
cephfsSnap, genSnapErr := genSnapFromOptions(ctx, req)
if genSnapErr != nil {
return nil, status.Error(codes.Internal, genSnapErr.Error())
}
// lock out parallel snapshot create operations
if acquired := cs.VolumeLocks.TryAcquire(sourceVolID); !acquired {
klog.Errorf(util.Log(ctx, util.VolumeOperationAlreadyExistsFmt), sourceVolID)
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, sourceVolID)
}
defer cs.VolumeLocks.Release(sourceVolID)
snapName := req.GetName()
sid, snapInfo, err := checkSnapExists(ctx, parentVolOptions, vid.FsSubvolName, cephfsSnap, cr)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
// check are we able to retrieve the size of parent
// ceph fs subvolume info command got added in 14.2.10 and 15.+
// as we are not able to retrieve the parent size we are rejecting the
// request to create snapshot.
// TODO: For this purpose we could make use of cached clusterAdditionalInfo too.
var info Subvolume
info, err = getSubVolumeInfo(ctx, parentVolOptions, cr, volumeID(vid.FsSubvolName))
if err != nil {
// Check error code value against ErrInvalidCommand to understand the cluster
// support it or not, its safe to evaluat as the filtering
// is already done from getSubVolumeInfo() and send out the error here.
if errors.Is(err, ErrInvalidCommand) {
return nil, status.Error(codes.FailedPrecondition, "subvolume info command not supported in current ceph cluster")
}
if sid != nil {
errDefer := undoSnapReservation(ctx, parentVolOptions, *sid, snapName, cr)
if errDefer != nil {
klog.Warningf(util.Log(ctx, "failed undoing reservation of snapshot: %s (%s)"),
requestName, errDefer)
}
}
return nil, status.Error(codes.Internal, err.Error())
}
if sid != nil {
// check snapshot is protected
protected := true
if !(snapInfo.Protected == snapshotIsProtected) {
err = protectSnapshot(ctx, parentVolOptions, cr, volumeID(sid.FsSnapshotName), volumeID(vid.FsSubvolName))
if err != nil {
protected = false
klog.Warningf(util.Log(ctx, "failed to protect snapshot of snapshot: %s (%s)"),
sid.FsSnapshotName, err)
}
}
return &csi.CreateSnapshotResponse{
Snapshot: &csi.Snapshot{
SizeBytes: int64(info.BytesQuota),
SnapshotId: sid.SnapshotID,
SourceVolumeId: req.GetSourceVolumeId(),
CreationTime: sid.CreationTime,
ReadyToUse: protected,
},
}, nil
}
// Reservation
sID, err := reserveSnap(ctx, parentVolOptions, vid.FsSubvolName, cephfsSnap, cr)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
defer func() {
if err != nil {
errDefer := undoSnapReservation(ctx, parentVolOptions, *sID, snapName, cr)
if errDefer != nil {
klog.Warningf(util.Log(ctx, "failed undoing reservation of snapshot: %s (%s)"),
requestName, errDefer)
}
}
}()
snap := snapshotInfo{}
snap, err = doSnapshot(ctx, parentVolOptions, vid.FsSubvolName, sID.FsSnapshotName, cr)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
return &csi.CreateSnapshotResponse{
Snapshot: &csi.Snapshot{
SizeBytes: int64(info.BytesQuota),
SnapshotId: sID.SnapshotID,
SourceVolumeId: req.GetSourceVolumeId(),
CreationTime: snap.CreationTime,
ReadyToUse: true,
},
}, nil
}
func doSnapshot(ctx context.Context, volOpt *volumeOptions, subvolumeName, snapshotName string, cr *util.Credentials) (snapshotInfo, error) {
volID := volumeID(subvolumeName)
snapID := volumeID(snapshotName)
snap := snapshotInfo{}
err := createSnapshot(ctx, volOpt, cr, snapID, volID)
if err != nil {
klog.Errorf(util.Log(ctx, "failed to create snapshot %s %v"), snapID, err)
return snap, err
}
defer func() {
if err != nil {
dErr := deleteSnapshot(ctx, volOpt, cr, snapID, volID)
if dErr != nil {
klog.Errorf(util.Log(ctx, "failed to delete snapshot %s %v"), snapID, err)
}
}
}()
snap, err = getSnapshotInfo(ctx, volOpt, cr, snapID, volID)
if err != nil {
klog.Errorf(util.Log(ctx, "failed to get snapshot info %s %v"), snapID, err)
return snap, fmt.Errorf("failed to get snapshot info for snapshot:%s", snapID)
}
var t *timestamp.Timestamp
t, err = parseTime(ctx, snap.CreatedAt)
if err != nil {
return snap, err
}
snap.CreationTime = t
err = protectSnapshot(ctx, volOpt, cr, snapID, volID)
if err != nil {
klog.Errorf(util.Log(ctx, "failed to protect snapshot %s %v"), snapID, err)
}
return snap, err
}
func (cs *ControllerServer) validateSnapshotReq(ctx context.Context, req *csi.CreateSnapshotRequest) error {
if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT); err != nil {
klog.Errorf(util.Log(ctx, "invalid create snapshot req: %v"), protosanitizer.StripSecrets(req))
return err
}
// Check sanity of request Snapshot Name, Source Volume Id
if req.Name == "" {
return status.Error(codes.NotFound, "snapshot Name cannot be empty")
}
if req.SourceVolumeId == "" {
return status.Error(codes.NotFound, "source Volume ID cannot be empty")
}
return nil
}
// DeleteSnapshot deletes the snapshot in backend and removes the
// snapshot metadata from store.
func (cs *ControllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteSnapshotRequest) (*csi.DeleteSnapshotResponse, error) {
if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT); err != nil {
klog.Errorf(util.Log(ctx, "invalid delete snapshot req: %v"), protosanitizer.StripSecrets(req))
return nil, err
}
cr, err := util.NewAdminCredentials(req.GetSecrets())
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
defer cr.DeleteCredentials()
snapshotID := req.GetSnapshotId()
if snapshotID == "" {
return nil, status.Error(codes.InvalidArgument, "snapshot ID cannot be empty")
}
if acquired := cs.SnapshotLocks.TryAcquire(snapshotID); !acquired {
klog.Errorf(util.Log(ctx, util.SnapshotOperationAlreadyExistsFmt), snapshotID)
return nil, status.Errorf(codes.Aborted, util.SnapshotOperationAlreadyExistsFmt, snapshotID)
}
defer cs.SnapshotLocks.Release(snapshotID)
// lock out snapshotID for restore operation
if err = cs.OperationLocks.GetDeleteLock(snapshotID); err != nil {
klog.Error(util.Log(ctx, err.Error()))
return nil, status.Error(codes.Aborted, err.Error())
}
defer cs.OperationLocks.ReleaseDeleteLock(snapshotID)
volOpt, snapInfo, sid, err := newSnapshotOptionsFromID(ctx, snapshotID, cr)
if err != nil {
// if error is ErrPoolNotFound, the pool is already deleted we dont
// need to worry about deleting snapshot or omap data, return success
if errors.Is(err, util.ErrPoolNotFound) {
klog.Warningf(util.Log(ctx, "failed to get backend snapshot for %s: %v"), snapshotID, err)
return &csi.DeleteSnapshotResponse{}, nil
}
// if error is ErrKeyNotFound, then a previous attempt at deletion was complete
// or partially complete (snap and snapOMap are garbage collected already), hence return
// success as deletion is complete
if errors.Is(err, util.ErrKeyNotFound) {
return &csi.DeleteSnapshotResponse{}, nil
}
if errors.Is(err, ErrSnapNotFound) {
err = undoSnapReservation(ctx, volOpt, *sid, sid.FsSnapshotName, cr)
if err != nil {
klog.Errorf(util.Log(ctx, "failed to remove reservation for snapname (%s) with backing snap (%s) (%s)"),
sid.FsSubvolName, sid.FsSnapshotName, err)
return nil, status.Error(codes.Internal, err.Error())
}
return &csi.DeleteSnapshotResponse{}, nil
}
return nil, status.Error(codes.Internal, err.Error())
}
// safeguard against parallel create or delete requests against the same
// name
if acquired := cs.SnapshotLocks.TryAcquire(sid.RequestName); !acquired {
klog.Errorf(util.Log(ctx, util.SnapshotOperationAlreadyExistsFmt), sid.RequestName)
return nil, status.Errorf(codes.Aborted, util.VolumeOperationAlreadyExistsFmt, sid.RequestName)
}
defer cs.SnapshotLocks.Release(sid.RequestName)
if snapInfo.HasPendingClones == "yes" {
return nil, status.Errorf(codes.FailedPrecondition, "snapshot %s has pending clones", snapshotID)
}
if snapInfo.Protected == snapshotIsProtected {
err = unprotectSnapshot(ctx, volOpt, cr, volumeID(sid.FsSnapshotName), volumeID(sid.FsSubvolName))
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
}
err = deleteSnapshot(ctx, volOpt, cr, volumeID(sid.FsSnapshotName), volumeID(sid.FsSubvolName))
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
err = undoSnapReservation(ctx, volOpt, *sid, sid.FsSnapshotName, cr)
if err != nil {
klog.Errorf(util.Log(ctx, "failed to remove reservation for snapname (%s) with backing snap (%s) (%s)"),
sid.RequestName, sid.FsSnapshotName, err)
return nil, status.Error(codes.Internal, err.Error())
}
return &csi.DeleteSnapshotResponse{}, nil
}