Skip to content

Commit

Permalink
[chore] Integration tests: move sink initialization to a common packa…
Browse files Browse the repository at this point in the history
…ge (#1697)
  • Loading branch information
dmitryax authored Mar 6, 2025
1 parent faf2847 commit 1f50d4d
Show file tree
Hide file tree
Showing 8 changed files with 138 additions and 219 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,8 @@ import (
"text/template"
"time"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/splunkhecreceiver"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/receiver/receivertest"
"gopkg.in/yaml.v3"

"github.com/stretchr/testify/assert"
Expand All @@ -37,12 +34,8 @@ import (
)

const (
hecReceiverPort = 8090
hecMetricsReceiverPort = 8091
apiPort = 8881
hecLogsObjectsReceiverPort = 8092
testDir = "testdata"
valuesDir = "values"
testDir = "testdata"
valuesDir = "values"
)

var globalSinks *sinks
Expand All @@ -57,14 +50,10 @@ type sinks struct {

func setupOnce(t *testing.T) *sinks {
setupRun.Do(func() {
// create an API server
internal.CreateApiServer(t, apiPort)
// set ingest pipelines
logs, metrics := setupHEC(t)
globalSinks = &sinks{
logsConsumer: logs,
hecMetricsConsumer: metrics,
logsObjectsConsumer: setupHECLogsObjects(t),
logsConsumer: internal.SetupHECLogsSink(t),
hecMetricsConsumer: internal.SetupHECMetricsSink(t),
logsObjectsConsumer: internal.SetupHECObjectsSink(t),
}
if os.Getenv("TEARDOWN_BEFORE_SETUP") == "true" {
teardown(t)
Expand Down Expand Up @@ -92,8 +81,8 @@ func deployChartsAndApps(t *testing.T, valuesFileName string, repl map[string]in
require.Fail(t, "Host endpoint not found")
}
replacements := map[string]interface{}{
"LogHecEndpoint": fmt.Sprintf("http://%s:%d", hostEp, hecReceiverPort),
"MetricHecEndpoint": fmt.Sprintf("http://%s:%d/services/collector", hostEp, hecMetricsReceiverPort),
"LogHecEndpoint": fmt.Sprintf("http://%s:%d", hostEp, internal.HECLogsReceiverPort),
"MetricHecEndpoint": fmt.Sprintf("http://%s:%d/services/collector", hostEp, internal.HECMetricsReceiverPort),
}
for k, v := range repl {
replacements[k] = v
Expand Down Expand Up @@ -315,7 +304,7 @@ func testClusterReceiverEnabledOrDisabled(t *testing.T) {
if len(hostEp) == 0 {
require.Fail(t, "Host endpoint not found")
}
logsObjectsHecEndpoint := fmt.Sprintf("http://%s:%d/services/collector", hostEp, hecLogsObjectsReceiverPort)
logsObjectsHecEndpoint := fmt.Sprintf("http://%s:%d/services/collector", hostEp, internal.HECObjectsReceiverPort)

t.Run("check cluster receiver enabled", func(t *testing.T) {
replacements := map[string]interface{}{
Expand Down Expand Up @@ -358,7 +347,7 @@ func testVerifyLogsAndMetricsAttributes(t *testing.T) {
t.Run("verify cluster receiver attributes", func(t *testing.T) {
valuesFileName := "values_cluster_receiver_only.yaml.tmpl"
logsObjectsConsumer := setupOnce(t).logsObjectsConsumer
logsObjectsHecEndpoint := fmt.Sprintf("http://%s:%d/services/collector", hostEp, hecLogsObjectsReceiverPort)
logsObjectsHecEndpoint := fmt.Sprintf("http://%s:%d/services/collector", hostEp, internal.HECObjectsReceiverPort)

replacements := map[string]interface{}{
"ClusterReceiverEnabled": true,
Expand All @@ -381,7 +370,7 @@ func testVerifyLogsAndMetricsAttributes(t *testing.T) {
t.Run("verify cluster receiver metrics attributes", func(t *testing.T) {
valuesFileName := "values_cluster_receiver_only.yaml.tmpl"
hecMetricsConsumer := setupOnce(t).hecMetricsConsumer
logsObjectsHecEndpoint := fmt.Sprintf("http://%s:%d/services/collector", hostEp, hecLogsObjectsReceiverPort)
logsObjectsHecEndpoint := fmt.Sprintf("http://%s:%d/services/collector", hostEp, internal.HECObjectsReceiverPort)

replacements := map[string]interface{}{
"ClusterReceiverEnabled": true,
Expand Down Expand Up @@ -626,51 +615,3 @@ func uninstallDeployment(t *testing.T) {
t.Logf("Uninstalled release: %v", uninstallResponse)
waitForAllPodsToBeRemoved(t, "default")
}

func setupHEC(t *testing.T) (*consumertest.LogsSink, *consumertest.MetricsSink) {
// the splunkhecreceiver does poorly at receiving logs and metrics. Use separate ports for now.
f := splunkhecreceiver.NewFactory()
cfg := f.CreateDefaultConfig().(*splunkhecreceiver.Config)
cfg.Endpoint = fmt.Sprintf("0.0.0.0:%d", hecReceiverPort)

mCfg := f.CreateDefaultConfig().(*splunkhecreceiver.Config)
mCfg.Endpoint = fmt.Sprintf("0.0.0.0:%d", hecMetricsReceiverPort)

lc := new(consumertest.LogsSink)
mc := new(consumertest.MetricsSink)
rcvr, err := f.CreateLogs(context.Background(), receivertest.NewNopSettings(f.Type()), cfg, lc)
mrcvr, err := f.CreateMetrics(context.Background(), receivertest.NewNopSettings(f.Type()), mCfg, mc)
require.NoError(t, err)

require.NoError(t, rcvr.Start(context.Background(), componenttest.NewNopHost()))
require.NoError(t, err, "failed creating logs receiver")
t.Cleanup(func() {
assert.NoError(t, rcvr.Shutdown(context.Background()))
})

require.NoError(t, mrcvr.Start(context.Background(), componenttest.NewNopHost()))
require.NoError(t, err, "failed creating metrics receiver")
t.Cleanup(func() {
assert.NoError(t, mrcvr.Shutdown(context.Background()))
})

return lc, mc
}

func setupHECLogsObjects(t *testing.T) *consumertest.LogsSink {
f := splunkhecreceiver.NewFactory()
cfg := f.CreateDefaultConfig().(*splunkhecreceiver.Config)
cfg.Endpoint = fmt.Sprintf("0.0.0.0:%d", hecLogsObjectsReceiverPort)

lc := new(consumertest.LogsSink)
rcvr, err := f.CreateLogs(context.Background(), receivertest.NewNopSettings(f.Type()), cfg, lc)
require.NoError(t, err)

require.NoError(t, rcvr.Start(context.Background(), componenttest.NewNopHost()))
require.NoError(t, err, "failed creating logs receiver")
t.Cleanup(func() {
assert.NoError(t, rcvr.Shutdown(context.Background()))
})

return lc
}
99 changes: 10 additions & 89 deletions functional_tests/functional/functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,13 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/ptracetest"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/splunkhecreceiver"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/receiver/otlpreceiver"
"go.opentelemetry.io/collector/receiver/receivertest"
"gopkg.in/yaml.v3"
"helm.sh/helm/v3/pkg/action"
"helm.sh/helm/v3/pkg/kube"
Expand All @@ -53,14 +49,8 @@ import (
)

const (
hecReceiverPort = 8090
hecMetricsReceiverPort = 8091
hecLogsObjectsReceiverPort = 8092
signalFxReceiverPort = 9443
signalFxReceiverK8sClusterReceiverPort = 19443
otlpReceiverPort = 4317
otlpHTTPReceiverPort = 4318
apiPort = 8881
kindTestKubeEnv = "kind"
eksTestKubeEnv = "eks"
autopilotTestKubeEnv = "gke/autopilot"
Expand Down Expand Up @@ -93,17 +83,15 @@ type sinks struct {
func setupOnce(t *testing.T) *sinks {
setupRun.Do(func() {
// create an API server
internal.CreateApiServer(t, apiPort)
// set ingest pipelines
logs, metrics := setupHEC(t)
internal.SetupSignalFxApiServer(t)
globalSinks = &sinks{
logsConsumer: logs,
hecMetricsConsumer: metrics,
logsObjectsConsumer: setupHECLogsObjects(t),
logsConsumer: internal.SetupHECLogsSink(t),
hecMetricsConsumer: internal.SetupHECMetricsSink(t),
logsObjectsConsumer: internal.SetupHECObjectsSink(t),
agentMetricsConsumer: internal.SetupSignalfxReceiver(t, signalFxReceiverPort),
k8sclusterReceiverMetricsConsumer: internal.SetupSignalfxReceiver(t,
signalFxReceiverK8sClusterReceiverPort),
tracesConsumer: setupTraces(t),
tracesConsumer: internal.SetupOTLPTracesSink(t),
}
if os.Getenv("TEARDOWN_BEFORE_SETUP") == "true" {
teardown(t)
Expand Down Expand Up @@ -225,11 +213,11 @@ func deployChartsAndApps(t *testing.T) {
}{
fmt.Sprintf("http://%s:%d", hostEp, signalFxReceiverK8sClusterReceiverPort),
fmt.Sprintf("http://%s:%d", hostEp, signalFxReceiverPort),
fmt.Sprintf("http://%s:%d", hostEp, hecReceiverPort),
fmt.Sprintf("http://%s:%d/services/collector", hostEp, hecMetricsReceiverPort),
fmt.Sprintf("%s:%d", hostEp, otlpReceiverPort),
fmt.Sprintf("http://%s:%d", hostEp, apiPort),
fmt.Sprintf("http://%s:%d/services/collector", hostEp, hecLogsObjectsReceiverPort),
fmt.Sprintf("http://%s:%d", hostEp, internal.HECLogsReceiverPort),
fmt.Sprintf("http://%s:%d/services/collector", hostEp, internal.HECMetricsReceiverPort),
fmt.Sprintf("%s:%d", hostEp, internal.OTLPGRPCReceiverPort),
fmt.Sprintf("http://%s:%d", hostEp, internal.SignalFxAPIPort),
fmt.Sprintf("http://%s:%d/services/collector", hostEp, internal.HECObjectsReceiverPort),
kubeTestEnv,
}
tmpl, err := template.New("").Parse(string(valuesBytes))
Expand Down Expand Up @@ -1449,73 +1437,6 @@ func waitForAllNamespacesToBeCreated(t *testing.T, client *kubernetes.Clientset)
}, 5*time.Minute, 10*time.Second)
}

func setupTraces(t *testing.T) *consumertest.TracesSink {
tc := new(consumertest.TracesSink)
f := otlpreceiver.NewFactory()
cfg := f.CreateDefaultConfig().(*otlpreceiver.Config)
cfg.Protocols.GRPC.NetAddr.Endpoint = fmt.Sprintf("0.0.0.0:%d", otlpReceiverPort)
cfg.Protocols.HTTP.Endpoint = fmt.Sprintf("0.0.0.0:%d", otlpHTTPReceiverPort)

rcvr, err := f.CreateTraces(context.Background(), receivertest.NewNopSettings(f.Type()), cfg, tc)
require.NoError(t, err)

require.NoError(t, rcvr.Start(context.Background(), componenttest.NewNopHost()))
require.NoError(t, err, "failed creating traces receiver")
t.Cleanup(func() {
assert.NoError(t, rcvr.Shutdown(context.Background()))
})

return tc
}

func setupHEC(t *testing.T) (*consumertest.LogsSink, *consumertest.MetricsSink) {
// the splunkhecreceiver does poorly at receiving logs and metrics. Use separate ports for now.
f := splunkhecreceiver.NewFactory()
cfg := f.CreateDefaultConfig().(*splunkhecreceiver.Config)
cfg.Endpoint = fmt.Sprintf("0.0.0.0:%d", hecReceiverPort)

mCfg := f.CreateDefaultConfig().(*splunkhecreceiver.Config)
mCfg.Endpoint = fmt.Sprintf("0.0.0.0:%d", hecMetricsReceiverPort)

lc := new(consumertest.LogsSink)
mc := new(consumertest.MetricsSink)
rcvr, err := f.CreateLogs(context.Background(), receivertest.NewNopSettings(f.Type()), cfg, lc)
mrcvr, err := f.CreateMetrics(context.Background(), receivertest.NewNopSettings(f.Type()), mCfg, mc)
require.NoError(t, err)

require.NoError(t, rcvr.Start(context.Background(), componenttest.NewNopHost()))
require.NoError(t, err, "failed creating logs receiver")
t.Cleanup(func() {
assert.NoError(t, rcvr.Shutdown(context.Background()))
})

require.NoError(t, mrcvr.Start(context.Background(), componenttest.NewNopHost()))
require.NoError(t, err, "failed creating metrics receiver")
t.Cleanup(func() {
assert.NoError(t, mrcvr.Shutdown(context.Background()))
})

return lc, mc
}

func setupHECLogsObjects(t *testing.T) *consumertest.LogsSink {
f := splunkhecreceiver.NewFactory()
cfg := f.CreateDefaultConfig().(*splunkhecreceiver.Config)
cfg.Endpoint = fmt.Sprintf("0.0.0.0:%d", hecLogsObjectsReceiverPort)

lc := new(consumertest.LogsSink)
rcvr, err := f.CreateLogs(context.Background(), receivertest.NewNopSettings(f.Type()), cfg, lc)
require.NoError(t, err)

require.NoError(t, rcvr.Start(context.Background(), componenttest.NewNopHost()))
require.NoError(t, err, "failed creating logs receiver")
t.Cleanup(func() {
assert.NoError(t, rcvr.Shutdown(context.Background()))
})

return lc
}

func checkMetricsAreEmitted(t *testing.T, mc *consumertest.MetricsSink, metricNames []string, matchFn func(string, pcommon.Map) bool) {
metricsToFind := map[string]bool{}
for _, name := range metricNames {
Expand Down
24 changes: 1 addition & 23 deletions functional_tests/histogram/histogram_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package histogram

import (
"bytes"
"context"
"fmt"
"os"
"path/filepath"
Expand All @@ -17,13 +16,10 @@ import (

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/signalfxreceiver"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/receiver/receivertest"
"gopkg.in/yaml.v3"
"helm.sh/helm/v3/pkg/action"
"helm.sh/helm/v3/pkg/kube"
Expand All @@ -43,7 +39,7 @@ var histogramMetricsConsumer *consumertest.MetricsSink

func setupOnce(t *testing.T) *consumertest.MetricsSink {
setupRun.Do(func() {
histogramMetricsConsumer = setupOtlpReceiver(t, otlpReceiverPort)
histogramMetricsConsumer = internal.SetupSignalfxReceiver(t, otlpReceiverPort)

if os.Getenv("TEARDOWN_BEFORE_SETUP") == "true" {
teardown(t)
Expand All @@ -59,24 +55,6 @@ func setupOnce(t *testing.T) *consumertest.MetricsSink {
return histogramMetricsConsumer
}

func setupOtlpReceiver(t *testing.T, port int) *consumertest.MetricsSink {
mc := new(consumertest.MetricsSink)
f := signalfxreceiver.NewFactory()
cfg := f.CreateDefaultConfig().(*signalfxreceiver.Config)
cfg.Endpoint = fmt.Sprintf("0.0.0.0:%d", port)

rcvr, err := f.CreateMetrics(context.Background(), receivertest.NewNopSettings(f.Type()), cfg, mc)
require.NoError(t, err)

require.NoError(t, rcvr.Start(context.Background(), componenttest.NewNopHost()))
require.NoError(t, err, "failed creating metrics receiver")
t.Cleanup(func() {
assert.NoError(t, rcvr.Shutdown(context.Background()))
})

return mc
}

func deployChartsAndApps(t *testing.T) {
testKubeConfig, setKubeConfig := os.LookupEnv("KUBECONFIG")
require.True(t, setKubeConfig, "the environment variable KUBECONFIG must be set")
Expand Down
6 changes: 4 additions & 2 deletions functional_tests/internal/api_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,17 @@ import (
"github.com/stretchr/testify/require"
)

func CreateApiServer(t *testing.T, port int) {
const SignalFxAPIPort = 8881

func SetupSignalFxApiServer(t *testing.T) {
mux := http.NewServeMux()
mux.HandleFunc("/", func(writer http.ResponseWriter, request *http.Request) {
writer.WriteHeader(200)
})

_, cancelCtx := context.WithCancel(context.Background())
s := &http.Server{
Addr: fmt.Sprintf("0.0.0.0:%d", port),
Addr: fmt.Sprintf("0.0.0.0:%d", SignalFxAPIPort),
Handler: mux,
}

Expand Down
Loading

0 comments on commit 1f50d4d

Please sign in to comment.