-
Notifications
You must be signed in to change notification settings - Fork 89
/
Copy pathtailer.go
246 lines (229 loc) · 6.06 KB
/
tailer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
// Copyright 2013 Canonical Ltd.
// Licensed under the LGPLv3, see LICENCE file for details.
package tailer
import (
"bufio"
"bytes"
"io"
"os"
"time"
"gopkg.in/tomb.v1"
)
// Package-level defaults for the tailer.
const (
// defaultBufferSize is the initial value of the bufferSize variable.
defaultBufferSize = 4096
// polltime is the polling interval NewTailer passes to newTailer.
polltime = time.Second
// delimiter is the byte that terminates a line.
delimiter = '\n'
)
var (
// bufferSize is the size of the Tailer's read buffer and the minimum
// chunk size SeekLastLines reads when scanning backwards. It is a
// variable rather than a constant so it can be changed.
bufferSize = defaultBufferSize
// delimiters is delimiter as a byte slice, as needed by bytes.LastIndex.
delimiters = []byte{delimiter}
)
// TailerFilterFunc decides if a line shall be tailed (func is nil or
// returns true) or shall be omitted (func returns false). The line is
// passed including its terminating newline.
type TailerFilterFunc func(line []byte) bool
// Tailer reads an input line by line and tails them into the passed Writer.
// The lines have to be terminated with a newline.
type Tailer struct {
// tomb tracks the lifecycle of the polling goroutine.
tomb tomb.Tomb
// readSeeker is the input; readLine seeks on it to step back over an
// unterminated trailing line.
readSeeker io.ReadSeeker
// reader buffers reads from readSeeker.
reader *bufio.Reader
// NOTE(review): writeCloser is neither assigned nor read in the code
// visible in this file - confirm whether it is still needed.
writeCloser io.WriteCloser
// writer buffers output to the Writer passed to the constructor.
writer *bufio.Writer
// filter, if non-nil, selects which lines are written.
filter TailerFilterFunc
// polltime is the delay between two polls of the input.
polltime time.Duration
}
// NewTailer starts a Tailer which reads strings from the passed
// ReadSeeker line by line. If a filter function is specified the read
// lines are filtered. The matching lines are written to the passed
// Writer.
//
// Reading starts at the ReadSeeker's current position and the input is
// polled for new data using the package default polltime.
func NewTailer(readSeeker io.ReadSeeker, writer io.Writer, filter TailerFilterFunc) *Tailer {
return newTailer(readSeeker, writer, filter, polltime)
}
// newTailer builds a Tailer exactly like NewTailer does, but lets the
// caller choose the time between two polls, which is useful for testing.
// The read buffer size is taken from the package-level bufferSize
// variable. The polling goroutine is started before the Tailer is
// returned; its result is recorded in the Tailer's tomb.
func newTailer(readSeeker io.ReadSeeker, writer io.Writer,
	filter TailerFilterFunc, polltime time.Duration) *Tailer {
	tailer := new(Tailer)
	tailer.readSeeker = readSeeker
	tailer.reader = bufio.NewReaderSize(readSeeker, bufferSize)
	tailer.writer = bufio.NewWriter(writer)
	tailer.filter = filter
	tailer.polltime = polltime

	run := func() {
		// Mark the tomb as done once the loop has returned and its
		// result has been recorded.
		defer tailer.tomb.Done()
		result := tailer.loop()
		tailer.tomb.Kill(result)
	}
	go run()

	return tailer
}
// Stop tells the tailer to stop working and blocks until it has
// done so. It returns whatever waiting on the tailer's tomb returns.
func (t *Tailer) Stop() error {
// Killing with nil requests termination without recording an error.
t.tomb.Kill(nil)
return t.tomb.Wait()
}
// Wait blocks until the tailer has stopped, either because it was
// told to or because of an error. In case of an error it returns
// the reason.
func (t *Tailer) Wait() error {
return t.tomb.Wait()
}
// Dead returns the channel that can be used to wait until
// the tailer is stopped. It is the Dead channel of the tailer's tomb.
func (t *Tailer) Dead() <-chan struct{} {
return t.tomb.Dead()
}
// Err returns a possible error, as reported by the tailer's tomb.
func (t *Tailer) Err() error {
return t.tomb.Err()
}
// loop is the body of the tailer goroutine. On each poll it writes every
// line readLine currently yields to the writer, flushes the writer and
// then waits polltime before polling again. The first poll happens
// immediately.
//
// It returns nil once the tomb is dying, or otherwise the first write or
// flush error or the first read error other than io.EOF.
func (t *Tailer) loop() error {
	// Start polling.
	// TODO(mue) 2013-12-06
	// Handling of read-seeker/files being truncated during
	// tailing is currently missing!
	timer := time.NewTimer(0)
	// Release the timer on every exit path instead of leaving a pending
	// timer behind after the goroutine has returned.
	defer timer.Stop()
	for {
		select {
		case <-t.tomb.Dying():
			return nil
		case <-timer.C:
			for {
				line, readErr := t.readLine()
				// line is nil when readErr is set, so this write is
				// then a no-op.
				_, writeErr := t.writer.Write(line)
				if writeErr != nil {
					return writeErr
				}
				if readErr != nil {
					if readErr != io.EOF {
						return readErr
					}
					// EOF only means there is no more data for now.
					break
				}
			}
			if writeErr := t.writer.Flush(); writeErr != nil {
				return writeErr
			}
			// The timer has fired and its channel has been drained
			// above, so it is safe to reset it directly.
			timer.Reset(t.polltime)
		}
	}
}
// SeekLastLines sets the read position of the ReadSeeker to the
// wanted number of filtered lines before the end.
//
// The input is scanned backwards from its end in chunks. Only lines
// terminated by a newline are counted, and only those accepted by filter
// (a nil filter accepts every line); an unterminated line at the end of
// the input is ignored. If lines is 0 the position is left at the end of
// the input. If fewer than lines matching lines exist the position is set
// to the start of the input.
//
// Any error from seeking or reading, including the final positioning
// seek, is returned.
func SeekLastLines(readSeeker io.ReadSeeker, lines uint, filter TailerFilterFunc) error {
	offset, err := readSeeker.Seek(0, os.SEEK_END)
	if err != nil {
		return err
	}
	if lines == 0 {
		// We are done, just seeking to the end is sufficient.
		return nil
	}
	seekPos := int64(0)
	found := uint(0)
	buffer := make([]byte, bufferSize)
SeekLoop:
	for offset > 0 {
		// buffer contains the data left over from the
		// previous iteration.
		space := cap(buffer) - len(buffer)
		if space < bufferSize {
			// Grow buffer.
			newBuffer := make([]byte, len(buffer), cap(buffer)*2)
			copy(newBuffer, buffer)
			buffer = newBuffer
			space = cap(buffer) - len(buffer)
		}
		if int64(space) > offset {
			// Use exactly the right amount of space if there's
			// only a small amount remaining.
			space = int(offset)
		}
		// Copy data remaining from last time to the end of the buffer,
		// so we can read into the right place.
		copy(buffer[space:cap(buffer)], buffer)
		buffer = buffer[0 : len(buffer)+space]
		offset -= int64(space)
		if _, err := readSeeker.Seek(offset, os.SEEK_SET); err != nil {
			return err
		}
		if _, err := io.ReadFull(readSeeker, buffer[0:space]); err != nil {
			return err
		}
		// Find the end of the last line in the buffer.
		// This will discard any unterminated line at the end
		// of the file.
		end := bytes.LastIndex(buffer, delimiters)
		if end == -1 {
			// No end of line found - discard incomplete
			// line and continue looking. If this happens
			// at the beginning of the file, we don't care
			// because we're going to stop anyway.
			buffer = buffer[:0]
			continue
		}
		end++
		for {
			start := bytes.LastIndex(buffer[0:end-1], delimiters)
			if start == -1 {
				// No earlier delimiter in this chunk: the line
				// starting the buffer may continue in the
				// previous chunk. When the chunk starts at the
				// beginning of the input nothing special is
				// needed, because seekPos already defaults to 0.
				break
			}
			start++
			if filter == nil || filter(buffer[start:end]) {
				found++
				if found >= lines {
					seekPos = offset + int64(start)
					break SeekLoop
				}
			}
			end = start
		}
		// Leave the last line in buffer, as we don't know whether
		// it's complete or not.
		buffer = buffer[0:end]
	}
	// Final positioning. A failure here must be reported, otherwise the
	// caller would start reading from the wrong place.
	_, err = readSeeker.Seek(seekPos, os.SEEK_SET)
	return err
}
// readLine reads the next valid line from the reader, even if it is
// larger than the reader buffer.
//
// Lines rejected by the filter are skipped. When the input ends without
// a terminating delimiter, the read position of the underlying
// ReadSeeker is moved back over the partial line so that it is read
// again on a later call, and io.EOF is returned. If stepping back fails,
// that seek error is returned instead. On any error the returned line
// is nil.
func (t *Tailer) readLine() ([]byte, error) {
	for {
		slice, err := t.reader.ReadSlice(delimiter)
		if err == nil {
			if t.isValid(slice) {
				return slice, nil
			}
			continue
		}
		// slice refers to the reader's internal buffer, so copy it
		// before reading on.
		line := append([]byte(nil), slice...)
		for err == bufio.ErrBufferFull {
			slice, err = t.reader.ReadSlice(delimiter)
			line = append(line, slice...)
		}
		switch err {
		case nil:
			if t.isValid(line) {
				return line, nil
			}
		case io.EOF:
			// EOF without delimiter, step back. Nothing needs to be
			// undone if no partial data was consumed.
			if len(line) > 0 {
				_, seekErr := t.readSeeker.Seek(-int64(len(line)), os.SEEK_CUR)
				if seekErr != nil {
					// The partial line could not be given back;
					// report it rather than silently losing data.
					return nil, seekErr
				}
			}
			return nil, err
		default:
			return nil, err
		}
	}
}
// isValid reports whether the passed line shall be tailed. A line is
// valid if no filter function is set or if the filter function
// returns true for it.
func (t *Tailer) isValid(line []byte) bool {
	return t.filter == nil || t.filter(line)
}