From 7b45320917ee390fd46b32bebba5dcbc437268d5 Mon Sep 17 00:00:00 2001 From: Andrea Spacca Date: Wed, 30 Nov 2022 17:34:04 +0900 Subject: [PATCH] handle EOF on single line content (#33568) * handle EOF on single line content * changelog * fallback to encode_eof if no events in aws-s3 input * lint * lint * collect on EOF in line reader * remove encode eof * remove iterN * fix test * increase test coverage * linting * more linting * increase coverage --- CHANGELOG.next.asciidoc | 1 + libbeat/reader/readfile/bench_test.go | 5 +- libbeat/reader/readfile/encode.go | 8 +- libbeat/reader/readfile/line.go | 96 +++++++--- libbeat/reader/readfile/line_test.go | 176 ++++++++++++------ x-pack/filebeat/input/awss3/s3_objects.go | 21 ++- .../filebeat/input/awss3/s3_objects_test.go | 8 + .../input/awss3/testdata/no-eol-twolines.txt | 2 + .../filebeat/input/awss3/testdata/no-eol.txt | 1 + 9 files changed, 221 insertions(+), 97 deletions(-) create mode 100644 x-pack/filebeat/input/awss3/testdata/no-eol-twolines.txt create mode 100644 x-pack/filebeat/input/awss3/testdata/no-eol.txt diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 9154a8dd4d6f..1036f87519b3 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -58,6 +58,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff] - Add handling of AAA operations for Cisco ASA module. {issue}32257[32257] {pull}32789[32789] - Fix gc.log always shipped even if gc fileset is disabled {issue}30995[30995] - Fix handling of empty array in httpjson input. {pull}32001[32001] +- Fix EOF on single line not producing any event. {issue}30436[30436] {pull}33568[33568] - Fix reporting of `filebeat.events.active` in log events such that the current value is always reported instead of the difference from the last value. {pull}33597[33597] - Fix splitting array of strings/arrays in httpjson input {issue}30345[30345] {pull}33609[33609] - Fix Google workspace pagination and document ID generation. {pull}33666[33666] diff --git a/libbeat/reader/readfile/bench_test.go b/libbeat/reader/readfile/bench_test.go index b1f6e7667f60..e1d612488486 100644 --- a/libbeat/reader/readfile/bench_test.go +++ b/libbeat/reader/readfile/bench_test.go @@ -20,6 +20,7 @@ package readfile import ( "bytes" "encoding/hex" + "errors" "fmt" "io" "io/ioutil" @@ -39,7 +40,7 @@ func BenchmarkEncoderReader(b *testing.B) { b.Run(name, func(b *testing.B) { b.ReportAllocs() for bN := 0; bN < b.N; bN++ { - reader, err := NewEncodeReader(ioutil.NopCloser(bytes.NewReader(lines)), Config{encoding.Nop, bufferSize, LineFeed, lineMaxLimit}) + reader, err := NewEncodeReader(ioutil.NopCloser(bytes.NewReader(lines)), Config{encoding.Nop, bufferSize, LineFeed, lineMaxLimit, false}) if err != nil { b.Fatal("failed to initialize reader:", err) } @@ -48,7 +49,7 @@ func BenchmarkEncoderReader(b *testing.B) { for i := 0; ; i++ { msg, err := reader.Next() if err != nil { - if err == io.EOF { + if errors.Is(err, io.EOF) { b.ReportMetric(float64(i), "processed_lines") break } else { diff --git a/libbeat/reader/readfile/encode.go b/libbeat/reader/readfile/encode.go index 3b63c43aa356..0e11554a6714 100644 --- a/libbeat/reader/readfile/encode.go +++ b/libbeat/reader/readfile/encode.go @@ -40,9 +40,15 @@ type Config struct { BufferSize int Terminator LineTerminator MaxBytes int + // If CollectOnEOF is set to true (default false) the line reader will return the buffer if EOF reached: this + // will ensure full content including last line with no EOL will be returned for fully retrieved content that's + // not appended anymore between reads. + // If CollectOnEOF is set to false the line reader will return 0 content and keep the buffer at the current + // state of appending data after temporarily EOF. + CollectOnEOF bool } -// New creates a new Encode reader from input reader by applying +// NewEncodeReader creates a new Encode reader from input reader by applying // the given codec. func NewEncodeReader(r io.ReadCloser, config Config) (EncoderReader, error) { eReader, err := NewLineReader(r, config) diff --git a/libbeat/reader/readfile/line.go b/libbeat/reader/readfile/line.go index c36832e514e4..0a0a119b736e 100644 --- a/libbeat/reader/readfile/line.go +++ b/libbeat/reader/readfile/line.go @@ -19,6 +19,7 @@ package readfile import ( "bytes" + "errors" "fmt" "io" @@ -33,18 +34,22 @@ const unlimited = 0 // LineReader reads lines from underlying reader, decoding the input stream // using the configured codec. The reader keeps track of bytes consumed // from raw input stream for every decoded line. +// If collectOnEOF is set to true (default false) it will return the buffer if EOF reached. +// If collectOnEOF is set to false it will return 0 content and keep the buffer at the current +// state of appending data after temporarily EOF. type LineReader struct { - reader io.ReadCloser - maxBytes int // max bytes per line limit to avoid OOM with malformatted files - nl []byte - decodedNl []byte - inBuffer *streambuf.Buffer - outBuffer *streambuf.Buffer - inOffset int // input buffer read offset - byteCount int // number of bytes decoded from input buffer into output buffer - decoder transform.Transformer - tempBuffer []byte - logger *logp.Logger + reader io.ReadCloser + maxBytes int // max bytes per line limit to avoid OOM with malformatted files + nl []byte + decodedNl []byte + collectOnEOF bool + inBuffer *streambuf.Buffer + outBuffer *streambuf.Buffer + inOffset int // input buffer read offset + byteCount int // number of bytes decoded from input buffer into output buffer + decoder transform.Transformer + tempBuffer []byte + logger *logp.Logger } // NewLineReader creates a new reader object @@ -63,15 +68,16 @@ func NewLineReader(input io.ReadCloser, config Config) (*LineReader, error) { } return &LineReader{ - reader: input, - maxBytes: config.MaxBytes, - decoder: config.Codec.NewDecoder(), - nl: nl, - decodedNl: terminator, - inBuffer: streambuf.New(nil), - outBuffer: streambuf.New(nil), - tempBuffer: make([]byte, config.BufferSize), - logger: logp.NewLogger("reader_line"), + reader: input, + maxBytes: config.MaxBytes, + decoder: config.Codec.NewDecoder(), + nl: nl, + decodedNl: terminator, + collectOnEOF: config.CollectOnEOF, + inBuffer: streambuf.New(nil), + outBuffer: streambuf.New(nil), + tempBuffer: make([]byte, config.BufferSize), + logger: logp.NewLogger("reader_line"), }, nil } @@ -88,12 +94,46 @@ func (r *LineReader) Next() (b []byte, n int, err error) { // read next 'potential' line from input buffer/reader err := r.advance() if err != nil { + if errors.Is(err, io.EOF) && r.collectOnEOF { + // Found EOF and collectOnEOF is true + // -> decode input sequence into outBuffer + // let's take whole buffer len without len(nl) if it ends with it + end := r.inBuffer.Len() + if bytes.HasSuffix(r.inBuffer.Bytes(), r.decodedNl) { + end -= len(r.nl) + } + + sz, err := r.decode(end) + if err != nil { + r.logger.Errorf("Error decoding line: %s", err) + // In case of error increase size by unencoded length + sz = r.inBuffer.Len() + } + + // Consume transformed bytes from input buffer + _ = r.inBuffer.Advance(sz) + r.inBuffer.Reset() + + // output buffer contains untile EOF. Extract + // byte slice from buffer and reset output buffer. + bytes, err := r.outBuffer.Collect(r.outBuffer.Len()) + r.outBuffer.Reset() + if err != nil { + // This should never happen as otherwise we have a broken state + panic(err) + } + + // return and reset consumed bytes count + sz = r.byteCount + r.byteCount = 0 + return bytes, sz, io.EOF + } + // return and reset consumed bytes count sz := r.byteCount r.byteCount = 0 return nil, sz, err } - // Check last decoded byte really being newline also unencoded // if not, continue reading buf := r.outBuffer.Bytes() @@ -144,13 +184,13 @@ func (r *LineReader) advance() error { // Try to read more bytes into buffer n, err := r.reader.Read(r.tempBuffer) - if err == io.EOF && n > 0 { + if errors.Is(err, io.EOF) && n > 0 { // Continue processing the returned bytes. The next call will yield EOF with 0 bytes. err = nil } // Write to buffer also in case of err - r.inBuffer.Write(r.tempBuffer[:n]) + _, _ = r.inBuffer.Write(r.tempBuffer[:n]) if err != nil { return err @@ -169,7 +209,7 @@ func (r *LineReader) advance() error { // If newLine is found, drop the lines longer than maxBytes for idx != -1 && idx > r.maxBytes { r.logger.Warnf("Exceeded %d max bytes in line limit, skipped %d bytes line", r.maxBytes, idx) - err = r.inBuffer.Advance(idx + len(r.nl)) + _ = r.inBuffer.Advance(idx + len(r.nl)) r.byteCount += idx + len(r.nl) r.inBuffer.Reset() r.inOffset = 0 @@ -237,7 +277,7 @@ func (r *LineReader) skipUntilNewLine() (int, error) { idx = bytes.Index(r.tempBuffer[:n], r.nl) if idx != -1 { - r.inBuffer.Write(r.tempBuffer[idx+len(r.nl) : n]) + _, _ = r.inBuffer.Write(r.tempBuffer[idx+len(r.nl) : n]) skipped += idx + len(r.nl) } else { skipped += n @@ -267,8 +307,8 @@ func (r *LineReader) decode(end int) (int, error) { nDst, nSrc, err = r.decoder.Transform(r.tempBuffer, inBytes[start:end], false) if err != nil { // Check if error is different from destination buffer too short - if err != transform.ErrShortDst { - r.outBuffer.Write(inBytes[0:end]) + if !errors.Is(err, transform.ErrShortDst) { + _, _ = r.outBuffer.Write(inBytes[0:end]) start = end break } @@ -278,7 +318,7 @@ func (r *LineReader) decode(end int) (int, error) { } start += nSrc - r.outBuffer.Write(r.tempBuffer[:nDst]) + _, _ = r.outBuffer.Write(r.tempBuffer[:nDst]) } r.byteCount += start diff --git a/libbeat/reader/readfile/line_test.go b/libbeat/reader/readfile/line_test.go index 85323ea32c20..000630fd35da 100644 --- a/libbeat/reader/readfile/line_test.go +++ b/libbeat/reader/readfile/line_test.go @@ -23,6 +23,8 @@ package readfile import ( "bytes" "encoding/hex" + "errors" + "fmt" "io" "io/ioutil" "math/rand" @@ -39,44 +41,47 @@ import ( // Sample texts are from http://www.columbia.edu/~kermit/utf8.html type lineTestCase struct { - encoding string - strings []string + encoding string + strings []string + collectOnEOF bool + withEOL bool + lineTerminator LineTerminator } var tests = []lineTestCase{ - {"plain", []string{"I can", "eat glass"}}, - {"latin1", []string{"I kå Glas frässa", "ond des macht mr nix!"}}, - {"utf-16be", []string{"Pot să mănânc sticlă", "și ea nu mă rănește."}}, - {"utf-16le", []string{"काचं शक्नोम्यत्तुम् ।", "नोपहिनस्ति माम् ॥"}}, - {"big5", []string{"我能吞下玻", "璃而不傷身體。"}}, - {"gb18030", []string{"我能吞下玻璃", "而不傷身。體"}}, - {"euc-kr", []string{" 나는 유리를 먹을 수 있어요.", " 그래도 아프지 않아요"}}, - {"euc-jp", []string{"私はガラスを食べられます。", "それは私を傷つけません。"}}, - {"plain", []string{"I can", "eat glass"}}, - {"iso8859-1", []string{"Filebeat is my favourite"}}, - {"iso8859-2", []string{"Filebeat je môj obľúbený"}}, // slovak: filebeat is my favourite - {"iso8859-3", []string{"büyükannem Filebeat kullanıyor"}}, // turkish: my granmother uses filebeat - {"iso8859-4", []string{"Filebeat on mõeldud kõigile"}}, // estonian: filebeat is for everyone - {"iso8859-5", []string{"я люблю кодировки"}}, // russian: i love encodings - {"iso8859-6", []string{"أنا بحاجة إلى المزيد من الترميزات"}}, // arabic: i need more encodings - {"iso8859-7", []string{"όπου μπορώ να αγοράσω περισσότερες κωδικοποιήσεις"}}, // greek: where can i buy more encodings? - {"iso8859-8", []string{"אני צריך קידוד אישי"}}, // hebrew: i need a personal encoding - {"iso8859-9", []string{"kodlamaları pişirebilirim"}}, // turkish: i can cook encodings - {"iso8859-10", []string{"koodaukset jäädyttävät nollaan"}}, // finnish: encodings freeze below zero - {"iso8859-13", []string{"mój pies zjada kodowanie"}}, // polish: my dog eats encodings - {"iso8859-14", []string{"An féidir leat cáise a ionchódú?"}}, // irish: can you encode a cheese? - {"iso8859-15", []string{"bedes du kode", "for min €"}}, // danish: please encode my euro symbol - {"iso8859-16", []string{"rossz karakterkódolást", "használsz"}}, // hungarian: you use the wrong character encoding - {"koi8r", []string{"я люблю кодировки"}}, // russian: i love encodings - {"koi8u", []string{"я люблю кодировки"}}, // russian: i love encodings - {"windows1250", []string{"Filebeat je môj obľúbený"}}, // slovak: filebeat is my favourite - {"windows1251", []string{"я люблю кодировки"}}, // russian: i love encodings - {"windows1252", []string{"what is better than an encoding?", "a legacy encoding"}}, - {"windows1253", []string{"όπου μπορώ να αγοράσω", "περισσότερες κωδικοποιήσεις"}}, // greek: where can i buy more encodings? - {"windows1254", []string{"kodlamaları", "pişirebilirim"}}, // turkish: i can cook encodings - {"windows1255", []string{"אני צריך קידוד אישי"}}, // hebrew: i need a personal encoding - {"windows1256", []string{"أنا بحاجة إلى المزيد من الترميزات"}}, // arabic: i need more encodings - {"windows1257", []string{"toite", "kodeerijaid"}}, // estonian: feed the encoders + {encoding: "plain", strings: []string{"I can", "eat glass"}}, + {encoding: "latin1", strings: []string{"I kå Glas frässa", "ond des macht mr nix!"}}, + {encoding: "utf-16be", strings: []string{"Pot să mănânc sticlă", "și ea nu mă rănește."}}, + {encoding: "utf-16le", strings: []string{"काचं शक्नोम्यत्तुम् ।", "नोपहिनस्ति माम् ॥"}}, + {encoding: "big5", strings: []string{"我能吞下玻", "璃而不傷身體。"}}, + {encoding: "gb18030", strings: []string{"我能吞下玻璃", "而不傷身。體"}}, + {encoding: "euc-kr", strings: []string{" 나는 유리를 먹을 수 있어요.", " 그래도 아프지 않아요"}}, + {encoding: "euc-jp", strings: []string{"私はガラスを食べられます。", "それは私を傷つけません。"}}, + {encoding: "plain", strings: []string{"I can", "eat glass"}}, + {encoding: "iso8859-1", strings: []string{"Filebeat is my favourite"}}, + {encoding: "iso8859-2", strings: []string{"Filebeat je môj obľúbený"}}, // slovak: filebeat is my favourite + {encoding: "iso8859-3", strings: []string{"büyükannem Filebeat kullanıyor"}}, // turkish: my granmother uses filebeat + {encoding: "iso8859-4", strings: []string{"Filebeat on mõeldud kõigile"}}, // estonian: filebeat is for everyone + {encoding: "iso8859-5", strings: []string{"я люблю кодировки"}}, // russian: i love encodings + {encoding: "iso8859-6", strings: []string{"أنا بحاجة إلى المزيد من الترميزات"}}, // arabic: i need more encodings + {encoding: "iso8859-7", strings: []string{"όπου μπορώ να αγοράσω περισσότερες κωδικοποιήσεις"}}, // greek: where can i buy more encodings? + {encoding: "iso8859-8", strings: []string{"אני צריך קידוד אישי"}}, // hebrew: i need a personal encoding + {encoding: "iso8859-9", strings: []string{"kodlamaları pişirebilirim"}}, // turkish: i can cook encodings + {encoding: "iso8859-10", strings: []string{"koodaukset jäädyttävät nollaan"}}, // finnish: encodings freeze below zero + {encoding: "iso8859-13", strings: []string{"mój pies zjada kodowanie"}}, // polish: my dog eats encodings + {encoding: "iso8859-14", strings: []string{"An féidir leat cáise a ionchódú?"}}, // irish: can you encode a cheese? + {encoding: "iso8859-15", strings: []string{"bedes du kode", "for min €"}}, // danish: please encode my euro symbol + {encoding: "iso8859-16", strings: []string{"rossz karakterkódolást", "használsz"}}, // hungarian: you use the wrong character encoding + {encoding: "koi8r", strings: []string{"я люблю кодировки"}}, // russian: i love encodings + {encoding: "koi8u", strings: []string{"я люблю кодировки"}}, // russian: i love encodings + {encoding: "windows1250", strings: []string{"Filebeat je môj obľúbený"}}, // slovak: filebeat is my favourite + {encoding: "windows1251", strings: []string{"я люблю кодировки"}}, // russian: i love encodings + {encoding: "windows1252", strings: []string{"what is better than an encoding?", "a legacy encoding"}}, + {encoding: "windows1253", strings: []string{"όπου μπορώ να αγοράσω", "περισσότερες κωδικοποιήσεις"}}, // greek: where can i buy more encodings? + {encoding: "windows1254", strings: []string{"kodlamaları", "pişirebilirim"}}, // turkish: i can cook encodings + {encoding: "windows1255", strings: []string{"אני צריך קידוד אישי"}}, // hebrew: i need a personal encoding + {encoding: "windows1256", strings: []string{"أنا بحاجة إلى المزيد من الترميزات"}}, // arabic: i need more encodings + {encoding: "windows1257", strings: []string{"toite", "kodeerijaid"}}, // estonian: feed the encoders } func TestReaderEncodings(t *testing.T) { @@ -88,19 +93,21 @@ func TestReaderEncodings(t *testing.T) { buffer := bytes.NewBuffer(nil) codec, _ := codecFactory(buffer) - nl := lineTerminatorCharacters[LineFeed] + nl := lineTerminatorCharacters[test.lineTerminator] // write with encoding to buffer writer := transform.NewWriter(buffer, codec.NewEncoder()) var expectedCount []int - for _, line := range test.strings { - writer.Write([]byte(line)) - writer.Write(nl) + for i, line := range test.strings { + _, _ = writer.Write([]byte(line)) + if !test.collectOnEOF || i < len(test.strings)-1 || test.withEOL { + _, _ = writer.Write(nl) + } expectedCount = append(expectedCount, buffer.Len()) } // create line reader - reader, err := NewLineReader(ioutil.NopCloser(buffer), Config{codec, 1024, LineFeed, unlimited}) + reader, err := NewLineReader(ioutil.NopCloser(buffer), Config{codec, 1024, test.lineTerminator, unlimited, test.collectOnEOF}) if err != nil { t.Fatal("failed to initialize reader:", err) } @@ -112,15 +119,19 @@ func TestReaderEncodings(t *testing.T) { for { bytes, sz, err := reader.Next() if sz > 0 { - readLines = append(readLines, string(bytes[:len(bytes)-len(nl)])) + offset := len(bytes) + if offset > 0 && (!test.collectOnEOF || !errors.Is(err, io.EOF) || test.withEOL) { + offset -= len(nl) + } + readLines = append(readLines, string(bytes[:offset])) } + current += sz + byteCounts = append(byteCounts, current) + if err != nil { break } - - current += sz - byteCounts = append(byteCounts, current) } // validate lines and byte offsets @@ -136,10 +147,65 @@ func TestReaderEncodings(t *testing.T) { } } + invalidLineTerminatorForEncoding := map[string][]LineTerminator{ + "latin1": []LineTerminator{LineSeparator, NextLine, ParagraphSeparator}, + "big5": []LineTerminator{LineSeparator, NextLine, ParagraphSeparator}, + "euc-kr": []LineTerminator{LineSeparator, NextLine, ParagraphSeparator}, + "euc-jp": []LineTerminator{LineSeparator, NextLine, ParagraphSeparator}, + "iso8859-1": []LineTerminator{LineSeparator, NextLine, ParagraphSeparator}, + "iso8859-2": []LineTerminator{LineSeparator, NextLine, ParagraphSeparator}, + "iso8859-3": []LineTerminator{LineSeparator, NextLine, ParagraphSeparator}, + "iso8859-4": []LineTerminator{LineSeparator, NextLine, ParagraphSeparator}, + "iso8859-5": []LineTerminator{LineSeparator, NextLine, ParagraphSeparator}, + "iso8859-6": []LineTerminator{LineSeparator, NextLine, ParagraphSeparator}, + "iso8859-7": []LineTerminator{LineSeparator, NextLine, ParagraphSeparator}, + "iso8859-8": []LineTerminator{LineSeparator, NextLine, ParagraphSeparator}, + "iso8859-9": []LineTerminator{LineSeparator, NextLine, ParagraphSeparator}, + "iso8859-10": []LineTerminator{LineSeparator, NextLine, ParagraphSeparator}, + "iso8859-13": []LineTerminator{LineSeparator, NextLine, ParagraphSeparator}, + "iso8859-14": []LineTerminator{LineSeparator, NextLine, ParagraphSeparator}, + "iso8859-15": []LineTerminator{LineSeparator, NextLine, ParagraphSeparator}, + "iso8859-16": []LineTerminator{LineSeparator, NextLine, ParagraphSeparator}, + "koi8r": []LineTerminator{LineSeparator, NextLine, ParagraphSeparator}, + "koi8u": []LineTerminator{LineSeparator, NextLine, ParagraphSeparator}, + "windows1250": []LineTerminator{LineSeparator, NextLine, ParagraphSeparator}, + "windows1251": []LineTerminator{LineSeparator, NextLine, ParagraphSeparator}, + "windows1252": []LineTerminator{LineSeparator, NextLine, ParagraphSeparator}, + "windows1253": []LineTerminator{LineSeparator, NextLine, ParagraphSeparator}, + "windows1254": []LineTerminator{LineSeparator, NextLine, ParagraphSeparator}, + "windows1255": []LineTerminator{LineSeparator, NextLine, ParagraphSeparator}, + "windows1256": []LineTerminator{LineSeparator, NextLine, ParagraphSeparator}, + "windows1257": []LineTerminator{LineSeparator, NextLine, ParagraphSeparator}, + "utf-16be": []LineTerminator{NextLine}, // test fails: buf ends with uint8{189} instead of uint8{133} + "gb18030": []LineTerminator{NextLine}, // test fails: buf ends with uint8{189} instead of uint8{133} + "utf-16le": []LineTerminator{NextLine}, // test fails: buf ends with uint8{189} instead of uint8{133} + } for _, test := range tests { - t.Run(test.encoding, func(t *testing.T) { - runTest(t, test) - }) + for _, collectOnEOF := range []bool{false, true} { + for _, withEOL := range []bool{false, true} { + for lineTerminatorName, lineTerminator := range lineTerminators { + lineTerminatorIsInvalid := false + if invalidLineTerminatorForEncoding, ok := invalidLineTerminatorForEncoding[test.encoding]; ok { + for _, invalidLineTerminator := range invalidLineTerminatorForEncoding { + if invalidLineTerminator == lineTerminator { + lineTerminatorIsInvalid = true + break + } + } + } + if lineTerminatorIsInvalid { + continue + } + + test.withEOL = withEOL + test.collectOnEOF = collectOnEOF + test.lineTerminator = lineTerminator + t.Run(fmt.Sprintf("encoding: %s, collect on EOF: %t, with EOL: %t, line terminator: %s", test.encoding, test.collectOnEOF, test.withEOL, lineTerminatorName), func(t *testing.T) { + runTest(t, test) + }) + } + } + } } } @@ -160,7 +226,7 @@ func TestLineTerminators(t *testing.T) { buffer.Write([]byte("this is my second line")) buffer.Write(nl) - reader, err := NewLineReader(ioutil.NopCloser(buffer), Config{codec, 1024, terminator, unlimited}) + reader, err := NewLineReader(ioutil.NopCloser(buffer), Config{codec, 1024, terminator, unlimited, false}) if err != nil { t.Errorf("failed to initialize reader: %v", err) continue @@ -238,7 +304,7 @@ func testReadLines(t *testing.T, inputLines [][]byte, eofOnLastRead bool) { } codec, _ := encoding.Plain(r) - reader, err := NewLineReader(ioutil.NopCloser(r), Config{codec, buffer.Len(), LineFeed, unlimited}) + reader, err := NewLineReader(ioutil.NopCloser(r), Config{codec, buffer.Len(), LineFeed, unlimited, false}) if err != nil { t.Fatalf("Error initializing reader: %v", err) } @@ -261,10 +327,6 @@ func testReadLines(t *testing.T, inputLines [][]byte, eofOnLastRead bool) { } } -func testReadLine(t *testing.T, line []byte) { - testReadLines(t, [][]byte{line}, false) -} - func randomInt(r *rand.Rand, min, max int) int { return r.Intn(max+1-min) + min } @@ -358,7 +420,7 @@ func TestMaxBytesLimit(t *testing.T) { } // Create line reader - reader, err := NewLineReader(ioutil.NopCloser(strings.NewReader(input)), Config{codec, bufferSize, LineFeed, lineMaxLimit}) + reader, err := NewLineReader(ioutil.NopCloser(strings.NewReader(input)), Config{codec, bufferSize, LineFeed, lineMaxLimit, false}) if err != nil { t.Fatal("failed to initialize reader:", err) } @@ -372,7 +434,7 @@ func TestMaxBytesLimit(t *testing.T) { for i := 0; ; i++ { b, n, err := reader.Next() if err != nil { - if err == io.EOF { + if errors.Is(err, io.EOF) { readLen += n break } else { @@ -418,7 +480,7 @@ func TestBufferSize(t *testing.T) { bufferSize := 10 in := ioutil.NopCloser(strings.NewReader(strings.Join(lines, ""))) - reader, err := NewLineReader(in, Config{codec, bufferSize, AutoLineTerminator, 1024}) + reader, err := NewLineReader(in, Config{codec, bufferSize, AutoLineTerminator, 1024, false}) if err != nil { t.Fatal("failed to initialize reader:", err) } @@ -426,7 +488,7 @@ func TestBufferSize(t *testing.T) { for i := 0; i < len(lines); i++ { b, n, err := reader.Next() if err != nil { - if err == io.EOF { + if errors.Is(err, io.EOF) { break } else { t.Fatal("unexpected error:", err) diff --git a/x-pack/filebeat/input/awss3/s3_objects.go b/x-pack/filebeat/input/awss3/s3_objects.go index 0893a664e76a..e88de239c022 100644 --- a/x-pack/filebeat/input/awss3/s3_objects.go +++ b/x-pack/filebeat/input/awss3/s3_objects.go @@ -283,10 +283,11 @@ func (p *s3ObjectProcessor) readFile(r io.Reader) error { var reader reader.Reader reader, err = readfile.NewEncodeReader(ioutil.NopCloser(r), readfile.Config{ - Codec: enc, - BufferSize: int(p.readerConfig.BufferSize), - Terminator: p.readerConfig.LineTerminator, - MaxBytes: int(p.readerConfig.MaxBytes) * 4, + Codec: enc, + BufferSize: int(p.readerConfig.BufferSize), + Terminator: p.readerConfig.LineTerminator, + CollectOnEOF: true, + MaxBytes: int(p.readerConfig.MaxBytes) * 4, }) if err != nil { return fmt.Errorf("failed to create encode reader: %w", err) @@ -299,6 +300,13 @@ func (p *s3ObjectProcessor) readFile(r io.Reader) error { var offset int64 for { message, err := reader.Next() + if len(message.Content) > 0 { + event := p.createEvent(string(message.Content), offset) + event.Fields.DeepUpdate(message.Fields) + offset += int64(message.Bytes) + p.publish(p.acker, &event) + } + if errors.Is(err, io.EOF) { // No more lines break @@ -306,11 +314,6 @@ func (p *s3ObjectProcessor) readFile(r io.Reader) error { if err != nil { return fmt.Errorf("error reading message: %w", err) } - - event := p.createEvent(string(message.Content), offset) - event.Fields.DeepUpdate(message.Fields) - offset += int64(message.Bytes) - p.publish(p.acker, &event) } return nil diff --git a/x-pack/filebeat/input/awss3/s3_objects_test.go b/x-pack/filebeat/input/awss3/s3_objects_test.go index a3a9168d9672..fe6607657885 100644 --- a/x-pack/filebeat/input/awss3/s3_objects_test.go +++ b/x-pack/filebeat/input/awss3/s3_objects_test.go @@ -198,6 +198,14 @@ func TestS3ObjectProcessor(t *testing.T) { err := s3ObjProc.Create(ctx, logp.NewLogger(inputName), mockPublisher, ack, s3Event).ProcessS3Object() require.NoError(t, err) }) + + t.Run("text file without end of line marker", func(t *testing.T) { + testProcessS3Object(t, "testdata/no-eol.txt", "text/plain", 1) + }) + + t.Run("text file without end of line marker but with newline", func(t *testing.T) { + testProcessS3Object(t, "testdata/no-eol-twolines.txt", "text/plain", 2) + }) } func testProcessS3Object(t testing.TB, file, contentType string, numEvents int, selectors ...fileSelectorConfig) []beat.Event { diff --git a/x-pack/filebeat/input/awss3/testdata/no-eol-twolines.txt b/x-pack/filebeat/input/awss3/testdata/no-eol-twolines.txt new file mode 100644 index 000000000000..22ab1cb28e51 --- /dev/null +++ b/x-pack/filebeat/input/awss3/testdata/no-eol-twolines.txt @@ -0,0 +1,2 @@ +Line1: This file does contain a final EOL. +Line2: This file does contain a final EOL. \ No newline at end of file diff --git a/x-pack/filebeat/input/awss3/testdata/no-eol.txt b/x-pack/filebeat/input/awss3/testdata/no-eol.txt new file mode 100644 index 000000000000..0b7757db86e8 --- /dev/null +++ b/x-pack/filebeat/input/awss3/testdata/no-eol.txt @@ -0,0 +1 @@ +This file does contain a final EOL. \ No newline at end of file