rbd: take lock when reconciling the PV

there can be a change we can reconcile same
PV parallelly we can endup in generating and
deleting multiple omap keys. to be on safer
side taking lock to process one volumeHandle
at a time.

Signed-off-by: Madhu Rajanna <madhupr007@gmail.com>
This commit is contained in:
Madhu Rajanna 2021-04-06 21:08:37 +05:30 committed by mergify[bot]
parent 0f8813d89f
commit e2fa84357a

View File

@ -41,6 +41,7 @@ import (
type ReconcilePersistentVolume struct { type ReconcilePersistentVolume struct {
client client.Client client client.Client
config ctrl.Config config ctrl.Config
Locks *util.VolumeLocks
} }
var _ reconcile.Reconciler = &ReconcilePersistentVolume{} var _ reconcile.Reconciler = &ReconcilePersistentVolume{}
@ -62,6 +63,7 @@ func newPVReconciler(mgr manager.Manager, config ctrl.Config) reconcile.Reconcil
r := &ReconcilePersistentVolume{ r := &ReconcilePersistentVolume{
client: mgr.GetClient(), client: mgr.GetClient(),
config: config, config: config,
Locks: util.NewVolumeLocks(),
} }
return r return r
} }
@ -174,6 +176,12 @@ func (r ReconcilePersistentVolume) reconcilePV(obj runtime.Object) error {
secretNamespace = pv.Spec.CSI.NodeStageSecretRef.Namespace secretNamespace = pv.Spec.CSI.NodeStageSecretRef.Namespace
} }
// Take lock to process only one volumeHandle at a time.
if ok := r.Locks.TryAcquire(pv.Spec.CSI.VolumeHandle); !ok {
return fmt.Errorf(util.VolumeOperationAlreadyExistsFmt, pv.Spec.CSI.VolumeHandle)
}
defer r.Locks.Release(pv.Spec.CSI.VolumeHandle)
cr, err := r.getCredentials(secretName, secretNamespace) cr, err := r.getCredentials(secretName, secretNamespace)
if err != nil { if err != nil {
util.ErrorLogMsg("failed to get credentials from secret %s", err) util.ErrorLogMsg("failed to get credentials from secret %s", err)