rebase: bump k8s.io/kubernetes from 1.23.4 to 1.23.5

Bumps [k8s.io/kubernetes](https://github.com/kubernetes/kubernetes) from 1.23.4 to 1.23.5.
- [Release notes](https://github.com/kubernetes/kubernetes/releases)
- [Commits](https://github.com/kubernetes/kubernetes/compare/v1.23.4...v1.23.5)

---
updated-dependencies:
- dependency-name: k8s.io/kubernetes
  dependency-type: direct:production
  update-type: version-update:semver-patch
...

Signed-off-by: dependabot[bot] <support@github.com>
This commit is contained in:
dependabot[bot]
2022-03-21 20:11:55 +00:00
committed by mergify[bot]
parent 8c5e414d53
commit fe4b5fe337
7 changed files with 145 additions and 86 deletions

View File

@ -43,10 +43,17 @@ type dialResult struct {
connid int64
}
type pendingDial struct {
// resultCh is the channel to send the dial result to
resultCh chan<- dialResult
// cancelCh is the channel closed when resultCh no longer has a receiver
cancelCh <-chan struct{}
}
// grpcTunnel implements Tunnel
type grpcTunnel struct {
stream client.ProxyService_ProxyClient
pendingDial map[int64]chan<- dialResult
pendingDial map[int64]pendingDial
conns map[int64]*conn
pendingDialLock sync.RWMutex
connsLock sync.RWMutex
@ -76,12 +83,13 @@ func CreateSingleUseGrpcTunnel(ctx context.Context, address string, opts ...grpc
stream, err := grpcClient.Proxy(ctx)
if err != nil {
c.Close()
return nil, err
}
tunnel := &grpcTunnel{
stream: stream,
pendingDial: make(map[int64]chan<- dialResult),
pendingDial: make(map[int64]pendingDial),
conns: make(map[int64]*conn),
readTimeoutSeconds: 10,
}
@ -110,7 +118,7 @@ func (t *grpcTunnel) serve(c clientConn) {
case client.PacketType_DIAL_RSP:
resp := pkt.GetDialResponse()
t.pendingDialLock.RLock()
ch, ok := t.pendingDial[resp.Random]
pendingDial, ok := t.pendingDial[resp.Random]
t.pendingDialLock.RUnlock()
if !ok {
@ -122,10 +130,16 @@ func (t *grpcTunnel) serve(c clientConn) {
connid: resp.ConnectID,
}
select {
case ch <- result:
default:
klog.ErrorS(fmt.Errorf("blocked pending channel"), "Received second dial response for connection request", "connectionID", resp.ConnectID, "dialID", resp.Random)
// On multiple dial responses, avoid leaking serve goroutine.
// try to send to the result channel
case pendingDial.resultCh <- result:
// unblock if the cancel channel is closed
case <-pendingDial.cancelCh:
// If there are no readers of the pending dial channel above, it means one of two things:
// 1. There was a second DIAL_RSP for the connection request (this is very unlikely but possible)
// 2. grpcTunnel.DialContext() returned early due to a dial timeout or the client canceling the context
//
// In either scenario, we should return here as this tunnel is no longer needed.
klog.V(1).InfoS("Pending dial has been cancelled; dropped", "connectionID", resp.ConnectID, "dialID", resp.Random)
return
}
}
@ -182,9 +196,16 @@ func (t *grpcTunnel) DialContext(ctx context.Context, protocol, address string)
}
random := rand.Int63() /* #nosec G404 */
resCh := make(chan dialResult, 1)
// This channel is closed once we're returning and no longer waiting on resultCh
cancelCh := make(chan struct{})
defer close(cancelCh)
// This channel MUST NOT be buffered. The sender needs to know when we are not receiving things, so they can abort.
resCh := make(chan dialResult)
t.pendingDialLock.Lock()
t.pendingDial[random] = resCh
t.pendingDial[random] = pendingDial{resultCh: resCh, cancelCh: cancelCh}
t.pendingDialLock.Unlock()
defer func() {
t.pendingDialLock.Lock()
@ -225,8 +246,10 @@ func (t *grpcTunnel) DialContext(ctx context.Context, protocol, address string)
t.conns[res.connid] = c
t.connsLock.Unlock()
case <-time.After(30 * time.Second):
klog.V(5).InfoS("Timed out waiting for DialResp", "dialID", random)
return nil, errors.New("dial timeout, backstop")
case <-ctx.Done():
klog.V(5).InfoS("Context canceled waiting for DialResp", "ctxErr", ctx.Err(), "dialID", random)
return nil, errors.New("dial timeout, context")
}