rbd: use go-ceph rbd admin task api instead of cli

This commit adds support to go-ceph rbd task api
`trash remove` and `flatten` instead of using cli
cmds.

Fixes: #2186

Signed-off-by: Rakshith R <rar@redhat.com>
This commit is contained in:
Rakshith R 2021-11-09 17:11:17 +05:30 committed by mergify[bot]
parent 0d1c2aa983
commit ad3c334a3a

View File

@ -56,9 +56,8 @@ const (
// Output strings returned during invocation of "ceph rbd task add remove <imagespec>" when
// command is not supported by ceph manager. Used to check errors and recover when the command
// is unsupported.
rbdTaskRemoveCmdInvalidString1 = "no valid command found"
rbdTaskRemoveCmdInvalidString2 = "Error EINVAL: invalid command"
rbdTaskRemoveCmdAccessDeniedMessage = "Error EACCES:"
rbdTaskRemoveCmdInvalidString = "No handler found"
rbdTaskRemoveCmdAccessDeniedMessage = "access denied:"
// image metadata key for thick-provisioning.
// As image metadata key starting with '.rbd' will not be copied when we do
@ -595,43 +594,27 @@ func isNotMountPoint(mounter mount.Interface, stagingTargetPath string) (bool, e
return isNotMnt, err
}
// addRbdManagerTask adds a ceph manager task to execute command
// asynchronously. If command is not found returns a bool set to false
// example arg ["trash", "remove","pool/image"].
func addRbdManagerTask(ctx context.Context, pOpts *rbdVolume, arg []string) (bool, error) {
args := []string{"rbd", "task", "add"}
args = append(args, arg...)
log.DebugLog(
ctx,
"executing %v for image (%s) using mon %s, pool %s",
args,
pOpts.RbdImageName,
pOpts.Monitors,
pOpts.Pool)
supported := true
_, stderr, err := util.ExecCommand(ctx, "ceph", args...)
if err != nil {
switch {
case strings.Contains(stderr, rbdTaskRemoveCmdInvalidString1) &&
strings.Contains(stderr, rbdTaskRemoveCmdInvalidString2):
log.WarningLog(
ctx,
"cluster with cluster ID (%s) does not support Ceph manager based rbd commands"+
"(minimum ceph version required is v14.2.3)",
pOpts.ClusterID)
supported = false
case strings.HasPrefix(stderr, rbdTaskRemoveCmdAccessDeniedMessage):
log.WarningLog(ctx, "access denied to Ceph MGR-based rbd commands on cluster ID (%s)", pOpts.ClusterID)
supported = false
default:
log.WarningLog(ctx, "uncaught error while scheduling a task (%v): %s", err, stderr)
}
}
if err != nil {
err = fmt.Errorf("%w. stdError:%s", err, stderr)
// isCephMgrSupported determines if the cluster has support for MGR based operation
// depending on the error.
func isCephMgrSupported(ctx context.Context, clusterID string, err error) bool {
switch {
case err == nil:
return true
case strings.Contains(err.Error(), rbdTaskRemoveCmdInvalidString):
log.WarningLog(
ctx,
"cluster with cluster ID (%s) does not support Ceph manager based rbd commands"+
"(minimum ceph version required is v14.2.3)",
clusterID)
return false
case strings.Contains(err.Error(), rbdTaskRemoveCmdAccessDeniedMessage):
log.WarningLog(ctx, "access denied to Ceph MGR-based rbd commands on cluster ID (%s)", clusterID)
return false
}
return supported, err
return true
}
// getTrashPath returns the image path for trash operation.
@ -704,14 +687,17 @@ func deleteImage(ctx context.Context, pOpts *rbdVolume, cr *util.Credentials) er
func trashRemoveImage(ctx context.Context, pOpts *rbdVolume, cr *util.Credentials) error {
// attempt to use Ceph manager based deletion support if available
args := []string{
"trash", "remove",
pOpts.getTrashPath(),
"--id", cr.ID,
"--keyfile=" + cr.KeyFile,
"-m", pOpts.Monitors,
ra, err := pOpts.conn.GetRBDAdmin()
if err != nil {
return err
}
rbdCephMgrSupported, err := addRbdManagerTask(ctx, pOpts, args)
log.DebugLog(ctx, "rbd: adding task to remove image %s with id %s from trash", pOpts, pOpts.ImageID)
ta := ra.Task()
_, err = ta.AddTrashRemove(admin.NewImageSpec(pOpts.Pool, pOpts.RadosNamespace, pOpts.ImageID))
rbdCephMgrSupported := isCephMgrSupported(ctx, pOpts.ClusterID, err)
if rbdCephMgrSupported && err != nil {
log.ErrorLog(ctx, "failed to add task to delete rbd image: %s, %v", pOpts, err)
@ -725,6 +711,8 @@ func trashRemoveImage(ctx context.Context, pOpts *rbdVolume, cr *util.Credential
return err
}
} else {
log.DebugLog(ctx, "rbd: successfully added task to move image %s with id %s to trash", pOpts, pOpts.ImageID)
}
return nil
@ -850,9 +838,18 @@ func (rv *rbdVolume) flattenRbdImage(
if !forceFlatten && (depth < hardlimit) && (depth < softlimit) {
return nil
}
args := []string{"flatten", rv.String(), "--id", cr.ID, "--keyfile=" + cr.KeyFile, "-m", rv.Monitors}
supported, err := addRbdManagerTask(ctx, rv, args)
if supported {
ra, err := rv.conn.GetRBDAdmin()
if err != nil {
return err
}
log.DebugLog(ctx, "rbd: adding task to flatten image %s", rv)
ta := ra.Task()
_, err = ta.AddFlatten(admin.NewImageSpec(rv.Pool, rv.RadosNamespace, rv.RbdImageName))
rbdCephMgrSupported := isCephMgrSupported(ctx, rv.ClusterID, err)
if rbdCephMgrSupported {
if err != nil {
// discard flattening error if the image does not have any parent
rbdFlattenNoParent := fmt.Sprintf("Image %s/%s does not have a parent", rv.Pool, rv.RbdImageName)
@ -866,8 +863,9 @@ func (rv *rbdVolume) flattenRbdImage(
if forceFlatten || depth >= hardlimit {
return fmt.Errorf("%w: flatten is in progress for image %s", ErrFlattenInProgress, rv.RbdImageName)
}
log.DebugLog(ctx, "successfully added task to flatten image %s", rv)
}
if !supported {
if !rbdCephMgrSupported {
log.ErrorLog(
ctx,
"task manager does not support flatten,image will be flattened once hardlimit is reached: %v",