diff --git a/plugins/common/socket/datagram.go b/plugins/common/socket/datagram.go index e07b0f53164d8..b7d55e38a4a99 100644 --- a/plugins/common/socket/datagram.go +++ b/plugins/common/socket/datagram.go @@ -13,7 +13,10 @@ import ( "strings" "sync" + "github.com/alitto/pond" + "github.com/influxdata/telegraf" + "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/internal" ) @@ -24,10 +27,19 @@ type packetListener struct { ReadBufferSize int Log telegraf.Logger - conn net.PacketConn - decoder internal.ContentDecoder - path string - wg sync.WaitGroup + conn net.PacketConn + decoders sync.Pool + path string + wg sync.WaitGroup + parsePool *pond.WorkerPool +} + +func newPacketListener(encoding string, maxDecompressionSize config.Size, maxWorkers int) *packetListener { + return &packetListener{ + Encoding: encoding, + MaxDecompressionSize: int64(maxDecompressionSize), + parsePool: pond.New(maxWorkers, 0, pond.MinWorkers(maxWorkers/2+1)), + } } func (l *packetListener) listenData(onData CallbackData, onError CallbackError) { @@ -48,15 +60,22 @@ func (l *packetListener) listenData(onData CallbackData, onError CallbackError) break } - body, err := l.decoder.Decode(buf[:n]) - if err != nil && onError != nil { - onError(fmt.Errorf("unable to decode incoming packet: %w", err)) - } + d := make([]byte, n) + copy(d, buf[:n]) + l.parsePool.Submit(func() { + decoder := l.decoders.Get().(internal.ContentDecoder) + defer l.decoders.Put(decoder) + body, err := decoder.Decode(d) + if err != nil && onError != nil { + onError(fmt.Errorf("unable to decode incoming packet: %w", err)) + } - if l.path != "" { - src = &net.UnixAddr{Name: l.path, Net: "unixgram"} - } - onData(src, body) + if l.path != "" { + src = &net.UnixAddr{Name: l.path, Net: "unixgram"} + } + + onData(src, body) + }) } }() } @@ -80,26 +99,34 @@ func (l *packetListener) listenConnection(onConnection CallbackConnection, onErr break } - // Decode the contents depending on the given encoding - body, err := l.decoder.Decode(buf[:n]) - if err != nil && onError != nil { - onError(fmt.Errorf("unable to decode incoming packet: %w", err)) - } + d := make([]byte, n) + copy(d, buf[:n]) + l.parsePool.Submit(func() { + // Decode the contents depending on the given encoding + decoder := l.decoders.Get().(internal.ContentDecoder) + // Not possible to immediately return the decoder to the Pool after calling Decode, because some + // decoders return a reference to their internal buffers. This would cause data races. + defer l.decoders.Put(decoder) + body, err := decoder.Decode(d[:n]) + if err != nil && onError != nil { + onError(fmt.Errorf("unable to decode incoming packet: %w", err)) + } - // Workaround to provide remote endpoints for Unix-type sockets - if l.path != "" { - src = &net.UnixAddr{Name: l.path, Net: "unixgram"} - } + // Workaround to provide remote endpoints for Unix-type sockets + if l.path != "" { + src = &net.UnixAddr{Name: l.path, Net: "unixgram"} + } - // Create a pipe and notify the caller via Callback that new data is - // available. Afterwards write the data. Please note: Write() will - // blocks until all data is consumed! - reader, writer := io.Pipe() - go onConnection(src, reader) - if _, err := writer.Write(body); err != nil && onError != nil { - onError(err) - } - writer.Close() + // Create a pipe and notify the caller via Callback that new data is + // available. Afterwards write the data. Please note: Write() will + // block until all data is consumed! + reader, writer := io.Pipe() + go onConnection(src, reader) + if _, err := writer.Write(body); err != nil && onError != nil { + onError(err) + } + writer.Close() + }) } }() } @@ -133,18 +160,7 @@ func (l *packetListener) setupUnixgram(u *url.URL, socketMode string) error { } } - // Create a decoder for the given encoding - var options []internal.DecodingOption - if l.MaxDecompressionSize > 0 { - options = append(options, internal.WithMaxDecompressionSize(l.MaxDecompressionSize)) - } - decoder, err := internal.NewContentDecoder(l.Encoding, options...) - if err != nil { - return fmt.Errorf("creating decoder failed: %w", err) - } - l.decoder = decoder - - return nil + return l.setupDecoder() } func (l *packetListener) setupUDP(u *url.URL, ifname string, bufferSize int) error { @@ -179,20 +195,9 @@ func (l *packetListener) setupUDP(u *url.URL, ifname string, bufferSize int) err l.Log.Warnf("Setting read buffer on %s socket failed: %v", u.Scheme, err) } } - l.conn = conn - - // Create a decoder for the given encoding - var options []internal.DecodingOption - if l.MaxDecompressionSize > 0 { - options = append(options, internal.WithMaxDecompressionSize(l.MaxDecompressionSize)) - } - decoder, err := internal.NewContentDecoder(l.Encoding, options...) - if err != nil { - return fmt.Errorf("creating decoder failed: %w", err) - } - l.decoder = decoder - return nil + l.conn = conn + return l.setupDecoder() } func (l *packetListener) setupIP(u *url.URL) error { @@ -200,18 +205,27 @@ func (l *packetListener) setupIP(u *url.URL) error { if err != nil { return fmt.Errorf("listening (ip) failed: %w", err) } + l.conn = conn + return l.setupDecoder() +} +func (l *packetListener) setupDecoder() error { // Create a decoder for the given encoding var options []internal.DecodingOption if l.MaxDecompressionSize > 0 { options = append(options, internal.WithMaxDecompressionSize(l.MaxDecompressionSize)) } - decoder, err := internal.NewContentDecoder(l.Encoding, options...) - if err != nil { - return fmt.Errorf("creating decoder failed: %w", err) - } - l.decoder = decoder + + l.decoders = sync.Pool{New: func() any { + decoder, err := internal.NewContentDecoder(l.Encoding, options...) + if err != nil { + l.Log.Errorf("creating decoder failed: %v", err) + return nil + } + + return decoder + }} return nil } @@ -237,5 +251,7 @@ func (l *packetListener) close() error { } } + l.parsePool.StopAndWait() + return nil } diff --git a/plugins/common/socket/socket.go b/plugins/common/socket/socket.go index 482301c3c155f..0a138415e5e06 100644 --- a/plugins/common/socket/socket.go +++ b/plugins/common/socket/socket.go @@ -34,6 +34,7 @@ type Config struct { SocketMode string `toml:"socket_mode"` ContentEncoding string `toml:"content_encoding"` MaxDecompressionSize config.Size `toml:"max_decompression_size"` + MaxParallelParsers int `toml:"max_parallel_parsers"` common_tls.ServerConfig } @@ -96,74 +97,54 @@ func (cfg *Config) NewSocket(address string, splitcfg *SplitConfig, logger teleg } func (s *Socket) Setup() error { + s.MaxParallelParsers = max(s.MaxParallelParsers, 1) switch s.url.Scheme { case "tcp", "tcp4", "tcp6": - l := &streamListener{ - ReadBufferSize: int(s.ReadBufferSize), - ReadTimeout: s.ReadTimeout, - KeepAlivePeriod: s.KeepAlivePeriod, - MaxConnections: s.MaxConnections, - Encoding: s.ContentEncoding, - Splitter: s.splitter, - Log: s.log, - } + l := newStreamListener( + s.Config, + s.splitter, + s.log, + ) if err := l.setupTCP(s.url, s.tlsCfg); err != nil { return err } s.listener = l case "unix", "unixpacket": - l := &streamListener{ - ReadBufferSize: int(s.ReadBufferSize), - ReadTimeout: s.ReadTimeout, - KeepAlivePeriod: s.KeepAlivePeriod, - MaxConnections: s.MaxConnections, - Encoding: s.ContentEncoding, - Splitter: s.splitter, - Log: s.log, - } + l := newStreamListener( + s.Config, + s.splitter, + s.log, + ) if err := l.setupUnix(s.url, s.tlsCfg, s.SocketMode); err != nil { return err } s.listener = l case "udp", "udp4", "udp6": - l := &packetListener{ - Encoding: s.ContentEncoding, - MaxDecompressionSize: int64(s.MaxDecompressionSize), - } + l := newPacketListener(s.ContentEncoding, s.MaxDecompressionSize, s.MaxParallelParsers) if err := l.setupUDP(s.url, s.interfaceName, int(s.ReadBufferSize)); err != nil { return err } s.listener = l case "ip", "ip4", "ip6": - l := &packetListener{ - Encoding: s.ContentEncoding, - MaxDecompressionSize: int64(s.MaxDecompressionSize), - } + l := newPacketListener(s.ContentEncoding, s.MaxDecompressionSize, s.MaxParallelParsers) if err := l.setupIP(s.url); err != nil { return err } s.listener = l case "unixgram": - l := &packetListener{ - Encoding: s.ContentEncoding, - MaxDecompressionSize: int64(s.MaxDecompressionSize), - } + l := newPacketListener(s.ContentEncoding, s.MaxDecompressionSize, s.MaxParallelParsers) if err := l.setupUnixgram(s.url, s.SocketMode); err != nil { return err } s.listener = l case "vsock": - l := &streamListener{ - ReadBufferSize: int(s.ReadBufferSize), - ReadTimeout: s.ReadTimeout, - KeepAlivePeriod: s.KeepAlivePeriod, - MaxConnections: s.MaxConnections, - Encoding: s.ContentEncoding, - Splitter: s.splitter, - Log: s.log, - } + l := newStreamListener( + s.Config, + s.splitter, + s.log, + ) if err := l.setupVsock(s.url); err != nil { return err diff --git a/plugins/common/socket/stream.go b/plugins/common/socket/stream.go index 4896bbc873357..8b497e6bfd08f 100644 --- a/plugins/common/socket/stream.go +++ b/plugins/common/socket/stream.go @@ -19,6 +19,7 @@ import ( "syscall" "time" + "github.com/alitto/pond" "github.com/mdlayher/vsock" "github.com/influxdata/telegraf" @@ -43,11 +44,29 @@ type streamListener struct { connections uint64 path string cancel context.CancelFunc + parsePool *pond.WorkerPool wg sync.WaitGroup sync.Mutex } +func newStreamListener(conf Config, splitter bufio.SplitFunc, log telegraf.Logger) *streamListener { + return &streamListener{ + ReadBufferSize: int(conf.ReadBufferSize), + ReadTimeout: conf.ReadTimeout, + KeepAlivePeriod: conf.KeepAlivePeriod, + MaxConnections: conf.MaxConnections, + Encoding: conf.ContentEncoding, + Splitter: splitter, + Log: log, + + parsePool: pond.New( + conf.MaxParallelParsers, + 0, + pond.MinWorkers(conf.MaxParallelParsers/2+1)), + } +} + func (l *streamListener) setupTCP(u *url.URL, tlsCfg *tls.Config) error { var err error if tlsCfg == nil { @@ -216,6 +235,9 @@ func (l *streamListener) close() error { return err } } + + l.parsePool.StopAndWait() + return nil } @@ -334,8 +356,13 @@ func (l *streamListener) read(conn net.Conn, onData CallbackData) error { if l.path != "" { src = &net.UnixAddr{Name: l.path, Net: "unix"} } + data := scanner.Bytes() - onData(src, data) + d := make([]byte, len(data)) + copy(d, data) + l.parsePool.Submit(func() { + onData(src, d) + }) } if err := scanner.Err(); err != nil { @@ -379,7 +406,10 @@ func (l *streamListener) readAll(conn net.Conn, onData CallbackData) error { if err != nil { return fmt.Errorf("read on %s failed: %w", src, err) } - onData(src, buf) + + l.parsePool.Submit(func() { + onData(src, buf) + }) return nil }