rebase: bump the golang-dependencies group with 1 update

Bumps the golang-dependencies group with 1 update: [golang.org/x/crypto](https://github.com/golang/crypto).


Updates `golang.org/x/crypto` from 0.16.0 to 0.17.0
- [Commits](https://github.com/golang/crypto/compare/v0.16.0...v0.17.0)

---
updated-dependencies:
- dependency-name: golang.org/x/crypto
  dependency-type: direct:production
  update-type: version-update:semver-minor
  dependency-group: golang-dependencies
...

Signed-off-by: dependabot[bot] <support@github.com>
dependabot[bot] authored on 2023-12-18 20:31:00 +00:00, committed by mergify[bot]
parent 1ad79314f9
commit e5d9b68d36
398 changed files with 33924 additions and 10753 deletions
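
The version bump itself lands in go.mod and go.sum, and the unusually large file count points to a refresh of the vendor/ tree in the same commit; the hunks excerpted below are from vendored Go sources (by their identifiers, the konnectivity-client tunnel code), not from golang.org/x/crypto itself. Locally, a bump like this can typically be reproduced with `go get golang.org/x/crypto@v0.17.0` followed by `go mod tidy` and, for a vendored layout, `go mod vendor`.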

View File

@@ -120,7 +120,7 @@ type grpcTunnel struct {
     stream client.ProxyService_ProxyClient
     sendLock sync.Mutex
     recvLock sync.Mutex
-    clientConn clientConn
+    grpcConn clientConn
     pendingDial pendingDialManager
     conns connectionManager
@@ -197,7 +197,7 @@ func CreateSingleUseGrpcTunnelWithContext(createCtx, tunnelCtx context.Context,
 func newUnstartedTunnel(stream client.ProxyService_ProxyClient, c clientConn) *grpcTunnel {
     t := grpcTunnel{
         stream: stream,
-        clientConn: c,
+        grpcConn: c,
         pendingDial: pendingDialManager{pendingDials: make(map[int64]pendingDial)},
         conns: connectionManager{conns: make(map[int64]*conn)},
         readTimeoutSeconds: 10,
@@ -238,7 +238,7 @@ func (t *grpcTunnel) closeMetric() {
 func (t *grpcTunnel) serve(tunnelCtx context.Context) {
     defer func() {
-        t.clientConn.Close()
+        t.grpcConn.Close()
         // A connection in t.conns after serve() returns means
         // we never received a CLOSE_RSP for it, so we need to
@@ -278,7 +278,7 @@ func (t *grpcTunnel) serve(tunnelCtx context.Context) {
             // 2. grpcTunnel.DialContext() returned early due to a dial timeout or the client canceling the context
             //
             // In either scenario, we should return here and close the tunnel as it is no longer needed.
-            kvs := []interface{}{"dialID", resp.Random, "connectID", resp.ConnectID}
+            kvs := []interface{}{"dialID", resp.Random, "connectionID", resp.ConnectID}
             if resp.Error != "" {
                 kvs = append(kvs, "error", resp.Error)
             }
@@ -349,14 +349,7 @@ func (t *grpcTunnel) serve(tunnelCtx context.Context) {
             if !ok {
                 klog.ErrorS(nil, "Connection not recognized", "connectionID", resp.ConnectID, "packetType", "DATA")
-                t.Send(&client.Packet{
-                    Type: client.PacketType_CLOSE_REQ,
-                    Payload: &client.Packet_CloseRequest{
-                        CloseRequest: &client.CloseRequest{
-                            ConnectID: resp.ConnectID,
-                        },
-                    },
-                })
+                t.sendCloseRequest(resp.ConnectID)
                 continue
             }
             timer := time.NewTimer((time.Duration)(t.readTimeoutSeconds) * time.Second)
@@ -448,9 +441,8 @@ func (t *grpcTunnel) dialContext(requestCtx context.Context, protocol, address s
     klog.V(5).Infoln("DIAL_REQ sent to proxy server")
     c := &conn{
-        tunnel: t,
-        random: random,
-        closeTunnel: t.closeTunnel,
+        tunnel: t,
+        random: random,
     }
 
     select {
@@ -464,11 +456,17 @@ func (t *grpcTunnel) dialContext(requestCtx context.Context, protocol, address s
         t.conns.add(res.connid, c)
     case <-time.After(30 * time.Second):
         klog.V(5).InfoS("Timed out waiting for DialResp", "dialID", random)
-        go t.closeDial(random)
+        go func() {
+            defer t.closeTunnel()
+            t.sendDialClose(random)
+        }()
         return nil, &dialFailure{"dial timeout, backstop", metrics.DialFailureTimeout}
     case <-requestCtx.Done():
         klog.V(5).InfoS("Context canceled waiting for DialResp", "ctxErr", requestCtx.Err(), "dialID", random)
-        go t.closeDial(random)
+        go func() {
+            defer t.closeTunnel()
+            t.sendDialClose(random)
+        }()
         return nil, &dialFailure{"dial timeout, context", metrics.DialFailureContext}
     case <-t.done:
         klog.V(5).InfoS("Tunnel closed while waiting for DialResp", "dialID", random)
@@ -483,7 +481,21 @@ func (t *grpcTunnel) Done() <-chan struct{} {
 }
 
-// Send a best-effort DIAL_CLS request for the given dial ID.
-func (t *grpcTunnel) closeDial(dialID int64) {
+func (t *grpcTunnel) sendCloseRequest(connID int64) error {
+    req := &client.Packet{
+        Type: client.PacketType_CLOSE_REQ,
+        Payload: &client.Packet_CloseRequest{
+            CloseRequest: &client.CloseRequest{
+                ConnectID: connID,
+            },
+        },
+    }
+    klog.V(5).InfoS("[tracing] send req", "type", req.Type)
+    return t.Send(req)
+}
+
+func (t *grpcTunnel) sendDialClose(dialID int64) error {
     req := &client.Packet{
         Type: client.PacketType_DIAL_CLS,
         Payload: &client.Packet_CloseDial{
@@ -492,15 +504,13 @@ func (t *grpcTunnel) closeDial(dialID int64) {
             },
         },
     }
-    if err := t.Send(req); err != nil {
-        klog.V(5).InfoS("Failed to send DIAL_CLS", "err", err, "dialID", dialID)
-    }
-    t.closeTunnel()
+    klog.V(5).InfoS("[tracing] send req", "type", req.Type)
+    return t.Send(req)
 }
 
 func (t *grpcTunnel) closeTunnel() {
     atomic.StoreUint32(&t.closing, 1)
-    t.clientConn.Close()
+    t.grpcConn.Close()
 }
 
 func (t *grpcTunnel) isClosing() bool {

View File

@@ -20,6 +20,7 @@ import (
     "errors"
     "io"
     "net"
+    "sync/atomic"
     "time"
 
     "k8s.io/klog/v2"
@@ -31,25 +32,31 @@ import (
 // successful delivery of CLOSE_REQ.
 const CloseTimeout = 10 * time.Second
 
+var errConnTunnelClosed = errors.New("tunnel closed")
+var errConnCloseTimeout = errors.New("close timeout")
+
 // conn is an implementation of net.Conn, where the data is transported
 // over an established tunnel defined by a gRPC service ProxyService.
 type conn struct {
-    tunnel *grpcTunnel
-    connID int64
-    random int64
-    readCh chan []byte
+    tunnel *grpcTunnel
+    // connID is set when a successful DIAL_RSP is received
+    connID int64
+    // random (dialID) is always initialized
+    random int64
+    readCh chan []byte
     // On receiving CLOSE_RSP, closeCh will be sent any error message and closed.
     closeCh chan string
     rdata []byte
 
-    // closeTunnel is an optional callback to close the underlying grpc connection.
-    closeTunnel func()
+    // closing is an atomic bool represented as a 0 or 1, and set to true when the connection is being closed.
+    // closing should only be accessed through atomic methods.
+    // TODO: switch this to an atomic.Bool once the client is exclusively buit with go1.19+
+    closing uint32
 }
 
 var _ net.Conn = &conn{}
 
-// Write sends the data thru the connection over proxy service
+// Write sends the data through the connection over proxy service
 func (c *conn) Write(data []byte) (n int, err error) {
     req := &client.Packet{
         Type: client.PacketType_DATA,
@@ -116,40 +123,23 @@ func (c *conn) SetWriteDeadline(t time.Time) error {
     return errors.New("not implemented")
 }
 
-// Close closes the connection. It also sends CLOSE_REQ packet over
-// proxy service to notify remote to drop the connection.
+// Close closes the connection, sends best-effort close signal to proxy
+// service, and frees resources.
 func (c *conn) Close() error {
-    klog.V(4).Infoln("closing connection")
-    if c.closeTunnel != nil {
-        defer c.closeTunnel()
+    old := atomic.SwapUint32(&c.closing, 1)
+    if old != 0 {
+        // prevent duplicate messages
+        return nil
     }
+    klog.V(4).Infoln("closing connection", "dialID", c.random, "connectionID", c.connID)
 
-    var req *client.Packet
+    defer c.tunnel.closeTunnel()
+
     if c.connID != 0 {
-        req = &client.Packet{
-            Type: client.PacketType_CLOSE_REQ,
-            Payload: &client.Packet_CloseRequest{
-                CloseRequest: &client.CloseRequest{
-                    ConnectID: c.connID,
-                },
-            },
-        }
+        c.tunnel.sendCloseRequest(c.connID)
     } else {
         // Never received a DIAL response so no connection ID.
-        req = &client.Packet{
-            Type: client.PacketType_DIAL_CLS,
-            Payload: &client.Packet_CloseDial{
-                CloseDial: &client.CloseDial{
-                    Random: c.random,
-                },
-            },
-        }
-    }
-
-    klog.V(5).InfoS("[tracing] send req", "type", req.Type)
-    if err := c.tunnel.Send(req); err != nil {
-        return err
+        c.tunnel.sendDialClose(c.random)
     }
 
     select {
@@ -158,6 +148,8 @@ func (c *conn) Close() error {
             return errors.New(errMsg)
         }
         return nil
+    case <-c.tunnel.Done():
+        return errConnTunnelClosed
     case <-time.After(CloseTimeout):
     }
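
Taken together, the conn.Close() changes in this vendored file follow a common Go shutdown pattern: an atomic swap makes Close idempotent, the close notification to the proxy is best-effort, and a select waits for the close acknowledgement, the tunnel shutting down, or a timeout. A minimal standalone sketch of that pattern, with illustrative names that are not part of the konnectivity-client API:

    package main

    import (
        "errors"
        "fmt"
        "sync/atomic"
        "time"
    )

    var errTunnelClosed = errors.New("tunnel closed")
    var errCloseTimeout = errors.New("close timeout")

    // fakeConn models only the shutdown behavior discussed above; it is not
    // the vendored conn type.
    type fakeConn struct {
        closing    uint32        // 0 or 1, accessed atomically
        closeCh    chan string   // receives the close acknowledgement (error message or "")
        tunnelDone chan struct{} // closed when the tunnel stops serving
    }

    func (c *fakeConn) Close() error {
        // First caller wins; duplicate Close calls become no-ops.
        if atomic.SwapUint32(&c.closing, 1) != 0 {
            return nil
        }

        // A best-effort close notification (CLOSE_REQ or DIAL_CLS) would be sent here.

        select {
        case errMsg := <-c.closeCh:
            if errMsg != "" {
                return errors.New(errMsg)
            }
            return nil
        case <-c.tunnelDone:
            return errTunnelClosed
        case <-time.After(10 * time.Second):
            return errCloseTimeout
        }
    }

    func main() {
        c := &fakeConn{closeCh: make(chan string, 1), tunnelDone: make(chan struct{})}
        c.closeCh <- ""        // simulate a clean close acknowledgement
        fmt.Println(c.Close()) // <nil>
        fmt.Println(c.Close()) // <nil>: duplicate close short-circuits
    }

The buffered closeCh in the sketch stands in for the CLOSE_RSP delivery; in the real client that message arrives from the tunnel's serve loop.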