Merge branch 'bugfix/issue-138'

Closes: #138

Co-authored-by: Roberto Abdelkader Martínez Pérez <robertomartinezp@gmail.com>
This commit is contained in:
pancho horrillo
2020-12-15 19:02:25 +01:00
8 changed files with 109 additions and 78 deletions
+2 -20
View File
@@ -25,7 +25,6 @@ import (
"github.com/spf13/cobra" "github.com/spf13/cobra"
"github.com/BBVA/kapow/internal/logger"
"github.com/BBVA/kapow/internal/server" "github.com/BBVA/kapow/internal/server"
) )
@@ -47,7 +46,7 @@ var ServerCmd = &cobra.Command{
sConf.ClientAuth, _ = cmd.Flags().GetBool("clientauth") sConf.ClientAuth, _ = cmd.Flags().GetBool("clientauth")
sConf.ClientCaFile, _ = cmd.Flags().GetString("clientcafile") sConf.ClientCaFile, _ = cmd.Flags().GetString("clientcafile")
debug, _ := cmd.Flags().GetBool("debug") sConf.Debug, _ = cmd.Flags().GetBool("debug")
// Set environment variables KAPOW_DATA_URL and KAPOW_CONTROL_URL only if they aren't set so we don't overwrite user's preferences // Set environment variables KAPOW_DATA_URL and KAPOW_CONTROL_URL only if they aren't set so we don't overwrite user's preferences
if _, exist := os.LookupEnv("KAPOW_DATA_URL"); !exist { if _, exist := os.LookupEnv("KAPOW_DATA_URL"); !exist {
@@ -57,10 +56,6 @@ var ServerCmd = &cobra.Command{
os.Setenv("KAPOW_CONTROL_URL", "http://"+sConf.ControlBindAddr) os.Setenv("KAPOW_CONTROL_URL", "http://"+sConf.ControlBindAddr)
} }
if debug {
logger.RegisterLogger(logger.SCRIPTS, nil)
}
server.StartServer(sConf) server.StartServer(sConf)
if len(args) > 0 { if len(args) > 0 {
@@ -83,11 +78,7 @@ var ServerCmd = &cobra.Command{
log.Printf("Done running powfile: %q\n", powfile) log.Printf("Done running powfile: %q\n", powfile)
} }
if debug { select {}
processLogs()
} else {
select {}
}
}, },
} }
@@ -123,12 +114,3 @@ func validateServerCommandArguments(cmd *cobra.Command, args []string) error {
return nil return nil
} }
func processLogs() {
for {
if !logger.ProcessMsg(logger.SCRIPTS) {
break
}
}
}
+2
View File
@@ -42,4 +42,6 @@ type Route struct {
// Index is this route position in the server's routes list. // Index is this route position in the server's routes list.
// It is an output field, its value is ignored as input. // It is an output field, its value is ignored as input.
Index int `json:"index"` Index int `json:"index"`
Debug bool `json:"debug"`
} }
+3 -2
View File
@@ -32,7 +32,8 @@ type ServerConfig struct {
CertFile, CertFile,
ClientCaFile string ClientCaFile string
ClientAuth bool ClientAuth,
Debug bool
} }
// StartServer Starts one instance of each server in a goroutine and remains listening on a channel for trace events generated by them // StartServer Starts one instance of each server in a goroutine and remains listening on a channel for trace events generated by them
@@ -41,7 +42,7 @@ func StartServer(config ServerConfig) {
wg.Add(3) wg.Add(3)
go control.Run(config.ControlBindAddr, &wg) go control.Run(config.ControlBindAddr, &wg)
go data.Run(config.DataBindAddr, &wg) go data.Run(config.DataBindAddr, &wg)
go user.Run(config.UserBindAddr, &wg, config.CertFile, config.KeyFile, config.ClientCaFile, config.ClientAuth) go user.Run(config.UserBindAddr, &wg, config.CertFile, config.KeyFile, config.ClientCaFile, config.ClientAuth, config.Debug)
// Wait for servers signals in order to return // Wait for servers signals in order to return
wg.Wait() wg.Wait()
+32 -17
View File
@@ -18,13 +18,13 @@ package mux
import ( import (
"bufio" "bufio"
"bytes"
"log" "log"
"net/http" "net/http"
"io"
"os"
"github.com/google/uuid" "github.com/google/uuid"
"github.com/BBVA/kapow/internal/logger"
"github.com/BBVA/kapow/internal/server/data" "github.com/BBVA/kapow/internal/server/data"
"github.com/BBVA/kapow/internal/server/model" "github.com/BBVA/kapow/internal/server/model"
"github.com/BBVA/kapow/internal/server/user/spawn" "github.com/BBVA/kapow/internal/server/user/spawn"
@@ -32,6 +32,7 @@ import (
var spawner = spawn.Spawn var spawner = spawn.Spawn
var idGenerator = uuid.NewUUID var idGenerator = uuid.NewUUID
var logHandler io.Writer = os.Stdout
func handlerBuilder(route model.Route) http.Handler { func handlerBuilder(route model.Route) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
@@ -51,29 +52,43 @@ func handlerBuilder(route model.Route) http.Handler {
data.Handlers.Add(h) data.Handlers.Add(h)
defer data.Handlers.Remove(h.ID) defer data.Handlers.Remove(h.ID)
stdOut := &bytes.Buffer{} if route.Debug {
stdErr := &bytes.Buffer{} var stdOutR, stdOutW *os.File
err = spawner(h, stdOut, stdErr) stdOutR, stdOutW, err = os.Pipe()
//err = spawner(h, nil) defer stdOutW.Close()
if err != nil {
log.Println(err)
return
}
var stdErrR, stdErrW *os.File
stdErrR, stdErrW, err = os.Pipe()
defer stdErrW.Close()
if err != nil {
log.Println(err)
return
}
go logStream(h.ID, "stdout", stdOutR)
go logStream(h.ID, "stderr", stdErrR)
err = spawner(h, stdOutW, stdErrW)
} else {
err = spawner(h, nil, nil)
}
if err != nil { if err != nil {
log.Println(err) log.Println(err)
} }
logger.SendMsg(logger.SCRIPTS, createLogMsg(h.ID, *stdOut, *stdErr))
}) })
} }
func createLogMsg(handlerId string, stdout, stderr bytes.Buffer) logger.LogMsg { func logStream(handlerId string, streamName string, stream *os.File) {
var messages []string defer stream.Close()
scanner := bufio.NewScanner(bytes.NewBuffer(stdout.Bytes())) execLog := log.New(logHandler, "", log.Ldate|log.Ltime|log.LUTC|log.Lmicroseconds)
scanner := bufio.NewScanner(stream)
for scanner.Scan() { for scanner.Scan() {
messages = append(messages, scanner.Text()) execLog.Printf("%s %s: %s", handlerId, streamName, scanner.Text())
} }
scanner = bufio.NewScanner(bytes.NewBuffer(stderr.Bytes()))
for scanner.Scan() {
messages = append(messages, scanner.Text())
}
return logger.LogMsg{Prefix: handlerId, Messages: messages}
} }
+54 -38
View File
@@ -1,3 +1,5 @@
// +build !race
/* /*
* Copyright 2019 Banco Bilbao Vizcaya Argentaria, S.A. * Copyright 2019 Banco Bilbao Vizcaya Argentaria, S.A.
* *
@@ -25,6 +27,7 @@ import (
"reflect" "reflect"
"strings" "strings"
"testing" "testing"
"time"
"github.com/google/uuid" "github.com/google/uuid"
@@ -199,50 +202,63 @@ func TestHandlerBuilderRemovesHandlerWhenDone(t *testing.T) {
} }
} }
func TestCreateLogMsgAdsPrefixInfo(t *testing.T) { func TestHandlerBuilderLogToLogHandlerWhenDebugIsEnabled(t *testing.T) {
expected := "FOO" data.Handlers = data.New()
route := model.Route{Debug: true}
var got string
msg := createLogMsg(expected, bytes.Buffer{}, bytes.Buffer{}) logHandler = new(bytes.Buffer)
if msg.Prefix != expected { spawner = func(h *model.Handler, out io.Writer, er io.Writer) error {
t.Errorf("LogMsg doesn't contain expected Prefix. Expected: %s, got: %s", expected, msg.Prefix) _, _ = out.Write([]byte("this is stdout"))
_, _ = er.Write([]byte("this is stderr"))
return nil
}
handlerBuilder(route).ServeHTTP(nil, nil)
// NOTE: logStream will write stdout and stderr contents eventually.
// We do not have any control the goroutines running logStream, thus we
// cannot use a synchronization primitive to wait for them. Sorry.
time.Sleep(1 * time.Second)
got = logHandler.(*bytes.Buffer).String()
if ! strings.Contains(got, "this is stdout") {
t.Errorf("Stdout not preserved. Actual: %+q", got)
}
if ! strings.Contains(got, "this is stderr") {
t.Errorf("Stderr not preserved. Actual: %+q", got)
} }
} }
func TestCreateLogMsgAdsStdOutInfo(t *testing.T) {
expected := "FOO\nBAR"
out := bytes.Buffer{}
out.WriteString(expected)
msg := createLogMsg("", out, bytes.Buffer{}) func TestHandlerBuilderDoesNotLogToLogHandlerWhenDebugIsDisabled(t *testing.T) {
data.Handlers = data.New()
route := model.Route{Debug: false}
if strings.Join(msg.Messages, "\n") != expected { logHandler = new(bytes.Buffer)
t.Errorf("LogMsg doesn't contain expected payload. Expected: %s, got: %s", expected, msg.Prefix)
} spawner = func(h *model.Handler, out io.Writer, er io.Writer) error {
} if out != nil {
_, _ = out.Write([]byte("this is stdout"))
func TestCreateLogMsgAdsStdErrInfo(t *testing.T) { }
expected := "FOO\nBAR" if er != nil {
err := bytes.Buffer{} _, _ = er.Write([]byte("this is stderr"))
err.WriteString(expected) }
msg := createLogMsg("", bytes.Buffer{}, err) return nil
}
if strings.Join(msg.Messages, "\n") != expected {
t.Errorf("LogMsg doesn't contain expected payload. Expected: %s, got: %s", expected, msg.Prefix) handlerBuilder(route).ServeHTTP(nil, nil)
}
} // NOTE: logStream will write stdout and stderr contents eventually.
// We do not have any control the goroutines running logStream, thus we
func TestCreateLogMsgAdsStdOutAndStdErrInfo(t *testing.T) { // cannot use a synchronization primitive to wait for them. Sorry.
expected := "FOO\nBAR\nFOO BAZ" time.Sleep(1 * time.Second)
out := bytes.Buffer{}
out.WriteString("FOO\nBAR\n") size := logHandler.(*bytes.Buffer).Len()
err := bytes.Buffer{} if size != 0 {
err.WriteString("FOO BAZ") t.Error("Something was logged to stderr with debug=false")
msg := createLogMsg("", out, err)
if strings.Join(msg.Messages, "\n") != expected {
t.Errorf("LogMsg doesn't contain expected payload. Expected: %s, got: %s", expected, msg.Prefix)
} }
} }
+2
View File
@@ -27,6 +27,7 @@ import (
"time" "time"
"github.com/BBVA/kapow/internal/server/model" "github.com/BBVA/kapow/internal/server/model"
"github.com/BBVA/kapow/internal/server/user/spawn"
"github.com/gorilla/mux" "github.com/gorilla/mux"
) )
@@ -227,6 +228,7 @@ func TestServeHTTPCallsInnerMuxAfterAcquiringLock(t *testing.T) {
} }
func TestUpdateUpdatesMuxWithProvideRouteList(t *testing.T) { func TestUpdateUpdatesMuxWithProvideRouteList(t *testing.T) {
spawner = spawn.Spawn
sm := New() sm := New()
rs := []model.Route{ rs := []model.Route{
{ {
+8 -1
View File
@@ -34,13 +34,20 @@ var Server = http.Server{
Handler: mux.New(), Handler: mux.New(),
} }
var DebugEndpoints bool
// Run finishes configuring Server and runs ListenAndServe on it // Run finishes configuring Server and runs ListenAndServe on it
func Run(bindAddr string, wg *sync.WaitGroup, certFile, keyFile, cliCaFile string, cliAuth bool) { func Run(bindAddr string, wg *sync.WaitGroup, certFile, keyFile, cliCaFile string, cliAuth, debug bool) {
Server = http.Server{ Server = http.Server{
Addr: bindAddr, Addr: bindAddr,
Handler: mux.New(), Handler: mux.New(),
} }
if debug {
Routes.SetDebug()
}
listener, err := net.Listen("tcp", bindAddr) listener, err := net.Listen("tcp", bindAddr)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
+6
View File
@@ -27,6 +27,7 @@ import (
type safeRouteList struct { type safeRouteList struct {
rs []model.Route rs []model.Route
m *sync.RWMutex m *sync.RWMutex
globalDebug bool
} }
var Routes safeRouteList = New() var Routes safeRouteList = New()
@@ -38,9 +39,14 @@ func New() safeRouteList {
} }
} }
func (srl *safeRouteList) SetDebug() {
srl.globalDebug = true
}
func (srl *safeRouteList) Append(r model.Route) model.Route { func (srl *safeRouteList) Append(r model.Route) model.Route {
srl.m.Lock() srl.m.Lock()
r.Index = len(srl.rs) r.Index = len(srl.rs)
r.Debug = srl.globalDebug || r.Debug
srl.rs = append(srl.rs, r) srl.rs = append(srl.rs, r)
srl.m.Unlock() srl.m.Unlock()