rebase: bump go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc

Bumps [go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc](https://github.com/open-telemetry/opentelemetry-go-contrib) from 0.35.0 to 0.46.0.
- [Release notes](https://github.com/open-telemetry/opentelemetry-go-contrib/releases)
- [Changelog](https://github.com/open-telemetry/opentelemetry-go-contrib/blob/main/CHANGELOG.md)
- [Commits](https://github.com/open-telemetry/opentelemetry-go-contrib/compare/zpages/v0.35.0...zpages/v0.46.0)

---
updated-dependencies:
- dependency-name: go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>
This commit is contained in:
dependabot[bot]
2023-12-21 14:13:18 +00:00
committed by mergify[bot]
parent e3e2560512
commit 30150c3be9
32 changed files with 924 additions and 173 deletions

View File

@ -24,8 +24,8 @@ import (
)
const (
// instrumentationName is the name of this instrumentation package.
instrumentationName = "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
// ScopeName is the instrumentation scope name.
ScopeName = "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
// GRPCStatusCodeKey is convention for numeric status code of a gRPC request.
GRPCStatusCodeKey = attribute.Key("rpc.grpc.status_code")
)
@ -37,13 +37,23 @@ type Filter func(*InterceptorInfo) bool
// config is a group of options for this instrumentation.
type config struct {
Filter Filter
Propagators propagation.TextMapPropagator
TracerProvider trace.TracerProvider
MeterProvider metric.MeterProvider
Filter Filter
Propagators propagation.TextMapPropagator
TracerProvider trace.TracerProvider
MeterProvider metric.MeterProvider
SpanStartOptions []trace.SpanStartOption
meter metric.Meter
rpcServerDuration metric.Int64Histogram
ReceivedEvent bool
SentEvent bool
tracer trace.Tracer
meter metric.Meter
rpcDuration metric.Float64Histogram
rpcRequestSize metric.Int64Histogram
rpcResponseSize metric.Int64Histogram
rpcRequestsPerRPC metric.Int64Histogram
rpcResponsesPerRPC metric.Int64Histogram
}
// Option applies an option value for a config.
@ -52,7 +62,7 @@ type Option interface {
}
// newConfig returns a config configured with all the passed Options.
func newConfig(opts []Option) *config {
func newConfig(opts []Option, role string) *config {
c := &config{
Propagators: otel.GetTextMapPropagator(),
TracerProvider: otel.GetTracerProvider(),
@ -62,13 +72,50 @@ func newConfig(opts []Option) *config {
o.apply(c)
}
c.tracer = c.TracerProvider.Tracer(
ScopeName,
trace.WithInstrumentationVersion(SemVersion()),
)
c.meter = c.MeterProvider.Meter(
instrumentationName,
ScopeName,
metric.WithInstrumentationVersion(Version()),
metric.WithSchemaURL(semconv.SchemaURL),
)
var err error
if c.rpcServerDuration, err = c.meter.Int64Histogram("rpc.server.duration", metric.WithUnit("ms")); err != nil {
c.rpcDuration, err = c.meter.Float64Histogram("rpc."+role+".duration",
metric.WithDescription("Measures the duration of inbound RPC."),
metric.WithUnit("ms"))
if err != nil {
otel.Handle(err)
}
c.rpcRequestSize, err = c.meter.Int64Histogram("rpc."+role+".request.size",
metric.WithDescription("Measures size of RPC request messages (uncompressed)."),
metric.WithUnit("By"))
if err != nil {
otel.Handle(err)
}
c.rpcResponseSize, err = c.meter.Int64Histogram("rpc."+role+".response.size",
metric.WithDescription("Measures size of RPC response messages (uncompressed)."),
metric.WithUnit("By"))
if err != nil {
otel.Handle(err)
}
c.rpcRequestsPerRPC, err = c.meter.Int64Histogram("rpc."+role+".requests_per_rpc",
metric.WithDescription("Measures the number of messages received per RPC. Should be 1 for all non-streaming RPCs."),
metric.WithUnit("{count}"))
if err != nil {
otel.Handle(err)
}
c.rpcResponsesPerRPC, err = c.meter.Int64Histogram("rpc."+role+".responses_per_rpc",
metric.WithDescription("Measures the number of messages received per RPC. Should be 1 for all non-streaming RPCs."),
metric.WithUnit("{count}"))
if err != nil {
otel.Handle(err)
}
@ -98,6 +145,8 @@ func (o tracerProviderOption) apply(c *config) {
}
// WithInterceptorFilter returns an Option to use the request filter.
//
// Deprecated: Use stats handlers instead.
func WithInterceptorFilter(f Filter) Option {
return interceptorFilterOption{f: f}
}
@ -131,3 +180,50 @@ func (o meterProviderOption) apply(c *config) {
func WithMeterProvider(mp metric.MeterProvider) Option {
return meterProviderOption{mp: mp}
}
// Event type that can be recorded, see WithMessageEvents.
type Event int
// Different types of events that can be recorded, see WithMessageEvents.
const (
ReceivedEvents Event = iota
SentEvents
)
type messageEventsProviderOption struct {
events []Event
}
func (m messageEventsProviderOption) apply(c *config) {
for _, e := range m.events {
switch e {
case ReceivedEvents:
c.ReceivedEvent = true
case SentEvents:
c.SentEvent = true
}
}
}
// WithMessageEvents configures the Handler to record the specified events
// (span.AddEvent) on spans. By default only summary attributes are added at the
// end of the request.
//
// Valid events are:
// - ReceivedEvents: Record the number of bytes read after every gRPC read operation.
// - SentEvents: Record the number of bytes written after every gRPC write operation.
func WithMessageEvents(events ...Event) Option {
return messageEventsProviderOption{events: events}
}
type spanStartOption struct{ opts []trace.SpanStartOption }
func (o spanStartOption) apply(c *config) {
c.SpanStartOptions = append(c.SpanStartOptions, o.opts...)
}
// WithSpanOptions configures an additional set of
// trace.SpanOptions, which are applied to each new span.
func WithSpanOptions(opts ...trace.SpanStartOption) Option {
return spanStartOption{opts}
}

View File

@ -0,0 +1,22 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/*
Package otelgrpc is the instrumentation library for [google.golang.org/grpc].
Use [NewClientHandler] with [grpc.WithStatsHandler] to instrument a gRPC client.
Use [NewServerHandler] with [grpc.StatsHandler] to instrument a gRPC server.
*/
package otelgrpc // import "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"

View File

@ -60,10 +60,12 @@ var (
// UnaryClientInterceptor returns a grpc.UnaryClientInterceptor suitable
// for use in a grpc.Dial call.
//
// Deprecated: Use [NewClientHandler] instead.
func UnaryClientInterceptor(opts ...Option) grpc.UnaryClientInterceptor {
cfg := newConfig(opts)
cfg := newConfig(opts, "client")
tracer := cfg.TracerProvider.Tracer(
instrumentationName,
ScopeName,
trace.WithInstrumentationVersion(Version()),
)
@ -83,23 +85,33 @@ func UnaryClientInterceptor(opts ...Option) grpc.UnaryClientInterceptor {
return invoker(ctx, method, req, reply, cc, callOpts...)
}
name, attr := spanInfo(method, cc.Target())
var span trace.Span
ctx, span = tracer.Start(
ctx,
name,
name, attr, _ := telemetryAttributes(method, cc.Target())
startOpts := append([]trace.SpanStartOption{
trace.WithSpanKind(trace.SpanKindClient),
trace.WithAttributes(attr...),
},
cfg.SpanStartOptions...,
)
ctx, span := tracer.Start(
ctx,
name,
startOpts...,
)
defer span.End()
ctx = inject(ctx, cfg.Propagators)
messageSent.Event(ctx, 1, req)
if cfg.SentEvent {
messageSent.Event(ctx, 1, req)
}
err := invoker(ctx, method, req, reply, cc, callOpts...)
messageReceived.Event(ctx, 1, reply)
if cfg.ReceivedEvent {
messageReceived.Event(ctx, 1, reply)
}
if err != nil {
s, _ := status.FromError(err)
@ -135,6 +147,9 @@ type clientStream struct {
eventsDone chan struct{}
finished chan error
receivedEvent bool
sentEvent bool
receivedMessageID int
sentMessageID int
}
@ -152,7 +167,10 @@ func (w *clientStream) RecvMsg(m interface{}) error {
w.sendStreamEvent(errorEvent, err)
} else {
w.receivedMessageID++
messageReceived.Event(w.Context(), w.receivedMessageID, m)
if w.receivedEvent {
messageReceived.Event(w.Context(), w.receivedMessageID, m)
}
}
return err
@ -162,7 +180,10 @@ func (w *clientStream) SendMsg(m interface{}) error {
err := w.ClientStream.SendMsg(m)
w.sentMessageID++
messageSent.Event(w.Context(), w.sentMessageID, m)
if w.sentEvent {
messageSent.Event(w.Context(), w.sentMessageID, m)
}
if err != nil {
w.sendStreamEvent(errorEvent, err)
@ -173,7 +194,6 @@ func (w *clientStream) SendMsg(m interface{}) error {
func (w *clientStream) Header() (metadata.MD, error) {
md, err := w.ClientStream.Header()
if err != nil {
w.sendStreamEvent(errorEvent, err)
}
@ -183,7 +203,6 @@ func (w *clientStream) Header() (metadata.MD, error) {
func (w *clientStream) CloseSend() error {
err := w.ClientStream.CloseSend()
if err != nil {
w.sendStreamEvent(errorEvent, err)
}
@ -191,7 +210,7 @@ func (w *clientStream) CloseSend() error {
return err
}
func wrapClientStream(ctx context.Context, s grpc.ClientStream, desc *grpc.StreamDesc) *clientStream {
func wrapClientStream(ctx context.Context, s grpc.ClientStream, desc *grpc.StreamDesc, cfg *config) *clientStream {
events := make(chan streamEvent)
eventsDone := make(chan struct{})
finished := make(chan error)
@ -218,11 +237,13 @@ func wrapClientStream(ctx context.Context, s grpc.ClientStream, desc *grpc.Strea
}()
return &clientStream{
ClientStream: s,
desc: desc,
events: events,
eventsDone: eventsDone,
finished: finished,
ClientStream: s,
desc: desc,
events: events,
eventsDone: eventsDone,
finished: finished,
receivedEvent: cfg.ReceivedEvent,
sentEvent: cfg.SentEvent,
}
}
@ -235,10 +256,12 @@ func (w *clientStream) sendStreamEvent(eventType streamEventType, err error) {
// StreamClientInterceptor returns a grpc.StreamClientInterceptor suitable
// for use in a grpc.Dial call.
//
// Deprecated: Use [NewClientHandler] instead.
func StreamClientInterceptor(opts ...Option) grpc.StreamClientInterceptor {
cfg := newConfig(opts)
cfg := newConfig(opts, "client")
tracer := cfg.TracerProvider.Tracer(
instrumentationName,
ScopeName,
trace.WithInstrumentationVersion(Version()),
)
@ -258,13 +281,19 @@ func StreamClientInterceptor(opts ...Option) grpc.StreamClientInterceptor {
return streamer(ctx, desc, cc, method, callOpts...)
}
name, attr := spanInfo(method, cc.Target())
var span trace.Span
ctx, span = tracer.Start(
ctx,
name,
name, attr, _ := telemetryAttributes(method, cc.Target())
startOpts := append([]trace.SpanStartOption{
trace.WithSpanKind(trace.SpanKindClient),
trace.WithAttributes(attr...),
},
cfg.SpanStartOptions...,
)
ctx, span := tracer.Start(
ctx,
name,
startOpts...,
)
ctx = inject(ctx, cfg.Propagators)
@ -277,7 +306,7 @@ func StreamClientInterceptor(opts ...Option) grpc.StreamClientInterceptor {
span.End()
return s, err
}
stream := wrapClientStream(ctx, s, desc)
stream := wrapClientStream(ctx, s, desc, cfg)
go func() {
err := <-stream.finished
@ -299,10 +328,12 @@ func StreamClientInterceptor(opts ...Option) grpc.StreamClientInterceptor {
// UnaryServerInterceptor returns a grpc.UnaryServerInterceptor suitable
// for use in a grpc.NewServer call.
//
// Deprecated: Use [NewServerHandler] instead.
func UnaryServerInterceptor(opts ...Option) grpc.UnaryServerInterceptor {
cfg := newConfig(opts)
cfg := newConfig(opts, "server")
tracer := cfg.TracerProvider.Tracer(
instrumentationName,
ScopeName,
trace.WithInstrumentationVersion(Version()),
)
@ -321,38 +352,48 @@ func UnaryServerInterceptor(opts ...Option) grpc.UnaryServerInterceptor {
}
ctx = extract(ctx, cfg.Propagators)
name, attr, metricAttrs := telemetryAttributes(info.FullMethod, peerFromCtx(ctx))
startOpts := append([]trace.SpanStartOption{
trace.WithSpanKind(trace.SpanKindServer),
trace.WithAttributes(attr...),
},
cfg.SpanStartOptions...,
)
name, attr := spanInfo(info.FullMethod, peerFromCtx(ctx))
ctx, span := tracer.Start(
trace.ContextWithRemoteSpanContext(ctx, trace.SpanContextFromContext(ctx)),
name,
trace.WithSpanKind(trace.SpanKindServer),
trace.WithAttributes(attr...),
startOpts...,
)
defer span.End()
messageReceived.Event(ctx, 1, req)
if cfg.ReceivedEvent {
messageReceived.Event(ctx, 1, req)
}
var statusCode grpc_codes.Code
defer func(t time.Time) {
elapsedTime := time.Since(t) / time.Millisecond
attr = append(attr, semconv.RPCGRPCStatusCodeKey.Int64(int64(statusCode)))
o := metric.WithAttributes(attr...)
cfg.rpcServerDuration.Record(ctx, int64(elapsedTime), o)
}(time.Now())
before := time.Now()
resp, err := handler(ctx, req)
s, _ := status.FromError(err)
if err != nil {
s, _ := status.FromError(err)
statusCode, msg := serverStatus(s)
span.SetStatus(statusCode, msg)
span.SetAttributes(statusCodeAttr(s.Code()))
messageSent.Event(ctx, 1, s.Proto())
if cfg.SentEvent {
messageSent.Event(ctx, 1, s.Proto())
}
} else {
statusCode = grpc_codes.OK
span.SetAttributes(statusCodeAttr(grpc_codes.OK))
messageSent.Event(ctx, 1, resp)
if cfg.SentEvent {
messageSent.Event(ctx, 1, resp)
}
}
grpcStatusCodeAttr := statusCodeAttr(s.Code())
span.SetAttributes(grpcStatusCodeAttr)
elapsedTime := time.Since(before).Milliseconds()
metricAttrs = append(metricAttrs, grpcStatusCodeAttr)
cfg.rpcDuration.Record(ctx, float64(elapsedTime), metric.WithAttributes(metricAttrs...))
return resp, err
}
@ -366,6 +407,9 @@ type serverStream struct {
receivedMessageID int
sentMessageID int
receivedEvent bool
sentEvent bool
}
func (w *serverStream) Context() context.Context {
@ -377,7 +421,9 @@ func (w *serverStream) RecvMsg(m interface{}) error {
if err == nil {
w.receivedMessageID++
messageReceived.Event(w.Context(), w.receivedMessageID, m)
if w.receivedEvent {
messageReceived.Event(w.Context(), w.receivedMessageID, m)
}
}
return err
@ -387,24 +433,30 @@ func (w *serverStream) SendMsg(m interface{}) error {
err := w.ServerStream.SendMsg(m)
w.sentMessageID++
messageSent.Event(w.Context(), w.sentMessageID, m)
if w.sentEvent {
messageSent.Event(w.Context(), w.sentMessageID, m)
}
return err
}
func wrapServerStream(ctx context.Context, ss grpc.ServerStream) *serverStream {
func wrapServerStream(ctx context.Context, ss grpc.ServerStream, cfg *config) *serverStream {
return &serverStream{
ServerStream: ss,
ctx: ctx,
ServerStream: ss,
ctx: ctx,
receivedEvent: cfg.ReceivedEvent,
sentEvent: cfg.SentEvent,
}
}
// StreamServerInterceptor returns a grpc.StreamServerInterceptor suitable
// for use in a grpc.NewServer call.
//
// Deprecated: Use [NewServerHandler] instead.
func StreamServerInterceptor(opts ...Option) grpc.StreamServerInterceptor {
cfg := newConfig(opts)
cfg := newConfig(opts, "server")
tracer := cfg.TracerProvider.Tracer(
instrumentationName,
ScopeName,
trace.WithInstrumentationVersion(Version()),
)
@ -420,21 +472,27 @@ func StreamServerInterceptor(opts ...Option) grpc.StreamServerInterceptor {
Type: StreamServer,
}
if cfg.Filter != nil && !cfg.Filter(i) {
return handler(srv, wrapServerStream(ctx, ss))
return handler(srv, wrapServerStream(ctx, ss, cfg))
}
ctx = extract(ctx, cfg.Propagators)
name, attr, _ := telemetryAttributes(info.FullMethod, peerFromCtx(ctx))
startOpts := append([]trace.SpanStartOption{
trace.WithSpanKind(trace.SpanKindServer),
trace.WithAttributes(attr...),
},
cfg.SpanStartOptions...,
)
name, attr := spanInfo(info.FullMethod, peerFromCtx(ctx))
ctx, span := tracer.Start(
trace.ContextWithRemoteSpanContext(ctx, trace.SpanContextFromContext(ctx)),
name,
trace.WithSpanKind(trace.SpanKindServer),
trace.WithAttributes(attr...),
startOpts...,
)
defer span.End()
err := handler(srv, wrapServerStream(ctx, ss))
err := handler(srv, wrapServerStream(ctx, ss, cfg))
if err != nil {
s, _ := status.FromError(err)
statusCode, msg := serverStatus(s)
@ -448,21 +506,25 @@ func StreamServerInterceptor(opts ...Option) grpc.StreamServerInterceptor {
}
}
// spanInfo returns a span name and all appropriate attributes from the gRPC
// method and peer address.
func spanInfo(fullMethod, peerAddress string) (string, []attribute.KeyValue) {
attrs := []attribute.KeyValue{RPCSystemGRPC}
name, mAttrs := internal.ParseFullMethod(fullMethod)
attrs = append(attrs, mAttrs...)
attrs = append(attrs, peerAttr(peerAddress)...)
return name, attrs
// telemetryAttributes returns a span name and span and metric attributes from
// the gRPC method and peer address.
func telemetryAttributes(fullMethod, peerAddress string) (string, []attribute.KeyValue, []attribute.KeyValue) {
name, methodAttrs := internal.ParseFullMethod(fullMethod)
peerAttrs := peerAttr(peerAddress)
attrs := make([]attribute.KeyValue, 0, 1+len(methodAttrs)+len(peerAttrs))
attrs = append(attrs, RPCSystemGRPC)
attrs = append(attrs, methodAttrs...)
metricAttrs := attrs[:1+len(methodAttrs)]
attrs = append(attrs, peerAttrs...)
return name, attrs, metricAttrs
}
// peerAttr returns attributes about the peer address.
func peerAttr(addr string) []attribute.KeyValue {
host, p, err := net.SplitHostPort(addr)
if err != nil {
return []attribute.KeyValue(nil)
return nil
}
if host == "" {
@ -470,7 +532,7 @@ func peerAttr(addr string) []attribute.KeyValue {
}
port, err := strconv.Atoi(p)
if err != nil {
return []attribute.KeyValue(nil)
return nil
}
var attr []attribute.KeyValue

View File

@ -24,13 +24,21 @@ import (
// ParseFullMethod returns a span name following the OpenTelemetry semantic
// conventions as well as all applicable span attribute.KeyValue attributes based
// on a gRPC's FullMethod.
//
// Parsing is consistent with grpc-go implementation:
// https://github.com/grpc/grpc-go/blob/v1.57.0/internal/grpcutil/method.go#L26-L39
func ParseFullMethod(fullMethod string) (string, []attribute.KeyValue) {
name := strings.TrimLeft(fullMethod, "/")
service, method, found := strings.Cut(name, "/")
if !found {
if !strings.HasPrefix(fullMethod, "/") {
// Invalid format, does not follow `/package.service/method`.
return fullMethod, nil
}
name := fullMethod[1:]
pos := strings.LastIndex(name, "/")
if pos < 0 {
// Invalid format, does not follow `/package.service/method`.
return name, nil
}
service, method := name[:pos], name[pos+1:]
var attrs []attribute.KeyValue
if service != "" {

View File

@ -56,7 +56,7 @@ func (s *metadataSupplier) Keys() []string {
// requests.
// Deprecated: Unnecessary public func.
func Inject(ctx context.Context, md *metadata.MD, opts ...Option) {
c := newConfig(opts)
c := newConfig(opts, "")
c.Propagators.Inject(ctx, &metadataSupplier{
metadata: md,
})
@ -78,7 +78,7 @@ func inject(ctx context.Context, propagators propagation.TextMapPropagator) cont
// This function is meant to be used on incoming requests.
// Deprecated: Unnecessary public func.
func Extract(ctx context.Context, md *metadata.MD, opts ...Option) (baggage.Baggage, trace.SpanContext) {
c := newConfig(opts)
c := newConfig(opts, "")
ctx = c.Propagators.Extract(ctx, &metadataSupplier{
metadata: md,
})

View File

@ -0,0 +1,235 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package otelgrpc // import "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
import (
"context"
"sync/atomic"
"time"
grpc_codes "google.golang.org/grpc/codes"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/status"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc/internal"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/metric"
semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
"go.opentelemetry.io/otel/trace"
)
type gRPCContextKey struct{}
type gRPCContext struct {
messagesReceived int64
messagesSent int64
metricAttrs []attribute.KeyValue
}
type serverHandler struct {
*config
}
// NewServerHandler creates a stats.Handler for gRPC server.
func NewServerHandler(opts ...Option) stats.Handler {
h := &serverHandler{
config: newConfig(opts, "server"),
}
return h
}
// TagConn can attach some information to the given context.
func (h *serverHandler) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context {
span := trace.SpanFromContext(ctx)
attrs := peerAttr(peerFromCtx(ctx))
span.SetAttributes(attrs...)
return ctx
}
// HandleConn processes the Conn stats.
func (h *serverHandler) HandleConn(ctx context.Context, info stats.ConnStats) {
}
// TagRPC can attach some information to the given context.
func (h *serverHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
ctx = extract(ctx, h.config.Propagators)
name, attrs := internal.ParseFullMethod(info.FullMethodName)
attrs = append(attrs, RPCSystemGRPC)
ctx, _ = h.tracer.Start(
trace.ContextWithRemoteSpanContext(ctx, trace.SpanContextFromContext(ctx)),
name,
trace.WithSpanKind(trace.SpanKindServer),
trace.WithAttributes(attrs...),
)
gctx := gRPCContext{
metricAttrs: attrs,
}
return context.WithValue(ctx, gRPCContextKey{}, &gctx)
}
// HandleRPC processes the RPC stats.
func (h *serverHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
h.handleRPC(ctx, rs)
}
type clientHandler struct {
*config
}
// NewClientHandler creates a stats.Handler for gRPC client.
func NewClientHandler(opts ...Option) stats.Handler {
h := &clientHandler{
config: newConfig(opts, "client"),
}
return h
}
// TagRPC can attach some information to the given context.
func (h *clientHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
name, attrs := internal.ParseFullMethod(info.FullMethodName)
attrs = append(attrs, RPCSystemGRPC)
ctx, _ = h.tracer.Start(
ctx,
name,
trace.WithSpanKind(trace.SpanKindClient),
trace.WithAttributes(attrs...),
)
gctx := gRPCContext{
metricAttrs: attrs,
}
return inject(context.WithValue(ctx, gRPCContextKey{}, &gctx), h.config.Propagators)
}
// HandleRPC processes the RPC stats.
func (h *clientHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
h.handleRPC(ctx, rs)
}
// TagConn can attach some information to the given context.
func (h *clientHandler) TagConn(ctx context.Context, cti *stats.ConnTagInfo) context.Context {
span := trace.SpanFromContext(ctx)
attrs := peerAttr(cti.RemoteAddr.String())
span.SetAttributes(attrs...)
return ctx
}
// HandleConn processes the Conn stats.
func (h *clientHandler) HandleConn(context.Context, stats.ConnStats) {
// no-op
}
func (c *config) handleRPC(ctx context.Context, rs stats.RPCStats) {
span := trace.SpanFromContext(ctx)
gctx, _ := ctx.Value(gRPCContextKey{}).(*gRPCContext)
var messageId int64
metricAttrs := make([]attribute.KeyValue, 0, len(gctx.metricAttrs)+1)
metricAttrs = append(metricAttrs, gctx.metricAttrs...)
wctx := withoutCancel(ctx)
switch rs := rs.(type) {
case *stats.Begin:
case *stats.InPayload:
if gctx != nil {
messageId = atomic.AddInt64(&gctx.messagesReceived, 1)
c.rpcRequestSize.Record(wctx, int64(rs.Length), metric.WithAttributes(metricAttrs...))
}
if c.ReceivedEvent {
span.AddEvent("message",
trace.WithAttributes(
semconv.MessageTypeReceived,
semconv.MessageIDKey.Int64(messageId),
semconv.MessageCompressedSizeKey.Int(rs.CompressedLength),
semconv.MessageUncompressedSizeKey.Int(rs.Length),
),
)
}
case *stats.OutPayload:
if gctx != nil {
messageId = atomic.AddInt64(&gctx.messagesSent, 1)
c.rpcResponseSize.Record(wctx, int64(rs.Length), metric.WithAttributes(metricAttrs...))
}
if c.SentEvent {
span.AddEvent("message",
trace.WithAttributes(
semconv.MessageTypeSent,
semconv.MessageIDKey.Int64(messageId),
semconv.MessageCompressedSizeKey.Int(rs.CompressedLength),
semconv.MessageUncompressedSizeKey.Int(rs.Length),
),
)
}
case *stats.OutTrailer:
case *stats.End:
var rpcStatusAttr attribute.KeyValue
if rs.Error != nil {
s, _ := status.FromError(rs.Error)
span.SetStatus(codes.Error, s.Message())
rpcStatusAttr = semconv.RPCGRPCStatusCodeKey.Int(int(s.Code()))
} else {
rpcStatusAttr = semconv.RPCGRPCStatusCodeKey.Int(int(grpc_codes.OK))
}
span.SetAttributes(rpcStatusAttr)
span.End()
metricAttrs = append(metricAttrs, rpcStatusAttr)
c.rpcDuration.Record(wctx, float64(rs.EndTime.Sub(rs.BeginTime)), metric.WithAttributes(metricAttrs...))
c.rpcRequestsPerRPC.Record(wctx, gctx.messagesReceived, metric.WithAttributes(metricAttrs...))
c.rpcResponsesPerRPC.Record(wctx, gctx.messagesSent, metric.WithAttributes(metricAttrs...))
default:
return
}
}
func withoutCancel(parent context.Context) context.Context {
if parent == nil {
panic("cannot create context from nil parent")
}
return withoutCancelCtx{parent}
}
type withoutCancelCtx struct {
c context.Context
}
func (withoutCancelCtx) Deadline() (deadline time.Time, ok bool) {
return
}
func (withoutCancelCtx) Done() <-chan struct{} {
return nil
}
func (withoutCancelCtx) Err() error {
return nil
}
func (w withoutCancelCtx) Value(key any) any {
return w.c.Value(key)
}
func (w withoutCancelCtx) String() string {
return "withoutCancel"
}

View File

@ -16,7 +16,7 @@ package otelgrpc // import "go.opentelemetry.io/contrib/instrumentation/google.g
// Version is the current release version of the gRPC instrumentation.
func Version() string {
return "0.42.0"
return "0.46.0"
// This string is updated by the pre_release.sh script during release
}