Skip to content

Commit

Permalink
feat(parsers): support passing receive timestamp to parser
Browse files Browse the repository at this point in the history
  • Loading branch information
LarsStegman committed Sep 16, 2024
1 parent ffee74c commit 8f7aa5d
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 10 deletions.
7 changes: 7 additions & 0 deletions parser.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package telegraf

import "time"

// Parser is an interface defining functions that a parser plugin must satisfy.
type Parser interface {
// Parse takes a byte buffer separated by newlines
Expand All @@ -22,6 +24,11 @@ type Parser interface {
SetDefaultTags(tags map[string]string)
}

type ParserWithTimestamp interface {
ParseWithTimestamp(buf []byte, timestamp time.Time) ([]Metric, error)
ParseLineWithTimestamp(line string, timestamp time.Time) (Metric, error)
}

type ParserFunc func() (Parser, error)

// ParserPlugin is an interface for plugins that are able to parse
Expand Down
7 changes: 6 additions & 1 deletion plugins/common/socket/datagram.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"strconv"
"strings"
"sync"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
Expand Down Expand Up @@ -39,6 +40,7 @@ func (l *packetListener) listenData(onData CallbackData, onError CallbackError)
buf := make([]byte, 64*1024) // 64kb - maximum size of IP packet
for {
n, src, err := l.conn.ReadFrom(buf)
receiveTime := time.Now()
if err != nil {
if !strings.HasSuffix(err.Error(), ": use of closed network connection") {
if onError != nil {
Expand All @@ -56,7 +58,10 @@ func (l *packetListener) listenData(onData CallbackData, onError CallbackError)
if l.path != "" {
src = &net.UnixAddr{Name: l.path, Net: "unixgram"}
}
onData(src, body)

d := make([]byte, len(body))
copy(d, body)
go onData(src, d, receiveTime)
}
}()
}
Expand Down
3 changes: 2 additions & 1 deletion plugins/common/socket/socket.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@ import (
"net/url"
"regexp"
"strings"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
tlsint "github.com/influxdata/telegraf/plugins/common/tls"
)

type CallbackData func(net.Addr, []byte)
type CallbackData func(net.Addr, []byte, time.Time)
type CallbackConnection func(net.Addr, io.ReadCloser)
type CallbackError func(error)

Expand Down
6 changes: 3 additions & 3 deletions plugins/common/socket/socket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func TestListenData(t *testing.T) {
require.NoError(t, parser.Init())

var acc testutil.Accumulator
onData := func(remote net.Addr, data []byte) {
onData := func(remote net.Addr, data []byte, timestamp time.Time) {
m, err := parser.Parse(data)
require.NoError(t, err)
addr, _, err := net.SplitHostPort(remote.String())
Expand Down Expand Up @@ -450,7 +450,7 @@ func TestClosingConnections(t *testing.T) {
require.NoError(t, parser.Init())

var acc testutil.Accumulator
onData := func(_ net.Addr, data []byte) {
onData := func(_ net.Addr, data []byte, timestamp time.Time) {
m, err := parser.Parse(data)
require.NoError(t, err)
acc.AddMetrics(m)
Expand Down Expand Up @@ -518,7 +518,7 @@ func TestMaxConnections(t *testing.T) {
// Create callback
var errs []error
var mu sync.Mutex
onData := func(_ net.Addr, _ []byte) {}
onData := func(_ net.Addr, _ []byte, timestamp time.Time) {}
onError := func(err error) {
mu.Lock()
errs = append(errs, err)
Expand Down
9 changes: 7 additions & 2 deletions plugins/common/socket/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,12 +330,16 @@ func (l *streamListener) read(conn net.Conn, onData CallbackData) error {
break
}

receiveTime := time.Now()
src := conn.RemoteAddr()
if l.path != "" {
src = &net.UnixAddr{Name: l.path, Net: "unix"}
}

data := scanner.Bytes()
onData(src, data)
d := make([]byte, len(data))
copy(d, data)
go onData(src, d, receiveTime)
}

if err := scanner.Err(); err != nil {
Expand Down Expand Up @@ -379,7 +383,8 @@ func (l *streamListener) readAll(conn net.Conn, onData CallbackData) error {
if err != nil {
return fmt.Errorf("read on %s failed: %w", src, err)
}
onData(src, buf)

go onData(src, buf, time.Now())

return nil
}
Expand Down
12 changes: 10 additions & 2 deletions plugins/inputs/socket_listener/socket_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
_ "embed"
"net"
"sync"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
Expand Down Expand Up @@ -52,8 +53,15 @@ func (sl *SocketListener) SetParser(parser telegraf.Parser) {

func (sl *SocketListener) Start(acc telegraf.Accumulator) error {
// Create the callbacks for parsing the data and recording issues
onData := func(_ net.Addr, data []byte) {
metrics, err := sl.parser.Parse(data)
onData := func(_ net.Addr, data []byte, receiveTime time.Time) {
var metrics []telegraf.Metric
var err error
if tp, ok := sl.parser.(telegraf.ParserWithTimestamp); ok {
metrics, err = tp.ParseWithTimestamp(data, receiveTime)
} else {
metrics, err = sl.parser.Parse(data)
}

if err != nil {
acc.AddError(err)
return
Expand Down
10 changes: 9 additions & 1 deletion plugins/parsers/binary/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ func (p *Parser) Init() error {

func (p *Parser) Parse(data []byte) ([]telegraf.Metric, error) {
t := time.Now()
return p.ParseWithTimestamp(data, t)
}

func (p *Parser) ParseWithTimestamp(data []byte, t time.Time) ([]telegraf.Metric, error) {

// If the data is encoded in HEX, we need to decode it first
buf := data
Expand Down Expand Up @@ -122,7 +126,11 @@ func (p *Parser) Parse(data []byte) ([]telegraf.Metric, error) {
}

func (p *Parser) ParseLine(line string) (telegraf.Metric, error) {
metrics, err := p.Parse([]byte(line))
return p.ParseLineWithTimestamp(line, time.Now())
}

func (p *Parser) ParseLineWithTimestamp(line string, timestamp time.Time) (telegraf.Metric, error) {
metrics, err := p.ParseWithTimestamp([]byte(line), timestamp)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 8f7aa5d

Please sign in to comment.