rebase: update kubernetes to v1.25.0

update kubernetes to latest v1.25.0
release.

Signed-off-by: Madhu Rajanna <madhupr007@gmail.com>
This commit is contained in:
Madhu Rajanna
2022-08-24 07:54:25 +05:30
committed by mergify[bot]
parent f47839d73d
commit e3bf375035
645 changed files with 42507 additions and 9219 deletions

View File

@ -17,6 +17,7 @@ limitations under the License.
package watch
import (
"fmt"
"sync"
"k8s.io/apimachinery/pkg/runtime"
@ -44,8 +45,11 @@ type Broadcaster struct {
nextWatcher int64
distributing sync.WaitGroup
incoming chan Event
stopped chan struct{}
// incomingBlock allows us to ensure we don't race and end up sending events
// to a closed channel following a broadcaster shutdown.
incomingBlock sync.Mutex
incoming chan Event
stopped chan struct{}
// How large to make watcher's channel.
watchQueueLength int
@ -111,6 +115,8 @@ func (obj functionFakeRuntimeObject) DeepCopyObject() runtime.Object {
// won't ever see that event, and will always see any event after they are
// added.
func (m *Broadcaster) blockQueue(f func()) {
m.incomingBlock.Lock()
defer m.incomingBlock.Unlock()
select {
case <-m.stopped:
return
@ -132,7 +138,7 @@ func (m *Broadcaster) blockQueue(f func()) {
// Note: new watchers will only receive new events. They won't get an entire history
// of previous events. It will block until the watcher is actually added to the
// broadcaster.
func (m *Broadcaster) Watch() Interface {
func (m *Broadcaster) Watch() (Interface, error) {
var w *broadcasterWatcher
m.blockQueue(func() {
id := m.nextWatcher
@ -146,11 +152,9 @@ func (m *Broadcaster) Watch() Interface {
m.watchers[id] = w
})
if w == nil {
// The panic here is to be consistent with the previous interface behavior
// we are willing to re-evaluate in the future.
panic("broadcaster already stopped")
return nil, fmt.Errorf("broadcaster already stopped")
}
return w
return w, nil
}
// WatchWithPrefix adds a new watcher to the list and returns an Interface for it. It sends
@ -158,7 +162,7 @@ func (m *Broadcaster) Watch() Interface {
// The returned watch will have a queue length that is at least large enough to accommodate
// all of the items in queuedEvents. It will block until the watcher is actually added to
// the broadcaster.
func (m *Broadcaster) WatchWithPrefix(queuedEvents []Event) Interface {
func (m *Broadcaster) WatchWithPrefix(queuedEvents []Event) (Interface, error) {
var w *broadcasterWatcher
m.blockQueue(func() {
id := m.nextWatcher
@ -179,11 +183,9 @@ func (m *Broadcaster) WatchWithPrefix(queuedEvents []Event) Interface {
}
})
if w == nil {
// The panic here is to be consistent with the previous interface behavior
// we are willing to re-evaluate in the future.
panic("broadcaster already stopped")
return nil, fmt.Errorf("broadcaster already stopped")
}
return w
return w, nil
}
// stopWatching stops the given watcher and removes it from the list.
@ -210,19 +212,38 @@ func (m *Broadcaster) closeAll() {
}
// Action distributes the given event among all watchers.
func (m *Broadcaster) Action(action EventType, obj runtime.Object) {
func (m *Broadcaster) Action(action EventType, obj runtime.Object) error {
m.incomingBlock.Lock()
defer m.incomingBlock.Unlock()
select {
case <-m.stopped:
return fmt.Errorf("broadcaster already stopped")
default:
}
m.incoming <- Event{action, obj}
return nil
}
// Action distributes the given event among all watchers, or drops it on the floor
// if too many incoming actions are queued up. Returns true if the action was sent,
// false if dropped.
func (m *Broadcaster) ActionOrDrop(action EventType, obj runtime.Object) bool {
func (m *Broadcaster) ActionOrDrop(action EventType, obj runtime.Object) (bool, error) {
m.incomingBlock.Lock()
defer m.incomingBlock.Unlock()
// Ensure that if the broadcaster is stopped we do not send events to it.
select {
case <-m.stopped:
return false, fmt.Errorf("broadcaster already stopped")
default:
}
select {
case m.incoming <- Event{action, obj}:
return true
return true, nil
default:
return false
return false, nil
}
}