From 1d5c5e2361c78d740d8d3cae97639aadf71d6706 Mon Sep 17 00:00:00 2001 From: Jamie Wilkinson Date: Mon, 24 Jun 2024 15:48:29 +0200 Subject: [PATCH] fix: LogStreams create and close their own lines channels. This makes it easier to observe logstream state externally because we can read the lines channel until it closes. The tests are changed slightly to asynchronously drain the output channel at the start of the stream. Fix some naming issues and log formatting. --- internal/tailer/logstream/dgramstream.go | 86 ++++++------ .../tailer/logstream/dgramstream_unix_test.go | 36 ++--- internal/tailer/logstream/filestream.go | 14 +- internal/tailer/logstream/filestream_test.go | 126 +++++++++--------- .../tailer/logstream/filestream_unix_test.go | 40 +++--- internal/tailer/logstream/logstream.go | 30 ++--- internal/tailer/logstream/logstream_test.go | 4 +- .../tailer/logstream/logstream_unix_test.go | 2 +- internal/tailer/logstream/pipestream.go | 16 ++- .../tailer/logstream/pipestream_unix_test.go | 59 ++++---- internal/tailer/logstream/socketstream.go | 23 +++- .../logstream/socketstream_unix_test.go | 34 +++-- internal/tailer/tail.go | 10 +- internal/testutil/lines.go | 22 +++ 14 files changed, 278 insertions(+), 224 deletions(-) diff --git a/internal/tailer/logstream/dgramstream.go b/internal/tailer/logstream/dgramstream.go index acce398f6..9338d2a3f 100644 --- a/internal/tailer/logstream/dgramstream.go +++ b/internal/tailer/logstream/dgramstream.go @@ -18,7 +18,7 @@ import ( type dgramStream struct { cancel context.CancelFunc - lines chan<- *logline.LogLine + lines chan *logline.LogLine scheme string // Datagram scheme, either "unixgram" or "udp". address string // Given name for the underlying socket path on the filesystem or hostport. @@ -28,34 +28,34 @@ type dgramStream struct { lastReadTime time.Time // Last time a log line was read from this named pipe } -func newDgramStream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, scheme, address string, lines chan<- *logline.LogLine, oneShot OneShotMode) (LogStream, error) { +func newDgramStream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, scheme, address string, oneShot OneShotMode) (LogStream, error) { if address == "" { return nil, ErrEmptySocketAddress } ctx, cancel := context.WithCancel(ctx) - ss := &dgramStream{cancel: cancel, scheme: scheme, address: address, lastReadTime: time.Now(), lines: lines} + ss := &dgramStream{cancel: cancel, scheme: scheme, address: address, lastReadTime: time.Now(), lines: make(chan *logline.LogLine)} if err := ss.stream(ctx, wg, waker, oneShot); err != nil { return nil, err } return ss, nil } -func (ss *dgramStream) LastReadTime() time.Time { - ss.mu.RLock() - defer ss.mu.RUnlock() - return ss.lastReadTime +func (ds *dgramStream) LastReadTime() time.Time { + ds.mu.RLock() + defer ds.mu.RUnlock() + return ds.lastReadTime } // The read buffer size for datagrams. const datagramReadBufferSize = 131072 -func (ss *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, oneShot OneShotMode) error { - c, err := net.ListenPacket(ss.scheme, ss.address) +func (ds *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, oneShot OneShotMode) error { + c, err := net.ListenPacket(ds.scheme, ds.address) if err != nil { - logErrors.Add(ss.address, 1) + logErrors.Add(ds.address, 1) return err } - glog.V(2).Infof("stream(%s:%s): opened new datagram socket %v", ss.scheme, ss.address, c) + glog.V(2).Infof("stream(%s:%s): opened new datagram socket %v", ds.scheme, ds.address, c) b := make([]byte, datagramReadBufferSize) partial := bytes.NewBufferString("") var total int @@ -63,18 +63,19 @@ func (ss *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak go func() { defer wg.Done() defer func() { - glog.V(2).Infof("stream(%s:%s): read total %d bytes", ss.scheme, ss.address, total) - glog.V(2).Infof("stream(%s:%s): closing connection", ss.scheme, ss.address) + glog.V(2).Infof("stream(%s:%s): read total %d bytes", ds.scheme, ds.address, total) + glog.V(2).Infof("stream(%s:%s): closing connection", ds.scheme, ds.address) err := c.Close() if err != nil { - logErrors.Add(ss.address, 1) + logErrors.Add(ds.address, 1) glog.Info(err) } - logCloses.Add(ss.address, 1) - ss.mu.Lock() - ss.completed = true - ss.mu.Unlock() - ss.Stop() + logCloses.Add(ds.address, 1) + ds.mu.Lock() + ds.completed = true + close(ds.lines) + ds.mu.Unlock() + ds.Stop() }() ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -82,24 +83,24 @@ func (ss *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak for { n, _, err := c.ReadFrom(b) - glog.V(2).Infof("stream(%s:%s): read %d bytes, err is %v", ss.scheme, ss.address, n, err) + glog.V(2).Infof("stream(%s:%s): read %d bytes, err is %v", ds.scheme, ds.address, n, err) // This is a test-only trick that says if we've already put this // logstream in graceful shutdown, then a zero-byte read is // equivalent to an "EOF" in connection and file oriented streams. if n == 0 { if oneShot { - glog.V(2).Infof("stream(%s:%s): exiting because zero byte read and one shot", ss.scheme, ss.address) + glog.V(2).Infof("stream(%s:%s): exiting because zero byte read and one shot", ds.scheme, ds.address) if partial.Len() > 0 { - sendLine(ctx, ss.address, partial, ss.lines) + sendLine(ctx, ds.address, partial, ds.lines) } return } select { case <-ctx.Done(): - glog.V(2).Infof("stream(%s:%s): exiting because zero byte read after cancellation", ss.scheme, ss.address) + glog.V(2).Infof("stream(%s:%s): exiting because zero byte read after cancellation", ds.scheme, ds.address) if partial.Len() > 0 { - sendLine(ctx, ss.address, partial, ss.lines) + sendLine(ctx, ds.address, partial, ds.lines) } return default: @@ -109,22 +110,22 @@ func (ss *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak if n > 0 { total += n //nolint:contextcheck - decodeAndSend(ctx, ss.lines, ss.address, n, b[:n], partial) - ss.mu.Lock() - ss.lastReadTime = time.Now() - ss.mu.Unlock() + decodeAndSend(ctx, ds.lines, ds.address, n, b[:n], partial) + ds.mu.Lock() + ds.lastReadTime = time.Now() + ds.mu.Unlock() } if err != nil && IsEndOrCancel(err) { if partial.Len() > 0 { - sendLine(ctx, ss.address, partial, ss.lines) + sendLine(ctx, ds.address, partial, ds.lines) } - glog.V(2).Infof("stream(%s:%s): exiting, stream has error %s", ss.scheme, ss.address, err) + glog.V(2).Infof("stream(%s:%s): exiting, stream has error %s", ds.scheme, ds.address, err) return } // Yield and wait - glog.V(2).Infof("stream(%s:%s): waiting", ss.scheme, ss.address) + glog.V(2).Infof("stream(%s:%s): waiting", ds.scheme, ds.address) select { case <-ctx.Done(): // We may have started waiting here when the stop signal @@ -132,23 +133,28 @@ func (ss *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak // written to. The file is not technically yet at EOF so // we need to go back and try one more read. We'll exit // the stream in the zero byte handler above. - glog.V(2).Infof("stream(%s:%s): Stopping after next zero byte read", ss.scheme, ss.address) + glog.V(2).Infof("stream(%s:%s): Stopping after next zero byte read", ds.scheme, ds.address) case <-waker.Wake(): // sleep until next Wake() - glog.V(2).Infof("stream(%s:%s): Wake received", ss.scheme, ss.address) + glog.V(2).Infof("stream(%s:%s): Wake received", ds.scheme, ds.address) } } }() return nil } -func (ss *dgramStream) IsComplete() bool { - ss.mu.RLock() - defer ss.mu.RUnlock() - return ss.completed +func (ds *dgramStream) IsComplete() bool { + ds.mu.RLock() + defer ds.mu.RUnlock() + return ds.completed } -func (ss *dgramStream) Stop() { - glog.V(2).Infof("stream(%s:%s): Stop received on datagram stream.", ss.scheme, ss.address) - ss.cancel() +func (ds *dgramStream) Stop() { + glog.V(2).Infof("stream(%s:%s): Stop received on datagram stream.", ds.scheme, ds.address) + ds.cancel() +} + +// Lines implements the LogStream interface, returning the output lines channel. +func (ds *dgramStream) Lines() <-chan *logline.LogLine { + return ds.lines } diff --git a/internal/tailer/logstream/dgramstream_unix_test.go b/internal/tailer/logstream/dgramstream_unix_test.go index 3bb89abc2..9d4d289cc 100644 --- a/internal/tailer/logstream/dgramstream_unix_test.go +++ b/internal/tailer/logstream/dgramstream_unix_test.go @@ -38,14 +38,19 @@ func TestDgramStreamReadCompletedBecauseSocketClosed(t *testing.T) { default: t.Fatalf("bad scheme %s", scheme) } - lines := make(chan *logline.LogLine, 1) + ctx, cancel := context.WithCancel(context.Background()) waker, awaken := waker.NewTest(ctx, 1, "stream") sockName := scheme + "://" + addr - ss, err := logstream.New(ctx, &wg, waker, sockName, lines, logstream.OneShotEnabled) + ds, err := logstream.New(ctx, &wg, waker, sockName, logstream.OneShotEnabled) testutil.FatalIfErr(t, err) + expected := []*logline.LogLine{ + {Context: context.TODO(), Filename: addr, Line: "1"}, + } + checkLineDiff := testutil.ExpectLinesReceivedNoDiff(t, expected, ds.Lines()) + s, err := net.Dial(scheme, addr) testutil.FatalIfErr(t, err) @@ -59,18 +64,13 @@ func TestDgramStreamReadCompletedBecauseSocketClosed(t *testing.T) { testutil.FatalIfErr(t, err) wg.Wait() - close(lines) - received := testutil.LinesReceived(lines) - expected := []*logline.LogLine{ - {Context: context.TODO(), Filename: addr, Line: "1"}, - } - testutil.ExpectNoDiff(t, expected, received, testutil.IgnoreFields(logline.LogLine{}, "Context")) + checkLineDiff() cancel() wg.Wait() - if !ss.IsComplete() { + if !ds.IsComplete() { t.Errorf("expecting dgramstream to be complete because socket closed") } })) @@ -93,14 +93,19 @@ func TestDgramStreamReadCompletedBecauseCancel(t *testing.T) { default: t.Fatalf("bad scheme %s", scheme) } - lines := make(chan *logline.LogLine, 1) + ctx, cancel := context.WithCancel(context.Background()) waker, awaken := waker.NewTest(ctx, 1, "stream") sockName := scheme + "://" + addr - ss, err := logstream.New(ctx, &wg, waker, sockName, lines, logstream.OneShotDisabled) + ds, err := logstream.New(ctx, &wg, waker, sockName, logstream.OneShotDisabled) testutil.FatalIfErr(t, err) + expected := []*logline.LogLine{ + {Context: context.TODO(), Filename: addr, Line: "1"}, + } + checkLineDiff := testutil.ExpectLinesReceivedNoDiff(t, expected, ds.Lines()) + s, err := net.Dial(scheme, addr) testutil.FatalIfErr(t, err) @@ -112,15 +117,10 @@ func TestDgramStreamReadCompletedBecauseCancel(t *testing.T) { cancel() // This cancellation should cause the stream to shut down. wg.Wait() - close(lines) - received := testutil.LinesReceived(lines) - expected := []*logline.LogLine{ - {Context: context.TODO(), Filename: addr, Line: "1"}, - } - testutil.ExpectNoDiff(t, expected, received, testutil.IgnoreFields(logline.LogLine{}, "Context")) + checkLineDiff() - if !ss.IsComplete() { + if !ds.IsComplete() { t.Errorf("expecting dgramstream to be complete because cancel") } })) diff --git a/internal/tailer/logstream/filestream.go b/internal/tailer/logstream/filestream.go index 0a2ab9317..c03842d5b 100644 --- a/internal/tailer/logstream/filestream.go +++ b/internal/tailer/logstream/filestream.go @@ -36,7 +36,7 @@ var fileTruncates = expvar.NewMap("file_truncates_total") type fileStream struct { cancel context.CancelFunc - lines chan<- *logline.LogLine + lines chan *logline.LogLine pathname string // Given name for the underlying file on the filesystem @@ -46,9 +46,9 @@ type fileStream struct { } // newFileStream creates a new log stream from a regular file. -func newFileStream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, pathname string, fi os.FileInfo, lines chan<- *logline.LogLine, oneShot OneShotMode) (LogStream, error) { +func newFileStream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, pathname string, fi os.FileInfo, oneShot OneShotMode) (LogStream, error) { ctx, cancel := context.WithCancel(ctx) - fs := &fileStream{cancel: cancel, pathname: pathname, lastReadTime: time.Now(), lines: lines} + fs := &fileStream{cancel: cancel, pathname: pathname, lastReadTime: time.Now(), lines: make(chan *logline.LogLine)} // Stream from the start of the file when in one shot mode. streamFromStart := oneShot == OneShotEnabled if err := fs.stream(ctx, wg, waker, fi, oneShot, streamFromStart); err != nil { @@ -163,6 +163,7 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake } fs.mu.Lock() fs.completed = true + close(fs.lines) fs.mu.Unlock() return } @@ -227,6 +228,7 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake } fs.mu.Lock() fs.completed = true + close(fs.lines) fs.mu.Unlock() return } @@ -238,6 +240,7 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake } fs.mu.Lock() fs.completed = true + close(fs.lines) fs.mu.Unlock() return default: @@ -280,3 +283,8 @@ func (fs *fileStream) IsComplete() bool { func (fs *fileStream) Stop() { fs.cancel() } + +// Lines implements the LogStream interface, returning the output lines channel. +func (fs *fileStream) Lines() <-chan *logline.LogLine { + return fs.lines +} diff --git a/internal/tailer/logstream/filestream_test.go b/internal/tailer/logstream/filestream_test.go index f3ab76d98..b25da104b 100644 --- a/internal/tailer/logstream/filestream_test.go +++ b/internal/tailer/logstream/filestream_test.go @@ -24,11 +24,16 @@ func TestFileStreamRead(t *testing.T) { f := testutil.TestOpenFile(t, name) defer f.Close() - lines := make(chan *logline.LogLine, 1) ctx, cancel := context.WithCancel(context.Background()) waker, awaken := waker.NewTest(ctx, 1, "stream") - fs, err := logstream.New(ctx, &wg, waker, name, lines, logstream.OneShotDisabled) + fs, err := logstream.New(ctx, &wg, waker, name, logstream.OneShotDisabled) testutil.FatalIfErr(t, err) + + expected := []*logline.LogLine{ + {Context: context.TODO(), Filename: name, Line: "yo"}, + } + checkLineDiff := testutil.ExpectLinesReceivedNoDiff(t, expected, fs.Lines()) + awaken(1, 1) // synchronise past first read testutil.WriteString(t, f, "yo\n") @@ -36,12 +41,8 @@ func TestFileStreamRead(t *testing.T) { cancel() wg.Wait() - close(lines) - received := testutil.LinesReceived(lines) - expected := []*logline.LogLine{ - {Context: context.TODO(), Filename: name, Line: "yo"}, - } - testutil.ExpectNoDiff(t, expected, received, testutil.IgnoreFields(logline.LogLine{}, "Context")) + + checkLineDiff() if !fs.IsComplete() { t.Errorf("expecting filestream to be complete because stopped") @@ -58,19 +59,20 @@ func TestFileStreamReadOneShot(t *testing.T) { defer f.Close() testutil.WriteString(t, f, "yo\n") - lines := make(chan *logline.LogLine, 1) ctx, cancel := context.WithCancel(context.Background()) waker := waker.NewTestAlways() - fs, err := logstream.New(ctx, &wg, waker, name, lines, logstream.OneShotEnabled) + + fs, err := logstream.New(ctx, &wg, waker, name, logstream.OneShotEnabled) testutil.FatalIfErr(t, err) - wg.Wait() - close(lines) - received := testutil.LinesReceived(lines) expected := []*logline.LogLine{ {Context: context.TODO(), Filename: name, Line: "yo"}, } - testutil.ExpectNoDiff(t, expected, received, testutil.IgnoreFields(logline.LogLine{}, "Context")) + checkLineDiff := testutil.ExpectLinesReceivedNoDiff(t, expected, fs.Lines()) + + wg.Wait() + + checkLineDiff() if !fs.IsComplete() { t.Errorf("expecting filestream to be complete because stopped") @@ -88,30 +90,31 @@ func TestFileStreamReadNonSingleByteEnd(t *testing.T) { f := testutil.TestOpenFile(t, name) defer f.Close() - lines := make(chan *logline.LogLine, 1) ctx, cancel := context.WithCancel(context.Background()) waker, awaken := waker.NewTest(ctx, 1, "stream") - fs, err := logstream.New(ctx, &wg, waker, name, lines, logstream.OneShotDisabled) + fs, err := logstream.New(ctx, &wg, waker, name, logstream.OneShotDisabled) testutil.FatalIfErr(t, err) - awaken(1, 1) s := "a" for i := 0; i < 4094; i++ { s += "a" } - s += "δΈ­" + + expected := []*logline.LogLine{ + {Context: context.TODO(), Filename: name, Line: s}, + } + checkLineDiff := testutil.ExpectLinesReceivedNoDiff(t, expected, fs.Lines()) + + awaken(1, 1) + testutil.WriteString(t, f, s+"\n") awaken(1, 1) fs.Stop() wg.Wait() - close(lines) - received := testutil.LinesReceived(lines) - expected := []*logline.LogLine{ - {Context: context.TODO(), Filename: name, Line: s}, - } - testutil.ExpectNoDiff(t, expected, received, testutil.IgnoreFields(logline.LogLine{}, "Context")) + + checkLineDiff() if !fs.IsComplete() { t.Errorf("expecting filestream to be complete because stopped") @@ -129,12 +132,11 @@ func TestStreamDoesntBreakOnCorruptRune(t *testing.T) { f := testutil.TestOpenFile(t, name) defer f.Close() - lines := make(chan *logline.LogLine, 1) ctx, cancel := context.WithCancel(context.Background()) waker, awaken := waker.NewTest(ctx, 1, "stream") - fs, err := logstream.New(ctx, &wg, waker, name, lines, logstream.OneShotDisabled) + + fs, err := logstream.New(ctx, &wg, waker, name, logstream.OneShotDisabled) testutil.FatalIfErr(t, err) - awaken(1, 1) s := string([]byte{0xF1}) // 0xF1 = 11110001 , a byte signaling the start of a unicode character that @@ -146,18 +148,20 @@ func TestStreamDoesntBreakOnCorruptRune(t *testing.T) { for i := 0; i < 100; i++ { s += "a" } + expected := []*logline.LogLine{ + {Context: context.TODO(), Filename: name, Line: s[1:]}, + } + checkLineDiff := testutil.ExpectLinesReceivedNoDiff(t, expected, fs.Lines()) + + awaken(1, 1) testutil.WriteString(t, f, s+"\n") awaken(1, 1) fs.Stop() wg.Wait() - close(lines) - received := testutil.LinesReceived(lines) - expected := []*logline.LogLine{ - {Context: context.TODO(), Filename: name, Line: s[1:]}, - } - testutil.ExpectNoDiff(t, expected, received, testutil.IgnoreFields(logline.LogLine{}, "Context")) + + checkLineDiff() if !fs.IsComplete() { t.Errorf("expecting filestream to be complete because stopped") @@ -175,15 +179,21 @@ func TestFileStreamTruncation(t *testing.T) { f := testutil.OpenLogFile(t, name) defer f.Close() - lines := make(chan *logline.LogLine, 3) ctx, cancel := context.WithCancel(context.Background()) waker, awaken := waker.NewTest(ctx, 1, "stream") - fs, err := logstream.New(ctx, &wg, waker, name, lines, logstream.OneShotDisabled) + fs, err := logstream.New(ctx, &wg, waker, name, logstream.OneShotDisabled) // fs.Stop() is also called explicitly further down but a failed test // and early return would lead to the handle staying open defer fs.Stop() - testutil.FatalIfErr(t, err) + + expected := []*logline.LogLine{ + {Context: context.TODO(), Filename: name, Line: "1"}, + {Context: context.TODO(), Filename: name, Line: "2"}, + {Context: context.TODO(), Filename: name, Line: "3"}, + } + checkLineDiff := testutil.ExpectLinesReceivedNoDiff(t, expected, fs.Lines()) + awaken(1, 1) // Synchronise past first read after seekToEnd testutil.WriteString(t, f, "1\n2\n") @@ -198,16 +208,8 @@ func TestFileStreamTruncation(t *testing.T) { fs.Stop() wg.Wait() - close(lines) - received := testutil.LinesReceived(lines) - - expected := []*logline.LogLine{ - {Context: context.TODO(), Filename: name, Line: "1"}, - {Context: context.TODO(), Filename: name, Line: "2"}, - {Context: context.TODO(), Filename: name, Line: "3"}, - } - testutil.ExpectNoDiff(t, expected, received, testutil.IgnoreFields(logline.LogLine{}, "Context")) + checkLineDiff() cancel() wg.Wait() @@ -222,12 +224,17 @@ func TestFileStreamPartialRead(t *testing.T) { f := testutil.TestOpenFile(t, name) defer f.Close() - lines := make(chan *logline.LogLine, 1) ctx, cancel := context.WithCancel(context.Background()) waker, awaken := waker.NewTest(ctx, 1, "stream") - fs, err := logstream.New(ctx, &wg, waker, name, lines, logstream.OneShotDisabled) + fs, err := logstream.New(ctx, &wg, waker, name, logstream.OneShotDisabled) testutil.FatalIfErr(t, err) + + expected := []*logline.LogLine{ + {Context: context.TODO(), Filename: name, Line: "yo"}, + } + checkLineDiff := testutil.ExpectLinesReceivedNoDiff(t, expected, fs.Lines()) + awaken(1, 1) testutil.WriteString(t, f, "yo") @@ -239,12 +246,7 @@ func TestFileStreamPartialRead(t *testing.T) { cancel() wg.Wait() - close(lines) - received := testutil.LinesReceived(lines) - expected := []*logline.LogLine{ - {Context: context.TODO(), Filename: name, Line: "yo"}, - } - testutil.ExpectNoDiff(t, expected, received, testutil.IgnoreFields(logline.LogLine{}, "Context")) + checkLineDiff() if !fs.IsComplete() { t.Errorf("expecting filestream to be complete because cancellation") @@ -261,11 +263,17 @@ func TestFileStreamReadToEOFOnCancel(t *testing.T) { f := testutil.TestOpenFile(t, name) defer f.Close() - lines := make(chan *logline.LogLine, 2) waker, awaken := waker.NewTest(ctx, 1, "stream") - fs, err := logstream.New(ctx, &wg, waker, name, lines, logstream.OneShotDisabled) + fs, err := logstream.New(ctx, &wg, waker, name, logstream.OneShotDisabled) testutil.FatalIfErr(t, err) + + expected := []*logline.LogLine{ + {Context: context.TODO(), Filename: name, Line: "line 1"}, + {Context: context.TODO(), Filename: name, Line: "line 2"}, + } + checkLineDiff := testutil.ExpectLinesReceivedNoDiff(t, expected, fs.Lines()) + awaken(1, 1) testutil.WriteString(t, f, "line 1\n") @@ -276,13 +284,7 @@ func TestFileStreamReadToEOFOnCancel(t *testing.T) { wg.Wait() - close(lines) // Signal it's time to go. - received := testutil.LinesReceived(lines) - expected := []*logline.LogLine{ - {Context: context.TODO(), Filename: name, Line: "line 1"}, - {Context: context.TODO(), Filename: name, Line: "line 2"}, - } - testutil.ExpectNoDiff(t, expected, received, testutil.IgnoreFields(logline.LogLine{}, "Context")) + checkLineDiff() if !fs.IsComplete() { t.Errorf("expecting filestream to be complete because cancellation") diff --git a/internal/tailer/logstream/filestream_unix_test.go b/internal/tailer/logstream/filestream_unix_test.go index ae63173e5..423648613 100644 --- a/internal/tailer/logstream/filestream_unix_test.go +++ b/internal/tailer/logstream/filestream_unix_test.go @@ -32,15 +32,19 @@ func TestFileStreamRotation(t *testing.T) { f := testutil.TestOpenFile(t, name) defer f.Close() - lines := make(chan *logline.LogLine, 2) - ctx, cancel := context.WithCancel(context.Background()) waker, awaken := waker.NewTest(ctx, 1, "stream") // OneShotDisabled because we hit EOF and need to wait. - fs, err := logstream.New(ctx, &wg, waker, name, lines, logstream.OneShotDisabled) - + fs, err := logstream.New(ctx, &wg, waker, name, logstream.OneShotDisabled) testutil.FatalIfErr(t, err) + + expected := []*logline.LogLine{ + {Context: context.TODO(), Filename: name, Line: "1"}, + {Context: context.TODO(), Filename: name, Line: "2"}, + } + checkLineDiff := testutil.ExpectLinesReceivedNoDiff(t, expected, fs.Lines()) + awaken(1, 1) // sync to eof glog.Info("write 1") @@ -62,13 +66,7 @@ func TestFileStreamRotation(t *testing.T) { cancel() wg.Wait() - close(lines) - received := testutil.LinesReceived(lines) - expected := []*logline.LogLine{ - {Context: context.TODO(), Filename: name, Line: "1"}, - {Context: context.TODO(), Filename: name, Line: "2"}, - } - testutil.ExpectNoDiff(t, expected, received, testutil.IgnoreFields(logline.LogLine{}, "Context")) + checkLineDiff() if !fs.IsComplete() { t.Errorf("expecting filestream to be complete because stopped") @@ -84,11 +82,17 @@ func TestFileStreamURL(t *testing.T) { f := testutil.TestOpenFile(t, name) defer f.Close() - lines := make(chan *logline.LogLine, 1) ctx, cancel := context.WithCancel(context.Background()) waker, awaken := waker.NewTest(ctx, 1, "stream") - fs, err := logstream.New(ctx, &wg, waker, "file://"+name, lines, logstream.OneShotDisabled) + + fs, err := logstream.New(ctx, &wg, waker, "file://"+name, logstream.OneShotDisabled) testutil.FatalIfErr(t, err) + + expected := []*logline.LogLine{ + {Context: context.TODO(), Filename: name, Line: "yo"}, + } + checkLineDiff := testutil.ExpectLinesReceivedNoDiff(t, expected, fs.Lines()) + awaken(1, 1) testutil.WriteString(t, f, "yo\n") @@ -97,12 +101,7 @@ func TestFileStreamURL(t *testing.T) { cancel() wg.Wait() - close(lines) - received := testutil.LinesReceived(lines) - expected := []*logline.LogLine{ - {Context: context.TODO(), Filename: name, Line: "yo"}, - } - testutil.ExpectNoDiff(t, expected, received, testutil.IgnoreFields(logline.LogLine{}, "Context")) + checkLineDiff() if !fs.IsComplete() { t.Errorf("expecting filestream to be complete because stopped") @@ -125,11 +124,10 @@ func TestFileStreamOpenFailure(t *testing.T) { testutil.FatalIfErr(t, err) - lines := make(chan *logline.LogLine, 1) ctx, cancel := context.WithCancel(context.Background()) waker, _ := waker.NewTest(ctx, 0, "stream") - _, err = logstream.New(ctx, &wg, waker, name, lines, logstream.OneShotEnabled) + _, err = logstream.New(ctx, &wg, waker, name, logstream.OneShotEnabled) if err == nil || !os.IsPermission(err) { t.Errorf("Expected a permission denied error, got: %v", err) } diff --git a/internal/tailer/logstream/logstream.go b/internal/tailer/logstream/logstream.go index a60a9f416..5b76537c1 100644 --- a/internal/tailer/logstream/logstream.go +++ b/internal/tailer/logstream/logstream.go @@ -34,9 +34,10 @@ var ( // LogStream. type LogStream interface { - LastReadTime() time.Time // Return the time when the last log line was read from the source - Stop() // Ask to gracefully stop the stream; e.g. stream keeps reading until EOF and then completes work. - IsComplete() bool // True if the logstream has completed work and cannot recover. The caller should clean up this logstream, creating a new logstream on a pathname if necessary. + LastReadTime() time.Time // Return the time when the last log line was read from the source + Stop() // Ask to gracefully stop the stream; e.g. stream keeps reading until EOF and then completes work. + IsComplete() bool // True if the logstream has completed work and cannot recover. The caller should clean up this logstream, creating a new logstream on a pathname if necessary. + Lines() <-chan *logline.LogLine // Returns the output channel of this LogStream. } // defaultReadBufferSize the size of the buffer for reading bytes for files. @@ -62,10 +63,9 @@ const ( // New creates a LogStream from the file object located at the absolute path // `pathname`. The LogStream will watch `ctx` for a cancellation signal, and -// notify the `wg` when it is Done. Log lines will be sent to the `lines` -// channel. `seekToStart` is only used for testing and only works for regular -// files that can be seeked. -func New(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, pathname string, lines chan<- *logline.LogLine, oneShot OneShotMode) (LogStream, error) { +// notify the `wg` when it is Done. `oneShot` is used for testing and only +// works for regular files that can be seeked. +func New(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, pathname string, oneShot OneShotMode) (LogStream, error) { if wg == nil { return nil, ErrNeedsWaitgroup } @@ -80,13 +80,13 @@ func New(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, pathname st default: glog.V(2).Infof("%v: %q in path pattern %q, treating as path", ErrUnsupportedURLScheme, u.Scheme, pathname) case "unixgram": - return newDgramStream(ctx, wg, waker, u.Scheme, u.Path, lines, oneShot) + return newDgramStream(ctx, wg, waker, u.Scheme, u.Path, oneShot) case "unix": - return newSocketStream(ctx, wg, waker, u.Scheme, u.Path, lines, oneShot) + return newSocketStream(ctx, wg, waker, u.Scheme, u.Path, oneShot) case "tcp": - return newSocketStream(ctx, wg, waker, u.Scheme, u.Host, lines, oneShot) + return newSocketStream(ctx, wg, waker, u.Scheme, u.Host, oneShot) case "udp": - return newDgramStream(ctx, wg, waker, u.Scheme, u.Host, lines, oneShot) + return newDgramStream(ctx, wg, waker, u.Scheme, u.Host, oneShot) case "", "file": path = u.Path } @@ -96,7 +96,7 @@ func New(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, pathname st logErrors.Add(path, 1) return nil, err } - return newPipeStream(ctx, wg, waker, path, fi, lines) + return newPipeStream(ctx, wg, waker, path, fi) } fi, err := os.Stat(path) if err != nil { @@ -105,12 +105,12 @@ func New(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, pathname st } switch m := fi.Mode(); { case m.IsRegular(): - return newFileStream(ctx, wg, waker, path, fi, lines, oneShot) + return newFileStream(ctx, wg, waker, path, fi, oneShot) case m&os.ModeType == os.ModeNamedPipe: - return newPipeStream(ctx, wg, waker, path, fi, lines) + return newPipeStream(ctx, wg, waker, path, fi) // TODO(jaq): in order to listen on an existing socket filepath, we must unlink and recreate it // case m&os.ModeType == os.ModeSocket: - // return newSocketStream(ctx, wg, waker, pathname, lines) + // return newSocketStream(ctx, wg, waker, pathname) default: return nil, fmt.Errorf("%w: %q", ErrUnsupportedFileType, pathname) } diff --git a/internal/tailer/logstream/logstream_test.go b/internal/tailer/logstream/logstream_test.go index 8a7109f8e..3fceebe59 100644 --- a/internal/tailer/logstream/logstream_test.go +++ b/internal/tailer/logstream/logstream_test.go @@ -10,12 +10,12 @@ import ( func TestNewErrors(t *testing.T) { ctx := context.Background() - _, err := logstream.New(ctx, nil, nil, "", nil, logstream.OneShotDisabled) + _, err := logstream.New(ctx, nil, nil, "", logstream.OneShotDisabled) if err == nil { t.Errorf("New(ctx, nil) expecting error, received nil") } var wg sync.WaitGroup - _, err = logstream.New(ctx, &wg, nil, ":a/b", nil, logstream.OneShotDisabled) + _, err = logstream.New(ctx, &wg, nil, ":a/b", logstream.OneShotDisabled) if err == nil { t.Error("New(ctg, wg, ..., path) expecting error, received nil") } diff --git a/internal/tailer/logstream/logstream_unix_test.go b/internal/tailer/logstream/logstream_unix_test.go index 4735ae8ed..03a2091f9 100644 --- a/internal/tailer/logstream/logstream_unix_test.go +++ b/internal/tailer/logstream/logstream_unix_test.go @@ -24,7 +24,7 @@ func TestReadStdin(t *testing.T) { testutil.FatalIfErr(t, err) testutil.OverrideStdin(t, f) - _, err = logstream.New(ctx, &wg, nil, "-", nil, logstream.OneShotDisabled) + _, err = logstream.New(ctx, &wg, nil, "-", logstream.OneShotDisabled) if err != nil { t.Errorf("New(.., '-') -> %v, expecting nil", err) } diff --git a/internal/tailer/logstream/pipestream.go b/internal/tailer/logstream/pipestream.go index f362c2d80..e81322129 100644 --- a/internal/tailer/logstream/pipestream.go +++ b/internal/tailer/logstream/pipestream.go @@ -17,7 +17,7 @@ import ( ) type pipeStream struct { - lines chan<- *logline.LogLine + lines chan *logline.LogLine pathname string // Given name for the underlying named pipe on the filesystem @@ -28,8 +28,8 @@ type pipeStream struct { // newPipeStream creates a new stream reader for Unix Pipes. // `pathname` must already be verified as clean. -func newPipeStream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, pathname string, fi os.FileInfo, lines chan<- *logline.LogLine) (LogStream, error) { - ps := &pipeStream{pathname: pathname, lastReadTime: time.Now(), lines: lines} +func newPipeStream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, pathname string, fi os.FileInfo) (LogStream, error) { + ps := &pipeStream{pathname: pathname, lastReadTime: time.Now(), lines: make(chan *logline.LogLine)} if err := ps.stream(ctx, wg, waker, fi); err != nil { return nil, err } @@ -77,7 +77,7 @@ func (ps *pipeStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake go func() { defer wg.Done() defer func() { - glog.V(2).Infof("stream(%s): read total %d bytes", ps.pathname, fd, total) + glog.V(2).Infof("stream(%s): read total %d bytes", ps.pathname, total) glog.V(2).Infof("stream(%s): closing file descriptor %v", ps.pathname, fd) err := fd.Close() if err != nil { @@ -87,6 +87,7 @@ func (ps *pipeStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake logCloses.Add(ps.pathname, 1) ps.mu.Lock() ps.completed = true + close(ps.lines) ps.mu.Unlock() }() ctx, cancel := context.WithCancel(ctx) @@ -139,6 +140,11 @@ func (ps *pipeStream) IsComplete() bool { } // Stop implements the Logstream interface. -// Calling Stop on a PipeStream is a no-op; PipeStreams always read until the pipe is closed, which is what calling Stop means on a Logstream. +// Calling Stop on a PipeStream is a no-op; PipeStreams always read until the input pipe is closed, which is what calling Stop means on a Logstream. func (ps *pipeStream) Stop() { } + +// Lines implements the LogStream interface, returning the output lines channel. +func (ps *pipeStream) Lines() <-chan *logline.LogLine { + return ps.lines +} diff --git a/internal/tailer/logstream/pipestream_unix_test.go b/internal/tailer/logstream/pipestream_unix_test.go index 7f6fad9eb..d41c58d41 100644 --- a/internal/tailer/logstream/pipestream_unix_test.go +++ b/internal/tailer/logstream/pipestream_unix_test.go @@ -29,7 +29,6 @@ func TestPipeStreamReadCompletedBecauseClosed(t *testing.T) { name := filepath.Join(tmpDir, "fifo") testutil.FatalIfErr(t, unix.Mkfifo(name, 0o666)) - lines := make(chan *logline.LogLine, 1) ctx, cancel := context.WithCancel(context.Background()) waker := waker.NewTestAlways() @@ -40,9 +39,14 @@ func TestPipeStreamReadCompletedBecauseClosed(t *testing.T) { f, err := os.OpenFile(name, os.O_RDWR, os.ModeNamedPipe) testutil.FatalIfErr(t, err) - ps, err := logstream.New(ctx, &wg, waker, name, lines, logstream.OneShotDisabled) + ps, err := logstream.New(ctx, &wg, waker, name, logstream.OneShotDisabled) testutil.FatalIfErr(t, err) + expected := []*logline.LogLine{ + {Context: context.TODO(), Filename: name, Line: "1"}, + } + checkLineDiff := testutil.ExpectLinesReceivedNoDiff(t, expected, ps.Lines()) + testutil.WriteString(t, f, "1\n") // Pipes need to be closed to signal to the pipeStream to finish up. @@ -50,13 +54,8 @@ func TestPipeStreamReadCompletedBecauseClosed(t *testing.T) { ps.Stop() // no-op for pipes wg.Wait() - close(lines) - received := testutil.LinesReceived(lines) - expected := []*logline.LogLine{ - {Context: context.TODO(), Filename: name, Line: "1"}, - } - testutil.ExpectNoDiff(t, expected, received, testutil.IgnoreFields(logline.LogLine{}, "Context")) + checkLineDiff() cancel() @@ -75,15 +74,18 @@ func TestPipeStreamReadCompletedBecauseCancel(t *testing.T) { name := filepath.Join(tmpDir, "fifo") testutil.FatalIfErr(t, unix.Mkfifo(name, 0o666)) - lines := make(chan *logline.LogLine, 1) ctx, cancel := context.WithCancel(context.Background()) waker, awaken := waker.NewTest(ctx, 1, "stream") f, err := os.OpenFile(name, os.O_RDWR, os.ModeNamedPipe) testutil.FatalIfErr(t, err) - ps, err := logstream.New(ctx, &wg, waker, name, lines, logstream.OneShotDisabled) + ps, err := logstream.New(ctx, &wg, waker, name, logstream.OneShotDisabled) testutil.FatalIfErr(t, err) + expected := []*logline.LogLine{ + {Context: context.TODO(), Filename: name, Line: "1"}, + } + checkLineDiff := testutil.ExpectLinesReceivedNoDiff(t, expected, ps.Lines()) testutil.WriteString(t, f, "1\n") @@ -93,13 +95,8 @@ func TestPipeStreamReadCompletedBecauseCancel(t *testing.T) { cancel() // Cancellation here should cause the stream to shut down. wg.Wait() - close(lines) - received := testutil.LinesReceived(lines) - expected := []*logline.LogLine{ - {Context: context.TODO(), Filename: name, Line: "1"}, - } - testutil.ExpectNoDiff(t, expected, received, testutil.IgnoreFields(logline.LogLine{}, "Context")) + checkLineDiff() if !ps.IsComplete() { t.Errorf("expecting pipestream to be complete because cancelled") @@ -115,13 +112,17 @@ func TestPipeStreamReadURL(t *testing.T) { name := filepath.Join(tmpDir, "fifo") testutil.FatalIfErr(t, unix.Mkfifo(name, 0o666)) - lines := make(chan *logline.LogLine, 1) ctx, cancel := context.WithCancel(context.Background()) waker := waker.NewTestAlways() - ps, err := logstream.New(ctx, &wg, waker, "file://"+name, lines, logstream.OneShotDisabled) + ps, err := logstream.New(ctx, &wg, waker, "file://"+name, logstream.OneShotDisabled) testutil.FatalIfErr(t, err) + expected := []*logline.LogLine{ + {Context: context.TODO(), Filename: name, Line: "1"}, + } + checkLineDiff := testutil.ExpectLinesReceivedNoDiff(t, expected, ps.Lines()) + f, err := os.OpenFile(name, os.O_WRONLY, os.ModeNamedPipe) testutil.FatalIfErr(t, err) testutil.WriteString(t, f, "1\n") @@ -131,13 +132,8 @@ func TestPipeStreamReadURL(t *testing.T) { ps.Stop() // no-op for pipes wg.Wait() - close(lines) - received := testutil.LinesReceived(lines) - expected := []*logline.LogLine{ - {Context: context.TODO(), Filename: name, Line: "1"}, - } - testutil.ExpectNoDiff(t, expected, received, testutil.IgnoreFields(logline.LogLine{}, "Context")) + checkLineDiff() cancel() @@ -158,13 +154,17 @@ func TestPipeStreamReadStdin(t *testing.T) { testutil.OverrideStdin(t, f) testutil.WriteString(t, f, "content\n") - lines := make(chan *logline.LogLine, 1) ctx, cancel := context.WithCancel(context.Background()) waker, awaken := waker.NewTest(ctx, 1, "stream") - ps, err := logstream.New(ctx, &wg, waker, "-", lines, logstream.OneShotDisabled) + ps, err := logstream.New(ctx, &wg, waker, "-", logstream.OneShotDisabled) testutil.FatalIfErr(t, err) + expected := []*logline.LogLine{ + {Context: context.TODO(), Filename: "-", Line: "content"}, + } + checkLineDiff := testutil.ExpectLinesReceivedNoDiff(t, expected, ps.Lines()) + awaken(0, 0) testutil.FatalIfErr(t, f.Close()) @@ -173,13 +173,8 @@ func TestPipeStreamReadStdin(t *testing.T) { ps.Stop() wg.Wait() - close(lines) - received := testutil.LinesReceived(lines) - expected := []*logline.LogLine{ - {Context: context.TODO(), Filename: "-", Line: "content"}, - } - testutil.ExpectNoDiff(t, expected, received, testutil.IgnoreFields(logline.LogLine{}, "Context")) + checkLineDiff() cancel() if !ps.IsComplete() { diff --git a/internal/tailer/logstream/socketstream.go b/internal/tailer/logstream/socketstream.go index f50d1ef05..18ebaae61 100644 --- a/internal/tailer/logstream/socketstream.go +++ b/internal/tailer/logstream/socketstream.go @@ -18,7 +18,7 @@ import ( type socketStream struct { cancel context.CancelFunc - lines chan<- *logline.LogLine + lines chan *logline.LogLine oneShot OneShotMode scheme string // URL Scheme to listen with, either tcp or unix @@ -29,12 +29,12 @@ type socketStream struct { lastReadTime time.Time // Last time a log line was read from this socket } -func newSocketStream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, scheme, address string, lines chan<- *logline.LogLine, oneShot OneShotMode) (LogStream, error) { +func newSocketStream(ctx context.Context, wg *sync.WaitGroup, waker waker.Waker, scheme, address string, oneShot OneShotMode) (LogStream, error) { if address == "" { return nil, ErrEmptySocketAddress } ctx, cancel := context.WithCancel(ctx) - ss := &socketStream{cancel: cancel, oneShot: oneShot, scheme: scheme, address: address, lastReadTime: time.Now(), lines: lines} + ss := &socketStream{cancel: cancel, oneShot: oneShot, scheme: scheme, address: address, lastReadTime: time.Now(), lines: make(chan *logline.LogLine)} if err := ss.stream(ctx, wg, waker); err != nil { return nil, err } @@ -54,7 +54,7 @@ func (ss *socketStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wa logErrors.Add(ss.address, 1) return err } - glog.V(2).Infof("stream(%s:%s): opened new socket listener %v", ss.scheme, ss.address, l) + glog.V(2).Infof("stream(%s:%s): opened new socket listener %+v", ss.scheme, ss.address, l) initDone := make(chan struct{}) // Set up for shutdown @@ -68,13 +68,16 @@ func (ss *socketStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wa case <-ctx.Done(): } } - glog.V(2).Infof("stream(%s:%s): closing listener", ss.scheme, ss.address, l) + glog.V(2).Infof("stream(%s:%s): closing listener", ss.scheme, ss.address) err := l.Close() if err != nil { glog.Info(err) } ss.mu.Lock() ss.completed = true + if !ss.oneShot { + close(ss.lines) + } ss.mu.Unlock() }() @@ -97,7 +100,7 @@ func (ss *socketStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wa if err := acceptConn(); err != nil { glog.Info(err) } - glog.Info("stream(%s:%s): oneshot mode, returning", ss.scheme, ss.address) + glog.Infof("stream(%s:%s): oneshot mode, returning", ss.scheme, ss.address) close(initDone) }() return nil @@ -130,6 +133,9 @@ func (ss *socketStream) handleConn(ctx context.Context, wg *sync.WaitGroup, wake glog.Info(err) } logCloses.Add(ss.address, 1) + if ss.oneShot { + close(ss.lines) + } }() ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -181,3 +187,8 @@ func (ss *socketStream) IsComplete() bool { func (ss *socketStream) Stop() { ss.cancel() } + +// Lines implements the LogStream interface, returning the output lines channel. +func (ss *socketStream) Lines() <-chan *logline.LogLine { + return ss.lines +} diff --git a/internal/tailer/logstream/socketstream_unix_test.go b/internal/tailer/logstream/socketstream_unix_test.go index d6f5f3c24..d3642c690 100644 --- a/internal/tailer/logstream/socketstream_unix_test.go +++ b/internal/tailer/logstream/socketstream_unix_test.go @@ -36,14 +36,19 @@ func TestSocketStreamReadCompletedBecauseSocketClosed(t *testing.T) { default: t.Fatalf("bad scheme %s", scheme) } - lines := make(chan *logline.LogLine, 1) + ctx, cancel := context.WithCancel(context.Background()) waker, awaken := waker.NewTest(ctx, 1, "stream") sockName := scheme + "://" + addr - ss, err := logstream.New(ctx, &wg, waker, sockName, lines, logstream.OneShotEnabled) + ss, err := logstream.New(ctx, &wg, waker, sockName, logstream.OneShotEnabled) testutil.FatalIfErr(t, err) + expected := []*logline.LogLine{ + {Context: context.TODO(), Filename: addr, Line: "1"}, + } + checkLineDiff := testutil.ExpectLinesReceivedNoDiff(t, expected, ss.Lines()) + s, err := net.Dial(scheme, addr) testutil.FatalIfErr(t, err) @@ -57,13 +62,7 @@ func TestSocketStreamReadCompletedBecauseSocketClosed(t *testing.T) { wg.Wait() - close(lines) - - received := testutil.LinesReceived(lines) - expected := []*logline.LogLine{ - {Context: context.TODO(), Filename: addr, Line: "1"}, - } - testutil.ExpectNoDiff(t, expected, received, testutil.IgnoreFields(logline.LogLine{}, "Context")) + checkLineDiff() if !ss.IsComplete() { t.Errorf("expecting socketstream to be complete because socket closed") @@ -90,14 +89,19 @@ func TestSocketStreamReadCompletedBecauseCancel(t *testing.T) { default: t.Fatalf("bad scheme %s", scheme) } - lines := make(chan *logline.LogLine, 1) + ctx, cancel := context.WithCancel(context.Background()) waker, awaken := waker.NewTest(ctx, 1, "stream") sockName := scheme + "://" + addr - ss, err := logstream.New(ctx, &wg, waker, sockName, lines, logstream.OneShotDisabled) + ss, err := logstream.New(ctx, &wg, waker, sockName, logstream.OneShotDisabled) testutil.FatalIfErr(t, err) + expected := []*logline.LogLine{ + {Context: context.TODO(), Filename: addr, Line: "1"}, + } + checkLineDiff := testutil.ExpectLinesReceivedNoDiff(t, expected, ss.Lines()) + s, err := net.Dial(scheme, addr) testutil.FatalIfErr(t, err) @@ -109,13 +113,7 @@ func TestSocketStreamReadCompletedBecauseCancel(t *testing.T) { cancel() // This cancellation should cause the stream to shut down immediately. wg.Wait() - close(lines) - - received := testutil.LinesReceived(lines) - expected := []*logline.LogLine{ - {Context: context.TODO(), Filename: addr, Line: "1"}, - } - testutil.ExpectNoDiff(t, expected, received, testutil.IgnoreFields(logline.LogLine{}, "Context")) + checkLineDiff() if !ss.IsComplete() { t.Errorf("expecting socketstream to be complete because cancel") diff --git a/internal/tailer/tail.go b/internal/tailer/tail.go index f60e69694..5dac7d486 100644 --- a/internal/tailer/tail.go +++ b/internal/tailer/tail.go @@ -283,11 +283,19 @@ func (t *Tailer) TailPath(pathname string) error { logCount.Add(-1) // Removing the current entry before re-adding. glog.V(2).Infof("%q: Existing logstream is finished, creating a new one.", pathname) } - l, err := logstream.New(t.ctx, &t.wg, t.logstreamPollWaker, pathname, t.lines, t.oneShot) + l, err := logstream.New(t.ctx, &t.wg, t.logstreamPollWaker, pathname, t.oneShot) if err != nil { return err } t.logstreams[pathname] = l + t.wg.Add(1) + // Start a goroutine to move lines from the logstream to the main tailer output. + go func() { + defer t.wg.Done() + for line := range l.Lines() { + t.lines <- line + } + }() glog.Infof("Tailing %s", pathname) logCount.Add(1) return nil diff --git a/internal/testutil/lines.go b/internal/testutil/lines.go index 178674582..d98dfc603 100644 --- a/internal/testutil/lines.go +++ b/internal/testutil/lines.go @@ -4,6 +4,9 @@ package testutil import ( + "sync" + "testing" + "github.com/google/mtail/internal/logline" ) @@ -14,3 +17,22 @@ func LinesReceived(lines <-chan *logline.LogLine) (r []*logline.LogLine) { } return } + +func ExpectLinesReceivedNoDiff(tb testing.TB, wantLines []*logline.LogLine, gotLines <-chan *logline.LogLine) func() { + tb.Helper() + var received []*logline.LogLine + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + for line := range gotLines { + received = append(received, line) + } + }() + return func() { + tb.Helper() + wg.Wait() + ExpectNoDiff(tb, wantLines, received, IgnoreFields(logline.LogLine{}, "Context")) + } +}