rbd: do not read pvc namespace from volume attributes

Below are the 3 different cases where we need
the PVC namespace for encryption

* CreateVolume:- Read the namespace from the
createVolume parameters and store it in the omap
* NodeStage:- Read the namespace from the omap
not from the volumeContext
* Regenerate:- Read the pvc namespace from the claimRef
not from the volumeAttributes.

Signed-off-by: Madhu Rajanna <madhupr007@gmail.com>
This commit is contained in:
Madhu Rajanna 2022-03-15 18:28:02 +05:30 committed by mergify[bot]
parent 77011fbc61
commit 8c5e414d53
6 changed files with 39 additions and 24 deletions

View File

@ -139,6 +139,12 @@ func (r ReconcilePersistentVolume) reconcilePV(ctx context.Context, obj runtime.
if pv.Spec.CSI == nil || pv.Spec.CSI.Driver != r.config.DriverName { if pv.Spec.CSI == nil || pv.Spec.CSI.Driver != r.config.DriverName {
return nil return nil
} }
// PV is not attached to any PVC
if pv.Spec.ClaimRef == nil {
return nil
}
pvcNamespace := pv.Spec.ClaimRef.Namespace
requestName := pv.Name requestName := pv.Name
volumeHandler := pv.Spec.CSI.VolumeHandle volumeHandler := pv.Spec.CSI.VolumeHandle
secretName := "" secretName := ""
@ -171,7 +177,7 @@ func (r ReconcilePersistentVolume) reconcilePV(ctx context.Context, obj runtime.
} }
defer cr.DeleteCredentials() defer cr.DeleteCredentials()
rbdVolID, err := rbd.RegenerateJournal(pv.Spec.CSI.VolumeAttributes, volumeHandler, requestName, cr) rbdVolID, err := rbd.RegenerateJournal(pv.Spec.CSI.VolumeAttributes, volumeHandler, requestName, pvcNamespace, cr)
if err != nil { if err != nil {
log.ErrorLogMsg("failed to regenerate journal %s", err) log.ErrorLogMsg("failed to regenerate journal %s", err)

View File

@ -124,13 +124,27 @@ func (cs *ControllerServer) parseVolCreateRequest(
rbdVol, err := genVolFromVolumeOptions( rbdVol, err := genVolFromVolumeOptions(
ctx, ctx,
req.GetParameters(), req.GetParameters(),
req.GetSecrets(),
isMultiWriter && isBlock, isMultiWriter && isBlock,
false) false)
if err != nil { if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error()) return nil, status.Error(codes.InvalidArgument, err.Error())
} }
// if the KMS is of type VaultToken, additional metadata is needed
// depending on the tenant, the KMS can be configured with other
// options
// FIXME: this works only on Kubernetes, how do other CO supply metadata?
// namespace is derived from the `csi.storage.k8s.io/pvc/namespace`
// parameter.
// get the owner of the PVC which is required for few encryption related operations
rbdVol.Owner = k8s.GetOwner(req.GetParameters())
err = rbdVol.initKMS(ctx, req.GetParameters(), req.GetSecrets())
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
rbdVol.RequestName = req.GetName() rbdVol.RequestName = req.GetName()
// Volume Size - Default is 1 GiB // Volume Size - Default is 1 GiB

View File

@ -265,22 +265,14 @@ func (ri *rbdImage) initKMS(ctx context.Context, volOptions, credentials map[str
} }
// ParseEncryptionOpts returns kmsID and sets Owner attribute. // ParseEncryptionOpts returns kmsID and sets Owner attribute.
func (ri *rbdImage) ParseEncryptionOpts(ctx context.Context, volOptions map[string]string) (string, error) { func (ri *rbdImage) ParseEncryptionOpts(
ctx context.Context,
volOptions map[string]string) (string, error) {
var ( var (
err error err error
ok bool ok bool
encrypted, kmsID string encrypted, kmsID string
) )
// if the KMS is of type VaultToken, additional metadata is needed
// depending on the tenant, the KMS can be configured with other
// options
// FIXME: this works only on Kubernetes, how do other CO supply metadata?
ri.Owner, ok = volOptions["csi.storage.k8s.io/pvc/namespace"]
if !ok {
log.DebugLog(ctx, "could not detect owner for %s", ri)
}
encrypted, ok = volOptions["encrypted"] encrypted, ok = volOptions["encrypted"]
if !ok { if !ok {
return "", nil return "", nil

View File

@ -147,8 +147,7 @@ func healerStageTransaction(ctx context.Context, cr *util.Credentials, volOps *r
func populateRbdVol( func populateRbdVol(
ctx context.Context, ctx context.Context,
req *csi.NodeStageVolumeRequest, req *csi.NodeStageVolumeRequest,
cr *util.Credentials, cr *util.Credentials) (*rbdVolume, error) {
secrets map[string]string) (*rbdVolume, error) {
var err error var err error
var j *journal.Connection var j *journal.Connection
volID := req.GetVolumeId() volID := req.GetVolumeId()
@ -173,7 +172,7 @@ func populateRbdVol(
disableInUseChecks = true disableInUseChecks = true
} }
rv, err := genVolFromVolumeOptions(ctx, req.GetVolumeContext(), secrets, disableInUseChecks, true) rv, err := genVolFromVolumeOptions(ctx, req.GetVolumeContext(), disableInUseChecks, true)
if err != nil { if err != nil {
return nil, status.Error(codes.Internal, err.Error()) return nil, status.Error(codes.Internal, err.Error())
} }
@ -213,6 +212,8 @@ func populateRbdVol(
return nil, status.Error(codes.Internal, err.Error()) return nil, status.Error(codes.Internal, err.Error())
} }
rv.RbdImageName = imageAttributes.ImageName rv.RbdImageName = imageAttributes.ImageName
// set owner after extracting the owner name from the journal
rv.Owner = imageAttributes.Owner
} }
err = rv.Connect(cr) err = rv.Connect(cr)
@ -235,6 +236,11 @@ func populateRbdVol(
return nil, status.Error(codes.Internal, err.Error()) return nil, status.Error(codes.Internal, err.Error())
} }
err = rv.initKMS(ctx, req.GetVolumeContext(), req.GetSecrets())
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
if req.GetVolumeContext()["mounter"] == rbdDefaultMounter && if req.GetVolumeContext()["mounter"] == rbdDefaultMounter &&
!isKrbdFeatureSupported(ctx, strings.Join(rv.ImageFeatureSet.Names(), ",")) { !isKrbdFeatureSupported(ctx, strings.Join(rv.ImageFeatureSet.Names(), ",")) {
if !parseBoolOption(ctx, req.GetVolumeContext(), tryOtherMounters, false) { if !parseBoolOption(ctx, req.GetVolumeContext(), tryOtherMounters, false) {
@ -320,7 +326,7 @@ func (ns *NodeServer) NodeStageVolume(
} }
isStaticVol := parseBoolOption(ctx, req.GetVolumeContext(), staticVol, false) isStaticVol := parseBoolOption(ctx, req.GetVolumeContext(), staticVol, false)
rv, err := populateRbdVol(ctx, req, cr, req.GetSecrets()) rv, err := populateRbdVol(ctx, req, cr)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -531,7 +531,7 @@ func undoVolReservation(ctx context.Context, rbdVol *rbdVolume, cr *util.Credent
// which are not same across clusters. // which are not same across clusters.
func RegenerateJournal( func RegenerateJournal(
volumeAttributes map[string]string, volumeAttributes map[string]string,
volumeID, requestName string, volumeID, requestName, owner string,
cr *util.Credentials) (string, error) { cr *util.Credentials) (string, error) {
ctx := context.Background() ctx := context.Background()
var ( var (
@ -551,6 +551,8 @@ func RegenerateJournal(
ErrInvalidVolID, err, rbdVol.VolID) ErrInvalidVolID, err, rbdVol.VolID)
} }
rbdVol.Owner = owner
kmsID, err = rbdVol.ParseEncryptionOpts(ctx, volumeAttributes) kmsID, err = rbdVol.ParseEncryptionOpts(ctx, volumeAttributes)
if err != nil { if err != nil {
return "", err return "", err

View File

@ -1142,7 +1142,7 @@ func generateVolumeFromMapping(
func genVolFromVolumeOptions( func genVolFromVolumeOptions(
ctx context.Context, ctx context.Context,
volOptions, credentials map[string]string, volOptions map[string]string,
disableInUseChecks, checkClusterIDMapping bool) (*rbdVolume, error) { disableInUseChecks, checkClusterIDMapping bool) (*rbdVolume, error) {
var ( var (
ok bool ok bool
@ -1195,11 +1195,6 @@ func genVolFromVolumeOptions(
rbdVol.Mounter) rbdVol.Mounter)
rbdVol.DisableInUseChecks = disableInUseChecks rbdVol.DisableInUseChecks = disableInUseChecks
err = rbdVol.initKMS(ctx, volOptions, credentials)
if err != nil {
return nil, err
}
return rbdVol, nil return rbdVol, nil
} }