rebase: upgrade snapshot v1beta1 api and clientset to v1 version

At present e2e snapshot code make use of snapshot v1beta1 api and client
This patch address the same and snapshot api clientset/apis are updated
to v1 version.

Signed-off-by: Humble Chirammal <hchiramm@redhat.com>
This commit is contained in:
Humble Chirammal
2021-06-23 14:22:11 +05:30
committed by mergify[bot]
parent 0749315d7e
commit 5aca04d540
58 changed files with 1561 additions and 1285 deletions

View File

@ -57,22 +57,12 @@ import (
const (
defaultServerMaxReceiveMessageSize = 1024 * 1024 * 4
defaultServerMaxSendMessageSize = math.MaxInt32
// Server transports are tracked in a map which is keyed on listener
// address. For regular gRPC traffic, connections are accepted in Serve()
// through a call to Accept(), and we use the actual listener address as key
// when we add it to the map. But for connections received through
// ServeHTTP(), we do not have a listener and hence use this dummy value.
listenerAddressForServeHTTP = "listenerAddressForServeHTTP"
)
func init() {
internal.GetServerCredentials = func(srv *Server) credentials.TransportCredentials {
return srv.opts.creds
}
internal.DrainServerTransports = func(srv *Server, addr string) {
srv.drainServerTransports(addr)
}
}
var statusOK = status.New(codes.OK, "")
@ -117,12 +107,9 @@ type serverWorkerData struct {
type Server struct {
opts serverOptions
mu sync.Mutex // guards following
lis map[net.Listener]bool
// conns contains all active server transports. It is a map keyed on a
// listener address with the value being the set of active transports
// belonging to that listener.
conns map[string]map[transport.ServerTransport]bool
mu sync.Mutex // guards following
lis map[net.Listener]bool
conns map[transport.ServerTransport]bool
serve bool
drain bool
cv *sync.Cond // signaled when connections close for GracefulStop
@ -279,35 +266,6 @@ func CustomCodec(codec Codec) ServerOption {
})
}
// ForceServerCodec returns a ServerOption that sets a codec for message
// marshaling and unmarshaling.
//
// This will override any lookups by content-subtype for Codecs registered
// with RegisterCodec.
//
// See Content-Type on
// https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests for
// more details. Also see the documentation on RegisterCodec and
// CallContentSubtype for more details on the interaction between encoding.Codec
// and content-subtype.
//
// This function is provided for advanced users; prefer to register codecs
// using encoding.RegisterCodec.
// The server will automatically use registered codecs based on the incoming
// requests' headers. See also
// https://github.com/grpc/grpc-go/blob/master/Documentation/encoding.md#using-a-codec.
// Will be supported throughout 1.x.
//
// Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func ForceServerCodec(codec encoding.Codec) ServerOption {
return newFuncServerOption(func(o *serverOptions) {
o.codec = codec
})
}
// RPCCompressor returns a ServerOption that sets a compressor for outbound
// messages. For backward compatibility, all outbound messages will be sent
// using this compressor, regardless of incoming message compression. By
@ -418,11 +376,6 @@ func ChainStreamInterceptor(interceptors ...StreamServerInterceptor) ServerOptio
// InTapHandle returns a ServerOption that sets the tap handle for all the server
// transport to be created. Only one can be installed.
//
// Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func InTapHandle(h tap.ServerInHandle) ServerOption {
return newFuncServerOption(func(o *serverOptions) {
if o.inTapHandle != nil {
@ -566,7 +519,7 @@ func NewServer(opt ...ServerOption) *Server {
s := &Server{
lis: make(map[net.Listener]bool),
opts: opts,
conns: make(map[string]map[transport.ServerTransport]bool),
conns: make(map[transport.ServerTransport]bool),
services: make(map[string]*serviceInfo),
quit: grpcsync.NewEvent(),
done: grpcsync.NewEvent(),
@ -825,7 +778,7 @@ func (s *Server) Serve(lis net.Listener) error {
// s.conns before this conn can be added.
s.serveWG.Add(1)
go func() {
s.handleRawConn(lis.Addr().String(), rawConn)
s.handleRawConn(rawConn)
s.serveWG.Done()
}()
}
@ -833,7 +786,7 @@ func (s *Server) Serve(lis net.Listener) error {
// handleRawConn forks a goroutine to handle a just-accepted connection that
// has not had any I/O performed on it yet.
func (s *Server) handleRawConn(lisAddr string, rawConn net.Conn) {
func (s *Server) handleRawConn(rawConn net.Conn) {
if s.quit.HasFired() {
rawConn.Close()
return
@ -861,24 +814,15 @@ func (s *Server) handleRawConn(lisAddr string, rawConn net.Conn) {
}
rawConn.SetDeadline(time.Time{})
if !s.addConn(lisAddr, st) {
if !s.addConn(st) {
return
}
go func() {
s.serveStreams(st)
s.removeConn(lisAddr, st)
s.removeConn(st)
}()
}
func (s *Server) drainServerTransports(addr string) {
s.mu.Lock()
conns := s.conns[addr]
for st := range conns {
st.Drain()
}
s.mu.Unlock()
}
// newHTTP2Transport sets up a http/2 transport (using the
// gRPC http2 server transport in transport/http2_server.go).
func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) transport.ServerTransport {
@ -980,10 +924,10 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if !s.addConn(listenerAddressForServeHTTP, st) {
if !s.addConn(st) {
return
}
defer s.removeConn(listenerAddressForServeHTTP, st)
defer s.removeConn(st)
s.serveStreams(st)
}
@ -1011,7 +955,7 @@ func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Strea
return trInfo
}
func (s *Server) addConn(addr string, st transport.ServerTransport) bool {
func (s *Server) addConn(st transport.ServerTransport) bool {
s.mu.Lock()
defer s.mu.Unlock()
if s.conns == nil {
@ -1023,28 +967,15 @@ func (s *Server) addConn(addr string, st transport.ServerTransport) bool {
// immediately.
st.Drain()
}
if s.conns[addr] == nil {
// Create a map entry if this is the first connection on this listener.
s.conns[addr] = make(map[transport.ServerTransport]bool)
}
s.conns[addr][st] = true
s.conns[st] = true
return true
}
func (s *Server) removeConn(addr string, st transport.ServerTransport) {
func (s *Server) removeConn(st transport.ServerTransport) {
s.mu.Lock()
defer s.mu.Unlock()
conns := s.conns[addr]
if conns != nil {
delete(conns, st)
if len(conns) == 0 {
// If the last connection for this address is being removed, also
// remove the map entry corresponding to the address. This is used
// in GracefulStop() when waiting for all connections to be closed.
delete(s.conns, addr)
}
if s.conns != nil {
delete(s.conns, st)
s.cv.Broadcast()
}
}
@ -1708,7 +1639,7 @@ func (s *Server) Stop() {
s.mu.Lock()
listeners := s.lis
s.lis = nil
conns := s.conns
st := s.conns
s.conns = nil
// interrupt GracefulStop if Stop and GracefulStop are called concurrently.
s.cv.Broadcast()
@ -1717,10 +1648,8 @@ func (s *Server) Stop() {
for lis := range listeners {
lis.Close()
}
for _, cs := range conns {
for st := range cs {
st.Close()
}
for c := range st {
c.Close()
}
if s.opts.numServerWorkers > 0 {
s.stopServerWorkers()
@ -1757,10 +1686,8 @@ func (s *Server) GracefulStop() {
}
s.lis = nil
if !s.drain {
for _, conns := range s.conns {
for st := range conns {
st.Drain()
}
for st := range s.conns {
st.Drain()
}
s.drain = true
}