mirror of
synced 2025-03-16 12:29:30 +00:00
Uses github.com/libopenstorage/secrets to communicate with Vault. This removes the need for maintaining our own limited Vault APIs. By adding the new dependency, several other packages got updated in the process. Unused indirect dependencies have been removed from go.mod. Signed-off-by: Niels de Vos <ndevos@redhat.com>
959 lines
23 KiB
959 lines
23 KiB
package spdystream
import (
// Sentinel errors returned by this package. The message texts are kept
// byte-for-byte (including the historical "occured" spelling) because callers
// may compare or log them.
var (
	// ErrInvalidStreamId is returned when a received stream id is out of
	// range or lower than the next id the connection expects.
	ErrInvalidStreamId = errors.New("Invalid stream id")
	// ErrTimeout is returned by Connection.Wait when the wait timeout expires
	// before shutdown has finished.
	ErrTimeout = errors.New("Timeout occured")
	// ErrReset is delivered to a stream that is reset by the peer before it
	// received a reply.
	ErrReset = errors.New("Stream reset")
	// ErrWriteClosedStream describes a write attempted on a closed stream.
	// NOTE(review): not referenced in this file; presumably used by the
	// stream implementation — confirm.
	ErrWriteClosedStream = errors.New("Write on closed stream")
)
const (
// StreamHandler is the callback type accepted by Connection.Serve (as
// newHandler); it receives the stream it is expected to handle.
type StreamHandler func(stream *Stream)
// AuthHandler is a callback that takes a header set, a slot number and a
// parent id and reports a boolean decision.
// NOTE(review): nothing in this file calls AuthHandler, so the exact meaning
// of slot, parent and the result cannot be confirmed from here.
type AuthHandler func(header http.Header, slot uint8, parent uint32) bool
type idleAwareFramer struct {
f *spdy.Framer
conn *Connection
writeLock sync.Mutex
resetChan chan struct{}
setTimeoutLock sync.Mutex
setTimeoutChan chan time.Duration
timeout time.Duration
func newIdleAwareFramer(framer *spdy.Framer) *idleAwareFramer {
iaf := &idleAwareFramer{
f: framer,
resetChan: make(chan struct{}, 2),
// setTimeoutChan needs to be buffered to avoid deadlocks when calling setIdleTimeout at about
// the same time the connection is being closed
setTimeoutChan: make(chan time.Duration, 1),
return iaf
// monitor is the idle-timeout loop of the framer. It selects over timeout
// updates, read/write activity signals, timer expiry and connection closure,
// and leaves the loop once the connection's closeChan fires.
// NOTE(review): this copy of the function appears to have lost lines — the
// closing braces, the Loop label targeted by `break Loop`, and the bodies of
// several branches and loops are absent. Restore them from the upstream file
// before building.
func (i *idleAwareFramer) monitor() {
var (
timer *time.Timer
expired <-chan time.Time
// Local copies of the channels, so they stay reachable after the struct
// fields are set to nil in the close branch below.
resetChan = i.resetChan
setTimeoutChan = i.setTimeoutChan
for {
select {
// A new idle timeout was requested: record it; a timer is created the first
// time a non-zero timeout arrives.
case timeout := <-i.setTimeoutChan:
i.timeout = timeout
if timeout == 0 {
if timer != nil {
} else {
if timer == nil {
timer = time.NewTimer(timeout)
expired = timer.C
} else {
// A frame was read or written; only acted on while a timer exists and a
// non-zero timeout is configured.
case <-resetChan:
if timer != nil && i.timeout > 0 {
// The idle timer fired: detach every stream from the connection by swapping
// in an empty map, then walk the detached streams on a separate goroutine.
case <-expired:
streams := i.conn.streams
i.conn.streams = make(map[spdy.StreamId]*Stream)
go func() {
for _, stream := range streams {
// The connection closed: clear both channel fields and leave the loop.
case <-i.conn.closeChan:
if timer != nil {
// Start a goroutine to drain resetChan. This is needed because we've seen
// some unit tests with large numbers of goroutines get into a situation
// where resetChan fills up, at least 1 call to Write() is still trying to
// send to resetChan, the connection gets closed, and this case statement
// attempts to grab the write lock that Write() already has, causing a
// deadlock.
// See https://github.com/docker/spdystream/issues/49 for more details.
go func() {
for _ = range resetChan {
go func() {
for _ = range setTimeoutChan {
i.resetChan = nil
i.setTimeoutChan = nil
break Loop
// Drain resetChan
for _ = range resetChan {
func (i *idleAwareFramer) WriteFrame(frame spdy.Frame) error {
defer i.writeLock.Unlock()
if i.resetChan == nil {
return io.EOF
err := i.f.WriteFrame(frame)
if err != nil {
return err
i.resetChan <- struct{}{}
return nil
func (i *idleAwareFramer) ReadFrame() (spdy.Frame, error) {
frame, err := i.f.ReadFrame()
if err != nil {
return nil, err
// resetChan should never be closed since it is only closed
// when the connection has closed its closeChan. This closure
// only occurs after all Reads have finished
// TODO (dmcgowan): refactor relationship into connection
i.resetChan <- struct{}{}
return frame, nil
func (i *idleAwareFramer) setIdleTimeout(timeout time.Duration) {
defer i.setTimeoutLock.Unlock()
if i.setTimeoutChan == nil {
i.setTimeoutChan <- timeout
type Connection struct {
conn net.Conn
framer *idleAwareFramer
closeChan chan bool
goneAway bool
lastStreamChan chan<- *Stream
goAwayTimeout time.Duration
closeTimeout time.Duration
streamLock *sync.RWMutex
streamCond *sync.Cond
streams map[spdy.StreamId]*Stream
nextIdLock sync.Mutex
receiveIdLock sync.Mutex
nextStreamId spdy.StreamId
receivedStreamId spdy.StreamId
pingIdLock sync.Mutex
pingId uint32
pingChans map[uint32]chan error
shutdownLock sync.Mutex
shutdownChan chan error
hasShutdown bool
// for testing https://github.com/docker/spdystream/pull/56
dataFrameHandler func(*spdy.DataFrame) error
// NewConnection creates a new spdy connection from an existing
// network connection.
func NewConnection(conn net.Conn, server bool) (*Connection, error) {
framer, framerErr := spdy.NewFramer(conn, conn)
if framerErr != nil {
return nil, framerErr
idleAwareFramer := newIdleAwareFramer(framer)
var sid spdy.StreamId
var rid spdy.StreamId
var pid uint32
if server {
sid = 2
rid = 1
pid = 2
} else {
sid = 1
rid = 2
pid = 1
streamLock := new(sync.RWMutex)
streamCond := sync.NewCond(streamLock)
session := &Connection{
conn: conn,
framer: idleAwareFramer,
closeChan: make(chan bool),
goAwayTimeout: time.Duration(0),
closeTimeout: time.Duration(0),
streamLock: streamLock,
streamCond: streamCond,
streams: make(map[spdy.StreamId]*Stream),
nextStreamId: sid,
receivedStreamId: rid,
pingId: pid,
pingChans: make(map[uint32]chan error),
shutdownChan: make(chan error),
session.dataFrameHandler = session.handleDataFrame
idleAwareFramer.conn = session
go idleAwareFramer.monitor()
return session, nil
// Ping sends a ping frame across the connection and
// returns the response time
func (s *Connection) Ping() (time.Duration, error) {
pid := s.pingId
if s.pingId > 0x7ffffffe {
s.pingId = s.pingId - 0x7ffffffe
} else {
s.pingId = s.pingId + 2
pingChan := make(chan error)
s.pingChans[pid] = pingChan
defer delete(s.pingChans, pid)
frame := &spdy.PingFrame{Id: pid}
startTime := time.Now()
writeErr := s.framer.WriteFrame(frame)
if writeErr != nil {
return time.Duration(0), writeErr
select {
case <-s.closeChan:
return time.Duration(0), errors.New("connection closed")
case err, ok := <-pingChan:
if ok && err != nil {
return time.Duration(0), err
return time.Now().Sub(startTime), nil
// Serve handles frames sent from the server, including reply frames
// which are needed to fully initiate connections. Both clients and servers
// should call Serve in a separate goroutine before creating streams.
//
// It starts FRAME_WORKERS frame-handler goroutines, each with its own
// priority queue, then reads frames from the framer in a loop and pushes each
// one onto the queue selected for it. newHandler is forwarded to the workers.
// NOTE(review): this copy of the function appears to have lost lines — the
// closing braces, the Loop label targeted by `break Loop`, the `default:`
// case, the WaitGroup Add/Wait calls and the bodies of several branches are
// absent. Restore them from the upstream file before building.
func (s *Connection) Serve(newHandler StreamHandler) {
// use a WaitGroup to wait for all frames to be drained after receiving
// go-away.
var wg sync.WaitGroup
// Partition queues to ensure stream frames are handled
// by the same worker, ensuring order is maintained
frameQueues := make([]*PriorityFrameQueue, FRAME_WORKERS)
for i := 0; i < FRAME_WORKERS; i++ {
frameQueues[i] = NewPriorityFrameQueue(QUEUE_SIZE)
// Ensure frame queue is drained when connection is closed
go func(frameQueue *PriorityFrameQueue) {
go func(frameQueue *PriorityFrameQueue) {
// let the WaitGroup know this worker is done
defer wg.Done()
s.frameHandler(frameQueue, newHandler)
var (
partitionRoundRobin int
goAwayFrame *spdy.GoAwayFrame
for {
readFrame, err := s.framer.ReadFrame()
if err != nil {
if err != io.EOF {
// NOTE(review): the value returned by fmt.Errorf is discarded here, so a
// non-EOF read error is never reported anywhere.
fmt.Errorf("frame read error: %s", err)
} else {
debugMessage("(%p) EOF received", s)
var priority uint8
var partition int
// Stream-bound frames are partitioned by stream id so that all frames of one
// stream go to the same worker; pings and unrecognised frames are spread
// round-robin.
switch frame := readFrame.(type) {
case *spdy.SynStreamFrame:
if s.checkStreamFrame(frame) {
priority = frame.Priority
partition = int(frame.StreamId % FRAME_WORKERS)
debugMessage("(%p) Add stream frame: %d ", s, frame.StreamId)
} else {
debugMessage("(%p) Rejected stream frame: %d ", s, frame.StreamId)
case *spdy.SynReplyFrame:
priority = s.getStreamPriority(frame.StreamId)
partition = int(frame.StreamId % FRAME_WORKERS)
case *spdy.DataFrame:
priority = s.getStreamPriority(frame.StreamId)
partition = int(frame.StreamId % FRAME_WORKERS)
case *spdy.RstStreamFrame:
priority = s.getStreamPriority(frame.StreamId)
partition = int(frame.StreamId % FRAME_WORKERS)
case *spdy.HeadersFrame:
priority = s.getStreamPriority(frame.StreamId)
partition = int(frame.StreamId % FRAME_WORKERS)
case *spdy.PingFrame:
priority = 0
partition = partitionRoundRobin
partitionRoundRobin = (partitionRoundRobin + 1) % FRAME_WORKERS
case *spdy.GoAwayFrame:
// hold on to the go away frame and exit the loop
goAwayFrame = frame
break Loop
priority = 7
partition = partitionRoundRobin
partitionRoundRobin = (partitionRoundRobin + 1) % FRAME_WORKERS
frameQueues[partition].Push(readFrame, priority)
// wait for all frame handler workers to indicate they've drained their queues
// before handling the go away frame
if goAwayFrame != nil {
// now it's safe to close remote channels and empty s.streams
// notify streams that they're now closed, which will
// unblock any stream Read() calls
for _, stream := range s.streams {
s.streams = make(map[spdy.StreamId]*Stream)
func (s *Connection) frameHandler(frameQueue *PriorityFrameQueue, newHandler StreamHandler) {
for {
popFrame := frameQueue.Pop()
if popFrame == nil {
var frameErr error
switch frame := popFrame.(type) {
case *spdy.SynStreamFrame:
frameErr = s.handleStreamFrame(frame, newHandler)
case *spdy.SynReplyFrame:
frameErr = s.handleReplyFrame(frame)
case *spdy.DataFrame:
frameErr = s.dataFrameHandler(frame)
case *spdy.RstStreamFrame:
frameErr = s.handleResetFrame(frame)
case *spdy.HeadersFrame:
frameErr = s.handleHeaderFrame(frame)
case *spdy.PingFrame:
frameErr = s.handlePingFrame(frame)
case *spdy.GoAwayFrame:
frameErr = s.handleGoAwayFrame(frame)
frameErr = fmt.Errorf("unhandled frame type: %T", frame)
if frameErr != nil {
fmt.Errorf("frame handling error: %s", frameErr)
func (s *Connection) getStreamPriority(streamId spdy.StreamId) uint8 {
stream, streamOk := s.getStream(streamId)
if !streamOk {
return 7
return stream.priority
// addStreamFrame builds a Stream from a received SYN_STREAM frame. The parent
// is looked up only when the frame names a non-zero associated stream id, and
// the stream starts out finished when the frame carries the unidirectional
// flag.
// NOTE(review): this copy appears to have lost lines — the closing braces, the
// body of the final `if` (FIN flag) and whatever registers the new stream with
// the connection are absent. Restore them from the upstream file.
func (s *Connection) addStreamFrame(frame *spdy.SynStreamFrame) {
var parent *Stream
if frame.AssociatedToStreamId != spdy.StreamId(0) {
// A missing parent is tolerated; parent simply stays nil.
parent, _ = s.getStream(frame.AssociatedToStreamId)
stream := &Stream{
streamId: frame.StreamId,
parent: parent,
conn: s,
startChan: make(chan error),
headers: frame.Headers,
finished: (frame.CFHeader.Flags & spdy.ControlFlagUnidirectional) != 0x00,
replyCond: sync.NewCond(new(sync.Mutex)),
dataChan: make(chan []byte),
headerChan: make(chan http.Header),
closeChan: make(chan bool),
if frame.CFHeader.Flags&spdy.ControlFlagFin != 0x00 {
// checkStreamFrame checks to see if a stream frame is allowed.
// If the stream is invalid, then a reset frame with protocol error
// will be returned.
func (s *Connection) checkStreamFrame(frame *spdy.SynStreamFrame) bool {
defer s.receiveIdLock.Unlock()
if s.goneAway {
return false
validationErr := s.validateStreamId(frame.StreamId)
if validationErr != nil {
go func() {
resetErr := s.sendResetFrame(spdy.ProtocolError, frame.StreamId)
if resetErr != nil {
fmt.Errorf("reset error: %s", resetErr)
return false
return true
func (s *Connection) handleStreamFrame(frame *spdy.SynStreamFrame, newHandler StreamHandler) error {
stream, ok := s.getStream(frame.StreamId)
if !ok {
return fmt.Errorf("Missing stream: %d", frame.StreamId)
return nil
// handleReplyFrame processes a SYN_REPLY frame: it marks the target stream as
// replied. Frames for unknown streams, and repeated replies for a stream that
// is already marked replied, are ignored. It always returns nil.
// NOTE(review): this copy appears to have lost lines — the closing braces, the
// body of the FIN-flag `if`, and whatever wakes a caller waiting for the
// reply are absent. Restore them from the upstream file.
func (s *Connection) handleReplyFrame(frame *spdy.SynReplyFrame) error {
debugMessage("(%p) Reply frame received for %d", s, frame.StreamId)
stream, streamOk := s.getStream(frame.StreamId)
if !streamOk {
debugMessage("Reply frame gone away for %d", frame.StreamId)
// Stream has already gone away
return nil
if stream.replied {
// Stream has already received reply
return nil
stream.replied = true
// TODO Check for error
if (frame.CFHeader.Flags & spdy.ControlFlagFin) != 0x00 {
return nil
// handleResetFrame processes a RST_STREAM frame. If the stream had not been
// replied to yet, it is marked replied and ErrReset is delivered on its start
// channel; the stream is then marked finished. Frames for unknown streams are
// ignored. It always returns nil.
// NOTE(review): this copy appears to have lost lines (closing braces and any
// stream removal/cleanup or locking around `finished`). Restore them from the
// upstream file.
func (s *Connection) handleResetFrame(frame *spdy.RstStreamFrame) error {
stream, streamOk := s.getStream(frame.StreamId)
if !streamOk {
// Stream has already been removed
return nil
if !stream.replied {
stream.replied = true
stream.startChan <- ErrReset
stream.finished = true
return nil
// handleHeaderFrame processes a HEADERS frame by forwarding its headers on
// the stream's header channel, unless the stream's close channel fires first.
// Frames for unknown streams or for streams that have not been replied to are
// ignored. It always returns nil.
// NOTE(review): this copy appears to have lost lines — the closing braces and
// the body of the FIN-flag `if` are absent. Restore them from the upstream
// file.
func (s *Connection) handleHeaderFrame(frame *spdy.HeadersFrame) error {
stream, streamOk := s.getStream(frame.StreamId)
if !streamOk {
// Stream has already gone away
return nil
if !stream.replied {
// No reply received...Protocol error?
return nil
// TODO limit headers while not blocking (use buffered chan or goroutine?)
select {
case <-stream.closeChan:
return nil
case stream.headerChan <- frame.Headers:
if (frame.CFHeader.Flags & spdy.ControlFlagFin) != 0x00 {
return nil
// handleDataFrame processes a DATA frame. A non-empty payload is forwarded on
// the stream's data channel unless the stream's close channel fires first.
// Frames for unknown streams or for streams that have not been replied to are
// ignored. It always returns nil. NewConnection installs this method as the
// default dataFrameHandler.
// NOTE(review): this copy appears to have lost lines — the closing braces, the
// body of the FIN-flag `if`, and any locking around the channel send are
// absent. Restore them from the upstream file.
func (s *Connection) handleDataFrame(frame *spdy.DataFrame) error {
debugMessage("(%p) Data frame received for %d", s, frame.StreamId)
stream, streamOk := s.getStream(frame.StreamId)
if !streamOk {
debugMessage("(%p) Data frame gone away for %d", s, frame.StreamId)
// Stream has already gone away
return nil
if !stream.replied {
debugMessage("(%p) Data frame not replied %d", s, frame.StreamId)
// No reply received...Protocol error?
return nil
debugMessage("(%p) (%d) Data frame handling", stream, stream.streamId)
if len(frame.Data) > 0 {
select {
case <-stream.closeChan:
debugMessage("(%p) (%d) Data frame not sent (stream shut down)", stream, stream.streamId)
case stream.dataChan <- frame.Data:
debugMessage("(%p) (%d) Data frame sent", stream, stream.streamId)
if (frame.Flags & spdy.DataFlagFin) != 0x00 {
return nil
func (s *Connection) handlePingFrame(frame *spdy.PingFrame) error {
if s.pingId&0x01 != frame.Id&0x01 {
return s.framer.WriteFrame(frame)
pingChan, pingOk := s.pingChans[frame.Id]
if pingOk {
return nil
func (s *Connection) handleGoAwayFrame(frame *spdy.GoAwayFrame) error {
debugMessage("(%p) Go away received", s)
if s.goneAway {
return nil
s.goneAway = true
if s.lastStreamChan != nil {
stream, _ := s.getStream(frame.LastGoodStreamId)
go func() {
s.lastStreamChan <- stream
// Do not block frame handler waiting for closure
go s.shutdown(s.goAwayTimeout)
return nil
// remoteStreamFinish is invoked when the remote side has finished sending on
// stream; it checks whether the stream is also marked finished.
// NOTE(review): this copy appears to have lost lines — the closing braces and
// the body of the `if` (the cleanup the comment refers to), plus anything that
// preceded the check, are absent. Restore them from the upstream file.
func (s *Connection) remoteStreamFinish(stream *Stream) {
if stream.finished {
// Stream is fully closed, cleanup
// CreateStream creates a new spdy stream using the parameters for
// creating the stream frame. The stream frame will be sent upon
// calling this function, however this function does not wait for
// the reply frame. If waiting for the reply is desired, use
// the stream Wait or WaitTimeout function on the stream returned
// by this function.
func (s *Connection) CreateStream(headers http.Header, parent *Stream, fin bool) (*Stream, error) {
// MUST synchronize stream creation (all the way to writing the frame)
// as stream IDs **MUST** increase monotonically.
defer s.nextIdLock.Unlock()
streamId := s.getNextStreamId()
if streamId == 0 {
return nil, fmt.Errorf("Unable to get new stream id")
stream := &Stream{
streamId: streamId,
parent: parent,
conn: s,
startChan: make(chan error),
headers: headers,
dataChan: make(chan []byte),
headerChan: make(chan http.Header),
closeChan: make(chan bool),
debugMessage("(%p) (%p) Create stream", s, stream)
return stream, s.sendStream(stream, fin)
func (s *Connection) shutdown(closeTimeout time.Duration) {
// TODO Ensure this isn't called multiple times
if s.hasShutdown {
s.hasShutdown = true
var timeout <-chan time.Time
if closeTimeout > time.Duration(0) {
timeout = time.After(closeTimeout)
streamsClosed := make(chan bool)
go func() {
for len(s.streams) > 0 {
debugMessage("Streams opened: %d, %#v", len(s.streams), s.streams)
var err error
select {
case <-streamsClosed:
// No active streams, close should be safe
err = s.conn.Close()
case <-timeout:
// Force ungraceful close
err = s.conn.Close()
// Wait for cleanup to clear active streams
if err != nil {
duration := 10 * time.Minute
time.AfterFunc(duration, func() {
select {
case err, ok := <-s.shutdownChan:
if ok {
fmt.Errorf("Unhandled close error after %s: %s", duration, err)
s.shutdownChan <- err
// Closes spdy connection by sending GoAway frame and initiating shutdown
func (s *Connection) Close() error {
if s.goneAway {
return nil
s.goneAway = true
var lastStreamId spdy.StreamId
if s.receivedStreamId > 2 {
lastStreamId = s.receivedStreamId - 2
goAwayFrame := &spdy.GoAwayFrame{
LastGoodStreamId: lastStreamId,
Status: spdy.GoAwayOK,
err := s.framer.WriteFrame(goAwayFrame)
if err != nil {
return err
go s.shutdown(s.closeTimeout)
return nil
// CloseWait closes the connection and waits for shutdown
// to finish. Note the underlying network Connection
// is not closed until the end of shutdown.
func (s *Connection) CloseWait() error {
closeErr := s.Close()
if closeErr != nil {
return closeErr
shutdownErr, ok := <-s.shutdownChan
if ok {
return shutdownErr
return nil
// Wait waits for the connection to finish shutdown or for
// the wait timeout duration to expire. This needs to be
// called either after Close has been called or the GOAWAYFRAME
// has been received. If the wait timeout is 0, this function
// will block until shutdown finishes. If wait is never called
// and a shutdown error occurs, that error will be logged as an
// unhandled error.
func (s *Connection) Wait(waitTimeout time.Duration) error {
var timeout <-chan time.Time
if waitTimeout > time.Duration(0) {
timeout = time.After(waitTimeout)
select {
case err, ok := <-s.shutdownChan:
if ok {
return err
case <-timeout:
return ErrTimeout
return nil
// NotifyClose registers a channel to be called when the remote
// peer inidicates connection closure. The last stream to be
// received by the remote will be sent on the channel. The notify
// timeout will determine the duration between go away received
// and the connection being closed.
func (s *Connection) NotifyClose(c chan<- *Stream, timeout time.Duration) {
s.goAwayTimeout = timeout
s.lastStreamChan = c
// SetCloseTimeout sets the amount of time close will wait for
// streams to finish before terminating the underlying network
// connection. Setting the timeout to 0 will cause close to
// wait forever, which is the default.
func (s *Connection) SetCloseTimeout(timeout time.Duration) {
s.closeTimeout = timeout
// SetIdleTimeout sets the amount of time the connection may sit idle before
// it is forcefully terminated.
func (s *Connection) SetIdleTimeout(timeout time.Duration) {
func (s *Connection) sendHeaders(headers http.Header, stream *Stream, fin bool) error {
var flags spdy.ControlFlags
if fin {
flags = spdy.ControlFlagFin
headerFrame := &spdy.HeadersFrame{
StreamId: stream.streamId,
Headers: headers,
CFHeader: spdy.ControlFrameHeader{Flags: flags},
return s.framer.WriteFrame(headerFrame)
func (s *Connection) sendReply(headers http.Header, stream *Stream, fin bool) error {
var flags spdy.ControlFlags
if fin {
flags = spdy.ControlFlagFin
replyFrame := &spdy.SynReplyFrame{
StreamId: stream.streamId,
Headers: headers,
CFHeader: spdy.ControlFrameHeader{Flags: flags},
return s.framer.WriteFrame(replyFrame)
func (s *Connection) sendResetFrame(status spdy.RstStreamStatus, streamId spdy.StreamId) error {
resetFrame := &spdy.RstStreamFrame{
StreamId: streamId,
Status: status,
return s.framer.WriteFrame(resetFrame)
func (s *Connection) sendReset(status spdy.RstStreamStatus, stream *Stream) error {
return s.sendResetFrame(status, stream.streamId)
func (s *Connection) sendStream(stream *Stream, fin bool) error {
var flags spdy.ControlFlags
if fin {
flags = spdy.ControlFlagFin
stream.finished = true
var parentId spdy.StreamId
if stream.parent != nil {
parentId = stream.parent.streamId
streamFrame := &spdy.SynStreamFrame{
StreamId: spdy.StreamId(stream.streamId),
AssociatedToStreamId: spdy.StreamId(parentId),
Headers: stream.headers,
CFHeader: spdy.ControlFrameHeader{Flags: flags},
return s.framer.WriteFrame(streamFrame)
// getNextStreamId returns the next sequential id
// every call should produce a unique value or an error
func (s *Connection) getNextStreamId() spdy.StreamId {
sid := s.nextStreamId
if sid > 0x7fffffff {
return 0
s.nextStreamId = s.nextStreamId + 2
return sid
// PeekNextStreamId returns the next sequential id and keeps the next id untouched
func (s *Connection) PeekNextStreamId() spdy.StreamId {
sid := s.nextStreamId
return sid
func (s *Connection) validateStreamId(rid spdy.StreamId) error {
if rid > 0x7fffffff || rid < s.receivedStreamId {
return ErrInvalidStreamId
s.receivedStreamId = rid + 2
return nil
func (s *Connection) addStream(stream *Stream) {
s.streams[stream.streamId] = stream
debugMessage("(%p) (%p) Stream added, broadcasting: %d", s, stream, stream.streamId)
func (s *Connection) removeStream(stream *Stream) {
delete(s.streams, stream.streamId)
debugMessage("(%p) (%p) Stream removed, broadcasting: %d", s, stream, stream.streamId)
func (s *Connection) getStream(streamId spdy.StreamId) (stream *Stream, ok bool) {
stream, ok = s.streams[streamId]
// FindStream looks up the given stream id and either waits for the
// stream to be found or returns nil if the stream id is no longer
// valid.
func (s *Connection) FindStream(streamId uint32) *Stream {
var stream *Stream
var ok bool
stream, ok = s.streams[spdy.StreamId(streamId)]
debugMessage("(%p) Found stream %d? %t", s, spdy.StreamId(streamId), ok)
for !ok && streamId >= uint32(s.receivedStreamId) {
stream, ok = s.streams[spdy.StreamId(streamId)]
return stream
func (s *Connection) CloseChan() <-chan bool {
return s.closeChan