2023-05-29 21:03:29 +00:00
/ *
Copyright 2014 The Kubernetes Authors .
Licensed under the Apache License , Version 2.0 ( the "License" ) ;
you may not use this file except in compliance with the License .
You may obtain a copy of the License at
http : //www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing , software
distributed under the License is distributed on an "AS IS" BASIS ,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND , either express or implied .
See the License for the specific language governing permissions and
limitations under the License .
* /
package handlers
import (
2024-05-15 06:54:18 +00:00
"context"
2023-05-29 21:03:29 +00:00
"fmt"
2024-05-15 06:54:18 +00:00
"io"
2023-05-29 21:03:29 +00:00
"net/http"
"time"
"golang.org/x/net/websocket"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
2023-08-17 05:15:28 +00:00
"k8s.io/apimachinery/pkg/util/httpstream/wsstream"
2023-05-29 21:03:29 +00:00
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/endpoints/handlers/negotiation"
"k8s.io/apiserver/pkg/endpoints/metrics"
2023-06-01 16:58:10 +00:00
apirequest "k8s.io/apiserver/pkg/endpoints/request"
2023-12-20 12:23:59 +00:00
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/storage"
utilfeature "k8s.io/apiserver/pkg/util/feature"
2023-05-29 21:03:29 +00:00
)
// neverExitWatch is a receive-only channel that never yields a value: nothing
// is ever sent on it, so a receive from it blocks forever.
var neverExitWatch <-chan time.Time = make(chan time.Time)
// TimeoutFactory abstracts watch timeout logic for testing.
type TimeoutFactory interface {
	// TimeoutCh returns the channel to wait on for the timeout together
	// with a cleanup function for the caller to invoke when it is done.
	TimeoutCh() (timeoutCh <-chan time.Time, cleanup func() bool)
}
// realTimeoutFactory implements TimeoutFactory on top of a real timer.
type realTimeoutFactory struct {
	// timeout is how long a watch may run; zero means it never times out.
	timeout time.Duration
}
// TimeoutCh returns a channel which will receive something when the watch
// times out, and a cleanup function to call once the caller is done waiting.
// With a zero timeout the channel never fires and the cleanup is a no-op
// that reports false.
func (w *realTimeoutFactory) TimeoutCh() (<-chan time.Time, func() bool) {
	if w.timeout != 0 {
		timer := time.NewTimer(w.timeout)
		return timer.C, timer.Stop
	}
	noop := func() bool { return false }
	return neverExitWatch, noop
}
2024-05-15 06:54:18 +00:00
// serveWatchHandler returns a handle to serve a watch response.
2023-05-29 21:03:29 +00:00
// TODO: the functionality in this method and in WatchServer.Serve is not cleanly decoupled.
2024-05-15 06:54:18 +00:00
func serveWatchHandler ( watcher watch . Interface , scope * RequestScope , mediaTypeOptions negotiation . MediaTypeOptions , req * http . Request , w http . ResponseWriter , timeout time . Duration , metricsScope string ) ( http . Handler , error ) {
2023-05-29 21:03:29 +00:00
options , err := optionsForTransform ( mediaTypeOptions , req )
if err != nil {
2024-05-15 06:54:18 +00:00
return nil , err
2023-05-29 21:03:29 +00:00
}
// negotiate for the stream serializer from the scope's serializer
serializer , err := negotiation . NegotiateOutputMediaTypeStream ( req , scope . Serializer , scope )
if err != nil {
2024-05-15 06:54:18 +00:00
return nil , err
2023-05-29 21:03:29 +00:00
}
framer := serializer . StreamSerializer . Framer
streamSerializer := serializer . StreamSerializer . Serializer
encoder := scope . Serializer . EncoderForVersion ( streamSerializer , scope . Kind . GroupVersion ( ) )
useTextFraming := serializer . EncodesAsText
if framer == nil {
2024-05-15 06:54:18 +00:00
return nil , fmt . Errorf ( "no framer defined for %q available for embedded encoding" , serializer . MediaType )
2023-05-29 21:03:29 +00:00
}
// TODO: next step, get back mediaTypeOptions from negotiate and return the exact value here
mediaType := serializer . MediaType
if mediaType != runtime . ContentTypeJSON {
mediaType += ";stream=watch"
}
2023-12-20 12:23:59 +00:00
ctx := req . Context ( )
2023-05-29 21:03:29 +00:00
// locate the appropriate embedded encoder based on the transform
var embeddedEncoder runtime . Encoder
contentKind , contentSerializer , transform := targetEncodingForTransform ( scope , mediaTypeOptions , req )
if transform {
info , ok := runtime . SerializerInfoForMediaType ( contentSerializer . SupportedMediaTypes ( ) , serializer . MediaType )
if ! ok {
2024-05-15 06:54:18 +00:00
return nil , fmt . Errorf ( "no encoder for %q exists in the requested target %#v" , serializer . MediaType , contentSerializer )
2023-05-29 21:03:29 +00:00
}
embeddedEncoder = contentSerializer . EncoderForVersion ( info . Serializer , contentKind . GroupVersion ( ) )
} else {
embeddedEncoder = scope . Serializer . EncoderForVersion ( serializer . Serializer , contentKind . GroupVersion ( ) )
}
2023-12-20 12:23:59 +00:00
var memoryAllocator runtime . MemoryAllocator
if encoderWithAllocator , supportsAllocator := embeddedEncoder . ( runtime . EncoderWithAllocator ) ; supportsAllocator {
// don't put the allocator inside the embeddedEncodeFn as that would allocate memory on every call.
// instead, we allocate the buffer for the entire watch session and release it when we close the connection.
memoryAllocator = runtime . AllocatorPool . Get ( ) . ( * runtime . Allocator )
embeddedEncoder = runtime . NewEncoderWithAllocator ( encoderWithAllocator , memoryAllocator )
}
var tableOptions * metav1 . TableOptions
if options != nil {
if passedOptions , ok := options . ( * metav1 . TableOptions ) ; ok {
tableOptions = passedOptions
} else {
2024-05-15 06:54:18 +00:00
return nil , fmt . Errorf ( "unexpected options type: %T" , options )
2023-12-20 12:23:59 +00:00
}
}
embeddedEncoder = newWatchEmbeddedEncoder ( ctx , embeddedEncoder , mediaTypeOptions . Convert , tableOptions , scope )
if encoderWithAllocator , supportsAllocator := encoder . ( runtime . EncoderWithAllocator ) ; supportsAllocator {
if memoryAllocator == nil {
// don't put the allocator inside the embeddedEncodeFn as that would allocate memory on every call.
// instead, we allocate the buffer for the entire watch session and release it when we close the connection.
memoryAllocator = runtime . AllocatorPool . Get ( ) . ( * runtime . Allocator )
}
encoder = runtime . NewEncoderWithAllocator ( encoderWithAllocator , memoryAllocator )
}
2023-06-01 16:58:10 +00:00
var serverShuttingDownCh <- chan struct { }
if signals := apirequest . ServerShutdownSignalFrom ( req . Context ( ) ) ; signals != nil {
serverShuttingDownCh = signals . ShuttingDown ( )
}
2023-05-29 21:03:29 +00:00
server := & WatchServer {
Watching : watcher ,
Scope : scope ,
UseTextFraming : useTextFraming ,
MediaType : mediaType ,
Framer : framer ,
Encoder : encoder ,
EmbeddedEncoder : embeddedEncoder ,
2024-05-15 06:54:18 +00:00
MemoryAllocator : memoryAllocator ,
2023-06-01 16:58:10 +00:00
TimeoutFactory : & realTimeoutFactory { timeout } ,
ServerShuttingDownCh : serverShuttingDownCh ,
2023-12-20 12:23:59 +00:00
metricsScope : metricsScope ,
2023-05-29 21:03:29 +00:00
}
2024-05-15 06:54:18 +00:00
if wsstream . IsWebSocketRequest ( req ) {
w . Header ( ) . Set ( "Content-Type" , server . MediaType )
return websocket . Handler ( server . HandleWS ) , nil
}
return http . HandlerFunc ( server . HandleHTTP ) , nil
2023-05-29 21:03:29 +00:00
}
// WatchServer serves a watch.Interface over a websocket or vanilla HTTP.
type WatchServer struct {
Watching watch . Interface
Scope * RequestScope
// true if websocket messages should use text framing (as opposed to binary framing)
UseTextFraming bool
// the media type this watch is being served with
MediaType string
// used to frame the watch stream
Framer runtime . Framer
// used to encode the watch stream event itself
Encoder runtime . Encoder
// used to encode the nested object in the watch stream
EmbeddedEncoder runtime . Encoder
2024-05-15 06:54:18 +00:00
MemoryAllocator runtime . MemoryAllocator
2023-06-01 16:58:10 +00:00
TimeoutFactory TimeoutFactory
ServerShuttingDownCh <- chan struct { }
2023-12-20 12:23:59 +00:00
metricsScope string
2023-05-29 21:03:29 +00:00
}
2024-05-15 06:54:18 +00:00
// HandleHTTP serves a series of encoded events via HTTP with Transfer-Encoding: chunked.
2023-05-29 21:03:29 +00:00
// or over a websocket connection.
2024-05-15 06:54:18 +00:00
func ( s * WatchServer ) HandleHTTP ( w http . ResponseWriter , req * http . Request ) {
defer func ( ) {
if s . MemoryAllocator != nil {
runtime . AllocatorPool . Put ( s . MemoryAllocator )
}
} ( )
2023-05-29 21:03:29 +00:00
flusher , ok := w . ( http . Flusher )
if ! ok {
err := fmt . Errorf ( "unable to start watch - can't get http.Flusher: %#v" , w )
utilruntime . HandleError ( err )
s . Scope . err ( errors . NewInternalError ( err ) , w , req )
return
}
framer := s . Framer . NewFrameWriter ( w )
if framer == nil {
// programmer error
err := fmt . Errorf ( "no stream framing support is available for media type %q" , s . MediaType )
utilruntime . HandleError ( err )
s . Scope . err ( errors . NewBadRequest ( err . Error ( ) ) , w , req )
return
}
// ensure the connection times out
timeoutCh , cleanup := s . TimeoutFactory . TimeoutCh ( )
defer cleanup ( )
// begin the stream
w . Header ( ) . Set ( "Content-Type" , s . MediaType )
w . Header ( ) . Set ( "Transfer-Encoding" , "chunked" )
w . WriteHeader ( http . StatusOK )
flusher . Flush ( )
2024-05-15 06:54:18 +00:00
kind := s . Scope . Kind
2023-12-20 12:23:59 +00:00
watchEncoder := newWatchEncoder ( req . Context ( ) , kind , s . EmbeddedEncoder , s . Encoder , framer )
2023-05-29 21:03:29 +00:00
ch := s . Watching . ResultChan ( )
done := req . Context ( ) . Done ( )
for {
select {
2023-06-01 16:58:10 +00:00
case <- s . ServerShuttingDownCh :
// the server has signaled that it is shutting down (not accepting
// any new request), all active watch request(s) should return
// immediately here. The WithWatchTerminationDuringShutdown server
// filter will ensure that the response to the client is rate
// limited in order to avoid any thundering herd issue when the
// client(s) try to reestablish the WATCH on the other
// available apiserver instance(s).
return
2023-05-29 21:03:29 +00:00
case <- done :
return
case <- timeoutCh :
return
case event , ok := <- ch :
if ! ok {
// End of results.
return
}
metrics . WatchEvents . WithContext ( req . Context ( ) ) . WithLabelValues ( kind . Group , kind . Version , kind . Kind ) . Inc ( )
2023-12-20 12:23:59 +00:00
isWatchListLatencyRecordingRequired := shouldRecordWatchListLatency ( event )
2023-05-29 21:03:29 +00:00
2023-12-20 12:23:59 +00:00
if err := watchEncoder . Encode ( event ) ; err != nil {
utilruntime . HandleError ( err )
2023-05-29 21:03:29 +00:00
// client disconnect.
return
}
2023-12-20 12:23:59 +00:00
2023-05-29 21:03:29 +00:00
if len ( ch ) == 0 {
flusher . Flush ( )
}
2023-12-20 12:23:59 +00:00
if isWatchListLatencyRecordingRequired {
metrics . RecordWatchListLatency ( req . Context ( ) , s . Scope . Resource , s . metricsScope )
}
2023-05-29 21:03:29 +00:00
}
}
}
2024-05-15 06:54:18 +00:00
// HandleWS serves a series of encoded events over a websocket connection.
2023-05-29 21:03:29 +00:00
func ( s * WatchServer ) HandleWS ( ws * websocket . Conn ) {
2024-05-15 06:54:18 +00:00
defer func ( ) {
if s . MemoryAllocator != nil {
runtime . AllocatorPool . Put ( s . MemoryAllocator )
}
} ( )
2023-05-29 21:03:29 +00:00
defer ws . Close ( )
done := make ( chan struct { } )
2024-05-15 06:54:18 +00:00
// ensure the connection times out
timeoutCh , cleanup := s . TimeoutFactory . TimeoutCh ( )
defer cleanup ( )
2023-05-29 21:03:29 +00:00
go func ( ) {
defer utilruntime . HandleCrash ( )
// This blocks until the connection is closed.
// Client should not send anything.
wsstream . IgnoreReceives ( ws , 0 )
// Once the client closes, we should also close
close ( done )
} ( )
2024-05-15 06:54:18 +00:00
framer := newWebsocketFramer ( ws , s . UseTextFraming )
kind := s . Scope . Kind
watchEncoder := newWatchEncoder ( context . TODO ( ) , kind , s . EmbeddedEncoder , s . Encoder , framer )
2023-05-29 21:03:29 +00:00
ch := s . Watching . ResultChan ( )
for {
select {
case <- done :
return
2024-05-15 06:54:18 +00:00
case <- timeoutCh :
return
2023-05-29 21:03:29 +00:00
case event , ok := <- ch :
if ! ok {
// End of results.
return
}
2023-12-20 12:23:59 +00:00
2024-05-15 06:54:18 +00:00
if err := watchEncoder . Encode ( event ) ; err != nil {
utilruntime . HandleError ( err )
2023-05-29 21:03:29 +00:00
// client disconnect.
return
}
}
}
}
2023-12-20 12:23:59 +00:00
2024-05-15 06:54:18 +00:00
// websocketFramer is an io.Writer that sends every Write call as a single
// websocket message on ws.
type websocketFramer struct {
	// ws is the connection messages are sent on.
	ws *websocket.Conn
	// useTextFraming selects sending the payload as a string instead of as
	// raw bytes.
	useTextFraming bool
}
// newWebsocketFramer returns an io.Writer that forwards each write to ws as
// one websocket message, using text framing when useTextFraming is true.
func newWebsocketFramer(ws *websocket.Conn, useTextFraming bool) io.Writer {
	framer := new(websocketFramer)
	framer.ws = ws
	framer.useTextFraming = useTextFraming
	return framer
}
// Write sends p as a single websocket message. On success it reports len(p)
// bytes written; on a send failure it reports 0 and the error.
func (w *websocketFramer) Write(p []byte) (int, error) {
	var sendErr error
	if w.useTextFraming {
		// The payload is handed over as a string for text framing. p is a
		// serialized watch event, so the nil-value special case of
		// bytes.Buffer::String() does not come into play here.
		sendErr = websocket.Message.Send(w.ws, string(p))
	} else {
		sendErr = websocket.Message.Send(w.ws, p)
	}
	if sendErr != nil {
		return 0, sendErr
	}
	return len(p), nil
}
// compile-time check that *websocketFramer satisfies io.Writer
var _ io.Writer = (*websocketFramer)(nil)
2023-12-20 12:23:59 +00:00
// shouldRecordWatchListLatency reports whether event is a bookmark carrying
// the initial-events-end annotation while the WatchList feature gate is
// enabled. If the annotation lookup fails, the error is reported through
// utilruntime.HandleError and false is returned.
func shouldRecordWatchListLatency(event watch.Event) bool {
	if event.Type != watch.Bookmark {
		return false
	}
	if !utilfeature.DefaultFeatureGate.Enabled(features.WatchList) {
		return false
	}
	// as of today the initial-events-end annotation is added only to a single event
	// by the watch cache and only when certain conditions are met
	//
	// for more please read https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/3157-watch-list
	annotated, err := storage.HasInitialEventsEndBookmarkAnnotation(event.Object)
	if err == nil {
		return annotated
	}
	utilruntime.HandleError(fmt.Errorf("unable to determine if the obj has the required annotation for measuring watchlist latency, obj %T: %v", event.Object, err))
	return false
}