From 0dd9cf07ab00389a9b204140e47c13c35286a210 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Roberto=20Abdelkader=20Mart=C3=ADnez=20P=C3=A9rez?= Date: Wed, 18 Nov 2020 18:57:13 +0100 Subject: [PATCH 1/4] wip: create pipes to manage entrypoint stdout/stderr even after termination --- internal/server/user/mux/handlerbuilder.go | 40 +++++----- .../server/user/mux/handlerbuilder_test.go | 74 +++++++++---------- 2 files changed, 59 insertions(+), 55 deletions(-) diff --git a/internal/server/user/mux/handlerbuilder.go b/internal/server/user/mux/handlerbuilder.go index 55df210..bd5ca6d 100644 --- a/internal/server/user/mux/handlerbuilder.go +++ b/internal/server/user/mux/handlerbuilder.go @@ -18,13 +18,12 @@ package mux import ( "bufio" - "bytes" "log" "net/http" + "os" "github.com/google/uuid" - "github.com/BBVA/kapow/internal/logger" "github.com/BBVA/kapow/internal/server/data" "github.com/BBVA/kapow/internal/server/model" "github.com/BBVA/kapow/internal/server/user/spawn" @@ -51,29 +50,36 @@ func handlerBuilder(route model.Route) http.Handler { data.Handlers.Add(h) defer data.Handlers.Remove(h.ID) - stdOut := &bytes.Buffer{} - stdErr := &bytes.Buffer{} - err = spawner(h, stdOut, stdErr) - //err = spawner(h, nil) + stdOutR, stdOutW, err := os.Pipe() + defer stdOutW.Close() + if err != nil { + log.Println(err) + return + } + stdErrR, stdErrW, err := os.Pipe() + defer stdErrW.Close() + if err != nil { + log.Println(err) + return + } + + go logStream(h.ID, "stdout", stdOutR) + go logStream(h.ID, "stderr", stdErrR) + + err = spawner(h, stdOutW, stdErrW) if err != nil { log.Println(err) } - logger.SendMsg(logger.SCRIPTS, createLogMsg(h.ID, *stdOut, *stdErr)) }) } -func createLogMsg(handlerId string, stdout, stderr bytes.Buffer) logger.LogMsg { - var messages []string - scanner := bufio.NewScanner(bytes.NewBuffer(stdout.Bytes())) +func logStream(handlerId string, streamName string, stream *os.File) { + defer stream.Close() + execLog := log.New(os.Stdout, "", log.Ldate|log.Ltime|log.LUTC|log.Lmicroseconds) + scanner := bufio.NewScanner(stream) for scanner.Scan() { - messages = append(messages, scanner.Text()) + execLog.Printf("%s %s: %s", handlerId, streamName, scanner.Text()) } - scanner = bufio.NewScanner(bytes.NewBuffer(stderr.Bytes())) - for scanner.Scan() { - messages = append(messages, scanner.Text()) - } - - return logger.LogMsg{Prefix: handlerId, Messages: messages} } diff --git a/internal/server/user/mux/handlerbuilder_test.go b/internal/server/user/mux/handlerbuilder_test.go index 56f12a7..5f62361 100644 --- a/internal/server/user/mux/handlerbuilder_test.go +++ b/internal/server/user/mux/handlerbuilder_test.go @@ -17,13 +17,11 @@ package mux import ( - "bytes" "errors" "io" "net/http" "net/http/httptest" "reflect" - "strings" "testing" "github.com/google/uuid" @@ -199,50 +197,50 @@ func TestHandlerBuilderRemovesHandlerWhenDone(t *testing.T) { } } -func TestCreateLogMsgAdsPrefixInfo(t *testing.T) { - expected := "FOO" +// func TestCreateLogMsgAdsPrefixInfo(t *testing.T) { +// expected := "FOO" - msg := createLogMsg(expected, bytes.Buffer{}, bytes.Buffer{}) +// msg := createLogMsg(expected, bytes.Buffer{}, bytes.Buffer{}) - if msg.Prefix != expected { - t.Errorf("LogMsg doesn't contain expected Prefix. Expected: %s, got: %s", expected, msg.Prefix) - } -} +// if msg.Prefix != expected { +// t.Errorf("LogMsg doesn't contain expected Prefix. Expected: %s, got: %s", expected, msg.Prefix) +// } +// } -func TestCreateLogMsgAdsStdOutInfo(t *testing.T) { - expected := "FOO\nBAR" - out := bytes.Buffer{} - out.WriteString(expected) +// func TestCreateLogMsgAdsStdOutInfo(t *testing.T) { +// expected := "FOO\nBAR" +// out := bytes.Buffer{} +// out.WriteString(expected) - msg := createLogMsg("", out, bytes.Buffer{}) +// msg := createLogMsg("", out, bytes.Buffer{}) - if strings.Join(msg.Messages, "\n") != expected { - t.Errorf("LogMsg doesn't contain expected payload. Expected: %s, got: %s", expected, msg.Prefix) - } -} +// if strings.Join(msg.Messages, "\n") != expected { +// t.Errorf("LogMsg doesn't contain expected payload. Expected: %s, got: %s", expected, msg.Prefix) +// } +// } -func TestCreateLogMsgAdsStdErrInfo(t *testing.T) { - expected := "FOO\nBAR" - err := bytes.Buffer{} - err.WriteString(expected) +// func TestCreateLogMsgAdsStdErrInfo(t *testing.T) { +// expected := "FOO\nBAR" +// err := bytes.Buffer{} +// err.WriteString(expected) - msg := createLogMsg("", bytes.Buffer{}, err) +// msg := createLogMsg("", bytes.Buffer{}, err) - if strings.Join(msg.Messages, "\n") != expected { - t.Errorf("LogMsg doesn't contain expected payload. Expected: %s, got: %s", expected, msg.Prefix) - } -} +// if strings.Join(msg.Messages, "\n") != expected { +// t.Errorf("LogMsg doesn't contain expected payload. Expected: %s, got: %s", expected, msg.Prefix) +// } +// } -func TestCreateLogMsgAdsStdOutAndStdErrInfo(t *testing.T) { - expected := "FOO\nBAR\nFOO BAZ" - out := bytes.Buffer{} - out.WriteString("FOO\nBAR\n") - err := bytes.Buffer{} - err.WriteString("FOO BAZ") +// func TestCreateLogMsgAdsStdOutAndStdErrInfo(t *testing.T) { +// expected := "FOO\nBAR\nFOO BAZ" +// out := bytes.Buffer{} +// out.WriteString("FOO\nBAR\n") +// err := bytes.Buffer{} +// err.WriteString("FOO BAZ") - msg := createLogMsg("", out, err) +// msg := createLogMsg("", out, err) - if strings.Join(msg.Messages, "\n") != expected { - t.Errorf("LogMsg doesn't contain expected payload. Expected: %s, got: %s", expected, msg.Prefix) - } -} +// if strings.Join(msg.Messages, "\n") != expected { +// t.Errorf("LogMsg doesn't contain expected payload. Expected: %s, got: %s", expected, msg.Prefix) +// } +// } From 6ec9b54670b6a7b388e840fc3e917004a1e09eb5 Mon Sep 17 00:00:00 2001 From: pancho horrillo Date: Tue, 15 Dec 2020 17:15:00 +0100 Subject: [PATCH 2/4] fix: add missing spawner reset code to test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Roberto Abdelkader Martínez Pérez --- internal/server/user/mux/mux_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/internal/server/user/mux/mux_test.go b/internal/server/user/mux/mux_test.go index 75c3d96..1f96507 100644 --- a/internal/server/user/mux/mux_test.go +++ b/internal/server/user/mux/mux_test.go @@ -27,6 +27,7 @@ import ( "time" "github.com/BBVA/kapow/internal/server/model" + "github.com/BBVA/kapow/internal/server/user/spawn" "github.com/gorilla/mux" ) @@ -227,6 +228,7 @@ func TestServeHTTPCallsInnerMuxAfterAcquiringLock(t *testing.T) { } func TestUpdateUpdatesMuxWithProvideRouteList(t *testing.T) { + spawner = spawn.Spawn sm := New() rs := []model.Route{ { From 58fae53e2e1477232b0574ba85fe1de19a45263c Mon Sep 17 00:00:00 2001 From: pancho horrillo Date: Tue, 15 Dec 2020 17:17:17 +0100 Subject: [PATCH 3/4] test: logStream() behaves correctly like a good kiddo MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Roberto Abdelkader Martínez Pérez --- internal/server/user/mux/handlerbuilder.go | 4 +- .../server/user/mux/handlerbuilder_test.go | 67 +++++++------------ 2 files changed, 29 insertions(+), 42 deletions(-) diff --git a/internal/server/user/mux/handlerbuilder.go b/internal/server/user/mux/handlerbuilder.go index bd5ca6d..79f68a3 100644 --- a/internal/server/user/mux/handlerbuilder.go +++ b/internal/server/user/mux/handlerbuilder.go @@ -20,6 +20,7 @@ import ( "bufio" "log" "net/http" + "io" "os" "github.com/google/uuid" @@ -31,6 +32,7 @@ import ( var spawner = spawn.Spawn var idGenerator = uuid.NewUUID +var logHandler io.Writer = os.Stdout func handlerBuilder(route model.Route) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -77,7 +79,7 @@ func handlerBuilder(route model.Route) http.Handler { func logStream(handlerId string, streamName string, stream *os.File) { defer stream.Close() - execLog := log.New(os.Stdout, "", log.Ldate|log.Ltime|log.LUTC|log.Lmicroseconds) + execLog := log.New(logHandler, "", log.Ldate|log.Ltime|log.LUTC|log.Lmicroseconds) scanner := bufio.NewScanner(stream) for scanner.Scan() { execLog.Printf("%s %s: %s", handlerId, streamName, scanner.Text()) diff --git a/internal/server/user/mux/handlerbuilder_test.go b/internal/server/user/mux/handlerbuilder_test.go index 5f62361..35851c7 100644 --- a/internal/server/user/mux/handlerbuilder_test.go +++ b/internal/server/user/mux/handlerbuilder_test.go @@ -17,12 +17,15 @@ package mux import ( + "bytes" "errors" "io" "net/http" "net/http/httptest" "reflect" + "strings" "testing" + "time" "github.com/google/uuid" @@ -197,50 +200,32 @@ func TestHandlerBuilderRemovesHandlerWhenDone(t *testing.T) { } } -// func TestCreateLogMsgAdsPrefixInfo(t *testing.T) { -// expected := "FOO" +func TestHandlerBuilderLogToLogHandler(t *testing.T) { + data.Handlers = data.New() + route := model.Route{} + var got string -// msg := createLogMsg(expected, bytes.Buffer{}, bytes.Buffer{}) + logHandler = new(bytes.Buffer) -// if msg.Prefix != expected { -// t.Errorf("LogMsg doesn't contain expected Prefix. Expected: %s, got: %s", expected, msg.Prefix) -// } -// } + spawner = func(h *model.Handler, out io.Writer, er io.Writer) error { + out.Write([]byte("this is stdout")) + er.Write([]byte("this is stderr")) -// func TestCreateLogMsgAdsStdOutInfo(t *testing.T) { -// expected := "FOO\nBAR" -// out := bytes.Buffer{} -// out.WriteString(expected) + return nil + } -// msg := createLogMsg("", out, bytes.Buffer{}) + handlerBuilder(route).ServeHTTP(nil, nil) -// if strings.Join(msg.Messages, "\n") != expected { -// t.Errorf("LogMsg doesn't contain expected payload. Expected: %s, got: %s", expected, msg.Prefix) -// } -// } + // NOTE: logStream will write stdout and stderr contents eventually. + // We do not have any control the goroutines running logStream, thus we + // cannot use a synchronization primitive to wait for them. Sorry. + time.Sleep(1 * time.Second) -// func TestCreateLogMsgAdsStdErrInfo(t *testing.T) { -// expected := "FOO\nBAR" -// err := bytes.Buffer{} -// err.WriteString(expected) - -// msg := createLogMsg("", bytes.Buffer{}, err) - -// if strings.Join(msg.Messages, "\n") != expected { -// t.Errorf("LogMsg doesn't contain expected payload. Expected: %s, got: %s", expected, msg.Prefix) -// } -// } - -// func TestCreateLogMsgAdsStdOutAndStdErrInfo(t *testing.T) { -// expected := "FOO\nBAR\nFOO BAZ" -// out := bytes.Buffer{} -// out.WriteString("FOO\nBAR\n") -// err := bytes.Buffer{} -// err.WriteString("FOO BAZ") - -// msg := createLogMsg("", out, err) - -// if strings.Join(msg.Messages, "\n") != expected { -// t.Errorf("LogMsg doesn't contain expected payload. Expected: %s, got: %s", expected, msg.Prefix) -// } -// } + got = logHandler.(*bytes.Buffer).String() + if ! strings.Contains(got, "this is stdout") { + t.Errorf("Stdout not preserved. Actual: %+q", got) + } + if ! strings.Contains(got, "this is stderr") { + t.Errorf("Stderr not preserved. Actual: %+q", got) + } +} From 4546fc65b657e9052226fb698b86e12a2c400bd9 Mon Sep 17 00:00:00 2001 From: pancho horrillo Date: Tue, 15 Dec 2020 18:34:45 +0100 Subject: [PATCH 4/4] fix: honor global flag --debug MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Roberto Abdelkader Martínez Pérez --- internal/cmd/server.go | 22 +--------- internal/server/model/route.go | 2 + internal/server/server.go | 5 ++- internal/server/user/mux/handlerbuilder.go | 37 ++++++++++------- .../server/user/mux/handlerbuilder_test.go | 41 +++++++++++++++++-- internal/server/user/server.go | 9 +++- internal/server/user/state.go | 6 +++ 7 files changed, 80 insertions(+), 42 deletions(-) diff --git a/internal/cmd/server.go b/internal/cmd/server.go index ea6866e..ccbd8ae 100644 --- a/internal/cmd/server.go +++ b/internal/cmd/server.go @@ -25,7 +25,6 @@ import ( "github.com/spf13/cobra" - "github.com/BBVA/kapow/internal/logger" "github.com/BBVA/kapow/internal/server" ) @@ -47,7 +46,7 @@ var ServerCmd = &cobra.Command{ sConf.ClientAuth, _ = cmd.Flags().GetBool("clientauth") sConf.ClientCaFile, _ = cmd.Flags().GetString("clientcafile") - debug, _ := cmd.Flags().GetBool("debug") + sConf.Debug, _ = cmd.Flags().GetBool("debug") // Set environment variables KAPOW_DATA_URL and KAPOW_CONTROL_URL only if they aren't set so we don't overwrite user's preferences if _, exist := os.LookupEnv("KAPOW_DATA_URL"); !exist { @@ -57,10 +56,6 @@ var ServerCmd = &cobra.Command{ os.Setenv("KAPOW_CONTROL_URL", "http://"+sConf.ControlBindAddr) } - if debug { - logger.RegisterLogger(logger.SCRIPTS, nil) - } - server.StartServer(sConf) if len(args) > 0 { @@ -83,11 +78,7 @@ var ServerCmd = &cobra.Command{ log.Printf("Done running powfile: %q\n", powfile) } - if debug { - processLogs() - } else { - select {} - } + select {} }, } @@ -123,12 +114,3 @@ func validateServerCommandArguments(cmd *cobra.Command, args []string) error { return nil } - -func processLogs() { - - for { - if !logger.ProcessMsg(logger.SCRIPTS) { - break - } - } -} diff --git a/internal/server/model/route.go b/internal/server/model/route.go index f630dce..53b9ed5 100644 --- a/internal/server/model/route.go +++ b/internal/server/model/route.go @@ -42,4 +42,6 @@ type Route struct { // Index is this route position in the server's routes list. // It is an output field, its value is ignored as input. Index int `json:"index"` + + Debug bool `json:"debug"` } diff --git a/internal/server/server.go b/internal/server/server.go index e4afe38..ea0d6ca 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -32,7 +32,8 @@ type ServerConfig struct { CertFile, ClientCaFile string - ClientAuth bool + ClientAuth, + Debug bool } // StartServer Starts one instance of each server in a goroutine and remains listening on a channel for trace events generated by them @@ -41,7 +42,7 @@ func StartServer(config ServerConfig) { wg.Add(3) go control.Run(config.ControlBindAddr, &wg) go data.Run(config.DataBindAddr, &wg) - go user.Run(config.UserBindAddr, &wg, config.CertFile, config.KeyFile, config.ClientCaFile, config.ClientAuth) + go user.Run(config.UserBindAddr, &wg, config.CertFile, config.KeyFile, config.ClientCaFile, config.ClientAuth, config.Debug) // Wait for servers signals in order to return wg.Wait() diff --git a/internal/server/user/mux/handlerbuilder.go b/internal/server/user/mux/handlerbuilder.go index 79f68a3..8c327fd 100644 --- a/internal/server/user/mux/handlerbuilder.go +++ b/internal/server/user/mux/handlerbuilder.go @@ -52,23 +52,30 @@ func handlerBuilder(route model.Route) http.Handler { data.Handlers.Add(h) defer data.Handlers.Remove(h.ID) - stdOutR, stdOutW, err := os.Pipe() - defer stdOutW.Close() - if err != nil { - log.Println(err) - return - } - stdErrR, stdErrW, err := os.Pipe() - defer stdErrW.Close() - if err != nil { - log.Println(err) - return + if route.Debug { + var stdOutR, stdOutW *os.File + stdOutR, stdOutW, err = os.Pipe() + defer stdOutW.Close() + if err != nil { + log.Println(err) + return + } + var stdErrR, stdErrW *os.File + stdErrR, stdErrW, err = os.Pipe() + defer stdErrW.Close() + if err != nil { + log.Println(err) + return + } + + go logStream(h.ID, "stdout", stdOutR) + go logStream(h.ID, "stderr", stdErrR) + + err = spawner(h, stdOutW, stdErrW) + } else { + err = spawner(h, nil, nil) } - go logStream(h.ID, "stdout", stdOutR) - go logStream(h.ID, "stderr", stdErrR) - - err = spawner(h, stdOutW, stdErrW) if err != nil { log.Println(err) diff --git a/internal/server/user/mux/handlerbuilder_test.go b/internal/server/user/mux/handlerbuilder_test.go index 35851c7..ee7b03d 100644 --- a/internal/server/user/mux/handlerbuilder_test.go +++ b/internal/server/user/mux/handlerbuilder_test.go @@ -1,3 +1,5 @@ +// +build !race + /* * Copyright 2019 Banco Bilbao Vizcaya Argentaria, S.A. * @@ -200,16 +202,16 @@ func TestHandlerBuilderRemovesHandlerWhenDone(t *testing.T) { } } -func TestHandlerBuilderLogToLogHandler(t *testing.T) { +func TestHandlerBuilderLogToLogHandlerWhenDebugIsEnabled(t *testing.T) { data.Handlers = data.New() - route := model.Route{} + route := model.Route{Debug: true} var got string logHandler = new(bytes.Buffer) spawner = func(h *model.Handler, out io.Writer, er io.Writer) error { - out.Write([]byte("this is stdout")) - er.Write([]byte("this is stderr")) + _, _ = out.Write([]byte("this is stdout")) + _, _ = er.Write([]byte("this is stderr")) return nil } @@ -229,3 +231,34 @@ func TestHandlerBuilderLogToLogHandler(t *testing.T) { t.Errorf("Stderr not preserved. Actual: %+q", got) } } + + +func TestHandlerBuilderDoesNotLogToLogHandlerWhenDebugIsDisabled(t *testing.T) { + data.Handlers = data.New() + route := model.Route{Debug: false} + + logHandler = new(bytes.Buffer) + + spawner = func(h *model.Handler, out io.Writer, er io.Writer) error { + if out != nil { + _, _ = out.Write([]byte("this is stdout")) + } + if er != nil { + _, _ = er.Write([]byte("this is stderr")) + } + + return nil + } + + handlerBuilder(route).ServeHTTP(nil, nil) + + // NOTE: logStream will write stdout and stderr contents eventually. + // We do not have any control the goroutines running logStream, thus we + // cannot use a synchronization primitive to wait for them. Sorry. + time.Sleep(1 * time.Second) + + size := logHandler.(*bytes.Buffer).Len() + if size != 0 { + t.Error("Something was logged to stderr with debug=false") + } +} diff --git a/internal/server/user/server.go b/internal/server/user/server.go index 980fac5..e8fc3d4 100644 --- a/internal/server/user/server.go +++ b/internal/server/user/server.go @@ -34,13 +34,20 @@ var Server = http.Server{ Handler: mux.New(), } +var DebugEndpoints bool + // Run finishes configuring Server and runs ListenAndServe on it -func Run(bindAddr string, wg *sync.WaitGroup, certFile, keyFile, cliCaFile string, cliAuth bool) { +func Run(bindAddr string, wg *sync.WaitGroup, certFile, keyFile, cliCaFile string, cliAuth, debug bool) { + Server = http.Server{ Addr: bindAddr, Handler: mux.New(), } + if debug { + Routes.SetDebug() + } + listener, err := net.Listen("tcp", bindAddr) if err != nil { log.Fatal(err) diff --git a/internal/server/user/state.go b/internal/server/user/state.go index ed240b5..d924cc9 100644 --- a/internal/server/user/state.go +++ b/internal/server/user/state.go @@ -27,6 +27,7 @@ import ( type safeRouteList struct { rs []model.Route m *sync.RWMutex + globalDebug bool } var Routes safeRouteList = New() @@ -38,9 +39,14 @@ func New() safeRouteList { } } +func (srl *safeRouteList) SetDebug() { + srl.globalDebug = true +} + func (srl *safeRouteList) Append(r model.Route) model.Route { srl.m.Lock() r.Index = len(srl.rs) + r.Debug = srl.globalDebug || r.Debug srl.rs = append(srl.rs, r) srl.m.Unlock()