2018-12-19 14:29:25 +00:00
/ *
Copyright 2014 The Kubernetes Authors .
Licensed under the Apache License , Version 2.0 ( the "License" ) ;
you may not use this file except in compliance with the License .
You may obtain a copy of the License at
http : //www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing , software
distributed under the License is distributed on an "AS IS" BASIS ,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND , either express or implied .
See the License for the specific language governing permissions and
limitations under the License .
* /
package rest
import (
"bytes"
"context"
"encoding/hex"
"fmt"
"io"
"mime"
"net/http"
2023-08-17 05:15:28 +00:00
"net/http/httptrace"
2018-12-19 14:29:25 +00:00
"net/url"
2023-02-01 17:06:36 +00:00
"os"
2018-12-19 14:29:25 +00:00
"path"
"reflect"
"strconv"
"strings"
2020-04-14 07:04:33 +00:00
"sync"
2018-12-19 14:29:25 +00:00
"time"
"golang.org/x/net/http2"
2023-02-01 17:06:36 +00:00
2018-12-19 14:29:25 +00:00
"k8s.io/apimachinery/pkg/api/errors"
2024-08-19 08:01:33 +00:00
"k8s.io/apimachinery/pkg/api/meta"
2018-12-19 14:29:25 +00:00
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2024-08-19 08:01:33 +00:00
"k8s.io/apimachinery/pkg/conversion"
2018-12-19 14:29:25 +00:00
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer/streaming"
"k8s.io/apimachinery/pkg/util/net"
"k8s.io/apimachinery/pkg/watch"
2024-08-19 08:01:33 +00:00
clientfeatures "k8s.io/client-go/features"
2018-12-19 14:29:25 +00:00
restclientwatch "k8s.io/client-go/rest/watch"
"k8s.io/client-go/tools/metrics"
"k8s.io/client-go/util/flowcontrol"
2020-12-17 12:28:29 +00:00
"k8s.io/klog/v2"
2021-12-08 13:50:47 +00:00
"k8s.io/utils/clock"
2018-12-19 14:29:25 +00:00
)
var (
// longThrottleLatency defines threshold for logging requests. All requests being
2020-01-14 10:38:55 +00:00
// throttled (via the provided rateLimiter) for more than longThrottleLatency will
// be logged.
2018-12-19 14:29:25 +00:00
longThrottleLatency = 50 * time . Millisecond
2020-04-14 07:04:33 +00:00
// extraLongThrottleLatency defines the threshold for logging requests at log level 2.
extraLongThrottleLatency = 1 * time . Second
2018-12-19 14:29:25 +00:00
)
// HTTPClient is an interface for testing a request object.
type HTTPClient interface {
Do ( req * http . Request ) ( * http . Response , error )
}
// ResponseWrapper is an interface for getting a response.
// The response may be either accessed as a raw data (the whole output is put into memory) or as a stream.
type ResponseWrapper interface {
2020-04-14 07:04:33 +00:00
DoRaw ( context . Context ) ( [ ] byte , error )
Stream ( context . Context ) ( io . ReadCloser , error )
2018-12-19 14:29:25 +00:00
}
// RequestConstructionError is returned when there's an error assembling a request.
type RequestConstructionError struct {
Err error
}
// Error returns a textual description of 'r'.
func ( r * RequestConstructionError ) Error ( ) string {
return fmt . Sprintf ( "request construction error: '%v'" , r . Err )
}
2020-01-14 10:38:55 +00:00
var noBackoff = & NoBackoff { }
2022-05-05 02:47:06 +00:00
type requestRetryFunc func ( maxRetries int ) WithRetry
func defaultRequestRetryFn ( maxRetries int ) WithRetry {
return & withRetry { maxRetries : maxRetries }
}
2018-12-19 14:29:25 +00:00
// Request allows for building up a request to a server in a chained fashion.
// Any errors are stored until the end of your call, so you only have to
// check once.
type Request struct {
2020-01-14 10:38:55 +00:00
c * RESTClient
2018-12-19 14:29:25 +00:00
2020-12-17 12:28:29 +00:00
warningHandler WarningHandler
2020-01-14 10:38:55 +00:00
rateLimiter flowcontrol . RateLimiter
backoff BackoffManager
timeout time . Duration
2022-05-05 02:47:06 +00:00
maxRetries int
2018-12-19 14:29:25 +00:00
// generic components accessible via method setters
2020-01-14 10:38:55 +00:00
verb string
2018-12-19 14:29:25 +00:00
pathPrefix string
subpath string
params url . Values
headers http . Header
// structural elements of the request that are part of the Kubernetes API conventions
namespace string
namespaceSet bool
resource string
resourceName string
subresource string
// output
2023-02-01 17:06:36 +00:00
err error
// only one of body / bodyBytes may be set. requests using body are not retriable.
body io . Reader
bodyBytes [ ] byte
2022-05-05 02:47:06 +00:00
retryFn requestRetryFunc
2018-12-19 14:29:25 +00:00
}
// NewRequest creates a new request helper object for accessing runtime.Objects on a server.
2020-01-14 10:38:55 +00:00
func NewRequest ( c * RESTClient ) * Request {
var backoff BackoffManager
if c . createBackoffMgr != nil {
backoff = c . createBackoffMgr ( )
}
2018-12-19 14:29:25 +00:00
if backoff == nil {
2020-01-14 10:38:55 +00:00
backoff = noBackoff
}
var pathPrefix string
if c . base != nil {
pathPrefix = path . Join ( "/" , c . base . Path , c . versionedAPIPath )
} else {
pathPrefix = path . Join ( "/" , c . versionedAPIPath )
2018-12-19 14:29:25 +00:00
}
2020-01-14 10:38:55 +00:00
var timeout time . Duration
if c . Client != nil {
timeout = c . Client . Timeout
2018-12-19 14:29:25 +00:00
}
2020-01-14 10:38:55 +00:00
2018-12-19 14:29:25 +00:00
r := & Request {
2020-12-17 12:28:29 +00:00
c : c ,
rateLimiter : c . rateLimiter ,
backoff : backoff ,
timeout : timeout ,
pathPrefix : pathPrefix ,
2022-05-05 02:47:06 +00:00
maxRetries : 10 ,
retryFn : defaultRequestRetryFn ,
2020-12-17 12:28:29 +00:00
warningHandler : c . warningHandler ,
2018-12-19 14:29:25 +00:00
}
2020-01-14 10:38:55 +00:00
2018-12-19 14:29:25 +00:00
switch {
2020-01-14 10:38:55 +00:00
case len ( c . content . AcceptContentTypes ) > 0 :
r . SetHeader ( "Accept" , c . content . AcceptContentTypes )
case len ( c . content . ContentType ) > 0 :
r . SetHeader ( "Accept" , c . content . ContentType + ", */*" )
2018-12-19 14:29:25 +00:00
}
return r
}
2020-01-14 10:38:55 +00:00
// NewRequestWithClient creates a Request with an embedded RESTClient for use in test scenarios.
func NewRequestWithClient ( base * url . URL , versionedAPIPath string , content ClientContentConfig , client * http . Client ) * Request {
return NewRequest ( & RESTClient {
base : base ,
versionedAPIPath : versionedAPIPath ,
content : content ,
Client : client ,
} )
}
// Verb sets the verb this request will use.
func ( r * Request ) Verb ( verb string ) * Request {
r . verb = verb
return r
}
2018-12-19 14:29:25 +00:00
// Prefix adds segments to the relative beginning to the request path. These
// items will be placed before the optional Namespace, Resource, or Name sections.
// Setting AbsPath will clear any previously set Prefix segments
func ( r * Request ) Prefix ( segments ... string ) * Request {
if r . err != nil {
return r
}
r . pathPrefix = path . Join ( r . pathPrefix , path . Join ( segments ... ) )
return r
}
// Suffix appends segments to the end of the path. These items will be placed after the prefix and optional
// Namespace, Resource, or Name sections.
func ( r * Request ) Suffix ( segments ... string ) * Request {
if r . err != nil {
return r
}
r . subpath = path . Join ( r . subpath , path . Join ( segments ... ) )
return r
}
// Resource sets the resource to access (<resource>/[ns/<namespace>/]<name>)
func ( r * Request ) Resource ( resource string ) * Request {
if r . err != nil {
return r
}
if len ( r . resource ) != 0 {
r . err = fmt . Errorf ( "resource already set to %q, cannot change to %q" , r . resource , resource )
return r
}
if msgs := IsValidPathSegmentName ( resource ) ; len ( msgs ) != 0 {
r . err = fmt . Errorf ( "invalid resource %q: %v" , resource , msgs )
return r
}
r . resource = resource
return r
}
// BackOff sets the request's backoff manager to the one specified,
// or defaults to the stub implementation if nil is provided
func ( r * Request ) BackOff ( manager BackoffManager ) * Request {
if manager == nil {
2020-01-14 10:38:55 +00:00
r . backoff = & NoBackoff { }
2018-12-19 14:29:25 +00:00
return r
}
2020-01-14 10:38:55 +00:00
r . backoff = manager
2018-12-19 14:29:25 +00:00
return r
}
2020-12-17 12:28:29 +00:00
// WarningHandler sets the handler this client uses when warning headers are encountered.
// If set to nil, this client will use the default warning handler (see SetDefaultWarningHandler).
func ( r * Request ) WarningHandler ( handler WarningHandler ) * Request {
r . warningHandler = handler
return r
}
2018-12-19 14:29:25 +00:00
// Throttle receives a rate-limiter and sets or replaces an existing request limiter
func ( r * Request ) Throttle ( limiter flowcontrol . RateLimiter ) * Request {
2020-01-14 10:38:55 +00:00
r . rateLimiter = limiter
2018-12-19 14:29:25 +00:00
return r
}
2019-01-15 16:20:41 +00:00
// SubResource sets a sub-resource path which can be multiple segments after the resource
2018-12-19 14:29:25 +00:00
// name but before the suffix.
func ( r * Request ) SubResource ( subresources ... string ) * Request {
if r . err != nil {
return r
}
subresource := path . Join ( subresources ... )
if len ( r . subresource ) != 0 {
2021-06-25 04:59:51 +00:00
r . err = fmt . Errorf ( "subresource already set to %q, cannot change to %q" , r . subresource , subresource )
2018-12-19 14:29:25 +00:00
return r
}
for _ , s := range subresources {
if msgs := IsValidPathSegmentName ( s ) ; len ( msgs ) != 0 {
r . err = fmt . Errorf ( "invalid subresource %q: %v" , s , msgs )
return r
}
}
r . subresource = subresource
return r
}
// Name sets the name of a resource to access (<resource>/[ns/<namespace>/]<name>)
func ( r * Request ) Name ( resourceName string ) * Request {
if r . err != nil {
return r
}
if len ( resourceName ) == 0 {
r . err = fmt . Errorf ( "resource name may not be empty" )
return r
}
if len ( r . resourceName ) != 0 {
r . err = fmt . Errorf ( "resource name already set to %q, cannot change to %q" , r . resourceName , resourceName )
return r
}
if msgs := IsValidPathSegmentName ( resourceName ) ; len ( msgs ) != 0 {
r . err = fmt . Errorf ( "invalid resource name %q: %v" , resourceName , msgs )
return r
}
r . resourceName = resourceName
return r
}
// Namespace applies the namespace scope to a request (<resource>/[ns/<namespace>/]<name>)
func ( r * Request ) Namespace ( namespace string ) * Request {
if r . err != nil {
return r
}
if r . namespaceSet {
r . err = fmt . Errorf ( "namespace already set to %q, cannot change to %q" , r . namespace , namespace )
return r
}
if msgs := IsValidPathSegmentName ( namespace ) ; len ( msgs ) != 0 {
r . err = fmt . Errorf ( "invalid namespace %q: %v" , namespace , msgs )
return r
}
r . namespaceSet = true
r . namespace = namespace
return r
}
// NamespaceIfScoped is a convenience function to set a namespace if scoped is true
func ( r * Request ) NamespaceIfScoped ( namespace string , scoped bool ) * Request {
if scoped {
return r . Namespace ( namespace )
}
return r
}
// AbsPath overwrites an existing path with the segments provided. Trailing slashes are preserved
// when a single segment is passed.
func ( r * Request ) AbsPath ( segments ... string ) * Request {
if r . err != nil {
return r
}
2020-01-14 10:38:55 +00:00
r . pathPrefix = path . Join ( r . c . base . Path , path . Join ( segments ... ) )
if len ( segments ) == 1 && ( len ( r . c . base . Path ) > 1 || len ( segments [ 0 ] ) > 1 ) && strings . HasSuffix ( segments [ 0 ] , "/" ) {
2018-12-19 14:29:25 +00:00
// preserve any trailing slashes for legacy behavior
r . pathPrefix += "/"
}
return r
}
// RequestURI overwrites existing path and parameters with the value of the provided server relative
// URI.
func ( r * Request ) RequestURI ( uri string ) * Request {
if r . err != nil {
return r
}
locator , err := url . Parse ( uri )
if err != nil {
r . err = err
return r
}
r . pathPrefix = locator . Path
if len ( locator . Query ( ) ) > 0 {
if r . params == nil {
r . params = make ( url . Values )
}
for k , v := range locator . Query ( ) {
r . params [ k ] = v
}
}
return r
}
// Param creates a query parameter with the given string value.
func ( r * Request ) Param ( paramName , s string ) * Request {
if r . err != nil {
return r
}
return r . setParam ( paramName , s )
}
// VersionedParams will take the provided object, serialize it to a map[string][]string using the
// implicit RESTClient API version and the default parameter codec, and then add those as parameters
// to the request. Use this to provide versioned query parameters from client libraries.
// VersionedParams will not write query parameters that have omitempty set and are empty. If a
// parameter has already been set it is appended to (Params and VersionedParams are additive).
func ( r * Request ) VersionedParams ( obj runtime . Object , codec runtime . ParameterCodec ) * Request {
2020-01-14 10:38:55 +00:00
return r . SpecificallyVersionedParams ( obj , codec , r . c . content . GroupVersion )
2019-01-15 16:20:41 +00:00
}
func ( r * Request ) SpecificallyVersionedParams ( obj runtime . Object , codec runtime . ParameterCodec , version schema . GroupVersion ) * Request {
2018-12-19 14:29:25 +00:00
if r . err != nil {
return r
}
2019-01-15 16:20:41 +00:00
params , err := codec . EncodeParameters ( obj , version )
2018-12-19 14:29:25 +00:00
if err != nil {
r . err = err
return r
}
for k , v := range params {
if r . params == nil {
r . params = make ( url . Values )
}
r . params [ k ] = append ( r . params [ k ] , v ... )
}
return r
}
func ( r * Request ) setParam ( paramName , value string ) * Request {
if r . params == nil {
r . params = make ( url . Values )
}
r . params [ paramName ] = append ( r . params [ paramName ] , value )
return r
}
func ( r * Request ) SetHeader ( key string , values ... string ) * Request {
if r . headers == nil {
r . headers = http . Header { }
}
r . headers . Del ( key )
for _ , value := range values {
r . headers . Add ( key , value )
}
return r
}
2019-01-15 16:20:41 +00:00
// Timeout makes the request use the given duration as an overall timeout for the
// request. Additionally, if set passes the value as "timeout" parameter in URL.
2018-12-19 14:29:25 +00:00
func ( r * Request ) Timeout ( d time . Duration ) * Request {
if r . err != nil {
return r
}
r . timeout = d
return r
}
2020-12-17 12:28:29 +00:00
// MaxRetries makes the request use the given integer as a ceiling of retrying upon receiving
// "Retry-After" headers and 429 status-code in the response. The default is 10 unless this
// function is specifically called with a different value.
// A zero maxRetries prevent it from doing retires and return an error immediately.
func ( r * Request ) MaxRetries ( maxRetries int ) * Request {
2022-05-05 02:47:06 +00:00
if maxRetries < 0 {
maxRetries = 0
}
r . maxRetries = maxRetries
2020-12-17 12:28:29 +00:00
return r
}
2018-12-19 14:29:25 +00:00
// Body makes the request use obj as the body. Optional.
// If obj is a string, try to read a file of that name.
// If obj is a []byte, send it directly.
// If obj is an io.Reader, use it directly.
// If obj is a runtime.Object, marshal it correctly, and set Content-Type header.
// If obj is a runtime.Object and nil, do nothing.
// Otherwise, set an error.
func ( r * Request ) Body ( obj interface { } ) * Request {
if r . err != nil {
return r
}
switch t := obj . ( type ) {
case string :
2023-02-01 17:06:36 +00:00
data , err := os . ReadFile ( t )
2018-12-19 14:29:25 +00:00
if err != nil {
r . err = err
return r
}
glogBody ( "Request Body" , data )
2023-02-01 17:06:36 +00:00
r . body = nil
r . bodyBytes = data
2018-12-19 14:29:25 +00:00
case [ ] byte :
glogBody ( "Request Body" , t )
2023-02-01 17:06:36 +00:00
r . body = nil
r . bodyBytes = t
2018-12-19 14:29:25 +00:00
case io . Reader :
r . body = t
2023-02-01 17:06:36 +00:00
r . bodyBytes = nil
2018-12-19 14:29:25 +00:00
case runtime . Object :
// callers may pass typed interface pointers, therefore we must check nil with reflection
if reflect . ValueOf ( t ) . IsNil ( ) {
return r
}
2020-01-14 10:38:55 +00:00
encoder , err := r . c . content . Negotiator . Encoder ( r . c . content . ContentType , nil )
if err != nil {
r . err = err
return r
}
data , err := runtime . Encode ( encoder , t )
2018-12-19 14:29:25 +00:00
if err != nil {
r . err = err
return r
}
glogBody ( "Request Body" , data )
2023-02-01 17:06:36 +00:00
r . body = nil
r . bodyBytes = data
2020-01-14 10:38:55 +00:00
r . SetHeader ( "Content-Type" , r . c . content . ContentType )
2018-12-19 14:29:25 +00:00
default :
r . err = fmt . Errorf ( "unknown type used for body: %+v" , obj )
}
return r
}
2023-06-01 16:58:10 +00:00
// Error returns any error encountered constructing the request, if any.
func ( r * Request ) Error ( ) error {
return r . err
}
// URL returns the current working URL. Check the result of Error() to ensure
// that the returned URL is valid.
2018-12-19 14:29:25 +00:00
func ( r * Request ) URL ( ) * url . URL {
p := r . pathPrefix
if r . namespaceSet && len ( r . namespace ) > 0 {
p = path . Join ( p , "namespaces" , r . namespace )
}
if len ( r . resource ) != 0 {
p = path . Join ( p , strings . ToLower ( r . resource ) )
}
// Join trims trailing slashes, so preserve r.pathPrefix's trailing slash for backwards compatibility if nothing was changed
if len ( r . resourceName ) != 0 || len ( r . subpath ) != 0 || len ( r . subresource ) != 0 {
p = path . Join ( p , r . resourceName , r . subresource , r . subpath )
}
finalURL := & url . URL { }
2020-01-14 10:38:55 +00:00
if r . c . base != nil {
* finalURL = * r . c . base
2018-12-19 14:29:25 +00:00
}
finalURL . Path = p
query := url . Values { }
for key , values := range r . params {
for _ , value := range values {
query . Add ( key , value )
}
}
// timeout is handled specially here.
if r . timeout != 0 {
query . Set ( "timeout" , r . timeout . String ( ) )
}
finalURL . RawQuery = query . Encode ( )
return finalURL
}
2022-11-01 09:43:55 +00:00
// finalURLTemplate is similar to URL(), but will make all specific parameter values equal
// - instead of name or namespace, "{name}" and "{namespace}" will be used, and all query
// parameters will be reset. This creates a copy of the url so as not to change the
// underlying object.
func ( r Request ) finalURLTemplate ( ) url . URL {
newParams := url . Values { }
v := [ ] string { "{value}" }
for k := range r . params {
newParams [ k ] = v
}
r . params = newParams
u := r . URL ( )
if u == nil {
return url . URL { }
}
segments := strings . Split ( u . Path , "/" )
groupIndex := 0
index := 0
trimmedBasePath := ""
if r . c . base != nil && strings . Contains ( u . Path , r . c . base . Path ) {
p := strings . TrimPrefix ( u . Path , r . c . base . Path )
if ! strings . HasPrefix ( p , "/" ) {
p = "/" + p
}
// store the base path that we have trimmed so we can append it
// before returning the URL
trimmedBasePath = r . c . base . Path
segments = strings . Split ( p , "/" )
groupIndex = 1
}
if len ( segments ) <= 2 {
return * u
}
const CoreGroupPrefix = "api"
const NamedGroupPrefix = "apis"
isCoreGroup := segments [ groupIndex ] == CoreGroupPrefix
isNamedGroup := segments [ groupIndex ] == NamedGroupPrefix
if isCoreGroup {
// checking the case of core group with /api/v1/... format
index = groupIndex + 2
} else if isNamedGroup {
// checking the case of named group with /apis/apps/v1/... format
index = groupIndex + 3
} else {
// this should not happen that the only two possibilities are /api... and /apis..., just want to put an
// outlet here in case more API groups are added in future if ever possible:
// https://kubernetes.io/docs/concepts/overview/kubernetes-api/#api-groups
// if a wrong API groups name is encountered, return the {prefix} for url.Path
u . Path = "/{prefix}"
u . RawQuery = ""
return * u
}
// switch segLength := len(segments) - index; segLength {
switch {
// case len(segments) - index == 1:
// resource (with no name) do nothing
case len ( segments ) - index == 2 :
// /$RESOURCE/$NAME: replace $NAME with {name}
segments [ index + 1 ] = "{name}"
case len ( segments ) - index == 3 :
if segments [ index + 2 ] == "finalize" || segments [ index + 2 ] == "status" {
// /$RESOURCE/$NAME/$SUBRESOURCE: replace $NAME with {name}
segments [ index + 1 ] = "{name}"
} else {
// /namespace/$NAMESPACE/$RESOURCE: replace $NAMESPACE with {namespace}
segments [ index + 1 ] = "{namespace}"
}
case len ( segments ) - index >= 4 :
segments [ index + 1 ] = "{namespace}"
// /namespace/$NAMESPACE/$RESOURCE/$NAME: replace $NAMESPACE with {namespace}, $NAME with {name}
if segments [ index + 3 ] != "finalize" && segments [ index + 3 ] != "status" {
// /$RESOURCE/$NAME/$SUBRESOURCE: replace $NAME with {name}
segments [ index + 3 ] = "{name}"
}
}
u . Path = path . Join ( trimmedBasePath , path . Join ( segments ... ) )
return * u
}
2021-06-25 04:59:51 +00:00
func ( r * Request ) tryThrottleWithInfo ( ctx context . Context , retryInfo string ) error {
2020-01-14 10:38:55 +00:00
if r . rateLimiter == nil {
return nil
}
2018-12-19 14:29:25 +00:00
now := time . Now ( )
2020-01-14 10:38:55 +00:00
2020-04-14 07:04:33 +00:00
err := r . rateLimiter . Wait ( ctx )
2022-05-05 02:47:06 +00:00
if err != nil {
err = fmt . Errorf ( "client rate limiter Wait returned an error: %w" , err )
}
2020-04-14 07:04:33 +00:00
latency := time . Since ( now )
2021-06-25 04:59:51 +00:00
var message string
switch {
case len ( retryInfo ) > 0 :
message = fmt . Sprintf ( "Waited for %v, %s - request: %s:%s" , latency , retryInfo , r . verb , r . URL ( ) . String ( ) )
default :
message = fmt . Sprintf ( "Waited for %v due to client-side throttling, not priority and fairness, request: %s:%s" , latency , r . verb , r . URL ( ) . String ( ) )
}
2020-04-14 07:04:33 +00:00
if latency > longThrottleLatency {
2021-06-25 04:59:51 +00:00
klog . V ( 3 ) . Info ( message )
2018-12-19 14:29:25 +00:00
}
2020-04-14 07:04:33 +00:00
if latency > extraLongThrottleLatency {
// If the rate limiter latency is very high, the log message should be printed at a higher log level,
// but we use a throttled logger to prevent spamming.
2021-08-09 07:19:24 +00:00
globalThrottledLogger . Infof ( "%s" , message )
2020-04-14 07:04:33 +00:00
}
2022-11-01 09:43:55 +00:00
metrics . RateLimiterLatency . Observe ( ctx , r . verb , r . finalURLTemplate ( ) , latency )
2020-01-14 10:38:55 +00:00
return err
2018-12-19 14:29:25 +00:00
}
2021-06-25 04:59:51 +00:00
func ( r * Request ) tryThrottle ( ctx context . Context ) error {
return r . tryThrottleWithInfo ( ctx , "" )
}
2020-04-14 07:04:33 +00:00
type throttleSettings struct {
logLevel klog . Level
minLogInterval time . Duration
lastLogTime time . Time
lock sync . RWMutex
}
type throttledLogger struct {
2021-12-08 13:50:47 +00:00
clock clock . PassiveClock
2020-04-14 07:04:33 +00:00
settings [ ] * throttleSettings
}
var globalThrottledLogger = & throttledLogger {
2021-12-08 13:50:47 +00:00
clock : clock . RealClock { } ,
2020-04-14 07:04:33 +00:00
settings : [ ] * throttleSettings {
{
logLevel : 2 ,
minLogInterval : 1 * time . Second ,
} , {
logLevel : 0 ,
minLogInterval : 10 * time . Second ,
} ,
} ,
}
func ( b * throttledLogger ) attemptToLog ( ) ( klog . Level , bool ) {
for _ , setting := range b . settings {
2020-12-17 12:28:29 +00:00
if bool ( klog . V ( setting . logLevel ) . Enabled ( ) ) {
2020-04-14 07:04:33 +00:00
// Return early without write locking if possible.
if func ( ) bool {
setting . lock . RLock ( )
defer setting . lock . RUnlock ( )
return b . clock . Since ( setting . lastLogTime ) >= setting . minLogInterval
} ( ) {
setting . lock . Lock ( )
defer setting . lock . Unlock ( )
if b . clock . Since ( setting . lastLogTime ) >= setting . minLogInterval {
setting . lastLogTime = b . clock . Now ( )
return setting . logLevel , true
}
}
return - 1 , false
}
}
return - 1 , false
}
2020-12-17 12:28:29 +00:00
// Infof will write a log message at each logLevel specified by the receiver's throttleSettings
2020-04-14 07:04:33 +00:00
// as long as it hasn't written a log message more recently than minLogInterval.
func ( b * throttledLogger ) Infof ( message string , args ... interface { } ) {
if logLevel , ok := b . attemptToLog ( ) ; ok {
klog . V ( logLevel ) . Infof ( message , args ... )
}
}
2018-12-19 14:29:25 +00:00
// Watch attempts to begin watching the requested location.
// Returns a watch.Interface, or an error.
2020-04-14 07:04:33 +00:00
func ( r * Request ) Watch ( ctx context . Context ) ( watch . Interface , error ) {
2018-12-19 14:29:25 +00:00
// We specifically don't want to rate limit watches, so we
2020-01-14 10:38:55 +00:00
// don't use r.rateLimiter here.
2018-12-19 14:29:25 +00:00
if r . err != nil {
return nil , r . err
}
2020-01-14 10:38:55 +00:00
client := r . c . Client
2018-12-19 14:29:25 +00:00
if client == nil {
client = http . DefaultClient
}
2021-08-09 07:19:24 +00:00
isErrRetryableFunc := func ( request * http . Request , err error ) bool {
2018-12-19 14:29:25 +00:00
// The watch stream mechanism handles many common partial data errors, so closed
// connections can be retried in many cases.
2020-07-24 14:20:51 +00:00
if net . IsProbableEOF ( err ) || net . IsTimeout ( err ) {
2021-08-09 07:19:24 +00:00
return true
2018-12-19 14:29:25 +00:00
}
2021-08-09 07:19:24 +00:00
return false
2018-12-19 14:29:25 +00:00
}
2022-05-05 02:47:06 +00:00
retry := r . retryFn ( r . maxRetries )
2021-08-09 07:19:24 +00:00
url := r . URL ( ) . String ( )
for {
2022-05-05 02:47:06 +00:00
if err := retry . Before ( ctx , r ) ; err != nil {
return nil , retry . WrapPreviousError ( err )
}
2021-08-09 07:19:24 +00:00
req , err := r . newHTTPRequest ( ctx )
if err != nil {
return nil , err
}
resp , err := client . Do ( req )
2022-05-05 02:47:06 +00:00
retry . After ( ctx , r , resp , err )
2021-08-09 07:19:24 +00:00
if err == nil && resp . StatusCode == http . StatusOK {
return r . newStreamWatcher ( resp )
}
done , transformErr := func ( ) ( bool , error ) {
defer readAndCloseResponseBody ( resp )
2022-05-05 02:47:06 +00:00
if retry . IsNextRetry ( ctx , r , req , resp , err , isErrRetryableFunc ) {
return false , nil
2021-08-09 07:19:24 +00:00
}
if resp == nil {
// the server must have sent us an error in 'err'
return true , nil
}
if result := r . transformResponse ( resp , req ) ; result . err != nil {
return true , result . err
}
return true , fmt . Errorf ( "for request %s, got status: %v" , url , resp . StatusCode )
} ( )
if done {
if isErrRetryableFunc ( req , err ) {
return watch . NewEmptyWatch ( ) , nil
}
if err == nil {
// if the server sent us an HTTP Response object,
// we need to return the error object from that.
err = transformErr
}
2022-05-05 02:47:06 +00:00
return nil , retry . WrapPreviousError ( err )
2018-12-19 14:29:25 +00:00
}
}
2021-08-09 07:19:24 +00:00
}
2020-01-14 10:38:55 +00:00
2024-08-19 08:01:33 +00:00
type WatchListResult struct {
// err holds any errors we might have received
// during streaming.
err error
// items hold the collected data
items [ ] runtime . Object
// initialEventsEndBookmarkRV holds the resource version
// extracted from the bookmark event that marks
// the end of the stream.
initialEventsEndBookmarkRV string
// gv represents the API version
// it is used to construct the final list response
// normally this information is filled by the server
gv schema . GroupVersion
}
func ( r WatchListResult ) Into ( obj runtime . Object ) error {
if r . err != nil {
return r . err
}
listPtr , err := meta . GetItemsPtr ( obj )
if err != nil {
return err
}
listVal , err := conversion . EnforcePtr ( listPtr )
if err != nil {
return err
}
if listVal . Kind ( ) != reflect . Slice {
return fmt . Errorf ( "need a pointer to slice, got %v" , listVal . Kind ( ) )
}
if len ( r . items ) == 0 {
listVal . Set ( reflect . MakeSlice ( listVal . Type ( ) , 0 , 0 ) )
} else {
listVal . Set ( reflect . MakeSlice ( listVal . Type ( ) , len ( r . items ) , len ( r . items ) ) )
for i , o := range r . items {
if listVal . Type ( ) . Elem ( ) != reflect . TypeOf ( o ) . Elem ( ) {
return fmt . Errorf ( "received object type = %v at index = %d, doesn't match the list item type = %v" , reflect . TypeOf ( o ) . Elem ( ) , i , listVal . Type ( ) . Elem ( ) )
}
listVal . Index ( i ) . Set ( reflect . ValueOf ( o ) . Elem ( ) )
}
}
listMeta , err := meta . ListAccessor ( obj )
if err != nil {
return err
}
listMeta . SetResourceVersion ( r . initialEventsEndBookmarkRV )
typeMeta , err := meta . TypeAccessor ( obj )
if err != nil {
return err
}
version := r . gv . String ( )
typeMeta . SetAPIVersion ( version )
typeMeta . SetKind ( reflect . TypeOf ( obj ) . Elem ( ) . Name ( ) )
return nil
}
// WatchList establishes a stream to get a consistent snapshot of data
// from the server as described in https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/3157-watch-list#proposal
//
// Note that the watchlist requires properly setting the ListOptions
// otherwise it just establishes a regular watch with the server.
// Check the documentation https://kubernetes.io/docs/reference/using-api/api-concepts/#streaming-lists
// to see what parameters are currently required.
func ( r * Request ) WatchList ( ctx context . Context ) WatchListResult {
if ! clientfeatures . FeatureGates ( ) . Enabled ( clientfeatures . WatchListClient ) {
return WatchListResult { err : fmt . Errorf ( "%q feature gate is not enabled" , clientfeatures . WatchListClient ) }
}
// TODO(#115478): consider validating request parameters (i.e sendInitialEvents).
// Most users use the generated client, which handles the proper setting of parameters.
// We don't have validation for other methods (e.g., the Watch)
// thus, for symmetry, we haven't added additional checks for the WatchList method.
w , err := r . Watch ( ctx )
if err != nil {
return WatchListResult { err : err }
}
return r . handleWatchList ( ctx , w )
}
// handleWatchList holds the actual logic for easier unit testing.
// Note that this function will close the passed watch.
func ( r * Request ) handleWatchList ( ctx context . Context , w watch . Interface ) WatchListResult {
defer w . Stop ( )
var lastKey string
var items [ ] runtime . Object
for {
select {
case <- ctx . Done ( ) :
return WatchListResult { err : ctx . Err ( ) }
case event , ok := <- w . ResultChan ( ) :
if ! ok {
return WatchListResult { err : fmt . Errorf ( "unexpected watch close" ) }
}
if event . Type == watch . Error {
return WatchListResult { err : errors . FromObject ( event . Object ) }
}
meta , err := meta . Accessor ( event . Object )
if err != nil {
return WatchListResult { err : fmt . Errorf ( "failed to parse watch event: %#v" , event ) }
}
switch event . Type {
case watch . Added :
// the following check ensures that the response is ordered.
// earlier servers had a bug that caused them to not sort the output.
// in such cases, return an error which can trigger fallback logic.
key := objectKeyFromMeta ( meta )
if len ( lastKey ) > 0 && lastKey > key {
return WatchListResult { err : fmt . Errorf ( "cannot add the obj (%#v) with the key = %s, as it violates the ordering guarantees provided by the watchlist feature in beta phase, lastInsertedKey was = %s" , event . Object , key , lastKey ) }
}
items = append ( items , event . Object )
lastKey = key
case watch . Bookmark :
if meta . GetAnnotations ( ) [ metav1 . InitialEventsAnnotationKey ] == "true" {
return WatchListResult {
items : items ,
initialEventsEndBookmarkRV : meta . GetResourceVersion ( ) ,
gv : r . c . content . GroupVersion ,
}
}
default :
return WatchListResult { err : fmt . Errorf ( "unexpected watch event %#v, expected to only receive watch.Added and watch.Bookmark events" , event ) }
}
}
}
}
2021-08-09 07:19:24 +00:00
func ( r * Request ) newStreamWatcher ( resp * http . Response ) ( watch . Interface , error ) {
2020-01-14 10:38:55 +00:00
contentType := resp . Header . Get ( "Content-Type" )
mediaType , params , err := mime . ParseMediaType ( contentType )
if err != nil {
klog . V ( 4 ) . Infof ( "Unexpected content type from the server: %q: %v" , contentType , err )
}
objectDecoder , streamingSerializer , framer , err := r . c . content . Negotiator . StreamDecoder ( mediaType , params )
if err != nil {
return nil , err
}
2020-12-17 12:28:29 +00:00
handleWarnings ( resp . Header , r . warningHandler )
2020-01-14 10:38:55 +00:00
frameReader := framer . NewFrameReader ( resp . Body )
watchEventDecoder := streaming . NewDecoder ( frameReader , streamingSerializer )
2019-06-24 09:08:09 +00:00
return watch . NewStreamWatcher (
2020-01-14 10:38:55 +00:00
restclientwatch . NewDecoder ( watchEventDecoder , objectDecoder ) ,
2019-06-24 09:08:09 +00:00
// use 500 to indicate that the cause of the error is unknown - other error codes
// are more specific to HTTP interactions, and set a reason
errors . NewClientErrorReporter ( http . StatusInternalServerError , r . verb , "ClientWatchDecoding" ) ,
) , nil
2018-12-19 14:29:25 +00:00
}
2023-06-01 16:58:10 +00:00
// updateRequestResultMetric increments the RequestResult metric counter,
// it should be called with the (response, err) tuple from the final
// reply from the server.
func updateRequestResultMetric ( ctx context . Context , req * Request , resp * http . Response , err error ) {
code , host := sanitize ( req , resp , err )
metrics . RequestResult . Increment ( ctx , code , req . verb , host )
}
// updateRequestRetryMetric increments the RequestRetry metric counter,
// it should be called with the (response, err) tuple for each retry
// except for the final attempt.
func updateRequestRetryMetric ( ctx context . Context , req * Request , resp * http . Response , err error ) {
code , host := sanitize ( req , resp , err )
metrics . RequestRetry . IncrementRetry ( ctx , code , req . verb , host )
}
func sanitize ( req * Request , resp * http . Response , err error ) ( string , string ) {
host := "none"
2020-01-14 10:38:55 +00:00
if req . c . base != nil {
2023-06-01 16:58:10 +00:00
host = req . c . base . Host
2018-12-19 14:29:25 +00:00
}
// Errors can be arbitrary strings. Unbound label cardinality is not suitable for a metric
// system so we just report them as `<error>`.
2023-06-01 16:58:10 +00:00
code := "<error>"
if resp != nil {
code = strconv . Itoa ( resp . StatusCode )
2018-12-19 14:29:25 +00:00
}
2023-06-01 16:58:10 +00:00
return code , host
2018-12-19 14:29:25 +00:00
}
// Stream formats and executes the request, and offers streaming of the response.
// Returns io.ReadCloser which could be used for streaming of the response, or an error
// Any non-2xx http status code causes an error. If we get a non-2xx code, we try to convert the body into an APIStatus object.
// If we can, we return that as an error. Otherwise, we create an error that lists the http status and the content of the response.
2020-04-14 07:04:33 +00:00
func ( r * Request ) Stream ( ctx context . Context ) ( io . ReadCloser , error ) {
2018-12-19 14:29:25 +00:00
if r . err != nil {
return nil , r . err
}
2020-04-14 07:04:33 +00:00
if err := r . tryThrottle ( ctx ) ; err != nil {
2020-01-14 10:38:55 +00:00
return nil , err
}
2018-12-19 14:29:25 +00:00
2020-01-14 10:38:55 +00:00
client := r . c . Client
2018-12-19 14:29:25 +00:00
if client == nil {
client = http . DefaultClient
}
2021-08-09 07:19:24 +00:00
2022-05-05 02:47:06 +00:00
retry := r . retryFn ( r . maxRetries )
2021-08-09 07:19:24 +00:00
url := r . URL ( ) . String ( )
for {
2022-05-05 02:47:06 +00:00
if err := retry . Before ( ctx , r ) ; err != nil {
return nil , err
}
2021-08-09 07:19:24 +00:00
req , err := r . newHTTPRequest ( ctx )
2018-12-19 14:29:25 +00:00
if err != nil {
2021-08-09 07:19:24 +00:00
return nil , err
}
resp , err := client . Do ( req )
2022-05-05 02:47:06 +00:00
retry . After ( ctx , r , resp , err )
2021-08-09 07:19:24 +00:00
if err != nil {
// we only retry on an HTTP response with 'Retry-After' header
return nil , err
}
2018-12-19 14:29:25 +00:00
2021-08-09 07:19:24 +00:00
switch {
case ( resp . StatusCode >= 200 ) && ( resp . StatusCode < 300 ) :
handleWarnings ( resp . Header , r . warningHandler )
return resp . Body , nil
default :
done , transformErr := func ( ) ( bool , error ) {
defer resp . Body . Close ( )
2022-05-05 02:47:06 +00:00
if retry . IsNextRetry ( ctx , r , req , resp , err , neverRetryError ) {
return false , nil
2021-08-09 07:19:24 +00:00
}
result := r . transformResponse ( resp , req )
if err := result . Error ( ) ; err != nil {
return true , err
}
return true , fmt . Errorf ( "%d while accessing %v: %s" , result . statusCode , url , string ( result . body ) )
} ( )
if done {
return nil , transformErr
}
2018-12-19 14:29:25 +00:00
}
}
}
2020-01-14 10:38:55 +00:00
// requestPreflightCheck looks for common programmer errors on Request.
//
// We tackle here two programmer mistakes. The first one is to try to create
// something(POST) using an empty string as namespace with namespaceSet as
// true. If namespaceSet is true then namespace should also be defined. The
// second mistake is, when under the same circumstances, the programmer tries
// to GET, PUT or DELETE a named resource(resourceName != ""), again, if
// namespaceSet is true then namespace must not be empty.
func ( r * Request ) requestPreflightCheck ( ) error {
if ! r . namespaceSet {
return nil
}
if len ( r . namespace ) > 0 {
return nil
}
switch r . verb {
case "POST" :
return fmt . Errorf ( "an empty namespace may not be set during creation" )
case "GET" , "PUT" , "DELETE" :
if len ( r . resourceName ) > 0 {
return fmt . Errorf ( "an empty namespace may not be set when a resource name is provided" )
}
}
return nil
}
2021-08-09 07:19:24 +00:00
func ( r * Request ) newHTTPRequest ( ctx context . Context ) ( * http . Request , error ) {
2023-02-01 17:06:36 +00:00
var body io . Reader
switch {
case r . body != nil && r . bodyBytes != nil :
return nil , fmt . Errorf ( "cannot set both body and bodyBytes" )
case r . body != nil :
body = r . body
case r . bodyBytes != nil :
// Create a new reader specifically for this request.
// Giving each request a dedicated reader allows retries to avoid races resetting the request body.
body = bytes . NewReader ( r . bodyBytes )
}
2021-08-09 07:19:24 +00:00
url := r . URL ( ) . String ( )
2023-08-17 05:15:28 +00:00
req , err := http . NewRequestWithContext ( httptrace . WithClientTrace ( ctx , newDNSMetricsTrace ( ctx ) ) , r . verb , url , body )
2021-08-09 07:19:24 +00:00
if err != nil {
return nil , err
}
req . Header = r . headers
return req , nil
}
2023-08-17 05:15:28 +00:00
// newDNSMetricsTrace returns an HTTP trace that tracks time spent on DNS lookups per host.
// This metric is available in client as "rest_client_dns_resolution_duration_seconds".
func newDNSMetricsTrace ( ctx context . Context ) * httptrace . ClientTrace {
type dnsMetric struct {
start time . Time
host string
sync . Mutex
}
dns := & dnsMetric { }
return & httptrace . ClientTrace {
DNSStart : func ( info httptrace . DNSStartInfo ) {
dns . Lock ( )
defer dns . Unlock ( )
dns . start = time . Now ( )
dns . host = info . Host
} ,
DNSDone : func ( info httptrace . DNSDoneInfo ) {
dns . Lock ( )
defer dns . Unlock ( )
metrics . ResolverLatency . Observe ( ctx , dns . host , time . Since ( dns . start ) )
} ,
}
}
2018-12-19 14:29:25 +00:00
// request connects to the server and invokes the provided function when a server response is
// received. It handles retry behavior and up front validation of requests. It will invoke
// fn at most once. It will return an error if a problem occurred prior to connecting to the
// server - the provided function is responsible for handling server errors.
2020-04-14 07:04:33 +00:00
func ( r * Request ) request ( ctx context . Context , fn func ( * http . Request , * http . Response ) ) error {
2022-08-24 02:24:25 +00:00
// Metrics for total request latency
2018-12-19 14:29:25 +00:00
start := time . Now ( )
defer func ( ) {
2022-11-01 09:43:55 +00:00
metrics . RequestLatency . Observe ( ctx , r . verb , r . finalURLTemplate ( ) , time . Since ( start ) )
2018-12-19 14:29:25 +00:00
} ( )
if r . err != nil {
2019-01-15 16:20:41 +00:00
klog . V ( 4 ) . Infof ( "Error in request: %v" , r . err )
2018-12-19 14:29:25 +00:00
return r . err
}
2020-01-14 10:38:55 +00:00
if err := r . requestPreflightCheck ( ) ; err != nil {
return err
2018-12-19 14:29:25 +00:00
}
2020-01-14 10:38:55 +00:00
client := r . c . Client
2018-12-19 14:29:25 +00:00
if client == nil {
client = http . DefaultClient
}
2020-04-14 07:04:33 +00:00
// Throttle the first try before setting up the timeout configured on the
// client. We don't want a throttled client to return timeouts to callers
// before it makes a single request.
if err := r . tryThrottle ( ctx ) ; err != nil {
return err
}
if r . timeout > 0 {
var cancel context . CancelFunc
ctx , cancel = context . WithTimeout ( ctx , r . timeout )
defer cancel ( )
}
2022-05-05 02:47:06 +00:00
isErrRetryableFunc := func ( req * http . Request , err error ) bool {
// "Connection reset by peer" or "apiserver is shutting down" are usually a transient errors.
// Thus in case of "GET" operations, we simply retry it.
// We are not automatically retrying "write" operations, as they are not idempotent.
if req . Method != "GET" {
return false
}
// For connection errors and apiserver shutdown errors retry.
if net . IsConnectionReset ( err ) || net . IsProbableEOF ( err ) {
return true
}
return false
}
2018-12-19 14:29:25 +00:00
// Right now we make about ten retry attempts if we get a Retry-After response.
2022-05-05 02:47:06 +00:00
retry := r . retryFn ( r . maxRetries )
2018-12-19 14:29:25 +00:00
for {
2022-05-05 02:47:06 +00:00
if err := retry . Before ( ctx , r ) ; err != nil {
return retry . WrapPreviousError ( err )
}
2021-08-09 07:19:24 +00:00
req , err := r . newHTTPRequest ( ctx )
2018-12-19 14:29:25 +00:00
if err != nil {
return err
}
resp , err := client . Do ( req )
2022-05-05 02:47:06 +00:00
// The value -1 or a value of 0 with a non-nil Body indicates that the length is unknown.
// https://pkg.go.dev/net/http#Request
if req . ContentLength >= 0 && ! ( req . Body != nil && req . ContentLength == 0 ) {
metrics . RequestSize . Observe ( ctx , r . verb , r . URL ( ) . Host , float64 ( req . ContentLength ) )
2018-12-19 14:29:25 +00:00
}
2022-05-05 02:47:06 +00:00
retry . After ( ctx , r , resp , err )
2018-12-19 14:29:25 +00:00
done := func ( ) bool {
2021-08-09 07:19:24 +00:00
defer readAndCloseResponseBody ( resp )
2018-12-19 14:29:25 +00:00
2022-08-24 02:24:25 +00:00
// if the server returns an error in err, the response will be nil.
2021-08-09 07:19:24 +00:00
f := func ( req * http . Request , resp * http . Response ) {
if resp == nil {
return
2018-12-19 14:29:25 +00:00
}
2021-08-09 07:19:24 +00:00
fn ( req , resp )
}
2018-12-19 14:29:25 +00:00
2022-05-05 02:47:06 +00:00
if retry . IsNextRetry ( ctx , r , req , resp , err , isErrRetryableFunc ) {
2018-12-19 14:29:25 +00:00
return false
}
2021-08-09 07:19:24 +00:00
f ( req , resp )
2018-12-19 14:29:25 +00:00
return true
} ( )
if done {
2022-05-05 02:47:06 +00:00
return retry . WrapPreviousError ( err )
2018-12-19 14:29:25 +00:00
}
}
}
// Do formats and executes the request. Returns a Result object for easy response
// processing.
//
// Error type:
2022-08-24 02:24:25 +00:00
// - If the server responds with a status: *errors.StatusError or *errors.UnexpectedObjectError
// - http.Client.Do errors are returned directly.
2020-04-14 07:04:33 +00:00
func ( r * Request ) Do ( ctx context . Context ) Result {
2018-12-19 14:29:25 +00:00
var result Result
2020-04-14 07:04:33 +00:00
err := r . request ( ctx , func ( req * http . Request , resp * http . Response ) {
2018-12-19 14:29:25 +00:00
result = r . transformResponse ( resp , req )
} )
if err != nil {
return Result { err : err }
}
2022-05-05 02:47:06 +00:00
if result . err == nil || len ( result . body ) > 0 {
metrics . ResponseSize . Observe ( ctx , r . verb , r . URL ( ) . Host , float64 ( len ( result . body ) ) )
}
2018-12-19 14:29:25 +00:00
return result
}
// DoRaw executes the request but does not process the response body.
2020-04-14 07:04:33 +00:00
func ( r * Request ) DoRaw ( ctx context . Context ) ( [ ] byte , error ) {
2018-12-19 14:29:25 +00:00
var result Result
2020-04-14 07:04:33 +00:00
err := r . request ( ctx , func ( req * http . Request , resp * http . Response ) {
2023-02-01 17:06:36 +00:00
result . body , result . err = io . ReadAll ( resp . Body )
2018-12-19 14:29:25 +00:00
glogBody ( "Response Body" , result . body )
if resp . StatusCode < http . StatusOK || resp . StatusCode > http . StatusPartialContent {
result . err = r . transformUnstructuredResponseError ( resp , req , result . body )
}
} )
if err != nil {
return nil , err
}
2022-05-05 02:47:06 +00:00
if result . err == nil || len ( result . body ) > 0 {
metrics . ResponseSize . Observe ( ctx , r . verb , r . URL ( ) . Host , float64 ( len ( result . body ) ) )
}
2018-12-19 14:29:25 +00:00
return result . body , result . err
}
// transformResponse converts an API response into a structured API object
func ( r * Request ) transformResponse ( resp * http . Response , req * http . Request ) Result {
var body [ ] byte
if resp . Body != nil {
2023-02-01 17:06:36 +00:00
data , err := io . ReadAll ( resp . Body )
2018-12-19 14:29:25 +00:00
switch err . ( type ) {
case nil :
body = data
case http2 . StreamError :
// This is trying to catch the scenario that the server may close the connection when sending the
// response body. This can be caused by server timeout due to a slow network connection.
// TODO: Add test for this. Steps may be:
// 1. client-go (or kubectl) sends a GET request.
// 2. Apiserver sends back the headers and then part of the body
// 3. Apiserver closes connection.
// 4. client-go should catch this and return an error.
2019-01-15 16:20:41 +00:00
klog . V ( 2 ) . Infof ( "Stream error %#v when reading response body, may be caused by closed connection." , err )
2021-08-09 07:19:24 +00:00
streamErr := fmt . Errorf ( "stream error when reading response body, may be caused by closed connection. Please retry. Original error: %w" , err )
2018-12-19 14:29:25 +00:00
return Result {
err : streamErr ,
}
default :
2019-06-24 09:08:09 +00:00
klog . Errorf ( "Unexpected error when reading response body: %v" , err )
2021-08-09 07:19:24 +00:00
unexpectedErr := fmt . Errorf ( "unexpected error when reading response body. Please retry. Original error: %w" , err )
2018-12-19 14:29:25 +00:00
return Result {
err : unexpectedErr ,
}
}
}
glogBody ( "Response Body" , body )
// verify the content type is accurate
2020-01-14 10:38:55 +00:00
var decoder runtime . Decoder
2018-12-19 14:29:25 +00:00
contentType := resp . Header . Get ( "Content-Type" )
2020-01-14 10:38:55 +00:00
if len ( contentType ) == 0 {
contentType = r . c . content . ContentType
}
if len ( contentType ) > 0 {
var err error
2018-12-19 14:29:25 +00:00
mediaType , params , err := mime . ParseMediaType ( contentType )
if err != nil {
return Result { err : errors . NewInternalError ( err ) }
}
2020-01-14 10:38:55 +00:00
decoder , err = r . c . content . Negotiator . Decoder ( mediaType , params )
2018-12-19 14:29:25 +00:00
if err != nil {
// if we fail to negotiate a decoder, treat this as an unstructured error
switch {
case resp . StatusCode == http . StatusSwitchingProtocols :
// no-op, we've been upgraded
case resp . StatusCode < http . StatusOK || resp . StatusCode > http . StatusPartialContent :
return Result { err : r . transformUnstructuredResponseError ( resp , req , body ) }
}
return Result {
body : body ,
contentType : contentType ,
statusCode : resp . StatusCode ,
2020-12-17 12:28:29 +00:00
warnings : handleWarnings ( resp . Header , r . warningHandler ) ,
2018-12-19 14:29:25 +00:00
}
}
}
switch {
case resp . StatusCode == http . StatusSwitchingProtocols :
// no-op, we've been upgraded
case resp . StatusCode < http . StatusOK || resp . StatusCode > http . StatusPartialContent :
// calculate an unstructured error from the response which the Result object may use if the caller
// did not return a structured error.
retryAfter , _ := retryAfterSeconds ( resp )
err := r . newUnstructuredResponseError ( body , isTextResponse ( resp ) , resp . StatusCode , req . Method , retryAfter )
return Result {
body : body ,
contentType : contentType ,
statusCode : resp . StatusCode ,
decoder : decoder ,
err : err ,
2020-12-17 12:28:29 +00:00
warnings : handleWarnings ( resp . Header , r . warningHandler ) ,
2018-12-19 14:29:25 +00:00
}
}
return Result {
body : body ,
contentType : contentType ,
statusCode : resp . StatusCode ,
decoder : decoder ,
2020-12-17 12:28:29 +00:00
warnings : handleWarnings ( resp . Header , r . warningHandler ) ,
2018-12-19 14:29:25 +00:00
}
}
// truncateBody decides if the body should be truncated, based on the glog Verbosity.
func truncateBody ( body string ) string {
max := 0
switch {
2020-12-17 12:28:29 +00:00
case bool ( klog . V ( 10 ) . Enabled ( ) ) :
2018-12-19 14:29:25 +00:00
return body
2020-12-17 12:28:29 +00:00
case bool ( klog . V ( 9 ) . Enabled ( ) ) :
2018-12-19 14:29:25 +00:00
max = 10240
2020-12-17 12:28:29 +00:00
case bool ( klog . V ( 8 ) . Enabled ( ) ) :
2018-12-19 14:29:25 +00:00
max = 1024
}
if len ( body ) <= max {
return body
}
return body [ : max ] + fmt . Sprintf ( " [truncated %d chars]" , len ( body ) - max )
}
// glogBody logs a body output that could be either JSON or protobuf. It explicitly guards against
// allocating a new string for the body output unless necessary. Uses a simple heuristic to determine
// whether the body is printable.
func glogBody ( prefix string , body [ ] byte ) {
2022-05-05 02:47:06 +00:00
if klogV := klog . V ( 8 ) ; klogV . Enabled ( ) {
2018-12-19 14:29:25 +00:00
if bytes . IndexFunc ( body , func ( r rune ) bool {
return r < 0x0a
} ) != - 1 {
2022-05-05 02:47:06 +00:00
klogV . Infof ( "%s:\n%s" , prefix , truncateBody ( hex . Dump ( body ) ) )
2018-12-19 14:29:25 +00:00
} else {
2022-05-05 02:47:06 +00:00
klogV . Infof ( "%s: %s" , prefix , truncateBody ( string ( body ) ) )
2018-12-19 14:29:25 +00:00
}
}
}
// maxUnstructuredResponseTextBytes is an upper bound on how much output to include in the unstructured error.
const maxUnstructuredResponseTextBytes = 2048
// transformUnstructuredResponseError handles an error from the server that is not in a structured form.
// It is expected to transform any response that is not recognizable as a clear server sent error from the
// K8S API using the information provided with the request. In practice, HTTP proxies and client libraries
// introduce a level of uncertainty to the responses returned by servers that in common use result in
// unexpected responses. The rough structure is:
//
// 1. Assume the server sends you something sane - JSON + well defined error objects + proper codes
2022-08-24 02:24:25 +00:00
// - this is the happy path
// - when you get this output, trust what the server sends
// 2. Guard against empty fields / bodies in received JSON and attempt to cull sufficient info from them to
// generate a reasonable facsimile of the original failure.
// - Be sure to use a distinct error type or flag that allows a client to distinguish between this and error 1 above
// 3. Handle true disconnect failures / completely malformed data by moving up to a more generic client error
// 4. Distinguish between various connection failures like SSL certificates, timeouts, proxy errors, unexpected
// initial contact, the presence of mismatched body contents from posted content types
// - Give these a separate distinct error type and capture as much as possible of the original message
2018-12-19 14:29:25 +00:00
//
// TODO: introduce transformation of generic http.Client.Do() errors that separates 4.
func ( r * Request ) transformUnstructuredResponseError ( resp * http . Response , req * http . Request , body [ ] byte ) error {
if body == nil && resp . Body != nil {
2023-02-01 17:06:36 +00:00
if data , err := io . ReadAll ( & io . LimitedReader { R : resp . Body , N : maxUnstructuredResponseTextBytes } ) ; err == nil {
2018-12-19 14:29:25 +00:00
body = data
}
}
retryAfter , _ := retryAfterSeconds ( resp )
return r . newUnstructuredResponseError ( body , isTextResponse ( resp ) , resp . StatusCode , req . Method , retryAfter )
}
// newUnstructuredResponseError instantiates the appropriate generic error for the provided input. It also logs the body.
func ( r * Request ) newUnstructuredResponseError ( body [ ] byte , isTextResponse bool , statusCode int , method string , retryAfter int ) error {
// cap the amount of output we create
if len ( body ) > maxUnstructuredResponseTextBytes {
body = body [ : maxUnstructuredResponseTextBytes ]
}
message := "unknown"
if isTextResponse {
message = strings . TrimSpace ( string ( body ) )
}
var groupResource schema . GroupResource
if len ( r . resource ) > 0 {
2020-01-14 10:38:55 +00:00
groupResource . Group = r . c . content . GroupVersion . Group
2018-12-19 14:29:25 +00:00
groupResource . Resource = r . resource
}
return errors . NewGenericServerResponse (
statusCode ,
method ,
groupResource ,
r . resourceName ,
message ,
retryAfter ,
true ,
)
}
// isTextResponse returns true if the response appears to be a textual media type.
func isTextResponse ( resp * http . Response ) bool {
contentType := resp . Header . Get ( "Content-Type" )
if len ( contentType ) == 0 {
return true
}
media , _ , err := mime . ParseMediaType ( contentType )
if err != nil {
return false
}
return strings . HasPrefix ( media , "text/" )
}
// retryAfterSeconds returns the value of the Retry-After header and true, or 0 and false if
// the header was missing or not a valid number.
func retryAfterSeconds ( resp * http . Response ) ( int , bool ) {
if h := resp . Header . Get ( "Retry-After" ) ; len ( h ) > 0 {
if i , err := strconv . Atoi ( h ) ; err == nil {
return i , true
}
}
return 0 , false
}
// Result contains the result of calling Request.Do().
type Result struct {
body [ ] byte
2020-12-17 12:28:29 +00:00
warnings [ ] net . WarningHeader
2018-12-19 14:29:25 +00:00
contentType string
err error
statusCode int
decoder runtime . Decoder
}
// Raw returns the raw result.
func ( r Result ) Raw ( ) ( [ ] byte , error ) {
return r . body , r . err
}
// Get returns the result as an object, which means it passes through the decoder.
// If the returned object is of type Status and has .Status != StatusSuccess, the
// additional information in Status will be used to enrich the error.
func ( r Result ) Get ( ) ( runtime . Object , error ) {
if r . err != nil {
// Check whether the result has a Status object in the body and prefer that.
return nil , r . Error ( )
}
if r . decoder == nil {
return nil , fmt . Errorf ( "serializer for %s doesn't exist" , r . contentType )
}
// decode, but if the result is Status return that as an error instead.
out , _ , err := r . decoder . Decode ( r . body , nil , nil )
if err != nil {
return nil , err
}
switch t := out . ( type ) {
case * metav1 . Status :
// any status besides StatusSuccess is considered an error.
if t . Status != metav1 . StatusSuccess {
return nil , errors . FromObject ( t )
}
}
return out , nil
}
// StatusCode returns the HTTP status code of the request. (Only valid if no
// error was returned.)
func ( r Result ) StatusCode ( statusCode * int ) Result {
* statusCode = r . statusCode
return r
}
2023-02-01 17:06:36 +00:00
// ContentType returns the "Content-Type" response header into the passed
// string, returning the Result for possible chaining. (Only valid if no
// error code was returned.)
func ( r Result ) ContentType ( contentType * string ) Result {
* contentType = r . contentType
return r
}
2018-12-19 14:29:25 +00:00
// Into stores the result into obj, if possible. If obj is nil it is ignored.
// If the returned object is of type Status and has .Status != StatusSuccess, the
// additional information in Status will be used to enrich the error.
func ( r Result ) Into ( obj runtime . Object ) error {
if r . err != nil {
// Check whether the result has a Status object in the body and prefer that.
return r . Error ( )
}
if r . decoder == nil {
return fmt . Errorf ( "serializer for %s doesn't exist" , r . contentType )
}
if len ( r . body ) == 0 {
2019-04-03 07:57:13 +00:00
return fmt . Errorf ( "0-length response with status code: %d and content type: %s" ,
r . statusCode , r . contentType )
2018-12-19 14:29:25 +00:00
}
out , _ , err := r . decoder . Decode ( r . body , nil , obj )
if err != nil || out == obj {
return err
}
// if a different object is returned, see if it is Status and avoid double decoding
// the object.
switch t := out . ( type ) {
case * metav1 . Status :
// any status besides StatusSuccess is considered an error.
if t . Status != metav1 . StatusSuccess {
return errors . FromObject ( t )
}
}
return nil
}
// WasCreated updates the provided bool pointer to whether the server returned
// 201 created or a different response.
func ( r Result ) WasCreated ( wasCreated * bool ) Result {
* wasCreated = r . statusCode == http . StatusCreated
return r
}
// Error returns the error executing the request, nil if no error occurred.
// If the returned object is of type Status and has Status != StatusSuccess, the
// additional information in Status will be used to enrich the error.
// See the Request.Do() comment for what errors you might get.
func ( r Result ) Error ( ) error {
// if we have received an unexpected server error, and we have a body and decoder, we can try to extract
// a Status object.
if r . err == nil || ! errors . IsUnexpectedServerError ( r . err ) || len ( r . body ) == 0 || r . decoder == nil {
return r . err
}
// attempt to convert the body into a Status object
// to be backwards compatible with old servers that do not return a version, default to "v1"
out , _ , err := r . decoder . Decode ( r . body , & schema . GroupVersionKind { Version : "v1" } , nil )
if err != nil {
2019-01-15 16:20:41 +00:00
klog . V ( 5 ) . Infof ( "body was not decodable (unable to check for Status): %v" , err )
2018-12-19 14:29:25 +00:00
return r . err
}
switch t := out . ( type ) {
case * metav1 . Status :
// because we default the kind, we *must* check for StatusFailure
if t . Status == metav1 . StatusFailure {
return errors . FromObject ( t )
}
}
return r . err
}
2020-12-17 12:28:29 +00:00
// Warnings returns any warning headers received in the response
func ( r Result ) Warnings ( ) [ ] net . WarningHeader {
return r . warnings
}
2018-12-19 14:29:25 +00:00
// NameMayNotBe specifies strings that cannot be used as names specified as path segments (like the REST API or etcd store)
var NameMayNotBe = [ ] string { "." , ".." }
// NameMayNotContain specifies substrings that cannot be used in names specified as path segments (like the REST API or etcd store)
var NameMayNotContain = [ ] string { "/" , "%" }
// IsValidPathSegmentName validates the name can be safely encoded as a path segment
func IsValidPathSegmentName ( name string ) [ ] string {
for _ , illegalName := range NameMayNotBe {
if name == illegalName {
return [ ] string { fmt . Sprintf ( ` may not be '%s' ` , illegalName ) }
}
}
var errors [ ] string
for _ , illegalContent := range NameMayNotContain {
if strings . Contains ( name , illegalContent ) {
errors = append ( errors , fmt . Sprintf ( ` may not contain '%s' ` , illegalContent ) )
}
}
return errors
}
// IsValidPathSegmentPrefix validates the name can be used as a prefix for a name which will be encoded as a path segment
// It does not check for exact matches with disallowed names, since an arbitrary suffix might make the name valid
func IsValidPathSegmentPrefix ( name string ) [ ] string {
var errors [ ] string
for _ , illegalContent := range NameMayNotContain {
if strings . Contains ( name , illegalContent ) {
errors = append ( errors , fmt . Sprintf ( ` may not contain '%s' ` , illegalContent ) )
}
}
return errors
}
// ValidatePathSegmentName validates the name can be safely encoded as a path segment
func ValidatePathSegmentName ( name string , prefix bool ) [ ] string {
if prefix {
return IsValidPathSegmentPrefix ( name )
}
2019-04-03 07:57:13 +00:00
return IsValidPathSegmentName ( name )
2018-12-19 14:29:25 +00:00
}
2024-08-19 08:01:33 +00:00
func objectKeyFromMeta ( objMeta metav1 . Object ) string {
if len ( objMeta . GetNamespace ( ) ) > 0 {
return fmt . Sprintf ( "%s/%s" , objMeta . GetNamespace ( ) , objMeta . GetName ( ) )
}
return objMeta . GetName ( )
}