Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(common.socket): Allow parallel parsing with a pool of workers #15891

Merged
merged 10 commits into from
Oct 9, 2024
136 changes: 76 additions & 60 deletions plugins/common/socket/datagram.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@ import (
"strings"
"sync"

"github.com/alitto/pond"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/config"
"github.com/influxdata/telegraf/internal"
)

Expand All @@ -24,10 +27,19 @@ type packetListener struct {
ReadBufferSize int
Log telegraf.Logger

conn net.PacketConn
decoder internal.ContentDecoder
path string
wg sync.WaitGroup
conn net.PacketConn
decoders sync.Pool
path string
wg sync.WaitGroup
parsePool *pond.WorkerPool
}

// newPacketListener builds a packet listener for the given content encoding
// and decompression limit, together with the worker pool used to process
// received packets in parallel.
//
// The pool is created with maxWorkers as its upper bound, a task-queue
// capacity of zero and a minimum worker count of maxWorkers/2+1.
func newPacketListener(encoding string, maxDecompressionSize config.Size, maxWorkers int) *packetListener {
	minWorkers := maxWorkers/2 + 1

	listener := &packetListener{
		Encoding:             encoding,
		MaxDecompressionSize: int64(maxDecompressionSize),
	}
	listener.parsePool = pond.New(maxWorkers, 0, pond.MinWorkers(minWorkers))

	return listener
}

func (l *packetListener) listenData(onData CallbackData, onError CallbackError) {
Expand All @@ -48,15 +60,22 @@ func (l *packetListener) listenData(onData CallbackData, onError CallbackError)
break
}

body, err := l.decoder.Decode(buf[:n])
if err != nil && onError != nil {
onError(fmt.Errorf("unable to decode incoming packet: %w", err))
}
d := make([]byte, n)
copy(d, buf[:n])
l.parsePool.Submit(func() {
decoder := l.decoders.Get().(internal.ContentDecoder)
defer l.decoders.Put(decoder)
body, err := decoder.Decode(d)
if err != nil && onError != nil {
onError(fmt.Errorf("unable to decode incoming packet: %w", err))
}

if l.path != "" {
src = &net.UnixAddr{Name: l.path, Net: "unixgram"}
}
onData(src, body)
if l.path != "" {
src = &net.UnixAddr{Name: l.path, Net: "unixgram"}
}

onData(src, body)
})
}
}()
}
Expand All @@ -80,26 +99,34 @@ func (l *packetListener) listenConnection(onConnection CallbackConnection, onErr
break
}

// Decode the contents depending on the given encoding
body, err := l.decoder.Decode(buf[:n])
if err != nil && onError != nil {
onError(fmt.Errorf("unable to decode incoming packet: %w", err))
}
d := make([]byte, n)
copy(d, buf[:n])
LarsStegman marked this conversation as resolved.
Show resolved Hide resolved
l.parsePool.Submit(func() {
// Decode the contents depending on the given encoding
decoder := l.decoders.Get().(internal.ContentDecoder)
// Not possible to immediately return the decoder to the Pool after calling Decode, because some
// decoders return a reference to their internal buffers. This would cause data races.
defer l.decoders.Put(decoder)
body, err := decoder.Decode(d[:n])
if err != nil && onError != nil {
onError(fmt.Errorf("unable to decode incoming packet: %w", err))
}

// Workaround to provide remote endpoints for Unix-type sockets
if l.path != "" {
src = &net.UnixAddr{Name: l.path, Net: "unixgram"}
}
// Workaround to provide remote endpoints for Unix-type sockets
if l.path != "" {
src = &net.UnixAddr{Name: l.path, Net: "unixgram"}
}

// Create a pipe and notify the caller via Callback that new data is
// available. Afterwards write the data. Please note: Write() will
// blocks until all data is consumed!
reader, writer := io.Pipe()
go onConnection(src, reader)
if _, err := writer.Write(body); err != nil && onError != nil {
onError(err)
}
writer.Close()
// Create a pipe and notify the caller via Callback that new data is
// available. Afterwards write the data. Please note: Write() will
// block until all data is consumed!
reader, writer := io.Pipe()
go onConnection(src, reader)
if _, err := writer.Write(body); err != nil && onError != nil {
onError(err)
}
writer.Close()
})
}
}()
}
Expand Down Expand Up @@ -133,18 +160,7 @@ func (l *packetListener) setupUnixgram(u *url.URL, socketMode string) error {
}
}

// Create a decoder for the given encoding
var options []internal.DecodingOption
if l.MaxDecompressionSize > 0 {
options = append(options, internal.WithMaxDecompressionSize(l.MaxDecompressionSize))
}
decoder, err := internal.NewContentDecoder(l.Encoding, options...)
if err != nil {
return fmt.Errorf("creating decoder failed: %w", err)
}
l.decoder = decoder

return nil
return l.setupDecoder()
}

func (l *packetListener) setupUDP(u *url.URL, ifname string, bufferSize int) error {
Expand Down Expand Up @@ -179,39 +195,37 @@ func (l *packetListener) setupUDP(u *url.URL, ifname string, bufferSize int) err
l.Log.Warnf("Setting read buffer on %s socket failed: %v", u.Scheme, err)
}
}
l.conn = conn

// Create a decoder for the given encoding
var options []internal.DecodingOption
if l.MaxDecompressionSize > 0 {
options = append(options, internal.WithMaxDecompressionSize(l.MaxDecompressionSize))
}
decoder, err := internal.NewContentDecoder(l.Encoding, options...)
if err != nil {
return fmt.Errorf("creating decoder failed: %w", err)
}
l.decoder = decoder

return nil
l.conn = conn
return l.setupDecoder()
}

// setupIP opens a packet connection for the scheme and host of the given URL,
// stores it on the listener and then prepares the decoders.
//
// It returns a wrapped error if listening fails, otherwise whatever
// setupDecoder returns.
func (l *packetListener) setupIP(u *url.URL) error {
	packetConn, listenErr := net.ListenPacket(u.Scheme, u.Host)
	if listenErr != nil {
		return fmt.Errorf("listening (ip) failed: %w", listenErr)
	}
	// Only keep the connection once listening succeeded
	l.conn = packetConn

	return l.setupDecoder()
}

// setupDecoder validates the configured content encoding and prepares the
// pool of decoders shared by the parsing workers.
//
// One decoder is created eagerly so that an unsupported encoding or invalid
// option is returned as a setup error. Without this check the failure would
// only show up inside the pool's New function, which can do no more than log
// it and return nil; the workers type-assert the pooled value, so a nil entry
// would make them panic. The validated decoder is handed to the pool
// afterwards so the work is not thrown away.
func (l *packetListener) setupDecoder() error {
	// Collect the decoding options derived from the listener settings
	var options []internal.DecodingOption
	if l.MaxDecompressionSize > 0 {
		options = append(options, internal.WithMaxDecompressionSize(l.MaxDecompressionSize))
	}

	// Fail early on a misconfigured encoding
	decoder, err := internal.NewContentDecoder(l.Encoding, options...)
	if err != nil {
		return fmt.Errorf("creating decoder failed: %w", err)
	}

	// Additional decoders are created on demand with the same, already
	// validated, encoding and options.
	l.decoders = sync.Pool{New: func() any {
		pooled, createErr := internal.NewContentDecoder(l.Encoding, options...)
		if createErr != nil {
			l.Log.Errorf("creating decoder failed: %v", createErr)
			return nil
		}

		return pooled
	}}

	// Seed the pool with the decoder created during validation
	l.decoders.Put(decoder)

	return nil
}
Expand All @@ -237,5 +251,7 @@ func (l *packetListener) close() error {
}
}

l.parsePool.StopAndWait()

return nil
}
59 changes: 20 additions & 39 deletions plugins/common/socket/socket.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type Config struct {
SocketMode string `toml:"socket_mode"`
ContentEncoding string `toml:"content_encoding"`
MaxDecompressionSize config.Size `toml:"max_decompression_size"`
MaxParallelParsers int `toml:"max_parallel_parsers"`
common_tls.ServerConfig
}

Expand Down Expand Up @@ -96,74 +97,54 @@ func (cfg *Config) NewSocket(address string, splitcfg *SplitConfig, logger teleg
}

func (s *Socket) Setup() error {
s.MaxParallelParsers = max(s.MaxParallelParsers, 1)
switch s.url.Scheme {
case "tcp", "tcp4", "tcp6":
l := &streamListener{
ReadBufferSize: int(s.ReadBufferSize),
ReadTimeout: s.ReadTimeout,
KeepAlivePeriod: s.KeepAlivePeriod,
MaxConnections: s.MaxConnections,
Encoding: s.ContentEncoding,
Splitter: s.splitter,
Log: s.log,
}
l := newStreamListener(
s.Config,
s.splitter,
s.log,
)

if err := l.setupTCP(s.url, s.tlsCfg); err != nil {
return err
}
s.listener = l
case "unix", "unixpacket":
l := &streamListener{
ReadBufferSize: int(s.ReadBufferSize),
ReadTimeout: s.ReadTimeout,
KeepAlivePeriod: s.KeepAlivePeriod,
MaxConnections: s.MaxConnections,
Encoding: s.ContentEncoding,
Splitter: s.splitter,
Log: s.log,
}
l := newStreamListener(
s.Config,
s.splitter,
s.log,
)

if err := l.setupUnix(s.url, s.tlsCfg, s.SocketMode); err != nil {
return err
}
s.listener = l
case "udp", "udp4", "udp6":
l := &packetListener{
Encoding: s.ContentEncoding,
MaxDecompressionSize: int64(s.MaxDecompressionSize),
}
l := newPacketListener(s.ContentEncoding, s.MaxDecompressionSize, s.MaxParallelParsers)
if err := l.setupUDP(s.url, s.interfaceName, int(s.ReadBufferSize)); err != nil {
return err
}
s.listener = l
case "ip", "ip4", "ip6":
l := &packetListener{
Encoding: s.ContentEncoding,
MaxDecompressionSize: int64(s.MaxDecompressionSize),
}
l := newPacketListener(s.ContentEncoding, s.MaxDecompressionSize, s.MaxParallelParsers)
if err := l.setupIP(s.url); err != nil {
return err
}
s.listener = l
case "unixgram":
l := &packetListener{
Encoding: s.ContentEncoding,
MaxDecompressionSize: int64(s.MaxDecompressionSize),
}
l := newPacketListener(s.ContentEncoding, s.MaxDecompressionSize, s.MaxParallelParsers)
if err := l.setupUnixgram(s.url, s.SocketMode); err != nil {
return err
}
s.listener = l
case "vsock":
l := &streamListener{
ReadBufferSize: int(s.ReadBufferSize),
ReadTimeout: s.ReadTimeout,
KeepAlivePeriod: s.KeepAlivePeriod,
MaxConnections: s.MaxConnections,
Encoding: s.ContentEncoding,
Splitter: s.splitter,
Log: s.log,
}
l := newStreamListener(
s.Config,
s.splitter,
s.log,
)

if err := l.setupVsock(s.url); err != nil {
return err
Expand Down
34 changes: 32 additions & 2 deletions plugins/common/socket/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"syscall"
"time"

"github.com/alitto/pond"
"github.com/mdlayher/vsock"

"github.com/influxdata/telegraf"
Expand All @@ -43,11 +44,29 @@ type streamListener struct {
connections uint64
path string
cancel context.CancelFunc
parsePool *pond.WorkerPool

wg sync.WaitGroup
sync.Mutex
}

// newStreamListener builds a stream listener from the socket configuration,
// the splitter function and the logger, together with the worker pool used
// to process received data in parallel.
//
// The pool is created with conf.MaxParallelParsers as its upper bound, a
// task-queue capacity of zero and a minimum worker count of
// MaxParallelParsers/2+1.
func newStreamListener(conf Config, splitter bufio.SplitFunc, log telegraf.Logger) *streamListener {
	maxWorkers := conf.MaxParallelParsers
	pool := pond.New(maxWorkers, 0, pond.MinWorkers(maxWorkers/2+1))

	listener := &streamListener{
		ReadBufferSize:  int(conf.ReadBufferSize),
		ReadTimeout:     conf.ReadTimeout,
		KeepAlivePeriod: conf.KeepAlivePeriod,
		MaxConnections:  conf.MaxConnections,
		Encoding:        conf.ContentEncoding,
		Splitter:        splitter,
		Log:             log,
	}
	listener.parsePool = pool

	return listener
}

func (l *streamListener) setupTCP(u *url.URL, tlsCfg *tls.Config) error {
var err error
if tlsCfg == nil {
Expand Down Expand Up @@ -216,6 +235,9 @@ func (l *streamListener) close() error {
return err
}
}

l.parsePool.StopAndWait()

return nil
}

Expand Down Expand Up @@ -334,8 +356,13 @@ func (l *streamListener) read(conn net.Conn, onData CallbackData) error {
if l.path != "" {
src = &net.UnixAddr{Name: l.path, Net: "unix"}
}

data := scanner.Bytes()
onData(src, data)
d := make([]byte, len(data))
copy(d, data)
LarsStegman marked this conversation as resolved.
Show resolved Hide resolved
l.parsePool.Submit(func() {
onData(src, d)
})
}

if err := scanner.Err(); err != nil {
Expand Down Expand Up @@ -379,7 +406,10 @@ func (l *streamListener) readAll(conn net.Conn, onData CallbackData) error {
if err != nil {
return fmt.Errorf("read on %s failed: %w", src, err)
}
onData(src, buf)

l.parsePool.Submit(func() {
onData(src, buf)
})

return nil
}
Expand Down
Loading