diff --git a/internal/cmd/server.go b/internal/cmd/server.go index ea6866e..ccbd8ae 100644 --- a/internal/cmd/server.go +++ b/internal/cmd/server.go @@ -25,7 +25,6 @@ import ( "github.com/spf13/cobra" - "github.com/BBVA/kapow/internal/logger" "github.com/BBVA/kapow/internal/server" ) @@ -47,7 +46,7 @@ var ServerCmd = &cobra.Command{ sConf.ClientAuth, _ = cmd.Flags().GetBool("clientauth") sConf.ClientCaFile, _ = cmd.Flags().GetString("clientcafile") - debug, _ := cmd.Flags().GetBool("debug") + sConf.Debug, _ = cmd.Flags().GetBool("debug") // Set environment variables KAPOW_DATA_URL and KAPOW_CONTROL_URL only if they aren't set so we don't overwrite user's preferences if _, exist := os.LookupEnv("KAPOW_DATA_URL"); !exist { @@ -57,10 +56,6 @@ var ServerCmd = &cobra.Command{ os.Setenv("KAPOW_CONTROL_URL", "http://"+sConf.ControlBindAddr) } - if debug { - logger.RegisterLogger(logger.SCRIPTS, nil) - } - server.StartServer(sConf) if len(args) > 0 { @@ -83,11 +78,7 @@ var ServerCmd = &cobra.Command{ log.Printf("Done running powfile: %q\n", powfile) } - if debug { - processLogs() - } else { - select {} - } + select {} }, } @@ -123,12 +114,3 @@ func validateServerCommandArguments(cmd *cobra.Command, args []string) error { return nil } - -func processLogs() { - - for { - if !logger.ProcessMsg(logger.SCRIPTS) { - break - } - } -} diff --git a/internal/server/model/route.go b/internal/server/model/route.go index f630dce..53b9ed5 100644 --- a/internal/server/model/route.go +++ b/internal/server/model/route.go @@ -42,4 +42,6 @@ type Route struct { // Index is this route position in the server's routes list. // It is an output field, its value is ignored as input. Index int `json:"index"` + + Debug bool `json:"debug"` } diff --git a/internal/server/server.go b/internal/server/server.go index e4afe38..ea0d6ca 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -32,7 +32,8 @@ type ServerConfig struct { CertFile, ClientCaFile string - ClientAuth bool + ClientAuth, + Debug bool } // StartServer Starts one instance of each server in a goroutine and remains listening on a channel for trace events generated by them @@ -41,7 +42,7 @@ func StartServer(config ServerConfig) { wg.Add(3) go control.Run(config.ControlBindAddr, &wg) go data.Run(config.DataBindAddr, &wg) - go user.Run(config.UserBindAddr, &wg, config.CertFile, config.KeyFile, config.ClientCaFile, config.ClientAuth) + go user.Run(config.UserBindAddr, &wg, config.CertFile, config.KeyFile, config.ClientCaFile, config.ClientAuth, config.Debug) // Wait for servers signals in order to return wg.Wait() diff --git a/internal/server/user/mux/handlerbuilder.go b/internal/server/user/mux/handlerbuilder.go index 55df210..8c327fd 100644 --- a/internal/server/user/mux/handlerbuilder.go +++ b/internal/server/user/mux/handlerbuilder.go @@ -18,13 +18,13 @@ package mux import ( "bufio" - "bytes" "log" "net/http" + "io" + "os" "github.com/google/uuid" - "github.com/BBVA/kapow/internal/logger" "github.com/BBVA/kapow/internal/server/data" "github.com/BBVA/kapow/internal/server/model" "github.com/BBVA/kapow/internal/server/user/spawn" @@ -32,6 +32,7 @@ import ( var spawner = spawn.Spawn var idGenerator = uuid.NewUUID +var logHandler io.Writer = os.Stdout func handlerBuilder(route model.Route) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -51,29 +52,43 @@ func handlerBuilder(route model.Route) http.Handler { data.Handlers.Add(h) defer data.Handlers.Remove(h.ID) - stdOut := &bytes.Buffer{} - stdErr := &bytes.Buffer{} - err = spawner(h, stdOut, stdErr) - //err = spawner(h, nil) + if route.Debug { + var stdOutR, stdOutW *os.File + stdOutR, stdOutW, err = os.Pipe() + defer stdOutW.Close() + if err != nil { + log.Println(err) + return + } + var stdErrR, stdErrW *os.File + stdErrR, stdErrW, err = os.Pipe() + defer stdErrW.Close() + if err != nil { + log.Println(err) + return + } + + go logStream(h.ID, "stdout", stdOutR) + go logStream(h.ID, "stderr", stdErrR) + + err = spawner(h, stdOutW, stdErrW) + } else { + err = spawner(h, nil, nil) + } + if err != nil { log.Println(err) } - logger.SendMsg(logger.SCRIPTS, createLogMsg(h.ID, *stdOut, *stdErr)) }) } -func createLogMsg(handlerId string, stdout, stderr bytes.Buffer) logger.LogMsg { - var messages []string - scanner := bufio.NewScanner(bytes.NewBuffer(stdout.Bytes())) +func logStream(handlerId string, streamName string, stream *os.File) { + defer stream.Close() + execLog := log.New(logHandler, "", log.Ldate|log.Ltime|log.LUTC|log.Lmicroseconds) + scanner := bufio.NewScanner(stream) for scanner.Scan() { - messages = append(messages, scanner.Text()) + execLog.Printf("%s %s: %s", handlerId, streamName, scanner.Text()) } - scanner = bufio.NewScanner(bytes.NewBuffer(stderr.Bytes())) - for scanner.Scan() { - messages = append(messages, scanner.Text()) - } - - return logger.LogMsg{Prefix: handlerId, Messages: messages} } diff --git a/internal/server/user/mux/handlerbuilder_test.go b/internal/server/user/mux/handlerbuilder_test.go index 56f12a7..ee7b03d 100644 --- a/internal/server/user/mux/handlerbuilder_test.go +++ b/internal/server/user/mux/handlerbuilder_test.go @@ -1,3 +1,5 @@ +// +build !race + /* * Copyright 2019 Banco Bilbao Vizcaya Argentaria, S.A. * @@ -25,6 +27,7 @@ import ( "reflect" "strings" "testing" + "time" "github.com/google/uuid" @@ -199,50 +202,63 @@ func TestHandlerBuilderRemovesHandlerWhenDone(t *testing.T) { } } -func TestCreateLogMsgAdsPrefixInfo(t *testing.T) { - expected := "FOO" +func TestHandlerBuilderLogToLogHandlerWhenDebugIsEnabled(t *testing.T) { + data.Handlers = data.New() + route := model.Route{Debug: true} + var got string - msg := createLogMsg(expected, bytes.Buffer{}, bytes.Buffer{}) + logHandler = new(bytes.Buffer) - if msg.Prefix != expected { - t.Errorf("LogMsg doesn't contain expected Prefix. Expected: %s, got: %s", expected, msg.Prefix) + spawner = func(h *model.Handler, out io.Writer, er io.Writer) error { + _, _ = out.Write([]byte("this is stdout")) + _, _ = er.Write([]byte("this is stderr")) + + return nil + } + + handlerBuilder(route).ServeHTTP(nil, nil) + + // NOTE: logStream will write stdout and stderr contents eventually. + // We do not have any control the goroutines running logStream, thus we + // cannot use a synchronization primitive to wait for them. Sorry. + time.Sleep(1 * time.Second) + + got = logHandler.(*bytes.Buffer).String() + if ! strings.Contains(got, "this is stdout") { + t.Errorf("Stdout not preserved. Actual: %+q", got) + } + if ! strings.Contains(got, "this is stderr") { + t.Errorf("Stderr not preserved. Actual: %+q", got) } } -func TestCreateLogMsgAdsStdOutInfo(t *testing.T) { - expected := "FOO\nBAR" - out := bytes.Buffer{} - out.WriteString(expected) - msg := createLogMsg("", out, bytes.Buffer{}) +func TestHandlerBuilderDoesNotLogToLogHandlerWhenDebugIsDisabled(t *testing.T) { + data.Handlers = data.New() + route := model.Route{Debug: false} - if strings.Join(msg.Messages, "\n") != expected { - t.Errorf("LogMsg doesn't contain expected payload. Expected: %s, got: %s", expected, msg.Prefix) - } -} - -func TestCreateLogMsgAdsStdErrInfo(t *testing.T) { - expected := "FOO\nBAR" - err := bytes.Buffer{} - err.WriteString(expected) - - msg := createLogMsg("", bytes.Buffer{}, err) - - if strings.Join(msg.Messages, "\n") != expected { - t.Errorf("LogMsg doesn't contain expected payload. Expected: %s, got: %s", expected, msg.Prefix) - } -} - -func TestCreateLogMsgAdsStdOutAndStdErrInfo(t *testing.T) { - expected := "FOO\nBAR\nFOO BAZ" - out := bytes.Buffer{} - out.WriteString("FOO\nBAR\n") - err := bytes.Buffer{} - err.WriteString("FOO BAZ") - - msg := createLogMsg("", out, err) - - if strings.Join(msg.Messages, "\n") != expected { - t.Errorf("LogMsg doesn't contain expected payload. Expected: %s, got: %s", expected, msg.Prefix) + logHandler = new(bytes.Buffer) + + spawner = func(h *model.Handler, out io.Writer, er io.Writer) error { + if out != nil { + _, _ = out.Write([]byte("this is stdout")) + } + if er != nil { + _, _ = er.Write([]byte("this is stderr")) + } + + return nil + } + + handlerBuilder(route).ServeHTTP(nil, nil) + + // NOTE: logStream will write stdout and stderr contents eventually. + // We do not have any control the goroutines running logStream, thus we + // cannot use a synchronization primitive to wait for them. Sorry. + time.Sleep(1 * time.Second) + + size := logHandler.(*bytes.Buffer).Len() + if size != 0 { + t.Error("Something was logged to stderr with debug=false") } } diff --git a/internal/server/user/mux/mux_test.go b/internal/server/user/mux/mux_test.go index 75c3d96..1f96507 100644 --- a/internal/server/user/mux/mux_test.go +++ b/internal/server/user/mux/mux_test.go @@ -27,6 +27,7 @@ import ( "time" "github.com/BBVA/kapow/internal/server/model" + "github.com/BBVA/kapow/internal/server/user/spawn" "github.com/gorilla/mux" ) @@ -227,6 +228,7 @@ func TestServeHTTPCallsInnerMuxAfterAcquiringLock(t *testing.T) { } func TestUpdateUpdatesMuxWithProvideRouteList(t *testing.T) { + spawner = spawn.Spawn sm := New() rs := []model.Route{ { diff --git a/internal/server/user/server.go b/internal/server/user/server.go index 980fac5..e8fc3d4 100644 --- a/internal/server/user/server.go +++ b/internal/server/user/server.go @@ -34,13 +34,20 @@ var Server = http.Server{ Handler: mux.New(), } +var DebugEndpoints bool + // Run finishes configuring Server and runs ListenAndServe on it -func Run(bindAddr string, wg *sync.WaitGroup, certFile, keyFile, cliCaFile string, cliAuth bool) { +func Run(bindAddr string, wg *sync.WaitGroup, certFile, keyFile, cliCaFile string, cliAuth, debug bool) { + Server = http.Server{ Addr: bindAddr, Handler: mux.New(), } + if debug { + Routes.SetDebug() + } + listener, err := net.Listen("tcp", bindAddr) if err != nil { log.Fatal(err) diff --git a/internal/server/user/state.go b/internal/server/user/state.go index ed240b5..d924cc9 100644 --- a/internal/server/user/state.go +++ b/internal/server/user/state.go @@ -27,6 +27,7 @@ import ( type safeRouteList struct { rs []model.Route m *sync.RWMutex + globalDebug bool } var Routes safeRouteList = New() @@ -38,9 +39,14 @@ func New() safeRouteList { } } +func (srl *safeRouteList) SetDebug() { + srl.globalDebug = true +} + func (srl *safeRouteList) Append(r model.Route) model.Route { srl.m.Lock() r.Index = len(srl.rs) + r.Debug = srl.globalDebug || r.Debug srl.rs = append(srl.rs, r) srl.m.Unlock()