From 8f7aa5da47df3c765d49dc54a7d796a108749e0a Mon Sep 17 00:00:00 2001 From: Lars Stegman Date: Mon, 16 Sep 2024 09:31:51 +0200 Subject: [PATCH] feat(parsers): support passing receive timestamp to parser --- parser.go | 7 +++++++ plugins/common/socket/datagram.go | 7 ++++++- plugins/common/socket/socket.go | 3 ++- plugins/common/socket/socket_test.go | 6 +++--- plugins/common/socket/stream.go | 9 +++++++-- plugins/inputs/socket_listener/socket_listener.go | 12 ++++++++++-- plugins/parsers/binary/parser.go | 10 +++++++++- 7 files changed, 44 insertions(+), 10 deletions(-) diff --git a/parser.go b/parser.go index 5d67de987e8c2..b06067cc6fdb9 100644 --- a/parser.go +++ b/parser.go @@ -1,5 +1,7 @@ package telegraf +import "time" + // Parser is an interface defining functions that a parser plugin must satisfy. type Parser interface { // Parse takes a byte buffer separated by newlines @@ -22,6 +24,11 @@ type Parser interface { SetDefaultTags(tags map[string]string) } +type ParserWithTimestamp interface { + ParseWithTimestamp(buf []byte, timestamp time.Time) ([]Metric, error) + ParseLineWithTimestamp(line string, timestamp time.Time) (Metric, error) +} + type ParserFunc func() (Parser, error) // ParserPlugin is an interface for plugins that are able to parse diff --git a/plugins/common/socket/datagram.go b/plugins/common/socket/datagram.go index e07b0f53164d8..b2555419cc440 100644 --- a/plugins/common/socket/datagram.go +++ b/plugins/common/socket/datagram.go @@ -12,6 +12,7 @@ import ( "strconv" "strings" "sync" + "time" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" @@ -39,6 +40,7 @@ func (l *packetListener) listenData(onData CallbackData, onError CallbackError) buf := make([]byte, 64*1024) // 64kb - maximum size of IP packet for { n, src, err := l.conn.ReadFrom(buf) + receiveTime := time.Now() if err != nil { if !strings.HasSuffix(err.Error(), ": use of closed network connection") { if onError != nil { @@ -56,7 +58,10 @@ func (l *packetListener) listenData(onData CallbackData, onError CallbackError) if l.path != "" { src = &net.UnixAddr{Name: l.path, Net: "unixgram"} } - onData(src, body) + + d := make([]byte, len(body)) + copy(d, body) + go onData(src, d, receiveTime) } }() } diff --git a/plugins/common/socket/socket.go b/plugins/common/socket/socket.go index 3582c16eb8713..2aa6db599268d 100644 --- a/plugins/common/socket/socket.go +++ b/plugins/common/socket/socket.go @@ -9,13 +9,14 @@ import ( "net/url" "regexp" "strings" + "time" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/config" tlsint "github.com/influxdata/telegraf/plugins/common/tls" ) -type CallbackData func(net.Addr, []byte) +type CallbackData func(net.Addr, []byte, time.Time) type CallbackConnection func(net.Addr, io.ReadCloser) type CallbackError func(error) diff --git a/plugins/common/socket/socket_test.go b/plugins/common/socket/socket_test.go index 5ced8c0f9a2ff..951c76aacc754 100644 --- a/plugins/common/socket/socket_test.go +++ b/plugins/common/socket/socket_test.go @@ -153,7 +153,7 @@ func TestListenData(t *testing.T) { require.NoError(t, parser.Init()) var acc testutil.Accumulator - onData := func(remote net.Addr, data []byte) { + onData := func(remote net.Addr, data []byte, timestamp time.Time) { m, err := parser.Parse(data) require.NoError(t, err) addr, _, err := net.SplitHostPort(remote.String()) @@ -450,7 +450,7 @@ func TestClosingConnections(t *testing.T) { require.NoError(t, parser.Init()) var acc testutil.Accumulator - onData := func(_ net.Addr, data []byte) { + onData := func(_ net.Addr, data []byte, timestamp time.Time) { m, err := parser.Parse(data) require.NoError(t, err) acc.AddMetrics(m) @@ -518,7 +518,7 @@ func TestMaxConnections(t *testing.T) { // Create callback var errs []error var mu sync.Mutex - onData := func(_ net.Addr, _ []byte) {} + onData := func(_ net.Addr, _ []byte, timestamp time.Time) {} onError := func(err error) { mu.Lock() errs = append(errs, err) diff --git a/plugins/common/socket/stream.go b/plugins/common/socket/stream.go index 4896bbc873357..999f44ae392d0 100644 --- a/plugins/common/socket/stream.go +++ b/plugins/common/socket/stream.go @@ -330,12 +330,16 @@ func (l *streamListener) read(conn net.Conn, onData CallbackData) error { break } + receiveTime := time.Now() src := conn.RemoteAddr() if l.path != "" { src = &net.UnixAddr{Name: l.path, Net: "unix"} } + data := scanner.Bytes() - onData(src, data) + d := make([]byte, len(data)) + copy(d, data) + go onData(src, d, receiveTime) } if err := scanner.Err(); err != nil { @@ -379,7 +383,8 @@ func (l *streamListener) readAll(conn net.Conn, onData CallbackData) error { if err != nil { return fmt.Errorf("read on %s failed: %w", src, err) } - onData(src, buf) + + go onData(src, buf, time.Now()) return nil } diff --git a/plugins/inputs/socket_listener/socket_listener.go b/plugins/inputs/socket_listener/socket_listener.go index b4783afc1af86..269f01d58f329 100644 --- a/plugins/inputs/socket_listener/socket_listener.go +++ b/plugins/inputs/socket_listener/socket_listener.go @@ -6,6 +6,7 @@ import ( _ "embed" "net" "sync" + "time" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/internal" @@ -52,8 +53,15 @@ func (sl *SocketListener) SetParser(parser telegraf.Parser) { func (sl *SocketListener) Start(acc telegraf.Accumulator) error { // Create the callbacks for parsing the data and recording issues - onData := func(_ net.Addr, data []byte) { - metrics, err := sl.parser.Parse(data) + onData := func(_ net.Addr, data []byte, receiveTime time.Time) { + var metrics []telegraf.Metric + var err error + if tp, ok := sl.parser.(telegraf.ParserWithTimestamp); ok { + metrics, err = tp.ParseWithTimestamp(data, receiveTime) + } else { + metrics, err = sl.parser.Parse(data) + } + if err != nil { acc.AddError(err) return diff --git a/plugins/parsers/binary/parser.go b/plugins/parsers/binary/parser.go index 9e417f63d86b3..08fdbdbc5dcdc 100644 --- a/plugins/parsers/binary/parser.go +++ b/plugins/parsers/binary/parser.go @@ -73,6 +73,10 @@ func (p *Parser) Init() error { func (p *Parser) Parse(data []byte) ([]telegraf.Metric, error) { t := time.Now() + return p.ParseWithTimestamp(data, t) +} + +func (p *Parser) ParseWithTimestamp(data []byte, t time.Time) ([]telegraf.Metric, error) { // If the data is encoded in HEX, we need to decode it first buf := data @@ -122,7 +126,11 @@ func (p *Parser) Parse(data []byte) ([]telegraf.Metric, error) { } func (p *Parser) ParseLine(line string) (telegraf.Metric, error) { - metrics, err := p.Parse([]byte(line)) + return p.ParseLineWithTimestamp(line, time.Now()) +} + +func (p *Parser) ParseLineWithTimestamp(line string, timestamp time.Time) (telegraf.Metric, error) { + metrics, err := p.ParseWithTimestamp([]byte(line), timestamp) if err != nil { return nil, err }