Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup test output #12

Merged
merged 2 commits into from
Apr 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 1 addition & 9 deletions signalflow/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,8 @@ package signalflow
import (
"context"
"fmt"
"log"
"math/rand"
"os"
"runtime"
"runtime/pprof"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -197,7 +194,7 @@ func TestReconnect(t *testing.T) {
resolution, _ = comp.Resolution(context.Background())
require.Equal(t, 1*time.Second, resolution)

log.Printf("%v", fakeBackend.received)
t.Log(fakeBackend.received)
require.Equal(t, []map[string]interface{}{
{
"type": "authenticate",
Expand Down Expand Up @@ -322,11 +319,6 @@ func TestReconnectAfterBackendDown(t *testing.T) {
}

func TestFailedConnGoroutineShutdown(t *testing.T) {
defer func() {
time.Sleep(2 * time.Second)
pprof.Lookup("goroutine").WriteTo(os.Stdout, 1)
}()

fakeBackend := NewRunningFakeBackend()
fakeBackend.Stop()

Expand Down
33 changes: 22 additions & 11 deletions signalflow/fake_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"log"
"net"
"net/http"
Expand Down Expand Up @@ -48,6 +49,8 @@ type FakeBackend struct {
cancelFuncsByChannel map[string]context.CancelFunc
server *httptest.Server
handleIdx int

logger *log.Logger
}

func (f *FakeBackend) ServeHTTP(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -76,7 +79,7 @@ func (f *FakeBackend) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}
if err != nil {
log.Printf("Could not write message: %v", err)
f.logger.Printf("Could not write message: %v", err)
}
}
}()
Expand All @@ -85,20 +88,20 @@ func (f *FakeBackend) ServeHTTP(w http.ResponseWriter, r *http.Request) {
_, message, err := c.ReadMessage()
if err != nil {
if !errors.Is(err, net.ErrClosed) {
log.Println("read err:", err)
f.logger.Println("read err:", err)
}
return
}

var in map[string]interface{}
if err := json.Unmarshal(message, &in); err != nil {
log.Println("error unmarshalling: ", err)
f.logger.Println("error unmarshalling: ", err)
}
f.received = append(f.received, in)

err = f.handleMessage(ctx, in, textMsgs, binMsgs)
if err != nil {
log.Printf("Error handling fake backend message, closing connection: %v", err)
f.logger.Printf("Error handling fake backend message, closing connection: %v", err)
return
}
}
Expand Down Expand Up @@ -161,7 +164,7 @@ func (f *FakeBackend) handleMessage(ctx context.Context, message map[string]inte
f.cancelFuncsByHandle[handle] = cancel
f.cancelFuncsByChannel[ch] = cancel

log.Printf("Executing SignalFlow program %s with tsids %v and handle %s", program, programTSIDs, handle)
f.logger.Printf("Executing SignalFlow program %s with tsids %v and handle %s", program, programTSIDs, handle)
f.runningJobsByProgram[program]++

var resolutionMs int
Expand Down Expand Up @@ -207,14 +210,14 @@ func (f *FakeBackend) handleMessage(ctx context.Context, message map[string]inte
if md := f.metadataByTSID[tsid]; md != nil {
propJSON, err := json.Marshal(md)
if err != nil {
log.Printf("Error serializing metadata to json: %v", err)
f.logger.Printf("Error serializing metadata to json: %v", err)
continue
}
textMsgs <- fmt.Sprintf(`{"type": "metadata", "tsId": "%s", "channel": "%s", "properties": %s}`, tsid, ch, propJSON)
}
}

log.Print("done sending metadata messages")
f.logger.Print("done sending metadata messages")

// Send data periodically until the connection is closed.
iterations := 0
Expand All @@ -223,7 +226,7 @@ func (f *FakeBackend) handleMessage(ctx context.Context, message map[string]inte
for {
select {
case <-execCtx.Done():
log.Printf("sending done")
f.logger.Printf("sending done")
f.Lock()
f.runningJobsByProgram[program]--
f.Unlock()
Expand All @@ -239,14 +242,14 @@ func (f *FakeBackend) handleMessage(ctx context.Context, message map[string]inte
f.Unlock()
metricTime := startMs + uint64(iterations*resolutionMs)
if stopMs != 0 && metricTime > stopMs {
log.Printf("sending channel end")
f.logger.Printf("sending channel end")
// tell the client the computation is complete
textMsgs <- fmt.Sprintf(`{"type": "control-message", "channel": "%s", "event": "END_OF_CHANNEL", "handle": "%s"}`, ch, handle)
return
}
log.Printf("sending data message")
f.logger.Printf("sending data message")
binMsgs <- makeDataMessage(ch, valsWithTSID, metricTime)
log.Printf("done sending data message")
f.logger.Printf("done sending data message")
iterations++
}
}
Expand Down Expand Up @@ -373,9 +376,17 @@ func (f *FakeBackend) RunningJobsForProgram(program string) int {
return f.runningJobsByProgram[program]
}

// SetLogger sets the internal logger.
func (f *FakeBackend) SetLogger(logger *log.Logger) {
f.Lock()
f.logger = logger
f.Unlock()
}

func NewRunningFakeBackend() *FakeBackend {
f := &FakeBackend{
AccessToken: "abcd",
logger: log.New(io.Discard, "", 0),
}
f.Start()
return f
Expand Down
Loading