mirror of
https://github.com/ceph/ceph-csi.git
synced 2025-06-13 18:43:34 +00:00
rebase: bump google.golang.org/grpc from 1.68.1 to 1.69.0
Bumps [google.golang.org/grpc](https://github.com/grpc/grpc-go) from 1.68.1 to 1.69.0. - [Release notes](https://github.com/grpc/grpc-go/releases) - [Commits](https://github.com/grpc/grpc-go/compare/v1.68.1...v1.69.0) --- updated-dependencies: - dependency-name: google.golang.org/grpc dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] <support@github.com>
This commit is contained in:
committed by
mergify[bot]
parent
afd950ebed
commit
431e9231d2
2
vendor/google.golang.org/grpc/internal/backoff/backoff.go
generated
vendored
2
vendor/google.golang.org/grpc/internal/backoff/backoff.go
generated
vendored
@ -25,7 +25,7 @@ package backoff
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"math/rand"
|
||||
rand "math/rand/v2"
|
||||
"time"
|
||||
|
||||
grpcbackoff "google.golang.org/grpc/backoff"
|
||||
|
22
vendor/google.golang.org/grpc/internal/internal.go
generated
vendored
22
vendor/google.golang.org/grpc/internal/internal.go
generated
vendored
@ -29,8 +29,6 @@ import (
|
||||
)
|
||||
|
||||
var (
|
||||
// WithHealthCheckFunc is set by dialoptions.go
|
||||
WithHealthCheckFunc any // func (HealthChecker) DialOption
|
||||
// HealthCheckFunc is used to provide client-side LB channel health checking
|
||||
HealthCheckFunc HealthChecker
|
||||
// BalancerUnregister is exported by package balancer to unregister a balancer.
|
||||
@ -149,6 +147,20 @@ var (
|
||||
// other features, including the CSDS service.
|
||||
NewXDSResolverWithConfigForTesting any // func([]byte) (resolver.Builder, error)
|
||||
|
||||
// NewXDSResolverWithClientForTesting creates a new xDS resolver builder
|
||||
// using the provided xDS client instead of creating a new one using the
|
||||
// bootstrap configuration specified by the supported environment variables.
|
||||
// The resolver.Builder is meant to be used in conjunction with the
|
||||
// grpc.WithResolvers DialOption. The resolver.Builder does not take
|
||||
// ownership of the provided xDS client and it is the responsibility of the
|
||||
// caller to close the client when no longer required.
|
||||
//
|
||||
// Testing Only
|
||||
//
|
||||
// This function should ONLY be used for testing and may not work with some
|
||||
// other features, including the CSDS service.
|
||||
NewXDSResolverWithClientForTesting any // func(xdsclient.XDSClient) (resolver.Builder, error)
|
||||
|
||||
// RegisterRLSClusterSpecifierPluginForTesting registers the RLS Cluster
|
||||
// Specifier Plugin for testing purposes, regardless of the XDSRLS environment
|
||||
// variable.
|
||||
@ -255,3 +267,9 @@ const (
|
||||
// It currently has an experimental suffix which would be removed once
|
||||
// end-to-end testing of the policy is completed.
|
||||
const RLSLoadBalancingPolicyName = "rls_experimental"
|
||||
|
||||
// EnforceSubConnEmbedding is used to enforce proper SubConn implementation
|
||||
// embedding.
|
||||
type EnforceSubConnEmbedding interface {
|
||||
enforceSubConnEmbedding()
|
||||
}
|
||||
|
41
vendor/google.golang.org/grpc/internal/resolver/dns/dns_resolver.go
generated
vendored
41
vendor/google.golang.org/grpc/internal/resolver/dns/dns_resolver.go
generated
vendored
@ -24,8 +24,9 @@ import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
rand "math/rand/v2"
|
||||
"net"
|
||||
"net/netip"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
@ -122,7 +123,7 @@ func (b *dnsBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts
|
||||
}
|
||||
|
||||
// IP address.
|
||||
if ipAddr, ok := formatIP(host); ok {
|
||||
if ipAddr, err := formatIP(host); err == nil {
|
||||
addr := []resolver.Address{{Addr: ipAddr + ":" + port}}
|
||||
cc.UpdateState(resolver.State{Addresses: addr})
|
||||
return deadResolver{}, nil
|
||||
@ -260,9 +261,9 @@ func (d *dnsResolver) lookupSRV(ctx context.Context) ([]resolver.Address, error)
|
||||
return nil, err
|
||||
}
|
||||
for _, a := range lbAddrs {
|
||||
ip, ok := formatIP(a)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("dns: error parsing A record IP address %v", a)
|
||||
ip, err := formatIP(a)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("dns: error parsing A record IP address %v: %v", a, err)
|
||||
}
|
||||
addr := ip + ":" + strconv.Itoa(int(s.Port))
|
||||
newAddrs = append(newAddrs, resolver.Address{Addr: addr, ServerName: s.Target})
|
||||
@ -322,9 +323,9 @@ func (d *dnsResolver) lookupHost(ctx context.Context) ([]resolver.Address, error
|
||||
}
|
||||
newAddrs := make([]resolver.Address, 0, len(addrs))
|
||||
for _, a := range addrs {
|
||||
ip, ok := formatIP(a)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("dns: error parsing A record IP address %v", a)
|
||||
ip, err := formatIP(a)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("dns: error parsing A record IP address %v: %v", a, err)
|
||||
}
|
||||
addr := ip + ":" + d.port
|
||||
newAddrs = append(newAddrs, resolver.Address{Addr: addr})
|
||||
@ -351,19 +352,19 @@ func (d *dnsResolver) lookup() (*resolver.State, error) {
|
||||
return &state, nil
|
||||
}
|
||||
|
||||
// formatIP returns ok = false if addr is not a valid textual representation of
|
||||
// an IP address. If addr is an IPv4 address, return the addr and ok = true.
|
||||
// formatIP returns an error if addr is not a valid textual representation of
|
||||
// an IP address. If addr is an IPv4 address, return the addr and error = nil.
|
||||
// If addr is an IPv6 address, return the addr enclosed in square brackets and
|
||||
// ok = true.
|
||||
func formatIP(addr string) (addrIP string, ok bool) {
|
||||
ip := net.ParseIP(addr)
|
||||
if ip == nil {
|
||||
return "", false
|
||||
// error = nil.
|
||||
func formatIP(addr string) (string, error) {
|
||||
ip, err := netip.ParseAddr(addr)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
if ip.To4() != nil {
|
||||
return addr, true
|
||||
if ip.Is4() {
|
||||
return addr, nil
|
||||
}
|
||||
return "[" + addr + "]", true
|
||||
return "[" + addr + "]", nil
|
||||
}
|
||||
|
||||
// parseTarget takes the user input target string and default port, returns
|
||||
@ -379,7 +380,7 @@ func parseTarget(target, defaultPort string) (host, port string, err error) {
|
||||
if target == "" {
|
||||
return "", "", internal.ErrMissingAddr
|
||||
}
|
||||
if ip := net.ParseIP(target); ip != nil {
|
||||
if _, err := netip.ParseAddr(target); err == nil {
|
||||
// target is an IPv4 or IPv6(without brackets) address
|
||||
return target, defaultPort, nil
|
||||
}
|
||||
@ -427,7 +428,7 @@ func chosenByPercentage(a *int) bool {
|
||||
if a == nil {
|
||||
return true
|
||||
}
|
||||
return rand.Intn(100)+1 <= *a
|
||||
return rand.IntN(100)+1 <= *a
|
||||
}
|
||||
|
||||
func canaryingSC(js string) string {
|
||||
|
144
vendor/google.golang.org/grpc/internal/transport/client_stream.go
generated
vendored
Normal file
144
vendor/google.golang.org/grpc/internal/transport/client_stream.go
generated
vendored
Normal file
@ -0,0 +1,144 @@
|
||||
/*
|
||||
*
|
||||
* Copyright 2024 gRPC authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*/
|
||||
|
||||
package transport
|
||||
|
||||
import (
|
||||
"sync/atomic"
|
||||
|
||||
"golang.org/x/net/http2"
|
||||
"google.golang.org/grpc/mem"
|
||||
"google.golang.org/grpc/metadata"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
// ClientStream implements streaming functionality for a gRPC client.
|
||||
type ClientStream struct {
|
||||
*Stream // Embed for common stream functionality.
|
||||
|
||||
ct *http2Client
|
||||
done chan struct{} // closed at the end of stream to unblock writers.
|
||||
doneFunc func() // invoked at the end of stream.
|
||||
|
||||
headerChan chan struct{} // closed to indicate the end of header metadata.
|
||||
headerChanClosed uint32 // set when headerChan is closed. Used to avoid closing headerChan multiple times.
|
||||
// headerValid indicates whether a valid header was received. Only
|
||||
// meaningful after headerChan is closed (always call waitOnHeader() before
|
||||
// reading its value).
|
||||
headerValid bool
|
||||
header metadata.MD // the received header metadata
|
||||
noHeaders bool // set if the client never received headers (set only after the stream is done).
|
||||
|
||||
bytesReceived atomic.Bool // indicates whether any bytes have been received on this stream
|
||||
unprocessed atomic.Bool // set if the server sends a refused stream or GOAWAY including this stream
|
||||
|
||||
status *status.Status // the status error received from the server
|
||||
}
|
||||
|
||||
// Read reads an n byte message from the input stream.
|
||||
func (s *ClientStream) Read(n int) (mem.BufferSlice, error) {
|
||||
b, err := s.Stream.read(n)
|
||||
if err == nil {
|
||||
s.ct.incrMsgRecv()
|
||||
}
|
||||
return b, err
|
||||
}
|
||||
|
||||
// Close closes the stream and popagates err to any readers.
|
||||
func (s *ClientStream) Close(err error) {
|
||||
var (
|
||||
rst bool
|
||||
rstCode http2.ErrCode
|
||||
)
|
||||
if err != nil {
|
||||
rst = true
|
||||
rstCode = http2.ErrCodeCancel
|
||||
}
|
||||
s.ct.closeStream(s, err, rst, rstCode, status.Convert(err), nil, false)
|
||||
}
|
||||
|
||||
// Write writes the hdr and data bytes to the output stream.
|
||||
func (s *ClientStream) Write(hdr []byte, data mem.BufferSlice, opts *WriteOptions) error {
|
||||
return s.ct.write(s, hdr, data, opts)
|
||||
}
|
||||
|
||||
// BytesReceived indicates whether any bytes have been received on this stream.
|
||||
func (s *ClientStream) BytesReceived() bool {
|
||||
return s.bytesReceived.Load()
|
||||
}
|
||||
|
||||
// Unprocessed indicates whether the server did not process this stream --
|
||||
// i.e. it sent a refused stream or GOAWAY including this stream ID.
|
||||
func (s *ClientStream) Unprocessed() bool {
|
||||
return s.unprocessed.Load()
|
||||
}
|
||||
|
||||
func (s *ClientStream) waitOnHeader() {
|
||||
select {
|
||||
case <-s.ctx.Done():
|
||||
// Close the stream to prevent headers/trailers from changing after
|
||||
// this function returns.
|
||||
s.Close(ContextErr(s.ctx.Err()))
|
||||
// headerChan could possibly not be closed yet if closeStream raced
|
||||
// with operateHeaders; wait until it is closed explicitly here.
|
||||
<-s.headerChan
|
||||
case <-s.headerChan:
|
||||
}
|
||||
}
|
||||
|
||||
// RecvCompress returns the compression algorithm applied to the inbound
|
||||
// message. It is empty string if there is no compression applied.
|
||||
func (s *ClientStream) RecvCompress() string {
|
||||
s.waitOnHeader()
|
||||
return s.recvCompress
|
||||
}
|
||||
|
||||
// Done returns a channel which is closed when it receives the final status
|
||||
// from the server.
|
||||
func (s *ClientStream) Done() <-chan struct{} {
|
||||
return s.done
|
||||
}
|
||||
|
||||
// Header returns the header metadata of the stream. Acquires the key-value
|
||||
// pairs of header metadata once it is available. It blocks until i) the
|
||||
// metadata is ready or ii) there is no header metadata or iii) the stream is
|
||||
// canceled/expired.
|
||||
func (s *ClientStream) Header() (metadata.MD, error) {
|
||||
s.waitOnHeader()
|
||||
|
||||
if !s.headerValid || s.noHeaders {
|
||||
return nil, s.status.Err()
|
||||
}
|
||||
|
||||
return s.header.Copy(), nil
|
||||
}
|
||||
|
||||
// TrailersOnly blocks until a header or trailers-only frame is received and
|
||||
// then returns true if the stream was trailers-only. If the stream ends
|
||||
// before headers are received, returns true, nil.
|
||||
func (s *ClientStream) TrailersOnly() bool {
|
||||
s.waitOnHeader()
|
||||
return s.noHeaders
|
||||
}
|
||||
|
||||
// Status returns the status received from the server.
|
||||
// Status can be read safely only after the stream has ended,
|
||||
// that is, after Done() is closed.
|
||||
func (s *ClientStream) Status() *status.Status {
|
||||
return s.status
|
||||
}
|
9
vendor/google.golang.org/grpc/internal/transport/flowcontrol.go
generated
vendored
9
vendor/google.golang.org/grpc/internal/transport/flowcontrol.go
generated
vendored
@ -92,14 +92,11 @@ func (f *trInFlow) newLimit(n uint32) uint32 {
|
||||
|
||||
func (f *trInFlow) onData(n uint32) uint32 {
|
||||
f.unacked += n
|
||||
if f.unacked >= f.limit/4 {
|
||||
w := f.unacked
|
||||
f.unacked = 0
|
||||
if f.unacked < f.limit/4 {
|
||||
f.updateEffectiveWindowSize()
|
||||
return w
|
||||
return 0
|
||||
}
|
||||
f.updateEffectiveWindowSize()
|
||||
return 0
|
||||
return f.reset()
|
||||
}
|
||||
|
||||
func (f *trInFlow) reset() uint32 {
|
||||
|
36
vendor/google.golang.org/grpc/internal/transport/handler_server.go
generated
vendored
36
vendor/google.golang.org/grpc/internal/transport/handler_server.go
generated
vendored
@ -225,7 +225,7 @@ func (ht *serverHandlerTransport) do(fn func()) error {
|
||||
}
|
||||
}
|
||||
|
||||
func (ht *serverHandlerTransport) WriteStatus(s *Stream, st *status.Status) error {
|
||||
func (ht *serverHandlerTransport) writeStatus(s *ServerStream, st *status.Status) error {
|
||||
ht.writeStatusMu.Lock()
|
||||
defer ht.writeStatusMu.Unlock()
|
||||
|
||||
@ -289,14 +289,14 @@ func (ht *serverHandlerTransport) WriteStatus(s *Stream, st *status.Status) erro
|
||||
|
||||
// writePendingHeaders sets common and custom headers on the first
|
||||
// write call (Write, WriteHeader, or WriteStatus)
|
||||
func (ht *serverHandlerTransport) writePendingHeaders(s *Stream) {
|
||||
func (ht *serverHandlerTransport) writePendingHeaders(s *ServerStream) {
|
||||
ht.writeCommonHeaders(s)
|
||||
ht.writeCustomHeaders(s)
|
||||
}
|
||||
|
||||
// writeCommonHeaders sets common headers on the first write
|
||||
// call (Write, WriteHeader, or WriteStatus).
|
||||
func (ht *serverHandlerTransport) writeCommonHeaders(s *Stream) {
|
||||
func (ht *serverHandlerTransport) writeCommonHeaders(s *ServerStream) {
|
||||
h := ht.rw.Header()
|
||||
h["Date"] = nil // suppress Date to make tests happy; TODO: restore
|
||||
h.Set("Content-Type", ht.contentType)
|
||||
@ -317,7 +317,7 @@ func (ht *serverHandlerTransport) writeCommonHeaders(s *Stream) {
|
||||
|
||||
// writeCustomHeaders sets custom headers set on the stream via SetHeader
|
||||
// on the first write call (Write, WriteHeader, or WriteStatus)
|
||||
func (ht *serverHandlerTransport) writeCustomHeaders(s *Stream) {
|
||||
func (ht *serverHandlerTransport) writeCustomHeaders(s *ServerStream) {
|
||||
h := ht.rw.Header()
|
||||
|
||||
s.hdrMu.Lock()
|
||||
@ -333,7 +333,7 @@ func (ht *serverHandlerTransport) writeCustomHeaders(s *Stream) {
|
||||
s.hdrMu.Unlock()
|
||||
}
|
||||
|
||||
func (ht *serverHandlerTransport) Write(s *Stream, hdr []byte, data mem.BufferSlice, _ *Options) error {
|
||||
func (ht *serverHandlerTransport) write(s *ServerStream, hdr []byte, data mem.BufferSlice, _ *WriteOptions) error {
|
||||
// Always take a reference because otherwise there is no guarantee the data will
|
||||
// be available after this function returns. This is what callers to Write
|
||||
// expect.
|
||||
@ -357,7 +357,7 @@ func (ht *serverHandlerTransport) Write(s *Stream, hdr []byte, data mem.BufferSl
|
||||
return nil
|
||||
}
|
||||
|
||||
func (ht *serverHandlerTransport) WriteHeader(s *Stream, md metadata.MD) error {
|
||||
func (ht *serverHandlerTransport) writeHeader(s *ServerStream, md metadata.MD) error {
|
||||
if err := s.SetHeader(md); err != nil {
|
||||
return err
|
||||
}
|
||||
@ -385,7 +385,7 @@ func (ht *serverHandlerTransport) WriteHeader(s *Stream, md metadata.MD) error {
|
||||
return err
|
||||
}
|
||||
|
||||
func (ht *serverHandlerTransport) HandleStreams(ctx context.Context, startStream func(*Stream)) {
|
||||
func (ht *serverHandlerTransport) HandleStreams(ctx context.Context, startStream func(*ServerStream)) {
|
||||
// With this transport type there will be exactly 1 stream: this HTTP request.
|
||||
var cancel context.CancelFunc
|
||||
if ht.timeoutSet {
|
||||
@ -408,16 +408,18 @@ func (ht *serverHandlerTransport) HandleStreams(ctx context.Context, startStream
|
||||
|
||||
ctx = metadata.NewIncomingContext(ctx, ht.headerMD)
|
||||
req := ht.req
|
||||
s := &Stream{
|
||||
id: 0, // irrelevant
|
||||
ctx: ctx,
|
||||
requestRead: func(int) {},
|
||||
s := &ServerStream{
|
||||
Stream: &Stream{
|
||||
id: 0, // irrelevant
|
||||
ctx: ctx,
|
||||
requestRead: func(int) {},
|
||||
buf: newRecvBuffer(),
|
||||
method: req.URL.Path,
|
||||
recvCompress: req.Header.Get("grpc-encoding"),
|
||||
contentSubtype: ht.contentSubtype,
|
||||
},
|
||||
cancel: cancel,
|
||||
buf: newRecvBuffer(),
|
||||
st: ht,
|
||||
method: req.URL.Path,
|
||||
recvCompress: req.Header.Get("grpc-encoding"),
|
||||
contentSubtype: ht.contentSubtype,
|
||||
headerWireLength: 0, // won't have access to header wire length until golang/go#18997.
|
||||
}
|
||||
s.trReader = &transportReader{
|
||||
@ -471,9 +473,7 @@ func (ht *serverHandlerTransport) runStream() {
|
||||
}
|
||||
}
|
||||
|
||||
func (ht *serverHandlerTransport) IncrMsgSent() {}
|
||||
|
||||
func (ht *serverHandlerTransport) IncrMsgRecv() {}
|
||||
func (ht *serverHandlerTransport) incrMsgRecv() {}
|
||||
|
||||
func (ht *serverHandlerTransport) Drain(string) {
|
||||
panic("Drain() is not implemented")
|
||||
|
91
vendor/google.golang.org/grpc/internal/transport/http2_client.go
generated
vendored
91
vendor/google.golang.org/grpc/internal/transport/http2_client.go
generated
vendored
@ -123,7 +123,7 @@ type http2Client struct {
|
||||
mu sync.Mutex // guard the following variables
|
||||
nextID uint32
|
||||
state transportState
|
||||
activeStreams map[uint32]*Stream
|
||||
activeStreams map[uint32]*ClientStream
|
||||
// prevGoAway ID records the Last-Stream-ID in the previous GOAway frame.
|
||||
prevGoAwayID uint32
|
||||
// goAwayReason records the http2.ErrCode and debug data received with the
|
||||
@ -199,10 +199,10 @@ func isTemporary(err error) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
// newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2
|
||||
// NewHTTP2Client constructs a connected ClientTransport to addr based on HTTP2
|
||||
// and starts to receive messages on it. Non-nil error returns if construction
|
||||
// fails.
|
||||
func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onClose func(GoAwayReason)) (_ *http2Client, err error) {
|
||||
func NewHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onClose func(GoAwayReason)) (_ ClientTransport, err error) {
|
||||
scheme := "http"
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer func() {
|
||||
@ -339,7 +339,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
|
||||
framer: newFramer(conn, writeBufSize, readBufSize, opts.SharedWriteBuffer, maxHeaderListSize),
|
||||
fc: &trInFlow{limit: uint32(icwz)},
|
||||
scheme: scheme,
|
||||
activeStreams: make(map[uint32]*Stream),
|
||||
activeStreams: make(map[uint32]*ClientStream),
|
||||
isSecure: isSecure,
|
||||
perRPCCreds: perRPCCreds,
|
||||
kp: kp,
|
||||
@ -480,17 +480,19 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
|
||||
return t, nil
|
||||
}
|
||||
|
||||
func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
|
||||
func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *ClientStream {
|
||||
// TODO(zhaoq): Handle uint32 overflow of Stream.id.
|
||||
s := &Stream{
|
||||
ct: t,
|
||||
done: make(chan struct{}),
|
||||
method: callHdr.Method,
|
||||
sendCompress: callHdr.SendCompress,
|
||||
buf: newRecvBuffer(),
|
||||
headerChan: make(chan struct{}),
|
||||
contentSubtype: callHdr.ContentSubtype,
|
||||
doneFunc: callHdr.DoneFunc,
|
||||
s := &ClientStream{
|
||||
Stream: &Stream{
|
||||
method: callHdr.Method,
|
||||
sendCompress: callHdr.SendCompress,
|
||||
buf: newRecvBuffer(),
|
||||
contentSubtype: callHdr.ContentSubtype,
|
||||
},
|
||||
ct: t,
|
||||
done: make(chan struct{}),
|
||||
headerChan: make(chan struct{}),
|
||||
doneFunc: callHdr.DoneFunc,
|
||||
}
|
||||
s.wq = newWriteQuota(defaultWriteQuota, s.done)
|
||||
s.requestRead = func(n int) {
|
||||
@ -506,7 +508,7 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
|
||||
ctxDone: s.ctx.Done(),
|
||||
recv: s.buf,
|
||||
closeStream: func(err error) {
|
||||
t.CloseStream(s, err)
|
||||
s.Close(err)
|
||||
},
|
||||
},
|
||||
windowHandler: func(n int) {
|
||||
@ -597,12 +599,6 @@ func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr)
|
||||
for k, v := range callAuthData {
|
||||
headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
|
||||
}
|
||||
if b := stats.OutgoingTags(ctx); b != nil {
|
||||
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-tags-bin", Value: encodeBinHeader(b)})
|
||||
}
|
||||
if b := stats.OutgoingTrace(ctx); b != nil {
|
||||
headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-trace-bin", Value: encodeBinHeader(b)})
|
||||
}
|
||||
|
||||
if md, added, ok := metadataFromOutgoingContextRaw(ctx); ok {
|
||||
var k string
|
||||
@ -738,7 +734,7 @@ func (e NewStreamError) Error() string {
|
||||
|
||||
// NewStream creates a stream and registers it into the transport as "active"
|
||||
// streams. All non-nil errors returned will be *NewStreamError.
|
||||
func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, error) {
|
||||
func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*ClientStream, error) {
|
||||
ctx = peer.NewContext(ctx, t.getPeer())
|
||||
|
||||
// ServerName field of the resolver returned address takes precedence over
|
||||
@ -763,7 +759,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,
|
||||
return
|
||||
}
|
||||
// The stream was unprocessed by the server.
|
||||
atomic.StoreUint32(&s.unprocessed, 1)
|
||||
s.unprocessed.Store(true)
|
||||
s.write(recvMsg{err: err})
|
||||
close(s.done)
|
||||
// If headerChan isn't closed, then close it.
|
||||
@ -908,21 +904,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream,
|
||||
return s, nil
|
||||
}
|
||||
|
||||
// CloseStream clears the footprint of a stream when the stream is not needed any more.
|
||||
// This must not be executed in reader's goroutine.
|
||||
func (t *http2Client) CloseStream(s *Stream, err error) {
|
||||
var (
|
||||
rst bool
|
||||
rstCode http2.ErrCode
|
||||
)
|
||||
if err != nil {
|
||||
rst = true
|
||||
rstCode = http2.ErrCodeCancel
|
||||
}
|
||||
t.closeStream(s, err, rst, rstCode, status.Convert(err), nil, false)
|
||||
}
|
||||
|
||||
func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.ErrCode, st *status.Status, mdata map[string][]string, eosReceived bool) {
|
||||
func (t *http2Client) closeStream(s *ClientStream, err error, rst bool, rstCode http2.ErrCode, st *status.Status, mdata map[string][]string, eosReceived bool) {
|
||||
// Set stream status to done.
|
||||
if s.swapState(streamDone) == streamDone {
|
||||
// If it was already done, return. If multiple closeStream calls
|
||||
@ -1085,7 +1067,7 @@ func (t *http2Client) GracefulClose() {
|
||||
|
||||
// Write formats the data into HTTP2 data frame(s) and sends it out. The caller
|
||||
// should proceed only if Write returns nil.
|
||||
func (t *http2Client) Write(s *Stream, hdr []byte, data mem.BufferSlice, opts *Options) error {
|
||||
func (t *http2Client) write(s *ClientStream, hdr []byte, data mem.BufferSlice, opts *WriteOptions) error {
|
||||
reader := data.Reader()
|
||||
|
||||
if opts.Last {
|
||||
@ -1114,10 +1096,11 @@ func (t *http2Client) Write(s *Stream, hdr []byte, data mem.BufferSlice, opts *O
|
||||
_ = reader.Close()
|
||||
return err
|
||||
}
|
||||
t.incrMsgSent()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *http2Client) getStream(f http2.Frame) *Stream {
|
||||
func (t *http2Client) getStream(f http2.Frame) *ClientStream {
|
||||
t.mu.Lock()
|
||||
s := t.activeStreams[f.Header().StreamID]
|
||||
t.mu.Unlock()
|
||||
@ -1127,7 +1110,7 @@ func (t *http2Client) getStream(f http2.Frame) *Stream {
|
||||
// adjustWindow sends out extra window update over the initial window size
|
||||
// of stream if the application is requesting data larger in size than
|
||||
// the window.
|
||||
func (t *http2Client) adjustWindow(s *Stream, n uint32) {
|
||||
func (t *http2Client) adjustWindow(s *ClientStream, n uint32) {
|
||||
if w := s.fc.maybeAdjust(n); w > 0 {
|
||||
t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
|
||||
}
|
||||
@ -1136,7 +1119,7 @@ func (t *http2Client) adjustWindow(s *Stream, n uint32) {
|
||||
// updateWindow adjusts the inbound quota for the stream.
|
||||
// Window updates will be sent out when the cumulative quota
|
||||
// exceeds the corresponding threshold.
|
||||
func (t *http2Client) updateWindow(s *Stream, n uint32) {
|
||||
func (t *http2Client) updateWindow(s *ClientStream, n uint32) {
|
||||
if w := s.fc.onRead(n); w > 0 {
|
||||
t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
|
||||
}
|
||||
@ -1242,7 +1225,7 @@ func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) {
|
||||
}
|
||||
if f.ErrCode == http2.ErrCodeRefusedStream {
|
||||
// The stream was unprocessed by the server.
|
||||
atomic.StoreUint32(&s.unprocessed, 1)
|
||||
s.unprocessed.Store(true)
|
||||
}
|
||||
statusCode, ok := http2ErrConvTab[f.ErrCode]
|
||||
if !ok {
|
||||
@ -1383,11 +1366,11 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) error {
|
||||
return connectionErrorf(true, nil, "received goaway and there are no active streams")
|
||||
}
|
||||
|
||||
streamsToClose := make([]*Stream, 0)
|
||||
streamsToClose := make([]*ClientStream, 0)
|
||||
for streamID, stream := range t.activeStreams {
|
||||
if streamID > id && streamID <= upperLimit {
|
||||
// The stream was unprocessed by the server.
|
||||
atomic.StoreUint32(&stream.unprocessed, 1)
|
||||
stream.unprocessed.Store(true)
|
||||
streamsToClose = append(streamsToClose, stream)
|
||||
}
|
||||
}
|
||||
@ -1439,7 +1422,7 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
|
||||
return
|
||||
}
|
||||
endStream := frame.StreamEnded()
|
||||
atomic.StoreUint32(&s.bytesReceived, 1)
|
||||
s.bytesReceived.Store(true)
|
||||
initialHeader := atomic.LoadUint32(&s.headerChanClosed) == 0
|
||||
|
||||
if !initialHeader && !endStream {
|
||||
@ -1809,14 +1792,18 @@ func (t *http2Client) socketMetrics() *channelz.EphemeralSocketMetrics {
|
||||
|
||||
func (t *http2Client) RemoteAddr() net.Addr { return t.remoteAddr }
|
||||
|
||||
func (t *http2Client) IncrMsgSent() {
|
||||
t.channelz.SocketMetrics.MessagesSent.Add(1)
|
||||
t.channelz.SocketMetrics.LastMessageSentTimestamp.Store(time.Now().UnixNano())
|
||||
func (t *http2Client) incrMsgSent() {
|
||||
if channelz.IsOn() {
|
||||
t.channelz.SocketMetrics.MessagesSent.Add(1)
|
||||
t.channelz.SocketMetrics.LastMessageSentTimestamp.Store(time.Now().UnixNano())
|
||||
}
|
||||
}
|
||||
|
||||
func (t *http2Client) IncrMsgRecv() {
|
||||
t.channelz.SocketMetrics.MessagesReceived.Add(1)
|
||||
t.channelz.SocketMetrics.LastMessageReceivedTimestamp.Store(time.Now().UnixNano())
|
||||
func (t *http2Client) incrMsgRecv() {
|
||||
if channelz.IsOn() {
|
||||
t.channelz.SocketMetrics.MessagesReceived.Add(1)
|
||||
t.channelz.SocketMetrics.LastMessageReceivedTimestamp.Store(time.Now().UnixNano())
|
||||
}
|
||||
}
|
||||
|
||||
func (t *http2Client) getOutFlowWindow() int64 {
|
||||
|
69
vendor/google.golang.org/grpc/internal/transport/http2_server.go
generated
vendored
69
vendor/google.golang.org/grpc/internal/transport/http2_server.go
generated
vendored
@ -25,7 +25,7 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"math"
|
||||
"math/rand"
|
||||
rand "math/rand/v2"
|
||||
"net"
|
||||
"net/http"
|
||||
"strconv"
|
||||
@ -111,7 +111,7 @@ type http2Server struct {
|
||||
// already initialized since draining is already underway.
|
||||
drainEvent *grpcsync.Event
|
||||
state transportState
|
||||
activeStreams map[uint32]*Stream
|
||||
activeStreams map[uint32]*ServerStream
|
||||
// idle is the time instant when the connection went idle.
|
||||
// This is either the beginning of the connection or when the number of
|
||||
// RPCs go down to 0.
|
||||
@ -256,7 +256,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
|
||||
inTapHandle: config.InTapHandle,
|
||||
fc: &trInFlow{limit: uint32(icwz)},
|
||||
state: reachable,
|
||||
activeStreams: make(map[uint32]*Stream),
|
||||
activeStreams: make(map[uint32]*ServerStream),
|
||||
stats: config.StatsHandlers,
|
||||
kp: kp,
|
||||
idle: time.Now(),
|
||||
@ -359,7 +359,7 @@ func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport,
|
||||
|
||||
// operateHeaders takes action on the decoded headers. Returns an error if fatal
|
||||
// error encountered and transport needs to close, otherwise returns nil.
|
||||
func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeadersFrame, handle func(*Stream)) error {
|
||||
func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeadersFrame, handle func(*ServerStream)) error {
|
||||
// Acquire max stream ID lock for entire duration
|
||||
t.maxStreamMu.Lock()
|
||||
defer t.maxStreamMu.Unlock()
|
||||
@ -385,11 +385,13 @@ func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeade
|
||||
t.maxStreamID = streamID
|
||||
|
||||
buf := newRecvBuffer()
|
||||
s := &Stream{
|
||||
id: streamID,
|
||||
s := &ServerStream{
|
||||
Stream: &Stream{
|
||||
id: streamID,
|
||||
buf: buf,
|
||||
fc: &inFlow{limit: uint32(t.initialWindowSize)},
|
||||
},
|
||||
st: t,
|
||||
buf: buf,
|
||||
fc: &inFlow{limit: uint32(t.initialWindowSize)},
|
||||
headerWireLength: int(frame.Header().Length),
|
||||
}
|
||||
var (
|
||||
@ -537,12 +539,6 @@ func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeade
|
||||
// Attach the received metadata to the context.
|
||||
if len(mdata) > 0 {
|
||||
s.ctx = metadata.NewIncomingContext(s.ctx, mdata)
|
||||
if statsTags := mdata["grpc-tags-bin"]; len(statsTags) > 0 {
|
||||
s.ctx = stats.SetIncomingTags(s.ctx, []byte(statsTags[len(statsTags)-1]))
|
||||
}
|
||||
if statsTrace := mdata["grpc-trace-bin"]; len(statsTrace) > 0 {
|
||||
s.ctx = stats.SetIncomingTrace(s.ctx, []byte(statsTrace[len(statsTrace)-1]))
|
||||
}
|
||||
}
|
||||
t.mu.Lock()
|
||||
if t.state != reachable {
|
||||
@ -634,7 +630,7 @@ func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeade
|
||||
// HandleStreams receives incoming streams using the given handler. This is
|
||||
// typically run in a separate goroutine.
|
||||
// traceCtx attaches trace to ctx and returns the new context.
|
||||
func (t *http2Server) HandleStreams(ctx context.Context, handle func(*Stream)) {
|
||||
func (t *http2Server) HandleStreams(ctx context.Context, handle func(*ServerStream)) {
|
||||
defer func() {
|
||||
close(t.readerDone)
|
||||
<-t.loopyWriterDone
|
||||
@ -698,7 +694,7 @@ func (t *http2Server) HandleStreams(ctx context.Context, handle func(*Stream)) {
|
||||
}
|
||||
}
|
||||
|
||||
func (t *http2Server) getStream(f http2.Frame) (*Stream, bool) {
|
||||
func (t *http2Server) getStream(f http2.Frame) (*ServerStream, bool) {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
if t.activeStreams == nil {
|
||||
@ -716,7 +712,7 @@ func (t *http2Server) getStream(f http2.Frame) (*Stream, bool) {
|
||||
// adjustWindow sends out extra window update over the initial window size
|
||||
// of stream if the application is requesting data larger in size than
|
||||
// the window.
|
||||
func (t *http2Server) adjustWindow(s *Stream, n uint32) {
|
||||
func (t *http2Server) adjustWindow(s *ServerStream, n uint32) {
|
||||
if w := s.fc.maybeAdjust(n); w > 0 {
|
||||
t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
|
||||
}
|
||||
@ -726,7 +722,7 @@ func (t *http2Server) adjustWindow(s *Stream, n uint32) {
|
||||
// updateWindow adjusts the inbound quota for the stream and the transport.
|
||||
// Window updates will deliver to the controller for sending when
|
||||
// the cumulative quota exceeds the corresponding threshold.
|
||||
func (t *http2Server) updateWindow(s *Stream, n uint32) {
|
||||
func (t *http2Server) updateWindow(s *ServerStream, n uint32) {
|
||||
if w := s.fc.onRead(n); w > 0 {
|
||||
t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id,
|
||||
increment: w,
|
||||
@ -963,7 +959,7 @@ func (t *http2Server) checkForHeaderListSize(it any) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (t *http2Server) streamContextErr(s *Stream) error {
|
||||
func (t *http2Server) streamContextErr(s *ServerStream) error {
|
||||
select {
|
||||
case <-t.done:
|
||||
return ErrConnClosing
|
||||
@ -973,7 +969,7 @@ func (t *http2Server) streamContextErr(s *Stream) error {
|
||||
}
|
||||
|
||||
// WriteHeader sends the header metadata md back to the client.
|
||||
func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
|
||||
func (t *http2Server) writeHeader(s *ServerStream, md metadata.MD) error {
|
||||
s.hdrMu.Lock()
|
||||
defer s.hdrMu.Unlock()
|
||||
if s.getState() == streamDone {
|
||||
@ -1006,7 +1002,7 @@ func (t *http2Server) setResetPingStrikes() {
|
||||
atomic.StoreUint32(&t.resetPingStrikes, 1)
|
||||
}
|
||||
|
||||
func (t *http2Server) writeHeaderLocked(s *Stream) error {
|
||||
func (t *http2Server) writeHeaderLocked(s *ServerStream) error {
|
||||
// TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
|
||||
// first and create a slice of that exact size.
|
||||
headerFields := make([]hpack.HeaderField, 0, 2) // at least :status, content-type will be there if none else.
|
||||
@ -1046,7 +1042,7 @@ func (t *http2Server) writeHeaderLocked(s *Stream) error {
|
||||
// There is no further I/O operations being able to perform on this stream.
|
||||
// TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early
|
||||
// OK is adopted.
|
||||
func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
|
||||
func (t *http2Server) writeStatus(s *ServerStream, st *status.Status) error {
|
||||
s.hdrMu.Lock()
|
||||
defer s.hdrMu.Unlock()
|
||||
|
||||
@ -1117,11 +1113,11 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
|
||||
|
||||
// Write converts the data into HTTP2 data frame and sends it out. Non-nil error
|
||||
// is returns if it fails (e.g., framing error, transport error).
|
||||
func (t *http2Server) Write(s *Stream, hdr []byte, data mem.BufferSlice, _ *Options) error {
|
||||
func (t *http2Server) write(s *ServerStream, hdr []byte, data mem.BufferSlice, _ *WriteOptions) error {
|
||||
reader := data.Reader()
|
||||
|
||||
if !s.isHeaderSent() { // Headers haven't been written yet.
|
||||
if err := t.WriteHeader(s, nil); err != nil {
|
||||
if err := t.writeHeader(s, nil); err != nil {
|
||||
_ = reader.Close()
|
||||
return err
|
||||
}
|
||||
@ -1147,6 +1143,7 @@ func (t *http2Server) Write(s *Stream, hdr []byte, data mem.BufferSlice, _ *Opti
|
||||
_ = reader.Close()
|
||||
return err
|
||||
}
|
||||
t.incrMsgSent()
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -1276,7 +1273,7 @@ func (t *http2Server) Close(err error) {
|
||||
}
|
||||
|
||||
// deleteStream deletes the stream s from transport's active streams.
|
||||
func (t *http2Server) deleteStream(s *Stream, eosReceived bool) {
|
||||
func (t *http2Server) deleteStream(s *ServerStream, eosReceived bool) {
|
||||
|
||||
t.mu.Lock()
|
||||
if _, ok := t.activeStreams[s.id]; ok {
|
||||
@ -1297,7 +1294,7 @@ func (t *http2Server) deleteStream(s *Stream, eosReceived bool) {
|
||||
}
|
||||
|
||||
// finishStream closes the stream and puts the trailing headerFrame into controlbuf.
|
||||
func (t *http2Server) finishStream(s *Stream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) {
|
||||
func (t *http2Server) finishStream(s *ServerStream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) {
|
||||
// In case stream sending and receiving are invoked in separate
|
||||
// goroutines (e.g., bi-directional streaming), cancel needs to be
|
||||
// called to interrupt the potential blocking on other goroutines.
|
||||
@ -1321,7 +1318,7 @@ func (t *http2Server) finishStream(s *Stream, rst bool, rstCode http2.ErrCode, h
|
||||
}
|
||||
|
||||
// closeStream clears the footprint of a stream when the stream is not needed any more.
|
||||
func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, eosReceived bool) {
|
||||
func (t *http2Server) closeStream(s *ServerStream, rst bool, rstCode http2.ErrCode, eosReceived bool) {
|
||||
// In case stream sending and receiving are invoked in separate
|
||||
// goroutines (e.g., bi-directional streaming), cancel needs to be
|
||||
// called to interrupt the potential blocking on other goroutines.
|
||||
@ -1415,14 +1412,18 @@ func (t *http2Server) socketMetrics() *channelz.EphemeralSocketMetrics {
|
||||
}
|
||||
}
|
||||
|
||||
func (t *http2Server) IncrMsgSent() {
|
||||
t.channelz.SocketMetrics.MessagesSent.Add(1)
|
||||
t.channelz.SocketMetrics.LastMessageSentTimestamp.Add(1)
|
||||
func (t *http2Server) incrMsgSent() {
|
||||
if channelz.IsOn() {
|
||||
t.channelz.SocketMetrics.MessagesSent.Add(1)
|
||||
t.channelz.SocketMetrics.LastMessageSentTimestamp.Add(1)
|
||||
}
|
||||
}
|
||||
|
||||
func (t *http2Server) IncrMsgRecv() {
|
||||
t.channelz.SocketMetrics.MessagesReceived.Add(1)
|
||||
t.channelz.SocketMetrics.LastMessageReceivedTimestamp.Add(1)
|
||||
func (t *http2Server) incrMsgRecv() {
|
||||
if channelz.IsOn() {
|
||||
t.channelz.SocketMetrics.MessagesReceived.Add(1)
|
||||
t.channelz.SocketMetrics.LastMessageReceivedTimestamp.Add(1)
|
||||
}
|
||||
}
|
||||
|
||||
func (t *http2Server) getOutFlowWindow() int64 {
|
||||
@ -1455,7 +1456,7 @@ func getJitter(v time.Duration) time.Duration {
|
||||
}
|
||||
// Generate a jitter between +/- 10% of the value.
|
||||
r := int64(v / 10)
|
||||
j := rand.Int63n(2*r) - r
|
||||
j := rand.Int64N(2*r) - r
|
||||
return time.Duration(j)
|
||||
}
|
||||
|
||||
|
178
vendor/google.golang.org/grpc/internal/transport/server_stream.go
generated
vendored
Normal file
178
vendor/google.golang.org/grpc/internal/transport/server_stream.go
generated
vendored
Normal file
@ -0,0 +1,178 @@
|
||||
/*
|
||||
*
|
||||
* Copyright 2024 gRPC authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*
|
||||
*/
|
||||
|
||||
package transport
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"google.golang.org/grpc/mem"
|
||||
"google.golang.org/grpc/metadata"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
// ServerStream implements streaming functionality for a gRPC server.
|
||||
type ServerStream struct {
|
||||
*Stream // Embed for common stream functionality.
|
||||
|
||||
st internalServerTransport
|
||||
ctxDone <-chan struct{} // closed at the end of stream. Cache of ctx.Done() (for performance)
|
||||
cancel context.CancelFunc // invoked at the end of stream to cancel ctx.
|
||||
|
||||
// Holds compressor names passed in grpc-accept-encoding metadata from the
|
||||
// client.
|
||||
clientAdvertisedCompressors string
|
||||
headerWireLength int
|
||||
|
||||
// hdrMu protects outgoing header and trailer metadata.
|
||||
hdrMu sync.Mutex
|
||||
header metadata.MD // the outgoing header metadata. Updated by WriteHeader.
|
||||
headerSent atomic.Bool // atomically set when the headers are sent out.
|
||||
}
|
||||
|
||||
// Read reads an n byte message from the input stream.
|
||||
func (s *ServerStream) Read(n int) (mem.BufferSlice, error) {
|
||||
b, err := s.Stream.read(n)
|
||||
if err == nil {
|
||||
s.st.incrMsgRecv()
|
||||
}
|
||||
return b, err
|
||||
}
|
||||
|
||||
// SendHeader sends the header metadata for the given stream.
|
||||
func (s *ServerStream) SendHeader(md metadata.MD) error {
|
||||
return s.st.writeHeader(s, md)
|
||||
}
|
||||
|
||||
// Write writes the hdr and data bytes to the output stream.
|
||||
func (s *ServerStream) Write(hdr []byte, data mem.BufferSlice, opts *WriteOptions) error {
|
||||
return s.st.write(s, hdr, data, opts)
|
||||
}
|
||||
|
||||
// WriteStatus sends the status of a stream to the client. WriteStatus is
|
||||
// the final call made on a stream and always occurs.
|
||||
func (s *ServerStream) WriteStatus(st *status.Status) error {
|
||||
return s.st.writeStatus(s, st)
|
||||
}
|
||||
|
||||
// isHeaderSent indicates whether headers have been sent.
|
||||
func (s *ServerStream) isHeaderSent() bool {
|
||||
return s.headerSent.Load()
|
||||
}
|
||||
|
||||
// updateHeaderSent updates headerSent and returns true
|
||||
// if it was already set.
|
||||
func (s *ServerStream) updateHeaderSent() bool {
|
||||
return s.headerSent.Swap(true)
|
||||
}
|
||||
|
||||
// RecvCompress returns the compression algorithm applied to the inbound
|
||||
// message. It is empty string if there is no compression applied.
|
||||
func (s *ServerStream) RecvCompress() string {
|
||||
return s.recvCompress
|
||||
}
|
||||
|
||||
// SendCompress returns the send compressor name.
|
||||
func (s *ServerStream) SendCompress() string {
|
||||
return s.sendCompress
|
||||
}
|
||||
|
||||
// ContentSubtype returns the content-subtype for a request. For example, a
|
||||
// content-subtype of "proto" will result in a content-type of
|
||||
// "application/grpc+proto". This will always be lowercase. See
|
||||
// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
|
||||
// more details.
|
||||
func (s *ServerStream) ContentSubtype() string {
|
||||
return s.contentSubtype
|
||||
}
|
||||
|
||||
// SetSendCompress sets the compression algorithm to the stream.
|
||||
func (s *ServerStream) SetSendCompress(name string) error {
|
||||
if s.isHeaderSent() || s.getState() == streamDone {
|
||||
return errors.New("transport: set send compressor called after headers sent or stream done")
|
||||
}
|
||||
|
||||
s.sendCompress = name
|
||||
return nil
|
||||
}
|
||||
|
||||
// SetContext sets the context of the stream. This will be deleted once the
|
||||
// stats handler callouts all move to gRPC layer.
|
||||
func (s *ServerStream) SetContext(ctx context.Context) {
|
||||
s.ctx = ctx
|
||||
}
|
||||
|
||||
// ClientAdvertisedCompressors returns the compressor names advertised by the
|
||||
// client via grpc-accept-encoding header.
|
||||
func (s *ServerStream) ClientAdvertisedCompressors() []string {
|
||||
values := strings.Split(s.clientAdvertisedCompressors, ",")
|
||||
for i, v := range values {
|
||||
values[i] = strings.TrimSpace(v)
|
||||
}
|
||||
return values
|
||||
}
|
||||
|
||||
// Header returns the header metadata of the stream. It returns the out header
|
||||
// after t.WriteHeader is called. It does not block and must not be called
|
||||
// until after WriteHeader.
|
||||
func (s *ServerStream) Header() (metadata.MD, error) {
|
||||
// Return the header in stream. It will be the out
|
||||
// header after t.WriteHeader is called.
|
||||
return s.header.Copy(), nil
|
||||
}
|
||||
|
||||
// HeaderWireLength returns the size of the headers of the stream as received
|
||||
// from the wire.
|
||||
func (s *ServerStream) HeaderWireLength() int {
|
||||
return s.headerWireLength
|
||||
}
|
||||
|
||||
// SetHeader sets the header metadata. This can be called multiple times.
|
||||
// This should not be called in parallel to other data writes.
|
||||
func (s *ServerStream) SetHeader(md metadata.MD) error {
|
||||
if md.Len() == 0 {
|
||||
return nil
|
||||
}
|
||||
if s.isHeaderSent() || s.getState() == streamDone {
|
||||
return ErrIllegalHeaderWrite
|
||||
}
|
||||
s.hdrMu.Lock()
|
||||
s.header = metadata.Join(s.header, md)
|
||||
s.hdrMu.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
// SetTrailer sets the trailer metadata which will be sent with the RPC status
|
||||
// by the server. This can be called multiple times.
|
||||
// This should not be called parallel to other data writes.
|
||||
func (s *ServerStream) SetTrailer(md metadata.MD) error {
|
||||
if md.Len() == 0 {
|
||||
return nil
|
||||
}
|
||||
if s.getState() == streamDone {
|
||||
return ErrIllegalHeaderWrite
|
||||
}
|
||||
s.hdrMu.Lock()
|
||||
s.trailer = metadata.Join(s.trailer, md)
|
||||
s.hdrMu.Unlock()
|
||||
return nil
|
||||
}
|
321
vendor/google.golang.org/grpc/internal/transport/transport.go
generated
vendored
321
vendor/google.golang.org/grpc/internal/transport/transport.go
generated
vendored
@ -27,7 +27,6 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
@ -39,7 +38,6 @@ import (
|
||||
"google.golang.org/grpc/mem"
|
||||
"google.golang.org/grpc/metadata"
|
||||
"google.golang.org/grpc/peer"
|
||||
"google.golang.org/grpc/resolver"
|
||||
"google.golang.org/grpc/stats"
|
||||
"google.golang.org/grpc/status"
|
||||
"google.golang.org/grpc/tap"
|
||||
@ -133,7 +131,7 @@ type recvBufferReader struct {
|
||||
err error
|
||||
}
|
||||
|
||||
func (r *recvBufferReader) ReadHeader(header []byte) (n int, err error) {
|
||||
func (r *recvBufferReader) ReadMessageHeader(header []byte) (n int, err error) {
|
||||
if r.err != nil {
|
||||
return 0, r.err
|
||||
}
|
||||
@ -142,9 +140,9 @@ func (r *recvBufferReader) ReadHeader(header []byte) (n int, err error) {
|
||||
return n, nil
|
||||
}
|
||||
if r.closeStream != nil {
|
||||
n, r.err = r.readHeaderClient(header)
|
||||
n, r.err = r.readMessageHeaderClient(header)
|
||||
} else {
|
||||
n, r.err = r.readHeader(header)
|
||||
n, r.err = r.readMessageHeader(header)
|
||||
}
|
||||
return n, r.err
|
||||
}
|
||||
@ -174,12 +172,12 @@ func (r *recvBufferReader) Read(n int) (buf mem.Buffer, err error) {
|
||||
return buf, r.err
|
||||
}
|
||||
|
||||
func (r *recvBufferReader) readHeader(header []byte) (n int, err error) {
|
||||
func (r *recvBufferReader) readMessageHeader(header []byte) (n int, err error) {
|
||||
select {
|
||||
case <-r.ctxDone:
|
||||
return 0, ContextErr(r.ctx.Err())
|
||||
case m := <-r.recv.get():
|
||||
return r.readHeaderAdditional(m, header)
|
||||
return r.readMessageHeaderAdditional(m, header)
|
||||
}
|
||||
}
|
||||
|
||||
@ -192,7 +190,7 @@ func (r *recvBufferReader) read(n int) (buf mem.Buffer, err error) {
|
||||
}
|
||||
}
|
||||
|
||||
func (r *recvBufferReader) readHeaderClient(header []byte) (n int, err error) {
|
||||
func (r *recvBufferReader) readMessageHeaderClient(header []byte) (n int, err error) {
|
||||
// If the context is canceled, then closes the stream with nil metadata.
|
||||
// closeStream writes its error parameter to r.recv as a recvMsg.
|
||||
// r.readAdditional acts on that message and returns the necessary error.
|
||||
@ -213,9 +211,9 @@ func (r *recvBufferReader) readHeaderClient(header []byte) (n int, err error) {
|
||||
// faster.
|
||||
r.closeStream(ContextErr(r.ctx.Err()))
|
||||
m := <-r.recv.get()
|
||||
return r.readHeaderAdditional(m, header)
|
||||
return r.readMessageHeaderAdditional(m, header)
|
||||
case m := <-r.recv.get():
|
||||
return r.readHeaderAdditional(m, header)
|
||||
return r.readMessageHeaderAdditional(m, header)
|
||||
}
|
||||
}
|
||||
|
||||
@ -246,7 +244,7 @@ func (r *recvBufferReader) readClient(n int) (buf mem.Buffer, err error) {
|
||||
}
|
||||
}
|
||||
|
||||
func (r *recvBufferReader) readHeaderAdditional(m recvMsg, header []byte) (n int, err error) {
|
||||
func (r *recvBufferReader) readMessageHeaderAdditional(m recvMsg, header []byte) (n int, err error) {
|
||||
r.recv.load()
|
||||
if m.err != nil {
|
||||
if m.buffer != nil {
|
||||
@ -288,14 +286,8 @@ const (
|
||||
// Stream represents an RPC in the transport layer.
|
||||
type Stream struct {
|
||||
id uint32
|
||||
st ServerTransport // nil for client side Stream
|
||||
ct ClientTransport // nil for server side Stream
|
||||
ctx context.Context // the associated context of the stream
|
||||
cancel context.CancelFunc // always nil for client side Stream
|
||||
done chan struct{} // closed at the end of stream to unblock writers. On the client side.
|
||||
doneFunc func() // invoked at the end of stream on client side.
|
||||
ctxDone <-chan struct{} // same as done chan but for server side. Cache of ctx.Done() (for performance)
|
||||
method string // the associated RPC method of the stream
|
||||
ctx context.Context // the associated context of the stream
|
||||
method string // the associated RPC method of the stream
|
||||
recvCompress string
|
||||
sendCompress string
|
||||
buf *recvBuffer
|
||||
@ -303,58 +295,17 @@ type Stream struct {
|
||||
fc *inFlow
|
||||
wq *writeQuota
|
||||
|
||||
// Holds compressor names passed in grpc-accept-encoding metadata from the
|
||||
// client. This is empty for the client side stream.
|
||||
clientAdvertisedCompressors string
|
||||
// Callback to state application's intentions to read data. This
|
||||
// is used to adjust flow control, if needed.
|
||||
requestRead func(int)
|
||||
|
||||
headerChan chan struct{} // closed to indicate the end of header metadata.
|
||||
headerChanClosed uint32 // set when headerChan is closed. Used to avoid closing headerChan multiple times.
|
||||
// headerValid indicates whether a valid header was received. Only
|
||||
// meaningful after headerChan is closed (always call waitOnHeader() before
|
||||
// reading its value). Not valid on server side.
|
||||
headerValid bool
|
||||
headerWireLength int // Only set on server side.
|
||||
|
||||
// hdrMu protects header and trailer metadata on the server-side.
|
||||
hdrMu sync.Mutex
|
||||
// On client side, header keeps the received header metadata.
|
||||
//
|
||||
// On server side, header keeps the header set by SetHeader(). The complete
|
||||
// header will merged into this after t.WriteHeader() is called.
|
||||
header metadata.MD
|
||||
trailer metadata.MD // the key-value map of trailer metadata.
|
||||
|
||||
noHeaders bool // set if the client never received headers (set only after the stream is done).
|
||||
|
||||
// On the server-side, headerSent is atomically set to 1 when the headers are sent out.
|
||||
headerSent uint32
|
||||
|
||||
state streamState
|
||||
|
||||
// On client-side it is the status error received from the server.
|
||||
// On server-side it is unused.
|
||||
status *status.Status
|
||||
|
||||
bytesReceived uint32 // indicates whether any bytes have been received on this stream
|
||||
unprocessed uint32 // set if the server sends a refused stream or GOAWAY including this stream
|
||||
|
||||
// contentSubtype is the content-subtype for requests.
|
||||
// this must be lowercase or the behavior is undefined.
|
||||
contentSubtype string
|
||||
}
|
||||
|
||||
// isHeaderSent is only valid on the server-side.
|
||||
func (s *Stream) isHeaderSent() bool {
|
||||
return atomic.LoadUint32(&s.headerSent) == 1
|
||||
}
|
||||
|
||||
// updateHeaderSent updates headerSent and returns true
|
||||
// if it was already set. It is valid only on server-side.
|
||||
func (s *Stream) updateHeaderSent() bool {
|
||||
return atomic.SwapUint32(&s.headerSent, 1) == 1
|
||||
trailer metadata.MD // the key-value map of trailer metadata.
|
||||
}
|
||||
|
||||
func (s *Stream) swapState(st streamState) streamState {
|
||||
@ -369,110 +320,12 @@ func (s *Stream) getState() streamState {
|
||||
return streamState(atomic.LoadUint32((*uint32)(&s.state)))
|
||||
}
|
||||
|
||||
func (s *Stream) waitOnHeader() {
|
||||
if s.headerChan == nil {
|
||||
// On the server headerChan is always nil since a stream originates
|
||||
// only after having received headers.
|
||||
return
|
||||
}
|
||||
select {
|
||||
case <-s.ctx.Done():
|
||||
// Close the stream to prevent headers/trailers from changing after
|
||||
// this function returns.
|
||||
s.ct.CloseStream(s, ContextErr(s.ctx.Err()))
|
||||
// headerChan could possibly not be closed yet if closeStream raced
|
||||
// with operateHeaders; wait until it is closed explicitly here.
|
||||
<-s.headerChan
|
||||
case <-s.headerChan:
|
||||
}
|
||||
}
|
||||
|
||||
// RecvCompress returns the compression algorithm applied to the inbound
|
||||
// message. It is empty string if there is no compression applied.
|
||||
func (s *Stream) RecvCompress() string {
|
||||
s.waitOnHeader()
|
||||
return s.recvCompress
|
||||
}
|
||||
|
||||
// SetSendCompress sets the compression algorithm to the stream.
|
||||
func (s *Stream) SetSendCompress(name string) error {
|
||||
if s.isHeaderSent() || s.getState() == streamDone {
|
||||
return errors.New("transport: set send compressor called after headers sent or stream done")
|
||||
}
|
||||
|
||||
s.sendCompress = name
|
||||
return nil
|
||||
}
|
||||
|
||||
// SendCompress returns the send compressor name.
|
||||
func (s *Stream) SendCompress() string {
|
||||
return s.sendCompress
|
||||
}
|
||||
|
||||
// ClientAdvertisedCompressors returns the compressor names advertised by the
|
||||
// client via grpc-accept-encoding header.
|
||||
func (s *Stream) ClientAdvertisedCompressors() []string {
|
||||
values := strings.Split(s.clientAdvertisedCompressors, ",")
|
||||
for i, v := range values {
|
||||
values[i] = strings.TrimSpace(v)
|
||||
}
|
||||
return values
|
||||
}
|
||||
|
||||
// Done returns a channel which is closed when it receives the final status
|
||||
// from the server.
|
||||
func (s *Stream) Done() <-chan struct{} {
|
||||
return s.done
|
||||
}
|
||||
|
||||
// Header returns the header metadata of the stream.
|
||||
//
|
||||
// On client side, it acquires the key-value pairs of header metadata once it is
|
||||
// available. It blocks until i) the metadata is ready or ii) there is no header
|
||||
// metadata or iii) the stream is canceled/expired.
|
||||
//
|
||||
// On server side, it returns the out header after t.WriteHeader is called. It
|
||||
// does not block and must not be called until after WriteHeader.
|
||||
func (s *Stream) Header() (metadata.MD, error) {
|
||||
if s.headerChan == nil {
|
||||
// On server side, return the header in stream. It will be the out
|
||||
// header after t.WriteHeader is called.
|
||||
return s.header.Copy(), nil
|
||||
}
|
||||
s.waitOnHeader()
|
||||
|
||||
if !s.headerValid || s.noHeaders {
|
||||
return nil, s.status.Err()
|
||||
}
|
||||
|
||||
return s.header.Copy(), nil
|
||||
}
|
||||
|
||||
// TrailersOnly blocks until a header or trailers-only frame is received and
|
||||
// then returns true if the stream was trailers-only. If the stream ends
|
||||
// before headers are received, returns true, nil. Client-side only.
|
||||
func (s *Stream) TrailersOnly() bool {
|
||||
s.waitOnHeader()
|
||||
return s.noHeaders
|
||||
}
|
||||
|
||||
// Trailer returns the cached trailer metadata. Note that if it is not called
|
||||
// after the entire stream is done, it could return an empty MD. Client
|
||||
// side only.
|
||||
// after the entire stream is done, it could return an empty MD.
|
||||
// It can be safely read only after stream has ended that is either read
|
||||
// or write have returned io.EOF.
|
||||
func (s *Stream) Trailer() metadata.MD {
|
||||
c := s.trailer.Copy()
|
||||
return c
|
||||
}
|
||||
|
||||
// ContentSubtype returns the content-subtype for a request. For example, a
|
||||
// content-subtype of "proto" will result in a content-type of
|
||||
// "application/grpc+proto". This will always be lowercase. See
|
||||
// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
|
||||
// more details.
|
||||
func (s *Stream) ContentSubtype() string {
|
||||
return s.contentSubtype
|
||||
return s.trailer.Copy()
|
||||
}
|
||||
|
||||
// Context returns the context of the stream.
|
||||
@ -480,90 +333,31 @@ func (s *Stream) Context() context.Context {
|
||||
return s.ctx
|
||||
}
|
||||
|
||||
// SetContext sets the context of the stream. This will be deleted once the
|
||||
// stats handler callouts all move to gRPC layer.
|
||||
func (s *Stream) SetContext(ctx context.Context) {
|
||||
s.ctx = ctx
|
||||
}
|
||||
|
||||
// Method returns the method for the stream.
|
||||
func (s *Stream) Method() string {
|
||||
return s.method
|
||||
}
|
||||
|
||||
// Status returns the status received from the server.
|
||||
// Status can be read safely only after the stream has ended,
|
||||
// that is, after Done() is closed.
|
||||
func (s *Stream) Status() *status.Status {
|
||||
return s.status
|
||||
}
|
||||
|
||||
// HeaderWireLength returns the size of the headers of the stream as received
|
||||
// from the wire. Valid only on the server.
|
||||
func (s *Stream) HeaderWireLength() int {
|
||||
return s.headerWireLength
|
||||
}
|
||||
|
||||
// SetHeader sets the header metadata. This can be called multiple times.
|
||||
// Server side only.
|
||||
// This should not be called in parallel to other data writes.
|
||||
func (s *Stream) SetHeader(md metadata.MD) error {
|
||||
if md.Len() == 0 {
|
||||
return nil
|
||||
}
|
||||
if s.isHeaderSent() || s.getState() == streamDone {
|
||||
return ErrIllegalHeaderWrite
|
||||
}
|
||||
s.hdrMu.Lock()
|
||||
s.header = metadata.Join(s.header, md)
|
||||
s.hdrMu.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
// SendHeader sends the given header metadata. The given metadata is
|
||||
// combined with any metadata set by previous calls to SetHeader and
|
||||
// then written to the transport stream.
|
||||
func (s *Stream) SendHeader(md metadata.MD) error {
|
||||
return s.st.WriteHeader(s, md)
|
||||
}
|
||||
|
||||
// SetTrailer sets the trailer metadata which will be sent with the RPC status
|
||||
// by the server. This can be called multiple times. Server side only.
|
||||
// This should not be called parallel to other data writes.
|
||||
func (s *Stream) SetTrailer(md metadata.MD) error {
|
||||
if md.Len() == 0 {
|
||||
return nil
|
||||
}
|
||||
if s.getState() == streamDone {
|
||||
return ErrIllegalHeaderWrite
|
||||
}
|
||||
s.hdrMu.Lock()
|
||||
s.trailer = metadata.Join(s.trailer, md)
|
||||
s.hdrMu.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Stream) write(m recvMsg) {
|
||||
s.buf.put(m)
|
||||
}
|
||||
|
||||
// ReadHeader reads data into the provided header slice from the stream. It
|
||||
// first checks if there was an error during a previous read operation and
|
||||
// ReadMessageHeader reads data into the provided header slice from the stream.
|
||||
// It first checks if there was an error during a previous read operation and
|
||||
// returns it if present. It then requests a read operation for the length of
|
||||
// the header. It continues to read from the stream until the entire header
|
||||
// slice is filled or an error occurs. If an `io.EOF` error is encountered
|
||||
// with partially read data, it is converted to `io.ErrUnexpectedEOF` to
|
||||
// indicate an unexpected end of the stream. The method returns any error
|
||||
// encountered during the read process or nil if the header was successfully
|
||||
// read.
|
||||
func (s *Stream) ReadHeader(header []byte) (err error) {
|
||||
// slice is filled or an error occurs. If an `io.EOF` error is encountered with
|
||||
// partially read data, it is converted to `io.ErrUnexpectedEOF` to indicate an
|
||||
// unexpected end of the stream. The method returns any error encountered during
|
||||
// the read process or nil if the header was successfully read.
|
||||
func (s *Stream) ReadMessageHeader(header []byte) (err error) {
|
||||
// Don't request a read if there was an error earlier
|
||||
if er := s.trReader.er; er != nil {
|
||||
return er
|
||||
}
|
||||
s.requestRead(len(header))
|
||||
for len(header) != 0 {
|
||||
n, err := s.trReader.ReadHeader(header)
|
||||
n, err := s.trReader.ReadMessageHeader(header)
|
||||
header = header[n:]
|
||||
if len(header) == 0 {
|
||||
err = nil
|
||||
@ -579,7 +373,7 @@ func (s *Stream) ReadHeader(header []byte) (err error) {
|
||||
}
|
||||
|
||||
// Read reads n bytes from the wire for this stream.
|
||||
func (s *Stream) Read(n int) (data mem.BufferSlice, err error) {
|
||||
func (s *Stream) read(n int) (data mem.BufferSlice, err error) {
|
||||
// Don't request a read if there was an error earlier
|
||||
if er := s.trReader.er; er != nil {
|
||||
return nil, er
|
||||
@ -619,8 +413,8 @@ type transportReader struct {
|
||||
er error
|
||||
}
|
||||
|
||||
func (t *transportReader) ReadHeader(header []byte) (int, error) {
|
||||
n, err := t.reader.ReadHeader(header)
|
||||
func (t *transportReader) ReadMessageHeader(header []byte) (int, error) {
|
||||
n, err := t.reader.ReadMessageHeader(header)
|
||||
if err != nil {
|
||||
t.er = err
|
||||
return 0, err
|
||||
@ -639,17 +433,6 @@ func (t *transportReader) Read(n int) (mem.Buffer, error) {
|
||||
return buf, nil
|
||||
}
|
||||
|
||||
// BytesReceived indicates whether any bytes have been received on this stream.
|
||||
func (s *Stream) BytesReceived() bool {
|
||||
return atomic.LoadUint32(&s.bytesReceived) == 1
|
||||
}
|
||||
|
||||
// Unprocessed indicates whether the server did not process this stream --
|
||||
// i.e. it sent a refused stream or GOAWAY including this stream ID.
|
||||
func (s *Stream) Unprocessed() bool {
|
||||
return atomic.LoadUint32(&s.unprocessed) == 1
|
||||
}
|
||||
|
||||
// GoString is implemented by Stream so context.String() won't
|
||||
// race when printing %#v.
|
||||
func (s *Stream) GoString() string {
|
||||
@ -725,15 +508,9 @@ type ConnectOptions struct {
|
||||
BufferPool mem.BufferPool
|
||||
}
|
||||
|
||||
// NewClientTransport establishes the transport with the required ConnectOptions
|
||||
// and returns it to the caller.
|
||||
func NewClientTransport(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onClose func(GoAwayReason)) (ClientTransport, error) {
|
||||
return newHTTP2Client(connectCtx, ctx, addr, opts, onClose)
|
||||
}
|
||||
|
||||
// Options provides additional hints and information for message
|
||||
// WriteOptions provides additional hints and information for message
|
||||
// transmission.
|
||||
type Options struct {
|
||||
type WriteOptions struct {
|
||||
// Last indicates whether this write is the last piece for
|
||||
// this stream.
|
||||
Last bool
|
||||
@ -782,18 +559,8 @@ type ClientTransport interface {
|
||||
// It does not block.
|
||||
GracefulClose()
|
||||
|
||||
// Write sends the data for the given stream. A nil stream indicates
|
||||
// the write is to be performed on the transport as a whole.
|
||||
Write(s *Stream, hdr []byte, data mem.BufferSlice, opts *Options) error
|
||||
|
||||
// NewStream creates a Stream for an RPC.
|
||||
NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, error)
|
||||
|
||||
// CloseStream clears the footprint of a stream when the stream is
|
||||
// not needed any more. The err indicates the error incurred when
|
||||
// CloseStream is called. Must be called when a stream is finished
|
||||
// unless the associated transport is closing.
|
||||
CloseStream(stream *Stream, err error)
|
||||
NewStream(ctx context.Context, callHdr *CallHdr) (*ClientStream, error)
|
||||
|
||||
// Error returns a channel that is closed when some I/O error
|
||||
// happens. Typically the caller should have a goroutine to monitor
|
||||
@ -813,12 +580,6 @@ type ClientTransport interface {
|
||||
|
||||
// RemoteAddr returns the remote network address.
|
||||
RemoteAddr() net.Addr
|
||||
|
||||
// IncrMsgSent increments the number of message sent through this transport.
|
||||
IncrMsgSent()
|
||||
|
||||
// IncrMsgRecv increments the number of message received through this transport.
|
||||
IncrMsgRecv()
|
||||
}
|
||||
|
||||
// ServerTransport is the common interface for all gRPC server-side transport
|
||||
@ -828,19 +589,7 @@ type ClientTransport interface {
|
||||
// Write methods for a given Stream will be called serially.
|
||||
type ServerTransport interface {
|
||||
// HandleStreams receives incoming streams using the given handler.
|
||||
HandleStreams(context.Context, func(*Stream))
|
||||
|
||||
// WriteHeader sends the header metadata for the given stream.
|
||||
// WriteHeader may not be called on all streams.
|
||||
WriteHeader(s *Stream, md metadata.MD) error
|
||||
|
||||
// Write sends the data for the given stream.
|
||||
// Write may not be called on all streams.
|
||||
Write(s *Stream, hdr []byte, data mem.BufferSlice, opts *Options) error
|
||||
|
||||
// WriteStatus sends the status of a stream to the client. WriteStatus is
|
||||
// the final call made on a stream and always occurs.
|
||||
WriteStatus(s *Stream, st *status.Status) error
|
||||
HandleStreams(context.Context, func(*ServerStream))
|
||||
|
||||
// Close tears down the transport. Once it is called, the transport
|
||||
// should not be accessed any more. All the pending streams and their
|
||||
@ -852,12 +601,14 @@ type ServerTransport interface {
|
||||
|
||||
// Drain notifies the client this ServerTransport stops accepting new RPCs.
|
||||
Drain(debugData string)
|
||||
}
|
||||
|
||||
// IncrMsgSent increments the number of message sent through this transport.
|
||||
IncrMsgSent()
|
||||
|
||||
// IncrMsgRecv increments the number of message received through this transport.
|
||||
IncrMsgRecv()
|
||||
type internalServerTransport interface {
|
||||
ServerTransport
|
||||
writeHeader(s *ServerStream, md metadata.MD) error
|
||||
write(s *ServerStream, hdr []byte, data mem.BufferSlice, opts *WriteOptions) error
|
||||
writeStatus(s *ServerStream, st *status.Status) error
|
||||
incrMsgRecv()
|
||||
}
|
||||
|
||||
// connectionErrorf creates an ConnectionError with the specified error description.
|
||||
|
Reference in New Issue
Block a user