Skip to content

Commit

Permalink
fix: Return to read immediately after a successful read.
Browse files Browse the repository at this point in the history
Copying the filestream, now pipe, dgram, and socket streams return to read
again immediately after a successful read, so that we don't wait.

This is now obvious the problem in #685 and using the bandwidth-delay-product
we can see that a 250ms pause between reads of 4096B and 128KiB matches the
results seen.

Before:
```
jaq% time ./mtail -logs - -progs examples/rsyncd.mtail < internal/mtail/testdata/rsyncd.log
0.01s user 0.01s system 7% cpu 0.264 total
```

After:
```
jaq% time ./mtail -logs - -progs examples/rsyncd.mtail < internal/mtail/testdata/rsyncd.log
0.01s user 0.03s system 102% cpu 0.041 total
```

Thanks to @rideliner for the hint.
  • Loading branch information
jaqx0r committed Jul 5, 2024
1 parent f39cf7b commit c645528
Show file tree
Hide file tree
Showing 7 changed files with 30 additions and 27 deletions.
5 changes: 5 additions & 0 deletions internal/tailer/logstream/dgramstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,11 @@ func (ds *dgramStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wak
ds.lastReadTime = time.Now()
ds.mu.Unlock()
ds.staleTimer = time.AfterFunc(time.Hour*24, ds.cancel)

// No error implies more to read, so restart the loop.
if err == nil && ctx.Err() == nil {
continue
}
}

if err != nil && IsEndOrCancel(err) {
Expand Down
8 changes: 2 additions & 6 deletions internal/tailer/logstream/dgramstream_unix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestDgramStreamReadCompletedBecauseSocketClosed(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
// Stream is not shut down with cancel in this test
defer cancel()
waker, awaken := waker.NewTest(ctx, 1, "stream")
waker := waker.NewTestAlways()

sockName := scheme + "://" + addr
ds, err := logstream.New(ctx, &wg, waker, sockName, logstream.OneShotEnabled)
Expand All @@ -59,8 +59,6 @@ func TestDgramStreamReadCompletedBecauseSocketClosed(t *testing.T) {
_, err = s.Write([]byte("1\n"))
testutil.FatalIfErr(t, err)

awaken(0, 0) // sync past read

// "Close" the socket by sending zero bytes, which in oneshot mode tells the stream to act as if we're done.
_, err = s.Write([]byte{})
testutil.FatalIfErr(t, err)
Expand Down Expand Up @@ -94,7 +92,7 @@ func TestDgramStreamReadCompletedBecauseCancel(t *testing.T) {
}

ctx, cancel := context.WithCancel(context.Background())
waker, awaken := waker.NewTest(ctx, 1, "stream")
waker := waker.NewTestAlways()

sockName := scheme + "://" + addr
ds, err := logstream.New(ctx, &wg, waker, sockName, logstream.OneShotDisabled)
Expand All @@ -111,8 +109,6 @@ func TestDgramStreamReadCompletedBecauseCancel(t *testing.T) {
_, err = s.Write([]byte("1\n"))
testutil.FatalIfErr(t, err)

awaken(0, 0) // Synchronise past read.

cancel() // This cancellation should cause the stream to shut down.
wg.Wait()

Expand Down
11 changes: 5 additions & 6 deletions internal/tailer/logstream/filestream.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,11 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake
fs.lastReadTime = time.Now()
fs.mu.Unlock()
fs.staleTimer = time.AfterFunc(time.Hour*24, fs.cancel)

// No error implies there is more to read so restart the loop.
if err == nil && ctx.Err() == nil {
continue
}
}

if err != nil && err != io.EOF {
Expand Down Expand Up @@ -206,12 +211,6 @@ func (fs *fileStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake
}
}

// No error implies there is more to read in this file so go
// straight back to read unless it looks like context is Done.
if err == nil && ctx.Err() == nil {
continue
}

Sleep:
// If we get here it's because we've stalled. First test to see if it's
// time to exit.
Expand Down
10 changes: 7 additions & 3 deletions internal/tailer/logstream/pipestream.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,8 @@ func (ps *pipeStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake
}
logCloses.Add(ps.pathname, 1)
close(ps.lines)
ps.cancel()
}()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
SetReadDeadlineOnDone(ctx, fd)

for {
Expand All @@ -105,6 +104,11 @@ func (ps *pipeStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake
ps.lastReadTime = time.Now()
ps.mu.Unlock()
ps.staleTimer = time.AfterFunc(time.Hour*24, ps.cancel)

// No error implies there is more to read so restart the loop.
if err == nil && ctx.Err() == nil {
continue
}
}

// Test to see if we should exit.
Expand All @@ -123,7 +127,7 @@ func (ps *pipeStream) stream(ctx context.Context, wg *sync.WaitGroup, waker wake
// Exit immediately; cancelled context is going to cause the
// next read to be interrupted and exit, so don't bother going
// around the loop again.
return
//return
case <-waker.Wake():
// sleep until next Wake()
glog.V(2).Infof("stream(%s): Wake received", ps.pathname)
Expand Down
10 changes: 4 additions & 6 deletions internal/tailer/logstream/pipestream_unix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func TestPipeStreamReadCompletedBecauseCancel(t *testing.T) {
testutil.FatalIfErr(t, unix.Mkfifo(name, 0o666))

ctx, cancel := context.WithCancel(context.Background())
waker, awaken := waker.NewTest(ctx, 1, "stream")
waker := waker.NewTestAlways()

f, err := os.OpenFile(name, os.O_RDWR, os.ModeNamedPipe)
testutil.FatalIfErr(t, err)
Expand All @@ -87,9 +87,6 @@ func TestPipeStreamReadCompletedBecauseCancel(t *testing.T) {

testutil.WriteString(t, f, "1\n")

// Avoid a race with cancellation if we can synchronise with waker.Wake()
awaken(0, 0)

cancel() // Cancellation here should cause the stream to shut down.
wg.Wait()

Expand Down Expand Up @@ -155,7 +152,7 @@ func TestPipeStreamReadStdin(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
// The stream is not shut down by cancel in this test.
defer cancel()
waker, awaken := waker.NewTest(ctx, 1, "stream")
waker := waker.NewTestAlways()

ps, err := logstream.New(ctx, &wg, waker, "-", logstream.OneShotDisabled)
testutil.FatalIfErr(t, err)
Expand All @@ -165,7 +162,8 @@ func TestPipeStreamReadStdin(t *testing.T) {
}
checkLineDiff := testutil.ExpectLinesReceivedNoDiff(t, expected, ps.Lines())

awaken(0, 0)
// Give the stream a chance to wake and read
time.Sleep(10 * time.Millisecond)

testutil.FatalIfErr(t, f.Close())

Expand Down
5 changes: 5 additions & 0 deletions internal/tailer/logstream/socketstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,11 @@ func (ss *socketStream) handleConn(ctx context.Context, wg *sync.WaitGroup, wake
ss.lastReadTime = time.Now()
ss.mu.Unlock()
ss.staleTimer = time.AfterFunc(time.Hour*24, ss.cancel)

// No error implies more to read, so restart the loop.
if err == nil && ctx.Err() == nil {
continue
}
}

if err != nil && IsEndOrCancel(err) {
Expand Down
8 changes: 2 additions & 6 deletions internal/tailer/logstream/socketstream_unix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestSocketStreamReadCompletedBecauseSocketClosed(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
// The stream is not shut down with cancel in this test.
defer cancel()
waker, awaken := waker.NewTest(ctx, 1, "stream")
waker := waker.NewTestAlways()

sockName := scheme + "://" + addr
ss, err := logstream.New(ctx, &wg, waker, sockName, logstream.OneShotEnabled)
Expand All @@ -57,8 +57,6 @@ func TestSocketStreamReadCompletedBecauseSocketClosed(t *testing.T) {
_, err = s.Write([]byte("1\n"))
testutil.FatalIfErr(t, err)

awaken(0, 0) // Sync past read

// Close the socket to signal to the socketStream to shut down.
testutil.FatalIfErr(t, s.Close())

Expand Down Expand Up @@ -91,7 +89,7 @@ func TestSocketStreamReadCompletedBecauseCancel(t *testing.T) {
}

ctx, cancel := context.WithCancel(context.Background())
waker, awaken := waker.NewTest(ctx, 1, "stream")
waker := waker.NewTestAlways()

sockName := scheme + "://" + addr
ss, err := logstream.New(ctx, &wg, waker, sockName, logstream.OneShotDisabled)
Expand All @@ -108,8 +106,6 @@ func TestSocketStreamReadCompletedBecauseCancel(t *testing.T) {
_, err = s.Write([]byte("1\n"))
testutil.FatalIfErr(t, err)

awaken(0, 0) // Sync past read to ensure we read

cancel() // This cancellation should cause the stream to shut down immediately.
wg.Wait()

Expand Down

0 comments on commit c645528

Please sign in to comment.