From d3fc90abaf0376211d83344ac3e37aa67f15613e Mon Sep 17 00:00:00 2001 From: Dmitry Romanov Date: Wed, 2 Oct 2024 16:31:26 +0300 Subject: [PATCH 1/3] offsets interface in pipeline in --- pipeline/pipeline.go | 21 +++++++++++------- pipeline/pipeline_test.go | 5 +++-- .../add_file_name/add_file_name_test.go | 4 ++-- plugin/action/add_host/add_host_test.go | 2 +- .../action/convert_date/convert_date_test.go | 8 +++---- .../convert_log_level_test.go | 2 +- .../convert_utf8_bytes_test.go | 2 +- plugin/action/discard/discard_test.go | 4 ++-- plugin/action/flatten/flatten_test.go | 2 +- plugin/action/join/join_test.go | 4 ++-- .../join_template/join_template_test.go | 4 ++-- plugin/action/json_decode/json_decode_test.go | 2 +- plugin/action/json_encode/json_encode_test.go | 2 +- .../action/json_extract/json_extract_test.go | 2 +- plugin/action/keep_fields/keep_fields_test.go | 6 ++--- plugin/action/mask/mask_test.go | 6 ++--- plugin/action/modify/modify_test.go | 6 ++--- plugin/action/move/move_test.go | 2 +- plugin/action/parse_es/pipeline_test.go | 2 +- plugin/action/parse_re2/parse_re2_test.go | 6 ++--- .../remove_fields/remove_fields_test.go | 12 +++++----- plugin/action/rename/rename_test.go | 12 +++++----- plugin/action/set_time/set_time_test.go | 2 +- plugin/action/split/split_test.go | 6 ++--- plugin/action/throttle/throttle_test.go | 14 ++++++------ plugin/input/dmesg/dmesg.go | 12 +++++++++- plugin/input/fake/fake.go | 2 +- plugin/input/file/worker.go | 18 +++++++++++++-- plugin/input/file/worker_test.go | 2 +- plugin/input/http/http.go | 16 +++++++++++--- plugin/input/journalctl/journalctl.go | 12 +++++++++- plugin/input/k8s/k8s_test.go | 22 +++++++++---------- plugin/input/kafka/consumer.go | 12 +++++++++- test/file_base.go | 2 +- test/test.go | 10 +++++++++ 35 files changed, 158 insertions(+), 88 deletions(-) diff --git a/pipeline/pipeline.go b/pipeline/pipeline.go index 62a675308..28d705672 100644 --- a/pipeline/pipeline.go +++ b/pipeline/pipeline.go @@ -44,7 +44,7 @@ const ( type finalizeFn = func(event *Event, notifyInput bool, backEvent bool) type InputPluginController interface { - In(sourceID SourceID, sourceName string, offset int64, data []byte, isNewSource bool, meta metadata.MetaData) uint64 + In(sourceID SourceID, sourceName string, offset Offsets, data []byte, isNewSource bool, meta metadata.MetaData) uint64 UseSpread() // don't use stream field and spread all events across all processors DisableStreams() // don't use stream field SuggestDecoder(t decoder.Type) // set decoder type if pipeline uses "auto" value for decoder @@ -354,8 +354,13 @@ func (p *Pipeline) GetOutput() OutputPlugin { return p.output } +type Offsets interface { + Current() int64 + ByStream(stream string) int64 +} + // In decodes message and passes it to event stream. -func (p *Pipeline) In(sourceID SourceID, sourceName string, offset int64, bytes []byte, isNewSource bool, meta metadata.MetaData) (seqID uint64) { +func (p *Pipeline) In(sourceID SourceID, sourceName string, offset Offsets, bytes []byte, isNewSource bool, meta metadata.MetaData) (seqID uint64) { length := len(bytes) // don't process mud. @@ -386,7 +391,7 @@ func (p *Pipeline) In(sourceID SourceID, sourceName string, offset int64, bytes row, err = decoder.DecodeCRI(bytes) if err != nil { p.wrongEventCRIFormatMetric.Inc() - p.Error(fmt.Sprintf("wrong cri format offset=%d, length=%d, err=%s, source=%d:%s, cri=%s", offset, length, err.Error(), sourceID, sourceName, bytes)) + p.Error(fmt.Sprintf("wrong cri format offset=%d, length=%d, err=%s, source=%d:%s, cri=%s", offset.Current(), length, err.Error(), sourceID, sourceName, bytes)) return EventSeqIDError } } @@ -447,7 +452,7 @@ func (p *Pipeline) In(sourceID SourceID, sourceName string, offset int64, bytes } p.logger.Log(level, "wrong json format", zap.Error(err), - zap.Int64("offset", offset), + zap.Int64("offset", offset.Current()), zap.Int("length", length), zap.Uint64("source", uint64(sourceID)), zap.String("source_name", sourceName), @@ -470,7 +475,7 @@ func (p *Pipeline) In(sourceID SourceID, sourceName string, offset int64, bytes err := decoder.DecodePostgres(event.Root, bytes) if err != nil { p.logger.Fatal("wrong postgres format", zap.Error(err), - zap.Int64("offset", offset), + zap.Int64("offset", offset.Current()), zap.Int("length", length), zap.Uint64("source", uint64(sourceID)), zap.String("source_name", sourceName), @@ -489,7 +494,7 @@ func (p *Pipeline) In(sourceID SourceID, sourceName string, offset int64, bytes } p.logger.Log(level, "wrong nginx error log format", zap.Error(err), - zap.Int64("offset", offset), + zap.Int64("offset", offset.Current()), zap.Int("length", length), zap.Uint64("source", uint64(sourceID)), zap.String("source_name", sourceName), @@ -503,7 +508,7 @@ func (p *Pipeline) In(sourceID SourceID, sourceName string, offset int64, bytes err = p.decoder.Decode(event.Root, bytes) if err != nil { p.logger.Fatal("wrong protobuf format", zap.Error(err), - zap.Int64("offset", offset), + zap.Int64("offset", offset.Current()), zap.Int("length", length), zap.Uint64("source", uint64(sourceID)), zap.String("source_name", sourceName), @@ -533,7 +538,7 @@ func (p *Pipeline) In(sourceID SourceID, sourceName string, offset int64, bytes } } - event.Offset = offset + event.Offset = offset.Current() event.SourceID = sourceID event.SourceName = sourceName event.streamName = DefaultStreamName diff --git a/pipeline/pipeline_test.go b/pipeline/pipeline_test.go index 675f99a11..c6a5fff20 100644 --- a/pipeline/pipeline_test.go +++ b/pipeline/pipeline_test.go @@ -6,6 +6,7 @@ import ( "github.com/ozontech/file.d/pipeline" "github.com/ozontech/file.d/plugin/input/fake" + "github.com/ozontech/file.d/test" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" ) @@ -41,7 +42,7 @@ func TestInUnparsableMessages(t *testing.T) { pipe.SetInput(getFakeInputInfo()) - seqID := pipe.In(sourceID, "kafka", offset, message, false, nil) + seqID := pipe.In(sourceID, "kafka", test.Offset(offset), message, false, nil) require.Equal(t, pipeline.EventSeqIDError, seqID) refPipe := reflect.ValueOf(pipe) @@ -119,7 +120,7 @@ func TestInInvalidMessages(t *testing.T) { pipe.SetInput(getFakeInputInfo()) - seqID := pipe.In(tCase.sourceID, "kafka", tCase.offset, tCase.message, false, nil) + seqID := pipe.In(tCase.sourceID, "kafka", test.Offset(tCase.offset), tCase.message, false, nil) require.Equal(t, pipeline.EventSeqIDError, seqID) }) } diff --git a/plugin/action/add_file_name/add_file_name_test.go b/plugin/action/add_file_name/add_file_name_test.go index 7278e1b16..d27e10d88 100644 --- a/plugin/action/add_file_name/add_file_name_test.go +++ b/plugin/action/add_file_name/add_file_name_test.go @@ -21,8 +21,8 @@ func TestModify(t *testing.T) { wg.Done() }) - input.In(0, sourceName, 0, []byte(`{"error":"info about error"}`)) - input.In(0, sourceName, 0, []byte(`{"file":"not_my_file"}`)) + input.In(0, sourceName, test.Offset(0), []byte(`{"error":"info about error"}`)) + input.In(0, sourceName, test.Offset(0), []byte(`{"file":"not_my_file"}`)) wg.Wait() p.Stop() diff --git a/plugin/action/add_host/add_host_test.go b/plugin/action/add_host/add_host_test.go index 8f21492e0..e7a86bc5f 100644 --- a/plugin/action/add_host/add_host_test.go +++ b/plugin/action/add_host/add_host_test.go @@ -22,7 +22,7 @@ func TestModify(t *testing.T) { wg.Done() }) - input.In(0, "test.log", 0, []byte(`{}`)) + input.In(0, "test.log", test.Offset(0), []byte(`{}`)) wg.Wait() p.Stop() diff --git a/plugin/action/convert_date/convert_date_test.go b/plugin/action/convert_date/convert_date_test.go index 992be464b..77597dfb7 100644 --- a/plugin/action/convert_date/convert_date_test.go +++ b/plugin/action/convert_date/convert_date_test.go @@ -23,9 +23,9 @@ func TestConvert(t *testing.T) { wg.Done() }) - input.In(0, "test.log", 0, []byte(`{"time":998578502}`)) - input.In(0, "test.log", 0, []byte(`{"time":998578999.1346}`)) - input.In(0, "test.log", 0, []byte(`{"time":"2022/02/07 13:06:14"}`)) + input.In(0, "test.log", test.Offset(0), []byte(`{"time":998578502}`)) + input.In(0, "test.log", test.Offset(0), []byte(`{"time":998578999.1346}`)) + input.In(0, "test.log", test.Offset(0), []byte(`{"time":"2022/02/07 13:06:14"}`)) wg.Wait() p.Stop() @@ -52,7 +52,7 @@ func TestConvertFail(t *testing.T) { wg.Done() }) - input.In(0, "test.log", 0, []byte(`{"time":"XXX"}`)) + input.In(0, "test.log", test.Offset(0), []byte(`{"time":"XXX"}`)) wg.Wait() p.Stop() diff --git a/plugin/action/convert_log_level/convert_log_level_test.go b/plugin/action/convert_log_level/convert_log_level_test.go index ebe469b24..0b34906f7 100644 --- a/plugin/action/convert_log_level/convert_log_level_test.go +++ b/plugin/action/convert_log_level/convert_log_level_test.go @@ -235,7 +235,7 @@ func TestDo(t *testing.T) { }) for _, log := range tc.In { - input.In(0, "test.log", 0, []byte(log)) + input.In(0, "test.log", test.Offset(0), []byte(log)) } now := time.Now() diff --git a/plugin/action/convert_utf8_bytes/convert_utf8_bytes_test.go b/plugin/action/convert_utf8_bytes/convert_utf8_bytes_test.go index 1851dd540..5cdf38b54 100644 --- a/plugin/action/convert_utf8_bytes/convert_utf8_bytes_test.go +++ b/plugin/action/convert_utf8_bytes/convert_utf8_bytes_test.go @@ -149,7 +149,7 @@ func TestConvertUTF8Bytes(t *testing.T) { wg.Done() }) - input.In(0, "test.log", 0, []byte(tt.in)) + input.In(0, "test.log", test.Offset(0), []byte(tt.in)) wg.Wait() p.Stop() diff --git a/plugin/action/discard/discard_test.go b/plugin/action/discard/discard_test.go index e1e52f8e9..ec8eedd76 100644 --- a/plugin/action/discard/discard_test.go +++ b/plugin/action/discard/discard_test.go @@ -139,10 +139,10 @@ func TestDiscard(t *testing.T) { }) for _, e := range tt.passEvents { - input.In(0, "test", 0, []byte(e)) + input.In(0, "test", test.Offset(0), []byte(e)) } for _, e := range tt.discardEvents { - input.In(0, "test", 0, []byte(e)) + input.In(0, "test", test.Offset(0), []byte(e)) } wg.Wait() diff --git a/plugin/action/flatten/flatten_test.go b/plugin/action/flatten/flatten_test.go index ca5668be5..b8e9ebd50 100644 --- a/plugin/action/flatten/flatten_test.go +++ b/plugin/action/flatten/flatten_test.go @@ -25,7 +25,7 @@ func TestFlatten(t *testing.T) { wg.Done() }) - input.In(0, "test.log", 0, []byte(`{"complex":{"a":"b","c":"d"}}`)) + input.In(0, "test.log", test.Offset(0), []byte(`{"complex":{"a":"b","c":"d"}}`)) wg.Wait() p.Stop() diff --git a/plugin/action/join/join_test.go b/plugin/action/join/join_test.go index 9207dcaaa..b407b5e48 100644 --- a/plugin/action/join/join_test.go +++ b/plugin/action/join/join_test.go @@ -172,7 +172,7 @@ func TestSimpleJoin(t *testing.T) { for i := 0; i < tt.iterations; i++ { for m, line := range lines { - input.In(0, "test.log", int64(i*10000+m), []byte(line)) + input.In(0, "test.log", test.Offset(int64(i*10000+m)), []byte(line)) } } @@ -271,7 +271,7 @@ func TestJoinAfterNilNode(t *testing.T) { for i := 0; i < tt.iterations; i++ { for m, line := range lines { - input.In(0, "test.log", int64(i*10000+m), []byte(line)) + input.In(0, "test.log", test.Offset(int64(i*10000+m)), []byte(line)) } } diff --git a/plugin/action/join_template/join_template_test.go b/plugin/action/join_template/join_template_test.go index 32d04661c..a7f2cbf73 100644 --- a/plugin/action/join_template/join_template_test.go +++ b/plugin/action/join_template/join_template_test.go @@ -447,7 +447,7 @@ func TestSimpleJoin(t *testing.T) { for i := 0; i < tt.iterations; i++ { for m, line := range lines { - input.In(0, "test.log", int64(i*10000+m), []byte(line)) + input.In(0, "test.log", test.Offset(int64(i*10000+m)), []byte(line)) } } @@ -534,7 +534,7 @@ func TestJoinAfterNilNode(t *testing.T) { for i := 0; i < tt.iterations; i++ { for m, line := range lines { - input.In(0, "test.log", int64(i*10000+m), []byte(line)) + input.In(0, "test.log", test.Offset(int64(i*10000+m)), []byte(line)) } } diff --git a/plugin/action/json_decode/json_decode_test.go b/plugin/action/json_decode/json_decode_test.go index d6af17f20..433b3a8b1 100644 --- a/plugin/action/json_decode/json_decode_test.go +++ b/plugin/action/json_decode/json_decode_test.go @@ -19,7 +19,7 @@ func TestDecode(t *testing.T) { wg.Done() }) - input.In(0, "test.log", 0, []byte(`{"log":"{\"field2\":\"value2\",\"field3\":\"value3\"}"}`)) + input.In(0, "test.log", test.Offset(0), []byte(`{"log":"{\"field2\":\"value2\",\"field3\":\"value3\"}"}`)) wg.Wait() p.Stop() diff --git a/plugin/action/json_encode/json_encode_test.go b/plugin/action/json_encode/json_encode_test.go index 02f1ab6bf..4921b90b3 100644 --- a/plugin/action/json_encode/json_encode_test.go +++ b/plugin/action/json_encode/json_encode_test.go @@ -20,7 +20,7 @@ func TestEncode(t *testing.T) { wg.Done() }) - input.In(0, "test.log", 0, []byte(`{"server":{"os":"linux","arch":"amd64"}}`)) + input.In(0, "test.log", test.Offset(0), []byte(`{"server":{"os":"linux","arch":"amd64"}}`)) wg.Wait() p.Stop() diff --git a/plugin/action/json_extract/json_extract_test.go b/plugin/action/json_extract/json_extract_test.go index c9f4aa017..ab38b819c 100644 --- a/plugin/action/json_extract/json_extract_test.go +++ b/plugin/action/json_extract/json_extract_test.go @@ -118,7 +118,7 @@ func TestJsonExtract(t *testing.T) { wg.Done() }) - input.In(0, "test.log", 0, []byte(tt.in)) + input.In(0, "test.log", test.Offset(0), []byte(tt.in)) wg.Wait() p.Stop() diff --git a/plugin/action/keep_fields/keep_fields_test.go b/plugin/action/keep_fields/keep_fields_test.go index 60699b171..cd4e5386e 100644 --- a/plugin/action/keep_fields/keep_fields_test.go +++ b/plugin/action/keep_fields/keep_fields_test.go @@ -21,9 +21,9 @@ func TestKeepFields(t *testing.T) { wg.Done() }) - input.In(0, "test.log", 0, []byte(`{"field_1":"value_1","a":"b"}`)) - input.In(0, "test.log", 0, []byte(`{"field_2":"value_2","b":"c"}`)) - input.In(0, "test.log", 0, []byte(`{"field_3":"value_3","a":"b"}`)) + input.In(0, "test.log", test.Offset(0), []byte(`{"field_1":"value_1","a":"b"}`)) + input.In(0, "test.log", test.Offset(0), []byte(`{"field_2":"value_2","b":"c"}`)) + input.In(0, "test.log", test.Offset(0), []byte(`{"field_3":"value_3","a":"b"}`)) wg.Wait() p.Stop() diff --git a/plugin/action/mask/mask_test.go b/plugin/action/mask/mask_test.go index 6ed474eae..5125d0890 100644 --- a/plugin/action/mask/mask_test.go +++ b/plugin/action/mask/mask_test.go @@ -826,7 +826,7 @@ func TestPlugin(t *testing.T) { }) for _, in := range s.input { - input.In(0, "test.log", 0, []byte(in)) + input.In(0, "test.log", test.Offset(0), []byte(in)) } wg.Wait() @@ -899,7 +899,7 @@ func TestWithEmptyRegex(t *testing.T) { }) for _, in := range s.input { - input.In(0, "test.log", 0, []byte(in)) + input.In(0, "test.log", test.Offset(0), []byte(in)) } wg.Wait() @@ -1090,7 +1090,7 @@ func TestPluginWithComplexMasks(t *testing.T) { }) for _, in := range s.input { - input.In(0, "test.log", 0, []byte(in)) + input.In(0, "test.log", test.Offset(0), []byte(in)) } wg.Wait() diff --git a/plugin/action/modify/modify_test.go b/plugin/action/modify/modify_test.go index 103a8c726..121a16496 100644 --- a/plugin/action/modify/modify_test.go +++ b/plugin/action/modify/modify_test.go @@ -26,7 +26,7 @@ func TestModify(t *testing.T) { wg.Done() }) - input.In(0, "test.log", 0, []byte(`{"existing_field":"existing_value","my_object":{"field":{"subfield":"subfield_value"}}}`)) + input.In(0, "test.log", test.Offset(0), []byte(`{"existing_field":"existing_value","my_object":{"field":{"subfield":"subfield_value"}}}`)) wg.Wait() p.Stop() @@ -75,7 +75,7 @@ func TestModifyRegex(t *testing.T) { wg.Add(len(testEvents)) for _, te := range testEvents { - input.In(0, "test.log", 0, te.in) + input.In(0, "test.log", test.Offset(0), te.in) } wg.Wait() @@ -138,7 +138,7 @@ func TestModifyTrim(t *testing.T) { wg.Add(len(testEvents)) for _, te := range testEvents { - input.In(0, "test.log", 0, te.in) + input.In(0, "test.log", test.Offset(0), te.in) } wg.Wait() diff --git a/plugin/action/move/move_test.go b/plugin/action/move/move_test.go index 90dc7182e..2b7dfa85d 100644 --- a/plugin/action/move/move_test.go +++ b/plugin/action/move/move_test.go @@ -256,7 +256,7 @@ func TestMove(t *testing.T) { wg.Done() }) - input.In(0, "test.log", 0, []byte(tt.in)) + input.In(0, "test.log", test.Offset(0), []byte(tt.in)) wg.Wait() p.Stop() diff --git a/plugin/action/parse_es/pipeline_test.go b/plugin/action/parse_es/pipeline_test.go index 9d4f967b4..707c006ac 100644 --- a/plugin/action/parse_es/pipeline_test.go +++ b/plugin/action/parse_es/pipeline_test.go @@ -110,7 +110,7 @@ func TestPipeline(t *testing.T) { }) for _, event := range tCase.eventsIn { - input.In(event.sourceID, event.sourceName, event.offset, event.bytes) + input.In(event.sourceID, event.sourceName, test.Offset(event.offset), event.bytes) } wg.Wait() diff --git a/plugin/action/parse_re2/parse_re2_test.go b/plugin/action/parse_re2/parse_re2_test.go index 54f2d3198..288695a65 100644 --- a/plugin/action/parse_re2/parse_re2_test.go +++ b/plugin/action/parse_re2/parse_re2_test.go @@ -25,7 +25,7 @@ func TestDecode(t *testing.T) { wg.Done() }) - input.In(0, "test.log", 0, []byte(`{"log":"2021-06-22 16:24:27 GMT [7291] => [2-1] client=test_client,db=test_db,user=test_user LOG: listening on IPv4 address \"0.0.0.0\", port 5432"}`)) + input.In(0, "test.log", test.Offset(0), []byte(`{"log":"2021-06-22 16:24:27 GMT [7291] => [2-1] client=test_client,db=test_db,user=test_user LOG: listening on IPv4 address \"0.0.0.0\", port 5432"}`)) wg.Wait() p.Stop() @@ -46,8 +46,8 @@ func TestDecodeAccessLogsJira(t *testing.T) { wg.Done() }) - input.In(0, "test.log", 0, []byte(`{"message":"10.115.195.13 0x51320775x2 jira_robot [07/Nov/2022:00:00:00 +0300] \"GET /rest/api/2/issue/FRAUD-3847?fields=resolution HTTP/1.1\" 200 198 20 \"https://jit.o3.ru/secure/RapidBoard.jspa?rapidView=2701&selectedIssue=EXPC-3767&quickFilter=16465&quickFilter=15365\" \"Apache-HttpClient/4.5.13 (Java/11.0.9)\" \"nj56zg\""}`)) - input.In(0, "test.log", 0, []byte(`{"message":"10.115.195.12 0x51320774x2 ezabelin [07/Nov/2022:00:00:00 +0300] \"GET /rest/api/2/issue/RP-4977?fields=resolution HTTP/1.1\" 201 158 15 \"-\" \"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/106.0.0.0 Safari/537.36\" \"1tmznt9\""}`)) + input.In(0, "test.log", test.Offset(0), []byte(`{"message":"10.115.195.13 0x51320775x2 jira_robot [07/Nov/2022:00:00:00 +0300] \"GET /rest/api/2/issue/FRAUD-3847?fields=resolution HTTP/1.1\" 200 198 20 \"https://jit.o3.ru/secure/RapidBoard.jspa?rapidView=2701&selectedIssue=EXPC-3767&quickFilter=16465&quickFilter=15365\" \"Apache-HttpClient/4.5.13 (Java/11.0.9)\" \"nj56zg\""}`)) + input.In(0, "test.log", test.Offset(0), []byte(`{"message":"10.115.195.12 0x51320774x2 ezabelin [07/Nov/2022:00:00:00 +0300] \"GET /rest/api/2/issue/RP-4977?fields=resolution HTTP/1.1\" 201 158 15 \"-\" \"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/106.0.0.0 Safari/537.36\" \"1tmznt9\""}`)) wg.Wait() p.Stop() diff --git a/plugin/action/remove_fields/remove_fields_test.go b/plugin/action/remove_fields/remove_fields_test.go index 685c68488..2638bfd8e 100644 --- a/plugin/action/remove_fields/remove_fields_test.go +++ b/plugin/action/remove_fields/remove_fields_test.go @@ -22,9 +22,9 @@ func TestRemoveFields(t *testing.T) { wg.Done() }) - input.In(0, "test.log", 0, []byte(`{"field_1":"value_1","a":"b"}`)) - input.In(0, "test.log", 0, []byte(`{"field_2":"value_2","b":"c"}`)) - input.In(0, "test.log", 0, []byte(`{"field_3":"value_3","a":"b"}`)) + input.In(0, "test.log", test.Offset(0), []byte(`{"field_1":"value_1","a":"b"}`)) + input.In(0, "test.log", test.Offset(0), []byte(`{"field_2":"value_2","b":"c"}`)) + input.In(0, "test.log", test.Offset(0), []byte(`{"field_3":"value_3","a":"b"}`)) wg.Wait() p.Stop() @@ -47,9 +47,9 @@ func TestRemoveNestedFields(t *testing.T) { wg.Done() }) - input.In(0, "test.log", 0, []byte(`{"a":"some"}`)) - input.In(0, "test.log", 0, []byte(`{"a":{"b":"deleted"}}`)) - input.In(0, "test.log", 0, []byte(`{"a":{"b":{"c":["deleted"]},"d":"saved"}}`)) + input.In(0, "test.log", test.Offset(0), []byte(`{"a":"some"}`)) + input.In(0, "test.log", test.Offset(0), []byte(`{"a":{"b":"deleted"}}`)) + input.In(0, "test.log", test.Offset(0), []byte(`{"a":{"b":{"c":["deleted"]},"d":"saved"}}`)) wg.Wait() p.Stop() diff --git a/plugin/action/rename/rename_test.go b/plugin/action/rename/rename_test.go index 841f6ff4a..d57ba516c 100644 --- a/plugin/action/rename/rename_test.go +++ b/plugin/action/rename/rename_test.go @@ -31,11 +31,11 @@ func TestRename(t *testing.T) { wg.Done() }) - input.In(0, "test.log", 0, []byte(`{"field_1":"value_1"}`)) - input.In(0, "test.log", 0, []byte(`{"field_2":"value_2"}`)) - input.In(0, "test.log", 0, []byte(`{"field_3":"value_3"}`)) - input.In(0, "test.log", 0, []byte(`{"field_4":{"field_5":"value_5"}}`)) - input.In(0, "test.log", 0, []byte(`{"k8s_node_label_topology.kubernetes.io/zone":"value_6"}`)) + input.In(0, "test.log", test.Offset(0), []byte(`{"field_1":"value_1"}`)) + input.In(0, "test.log", test.Offset(0), []byte(`{"field_2":"value_2"}`)) + input.In(0, "test.log", test.Offset(0), []byte(`{"field_3":"value_3"}`)) + input.In(0, "test.log", test.Offset(0), []byte(`{"field_4":{"field_5":"value_5"}}`)) + input.In(0, "test.log", test.Offset(0), []byte(`{"k8s_node_label_topology.kubernetes.io/zone":"value_6"}`)) wg.Wait() p.Stop() @@ -71,7 +71,7 @@ func TestRenamingSequence(t *testing.T) { wg.Done() }) - input.In(0, "test.log", 0, []byte(`{"key1":"value_1"}`)) + input.In(0, "test.log", test.Offset(0), []byte(`{"key1":"value_1"}`)) wg.Wait() p.Stop() diff --git a/plugin/action/set_time/set_time_test.go b/plugin/action/set_time/set_time_test.go index 43eade9e7..160f20f2b 100644 --- a/plugin/action/set_time/set_time_test.go +++ b/plugin/action/set_time/set_time_test.go @@ -142,7 +142,7 @@ func TestE2E_Plugin(t *testing.T) { }) counter.Add(1) - input.In(0, "test.log", 0, []byte(`{"message":123}`)) + input.In(0, "test.log", test.Offset(0), []byte(`{"message":123}`)) for counter.Load() != 0 { time.Sleep(time.Millisecond * 10) diff --git a/plugin/action/split/split_test.go b/plugin/action/split/split_test.go index 90ff483a6..56074440f 100644 --- a/plugin/action/split/split_test.go +++ b/plugin/action/split/split_test.go @@ -31,14 +31,14 @@ func TestPlugin_Do(t *testing.T) { splitted = append(splitted, strings.Clone(e.Root.Dig("message").AsString())) }) - input.In(0, "test.log", 0, []byte(`{ + input.In(0, "test.log", test.Offset(0), []byte(`{ "data": [ { "message": "go" }, { "message": "rust" }, { "message": "c++" } ] }`)) - input.In(0, "test.log", 0, []byte(`{ + input.In(0, "test.log", test.Offset(0), []byte(`{ "data": [ { "message": "python" }, { "message": "ruby" }, @@ -79,7 +79,7 @@ func TestPlugin_DoArray(t *testing.T) { splitted = append(splitted, strings.Clone(e.Root.Dig("message").AsString())) }) - input.In(0, sourceName, 0, []byte(`[ + input.In(0, sourceName, test.Offset(0), []byte(`[ { "message": "go" }, { "message": "rust" }, { "message": "c++" } diff --git a/plugin/action/throttle/throttle_test.go b/plugin/action/throttle/throttle_test.go index 33b45f14f..ed1c9de8a 100644 --- a/plugin/action/throttle/throttle_test.go +++ b/plugin/action/throttle/throttle_test.go @@ -79,7 +79,7 @@ func (c *testConfig) runPipeline() { index := j % len(formats) // Format like RFC3339Nano, but nanoseconds are zero-padded, thus all times have equal length. json := fmt.Sprintf(formats[index], curTimeStr) - input.In(10, sourceNames[rand.Int()%len(sourceNames)], 0, []byte(json)) + input.In(10, sourceNames[rand.Int()%len(sourceNames)], test.Offset(0), []byte(json)) } // just to make sure that events from the current iteration are processed in the plugin time.Sleep(10 * time.Millisecond) @@ -274,7 +274,7 @@ func TestRedisThrottle(t *testing.T) { for i := 0; i < eventsTotal; i++ { json := fmt.Sprintf(events[i], time.Now().Format(time.RFC3339Nano)) - input.In(10, sourceNames[rand.Int()%len(sourceNames)], 0, []byte(json)) + input.In(10, sourceNames[rand.Int()%len(sourceNames)], test.Offset(0), []byte(json)) time.Sleep(300 * time.Millisecond) } @@ -355,7 +355,7 @@ func TestRedisThrottleMultiPipes(t *testing.T) { } for i := 0; i < len(firstPipeEvents); i++ { json := fmt.Sprintf(firstPipeEvents[i], time.Now().Format(time.RFC3339Nano)) - input.In(10, sourceNames[rand.Int()%len(sourceNames)], 0, []byte(json)) + input.In(10, sourceNames[rand.Int()%len(sourceNames)], test.Offset(0), []byte(json)) // timeout required due shifting time call to redis time.Sleep(100 * time.Millisecond) } @@ -364,7 +364,7 @@ func TestRedisThrottleMultiPipes(t *testing.T) { for i := 0; i < len(secondPipeEvents); i++ { json := fmt.Sprintf(secondPipeEvents[i], time.Now().Format(time.RFC3339Nano)) - inputSec.In(10, sourceNames[rand.Int()%len(sourceNames)], 0, []byte(json)) + inputSec.In(10, sourceNames[rand.Int()%len(sourceNames)], test.Offset(0), []byte(json)) // timeout required due shifting time call to redis time.Sleep(100 * time.Millisecond) } @@ -436,7 +436,7 @@ func TestRedisThrottleWithCustomLimitData(t *testing.T) { for i := 0; i < eventsTotal; i++ { json := fmt.Sprintf(events[i], nowTs) - input.In(10, sourceNames[rand.Int()%len(sourceNames)], 0, []byte(json)) + input.In(10, sourceNames[rand.Int()%len(sourceNames)], test.Offset(0), []byte(json)) time.Sleep(300 * time.Millisecond) } @@ -486,7 +486,7 @@ func TestThrottleLimiterExpiration(t *testing.T) { for i := 0; i < eventsTotal; i++ { json := fmt.Sprintf(events[i], nowTs) - input.In(10, sourceNames[rand.Int()%len(sourceNames)], 0, []byte(json)) + input.In(10, sourceNames[rand.Int()%len(sourceNames)], test.Offset(0), []byte(json)) time.Sleep(10 * time.Millisecond) } @@ -616,7 +616,7 @@ func TestThrottleWithDistribution(t *testing.T) { nowTs := time.Now().Format(time.RFC3339Nano) for i := 0; i < len(events); i++ { json := fmt.Sprintf(events[i], nowTs) - input.In(0, "test", 0, []byte(json)) + input.In(0, "test", test.Offset(0), []byte(json)) } wgWaitWithTimeout := func(wg *sync.WaitGroup, timeout time.Duration) bool { diff --git a/plugin/input/dmesg/dmesg.go b/plugin/input/dmesg/dmesg.go index e1a45bedf..a6762dfba 100644 --- a/plugin/input/dmesg/dmesg.go +++ b/plugin/input/dmesg/dmesg.go @@ -111,7 +111,7 @@ func (p *Plugin) read() { out = root.Encode(out[:0]) - p.controller.In(0, "dmesg", ts, out, false, nil) + p.controller.In(0, "dmesg", Offset(ts), out, false, nil) } } @@ -134,3 +134,13 @@ func (p *Plugin) Commit(event *pipeline.Event) { func (p *Plugin) PassEvent(event *pipeline.Event) bool { return true } + +type Offset int64 + +func (o Offset) Current() int64 { + return int64(o) +} + +func (o Offset) ByStream(_ string) int64 { + panic("unimplemented") +} diff --git a/plugin/input/fake/fake.go b/plugin/input/fake/fake.go index 9a1ca97de..3d6250ba4 100644 --- a/plugin/input/fake/fake.go +++ b/plugin/input/fake/fake.go @@ -45,7 +45,7 @@ func (p *Plugin) Commit(event *pipeline.Event) { // ^ fn-list // > It sends a test event into the pipeline. -func (p *Plugin) In(sourceID pipeline.SourceID, sourceName string, offset int64, bytes []byte) { // * +func (p *Plugin) In(sourceID pipeline.SourceID, sourceName string, offset pipeline.Offsets, bytes []byte) { // * if p.inFn != nil { p.inFn() } diff --git a/plugin/input/file/worker.go b/plugin/input/file/worker.go index 0681352b7..aa37bb1f1 100644 --- a/plugin/input/file/worker.go +++ b/plugin/input/file/worker.go @@ -17,7 +17,7 @@ type worker struct { type inputer interface { // In processes event and returns it seq number. - In(sourceID pipeline.SourceID, sourceName string, offset int64, data []byte, isNewSource bool, meta metadata.MetaData) uint64 + In(sourceID pipeline.SourceID, sourceName string, offset pipeline.Offsets, data []byte, isNewSource bool, meta metadata.MetaData) uint64 IncReadOps() IncMaxEventSizeExceeded() } @@ -125,7 +125,7 @@ func (w *worker) work(controller inputer, jobProvider *jobProvider, readBufferSi } } - job.lastEventSeq = controller.In(sourceID, sourceName, lastOffset+scanned, inBuf, isVirgin, metadataInfo) + job.lastEventSeq = controller.In(sourceID, sourceName, Offset{lastOffset + scanned, job.offsets}, inBuf, isVirgin, metadataInfo) } // restore the line buffer accumBuf = accumBuf[:0] @@ -198,3 +198,17 @@ func (m metaInformation) GetData() map[string]any { "offset": m.offset, } } + +type Offset struct { + current int64 + offsets sliceMap +} + +func (o Offset) Current() int64 { + return int64(o.current) +} + +func (o Offset) ByStream(stream string) int64 { + offset, _ := o.offsets.get(pipeline.StreamName(stream)) + return offset +} diff --git a/plugin/input/file/worker_test.go b/plugin/input/file/worker_test.go index c5176b999..2c45eb4a7 100644 --- a/plugin/input/file/worker_test.go +++ b/plugin/input/file/worker_test.go @@ -27,7 +27,7 @@ func (i *inputerMock) IncReadOps() {} func (i *inputerMock) IncMaxEventSizeExceeded() {} -func (i *inputerMock) In(_ pipeline.SourceID, _ string, _ int64, data []byte, _ bool, _ metadata.MetaData) uint64 { +func (i *inputerMock) In(_ pipeline.SourceID, _ string, _ pipeline.Offsets, data []byte, _ bool, _ metadata.MetaData) uint64 { i.gotData = append(i.gotData, string(data)) return 0 } diff --git a/plugin/input/http/http.go b/plugin/input/http/http.go index bb65825ef..750e49d8e 100644 --- a/plugin/input/http/http.go +++ b/plugin/input/http/http.go @@ -550,10 +550,10 @@ func (p *Plugin) processChunk(sourceID pipeline.SourceID, readBuff []byte, event if len(eventBuff) != 0 { eventBuff = append(eventBuff, readBuff[nlPos:pos]...) - _ = p.controller.In(sourceID, "http", int64(pos), eventBuff, true, meta) + _ = p.controller.In(sourceID, "http", Offset(int64(pos)), eventBuff, true, meta) eventBuff = eventBuff[:0] } else { - _ = p.controller.In(sourceID, "http", int64(pos), readBuff[nlPos:pos], true, meta) + _ = p.controller.In(sourceID, "http", Offset(int64(pos)), readBuff[nlPos:pos], true, meta) } pos++ @@ -562,7 +562,7 @@ func (p *Plugin) processChunk(sourceID pipeline.SourceID, readBuff []byte, event if isLastChunk { // flush buffers if we can't find the newline character - _ = p.controller.In(sourceID, "http", int64(pos), append(eventBuff, readBuff[nlPos:]...), true, meta) + _ = p.controller.In(sourceID, "http", Offset(int64(pos)), append(eventBuff, readBuff[nlPos:]...), true, meta) eventBuff = eventBuff[:0] } else { eventBuff = append(eventBuff, readBuff[nlPos:]...) @@ -684,3 +684,13 @@ func (m metaInformation) GetData() map[string]any { "params": m.params, } } + +type Offset int64 + +func (o Offset) Current() int64 { + return int64(o) +} + +func (o Offset) ByStream(_ string) int64 { + panic("unimplemented") +} diff --git a/plugin/input/journalctl/journalctl.go b/plugin/input/journalctl/journalctl.go index 0461e717d..d6706445b 100644 --- a/plugin/input/journalctl/journalctl.go +++ b/plugin/input/journalctl/journalctl.go @@ -64,7 +64,7 @@ func (o *offsetInfo) set(cursor string) { } func (p *Plugin) Write(bytes []byte) (int, error) { - p.params.Controller.In(0, "journalctl", p.currentOffset, bytes, false, nil) + p.params.Controller.In(0, "journalctl", Offset(p.currentOffset), bytes, false, nil) p.currentOffset++ return len(bytes), nil } @@ -140,3 +140,13 @@ func (p *Plugin) Commit(event *pipeline.Event) { func (p *Plugin) PassEvent(event *pipeline.Event) bool { return true } + +type Offset int64 + +func (o Offset) Current() int64 { + return int64(o) +} + +func (o Offset) ByStream(_ string) int64 { + panic("unimplemented") +} diff --git a/plugin/input/k8s/k8s_test.go b/plugin/input/k8s/k8s_test.go index 9d6b42e78..a4081becc 100644 --- a/plugin/input/k8s/k8s_test.go +++ b/plugin/input/k8s/k8s_test.go @@ -91,7 +91,7 @@ func TestEnrichment(t *testing.T) { }) filename := getLogFilename("/k8s-logs", item) - input.In(0, filename, 0, []byte(`{"time":"time","log":"log\n"}`)) + input.In(0, filename, test.Offset(0), []byte(`{"time":"time","log":"log\n"}`)) wg.Wait() p.Stop() @@ -134,8 +134,8 @@ func TestAllowedLabels(t *testing.T) { wg.Done() }) - input.In(0, filename1, 0, []byte(`{"time":"time","log":"log\n"}`)) - input.In(0, filename2, 0, []byte(`{"time":"time","log":"log\n"}`)) + input.In(0, filename1, test.Offset(0), []byte(`{"time":"time","log":"log\n"}`)) + input.In(0, filename2, test.Offset(0), []byte(`{"time":"time","log":"log\n"}`)) wg.Wait() p.Stop() @@ -168,14 +168,14 @@ func TestK8SJoin(t *testing.T) { }) filename := getLogFilename("/k8s-logs", item) - input.In(0, filename, 10, []byte(`{"ts":"time","stream":"stdout","log":"one line log 1\n"}`)) - input.In(0, filename, 20, []byte(`{"ts":"time","stream":"stderr","log":"error "}`)) - input.In(0, filename, 30, []byte(`{"ts":"time","stream":"stdout","log":"this "}`)) - input.In(0, filename, 40, []byte(`{"ts":"time","stream":"stdout","log":"is "}`)) - input.In(0, filename, 50, []byte(`{"ts":"time","stream":"stdout","log":"joined "}`)) - input.In(0, filename, 60, []byte(`{"ts":"time","stream":"stdout","log":"log 2\n"}`)) - input.In(0, filename, 70, []byte(`{"ts":"time","stream":"stderr","log":"joined\n"}`)) - input.In(0, filename, 80, []byte(`{"ts":"time","stream":"stdout","log":"one line log 3\n"}`)) + input.In(0, filename, test.Offset(10), []byte(`{"ts":"time","stream":"stdout","log":"one line log 1\n"}`)) + input.In(0, filename, test.Offset(20), []byte(`{"ts":"time","stream":"stderr","log":"error "}`)) + input.In(0, filename, test.Offset(30), []byte(`{"ts":"time","stream":"stdout","log":"this "}`)) + input.In(0, filename, test.Offset(40), []byte(`{"ts":"time","stream":"stdout","log":"is "}`)) + input.In(0, filename, test.Offset(50), []byte(`{"ts":"time","stream":"stdout","log":"joined "}`)) + input.In(0, filename, test.Offset(60), []byte(`{"ts":"time","stream":"stdout","log":"log 2\n"}`)) + input.In(0, filename, test.Offset(70), []byte(`{"ts":"time","stream":"stderr","log":"joined\n"}`)) + input.In(0, filename, test.Offset(80), []byte(`{"ts":"time","stream":"stdout","log":"one line log 3\n"}`)) wg.Wait() p.Stop() diff --git a/plugin/input/kafka/consumer.go b/plugin/input/kafka/consumer.go index a372abd0a..595d7fbc1 100644 --- a/plugin/input/kafka/consumer.go +++ b/plugin/input/kafka/consumer.go @@ -138,8 +138,18 @@ func (pc *pconsumer) consume() { pc.logger.Error("can't render meta data", zap.Error(err)) } } - _ = pc.controller.In(sourceID, "kafka", offset, message.Value, true, metadataInfo) + _ = pc.controller.In(sourceID, "kafka", Offset(offset), message.Value, true, metadataInfo) } } } } + +type Offset int64 + +func (o Offset) Current() int64 { + return int64(o) +} + +func (o Offset) ByStream(_ string) int64 { + panic("unimplemented") +} diff --git a/test/file_base.go b/test/file_base.go index 2d20e9b3f..1bae14d35 100644 --- a/test/file_base.go +++ b/test/file_base.go @@ -21,7 +21,7 @@ func SendPack(t *testing.T, p *pipeline.Pipeline, msgs []Msg) int64 { t.Helper() var sent int64 = 0 for _, m := range msgs { - _ = p.In(0, "test", 0, m, false, nil) + _ = p.In(0, "test", Offset(0), m, false, nil) // count \n sent += int64(len(m)) + 1 } diff --git a/test/test.go b/test/test.go index 0b3ad7045..fbaac6a48 100644 --- a/test/test.go +++ b/test/test.go @@ -245,3 +245,13 @@ func NewConfig(config any, params map[string]int) any { return config } + +type Offset int64 + +func (o Offset) Current() int64 { + return int64(o) +} + +func (o Offset) ByStream(_ string) int64 { + panic("unimplemented") +} From c0b71e3a315181f212f35b056db6509a08ed2dec Mon Sep 17 00:00:00 2001 From: Dmitry Romanov Date: Fri, 4 Oct 2024 17:13:19 +0300 Subject: [PATCH 2/3] skip already committed stream --- pipeline/pipeline.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/pipeline/pipeline.go b/pipeline/pipeline.go index 28d705672..8097bdc7d 100644 --- a/pipeline/pipeline.go +++ b/pipeline/pipeline.go @@ -405,6 +405,13 @@ func (p *Pipeline) In(sourceID SourceID, sourceName string, offset Offsets, byte // For example, for containerd this setting is called max_container_log_line_size // https://github.com/containerd/containerd/blob/f7f2be732159a411eae46b78bfdb479b133a823b/pkg/cri/config/config.go#L263-L266 if !row.IsPartial && p.settings.AntispamThreshold > 0 { + streamOffset := offset.ByStream(string(row.Stream)) + currentOffset := offset.Current() + + if currentOffset < streamOffset { + return EventSeqIDError + } + var checkSourceID any var checkSourceName string if p.settings.AntispamField == "" { From 083240e537270ba5c3c194c8c07609985cdf173a Mon Sep 17 00:00:00 2001 From: Dmitry Romanov Date: Fri, 4 Oct 2024 17:15:51 +0300 Subject: [PATCH 3/3] fix doc for fake input && linter && race on get offsets --- plugin/input/fake/README.md | 2 +- plugin/input/file/worker.go | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/plugin/input/fake/README.md b/plugin/input/fake/README.md index f6b047d32..a5e81da6b 100755 --- a/plugin/input/fake/README.md +++ b/plugin/input/fake/README.md @@ -4,7 +4,7 @@ It provides an API to test pipelines and other plugins. > No config params ### API description -``In(sourceID pipeline.SourceID, sourceName string, offset int64, bytes []byte)`` +``In(sourceID pipeline.SourceID, sourceName string, offset pipeline.Offsets, bytes []byte)`` It sends a test event into the pipeline. diff --git a/plugin/input/file/worker.go b/plugin/input/file/worker.go index aa37bb1f1..58f42ea7a 100644 --- a/plugin/input/file/worker.go +++ b/plugin/input/file/worker.go @@ -44,6 +44,7 @@ func (w *worker) work(controller inputer, jobProvider *jobProvider, readBufferSi sourceName := job.filename skipLine := job.shouldSkip.Load() lastOffset := job.curOffset + offsets := job.offsets if job.symlink != "" { sourceName = job.symlink } @@ -125,7 +126,7 @@ func (w *worker) work(controller inputer, jobProvider *jobProvider, readBufferSi } } - job.lastEventSeq = controller.In(sourceID, sourceName, Offset{lastOffset + scanned, job.offsets}, inBuf, isVirgin, metadataInfo) + job.lastEventSeq = controller.In(sourceID, sourceName, Offset{lastOffset + scanned, offsets}, inBuf, isVirgin, metadataInfo) } // restore the line buffer accumBuf = accumBuf[:0] @@ -205,7 +206,7 @@ type Offset struct { } func (o Offset) Current() int64 { - return int64(o.current) + return o.current } func (o Offset) ByStream(stream string) int64 {