rbd: replace cli with go-ceph for snapshot namespace

cephcsi uses cli for fetching snap list as well as to check the
snapshot namespace, replaced that with go-ceph calls.

Signed-off-by: Mudit Agarwal <muagarwa@redhat.com>
This commit is contained in:
Mudit Agarwal 2020-08-27 16:39:45 +05:30 committed by mergify[bot]
parent 06066cd153
commit e1237f348f
2 changed files with 82 additions and 49 deletions

View File

@ -347,7 +347,7 @@ func flattenParentImage(ctx context.Context, rbdVol *rbdVolume, cr *util.Credent
// If the snapshots are more than the `maxSnapshotsOnImage` Add a task to
// flatten all the temporary cloned images.
func flattenTemporaryClonedImages(ctx context.Context, rbdVol *rbdVolume, cr *util.Credentials) error {
snaps, err := rbdVol.listSnapshots(ctx, cr)
snaps, err := rbdVol.listSnapshots()
if err != nil {
if errors.Is(err, ErrImageNotFound) {
return status.Error(codes.InvalidArgument, err.Error())
@ -356,7 +356,13 @@ func flattenTemporaryClonedImages(ctx context.Context, rbdVol *rbdVolume, cr *ut
}
if len(snaps) > int(maxSnapshotsOnImage) {
err = flattenClonedRbdImages(ctx, snaps, rbdVol.Pool, rbdVol.Monitors, cr)
err = flattenClonedRbdImages(
ctx,
snaps,
rbdVol.Pool,
rbdVol.Monitors,
rbdVol.RbdImageName,
cr)
if err != nil {
return status.Error(codes.Internal, err.Error())
}

View File

@ -417,10 +417,15 @@ func (rv *rbdVolume) getCloneDepth(ctx context.Context) (uint, error) {
}
}
func flattenClonedRbdImages(ctx context.Context, snaps []snapshotInfo, pool, monitors string, cr *util.Credentials) error {
type trashSnapInfo struct {
origSnapName string
}
func flattenClonedRbdImages(ctx context.Context, snaps []librbd.SnapInfo, pool, monitors, rbdImageName string, cr *util.Credentials) error {
rv := &rbdVolume{
Monitors: monitors,
Pool: pool,
Monitors: monitors,
Pool: pool,
RbdImageName: rbdImageName,
}
defer rv.Destroy()
err := rv.Connect(cr)
@ -428,14 +433,30 @@ func flattenClonedRbdImages(ctx context.Context, snaps []snapshotInfo, pool, mon
util.ErrorLog(ctx, "failed to open connection %s; err %v", rv, err)
return err
}
for _, s := range snaps {
if s.Namespace.Type == "trash" {
rv.RbdImageName = s.Namespace.OriginalName
err = rv.flattenRbdImage(ctx, cr, true, rbdHardMaxCloneDepth, rbdSoftMaxCloneDepth)
if err != nil {
util.ErrorLog(ctx, "failed to flatten %s; err %v", rv, err)
continue
var origNameList []trashSnapInfo
for _, snapInfo := range snaps {
// check if the snapshot belongs to trash namespace.
isTrash, retErr := rv.isTrashSnap(snapInfo.Id)
if retErr != nil {
return retErr
}
if isTrash {
// get original snap name for the snapshot in trash namespace
origSnapName, retErr := rv.getOrigSnapName(snapInfo.Id)
if retErr != nil {
return retErr
}
origNameList = append(origNameList, trashSnapInfo{origSnapName})
}
}
for _, snapName := range origNameList {
rv.RbdImageName = snapName.origSnapName
err = rv.flattenRbdImage(ctx, cr, true, rbdHardMaxCloneDepth, rbdSoftMaxCloneDepth)
if err != nil {
util.ErrorLog(ctx, "failed to flatten %s; err %v", rv, err)
continue
}
}
return nil
@ -1129,47 +1150,53 @@ func (rv *rbdVolume) ensureEncryptionMetadataSet(status string) error {
return nil
}
// SnapshotInfo holds snapshots details.
type snapshotInfo struct {
ID int `json:"id"`
Name string `json:"name"`
Size int64 `json:"size"`
Protected string `json:"protected"`
Timestamp string `json:"timestamp"`
Namespace struct {
Type string `json:"type"`
OriginalName string `json:"original_name"`
} `json:"namespace"`
func (rv *rbdVolume) listSnapshots() ([]librbd.SnapInfo, error) {
image, err := rv.open()
if err != nil {
return nil, err
}
defer image.Close()
snapInfoList, err := image.GetSnapshotNames()
if err != nil {
return nil, err
}
return snapInfoList, nil
}
// TODO: use go-ceph once https://github.com/ceph/go-ceph/issues/300 is available in a release.
func (rv *rbdVolume) listSnapshots(ctx context.Context, cr *util.Credentials) ([]snapshotInfo, error) {
// rbd snap ls <image> --pool=<pool-name> --all --format=json
var snapInfo []snapshotInfo
stdout, stderr, err := util.ExecCommand(
ctx,
"rbd",
"-m", rv.Monitors,
"--id", cr.ID,
"--keyfile="+cr.KeyFile,
"-c", util.CephConfigPath,
"--format="+"json",
"snap",
"ls",
"--all", rv.String())
// isTrashSnap returns true if the snapshot belongs to trash namespace.
func (rv *rbdVolume) isTrashSnap(snapID uint64) (bool, error) {
image, err := rv.open()
if err != nil {
util.ErrorLog(ctx, "failed getting information for image (%s): (%s)", rv, err)
if strings.Contains(stderr, "rbd: error opening image "+rv.RbdImageName+
": (2) No such file or directory") {
return snapInfo, util.JoinErrors(ErrImageNotFound, err)
}
return snapInfo, err
return false, err
}
defer image.Close()
// Get namespace type for the snapshot
nsType, err := image.GetSnapNamespaceType(snapID)
if err != nil {
return false, err
}
err = json.Unmarshal([]byte(stdout), &snapInfo)
if err != nil {
util.ErrorLog(ctx, "failed to parse JSON output of snapshot info (%s)", err)
return snapInfo, fmt.Errorf("unmarshal failed: %w. raw buffer response: %s", err, stdout)
if nsType == librbd.SnapNamespaceTypeTrash {
return true, nil
}
return snapInfo, nil
return false, nil
}
// getOrigSnapName returns the original snap name for
// the snapshots in Trash Namespace.
func (rv *rbdVolume) getOrigSnapName(snapID uint64) (string, error) {
image, err := rv.open()
if err != nil {
return "", err
}
defer image.Close()
origSnapName, err := image.GetSnapTrashNamespace(snapID)
if err != nil {
return "", err
}
return origSnapName, nil
}