mirror of
https://github.com/ceph/ceph-csi.git
synced 2024-12-18 02:50:30 +00:00
rbd: don't delete volume/snapshot if metadata creation fails
This commit is contained in:
parent
8223ae325b
commit
1803a1be97
@ -115,6 +115,15 @@ func parseVolCreateRequest(req *csi.CreateVolumeRequest) (*rbdVolume, error) {
|
||||
return rbdVol, nil
|
||||
}
|
||||
|
||||
func storeVolumeMetadata(vol *rbdVolume, cp util.CachePersister) error {
|
||||
if err := cp.Create(vol.VolID, vol); err != nil {
|
||||
klog.Errorf("failed to store metadata for volume %s: %v", vol.VolID, err)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// CreateVolume creates the volume in backend and store the volume metadata
|
||||
func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
|
||||
|
||||
@ -136,6 +145,11 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
|
||||
// request
|
||||
if exVol.VolSize >= req.GetCapacityRange().GetRequiredBytes() {
|
||||
// existing volume is compatible with new request and should be reused.
|
||||
|
||||
if err = storeVolumeMetadata(exVol, cs.MetadataStore); err != nil {
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
// TODO (sbezverk) Do I need to make sure that RBD volume still exists?
|
||||
return &csi.CreateVolumeResponse{
|
||||
Volume: &csi.Volume{
|
||||
@ -160,16 +174,13 @@ func (cs *ControllerServer) CreateVolume(ctx context.Context, req *csi.CreateVol
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if createErr := cs.MetadataStore.Create(rbdVol.VolID, rbdVol); createErr != nil {
|
||||
klog.Warningf("failed to store volume metadata with error: %v", err)
|
||||
if err = deleteRBDImage(rbdVol, rbdVol.AdminID, req.GetSecrets()); err != nil {
|
||||
klog.V(3).Infof("failed to delete rbd image: %s/%s with error: %v", rbdVol.Pool, rbdVol.VolName, err)
|
||||
return nil, err
|
||||
}
|
||||
return nil, createErr
|
||||
}
|
||||
|
||||
rbdVolumes[rbdVol.VolID] = rbdVol
|
||||
|
||||
if err = storeVolumeMetadata(rbdVol, cs.MetadataStore); err != nil {
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
return &csi.CreateVolumeResponse{
|
||||
Volume: &csi.Volume{
|
||||
VolumeId: rbdVol.VolID,
|
||||
@ -311,6 +322,10 @@ func (cs *ControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateS
|
||||
// check for the requested source volume id and already allocated source volume id
|
||||
if exSnap, err := getRBDSnapshotByName(req.GetName()); err == nil {
|
||||
if req.SourceVolumeId == exSnap.SourceVolumeID {
|
||||
if err = storeSnapshotMetadata(exSnap, cs.MetadataStore); err != nil {
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
return &csi.CreateSnapshotResponse{
|
||||
Snapshot: &csi.Snapshot{
|
||||
SizeBytes: exSnap.SizeBytes,
|
||||
@ -357,11 +372,12 @@ func (cs *ControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateS
|
||||
|
||||
rbdSnap.CreatedAt = ptypes.TimestampNow().GetSeconds()
|
||||
|
||||
if err = cs.storeSnapMetadata(rbdSnap, req.GetSecrets()); err != nil {
|
||||
return nil, err
|
||||
rbdSnapshots[snapshotID] = rbdSnap
|
||||
|
||||
if err = storeSnapshotMetadata(rbdSnap, cs.MetadataStore); err != nil {
|
||||
return nil, status.Error(codes.Internal, err.Error())
|
||||
}
|
||||
|
||||
rbdSnapshots[snapshotID] = rbdSnap
|
||||
return &csi.CreateSnapshotResponse{
|
||||
Snapshot: &csi.Snapshot{
|
||||
SizeBytes: rbdSnap.SizeBytes,
|
||||
@ -375,22 +391,13 @@ func (cs *ControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateS
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (cs *ControllerServer) storeSnapMetadata(rbdSnap *rbdSnapshot, secret map[string]string) error {
|
||||
errCreate := cs.MetadataStore.Create(rbdSnap.SnapID, rbdSnap)
|
||||
if errCreate != nil {
|
||||
klog.Warningf("rbd: failed to store snapInfo with error: %v", errCreate)
|
||||
// Unprotect snapshot
|
||||
err := unprotectSnapshot(rbdSnap, rbdSnap.AdminID, secret)
|
||||
if err != nil {
|
||||
return status.Errorf(codes.Unknown, "This Snapshot should be removed but failed to unprotect snapshot: %s/%s with error: %v", rbdSnap.Pool, rbdSnap.SnapName, err)
|
||||
}
|
||||
// Deleting snapshot
|
||||
klog.V(4).Infof("deleting Snaphot %s", rbdSnap.SnapName)
|
||||
if err = deleteSnapshot(rbdSnap, rbdSnap.AdminID, secret); err != nil {
|
||||
return status.Errorf(codes.Unknown, "This Snapshot should be removed but failed to delete snapshot: %s/%s with error: %v", rbdSnap.Pool, rbdSnap.SnapName, err)
|
||||
}
|
||||
func storeSnapshotMetadata(rbdSnap *rbdSnapshot, cp util.CachePersister) error {
|
||||
if err := cp.Create(rbdSnap.SnapID, rbdSnap); err != nil {
|
||||
klog.Errorf("failed to store metadata for snapshot %s: %v", rbdSnap.SnapID, err)
|
||||
return err
|
||||
}
|
||||
return errCreate
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cs *ControllerServer) validateSnapshotReq(req *csi.CreateSnapshotRequest) error {
|
||||
|
Loading…
Reference in New Issue
Block a user