cleanup: address "nolint" comments for RBD CreateSnapshot

Introduce helper function cloneFromSnapshot() that takes care of the
procedures that are needed when an existing snapshot has been found.

Signed-off-by: Niels de Vos <ndevos@redhat.com>
This commit is contained in:
Niels de Vos 2021-04-08 11:21:44 +02:00 committed by mergify[bot]
parent b5d0524c39
commit 596410ae60

View File

@ -711,10 +711,7 @@ func (cs *ControllerServer) ValidateVolumeCapabilities(ctx context.Context, req
}, nil }, nil
} }
// CreateSnapshot creates the snapshot in backend and stores metadata // CreateSnapshot creates the snapshot in backend and stores metadata in store.
// in store
// TODO: make this function less complex
// nolint:gocyclo,nestif // complexity needs to be reduced.
func (cs *ControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error) { func (cs *ControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error) {
if err := cs.validateSnapshotReq(ctx, req); err != nil { if err := cs.validateSnapshotReq(ctx, req); err != nil {
return nil, err return nil, err
@ -780,57 +777,7 @@ func (cs *ControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateS
return nil, status.Errorf(codes.Internal, err.Error()) return nil, status.Errorf(codes.Internal, err.Error())
} }
if found { if found {
vol := generateVolFromSnap(rbdSnap) return cloneFromSnapshot(ctx, rbdVol, rbdSnap, cr)
err = vol.Connect(cr)
if err != nil {
uErr := undoSnapshotCloning(ctx, vol, rbdSnap, vol, cr)
if uErr != nil {
util.WarningLog(ctx, "failed undoing reservation of snapshot: %s %v", req.GetName(), uErr)
}
return nil, status.Errorf(codes.Internal, err.Error())
}
defer vol.Destroy()
if rbdVol.isEncrypted() {
cryptErr := rbdVol.copyEncryptionConfig(&vol.rbdImage)
if cryptErr != nil {
util.WarningLog(ctx, "failed copy encryption "+
"config for %q: %v", vol.String(),
req.GetName(), cryptErr)
return nil, status.Errorf(codes.Internal,
err.Error())
}
}
err = vol.flattenRbdImage(ctx, cr, false, rbdHardMaxCloneDepth, rbdSoftMaxCloneDepth)
switch {
case errors.Is(err, ErrFlattenInProgress):
return &csi.CreateSnapshotResponse{
Snapshot: &csi.Snapshot{
SizeBytes: rbdSnap.SizeBytes,
SnapshotId: rbdSnap.VolID,
SourceVolumeId: rbdSnap.SourceVolumeID,
CreationTime: rbdSnap.CreatedAt,
ReadyToUse: false,
},
}, nil
case err != nil:
uErr := undoSnapshotCloning(ctx, vol, rbdSnap, vol, cr)
if uErr != nil {
util.WarningLog(ctx, "failed undoing reservation of snapshot: %s %v", req.GetName(), uErr)
}
return nil, status.Errorf(codes.Internal, err.Error())
default:
return &csi.CreateSnapshotResponse{
Snapshot: &csi.Snapshot{
SizeBytes: rbdSnap.SizeBytes,
SnapshotId: rbdSnap.VolID,
SourceVolumeId: rbdSnap.SourceVolumeID,
CreationTime: rbdSnap.CreatedAt,
ReadyToUse: true,
},
}, nil
}
} }
err = flattenTemporaryClonedImages(ctx, rbdVol, cr) err = flattenTemporaryClonedImages(ctx, rbdVol, cr)
@ -870,6 +817,58 @@ func (cs *ControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateS
}, nil }, nil
} }
// cloneFromSnapshot is a helper for CreateSnapshot that continues creating an
// RBD image from an RBD snapshot if the process was interrupted at one point.
func cloneFromSnapshot(ctx context.Context, rbdVol *rbdVolume, rbdSnap *rbdSnapshot, cr *util.Credentials) (*csi.CreateSnapshotResponse, error) {
vol := generateVolFromSnap(rbdSnap)
err := vol.Connect(cr)
if err != nil {
uErr := undoSnapshotCloning(ctx, vol, rbdSnap, vol, cr)
if uErr != nil {
util.WarningLog(ctx, "failed undoing reservation of snapshot: %s %v", rbdSnap.RequestName, uErr)
}
return nil, status.Errorf(codes.Internal, err.Error())
}
defer vol.Destroy()
if rbdVol.isEncrypted() {
// FIXME: vol.VolID should be different from rbdVol.VolID
vol.VolID = rbdVol.VolID
cryptErr := rbdVol.copyEncryptionConfig(&vol.rbdImage)
if cryptErr != nil {
util.WarningLog(ctx, "failed copy encryption "+
"config for %q: %v", vol.String(),
rbdSnap.RequestName, cryptErr)
return nil, status.Errorf(codes.Internal,
err.Error())
}
}
// if flattening is not needed, the snapshot is ready for use
readyToUse := true
err = vol.flattenRbdImage(ctx, cr, false, rbdHardMaxCloneDepth, rbdSoftMaxCloneDepth)
if errors.Is(err, ErrFlattenInProgress) {
readyToUse = false
} else if err != nil {
uErr := undoSnapshotCloning(ctx, vol, rbdSnap, vol, cr)
if uErr != nil {
util.WarningLog(ctx, "failed undoing reservation of snapshot: %s %v", rbdSnap.RequestName, uErr)
}
return nil, status.Errorf(codes.Internal, err.Error())
}
return &csi.CreateSnapshotResponse{
Snapshot: &csi.Snapshot{
SizeBytes: rbdSnap.SizeBytes,
SnapshotId: rbdSnap.VolID,
SourceVolumeId: rbdSnap.SourceVolumeID,
CreationTime: rbdSnap.CreatedAt,
ReadyToUse: readyToUse,
},
}, nil
}
func (cs *ControllerServer) validateSnapshotReq(ctx context.Context, req *csi.CreateSnapshotRequest) error { func (cs *ControllerServer) validateSnapshotReq(ctx context.Context, req *csi.CreateSnapshotRequest) error {
if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT); err != nil { if err := cs.Driver.ValidateControllerServiceRequest(csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT); err != nil {
util.ErrorLog(ctx, "invalid create snapshot req: %v", protosanitizer.StripSecrets(req)) util.ErrorLog(ctx, "invalid create snapshot req: %v", protosanitizer.StripSecrets(req))