mirror of
https://github.com/ceph/ceph-csi.git
synced 2025-01-07 20:39:30 +00:00
d300da19b7
Signed-off-by: Madhu Rajanna <madhupr007@gmail.com>
225 lines
6.3 KiB
Go
225 lines
6.3 KiB
Go
/*
|
|
|
|
The remote package provides the pieces to allow Ginkgo test suites to report to remote listeners.
|
|
This is used, primarily, to enable streaming parallel test output but has, in principal, broader applications (e.g. streaming test output to a browser).
|
|
|
|
*/
|
|
|
|
package remote
|
|
|
|
import (
|
|
"encoding/json"
|
|
"io/ioutil"
|
|
"net"
|
|
"net/http"
|
|
"sync"
|
|
|
|
"github.com/onsi/ginkgo/internal/spec_iterator"
|
|
|
|
"github.com/onsi/ginkgo/config"
|
|
"github.com/onsi/ginkgo/reporters"
|
|
"github.com/onsi/ginkgo/types"
|
|
)
|
|
|
|
/*
|
|
Server spins up on an automatically selected port and listens for communication from the forwarding reporter.
|
|
It then forwards that communication to attached reporters.
|
|
*/
|
|
type Server struct {
|
|
listener net.Listener
|
|
reporters []reporters.Reporter
|
|
alives []func() bool
|
|
lock *sync.Mutex
|
|
beforeSuiteData types.RemoteBeforeSuiteData
|
|
parallelTotal int
|
|
counter int
|
|
}
|
|
|
|
//Create a new server, automatically selecting a port
|
|
func NewServer(parallelTotal int) (*Server, error) {
|
|
listener, err := net.Listen("tcp", "127.0.0.1:0")
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &Server{
|
|
listener: listener,
|
|
lock: &sync.Mutex{},
|
|
alives: make([]func() bool, parallelTotal),
|
|
beforeSuiteData: types.RemoteBeforeSuiteData{Data: nil, State: types.RemoteBeforeSuiteStatePending},
|
|
parallelTotal: parallelTotal,
|
|
}, nil
|
|
}
|
|
|
|
//Start the server. You don't need to `go s.Start()`, just `s.Start()`
|
|
func (server *Server) Start() {
|
|
httpServer := &http.Server{}
|
|
mux := http.NewServeMux()
|
|
httpServer.Handler = mux
|
|
|
|
//streaming endpoints
|
|
mux.HandleFunc("/SpecSuiteWillBegin", server.specSuiteWillBegin)
|
|
mux.HandleFunc("/BeforeSuiteDidRun", server.beforeSuiteDidRun)
|
|
mux.HandleFunc("/AfterSuiteDidRun", server.afterSuiteDidRun)
|
|
mux.HandleFunc("/SpecWillRun", server.specWillRun)
|
|
mux.HandleFunc("/SpecDidComplete", server.specDidComplete)
|
|
mux.HandleFunc("/SpecSuiteDidEnd", server.specSuiteDidEnd)
|
|
|
|
//synchronization endpoints
|
|
mux.HandleFunc("/BeforeSuiteState", server.handleBeforeSuiteState)
|
|
mux.HandleFunc("/RemoteAfterSuiteData", server.handleRemoteAfterSuiteData)
|
|
mux.HandleFunc("/counter", server.handleCounter)
|
|
mux.HandleFunc("/has-counter", server.handleHasCounter) //for backward compatibility
|
|
|
|
go httpServer.Serve(server.listener)
|
|
}
|
|
|
|
//Stop the server
|
|
func (server *Server) Close() {
|
|
server.listener.Close()
|
|
}
|
|
|
|
//The address the server can be reached it. Pass this into the `ForwardingReporter`.
|
|
func (server *Server) Address() string {
|
|
return "http://" + server.listener.Addr().String()
|
|
}
|
|
|
|
//
|
|
// Streaming Endpoints
|
|
//
|
|
|
|
//The server will forward all received messages to Ginkgo reporters registered with `RegisterReporters`
|
|
func (server *Server) readAll(request *http.Request) []byte {
|
|
defer request.Body.Close()
|
|
body, _ := ioutil.ReadAll(request.Body)
|
|
return body
|
|
}
|
|
|
|
func (server *Server) RegisterReporters(reporters ...reporters.Reporter) {
|
|
server.reporters = reporters
|
|
}
|
|
|
|
func (server *Server) specSuiteWillBegin(writer http.ResponseWriter, request *http.Request) {
|
|
body := server.readAll(request)
|
|
|
|
var data struct {
|
|
Config config.GinkgoConfigType `json:"config"`
|
|
Summary *types.SuiteSummary `json:"suite-summary"`
|
|
}
|
|
|
|
json.Unmarshal(body, &data)
|
|
|
|
for _, reporter := range server.reporters {
|
|
reporter.SpecSuiteWillBegin(data.Config, data.Summary)
|
|
}
|
|
}
|
|
|
|
func (server *Server) beforeSuiteDidRun(writer http.ResponseWriter, request *http.Request) {
|
|
body := server.readAll(request)
|
|
var setupSummary *types.SetupSummary
|
|
json.Unmarshal(body, &setupSummary)
|
|
|
|
for _, reporter := range server.reporters {
|
|
reporter.BeforeSuiteDidRun(setupSummary)
|
|
}
|
|
}
|
|
|
|
func (server *Server) afterSuiteDidRun(writer http.ResponseWriter, request *http.Request) {
|
|
body := server.readAll(request)
|
|
var setupSummary *types.SetupSummary
|
|
json.Unmarshal(body, &setupSummary)
|
|
|
|
for _, reporter := range server.reporters {
|
|
reporter.AfterSuiteDidRun(setupSummary)
|
|
}
|
|
}
|
|
|
|
func (server *Server) specWillRun(writer http.ResponseWriter, request *http.Request) {
|
|
body := server.readAll(request)
|
|
var specSummary *types.SpecSummary
|
|
json.Unmarshal(body, &specSummary)
|
|
|
|
for _, reporter := range server.reporters {
|
|
reporter.SpecWillRun(specSummary)
|
|
}
|
|
}
|
|
|
|
func (server *Server) specDidComplete(writer http.ResponseWriter, request *http.Request) {
|
|
body := server.readAll(request)
|
|
var specSummary *types.SpecSummary
|
|
json.Unmarshal(body, &specSummary)
|
|
|
|
for _, reporter := range server.reporters {
|
|
reporter.SpecDidComplete(specSummary)
|
|
}
|
|
}
|
|
|
|
func (server *Server) specSuiteDidEnd(writer http.ResponseWriter, request *http.Request) {
|
|
body := server.readAll(request)
|
|
var suiteSummary *types.SuiteSummary
|
|
json.Unmarshal(body, &suiteSummary)
|
|
|
|
for _, reporter := range server.reporters {
|
|
reporter.SpecSuiteDidEnd(suiteSummary)
|
|
}
|
|
}
|
|
|
|
//
|
|
// Synchronization Endpoints
|
|
//
|
|
|
|
func (server *Server) RegisterAlive(node int, alive func() bool) {
|
|
server.lock.Lock()
|
|
defer server.lock.Unlock()
|
|
server.alives[node-1] = alive
|
|
}
|
|
|
|
func (server *Server) nodeIsAlive(node int) bool {
|
|
server.lock.Lock()
|
|
defer server.lock.Unlock()
|
|
alive := server.alives[node-1]
|
|
if alive == nil {
|
|
return true
|
|
}
|
|
return alive()
|
|
}
|
|
|
|
func (server *Server) handleBeforeSuiteState(writer http.ResponseWriter, request *http.Request) {
|
|
if request.Method == "POST" {
|
|
dec := json.NewDecoder(request.Body)
|
|
dec.Decode(&(server.beforeSuiteData))
|
|
} else {
|
|
beforeSuiteData := server.beforeSuiteData
|
|
if beforeSuiteData.State == types.RemoteBeforeSuiteStatePending && !server.nodeIsAlive(1) {
|
|
beforeSuiteData.State = types.RemoteBeforeSuiteStateDisappeared
|
|
}
|
|
enc := json.NewEncoder(writer)
|
|
enc.Encode(beforeSuiteData)
|
|
}
|
|
}
|
|
|
|
func (server *Server) handleRemoteAfterSuiteData(writer http.ResponseWriter, request *http.Request) {
|
|
afterSuiteData := types.RemoteAfterSuiteData{
|
|
CanRun: true,
|
|
}
|
|
for i := 2; i <= server.parallelTotal; i++ {
|
|
afterSuiteData.CanRun = afterSuiteData.CanRun && !server.nodeIsAlive(i)
|
|
}
|
|
|
|
enc := json.NewEncoder(writer)
|
|
enc.Encode(afterSuiteData)
|
|
}
|
|
|
|
func (server *Server) handleCounter(writer http.ResponseWriter, request *http.Request) {
|
|
c := spec_iterator.Counter{}
|
|
server.lock.Lock()
|
|
c.Index = server.counter
|
|
server.counter = server.counter + 1
|
|
server.lock.Unlock()
|
|
|
|
json.NewEncoder(writer).Encode(c)
|
|
}
|
|
|
|
func (server *Server) handleHasCounter(writer http.ResponseWriter, request *http.Request) {
|
|
writer.Write([]byte(""))
|
|
}
|