rebase: update kubernetes and libraries to v1.22.0 version

Kubernetes v1.22 version has been released and this update
ceph csi dependencies to use the same version.

Signed-off-by: Humble Chirammal <hchiramm@redhat.com>
This commit is contained in:
Humble Chirammal
2021-08-09 12:49:24 +05:30
committed by mergify[bot]
parent e077c1fdf5
commit aa698bc3e1
759 changed files with 61864 additions and 6514 deletions

View File

@ -20,7 +20,6 @@ reviewers:
- caesarxuchao
- mikedanese
- liggitt
- erictune
- davidopp
- pmorie
- janetkuo
@ -31,7 +30,6 @@ reviewers:
- hongchaodeng
- krousey
- xiang90
- mml
- ingvagabund
- resouer
- jessfraz

View File

@ -325,7 +325,7 @@ func NewInformer(
return clientState, newInformer(lw, objType, resyncPeriod, h, clientState)
}
// NewIndexerInformer returns a Indexer and a controller for populating the index
// NewIndexerInformer returns an Indexer and a Controller for populating the index
// while also providing event notifications. You should only used the returned
// Index for Get/List operations; Add/Modify/Deletes will cause the event
// notifications to be faulty.

View File

@ -153,7 +153,7 @@ const (
// change happened, and the object's state after* that change.
//
// [*] Unless the change is a deletion, and then you'll get the final
// state of the object before it was deleted.
// state of the object before it was deleted.
type Delta struct {
Type DeltaType
Object interface{}
@ -174,9 +174,10 @@ type Deltas []Delta
// modifications.
//
// TODO: consider merging keyLister with this object, tracking a list of
// "known" keys when Pop() is called. Have to think about how that
// affects error retrying.
// NOTE: It is possible to misuse this and cause a race when using an
// "known" keys when Pop() is called. Have to think about how that
// affects error retrying.
//
// NOTE: It is possible to misuse this and cause a race when using an
// external known object source.
// Whether there is a potential race depends on how the consumer
// modifies knownObjects. In Pop(), process function is called under
@ -185,8 +186,7 @@ type Deltas []Delta
//
// Example:
// In case of sharedIndexInformer being a consumer
// (https://github.com/kubernetes/kubernetes/blob/0cdd940f/staging/
// src/k8s.io/client-go/tools/cache/shared_informer.go#L192),
// (https://github.com/kubernetes/kubernetes/blob/0cdd940f/staging/src/k8s.io/client-go/tools/cache/shared_informer.go#L192),
// there is no race as knownObjects (s.indexer) is modified safely
// under DeltaFIFO's lock. The only exceptions are GetStore() and
// GetIndexer() methods, which expose ways to modify the underlying
@ -340,7 +340,7 @@ func (f *DeltaFIFO) AddIfNotPresent(obj interface{}) error {
if !ok {
return fmt.Errorf("object must be of type deltas, but got: %#v", obj)
}
id, err := f.KeyOf(deltas.Newest().Object)
id, err := f.KeyOf(deltas)
if err != nil {
return KeyError{obj, err}
}
@ -373,13 +373,8 @@ func dedupDeltas(deltas Deltas) Deltas {
a := &deltas[n-1]
b := &deltas[n-2]
if out := isDup(a, b); out != nil {
// `a` and `b` are duplicates. Only keep the one returned from isDup().
// TODO: This extra array allocation and copy seems unnecessary if
// all we do to dedup is compare the new delta with the last element
// in `items`, which could be done by mutating `items` directly.
// Might be worth profiling and investigating if it is safe to optimize.
d := append(Deltas{}, deltas[:n-2]...)
return append(d, *out)
deltas[n-2] = *out
return deltas[:n-1]
}
return deltas
}

View File

@ -263,10 +263,7 @@ func (f *FIFO) GetByKey(key string) (item interface{}, exists bool, err error) {
func (f *FIFO) IsClosed() bool {
f.lock.Lock()
defer f.lock.Unlock()
if f.closed {
return true
}
return false
return f.closed
}
// Pop waits until an item is ready and processes it. If multiple items are

View File

@ -304,10 +304,7 @@ func (h *Heap) GetByKey(key string) (interface{}, bool, error) {
func (h *Heap) IsClosed() bool {
h.lock.RLock()
defer h.lock.RUnlock()
if h.closed {
return true
}
return false
return h.closed
}
// NewHeap returns a Heap which can be used to queue up items to process.

View File

@ -99,7 +99,7 @@ func (d *defaultCacheMutationDetector) Run(stopCh <-chan struct{}) {
for {
if d.lastRotated.IsZero() {
d.lastRotated = time.Now()
} else if time.Now().Sub(d.lastRotated) > d.retainDuration {
} else if time.Since(d.lastRotated) > d.retainDuration {
d.retainedCachedObjs = d.cachedObjs
d.cachedObjs = nil
d.lastRotated = time.Now()

View File

@ -417,7 +417,8 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
// It doesn't make sense to re-list all objects because most likely we will be able to restart
// watch where we ended.
// If that's the case begin exponentially backing off and resend watch request.
if utilnet.IsConnectionRefused(err) {
// Do the same for "429" errors.
if utilnet.IsConnectionRefused(err) || apierrors.IsTooManyRequests(err) {
<-r.initConnBackoffManager.Backoff().C()
continue
}
@ -432,6 +433,10 @@ func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
// has a semantic that it returns data at least as fresh as provided RV.
// So first try to LIST with setting RV to resource version of last observed object.
klog.V(4).Infof("%s: watch of %v closed with: %v", r.name, r.expectedTypeName, err)
case apierrors.IsTooManyRequests(err):
klog.V(2).Infof("%s: watch of %v returned 429 - backing off", r.name, r.expectedTypeName)
<-r.initConnBackoffManager.Backoff().C()
continue
default:
klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
}

View File

@ -85,6 +85,11 @@ func (k KeyError) Error() string {
return fmt.Sprintf("couldn't create key for object %+v: %v", k.Obj, k.Err)
}
// Unwrap implements errors.Unwrap
func (k KeyError) Unwrap() error {
return k.Err
}
// ExplicitKey can be passed to MetaNamespaceKeyFunc if you have the key for
// the object but not the object itself.
type ExplicitKey string

View File

@ -245,6 +245,33 @@ type ExecConfig struct {
// to be stored directly in the kubeconfig.
// +k8s:conversion-gen=false
Config runtime.Object
// InteractiveMode determines this plugin's relationship with standard input. Valid
// values are "Never" (this exec plugin never uses standard input), "IfAvailable" (this
// exec plugin wants to use standard input if it is available), or "Always" (this exec
// plugin requires standard input to function). See ExecInteractiveMode values for more
// details.
//
// If APIVersion is client.authentication.k8s.io/v1alpha1 or
// client.authentication.k8s.io/v1beta1, then this field is optional and defaults
// to "IfAvailable" when unset. Otherwise, this field is required.
// +optional
InteractiveMode ExecInteractiveMode
// StdinUnavailable indicates whether the exec authenticator can pass standard
// input through to this exec plugin. For example, a higher level entity might be using
// standard input for something else and therefore it would not be safe for the exec
// plugin to use standard input. This is kept here in order to keep all of the exec configuration
// together, but it is never serialized.
// +k8s:conversion-gen=false
StdinUnavailable bool
// StdinUnavailableMessage is an optional message to be displayed when the exec authenticator
// cannot successfully run this exec plugin because it needs to use standard input and
// StdinUnavailable is true. For example, a process that is already using standard input to
// read user instructions might set this to "used by my-program to read user instructions".
// +k8s:conversion-gen=false
StdinUnavailableMessage string
}
var _ fmt.Stringer = new(ExecConfig)
@ -271,7 +298,7 @@ func (c ExecConfig) String() string {
if c.Config != nil {
config = "runtime.Object(--- REDACTED ---)"
}
return fmt.Sprintf("api.ExecConfig{Command: %q, Args: %#v, Env: %s, APIVersion: %q, ProvideClusterInfo: %t, Config: %s}", c.Command, args, env, c.APIVersion, c.ProvideClusterInfo, config)
return fmt.Sprintf("api.ExecConfig{Command: %q, Args: %#v, Env: %s, APIVersion: %q, ProvideClusterInfo: %t, Config: %s, StdinUnavailable: %t}", c.Command, args, env, c.APIVersion, c.ProvideClusterInfo, config, c.StdinUnavailable)
}
// ExecEnvVar is used for setting environment variables when executing an exec-based
@ -281,6 +308,26 @@ type ExecEnvVar struct {
Value string `json:"value"`
}
// ExecInteractiveMode is a string that describes an exec plugin's relationship with standard input.
type ExecInteractiveMode string
const (
// NeverExecInteractiveMode declares that this exec plugin never needs to use standard
// input, and therefore the exec plugin will be run regardless of whether standard input is
// available for user input.
NeverExecInteractiveMode ExecInteractiveMode = "Never"
// IfAvailableExecInteractiveMode declares that this exec plugin would like to use standard input
// if it is available, but can still operate if standard input is not available. Therefore, the
// exec plugin will be run regardless of whether stdin is available for user input. If standard
// input is available for user input, then it will be provided to this exec plugin.
IfAvailableExecInteractiveMode ExecInteractiveMode = "IfAvailable"
// AlwaysExecInteractiveMode declares that this exec plugin requires standard input in order to
// run, and therefore the exec plugin will only be run if standard input is available for user
// input. If standard input is not available for user input, then the exec plugin will not be run
// and an error will be returned by the exec plugin runner.
AlwaysExecInteractiveMode ExecInteractiveMode = "Always"
)
// NewConfig is a convenience function that returns a new Config object with non-nil maps
func NewConfig() *Config {
return &Config{

View File

@ -0,0 +1,37 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package v1
import (
"k8s.io/apimachinery/pkg/runtime"
)
func addDefaultingFuncs(scheme *runtime.Scheme) error {
return RegisterDefaults(scheme)
}
func SetDefaults_ExecConfig(exec *ExecConfig) {
if len(exec.InteractiveMode) == 0 {
switch exec.APIVersion {
case "client.authentication.k8s.io/v1beta1", "client.authentication.k8s.io/v1alpha1":
// default to IfAvailableExecInteractiveMode for backwards compatibility
exec.InteractiveMode = IfAvailableExecInteractiveMode
default:
// require other versions to explicitly declare whether they want stdin or not
}
}
}

View File

@ -16,5 +16,6 @@ limitations under the License.
// +k8s:conversion-gen=k8s.io/client-go/tools/clientcmd/api
// +k8s:deepcopy-gen=package
// +k8s:defaulter-gen=Kind
package v1

View File

@ -37,7 +37,7 @@ func init() {
// We only register manually written functions here. The registration of the
// generated functions takes place in the generated files. The separation
// makes the code compile even when the generated files are missing.
localSchemeBuilder.Register(addKnownTypes)
localSchemeBuilder.Register(addKnownTypes, addDefaultingFuncs)
}
func addKnownTypes(scheme *runtime.Scheme) error {

View File

@ -221,6 +221,18 @@ type ExecConfig struct {
// to false. Package k8s.io/client-go/tools/auth/exec provides helper methods for
// reading this environment variable.
ProvideClusterInfo bool `json:"provideClusterInfo"`
// InteractiveMode determines this plugin's relationship with standard input. Valid
// values are "Never" (this exec plugin never uses standard input), "IfAvailable" (this
// exec plugin wants to use standard input if it is available), or "Always" (this exec
// plugin requires standard input to function). See ExecInteractiveMode values for more
// details.
//
// If APIVersion is client.authentication.k8s.io/v1alpha1 or
// client.authentication.k8s.io/v1beta1, then this field is optional and defaults
// to "IfAvailable" when unset. Otherwise, this field is required.
//+optional
InteractiveMode ExecInteractiveMode `json:"interactiveMode,omitempty"`
}
// ExecEnvVar is used for setting environment variables when executing an exec-based
@ -229,3 +241,23 @@ type ExecEnvVar struct {
Name string `json:"name"`
Value string `json:"value"`
}
// ExecInteractiveMode is a string that describes an exec plugin's relationship with standard input.
type ExecInteractiveMode string
const (
// NeverExecInteractiveMode declares that this exec plugin never needs to use standard
// input, and therefore the exec plugin will be run regardless of whether standard input is
// available for user input.
NeverExecInteractiveMode ExecInteractiveMode = "Never"
// IfAvailableExecInteractiveMode declares that this exec plugin would like to use standard input
// if it is available, but can still operate if standard input is not available. Therefore, the
// exec plugin will be run regardless of whether stdin is available for user input. If standard
// input is available for user input, then it will be provided to this exec plugin.
IfAvailableExecInteractiveMode ExecInteractiveMode = "IfAvailable"
// AlwaysExecInteractiveMode declares that this exec plugin requires standard input in order to
// run, and therefore the exec plugin will only be run if standard input is available for user
// input. If standard input is not available for user input, then the exec plugin will not be run
// and an error will be returned by the exec plugin runner.
AlwaysExecInteractiveMode ExecInteractiveMode = "Always"
)

View File

@ -376,6 +376,7 @@ func autoConvert_v1_ExecConfig_To_api_ExecConfig(in *ExecConfig, out *api.ExecCo
out.APIVersion = in.APIVersion
out.InstallHint = in.InstallHint
out.ProvideClusterInfo = in.ProvideClusterInfo
out.InteractiveMode = api.ExecInteractiveMode(in.InteractiveMode)
return nil
}
@ -392,6 +393,9 @@ func autoConvert_api_ExecConfig_To_v1_ExecConfig(in *api.ExecConfig, out *ExecCo
out.InstallHint = in.InstallHint
out.ProvideClusterInfo = in.ProvideClusterInfo
// INFO: in.Config opted out of conversion generation
out.InteractiveMode = ExecInteractiveMode(in.InteractiveMode)
// INFO: in.StdinUnavailable opted out of conversion generation
// INFO: in.StdinUnavailableMessage opted out of conversion generation
return nil
}

View File

@ -0,0 +1,42 @@
// +build !ignore_autogenerated
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Code generated by defaulter-gen. DO NOT EDIT.
package v1
import (
runtime "k8s.io/apimachinery/pkg/runtime"
)
// RegisterDefaults adds defaulters functions to the given scheme.
// Public to allow building arbitrary schemes.
// All generated defaulters are covering - they call all nested defaulters.
func RegisterDefaults(scheme *runtime.Scheme) error {
scheme.AddTypeDefaultingFunc(&Config{}, func(obj interface{}) { SetObjectDefaults_Config(obj.(*Config)) })
return nil
}
func SetObjectDefaults_Config(in *Config) {
for i := range in.AuthInfos {
a := &in.AuthInfos[i]
if a.AuthInfo.Exec != nil {
SetDefaults_ExecConfig(a.AuthInfo.Exec)
}
}
}

View File

@ -135,11 +135,7 @@ func (o *PathOptions) GetDefaultFilename() string {
}
func (o *PathOptions) IsExplicitFile() bool {
if len(o.LoadingRules.ExplicitPath) > 0 {
return true
}
return false
return len(o.LoadingRules.ExplicitPath) > 0
}
func (o *PathOptions) GetExplicitFile() string {

View File

@ -308,6 +308,14 @@ func validateAuthInfo(authInfoName string, authInfo clientcmdapi.AuthInfo) []err
validationErrors = append(validationErrors, fmt.Errorf("env variable name must be specified for %v to use exec authentication plugin", authInfoName))
}
}
switch authInfo.Exec.InteractiveMode {
case "":
validationErrors = append(validationErrors, fmt.Errorf("interactiveMode must be specified for %v to use exec authentication plugin", authInfoName))
case clientcmdapi.NeverExecInteractiveMode, clientcmdapi.IfAvailableExecInteractiveMode, clientcmdapi.AlwaysExecInteractiveMode:
// These are valid
default:
validationErrors = append(validationErrors, fmt.Errorf("invalid interactiveMode for %v: %q", authInfoName, authInfo.Exec.InteractiveMode))
}
}
// authPath also provides information for the client to identify the server, so allow multiple auth methods in that case

10
vendor/k8s.io/client-go/tools/events/OWNERS generated vendored Normal file
View File

@ -0,0 +1,10 @@
# See the OWNERS docs at https://go.k8s.io/owners
approvers:
- sig-instrumentation-approvers
- yastij
- wojtek-t
reviewers:
- sig-instrumentation-reviewers
- yastij
- wojtek-t

19
vendor/k8s.io/client-go/tools/events/doc.go generated vendored Normal file
View File

@ -0,0 +1,19 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package events has all client logic for recording and reporting
// "k8s.io/api/events/v1beta1".Event events.
package events // import "k8s.io/client-go/tools/events"

View File

@ -0,0 +1,384 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package events
import (
"context"
"fmt"
"os"
"sync"
"time"
corev1 "k8s.io/api/core/v1"
eventsv1 "k8s.io/api/events/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/util/json"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
typedv1core "k8s.io/client-go/kubernetes/typed/core/v1"
typedeventsv1 "k8s.io/client-go/kubernetes/typed/events/v1"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/tools/record/util"
"k8s.io/klog/v2"
)
const (
maxTriesPerEvent = 12
finishTime = 6 * time.Minute
refreshTime = 30 * time.Minute
maxQueuedEvents = 1000
)
var defaultSleepDuration = 10 * time.Second
// TODO: validate impact of copying and investigate hashing
type eventKey struct {
action string
reason string
reportingController string
regarding corev1.ObjectReference
related corev1.ObjectReference
}
type eventBroadcasterImpl struct {
*watch.Broadcaster
mu sync.Mutex
eventCache map[eventKey]*eventsv1.Event
sleepDuration time.Duration
sink EventSink
}
// EventSinkImpl wraps EventsV1Interface to implement EventSink.
// TODO: this makes it easier for testing purpose and masks the logic of performing API calls.
// Note that rollbacking to raw clientset should also be transparent.
type EventSinkImpl struct {
Interface typedeventsv1.EventsV1Interface
}
// Create takes the representation of a event and creates it. Returns the server's representation of the event, and an error, if there is any.
func (e *EventSinkImpl) Create(event *eventsv1.Event) (*eventsv1.Event, error) {
if event.Namespace == "" {
return nil, fmt.Errorf("can't create an event with empty namespace")
}
return e.Interface.Events(event.Namespace).Create(context.TODO(), event, metav1.CreateOptions{})
}
// Update takes the representation of a event and updates it. Returns the server's representation of the event, and an error, if there is any.
func (e *EventSinkImpl) Update(event *eventsv1.Event) (*eventsv1.Event, error) {
if event.Namespace == "" {
return nil, fmt.Errorf("can't update an event with empty namespace")
}
return e.Interface.Events(event.Namespace).Update(context.TODO(), event, metav1.UpdateOptions{})
}
// Patch applies the patch and returns the patched event, and an error, if there is any.
func (e *EventSinkImpl) Patch(event *eventsv1.Event, data []byte) (*eventsv1.Event, error) {
if event.Namespace == "" {
return nil, fmt.Errorf("can't patch an event with empty namespace")
}
return e.Interface.Events(event.Namespace).Patch(context.TODO(), event.Name, types.StrategicMergePatchType, data, metav1.PatchOptions{})
}
// NewBroadcaster Creates a new event broadcaster.
func NewBroadcaster(sink EventSink) EventBroadcaster {
return newBroadcaster(sink, defaultSleepDuration, map[eventKey]*eventsv1.Event{})
}
// NewBroadcasterForTest Creates a new event broadcaster for test purposes.
func newBroadcaster(sink EventSink, sleepDuration time.Duration, eventCache map[eventKey]*eventsv1.Event) EventBroadcaster {
return &eventBroadcasterImpl{
Broadcaster: watch.NewBroadcaster(maxQueuedEvents, watch.DropIfChannelFull),
eventCache: eventCache,
sleepDuration: sleepDuration,
sink: sink,
}
}
func (e *eventBroadcasterImpl) Shutdown() {
e.Broadcaster.Shutdown()
}
// refreshExistingEventSeries refresh events TTL
func (e *eventBroadcasterImpl) refreshExistingEventSeries() {
// TODO: Investigate whether lock contention won't be a problem
e.mu.Lock()
defer e.mu.Unlock()
for isomorphicKey, event := range e.eventCache {
if event.Series != nil {
if recordedEvent, retry := recordEvent(e.sink, event); !retry {
if recordedEvent != nil {
e.eventCache[isomorphicKey] = recordedEvent
}
}
}
}
}
// finishSeries checks if a series has ended and either:
// - write final count to the apiserver
// - delete a singleton event (i.e. series field is nil) from the cache
func (e *eventBroadcasterImpl) finishSeries() {
// TODO: Investigate whether lock contention won't be a problem
e.mu.Lock()
defer e.mu.Unlock()
for isomorphicKey, event := range e.eventCache {
eventSerie := event.Series
if eventSerie != nil {
if eventSerie.LastObservedTime.Time.Before(time.Now().Add(-finishTime)) {
if _, retry := recordEvent(e.sink, event); !retry {
delete(e.eventCache, isomorphicKey)
}
}
} else if event.EventTime.Time.Before(time.Now().Add(-finishTime)) {
delete(e.eventCache, isomorphicKey)
}
}
}
// NewRecorder returns an EventRecorder that records events with the given event source.
func (e *eventBroadcasterImpl) NewRecorder(scheme *runtime.Scheme, reportingController string) EventRecorder {
hostname, _ := os.Hostname()
reportingInstance := reportingController + "-" + hostname
return &recorderImpl{scheme, reportingController, reportingInstance, e.Broadcaster, clock.RealClock{}}
}
func (e *eventBroadcasterImpl) recordToSink(event *eventsv1.Event, clock clock.Clock) {
// Make a copy before modification, because there could be multiple listeners.
eventCopy := event.DeepCopy()
go func() {
evToRecord := func() *eventsv1.Event {
e.mu.Lock()
defer e.mu.Unlock()
eventKey := getKey(eventCopy)
isomorphicEvent, isIsomorphic := e.eventCache[eventKey]
if isIsomorphic {
if isomorphicEvent.Series != nil {
isomorphicEvent.Series.Count++
isomorphicEvent.Series.LastObservedTime = metav1.MicroTime{Time: clock.Now()}
return nil
}
isomorphicEvent.Series = &eventsv1.EventSeries{
Count: 1,
LastObservedTime: metav1.MicroTime{Time: clock.Now()},
}
return isomorphicEvent
}
e.eventCache[eventKey] = eventCopy
return eventCopy
}()
if evToRecord != nil {
recordedEvent := e.attemptRecording(evToRecord)
if recordedEvent != nil {
recordedEventKey := getKey(recordedEvent)
e.mu.Lock()
defer e.mu.Unlock()
e.eventCache[recordedEventKey] = recordedEvent
}
}
}()
}
func (e *eventBroadcasterImpl) attemptRecording(event *eventsv1.Event) *eventsv1.Event {
tries := 0
for {
if recordedEvent, retry := recordEvent(e.sink, event); !retry {
return recordedEvent
}
tries++
if tries >= maxTriesPerEvent {
klog.Errorf("Unable to write event '%#v' (retry limit exceeded!)", event)
return nil
}
// Randomize sleep so that various clients won't all be
// synced up if the master goes down.
time.Sleep(wait.Jitter(e.sleepDuration, 0.25))
}
}
func recordEvent(sink EventSink, event *eventsv1.Event) (*eventsv1.Event, bool) {
var newEvent *eventsv1.Event
var err error
isEventSeries := event.Series != nil
if isEventSeries {
patch, patchBytesErr := createPatchBytesForSeries(event)
if patchBytesErr != nil {
klog.Errorf("Unable to calculate diff, no merge is possible: %v", patchBytesErr)
return nil, false
}
newEvent, err = sink.Patch(event, patch)
}
// Update can fail because the event may have been removed and it no longer exists.
if !isEventSeries || (isEventSeries && util.IsKeyNotFoundError(err)) {
// Making sure that ResourceVersion is empty on creation
event.ResourceVersion = ""
newEvent, err = sink.Create(event)
}
if err == nil {
return newEvent, false
}
// If we can't contact the server, then hold everything while we keep trying.
// Otherwise, something about the event is malformed and we should abandon it.
switch err.(type) {
case *restclient.RequestConstructionError:
// We will construct the request the same next time, so don't keep trying.
klog.Errorf("Unable to construct event '%#v': '%v' (will not retry!)", event, err)
return nil, false
case *errors.StatusError:
if errors.IsAlreadyExists(err) {
klog.V(5).Infof("Server rejected event '%#v': '%v' (will not retry!)", event, err)
} else {
klog.Errorf("Server rejected event '%#v': '%v' (will not retry!)", event, err)
}
return nil, false
case *errors.UnexpectedObjectError:
// We don't expect this; it implies the server's response didn't match a
// known pattern. Go ahead and retry.
default:
// This case includes actual http transport errors. Go ahead and retry.
}
klog.Errorf("Unable to write event: '%v' (may retry after sleeping)", err)
return nil, true
}
func createPatchBytesForSeries(event *eventsv1.Event) ([]byte, error) {
oldEvent := event.DeepCopy()
oldEvent.Series = nil
oldData, err := json.Marshal(oldEvent)
if err != nil {
return nil, err
}
newData, err := json.Marshal(event)
if err != nil {
return nil, err
}
return strategicpatch.CreateTwoWayMergePatch(oldData, newData, eventsv1.Event{})
}
func getKey(event *eventsv1.Event) eventKey {
key := eventKey{
action: event.Action,
reason: event.Reason,
reportingController: event.ReportingController,
regarding: event.Regarding,
}
if event.Related != nil {
key.related = *event.Related
}
return key
}
// StartEventWatcher starts sending events received from this EventBroadcaster to the given event handler function.
// The return value is used to stop recording
func (e *eventBroadcasterImpl) StartEventWatcher(eventHandler func(event runtime.Object)) func() {
watcher := e.Watch()
go func() {
defer utilruntime.HandleCrash()
for {
watchEvent, ok := <-watcher.ResultChan()
if !ok {
return
}
eventHandler(watchEvent.Object)
}
}()
return watcher.Stop
}
func (e *eventBroadcasterImpl) startRecordingEvents(stopCh <-chan struct{}) {
eventHandler := func(obj runtime.Object) {
event, ok := obj.(*eventsv1.Event)
if !ok {
klog.Errorf("unexpected type, expected eventsv1.Event")
return
}
e.recordToSink(event, clock.RealClock{})
}
stopWatcher := e.StartEventWatcher(eventHandler)
go func() {
<-stopCh
stopWatcher()
}()
}
// StartRecordingToSink starts sending events received from the specified eventBroadcaster to the given sink.
func (e *eventBroadcasterImpl) StartRecordingToSink(stopCh <-chan struct{}) {
go wait.Until(e.refreshExistingEventSeries, refreshTime, stopCh)
go wait.Until(e.finishSeries, finishTime, stopCh)
e.startRecordingEvents(stopCh)
}
type eventBroadcasterAdapterImpl struct {
coreClient typedv1core.EventsGetter
coreBroadcaster record.EventBroadcaster
eventsv1Client typedeventsv1.EventsV1Interface
eventsv1Broadcaster EventBroadcaster
}
// NewEventBroadcasterAdapter creates a wrapper around new and legacy broadcasters to simplify
// migration of individual components to the new Event API.
func NewEventBroadcasterAdapter(client clientset.Interface) EventBroadcasterAdapter {
eventClient := &eventBroadcasterAdapterImpl{}
if _, err := client.Discovery().ServerResourcesForGroupVersion(eventsv1.SchemeGroupVersion.String()); err == nil {
eventClient.eventsv1Client = client.EventsV1()
eventClient.eventsv1Broadcaster = NewBroadcaster(&EventSinkImpl{Interface: eventClient.eventsv1Client})
}
// Even though there can soon exist cases when coreBroadcaster won't really be needed,
// we create it unconditionally because its overhead is minor and will simplify using usage
// patterns of this library in all components.
eventClient.coreClient = client.CoreV1()
eventClient.coreBroadcaster = record.NewBroadcaster()
return eventClient
}
// StartRecordingToSink starts sending events received from the specified eventBroadcaster to the given sink.
func (e *eventBroadcasterAdapterImpl) StartRecordingToSink(stopCh <-chan struct{}) {
if e.eventsv1Broadcaster != nil && e.eventsv1Client != nil {
e.eventsv1Broadcaster.StartRecordingToSink(stopCh)
}
if e.coreBroadcaster != nil && e.coreClient != nil {
e.coreBroadcaster.StartRecordingToSink(&typedv1core.EventSinkImpl{Interface: e.coreClient.Events("")})
}
}
func (e *eventBroadcasterAdapterImpl) NewRecorder(name string) EventRecorder {
if e.eventsv1Broadcaster != nil && e.eventsv1Client != nil {
return e.eventsv1Broadcaster.NewRecorder(scheme.Scheme, name)
}
return record.NewEventRecorderAdapter(e.DeprecatedNewLegacyRecorder(name))
}
func (e *eventBroadcasterAdapterImpl) DeprecatedNewLegacyRecorder(name string) record.EventRecorder {
return e.coreBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: name})
}
func (e *eventBroadcasterAdapterImpl) Shutdown() {
if e.coreBroadcaster != nil {
e.coreBroadcaster.Shutdown()
}
if e.eventsv1Broadcaster != nil {
e.eventsv1Broadcaster.Shutdown()
}
}

88
vendor/k8s.io/client-go/tools/events/event_recorder.go generated vendored Normal file
View File

@ -0,0 +1,88 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package events
import (
"fmt"
"time"
v1 "k8s.io/api/core/v1"
eventsv1 "k8s.io/api/events/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/clock"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/record/util"
"k8s.io/client-go/tools/reference"
"k8s.io/klog/v2"
)
type recorderImpl struct {
scheme *runtime.Scheme
reportingController string
reportingInstance string
*watch.Broadcaster
clock clock.Clock
}
func (recorder *recorderImpl) Eventf(regarding runtime.Object, related runtime.Object, eventtype, reason, action, note string, args ...interface{}) {
timestamp := metav1.MicroTime{time.Now()}
message := fmt.Sprintf(note, args...)
refRegarding, err := reference.GetReference(recorder.scheme, regarding)
if err != nil {
klog.Errorf("Could not construct reference to: '%#v' due to: '%v'. Will not report event: '%v' '%v' '%v'", regarding, err, eventtype, reason, message)
return
}
refRelated, err := reference.GetReference(recorder.scheme, related)
if err != nil {
klog.V(9).Infof("Could not construct reference to: '%#v' due to: '%v'.", related, err)
}
if !util.ValidateEventType(eventtype) {
klog.Errorf("Unsupported event type: '%v'", eventtype)
return
}
event := recorder.makeEvent(refRegarding, refRelated, timestamp, eventtype, reason, message, recorder.reportingController, recorder.reportingInstance, action)
go func() {
defer utilruntime.HandleCrash()
recorder.Action(watch.Added, event)
}()
}
func (recorder *recorderImpl) makeEvent(refRegarding *v1.ObjectReference, refRelated *v1.ObjectReference, timestamp metav1.MicroTime, eventtype, reason, message string, reportingController string, reportingInstance string, action string) *eventsv1.Event {
t := metav1.Time{Time: recorder.clock.Now()}
namespace := refRegarding.Namespace
if namespace == "" {
namespace = metav1.NamespaceDefault
}
return &eventsv1.Event{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%v.%x", refRegarding.Name, t.UnixNano()),
Namespace: namespace,
},
EventTime: timestamp,
Series: nil,
ReportingController: reportingController,
ReportingInstance: reportingInstance,
Action: action,
Reason: reason,
Regarding: *refRegarding,
Related: refRelated,
Note: message,
Type: eventtype,
}
}

45
vendor/k8s.io/client-go/tools/events/fake.go generated vendored Normal file
View File

@ -0,0 +1,45 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package events
import (
"fmt"
"k8s.io/apimachinery/pkg/runtime"
)
// FakeRecorder is used as a fake during tests. It is thread safe. It is usable
// when created manually and not by NewFakeRecorder, however all events may be
// thrown away in this case.
type FakeRecorder struct {
Events chan string
}
// Eventf emits an event
func (f *FakeRecorder) Eventf(regarding runtime.Object, related runtime.Object, eventtype, reason, action, note string, args ...interface{}) {
if f.Events != nil {
f.Events <- fmt.Sprintf(eventtype+" "+reason+" "+note, args...)
}
}
// NewFakeRecorder creates new fake event recorder with event channel with
// buffer of given size.
func NewFakeRecorder(bufferSize int) *FakeRecorder {
return &FakeRecorder{
Events: make(chan string, bufferSize),
}
}

64
vendor/k8s.io/client-go/tools/events/helper.go generated vendored Normal file
View File

@ -0,0 +1,64 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package events
import (
"fmt"
corev1 "k8s.io/api/core/v1"
eventsv1 "k8s.io/api/events/v1"
eventsv1beta1 "k8s.io/api/events/v1beta1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
)
var mapping = map[schema.GroupVersion]string{
eventsv1.SchemeGroupVersion: "regarding",
eventsv1beta1.SchemeGroupVersion: "regarding",
corev1.SchemeGroupVersion: "involvedObject",
}
// GetFieldSelector returns the appropriate field selector based on the API version being used to communicate with the server.
// The returned field selector can be used with List and Watch to filter desired events.
func GetFieldSelector(eventsGroupVersion schema.GroupVersion, regardingGroupVersionKind schema.GroupVersionKind, regardingName string, regardingUID types.UID) (fields.Selector, error) {
field := fields.Set{}
if _, ok := mapping[eventsGroupVersion]; !ok {
return nil, fmt.Errorf("unknown version %v", eventsGroupVersion)
}
prefix := mapping[eventsGroupVersion]
if len(regardingName) > 0 {
field[prefix+".name"] = regardingName
}
if len(regardingGroupVersionKind.Kind) > 0 {
field[prefix+".kind"] = regardingGroupVersionKind.Kind
}
regardingGroupVersion := regardingGroupVersionKind.GroupVersion()
if !regardingGroupVersion.Empty() {
field[prefix+".apiVersion"] = regardingGroupVersion.String()
}
if len(regardingUID) > 0 {
field[prefix+".uid"] = string(regardingUID)
}
return field.AsSelector(), nil
}

90
vendor/k8s.io/client-go/tools/events/interfaces.go generated vendored Normal file
View File

@ -0,0 +1,90 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package events
import (
eventsv1 "k8s.io/api/events/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
)
// EventRecorder knows how to record events on behalf of an EventSource.
type EventRecorder interface {
// Eventf constructs an event from the given information and puts it in the queue for sending.
// 'regarding' is the object this event is about. Event will make a reference-- or you may also
// pass a reference to the object directly.
// 'related' is the secondary object for more complex actions. E.g. when regarding object triggers
// a creation or deletion of related object.
// 'type' of this event, and can be one of Normal, Warning. New types could be added in future
// 'reason' is the reason this event is generated. 'reason' should be short and unique; it
// should be in UpperCamelCase format (starting with a capital letter). "reason" will be used
// to automate handling of events, so imagine people writing switch statements to handle them.
// You want to make that easy.
// 'action' explains what happened with regarding/what action did the ReportingController
// (ReportingController is a type of a Controller reporting an Event, e.g. k8s.io/node-controller, k8s.io/kubelet.)
// take in regarding's name; it should be in UpperCamelCase format (starting with a capital letter).
// 'note' is intended to be human readable.
Eventf(regarding runtime.Object, related runtime.Object, eventtype, reason, action, note string, args ...interface{})
}
// EventBroadcaster knows how to receive events and send them to any EventSink, watcher, or log.
type EventBroadcaster interface {
// StartRecordingToSink starts sending events received from the specified eventBroadcaster.
StartRecordingToSink(stopCh <-chan struct{})
// NewRecorder returns an EventRecorder that can be used to send events to this EventBroadcaster
// with the event source set to the given event source.
NewRecorder(scheme *runtime.Scheme, reportingController string) EventRecorder
// StartEventWatcher enables you to watch for emitted events without usage
// of StartRecordingToSink. This lets you also process events in a custom way (e.g. in tests).
// NOTE: events received on your eventHandler should be copied before being used.
// TODO: figure out if this can be removed.
StartEventWatcher(eventHandler func(event runtime.Object)) func()
// Shutdown shuts down the broadcaster
Shutdown()
}
// EventSink knows how to store events (client-go implements it.)
// EventSink must respect the namespace that will be embedded in 'event'.
// It is assumed that EventSink will return the same sorts of errors as
// client-go's REST client.
type EventSink interface {
Create(event *eventsv1.Event) (*eventsv1.Event, error)
Update(event *eventsv1.Event) (*eventsv1.Event, error)
Patch(oldEvent *eventsv1.Event, data []byte) (*eventsv1.Event, error)
}
// EventBroadcasterAdapter is a auxiliary interface to simplify migration to
// the new events API. It is a wrapper around new and legacy broadcasters
// that smartly chooses which one to use.
//
// Deprecated: This interface will be removed once migration is completed.
type EventBroadcasterAdapter interface {
// StartRecordingToSink starts sending events received from the specified eventBroadcaster.
StartRecordingToSink(stopCh <-chan struct{})
// NewRecorder creates a new Event Recorder with specified name.
NewRecorder(name string) EventRecorder
// DeprecatedNewLegacyRecorder creates a legacy Event Recorder with specific name.
DeprecatedNewLegacyRecorder(name string) record.EventRecorder
// Shutdown shuts down the broadcaster.
Shutdown()
}

View File

@ -56,6 +56,7 @@ import (
"bytes"
"context"
"fmt"
"sync"
"time"
"k8s.io/apimachinery/pkg/api/errors"
@ -187,6 +188,9 @@ type LeaderElector struct {
// clock is wrapper around time to allow for less flaky testing
clock clock.Clock
// used to lock the observedRecord
observedRecordLock sync.Mutex
metrics leaderMetricsAdapter
}
@ -224,13 +228,14 @@ func RunOrDie(ctx context.Context, lec LeaderElectionConfig) {
// GetLeader returns the identity of the last observed leader or returns the empty string if
// no leader has yet been observed.
// This function is for informational purposes. (e.g. monitoring, logs, etc.)
func (le *LeaderElector) GetLeader() string {
return le.observedRecord.HolderIdentity
return le.getObservedRecord().HolderIdentity
}
// IsLeader returns true if the last observed leader was this client else returns false.
func (le *LeaderElector) IsLeader() bool {
return le.observedRecord.HolderIdentity == le.config.Lock.Identity()
return le.getObservedRecord().HolderIdentity == le.config.Lock.Identity()
}
// acquire loops calling tryAcquireOrRenew and returns true immediately when tryAcquireOrRenew succeeds.
@ -301,8 +306,8 @@ func (le *LeaderElector) release() bool {
klog.Errorf("Failed to release lock: %v", err)
return false
}
le.observedRecord = leaderElectionRecord
le.observedTime = le.clock.Now()
le.setObservedRecord(&leaderElectionRecord)
return true
}
@ -329,16 +334,17 @@ func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool {
klog.Errorf("error initially creating leader election record: %v", err)
return false
}
le.observedRecord = leaderElectionRecord
le.observedTime = le.clock.Now()
le.setObservedRecord(&leaderElectionRecord)
return true
}
// 2. Record obtained, check the Identity & Time
if !bytes.Equal(le.observedRawRecord, oldLeaderElectionRawRecord) {
le.observedRecord = *oldLeaderElectionRecord
le.setObservedRecord(oldLeaderElectionRecord)
le.observedRawRecord = oldLeaderElectionRawRecord
le.observedTime = le.clock.Now()
}
if len(oldLeaderElectionRecord.HolderIdentity) > 0 &&
le.observedTime.Add(le.config.LeaseDuration).After(now.Time) &&
@ -362,8 +368,7 @@ func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool {
return false
}
le.observedRecord = leaderElectionRecord
le.observedTime = le.clock.Now()
le.setObservedRecord(&leaderElectionRecord)
return true
}
@ -392,3 +397,22 @@ func (le *LeaderElector) Check(maxTolerableExpiredLease time.Duration) error {
return nil
}
// setObservedRecord will set a new observedRecord and update observedTime to the current time.
// Protect critical sections with lock.
func (le *LeaderElector) setObservedRecord(observedRecord *rl.LeaderElectionRecord) {
le.observedRecordLock.Lock()
defer le.observedRecordLock.Unlock()
le.observedRecord = *observedRecord
le.observedTime = le.clock.Now()
}
// getObservedRecord returns observersRecord.
// Protect critical sections with lock.
func (le *LeaderElector) getObservedRecord() rl.LeaderElectionRecord {
le.observedRecordLock.Lock()
defer le.observedRecordLock.Unlock()
return le.observedRecord
}

19
vendor/k8s.io/client-go/tools/portforward/doc.go generated vendored Normal file
View File

@ -0,0 +1,19 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package portforward adds support for SSH-like port forwarding from the client's
// local host to remote containers.
package portforward // import "k8s.io/client-go/tools/portforward"

View File

@ -0,0 +1,429 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package portforward
import (
"errors"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"sort"
"strconv"
"strings"
"sync"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/httpstream"
"k8s.io/apimachinery/pkg/util/runtime"
)
// PortForwardProtocolV1Name is the subprotocol used for port forwarding.
// TODO move to API machinery and re-unify with kubelet/server/portfoward
const PortForwardProtocolV1Name = "portforward.k8s.io"
// PortForwarder knows how to listen for local connections and forward them to
// a remote pod via an upgraded HTTP request.
type PortForwarder struct {
addresses []listenAddress
ports []ForwardedPort
stopChan <-chan struct{}
dialer httpstream.Dialer
streamConn httpstream.Connection
listeners []io.Closer
Ready chan struct{}
requestIDLock sync.Mutex
requestID int
out io.Writer
errOut io.Writer
}
// ForwardedPort contains a Local:Remote port pairing.
type ForwardedPort struct {
Local uint16
Remote uint16
}
/*
valid port specifications:
5000
- forwards from localhost:5000 to pod:5000
8888:5000
- forwards from localhost:8888 to pod:5000
0:5000
:5000
- selects a random available local port,
forwards from localhost:<random port> to pod:5000
*/
func parsePorts(ports []string) ([]ForwardedPort, error) {
var forwards []ForwardedPort
for _, portString := range ports {
parts := strings.Split(portString, ":")
var localString, remoteString string
if len(parts) == 1 {
localString = parts[0]
remoteString = parts[0]
} else if len(parts) == 2 {
localString = parts[0]
if localString == "" {
// support :5000
localString = "0"
}
remoteString = parts[1]
} else {
return nil, fmt.Errorf("invalid port format '%s'", portString)
}
localPort, err := strconv.ParseUint(localString, 10, 16)
if err != nil {
return nil, fmt.Errorf("error parsing local port '%s': %s", localString, err)
}
remotePort, err := strconv.ParseUint(remoteString, 10, 16)
if err != nil {
return nil, fmt.Errorf("error parsing remote port '%s': %s", remoteString, err)
}
if remotePort == 0 {
return nil, fmt.Errorf("remote port must be > 0")
}
forwards = append(forwards, ForwardedPort{uint16(localPort), uint16(remotePort)})
}
return forwards, nil
}
type listenAddress struct {
address string
protocol string
failureMode string
}
func parseAddresses(addressesToParse []string) ([]listenAddress, error) {
var addresses []listenAddress
parsed := make(map[string]listenAddress)
for _, address := range addressesToParse {
if address == "localhost" {
if _, exists := parsed["127.0.0.1"]; !exists {
ip := listenAddress{address: "127.0.0.1", protocol: "tcp4", failureMode: "all"}
parsed[ip.address] = ip
}
if _, exists := parsed["::1"]; !exists {
ip := listenAddress{address: "::1", protocol: "tcp6", failureMode: "all"}
parsed[ip.address] = ip
}
} else if net.ParseIP(address).To4() != nil {
parsed[address] = listenAddress{address: address, protocol: "tcp4", failureMode: "any"}
} else if net.ParseIP(address) != nil {
parsed[address] = listenAddress{address: address, protocol: "tcp6", failureMode: "any"}
} else {
return nil, fmt.Errorf("%s is not a valid IP", address)
}
}
addresses = make([]listenAddress, len(parsed))
id := 0
for _, v := range parsed {
addresses[id] = v
id++
}
// Sort addresses before returning to get a stable order
sort.Slice(addresses, func(i, j int) bool { return addresses[i].address < addresses[j].address })
return addresses, nil
}
// New creates a new PortForwarder with localhost listen addresses.
func New(dialer httpstream.Dialer, ports []string, stopChan <-chan struct{}, readyChan chan struct{}, out, errOut io.Writer) (*PortForwarder, error) {
return NewOnAddresses(dialer, []string{"localhost"}, ports, stopChan, readyChan, out, errOut)
}
// NewOnAddresses creates a new PortForwarder with custom listen addresses.
func NewOnAddresses(dialer httpstream.Dialer, addresses []string, ports []string, stopChan <-chan struct{}, readyChan chan struct{}, out, errOut io.Writer) (*PortForwarder, error) {
if len(addresses) == 0 {
return nil, errors.New("you must specify at least 1 address")
}
parsedAddresses, err := parseAddresses(addresses)
if err != nil {
return nil, err
}
if len(ports) == 0 {
return nil, errors.New("you must specify at least 1 port")
}
parsedPorts, err := parsePorts(ports)
if err != nil {
return nil, err
}
return &PortForwarder{
dialer: dialer,
addresses: parsedAddresses,
ports: parsedPorts,
stopChan: stopChan,
Ready: readyChan,
out: out,
errOut: errOut,
}, nil
}
// ForwardPorts formats and executes a port forwarding request. The connection will remain
// open until stopChan is closed.
func (pf *PortForwarder) ForwardPorts() error {
defer pf.Close()
var err error
pf.streamConn, _, err = pf.dialer.Dial(PortForwardProtocolV1Name)
if err != nil {
return fmt.Errorf("error upgrading connection: %s", err)
}
defer pf.streamConn.Close()
return pf.forward()
}
// forward dials the remote host specific in req, upgrades the request, starts
// listeners for each port specified in ports, and forwards local connections
// to the remote host via streams.
func (pf *PortForwarder) forward() error {
var err error
listenSuccess := false
for i := range pf.ports {
port := &pf.ports[i]
err = pf.listenOnPort(port)
switch {
case err == nil:
listenSuccess = true
default:
if pf.errOut != nil {
fmt.Fprintf(pf.errOut, "Unable to listen on port %d: %v\n", port.Local, err)
}
}
}
if !listenSuccess {
return fmt.Errorf("unable to listen on any of the requested ports: %v", pf.ports)
}
if pf.Ready != nil {
close(pf.Ready)
}
// wait for interrupt or conn closure
select {
case <-pf.stopChan:
case <-pf.streamConn.CloseChan():
runtime.HandleError(errors.New("lost connection to pod"))
}
return nil
}
// listenOnPort delegates listener creation and waits for connections on requested bind addresses.
// An error is raised based on address groups (default and localhost) and their failure modes
func (pf *PortForwarder) listenOnPort(port *ForwardedPort) error {
var errors []error
failCounters := make(map[string]int, 2)
successCounters := make(map[string]int, 2)
for _, addr := range pf.addresses {
err := pf.listenOnPortAndAddress(port, addr.protocol, addr.address)
if err != nil {
errors = append(errors, err)
failCounters[addr.failureMode]++
} else {
successCounters[addr.failureMode]++
}
}
if successCounters["all"] == 0 && failCounters["all"] > 0 {
return fmt.Errorf("%s: %v", "Listeners failed to create with the following errors", errors)
}
if failCounters["any"] > 0 {
return fmt.Errorf("%s: %v", "Listeners failed to create with the following errors", errors)
}
return nil
}
// listenOnPortAndAddress delegates listener creation and waits for new connections
// in the background f
func (pf *PortForwarder) listenOnPortAndAddress(port *ForwardedPort, protocol string, address string) error {
listener, err := pf.getListener(protocol, address, port)
if err != nil {
return err
}
pf.listeners = append(pf.listeners, listener)
go pf.waitForConnection(listener, *port)
return nil
}
// getListener creates a listener on the interface targeted by the given hostname on the given port with
// the given protocol. protocol is in net.Listen style which basically admits values like tcp, tcp4, tcp6
func (pf *PortForwarder) getListener(protocol string, hostname string, port *ForwardedPort) (net.Listener, error) {
listener, err := net.Listen(protocol, net.JoinHostPort(hostname, strconv.Itoa(int(port.Local))))
if err != nil {
return nil, fmt.Errorf("unable to create listener: Error %s", err)
}
listenerAddress := listener.Addr().String()
host, localPort, _ := net.SplitHostPort(listenerAddress)
localPortUInt, err := strconv.ParseUint(localPort, 10, 16)
if err != nil {
fmt.Fprintf(pf.out, "Failed to forward from %s:%d -> %d\n", hostname, localPortUInt, port.Remote)
return nil, fmt.Errorf("error parsing local port: %s from %s (%s)", err, listenerAddress, host)
}
port.Local = uint16(localPortUInt)
if pf.out != nil {
fmt.Fprintf(pf.out, "Forwarding from %s -> %d\n", net.JoinHostPort(hostname, strconv.Itoa(int(localPortUInt))), port.Remote)
}
return listener, nil
}
// waitForConnection waits for new connections to listener and handles them in
// the background.
func (pf *PortForwarder) waitForConnection(listener net.Listener, port ForwardedPort) {
for {
conn, err := listener.Accept()
if err != nil {
// TODO consider using something like https://github.com/hydrogen18/stoppableListener?
if !strings.Contains(strings.ToLower(err.Error()), "use of closed network connection") {
runtime.HandleError(fmt.Errorf("error accepting connection on port %d: %v", port.Local, err))
}
return
}
go pf.handleConnection(conn, port)
}
}
func (pf *PortForwarder) nextRequestID() int {
pf.requestIDLock.Lock()
defer pf.requestIDLock.Unlock()
id := pf.requestID
pf.requestID++
return id
}
// handleConnection copies data between the local connection and the stream to
// the remote server.
func (pf *PortForwarder) handleConnection(conn net.Conn, port ForwardedPort) {
defer conn.Close()
if pf.out != nil {
fmt.Fprintf(pf.out, "Handling connection for %d\n", port.Local)
}
requestID := pf.nextRequestID()
// create error stream
headers := http.Header{}
headers.Set(v1.StreamType, v1.StreamTypeError)
headers.Set(v1.PortHeader, fmt.Sprintf("%d", port.Remote))
headers.Set(v1.PortForwardRequestIDHeader, strconv.Itoa(requestID))
errorStream, err := pf.streamConn.CreateStream(headers)
if err != nil {
runtime.HandleError(fmt.Errorf("error creating error stream for port %d -> %d: %v", port.Local, port.Remote, err))
return
}
// we're not writing to this stream
errorStream.Close()
errorChan := make(chan error)
go func() {
message, err := ioutil.ReadAll(errorStream)
switch {
case err != nil:
errorChan <- fmt.Errorf("error reading from error stream for port %d -> %d: %v", port.Local, port.Remote, err)
case len(message) > 0:
errorChan <- fmt.Errorf("an error occurred forwarding %d -> %d: %v", port.Local, port.Remote, string(message))
}
close(errorChan)
}()
// create data stream
headers.Set(v1.StreamType, v1.StreamTypeData)
dataStream, err := pf.streamConn.CreateStream(headers)
if err != nil {
runtime.HandleError(fmt.Errorf("error creating forwarding stream for port %d -> %d: %v", port.Local, port.Remote, err))
return
}
localError := make(chan struct{})
remoteDone := make(chan struct{})
go func() {
// Copy from the remote side to the local port.
if _, err := io.Copy(conn, dataStream); err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
runtime.HandleError(fmt.Errorf("error copying from remote stream to local connection: %v", err))
}
// inform the select below that the remote copy is done
close(remoteDone)
}()
go func() {
// inform server we're not sending any more data after copy unblocks
defer dataStream.Close()
// Copy from the local port to the remote side.
if _, err := io.Copy(dataStream, conn); err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
runtime.HandleError(fmt.Errorf("error copying from local connection to remote stream: %v", err))
// break out of the select below without waiting for the other copy to finish
close(localError)
}
}()
// wait for either a local->remote error or for copying from remote->local to finish
select {
case <-remoteDone:
case <-localError:
}
// always expect something on errorChan (it may be nil)
err = <-errorChan
if err != nil {
runtime.HandleError(err)
}
}
// Close stops all listeners of PortForwarder.
func (pf *PortForwarder) Close() {
// stop all listeners
for _, l := range pf.listeners {
if err := l.Close(); err != nil {
runtime.HandleError(fmt.Errorf("error closing listener: %v", err))
}
}
}
// GetPorts will return the ports that were forwarded; this can be used to
// retrieve the locally-bound port in cases where the input was port 0. This
// function will signal an error if the Ready channel is nil or if the
// listeners are not ready yet; this function will succeed after the Ready
// channel has been closed.
func (pf *PortForwarder) GetPorts() ([]ForwardedPort, error) {
if pf.Ready == nil {
return nil, fmt.Errorf("no Ready channel provided")
}
select {
case <-pf.Ready:
return pf.ports, nil
default:
return nil, fmt.Errorf("listeners not ready")
}
}

View File

@ -25,7 +25,7 @@ import (
"github.com/golang/groupcache/lru"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/util/sets"
@ -175,7 +175,7 @@ func EventAggregatorByReasonFunc(event *v1.Event) (string, string) {
// EventAggregatorMessageFunc is responsible for producing an aggregation message
type EventAggregatorMessageFunc func(event *v1.Event) string
// EventAggregratorByReasonMessageFunc returns an aggregate message by prefixing the incoming message
// EventAggregatorByReasonMessageFunc returns an aggregate message by prefixing the incoming message
func EventAggregatorByReasonMessageFunc(event *v1.Event) string {
return "(combined from similar events): " + event.Message
}
@ -420,7 +420,7 @@ type EventCorrelateResult struct {
// prior to interacting with the API server to record the event.
//
// The default behavior is as follows:
// * Aggregation is performed if a similar event is recorded 10 times in a
// * Aggregation is performed if a similar event is recorded 10 times
// in a 10 minute rolling interval. A similar event is an event that varies only by
// the Event.Message field. Rather than recording the precise event, aggregation
// will create a new event whose message reports that it has combined events with

View File

@ -36,9 +36,5 @@ func ValidateEventType(eventtype string) bool {
func IsKeyNotFoundError(err error) bool {
statusErr, _ := err.(*errors.StatusError)
if statusErr != nil && statusErr.Status().Code == http.StatusNotFound {
return true
}
return false
return statusErr != nil && statusErr.Status().Code == http.StatusNotFound
}

View File

@ -95,7 +95,7 @@ func UntilWithoutRetry(ctx context.Context, watcher watch.Interface, conditions
// Until wraps the watcherClient's watch function with RetryWatcher making sure that watcher gets restarted in case of errors.
// The initialResourceVersion will be given to watch method when first called. It shall not be "" or "0"
// given the underlying WATCH call issues (#74022). If you want the initial list ("", "0") done for you use ListWatchUntil instead.
// given the underlying WATCH call issues (#74022).
// Remaining behaviour is identical to function UntilWithoutRetry. (See above.)
// Until can deal with API timeouts and lost connections.
// It guarantees you to see all events and in the order they happened.