// Copyright 2011 Google Inc. All rights reserved.
// Use of this source code is governed by the Apache 2.0
// license that can be found in the LICENSE file.

//go:build !appengine
// +build !appengine

package internal

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"io/ioutil"
	"log"
	"net"
	"net/http"
	"net/url"
	"os"
	"runtime"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/golang/protobuf/proto"

	basepb "google.golang.org/appengine/internal/base"
	logpb "google.golang.org/appengine/internal/log"
	remotepb "google.golang.org/appengine/internal/remote_api"
)

const (
	apiPath = "/rpc_http"
)

var (
	// Incoming headers.
	ticketHeader       = http.CanonicalHeaderKey("X-AppEngine-API-Ticket")
	dapperHeader       = http.CanonicalHeaderKey("X-Google-DapperTraceInfo")
	traceHeader        = http.CanonicalHeaderKey("X-Cloud-Trace-Context")
	curNamespaceHeader = http.CanonicalHeaderKey("X-AppEngine-Current-Namespace")
	userIPHeader       = http.CanonicalHeaderKey("X-AppEngine-User-IP")
	remoteAddrHeader   = http.CanonicalHeaderKey("X-AppEngine-Remote-Addr")
	devRequestIdHeader = http.CanonicalHeaderKey("X-Appengine-Dev-Request-Id")

	// Outgoing headers.
	apiEndpointHeader      = http.CanonicalHeaderKey("X-Google-RPC-Service-Endpoint")
	apiEndpointHeaderValue = []string{"app-engine-apis"}
	apiMethodHeader        = http.CanonicalHeaderKey("X-Google-RPC-Service-Method")
	apiMethodHeaderValue   = []string{"/VMRemoteAPI.CallRemoteAPI"}
	apiDeadlineHeader      = http.CanonicalHeaderKey("X-Google-RPC-Service-Deadline")
	apiContentType         = http.CanonicalHeaderKey("Content-Type")
	apiContentTypeValue    = []string{"application/octet-stream"}
	logFlushHeader         = http.CanonicalHeaderKey("X-AppEngine-Log-Flush-Count")

	apiHTTPClient = &http.Client{
		Transport: &http.Transport{
			Proxy:               http.ProxyFromEnvironment,
			Dial:                limitDial,
			MaxIdleConns:        1000,
			MaxIdleConnsPerHost: 10000,
			IdleConnTimeout:     90 * time.Second,
		},
	}
)

func apiURL(ctx context.Context) *url.URL {
	host, port := "appengine.googleapis.internal", "10001"
	if h := os.Getenv("API_HOST"); h != "" {
		host = h
	}
	if hostOverride := ctx.Value(apiHostOverrideKey); hostOverride != nil {
		host = hostOverride.(string)
	}
	if p := os.Getenv("API_PORT"); p != "" {
		port = p
	}
	if portOverride := ctx.Value(apiPortOverrideKey); portOverride != nil {
		port = portOverride.(string)
	}
	return &url.URL{
		Scheme: "http",
		Host:   host + ":" + port,
		Path:   apiPath,
	}
}

// Middleware wraps an http handler so that it can make GAE API calls
func Middleware(next http.Handler) http.Handler {
	return handleHTTPMiddleware(executeRequestSafelyMiddleware(next))
}

func handleHTTPMiddleware(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		c := &aeContext{
			req:       r,
			outHeader: w.Header(),
		}
		r = r.WithContext(withContext(r.Context(), c))
		c.req = r

		stopFlushing := make(chan int)

		// Patch up RemoteAddr so it looks reasonable.
		if addr := r.Header.Get(userIPHeader); addr != "" {
			r.RemoteAddr = addr
		} else if addr = r.Header.Get(remoteAddrHeader); addr != "" {
			r.RemoteAddr = addr
		} else {
			// Should not normally reach here, but pick a sensible default anyway.
			r.RemoteAddr = "127.0.0.1"
		}
		// The address in the headers will most likely be of these forms:
		//	123.123.123.123
		//	2001:db8::1
		// net/http.Request.RemoteAddr is specified to be in "IP:port" form.
		if _, _, err := net.SplitHostPort(r.RemoteAddr); err != nil {
			// Assume the remote address is only a host; add a default port.
			r.RemoteAddr = net.JoinHostPort(r.RemoteAddr, "80")
		}

		if logToLogservice() {
			// Start goroutine responsible for flushing app logs.
			// This is done after adding c to ctx.m (and stopped before removing it)
			// because flushing logs requires making an API call.
			go c.logFlusher(stopFlushing)
		}

		next.ServeHTTP(c, r)
		c.outHeader = nil // make sure header changes aren't respected any more

		flushed := make(chan struct{})
		if logToLogservice() {
			stopFlushing <- 1 // any logging beyond this point will be dropped

			// Flush any pending logs asynchronously.
			c.pendingLogs.Lock()
			flushes := c.pendingLogs.flushes
			if len(c.pendingLogs.lines) > 0 {
				flushes++
			}
			c.pendingLogs.Unlock()
			go func() {
				defer close(flushed)
				// Force a log flush, because with very short requests we
				// may not ever flush logs.
				c.flushLog(true)
			}()
			w.Header().Set(logFlushHeader, strconv.Itoa(flushes))
		}

		// Avoid nil Write call if c.Write is never called.
		if c.outCode != 0 {
			w.WriteHeader(c.outCode)
		}
		if c.outBody != nil {
			w.Write(c.outBody)
		}
		if logToLogservice() {
			// Wait for the last flush to complete before returning,
			// otherwise the security ticket will not be valid.
			<-flushed
		}
	})
}

func executeRequestSafelyMiddleware(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		defer func() {
			if x := recover(); x != nil {
				c := w.(*aeContext)
				logf(c, 4, "%s", renderPanic(x)) // 4 == critical
				c.outCode = 500
			}
		}()

		next.ServeHTTP(w, r)
	})
}

func renderPanic(x interface{}) string {
	buf := make([]byte, 16<<10) // 16 KB should be plenty
	buf = buf[:runtime.Stack(buf, false)]

	// Remove the first few stack frames:
	//   this func
	//   the recover closure in the caller
	// That will root the stack trace at the site of the panic.
	const (
		skipStart  = "internal.renderPanic"
		skipFrames = 2
	)
	start := bytes.Index(buf, []byte(skipStart))
	p := start
	for i := 0; i < skipFrames*2 && p+1 < len(buf); i++ {
		p = bytes.IndexByte(buf[p+1:], '\n') + p + 1
		if p < 0 {
			break
		}
	}
	if p >= 0 {
		// buf[start:p+1] is the block to remove.
		// Copy buf[p+1:] over buf[start:] and shrink buf.
		copy(buf[start:], buf[p+1:])
		buf = buf[:len(buf)-(p+1-start)]
	}

	// Add panic heading.
	head := fmt.Sprintf("panic: %v\n\n", x)
	if len(head) > len(buf) {
		// Extremely unlikely to happen.
		return head
	}
	copy(buf[len(head):], buf)
	copy(buf, head)

	return string(buf)
}

// aeContext represents the aeContext of an in-flight HTTP request.
// It implements the appengine.Context and http.ResponseWriter interfaces.
type aeContext struct {
	req *http.Request

	outCode   int
	outHeader http.Header
	outBody   []byte

	pendingLogs struct {
		sync.Mutex
		lines   []*logpb.UserAppLogLine
		flushes int
	}
}

var contextKey = "holds a *context"

// jointContext joins two contexts in a superficial way.
// It takes values and timeouts from a base context, and only values from another context.
type jointContext struct {
	base       context.Context
	valuesOnly context.Context
}

func (c jointContext) Deadline() (time.Time, bool) {
	return c.base.Deadline()
}

func (c jointContext) Done() <-chan struct{} {
	return c.base.Done()
}

func (c jointContext) Err() error {
	return c.base.Err()
}

func (c jointContext) Value(key interface{}) interface{} {
	if val := c.base.Value(key); val != nil {
		return val
	}
	return c.valuesOnly.Value(key)
}

// fromContext returns the App Engine context or nil if ctx is not
// derived from an App Engine context.
func fromContext(ctx context.Context) *aeContext {
	c, _ := ctx.Value(&contextKey).(*aeContext)
	return c
}

func withContext(parent context.Context, c *aeContext) context.Context {
	ctx := context.WithValue(parent, &contextKey, c)
	if ns := c.req.Header.Get(curNamespaceHeader); ns != "" {
		ctx = withNamespace(ctx, ns)
	}
	return ctx
}

func toContext(c *aeContext) context.Context {
	return withContext(context.Background(), c)
}

func IncomingHeaders(ctx context.Context) http.Header {
	if c := fromContext(ctx); c != nil {
		return c.req.Header
	}
	return nil
}

func ReqContext(req *http.Request) context.Context {
	return req.Context()
}

func WithContext(parent context.Context, req *http.Request) context.Context {
	return jointContext{
		base:       parent,
		valuesOnly: req.Context(),
	}
}

// RegisterTestRequest registers the HTTP request req for testing, such that
// any API calls are sent to the provided URL.
// It should only be used by aetest package.
func RegisterTestRequest(req *http.Request, apiURL *url.URL, appID string) *http.Request {
	ctx := req.Context()
	ctx = withAPIHostOverride(ctx, apiURL.Hostname())
	ctx = withAPIPortOverride(ctx, apiURL.Port())
	ctx = WithAppIDOverride(ctx, appID)

	// use the unregistered request as a placeholder so that withContext can read the headers
	c := &aeContext{req: req}
	c.req = req.WithContext(withContext(ctx, c))
	return c.req
}

var errTimeout = &CallError{
	Detail:  "Deadline exceeded",
	Code:    int32(remotepb.RpcError_CANCELLED),
	Timeout: true,
}

func (c *aeContext) Header() http.Header { return c.outHeader }

// Copied from $GOROOT/src/pkg/net/http/transfer.go. Some response status
// codes do not permit a response body (nor response entity headers such as
// Content-Length, Content-Type, etc).
func bodyAllowedForStatus(status int) bool {
	switch {
	case status >= 100 && status <= 199:
		return false
	case status == 204:
		return false
	case status == 304:
		return false
	}
	return true
}

func (c *aeContext) Write(b []byte) (int, error) {
	if c.outCode == 0 {
		c.WriteHeader(http.StatusOK)
	}
	if len(b) > 0 && !bodyAllowedForStatus(c.outCode) {
		return 0, http.ErrBodyNotAllowed
	}
	c.outBody = append(c.outBody, b...)
	return len(b), nil
}

func (c *aeContext) WriteHeader(code int) {
	if c.outCode != 0 {
		logf(c, 3, "WriteHeader called multiple times on request.") // error level
		return
	}
	c.outCode = code
}

func post(ctx context.Context, body []byte, timeout time.Duration) (b []byte, err error) {
	apiURL := apiURL(ctx)
	hreq := &http.Request{
		Method: "POST",
		URL:    apiURL,
		Header: http.Header{
			apiEndpointHeader: apiEndpointHeaderValue,
			apiMethodHeader:   apiMethodHeaderValue,
			apiContentType:    apiContentTypeValue,
			apiDeadlineHeader: []string{strconv.FormatFloat(timeout.Seconds(), 'f', -1, 64)},
		},
		Body:          ioutil.NopCloser(bytes.NewReader(body)),
		ContentLength: int64(len(body)),
		Host:          apiURL.Host,
	}
	c := fromContext(ctx)
	if c != nil {
		if info := c.req.Header.Get(dapperHeader); info != "" {
			hreq.Header.Set(dapperHeader, info)
		}
		if info := c.req.Header.Get(traceHeader); info != "" {
			hreq.Header.Set(traceHeader, info)
		}
	}

	tr := apiHTTPClient.Transport.(*http.Transport)

	var timedOut int32 // atomic; set to 1 if timed out
	t := time.AfterFunc(timeout, func() {
		atomic.StoreInt32(&timedOut, 1)
		tr.CancelRequest(hreq)
	})
	defer t.Stop()
	defer func() {
		// Check if timeout was exceeded.
		if atomic.LoadInt32(&timedOut) != 0 {
			err = errTimeout
		}
	}()

	hresp, err := apiHTTPClient.Do(hreq)
	if err != nil {
		return nil, &CallError{
			Detail: fmt.Sprintf("service bridge HTTP failed: %v", err),
			Code:   int32(remotepb.RpcError_UNKNOWN),
		}
	}
	defer hresp.Body.Close()
	hrespBody, err := ioutil.ReadAll(hresp.Body)
	if hresp.StatusCode != 200 {
		return nil, &CallError{
			Detail: fmt.Sprintf("service bridge returned HTTP %d (%q)", hresp.StatusCode, hrespBody),
			Code:   int32(remotepb.RpcError_UNKNOWN),
		}
	}
	if err != nil {
		return nil, &CallError{
			Detail: fmt.Sprintf("service bridge response bad: %v", err),
			Code:   int32(remotepb.RpcError_UNKNOWN),
		}
	}
	return hrespBody, nil
}

func Call(ctx context.Context, service, method string, in, out proto.Message) error {
	if ns := NamespaceFromContext(ctx); ns != "" {
		if fn, ok := NamespaceMods[service]; ok {
			fn(in, ns)
		}
	}

	if f, ctx, ok := callOverrideFromContext(ctx); ok {
		return f(ctx, service, method, in, out)
	}

	// Handle already-done contexts quickly.
	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
	}

	c := fromContext(ctx)

	// Apply transaction modifications if we're in a transaction.
	if t := transactionFromContext(ctx); t != nil {
		if t.finished {
			return errors.New("transaction aeContext has expired")
		}
		applyTransaction(in, &t.transaction)
	}

	// Default RPC timeout is 60s.
	timeout := 60 * time.Second
	if deadline, ok := ctx.Deadline(); ok {
		timeout = deadline.Sub(time.Now())
	}

	data, err := proto.Marshal(in)
	if err != nil {
		return err
	}

	ticket := ""
	if c != nil {
		ticket = c.req.Header.Get(ticketHeader)
		if dri := c.req.Header.Get(devRequestIdHeader); IsDevAppServer() && dri != "" {
			ticket = dri
		}
	}
	req := &remotepb.Request{
		ServiceName: &service,
		Method:      &method,
		Request:     data,
		RequestId:   &ticket,
	}
	hreqBody, err := proto.Marshal(req)
	if err != nil {
		return err
	}

	hrespBody, err := post(ctx, hreqBody, timeout)
	if err != nil {
		return err
	}

	res := &remotepb.Response{}
	if err := proto.Unmarshal(hrespBody, res); err != nil {
		return err
	}
	if res.RpcError != nil {
		ce := &CallError{
			Detail: res.RpcError.GetDetail(),
			Code:   *res.RpcError.Code,
		}
		switch remotepb.RpcError_ErrorCode(ce.Code) {
		case remotepb.RpcError_CANCELLED, remotepb.RpcError_DEADLINE_EXCEEDED:
			ce.Timeout = true
		}
		return ce
	}
	if res.ApplicationError != nil {
		return &APIError{
			Service: *req.ServiceName,
			Detail:  res.ApplicationError.GetDetail(),
			Code:    *res.ApplicationError.Code,
		}
	}
	if res.Exception != nil || res.JavaException != nil {
		// This shouldn't happen, but let's be defensive.
		return &CallError{
			Detail: "service bridge returned exception",
			Code:   int32(remotepb.RpcError_UNKNOWN),
		}
	}
	return proto.Unmarshal(res.Response, out)
}

func (c *aeContext) Request() *http.Request {
	return c.req
}

func (c *aeContext) addLogLine(ll *logpb.UserAppLogLine) {
	// Truncate long log lines.
	// TODO(dsymonds): Check if this is still necessary.
	const lim = 8 << 10
	if len(*ll.Message) > lim {
		suffix := fmt.Sprintf("...(length %d)", len(*ll.Message))
		ll.Message = proto.String((*ll.Message)[:lim-len(suffix)] + suffix)
	}

	c.pendingLogs.Lock()
	c.pendingLogs.lines = append(c.pendingLogs.lines, ll)
	c.pendingLogs.Unlock()
}

var logLevelName = map[int64]string{
	0: "DEBUG",
	1: "INFO",
	2: "WARNING",
	3: "ERROR",
	4: "CRITICAL",
}

func logf(c *aeContext, level int64, format string, args ...interface{}) {
	if c == nil {
		panic("not an App Engine aeContext")
	}
	s := fmt.Sprintf(format, args...)
	s = strings.TrimRight(s, "\n") // Remove any trailing newline characters.
	if logToLogservice() {
		c.addLogLine(&logpb.UserAppLogLine{
			TimestampUsec: proto.Int64(time.Now().UnixNano() / 1e3),
			Level:         &level,
			Message:       &s,
		})
	}
	// Log to stdout if not deployed
	if !IsSecondGen() {
		log.Print(logLevelName[level] + ": " + s)
	}
}

// flushLog attempts to flush any pending logs to the appserver.
// It should not be called concurrently.
func (c *aeContext) flushLog(force bool) (flushed bool) {
	c.pendingLogs.Lock()
	// Grab up to 30 MB. We can get away with up to 32 MB, but let's be cautious.
	n, rem := 0, 30<<20
	for ; n < len(c.pendingLogs.lines); n++ {
		ll := c.pendingLogs.lines[n]
		// Each log line will require about 3 bytes of overhead.
		nb := proto.Size(ll) + 3
		if nb > rem {
			break
		}
		rem -= nb
	}
	lines := c.pendingLogs.lines[:n]
	c.pendingLogs.lines = c.pendingLogs.lines[n:]
	c.pendingLogs.Unlock()

	if len(lines) == 0 && !force {
		// Nothing to flush.
		return false
	}

	rescueLogs := false
	defer func() {
		if rescueLogs {
			c.pendingLogs.Lock()
			c.pendingLogs.lines = append(lines, c.pendingLogs.lines...)
			c.pendingLogs.Unlock()
		}
	}()

	buf, err := proto.Marshal(&logpb.UserAppLogGroup{
		LogLine: lines,
	})
	if err != nil {
		log.Printf("internal.flushLog: marshaling UserAppLogGroup: %v", err)
		rescueLogs = true
		return false
	}

	req := &logpb.FlushRequest{
		Logs: buf,
	}
	res := &basepb.VoidProto{}
	c.pendingLogs.Lock()
	c.pendingLogs.flushes++
	c.pendingLogs.Unlock()
	if err := Call(toContext(c), "logservice", "Flush", req, res); err != nil {
		log.Printf("internal.flushLog: Flush RPC: %v", err)
		rescueLogs = true
		return false
	}
	return true
}

const (
	// Log flushing parameters.
	flushInterval      = 1 * time.Second
	forceFlushInterval = 60 * time.Second
)

func (c *aeContext) logFlusher(stop <-chan int) {
	lastFlush := time.Now()
	tick := time.NewTicker(flushInterval)
	for {
		select {
		case <-stop:
			// Request finished.
			tick.Stop()
			return
		case <-tick.C:
			force := time.Now().Sub(lastFlush) > forceFlushInterval
			if c.flushLog(force) {
				lastFlush = time.Now()
			}
		}
	}
}

func ContextForTesting(req *http.Request) context.Context {
	return toContext(&aeContext{req: req})
}

func logToLogservice() bool {
	// TODO: replace logservice with json structured logs to $LOG_DIR/app.log.json
	// where $LOG_DIR is /var/log in prod and some tmpdir in dev
	return os.Getenv("LOG_TO_LOGSERVICE") != "0"
}