rbd: improve healer to run multiple NodeStageVolume req concurrently

This will bring down the healer run time by a great factor.

Signed-off-by: Prasanna Kumar Kalever <prasanna.kalever@redhat.com>
This commit is contained in:
Prasanna Kumar Kalever 2021-05-31 16:59:48 +05:30 committed by mergify[bot]
parent b6a88dd728
commit 78f740d903

View File

@ -18,6 +18,7 @@ package rbd
import ( import (
"context" "context"
"sync"
"github.com/ceph/ceph-csi/internal/util" "github.com/ceph/ceph-csi/internal/util"
@ -127,6 +128,8 @@ func runVolumeHealer(ns *NodeServer, conf *util.Config) error {
return err return err
} }
var wg sync.WaitGroup
channel := make(chan error)
for i := range val.Items { for i := range val.Items {
// skip if the volumeattachments doesn't belong to current node or driver // skip if the volumeattachments doesn't belong to current node or driver
if val.Items[i].Spec.NodeName != conf.NodeID || val.Items[i].Spec.Attacher != conf.DriverName { if val.Items[i].Spec.NodeName != conf.NodeID || val.Items[i].Spec.Attacher != conf.DriverName {
@ -165,10 +168,22 @@ func runVolumeHealer(ns *NodeServer, conf *util.Config) error {
continue continue
} }
err = callNodeStageVolume(ns, c, pv, conf.StagingPath) wg.Add(1)
if err != nil { // run multiple NodeStageVolume calls concurrently
util.ErrorLogMsg("callNodeStageVolume failed for VolID: %s, err: %v", go func(wg *sync.WaitGroup, ns *NodeServer, c *k8s.Clientset, pv *v1.PersistentVolume, stagingPath string) {
pv.Spec.PersistentVolumeSource.CSI.VolumeHandle, err) defer wg.Done()
channel <- callNodeStageVolume(ns, c, pv, stagingPath)
}(&wg, ns, c, pv, conf.StagingPath)
}
go func() {
wg.Wait()
close(channel)
}()
for s := range channel {
if s != nil {
util.ErrorLogMsg("callNodeStageVolume failed, err: %v", s)
} }
} }