rebase: bump google.golang.org/grpc from 1.62.1 to 1.63.2

Bumps [google.golang.org/grpc](https://github.com/grpc/grpc-go) from 1.62.1 to 1.63.2.
- [Release notes](https://github.com/grpc/grpc-go/releases)
- [Commits](https://github.com/grpc/grpc-go/compare/v1.62.1...v1.63.2)

---
updated-dependencies:
- dependency-name: google.golang.org/grpc
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
This commit is contained in:
dependabot[bot]
2024-04-11 07:22:01 +00:00
committed by mergify[bot]
parent 6b8549e6a9
commit fbf768ac0f
43 changed files with 1901 additions and 2072 deletions

View File

@ -0,0 +1,255 @@
/*
*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package channelz
import (
"fmt"
"sync/atomic"
"google.golang.org/grpc/connectivity"
)
// Channel represents a channel within channelz, which includes metrics and
// internal channelz data, such as channelz id, child list, etc.
type Channel struct {
Entity
// ID is the channelz id of this channel.
ID int64
// RefName is the human readable reference string of this channel.
RefName string
closeCalled bool
nestedChans map[int64]string
subChans map[int64]string
Parent *Channel
trace *ChannelTrace
// traceRefCount is the number of trace events that reference this channel.
// Non-zero traceRefCount means the trace of this channel cannot be deleted.
traceRefCount int32
ChannelMetrics ChannelMetrics
}
// Implemented to make Channel implement the Identifier interface used for
// nesting.
func (c *Channel) channelzIdentifier() {}
func (c *Channel) String() string {
if c.Parent == nil {
return fmt.Sprintf("Channel #%d", c.ID)
}
return fmt.Sprintf("%s Channel #%d", c.Parent, c.ID)
}
func (c *Channel) id() int64 {
return c.ID
}
func (c *Channel) SubChans() map[int64]string {
db.mu.RLock()
defer db.mu.RUnlock()
return copyMap(c.subChans)
}
func (c *Channel) NestedChans() map[int64]string {
db.mu.RLock()
defer db.mu.RUnlock()
return copyMap(c.nestedChans)
}
func (c *Channel) Trace() *ChannelTrace {
db.mu.RLock()
defer db.mu.RUnlock()
return c.trace.copy()
}
type ChannelMetrics struct {
// The current connectivity state of the channel.
State atomic.Pointer[connectivity.State]
// The target this channel originally tried to connect to. May be absent
Target atomic.Pointer[string]
// The number of calls started on the channel.
CallsStarted atomic.Int64
// The number of calls that have completed with an OK status.
CallsSucceeded atomic.Int64
// The number of calls that have a completed with a non-OK status.
CallsFailed atomic.Int64
// The last time a call was started on the channel.
LastCallStartedTimestamp atomic.Int64
}
// CopyFrom copies the metrics in o to c. For testing only.
func (c *ChannelMetrics) CopyFrom(o *ChannelMetrics) {
c.State.Store(o.State.Load())
c.Target.Store(o.Target.Load())
c.CallsStarted.Store(o.CallsStarted.Load())
c.CallsSucceeded.Store(o.CallsSucceeded.Load())
c.CallsFailed.Store(o.CallsFailed.Load())
c.LastCallStartedTimestamp.Store(o.LastCallStartedTimestamp.Load())
}
// Equal returns true iff the metrics of c are the same as the metrics of o.
// For testing only.
func (c *ChannelMetrics) Equal(o any) bool {
oc, ok := o.(*ChannelMetrics)
if !ok {
return false
}
if (c.State.Load() == nil) != (oc.State.Load() == nil) {
return false
}
if c.State.Load() != nil && *c.State.Load() != *oc.State.Load() {
return false
}
if (c.Target.Load() == nil) != (oc.Target.Load() == nil) {
return false
}
if c.Target.Load() != nil && *c.Target.Load() != *oc.Target.Load() {
return false
}
return c.CallsStarted.Load() == oc.CallsStarted.Load() &&
c.CallsFailed.Load() == oc.CallsFailed.Load() &&
c.CallsSucceeded.Load() == oc.CallsSucceeded.Load() &&
c.LastCallStartedTimestamp.Load() == oc.LastCallStartedTimestamp.Load()
}
func strFromPointer(s *string) string {
if s == nil {
return ""
}
return *s
}
func (c *ChannelMetrics) String() string {
return fmt.Sprintf("State: %v, Target: %s, CallsStarted: %v, CallsSucceeded: %v, CallsFailed: %v, LastCallStartedTimestamp: %v",
c.State.Load(), strFromPointer(c.Target.Load()), c.CallsStarted.Load(), c.CallsSucceeded.Load(), c.CallsFailed.Load(), c.LastCallStartedTimestamp.Load(),
)
}
func NewChannelMetricForTesting(state connectivity.State, target string, started, succeeded, failed, timestamp int64) *ChannelMetrics {
c := &ChannelMetrics{}
c.State.Store(&state)
c.Target.Store(&target)
c.CallsStarted.Store(started)
c.CallsSucceeded.Store(succeeded)
c.CallsFailed.Store(failed)
c.LastCallStartedTimestamp.Store(timestamp)
return c
}
func (c *Channel) addChild(id int64, e entry) {
switch v := e.(type) {
case *SubChannel:
c.subChans[id] = v.RefName
case *Channel:
c.nestedChans[id] = v.RefName
default:
logger.Errorf("cannot add a child (id = %d) of type %T to a channel", id, e)
}
}
func (c *Channel) deleteChild(id int64) {
delete(c.subChans, id)
delete(c.nestedChans, id)
c.deleteSelfIfReady()
}
func (c *Channel) triggerDelete() {
c.closeCalled = true
c.deleteSelfIfReady()
}
func (c *Channel) getParentID() int64 {
if c.Parent == nil {
return -1
}
return c.Parent.ID
}
// deleteSelfFromTree tries to delete the channel from the channelz entry relation tree, which means
// deleting the channel reference from its parent's child list.
//
// In order for a channel to be deleted from the tree, it must meet the criteria that, removal of the
// corresponding grpc object has been invoked, and the channel does not have any children left.
//
// The returned boolean value indicates whether the channel has been successfully deleted from tree.
func (c *Channel) deleteSelfFromTree() (deleted bool) {
if !c.closeCalled || len(c.subChans)+len(c.nestedChans) != 0 {
return false
}
// not top channel
if c.Parent != nil {
c.Parent.deleteChild(c.ID)
}
return true
}
// deleteSelfFromMap checks whether it is valid to delete the channel from the map, which means
// deleting the channel from channelz's tracking entirely. Users can no longer use id to query the
// channel, and its memory will be garbage collected.
//
// The trace reference count of the channel must be 0 in order to be deleted from the map. This is
// specified in the channel tracing gRFC that as long as some other trace has reference to an entity,
// the trace of the referenced entity must not be deleted. In order to release the resource allocated
// by grpc, the reference to the grpc object is reset to a dummy object.
//
// deleteSelfFromMap must be called after deleteSelfFromTree returns true.
//
// It returns a bool to indicate whether the channel can be safely deleted from map.
func (c *Channel) deleteSelfFromMap() (delete bool) {
return c.getTraceRefCount() == 0
}
// deleteSelfIfReady tries to delete the channel itself from the channelz database.
// The delete process includes two steps:
// 1. delete the channel from the entry relation tree, i.e. delete the channel reference from its
// parent's child list.
// 2. delete the channel from the map, i.e. delete the channel entirely from channelz. Lookup by id
// will return entry not found error.
func (c *Channel) deleteSelfIfReady() {
if !c.deleteSelfFromTree() {
return
}
if !c.deleteSelfFromMap() {
return
}
db.deleteEntry(c.ID)
c.trace.clear()
}
func (c *Channel) getChannelTrace() *ChannelTrace {
return c.trace
}
func (c *Channel) incrTraceRefCount() {
atomic.AddInt32(&c.traceRefCount, 1)
}
func (c *Channel) decrTraceRefCount() {
atomic.AddInt32(&c.traceRefCount, -1)
}
func (c *Channel) getTraceRefCount() int {
i := atomic.LoadInt32(&c.traceRefCount)
return int(i)
}
func (c *Channel) getRefName() string {
return c.RefName
}

View File

@ -0,0 +1,402 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package channelz
import (
"fmt"
"sort"
"sync"
"time"
)
// entry represents a node in the channelz database.
type entry interface {
// addChild adds a child e, whose channelz id is id to child list
addChild(id int64, e entry)
// deleteChild deletes a child with channelz id to be id from child list
deleteChild(id int64)
// triggerDelete tries to delete self from channelz database. However, if
// child list is not empty, then deletion from the database is on hold until
// the last child is deleted from database.
triggerDelete()
// deleteSelfIfReady check whether triggerDelete() has been called before,
// and whether child list is now empty. If both conditions are met, then
// delete self from database.
deleteSelfIfReady()
// getParentID returns parent ID of the entry. 0 value parent ID means no parent.
getParentID() int64
Entity
}
// channelMap is the storage data structure for channelz.
//
// Methods of channelMap can be divided in two two categories with respect to
// locking.
//
// 1. Methods acquire the global lock.
// 2. Methods that can only be called when global lock is held.
//
// A second type of method need always to be called inside a first type of method.
type channelMap struct {
mu sync.RWMutex
topLevelChannels map[int64]struct{}
channels map[int64]*Channel
subChannels map[int64]*SubChannel
sockets map[int64]*Socket
servers map[int64]*Server
}
func newChannelMap() *channelMap {
return &channelMap{
topLevelChannels: make(map[int64]struct{}),
channels: make(map[int64]*Channel),
subChannels: make(map[int64]*SubChannel),
sockets: make(map[int64]*Socket),
servers: make(map[int64]*Server),
}
}
func (c *channelMap) addServer(id int64, s *Server) {
c.mu.Lock()
defer c.mu.Unlock()
s.cm = c
c.servers[id] = s
}
func (c *channelMap) addChannel(id int64, cn *Channel, isTopChannel bool, pid int64) {
c.mu.Lock()
defer c.mu.Unlock()
cn.trace.cm = c
c.channels[id] = cn
if isTopChannel {
c.topLevelChannels[id] = struct{}{}
} else if p := c.channels[pid]; p != nil {
p.addChild(id, cn)
} else {
logger.Infof("channel %d references invalid parent ID %d", id, pid)
}
}
func (c *channelMap) addSubChannel(id int64, sc *SubChannel, pid int64) {
c.mu.Lock()
defer c.mu.Unlock()
sc.trace.cm = c
c.subChannels[id] = sc
if p := c.channels[pid]; p != nil {
p.addChild(id, sc)
} else {
logger.Infof("subchannel %d references invalid parent ID %d", id, pid)
}
}
func (c *channelMap) addSocket(s *Socket) {
c.mu.Lock()
defer c.mu.Unlock()
s.cm = c
c.sockets[s.ID] = s
if s.Parent == nil {
logger.Infof("normal socket %d has no parent", s.ID)
}
s.Parent.(entry).addChild(s.ID, s)
}
// removeEntry triggers the removal of an entry, which may not indeed delete the
// entry, if it has to wait on the deletion of its children and until no other
// entity's channel trace references it. It may lead to a chain of entry
// deletion. For example, deleting the last socket of a gracefully shutting down
// server will lead to the server being also deleted.
func (c *channelMap) removeEntry(id int64) {
c.mu.Lock()
defer c.mu.Unlock()
c.findEntry(id).triggerDelete()
}
// tracedChannel represents tracing operations which are present on both
// channels and subChannels.
type tracedChannel interface {
getChannelTrace() *ChannelTrace
incrTraceRefCount()
decrTraceRefCount()
getRefName() string
}
// c.mu must be held by the caller
func (c *channelMap) decrTraceRefCount(id int64) {
e := c.findEntry(id)
if v, ok := e.(tracedChannel); ok {
v.decrTraceRefCount()
e.deleteSelfIfReady()
}
}
// c.mu must be held by the caller.
func (c *channelMap) findEntry(id int64) entry {
if v, ok := c.channels[id]; ok {
return v
}
if v, ok := c.subChannels[id]; ok {
return v
}
if v, ok := c.servers[id]; ok {
return v
}
if v, ok := c.sockets[id]; ok {
return v
}
return &dummyEntry{idNotFound: id}
}
// c.mu must be held by the caller
//
// deleteEntry deletes an entry from the channelMap. Before calling this method,
// caller must check this entry is ready to be deleted, i.e removeEntry() has
// been called on it, and no children still exist.
func (c *channelMap) deleteEntry(id int64) entry {
if v, ok := c.sockets[id]; ok {
delete(c.sockets, id)
return v
}
if v, ok := c.subChannels[id]; ok {
delete(c.subChannels, id)
return v
}
if v, ok := c.channels[id]; ok {
delete(c.channels, id)
delete(c.topLevelChannels, id)
return v
}
if v, ok := c.servers[id]; ok {
delete(c.servers, id)
return v
}
return &dummyEntry{idNotFound: id}
}
func (c *channelMap) traceEvent(id int64, desc *TraceEvent) {
c.mu.Lock()
defer c.mu.Unlock()
child := c.findEntry(id)
childTC, ok := child.(tracedChannel)
if !ok {
return
}
childTC.getChannelTrace().append(&traceEvent{Desc: desc.Desc, Severity: desc.Severity, Timestamp: time.Now()})
if desc.Parent != nil {
parent := c.findEntry(child.getParentID())
var chanType RefChannelType
switch child.(type) {
case *Channel:
chanType = RefChannel
case *SubChannel:
chanType = RefSubChannel
}
if parentTC, ok := parent.(tracedChannel); ok {
parentTC.getChannelTrace().append(&traceEvent{
Desc: desc.Parent.Desc,
Severity: desc.Parent.Severity,
Timestamp: time.Now(),
RefID: id,
RefName: childTC.getRefName(),
RefType: chanType,
})
childTC.incrTraceRefCount()
}
}
}
type int64Slice []int64
func (s int64Slice) Len() int { return len(s) }
func (s int64Slice) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s int64Slice) Less(i, j int) bool { return s[i] < s[j] }
func copyMap(m map[int64]string) map[int64]string {
n := make(map[int64]string)
for k, v := range m {
n[k] = v
}
return n
}
func min(a, b int) int {
if a < b {
return a
}
return b
}
func (c *channelMap) getTopChannels(id int64, maxResults int) ([]*Channel, bool) {
if maxResults <= 0 {
maxResults = EntriesPerPage
}
c.mu.RLock()
defer c.mu.RUnlock()
l := int64(len(c.topLevelChannels))
ids := make([]int64, 0, l)
for k := range c.topLevelChannels {
ids = append(ids, k)
}
sort.Sort(int64Slice(ids))
idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id })
end := true
var t []*Channel
for _, v := range ids[idx:] {
if len(t) == maxResults {
end = false
break
}
if cn, ok := c.channels[v]; ok {
t = append(t, cn)
}
}
return t, end
}
func (c *channelMap) getServers(id int64, maxResults int) ([]*Server, bool) {
if maxResults <= 0 {
maxResults = EntriesPerPage
}
c.mu.RLock()
defer c.mu.RUnlock()
ids := make([]int64, 0, len(c.servers))
for k := range c.servers {
ids = append(ids, k)
}
sort.Sort(int64Slice(ids))
idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id })
end := true
var s []*Server
for _, v := range ids[idx:] {
if len(s) == maxResults {
end = false
break
}
if svr, ok := c.servers[v]; ok {
s = append(s, svr)
}
}
return s, end
}
func (c *channelMap) getServerSockets(id int64, startID int64, maxResults int) ([]*Socket, bool) {
if maxResults <= 0 {
maxResults = EntriesPerPage
}
c.mu.RLock()
defer c.mu.RUnlock()
svr, ok := c.servers[id]
if !ok {
// server with id doesn't exist.
return nil, true
}
svrskts := svr.sockets
ids := make([]int64, 0, len(svrskts))
sks := make([]*Socket, 0, min(len(svrskts), maxResults))
for k := range svrskts {
ids = append(ids, k)
}
sort.Sort(int64Slice(ids))
idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= startID })
end := true
for _, v := range ids[idx:] {
if len(sks) == maxResults {
end = false
break
}
if ns, ok := c.sockets[v]; ok {
sks = append(sks, ns)
}
}
return sks, end
}
func (c *channelMap) getChannel(id int64) *Channel {
c.mu.RLock()
defer c.mu.RUnlock()
return c.channels[id]
}
func (c *channelMap) getSubChannel(id int64) *SubChannel {
c.mu.RLock()
defer c.mu.RUnlock()
return c.subChannels[id]
}
func (c *channelMap) getSocket(id int64) *Socket {
c.mu.RLock()
defer c.mu.RUnlock()
return c.sockets[id]
}
func (c *channelMap) getServer(id int64) *Server {
c.mu.RLock()
defer c.mu.RUnlock()
return c.servers[id]
}
type dummyEntry struct {
// dummyEntry is a fake entry to handle entry not found case.
idNotFound int64
Entity
}
func (d *dummyEntry) String() string {
return fmt.Sprintf("non-existent entity #%d", d.idNotFound)
}
func (d *dummyEntry) ID() int64 { return d.idNotFound }
func (d *dummyEntry) addChild(id int64, e entry) {
// Note: It is possible for a normal program to reach here under race
// condition. For example, there could be a race between ClientConn.Close()
// info being propagated to addrConn and http2Client. ClientConn.Close()
// cancel the context and result in http2Client to error. The error info is
// then caught by transport monitor and before addrConn.tearDown() is called
// in side ClientConn.Close(). Therefore, the addrConn will create a new
// transport. And when registering the new transport in channelz, its parent
// addrConn could have already been torn down and deleted from channelz
// tracking, and thus reach the code here.
logger.Infof("attempt to add child of type %T with id %d to a parent (id=%d) that doesn't currently exist", e, id, d.idNotFound)
}
func (d *dummyEntry) deleteChild(id int64) {
// It is possible for a normal program to reach here under race condition.
// Refer to the example described in addChild().
logger.Infof("attempt to delete child with id %d from a parent (id=%d) that doesn't currently exist", id, d.idNotFound)
}
func (d *dummyEntry) triggerDelete() {
logger.Warningf("attempt to delete an entry (id=%d) that doesn't currently exist", d.idNotFound)
}
func (*dummyEntry) deleteSelfIfReady() {
// code should not reach here. deleteSelfIfReady is always called on an existing entry.
}
func (*dummyEntry) getParentID() int64 {
return 0
}
// Entity is implemented by all channelz types.
type Entity interface {
isEntity()
fmt.Stringer
id() int64
}

View File

@ -16,47 +16,32 @@
*
*/
// Package channelz defines APIs for enabling channelz service, entry
// Package channelz defines internal APIs for enabling channelz service, entry
// registration/deletion, and accessing channelz data. It also defines channelz
// metric struct formats.
//
// All APIs in this package are experimental.
package channelz
import (
"errors"
"sort"
"sync"
"sync/atomic"
"time"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal"
)
const (
defaultMaxTraceEntry int32 = 30
)
var (
// IDGen is the global channelz entity ID generator. It should not be used
// outside this package except by tests.
IDGen IDGenerator
db dbWrapper
// EntryPerPage defines the number of channelz entries to be shown on a web page.
EntryPerPage = int64(50)
curState int32
maxTraceEntry = defaultMaxTraceEntry
db *channelMap = newChannelMap()
// EntriesPerPage defines the number of channelz entries to be shown on a web page.
EntriesPerPage = 50
curState int32
)
// TurnOn turns on channelz data collection.
func TurnOn() {
if !IsOn() {
db.set(newChannelMap())
IDGen.Reset()
atomic.StoreInt32(&curState, 1)
}
atomic.StoreInt32(&curState, 1)
}
func init() {
@ -70,49 +55,15 @@ func IsOn() bool {
return atomic.LoadInt32(&curState) == 1
}
// SetMaxTraceEntry sets maximum number of trace entry per entity (i.e. channel/subchannel).
// Setting it to 0 will disable channel tracing.
func SetMaxTraceEntry(i int32) {
atomic.StoreInt32(&maxTraceEntry, i)
}
// ResetMaxTraceEntryToDefault resets the maximum number of trace entry per entity to default.
func ResetMaxTraceEntryToDefault() {
atomic.StoreInt32(&maxTraceEntry, defaultMaxTraceEntry)
}
func getMaxTraceEntry() int {
i := atomic.LoadInt32(&maxTraceEntry)
return int(i)
}
// dbWarpper wraps around a reference to internal channelz data storage, and
// provide synchronized functionality to set and get the reference.
type dbWrapper struct {
mu sync.RWMutex
DB *channelMap
}
func (d *dbWrapper) set(db *channelMap) {
d.mu.Lock()
d.DB = db
d.mu.Unlock()
}
func (d *dbWrapper) get() *channelMap {
d.mu.RLock()
defer d.mu.RUnlock()
return d.DB
}
// GetTopChannels returns a slice of top channel's ChannelMetric, along with a
// boolean indicating whether there's more top channels to be queried for.
//
// The arg id specifies that only top channel with id at or above it will be included
// in the result. The returned slice is up to a length of the arg maxResults or
// EntryPerPage if maxResults is zero, and is sorted in ascending id order.
func GetTopChannels(id int64, maxResults int64) ([]*ChannelMetric, bool) {
return db.get().GetTopChannels(id, maxResults)
// The arg id specifies that only top channel with id at or above it will be
// included in the result. The returned slice is up to a length of the arg
// maxResults or EntriesPerPage if maxResults is zero, and is sorted in ascending
// id order.
func GetTopChannels(id int64, maxResults int) ([]*Channel, bool) {
return db.getTopChannels(id, maxResults)
}
// GetServers returns a slice of server's ServerMetric, along with a
@ -120,73 +71,69 @@ func GetTopChannels(id int64, maxResults int64) ([]*ChannelMetric, bool) {
//
// The arg id specifies that only server with id at or above it will be included
// in the result. The returned slice is up to a length of the arg maxResults or
// EntryPerPage if maxResults is zero, and is sorted in ascending id order.
func GetServers(id int64, maxResults int64) ([]*ServerMetric, bool) {
return db.get().GetServers(id, maxResults)
// EntriesPerPage if maxResults is zero, and is sorted in ascending id order.
func GetServers(id int64, maxResults int) ([]*Server, bool) {
return db.getServers(id, maxResults)
}
// GetServerSockets returns a slice of server's (identified by id) normal socket's
// SocketMetric, along with a boolean indicating whether there's more sockets to
// SocketMetrics, along with a boolean indicating whether there's more sockets to
// be queried for.
//
// The arg startID specifies that only sockets with id at or above it will be
// included in the result. The returned slice is up to a length of the arg maxResults
// or EntryPerPage if maxResults is zero, and is sorted in ascending id order.
func GetServerSockets(id int64, startID int64, maxResults int64) ([]*SocketMetric, bool) {
return db.get().GetServerSockets(id, startID, maxResults)
// or EntriesPerPage if maxResults is zero, and is sorted in ascending id order.
func GetServerSockets(id int64, startID int64, maxResults int) ([]*Socket, bool) {
return db.getServerSockets(id, startID, maxResults)
}
// GetChannel returns the ChannelMetric for the channel (identified by id).
func GetChannel(id int64) *ChannelMetric {
return db.get().GetChannel(id)
// GetChannel returns the Channel for the channel (identified by id).
func GetChannel(id int64) *Channel {
return db.getChannel(id)
}
// GetSubChannel returns the SubChannelMetric for the subchannel (identified by id).
func GetSubChannel(id int64) *SubChannelMetric {
return db.get().GetSubChannel(id)
// GetSubChannel returns the SubChannel for the subchannel (identified by id).
func GetSubChannel(id int64) *SubChannel {
return db.getSubChannel(id)
}
// GetSocket returns the SocketInternalMetric for the socket (identified by id).
func GetSocket(id int64) *SocketMetric {
return db.get().GetSocket(id)
// GetSocket returns the Socket for the socket (identified by id).
func GetSocket(id int64) *Socket {
return db.getSocket(id)
}
// GetServer returns the ServerMetric for the server (identified by id).
func GetServer(id int64) *ServerMetric {
return db.get().GetServer(id)
func GetServer(id int64) *Server {
return db.getServer(id)
}
// RegisterChannel registers the given channel c in the channelz database with
// ref as its reference name, and adds it to the child list of its parent
// (identified by pid). pid == nil means no parent.
// target as its target and reference name, and adds it to the child list of its
// parent. parent == nil means no parent.
//
// Returns a unique channelz identifier assigned to this channel.
//
// If channelz is not turned ON, the channelz database is not mutated.
func RegisterChannel(c Channel, pid *Identifier, ref string) *Identifier {
func RegisterChannel(parent *Channel, target string) *Channel {
id := IDGen.genID()
var parent int64
isTopChannel := true
if pid != nil {
isTopChannel = false
parent = pid.Int()
}
if !IsOn() {
return newIdentifer(RefChannel, id, pid)
return &Channel{ID: id}
}
cn := &channel{
refName: ref,
c: c,
subChans: make(map[int64]string),
isTopChannel := parent == nil
cn := &Channel{
ID: id,
RefName: target,
nestedChans: make(map[int64]string),
id: id,
pid: parent,
trace: &channelTrace{createdTime: time.Now(), events: make([]*TraceEvent, 0, getMaxTraceEntry())},
subChans: make(map[int64]string),
Parent: parent,
trace: &ChannelTrace{CreationTime: time.Now(), Events: make([]*traceEvent, 0, getMaxTraceEntry())},
}
db.get().addChannel(id, cn, isTopChannel, parent)
return newIdentifer(RefChannel, id, pid)
cn.ChannelMetrics.Target.Store(&target)
db.addChannel(id, cn, isTopChannel, cn.getParentID())
return cn
}
// RegisterSubChannel registers the given subChannel c in the channelz database
@ -196,555 +143,67 @@ func RegisterChannel(c Channel, pid *Identifier, ref string) *Identifier {
// Returns a unique channelz identifier assigned to this subChannel.
//
// If channelz is not turned ON, the channelz database is not mutated.
func RegisterSubChannel(c Channel, pid *Identifier, ref string) (*Identifier, error) {
if pid == nil {
return nil, errors.New("a SubChannel's parent id cannot be nil")
}
func RegisterSubChannel(parent *Channel, ref string) *SubChannel {
id := IDGen.genID()
if !IsOn() {
return newIdentifer(RefSubChannel, id, pid), nil
sc := &SubChannel{
ID: id,
RefName: ref,
parent: parent,
}
sc := &subChannel{
refName: ref,
c: c,
sockets: make(map[int64]string),
id: id,
pid: pid.Int(),
trace: &channelTrace{createdTime: time.Now(), events: make([]*TraceEvent, 0, getMaxTraceEntry())},
if !IsOn() {
return sc
}
db.get().addSubChannel(id, sc, pid.Int())
return newIdentifer(RefSubChannel, id, pid), nil
sc.sockets = make(map[int64]string)
sc.trace = &ChannelTrace{CreationTime: time.Now(), Events: make([]*traceEvent, 0, getMaxTraceEntry())}
db.addSubChannel(id, sc, parent.ID)
return sc
}
// RegisterServer registers the given server s in channelz database. It returns
// the unique channelz tracking id assigned to this server.
//
// If channelz is not turned ON, the channelz database is not mutated.
func RegisterServer(s Server, ref string) *Identifier {
func RegisterServer(ref string) *Server {
id := IDGen.genID()
if !IsOn() {
return newIdentifer(RefServer, id, nil)
return &Server{ID: id}
}
svr := &server{
refName: ref,
s: s,
svr := &Server{
RefName: ref,
sockets: make(map[int64]string),
listenSockets: make(map[int64]string),
id: id,
ID: id,
}
db.get().addServer(id, svr)
return newIdentifer(RefServer, id, nil)
db.addServer(id, svr)
return svr
}
// RegisterListenSocket registers the given listen socket s in channelz database
// with ref as its reference name, and add it to the child list of its parent
// (identified by pid). It returns the unique channelz tracking id assigned to
// this listen socket.
//
// If channelz is not turned ON, the channelz database is not mutated.
func RegisterListenSocket(s Socket, pid *Identifier, ref string) (*Identifier, error) {
if pid == nil {
return nil, errors.New("a ListenSocket's parent id cannot be 0")
}
id := IDGen.genID()
if !IsOn() {
return newIdentifer(RefListenSocket, id, pid), nil
}
ls := &listenSocket{refName: ref, s: s, id: id, pid: pid.Int()}
db.get().addListenSocket(id, ls, pid.Int())
return newIdentifer(RefListenSocket, id, pid), nil
}
// RegisterNormalSocket registers the given normal socket s in channelz database
// RegisterSocket registers the given normal socket s in channelz database
// with ref as its reference name, and adds it to the child list of its parent
// (identified by pid). It returns the unique channelz tracking id assigned to
// this normal socket.
// (identified by skt.Parent, which must be set). It returns the unique channelz
// tracking id assigned to this normal socket.
//
// If channelz is not turned ON, the channelz database is not mutated.
func RegisterNormalSocket(s Socket, pid *Identifier, ref string) (*Identifier, error) {
if pid == nil {
return nil, errors.New("a NormalSocket's parent id cannot be 0")
func RegisterSocket(skt *Socket) *Socket {
skt.ID = IDGen.genID()
if IsOn() {
db.addSocket(skt)
}
id := IDGen.genID()
if !IsOn() {
return newIdentifer(RefNormalSocket, id, pid), nil
}
ns := &normalSocket{refName: ref, s: s, id: id, pid: pid.Int()}
db.get().addNormalSocket(id, ns, pid.Int())
return newIdentifer(RefNormalSocket, id, pid), nil
return skt
}
// RemoveEntry removes an entry with unique channelz tracking id to be id from
// channelz database.
//
// If channelz is not turned ON, this function is a no-op.
func RemoveEntry(id *Identifier) {
func RemoveEntry(id int64) {
if !IsOn() {
return
}
db.get().removeEntry(id.Int())
}
// TraceEventDesc is what the caller of AddTraceEvent should provide to describe
// the event to be added to the channel trace.
//
// The Parent field is optional. It is used for an event that will be recorded
// in the entity's parent trace.
type TraceEventDesc struct {
Desc string
Severity Severity
Parent *TraceEventDesc
}
// AddTraceEvent adds trace related to the entity with specified id, using the
// provided TraceEventDesc.
//
// If channelz is not turned ON, this will simply log the event descriptions.
func AddTraceEvent(l grpclog.DepthLoggerV2, id *Identifier, depth int, desc *TraceEventDesc) {
// Log only the trace description associated with the bottom most entity.
switch desc.Severity {
case CtUnknown, CtInfo:
l.InfoDepth(depth+1, withParens(id)+desc.Desc)
case CtWarning:
l.WarningDepth(depth+1, withParens(id)+desc.Desc)
case CtError:
l.ErrorDepth(depth+1, withParens(id)+desc.Desc)
}
if getMaxTraceEntry() == 0 {
return
}
if IsOn() {
db.get().traceEvent(id.Int(), desc)
}
}
// channelMap is the storage data structure for channelz.
// Methods of channelMap can be divided in two two categories with respect to locking.
// 1. Methods acquire the global lock.
// 2. Methods that can only be called when global lock is held.
// A second type of method need always to be called inside a first type of method.
type channelMap struct {
mu sync.RWMutex
topLevelChannels map[int64]struct{}
servers map[int64]*server
channels map[int64]*channel
subChannels map[int64]*subChannel
listenSockets map[int64]*listenSocket
normalSockets map[int64]*normalSocket
}
func newChannelMap() *channelMap {
return &channelMap{
topLevelChannels: make(map[int64]struct{}),
channels: make(map[int64]*channel),
listenSockets: make(map[int64]*listenSocket),
normalSockets: make(map[int64]*normalSocket),
servers: make(map[int64]*server),
subChannels: make(map[int64]*subChannel),
}
}
func (c *channelMap) addServer(id int64, s *server) {
c.mu.Lock()
s.cm = c
c.servers[id] = s
c.mu.Unlock()
}
func (c *channelMap) addChannel(id int64, cn *channel, isTopChannel bool, pid int64) {
c.mu.Lock()
cn.cm = c
cn.trace.cm = c
c.channels[id] = cn
if isTopChannel {
c.topLevelChannels[id] = struct{}{}
} else {
c.findEntry(pid).addChild(id, cn)
}
c.mu.Unlock()
}
func (c *channelMap) addSubChannel(id int64, sc *subChannel, pid int64) {
c.mu.Lock()
sc.cm = c
sc.trace.cm = c
c.subChannels[id] = sc
c.findEntry(pid).addChild(id, sc)
c.mu.Unlock()
}
func (c *channelMap) addListenSocket(id int64, ls *listenSocket, pid int64) {
c.mu.Lock()
ls.cm = c
c.listenSockets[id] = ls
c.findEntry(pid).addChild(id, ls)
c.mu.Unlock()
}
func (c *channelMap) addNormalSocket(id int64, ns *normalSocket, pid int64) {
c.mu.Lock()
ns.cm = c
c.normalSockets[id] = ns
c.findEntry(pid).addChild(id, ns)
c.mu.Unlock()
}
// removeEntry triggers the removal of an entry, which may not indeed delete the entry, if it has to
// wait on the deletion of its children and until no other entity's channel trace references it.
// It may lead to a chain of entry deletion. For example, deleting the last socket of a gracefully
// shutting down server will lead to the server being also deleted.
func (c *channelMap) removeEntry(id int64) {
c.mu.Lock()
c.findEntry(id).triggerDelete()
c.mu.Unlock()
}
// c.mu must be held by the caller
func (c *channelMap) decrTraceRefCount(id int64) {
e := c.findEntry(id)
if v, ok := e.(tracedChannel); ok {
v.decrTraceRefCount()
e.deleteSelfIfReady()
}
}
// c.mu must be held by the caller.
func (c *channelMap) findEntry(id int64) entry {
var v entry
var ok bool
if v, ok = c.channels[id]; ok {
return v
}
if v, ok = c.subChannels[id]; ok {
return v
}
if v, ok = c.servers[id]; ok {
return v
}
if v, ok = c.listenSockets[id]; ok {
return v
}
if v, ok = c.normalSockets[id]; ok {
return v
}
return &dummyEntry{idNotFound: id}
}
// c.mu must be held by the caller
// deleteEntry simply deletes an entry from the channelMap. Before calling this
// method, caller must check this entry is ready to be deleted, i.e removeEntry()
// has been called on it, and no children still exist.
// Conditionals are ordered by the expected frequency of deletion of each entity
// type, in order to optimize performance.
func (c *channelMap) deleteEntry(id int64) {
var ok bool
if _, ok = c.normalSockets[id]; ok {
delete(c.normalSockets, id)
return
}
if _, ok = c.subChannels[id]; ok {
delete(c.subChannels, id)
return
}
if _, ok = c.channels[id]; ok {
delete(c.channels, id)
delete(c.topLevelChannels, id)
return
}
if _, ok = c.listenSockets[id]; ok {
delete(c.listenSockets, id)
return
}
if _, ok = c.servers[id]; ok {
delete(c.servers, id)
return
}
}
func (c *channelMap) traceEvent(id int64, desc *TraceEventDesc) {
c.mu.Lock()
child := c.findEntry(id)
childTC, ok := child.(tracedChannel)
if !ok {
c.mu.Unlock()
return
}
childTC.getChannelTrace().append(&TraceEvent{Desc: desc.Desc, Severity: desc.Severity, Timestamp: time.Now()})
if desc.Parent != nil {
parent := c.findEntry(child.getParentID())
var chanType RefChannelType
switch child.(type) {
case *channel:
chanType = RefChannel
case *subChannel:
chanType = RefSubChannel
}
if parentTC, ok := parent.(tracedChannel); ok {
parentTC.getChannelTrace().append(&TraceEvent{
Desc: desc.Parent.Desc,
Severity: desc.Parent.Severity,
Timestamp: time.Now(),
RefID: id,
RefName: childTC.getRefName(),
RefType: chanType,
})
childTC.incrTraceRefCount()
}
}
c.mu.Unlock()
}
type int64Slice []int64
func (s int64Slice) Len() int { return len(s) }
func (s int64Slice) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s int64Slice) Less(i, j int) bool { return s[i] < s[j] }
func copyMap(m map[int64]string) map[int64]string {
n := make(map[int64]string)
for k, v := range m {
n[k] = v
}
return n
}
func min(a, b int64) int64 {
if a < b {
return a
}
return b
}
func (c *channelMap) GetTopChannels(id int64, maxResults int64) ([]*ChannelMetric, bool) {
if maxResults <= 0 {
maxResults = EntryPerPage
}
c.mu.RLock()
l := int64(len(c.topLevelChannels))
ids := make([]int64, 0, l)
cns := make([]*channel, 0, min(l, maxResults))
for k := range c.topLevelChannels {
ids = append(ids, k)
}
sort.Sort(int64Slice(ids))
idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id })
count := int64(0)
var end bool
var t []*ChannelMetric
for i, v := range ids[idx:] {
if count == maxResults {
break
}
if cn, ok := c.channels[v]; ok {
cns = append(cns, cn)
t = append(t, &ChannelMetric{
NestedChans: copyMap(cn.nestedChans),
SubChans: copyMap(cn.subChans),
})
count++
}
if i == len(ids[idx:])-1 {
end = true
break
}
}
c.mu.RUnlock()
if count == 0 {
end = true
}
for i, cn := range cns {
t[i].ChannelData = cn.c.ChannelzMetric()
t[i].ID = cn.id
t[i].RefName = cn.refName
t[i].Trace = cn.trace.dumpData()
}
return t, end
}
func (c *channelMap) GetServers(id, maxResults int64) ([]*ServerMetric, bool) {
if maxResults <= 0 {
maxResults = EntryPerPage
}
c.mu.RLock()
l := int64(len(c.servers))
ids := make([]int64, 0, l)
ss := make([]*server, 0, min(l, maxResults))
for k := range c.servers {
ids = append(ids, k)
}
sort.Sort(int64Slice(ids))
idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id })
count := int64(0)
var end bool
var s []*ServerMetric
for i, v := range ids[idx:] {
if count == maxResults {
break
}
if svr, ok := c.servers[v]; ok {
ss = append(ss, svr)
s = append(s, &ServerMetric{
ListenSockets: copyMap(svr.listenSockets),
})
count++
}
if i == len(ids[idx:])-1 {
end = true
break
}
}
c.mu.RUnlock()
if count == 0 {
end = true
}
for i, svr := range ss {
s[i].ServerData = svr.s.ChannelzMetric()
s[i].ID = svr.id
s[i].RefName = svr.refName
}
return s, end
}
func (c *channelMap) GetServerSockets(id int64, startID int64, maxResults int64) ([]*SocketMetric, bool) {
if maxResults <= 0 {
maxResults = EntryPerPage
}
var svr *server
var ok bool
c.mu.RLock()
if svr, ok = c.servers[id]; !ok {
// server with id doesn't exist.
c.mu.RUnlock()
return nil, true
}
svrskts := svr.sockets
l := int64(len(svrskts))
ids := make([]int64, 0, l)
sks := make([]*normalSocket, 0, min(l, maxResults))
for k := range svrskts {
ids = append(ids, k)
}
sort.Sort(int64Slice(ids))
idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= startID })
count := int64(0)
var end bool
for i, v := range ids[idx:] {
if count == maxResults {
break
}
if ns, ok := c.normalSockets[v]; ok {
sks = append(sks, ns)
count++
}
if i == len(ids[idx:])-1 {
end = true
break
}
}
c.mu.RUnlock()
if count == 0 {
end = true
}
s := make([]*SocketMetric, 0, len(sks))
for _, ns := range sks {
sm := &SocketMetric{}
sm.SocketData = ns.s.ChannelzMetric()
sm.ID = ns.id
sm.RefName = ns.refName
s = append(s, sm)
}
return s, end
}
func (c *channelMap) GetChannel(id int64) *ChannelMetric {
cm := &ChannelMetric{}
var cn *channel
var ok bool
c.mu.RLock()
if cn, ok = c.channels[id]; !ok {
// channel with id doesn't exist.
c.mu.RUnlock()
return nil
}
cm.NestedChans = copyMap(cn.nestedChans)
cm.SubChans = copyMap(cn.subChans)
// cn.c can be set to &dummyChannel{} when deleteSelfFromMap is called. Save a copy of cn.c when
// holding the lock to prevent potential data race.
chanCopy := cn.c
c.mu.RUnlock()
cm.ChannelData = chanCopy.ChannelzMetric()
cm.ID = cn.id
cm.RefName = cn.refName
cm.Trace = cn.trace.dumpData()
return cm
}
func (c *channelMap) GetSubChannel(id int64) *SubChannelMetric {
cm := &SubChannelMetric{}
var sc *subChannel
var ok bool
c.mu.RLock()
if sc, ok = c.subChannels[id]; !ok {
// subchannel with id doesn't exist.
c.mu.RUnlock()
return nil
}
cm.Sockets = copyMap(sc.sockets)
// sc.c can be set to &dummyChannel{} when deleteSelfFromMap is called. Save a copy of sc.c when
// holding the lock to prevent potential data race.
chanCopy := sc.c
c.mu.RUnlock()
cm.ChannelData = chanCopy.ChannelzMetric()
cm.ID = sc.id
cm.RefName = sc.refName
cm.Trace = sc.trace.dumpData()
return cm
}
func (c *channelMap) GetSocket(id int64) *SocketMetric {
sm := &SocketMetric{}
c.mu.RLock()
if ls, ok := c.listenSockets[id]; ok {
c.mu.RUnlock()
sm.SocketData = ls.s.ChannelzMetric()
sm.ID = ls.id
sm.RefName = ls.refName
return sm
}
if ns, ok := c.normalSockets[id]; ok {
c.mu.RUnlock()
sm.SocketData = ns.s.ChannelzMetric()
sm.ID = ns.id
sm.RefName = ns.refName
return sm
}
c.mu.RUnlock()
return nil
}
func (c *channelMap) GetServer(id int64) *ServerMetric {
sm := &ServerMetric{}
var svr *server
var ok bool
c.mu.RLock()
if svr, ok = c.servers[id]; !ok {
c.mu.RUnlock()
return nil
}
sm.ListenSockets = copyMap(svr.listenSockets)
c.mu.RUnlock()
sm.ID = svr.id
sm.RefName = svr.refName
sm.ServerData = svr.s.ChannelzMetric()
return sm
db.removeEntry(id)
}
// IDGenerator is an incrementing atomic that tracks IDs for channelz entities.
@ -761,3 +220,11 @@ func (i *IDGenerator) Reset() {
func (i *IDGenerator) genID() int64 {
return atomic.AddInt64(&i.id, 1)
}
// Identifier is an opaque channelz identifier used to expose channelz symbols
// outside of grpc. Currently only implemented by Channel since no other
// types require exposure outside grpc.
type Identifier interface {
Entity
channelzIdentifier()
}

View File

@ -1,75 +0,0 @@
/*
*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package channelz
import "fmt"
// Identifier is an opaque identifier which uniquely identifies an entity in the
// channelz database.
type Identifier struct {
typ RefChannelType
id int64
str string
pid *Identifier
}
// Type returns the entity type corresponding to id.
func (id *Identifier) Type() RefChannelType {
return id.typ
}
// Int returns the integer identifier corresponding to id.
func (id *Identifier) Int() int64 {
return id.id
}
// String returns a string representation of the entity corresponding to id.
//
// This includes some information about the parent as well. Examples:
// Top-level channel: [Channel #channel-number]
// Nested channel: [Channel #parent-channel-number Channel #channel-number]
// Sub channel: [Channel #parent-channel SubChannel #subchannel-number]
func (id *Identifier) String() string {
return id.str
}
// Equal returns true if other is the same as id.
func (id *Identifier) Equal(other *Identifier) bool {
if (id != nil) != (other != nil) {
return false
}
if id == nil && other == nil {
return true
}
return id.typ == other.typ && id.id == other.id && id.pid == other.pid
}
// NewIdentifierForTesting returns a new opaque identifier to be used only for
// testing purposes.
func NewIdentifierForTesting(typ RefChannelType, id int64, pid *Identifier) *Identifier {
return newIdentifer(typ, id, pid)
}
func newIdentifer(typ RefChannelType, id int64, pid *Identifier) *Identifier {
str := fmt.Sprintf("%s #%d", typ, id)
if pid != nil {
str = fmt.Sprintf("%s %s", pid, str)
}
return &Identifier{typ: typ, id: id, str: str, pid: pid}
}

View File

@ -26,53 +26,49 @@ import (
var logger = grpclog.Component("channelz")
func withParens(id *Identifier) string {
return "[" + id.String() + "] "
}
// Info logs and adds a trace event if channelz is on.
func Info(l grpclog.DepthLoggerV2, id *Identifier, args ...any) {
AddTraceEvent(l, id, 1, &TraceEventDesc{
func Info(l grpclog.DepthLoggerV2, e Entity, args ...any) {
AddTraceEvent(l, e, 1, &TraceEvent{
Desc: fmt.Sprint(args...),
Severity: CtInfo,
})
}
// Infof logs and adds a trace event if channelz is on.
func Infof(l grpclog.DepthLoggerV2, id *Identifier, format string, args ...any) {
AddTraceEvent(l, id, 1, &TraceEventDesc{
func Infof(l grpclog.DepthLoggerV2, e Entity, format string, args ...any) {
AddTraceEvent(l, e, 1, &TraceEvent{
Desc: fmt.Sprintf(format, args...),
Severity: CtInfo,
})
}
// Warning logs and adds a trace event if channelz is on.
func Warning(l grpclog.DepthLoggerV2, id *Identifier, args ...any) {
AddTraceEvent(l, id, 1, &TraceEventDesc{
func Warning(l grpclog.DepthLoggerV2, e Entity, args ...any) {
AddTraceEvent(l, e, 1, &TraceEvent{
Desc: fmt.Sprint(args...),
Severity: CtWarning,
})
}
// Warningf logs and adds a trace event if channelz is on.
func Warningf(l grpclog.DepthLoggerV2, id *Identifier, format string, args ...any) {
AddTraceEvent(l, id, 1, &TraceEventDesc{
func Warningf(l grpclog.DepthLoggerV2, e Entity, format string, args ...any) {
AddTraceEvent(l, e, 1, &TraceEvent{
Desc: fmt.Sprintf(format, args...),
Severity: CtWarning,
})
}
// Error logs and adds a trace event if channelz is on.
func Error(l grpclog.DepthLoggerV2, id *Identifier, args ...any) {
AddTraceEvent(l, id, 1, &TraceEventDesc{
func Error(l grpclog.DepthLoggerV2, e Entity, args ...any) {
AddTraceEvent(l, e, 1, &TraceEvent{
Desc: fmt.Sprint(args...),
Severity: CtError,
})
}
// Errorf logs and adds a trace event if channelz is on.
func Errorf(l grpclog.DepthLoggerV2, id *Identifier, format string, args ...any) {
AddTraceEvent(l, id, 1, &TraceEventDesc{
func Errorf(l grpclog.DepthLoggerV2, e Entity, format string, args ...any) {
AddTraceEvent(l, e, 1, &TraceEvent{
Desc: fmt.Sprintf(format, args...),
Severity: CtError,
})

View File

@ -0,0 +1,119 @@
/*
*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package channelz
import (
"fmt"
"sync/atomic"
)
// Server is the channelz representation of a server.
type Server struct {
Entity
ID int64
RefName string
ServerMetrics ServerMetrics
closeCalled bool
sockets map[int64]string
listenSockets map[int64]string
cm *channelMap
}
// ServerMetrics defines a struct containing metrics for servers.
type ServerMetrics struct {
// The number of incoming calls started on the server.
CallsStarted atomic.Int64
// The number of incoming calls that have completed with an OK status.
CallsSucceeded atomic.Int64
// The number of incoming calls that have a completed with a non-OK status.
CallsFailed atomic.Int64
// The last time a call was started on the server.
LastCallStartedTimestamp atomic.Int64
}
// NewServerMetricsForTesting returns an initialized ServerMetrics.
func NewServerMetricsForTesting(started, succeeded, failed, timestamp int64) *ServerMetrics {
sm := &ServerMetrics{}
sm.CallsStarted.Store(started)
sm.CallsSucceeded.Store(succeeded)
sm.CallsFailed.Store(failed)
sm.LastCallStartedTimestamp.Store(timestamp)
return sm
}
func (sm *ServerMetrics) CopyFrom(o *ServerMetrics) {
sm.CallsStarted.Store(o.CallsStarted.Load())
sm.CallsSucceeded.Store(o.CallsSucceeded.Load())
sm.CallsFailed.Store(o.CallsFailed.Load())
sm.LastCallStartedTimestamp.Store(o.LastCallStartedTimestamp.Load())
}
// ListenSockets returns the listening sockets for s.
func (s *Server) ListenSockets() map[int64]string {
db.mu.RLock()
defer db.mu.RUnlock()
return copyMap(s.listenSockets)
}
// String returns a printable description of s.
func (s *Server) String() string {
return fmt.Sprintf("Server #%d", s.ID)
}
func (s *Server) id() int64 {
return s.ID
}
func (s *Server) addChild(id int64, e entry) {
switch v := e.(type) {
case *Socket:
switch v.SocketType {
case SocketTypeNormal:
s.sockets[id] = v.RefName
case SocketTypeListen:
s.listenSockets[id] = v.RefName
}
default:
logger.Errorf("cannot add a child (id = %d) of type %T to a server", id, e)
}
}
func (s *Server) deleteChild(id int64) {
delete(s.sockets, id)
delete(s.listenSockets, id)
s.deleteSelfIfReady()
}
func (s *Server) triggerDelete() {
s.closeCalled = true
s.deleteSelfIfReady()
}
func (s *Server) deleteSelfIfReady() {
if !s.closeCalled || len(s.sockets)+len(s.listenSockets) != 0 {
return
}
s.cm.deleteEntry(s.ID)
}
func (s *Server) getParentID() int64 {
return 0
}

View File

@ -0,0 +1,130 @@
/*
*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package channelz
import (
"fmt"
"net"
"sync/atomic"
"google.golang.org/grpc/credentials"
)
// SocketMetrics defines the struct that the implementor of Socket interface
// should return from ChannelzMetric().
type SocketMetrics struct {
// The number of streams that have been started.
StreamsStarted atomic.Int64
// The number of streams that have ended successfully:
// On client side, receiving frame with eos bit set.
// On server side, sending frame with eos bit set.
StreamsSucceeded atomic.Int64
// The number of streams that have ended unsuccessfully:
// On client side, termination without receiving frame with eos bit set.
// On server side, termination without sending frame with eos bit set.
StreamsFailed atomic.Int64
// The number of messages successfully sent on this socket.
MessagesSent atomic.Int64
MessagesReceived atomic.Int64
// The number of keep alives sent. This is typically implemented with HTTP/2
// ping messages.
KeepAlivesSent atomic.Int64
// The last time a stream was created by this endpoint. Usually unset for
// servers.
LastLocalStreamCreatedTimestamp atomic.Int64
// The last time a stream was created by the remote endpoint. Usually unset
// for clients.
LastRemoteStreamCreatedTimestamp atomic.Int64
// The last time a message was sent by this endpoint.
LastMessageSentTimestamp atomic.Int64
// The last time a message was received by this endpoint.
LastMessageReceivedTimestamp atomic.Int64
}
// EphemeralSocketMetrics are metrics that change rapidly and are tracked
// outside of channelz.
type EphemeralSocketMetrics struct {
// The amount of window, granted to the local endpoint by the remote endpoint.
// This may be slightly out of date due to network latency. This does NOT
// include stream level or TCP level flow control info.
LocalFlowControlWindow int64
// The amount of window, granted to the remote endpoint by the local endpoint.
// This may be slightly out of date due to network latency. This does NOT
// include stream level or TCP level flow control info.
RemoteFlowControlWindow int64
}
type SocketType string
const (
SocketTypeNormal = "NormalSocket"
SocketTypeListen = "ListenSocket"
)
type Socket struct {
Entity
SocketType SocketType
ID int64
Parent Entity
cm *channelMap
SocketMetrics SocketMetrics
EphemeralMetrics func() *EphemeralSocketMetrics
RefName string
// The locally bound address. Immutable.
LocalAddr net.Addr
// The remote bound address. May be absent. Immutable.
RemoteAddr net.Addr
// Optional, represents the name of the remote endpoint, if different than
// the original target name. Immutable.
RemoteName string
// Immutable.
SocketOptions *SocketOptionData
// Immutable.
Security credentials.ChannelzSecurityValue
}
func (ls *Socket) String() string {
return fmt.Sprintf("%s %s #%d", ls.Parent, ls.SocketType, ls.ID)
}
func (ls *Socket) id() int64 {
return ls.ID
}
func (ls *Socket) addChild(id int64, e entry) {
logger.Errorf("cannot add a child (id = %d) of type %T to a listen socket", id, e)
}
func (ls *Socket) deleteChild(id int64) {
logger.Errorf("cannot delete a child (id = %d) from a listen socket", id)
}
func (ls *Socket) triggerDelete() {
ls.cm.deleteEntry(ls.ID)
ls.Parent.(entry).deleteChild(ls.ID)
}
func (ls *Socket) deleteSelfIfReady() {
logger.Errorf("cannot call deleteSelfIfReady on a listen socket")
}
func (ls *Socket) getParentID() int64 {
return ls.Parent.id()
}

View File

@ -0,0 +1,151 @@
/*
*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package channelz
import (
"fmt"
"sync/atomic"
)
// SubChannel is the channelz representation of a subchannel.
type SubChannel struct {
Entity
// ID is the channelz id of this subchannel.
ID int64
// RefName is the human readable reference string of this subchannel.
RefName string
closeCalled bool
sockets map[int64]string
parent *Channel
trace *ChannelTrace
traceRefCount int32
ChannelMetrics ChannelMetrics
}
func (sc *SubChannel) String() string {
return fmt.Sprintf("%s SubChannel #%d", sc.parent, sc.ID)
}
func (sc *SubChannel) id() int64 {
return sc.ID
}
func (sc *SubChannel) Sockets() map[int64]string {
db.mu.RLock()
defer db.mu.RUnlock()
return copyMap(sc.sockets)
}
func (sc *SubChannel) Trace() *ChannelTrace {
db.mu.RLock()
defer db.mu.RUnlock()
return sc.trace.copy()
}
func (sc *SubChannel) addChild(id int64, e entry) {
if v, ok := e.(*Socket); ok && v.SocketType == SocketTypeNormal {
sc.sockets[id] = v.RefName
} else {
logger.Errorf("cannot add a child (id = %d) of type %T to a subChannel", id, e)
}
}
func (sc *SubChannel) deleteChild(id int64) {
delete(sc.sockets, id)
sc.deleteSelfIfReady()
}
func (sc *SubChannel) triggerDelete() {
sc.closeCalled = true
sc.deleteSelfIfReady()
}
func (sc *SubChannel) getParentID() int64 {
return sc.parent.ID
}
// deleteSelfFromTree tries to delete the subchannel from the channelz entry relation tree, which
// means deleting the subchannel reference from its parent's child list.
//
// In order for a subchannel to be deleted from the tree, it must meet the criteria that, removal of
// the corresponding grpc object has been invoked, and the subchannel does not have any children left.
//
// The returned boolean value indicates whether the channel has been successfully deleted from tree.
func (sc *SubChannel) deleteSelfFromTree() (deleted bool) {
if !sc.closeCalled || len(sc.sockets) != 0 {
return false
}
sc.parent.deleteChild(sc.ID)
return true
}
// deleteSelfFromMap checks whether it is valid to delete the subchannel from the map, which means
// deleting the subchannel from channelz's tracking entirely. Users can no longer use id to query
// the subchannel, and its memory will be garbage collected.
//
// The trace reference count of the subchannel must be 0 in order to be deleted from the map. This is
// specified in the channel tracing gRFC that as long as some other trace has reference to an entity,
// the trace of the referenced entity must not be deleted. In order to release the resource allocated
// by grpc, the reference to the grpc object is reset to a dummy object.
//
// deleteSelfFromMap must be called after deleteSelfFromTree returns true.
//
// It returns a bool to indicate whether the channel can be safely deleted from map.
func (sc *SubChannel) deleteSelfFromMap() (delete bool) {
return sc.getTraceRefCount() == 0
}
// deleteSelfIfReady tries to delete the subchannel itself from the channelz database.
// The delete process includes two steps:
// 1. delete the subchannel from the entry relation tree, i.e. delete the subchannel reference from
// its parent's child list.
// 2. delete the subchannel from the map, i.e. delete the subchannel entirely from channelz. Lookup
// by id will return entry not found error.
func (sc *SubChannel) deleteSelfIfReady() {
if !sc.deleteSelfFromTree() {
return
}
if !sc.deleteSelfFromMap() {
return
}
db.deleteEntry(sc.ID)
sc.trace.clear()
}
func (sc *SubChannel) getChannelTrace() *ChannelTrace {
return sc.trace
}
func (sc *SubChannel) incrTraceRefCount() {
atomic.AddInt32(&sc.traceRefCount, 1)
}
func (sc *SubChannel) decrTraceRefCount() {
atomic.AddInt32(&sc.traceRefCount, -1)
}
func (sc *SubChannel) getTraceRefCount() int {
i := atomic.LoadInt32(&sc.traceRefCount)
return int(i)
}
func (sc *SubChannel) getRefName() string {
return sc.RefName
}

View File

@ -49,3 +49,17 @@ func (s *SocketOptionData) Getsockopt(fd uintptr) {
s.TCPInfo = v
}
}
// GetSocketOption gets the socket option info of the conn.
func GetSocketOption(socket any) *SocketOptionData {
c, ok := socket.(syscall.Conn)
if !ok {
return nil
}
data := &SocketOptionData{}
if rawConn, err := c.SyscallConn(); err == nil {
rawConn.Control(data.Getsockopt)
return data
}
return nil
}

View File

@ -1,5 +1,4 @@
//go:build !linux
// +build !linux
/*
*
@ -41,3 +40,8 @@ func (s *SocketOptionData) Getsockopt(fd uintptr) {
logger.Warning("Channelz: socket options are not supported on non-linux environments")
})
}
// GetSocketOption gets the socket option info of the conn.
func GetSocketOption(c any) *SocketOptionData {
return nil
}

View File

@ -0,0 +1,204 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package channelz
import (
"fmt"
"sync"
"sync/atomic"
"time"
"google.golang.org/grpc/grpclog"
)
const (
defaultMaxTraceEntry int32 = 30
)
var maxTraceEntry = defaultMaxTraceEntry
// SetMaxTraceEntry sets maximum number of trace entries per entity (i.e.
// channel/subchannel). Setting it to 0 will disable channel tracing.
func SetMaxTraceEntry(i int32) {
atomic.StoreInt32(&maxTraceEntry, i)
}
// ResetMaxTraceEntryToDefault resets the maximum number of trace entries per
// entity to default.
func ResetMaxTraceEntryToDefault() {
atomic.StoreInt32(&maxTraceEntry, defaultMaxTraceEntry)
}
func getMaxTraceEntry() int {
i := atomic.LoadInt32(&maxTraceEntry)
return int(i)
}
// traceEvent is an internal representation of a single trace event
type traceEvent struct {
// Desc is a simple description of the trace event.
Desc string
// Severity states the severity of this trace event.
Severity Severity
// Timestamp is the event time.
Timestamp time.Time
// RefID is the id of the entity that gets referenced in the event. RefID is 0 if no other entity is
// involved in this event.
// e.g. SubChannel (id: 4[]) Created. --> RefID = 4, RefName = "" (inside [])
RefID int64
// RefName is the reference name for the entity that gets referenced in the event.
RefName string
// RefType indicates the referenced entity type, i.e Channel or SubChannel.
RefType RefChannelType
}
// TraceEvent is what the caller of AddTraceEvent should provide to describe the
// event to be added to the channel trace.
//
// The Parent field is optional. It is used for an event that will be recorded
// in the entity's parent trace.
type TraceEvent struct {
Desc string
Severity Severity
Parent *TraceEvent
}
type ChannelTrace struct {
cm *channelMap
clearCalled bool
CreationTime time.Time
EventNum int64
mu sync.Mutex
Events []*traceEvent
}
func (c *ChannelTrace) copy() *ChannelTrace {
return &ChannelTrace{
CreationTime: c.CreationTime,
EventNum: c.EventNum,
Events: append(([]*traceEvent)(nil), c.Events...),
}
}
func (c *ChannelTrace) append(e *traceEvent) {
c.mu.Lock()
if len(c.Events) == getMaxTraceEntry() {
del := c.Events[0]
c.Events = c.Events[1:]
if del.RefID != 0 {
// start recursive cleanup in a goroutine to not block the call originated from grpc.
go func() {
// need to acquire c.cm.mu lock to call the unlocked attemptCleanup func.
c.cm.mu.Lock()
c.cm.decrTraceRefCount(del.RefID)
c.cm.mu.Unlock()
}()
}
}
e.Timestamp = time.Now()
c.Events = append(c.Events, e)
c.EventNum++
c.mu.Unlock()
}
func (c *ChannelTrace) clear() {
if c.clearCalled {
return
}
c.clearCalled = true
c.mu.Lock()
for _, e := range c.Events {
if e.RefID != 0 {
// caller should have already held the c.cm.mu lock.
c.cm.decrTraceRefCount(e.RefID)
}
}
c.mu.Unlock()
}
// Severity is the severity level of a trace event.
// The canonical enumeration of all valid values is here:
// https://github.com/grpc/grpc-proto/blob/9b13d199cc0d4703c7ea26c9c330ba695866eb23/grpc/channelz/v1/channelz.proto#L126.
type Severity int
const (
// CtUnknown indicates unknown severity of a trace event.
CtUnknown Severity = iota
// CtInfo indicates info level severity of a trace event.
CtInfo
// CtWarning indicates warning level severity of a trace event.
CtWarning
// CtError indicates error level severity of a trace event.
CtError
)
// RefChannelType is the type of the entity being referenced in a trace event.
type RefChannelType int
const (
// RefUnknown indicates an unknown entity type, the zero value for this type.
RefUnknown RefChannelType = iota
// RefChannel indicates the referenced entity is a Channel.
RefChannel
// RefSubChannel indicates the referenced entity is a SubChannel.
RefSubChannel
// RefServer indicates the referenced entity is a Server.
RefServer
// RefListenSocket indicates the referenced entity is a ListenSocket.
RefListenSocket
// RefNormalSocket indicates the referenced entity is a NormalSocket.
RefNormalSocket
)
var refChannelTypeToString = map[RefChannelType]string{
RefUnknown: "Unknown",
RefChannel: "Channel",
RefSubChannel: "SubChannel",
RefServer: "Server",
RefListenSocket: "ListenSocket",
RefNormalSocket: "NormalSocket",
}
func (r RefChannelType) String() string {
return refChannelTypeToString[r]
}
// AddTraceEvent adds trace related to the entity with specified id, using the
// provided TraceEventDesc.
//
// If channelz is not turned ON, this will simply log the event descriptions.
func AddTraceEvent(l grpclog.DepthLoggerV2, e Entity, depth int, desc *TraceEvent) {
// Log only the trace description associated with the bottom most entity.
d := fmt.Sprintf("[%s]%s", e, desc.Desc)
switch desc.Severity {
case CtUnknown, CtInfo:
l.InfoDepth(depth+1, d)
case CtWarning:
l.WarningDepth(depth+1, d)
case CtError:
l.ErrorDepth(depth+1, d)
}
if getMaxTraceEntry() == 0 {
return
}
if IsOn() {
db.traceEvent(e.id(), desc)
}
}

View File

@ -1,727 +0,0 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package channelz
import (
"net"
"sync"
"sync/atomic"
"time"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
)
// entry represents a node in the channelz database.
type entry interface {
// addChild adds a child e, whose channelz id is id to child list
addChild(id int64, e entry)
// deleteChild deletes a child with channelz id to be id from child list
deleteChild(id int64)
// triggerDelete tries to delete self from channelz database. However, if child
// list is not empty, then deletion from the database is on hold until the last
// child is deleted from database.
triggerDelete()
// deleteSelfIfReady check whether triggerDelete() has been called before, and whether child
// list is now empty. If both conditions are met, then delete self from database.
deleteSelfIfReady()
// getParentID returns parent ID of the entry. 0 value parent ID means no parent.
getParentID() int64
}
// dummyEntry is a fake entry to handle entry not found case.
type dummyEntry struct {
idNotFound int64
}
func (d *dummyEntry) addChild(id int64, e entry) {
// Note: It is possible for a normal program to reach here under race condition.
// For example, there could be a race between ClientConn.Close() info being propagated
// to addrConn and http2Client. ClientConn.Close() cancel the context and result
// in http2Client to error. The error info is then caught by transport monitor
// and before addrConn.tearDown() is called in side ClientConn.Close(). Therefore,
// the addrConn will create a new transport. And when registering the new transport in
// channelz, its parent addrConn could have already been torn down and deleted
// from channelz tracking, and thus reach the code here.
logger.Infof("attempt to add child of type %T with id %d to a parent (id=%d) that doesn't currently exist", e, id, d.idNotFound)
}
func (d *dummyEntry) deleteChild(id int64) {
// It is possible for a normal program to reach here under race condition.
// Refer to the example described in addChild().
logger.Infof("attempt to delete child with id %d from a parent (id=%d) that doesn't currently exist", id, d.idNotFound)
}
func (d *dummyEntry) triggerDelete() {
logger.Warningf("attempt to delete an entry (id=%d) that doesn't currently exist", d.idNotFound)
}
func (*dummyEntry) deleteSelfIfReady() {
// code should not reach here. deleteSelfIfReady is always called on an existing entry.
}
func (*dummyEntry) getParentID() int64 {
return 0
}
// ChannelMetric defines the info channelz provides for a specific Channel, which
// includes ChannelInternalMetric and channelz-specific data, such as channelz id,
// child list, etc.
type ChannelMetric struct {
// ID is the channelz id of this channel.
ID int64
// RefName is the human readable reference string of this channel.
RefName string
// ChannelData contains channel internal metric reported by the channel through
// ChannelzMetric().
ChannelData *ChannelInternalMetric
// NestedChans tracks the nested channel type children of this channel in the format of
// a map from nested channel channelz id to corresponding reference string.
NestedChans map[int64]string
// SubChans tracks the subchannel type children of this channel in the format of a
// map from subchannel channelz id to corresponding reference string.
SubChans map[int64]string
// Sockets tracks the socket type children of this channel in the format of a map
// from socket channelz id to corresponding reference string.
// Note current grpc implementation doesn't allow channel having sockets directly,
// therefore, this is field is unused.
Sockets map[int64]string
// Trace contains the most recent traced events.
Trace *ChannelTrace
}
// SubChannelMetric defines the info channelz provides for a specific SubChannel,
// which includes ChannelInternalMetric and channelz-specific data, such as
// channelz id, child list, etc.
type SubChannelMetric struct {
// ID is the channelz id of this subchannel.
ID int64
// RefName is the human readable reference string of this subchannel.
RefName string
// ChannelData contains subchannel internal metric reported by the subchannel
// through ChannelzMetric().
ChannelData *ChannelInternalMetric
// NestedChans tracks the nested channel type children of this subchannel in the format of
// a map from nested channel channelz id to corresponding reference string.
// Note current grpc implementation doesn't allow subchannel to have nested channels
// as children, therefore, this field is unused.
NestedChans map[int64]string
// SubChans tracks the subchannel type children of this subchannel in the format of a
// map from subchannel channelz id to corresponding reference string.
// Note current grpc implementation doesn't allow subchannel to have subchannels
// as children, therefore, this field is unused.
SubChans map[int64]string
// Sockets tracks the socket type children of this subchannel in the format of a map
// from socket channelz id to corresponding reference string.
Sockets map[int64]string
// Trace contains the most recent traced events.
Trace *ChannelTrace
}
// ChannelInternalMetric defines the struct that the implementor of Channel interface
// should return from ChannelzMetric().
type ChannelInternalMetric struct {
// current connectivity state of the channel.
State connectivity.State
// The target this channel originally tried to connect to. May be absent
Target string
// The number of calls started on the channel.
CallsStarted int64
// The number of calls that have completed with an OK status.
CallsSucceeded int64
// The number of calls that have a completed with a non-OK status.
CallsFailed int64
// The last time a call was started on the channel.
LastCallStartedTimestamp time.Time
}
// ChannelTrace stores traced events on a channel/subchannel and related info.
type ChannelTrace struct {
// EventNum is the number of events that ever got traced (i.e. including those that have been deleted)
EventNum int64
// CreationTime is the creation time of the trace.
CreationTime time.Time
// Events stores the most recent trace events (up to $maxTraceEntry, newer event will overwrite the
// oldest one)
Events []*TraceEvent
}
// TraceEvent represent a single trace event
type TraceEvent struct {
// Desc is a simple description of the trace event.
Desc string
// Severity states the severity of this trace event.
Severity Severity
// Timestamp is the event time.
Timestamp time.Time
// RefID is the id of the entity that gets referenced in the event. RefID is 0 if no other entity is
// involved in this event.
// e.g. SubChannel (id: 4[]) Created. --> RefID = 4, RefName = "" (inside [])
RefID int64
// RefName is the reference name for the entity that gets referenced in the event.
RefName string
// RefType indicates the referenced entity type, i.e Channel or SubChannel.
RefType RefChannelType
}
// Channel is the interface that should be satisfied in order to be tracked by
// channelz as Channel or SubChannel.
type Channel interface {
ChannelzMetric() *ChannelInternalMetric
}
type dummyChannel struct{}
func (d *dummyChannel) ChannelzMetric() *ChannelInternalMetric {
return &ChannelInternalMetric{}
}
type channel struct {
refName string
c Channel
closeCalled bool
nestedChans map[int64]string
subChans map[int64]string
id int64
pid int64
cm *channelMap
trace *channelTrace
// traceRefCount is the number of trace events that reference this channel.
// Non-zero traceRefCount means the trace of this channel cannot be deleted.
traceRefCount int32
}
func (c *channel) addChild(id int64, e entry) {
switch v := e.(type) {
case *subChannel:
c.subChans[id] = v.refName
case *channel:
c.nestedChans[id] = v.refName
default:
logger.Errorf("cannot add a child (id = %d) of type %T to a channel", id, e)
}
}
func (c *channel) deleteChild(id int64) {
delete(c.subChans, id)
delete(c.nestedChans, id)
c.deleteSelfIfReady()
}
func (c *channel) triggerDelete() {
c.closeCalled = true
c.deleteSelfIfReady()
}
func (c *channel) getParentID() int64 {
return c.pid
}
// deleteSelfFromTree tries to delete the channel from the channelz entry relation tree, which means
// deleting the channel reference from its parent's child list.
//
// In order for a channel to be deleted from the tree, it must meet the criteria that, removal of the
// corresponding grpc object has been invoked, and the channel does not have any children left.
//
// The returned boolean value indicates whether the channel has been successfully deleted from tree.
func (c *channel) deleteSelfFromTree() (deleted bool) {
if !c.closeCalled || len(c.subChans)+len(c.nestedChans) != 0 {
return false
}
// not top channel
if c.pid != 0 {
c.cm.findEntry(c.pid).deleteChild(c.id)
}
return true
}
// deleteSelfFromMap checks whether it is valid to delete the channel from the map, which means
// deleting the channel from channelz's tracking entirely. Users can no longer use id to query the
// channel, and its memory will be garbage collected.
//
// The trace reference count of the channel must be 0 in order to be deleted from the map. This is
// specified in the channel tracing gRFC that as long as some other trace has reference to an entity,
// the trace of the referenced entity must not be deleted. In order to release the resource allocated
// by grpc, the reference to the grpc object is reset to a dummy object.
//
// deleteSelfFromMap must be called after deleteSelfFromTree returns true.
//
// It returns a bool to indicate whether the channel can be safely deleted from map.
func (c *channel) deleteSelfFromMap() (delete bool) {
if c.getTraceRefCount() != 0 {
c.c = &dummyChannel{}
return false
}
return true
}
// deleteSelfIfReady tries to delete the channel itself from the channelz database.
// The delete process includes two steps:
// 1. delete the channel from the entry relation tree, i.e. delete the channel reference from its
// parent's child list.
// 2. delete the channel from the map, i.e. delete the channel entirely from channelz. Lookup by id
// will return entry not found error.
func (c *channel) deleteSelfIfReady() {
if !c.deleteSelfFromTree() {
return
}
if !c.deleteSelfFromMap() {
return
}
c.cm.deleteEntry(c.id)
c.trace.clear()
}
func (c *channel) getChannelTrace() *channelTrace {
return c.trace
}
func (c *channel) incrTraceRefCount() {
atomic.AddInt32(&c.traceRefCount, 1)
}
func (c *channel) decrTraceRefCount() {
atomic.AddInt32(&c.traceRefCount, -1)
}
func (c *channel) getTraceRefCount() int {
i := atomic.LoadInt32(&c.traceRefCount)
return int(i)
}
func (c *channel) getRefName() string {
return c.refName
}
type subChannel struct {
refName string
c Channel
closeCalled bool
sockets map[int64]string
id int64
pid int64
cm *channelMap
trace *channelTrace
traceRefCount int32
}
func (sc *subChannel) addChild(id int64, e entry) {
if v, ok := e.(*normalSocket); ok {
sc.sockets[id] = v.refName
} else {
logger.Errorf("cannot add a child (id = %d) of type %T to a subChannel", id, e)
}
}
func (sc *subChannel) deleteChild(id int64) {
delete(sc.sockets, id)
sc.deleteSelfIfReady()
}
func (sc *subChannel) triggerDelete() {
sc.closeCalled = true
sc.deleteSelfIfReady()
}
func (sc *subChannel) getParentID() int64 {
return sc.pid
}
// deleteSelfFromTree tries to delete the subchannel from the channelz entry relation tree, which
// means deleting the subchannel reference from its parent's child list.
//
// In order for a subchannel to be deleted from the tree, it must meet the criteria that, removal of
// the corresponding grpc object has been invoked, and the subchannel does not have any children left.
//
// The returned boolean value indicates whether the channel has been successfully deleted from tree.
func (sc *subChannel) deleteSelfFromTree() (deleted bool) {
if !sc.closeCalled || len(sc.sockets) != 0 {
return false
}
sc.cm.findEntry(sc.pid).deleteChild(sc.id)
return true
}
// deleteSelfFromMap checks whether it is valid to delete the subchannel from the map, which means
// deleting the subchannel from channelz's tracking entirely. Users can no longer use id to query
// the subchannel, and its memory will be garbage collected.
//
// The trace reference count of the subchannel must be 0 in order to be deleted from the map. This is
// specified in the channel tracing gRFC that as long as some other trace has reference to an entity,
// the trace of the referenced entity must not be deleted. In order to release the resource allocated
// by grpc, the reference to the grpc object is reset to a dummy object.
//
// deleteSelfFromMap must be called after deleteSelfFromTree returns true.
//
// It returns a bool to indicate whether the channel can be safely deleted from map.
func (sc *subChannel) deleteSelfFromMap() (delete bool) {
if sc.getTraceRefCount() != 0 {
// free the grpc struct (i.e. addrConn)
sc.c = &dummyChannel{}
return false
}
return true
}
// deleteSelfIfReady tries to delete the subchannel itself from the channelz database.
// The delete process includes two steps:
// 1. delete the subchannel from the entry relation tree, i.e. delete the subchannel reference from
// its parent's child list.
// 2. delete the subchannel from the map, i.e. delete the subchannel entirely from channelz. Lookup
// by id will return entry not found error.
func (sc *subChannel) deleteSelfIfReady() {
if !sc.deleteSelfFromTree() {
return
}
if !sc.deleteSelfFromMap() {
return
}
sc.cm.deleteEntry(sc.id)
sc.trace.clear()
}
func (sc *subChannel) getChannelTrace() *channelTrace {
return sc.trace
}
func (sc *subChannel) incrTraceRefCount() {
atomic.AddInt32(&sc.traceRefCount, 1)
}
func (sc *subChannel) decrTraceRefCount() {
atomic.AddInt32(&sc.traceRefCount, -1)
}
func (sc *subChannel) getTraceRefCount() int {
i := atomic.LoadInt32(&sc.traceRefCount)
return int(i)
}
func (sc *subChannel) getRefName() string {
return sc.refName
}
// SocketMetric defines the info channelz provides for a specific Socket, which
// includes SocketInternalMetric and channelz-specific data, such as channelz id, etc.
type SocketMetric struct {
// ID is the channelz id of this socket.
ID int64
// RefName is the human readable reference string of this socket.
RefName string
// SocketData contains socket internal metric reported by the socket through
// ChannelzMetric().
SocketData *SocketInternalMetric
}
// SocketInternalMetric defines the struct that the implementor of Socket interface
// should return from ChannelzMetric().
type SocketInternalMetric struct {
// The number of streams that have been started.
StreamsStarted int64
// The number of streams that have ended successfully:
// On client side, receiving frame with eos bit set.
// On server side, sending frame with eos bit set.
StreamsSucceeded int64
// The number of streams that have ended unsuccessfully:
// On client side, termination without receiving frame with eos bit set.
// On server side, termination without sending frame with eos bit set.
StreamsFailed int64
// The number of messages successfully sent on this socket.
MessagesSent int64
MessagesReceived int64
// The number of keep alives sent. This is typically implemented with HTTP/2
// ping messages.
KeepAlivesSent int64
// The last time a stream was created by this endpoint. Usually unset for
// servers.
LastLocalStreamCreatedTimestamp time.Time
// The last time a stream was created by the remote endpoint. Usually unset
// for clients.
LastRemoteStreamCreatedTimestamp time.Time
// The last time a message was sent by this endpoint.
LastMessageSentTimestamp time.Time
// The last time a message was received by this endpoint.
LastMessageReceivedTimestamp time.Time
// The amount of window, granted to the local endpoint by the remote endpoint.
// This may be slightly out of date due to network latency. This does NOT
// include stream level or TCP level flow control info.
LocalFlowControlWindow int64
// The amount of window, granted to the remote endpoint by the local endpoint.
// This may be slightly out of date due to network latency. This does NOT
// include stream level or TCP level flow control info.
RemoteFlowControlWindow int64
// The locally bound address.
LocalAddr net.Addr
// The remote bound address. May be absent.
RemoteAddr net.Addr
// Optional, represents the name of the remote endpoint, if different than
// the original target name.
RemoteName string
SocketOptions *SocketOptionData
Security credentials.ChannelzSecurityValue
}
// Socket is the interface that should be satisfied in order to be tracked by
// channelz as Socket.
type Socket interface {
ChannelzMetric() *SocketInternalMetric
}
type listenSocket struct {
refName string
s Socket
id int64
pid int64
cm *channelMap
}
func (ls *listenSocket) addChild(id int64, e entry) {
logger.Errorf("cannot add a child (id = %d) of type %T to a listen socket", id, e)
}
func (ls *listenSocket) deleteChild(id int64) {
logger.Errorf("cannot delete a child (id = %d) from a listen socket", id)
}
func (ls *listenSocket) triggerDelete() {
ls.cm.deleteEntry(ls.id)
ls.cm.findEntry(ls.pid).deleteChild(ls.id)
}
func (ls *listenSocket) deleteSelfIfReady() {
logger.Errorf("cannot call deleteSelfIfReady on a listen socket")
}
func (ls *listenSocket) getParentID() int64 {
return ls.pid
}
type normalSocket struct {
refName string
s Socket
id int64
pid int64
cm *channelMap
}
func (ns *normalSocket) addChild(id int64, e entry) {
logger.Errorf("cannot add a child (id = %d) of type %T to a normal socket", id, e)
}
func (ns *normalSocket) deleteChild(id int64) {
logger.Errorf("cannot delete a child (id = %d) from a normal socket", id)
}
func (ns *normalSocket) triggerDelete() {
ns.cm.deleteEntry(ns.id)
ns.cm.findEntry(ns.pid).deleteChild(ns.id)
}
func (ns *normalSocket) deleteSelfIfReady() {
logger.Errorf("cannot call deleteSelfIfReady on a normal socket")
}
func (ns *normalSocket) getParentID() int64 {
return ns.pid
}
// ServerMetric defines the info channelz provides for a specific Server, which
// includes ServerInternalMetric and channelz-specific data, such as channelz id,
// child list, etc.
type ServerMetric struct {
// ID is the channelz id of this server.
ID int64
// RefName is the human readable reference string of this server.
RefName string
// ServerData contains server internal metric reported by the server through
// ChannelzMetric().
ServerData *ServerInternalMetric
// ListenSockets tracks the listener socket type children of this server in the
// format of a map from socket channelz id to corresponding reference string.
ListenSockets map[int64]string
}
// ServerInternalMetric defines the struct that the implementor of Server interface
// should return from ChannelzMetric().
type ServerInternalMetric struct {
// The number of incoming calls started on the server.
CallsStarted int64
// The number of incoming calls that have completed with an OK status.
CallsSucceeded int64
// The number of incoming calls that have a completed with a non-OK status.
CallsFailed int64
// The last time a call was started on the server.
LastCallStartedTimestamp time.Time
}
// Server is the interface to be satisfied in order to be tracked by channelz as
// Server.
type Server interface {
ChannelzMetric() *ServerInternalMetric
}
type server struct {
refName string
s Server
closeCalled bool
sockets map[int64]string
listenSockets map[int64]string
id int64
cm *channelMap
}
func (s *server) addChild(id int64, e entry) {
switch v := e.(type) {
case *normalSocket:
s.sockets[id] = v.refName
case *listenSocket:
s.listenSockets[id] = v.refName
default:
logger.Errorf("cannot add a child (id = %d) of type %T to a server", id, e)
}
}
func (s *server) deleteChild(id int64) {
delete(s.sockets, id)
delete(s.listenSockets, id)
s.deleteSelfIfReady()
}
func (s *server) triggerDelete() {
s.closeCalled = true
s.deleteSelfIfReady()
}
func (s *server) deleteSelfIfReady() {
if !s.closeCalled || len(s.sockets)+len(s.listenSockets) != 0 {
return
}
s.cm.deleteEntry(s.id)
}
func (s *server) getParentID() int64 {
return 0
}
type tracedChannel interface {
getChannelTrace() *channelTrace
incrTraceRefCount()
decrTraceRefCount()
getRefName() string
}
type channelTrace struct {
cm *channelMap
clearCalled bool
createdTime time.Time
eventCount int64
mu sync.Mutex
events []*TraceEvent
}
func (c *channelTrace) append(e *TraceEvent) {
c.mu.Lock()
if len(c.events) == getMaxTraceEntry() {
del := c.events[0]
c.events = c.events[1:]
if del.RefID != 0 {
// start recursive cleanup in a goroutine to not block the call originated from grpc.
go func() {
// need to acquire c.cm.mu lock to call the unlocked attemptCleanup func.
c.cm.mu.Lock()
c.cm.decrTraceRefCount(del.RefID)
c.cm.mu.Unlock()
}()
}
}
e.Timestamp = time.Now()
c.events = append(c.events, e)
c.eventCount++
c.mu.Unlock()
}
func (c *channelTrace) clear() {
if c.clearCalled {
return
}
c.clearCalled = true
c.mu.Lock()
for _, e := range c.events {
if e.RefID != 0 {
// caller should have already held the c.cm.mu lock.
c.cm.decrTraceRefCount(e.RefID)
}
}
c.mu.Unlock()
}
// Severity is the severity level of a trace event.
// The canonical enumeration of all valid values is here:
// https://github.com/grpc/grpc-proto/blob/9b13d199cc0d4703c7ea26c9c330ba695866eb23/grpc/channelz/v1/channelz.proto#L126.
type Severity int
const (
// CtUnknown indicates unknown severity of a trace event.
CtUnknown Severity = iota
// CtInfo indicates info level severity of a trace event.
CtInfo
// CtWarning indicates warning level severity of a trace event.
CtWarning
// CtError indicates error level severity of a trace event.
CtError
)
// RefChannelType is the type of the entity being referenced in a trace event.
type RefChannelType int
const (
// RefUnknown indicates an unknown entity type, the zero value for this type.
RefUnknown RefChannelType = iota
// RefChannel indicates the referenced entity is a Channel.
RefChannel
// RefSubChannel indicates the referenced entity is a SubChannel.
RefSubChannel
// RefServer indicates the referenced entity is a Server.
RefServer
// RefListenSocket indicates the referenced entity is a ListenSocket.
RefListenSocket
// RefNormalSocket indicates the referenced entity is a NormalSocket.
RefNormalSocket
)
var refChannelTypeToString = map[RefChannelType]string{
RefUnknown: "Unknown",
RefChannel: "Channel",
RefSubChannel: "SubChannel",
RefServer: "Server",
RefListenSocket: "ListenSocket",
RefNormalSocket: "NormalSocket",
}
func (r RefChannelType) String() string {
return refChannelTypeToString[r]
}
func (c *channelTrace) dumpData() *ChannelTrace {
c.mu.Lock()
ct := &ChannelTrace{EventNum: c.eventCount, CreationTime: c.createdTime}
ct.Events = c.events[:len(c.events)]
c.mu.Unlock()
return ct
}

View File

@ -1,37 +0,0 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package channelz
import (
"syscall"
)
// GetSocketOption gets the socket option info of the conn.
func GetSocketOption(socket any) *SocketOptionData {
c, ok := socket.(syscall.Conn)
if !ok {
return nil
}
data := &SocketOptionData{}
if rawConn, err := c.SyscallConn(); err == nil {
rawConn.Control(data.Getsockopt)
return data
}
return nil
}

View File

@ -1,27 +0,0 @@
//go:build !linux
// +build !linux
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package channelz
// GetSocketOption gets the socket option info of the conn.
func GetSocketOption(c any) *SocketOptionData {
return nil
}