From bd296ce94c84f0671e4fb5d7f3b0d61fae3c1311 Mon Sep 17 00:00:00 2001 From: chengchuanp Date: Fri, 13 Dec 2024 09:53:14 -0700 Subject: [PATCH] Refactor 2nd Version --- receiver/tcpcheckreceiver/config.go | 15 +- receiver/tcpcheckreceiver/factory.go | 63 +++--- .../internal/configtcp/configtcp.go | 137 ++++++++----- .../internal/configtcp/configtcp_test.go | 165 ++++++++++------ receiver/tcpcheckreceiver/scraper.go | 185 +++++++++++++++--- receiver/tcpcheckreceiver/tcp_test.go | 131 +++++++++++++ 6 files changed, 536 insertions(+), 160 deletions(-) create mode 100644 receiver/tcpcheckreceiver/tcp_test.go diff --git a/receiver/tcpcheckreceiver/config.go b/receiver/tcpcheckreceiver/config.go index 033be7c8048d..9c75f141e8b4 100644 --- a/receiver/tcpcheckreceiver/config.go +++ b/receiver/tcpcheckreceiver/config.go @@ -6,13 +6,12 @@ package tcpcheckreceiver // import "github.com/open-telemetry/opentelemetry-coll import ( "errors" "fmt" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/tcpcheckreceiver/internal/metadata" + "go.opentelemetry.io/collector/config/confignet" "net" "strconv" "strings" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/tcpcheckreceiver/internal/metadata" - - "go.opentelemetry.io/collector/config/confignet" "go.opentelemetry.io/collector/receiver/scraperhelper" "go.uber.org/multierr" ) @@ -28,7 +27,7 @@ var ( type Config struct { scraperhelper.ControllerConfig `mapstructure:",squash"` metadata.MetricsBuilderConfig `mapstructure:",squash"` - Targets []*confignet.TCPAddrConfig `mapstructure:"targets"` + tcpConfigs []*confignet.TCPAddrConfig `mapstructure:",squash"` } func validatePort(port string) error { @@ -69,13 +68,15 @@ func validateTarget(cfg *confignet.TCPAddrConfig) error { func (cfg *Config) Validate() error { var err error - if len(cfg.Targets) == 0 { + if len(cfg.tcpConfigs) == 0 { err = multierr.Append(err, errMissingTargets) } - for _, target := range cfg.Targets { - err = multierr.Append(err, validateTarget(target)) + for _, tcpConfig := range cfg.tcpConfigs { + err = multierr.Append(err, validateTarget(tcpConfig)) } return err } + +// createcient diff --git a/receiver/tcpcheckreceiver/factory.go b/receiver/tcpcheckreceiver/factory.go index 7cc051e59436..0f875b48c6cf 100644 --- a/receiver/tcpcheckreceiver/factory.go +++ b/receiver/tcpcheckreceiver/factory.go @@ -5,50 +5,35 @@ package tcpcheckreceiver // import "github.com/open-telemetry/opentelemetry-coll import ( "context" - + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/tcpcheckreceiver/internal/metadata" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/confignet" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/receiver/scraperhelper" - - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/tcpcheckreceiver/internal/metadata" ) // NewFactory creates a factory for tcpcheckreceiver receiver. func NewFactory() receiver.Factory { return receiver.NewFactory( metadata.Type, - createDefaultConfig, - receiver.WithMetrics(createMetricsReceiver, metadata.MetricsStability)) + newDefaultConfig, + receiver.WithMetrics(newReceiver, metadata.MetricsStability)) } -//func createDefaultConfig() component.Config { -// cfg := scraperhelper.NewDefaultControllerConfig() -// // ?? -// cfg.CollectionInterval = 10 * time.Second -// -// return &Config{ -// ControllerConfig: cfg, -// // do we need to add timeout? -// TCPClientSettings: configtcp.TCPClientSettings{ -// Timeout: 10 * time.Second, -// }, -// MetricsBuilderConfig: metadata.DefaultMetricsBuilderConfig(), -// } -//} - -func createDefaultConfig() component.Config { +// to do where to set timeout +func newDefaultConfig() component.Config { cfg := scraperhelper.NewDefaultControllerConfig() return &Config{ ControllerConfig: cfg, MetricsBuilderConfig: metadata.DefaultMetricsBuilderConfig(), - Targets: []*confignet.TCPAddrConfig{}, + tcpConfigs: []*confignet.TCPAddrConfig{}, } } -func createMetricsReceiver( +// todo: match new version +func newReceiver( _ context.Context, settings receiver.Settings, cfg component.Config, @@ -59,7 +44,8 @@ func createMetricsReceiver( return nil, errConfigTCPCheck } - mp := newScraper(tlsCheckConfig, settings, getConnectionState) + //mp := newScraper(tlsCheckConfig, settings, getConnectionState) + mp := newScraper(tlsCheckConfig, settings) s, err := scraperhelper.NewScraperWithoutType(mp.scrape) if err != nil { return nil, err @@ -73,3 +59,32 @@ func createMetricsReceiver( opt, ) } + +// timeout ?? +//func createDefaultConfig() component.Config { +// cfg := scraperhelper.NewDefaultControllerConfig() +// cfg.CollectionInterval = 10 * time.Second +// +// return &Config{ +// ControllerConfig: cfg, +// TCPClientSettings: configtcp.TCPClientSettings{ +// Timeout: 10 * time.Second, +// }, +// MetricsBuilderConfig: metadata.DefaultMetricsBuilderConfig(), +// } +//} + +//func createMetricsReceiver(_ context.Context, params receiver.Settings, rConf component.Config, consumer consumer.Metrics) (receiver.Metrics, error) { +// cfg, ok := rConf.(*Config) +// if !ok { +// return nil, errConfigNotTCPCheck +// } +// +// tcpCheckScraper := newScraper(cfg, params) +// scraper, err := scraperhelper.NewScraper(metadata.Type, tcpCheckScraper.scrape, scraperhelper.WithStart(tcpCheckScraper.start)) +// if err != nil { +// return nil, err +// } +// +// return scraperhelper.NewScraperControllerReceiver(&cfg.ControllerConfig, params, consumer, scraperhelper.AddScraper(scraper)) +//} diff --git a/receiver/tcpcheckreceiver/internal/configtcp/configtcp.go b/receiver/tcpcheckreceiver/internal/configtcp/configtcp.go index facdc97da0ec..b5988d5d9593 100644 --- a/receiver/tcpcheckreceiver/internal/configtcp/configtcp.go +++ b/receiver/tcpcheckreceiver/internal/configtcp/configtcp.go @@ -1,44 +1,93 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package configtcp // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/tcpcheckreceiver/internal/configtcp" - -import ( - "context" - "net" - "time" - - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/config/confignet" -) - -type TCPClientSettings struct { - // Endpoint is always required - Endpoint string `mapstructure:"endpoint"` - Timeout time.Duration `mapstructure:"timeout"` -} - -type Client struct { - net.Conn - TCPAddrConfig confignet.TCPAddrConfig -} - -// Dial starts a TCP session. -func (c *Client) Dial() (err error) { - c.Conn, err = c.TCPAddrConfig.Dial(context.Background()) - if err != nil { - return err - } - return nil -} - -func (tcs *TCPClientSettings) ToClient(_ component.Host, _ component.TelemetrySettings) (*Client, error) { - return &Client{ - TCPAddrConfig: confignet.TCPAddrConfig{ - Endpoint: tcs.Endpoint, - DialerConfig: confignet.DialerConfig{ - Timeout: tcs.Timeout, - }, - }, - }, nil -} +//// Copyright The OpenTelemetry Authors +//// SPDX-License-Identifier: Apache-2.0 +// +//package configtcp // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/tcpcheckreceiver/internal/configtcp" +// +//import "C" +//import ( +// "context" +// "errors" +// "fmt" +// "go.opentelemetry.io/collector/config/confignet" +// "net" +// "strconv" +// "strings" +//) +// +//// Predefined error responses for configuration validation failures +//var ( +// errInvalidEndpoint = errors.New(`"Endpoint" must be in the form of :`) +// errMissingTargets = errors.New(`No targets specified`) +// errConfigTCPCheck = errors.New(`Invalid Config`) +//) +// +//// X this +//func (tcs *TCPClientSettings) ToClient() (*Client, error) { +// TCPAddrConfig: confignet.TCPAddrConfig{ +// Endpoint: tcs.Endpoint, +// DialerConfig: confignet.DialerConfig{ +// Timeout: tcs.Timeout, +// }, +// }, +// } +//} +// +//func validatePort(port string) error { +// portNum, err := strconv.Atoi(port) +// if err != nil { +// return fmt.Errorf("provided port is not a number: %s", port) +// } +// if portNum < 1 || portNum > 65535 { +// return fmt.Errorf("provided port is out of valid range (1-65535): %d", portNum) +// } +// return nil +//} +// +//func (tcs *TCPClientSettings) ValidateTarget() error { +// var err error +// +// if tcs.Endpoint == "" { +// return errMissingTargets +// } +// +// if strings.Contains(tcs.Endpoint, "://") { +// return fmt.Errorf("endpoint contains a scheme, which is not allowed: %s", tcs.Endpoint) +// } +// +// _, port, parseErr := net.SplitHostPort(tcs.Endpoint) +// if parseErr != nil { +// return fmt.Errorf("%s: %w", errInvalidEndpoint.Error(), parseErr) +// } +// +// portParseErr := validatePort(port) +// if portParseErr != nil { +// return fmt.Errorf("%s: %w", errInvalidEndpoint.Error(), portParseErr) +// } +// +// return err +//} +// +//// Client +//type Client struct { +// Connection net.Conn +// Listener net.Listener +// TCPAddrConfig confignet.TCPAddrConfig +//} +// +//// Dial starts a TCP session. +//func (c *Client) Dial() (err error) { +// c.Connection, err = c.TCPAddrConfig.Dial(context.Background()) +// if err != nil { +// return err +// } +// return nil +//} +// +//// mock +//func (c *Client) Listen() (err error) { +// c.Listener, err = c.TCPAddrConfig.Listen(context.Background()) +// if err != nil { +// return err +// } +// return nil +//} diff --git a/receiver/tcpcheckreceiver/internal/configtcp/configtcp_test.go b/receiver/tcpcheckreceiver/internal/configtcp/configtcp_test.go index 0cee34af3107..82fdf16a032a 100644 --- a/receiver/tcpcheckreceiver/internal/configtcp/configtcp_test.go +++ b/receiver/tcpcheckreceiver/internal/configtcp/configtcp_test.go @@ -1,62 +1,103 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package configtcp // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/tcpcheckreceiver/internal/configtcp" - -import ( - "testing" - "time" - - "github.com/stretchr/testify/assert" - "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/component/componenttest" - "go.opentelemetry.io/collector/extension" - "go.opentelemetry.io/collector/extension/auth/authtest" -) - -type mockHost struct { - component.Host - ext map[component.ID]extension.Extension -} - -func TestAllTCPClientSettings(t *testing.T) { - host := &mockHost{ - ext: map[component.ID]extension.Extension{ - component.MustNewID("testauth"): &authtest.MockClient{}, - }, - } - - endpoint := "localhost:8080" - timeout := time.Second * 5 - - tests := []struct { - name string - settings TCPClientSettings - shouldError bool - }{ - { - name: "valid_settings_endpoint", - settings: TCPClientSettings{ - Endpoint: endpoint, - Timeout: timeout, - }, - shouldError: false, - }, - } - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - tt := componenttest.NewNopTelemetrySettings() - tt.TracerProvider = nil - - client, err := test.settings.ToClient(host, tt) - if test.shouldError { - assert.Error(t, err) - return - } - assert.NoError(t, err) - - assert.EqualValues(t, client.TCPAddrConfig.Endpoint, test.settings.Endpoint) - assert.EqualValues(t, client.TCPAddrConfig.DialerConfig.Timeout, test.settings.Timeout) - }) - } -} +//// Copyright The OpenTelemetry Authors +//// SPDX-License-Identifier: Apache-2.0 +// +//package configtcp // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/tcpcheckreceiver/internal/configtcp" +// +//import ( +// "github.com/stretchr/testify/require" +// "testing" +// "time" +// +// "github.com/stretchr/testify/assert" +// "go.opentelemetry.io/collector/component" +// "go.opentelemetry.io/collector/component/componenttest" +// "go.opentelemetry.io/collector/extension" +//) +// +//type mockHost struct { +// component.Host +// ext map[component.ID]extension.Extension +//} +// +//func TestAllTCPClientSettings(t *testing.T) { +// //host := &mockHost{ +// // ext: map[component.ID]extension.Extension{ +// // component.MustNewID("testauth"): &authtest.MockClient{}, +// // }, +// //} +// +// endpoint := "localhost:8080" +// timeout := time.Second * 5 +// +// tests := []struct { +// name string +// settings TCPClientSettings +// shouldError bool +// }{ +// { +// name: "valid_settings_endpoint", +// settings: TCPClientSettings{ +// Endpoint: endpoint, +// Timeout: timeout, +// }, +// shouldError: false, +// }, +// } +// for _, test := range tests { +// t.Run(test.name, func(t *testing.T) { +// tt := componenttest.NewNopTelemetrySettings() +// tt.TracerProvider = nil +// +// client, err := test.settings.ToClient() +// if test.shouldError { +// assert.Error(t, err) +// return +// } +// assert.NoError(t, err) +// +// assert.EqualValues(t, client.TCPAddrConfig.Endpoint, test.settings.Endpoint) +// assert.EqualValues(t, client.TCPAddrConfig.DialerConfig.Timeout, test.settings.Timeout) +// }) +// } +//} +// +//func TestMockTCPServer(t *testing.T) { +// +// settings := TCPClientSettings{ +// Endpoint: "localhost:8080", +// Timeout: time.Second * 5, +// } +// client, err := settings.ToClient() +// +// client.Listen() +// require.NoError(t, err, "Failed to start listener") +// defer client.Listener.Close() +// +// go func() { +// conn, err := client.Listener.Accept() +// require.NoError(t, err, "Failed to accept connection") +// defer conn.Close() +// +// // Read client message +// buf := make([]byte, 256) +// n, err := conn.Read(buf) +// require.NoError(t, err, "Failed to read from client") +// +// // Respond to the client +// conn.Write([]byte("Echo: " + string(buf[:n]))) +// }() +// +// err = client.Dial() +// require.NoError(t, err, "Client failed to connect") +// defer client.Connection.Close() +// +// // Sends a message +// message := "Hi, Server!" +// _, err = client.Connection.Write([]byte(message)) +// require.NoError(t, err, "Client failed to send message") +// +// buf := make([]byte, 256) +// n, err := client.Connection.Read(buf) +// require.NoError(t, err, "Client failed to read response") +// require.Equal(t, "Echo: "+message, string(buf[:n]), "Unexpected response from server") +//} diff --git a/receiver/tcpcheckreceiver/scraper.go b/receiver/tcpcheckreceiver/scraper.go index df1a32e6e41a..c6db128223eb 100644 --- a/receiver/tcpcheckreceiver/scraper.go +++ b/receiver/tcpcheckreceiver/scraper.go @@ -4,7 +4,7 @@ package tcpcheckreceiver // import "github.com/open-telemetry/opentelemetry-coll import ( "context" - "crypto/tls" + "go.opentelemetry.io/collector/config/confignet" "sync" "time" @@ -22,32 +22,46 @@ type scraper struct { settings component.TelemetrySettings mb *metadata.MetricsBuilder // dial sth - getConnectionState func(endpoint string) (tls.ConnectionState, error) + getConnectionState func(tcpConfig *confignet.TCPAddrConfig) (TCPConnectionState, error) } -func getConnectionState(endpoint string) (tls.ConnectionState, error) { - conn, err := tls.Dial("tcp", endpoint, &tls.Config{InsecureSkipVerify: true}) +type TCPConnectionState struct { + LocalAddr string // Local address of the connection + RemoteAddr string // Remote address of the connection + Network string // Network type (e.g., "tcp") +} + +// we may not need +func getConnectionState(tcpConfig *confignet.TCPAddrConfig) (TCPConnectionState, error) { + + conn, err := tcpConfig.Dial(context.Background()) + //conn, err := tls.Dial("tcp", endpoint, &tls.Config{InsecureSkipVerify: true}) if err != nil { - return tls.ConnectionState{}, err + return TCPConnectionState{}, err } defer conn.Close() - return conn.ConnectionState(), nil + //state := TCPConnectionState{ + // LocalAddr: conn.LocalAddr().String(), // Local endpoint (IP:port) + // RemoteAddr: conn.RemoteAddr().String(), // Remote endpoint (IP:port) + // Network: conn.LocalAddr().Network(), // Connection network type + //} + return TCPConnectionState{}, nil } -func (s *scraper) scrapeEndpoint(endpoint string, wg *sync.WaitGroup, mux *sync.Mutex) { +func (s *scraper) scrapeEndpoint(tcpConfig *confignet.TCPAddrConfig, wg *sync.WaitGroup, mux *sync.Mutex) { defer wg.Done() const pointVal int64 = 1 // Use a constant for clarity and immutability start := time.Now() - // Attempt to get the connection state - _, err := s.getConnectionState(endpoint) + //tcpClient, err := tcpConfig.ToClient() + _, err1 := s.getConnectionState(tcpConfig) now := pcommon.NewTimestampFromTime(time.Now()) - if err != nil { + if err1 != nil { // Record error data point and log the error mux.Lock() - s.mb.RecordTcpcheckErrorDataPoint(now, pointVal, endpoint, err.Error()) - s.settings.Logger.Error("TCP connection error encountered", zap.String("endpoint", endpoint), zap.Error(err)) + s.mb.RecordTcpcheckErrorDataPoint(now, pointVal, tcpConfig.Endpoint, err1.Error()) + s.settings.Logger.Error("TCP connection error encountered", zap.String("endpoint", tcpConfig.Endpoint), zap.Error(err1)) defer mux.Unlock() return } @@ -56,33 +70,158 @@ func (s *scraper) scrapeEndpoint(endpoint string, wg *sync.WaitGroup, mux *sync. duration := time.Since(start).Milliseconds() mux.Lock() - s.mb.RecordTcpcheckDurationDataPoint(now, duration, endpoint) - s.mb.RecordTcpcheckStatusDataPoint(now, pointVal, endpoint) + s.mb.RecordTcpcheckDurationDataPoint(now, duration, tcpConfig.Endpoint) + s.mb.RecordTcpcheckStatusDataPoint(now, pointVal, tcpConfig.Endpoint) defer mux.Unlock() } +//func (s *scraper) scrapeEndpoint(tcpConfig *confignet.TCPAddrConfig, wg *sync.WaitGroup, mux *sync.Mutex) { +// defer wg.Done() +// +// //state, err := s.getConnectionState(endpoint) +// //if err != nil { +// // s.settings.Logger.Error("TCP connection error encountered", zap.String("endpoint", endpoint), zap.Error(err)) +// // return +// //} +// +// currentTime := time.Now() +// timeLeftInt := int64(timeLeft) +// now := pcommon.NewTimestampFromTime(time.Now()) +// +// mux.Lock() +// defer mux.Unlock() +// s.mb.RecordTlscheckTimeLeftDataPoint(now, timeLeftInt, issuer, commonName, endpoint) +//} + +//# scrape -> to client -> dial + func (s *scraper) scrape(_ context.Context) (pmetric.Metrics, error) { - if s.cfg == nil || len(s.cfg.Targets) == 0 { + if s.cfg == nil || len(s.cfg.tcpConfigs) == 0 { return pmetric.NewMetrics(), errMissingTargets } var wg sync.WaitGroup - wg.Add(len(s.cfg.Targets)) + wg.Add(len(s.cfg.tcpConfigs)) var mux sync.Mutex - for _, target := range s.cfg.Targets { - go s.scrapeEndpoint(target.Endpoint, &wg, &mux) + for _, tcpConfig := range s.cfg.tcpConfigs { + go s.scrapeEndpoint(tcpConfig, &wg, &mux) } wg.Wait() return s.mb.Emit(), nil } -func newScraper(cfg *Config, settings receiver.Settings, getConnectionState func(endpoint string) (tls.ConnectionState, error)) *scraper { +func newScraper(cfg *Config, settings receiver.Settings) *scraper { return &scraper{ - cfg: cfg, - settings: settings.TelemetrySettings, - mb: metadata.NewMetricsBuilder(metadata.DefaultMetricsBuilderConfig(), settings), - getConnectionState: getConnectionState, + cfg: cfg, + settings: settings.TelemetrySettings, + mb: metadata.NewMetricsBuilder(metadata.DefaultMetricsBuilderConfig(), settings), + //getConnectionState: getConnectionState, } } + +// others +//var errClientNotInit = errors.New("client not initialized") +// +//type tcpCheckScraper struct { +// *configtcp.Client +// *Config +// settings component.TelemetrySettings +// mb *metadata.MetricsBuilder +//} +// +//// start the scraper by creating a new TCP Client on the scraper +//func (c *tcpCheckScraper) start(_ context.Context, host component.Host) error { +// var err error +// c.Client, err = c.Config.ToClient(host, c.settings) +// return err +//} +// +//func (c *tcpCheckScraper) start() error { +// c.Client = make([]*Client, len(c.Config.TCPClientSettings)) // Allocate slice for clients +// +// // Loop through the array of TCPClientSettings and call ToClient +// for i, tcpSettings := range c.Config.TCPClientSettings { +// client, clientErr := tcpSettings.ToClient() +// if clientErr != nil { +// return fmt.Errorf("failed to create client for index %d: %w", i, clientErr) +// } +// c.Client[i] = client // Store the client +// } +// +// return nil +//} +// +//func (c *tcpCheckScraper) scrapeTCP(now pcommon.Timestamp) error { +// var success int64 +// +// start := time.Now() +// err := c.Client.Dial() +// if err == nil { +// success = 1 +// } +// c.mb.RecordTcpcheckDurationDataPoint(now, time.Since(start).Nanoseconds(), c.Config.TCPClientSettings.Endpoint) +// c.mb.RecordTcpcheckStatusDataPoint(now, success, c.Config.TCPClientSettings.Endpoint) +// return err +//} +// +//// timeout chooses the shorter between a given deadline and timeout +//func timeout(deadline time.Time, timeout time.Duration) time.Duration { +// timeToDeadline := time.Until(deadline) +// if timeToDeadline < timeout { +// return timeToDeadline +// } +// return timeout +//} +// +//// scrape connects to the endpoint and produces metrics based on the response. +//func (c *tcpCheckScraper) scrape(ctx context.Context) (_ pmetric.Metrics, err error) { +// var ( +// to time.Duration +// ) +// // check cancellation +// select { +// case <-ctx.Done(): +// return pmetric.NewMetrics(), ctx.Err() +// default: +// } +// +// cleanup := func() { +// c.Client.Close() +// } +// +// // if the context carries a shorter deadline then timeout that quickly +// deadline, ok := ctx.Deadline() +// if ok { +// to = timeout(deadline, c.Client.TCPAddrConfig.DialerConfig.Timeout) +// c.Client.TCPAddrConfig.DialerConfig.Timeout = to +// } +// +// ctx, cancel := context.WithCancel(ctx) +// defer cancel() +// +// now := pcommon.NewTimestampFromTime(time.Now()) +// if c.Client == nil { +// return pmetric.NewMetrics(), errClientNotInit +// } +// +// if err = c.scrapeTCP(now); err != nil { +// c.mb.RecordTcpcheckErrorDataPoint(now, int64(1), c.Endpoint, err.Error()) +// } else { +// go func() { +// <-ctx.Done() +// cleanup() +// }() +// } +// +// return c.mb.Emit(), nil +//} +// +//func newScraper(conf *Config, settings receiver.Settings) *tcpCheckScraper { +// return &tcpCheckScraper{ +// Config: conf, +// settings: settings.TelemetrySettings, +// mb: metadata.NewMetricsBuilder(conf.MetricsBuilderConfig, settings), +// } +//} diff --git a/receiver/tcpcheckreceiver/tcp_test.go b/receiver/tcpcheckreceiver/tcp_test.go new file mode 100644 index 000000000000..6f94f2ac3d7c --- /dev/null +++ b/receiver/tcpcheckreceiver/tcp_test.go @@ -0,0 +1,131 @@ +//package tcpcheckreceiver +// +//import ( +// "fmt" +// "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/adapter" +// "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" +// "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/input/tcp" +// "github.com/stretchr/testify/require" +// "go.opentelemetry.io/collector/component" +// "go.opentelemetry.io/collector/confmap/confmaptest" +// "go.opentelemetry.io/collector/consumer/consumertest" +// "go.opentelemetry.io/collector/receiver/receivertest" +// "net" +// "path/filepath" +// "time" +//) +// +//// Copyright The OpenTelemetry Authors +//// SPDX-License-Identifier: Apache-2.0 +// +//package tcplogreceiver +// +//import ( +//"context" +//"fmt" +//"net" +//"path/filepath" +//"testing" +//"time" +// +//"github.com/stretchr/testify/assert" +//"github.com/stretchr/testify/require" +//"go.opentelemetry.io/collector/component" +//"go.opentelemetry.io/collector/component/componenttest" +//"go.opentelemetry.io/collector/confmap/confmaptest" +//"go.opentelemetry.io/collector/consumer/consumertest" +//"go.opentelemetry.io/collector/receiver/receivertest" +// +//"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/adapter" +//"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator" +//"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/input/tcp" +//) +// +//func TestTcp(t *testing.T) { +// testTCP(t, testdataConfigYaml()) +//} +// +//func testTCP(t *testing.T, cfg *TCPLogConfig) { +// numLogs := 5 +// +// f := NewFactory() +// sink := new(consumertest.LogsSink) +// rcvr, err := f.CreateLogs(context.Background(), receivertest.NewNopSettings(), cfg, sink) +// require.NoError(t, err) +// require.NoError(t, rcvr.Start(context.Background(), componenttest.NewNopHost())) +// +// var conn net.Conn +// conn, err = net.Dial("tcp", "127.0.0.1:29018") +// require.NoError(t, err) +// +// for i := 0; i < numLogs; i++ { +// msg := fmt.Sprintf("<86>1 2021-02-28T00:0%d:02.003Z test msg %d\n", i, i) +// _, err = conn.Write([]byte(msg)) +// require.NoError(t, err) +// } +// require.NoError(t, conn.Close()) +// +// require.Eventually(t, expectNLogs(sink, numLogs), 2*time.Second, time.Millisecond) +// require.NoError(t, rcvr.Shutdown(context.Background())) +// require.Len(t, sink.AllLogs(), 1) +// +// resourceLogs := sink.AllLogs()[0].ResourceLogs().At(0) +// logs := resourceLogs.ScopeLogs().At(0).LogRecords() +// +// for i := 0; i < numLogs; i++ { +// log := logs.At(i) +// +// msg := log.Body() +// require.Equal(t, msg.Str(), fmt.Sprintf("<86>1 2021-02-28T00:0%d:02.003Z test msg %d", i, i)) +// } +//} +// +//func TestLoadConfig(t *testing.T) { +// cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml")) +// require.NoError(t, err) +// factory := NewFactory() +// cfg := factory.CreateDefaultConfig() +// +// sub, err := cm.Sub("tcplog") +// require.NoError(t, err) +// require.NoError(t, sub.Unmarshal(cfg)) +// +// assert.NoError(t, component.ValidateConfig(cfg)) +// assert.Equal(t, testdataConfigYaml(), cfg) +//} +// +//func testdataConfigYaml() *TCPLogConfig { +// return &TCPLogConfig{ +// BaseConfig: adapter.BaseConfig{ +// Operators: []operator.Config{}, +// }, +// InputConfig: func() tcp.Config { +// c := tcp.NewConfig() +// c.ListenAddress = "127.0.0.1:29018" +// return *c +// }(), +// } +//} +// +//func TestDecodeInputConfigFailure(t *testing.T) { +// factory := NewFactory() +// badCfg := &TCPLogConfig{ +// BaseConfig: adapter.BaseConfig{ +// Operators: []operator.Config{}, +// }, +// InputConfig: func() tcp.Config { +// c := tcp.NewConfig() +// c.Encoding = "fake" +// return *c +// }(), +// } +// receiver, err := factory.CreateLogs(context.Background(), receivertest.NewNopSettings(), badCfg, consumertest.NewNop()) +// require.Error(t, err, "receiver creation should fail if input config isn't valid") +// require.Nil(t, receiver, "receiver creation should fail if input config isn't valid") +//} +// +//func expectNLogs(sink *consumertest.LogsSink, expected int) func() bool { +// return func() bool { +// return sink.LogRecordCount() == expected +// } +//} \ No newline at end of file