rebase: update replaced k8s.io modules to v0.33.0

Signed-off-by: Niels de Vos <ndevos@ibm.com>
Author: Niels de Vos
Date: 2025-05-07 13:13:33 +02:00
Committed by: mergify[bot]
Parent: dd77e72800
Commit: 107407b44b

1723 changed files with 65035 additions and 175239 deletions

k8s.io/client-go/tools/watch/informerwatcher.go

@@ -20,8 +20,10 @@ import (
"sync"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
)
func newEventProcessor(out chan<- watch.Event) *eventProcessor {
@@ -103,7 +105,16 @@ func (e *eventProcessor) stop() {
// NewIndexerInformerWatcher will create an IndexerInformer and wrap it into watch.Interface
// so you can use it anywhere where you'd have used a regular Watcher returned from Watch method.
// it also returns a channel you can use to wait for the informers to fully shutdown.
//
// Contextual logging: NewIndexerInformerWatcherWithLogger should be used instead of NewIndexerInformerWatcher in code which supports contextual logging.
func NewIndexerInformerWatcher(lw cache.ListerWatcher, objType runtime.Object) (cache.Indexer, cache.Controller, watch.Interface, <-chan struct{}) {
return NewIndexerInformerWatcherWithLogger(klog.Background(), lw, objType)
}
// NewIndexerInformerWatcherWithLogger will create an IndexerInformer and wrap it into watch.Interface
// so you can use it anywhere where you'd have used a regular Watcher returned from Watch method.
// it also returns a channel you can use to wait for the informers to fully shutdown.
func NewIndexerInformerWatcherWithLogger(logger klog.Logger, lw cache.ListerWatcher, objType runtime.Object) (cache.Indexer, cache.Controller, watch.Interface, <-chan struct{}) {
ch := make(chan watch.Event)
w := watch.NewProxyWatcher(ch)
e := newEventProcessor(ch)
@@ -137,13 +148,18 @@ func NewIndexerInformerWatcher(lw cache.ListerWatcher, objType runtime.Object) (
},
}, cache.Indexers{})
// This will get stopped, but without waiting for it.
go e.run()
doneCh := make(chan struct{})
go func() {
defer close(doneCh)
defer e.stop()
informer.Run(w.StopChan())
// Waiting for w.StopChan() is the traditional behavior which gets
// preserved here, with the logger added to support contextual logging.
ctx := wait.ContextForChannel(w.StopChan())
ctx = klog.NewContext(ctx, logger)
informer.RunWithContext(ctx)
}()
return indexer, informer, w, doneCh
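
The hunk above adds NewIndexerInformerWatcherWithLogger and switches the wrapped informer to RunWithContext. Below is a minimal sketch, not part of this commit, of how a caller might adopt the new logger-aware constructor; the clientset, namespace, and Pod-watching scenario are assumptions for illustration.

package example

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	watchtools "k8s.io/client-go/tools/watch"
	"k8s.io/klog/v2"
)

// watchPods is a hypothetical caller that switches from NewIndexerInformerWatcher
// to the logger-aware constructor added in this update.
func watchPods(ctx context.Context, client kubernetes.Interface, namespace string) {
	lw := &cache.ListWatch{
		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
			return client.CoreV1().Pods(namespace).List(ctx, options)
		},
		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
			return client.CoreV1().Pods(namespace).Watch(ctx, options)
		},
	}

	// The logger is taken from the caller's context so that log lines emitted by
	// the wrapped informer carry the caller's contextual key/value pairs.
	_, _, w, done := watchtools.NewIndexerInformerWatcherWithLogger(
		klog.FromContext(ctx), lw, &corev1.Pod{})
	defer func() {
		w.Stop()
		<-done // wait for the internal informer to fully shut down
	}()

	for event := range w.ResultChan() {
		klog.FromContext(ctx).V(4).Info("received event", "type", event.Type)
	}
}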

k8s.io/client-go/tools/watch/retrywatcher.go

@@ -22,7 +22,6 @@ import (
"fmt"
"io"
"net/http"
"sync"
"time"
apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -48,23 +47,31 @@ type resourceVersionGetter interface {
// Please note that this is not resilient to etcd cache not having the resource version anymore - you would need to
// use Informers for that.
type RetryWatcher struct {
cancel func(error)
lastResourceVersion string
watcherClient cache.Watcher
watcherClient cache.WatcherWithContext
resultChan chan watch.Event
stopChan chan struct{}
doneChan chan struct{}
minRestartDelay time.Duration
stopChanLock sync.Mutex
}
// NewRetryWatcher creates a new RetryWatcher.
// It will make sure that watches gets restarted in case of recoverable errors.
// The initialResourceVersion will be given to watch method when first called.
//
// Deprecated: use NewRetryWatcherWithContext instead.
func NewRetryWatcher(initialResourceVersion string, watcherClient cache.Watcher) (*RetryWatcher, error) {
return newRetryWatcher(initialResourceVersion, watcherClient, 1*time.Second)
return NewRetryWatcherWithContext(context.Background(), initialResourceVersion, cache.ToWatcherWithContext(watcherClient))
}
func newRetryWatcher(initialResourceVersion string, watcherClient cache.Watcher, minRestartDelay time.Duration) (*RetryWatcher, error) {
// NewRetryWatcherWithContext creates a new RetryWatcher.
// It will make sure that watches gets restarted in case of recoverable errors.
// The initialResourceVersion will be given to watch method when first called.
func NewRetryWatcherWithContext(ctx context.Context, initialResourceVersion string, watcherClient cache.WatcherWithContext) (*RetryWatcher, error) {
return newRetryWatcher(ctx, initialResourceVersion, watcherClient, 1*time.Second)
}
func newRetryWatcher(ctx context.Context, initialResourceVersion string, watcherClient cache.WatcherWithContext, minRestartDelay time.Duration) (*RetryWatcher, error) {
switch initialResourceVersion {
case "", "0":
// TODO: revisit this if we ever get WATCH v2 where it means start "now"
@@ -74,34 +81,36 @@ func newRetryWatcher(initialResourceVersion string, watcherClient cache.Watcher,
break
}
ctx, cancel := context.WithCancelCause(ctx)
rw := &RetryWatcher{
cancel: cancel,
lastResourceVersion: initialResourceVersion,
watcherClient: watcherClient,
stopChan: make(chan struct{}),
doneChan: make(chan struct{}),
resultChan: make(chan watch.Event, 0),
minRestartDelay: minRestartDelay,
}
go rw.receive()
go rw.receive(ctx)
return rw, nil
}
func (rw *RetryWatcher) send(event watch.Event) bool {
func (rw *RetryWatcher) send(ctx context.Context, event watch.Event) bool {
// Writing to an unbuffered channel is blocking operation
// and we need to check if stop wasn't requested while doing so.
select {
case rw.resultChan <- event:
return true
case <-rw.stopChan:
case <-ctx.Done():
return false
}
}
// doReceive returns true when it is done, false otherwise.
// If it is not done the second return value holds the time to wait before calling it again.
func (rw *RetryWatcher) doReceive() (bool, time.Duration) {
watcher, err := rw.watcherClient.Watch(metav1.ListOptions{
func (rw *RetryWatcher) doReceive(ctx context.Context) (bool, time.Duration) {
watcher, err := rw.watcherClient.WatchWithContext(ctx, metav1.ListOptions{
ResourceVersion: rw.lastResourceVersion,
AllowWatchBookmarks: true,
})
@@ -117,13 +126,13 @@ func (rw *RetryWatcher) doReceive() (bool, time.Duration) {
return false, 0
case io.ErrUnexpectedEOF:
klog.V(1).InfoS("Watch closed with unexpected EOF", "err", err)
klog.FromContext(ctx).V(1).Info("Watch closed with unexpected EOF", "err", err)
return false, 0
default:
msg := "Watch failed"
if net.IsProbableEOF(err) || net.IsTimeout(err) {
klog.V(5).InfoS(msg, "err", err)
klog.FromContext(ctx).V(5).Info(msg, "err", err)
// Retry
return false, 0
}
@@ -132,38 +141,38 @@ func (rw *RetryWatcher) doReceive() (bool, time.Duration) {
// being invalid (e.g. expired token).
if apierrors.IsForbidden(err) || apierrors.IsUnauthorized(err) {
// Add more detail since the forbidden message returned by the Kubernetes API is just "unknown".
klog.ErrorS(err, msg+": ensure the client has valid credentials and watch permissions on the resource")
klog.FromContext(ctx).Error(err, msg+": ensure the client has valid credentials and watch permissions on the resource")
if apiStatus, ok := err.(apierrors.APIStatus); ok {
statusErr := apiStatus.Status()
sent := rw.send(watch.Event{
sent := rw.send(ctx, watch.Event{
Type: watch.Error,
Object: &statusErr,
})
if !sent {
// This likely means the RetryWatcher is stopping but return false so the caller to doReceive can
// verify this and potentially retry.
klog.Error("Failed to send the Unauthorized or Forbidden watch event")
klog.FromContext(ctx).Error(nil, "Failed to send the Unauthorized or Forbidden watch event")
return false, 0
}
} else {
// This should never happen since apierrors only handles apierrors.APIStatus. Still, this is an
// unrecoverable error, so still allow it to return true below.
klog.ErrorS(err, msg+": encountered an unexpected Unauthorized or Forbidden error type")
klog.FromContext(ctx).Error(err, msg+": encountered an unexpected Unauthorized or Forbidden error type")
}
return true, 0
}
klog.ErrorS(err, msg)
klog.FromContext(ctx).Error(err, msg)
// Retry
return false, 0
}
if watcher == nil {
klog.ErrorS(nil, "Watch returned nil watcher")
klog.FromContext(ctx).Error(nil, "Watch returned nil watcher")
// Retry
return false, 0
}
@@ -173,12 +182,12 @@ func (rw *RetryWatcher) doReceive() (bool, time.Duration) {
for {
select {
case <-rw.stopChan:
klog.V(4).InfoS("Stopping RetryWatcher.")
case <-ctx.Done():
klog.FromContext(ctx).V(4).Info("Stopping RetryWatcher")
return true, 0
case event, ok := <-ch:
if !ok {
klog.V(4).InfoS("Failed to get event! Re-creating the watcher.", "resourceVersion", rw.lastResourceVersion)
klog.FromContext(ctx).V(4).Info("Failed to get event - re-creating the watcher", "resourceVersion", rw.lastResourceVersion)
return false, 0
}
@@ -187,7 +196,7 @@ func (rw *RetryWatcher) doReceive() (bool, time.Duration) {
case watch.Added, watch.Modified, watch.Deleted, watch.Bookmark:
metaObject, ok := event.Object.(resourceVersionGetter)
if !ok {
_ = rw.send(watch.Event{
_ = rw.send(ctx, watch.Event{
Type: watch.Error,
Object: &apierrors.NewInternalError(errors.New("retryWatcher: doesn't support resourceVersion")).ErrStatus,
})
@@ -197,7 +206,7 @@ func (rw *RetryWatcher) doReceive() (bool, time.Duration) {
resourceVersion := metaObject.GetResourceVersion()
if resourceVersion == "" {
_ = rw.send(watch.Event{
_ = rw.send(ctx, watch.Event{
Type: watch.Error,
Object: &apierrors.NewInternalError(fmt.Errorf("retryWatcher: object %#v doesn't support resourceVersion", event.Object)).ErrStatus,
})
@@ -207,7 +216,7 @@ func (rw *RetryWatcher) doReceive() (bool, time.Duration) {
// All is fine; send the non-bookmark events and update resource version.
if event.Type != watch.Bookmark {
ok = rw.send(event)
ok = rw.send(ctx, event)
if !ok {
return true, 0
}
@@ -221,7 +230,7 @@ func (rw *RetryWatcher) doReceive() (bool, time.Duration) {
errObject := apierrors.FromObject(event.Object)
statusErr, ok := errObject.(*apierrors.StatusError)
if !ok {
klog.Error(fmt.Sprintf("Received an error which is not *metav1.Status but %s", dump.Pretty(event.Object)))
klog.FromContext(ctx).Error(nil, "Received an error which is not *metav1.Status", "errorObject", dump.Pretty(event.Object))
// Retry unknown errors
return false, 0
}
@@ -236,7 +245,7 @@ func (rw *RetryWatcher) doReceive() (bool, time.Duration) {
switch status.Code {
case http.StatusGone:
// Never retry RV too old errors
_ = rw.send(event)
_ = rw.send(ctx, event)
return true, 0
case http.StatusGatewayTimeout, http.StatusInternalServerError:
@@ -250,15 +259,15 @@ func (rw *RetryWatcher) doReceive() (bool, time.Duration) {
// Log here so we have a record of hitting the unexpected error
// and we can whitelist some error codes if we missed any that are expected.
klog.V(5).Info(fmt.Sprintf("Retrying after unexpected error: %s", dump.Pretty(event.Object)))
klog.FromContext(ctx).V(5).Info("Retrying after unexpected error", "errorObject", dump.Pretty(event.Object))
// Retry
return false, statusDelay
}
default:
klog.Errorf("Failed to recognize Event type %q", event.Type)
_ = rw.send(watch.Event{
klog.FromContext(ctx).Error(nil, "Failed to recognize event", "type", event.Type)
_ = rw.send(ctx, watch.Event{
Type: watch.Error,
Object: &apierrors.NewInternalError(fmt.Errorf("retryWatcher failed to recognize Event type %q", event.Type)).ErrStatus,
})
@@ -270,29 +279,21 @@ func (rw *RetryWatcher) doReceive() (bool, time.Duration) {
}
// receive reads the result from a watcher, restarting it if necessary.
func (rw *RetryWatcher) receive() {
func (rw *RetryWatcher) receive(ctx context.Context) {
defer close(rw.doneChan)
defer close(rw.resultChan)
klog.V(4).Info("Starting RetryWatcher.")
defer klog.V(4).Info("Stopping RetryWatcher.")
logger := klog.FromContext(ctx)
logger.V(4).Info("Starting RetryWatcher")
defer logger.V(4).Info("Stopping RetryWatcher")
ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(ctx)
defer cancel()
go func() {
select {
case <-rw.stopChan:
cancel()
return
case <-ctx.Done():
return
}
}()
// We use non sliding until so we don't introduce delays on happy path when WATCH call
// timeouts or gets closed and we need to reestablish it while also avoiding hot loops.
wait.NonSlidingUntilWithContext(ctx, func(ctx context.Context) {
done, retryAfter := rw.doReceive()
done, retryAfter := rw.doReceive(ctx)
if done {
cancel()
return
@@ -306,7 +307,7 @@ func (rw *RetryWatcher) receive() {
case <-timer.C:
}
klog.V(4).Infof("Restarting RetryWatcher at RV=%q", rw.lastResourceVersion)
logger.V(4).Info("Restarting RetryWatcher", "resourceVersion", rw.lastResourceVersion)
}, rw.minRestartDelay)
}
@@ -317,15 +318,7 @@ func (rw *RetryWatcher) ResultChan() <-chan watch.Event {
// Stop implements Interface.
func (rw *RetryWatcher) Stop() {
rw.stopChanLock.Lock()
defer rw.stopChanLock.Unlock()
// Prevent closing an already closed channel to prevent a panic
select {
case <-rw.stopChan:
default:
close(rw.stopChan)
}
rw.cancel(errors.New("asked to stop"))
}
// Done allows the caller to be notified when Retry watcher stops.
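
With this update the RetryWatcher carries a cancel function instead of a stop channel and its mutex, and Stop() cancels the internal context. A minimal sketch of using the new context-aware constructor follows; it is not part of the commit, and the clientset, the ConfigMap resource, and how the initial resource version is obtained are assumptions.

package example

import (
	"context"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	watchtools "k8s.io/client-go/tools/watch"
)

// retryWatchConfigMaps is a hypothetical caller of NewRetryWatcherWithContext.
// Cancelling ctx now stops the watcher, in addition to calling Stop(), because
// the stop channel was replaced by context cancellation.
func retryWatchConfigMaps(ctx context.Context, client kubernetes.Interface, namespace, rv string) error {
	// ToWatcherWithContext adapts a plain cache.Watcher, here a ListWatch with
	// only its WatchFunc set, to the new cache.WatcherWithContext interface.
	watcher := cache.ToWatcherWithContext(&cache.ListWatch{
		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
			return client.CoreV1().ConfigMaps(namespace).Watch(ctx, options)
		},
	})

	// rv must be a concrete resourceVersion, e.g. taken from a previous List call.
	rw, err := watchtools.NewRetryWatcherWithContext(ctx, rv, watcher)
	if err != nil {
		return err
	}
	// Stop() now cancels an internal context instead of closing a guarded channel.
	defer rw.Stop()

	for event := range rw.ResultChan() {
		_ = event // handle Added/Modified/Deleted/Bookmark events here
	}
	<-rw.Done() // wait until the watcher has fully stopped
	return nil
}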

k8s.io/client-go/tools/watch/until.go

@@ -105,7 +105,7 @@ func UntilWithoutRetry(ctx context.Context, watcher watch.Interface, conditions
//
// The most frequent usage for Until would be a test where you want to verify exact order of events ("edges").
func Until(ctx context.Context, initialResourceVersion string, watcherClient cache.Watcher, conditions ...ConditionFunc) (*watch.Event, error) {
w, err := NewRetryWatcher(initialResourceVersion, watcherClient)
w, err := NewRetryWatcherWithContext(ctx, initialResourceVersion, cache.ToWatcherWithContext(watcherClient))
if err != nil {
return nil, err
}
@@ -126,7 +126,7 @@ func Until(ctx context.Context, initialResourceVersion string, watcherClient cac
// The most frequent usage would be a command that needs to watch the "state of the world" and should't fail, like:
// waiting for object reaching a state, "small" controllers, ...
func UntilWithSync(ctx context.Context, lw cache.ListerWatcher, objType runtime.Object, precondition PreconditionFunc, conditions ...ConditionFunc) (*watch.Event, error) {
indexer, informer, watcher, done := NewIndexerInformerWatcher(lw, objType)
indexer, informer, watcher, done := NewIndexerInformerWatcherWithLogger(klog.FromContext(ctx), lw, objType)
// We need to wait for the internal informers to fully stop so it's easier to reason about
// and it works with non-thread safe clients.
defer func() { <-done }()
@@ -156,7 +156,7 @@ func UntilWithSync(ctx context.Context, lw cache.ListerWatcher, objType runtime.
func ContextWithOptionalTimeout(parent context.Context, timeout time.Duration) (context.Context, context.CancelFunc) {
if timeout < 0 {
// This should be handled in validation
klog.Errorf("Timeout for context shall not be negative!")
klog.FromContext(parent).Error(nil, "Timeout for context shall not be negative")
timeout = 0
}