rbd: check volume details from original volumeID

Checking volume details for the existing volumeID
first. if details like OMAP, RBD Image, Pool doesnot
exists try to use clusterIDMapping to look for the
correct informations.

fixes: #2929

Signed-off-by: Madhu Rajanna <madhupr007@gmail.com>
This commit is contained in:
Madhu Rajanna 2022-03-10 17:27:38 +05:30 committed by mergify[bot]
parent 6bba64c872
commit 07e9dede2c

View File

@ -25,7 +25,6 @@ import (
"strings"
csicommon "github.com/ceph/ceph-csi/internal/csi-common"
"github.com/ceph/ceph-csi/internal/journal"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/fscrypt"
"github.com/ceph/ceph-csi/internal/util/log"
@ -144,14 +143,12 @@ func healerStageTransaction(ctx context.Context, cr *util.Credentials, volOps *r
// this function also receive the credentials and secrets args as it differs in its data.
// The credentials are used directly by functions like voljournal.Connect() and other functions
// like genVolFromVolumeOptions() make use of secrets.
// nolint:gocyclo,cyclop // reduce complexity
func populateRbdVol(
ctx context.Context,
req *csi.NodeStageVolumeRequest,
cr *util.Credentials,
) (*rbdVolume, error) {
var err error
var j *journal.Connection
volID := req.GetVolumeId()
isBlock := req.GetVolumeCapability().GetBlock() != nil
disableInUseChecks := false
@ -173,11 +170,7 @@ func populateRbdVol(
disableInUseChecks = true
}
rv, err := genVolFromVolumeOptions(ctx, req.GetVolumeContext(), disableInUseChecks, true)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
var rv *rbdVolume
isStaticVol := parseBoolOption(ctx, req.GetVolumeContext(), staticVol, false)
// get rbd image name from the volume journal
@ -187,35 +180,24 @@ func populateRbdVol(
// if migration static volume, use imageName as volID
volID = req.GetVolumeContext()["imageName"]
}
rv, err = genVolFromVolumeOptions(ctx, req.GetVolumeContext(), disableInUseChecks, true)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
rv.RbdImageName = volID
} else {
var vi util.CSIIdentifier
var imageAttributes *journal.ImageAttributes
err = vi.DecomposeCSIID(volID)
rv, err = GenVolFromVolID(ctx, volID, cr, req.GetSecrets())
if err != nil {
err = fmt.Errorf("error decoding volume ID (%s): %w", volID, err)
rv.Destroy()
log.ErrorLog(ctx, "error generating volume %s: %v", volID, err)
return nil, status.Error(codes.Internal, err.Error())
return nil, status.Errorf(codes.Internal, "error generating volume %s: %v", volID, err)
}
j, err = volJournal.Connect(rv.Monitors, rv.RadosNamespace, cr)
if err != nil {
log.ErrorLog(ctx, "failed to establish cluster connection: %v", err)
return nil, status.Error(codes.Internal, err.Error())
rv.DataPool = req.GetVolumeContext()["dataPool"]
var ok bool
if rv.Mounter, ok = req.GetVolumeContext()["mounter"]; !ok {
rv.Mounter = rbdDefaultMounter
}
defer j.Destroy()
imageAttributes, err = j.GetImageAttributes(
ctx, rv.Pool, vi.ObjectUUID, false)
if err != nil {
err = fmt.Errorf("error fetching image attributes for volume ID (%s): %w", volID, err)
return nil, status.Error(codes.Internal, err.Error())
}
rv.RbdImageName = imageAttributes.ImageName
// set owner after extracting the owner name from the journal
rv.Owner = imageAttributes.Owner
}
err = rv.Connect(cr)