rebase: bump sigs.k8s.io/controller-runtime

Bumps the k8s-dependencies group with 1 update in the / directory: [sigs.k8s.io/controller-runtime](https://github.com/kubernetes-sigs/controller-runtime).


Updates `sigs.k8s.io/controller-runtime` from 0.19.4 to 0.20.1
- [Release notes](https://github.com/kubernetes-sigs/controller-runtime/releases)
- [Changelog](https://github.com/kubernetes-sigs/controller-runtime/blob/main/RELEASE.md)
- [Commits](https://github.com/kubernetes-sigs/controller-runtime/compare/v0.19.4...v0.20.1)
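
For a consuming module, this bump is the standard one-line update that dependabot applies to `go.mod`/`go.sum`; reproducing it locally is just:

```
go get sigs.k8s.io/controller-runtime@v0.20.1
```

after which `go.mod` pins `sigs.k8s.io/controller-runtime v0.20.1`.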

---
updated-dependencies:
- dependency-name: sigs.k8s.io/controller-runtime
  dependency-type: direct:production
  update-type: version-update:semver-minor
  dependency-group: k8s-dependencies
...

Signed-off-by: dependabot[bot] <support@github.com>
Authored by dependabot[bot] on 2025-02-03 07:59:16 +00:00; committed by mergify[bot]
parent c16633c82e
commit 76043afc7c
35 changed files with 2279 additions and 561 deletions


@@ -21,9 +21,11 @@ import (
 "errors"
 "fmt"
 "sync"
+"sync/atomic"
 "time"
 
 "github.com/go-logr/logr"
+"golang.org/x/sync/errgroup"
 "k8s.io/apimachinery/pkg/types"
 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 "k8s.io/apimachinery/pkg/util/uuid"
@@ -169,43 +171,66 @@ func (c *Controller[request]) Start(ctx context.Context) error {
 defer utilruntime.HandleCrash()
 
 // NB(directxman12): launch the sources *before* trying to wait for the
-// caches to sync so that they have a chance to register their intendeded
+// caches to sync so that they have a chance to register their intended
 // caches.
+errGroup := &errgroup.Group{}
 for _, watch := range c.startWatches {
-    c.LogConstructor(nil).Info("Starting EventSource", "source", fmt.Sprintf("%s", watch))
-
-    if err := watch.Start(ctx, c.Queue); err != nil {
-        return err
-    }
-}
-
-// Start the SharedIndexInformer factories to begin populating the SharedIndexInformer caches
-c.LogConstructor(nil).Info("Starting Controller")
-
-for _, watch := range c.startWatches {
-    syncingSource, ok := watch.(source.SyncingSource)
+    log := c.LogConstructor(nil)
+    _, ok := watch.(interface {
+        String() string
+    })
     if !ok {
-        continue
+        log = log.WithValues("source", fmt.Sprintf("%T", watch))
+    } else {
+        log = log.WithValues("source", fmt.Sprintf("%s", watch))
     }
-
-    if err := func() error {
-        // use a context with timeout for launching sources and syncing caches.
+    didStartSyncingSource := &atomic.Bool{}
+    errGroup.Go(func() error {
+        // Use a timeout for starting and syncing the source to avoid silently
+        // blocking startup indefinitely if it doesn't come up.
         sourceStartCtx, cancel := context.WithTimeout(ctx, c.CacheSyncTimeout)
         defer cancel()
 
-        // WaitForSync waits for a definitive timeout, and returns if there
-        // is an error or a timeout
-        if err := syncingSource.WaitForSync(sourceStartCtx); err != nil {
-            err := fmt.Errorf("failed to wait for %s caches to sync: %w", c.Name, err)
-            c.LogConstructor(nil).Error(err, "Could not wait for Cache to sync")
-            return err
-        }
+        sourceStartErrChan := make(chan error, 1) // Buffer chan to not leak goroutine if we time out
+        go func() {
+            defer close(sourceStartErrChan)
+            log.Info("Starting EventSource")
+            if err := watch.Start(ctx, c.Queue); err != nil {
+                sourceStartErrChan <- err
+                return
+            }
+            syncingSource, ok := watch.(source.TypedSyncingSource[request])
+            if !ok {
+                return
+            }
+            didStartSyncingSource.Store(true)
+            if err := syncingSource.WaitForSync(sourceStartCtx); err != nil {
+                err := fmt.Errorf("failed to wait for %s caches to sync %v: %w", c.Name, syncingSource, err)
+                log.Error(err, "Could not wait for Cache to sync")
+                sourceStartErrChan <- err
+            }
+        }()
 
-        return nil
-    }(); err != nil {
-        return err
-    }
-}
+        select {
+        case err := <-sourceStartErrChan:
+            return err
+        case <-sourceStartCtx.Done():
+            if didStartSyncingSource.Load() { // We are racing with WaitForSync, wait for it to let it tell us what happened
+                return <-sourceStartErrChan
+            }
+            if ctx.Err() != nil { // Don't return an error if the root context got cancelled
+                return nil
+            }
+            return fmt.Errorf("timed out waiting for source %s to Start. Please ensure that its Start() method is non-blocking", watch)
+        }
+    })
+}
+
+if err := errGroup.Wait(); err != nil {
+    return err
+}
 
+c.LogConstructor(nil).Info("Starting Controller")
+
 // All the watches have been started, we can reset the local slice.
 //
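
The heart of the new startup flow is a per-source goroutine that reports into a buffered channel, raced against a timeout context. A minimal, self-contained sketch of that pattern (names and types simplified; this is not the controller-runtime API):

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// startWithTimeout mirrors the shape of the per-watch startup above: run the
// work in a goroutine that reports into a buffered channel, then select on
// the result versus a timeout. The buffer of 1 lets the goroutine exit even
// if the caller timed out and will never receive.
func startWithTimeout(ctx context.Context, timeout time.Duration, start func(context.Context) error) error {
	startCtx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	errChan := make(chan error, 1) // buffered so the sender never blocks
	go func() {
		defer close(errChan)
		if err := start(startCtx); err != nil {
			errChan <- err
		}
	}()

	select {
	case err := <-errChan: // nil on success (a closed channel yields the zero value)
		return err
	case <-startCtx.Done():
		if ctx.Err() != nil { // root context cancelled: not a startup failure
			return nil
		}
		return errors.New("timed out waiting for source to start")
	}
}

func main() {
	err := startWithTimeout(context.Background(), 50*time.Millisecond, func(ctx context.Context) error {
		time.Sleep(10 * time.Millisecond) // pretend the source comes up quickly
		return nil
	})
	fmt.Println("start returned:", err)
}
```

The real code additionally tracks (via `didStartSyncingSource`) whether `WaitForSync` already began, so a timeout that races with an in-flight sync still reports the sync's own error rather than a generic timeout.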
@@ -311,7 +336,7 @@ func (c *Controller[request]) reconcileHandler(ctx context.Context, req request)
     ctrlmetrics.ReconcileErrors.WithLabelValues(c.Name).Inc()
     ctrlmetrics.ReconcileTotal.WithLabelValues(c.Name, labelError).Inc()
     if !result.IsZero() {
-        log.Info("Warning: Reconciler returned both a non-zero result and a non-nil error. The result will always be ignored if the error is non-nil and the non-nil error causes reqeueuing with exponential backoff. For more details, see: https://pkg.go.dev/sigs.k8s.io/controller-runtime/pkg/reconcile#Reconciler")
+        log.Info("Warning: Reconciler returned both a non-zero result and a non-nil error. The result will always be ignored if the error is non-nil and the non-nil error causes requeuing with exponential backoff. For more details, see: https://pkg.go.dev/sigs.k8s.io/controller-runtime/pkg/reconcile#Reconciler")
     }
     log.Error(err, "Reconciler error")
 case result.RequeueAfter > 0:
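
The warning above fires for reconcilers shaped like this hypothetical one, whose `RequeueAfter` is silently dropped because the non-nil error takes precedence and drives requeuing with backoff:

```go
package demo

import (
	"context"
	"errors"
	"time"

	ctrl "sigs.k8s.io/controller-runtime"
)

type demoReconciler struct{}

// Reconcile returns both a non-zero Result and a non-nil error. The Result is
// ignored: the error alone drives the requeue (with exponential backoff),
// which is exactly the situation the log line above warns about.
func (r demoReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	return ctrl.Result{RequeueAfter: time.Minute}, errors.New("transient failure")
}
```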


@@ -0,0 +1,131 @@
/*
Copyright 2018 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package metrics

import (
	"github.com/prometheus/client_golang/prometheus"
	"k8s.io/client-go/util/workqueue"

	"sigs.k8s.io/controller-runtime/pkg/metrics"
)

// This file is copied and adapted from k8s.io/component-base/metrics/prometheus/workqueue
// which registers metrics to the k8s legacy Registry. We require very
// similar functionality, but must register metrics to a different Registry.

// Metrics subsystem and all keys used by the workqueue.
const (
	WorkQueueSubsystem         = metrics.WorkQueueSubsystem
	DepthKey                   = metrics.DepthKey
	AddsKey                    = metrics.AddsKey
	QueueLatencyKey            = metrics.QueueLatencyKey
	WorkDurationKey            = metrics.WorkDurationKey
	UnfinishedWorkKey          = metrics.UnfinishedWorkKey
	LongestRunningProcessorKey = metrics.LongestRunningProcessorKey
	RetriesKey                 = metrics.RetriesKey
)

var (
	depth = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Subsystem: WorkQueueSubsystem,
		Name:      DepthKey,
		Help:      "Current depth of workqueue",
	}, []string{"name", "controller"})

	adds = prometheus.NewCounterVec(prometheus.CounterOpts{
		Subsystem: WorkQueueSubsystem,
		Name:      AddsKey,
		Help:      "Total number of adds handled by workqueue",
	}, []string{"name", "controller"})

	latency = prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Subsystem: WorkQueueSubsystem,
		Name:      QueueLatencyKey,
		Help:      "How long in seconds an item stays in workqueue before being requested",
		Buckets:   prometheus.ExponentialBuckets(10e-9, 10, 12),
	}, []string{"name", "controller"})

	workDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Subsystem: WorkQueueSubsystem,
		Name:      WorkDurationKey,
		Help:      "How long in seconds processing an item from workqueue takes.",
		Buckets:   prometheus.ExponentialBuckets(10e-9, 10, 12),
	}, []string{"name", "controller"})

	unfinished = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Subsystem: WorkQueueSubsystem,
		Name:      UnfinishedWorkKey,
		Help: "How many seconds of work has been done that " +
			"is in progress and hasn't been observed by work_duration. Large " +
			"values indicate stuck threads. One can deduce the number of stuck " +
			"threads by observing the rate at which this increases.",
	}, []string{"name", "controller"})

	longestRunningProcessor = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Subsystem: WorkQueueSubsystem,
		Name:      LongestRunningProcessorKey,
		Help: "How many seconds has the longest running " +
			"processor for workqueue been running.",
	}, []string{"name", "controller"})

	retries = prometheus.NewCounterVec(prometheus.CounterOpts{
		Subsystem: WorkQueueSubsystem,
		Name:      RetriesKey,
		Help:      "Total number of retries handled by workqueue",
	}, []string{"name", "controller"})
)

func init() {
	metrics.Registry.MustRegister(depth)
	metrics.Registry.MustRegister(adds)
	metrics.Registry.MustRegister(latency)
	metrics.Registry.MustRegister(workDuration)
	metrics.Registry.MustRegister(unfinished)
	metrics.Registry.MustRegister(longestRunningProcessor)
	metrics.Registry.MustRegister(retries)

	workqueue.SetProvider(WorkqueueMetricsProvider{})
}

type WorkqueueMetricsProvider struct{}

func (WorkqueueMetricsProvider) NewDepthMetric(name string) workqueue.GaugeMetric {
	return depth.WithLabelValues(name, name)
}

func (WorkqueueMetricsProvider) NewAddsMetric(name string) workqueue.CounterMetric {
	return adds.WithLabelValues(name, name)
}

func (WorkqueueMetricsProvider) NewLatencyMetric(name string) workqueue.HistogramMetric {
	return latency.WithLabelValues(name, name)
}

func (WorkqueueMetricsProvider) NewWorkDurationMetric(name string) workqueue.HistogramMetric {
	return workDuration.WithLabelValues(name, name)
}

func (WorkqueueMetricsProvider) NewUnfinishedWorkSecondsMetric(name string) workqueue.SettableGaugeMetric {
	return unfinished.WithLabelValues(name, name)
}

func (WorkqueueMetricsProvider) NewLongestRunningProcessorSecondsMetric(name string) workqueue.SettableGaugeMetric {
	return longestRunningProcessor.WithLabelValues(name, name)
}

func (WorkqueueMetricsProvider) NewRetriesMetric(name string) workqueue.CounterMetric {
	return retries.WithLabelValues(name, name)
}
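
Note that every provider method passes `name` twice, so both the `name` and `controller` labels carry the queue's name. A sketch of how a named queue picks this up once a provider has been registered via `workqueue.SetProvider` (as the `init` above does; the queue name here is illustrative):

```go
package main

import (
	"k8s.io/client-go/util/workqueue"
)

func main() {
	// A queue created with a Name after SetProvider has run is instrumented
	// through the registered provider, so its depth/adds/latency/... series
	// are labelled name="my-controller" and controller="my-controller".
	q := workqueue.NewTypedRateLimitingQueueWithConfig(
		workqueue.DefaultTypedControllerRateLimiter[string](),
		workqueue.TypedRateLimitingQueueConfig[string]{Name: "my-controller"},
	)
	defer q.ShutDown()

	q.Add("some-key")
}
```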


@@ -52,7 +52,7 @@ func (ks *Kind[object, request]) Start(ctx context.Context, queue workqueue.Type
 // cache.GetInformer will block until its context is cancelled if the cache was already started and it can not
 // sync that informer (most commonly due to RBAC issues).
 ctx, ks.startCancel = context.WithCancel(ctx)
-ks.startedErr = make(chan error)
+ks.startedErr = make(chan error, 1) // Buffer chan to not leak goroutines if WaitForSync isn't called
 go func() {
     var (
         i cache.Informer
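
The buffered channel matters because the sending goroutine may outlive any receiver: on an unbuffered channel an unreceived send blocks forever and leaks the goroutine. A tiny illustration of the difference (not the library code):

```go
package main

import "fmt"

// report sends one error on ch and returns. With an unbuffered channel this
// goroutine blocks forever if nobody ever receives; with capacity 1 the send
// completes immediately and the goroutine can exit.
func report(ch chan error) {
	ch <- fmt.Errorf("startup failed")
}

func main() {
	leaky := make(chan error)   // unbuffered: sender blocks without a receiver
	safe := make(chan error, 1) // buffered: sender never blocks

	go report(leaky) // this goroutine is leaked if we never read from leaky
	go report(safe)  // this goroutine finishes even though we never read

	fmt.Println("returned without reading either channel")
}
```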