From 57b532da9246b3d08bd8c7e68d547429131962bb Mon Sep 17 00:00:00 2001 From: Sedky Date: Tue, 29 Oct 2024 06:40:00 -0600 Subject: [PATCH] Remove canceling during context cancellations. --- pumps/stdout.go | 35 ++++++++++++----------- pumps/stdout_test.go | 68 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 86 insertions(+), 17 deletions(-) create mode 100644 pumps/stdout_test.go diff --git a/pumps/stdout.go b/pumps/stdout.go index d8e664266..57b21de04 100644 --- a/pumps/stdout.go +++ b/pumps/stdout.go @@ -74,28 +74,29 @@ func (s *StdOutPump) Init(config interface{}) error { func (s *StdOutPump) WriteData(ctx context.Context, data []interface{}) error { s.log.Debug("Attempting to write ", len(data), " records...") - //Data is all the analytics being written for _, v := range data { + decoded, ok := v.(analytics.AnalyticsRecord) + if !ok { + s.log.Error("Failed to decode analytics record") + continue + } - select { - case <-ctx.Done(): - return nil - default: - decoded := v.(analytics.AnalyticsRecord) - - if s.conf.Format == "json" { - formatter := &logrus.JSONFormatter{} - entry := log.WithField(s.conf.LogFieldName, decoded) - entry.Level = logrus.InfoLevel - entry.Time = time.Now().UTC() - data, _ := formatter.Format(entry) - fmt.Print(string(data)) - } else { - s.log.WithField(s.conf.LogFieldName, decoded).Info() + if s.conf.Format == "json" { + formatter := &logrus.JSONFormatter{} + entry := log.WithField(s.conf.LogFieldName, decoded) + entry.Level = logrus.InfoLevel + entry.Time = time.Now().UTC() + data, err := formatter.Format(entry) + if err != nil { + s.log.Error("Failed to format record: ", err) + continue } - + fmt.Print(string(data)) + } else { + s.log.WithField(s.conf.LogFieldName, decoded).Info() } } + s.log.Info("Purged ", len(data), " records...") return nil diff --git a/pumps/stdout_test.go b/pumps/stdout_test.go new file mode 100644 index 000000000..b0f0eb06e --- /dev/null +++ b/pumps/stdout_test.go @@ -0,0 +1,68 @@ +package pumps + +import ( + "bytes" + "context" + "fmt" + "strings" + "testing" + + "github.com/TykTechnologies/tyk-pump/analytics" + "github.com/sirupsen/logrus" +) + +// Previous implementation was canceling the writes if the context was cancelled. +// This test ensures that the pump will continue to write data even if the context is cancelled. +func TestStdOutPump_WriteData_ContextCancellation(t *testing.T) { + // Setup pump + pump := &StdOutPump{ + conf: &StdOutConf{ + LogFieldName: "test-analytics", + Format: "json", + }, + } + + // Setup logger + logger := logrus.New() + logger.SetLevel(logrus.DebugLevel) + pump.log = logger.WithField("prefix", "test") + + // Create many records to test with + data := make([]interface{}, 100) + for i := range data { + data[i] = analytics.AnalyticsRecord{ + Path: fmt.Sprintf("/test/%d", i), + Method: "GET", + } + } + + // Create an already cancelled context + // && cancel immediately + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + + // Capture logger output + var buf bytes.Buffer + pump.log.Logger.SetOutput(&buf) + oldOut := pump.log.Logger.Out + defer pump.log.Logger.SetOutput(oldOut) // restore original output when done + + err := pump.WriteData(ctx, data) + + output := buf.String() + + if err != nil { + t.Errorf("Expected no error, wanted the Pump to finish purging despite context cancellation, got %v", err) + } + + // Verify the output contains the expected message + attemptMsg := "Attempting to write 100 records" + if !strings.Contains(output, attemptMsg) { + t.Errorf("Expected output does not contain '%s'", attemptMsg) + } + purgeMsg := "Purged 100 records..." + if !strings.Contains(output, purgeMsg) { + t.Errorf("Expected output does not contain '%s'", purgeMsg) + } +} \ No newline at end of file