Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pipeline in offset #683

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 20 additions & 8 deletions pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ const (
type finalizeFn = func(event *Event, notifyInput bool, backEvent bool)

type InputPluginController interface {
In(sourceID SourceID, sourceName string, offset int64, data []byte, isNewSource bool, meta metadata.MetaData) uint64
In(sourceID SourceID, sourceName string, offset Offsets, data []byte, isNewSource bool, meta metadata.MetaData) uint64
UseSpread() // don't use stream field and spread all events across all processors
DisableStreams() // don't use stream field
SuggestDecoder(t decoder.Type) // set decoder type if pipeline uses "auto" value for decoder
Expand Down Expand Up @@ -354,8 +354,13 @@ func (p *Pipeline) GetOutput() OutputPlugin {
return p.output
}

// Offsets is the offset information an input plugin passes to Pipeline.In
// together with the raw event bytes.
type Offsets interface {
// Current returns the offset of the event being passed in. Pipeline.In
// stores it in the event's Offset field and includes it in decode-error logs.
Current() int64
// ByStream returns the offset associated with the given stream name.
// Pipeline.In compares it with Current and returns EventSeqIDError when
// Current is lower than this value.
// NOTE(review): presumably this is the last saved/committed offset for the
// stream, making the check a "skip already-processed events" guard — confirm
// against the input plugin implementations.
ByStream(stream string) int64
}

// In decodes message and passes it to event stream.
func (p *Pipeline) In(sourceID SourceID, sourceName string, offset int64, bytes []byte, isNewSource bool, meta metadata.MetaData) (seqID uint64) {
func (p *Pipeline) In(sourceID SourceID, sourceName string, offset Offsets, bytes []byte, isNewSource bool, meta metadata.MetaData) (seqID uint64) {
length := len(bytes)

// don't process mud.
Expand Down Expand Up @@ -386,7 +391,7 @@ func (p *Pipeline) In(sourceID SourceID, sourceName string, offset int64, bytes
row, err = decoder.DecodeCRI(bytes)
if err != nil {
p.wrongEventCRIFormatMetric.Inc()
p.Error(fmt.Sprintf("wrong cri format offset=%d, length=%d, err=%s, source=%d:%s, cri=%s", offset, length, err.Error(), sourceID, sourceName, bytes))
p.Error(fmt.Sprintf("wrong cri format offset=%d, length=%d, err=%s, source=%d:%s, cri=%s", offset.Current(), length, err.Error(), sourceID, sourceName, bytes))
return EventSeqIDError
}
}
Expand All @@ -400,6 +405,13 @@ func (p *Pipeline) In(sourceID SourceID, sourceName string, offset int64, bytes
// For example, for containerd this setting is called max_container_log_line_size
// https://github.com/containerd/containerd/blob/f7f2be732159a411eae46b78bfdb479b133a823b/pkg/cri/config/config.go#L263-L266
if !row.IsPartial && p.settings.AntispamThreshold > 0 {
streamOffset := offset.ByStream(string(row.Stream))
currentOffset := offset.Current()

if currentOffset < streamOffset {
return EventSeqIDError
}

var checkSourceID any
var checkSourceName string
if p.settings.AntispamField == "" {
Expand Down Expand Up @@ -447,7 +459,7 @@ func (p *Pipeline) In(sourceID SourceID, sourceName string, offset int64, bytes
}

p.logger.Log(level, "wrong json format", zap.Error(err),
zap.Int64("offset", offset),
zap.Int64("offset", offset.Current()),
zap.Int("length", length),
zap.Uint64("source", uint64(sourceID)),
zap.String("source_name", sourceName),
Expand All @@ -470,7 +482,7 @@ func (p *Pipeline) In(sourceID SourceID, sourceName string, offset int64, bytes
err := decoder.DecodePostgres(event.Root, bytes)
if err != nil {
p.logger.Fatal("wrong postgres format", zap.Error(err),
zap.Int64("offset", offset),
zap.Int64("offset", offset.Current()),
zap.Int("length", length),
zap.Uint64("source", uint64(sourceID)),
zap.String("source_name", sourceName),
Expand All @@ -489,7 +501,7 @@ func (p *Pipeline) In(sourceID SourceID, sourceName string, offset int64, bytes
}

p.logger.Log(level, "wrong nginx error log format", zap.Error(err),
zap.Int64("offset", offset),
zap.Int64("offset", offset.Current()),
zap.Int("length", length),
zap.Uint64("source", uint64(sourceID)),
zap.String("source_name", sourceName),
Expand All @@ -503,7 +515,7 @@ func (p *Pipeline) In(sourceID SourceID, sourceName string, offset int64, bytes
err = p.decoder.Decode(event.Root, bytes)
if err != nil {
p.logger.Fatal("wrong protobuf format", zap.Error(err),
zap.Int64("offset", offset),
zap.Int64("offset", offset.Current()),
zap.Int("length", length),
zap.Uint64("source", uint64(sourceID)),
zap.String("source_name", sourceName),
Expand Down Expand Up @@ -533,7 +545,7 @@ func (p *Pipeline) In(sourceID SourceID, sourceName string, offset int64, bytes
}
}

event.Offset = offset
event.Offset = offset.Current()
event.SourceID = sourceID
event.SourceName = sourceName
event.streamName = DefaultStreamName
Expand Down
5 changes: 3 additions & 2 deletions pipeline/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/ozontech/file.d/pipeline"
"github.com/ozontech/file.d/plugin/input/fake"
"github.com/ozontech/file.d/test"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -41,7 +42,7 @@ func TestInUnparsableMessages(t *testing.T) {

pipe.SetInput(getFakeInputInfo())

seqID := pipe.In(sourceID, "kafka", offset, message, false, nil)
seqID := pipe.In(sourceID, "kafka", test.Offset(offset), message, false, nil)
require.Equal(t, pipeline.EventSeqIDError, seqID)

refPipe := reflect.ValueOf(pipe)
Expand Down Expand Up @@ -119,7 +120,7 @@ func TestInInvalidMessages(t *testing.T) {

pipe.SetInput(getFakeInputInfo())

seqID := pipe.In(tCase.sourceID, "kafka", tCase.offset, tCase.message, false, nil)
seqID := pipe.In(tCase.sourceID, "kafka", test.Offset(tCase.offset), tCase.message, false, nil)
require.Equal(t, pipeline.EventSeqIDError, seqID)
})
}
Expand Down
4 changes: 2 additions & 2 deletions plugin/action/add_file_name/add_file_name_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ func TestModify(t *testing.T) {
wg.Done()
})

input.In(0, sourceName, 0, []byte(`{"error":"info about error"}`))
input.In(0, sourceName, 0, []byte(`{"file":"not_my_file"}`))
input.In(0, sourceName, test.Offset(0), []byte(`{"error":"info about error"}`))
input.In(0, sourceName, test.Offset(0), []byte(`{"file":"not_my_file"}`))

wg.Wait()
p.Stop()
Expand Down
2 changes: 1 addition & 1 deletion plugin/action/add_host/add_host_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func TestModify(t *testing.T) {
wg.Done()
})

input.In(0, "test.log", 0, []byte(`{}`))
input.In(0, "test.log", test.Offset(0), []byte(`{}`))

wg.Wait()
p.Stop()
Expand Down
8 changes: 4 additions & 4 deletions plugin/action/convert_date/convert_date_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ func TestConvert(t *testing.T) {
wg.Done()
})

input.In(0, "test.log", 0, []byte(`{"time":998578502}`))
input.In(0, "test.log", 0, []byte(`{"time":998578999.1346}`))
input.In(0, "test.log", 0, []byte(`{"time":"2022/02/07 13:06:14"}`))
input.In(0, "test.log", test.Offset(0), []byte(`{"time":998578502}`))
input.In(0, "test.log", test.Offset(0), []byte(`{"time":998578999.1346}`))
input.In(0, "test.log", test.Offset(0), []byte(`{"time":"2022/02/07 13:06:14"}`))

wg.Wait()
p.Stop()
Expand All @@ -52,7 +52,7 @@ func TestConvertFail(t *testing.T) {
wg.Done()
})

input.In(0, "test.log", 0, []byte(`{"time":"XXX"}`))
input.In(0, "test.log", test.Offset(0), []byte(`{"time":"XXX"}`))

wg.Wait()
p.Stop()
Expand Down
2 changes: 1 addition & 1 deletion plugin/action/convert_log_level/convert_log_level_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ func TestDo(t *testing.T) {
})

for _, log := range tc.In {
input.In(0, "test.log", 0, []byte(log))
input.In(0, "test.log", test.Offset(0), []byte(log))
}

now := time.Now()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func TestConvertUTF8Bytes(t *testing.T) {
wg.Done()
})

input.In(0, "test.log", 0, []byte(tt.in))
input.In(0, "test.log", test.Offset(0), []byte(tt.in))

wg.Wait()
p.Stop()
Expand Down
4 changes: 2 additions & 2 deletions plugin/action/discard/discard_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,10 @@ func TestDiscard(t *testing.T) {
})

for _, e := range tt.passEvents {
input.In(0, "test", 0, []byte(e))
input.In(0, "test", test.Offset(0), []byte(e))
}
for _, e := range tt.discardEvents {
input.In(0, "test", 0, []byte(e))
input.In(0, "test", test.Offset(0), []byte(e))
}

wg.Wait()
Expand Down
2 changes: 1 addition & 1 deletion plugin/action/flatten/flatten_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestFlatten(t *testing.T) {
wg.Done()
})

input.In(0, "test.log", 0, []byte(`{"complex":{"a":"b","c":"d"}}`))
input.In(0, "test.log", test.Offset(0), []byte(`{"complex":{"a":"b","c":"d"}}`))

wg.Wait()
p.Stop()
Expand Down
4 changes: 2 additions & 2 deletions plugin/action/join/join_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func TestSimpleJoin(t *testing.T) {

for i := 0; i < tt.iterations; i++ {
for m, line := range lines {
input.In(0, "test.log", int64(i*10000+m), []byte(line))
input.In(0, "test.log", test.Offset(int64(i*10000+m)), []byte(line))
}
}

Expand Down Expand Up @@ -271,7 +271,7 @@ func TestJoinAfterNilNode(t *testing.T) {

for i := 0; i < tt.iterations; i++ {
for m, line := range lines {
input.In(0, "test.log", int64(i*10000+m), []byte(line))
input.In(0, "test.log", test.Offset(int64(i*10000+m)), []byte(line))
}
}

Expand Down
4 changes: 2 additions & 2 deletions plugin/action/join_template/join_template_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ func TestSimpleJoin(t *testing.T) {

for i := 0; i < tt.iterations; i++ {
for m, line := range lines {
input.In(0, "test.log", int64(i*10000+m), []byte(line))
input.In(0, "test.log", test.Offset(int64(i*10000+m)), []byte(line))
}
}

Expand Down Expand Up @@ -534,7 +534,7 @@ func TestJoinAfterNilNode(t *testing.T) {

for i := 0; i < tt.iterations; i++ {
for m, line := range lines {
input.In(0, "test.log", int64(i*10000+m), []byte(line))
input.In(0, "test.log", test.Offset(int64(i*10000+m)), []byte(line))
}
}

Expand Down
2 changes: 1 addition & 1 deletion plugin/action/json_decode/json_decode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestDecode(t *testing.T) {
wg.Done()
})

input.In(0, "test.log", 0, []byte(`{"log":"{\"field2\":\"value2\",\"field3\":\"value3\"}"}`))
input.In(0, "test.log", test.Offset(0), []byte(`{"log":"{\"field2\":\"value2\",\"field3\":\"value3\"}"}`))

wg.Wait()
p.Stop()
Expand Down
2 changes: 1 addition & 1 deletion plugin/action/json_encode/json_encode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func TestEncode(t *testing.T) {
wg.Done()
})

input.In(0, "test.log", 0, []byte(`{"server":{"os":"linux","arch":"amd64"}}`))
input.In(0, "test.log", test.Offset(0), []byte(`{"server":{"os":"linux","arch":"amd64"}}`))

wg.Wait()
p.Stop()
Expand Down
2 changes: 1 addition & 1 deletion plugin/action/json_extract/json_extract_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func TestJsonExtract(t *testing.T) {
wg.Done()
})

input.In(0, "test.log", 0, []byte(tt.in))
input.In(0, "test.log", test.Offset(0), []byte(tt.in))

wg.Wait()
p.Stop()
Expand Down
6 changes: 3 additions & 3 deletions plugin/action/keep_fields/keep_fields_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ func TestKeepFields(t *testing.T) {
wg.Done()
})

input.In(0, "test.log", 0, []byte(`{"field_1":"value_1","a":"b"}`))
input.In(0, "test.log", 0, []byte(`{"field_2":"value_2","b":"c"}`))
input.In(0, "test.log", 0, []byte(`{"field_3":"value_3","a":"b"}`))
input.In(0, "test.log", test.Offset(0), []byte(`{"field_1":"value_1","a":"b"}`))
input.In(0, "test.log", test.Offset(0), []byte(`{"field_2":"value_2","b":"c"}`))
input.In(0, "test.log", test.Offset(0), []byte(`{"field_3":"value_3","a":"b"}`))

wg.Wait()
p.Stop()
Expand Down
6 changes: 3 additions & 3 deletions plugin/action/mask/mask_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -826,7 +826,7 @@ func TestPlugin(t *testing.T) {
})

for _, in := range s.input {
input.In(0, "test.log", 0, []byte(in))
input.In(0, "test.log", test.Offset(0), []byte(in))
}

wg.Wait()
Expand Down Expand Up @@ -899,7 +899,7 @@ func TestWithEmptyRegex(t *testing.T) {
})

for _, in := range s.input {
input.In(0, "test.log", 0, []byte(in))
input.In(0, "test.log", test.Offset(0), []byte(in))
}

wg.Wait()
Expand Down Expand Up @@ -1090,7 +1090,7 @@ func TestPluginWithComplexMasks(t *testing.T) {
})

for _, in := range s.input {
input.In(0, "test.log", 0, []byte(in))
input.In(0, "test.log", test.Offset(0), []byte(in))
}

wg.Wait()
Expand Down
6 changes: 3 additions & 3 deletions plugin/action/modify/modify_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func TestModify(t *testing.T) {
wg.Done()
})

input.In(0, "test.log", 0, []byte(`{"existing_field":"existing_value","my_object":{"field":{"subfield":"subfield_value"}}}`))
input.In(0, "test.log", test.Offset(0), []byte(`{"existing_field":"existing_value","my_object":{"field":{"subfield":"subfield_value"}}}`))

wg.Wait()
p.Stop()
Expand Down Expand Up @@ -75,7 +75,7 @@ func TestModifyRegex(t *testing.T) {
wg.Add(len(testEvents))

for _, te := range testEvents {
input.In(0, "test.log", 0, te.in)
input.In(0, "test.log", test.Offset(0), te.in)
}

wg.Wait()
Expand Down Expand Up @@ -138,7 +138,7 @@ func TestModifyTrim(t *testing.T) {
wg.Add(len(testEvents))

for _, te := range testEvents {
input.In(0, "test.log", 0, te.in)
input.In(0, "test.log", test.Offset(0), te.in)
}

wg.Wait()
Expand Down
2 changes: 1 addition & 1 deletion plugin/action/move/move_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ func TestMove(t *testing.T) {
wg.Done()
})

input.In(0, "test.log", 0, []byte(tt.in))
input.In(0, "test.log", test.Offset(0), []byte(tt.in))

wg.Wait()
p.Stop()
Expand Down
2 changes: 1 addition & 1 deletion plugin/action/parse_es/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func TestPipeline(t *testing.T) {
})

for _, event := range tCase.eventsIn {
input.In(event.sourceID, event.sourceName, event.offset, event.bytes)
input.In(event.sourceID, event.sourceName, test.Offset(event.offset), event.bytes)
}

wg.Wait()
Expand Down
6 changes: 3 additions & 3 deletions plugin/action/parse_re2/parse_re2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestDecode(t *testing.T) {
wg.Done()
})

input.In(0, "test.log", 0, []byte(`{"log":"2021-06-22 16:24:27 GMT [7291] => [2-1] client=test_client,db=test_db,user=test_user LOG: listening on IPv4 address \"0.0.0.0\", port 5432"}`))
input.In(0, "test.log", test.Offset(0), []byte(`{"log":"2021-06-22 16:24:27 GMT [7291] => [2-1] client=test_client,db=test_db,user=test_user LOG: listening on IPv4 address \"0.0.0.0\", port 5432"}`))

wg.Wait()
p.Stop()
Expand All @@ -46,8 +46,8 @@ func TestDecodeAccessLogsJira(t *testing.T) {
wg.Done()
})

input.In(0, "test.log", 0, []byte(`{"message":"10.115.195.13 0x51320775x2 jira_robot [07/Nov/2022:00:00:00 +0300] \"GET /rest/api/2/issue/FRAUD-3847?fields=resolution HTTP/1.1\" 200 198 20 \"https://jit.o3.ru/secure/RapidBoard.jspa?rapidView=2701&selectedIssue=EXPC-3767&quickFilter=16465&quickFilter=15365\" \"Apache-HttpClient/4.5.13 (Java/11.0.9)\" \"nj56zg\""}`))
input.In(0, "test.log", 0, []byte(`{"message":"10.115.195.12 0x51320774x2 ezabelin [07/Nov/2022:00:00:00 +0300] \"GET /rest/api/2/issue/RP-4977?fields=resolution HTTP/1.1\" 201 158 15 \"-\" \"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/106.0.0.0 Safari/537.36\" \"1tmznt9\""}`))
input.In(0, "test.log", test.Offset(0), []byte(`{"message":"10.115.195.13 0x51320775x2 jira_robot [07/Nov/2022:00:00:00 +0300] \"GET /rest/api/2/issue/FRAUD-3847?fields=resolution HTTP/1.1\" 200 198 20 \"https://jit.o3.ru/secure/RapidBoard.jspa?rapidView=2701&selectedIssue=EXPC-3767&quickFilter=16465&quickFilter=15365\" \"Apache-HttpClient/4.5.13 (Java/11.0.9)\" \"nj56zg\""}`))
input.In(0, "test.log", test.Offset(0), []byte(`{"message":"10.115.195.12 0x51320774x2 ezabelin [07/Nov/2022:00:00:00 +0300] \"GET /rest/api/2/issue/RP-4977?fields=resolution HTTP/1.1\" 201 158 15 \"-\" \"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/106.0.0.0 Safari/537.36\" \"1tmznt9\""}`))

wg.Wait()
p.Stop()
Expand Down
12 changes: 6 additions & 6 deletions plugin/action/remove_fields/remove_fields_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ func TestRemoveFields(t *testing.T) {
wg.Done()
})

input.In(0, "test.log", 0, []byte(`{"field_1":"value_1","a":"b"}`))
input.In(0, "test.log", 0, []byte(`{"field_2":"value_2","b":"c"}`))
input.In(0, "test.log", 0, []byte(`{"field_3":"value_3","a":"b"}`))
input.In(0, "test.log", test.Offset(0), []byte(`{"field_1":"value_1","a":"b"}`))
input.In(0, "test.log", test.Offset(0), []byte(`{"field_2":"value_2","b":"c"}`))
input.In(0, "test.log", test.Offset(0), []byte(`{"field_3":"value_3","a":"b"}`))

wg.Wait()
p.Stop()
Expand All @@ -47,9 +47,9 @@ func TestRemoveNestedFields(t *testing.T) {
wg.Done()
})

input.In(0, "test.log", 0, []byte(`{"a":"some"}`))
input.In(0, "test.log", 0, []byte(`{"a":{"b":"deleted"}}`))
input.In(0, "test.log", 0, []byte(`{"a":{"b":{"c":["deleted"]},"d":"saved"}}`))
input.In(0, "test.log", test.Offset(0), []byte(`{"a":"some"}`))
input.In(0, "test.log", test.Offset(0), []byte(`{"a":{"b":"deleted"}}`))
input.In(0, "test.log", test.Offset(0), []byte(`{"a":{"b":{"c":["deleted"]},"d":"saved"}}`))

wg.Wait()
p.Stop()
Expand Down
Loading
Loading