/*
   Copyright 2014-2021 Docker Inc.

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
*/

package spdystream

import (
	"errors"
	"fmt"
	"io"
	"net"
	"net/http"
	"sync"
	"time"

	"github.com/moby/spdystream/spdy"
)

var (
	ErrUnreadPartialData = errors.New("unread partial data")
)

// Stream is a single stream multiplexed over a Connection. Its methods
// cover writing and reading data frames, exchanging headers, and closing
// or resetting the stream.
type Stream struct {
	// streamId is the identifier placed in every frame written for this stream.
	streamId  spdy.StreamId
	// parent is the stream returned by Parent; it may be nil.
	parent    *Stream
	// conn is the connection whose framer is used to write this stream's frames.
	conn      *Connection
	// startChan yields the result awaited by Wait/WaitTimeout (nil on success).
	startChan chan error

	// dataLock is not used in this file.
	// NOTE(review): presumably guards dataChan against concurrent close/send in
	// the connection code — confirm against the Connection implementation.
	dataLock sync.RWMutex
	// dataChan delivers the payload of received data frames to Read/ReadData.
	dataChan chan []byte
	// unread holds the tail of a data frame that a Read call only partially
	// consumed; nil when there is nothing left over.
	unread   []byte

	// priority is the value stored by SetPriority.
	priority   uint8
	// headers is the header set returned by Headers.
	headers    http.Header
	// headerChan delivers headers to ReceiveHeader.
	headerChan chan http.Header
	// finishLock guards finished.
	finishLock sync.Mutex
	// finished is set once this side has sent a fin data frame or a reset.
	finished   bool
	// replyCond, when non-nil, is waited on by writers until replied is true.
	// It is nil for streams on which SendReply is not permitted.
	replyCond  *sync.Cond
	// replied records that SendReply or Refuse has already been performed.
	replied    bool
	// closeLock serializes closing of closeChan.
	closeLock  sync.Mutex
	// closeChan is closed by closeRemoteChannels; blocked reads observe the
	// close and return.
	closeChan  chan bool
}

// WriteData sends data on the stream as exactly one data frame.
//
// The call first blocks in waitWriteReply until the stream has been replied
// to (when a reply is required). When fin is true the frame carries the fin
// flag and the stream is marked finished; a second fin write returns
// ErrWriteClosedStream without sending anything.
func (s *Stream) WriteData(data []byte, fin bool) error {
	s.waitWriteReply()

	var frameFlags spdy.DataFlags
	if fin {
		// Claim the "finished" transition atomically; only the first
		// caller is allowed to send the fin frame.
		s.finishLock.Lock()
		alreadyFinished := s.finished
		s.finished = true
		s.finishLock.Unlock()

		if alreadyFinished {
			return ErrWriteClosedStream
		}
		frameFlags = spdy.DataFlagFin
	}

	debugMessage("(%p) (%d) Writing data frame", s, s.streamId)
	return s.conn.framer.WriteFrame(&spdy.DataFrame{
		StreamId: s.streamId,
		Flags:    frameFlags,
		Data:     data,
	})
}

// Write implements io.Writer by sending data as a single non-fin data frame
// via WriteData. It reports len(data) on success and 0 with the error on
// failure.
func (s *Stream) Write(data []byte) (n int, err error) {
	if writeErr := s.WriteData(data, false); writeErr != nil {
		return 0, writeErr
	}
	return len(data), nil
}

// Read implements io.Reader. A single call never returns more than the
// payload of one data frame, but several calls may drain the same frame
// when p is smaller than the payload. It returns io.EOF once the stream's
// close channel or data channel has been closed and no partial data is
// pending.
func (s *Stream) Read(p []byte) (n int, err error) {
	if s.unread == nil {
		// Nothing left over from an earlier call: block for the next frame
		// or for the stream to be closed.
		select {
		case frame, open := <-s.dataChan:
			if !open {
				return 0, io.EOF
			}
			s.unread = frame
		case <-s.closeChan:
			return 0, io.EOF
		}
	}

	copied := copy(p, s.unread)
	if copied >= len(s.unread) {
		// Frame fully consumed.
		s.unread = nil
	} else {
		// Keep the remainder for the next Read.
		s.unread = s.unread[copied:]
	}
	return copied, nil
}

// ReadData blocks for the next data frame and returns its payload as-is.
// If an earlier Read left part of a frame unconsumed it returns
// ErrUnreadPartialData instead. io.EOF is returned when the stream's close
// channel or data channel has been closed.
func (s *Stream) ReadData() ([]byte, error) {
	debugMessage("(%p) Reading data from %d", s, s.streamId)
	if s.unread != nil {
		return nil, ErrUnreadPartialData
	}

	select {
	case frame, open := <-s.dataChan:
		if open {
			return frame, nil
		}
	case <-s.closeChan:
	}
	return nil, io.EOF
}

// waitWriteReply blocks until the stream's replied flag is set. Streams
// without a reply condition (replyCond == nil) return immediately.
func (s *Stream) waitWriteReply() {
	cond := s.replyCond
	if cond == nil {
		return
	}

	cond.L.Lock()
	defer cond.L.Unlock()
	for !s.replied {
		cond.Wait()
	}
}

// Wait blocks, without any timeout, until the stream's start result is
// available and returns that result.
func (s *Stream) Wait() error {
	const noTimeout time.Duration = 0
	return s.WaitTimeout(noTimeout)
}

// WaitTimeout waits for the stream to receive a reply or for timeout to
// elapse.
//
// It returns the error delivered on the stream's start channel (nil on
// success), or ErrTimeout when timeout expires first. A timeout of zero or
// less disables the timer, so the call waits indefinitely.
func (s *Stream) WaitTimeout(timeout time.Duration) error {
	// A nil channel never becomes ready, so the timeout case is inert
	// unless a positive timeout arms it below.
	var timeoutChan <-chan time.Time
	if timeout > 0 {
		// Use an explicit timer (rather than time.After) so it can be
		// released as soon as the wait completes instead of lingering
		// until it fires.
		timer := time.NewTimer(timeout)
		defer timer.Stop()
		timeoutChan = timer.C
	}

	select {
	case err := <-s.startChan:
		return err
	case <-timeoutChan:
		return ErrTimeout
	}
}

// Close signals that this side is done with the stream by sending an empty
// data frame with the fin flag set. If the stream's close channel is
// already closed, the stream is also removed from its connection first.
// The result is whatever WriteData returns for the fin frame.
func (s *Stream) Close() error {
	select {
	case <-s.closeChan:
		// Close channel already closed: the stream is now fully closed,
		// so drop it from the connection.
		s.conn.removeStream(s)
	default:
	}

	emptyPayload := []byte{}
	return s.WriteData(emptyPayload, true)
}

// Reset removes the stream from its connection and then resets it via
// resetStream, putting the stream into the fully closed state. The error,
// if any, comes from resetStream.
func (s *Stream) Reset() error {
	s.conn.removeStream(s)

	err := s.resetStream()
	return err
}

// resetStream closes the stream's close channel and, unless this side has
// already finished, marks the stream finished and writes a reset frame with
// status Cancel. It returns nil when the stream was already finished.
func (s *Stream) resetStream() error {
	// closeRemoteChannels is invoked unconditionally, even when s.finished
	// is already set, so that stream.Close() followed by stream.Reset()
	// still unblocks a pending stream.Read().
	s.closeRemoteChannels()

	s.finishLock.Lock()
	wasFinished := s.finished
	s.finished = true
	s.finishLock.Unlock()

	if wasFinished {
		return nil
	}

	return s.conn.framer.WriteFrame(&spdy.RstStreamFrame{
		StreamId: s.streamId,
		Status:   spdy.Cancel,
	})
}

// CreateSubStream asks the stream's connection to create a new stream with
// this stream as its parent, forwarding headers and fin unchanged.
func (s *Stream) CreateSubStream(headers http.Header, fin bool) (*Stream, error) {
	parent := s
	return s.conn.CreateStream(headers, parent, fin)
}

// SetPriority records the priority for this stream. It does not affect the
// remote priority of the stream after Open has been called.
// Valid values are 0 (highest priority) through 7 (lowest); the value is
// stored as given and is not validated here.
func (s *Stream) SetPriority(priority uint8) {
	s.priority = priority
}

// SendHeader sends headers for this stream through the connection's
// sendHeaders, passing fin along unchanged.
func (s *Stream) SendHeader(headers http.Header, fin bool) error {
	err := s.conn.sendHeaders(headers, s, fin)
	return err
}

// SendReply sends the reply for a stream that is being handled. It is
// meant to be called once: after a successful reply further calls are
// no-ops returning nil. Streams without a reply condition cannot be
// replied to and yield an error. On success, writers blocked waiting for
// the reply are released.
func (s *Stream) SendReply(headers http.Header, fin bool) error {
	cond := s.replyCond
	if cond == nil {
		return errors.New("cannot reply on initiated stream")
	}

	cond.L.Lock()
	defer cond.L.Unlock()

	if !s.replied {
		if err := s.conn.sendReply(headers, s, fin); err != nil {
			// Leave replied unset so the reply can be attempted again.
			return err
		}
		s.replied = true
		cond.Broadcast()
	}
	return nil
}

// Refuse sends a reset frame with the status refuse, only
// valid to be called once when handling a new stream.  This
// may be used to indicate that a stream is not allowed
// when http status codes are not being used.
//
// If the stream has already been replied to or refused, Refuse does nothing
// and returns nil. Otherwise it marks the stream as replied and returns the
// result of sending the reset frame.
func (s *Stream) Refuse() error {
	// The replied flag is the predicate of replyCond (see SendReply and
	// waitWriteReply), so it must be read and written under the same lock.
	// Streams without a reply condition have no waiters and need no lock.
	if s.replyCond != nil {
		s.replyCond.L.Lock()
		defer s.replyCond.L.Unlock()
	}

	if s.replied {
		return nil
	}
	s.replied = true

	// Wake writers already parked in waitWriteReply; without this they would
	// wait forever because nothing else signals the condition after a
	// refusal. They cannot proceed until the lock is released, i.e. after
	// the reset frame below has been handed to the connection.
	if s.replyCond != nil {
		s.replyCond.Broadcast()
	}

	return s.conn.sendReset(spdy.RefusedStream, s)
}

// Cancel sends a reset frame carrying the Cancel status for this stream.
// The creator of the stream may call it at any time to indicate the stream
// is no longer needed.
func (s *Stream) Cancel() error {
	status := spdy.Cancel
	return s.conn.sendReset(status, s)
}

// ReceiveHeader blocks until a header sent by the other side arrives or the
// stream is closed. It returns the received header, or a non-nil error when
// the header channel or the stream's close channel has been closed.
func (s *Stream) ReceiveHeader() (http.Header, error) {
	select {
	case received, open := <-s.headerChan:
		if open {
			return received, nil
		}
		return nil, errors.New("header chan closed")
	case <-s.closeChan:
		return nil, errors.New("stream closed")
	}
}

// Parent reports the stream recorded as this stream's parent; the result
// may be nil.
func (s *Stream) Parent() *Stream {
	parent := s.parent
	return parent
}

// Headers reports the headers used to create the stream. The stored value
// is returned directly, not a copy.
func (s *Stream) Headers() http.Header {
	stored := s.headers
	return stored
}

// String implements fmt.Stringer, rendering the stream as "stream:<id>"
// where <id> is the stream identifier in decimal.
func (s *Stream) String() string {
	id := s.streamId
	return fmt.Sprintf("stream:%d", id)
}

// Identifier reports the stream id converted to a plain 32-bit unsigned
// integer.
func (s *Stream) Identifier() uint32 {
	id := s.streamId
	return uint32(id)
}

// IsFinished returns whether the stream has finished
// sending data, i.e. whether this side has already sent a fin data frame
// or a reset.
//
// The flag is read under finishLock, the same lock every writer of the
// flag holds, so the call is safe while other goroutines close or reset
// the stream. It must not be called with finishLock already held.
func (s *Stream) IsFinished() bool {
	s.finishLock.Lock()
	defer s.finishLock.Unlock()
	return s.finished
}

// Implement net.Conn interface

// LocalAddr reports the local address of the connection that carries this
// stream.
func (s *Stream) LocalAddr() net.Addr {
	underlying := s.conn.conn
	return underlying.LocalAddr()
}

// RemoteAddr reports the remote address of the connection that carries
// this stream.
func (s *Stream) RemoteAddr() net.Addr {
	underlying := s.conn.conn
	return underlying.RemoteAddr()
}

// TODO: set per-stream deadlines instead of connection-wide ones.

// SetDeadline sets the read and write deadline by delegating to the
// connection that carries this stream, so the deadline is applied to that
// connection rather than to this stream alone.
func (s *Stream) SetDeadline(t time.Time) error {
	underlying := s.conn.conn
	return underlying.SetDeadline(t)
}

// SetReadDeadline sets the read deadline by delegating to the connection
// that carries this stream, so the deadline is applied to that connection
// rather than to this stream alone.
func (s *Stream) SetReadDeadline(t time.Time) error {
	underlying := s.conn.conn
	return underlying.SetReadDeadline(t)
}

// SetWriteDeadline sets the write deadline by delegating to the connection
// that carries this stream, so the deadline is applied to that connection
// rather than to this stream alone.
func (s *Stream) SetWriteDeadline(t time.Time) error {
	underlying := s.conn.conn
	return underlying.SetWriteDeadline(t)
}

// closeRemoteChannels closes the stream's close channel exactly once.
// Calls made after the channel is already closed are no-ops; closeLock
// keeps concurrent callers from closing it twice.
func (s *Stream) closeRemoteChannels() {
	s.closeLock.Lock()
	defer s.closeLock.Unlock()

	select {
	case <-s.closeChan:
		// Already closed; nothing left to do.
		return
	default:
	}
	close(s.closeChan)
}