rebase: update kubernetes to 1.26.1

update kubernetes and its dependencies
to v1.26.1

Signed-off-by: Madhu Rajanna <madhupr007@gmail.com>
This commit is contained in:
Madhu Rajanna
2023-02-01 18:06:36 +01:00
committed by mergify[bot]
parent e9e33fb851
commit 9c8de9471e
937 changed files with 75539 additions and 33050 deletions

View File

@ -29,6 +29,9 @@ import (
"google.golang.org/grpc"
"k8s.io/klog/v2"
"sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/client/metrics"
commonmetrics "sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/common/metrics"
"sigs.k8s.io/apiserver-network-proxy/konnectivity-client/proto/client"
)
@ -131,6 +134,9 @@ type grpcTunnel struct {
// closing should only be accessed through atomic methods.
// TODO: switch this to an atomic.Bool once the client is exclusively buit with go1.19+
closing uint32
// Stores the current metrics.ClientConnectionStatus
prevStatus atomic.Value
}
type clientConn interface {
@ -139,6 +145,11 @@ type clientConn interface {
var _ clientConn = &grpc.ClientConn{}
var (
// Expose metrics for client to register.
Metrics = metrics.Metrics
)
// CreateSingleUseGrpcTunnel creates a Tunnel to dial to a remote server through a
// gRPC based proxy service.
// Currently, a single tunnel supports a single connection, and the tunnel is closed when the connection is terminated
@ -177,7 +188,7 @@ func CreateSingleUseGrpcTunnelWithContext(createCtx, tunnelCtx context.Context,
}
func newUnstartedTunnel(stream client.ProxyService_ProxyClient, c clientConn) *grpcTunnel {
return &grpcTunnel{
t := grpcTunnel{
stream: stream,
clientConn: c,
pendingDial: pendingDialManager{pendingDials: make(map[int64]pendingDial)},
@ -185,6 +196,36 @@ func newUnstartedTunnel(stream client.ProxyService_ProxyClient, c clientConn) *g
readTimeoutSeconds: 10,
done: make(chan struct{}),
}
s := metrics.ClientConnectionStatusCreated
t.prevStatus.Store(s)
metrics.Metrics.GetClientConnectionsMetric().WithLabelValues(string(s)).Inc()
return &t
}
func (t *grpcTunnel) updateMetric(status metrics.ClientConnectionStatus) {
select {
case <-t.Done():
return
default:
}
prevStatus := t.prevStatus.Swap(status).(metrics.ClientConnectionStatus)
m := metrics.Metrics.GetClientConnectionsMetric()
m.WithLabelValues(string(prevStatus)).Dec()
m.WithLabelValues(string(status)).Inc()
}
// closeMetric should be called exactly once to finalize client_connections metric.
func (t *grpcTunnel) closeMetric() {
select {
case <-t.Done():
return
default:
}
prevStatus := t.prevStatus.Load().(metrics.ClientConnectionStatus)
metrics.Metrics.GetClientConnectionsMetric().WithLabelValues(string(prevStatus)).Dec()
}
func (t *grpcTunnel) serve(tunnelCtx context.Context) {
@ -196,19 +237,29 @@ func (t *grpcTunnel) serve(tunnelCtx context.Context) {
// close any channels remaining for these connections.
t.conns.closeAll()
t.closeMetric()
close(t.done)
}()
for {
pkt, err := t.stream.Recv()
if err == io.EOF || t.isClosing() {
if err == io.EOF {
return
}
const segment = commonmetrics.SegmentToClient
isClosing := t.isClosing()
if err != nil || pkt == nil {
klog.ErrorS(err, "stream read failure")
if !isClosing {
klog.ErrorS(err, "stream read failure")
}
metrics.Metrics.ObserveStreamErrorNoPacket(segment, err)
return
}
metrics.Metrics.ObservePacket(segment, pkt.Type)
if isClosing {
return
}
klog.V(5).InfoS("[tracing] recv packet", "type", pkt.Type)
switch pkt.Type {
@ -222,13 +273,19 @@ func (t *grpcTunnel) serve(tunnelCtx context.Context) {
// 2. grpcTunnel.DialContext() returned early due to a dial timeout or the client canceling the context
//
// In either scenario, we should return here and close the tunnel as it is no longer needed.
klog.V(1).InfoS("DialResp not recognized; dropped", "connectionID", resp.ConnectID, "dialID", resp.Random)
kvs := []interface{}{"dialID", resp.Random, "connectID", resp.ConnectID}
if resp.Error != "" {
kvs = append(kvs, "error", resp.Error)
}
klog.V(1).InfoS("DialResp not recognized; dropped", kvs...)
return
}
result := dialResult{connid: resp.ConnectID}
if resp.Error != "" {
result.err = &dialFailure{resp.Error, DialFailureEndpoint}
result.err = &dialFailure{resp.Error, metrics.DialFailureEndpoint}
} else {
t.updateMetric(metrics.ClientConnectionStatusOk)
}
select {
// try to send to the result channel
@ -263,7 +320,7 @@ func (t *grpcTunnel) serve(tunnelCtx context.Context) {
klog.V(1).InfoS("DIAL_CLS after dial finished", "dialID", resp.Random)
} else {
result := dialResult{
err: &dialFailure{"dial closed", DialFailureDialClosed},
err: &dialFailure{"dial closed", metrics.DialFailureDialClosed},
}
select {
case pendingDial.resultCh <- result:
@ -316,6 +373,15 @@ func (t *grpcTunnel) serve(tunnelCtx context.Context) {
// Dial connects to the address on the named network, similar to
// what net.Dial does. The only supported protocol is tcp.
func (t *grpcTunnel) DialContext(requestCtx context.Context, protocol, address string) (net.Conn, error) {
conn, err := t.dialContext(requestCtx, protocol, address)
if err != nil {
_, reason := GetDialFailureReason(err)
metrics.Metrics.ObserveDialFailure(reason)
}
return conn, err
}
func (t *grpcTunnel) dialContext(requestCtx context.Context, protocol, address string) (net.Conn, error) {
select {
case <-t.done:
return nil, errors.New("tunnel is closed")
@ -326,6 +392,8 @@ func (t *grpcTunnel) DialContext(requestCtx context.Context, protocol, address s
return nil, errors.New("protocol not supported")
}
t.updateMetric(metrics.ClientConnectionStatusDialing)
random := rand.Int63() /* #nosec G404 */
// This channel is closed once we're returning and no longer waiting on resultCh
@ -350,8 +418,11 @@ func (t *grpcTunnel) DialContext(requestCtx context.Context, protocol, address s
}
klog.V(5).InfoS("[tracing] send packet", "type", req.Type)
const segment = commonmetrics.SegmentFromClient
metrics.Metrics.ObservePacket(segment, req.Type)
err := t.stream.Send(req)
if err != nil {
metrics.Metrics.ObserveStreamError(segment, err, req.Type)
return nil, err
}
@ -375,14 +446,14 @@ func (t *grpcTunnel) DialContext(requestCtx context.Context, protocol, address s
case <-time.After(30 * time.Second):
klog.V(5).InfoS("Timed out waiting for DialResp", "dialID", random)
go t.closeDial(random)
return nil, &dialFailure{"dial timeout, backstop", DialFailureTimeout}
return nil, &dialFailure{"dial timeout, backstop", metrics.DialFailureTimeout}
case <-requestCtx.Done():
klog.V(5).InfoS("Context canceled waiting for DialResp", "ctxErr", requestCtx.Err(), "dialID", random)
go t.closeDial(random)
return nil, &dialFailure{"dial timeout, context", DialFailureContext}
return nil, &dialFailure{"dial timeout, context", metrics.DialFailureContext}
case <-t.done:
klog.V(5).InfoS("Tunnel closed while waiting for DialResp", "dialID", random)
return nil, &dialFailure{"tunnel closed", DialFailureTunnelClosed}
return nil, &dialFailure{"tunnel closed", metrics.DialFailureTunnelClosed}
}
return c, nil
@ -402,7 +473,10 @@ func (t *grpcTunnel) closeDial(dialID int64) {
},
},
}
const segment = commonmetrics.SegmentFromClient
metrics.Metrics.ObservePacket(segment, req.Type)
if err := t.stream.Send(req); err != nil {
metrics.Metrics.ObserveStreamError(segment, err, req.Type)
klog.V(5).InfoS("Failed to send DIAL_CLS", "err", err, "dialID", dialID)
}
t.closeTunnel()
@ -417,38 +491,19 @@ func (t *grpcTunnel) isClosing() bool {
return atomic.LoadUint32(&t.closing) != 0
}
func GetDialFailureReason(err error) (isDialFailure bool, reason DialFailureReason) {
func GetDialFailureReason(err error) (isDialFailure bool, reason metrics.DialFailureReason) {
var df *dialFailure
if errors.As(err, &df) {
return true, df.reason
}
return false, DialFailureUnknown
return false, metrics.DialFailureUnknown
}
type dialFailure struct {
msg string
reason DialFailureReason
reason metrics.DialFailureReason
}
func (df *dialFailure) Error() string {
return df.msg
}
type DialFailureReason string
const (
DialFailureUnknown DialFailureReason = "unknown"
// DialFailureTimeout indicates the hard 30 second timeout was hit.
DialFailureTimeout DialFailureReason = "timeout"
// DialFailureContext indicates that the context was cancelled or reached it's deadline before
// the dial response was returned.
DialFailureContext DialFailureReason = "context"
// DialFailureEndpoint indicates that the konnectivity-agent was unable to reach the backend endpoint.
DialFailureEndpoint DialFailureReason = "endpoint"
// DialFailureDialClosed indicates that the client received a CloseDial response, indicating the
// connection was closed before the dial could complete.
DialFailureDialClosed DialFailureReason = "dialclosed"
// DialFailureTunnelClosed indicates that the client connection was closed before the dial could
// complete.
DialFailureTunnelClosed DialFailureReason = "tunnelclosed"
)

View File

@ -23,6 +23,9 @@ import (
"time"
"k8s.io/klog/v2"
"sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/client/metrics"
commonmetrics "sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/common/metrics"
"sigs.k8s.io/apiserver-network-proxy/konnectivity-client/proto/client"
)
@ -62,8 +65,11 @@ func (c *conn) Write(data []byte) (n int, err error) {
klog.V(5).InfoS("[tracing] send req", "type", req.Type)
const segment = commonmetrics.SegmentFromClient
metrics.Metrics.ObservePacket(segment, req.Type)
err = c.stream.Send(req)
if err != nil {
metrics.Metrics.ObserveStreamError(segment, err, req.Type)
return 0, err
}
return len(data), err
@ -147,7 +153,10 @@ func (c *conn) Close() error {
klog.V(5).InfoS("[tracing] send req", "type", req.Type)
const segment = commonmetrics.SegmentFromClient
metrics.Metrics.ObservePacket(segment, req.Type)
if err := c.stream.Send(req); err != nil {
metrics.Metrics.ObserveStreamError(segment, err, req.Type)
return err
}

View File

@ -0,0 +1,162 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics
import (
"sync"
"github.com/prometheus/client_golang/prometheus"
commonmetrics "sigs.k8s.io/apiserver-network-proxy/konnectivity-client/pkg/common/metrics"
"sigs.k8s.io/apiserver-network-proxy/konnectivity-client/proto/client"
)
const (
Namespace = "konnectivity_network_proxy"
Subsystem = "client"
)
var (
// Metrics provides access to all client metrics. The client
// application is responsible for registering (via Metrics.RegisterMetrics).
Metrics = newMetrics()
)
// ClientMetrics includes all the metrics of the konnectivity-client.
type ClientMetrics struct {
registerOnce sync.Once
streamPackets *prometheus.CounterVec
streamErrors *prometheus.CounterVec
dialFailures *prometheus.CounterVec
clientConns *prometheus.GaugeVec
}
type DialFailureReason string
const (
DialFailureUnknown DialFailureReason = "unknown"
// DialFailureTimeout indicates the hard 30 second timeout was hit.
DialFailureTimeout DialFailureReason = "timeout"
// DialFailureContext indicates that the context was cancelled or reached it's deadline before
// the dial response was returned.
DialFailureContext DialFailureReason = "context"
// DialFailureEndpoint indicates that the konnectivity-agent was unable to reach the backend endpoint.
DialFailureEndpoint DialFailureReason = "endpoint"
// DialFailureDialClosed indicates that the client received a CloseDial response, indicating the
// connection was closed before the dial could complete.
DialFailureDialClosed DialFailureReason = "dialclosed"
// DialFailureTunnelClosed indicates that the client connection was closed before the dial could
// complete.
DialFailureTunnelClosed DialFailureReason = "tunnelclosed"
)
type ClientConnectionStatus string
const (
// The connection is created but has not yet been dialed.
ClientConnectionStatusCreated ClientConnectionStatus = "created"
// The connection is pending dial response.
ClientConnectionStatusDialing ClientConnectionStatus = "dialing"
// The connection is established.
ClientConnectionStatusOk ClientConnectionStatus = "ok"
// The connection is closing.
ClientConnectionStatusClosing ClientConnectionStatus = "closing"
)
func newMetrics() *ClientMetrics {
// The denominator (total dials started) for both
// dial_failure_total and dial_duration_seconds is the
// stream_packets_total (common metric), where segment is
// "from_client" and packet_type is "DIAL_REQ".
dialFailures := prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: Namespace,
Subsystem: Subsystem,
Name: "dial_failure_total",
Help: "Number of dial failures observed, by reason (example: remote endpoint error)",
},
[]string{
"reason",
},
)
clientConns := prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: Namespace,
Subsystem: Subsystem,
Name: "client_connections",
Help: "Number of open client connections, by status (Example: dialing)",
},
[]string{
"status",
},
)
return &ClientMetrics{
streamPackets: commonmetrics.MakeStreamPacketsTotalMetric(Namespace, Subsystem),
streamErrors: commonmetrics.MakeStreamErrorsTotalMetric(Namespace, Subsystem),
dialFailures: dialFailures,
clientConns: clientConns,
}
}
// RegisterMetrics registers all metrics with the client application.
func (c *ClientMetrics) RegisterMetrics(r prometheus.Registerer) {
c.registerOnce.Do(func() {
r.MustRegister(c.streamPackets)
r.MustRegister(c.streamErrors)
r.MustRegister(c.dialFailures)
r.MustRegister(c.clientConns)
})
}
// LegacyRegisterMetrics registers all metrics via MustRegister func.
// TODO: remove this once https://github.com/kubernetes/kubernetes/pull/114293 is available.
func (c *ClientMetrics) LegacyRegisterMetrics(mustRegisterFn func(...prometheus.Collector)) {
c.registerOnce.Do(func() {
mustRegisterFn(c.streamPackets)
mustRegisterFn(c.streamErrors)
mustRegisterFn(c.dialFailures)
mustRegisterFn(c.clientConns)
})
}
// Reset resets the metrics.
func (c *ClientMetrics) Reset() {
c.streamPackets.Reset()
c.streamErrors.Reset()
c.dialFailures.Reset()
c.clientConns.Reset()
}
func (c *ClientMetrics) ObserveDialFailure(reason DialFailureReason) {
c.dialFailures.WithLabelValues(string(reason)).Inc()
}
func (c *ClientMetrics) GetClientConnectionsMetric() *prometheus.GaugeVec {
return c.clientConns
}
func (c *ClientMetrics) ObservePacket(segment commonmetrics.Segment, packetType client.PacketType) {
commonmetrics.ObservePacket(c.streamPackets, segment, packetType)
}
func (c *ClientMetrics) ObserveStreamErrorNoPacket(segment commonmetrics.Segment, err error) {
commonmetrics.ObserveStreamErrorNoPacket(c.streamErrors, segment, err)
}
func (c *ClientMetrics) ObserveStreamError(segment commonmetrics.Segment, err error, packetType client.PacketType) {
commonmetrics.ObserveStreamError(c.streamErrors, segment, err, packetType)
}

View File

@ -0,0 +1,78 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package metrics provides metric definitions and helpers used
// across konnectivity client, server, and agent.
package metrics
import (
"github.com/prometheus/client_golang/prometheus"
"google.golang.org/grpc/status"
"sigs.k8s.io/apiserver-network-proxy/konnectivity-client/proto/client"
)
// Segment identifies one of four tunnel segments (e.g. from server to agent).
type Segment string
const (
// SegmentFromClient indicates a packet from client to server.
SegmentFromClient Segment = "from_client"
// SegmentToClient indicates a packet from server to client.
SegmentToClient Segment = "to_client"
// SegmentFromAgent indicates a packet from agent to server.
SegmentFromAgent Segment = "from_agent"
// SegmentToAgent indicates a packet from server to agent.
SegmentToAgent Segment = "to_agent"
)
func MakeStreamPacketsTotalMetric(namespace, subsystem string) *prometheus.CounterVec {
return prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "stream_packets_total",
Help: "Count of packets processed, by segment and packet type (example: from_client, DIAL_REQ)",
},
[]string{"segment", "packet_type"},
)
}
func MakeStreamErrorsTotalMetric(namespace, subsystem string) *prometheus.CounterVec {
return prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "stream_errors_total",
Help: "Count of gRPC stream errors, by segment, grpc Code, packet type. (example: from_agent, Code.Unavailable, DIAL_RSP)",
},
[]string{"segment", "code", "packet_type"},
)
}
func ObservePacket(m *prometheus.CounterVec, segment Segment, packetType client.PacketType) {
m.WithLabelValues(string(segment), packetType.String()).Inc()
}
func ObserveStreamErrorNoPacket(m *prometheus.CounterVec, segment Segment, err error) {
code := status.Code(err)
m.WithLabelValues(string(segment), code.String(), "Unknown").Inc()
}
func ObserveStreamError(m *prometheus.CounterVec, segment Segment, err error, packetType client.PacketType) {
code := status.Code(err)
m.WithLabelValues(string(segment), code.String(), packetType.String()).Inc()
}