2020-04-14 07:04:33 +00:00
/*
Copyright 2019 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package client
import (
"context"
"errors"
2021-06-25 04:59:51 +00:00
"fmt"
2020-04-14 07:04:33 +00:00
"io"
"math/rand"
"net"
"sync"
2022-11-01 09:43:55 +00:00
"sync/atomic"
2020-04-14 07:04:33 +00:00
"time"
"google.golang.org/grpc"
2020-12-17 12:28:29 +00:00
"k8s.io/klog/v2"
2020-04-14 07:04:33 +00:00
"sigs.k8s.io/apiserver-network-proxy/konnectivity-client/proto/client"
)
// Tunnel provides ability to dial a connection through a tunnel.
type Tunnel interface {
	// DialContext connects to the address on the named network, similar to
	// what net.Dial does. The only supported protocol is tcp.
	DialContext(requestCtx context.Context, protocol, address string) (net.Conn, error)

	// Done returns a channel that is closed when the tunnel is no longer
	// serving any connections, and can no longer be used.
	Done() <-chan struct{}
}
type dialResult struct {
2022-11-01 09:43:55 +00:00
err * dialFailure
2020-04-14 07:04:33 +00:00
connid int64
}
2022-03-21 20:11:55 +00:00
// pendingDial is the bookkeeping entry for a dial request that has been sent
// but not yet answered.
type pendingDial struct {
	// resultCh receives the dial result; it is send-only from the holder's side.
	resultCh chan<- dialResult
	// cancelCh is closed once nobody is receiving from resultCh any more, so
	// that a sender blocked on resultCh can give up.
	cancelCh <-chan struct{}
}
2022-11-01 09:43:55 +00:00
// pendingDialManager is a lock-protected map from dial ID to the pendingDial
// registered for it.
//
// TODO: Replace with a generic implementation once it is safe to assume the client is built with go1.18+
type pendingDialManager struct {
	pendingDials map[int64]pendingDial
	mutex        sync.RWMutex
}

// add stores pd under dialID, overwriting any entry already present.
func (m *pendingDialManager) add(dialID int64, pd pendingDial) {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	m.pendingDials[dialID] = pd
}

// remove drops the entry for dialID; it is a no-op when there is none.
func (m *pendingDialManager) remove(dialID int64) {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	delete(m.pendingDials, dialID)
}

// get looks up dialID and reports whether an entry was found.
func (m *pendingDialManager) get(dialID int64) (pendingDial, bool) {
	m.mutex.RLock()
	defer m.mutex.RUnlock()

	entry, found := m.pendingDials[dialID]
	return entry, found
}
// connectionManager is a lock-protected map from connection ID to the conn
// registered for it.
//
// TODO: Replace with a generic implementation once it is safe to assume the client is built with go1.18+
type connectionManager struct {
	conns map[int64]*conn
	mutex sync.RWMutex
}

// add stores c under connID, overwriting any entry already present.
func (m *connectionManager) add(connID int64, c *conn) {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	m.conns[connID] = c
}

// remove drops the entry for connID; it is a no-op when there is none.
func (m *connectionManager) remove(connID int64) {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	delete(m.conns, connID)
}

// get looks up connID and reports whether an entry was found.
func (m *connectionManager) get(connID int64) (*conn, bool) {
	m.mutex.RLock()
	defer m.mutex.RUnlock()

	found, ok := m.conns[connID]
	return found, ok
}

// closeAll closes the read channel of every connection still registered.
// Entries are left in the map.
func (m *connectionManager) closeAll() {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	for id := range m.conns {
		close(m.conns[id].readCh)
	}
}
2020-04-14 07:04:33 +00:00
// grpcTunnel implements Tunnel
type grpcTunnel struct {
2022-11-01 09:43:55 +00:00
stream client . ProxyService_ProxyClient
clientConn clientConn
pendingDial pendingDialManager
conns connectionManager
2021-06-25 04:59:51 +00:00
// The tunnel will be closed if the caller fails to read via conn.Read()
// more than readTimeoutSeconds after a packet has been received.
readTimeoutSeconds int
2022-11-01 09:43:55 +00:00
// The done channel is closed after the tunnel has cleaned up all connections and is no longer
// serving.
done chan struct { }
// closing is an atomic bool represented as a 0 or 1, and set to true when the tunnel is being closed.
// closing should only be accessed through atomic methods.
// TODO: switch this to an atomic.Bool once the client is exclusively buit with go1.19+
closing uint32
2020-04-14 07:04:33 +00:00
}
2020-12-17 12:28:29 +00:00
// clientConn is the part of the gRPC client connection that the tunnel uses:
// only the ability to close it.
type clientConn interface {
	Close() error
}

// Compile-time check that *grpc.ClientConn satisfies clientConn.
var _ clientConn = (*grpc.ClientConn)(nil)
// CreateSingleUseGrpcTunnel creates a Tunnel to dial to a remote server through a
2020-04-14 07:04:33 +00:00
// gRPC based proxy service.
2020-12-17 12:28:29 +00:00
// Currently, a single tunnel supports a single connection, and the tunnel is closed when the connection is terminated
// The Dial() method of the returned tunnel should only be called once
2022-08-24 02:24:25 +00:00
// Deprecated 2022-06-07: use CreateSingleUseGrpcTunnelWithContext
func CreateSingleUseGrpcTunnel ( tunnelCtx context . Context , address string , opts ... grpc . DialOption ) ( Tunnel , error ) {
return CreateSingleUseGrpcTunnelWithContext ( context . TODO ( ) , tunnelCtx , address , opts ... )
}
// CreateSingleUseGrpcTunnelWithContext creates a Tunnel to dial to a remote server through a
// gRPC based proxy service.
// Currently, a single tunnel supports a single connection.
// The tunnel is normally closed when the connection is terminated.
// If createCtx is cancelled before tunnel creation, an error will be returned.
// If tunnelCtx is cancelled while the tunnel is still in use, the tunnel (and any in flight connections) will be closed.
// The Dial() method of the returned tunnel should only be called once
func CreateSingleUseGrpcTunnelWithContext ( createCtx , tunnelCtx context . Context , address string , opts ... grpc . DialOption ) ( Tunnel , error ) {
c , err := grpc . DialContext ( createCtx , address , opts ... )
2020-04-14 07:04:33 +00:00
if err != nil {
return nil , err
}
grpcClient := client . NewProxyServiceClient ( c )
2022-08-24 02:24:25 +00:00
stream , err := grpcClient . Proxy ( tunnelCtx )
2020-04-14 07:04:33 +00:00
if err != nil {
2022-03-21 20:11:55 +00:00
c . Close ( )
2020-04-14 07:04:33 +00:00
return nil , err
}
2022-11-01 09:43:55 +00:00
tunnel := newUnstartedTunnel ( stream , c )
2020-04-14 07:04:33 +00:00
2022-11-01 09:43:55 +00:00
go tunnel . serve ( tunnelCtx )
2020-04-14 07:04:33 +00:00
return tunnel , nil
}
2022-11-01 09:43:55 +00:00
// newUnstartedTunnel builds a grpcTunnel around stream and c with empty
// bookkeeping maps and a 10 second read timeout. It does not start the
// receive loop; the caller is expected to run serve.
func newUnstartedTunnel(stream client.ProxyService_ProxyClient, c clientConn) *grpcTunnel {
	t := &grpcTunnel{
		stream:             stream,
		clientConn:         c,
		readTimeoutSeconds: 10,
		done:               make(chan struct{}),
	}
	t.pendingDial.pendingDials = make(map[int64]pendingDial)
	t.conns.conns = make(map[int64]*conn)
	return t
}
func ( t * grpcTunnel ) serve ( tunnelCtx context . Context ) {
2022-08-24 02:24:25 +00:00
defer func ( ) {
2022-11-01 09:43:55 +00:00
t . clientConn . Close ( )
2022-08-24 02:24:25 +00:00
// A connection in t.conns after serve() returns means
// we never received a CLOSE_RSP for it, so we need to
// close any channels remaining for these connections.
2022-11-01 09:43:55 +00:00
t . conns . closeAll ( )
close ( t . done )
2022-08-24 02:24:25 +00:00
} ( )
2020-12-17 12:28:29 +00:00
2020-04-14 07:04:33 +00:00
for {
pkt , err := t . stream . Recv ( )
2022-11-01 09:43:55 +00:00
if err == io . EOF || t . isClosing ( ) {
2020-04-14 07:04:33 +00:00
return
}
if err != nil || pkt == nil {
2020-12-17 12:28:29 +00:00
klog . ErrorS ( err , "stream read failure" )
2020-04-14 07:04:33 +00:00
return
}
2020-12-17 12:28:29 +00:00
klog . V ( 5 ) . InfoS ( "[tracing] recv packet" , "type" , pkt . Type )
2020-04-14 07:04:33 +00:00
switch pkt . Type {
case client . PacketType_DIAL_RSP :
resp := pkt . GetDialResponse ( )
2022-11-01 09:43:55 +00:00
pendingDial , ok := t . pendingDial . get ( resp . Random )
2020-04-14 07:04:33 +00:00
if ! ok {
2022-11-01 09:43:55 +00:00
// If the DIAL_RSP does not match a pending dial, it means one of two things:
// 1. There was a second DIAL_RSP for the connection request (this is very unlikely but possible)
// 2. grpcTunnel.DialContext() returned early due to a dial timeout or the client canceling the context
//
// In either scenario, we should return here and close the tunnel as it is no longer needed.
2022-01-20 07:03:08 +00:00
klog . V ( 1 ) . InfoS ( "DialResp not recognized; dropped" , "connectionID" , resp . ConnectID , "dialID" , resp . Random )
return
2022-11-01 09:43:55 +00:00
}
result := dialResult { connid : resp . ConnectID }
if resp . Error != "" {
result . err = & dialFailure { resp . Error , DialFailureEndpoint }
}
select {
// try to send to the result channel
case pendingDial . resultCh <- result :
// unblock if the cancel channel is closed
case <- pendingDial . cancelCh :
// Note: this condition can only be hit by a race condition where the
// DialContext() returns early (timeout) after the pendingDial is already
// fetched here, but before the result is sent.
klog . V ( 1 ) . InfoS ( "Pending dial has been cancelled; dropped" , "connectionID" , resp . ConnectID , "dialID" , resp . Random )
return
case <- tunnelCtx . Done ( ) :
klog . V ( 1 ) . InfoS ( "Tunnel has been closed; dropped" , "connectionID" , resp . ConnectID , "dialID" , resp . Random )
return
}
if resp . Error != "" {
// On dial error, avoid leaking serve goroutine.
return
}
case client . PacketType_DIAL_CLS :
resp := pkt . GetCloseDial ( )
pendingDial , ok := t . pendingDial . get ( resp . Random )
if ! ok {
// If the DIAL_CLS does not match a pending dial, it means one of two things:
// 1. There was a DIAL_CLS receieved after a DIAL_RSP (unlikely but possible)
// 2. grpcTunnel.DialContext() returned early due to a dial timeout or the client canceling the context
//
// In either scenario, we should return here and close the tunnel as it is no longer needed.
klog . V ( 1 ) . InfoS ( "DIAL_CLS after dial finished" , "dialID" , resp . Random )
2020-04-14 07:04:33 +00:00
} else {
2021-06-25 04:59:51 +00:00
result := dialResult {
2022-11-01 09:43:55 +00:00
err : & dialFailure { "dial closed" , DialFailureDialClosed } ,
2020-04-14 07:04:33 +00:00
}
2022-01-20 07:03:08 +00:00
select {
2022-03-21 20:11:55 +00:00
case pendingDial . resultCh <- result :
case <- pendingDial . cancelCh :
2022-11-01 09:43:55 +00:00
// Note: this condition can only be hit by a race condition where the
// DialContext() returns early (timeout) after the pendingDial is already
// fetched here, but before the result is sent.
2022-08-24 02:24:25 +00:00
case <- tunnelCtx . Done ( ) :
2021-06-25 04:59:51 +00:00
}
2020-04-14 07:04:33 +00:00
}
2022-11-01 09:43:55 +00:00
return // Stop serving & close the tunnel.
2021-05-10 10:45:47 +00:00
2020-04-14 07:04:33 +00:00
case client . PacketType_DATA :
resp := pkt . GetData ( )
// TODO: flow control
2022-11-01 09:43:55 +00:00
conn , ok := t . conns . get ( resp . ConnectID )
2020-04-14 07:04:33 +00:00
2022-11-01 09:43:55 +00:00
if ! ok {
klog . V ( 1 ) . InfoS ( "Connection not recognized" , "connectionID" , resp . ConnectID )
continue
}
timer := time . NewTimer ( ( time . Duration ) ( t . readTimeoutSeconds ) * time . Second )
select {
case conn . readCh <- resp . Data :
timer . Stop ( )
case <- timer . C :
klog . ErrorS ( fmt . Errorf ( "timeout" ) , "readTimeout has been reached, the grpc connection to the proxy server will be closed" , "connectionID" , conn . connID , "readTimeoutSeconds" , t . readTimeoutSeconds )
return
case <- tunnelCtx . Done ( ) :
klog . V ( 1 ) . InfoS ( "Tunnel has been closed, the grpc connection to the proxy server will be closed" , "connectionID" , conn . connID )
2020-04-14 07:04:33 +00:00
}
2022-11-01 09:43:55 +00:00
2020-04-14 07:04:33 +00:00
case client . PacketType_CLOSE_RSP :
resp := pkt . GetCloseResponse ( )
2022-11-01 09:43:55 +00:00
conn , ok := t . conns . get ( resp . ConnectID )
if ! ok {
klog . V ( 1 ) . InfoS ( "Connection not recognized" , "connectionID" , resp . ConnectID )
continue
2020-04-14 07:04:33 +00:00
}
2022-11-01 09:43:55 +00:00
close ( conn . readCh )
conn . closeCh <- resp . Error
close ( conn . closeCh )
t . conns . remove ( resp . ConnectID )
return
2020-04-14 07:04:33 +00:00
}
}
}
// Dial connects to the address on the named network, similar to
// what net.Dial does. The only supported protocol is tcp.
2022-08-24 02:24:25 +00:00
func ( t * grpcTunnel ) DialContext ( requestCtx context . Context , protocol , address string ) ( net . Conn , error ) {
2022-11-01 09:43:55 +00:00
select {
case <- t . done :
return nil , errors . New ( "tunnel is closed" )
default : // Tunnel is open, carry on.
}
2020-04-14 07:04:33 +00:00
if protocol != "tcp" {
return nil , errors . New ( "protocol not supported" )
}
2021-06-25 04:59:51 +00:00
random := rand . Int63 ( ) /* #nosec G404 */
2022-03-21 20:11:55 +00:00
// This channel is closed once we're returning and no longer waiting on resultCh
cancelCh := make ( chan struct { } )
defer close ( cancelCh )
// This channel MUST NOT be buffered. The sender needs to know when we are not receiving things, so they can abort.
resCh := make ( chan dialResult )
2022-11-01 09:43:55 +00:00
t . pendingDial . add ( random , pendingDial { resultCh : resCh , cancelCh : cancelCh } )
defer t . pendingDial . remove ( random )
2020-04-14 07:04:33 +00:00
req := & client . Packet {
Type : client . PacketType_DIAL_REQ ,
Payload : & client . Packet_DialRequest {
DialRequest : & client . DialRequest {
Protocol : protocol ,
Address : address ,
Random : random ,
} ,
} ,
}
2020-12-17 12:28:29 +00:00
klog . V ( 5 ) . InfoS ( "[tracing] send packet" , "type" , req . Type )
2020-04-14 07:04:33 +00:00
err := t . stream . Send ( req )
if err != nil {
return nil , err
}
2020-12-17 12:28:29 +00:00
klog . V ( 5 ) . Infoln ( "DIAL_REQ sent to proxy server" )
2020-04-14 07:04:33 +00:00
2022-11-01 09:43:55 +00:00
c := & conn {
stream : t . stream ,
random : random ,
closeTunnel : t . closeTunnel ,
}
2020-04-14 07:04:33 +00:00
select {
case res := <- resCh :
2022-11-01 09:43:55 +00:00
if res . err != nil {
return nil , res . err
2020-04-14 07:04:33 +00:00
}
c . connID = res . connid
c . readCh = make ( chan [ ] byte , 10 )
2021-06-25 04:59:51 +00:00
c . closeCh = make ( chan string , 1 )
2022-11-01 09:43:55 +00:00
t . conns . add ( res . connid , c )
2020-04-14 07:04:33 +00:00
case <- time . After ( 30 * time . Second ) :
2022-03-21 20:11:55 +00:00
klog . V ( 5 ) . InfoS ( "Timed out waiting for DialResp" , "dialID" , random )
2022-11-01 09:43:55 +00:00
go t . closeDial ( random )
return nil , & dialFailure { "dial timeout, backstop" , DialFailureTimeout }
2022-08-24 02:24:25 +00:00
case <- requestCtx . Done ( ) :
klog . V ( 5 ) . InfoS ( "Context canceled waiting for DialResp" , "ctxErr" , requestCtx . Err ( ) , "dialID" , random )
2022-11-01 09:43:55 +00:00
go t . closeDial ( random )
return nil , & dialFailure { "dial timeout, context" , DialFailureContext }
case <- t . done :
klog . V ( 5 ) . InfoS ( "Tunnel closed while waiting for DialResp" , "dialID" , random )
return nil , & dialFailure { "tunnel closed" , DialFailureTunnelClosed }
2020-04-14 07:04:33 +00:00
}
return c , nil
}
2022-11-01 09:43:55 +00:00
// Done returns the tunnel's done channel, which is closed once the serve loop
// has finished cleaning up and the tunnel can no longer be used.
func (t *grpcTunnel) Done() <-chan struct{} {
	return t.done
}
// closeDial sends a best-effort DIAL_CLS request for the given dial ID and
// then closes the tunnel. A send failure is only logged.
func (t *grpcTunnel) closeDial(dialID int64) {
	closeReq := &client.Packet{
		Type:    client.PacketType_DIAL_CLS,
		Payload: &client.Packet_CloseDial{CloseDial: &client.CloseDial{Random: dialID}},
	}

	sendErr := t.stream.Send(closeReq)
	if sendErr != nil {
		klog.V(5).InfoS("Failed to send DIAL_CLS", "err", sendErr, "dialID", dialID)
	}

	t.closeTunnel()
}
// closeTunnel marks the tunnel as closing and closes the client connection.
// The flag is set first so that the serve loop, which checks isClosing after
// each Recv, sees it once the close has been issued.
func (t *grpcTunnel) closeTunnel() {
	atomic.StoreUint32(&t.closing, 1)

	t.clientConn.Close()
}
// isClosing reports whether closeTunnel has marked the tunnel as closing.
func (t *grpcTunnel) isClosing() bool {
	flag := atomic.LoadUint32(&t.closing)
	return flag != 0
}
// GetDialFailureReason reports whether err is, or wraps, a dial failure
// produced by this package and, if so, the reason attached to it. For any
// other error (including nil) it returns false and DialFailureUnknown.
func GetDialFailureReason(err error) (isDialFailure bool, reason DialFailureReason) {
	var failure *dialFailure
	if !errors.As(err, &failure) {
		return false, DialFailureUnknown
	}
	return true, failure.reason
}

// dialFailure is the error type returned for failed dials; it pairs a
// human-readable message with a machine-readable reason.
type dialFailure struct {
	msg    string
	reason DialFailureReason
}

// Error returns the failure message, satisfying the error interface.
func (df *dialFailure) Error() string {
	return df.msg
}

// DialFailureReason classifies why a dial through the tunnel failed.
type DialFailureReason string

const (
	// DialFailureUnknown is reported for errors that are not dial failures.
	DialFailureUnknown DialFailureReason = "unknown"
	// DialFailureTimeout indicates the hard 30 second timeout was hit.
	DialFailureTimeout DialFailureReason = "timeout"
	// DialFailureContext indicates that the context was cancelled or reached its deadline before
	// the dial response was returned.
	DialFailureContext DialFailureReason = "context"
	// DialFailureEndpoint indicates that the konnectivity-agent was unable to reach the backend endpoint.
	DialFailureEndpoint DialFailureReason = "endpoint"
	// DialFailureDialClosed indicates that the client received a CloseDial response, indicating the
	// connection was closed before the dial could complete.
	DialFailureDialClosed DialFailureReason = "dialclosed"
	// DialFailureTunnelClosed indicates that the client connection was closed before the dial could
	// complete.
	DialFailureTunnelClosed DialFailureReason = "tunnelclosed"
)