mirror of
https://github.com/ceph/ceph-csi.git
synced 2025-01-07 04:19:30 +00:00
960 lines
24 KiB
Go
960 lines
24 KiB
Go
|
package spdystream
|
||
|
|
||
|
import (
|
||
|
"errors"
|
||
|
"fmt"
|
||
|
"io"
|
||
|
"net"
|
||
|
"net/http"
|
||
|
"sync"
|
||
|
"time"
|
||
|
|
||
|
"github.com/docker/spdystream/spdy"
|
||
|
)
|
||
|
|
||
|
var (
|
||
|
ErrInvalidStreamId = errors.New("Invalid stream id")
|
||
|
ErrTimeout = errors.New("Timeout occurred")
|
||
|
ErrReset = errors.New("Stream reset")
|
||
|
ErrWriteClosedStream = errors.New("Write on closed stream")
|
||
|
)
|
||
|
|
||
|
const (
|
||
|
FRAME_WORKERS = 5
|
||
|
QUEUE_SIZE = 50
|
||
|
)
|
||
|
|
||
|
type StreamHandler func(stream *Stream)
|
||
|
|
||
|
type AuthHandler func(header http.Header, slot uint8, parent uint32) bool
|
||
|
|
||
|
type idleAwareFramer struct {
|
||
|
f *spdy.Framer
|
||
|
conn *Connection
|
||
|
writeLock sync.Mutex
|
||
|
resetChan chan struct{}
|
||
|
setTimeoutLock sync.Mutex
|
||
|
setTimeoutChan chan time.Duration
|
||
|
timeout time.Duration
|
||
|
}
|
||
|
|
||
|
func newIdleAwareFramer(framer *spdy.Framer) *idleAwareFramer {
|
||
|
iaf := &idleAwareFramer{
|
||
|
f: framer,
|
||
|
resetChan: make(chan struct{}, 2),
|
||
|
// setTimeoutChan needs to be buffered to avoid deadlocks when calling setIdleTimeout at about
|
||
|
// the same time the connection is being closed
|
||
|
setTimeoutChan: make(chan time.Duration, 1),
|
||
|
}
|
||
|
return iaf
|
||
|
}
|
||
|
|
||
|
func (i *idleAwareFramer) monitor() {
|
||
|
var (
|
||
|
timer *time.Timer
|
||
|
expired <-chan time.Time
|
||
|
resetChan = i.resetChan
|
||
|
setTimeoutChan = i.setTimeoutChan
|
||
|
)
|
||
|
Loop:
|
||
|
for {
|
||
|
select {
|
||
|
case timeout := <-i.setTimeoutChan:
|
||
|
i.timeout = timeout
|
||
|
if timeout == 0 {
|
||
|
if timer != nil {
|
||
|
timer.Stop()
|
||
|
}
|
||
|
} else {
|
||
|
if timer == nil {
|
||
|
timer = time.NewTimer(timeout)
|
||
|
expired = timer.C
|
||
|
} else {
|
||
|
timer.Reset(timeout)
|
||
|
}
|
||
|
}
|
||
|
case <-resetChan:
|
||
|
if timer != nil && i.timeout > 0 {
|
||
|
timer.Reset(i.timeout)
|
||
|
}
|
||
|
case <-expired:
|
||
|
i.conn.streamCond.L.Lock()
|
||
|
streams := i.conn.streams
|
||
|
i.conn.streams = make(map[spdy.StreamId]*Stream)
|
||
|
i.conn.streamCond.Broadcast()
|
||
|
i.conn.streamCond.L.Unlock()
|
||
|
go func() {
|
||
|
for _, stream := range streams {
|
||
|
stream.resetStream()
|
||
|
}
|
||
|
i.conn.Close()
|
||
|
}()
|
||
|
case <-i.conn.closeChan:
|
||
|
if timer != nil {
|
||
|
timer.Stop()
|
||
|
}
|
||
|
|
||
|
// Start a goroutine to drain resetChan. This is needed because we've seen
|
||
|
// some unit tests with large numbers of goroutines get into a situation
|
||
|
// where resetChan fills up, at least 1 call to Write() is still trying to
|
||
|
// send to resetChan, the connection gets closed, and this case statement
|
||
|
// attempts to grab the write lock that Write() already has, causing a
|
||
|
// deadlock.
|
||
|
//
|
||
|
// See https://github.com/docker/spdystream/issues/49 for more details.
|
||
|
go func() {
|
||
|
for _ = range resetChan {
|
||
|
}
|
||
|
}()
|
||
|
|
||
|
go func() {
|
||
|
for _ = range setTimeoutChan {
|
||
|
}
|
||
|
}()
|
||
|
|
||
|
i.writeLock.Lock()
|
||
|
close(resetChan)
|
||
|
i.resetChan = nil
|
||
|
i.writeLock.Unlock()
|
||
|
|
||
|
i.setTimeoutLock.Lock()
|
||
|
close(i.setTimeoutChan)
|
||
|
i.setTimeoutChan = nil
|
||
|
i.setTimeoutLock.Unlock()
|
||
|
|
||
|
break Loop
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Drain resetChan
|
||
|
for _ = range resetChan {
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (i *idleAwareFramer) WriteFrame(frame spdy.Frame) error {
|
||
|
i.writeLock.Lock()
|
||
|
defer i.writeLock.Unlock()
|
||
|
if i.resetChan == nil {
|
||
|
return io.EOF
|
||
|
}
|
||
|
err := i.f.WriteFrame(frame)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
i.resetChan <- struct{}{}
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (i *idleAwareFramer) ReadFrame() (spdy.Frame, error) {
|
||
|
frame, err := i.f.ReadFrame()
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
// resetChan should never be closed since it is only closed
|
||
|
// when the connection has closed its closeChan. This closure
|
||
|
// only occurs after all Reads have finished
|
||
|
// TODO (dmcgowan): refactor relationship into connection
|
||
|
i.resetChan <- struct{}{}
|
||
|
|
||
|
return frame, nil
|
||
|
}
|
||
|
|
||
|
func (i *idleAwareFramer) setIdleTimeout(timeout time.Duration) {
|
||
|
i.setTimeoutLock.Lock()
|
||
|
defer i.setTimeoutLock.Unlock()
|
||
|
|
||
|
if i.setTimeoutChan == nil {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
i.setTimeoutChan <- timeout
|
||
|
}
|
||
|
|
||
|
type Connection struct {
|
||
|
conn net.Conn
|
||
|
framer *idleAwareFramer
|
||
|
|
||
|
closeChan chan bool
|
||
|
goneAway bool
|
||
|
lastStreamChan chan<- *Stream
|
||
|
goAwayTimeout time.Duration
|
||
|
closeTimeout time.Duration
|
||
|
|
||
|
streamLock *sync.RWMutex
|
||
|
streamCond *sync.Cond
|
||
|
streams map[spdy.StreamId]*Stream
|
||
|
|
||
|
nextIdLock sync.Mutex
|
||
|
receiveIdLock sync.Mutex
|
||
|
nextStreamId spdy.StreamId
|
||
|
receivedStreamId spdy.StreamId
|
||
|
|
||
|
pingIdLock sync.Mutex
|
||
|
pingId uint32
|
||
|
pingChans map[uint32]chan error
|
||
|
|
||
|
shutdownLock sync.Mutex
|
||
|
shutdownChan chan error
|
||
|
hasShutdown bool
|
||
|
|
||
|
// for testing https://github.com/docker/spdystream/pull/56
|
||
|
dataFrameHandler func(*spdy.DataFrame) error
|
||
|
}
|
||
|
|
||
|
// NewConnection creates a new spdy connection from an existing
|
||
|
// network connection.
|
||
|
func NewConnection(conn net.Conn, server bool) (*Connection, error) {
|
||
|
framer, framerErr := spdy.NewFramer(conn, conn)
|
||
|
if framerErr != nil {
|
||
|
return nil, framerErr
|
||
|
}
|
||
|
idleAwareFramer := newIdleAwareFramer(framer)
|
||
|
var sid spdy.StreamId
|
||
|
var rid spdy.StreamId
|
||
|
var pid uint32
|
||
|
if server {
|
||
|
sid = 2
|
||
|
rid = 1
|
||
|
pid = 2
|
||
|
} else {
|
||
|
sid = 1
|
||
|
rid = 2
|
||
|
pid = 1
|
||
|
}
|
||
|
|
||
|
streamLock := new(sync.RWMutex)
|
||
|
streamCond := sync.NewCond(streamLock)
|
||
|
|
||
|
session := &Connection{
|
||
|
conn: conn,
|
||
|
framer: idleAwareFramer,
|
||
|
|
||
|
closeChan: make(chan bool),
|
||
|
goAwayTimeout: time.Duration(0),
|
||
|
closeTimeout: time.Duration(0),
|
||
|
|
||
|
streamLock: streamLock,
|
||
|
streamCond: streamCond,
|
||
|
streams: make(map[spdy.StreamId]*Stream),
|
||
|
nextStreamId: sid,
|
||
|
receivedStreamId: rid,
|
||
|
|
||
|
pingId: pid,
|
||
|
pingChans: make(map[uint32]chan error),
|
||
|
|
||
|
shutdownChan: make(chan error),
|
||
|
}
|
||
|
session.dataFrameHandler = session.handleDataFrame
|
||
|
idleAwareFramer.conn = session
|
||
|
go idleAwareFramer.monitor()
|
||
|
|
||
|
return session, nil
|
||
|
}
|
||
|
|
||
|
// Ping sends a ping frame across the connection and
|
||
|
// returns the response time
|
||
|
func (s *Connection) Ping() (time.Duration, error) {
|
||
|
pid := s.pingId
|
||
|
s.pingIdLock.Lock()
|
||
|
if s.pingId > 0x7ffffffe {
|
||
|
s.pingId = s.pingId - 0x7ffffffe
|
||
|
} else {
|
||
|
s.pingId = s.pingId + 2
|
||
|
}
|
||
|
s.pingIdLock.Unlock()
|
||
|
pingChan := make(chan error)
|
||
|
s.pingChans[pid] = pingChan
|
||
|
defer delete(s.pingChans, pid)
|
||
|
|
||
|
frame := &spdy.PingFrame{Id: pid}
|
||
|
startTime := time.Now()
|
||
|
writeErr := s.framer.WriteFrame(frame)
|
||
|
if writeErr != nil {
|
||
|
return time.Duration(0), writeErr
|
||
|
}
|
||
|
select {
|
||
|
case <-s.closeChan:
|
||
|
return time.Duration(0), errors.New("connection closed")
|
||
|
case err, ok := <-pingChan:
|
||
|
if ok && err != nil {
|
||
|
return time.Duration(0), err
|
||
|
}
|
||
|
break
|
||
|
}
|
||
|
return time.Now().Sub(startTime), nil
|
||
|
}
|
||
|
|
||
|
// Serve handles frames sent from the server, including reply frames
|
||
|
// which are needed to fully initiate connections. Both clients and servers
|
||
|
// should call Serve in a separate goroutine before creating streams.
|
||
|
func (s *Connection) Serve(newHandler StreamHandler) {
|
||
|
// use a WaitGroup to wait for all frames to be drained after receiving
|
||
|
// go-away.
|
||
|
var wg sync.WaitGroup
|
||
|
|
||
|
// Parition queues to ensure stream frames are handled
|
||
|
// by the same worker, ensuring order is maintained
|
||
|
frameQueues := make([]*PriorityFrameQueue, FRAME_WORKERS)
|
||
|
for i := 0; i < FRAME_WORKERS; i++ {
|
||
|
frameQueues[i] = NewPriorityFrameQueue(QUEUE_SIZE)
|
||
|
|
||
|
// Ensure frame queue is drained when connection is closed
|
||
|
go func(frameQueue *PriorityFrameQueue) {
|
||
|
<-s.closeChan
|
||
|
frameQueue.Drain()
|
||
|
}(frameQueues[i])
|
||
|
|
||
|
wg.Add(1)
|
||
|
go func(frameQueue *PriorityFrameQueue) {
|
||
|
// let the WaitGroup know this worker is done
|
||
|
defer wg.Done()
|
||
|
|
||
|
s.frameHandler(frameQueue, newHandler)
|
||
|
}(frameQueues[i])
|
||
|
}
|
||
|
|
||
|
var (
|
||
|
partitionRoundRobin int
|
||
|
goAwayFrame *spdy.GoAwayFrame
|
||
|
)
|
||
|
Loop:
|
||
|
for {
|
||
|
readFrame, err := s.framer.ReadFrame()
|
||
|
if err != nil {
|
||
|
if err != io.EOF {
|
||
|
debugMessage("frame read error: %s", err)
|
||
|
} else {
|
||
|
debugMessage("(%p) EOF received", s)
|
||
|
}
|
||
|
break
|
||
|
}
|
||
|
var priority uint8
|
||
|
var partition int
|
||
|
switch frame := readFrame.(type) {
|
||
|
case *spdy.SynStreamFrame:
|
||
|
if s.checkStreamFrame(frame) {
|
||
|
priority = frame.Priority
|
||
|
partition = int(frame.StreamId % FRAME_WORKERS)
|
||
|
debugMessage("(%p) Add stream frame: %d ", s, frame.StreamId)
|
||
|
s.addStreamFrame(frame)
|
||
|
} else {
|
||
|
debugMessage("(%p) Rejected stream frame: %d ", s, frame.StreamId)
|
||
|
continue
|
||
|
}
|
||
|
case *spdy.SynReplyFrame:
|
||
|
priority = s.getStreamPriority(frame.StreamId)
|
||
|
partition = int(frame.StreamId % FRAME_WORKERS)
|
||
|
case *spdy.DataFrame:
|
||
|
priority = s.getStreamPriority(frame.StreamId)
|
||
|
partition = int(frame.StreamId % FRAME_WORKERS)
|
||
|
case *spdy.RstStreamFrame:
|
||
|
priority = s.getStreamPriority(frame.StreamId)
|
||
|
partition = int(frame.StreamId % FRAME_WORKERS)
|
||
|
case *spdy.HeadersFrame:
|
||
|
priority = s.getStreamPriority(frame.StreamId)
|
||
|
partition = int(frame.StreamId % FRAME_WORKERS)
|
||
|
case *spdy.PingFrame:
|
||
|
priority = 0
|
||
|
partition = partitionRoundRobin
|
||
|
partitionRoundRobin = (partitionRoundRobin + 1) % FRAME_WORKERS
|
||
|
case *spdy.GoAwayFrame:
|
||
|
// hold on to the go away frame and exit the loop
|
||
|
goAwayFrame = frame
|
||
|
break Loop
|
||
|
default:
|
||
|
priority = 7
|
||
|
partition = partitionRoundRobin
|
||
|
partitionRoundRobin = (partitionRoundRobin + 1) % FRAME_WORKERS
|
||
|
}
|
||
|
frameQueues[partition].Push(readFrame, priority)
|
||
|
}
|
||
|
close(s.closeChan)
|
||
|
|
||
|
// wait for all frame handler workers to indicate they've drained their queues
|
||
|
// before handling the go away frame
|
||
|
wg.Wait()
|
||
|
|
||
|
if goAwayFrame != nil {
|
||
|
s.handleGoAwayFrame(goAwayFrame)
|
||
|
}
|
||
|
|
||
|
// now it's safe to close remote channels and empty s.streams
|
||
|
s.streamCond.L.Lock()
|
||
|
// notify streams that they're now closed, which will
|
||
|
// unblock any stream Read() calls
|
||
|
for _, stream := range s.streams {
|
||
|
stream.closeRemoteChannels()
|
||
|
}
|
||
|
s.streams = make(map[spdy.StreamId]*Stream)
|
||
|
s.streamCond.Broadcast()
|
||
|
s.streamCond.L.Unlock()
|
||
|
}
|
||
|
|
||
|
func (s *Connection) frameHandler(frameQueue *PriorityFrameQueue, newHandler StreamHandler) {
|
||
|
for {
|
||
|
popFrame := frameQueue.Pop()
|
||
|
if popFrame == nil {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
var frameErr error
|
||
|
switch frame := popFrame.(type) {
|
||
|
case *spdy.SynStreamFrame:
|
||
|
frameErr = s.handleStreamFrame(frame, newHandler)
|
||
|
case *spdy.SynReplyFrame:
|
||
|
frameErr = s.handleReplyFrame(frame)
|
||
|
case *spdy.DataFrame:
|
||
|
frameErr = s.dataFrameHandler(frame)
|
||
|
case *spdy.RstStreamFrame:
|
||
|
frameErr = s.handleResetFrame(frame)
|
||
|
case *spdy.HeadersFrame:
|
||
|
frameErr = s.handleHeaderFrame(frame)
|
||
|
case *spdy.PingFrame:
|
||
|
frameErr = s.handlePingFrame(frame)
|
||
|
case *spdy.GoAwayFrame:
|
||
|
frameErr = s.handleGoAwayFrame(frame)
|
||
|
default:
|
||
|
frameErr = fmt.Errorf("unhandled frame type: %T", frame)
|
||
|
}
|
||
|
|
||
|
if frameErr != nil {
|
||
|
debugMessage("frame handling error: %s", frameErr)
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (s *Connection) getStreamPriority(streamId spdy.StreamId) uint8 {
|
||
|
stream, streamOk := s.getStream(streamId)
|
||
|
if !streamOk {
|
||
|
return 7
|
||
|
}
|
||
|
return stream.priority
|
||
|
}
|
||
|
|
||
|
func (s *Connection) addStreamFrame(frame *spdy.SynStreamFrame) {
|
||
|
var parent *Stream
|
||
|
if frame.AssociatedToStreamId != spdy.StreamId(0) {
|
||
|
parent, _ = s.getStream(frame.AssociatedToStreamId)
|
||
|
}
|
||
|
|
||
|
stream := &Stream{
|
||
|
streamId: frame.StreamId,
|
||
|
parent: parent,
|
||
|
conn: s,
|
||
|
startChan: make(chan error),
|
||
|
headers: frame.Headers,
|
||
|
finished: (frame.CFHeader.Flags & spdy.ControlFlagUnidirectional) != 0x00,
|
||
|
replyCond: sync.NewCond(new(sync.Mutex)),
|
||
|
dataChan: make(chan []byte),
|
||
|
headerChan: make(chan http.Header),
|
||
|
closeChan: make(chan bool),
|
||
|
priority: frame.Priority,
|
||
|
}
|
||
|
if frame.CFHeader.Flags&spdy.ControlFlagFin != 0x00 {
|
||
|
stream.closeRemoteChannels()
|
||
|
}
|
||
|
|
||
|
s.addStream(stream)
|
||
|
}
|
||
|
|
||
|
// checkStreamFrame checks to see if a stream frame is allowed.
|
||
|
// If the stream is invalid, then a reset frame with protocol error
|
||
|
// will be returned.
|
||
|
func (s *Connection) checkStreamFrame(frame *spdy.SynStreamFrame) bool {
|
||
|
s.receiveIdLock.Lock()
|
||
|
defer s.receiveIdLock.Unlock()
|
||
|
if s.goneAway {
|
||
|
return false
|
||
|
}
|
||
|
validationErr := s.validateStreamId(frame.StreamId)
|
||
|
if validationErr != nil {
|
||
|
go func() {
|
||
|
resetErr := s.sendResetFrame(spdy.ProtocolError, frame.StreamId)
|
||
|
if resetErr != nil {
|
||
|
debugMessage("reset error: %s", resetErr)
|
||
|
}
|
||
|
}()
|
||
|
return false
|
||
|
}
|
||
|
return true
|
||
|
}
|
||
|
|
||
|
func (s *Connection) handleStreamFrame(frame *spdy.SynStreamFrame, newHandler StreamHandler) error {
|
||
|
stream, ok := s.getStream(frame.StreamId)
|
||
|
if !ok {
|
||
|
return fmt.Errorf("Missing stream: %d", frame.StreamId)
|
||
|
}
|
||
|
|
||
|
newHandler(stream)
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (s *Connection) handleReplyFrame(frame *spdy.SynReplyFrame) error {
|
||
|
debugMessage("(%p) Reply frame received for %d", s, frame.StreamId)
|
||
|
stream, streamOk := s.getStream(frame.StreamId)
|
||
|
if !streamOk {
|
||
|
debugMessage("Reply frame gone away for %d", frame.StreamId)
|
||
|
// Stream has already gone away
|
||
|
return nil
|
||
|
}
|
||
|
if stream.replied {
|
||
|
// Stream has already received reply
|
||
|
return nil
|
||
|
}
|
||
|
stream.replied = true
|
||
|
|
||
|
// TODO Check for error
|
||
|
if (frame.CFHeader.Flags & spdy.ControlFlagFin) != 0x00 {
|
||
|
s.remoteStreamFinish(stream)
|
||
|
}
|
||
|
|
||
|
close(stream.startChan)
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (s *Connection) handleResetFrame(frame *spdy.RstStreamFrame) error {
|
||
|
stream, streamOk := s.getStream(frame.StreamId)
|
||
|
if !streamOk {
|
||
|
// Stream has already been removed
|
||
|
return nil
|
||
|
}
|
||
|
s.removeStream(stream)
|
||
|
stream.closeRemoteChannels()
|
||
|
|
||
|
if !stream.replied {
|
||
|
stream.replied = true
|
||
|
stream.startChan <- ErrReset
|
||
|
close(stream.startChan)
|
||
|
}
|
||
|
|
||
|
stream.finishLock.Lock()
|
||
|
stream.finished = true
|
||
|
stream.finishLock.Unlock()
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (s *Connection) handleHeaderFrame(frame *spdy.HeadersFrame) error {
|
||
|
stream, streamOk := s.getStream(frame.StreamId)
|
||
|
if !streamOk {
|
||
|
// Stream has already gone away
|
||
|
return nil
|
||
|
}
|
||
|
if !stream.replied {
|
||
|
// No reply received...Protocol error?
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// TODO limit headers while not blocking (use buffered chan or goroutine?)
|
||
|
select {
|
||
|
case <-stream.closeChan:
|
||
|
return nil
|
||
|
case stream.headerChan <- frame.Headers:
|
||
|
}
|
||
|
|
||
|
if (frame.CFHeader.Flags & spdy.ControlFlagFin) != 0x00 {
|
||
|
s.remoteStreamFinish(stream)
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (s *Connection) handleDataFrame(frame *spdy.DataFrame) error {
|
||
|
debugMessage("(%p) Data frame received for %d", s, frame.StreamId)
|
||
|
stream, streamOk := s.getStream(frame.StreamId)
|
||
|
if !streamOk {
|
||
|
debugMessage("(%p) Data frame gone away for %d", s, frame.StreamId)
|
||
|
// Stream has already gone away
|
||
|
return nil
|
||
|
}
|
||
|
if !stream.replied {
|
||
|
debugMessage("(%p) Data frame not replied %d", s, frame.StreamId)
|
||
|
// No reply received...Protocol error?
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
debugMessage("(%p) (%d) Data frame handling", stream, stream.streamId)
|
||
|
if len(frame.Data) > 0 {
|
||
|
stream.dataLock.RLock()
|
||
|
select {
|
||
|
case <-stream.closeChan:
|
||
|
debugMessage("(%p) (%d) Data frame not sent (stream shut down)", stream, stream.streamId)
|
||
|
case stream.dataChan <- frame.Data:
|
||
|
debugMessage("(%p) (%d) Data frame sent", stream, stream.streamId)
|
||
|
}
|
||
|
stream.dataLock.RUnlock()
|
||
|
}
|
||
|
if (frame.Flags & spdy.DataFlagFin) != 0x00 {
|
||
|
s.remoteStreamFinish(stream)
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (s *Connection) handlePingFrame(frame *spdy.PingFrame) error {
|
||
|
if s.pingId&0x01 != frame.Id&0x01 {
|
||
|
return s.framer.WriteFrame(frame)
|
||
|
}
|
||
|
pingChan, pingOk := s.pingChans[frame.Id]
|
||
|
if pingOk {
|
||
|
close(pingChan)
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (s *Connection) handleGoAwayFrame(frame *spdy.GoAwayFrame) error {
|
||
|
debugMessage("(%p) Go away received", s)
|
||
|
s.receiveIdLock.Lock()
|
||
|
if s.goneAway {
|
||
|
s.receiveIdLock.Unlock()
|
||
|
return nil
|
||
|
}
|
||
|
s.goneAway = true
|
||
|
s.receiveIdLock.Unlock()
|
||
|
|
||
|
if s.lastStreamChan != nil {
|
||
|
stream, _ := s.getStream(frame.LastGoodStreamId)
|
||
|
go func() {
|
||
|
s.lastStreamChan <- stream
|
||
|
}()
|
||
|
}
|
||
|
|
||
|
// Do not block frame handler waiting for closure
|
||
|
go s.shutdown(s.goAwayTimeout)
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (s *Connection) remoteStreamFinish(stream *Stream) {
|
||
|
stream.closeRemoteChannels()
|
||
|
|
||
|
stream.finishLock.Lock()
|
||
|
if stream.finished {
|
||
|
// Stream is fully closed, cleanup
|
||
|
s.removeStream(stream)
|
||
|
}
|
||
|
stream.finishLock.Unlock()
|
||
|
}
|
||
|
|
||
|
// CreateStream creates a new spdy stream using the parameters for
|
||
|
// creating the stream frame. The stream frame will be sent upon
|
||
|
// calling this function, however this function does not wait for
|
||
|
// the reply frame. If waiting for the reply is desired, use
|
||
|
// the stream Wait or WaitTimeout function on the stream returned
|
||
|
// by this function.
|
||
|
func (s *Connection) CreateStream(headers http.Header, parent *Stream, fin bool) (*Stream, error) {
|
||
|
// MUST synchronize stream creation (all the way to writing the frame)
|
||
|
// as stream IDs **MUST** increase monotonically.
|
||
|
s.nextIdLock.Lock()
|
||
|
defer s.nextIdLock.Unlock()
|
||
|
|
||
|
streamId := s.getNextStreamId()
|
||
|
if streamId == 0 {
|
||
|
return nil, fmt.Errorf("Unable to get new stream id")
|
||
|
}
|
||
|
|
||
|
stream := &Stream{
|
||
|
streamId: streamId,
|
||
|
parent: parent,
|
||
|
conn: s,
|
||
|
startChan: make(chan error),
|
||
|
headers: headers,
|
||
|
dataChan: make(chan []byte),
|
||
|
headerChan: make(chan http.Header),
|
||
|
closeChan: make(chan bool),
|
||
|
}
|
||
|
|
||
|
debugMessage("(%p) (%p) Create stream", s, stream)
|
||
|
|
||
|
s.addStream(stream)
|
||
|
|
||
|
return stream, s.sendStream(stream, fin)
|
||
|
}
|
||
|
|
||
|
func (s *Connection) shutdown(closeTimeout time.Duration) {
|
||
|
// TODO Ensure this isn't called multiple times
|
||
|
s.shutdownLock.Lock()
|
||
|
if s.hasShutdown {
|
||
|
s.shutdownLock.Unlock()
|
||
|
return
|
||
|
}
|
||
|
s.hasShutdown = true
|
||
|
s.shutdownLock.Unlock()
|
||
|
|
||
|
var timeout <-chan time.Time
|
||
|
if closeTimeout > time.Duration(0) {
|
||
|
timeout = time.After(closeTimeout)
|
||
|
}
|
||
|
streamsClosed := make(chan bool)
|
||
|
|
||
|
go func() {
|
||
|
s.streamCond.L.Lock()
|
||
|
for len(s.streams) > 0 {
|
||
|
debugMessage("Streams opened: %d, %#v", len(s.streams), s.streams)
|
||
|
s.streamCond.Wait()
|
||
|
}
|
||
|
s.streamCond.L.Unlock()
|
||
|
close(streamsClosed)
|
||
|
}()
|
||
|
|
||
|
var err error
|
||
|
select {
|
||
|
case <-streamsClosed:
|
||
|
// No active streams, close should be safe
|
||
|
err = s.conn.Close()
|
||
|
case <-timeout:
|
||
|
// Force ungraceful close
|
||
|
err = s.conn.Close()
|
||
|
// Wait for cleanup to clear active streams
|
||
|
<-streamsClosed
|
||
|
}
|
||
|
|
||
|
if err != nil {
|
||
|
duration := 10 * time.Minute
|
||
|
time.AfterFunc(duration, func() {
|
||
|
select {
|
||
|
case err, ok := <-s.shutdownChan:
|
||
|
if ok {
|
||
|
debugMessage("Unhandled close error after %s: %s", duration, err)
|
||
|
}
|
||
|
default:
|
||
|
}
|
||
|
})
|
||
|
s.shutdownChan <- err
|
||
|
}
|
||
|
close(s.shutdownChan)
|
||
|
|
||
|
return
|
||
|
}
|
||
|
|
||
|
// Closes spdy connection by sending GoAway frame and initiating shutdown
|
||
|
func (s *Connection) Close() error {
|
||
|
s.receiveIdLock.Lock()
|
||
|
if s.goneAway {
|
||
|
s.receiveIdLock.Unlock()
|
||
|
return nil
|
||
|
}
|
||
|
s.goneAway = true
|
||
|
s.receiveIdLock.Unlock()
|
||
|
|
||
|
var lastStreamId spdy.StreamId
|
||
|
if s.receivedStreamId > 2 {
|
||
|
lastStreamId = s.receivedStreamId - 2
|
||
|
}
|
||
|
|
||
|
goAwayFrame := &spdy.GoAwayFrame{
|
||
|
LastGoodStreamId: lastStreamId,
|
||
|
Status: spdy.GoAwayOK,
|
||
|
}
|
||
|
|
||
|
err := s.framer.WriteFrame(goAwayFrame)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
go s.shutdown(s.closeTimeout)
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// CloseWait closes the connection and waits for shutdown
|
||
|
// to finish. Note the underlying network Connection
|
||
|
// is not closed until the end of shutdown.
|
||
|
func (s *Connection) CloseWait() error {
|
||
|
closeErr := s.Close()
|
||
|
if closeErr != nil {
|
||
|
return closeErr
|
||
|
}
|
||
|
shutdownErr, ok := <-s.shutdownChan
|
||
|
if ok {
|
||
|
return shutdownErr
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// Wait waits for the connection to finish shutdown or for
|
||
|
// the wait timeout duration to expire. This needs to be
|
||
|
// called either after Close has been called or the GOAWAYFRAME
|
||
|
// has been received. If the wait timeout is 0, this function
|
||
|
// will block until shutdown finishes. If wait is never called
|
||
|
// and a shutdown error occurs, that error will be logged as an
|
||
|
// unhandled error.
|
||
|
func (s *Connection) Wait(waitTimeout time.Duration) error {
|
||
|
var timeout <-chan time.Time
|
||
|
if waitTimeout > time.Duration(0) {
|
||
|
timeout = time.After(waitTimeout)
|
||
|
}
|
||
|
|
||
|
select {
|
||
|
case err, ok := <-s.shutdownChan:
|
||
|
if ok {
|
||
|
return err
|
||
|
}
|
||
|
case <-timeout:
|
||
|
return ErrTimeout
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// NotifyClose registers a channel to be called when the remote
|
||
|
// peer inidicates connection closure. The last stream to be
|
||
|
// received by the remote will be sent on the channel. The notify
|
||
|
// timeout will determine the duration between go away received
|
||
|
// and the connection being closed.
|
||
|
func (s *Connection) NotifyClose(c chan<- *Stream, timeout time.Duration) {
|
||
|
s.goAwayTimeout = timeout
|
||
|
s.lastStreamChan = c
|
||
|
}
|
||
|
|
||
|
// SetCloseTimeout sets the amount of time close will wait for
|
||
|
// streams to finish before terminating the underlying network
|
||
|
// connection. Setting the timeout to 0 will cause close to
|
||
|
// wait forever, which is the default.
|
||
|
func (s *Connection) SetCloseTimeout(timeout time.Duration) {
|
||
|
s.closeTimeout = timeout
|
||
|
}
|
||
|
|
||
|
// SetIdleTimeout sets the amount of time the connection may sit idle before
|
||
|
// it is forcefully terminated.
|
||
|
func (s *Connection) SetIdleTimeout(timeout time.Duration) {
|
||
|
s.framer.setIdleTimeout(timeout)
|
||
|
}
|
||
|
|
||
|
func (s *Connection) sendHeaders(headers http.Header, stream *Stream, fin bool) error {
|
||
|
var flags spdy.ControlFlags
|
||
|
if fin {
|
||
|
flags = spdy.ControlFlagFin
|
||
|
}
|
||
|
|
||
|
headerFrame := &spdy.HeadersFrame{
|
||
|
StreamId: stream.streamId,
|
||
|
Headers: headers,
|
||
|
CFHeader: spdy.ControlFrameHeader{Flags: flags},
|
||
|
}
|
||
|
|
||
|
return s.framer.WriteFrame(headerFrame)
|
||
|
}
|
||
|
|
||
|
func (s *Connection) sendReply(headers http.Header, stream *Stream, fin bool) error {
|
||
|
var flags spdy.ControlFlags
|
||
|
if fin {
|
||
|
flags = spdy.ControlFlagFin
|
||
|
}
|
||
|
|
||
|
replyFrame := &spdy.SynReplyFrame{
|
||
|
StreamId: stream.streamId,
|
||
|
Headers: headers,
|
||
|
CFHeader: spdy.ControlFrameHeader{Flags: flags},
|
||
|
}
|
||
|
|
||
|
return s.framer.WriteFrame(replyFrame)
|
||
|
}
|
||
|
|
||
|
func (s *Connection) sendResetFrame(status spdy.RstStreamStatus, streamId spdy.StreamId) error {
|
||
|
resetFrame := &spdy.RstStreamFrame{
|
||
|
StreamId: streamId,
|
||
|
Status: status,
|
||
|
}
|
||
|
|
||
|
return s.framer.WriteFrame(resetFrame)
|
||
|
}
|
||
|
|
||
|
func (s *Connection) sendReset(status spdy.RstStreamStatus, stream *Stream) error {
|
||
|
return s.sendResetFrame(status, stream.streamId)
|
||
|
}
|
||
|
|
||
|
func (s *Connection) sendStream(stream *Stream, fin bool) error {
|
||
|
var flags spdy.ControlFlags
|
||
|
if fin {
|
||
|
flags = spdy.ControlFlagFin
|
||
|
stream.finished = true
|
||
|
}
|
||
|
|
||
|
var parentId spdy.StreamId
|
||
|
if stream.parent != nil {
|
||
|
parentId = stream.parent.streamId
|
||
|
}
|
||
|
|
||
|
streamFrame := &spdy.SynStreamFrame{
|
||
|
StreamId: spdy.StreamId(stream.streamId),
|
||
|
AssociatedToStreamId: spdy.StreamId(parentId),
|
||
|
Headers: stream.headers,
|
||
|
CFHeader: spdy.ControlFrameHeader{Flags: flags},
|
||
|
}
|
||
|
|
||
|
return s.framer.WriteFrame(streamFrame)
|
||
|
}
|
||
|
|
||
|
// getNextStreamId returns the next sequential id
|
||
|
// every call should produce a unique value or an error
|
||
|
func (s *Connection) getNextStreamId() spdy.StreamId {
|
||
|
sid := s.nextStreamId
|
||
|
if sid > 0x7fffffff {
|
||
|
return 0
|
||
|
}
|
||
|
s.nextStreamId = s.nextStreamId + 2
|
||
|
return sid
|
||
|
}
|
||
|
|
||
|
// PeekNextStreamId returns the next sequential id and keeps the next id untouched
|
||
|
func (s *Connection) PeekNextStreamId() spdy.StreamId {
|
||
|
sid := s.nextStreamId
|
||
|
return sid
|
||
|
}
|
||
|
|
||
|
func (s *Connection) validateStreamId(rid spdy.StreamId) error {
|
||
|
if rid > 0x7fffffff || rid < s.receivedStreamId {
|
||
|
return ErrInvalidStreamId
|
||
|
}
|
||
|
s.receivedStreamId = rid + 2
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (s *Connection) addStream(stream *Stream) {
|
||
|
s.streamCond.L.Lock()
|
||
|
s.streams[stream.streamId] = stream
|
||
|
debugMessage("(%p) (%p) Stream added, broadcasting: %d", s, stream, stream.streamId)
|
||
|
s.streamCond.Broadcast()
|
||
|
s.streamCond.L.Unlock()
|
||
|
}
|
||
|
|
||
|
func (s *Connection) removeStream(stream *Stream) {
|
||
|
s.streamCond.L.Lock()
|
||
|
delete(s.streams, stream.streamId)
|
||
|
debugMessage("(%p) (%p) Stream removed, broadcasting: %d", s, stream, stream.streamId)
|
||
|
s.streamCond.Broadcast()
|
||
|
s.streamCond.L.Unlock()
|
||
|
}
|
||
|
|
||
|
func (s *Connection) getStream(streamId spdy.StreamId) (stream *Stream, ok bool) {
|
||
|
s.streamLock.RLock()
|
||
|
stream, ok = s.streams[streamId]
|
||
|
s.streamLock.RUnlock()
|
||
|
return
|
||
|
}
|
||
|
|
||
|
// FindStream looks up the given stream id and either waits for the
|
||
|
// stream to be found or returns nil if the stream id is no longer
|
||
|
// valid.
|
||
|
func (s *Connection) FindStream(streamId uint32) *Stream {
|
||
|
var stream *Stream
|
||
|
var ok bool
|
||
|
s.streamCond.L.Lock()
|
||
|
stream, ok = s.streams[spdy.StreamId(streamId)]
|
||
|
debugMessage("(%p) Found stream %d? %t", s, spdy.StreamId(streamId), ok)
|
||
|
for !ok && streamId >= uint32(s.receivedStreamId) {
|
||
|
s.streamCond.Wait()
|
||
|
stream, ok = s.streams[spdy.StreamId(streamId)]
|
||
|
}
|
||
|
s.streamCond.L.Unlock()
|
||
|
return stream
|
||
|
}
|
||
|
|
||
|
func (s *Connection) CloseChan() <-chan bool {
|
||
|
return s.closeChan
|
||
|
}
|