rebase: update google.golang.org/grpc to latest version

Signed-off-by: Niels de Vos <ndevos@redhat.com>
This commit is contained in:
Niels de Vos 2021-06-17 10:07:08 +02:00 committed by mergify[bot]
parent f5dd8e769f
commit f8a08ada6d
35 changed files with 656 additions and 285 deletions

3
go.mod
View File

@ -23,7 +23,7 @@ require (
github.com/stretchr/testify v1.7.0 github.com/stretchr/testify v1.7.0
golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e
golang.org/x/sys v0.0.0-20210616094352-59db8d763f22 golang.org/x/sys v0.0.0-20210616094352-59db8d763f22
google.golang.org/grpc v1.36.1 google.golang.org/grpc v1.38.0
k8s.io/api v0.20.6 k8s.io/api v0.20.6
k8s.io/apimachinery v0.20.6 k8s.io/apimachinery v0.20.6
k8s.io/client-go v0.20.6 k8s.io/client-go v0.20.6
@ -40,7 +40,6 @@ replace (
github.com/hashicorp/vault/api => github.com/hashicorp/vault/api v1.0.5-0.20200902155336-f9d5ce5a171a github.com/hashicorp/vault/api => github.com/hashicorp/vault/api v1.0.5-0.20200902155336-f9d5ce5a171a
github.com/hashicorp/vault/sdk => github.com/hashicorp/vault/sdk v0.1.14-0.20201116234512-b4d4137dfe8b github.com/hashicorp/vault/sdk => github.com/hashicorp/vault/sdk v0.1.14-0.20201116234512-b4d4137dfe8b
github.com/kubernetes-csi/external-snapshotter/v2 => github.com/kubernetes-csi/external-snapshotter/v2 v2.1.1-0.20200504125226-859696c419ff github.com/kubernetes-csi/external-snapshotter/v2 => github.com/kubernetes-csi/external-snapshotter/v2 v2.1.1-0.20200504125226-859696c419ff
google.golang.org/grpc => google.golang.org/grpc v1.35.0
k8s.io/api => k8s.io/api v0.20.6 k8s.io/api => k8s.io/api v0.20.6
k8s.io/apiextensions-apiserver => k8s.io/apiextensions-apiserver v0.20.6 k8s.io/apiextensions-apiserver => k8s.io/apiextensions-apiserver v0.20.6
k8s.io/apimachinery => k8s.io/apimachinery v0.20.6 k8s.io/apimachinery => k8s.io/apimachinery v0.20.6

40
go.sum
View File

@ -1,5 +1,6 @@
bazil.org/fuse v0.0.0-20160811212531-371fbbdaa898/go.mod h1:Xbm+BRKSBEpa4q4hTSxohYNQpsxXPbPry4JJWOB3LB8= bazil.org/fuse v0.0.0-20160811212531-371fbbdaa898/go.mod h1:Xbm+BRKSBEpa4q4hTSxohYNQpsxXPbPry4JJWOB3LB8=
bitbucket.org/bertimus9/systemstat v0.0.0-20180207000608-0eeff89b0690/go.mod h1:Ulb78X89vxKYgdL24HMTiXYHlyHEvruOj1ZPlqeNEZM= bitbucket.org/bertimus9/systemstat v0.0.0-20180207000608-0eeff89b0690/go.mod h1:Ulb78X89vxKYgdL24HMTiXYHlyHEvruOj1ZPlqeNEZM=
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.37.4/go.mod h1:NHPJ89PdicEuT9hdPXMROBD91xc5uRDxsMtSB16k7hw= cloud.google.com/go v0.37.4/go.mod h1:NHPJ89PdicEuT9hdPXMROBD91xc5uRDxsMtSB16k7hw=
cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU= cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU=
@ -197,6 +198,7 @@ github.com/clbanning/x2j v0.0.0-20191024224557-825249438eec/go.mod h1:jMjuTZXRI4
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cloudfoundry-community/go-cfclient v0.0.0-20190201205600-f136f9222381/go.mod h1:e5+USP2j8Le2M0Jo3qKPFnNhuo1wueU4nWHCXBOfQ14= github.com/cloudfoundry-community/go-cfclient v0.0.0-20190201205600-f136f9222381/go.mod h1:e5+USP2j8Le2M0Jo3qKPFnNhuo1wueU4nWHCXBOfQ14=
github.com/clusterhq/flocker-go v0.0.0-20160920122132-2b8b7259d313/go.mod h1:P1wt9Z3DP8O6W3rvwCt0REIlshg1InHImaLW0t3ObY0= github.com/clusterhq/flocker-go v0.0.0-20160920122132-2b8b7259d313/go.mod h1:P1wt9Z3DP8O6W3rvwCt0REIlshg1InHImaLW0t3ObY0=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ= github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ=
github.com/cockroachdb/cockroach-go v0.0.0-20181001143604-e0a95dfd547c/go.mod h1:XGLbWH/ujMcbPbhZq52Nv6UrCghb1yGn//133kEsvDk= github.com/cockroachdb/cockroach-go v0.0.0-20181001143604-e0a95dfd547c/go.mod h1:XGLbWH/ujMcbPbhZq52Nv6UrCghb1yGn//133kEsvDk=
@ -295,7 +297,12 @@ github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153 h1:yUdfgN0XgIJw7fo
github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc= github.com/elazarl/goproxy v0.0.0-20180725130230-947c36da3153/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs= github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
github.com/emicklei/go-restful v2.9.5+incompatible/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs= github.com/emicklei/go-restful v2.9.5+incompatible/go.mod h1:otzb+WCGbkyDHkqmQmT5YD2WR4BBwUdeQoFo8l/7tVs=
github.com/envoyproxy/go-control-plane v0.6.9/go.mod h1:SBwIajubJHhxtWwsL9s8ss4safvEdbitLhGGK48rN6g=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/euank/go-kmsg-parser v2.0.0+incompatible/go.mod h1:MhmAMZ8V4CYH4ybgdRwPr2TU5ThnS43puaKEMpja1uw= github.com/euank/go-kmsg-parser v2.0.0+incompatible/go.mod h1:MhmAMZ8V4CYH4ybgdRwPr2TU5ThnS43puaKEMpja1uw=
github.com/evanphx/json-patch v4.2.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= github.com/evanphx/json-patch v4.2.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
@ -432,6 +439,7 @@ github.com/godbus/dbus v0.0.0-20190422162347-ade71ed3457e/go.mod h1:bBOAhwG1umN6
github.com/godbus/dbus/v5 v5.0.3/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/godbus/dbus/v5 v5.0.3/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/gofrs/uuid v3.2.0+incompatible h1:y12jRkkFxsd7GpqdSZ+/KCs/fJbqpEXSGd4+jfEaewE= github.com/gofrs/uuid v3.2.0+incompatible h1:y12jRkkFxsd7GpqdSZ+/KCs/fJbqpEXSGd4+jfEaewE=
github.com/gofrs/uuid v3.2.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= github.com/gofrs/uuid v3.2.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
github.com/gogo/googleapis v1.1.0/go.mod h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
@ -447,7 +455,9 @@ github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4er
github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e h1:1r7pUrabqp18hOBcwBwiTsbnFeTZHV9eER/QT5JVZxY= github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e h1:1r7pUrabqp18hOBcwBwiTsbnFeTZHV9eER/QT5JVZxY=
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/lint v0.0.0-20180702182130-06c8688daad7/go.mod h1:tluoj9z5200jBnyusfRPU2LqT6J+DAorxEvtC7LHB+E=
github.com/golang/mock v1.0.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/mock v1.0.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y= github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y=
github.com/golang/mock v1.4.0/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw= github.com/golang/mock v1.4.0/go.mod h1:UOMv5ysSaYNkG+OFQykRIcU/QvvxJf3p21QfJ2Bt3cw=
@ -758,6 +768,7 @@ github.com/lucas-clemente/aes12 v0.0.0-20171027163421-cd47fb39b79f/go.mod h1:JpH
github.com/lucas-clemente/quic-clients v0.1.0/go.mod h1:y5xVIEoObKqULIKivu+gD/LU90pL73bTdtQjPBvtCBk= github.com/lucas-clemente/quic-clients v0.1.0/go.mod h1:y5xVIEoObKqULIKivu+gD/LU90pL73bTdtQjPBvtCBk=
github.com/lucas-clemente/quic-go v0.10.2/go.mod h1:hvaRS9IHjFLMq76puFJeWNfmn+H70QZ/CXoxqw9bzao= github.com/lucas-clemente/quic-go v0.10.2/go.mod h1:hvaRS9IHjFLMq76puFJeWNfmn+H70QZ/CXoxqw9bzao=
github.com/lucas-clemente/quic-go-certificates v0.0.0-20160823095156-d2f86524cced/go.mod h1:NCcRLrOTZbzhZvixZLlERbJtDtYsmMw8Jc4vS8Z0g58= github.com/lucas-clemente/quic-go-certificates v0.0.0-20160823095156-d2f86524cced/go.mod h1:NCcRLrOTZbzhZvixZLlERbJtDtYsmMw8Jc4vS8Z0g58=
github.com/lyft/protoc-gen-validate v0.0.13/go.mod h1:XbGvPuh87YZc5TdIa2/I4pLk0QoUACkjt2znoq26NVQ=
github.com/magiconair/properties v1.7.6/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= github.com/magiconair/properties v1.7.6/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
@ -1198,6 +1209,8 @@ golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMk
golang.org/x/image v0.0.0-20180708004352-c73c2afc3b81/go.mod h1:ux5Hcp/YLpHSI86hEcLt0YII63i6oz57MZXIpbrjZUs= golang.org/x/image v0.0.0-20180708004352-c73c2afc3b81/go.mod h1:ux5Hcp/YLpHSI86hEcLt0YII63i6oz57MZXIpbrjZUs=
golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
golang.org/x/lint v0.0.0-20180702182130-06c8688daad7/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20181217174547-8f45f776aaf1/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20181217174547-8f45f776aaf1/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
golang.org/x/lint v0.0.0-20190301231843-5614ed5bae6f/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190301231843-5614ed5bae6f/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
@ -1288,6 +1301,7 @@ golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sys v0.0.0-20170830134202-bb24a47a89ea/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20170830134202-bb24a47a89ea/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20171026204733-164713f0dfce/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20171026204733-164713f0dfce/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181026203630-95b1ffbd15a5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181026203630-95b1ffbd15a5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
@ -1385,12 +1399,14 @@ golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e/go.mod h1:tRJNPiyCQ0inRvYxb
golang.org/x/tools v0.0.0-20170915040203-e531a2a1c15f/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20170915040203-e531a2a1c15f/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180525024113-a5b4c53f6e8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180525024113-a5b4c53f6e8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20181011042414-1f849cf54d09/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20181011042414-1f849cf54d09/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20181030221726-6c7e314b6563/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20181030221726-6c7e314b6563/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20181117154741-2ddaf7f79a09/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20181117154741-2ddaf7f79a09/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20181219222714-6e267b5cc78e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20181219222714-6e267b5cc78e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190110163146-51295c7ec13a/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190110163146-51295c7ec13a/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190121143147-24cd39ecf745/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190121143147-24cd39ecf745/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190122202912-9c309ee22fab/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190122202912-9c309ee22fab/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190125232054-d66bd3c5d5a6/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190125232054-d66bd3c5d5a6/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
@ -1473,6 +1489,7 @@ google.golang.org/api v0.15.1-0.20200106000736-b8fc810ca6b5/go.mod h1:iLdEw5Ide6
google.golang.org/api v0.17.0/go.mod h1:BwFmGc8tA3vsd7r/7kR8DY7iEEGSU04BFxCo5jP/sfE= google.golang.org/api v0.17.0/go.mod h1:BwFmGc8tA3vsd7r/7kR8DY7iEEGSU04BFxCo5jP/sfE=
google.golang.org/api v0.18.0/go.mod h1:BwFmGc8tA3vsd7r/7kR8DY7iEEGSU04BFxCo5jP/sfE= google.golang.org/api v0.18.0/go.mod h1:BwFmGc8tA3vsd7r/7kR8DY7iEEGSU04BFxCo5jP/sfE=
google.golang.org/api v0.20.0/go.mod h1:BwFmGc8tA3vsd7r/7kR8DY7iEEGSU04BFxCo5jP/sfE= google.golang.org/api v0.20.0/go.mod h1:BwFmGc8tA3vsd7r/7kR8DY7iEEGSU04BFxCo5jP/sfE=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/appengine v1.2.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/appengine v1.2.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/appengine v1.3.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/appengine v1.3.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
@ -1510,8 +1527,28 @@ google.golang.org/genproto v0.0.0-20200423170343-7949de9c1215/go.mod h1:55QSHmfG
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo=
google.golang.org/genproto v0.0.0-20201110150050-8816d57aaa9a h1:pOwg4OoaRYScjmR4LlLgdtnyoHYTSAVhhqe5uPdpII8= google.golang.org/genproto v0.0.0-20201110150050-8816d57aaa9a h1:pOwg4OoaRYScjmR4LlLgdtnyoHYTSAVhhqe5uPdpII8=
google.golang.org/genproto v0.0.0-20201110150050-8816d57aaa9a/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20201110150050-8816d57aaa9a/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/grpc v1.35.0 h1:TwIQcH3es+MojMVojxxfQ3l3OF2KzlRxML2xZq0kRo8= google.golang.org/grpc v1.14.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw=
google.golang.org/grpc v1.16.0/go.mod h1:0JHn/cJsOMiMfNA9+DeHDlAU7KAAB5GDlYFpa9MZMio=
google.golang.org/grpc v1.17.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3cWCs=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.20.0/go.mod h1:chYK+tFQF0nDUGJgXMSgLCQk3phJEuONr2DCgLDdAQM=
google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38=
google.golang.org/grpc v1.21.0/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM=
google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM=
google.golang.org/grpc v1.22.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
google.golang.org/grpc v1.22.1/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
google.golang.org/grpc v1.23.1/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY=
google.golang.org/grpc v1.26.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
google.golang.org/grpc v1.27.1/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
google.golang.org/grpc v1.28.0/go.mod h1:rpkK4SK4GF4Ach/+MFLZUBavHOvF2JJB5uozKKal+60=
google.golang.org/grpc v1.29.0/go.mod h1:itym6AZVZYACWQqET3MqgPpjcuV5QH3BxFS3IjizoKk=
google.golang.org/grpc v1.29.1/go.mod h1:itym6AZVZYACWQqET3MqgPpjcuV5QH3BxFS3IjizoKk=
google.golang.org/grpc v1.35.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= google.golang.org/grpc v1.35.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
google.golang.org/grpc v1.38.0 h1:/9BgsAsa5nWe26HqOlvlgJnqBuktYOLCgjCPqsa56W0=
google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM=
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.24.0/go.mod h1:r/3tXBNzIEhYS9I1OUVjXDlt8tc493IdKGjtUeSXeh4= google.golang.org/protobuf v1.24.0/go.mod h1:r/3tXBNzIEhYS9I1OUVjXDlt8tc493IdKGjtUeSXeh4=
@ -1568,6 +1605,7 @@ gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81
gotest.tools/gotestsum v0.3.5/go.mod h1:Mnf3e5FUzXbkCfynWBGOwLssY7gTQgCHObK9tMpAriY= gotest.tools/gotestsum v0.3.5/go.mod h1:Mnf3e5FUzXbkCfynWBGOwLssY7gTQgCHObK9tMpAriY=
gotest.tools/v3 v3.0.2/go.mod h1:3SzNCllyD9/Y+b5r9JIKQ474KzkZyqLqEfYqMsX94Bk= gotest.tools/v3 v3.0.2/go.mod h1:3SzNCllyD9/Y+b5r9JIKQ474KzkZyqLqEfYqMsX94Bk=
grpc.go4.org v0.0.0-20170609214715-11d0a25b4919/go.mod h1:77eQGdRu53HpSqPFJFmuJdjuHRquDANNeA4x7B8WQ9o= grpc.go4.org v0.0.0-20170609214715-11d0a25b4919/go.mod h1:77eQGdRu53HpSqPFJFmuJdjuHRquDANNeA4x7B8WQ9o=
honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20180920025451-e3ad64cb4ed3/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20180920025451-e3ad64cb4ed3/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=

View File

@ -101,6 +101,9 @@ type SubConn interface {
// a new connection will be created. // a new connection will be created.
// //
// This will trigger a state transition for the SubConn. // This will trigger a state transition for the SubConn.
//
// Deprecated: This method is now part of the ClientConn interface and will
// eventually be removed from here.
UpdateAddresses([]resolver.Address) UpdateAddresses([]resolver.Address)
// Connect starts the connecting for this SubConn. // Connect starts the connecting for this SubConn.
Connect() Connect()
@ -143,6 +146,13 @@ type ClientConn interface {
// RemoveSubConn removes the SubConn from ClientConn. // RemoveSubConn removes the SubConn from ClientConn.
// The SubConn will be shutdown. // The SubConn will be shutdown.
RemoveSubConn(SubConn) RemoveSubConn(SubConn)
// UpdateAddresses updates the addresses used in the passed in SubConn.
// gRPC checks if the currently connected address is still in the new list.
// If so, the connection will be kept. Else, the connection will be
// gracefully closed, and a new connection will be created.
//
// This will trigger a state transition for the SubConn.
UpdateAddresses(SubConn, []resolver.Address)
// UpdateState notifies gRPC that the balancer's internal state has // UpdateState notifies gRPC that the balancer's internal state has
// changed. // changed.

View File

@ -22,6 +22,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"google.golang.org/grpc/attributes"
"google.golang.org/grpc/balancer" "google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity" "google.golang.org/grpc/connectivity"
"google.golang.org/grpc/grpclog" "google.golang.org/grpc/grpclog"
@ -41,7 +42,7 @@ func (bb *baseBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions)
cc: cc, cc: cc,
pickerBuilder: bb.pickerBuilder, pickerBuilder: bb.pickerBuilder,
subConns: make(map[resolver.Address]balancer.SubConn), subConns: make(map[resolver.Address]subConnInfo),
scStates: make(map[balancer.SubConn]connectivity.State), scStates: make(map[balancer.SubConn]connectivity.State),
csEvltr: &balancer.ConnectivityStateEvaluator{}, csEvltr: &balancer.ConnectivityStateEvaluator{},
config: bb.config, config: bb.config,
@ -57,6 +58,11 @@ func (bb *baseBuilder) Name() string {
return bb.name return bb.name
} }
type subConnInfo struct {
subConn balancer.SubConn
attrs *attributes.Attributes
}
type baseBalancer struct { type baseBalancer struct {
cc balancer.ClientConn cc balancer.ClientConn
pickerBuilder PickerBuilder pickerBuilder PickerBuilder
@ -64,7 +70,7 @@ type baseBalancer struct {
csEvltr *balancer.ConnectivityStateEvaluator csEvltr *balancer.ConnectivityStateEvaluator
state connectivity.State state connectivity.State
subConns map[resolver.Address]balancer.SubConn // `attributes` is stripped from the keys of this map (the addresses) subConns map[resolver.Address]subConnInfo // `attributes` is stripped from the keys of this map (the addresses)
scStates map[balancer.SubConn]connectivity.State scStates map[balancer.SubConn]connectivity.State
picker balancer.Picker picker balancer.Picker
config Config config Config
@ -114,7 +120,7 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
aNoAttrs := a aNoAttrs := a
aNoAttrs.Attributes = nil aNoAttrs.Attributes = nil
addrsSet[aNoAttrs] = struct{}{} addrsSet[aNoAttrs] = struct{}{}
if sc, ok := b.subConns[aNoAttrs]; !ok { if scInfo, ok := b.subConns[aNoAttrs]; !ok {
// a is a new address (not existing in b.subConns). // a is a new address (not existing in b.subConns).
// //
// When creating SubConn, the original address with attributes is // When creating SubConn, the original address with attributes is
@ -125,7 +131,7 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
logger.Warningf("base.baseBalancer: failed to create new SubConn: %v", err) logger.Warningf("base.baseBalancer: failed to create new SubConn: %v", err)
continue continue
} }
b.subConns[aNoAttrs] = sc b.subConns[aNoAttrs] = subConnInfo{subConn: sc, attrs: a.Attributes}
b.scStates[sc] = connectivity.Idle b.scStates[sc] = connectivity.Idle
sc.Connect() sc.Connect()
} else { } else {
@ -135,13 +141,15 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
// The SubConn does a reflect.DeepEqual of the new and old // The SubConn does a reflect.DeepEqual of the new and old
// addresses. So this is a noop if the current address is the same // addresses. So this is a noop if the current address is the same
// as the old one (including attributes). // as the old one (including attributes).
sc.UpdateAddresses([]resolver.Address{a}) scInfo.attrs = a.Attributes
b.subConns[aNoAttrs] = scInfo
b.cc.UpdateAddresses(scInfo.subConn, []resolver.Address{a})
} }
} }
for a, sc := range b.subConns { for a, scInfo := range b.subConns {
// a was removed by resolver. // a was removed by resolver.
if _, ok := addrsSet[a]; !ok { if _, ok := addrsSet[a]; !ok {
b.cc.RemoveSubConn(sc) b.cc.RemoveSubConn(scInfo.subConn)
delete(b.subConns, a) delete(b.subConns, a)
// Keep the state of this sc in b.scStates until sc's state becomes Shutdown. // Keep the state of this sc in b.scStates until sc's state becomes Shutdown.
// The entry will be deleted in UpdateSubConnState. // The entry will be deleted in UpdateSubConnState.
@ -184,9 +192,10 @@ func (b *baseBalancer) regeneratePicker() {
readySCs := make(map[balancer.SubConn]SubConnInfo) readySCs := make(map[balancer.SubConn]SubConnInfo)
// Filter out all ready SCs from full subConn map. // Filter out all ready SCs from full subConn map.
for addr, sc := range b.subConns { for addr, scInfo := range b.subConns {
if st, ok := b.scStates[sc]; ok && st == connectivity.Ready { if st, ok := b.scStates[scInfo.subConn]; ok && st == connectivity.Ready {
readySCs[sc] = SubConnInfo{Address: addr} addr.Attributes = scInfo.attrs
readySCs[scInfo.subConn] = SubConnInfo{Address: addr}
} }
} }
b.picker = b.pickerBuilder.Build(PickerBuildInfo{ReadySCs: readySCs}) b.picker = b.pickerBuilder.Build(PickerBuildInfo{ReadySCs: readySCs})

View File

@ -163,6 +163,14 @@ func (ccb *ccBalancerWrapper) RemoveSubConn(sc balancer.SubConn) {
ccb.cc.removeAddrConn(acbw.getAddrConn(), errConnDrain) ccb.cc.removeAddrConn(acbw.getAddrConn(), errConnDrain)
} }
func (ccb *ccBalancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) {
acbw, ok := sc.(*acBalancerWrapper)
if !ok {
return
}
acbw.UpdateAddresses(addrs)
}
func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) { func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) {
ccb.mu.Lock() ccb.mu.Lock()
defer ccb.mu.Unlock() defer ccb.mu.Unlock()
@ -197,7 +205,7 @@ func (acbw *acBalancerWrapper) UpdateAddresses(addrs []resolver.Address) {
acbw.mu.Lock() acbw.mu.Lock()
defer acbw.mu.Unlock() defer acbw.mu.Unlock()
if len(addrs) <= 0 { if len(addrs) <= 0 {
acbw.ac.tearDown(errConnDrain) acbw.ac.cc.removeAddrConn(acbw.ac, errConnDrain)
return return
} }
if !acbw.ac.tryUpdateAddrs(addrs) { if !acbw.ac.tryUpdateAddrs(addrs) {
@ -212,7 +220,7 @@ func (acbw *acBalancerWrapper) UpdateAddresses(addrs []resolver.Address) {
acbw.ac.acbw = nil acbw.ac.acbw = nil
acbw.ac.mu.Unlock() acbw.ac.mu.Unlock()
acState := acbw.ac.getState() acState := acbw.ac.getState()
acbw.ac.tearDown(errConnDrain) acbw.ac.cc.removeAddrConn(acbw.ac, errConnDrain)
if acState == connectivity.Shutdown { if acState == connectivity.Shutdown {
return return

View File

@ -143,6 +143,7 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
firstResolveEvent: grpcsync.NewEvent(), firstResolveEvent: grpcsync.NewEvent(),
} }
cc.retryThrottler.Store((*retryThrottler)(nil)) cc.retryThrottler.Store((*retryThrottler)(nil))
cc.safeConfigSelector.UpdateConfigSelector(&defaultConfigSelector{nil})
cc.ctx, cc.cancel = context.WithCancel(context.Background()) cc.ctx, cc.cancel = context.WithCancel(context.Background())
for _, opt := range opts { for _, opt := range opts {
@ -1197,7 +1198,7 @@ func (ac *addrConn) resetTransport() {
ac.mu.Lock() ac.mu.Lock()
if ac.state == connectivity.Shutdown { if ac.state == connectivity.Shutdown {
ac.mu.Unlock() ac.mu.Unlock()
newTr.Close() newTr.Close(fmt.Errorf("reached connectivity state: SHUTDOWN"))
return return
} }
ac.curAddr = addr ac.curAddr = addr
@ -1329,7 +1330,7 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne
select { select {
case <-time.After(time.Until(connectDeadline)): case <-time.After(time.Until(connectDeadline)):
// We didn't get the preface in time. // We didn't get the preface in time.
newTr.Close() newTr.Close(fmt.Errorf("failed to receive server preface within timeout"))
channelz.Warningf(logger, ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v: didn't receive server preface in time. Reconnecting...", addr) channelz.Warningf(logger, ac.channelzID, "grpc: addrConn.createTransport failed to connect to %v: didn't receive server preface in time. Reconnecting...", addr)
return nil, nil, errors.New("timed out waiting for server handshake") return nil, nil, errors.New("timed out waiting for server handshake")
case <-prefaceReceived: case <-prefaceReceived:
@ -1446,10 +1447,9 @@ func (ac *addrConn) getReadyTransport() (transport.ClientTransport, bool) {
} }
// tearDown starts to tear down the addrConn. // tearDown starts to tear down the addrConn.
// TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in //
// some edge cases (e.g., the caller opens and closes many addrConn's in a // Note that tearDown doesn't remove ac from ac.cc.conns, so the addrConn struct
// tight loop. // will leak. In most cases, call cc.removeAddrConn() instead.
// tearDown doesn't remove ac from ac.cc.conns.
func (ac *addrConn) tearDown(err error) { func (ac *addrConn) tearDown(err error) {
ac.mu.Lock() ac.mu.Lock()
if ac.state == connectivity.Shutdown { if ac.state == connectivity.Shutdown {

View File

@ -30,7 +30,7 @@ import (
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
"google.golang.org/grpc/attributes" "google.golang.org/grpc/attributes"
"google.golang.org/grpc/internal" icredentials "google.golang.org/grpc/internal/credentials"
) )
// PerRPCCredentials defines the common interface for the credentials which need to // PerRPCCredentials defines the common interface for the credentials which need to
@ -188,15 +188,12 @@ type RequestInfo struct {
AuthInfo AuthInfo AuthInfo AuthInfo
} }
// requestInfoKey is a struct to be used as the key when attaching a RequestInfo to a context object.
type requestInfoKey struct{}
// RequestInfoFromContext extracts the RequestInfo from the context if it exists. // RequestInfoFromContext extracts the RequestInfo from the context if it exists.
// //
// This API is experimental. // This API is experimental.
func RequestInfoFromContext(ctx context.Context) (ri RequestInfo, ok bool) { func RequestInfoFromContext(ctx context.Context) (ri RequestInfo, ok bool) {
ri, ok = ctx.Value(requestInfoKey{}).(RequestInfo) ri, ok = icredentials.RequestInfoFromContext(ctx).(RequestInfo)
return return ri, ok
} }
// ClientHandshakeInfo holds data to be passed to ClientHandshake. This makes // ClientHandshakeInfo holds data to be passed to ClientHandshake. This makes
@ -211,16 +208,12 @@ type ClientHandshakeInfo struct {
Attributes *attributes.Attributes Attributes *attributes.Attributes
} }
// clientHandshakeInfoKey is a struct used as the key to store
// ClientHandshakeInfo in a context.
type clientHandshakeInfoKey struct{}
// ClientHandshakeInfoFromContext returns the ClientHandshakeInfo struct stored // ClientHandshakeInfoFromContext returns the ClientHandshakeInfo struct stored
// in ctx. // in ctx.
// //
// This API is experimental. // This API is experimental.
func ClientHandshakeInfoFromContext(ctx context.Context) ClientHandshakeInfo { func ClientHandshakeInfoFromContext(ctx context.Context) ClientHandshakeInfo {
chi, _ := ctx.Value(clientHandshakeInfoKey{}).(ClientHandshakeInfo) chi, _ := icredentials.ClientHandshakeInfoFromContext(ctx).(ClientHandshakeInfo)
return chi return chi
} }
@ -249,15 +242,6 @@ func CheckSecurityLevel(ai AuthInfo, level SecurityLevel) error {
return nil return nil
} }
func init() {
internal.NewRequestInfoContext = func(ctx context.Context, ri RequestInfo) context.Context {
return context.WithValue(ctx, requestInfoKey{}, ri)
}
internal.NewClientHandshakeInfoContext = func(ctx context.Context, chi ClientHandshakeInfo) context.Context {
return context.WithValue(ctx, clientHandshakeInfoKey{}, chi)
}
}
// ChannelzSecurityInfo defines the interface that security protocols should implement // ChannelzSecurityInfo defines the interface that security protocols should implement
// in order to provide security info to channelz. // in order to provide security info to channelz.
// //

View File

@ -66,11 +66,7 @@ type dialOptions struct {
minConnectTimeout func() time.Duration minConnectTimeout func() time.Duration
defaultServiceConfig *ServiceConfig // defaultServiceConfig is parsed from defaultServiceConfigRawJSON. defaultServiceConfig *ServiceConfig // defaultServiceConfig is parsed from defaultServiceConfigRawJSON.
defaultServiceConfigRawJSON *string defaultServiceConfigRawJSON *string
// This is used by ccResolverWrapper to backoff between successive calls to resolvers []resolver.Builder
// resolver.ResolveNow(). The user will have no need to configure this, but
// we need to be able to configure this in tests.
resolveNowBackoff func(int) time.Duration
resolvers []resolver.Builder
} }
// DialOption configures how we set up the connection. // DialOption configures how we set up the connection.
@ -596,7 +592,6 @@ func defaultDialOptions() dialOptions {
ReadBufferSize: defaultReadBufSize, ReadBufferSize: defaultReadBufSize,
UseProxy: true, UseProxy: true,
}, },
resolveNowBackoff: internalbackoff.DefaultExponential.Backoff,
} }
} }
@ -611,16 +606,6 @@ func withMinConnectDeadline(f func() time.Duration) DialOption {
}) })
} }
// withResolveNowBackoff specifies the function that clientconn uses to backoff
// between successive calls to resolver.ResolveNow().
//
// For testing purpose only.
func withResolveNowBackoff(f func(int) time.Duration) DialOption {
return newFuncDialOption(func(o *dialOptions) {
o.resolveNowBackoff = f
})
}
// WithResolvers allows a list of resolver implementations to be registered // WithResolvers allows a list of resolver implementations to be registered
// locally with the ClientConn without needing to be globally registered via // locally with the ClientConn without needing to be globally registered via
// resolver.Register. They will be matched against the scheme used for the // resolver.Register. They will be matched against the scheme used for the

View File

@ -21,6 +21,8 @@
package proto package proto
import ( import (
"fmt"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
"google.golang.org/grpc/encoding" "google.golang.org/grpc/encoding"
) )
@ -36,11 +38,19 @@ func init() {
type codec struct{} type codec struct{}
func (codec) Marshal(v interface{}) ([]byte, error) { func (codec) Marshal(v interface{}) ([]byte, error) {
return proto.Marshal(v.(proto.Message)) vv, ok := v.(proto.Message)
if !ok {
return nil, fmt.Errorf("failed to marshal, message is %T, want proto.Message", v)
}
return proto.Marshal(vv)
} }
func (codec) Unmarshal(data []byte, v interface{}) error { func (codec) Unmarshal(data []byte, v interface{}) error {
return proto.Unmarshal(data, v.(proto.Message)) vv, ok := v.(proto.Message)
if !ok {
return fmt.Errorf("failed to unmarshal, message is %T, want proto.Message", v)
}
return proto.Unmarshal(data, vv)
} }
func (codec) Name() string { func (codec) Name() string {

View File

@ -4,7 +4,7 @@ go 1.11
require ( require (
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403 github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403
github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b
github.com/golang/protobuf v1.4.2 github.com/golang/protobuf v1.4.2
github.com/google/go-cmp v0.5.0 github.com/google/go-cmp v0.5.0

View File

@ -1,10 +1,8 @@
cloud.google.com/go v0.26.0 h1:e0WKqKTd5BnrG8aKH3J3h+QvEIQtSUcf2n5UZ5ZgLtQ= cloud.google.com/go v0.26.0 h1:e0WKqKTd5BnrG8aKH3J3h+QvEIQtSUcf2n5UZ5ZgLtQ=
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/census-instrumentation/opencensus-proto v0.2.1 h1:glEXhBS5PSLLv4IXzLA5yPRVX4bilULVyxxbrfOtDAk= github.com/census-instrumentation/opencensus-proto v0.2.1 h1:glEXhBS5PSLLv4IXzLA5yPRVX4bilULVyxxbrfOtDAk=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/client9/misspell v0.3.4 h1:ta993UF76GwbvJcIo3Y68y/M3WxlpEHPWIGDkJYwzJI=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403 h1:cqQfy1jclcSy/FwLjemeg3SR1yaINm74aQyupQ0Bl8M= github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403 h1:cqQfy1jclcSy/FwLjemeg3SR1yaINm74aQyupQ0Bl8M=
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
@ -12,8 +10,8 @@ github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad h1:EmNYJhPYy0pOFjCx2PrgtaBXmee0iUX9hLlxE1xHOJE= github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d h1:QyzYnTnPE15SQyUeqU6qLbWxMkwyAyu+vGksa0b7j00=
github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
github.com/envoyproxy/protoc-gen-validate v0.1.0 h1:EQciDnbrYxy13PgWoY8AqoxGiPrpgBZ1R8UNe3ddc+A= github.com/envoyproxy/protoc-gen-validate v0.1.0 h1:EQciDnbrYxy13PgWoY8AqoxGiPrpgBZ1R8UNe3ddc+A=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58=
@ -43,7 +41,6 @@ github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4= github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 h1:VklqNMn3ovrHsnt90PveolxSbWFaJdECFbxSq0Mqo2M=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=

View File

@ -0,0 +1,49 @@
/*
* Copyright 2021 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package credentials
import (
"context"
)
// requestInfoKey is a struct to be used as the key to store RequestInfo in a
// context.
type requestInfoKey struct{}
// NewRequestInfoContext creates a context with ri.
func NewRequestInfoContext(ctx context.Context, ri interface{}) context.Context {
return context.WithValue(ctx, requestInfoKey{}, ri)
}
// RequestInfoFromContext extracts the RequestInfo from ctx.
func RequestInfoFromContext(ctx context.Context) interface{} {
return ctx.Value(requestInfoKey{})
}
// clientHandshakeInfoKey is a struct used as the key to store
// ClientHandshakeInfo in a context.
type clientHandshakeInfoKey struct{}
// ClientHandshakeInfoFromContext extracts the ClientHandshakeInfo from ctx.
func ClientHandshakeInfoFromContext(ctx context.Context) interface{} {
return ctx.Value(clientHandshakeInfoKey{})
}
// NewClientHandshakeInfoContext creates a context with chi.
func NewClientHandshakeInfoContext(ctx context.Context, chi interface{}) context.Context {
return context.WithValue(ctx, clientHandshakeInfoKey{}, chi)
}

View File

@ -38,12 +38,6 @@ var (
// KeepaliveMinPingTime is the minimum ping interval. This must be 10s by // KeepaliveMinPingTime is the minimum ping interval. This must be 10s by
// default, but tests may wish to set it lower for convenience. // default, but tests may wish to set it lower for convenience.
KeepaliveMinPingTime = 10 * time.Second KeepaliveMinPingTime = 10 * time.Second
// NewRequestInfoContext creates a new context based on the argument context attaching
// the passed in RequestInfo to the new context.
NewRequestInfoContext interface{} // func(context.Context, credentials.RequestInfo) context.Context
// NewClientHandshakeInfoContext returns a copy of the input context with
// the passed in ClientHandshakeInfo struct added to it.
NewClientHandshakeInfoContext interface{} // func(context.Context, credentials.ClientHandshakeInfo) context.Context
// ParseServiceConfigForTesting is for creating a fake // ParseServiceConfigForTesting is for creating a fake
// ClientConn for resolver testing only // ClientConn for resolver testing only
ParseServiceConfigForTesting interface{} // func(string) *serviceconfig.ParseResult ParseServiceConfigForTesting interface{} // func(string) *serviceconfig.ParseResult
@ -65,6 +59,11 @@ var (
// gRPC server. An xDS-enabled server needs to know what type of credentials // gRPC server. An xDS-enabled server needs to know what type of credentials
// is configured on the underlying gRPC server. This is set by server.go. // is configured on the underlying gRPC server. This is set by server.go.
GetServerCredentials interface{} // func (*grpc.Server) credentials.TransportCredentials GetServerCredentials interface{} // func (*grpc.Server) credentials.TransportCredentials
// DrainServerTransports initiates a graceful close of existing connections
// on a gRPC server accepted on the provided listener address. An
// xDS-enabled server invokes this method on a grpc.Server when a particular
// listener moves to "not-serving" mode.
DrainServerTransports interface{} // func(*grpc.Server, string)
) )
// HealthChecker defines the signature of the client-side LB channel health checking function. // HealthChecker defines the signature of the client-side LB channel health checking function.

View File

@ -24,6 +24,7 @@ import (
"sync" "sync"
"google.golang.org/grpc/internal/serviceconfig" "google.golang.org/grpc/internal/serviceconfig"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/resolver" "google.golang.org/grpc/resolver"
) )
@ -51,6 +52,74 @@ type RPCConfig struct {
Context context.Context Context context.Context
MethodConfig serviceconfig.MethodConfig // configuration to use for this RPC MethodConfig serviceconfig.MethodConfig // configuration to use for this RPC
OnCommitted func() // Called when the RPC has been committed (retries no longer possible) OnCommitted func() // Called when the RPC has been committed (retries no longer possible)
Interceptor ClientInterceptor
}
// ClientStream is the same as grpc.ClientStream, but defined here for circular
// dependency reasons.
type ClientStream interface {
// Header returns the header metadata received from the server if there
// is any. It blocks if the metadata is not ready to read.
Header() (metadata.MD, error)
// Trailer returns the trailer metadata from the server, if there is any.
// It must only be called after stream.CloseAndRecv has returned, or
// stream.Recv has returned a non-nil error (including io.EOF).
Trailer() metadata.MD
// CloseSend closes the send direction of the stream. It closes the stream
// when non-nil error is met. It is also not safe to call CloseSend
// concurrently with SendMsg.
CloseSend() error
// Context returns the context for this stream.
//
// It should not be called until after Header or RecvMsg has returned. Once
// called, subsequent client-side retries are disabled.
Context() context.Context
// SendMsg is generally called by generated code. On error, SendMsg aborts
// the stream. If the error was generated by the client, the status is
// returned directly; otherwise, io.EOF is returned and the status of
// the stream may be discovered using RecvMsg.
//
// SendMsg blocks until:
// - There is sufficient flow control to schedule m with the transport, or
// - The stream is done, or
// - The stream breaks.
//
// SendMsg does not wait until the message is received by the server. An
// untimely stream closure may result in lost messages. To ensure delivery,
// users should ensure the RPC completed successfully using RecvMsg.
//
// It is safe to have a goroutine calling SendMsg and another goroutine
// calling RecvMsg on the same stream at the same time, but it is not safe
// to call SendMsg on the same stream in different goroutines. It is also
// not safe to call CloseSend concurrently with SendMsg.
SendMsg(m interface{}) error
// RecvMsg blocks until it receives a message into m or the stream is
// done. It returns io.EOF when the stream completes successfully. On
// any other error, the stream is aborted and the error contains the RPC
// status.
//
// It is safe to have a goroutine calling SendMsg and another goroutine
// calling RecvMsg on the same stream at the same time, but it is not
// safe to call RecvMsg on the same stream in different goroutines.
RecvMsg(m interface{}) error
}
// ClientInterceptor is an interceptor for gRPC client streams.
type ClientInterceptor interface {
// NewStream produces a ClientStream for an RPC which may optionally use
// the provided function to produce a stream for delegation. Note:
// RPCInfo.Context should not be used (will be nil).
//
// done is invoked when the RPC is finished using its connection, or could
// not be assigned a connection. RPC operations may still occur on
// ClientStream after done is called, since the interceptor is invoked by
// application-layer operations. done must never be nil when called.
NewStream(ctx context.Context, ri RPCInfo, done func(), newStream func(ctx context.Context, done func()) (ClientStream, error)) (ClientStream, error)
}
// ServerInterceptor is unimplementable; do not use.
type ServerInterceptor interface {
notDefined()
} }
type csKeyType string type csKeyType string

View File

@ -34,6 +34,7 @@ import (
grpclbstate "google.golang.org/grpc/balancer/grpclb/state" grpclbstate "google.golang.org/grpc/balancer/grpclb/state"
"google.golang.org/grpc/grpclog" "google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/envconfig" "google.golang.org/grpc/internal/envconfig"
"google.golang.org/grpc/internal/grpcrand" "google.golang.org/grpc/internal/grpcrand"
"google.golang.org/grpc/resolver" "google.golang.org/grpc/resolver"
@ -46,6 +47,13 @@ var EnableSRVLookups = false
var logger = grpclog.Component("dns") var logger = grpclog.Component("dns")
// Globals to stub out in tests. TODO: Perhaps these two can be combined into a
// single variable for testing the resolver?
var (
newTimer = time.NewTimer
newTimerDNSResRate = time.NewTimer
)
func init() { func init() {
resolver.Register(NewBuilder()) resolver.Register(NewBuilder())
} }
@ -143,7 +151,6 @@ func (b *dnsBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts
d.wg.Add(1) d.wg.Add(1)
go d.watcher() go d.watcher()
d.ResolveNow(resolver.ResolveNowOptions{})
return d, nil return d, nil
} }
@ -201,28 +208,38 @@ func (d *dnsResolver) Close() {
func (d *dnsResolver) watcher() { func (d *dnsResolver) watcher() {
defer d.wg.Done() defer d.wg.Done()
backoffIndex := 1
for { for {
select {
case <-d.ctx.Done():
return
case <-d.rn:
}
state, err := d.lookup() state, err := d.lookup()
if err != nil { if err != nil {
// Report error to the underlying grpc.ClientConn.
d.cc.ReportError(err) d.cc.ReportError(err)
} else { } else {
d.cc.UpdateState(*state) err = d.cc.UpdateState(*state)
} }
// Sleep to prevent excessive re-resolutions. Incoming resolution requests var timer *time.Timer
// will be queued in d.rn. if err == nil {
t := time.NewTimer(minDNSResRate) // Success resolving, wait for the next ResolveNow. However, also wait 30 seconds at the very least
// to prevent constantly re-resolving.
backoffIndex = 1
timer = newTimerDNSResRate(minDNSResRate)
select {
case <-d.ctx.Done():
timer.Stop()
return
case <-d.rn:
}
} else {
// Poll on an error found in DNS Resolver or an error received from ClientConn.
timer = newTimer(backoff.DefaultExponential.Backoff(backoffIndex))
backoffIndex++
}
select { select {
case <-t.C:
case <-d.ctx.Done(): case <-d.ctx.Done():
t.Stop() timer.Stop()
return return
case <-timer.C:
} }
} }
} }

View File

@ -46,6 +46,22 @@ type BalancerConfig struct {
type intermediateBalancerConfig []map[string]json.RawMessage type intermediateBalancerConfig []map[string]json.RawMessage
// MarshalJSON implements the json.Marshaler interface.
//
// It marshals the balancer and config into a length-1 slice
// ([]map[string]config).
func (bc *BalancerConfig) MarshalJSON() ([]byte, error) {
if bc.Config == nil {
// If config is nil, return empty config `{}`.
return []byte(fmt.Sprintf(`[{%q: %v}]`, bc.Name, "{}")), nil
}
c, err := json.Marshal(bc.Config)
if err != nil {
return nil, err
}
return []byte(fmt.Sprintf(`[{%q: %s}]`, bc.Name, c)), nil
}
// UnmarshalJSON implements the json.Unmarshaler interface. // UnmarshalJSON implements the json.Unmarshaler interface.
// //
// ServiceConfig contains a list of loadBalancingConfigs, each with a name and // ServiceConfig contains a list of loadBalancingConfigs, each with a name and

View File

@ -20,13 +20,17 @@ package transport
import ( import (
"bytes" "bytes"
"errors"
"fmt" "fmt"
"runtime" "runtime"
"strconv"
"sync" "sync"
"sync/atomic" "sync/atomic"
"golang.org/x/net/http2" "golang.org/x/net/http2"
"golang.org/x/net/http2/hpack" "golang.org/x/net/http2/hpack"
"google.golang.org/grpc/internal/grpcutil"
"google.golang.org/grpc/status"
) )
var updateHeaderTblSize = func(e *hpack.Encoder, v uint32) { var updateHeaderTblSize = func(e *hpack.Encoder, v uint32) {
@ -128,6 +132,14 @@ type cleanupStream struct {
func (c *cleanupStream) isTransportResponseFrame() bool { return c.rst } // Results in a RST_STREAM func (c *cleanupStream) isTransportResponseFrame() bool { return c.rst } // Results in a RST_STREAM
type earlyAbortStream struct {
streamID uint32
contentSubtype string
status *status.Status
}
func (*earlyAbortStream) isTransportResponseFrame() bool { return false }
type dataFrame struct { type dataFrame struct {
streamID uint32 streamID uint32
endStream bool endStream bool
@ -749,6 +761,24 @@ func (l *loopyWriter) cleanupStreamHandler(c *cleanupStream) error {
return nil return nil
} }
func (l *loopyWriter) earlyAbortStreamHandler(eas *earlyAbortStream) error {
if l.side == clientSide {
return errors.New("earlyAbortStream not handled on client")
}
headerFields := []hpack.HeaderField{
{Name: ":status", Value: "200"},
{Name: "content-type", Value: grpcutil.ContentType(eas.contentSubtype)},
{Name: "grpc-status", Value: strconv.Itoa(int(eas.status.Code()))},
{Name: "grpc-message", Value: encodeGrpcMessage(eas.status.Message())},
}
if err := l.writeHeader(eas.streamID, true, headerFields, nil); err != nil {
return err
}
return nil
}
func (l *loopyWriter) incomingGoAwayHandler(*incomingGoAway) error { func (l *loopyWriter) incomingGoAwayHandler(*incomingGoAway) error {
if l.side == clientSide { if l.side == clientSide {
l.draining = true l.draining = true
@ -787,6 +817,8 @@ func (l *loopyWriter) handle(i interface{}) error {
return l.registerStreamHandler(i) return l.registerStreamHandler(i)
case *cleanupStream: case *cleanupStream:
return l.cleanupStreamHandler(i) return l.cleanupStreamHandler(i)
case *earlyAbortStream:
return l.earlyAbortStreamHandler(i)
case *incomingGoAway: case *incomingGoAway:
return l.incomingGoAwayHandler(i) return l.incomingGoAwayHandler(i)
case *dataFrame: case *dataFrame:

View File

@ -32,15 +32,14 @@ import (
"golang.org/x/net/http2" "golang.org/x/net/http2"
"golang.org/x/net/http2/hpack" "golang.org/x/net/http2/hpack"
"google.golang.org/grpc/internal/grpcutil"
imetadata "google.golang.org/grpc/internal/metadata"
"google.golang.org/grpc/internal/transport/networktype"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/channelz" "google.golang.org/grpc/internal/channelz"
icredentials "google.golang.org/grpc/internal/credentials"
"google.golang.org/grpc/internal/grpcutil"
imetadata "google.golang.org/grpc/internal/metadata"
"google.golang.org/grpc/internal/syscall" "google.golang.org/grpc/internal/syscall"
"google.golang.org/grpc/internal/transport/networktype"
"google.golang.org/grpc/keepalive" "google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata" "google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer" "google.golang.org/grpc/peer"
@ -116,6 +115,9 @@ type http2Client struct {
// goAwayReason records the http2.ErrCode and debug data received with the // goAwayReason records the http2.ErrCode and debug data received with the
// GoAway frame. // GoAway frame.
goAwayReason GoAwayReason goAwayReason GoAwayReason
// goAwayDebugMessage contains a detailed human readable string about a
// GoAway frame, useful for error messages.
goAwayDebugMessage string
// A condition variable used to signal when the keepalive goroutine should // A condition variable used to signal when the keepalive goroutine should
// go dormant. The condition for dormancy is based on the number of active // go dormant. The condition for dormancy is based on the number of active
// streams and the `PermitWithoutStream` keepalive client parameter. And // streams and the `PermitWithoutStream` keepalive client parameter. And
@ -238,8 +240,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
// Attributes field of resolver.Address, which is shoved into connectCtx // Attributes field of resolver.Address, which is shoved into connectCtx
// and passed to the credential handshaker. This makes it possible for // and passed to the credential handshaker. This makes it possible for
// address specific arbitrary data to reach the credential handshaker. // address specific arbitrary data to reach the credential handshaker.
contextWithHandshakeInfo := internal.NewClientHandshakeInfoContext.(func(context.Context, credentials.ClientHandshakeInfo) context.Context) connectCtx = icredentials.NewClientHandshakeInfoContext(connectCtx, credentials.ClientHandshakeInfo{Attributes: addr.Attributes})
connectCtx = contextWithHandshakeInfo(connectCtx, credentials.ClientHandshakeInfo{Attributes: addr.Attributes})
conn, authInfo, err = transportCreds.ClientHandshake(connectCtx, addr.ServerName, conn) conn, authInfo, err = transportCreds.ClientHandshake(connectCtx, addr.ServerName, conn)
if err != nil { if err != nil {
return nil, connectionErrorf(isTemporary(err), err, "transport: authentication handshake failed: %v", err) return nil, connectionErrorf(isTemporary(err), err, "transport: authentication handshake failed: %v", err)
@ -347,12 +348,14 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
// Send connection preface to server. // Send connection preface to server.
n, err := t.conn.Write(clientPreface) n, err := t.conn.Write(clientPreface)
if err != nil { if err != nil {
t.Close() err = connectionErrorf(true, err, "transport: failed to write client preface: %v", err)
return nil, connectionErrorf(true, err, "transport: failed to write client preface: %v", err) t.Close(err)
return nil, err
} }
if n != len(clientPreface) { if n != len(clientPreface) {
t.Close() err = connectionErrorf(true, nil, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface))
return nil, connectionErrorf(true, err, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface)) t.Close(err)
return nil, err
} }
var ss []http2.Setting var ss []http2.Setting
@ -370,14 +373,16 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts
} }
err = t.framer.fr.WriteSettings(ss...) err = t.framer.fr.WriteSettings(ss...)
if err != nil { if err != nil {
t.Close() err = connectionErrorf(true, err, "transport: failed to write initial settings frame: %v", err)
return nil, connectionErrorf(true, err, "transport: failed to write initial settings frame: %v", err) t.Close(err)
return nil, err
} }
// Adjust the connection flow control window if needed. // Adjust the connection flow control window if needed.
if delta := uint32(icwz - defaultWindowSize); delta > 0 { if delta := uint32(icwz - defaultWindowSize); delta > 0 {
if err := t.framer.fr.WriteWindowUpdate(0, delta); err != nil { if err := t.framer.fr.WriteWindowUpdate(0, delta); err != nil {
t.Close() err = connectionErrorf(true, err, "transport: failed to write window update: %v", err)
return nil, connectionErrorf(true, err, "transport: failed to write window update: %v", err) t.Close(err)
return nil, err
} }
} }
@ -414,6 +419,7 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
buf: newRecvBuffer(), buf: newRecvBuffer(),
headerChan: make(chan struct{}), headerChan: make(chan struct{}),
contentSubtype: callHdr.ContentSubtype, contentSubtype: callHdr.ContentSubtype,
doneFunc: callHdr.DoneFunc,
} }
s.wq = newWriteQuota(defaultWriteQuota, s.done) s.wq = newWriteQuota(defaultWriteQuota, s.done)
s.requestRead = func(n int) { s.requestRead = func(n int) {
@ -453,7 +459,7 @@ func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr)
Method: callHdr.Method, Method: callHdr.Method,
AuthInfo: t.authInfo, AuthInfo: t.authInfo,
} }
ctxWithRequestInfo := internal.NewRequestInfoContext.(func(context.Context, credentials.RequestInfo) context.Context)(ctx, ri) ctxWithRequestInfo := icredentials.NewRequestInfoContext(ctx, ri)
authData, err := t.getTrAuthData(ctxWithRequestInfo, aud) authData, err := t.getTrAuthData(ctxWithRequestInfo, aud)
if err != nil { if err != nil {
return nil, err return nil, err
@ -832,6 +838,9 @@ func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.
t.controlBuf.executeAndPut(addBackStreamQuota, cleanup) t.controlBuf.executeAndPut(addBackStreamQuota, cleanup)
// This will unblock write. // This will unblock write.
close(s.done) close(s.done)
if s.doneFunc != nil {
s.doneFunc()
}
} }
// Close kicks off the shutdown process of the transport. This should be called // Close kicks off the shutdown process of the transport. This should be called
@ -841,12 +850,12 @@ func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.
// This method blocks until the addrConn that initiated this transport is // This method blocks until the addrConn that initiated this transport is
// re-connected. This happens because t.onClose() begins reconnect logic at the // re-connected. This happens because t.onClose() begins reconnect logic at the
// addrConn level and blocks until the addrConn is successfully connected. // addrConn level and blocks until the addrConn is successfully connected.
func (t *http2Client) Close() error { func (t *http2Client) Close(err error) {
t.mu.Lock() t.mu.Lock()
// Make sure we only Close once. // Make sure we only Close once.
if t.state == closing { if t.state == closing {
t.mu.Unlock() t.mu.Unlock()
return nil return
} }
// Call t.onClose before setting the state to closing to prevent the client // Call t.onClose before setting the state to closing to prevent the client
// from attempting to create new streams ASAP. // from attempting to create new streams ASAP.
@ -862,13 +871,19 @@ func (t *http2Client) Close() error {
t.mu.Unlock() t.mu.Unlock()
t.controlBuf.finish() t.controlBuf.finish()
t.cancel() t.cancel()
err := t.conn.Close() t.conn.Close()
if channelz.IsOn() { if channelz.IsOn() {
channelz.RemoveEntry(t.channelzID) channelz.RemoveEntry(t.channelzID)
} }
// Append info about previous goaways if there were any, since this may be important
// for understanding the root cause for this connection to be closed.
_, goAwayDebugMessage := t.GetGoAwayReason()
if len(goAwayDebugMessage) > 0 {
err = fmt.Errorf("closing transport due to: %v, received prior goaway: %v", err, goAwayDebugMessage)
}
// Notify all active streams. // Notify all active streams.
for _, s := range streams { for _, s := range streams {
t.closeStream(s, ErrConnClosing, false, http2.ErrCodeNo, status.New(codes.Unavailable, ErrConnClosing.Desc), nil, false) t.closeStream(s, err, false, http2.ErrCodeNo, status.New(codes.Unavailable, err.Error()), nil, false)
} }
if t.statsHandler != nil { if t.statsHandler != nil {
connEnd := &stats.ConnEnd{ connEnd := &stats.ConnEnd{
@ -876,7 +891,6 @@ func (t *http2Client) Close() error {
} }
t.statsHandler.HandleConn(t.ctx, connEnd) t.statsHandler.HandleConn(t.ctx, connEnd)
} }
return err
} }
// GracefulClose sets the state to draining, which prevents new streams from // GracefulClose sets the state to draining, which prevents new streams from
@ -895,7 +909,7 @@ func (t *http2Client) GracefulClose() {
active := len(t.activeStreams) active := len(t.activeStreams)
t.mu.Unlock() t.mu.Unlock()
if active == 0 { if active == 0 {
t.Close() t.Close(ErrConnClosing)
return return
} }
t.controlBuf.put(&incomingGoAway{}) t.controlBuf.put(&incomingGoAway{})
@ -1141,9 +1155,9 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
} }
} }
id := f.LastStreamID id := f.LastStreamID
if id > 0 && id%2 != 1 { if id > 0 && id%2 == 0 {
t.mu.Unlock() t.mu.Unlock()
t.Close() t.Close(connectionErrorf(true, nil, "received goaway with non-zero even-numbered numbered stream id: %v", id))
return return
} }
// A client can receive multiple GoAways from the server (see // A client can receive multiple GoAways from the server (see
@ -1161,7 +1175,7 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
// If there are multiple GoAways the first one should always have an ID greater than the following ones. // If there are multiple GoAways the first one should always have an ID greater than the following ones.
if id > t.prevGoAwayID { if id > t.prevGoAwayID {
t.mu.Unlock() t.mu.Unlock()
t.Close() t.Close(connectionErrorf(true, nil, "received goaway with stream id: %v, which exceeds stream id of previous goaway: %v", id, t.prevGoAwayID))
return return
} }
default: default:
@ -1191,7 +1205,7 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
active := len(t.activeStreams) active := len(t.activeStreams)
t.mu.Unlock() t.mu.Unlock()
if active == 0 { if active == 0 {
t.Close() t.Close(connectionErrorf(true, nil, "received goaway and there are no active streams"))
} }
} }
@ -1207,12 +1221,13 @@ func (t *http2Client) setGoAwayReason(f *http2.GoAwayFrame) {
t.goAwayReason = GoAwayTooManyPings t.goAwayReason = GoAwayTooManyPings
} }
} }
t.goAwayDebugMessage = fmt.Sprintf("code: %s, debug data: %v", f.ErrCode, string(f.DebugData()))
} }
func (t *http2Client) GetGoAwayReason() GoAwayReason { func (t *http2Client) GetGoAwayReason() (GoAwayReason, string) {
t.mu.Lock() t.mu.Lock()
defer t.mu.Unlock() defer t.mu.Unlock()
return t.goAwayReason return t.goAwayReason, t.goAwayDebugMessage
} }
func (t *http2Client) handleWindowUpdate(f *http2.WindowUpdateFrame) { func (t *http2Client) handleWindowUpdate(f *http2.WindowUpdateFrame) {
@ -1309,7 +1324,8 @@ func (t *http2Client) reader() {
// Check the validity of server preface. // Check the validity of server preface.
frame, err := t.framer.fr.ReadFrame() frame, err := t.framer.fr.ReadFrame()
if err != nil { if err != nil {
t.Close() // this kicks off resetTransport, so must be last before return err = connectionErrorf(true, err, "error reading server preface: %v", err)
t.Close(err) // this kicks off resetTransport, so must be last before return
return return
} }
t.conn.SetReadDeadline(time.Time{}) // reset deadline once we get the settings frame (we didn't time out, yay!) t.conn.SetReadDeadline(time.Time{}) // reset deadline once we get the settings frame (we didn't time out, yay!)
@ -1318,7 +1334,8 @@ func (t *http2Client) reader() {
} }
sf, ok := frame.(*http2.SettingsFrame) sf, ok := frame.(*http2.SettingsFrame)
if !ok { if !ok {
t.Close() // this kicks off resetTransport, so must be last before return // this kicks off resetTransport, so must be last before return
t.Close(connectionErrorf(true, nil, "initial http2 frame from server is not a settings frame: %T", frame))
return return
} }
t.onPrefaceReceipt() t.onPrefaceReceipt()
@ -1354,7 +1371,7 @@ func (t *http2Client) reader() {
continue continue
} else { } else {
// Transport error. // Transport error.
t.Close() t.Close(connectionErrorf(true, err, "error reading from server: %v", err))
return return
} }
} }
@ -1413,7 +1430,7 @@ func (t *http2Client) keepalive() {
continue continue
} }
if outstandingPing && timeoutLeft <= 0 { if outstandingPing && timeoutLeft <= 0 {
t.Close() t.Close(connectionErrorf(true, nil, "keepalive ping failed to receive ACK within timeout"))
return return
} }
t.mu.Lock() t.mu.Lock()

View File

@ -26,6 +26,7 @@ import (
"io" "io"
"math" "math"
"net" "net"
"net/http"
"strconv" "strconv"
"sync" "sync"
"sync/atomic" "sync/atomic"
@ -355,26 +356,6 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
if state.data.statsTrace != nil { if state.data.statsTrace != nil {
s.ctx = stats.SetIncomingTrace(s.ctx, state.data.statsTrace) s.ctx = stats.SetIncomingTrace(s.ctx, state.data.statsTrace)
} }
if t.inTapHandle != nil {
var err error
info := &tap.Info{
FullMethodName: state.data.method,
}
s.ctx, err = t.inTapHandle(s.ctx, info)
if err != nil {
if logger.V(logLevel) {
logger.Warningf("transport: http2Server.operateHeaders got an error from InTapHandle: %v", err)
}
t.controlBuf.put(&cleanupStream{
streamID: s.id,
rst: true,
rstCode: http2.ErrCodeRefusedStream,
onWrite: func() {},
})
s.cancel()
return false
}
}
t.mu.Lock() t.mu.Lock()
if t.state != reachable { if t.state != reachable {
t.mu.Unlock() t.mu.Unlock()
@ -402,6 +383,39 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
return true return true
} }
t.maxStreamID = streamID t.maxStreamID = streamID
if state.data.httpMethod != http.MethodPost {
t.mu.Unlock()
if logger.V(logLevel) {
logger.Warningf("transport: http2Server.operateHeaders parsed a :method field: %v which should be POST", state.data.httpMethod)
}
t.controlBuf.put(&cleanupStream{
streamID: streamID,
rst: true,
rstCode: http2.ErrCodeProtocol,
onWrite: func() {},
})
s.cancel()
return false
}
if t.inTapHandle != nil {
var err error
if s.ctx, err = t.inTapHandle(s.ctx, &tap.Info{FullMethodName: state.data.method}); err != nil {
t.mu.Unlock()
if logger.V(logLevel) {
logger.Infof("transport: http2Server.operateHeaders got an error from InTapHandle: %v", err)
}
stat, ok := status.FromError(err)
if !ok {
stat = status.New(codes.PermissionDenied, err.Error())
}
t.controlBuf.put(&earlyAbortStream{
streamID: s.id,
contentSubtype: s.contentSubtype,
status: stat,
})
return false
}
}
t.activeStreams[streamID] = s t.activeStreams[streamID] = s
if len(t.activeStreams) == 1 { if len(t.activeStreams) == 1 {
t.idle = time.Time{} t.idle = time.Time{}

View File

@ -111,6 +111,7 @@ type parsedHeaderData struct {
timeoutSet bool timeoutSet bool
timeout time.Duration timeout time.Duration
method string method string
httpMethod string
// key-value metadata map from the peer. // key-value metadata map from the peer.
mdata map[string][]string mdata map[string][]string
statsTags []byte statsTags []byte
@ -363,6 +364,8 @@ func (d *decodeState) processHeaderField(f hpack.HeaderField) {
} }
d.data.statsTrace = v d.data.statsTrace = v
d.addMetadata(f.Name, string(v)) d.addMetadata(f.Name, string(v))
case ":method":
d.data.httpMethod = f.Value
default: default:
if isReservedHeader(f.Name) && !isWhitelistedHeader(f.Name) { if isReservedHeader(f.Name) && !isWhitelistedHeader(f.Name) {
break break

View File

@ -241,6 +241,7 @@ type Stream struct {
ctx context.Context // the associated context of the stream ctx context.Context // the associated context of the stream
cancel context.CancelFunc // always nil for client side Stream cancel context.CancelFunc // always nil for client side Stream
done chan struct{} // closed at the end of stream to unblock writers. On the client side. done chan struct{} // closed at the end of stream to unblock writers. On the client side.
doneFunc func() // invoked at the end of stream on client side.
ctxDone <-chan struct{} // same as done chan but for server side. Cache of ctx.Done() (for performance) ctxDone <-chan struct{} // same as done chan but for server side. Cache of ctx.Done() (for performance)
method string // the associated RPC method of the stream method string // the associated RPC method of the stream
recvCompress string recvCompress string
@ -611,6 +612,8 @@ type CallHdr struct {
ContentSubtype string ContentSubtype string
PreviousAttempts int // value of grpc-previous-rpc-attempts header to set PreviousAttempts int // value of grpc-previous-rpc-attempts header to set
DoneFunc func() // called when the stream is finished
} }
// ClientTransport is the common interface for all gRPC client-side transport // ClientTransport is the common interface for all gRPC client-side transport
@ -619,7 +622,7 @@ type ClientTransport interface {
// Close tears down this transport. Once it returns, the transport // Close tears down this transport. Once it returns, the transport
// should not be accessed any more. The caller must make sure this // should not be accessed any more. The caller must make sure this
// is called only once. // is called only once.
Close() error Close(err error)
// GracefulClose starts to tear down the transport: the transport will stop // GracefulClose starts to tear down the transport: the transport will stop
// accepting new RPCs and NewStream will return error. Once all streams are // accepting new RPCs and NewStream will return error. Once all streams are
@ -653,8 +656,9 @@ type ClientTransport interface {
// HTTP/2). // HTTP/2).
GoAway() <-chan struct{} GoAway() <-chan struct{}
// GetGoAwayReason returns the reason why GoAway frame was received. // GetGoAwayReason returns the reason why GoAway frame was received, along
GetGoAwayReason() GoAwayReason // with a human readable string with debug info.
GetGoAwayReason() (GoAwayReason, string)
// RemoteAddr returns the remote network address. // RemoteAddr returns the remote network address.
RemoteAddr() net.Addr RemoteAddr() net.Addr

View File

@ -0,0 +1,40 @@
/*
* Copyright 2021 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package internal
import (
"google.golang.org/grpc/attributes"
"google.golang.org/grpc/resolver"
)
// handshakeClusterNameKey is the type used as the key to store cluster name in
// the Attributes field of resolver.Address.
type handshakeClusterNameKey struct{}
// SetXDSHandshakeClusterName returns a copy of addr in which the Attributes field
// is updated with the cluster name.
func SetXDSHandshakeClusterName(addr resolver.Address, clusterName string) resolver.Address {
addr.Attributes = addr.Attributes.WithValues(handshakeClusterNameKey{}, clusterName)
return addr
}
// GetXDSHandshakeClusterName returns cluster name stored in attr.
func GetXDSHandshakeClusterName(attr *attributes.Attributes) (string, bool) {
v := attr.Value(handshakeClusterNameKey{})
name, ok := v.(string)
return name, ok
}

View File

@ -75,13 +75,9 @@ func Pairs(kv ...string) MD {
panic(fmt.Sprintf("metadata: Pairs got the odd number of input pairs for metadata: %d", len(kv))) panic(fmt.Sprintf("metadata: Pairs got the odd number of input pairs for metadata: %d", len(kv)))
} }
md := MD{} md := MD{}
var key string for i := 0; i < len(kv); i += 2 {
for i, s := range kv { key := strings.ToLower(kv[i])
if i%2 == 0 { md[key] = append(md[key], kv[i+1])
key = strings.ToLower(s)
continue
}
md[key] = append(md[key], s)
} }
return md return md
} }
@ -195,12 +191,18 @@ func FromOutgoingContext(ctx context.Context) (MD, bool) {
return nil, false return nil, false
} }
mds := make([]MD, 0, len(raw.added)+1) out := raw.md.Copy()
mds = append(mds, raw.md) for _, added := range raw.added {
for _, vv := range raw.added { if len(added)%2 == 1 {
mds = append(mds, Pairs(vv...)) panic(fmt.Sprintf("metadata: FromOutgoingContext got an odd number of input pairs for metadata: %d", len(added)))
}
for i := 0; i < len(added); i += 2 {
key := strings.ToLower(added[i])
out[key] = append(out[key], added[i+1])
}
} }
return Join(mds...), ok return out, ok
} }
type rawMD struct { type rawMD struct {

View File

@ -84,7 +84,7 @@ func (b *pickfirstBalancer) UpdateClientConnState(cs balancer.ClientConnState) e
b.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Idle, Picker: &picker{result: balancer.PickResult{SubConn: b.sc}}}) b.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Idle, Picker: &picker{result: balancer.PickResult{SubConn: b.sc}}})
b.sc.Connect() b.sc.Connect()
} else { } else {
b.sc.UpdateAddresses(cs.ResolverState.Addresses) b.cc.UpdateAddresses(b.sc, cs.ResolverState.Addresses)
b.sc.Connect() b.sc.Connect()
} }
return nil return nil

View File

@ -40,16 +40,14 @@ echo "go install cmd/protoc-gen-go-grpc"
echo "git clone https://github.com/grpc/grpc-proto" echo "git clone https://github.com/grpc/grpc-proto"
git clone --quiet https://github.com/grpc/grpc-proto ${WORKDIR}/grpc-proto git clone --quiet https://github.com/grpc/grpc-proto ${WORKDIR}/grpc-proto
echo "git clone https://github.com/protocolbuffers/protobuf"
git clone --quiet https://github.com/protocolbuffers/protobuf ${WORKDIR}/protobuf
# Pull in code.proto as a proto dependency # Pull in code.proto as a proto dependency
mkdir -p ${WORKDIR}/googleapis/google/rpc mkdir -p ${WORKDIR}/googleapis/google/rpc
echo "curl https://raw.githubusercontent.com/googleapis/googleapis/master/google/rpc/code.proto" echo "curl https://raw.githubusercontent.com/googleapis/googleapis/master/google/rpc/code.proto"
curl --silent https://raw.githubusercontent.com/googleapis/googleapis/master/google/rpc/code.proto > ${WORKDIR}/googleapis/google/rpc/code.proto curl --silent https://raw.githubusercontent.com/googleapis/googleapis/master/google/rpc/code.proto > ${WORKDIR}/googleapis/google/rpc/code.proto
# Pull in the MeshCA service proto.
mkdir -p ${WORKDIR}/istio/istio/google/security/meshca/v1
echo "curl https://raw.githubusercontent.com/istio/istio/master/security/proto/providers/google/meshca.proto"
curl --silent https://raw.githubusercontent.com/istio/istio/master/security/proto/providers/google/meshca.proto > ${WORKDIR}/istio/istio/google/security/meshca/v1/meshca.proto
mkdir -p ${WORKDIR}/out mkdir -p ${WORKDIR}/out
# Generates sources without the embed requirement # Generates sources without the embed requirement
@ -73,7 +71,6 @@ SOURCES=(
${WORKDIR}/grpc-proto/grpc/service_config/service_config.proto ${WORKDIR}/grpc-proto/grpc/service_config/service_config.proto
${WORKDIR}/grpc-proto/grpc/testing/*.proto ${WORKDIR}/grpc-proto/grpc/testing/*.proto
${WORKDIR}/grpc-proto/grpc/core/*.proto ${WORKDIR}/grpc-proto/grpc/core/*.proto
${WORKDIR}/istio/istio/google/security/meshca/v1/meshca.proto
) )
# These options of the form 'Mfoo.proto=bar' instruct the codegen to use an # These options of the form 'Mfoo.proto=bar' instruct the codegen to use an
@ -87,6 +84,7 @@ for src in ${SOURCES[@]}; do
-I"." \ -I"." \
-I${WORKDIR}/grpc-proto \ -I${WORKDIR}/grpc-proto \
-I${WORKDIR}/googleapis \ -I${WORKDIR}/googleapis \
-I${WORKDIR}/protobuf/src \
-I${WORKDIR}/istio \ -I${WORKDIR}/istio \
${src} ${src}
done done
@ -97,6 +95,7 @@ for src in ${LEGACY_SOURCES[@]}; do
-I"." \ -I"." \
-I${WORKDIR}/grpc-proto \ -I${WORKDIR}/grpc-proto \
-I${WORKDIR}/googleapis \ -I${WORKDIR}/googleapis \
-I${WORKDIR}/protobuf/src \
-I${WORKDIR}/istio \ -I${WORKDIR}/istio \
${src} ${src}
done done
@ -117,8 +116,4 @@ mv ${WORKDIR}/out/grpc/service_config/service_config.pb.go internal/proto/grpc_s
mv ${WORKDIR}/out/grpc/testing/*.pb.go interop/grpc_testing/ mv ${WORKDIR}/out/grpc/testing/*.pb.go interop/grpc_testing/
mv ${WORKDIR}/out/grpc/core/*.pb.go interop/grpc_testing/core/ mv ${WORKDIR}/out/grpc/core/*.pb.go interop/grpc_testing/core/
# istio/google/security/meshca/v1/meshca.proto does not have a go_package option.
mkdir -p ${WORKDIR}/out/google.golang.org/grpc/credentials/tls/certprovider/meshca/internal/v1/
mv ${WORKDIR}/out/istio/google/security/meshca/v1/* ${WORKDIR}/out/google.golang.org/grpc/credentials/tls/certprovider/meshca/internal/v1/
cp -R ${WORKDIR}/out/google.golang.org/grpc/* . cp -R ${WORKDIR}/out/google.golang.org/grpc/* .

View File

@ -181,7 +181,7 @@ type State struct {
// gRPC to add new methods to this interface. // gRPC to add new methods to this interface.
type ClientConn interface { type ClientConn interface {
// UpdateState updates the state of the ClientConn appropriately. // UpdateState updates the state of the ClientConn appropriately.
UpdateState(State) UpdateState(State) error
// ReportError notifies the ClientConn that the Resolver encountered an // ReportError notifies the ClientConn that the Resolver encountered an
// error. The ClientConn will notify the load balancer and begin calling // error. The ClientConn will notify the load balancer and begin calling
// ResolveNow on the Resolver with exponential backoff. // ResolveNow on the Resolver with exponential backoff.

View File

@ -22,7 +22,6 @@ import (
"fmt" "fmt"
"strings" "strings"
"sync" "sync"
"time"
"google.golang.org/grpc/balancer" "google.golang.org/grpc/balancer"
"google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials"
@ -40,9 +39,6 @@ type ccResolverWrapper struct {
resolver resolver.Resolver resolver resolver.Resolver
done *grpcsync.Event done *grpcsync.Event
curState resolver.State curState resolver.State
pollingMu sync.Mutex
polling chan struct{}
} }
// newCCResolverWrapper uses the resolver.Builder to build a Resolver and // newCCResolverWrapper uses the resolver.Builder to build a Resolver and
@ -93,59 +89,19 @@ func (ccr *ccResolverWrapper) close() {
ccr.resolverMu.Unlock() ccr.resolverMu.Unlock()
} }
// poll begins or ends asynchronous polling of the resolver based on whether func (ccr *ccResolverWrapper) UpdateState(s resolver.State) error {
// err is ErrBadResolverState.
func (ccr *ccResolverWrapper) poll(err error) {
ccr.pollingMu.Lock()
defer ccr.pollingMu.Unlock()
if err != balancer.ErrBadResolverState {
// stop polling
if ccr.polling != nil {
close(ccr.polling)
ccr.polling = nil
}
return
}
if ccr.polling != nil {
// already polling
return
}
p := make(chan struct{})
ccr.polling = p
go func() {
for i := 0; ; i++ {
ccr.resolveNow(resolver.ResolveNowOptions{})
t := time.NewTimer(ccr.cc.dopts.resolveNowBackoff(i))
select {
case <-p:
t.Stop()
return
case <-ccr.done.Done():
// Resolver has been closed.
t.Stop()
return
case <-t.C:
select {
case <-p:
return
default:
}
// Timer expired; re-resolve.
}
}
}()
}
func (ccr *ccResolverWrapper) UpdateState(s resolver.State) {
if ccr.done.HasFired() { if ccr.done.HasFired() {
return return nil
} }
channelz.Infof(logger, ccr.cc.channelzID, "ccResolverWrapper: sending update to cc: %v", s) channelz.Infof(logger, ccr.cc.channelzID, "ccResolverWrapper: sending update to cc: %v", s)
if channelz.IsOn() { if channelz.IsOn() {
ccr.addChannelzTraceEvent(s) ccr.addChannelzTraceEvent(s)
} }
ccr.curState = s ccr.curState = s
ccr.poll(ccr.cc.updateResolverState(ccr.curState, nil)) if err := ccr.cc.updateResolverState(ccr.curState, nil); err == balancer.ErrBadResolverState {
return balancer.ErrBadResolverState
}
return nil
} }
func (ccr *ccResolverWrapper) ReportError(err error) { func (ccr *ccResolverWrapper) ReportError(err error) {
@ -153,7 +109,7 @@ func (ccr *ccResolverWrapper) ReportError(err error) {
return return
} }
channelz.Warningf(logger, ccr.cc.channelzID, "ccResolverWrapper: reporting error to cc: %v", err) channelz.Warningf(logger, ccr.cc.channelzID, "ccResolverWrapper: reporting error to cc: %v", err)
ccr.poll(ccr.cc.updateResolverState(resolver.State{}, err)) ccr.cc.updateResolverState(resolver.State{}, err)
} }
// NewAddress is called by the resolver implementation to send addresses to gRPC. // NewAddress is called by the resolver implementation to send addresses to gRPC.
@ -166,7 +122,7 @@ func (ccr *ccResolverWrapper) NewAddress(addrs []resolver.Address) {
ccr.addChannelzTraceEvent(resolver.State{Addresses: addrs, ServiceConfig: ccr.curState.ServiceConfig}) ccr.addChannelzTraceEvent(resolver.State{Addresses: addrs, ServiceConfig: ccr.curState.ServiceConfig})
} }
ccr.curState.Addresses = addrs ccr.curState.Addresses = addrs
ccr.poll(ccr.cc.updateResolverState(ccr.curState, nil)) ccr.cc.updateResolverState(ccr.curState, nil)
} }
// NewServiceConfig is called by the resolver implementation to send service // NewServiceConfig is called by the resolver implementation to send service
@ -183,14 +139,13 @@ func (ccr *ccResolverWrapper) NewServiceConfig(sc string) {
scpr := parseServiceConfig(sc) scpr := parseServiceConfig(sc)
if scpr.Err != nil { if scpr.Err != nil {
channelz.Warningf(logger, ccr.cc.channelzID, "ccResolverWrapper: error parsing service config: %v", scpr.Err) channelz.Warningf(logger, ccr.cc.channelzID, "ccResolverWrapper: error parsing service config: %v", scpr.Err)
ccr.poll(balancer.ErrBadResolverState)
return return
} }
if channelz.IsOn() { if channelz.IsOn() {
ccr.addChannelzTraceEvent(resolver.State{Addresses: ccr.curState.Addresses, ServiceConfig: scpr}) ccr.addChannelzTraceEvent(resolver.State{Addresses: ccr.curState.Addresses, ServiceConfig: scpr})
} }
ccr.curState.ServiceConfig = scpr ccr.curState.ServiceConfig = scpr
ccr.poll(ccr.cc.updateResolverState(ccr.curState, nil)) ccr.cc.updateResolverState(ccr.curState, nil)
} }
func (ccr *ccResolverWrapper) ParseServiceConfig(scJSON string) *serviceconfig.ParseResult { func (ccr *ccResolverWrapper) ParseServiceConfig(scJSON string) *serviceconfig.ParseResult {

View File

@ -429,9 +429,10 @@ func (o ContentSubtypeCallOption) before(c *callInfo) error {
} }
func (o ContentSubtypeCallOption) after(c *callInfo, attempt *csAttempt) {} func (o ContentSubtypeCallOption) after(c *callInfo, attempt *csAttempt) {}
// ForceCodec returns a CallOption that will set the given Codec to be // ForceCodec returns a CallOption that will set codec to be used for all
// used for all request and response messages for a call. The result of calling // request and response messages for a call. The result of calling Name() will
// String() will be used as the content-subtype in a case-insensitive manner. // be used as the content-subtype after converting to lowercase, unless
// CallContentSubtype is also used.
// //
// See Content-Type on // See Content-Type on
// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
@ -853,7 +854,17 @@ func toRPCErr(err error) error {
// setCallInfoCodec should only be called after CallOptions have been applied. // setCallInfoCodec should only be called after CallOptions have been applied.
func setCallInfoCodec(c *callInfo) error { func setCallInfoCodec(c *callInfo) error {
if c.codec != nil { if c.codec != nil {
// codec was already set by a CallOption; use it. // codec was already set by a CallOption; use it, but set the content
// subtype if it is not set.
if c.contentSubtype == "" {
// c.codec is a baseCodec to hide the difference between grpc.Codec and
// encoding.Codec (Name vs. String method name). We only support
// setting content subtype from encoding.Codec to avoid a behavior
// change with the deprecated version.
if ec, ok := c.codec.(encoding.Codec); ok {
c.contentSubtype = strings.ToLower(ec.Name())
}
}
return nil return nil
} }
@ -888,8 +899,7 @@ type channelzData struct {
// buffer files to ensure compatibility with the gRPC version used. The latest // buffer files to ensure compatibility with the gRPC version used. The latest
// support package version is 7. // support package version is 7.
// //
// Older versions are kept for compatibility. They may be removed if // Older versions are kept for compatibility.
// compatibility cannot be maintained.
// //
// These constants should not be referenced from any other code. // These constants should not be referenced from any other code.
const ( const (

View File

@ -57,12 +57,22 @@ import (
const ( const (
defaultServerMaxReceiveMessageSize = 1024 * 1024 * 4 defaultServerMaxReceiveMessageSize = 1024 * 1024 * 4
defaultServerMaxSendMessageSize = math.MaxInt32 defaultServerMaxSendMessageSize = math.MaxInt32
// Server transports are tracked in a map which is keyed on listener
// address. For regular gRPC traffic, connections are accepted in Serve()
// through a call to Accept(), and we use the actual listener address as key
// when we add it to the map. But for connections received through
// ServeHTTP(), we do not have a listener and hence use this dummy value.
listenerAddressForServeHTTP = "listenerAddressForServeHTTP"
) )
func init() { func init() {
internal.GetServerCredentials = func(srv *Server) credentials.TransportCredentials { internal.GetServerCredentials = func(srv *Server) credentials.TransportCredentials {
return srv.opts.creds return srv.opts.creds
} }
internal.DrainServerTransports = func(srv *Server, addr string) {
srv.drainServerTransports(addr)
}
} }
var statusOK = status.New(codes.OK, "") var statusOK = status.New(codes.OK, "")
@ -107,9 +117,12 @@ type serverWorkerData struct {
type Server struct { type Server struct {
opts serverOptions opts serverOptions
mu sync.Mutex // guards following mu sync.Mutex // guards following
lis map[net.Listener]bool lis map[net.Listener]bool
conns map[transport.ServerTransport]bool // conns contains all active server transports. It is a map keyed on a
// listener address with the value being the set of active transports
// belonging to that listener.
conns map[string]map[transport.ServerTransport]bool
serve bool serve bool
drain bool drain bool
cv *sync.Cond // signaled when connections close for GracefulStop cv *sync.Cond // signaled when connections close for GracefulStop
@ -266,6 +279,35 @@ func CustomCodec(codec Codec) ServerOption {
}) })
} }
// ForceServerCodec returns a ServerOption that sets a codec for message
// marshaling and unmarshaling.
//
// This will override any lookups by content-subtype for Codecs registered
// with RegisterCodec.
//
// See Content-Type on
// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
// more details. Also see the documentation on RegisterCodec and
// CallContentSubtype for more details on the interaction between encoding.Codec
// and content-subtype.
//
// This function is provided for advanced users; prefer to register codecs
// using encoding.RegisterCodec.
// The server will automatically use registered codecs based on the incoming
// requests' headers. See also
// https://github.com/grpc/grpc-go/blob/master/Documentation/encoding.md#using-a-codec.
// Will be supported throughout 1.x.
//
// Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func ForceServerCodec(codec encoding.Codec) ServerOption {
return newFuncServerOption(func(o *serverOptions) {
o.codec = codec
})
}
// RPCCompressor returns a ServerOption that sets a compressor for outbound // RPCCompressor returns a ServerOption that sets a compressor for outbound
// messages. For backward compatibility, all outbound messages will be sent // messages. For backward compatibility, all outbound messages will be sent
// using this compressor, regardless of incoming message compression. By // using this compressor, regardless of incoming message compression. By
@ -376,6 +418,11 @@ func ChainStreamInterceptor(interceptors ...StreamServerInterceptor) ServerOptio
// InTapHandle returns a ServerOption that sets the tap handle for all the server // InTapHandle returns a ServerOption that sets the tap handle for all the server
// transport to be created. Only one can be installed. // transport to be created. Only one can be installed.
//
// Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func InTapHandle(h tap.ServerInHandle) ServerOption { func InTapHandle(h tap.ServerInHandle) ServerOption {
return newFuncServerOption(func(o *serverOptions) { return newFuncServerOption(func(o *serverOptions) {
if o.inTapHandle != nil { if o.inTapHandle != nil {
@ -519,7 +566,7 @@ func NewServer(opt ...ServerOption) *Server {
s := &Server{ s := &Server{
lis: make(map[net.Listener]bool), lis: make(map[net.Listener]bool),
opts: opts, opts: opts,
conns: make(map[transport.ServerTransport]bool), conns: make(map[string]map[transport.ServerTransport]bool),
services: make(map[string]*serviceInfo), services: make(map[string]*serviceInfo),
quit: grpcsync.NewEvent(), quit: grpcsync.NewEvent(),
done: grpcsync.NewEvent(), done: grpcsync.NewEvent(),
@ -778,7 +825,7 @@ func (s *Server) Serve(lis net.Listener) error {
// s.conns before this conn can be added. // s.conns before this conn can be added.
s.serveWG.Add(1) s.serveWG.Add(1)
go func() { go func() {
s.handleRawConn(rawConn) s.handleRawConn(lis.Addr().String(), rawConn)
s.serveWG.Done() s.serveWG.Done()
}() }()
} }
@ -786,7 +833,7 @@ func (s *Server) Serve(lis net.Listener) error {
// handleRawConn forks a goroutine to handle a just-accepted connection that // handleRawConn forks a goroutine to handle a just-accepted connection that
// has not had any I/O performed on it yet. // has not had any I/O performed on it yet.
func (s *Server) handleRawConn(rawConn net.Conn) { func (s *Server) handleRawConn(lisAddr string, rawConn net.Conn) {
if s.quit.HasFired() { if s.quit.HasFired() {
rawConn.Close() rawConn.Close()
return return
@ -814,15 +861,24 @@ func (s *Server) handleRawConn(rawConn net.Conn) {
} }
rawConn.SetDeadline(time.Time{}) rawConn.SetDeadline(time.Time{})
if !s.addConn(st) { if !s.addConn(lisAddr, st) {
return return
} }
go func() { go func() {
s.serveStreams(st) s.serveStreams(st)
s.removeConn(st) s.removeConn(lisAddr, st)
}() }()
} }
func (s *Server) drainServerTransports(addr string) {
s.mu.Lock()
conns := s.conns[addr]
for st := range conns {
st.Drain()
}
s.mu.Unlock()
}
// newHTTP2Transport sets up a http/2 transport (using the // newHTTP2Transport sets up a http/2 transport (using the
// gRPC http2 server transport in transport/http2_server.go). // gRPC http2 server transport in transport/http2_server.go).
func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) transport.ServerTransport { func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) transport.ServerTransport {
@ -924,10 +980,10 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
http.Error(w, err.Error(), http.StatusInternalServerError) http.Error(w, err.Error(), http.StatusInternalServerError)
return return
} }
if !s.addConn(st) { if !s.addConn(listenerAddressForServeHTTP, st) {
return return
} }
defer s.removeConn(st) defer s.removeConn(listenerAddressForServeHTTP, st)
s.serveStreams(st) s.serveStreams(st)
} }
@ -955,7 +1011,7 @@ func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Strea
return trInfo return trInfo
} }
func (s *Server) addConn(st transport.ServerTransport) bool { func (s *Server) addConn(addr string, st transport.ServerTransport) bool {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
if s.conns == nil { if s.conns == nil {
@ -967,15 +1023,28 @@ func (s *Server) addConn(st transport.ServerTransport) bool {
// immediately. // immediately.
st.Drain() st.Drain()
} }
s.conns[st] = true
if s.conns[addr] == nil {
// Create a map entry if this is the first connection on this listener.
s.conns[addr] = make(map[transport.ServerTransport]bool)
}
s.conns[addr][st] = true
return true return true
} }
func (s *Server) removeConn(st transport.ServerTransport) { func (s *Server) removeConn(addr string, st transport.ServerTransport) {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
if s.conns != nil {
delete(s.conns, st) conns := s.conns[addr]
if conns != nil {
delete(conns, st)
if len(conns) == 0 {
// If the last connection for this address is being removed, also
// remove the map entry corresponding to the address. This is used
// in GracefulStop() when waiting for all connections to be closed.
delete(s.conns, addr)
}
s.cv.Broadcast() s.cv.Broadcast()
} }
} }
@ -1639,7 +1708,7 @@ func (s *Server) Stop() {
s.mu.Lock() s.mu.Lock()
listeners := s.lis listeners := s.lis
s.lis = nil s.lis = nil
st := s.conns conns := s.conns
s.conns = nil s.conns = nil
// interrupt GracefulStop if Stop and GracefulStop are called concurrently. // interrupt GracefulStop if Stop and GracefulStop are called concurrently.
s.cv.Broadcast() s.cv.Broadcast()
@ -1648,8 +1717,10 @@ func (s *Server) Stop() {
for lis := range listeners { for lis := range listeners {
lis.Close() lis.Close()
} }
for c := range st { for _, cs := range conns {
c.Close() for st := range cs {
st.Close()
}
} }
if s.opts.numServerWorkers > 0 { if s.opts.numServerWorkers > 0 {
s.stopServerWorkers() s.stopServerWorkers()
@ -1686,8 +1757,10 @@ func (s *Server) GracefulStop() {
} }
s.lis = nil s.lis = nil
if !s.drain { if !s.drain {
for st := range s.conns { for _, conns := range s.conns {
st.Drain() for st := range conns {
st.Drain()
}
} }
s.drain = true s.drain = true
} }

View File

@ -73,9 +73,11 @@ func FromProto(s *spb.Status) *Status {
return status.FromProto(s) return status.FromProto(s)
} }
// FromError returns a Status representing err if it was produced from this // FromError returns a Status representing err if it was produced by this
// package or has a method `GRPCStatus() *Status`. Otherwise, ok is false and a // package or has a method `GRPCStatus() *Status`.
// Status is returned with codes.Unknown and the original error message. // If err is nil, a Status is returned with codes.OK and no message.
// Otherwise, ok is false and a Status is returned with codes.Unknown and
// the original error message.
func FromError(err error) (s *Status, ok bool) { func FromError(err error) (s *Status, ok bool) {
if err == nil { if err == nil {
return nil, true return nil, true

View File

@ -52,14 +52,20 @@ import (
// of the RPC. // of the RPC.
type StreamHandler func(srv interface{}, stream ServerStream) error type StreamHandler func(srv interface{}, stream ServerStream) error
// StreamDesc represents a streaming RPC service's method specification. // StreamDesc represents a streaming RPC service's method specification. Used
// on the server when registering services and on the client when initiating
// new streams.
type StreamDesc struct { type StreamDesc struct {
StreamName string // StreamName and Handler are only used when registering handlers on a
Handler StreamHandler // server.
StreamName string // the name of the method excluding the service
Handler StreamHandler // the handler called for the method
// At least one of these is true. // ServerStreams and ClientStreams are used for registering handlers on a
ServerStreams bool // server as well as defining RPC behavior when passed to NewClientStream
ClientStreams bool // and ClientConn.NewStream. At least one must be true.
ServerStreams bool // indicates the server can perform streaming sends
ClientStreams bool // indicates the client can perform streaming sends
} }
// Stream defines the common interface a client or server stream has to satisfy. // Stream defines the common interface a client or server stream has to satisfy.
@ -166,7 +172,6 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
} }
}() }()
} }
c := defaultCallInfo()
// Provide an opportunity for the first RPC to see the first service config // Provide an opportunity for the first RPC to see the first service config
// provided by the resolver. // provided by the resolver.
if err := cc.waitForResolvedAddrs(ctx); err != nil { if err := cc.waitForResolvedAddrs(ctx); err != nil {
@ -175,18 +180,40 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
var mc serviceconfig.MethodConfig var mc serviceconfig.MethodConfig
var onCommit func() var onCommit func()
rpcConfig, err := cc.safeConfigSelector.SelectConfig(iresolver.RPCInfo{Context: ctx, Method: method}) var newStream = func(ctx context.Context, done func()) (iresolver.ClientStream, error) {
if err != nil { return newClientStreamWithParams(ctx, desc, cc, method, mc, onCommit, done, opts...)
return nil, status.Convert(err).Err()
} }
rpcInfo := iresolver.RPCInfo{Context: ctx, Method: method}
rpcConfig, err := cc.safeConfigSelector.SelectConfig(rpcInfo)
if err != nil {
return nil, toRPCErr(err)
}
if rpcConfig != nil { if rpcConfig != nil {
if rpcConfig.Context != nil { if rpcConfig.Context != nil {
ctx = rpcConfig.Context ctx = rpcConfig.Context
} }
mc = rpcConfig.MethodConfig mc = rpcConfig.MethodConfig
onCommit = rpcConfig.OnCommitted onCommit = rpcConfig.OnCommitted
if rpcConfig.Interceptor != nil {
rpcInfo.Context = nil
ns := newStream
newStream = func(ctx context.Context, done func()) (iresolver.ClientStream, error) {
cs, err := rpcConfig.Interceptor.NewStream(ctx, rpcInfo, done, ns)
if err != nil {
return nil, toRPCErr(err)
}
return cs, nil
}
}
} }
return newStream(ctx, func() {})
}
func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, mc serviceconfig.MethodConfig, onCommit, doneFunc func(), opts ...CallOption) (_ iresolver.ClientStream, err error) {
c := defaultCallInfo()
if mc.WaitForReady != nil { if mc.WaitForReady != nil {
c.failFast = !*mc.WaitForReady c.failFast = !*mc.WaitForReady
} }
@ -223,6 +250,7 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
Host: cc.authority, Host: cc.authority,
Method: method, Method: method,
ContentSubtype: c.contentSubtype, ContentSubtype: c.contentSubtype,
DoneFunc: doneFunc,
} }
// Set our outgoing compression according to the UseCompressor CallOption, if // Set our outgoing compression according to the UseCompressor CallOption, if

View File

@ -37,16 +37,16 @@ type Info struct {
// TODO: More to be added. // TODO: More to be added.
} }
// ServerInHandle defines the function which runs before a new stream is created // ServerInHandle defines the function which runs before a new stream is
// on the server side. If it returns a non-nil error, the stream will not be // created on the server side. If it returns a non-nil error, the stream will
// created and a RST_STREAM will be sent back to the client with REFUSED_STREAM. // not be created and an error will be returned to the client. If the error
// The client will receive an RPC error "code = Unavailable, desc = stream // returned is a status error, that status code and message will be used,
// terminated by RST_STREAM with error code: REFUSED_STREAM". // otherwise PermissionDenied will be the code and err.Error() will be the
// message.
// //
// It's intended to be used in situations where you don't want to waste the // It's intended to be used in situations where you don't want to waste the
// resources to accept the new stream (e.g. rate-limiting). And the content of // resources to accept the new stream (e.g. rate-limiting). For other general
// the error will be ignored and won't be sent back to the client. For other // usages, please use interceptors.
// general usages, please use interceptors.
// //
// Note that it is executed in the per-connection I/O goroutine(s) instead of // Note that it is executed in the per-connection I/O goroutine(s) instead of
// per-RPC goroutine. Therefore, users should NOT have any // per-RPC goroutine. Therefore, users should NOT have any

View File

@ -19,4 +19,4 @@
package grpc package grpc
// Version is the current grpc version. // Version is the current grpc version.
const Version = "1.35.0" const Version = "1.38.0"

43
vendor/google.golang.org/grpc/vet.sh generated vendored
View File

@ -28,7 +28,8 @@ cleanup() {
} }
trap cleanup EXIT trap cleanup EXIT
PATH="${GOPATH}/bin:${GOROOT}/bin:${PATH}" PATH="${HOME}/go/bin:${GOROOT}/bin:${PATH}"
go version
if [[ "$1" = "-install" ]]; then if [[ "$1" = "-install" ]]; then
# Check for module support # Check for module support
@ -104,12 +105,6 @@ git grep '"github.com/envoyproxy/go-control-plane/envoy' -- '*.go' ':(exclude)*.
# TODO: Remove when we drop Go 1.10 support # TODO: Remove when we drop Go 1.10 support
go list -f {{.Dir}} ./... | xargs go run test/go_vet/vet.go go list -f {{.Dir}} ./... | xargs go run test/go_vet/vet.go
# - gofmt, goimports, golint (with exceptions for generated code), go vet.
gofmt -s -d -l . 2>&1 | fail_on_output
goimports -l . 2>&1 | not grep -vE "\.pb\.go"
golint ./... 2>&1 | not grep -vE "\.pb\.go:"
go vet -all ./...
misspell -error . misspell -error .
# - Check that generated proto files are up to date. # - Check that generated proto files are up to date.
@ -119,12 +114,22 @@ if [[ -z "${VET_SKIP_PROTO}" ]]; then
(git status; git --no-pager diff; exit 1) (git status; git --no-pager diff; exit 1)
fi fi
# - Check that our modules are tidy. # - gofmt, goimports, golint (with exceptions for generated code), go vet,
if go help mod >& /dev/null; then # go mod tidy.
find . -name 'go.mod' | xargs -IXXX bash -c 'cd $(dirname XXX); go mod tidy' # Perform these checks on each module inside gRPC.
for MOD_FILE in $(find . -name 'go.mod'); do
MOD_DIR=$(dirname ${MOD_FILE})
pushd ${MOD_DIR}
go vet -all ./... | fail_on_output
gofmt -s -d -l . 2>&1 | fail_on_output
goimports -l . 2>&1 | not grep -vE "\.pb\.go"
golint ./... 2>&1 | not grep -vE "/testv3\.pb\.go:"
go mod tidy
git status --porcelain 2>&1 | fail_on_output || \ git status --porcelain 2>&1 | fail_on_output || \
(git status; git --no-pager diff; exit 1) (git status; git --no-pager diff; exit 1)
fi popd
done
# - Collection of static analysis checks # - Collection of static analysis checks
# #
@ -141,8 +146,11 @@ not grep -Fv '.CredsBundle
.NewAddress .NewAddress
.NewServiceConfig .NewServiceConfig
.Type is deprecated: use Attributes .Type is deprecated: use Attributes
BuildVersion is deprecated
balancer.ErrTransientFailure balancer.ErrTransientFailure
balancer.Picker balancer.Picker
extDesc.Filename is deprecated
github.com/golang/protobuf/jsonpb is deprecated
grpc.CallCustomCodec grpc.CallCustomCodec
grpc.Code grpc.Code
grpc.Compressor grpc.Compressor
@ -164,13 +172,7 @@ grpc.WithServiceConfig
grpc.WithTimeout grpc.WithTimeout
http.CloseNotifier http.CloseNotifier
info.SecurityVersion info.SecurityVersion
resolver.Backend
resolver.GRPCLB
extDesc.Filename is deprecated
BuildVersion is deprecated
github.com/golang/protobuf/jsonpb is deprecated
proto is deprecated proto is deprecated
xxx_messageInfo_
proto.InternalMessageInfo is deprecated proto.InternalMessageInfo is deprecated
proto.EnumName is deprecated proto.EnumName is deprecated
proto.ErrInternalBadWireType is deprecated proto.ErrInternalBadWireType is deprecated
@ -184,7 +186,12 @@ proto.RegisterExtension is deprecated
proto.RegisteredExtension is deprecated proto.RegisteredExtension is deprecated
proto.RegisteredExtensions is deprecated proto.RegisteredExtensions is deprecated
proto.RegisterMapType is deprecated proto.RegisterMapType is deprecated
proto.Unmarshaler is deprecated' "${SC_OUT}" proto.Unmarshaler is deprecated
resolver.Backend
resolver.GRPCLB
Target is deprecated: Use the Target field in the BuildOptions instead.
xxx_messageInfo_
' "${SC_OUT}"
# - special golint on package comments. # - special golint on package comments.
lint_package_comment_per_package() { lint_package_comment_per_package() {

3
vendor/modules.txt vendored
View File

@ -354,7 +354,7 @@ google.golang.org/appengine/internal/urlfetch
google.golang.org/appengine/urlfetch google.golang.org/appengine/urlfetch
# google.golang.org/genproto v0.0.0-20201110150050-8816d57aaa9a # google.golang.org/genproto v0.0.0-20201110150050-8816d57aaa9a
google.golang.org/genproto/googleapis/rpc/status google.golang.org/genproto/googleapis/rpc/status
# google.golang.org/grpc v1.36.1 => google.golang.org/grpc v1.35.0 # google.golang.org/grpc v1.38.0
## explicit ## explicit
google.golang.org/grpc google.golang.org/grpc
google.golang.org/grpc/attributes google.golang.org/grpc/attributes
@ -921,7 +921,6 @@ sigs.k8s.io/yaml
# github.com/hashicorp/vault/api => github.com/hashicorp/vault/api v1.0.5-0.20200902155336-f9d5ce5a171a # github.com/hashicorp/vault/api => github.com/hashicorp/vault/api v1.0.5-0.20200902155336-f9d5ce5a171a
# github.com/hashicorp/vault/sdk => github.com/hashicorp/vault/sdk v0.1.14-0.20201116234512-b4d4137dfe8b # github.com/hashicorp/vault/sdk => github.com/hashicorp/vault/sdk v0.1.14-0.20201116234512-b4d4137dfe8b
# github.com/kubernetes-csi/external-snapshotter/v2 => github.com/kubernetes-csi/external-snapshotter/v2 v2.1.1-0.20200504125226-859696c419ff # github.com/kubernetes-csi/external-snapshotter/v2 => github.com/kubernetes-csi/external-snapshotter/v2 v2.1.1-0.20200504125226-859696c419ff
# google.golang.org/grpc => google.golang.org/grpc v1.35.0
# k8s.io/api => k8s.io/api v0.20.6 # k8s.io/api => k8s.io/api v0.20.6
# k8s.io/apiextensions-apiserver => k8s.io/apiextensions-apiserver v0.20.6 # k8s.io/apiextensions-apiserver => k8s.io/apiextensions-apiserver v0.20.6
# k8s.io/apimachinery => k8s.io/apimachinery v0.20.6 # k8s.io/apimachinery => k8s.io/apimachinery v0.20.6