diff --git a/internal/rbd/rbd_healer.go b/internal/rbd/rbd_healer.go index 25ae0eebc..d15cb9875 100644 --- a/internal/rbd/rbd_healer.go +++ b/internal/rbd/rbd_healer.go @@ -18,6 +18,7 @@ package rbd import ( "context" + "sync" "github.com/ceph/ceph-csi/internal/util" @@ -127,6 +128,8 @@ func runVolumeHealer(ns *NodeServer, conf *util.Config) error { return err } + var wg sync.WaitGroup + channel := make(chan error) for i := range val.Items { // skip if the volumeattachments doesn't belong to current node or driver if val.Items[i].Spec.NodeName != conf.NodeID || val.Items[i].Spec.Attacher != conf.DriverName { @@ -165,10 +168,22 @@ func runVolumeHealer(ns *NodeServer, conf *util.Config) error { continue } - err = callNodeStageVolume(ns, c, pv, conf.StagingPath) - if err != nil { - util.ErrorLogMsg("callNodeStageVolume failed for VolID: %s, err: %v", - pv.Spec.PersistentVolumeSource.CSI.VolumeHandle, err) + wg.Add(1) + // run multiple NodeStageVolume calls concurrently + go func(wg *sync.WaitGroup, ns *NodeServer, c *k8s.Clientset, pv *v1.PersistentVolume, stagingPath string) { + defer wg.Done() + channel <- callNodeStageVolume(ns, c, pv, stagingPath) + }(&wg, ns, c, pv, conf.StagingPath) + } + + go func() { + wg.Wait() + close(channel) + }() + + for s := range channel { + if s != nil { + util.ErrorLogMsg("callNodeStageVolume failed, err: %v", s) } }