mirror of
https://github.com/ceph/ceph-csi.git
synced 2025-01-15 09:20:53 +00:00
328 lines
7.4 KiB
Go
328 lines
7.4 KiB
Go
|
package spdystream
|
||
|
|
||
|
import (
|
||
|
"errors"
|
||
|
"fmt"
|
||
|
"io"
|
||
|
"net"
|
||
|
"net/http"
|
||
|
"sync"
|
||
|
"time"
|
||
|
|
||
|
"github.com/docker/spdystream/spdy"
|
||
|
)
|
||
|
|
||
|
// ErrUnreadPartialData is returned by ReadData when a previous Read call
// left part of a data frame unconsumed.
var ErrUnreadPartialData = errors.New("unread partial data")
|
||
|
|
||
|
type Stream struct {
|
||
|
streamId spdy.StreamId
|
||
|
parent *Stream
|
||
|
conn *Connection
|
||
|
startChan chan error
|
||
|
|
||
|
dataLock sync.RWMutex
|
||
|
dataChan chan []byte
|
||
|
unread []byte
|
||
|
|
||
|
priority uint8
|
||
|
headers http.Header
|
||
|
headerChan chan http.Header
|
||
|
finishLock sync.Mutex
|
||
|
finished bool
|
||
|
replyCond *sync.Cond
|
||
|
replied bool
|
||
|
closeLock sync.Mutex
|
||
|
closeChan chan bool
|
||
|
}
|
||
|
|
||
|
// WriteData writes data to stream, sending a dataframe per call
|
||
|
func (s *Stream) WriteData(data []byte, fin bool) error {
|
||
|
s.waitWriteReply()
|
||
|
var flags spdy.DataFlags
|
||
|
|
||
|
if fin {
|
||
|
flags = spdy.DataFlagFin
|
||
|
s.finishLock.Lock()
|
||
|
if s.finished {
|
||
|
s.finishLock.Unlock()
|
||
|
return ErrWriteClosedStream
|
||
|
}
|
||
|
s.finished = true
|
||
|
s.finishLock.Unlock()
|
||
|
}
|
||
|
|
||
|
dataFrame := &spdy.DataFrame{
|
||
|
StreamId: s.streamId,
|
||
|
Flags: flags,
|
||
|
Data: data,
|
||
|
}
|
||
|
|
||
|
debugMessage("(%p) (%d) Writing data frame", s, s.streamId)
|
||
|
return s.conn.framer.WriteFrame(dataFrame)
|
||
|
}
|
||
|
|
||
|
// Write writes bytes to a stream, calling write data for each call.
|
||
|
func (s *Stream) Write(data []byte) (n int, err error) {
|
||
|
err = s.WriteData(data, false)
|
||
|
if err == nil {
|
||
|
n = len(data)
|
||
|
}
|
||
|
return
|
||
|
}
|
||
|
|
||
|
// Read reads bytes from a stream, a single read will never get more
|
||
|
// than what is sent on a single data frame, but a multiple calls to
|
||
|
// read may get data from the same data frame.
|
||
|
func (s *Stream) Read(p []byte) (n int, err error) {
|
||
|
if s.unread == nil {
|
||
|
select {
|
||
|
case <-s.closeChan:
|
||
|
return 0, io.EOF
|
||
|
case read, ok := <-s.dataChan:
|
||
|
if !ok {
|
||
|
return 0, io.EOF
|
||
|
}
|
||
|
s.unread = read
|
||
|
}
|
||
|
}
|
||
|
n = copy(p, s.unread)
|
||
|
if n < len(s.unread) {
|
||
|
s.unread = s.unread[n:]
|
||
|
} else {
|
||
|
s.unread = nil
|
||
|
}
|
||
|
return
|
||
|
}
|
||
|
|
||
|
// ReadData reads an entire data frame and returns the byte array
|
||
|
// from the data frame. If there is unread data from the result
|
||
|
// of a Read call, this function will return an ErrUnreadPartialData.
|
||
|
func (s *Stream) ReadData() ([]byte, error) {
|
||
|
debugMessage("(%p) Reading data from %d", s, s.streamId)
|
||
|
if s.unread != nil {
|
||
|
return nil, ErrUnreadPartialData
|
||
|
}
|
||
|
select {
|
||
|
case <-s.closeChan:
|
||
|
return nil, io.EOF
|
||
|
case read, ok := <-s.dataChan:
|
||
|
if !ok {
|
||
|
return nil, io.EOF
|
||
|
}
|
||
|
return read, nil
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (s *Stream) waitWriteReply() {
|
||
|
if s.replyCond != nil {
|
||
|
s.replyCond.L.Lock()
|
||
|
for !s.replied {
|
||
|
s.replyCond.Wait()
|
||
|
}
|
||
|
s.replyCond.L.Unlock()
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Wait waits for the stream to receive a reply.
|
||
|
func (s *Stream) Wait() error {
|
||
|
return s.WaitTimeout(time.Duration(0))
|
||
|
}
|
||
|
|
||
|
// WaitTimeout waits for the stream to receive a reply or for timeout.
|
||
|
// When the timeout is reached, ErrTimeout will be returned.
|
||
|
func (s *Stream) WaitTimeout(timeout time.Duration) error {
|
||
|
var timeoutChan <-chan time.Time
|
||
|
if timeout > time.Duration(0) {
|
||
|
timeoutChan = time.After(timeout)
|
||
|
}
|
||
|
|
||
|
select {
|
||
|
case err := <-s.startChan:
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
break
|
||
|
case <-timeoutChan:
|
||
|
return ErrTimeout
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// Close closes the stream by sending an empty data frame with the
|
||
|
// finish flag set, indicating this side is finished with the stream.
|
||
|
func (s *Stream) Close() error {
|
||
|
select {
|
||
|
case <-s.closeChan:
|
||
|
// Stream is now fully closed
|
||
|
s.conn.removeStream(s)
|
||
|
default:
|
||
|
break
|
||
|
}
|
||
|
return s.WriteData([]byte{}, true)
|
||
|
}
|
||
|
|
||
|
// Reset sends a reset frame, putting the stream into the fully closed state.
|
||
|
func (s *Stream) Reset() error {
|
||
|
s.conn.removeStream(s)
|
||
|
return s.resetStream()
|
||
|
}
|
||
|
|
||
|
func (s *Stream) resetStream() error {
|
||
|
// Always call closeRemoteChannels, even if s.finished is already true.
|
||
|
// This makes it so that stream.Close() followed by stream.Reset() allows
|
||
|
// stream.Read() to unblock.
|
||
|
s.closeRemoteChannels()
|
||
|
|
||
|
s.finishLock.Lock()
|
||
|
if s.finished {
|
||
|
s.finishLock.Unlock()
|
||
|
return nil
|
||
|
}
|
||
|
s.finished = true
|
||
|
s.finishLock.Unlock()
|
||
|
|
||
|
resetFrame := &spdy.RstStreamFrame{
|
||
|
StreamId: s.streamId,
|
||
|
Status: spdy.Cancel,
|
||
|
}
|
||
|
return s.conn.framer.WriteFrame(resetFrame)
|
||
|
}
|
||
|
|
||
|
// CreateSubStream creates a stream using the current as the parent
|
||
|
func (s *Stream) CreateSubStream(headers http.Header, fin bool) (*Stream, error) {
|
||
|
return s.conn.CreateStream(headers, s, fin)
|
||
|
}
|
||
|
|
||
|
// SetPriority sets the stream priority, does not affect the
|
||
|
// remote priority of this stream after Open has been called.
|
||
|
// Valid values are 0 through 7, 0 being the highest priority
|
||
|
// and 7 the lowest.
|
||
|
func (s *Stream) SetPriority(priority uint8) {
|
||
|
s.priority = priority
|
||
|
}
|
||
|
|
||
|
// SendHeader sends a header frame across the stream
|
||
|
func (s *Stream) SendHeader(headers http.Header, fin bool) error {
|
||
|
return s.conn.sendHeaders(headers, s, fin)
|
||
|
}
|
||
|
|
||
|
// SendReply sends a reply on a stream, only valid to be called once
|
||
|
// when handling a new stream
|
||
|
func (s *Stream) SendReply(headers http.Header, fin bool) error {
|
||
|
if s.replyCond == nil {
|
||
|
return errors.New("cannot reply on initiated stream")
|
||
|
}
|
||
|
s.replyCond.L.Lock()
|
||
|
defer s.replyCond.L.Unlock()
|
||
|
if s.replied {
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
err := s.conn.sendReply(headers, s, fin)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
s.replied = true
|
||
|
s.replyCond.Broadcast()
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// Refuse sends a reset frame with the status refuse, only
|
||
|
// valid to be called once when handling a new stream. This
|
||
|
// may be used to indicate that a stream is not allowed
|
||
|
// when http status codes are not being used.
|
||
|
func (s *Stream) Refuse() error {
|
||
|
if s.replied {
|
||
|
return nil
|
||
|
}
|
||
|
s.replied = true
|
||
|
return s.conn.sendReset(spdy.RefusedStream, s)
|
||
|
}
|
||
|
|
||
|
// Cancel sends a reset frame with the status canceled. This
|
||
|
// can be used at any time by the creator of the Stream to
|
||
|
// indicate the stream is no longer needed.
|
||
|
func (s *Stream) Cancel() error {
|
||
|
return s.conn.sendReset(spdy.Cancel, s)
|
||
|
}
|
||
|
|
||
|
// ReceiveHeader receives a header sent on the other side
|
||
|
// of the stream. This function will block until a header
|
||
|
// is received or stream is closed.
|
||
|
func (s *Stream) ReceiveHeader() (http.Header, error) {
|
||
|
select {
|
||
|
case <-s.closeChan:
|
||
|
break
|
||
|
case header, ok := <-s.headerChan:
|
||
|
if !ok {
|
||
|
return nil, fmt.Errorf("header chan closed")
|
||
|
}
|
||
|
return header, nil
|
||
|
}
|
||
|
return nil, fmt.Errorf("stream closed")
|
||
|
}
|
||
|
|
||
|
// Parent returns the parent stream
|
||
|
func (s *Stream) Parent() *Stream {
|
||
|
return s.parent
|
||
|
}
|
||
|
|
||
|
// Headers returns the headers used to create the stream
|
||
|
func (s *Stream) Headers() http.Header {
|
||
|
return s.headers
|
||
|
}
|
||
|
|
||
|
// String returns the string version of stream using the
|
||
|
// streamId to uniquely identify the stream
|
||
|
func (s *Stream) String() string {
|
||
|
return fmt.Sprintf("stream:%d", s.streamId)
|
||
|
}
|
||
|
|
||
|
// Identifier returns a 32 bit identifier for the stream
|
||
|
func (s *Stream) Identifier() uint32 {
|
||
|
return uint32(s.streamId)
|
||
|
}
|
||
|
|
||
|
// IsFinished returns whether the stream has finished
|
||
|
// sending data
|
||
|
func (s *Stream) IsFinished() bool {
|
||
|
return s.finished
|
||
|
}
|
||
|
|
||
|
// Implement net.Conn interface
|
||
|
|
||
|
func (s *Stream) LocalAddr() net.Addr {
|
||
|
return s.conn.conn.LocalAddr()
|
||
|
}
|
||
|
|
||
|
func (s *Stream) RemoteAddr() net.Addr {
|
||
|
return s.conn.conn.RemoteAddr()
|
||
|
}
|
||
|
|
||
|
// TODO set per stream values instead of connection-wide
|
||
|
|
||
|
func (s *Stream) SetDeadline(t time.Time) error {
|
||
|
return s.conn.conn.SetDeadline(t)
|
||
|
}
|
||
|
|
||
|
func (s *Stream) SetReadDeadline(t time.Time) error {
|
||
|
return s.conn.conn.SetReadDeadline(t)
|
||
|
}
|
||
|
|
||
|
func (s *Stream) SetWriteDeadline(t time.Time) error {
|
||
|
return s.conn.conn.SetWriteDeadline(t)
|
||
|
}
|
||
|
|
||
|
func (s *Stream) closeRemoteChannels() {
|
||
|
s.closeLock.Lock()
|
||
|
defer s.closeLock.Unlock()
|
||
|
select {
|
||
|
case <-s.closeChan:
|
||
|
default:
|
||
|
close(s.closeChan)
|
||
|
}
|
||
|
}
|