[chore] introduce prometheus load test #305

Merged · 23 commits · Sep 30, 2024
85 changes: 85 additions & 0 deletions internal/testbed/load/tests/datasenders/prometheus.go
@@ -0,0 +1,85 @@
package datasenders

import (
	"context"
	"fmt"
	"net"
	"strings"

	"github.com/open-telemetry/opentelemetry-collector-contrib/testbed/datasenders"
	"github.com/open-telemetry/opentelemetry-collector-contrib/testbed/testbed"
	"go.opentelemetry.io/collector/consumer"
	"go.opentelemetry.io/collector/pdata/pmetric"
)

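// multiHostPrometheusDataSender fans load out to several single-host Prometheus
// data senders, one per port, so a single test can expose multiple scrape targets.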
type multiHostPrometheusDataSender struct {
dataSenders []testbed.MetricDataSender
consumer.Metrics
}

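// NewMultiHostPrometheusDataSender creates one Prometheus data sender per port,
// all bound to the given host.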
func NewMultiHostPrometheusDataSender(host string, ports []int) testbed.MetricDataSender {
ds := &multiHostPrometheusDataSender{
dataSenders: make([]testbed.MetricDataSender, len(ports)),
}

for i, port := range ports {
ds.dataSenders[i] = datasenders.NewPrometheusDataSender(host, port)
}

return ds
}

func (m multiHostPrometheusDataSender) Start() error {
for _, prom := range m.dataSenders {
if err := prom.Start(); err != nil {
return err
}
}
return nil
}

func (m multiHostPrometheusDataSender) Flush() {
for _, prom := range m.dataSenders {
prom.Flush()
}
}

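// GetEndpoint returns nil: the composed sender has no single endpoint; each
// wrapped sender exposes its own.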
func (m multiHostPrometheusDataSender) GetEndpoint() net.Addr {
return nil
}

func (m multiHostPrometheusDataSender) GenConfigYAMLStr() string {
yamlStr := ""

for i, prom := range m.dataSenders {
yamlStr += getIndexedPrometheusReceiverConfig(prom.GenConfigYAMLStr(), i)
}

return yamlStr
}

func (m multiHostPrometheusDataSender) ProtocolName() string {
protocols := ""
for i, prom := range m.dataSenders {
if i < len(m.dataSenders)-1 {
protocols += fmt.Sprintf("%s/%d,", prom.ProtocolName(), i)
} else {
protocols += fmt.Sprintf("%s/%d", prom.ProtocolName(), i)
}
}
return protocols
}

func (m multiHostPrometheusDataSender) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error {
for _, prom := range m.dataSenders {
if err := prom.ConsumeMetrics(ctx, md); err != nil {
return err
}
}
return nil
}

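// getIndexedPrometheusReceiverConfig rewrites a single-receiver config so that each
// endpoint gets a unique receiver name (prometheus/<i>) and scrape job (testbed-<i>).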
func getIndexedPrometheusReceiverConfig(config string, i int) string {
config = strings.Replace(config, "prometheus:", fmt.Sprintf("prometheus/%d:", i), 1)
config = strings.Replace(config, "- job_name: 'testbed'", fmt.Sprintf("- job_name: 'testbed-%d'", i), 1)
return config
}
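For illustration, a minimal sketch of how the composed sender could be used, assuming it is compiled inside this module (the package is internal) and that the underlying contrib sender reports "prometheus" as its protocol name; the ports are placeholders:

package main

import (
	"fmt"

	"github.com/Dynatrace/dynatrace-otel-collector/internal/testbed/load/tests/datasenders"
)

func main() {
	// Placeholder ports; the real tests pick free ports via testutil.GetAvailablePort.
	sender := datasenders.NewMultiHostPrometheusDataSender("127.0.0.1", []int{9100, 9101})

	// Expected to print the indexed protocol names, e.g. "prometheus/0,prometheus/1".
	fmt.Println(sender.ProtocolName())

	// Expected to print two concatenated receiver blocks, "prometheus/0:" scraping
	// job 'testbed-0' and "prometheus/1:" scraping job 'testbed-1'.
	fmt.Println(sender.GenConfigYAMLStr())
}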
111 changes: 110 additions & 1 deletion internal/testbed/load/tests/metrics_test.go
@@ -8,9 +8,11 @@ package loadtest
// coded in this file or use scenarios from perf_scenarios.go.

import (
datasenders2 "github.com/Dynatrace/dynatrace-otel-collector/internal/testbed/load/tests/datasenders"
"testing"

"github.com/Dynatrace/dynatrace-otel-collector/internal/testcommon/testutil"
"github.com/open-telemetry/opentelemetry-collector-contrib/testbed/datasenders"
"github.com/open-telemetry/opentelemetry-collector-contrib/testbed/testbed"
)

@@ -118,7 +120,7 @@ func TestMetric100kDPS(t *testing.T) {
},
resourceSpec: testbed.ResourceSpec{
ExpectedMaxCPU: 70,
ExpectedMaxRAM: 105,
ExpectedMaxRAM: 110,
},
attrCount: 25,
attrSizeByte: 20,
@@ -162,3 +164,110 @@ func TestMetric100kDPS(t *testing.T) {
})
}
}

func TestPrometheusMetric(t *testing.T) {
tests := []struct {
name string
sender testbed.DataSender
receiver testbed.DataReceiver
extendedLoadOptions ExtendedLoadOptions
resourceSpec testbed.ResourceSpec
processors map[string]string
}{
{
name: "Prometheus 1kDPS - 1 Prometheus Endpoint",
sender: datasenders.NewPrometheusDataSender(testbed.DefaultHost, testutil.GetAvailablePort(t)),
receiver: testbed.NewOTLPDataReceiver(testutil.GetAvailablePort(t)),
extendedLoadOptions: ExtendedLoadOptions{
resourceSpec: testbed.ResourceSpec{
ExpectedMaxCPU: 70,
ExpectedMaxRAM: 130,
},
loadOptions: &testbed.LoadOptions{
DataItemsPerSecond: 1,
ItemsPerBatch: 1,
Parallel: 1,
},
attrCount: 25,
attrSizeByte: 20,
attrKeySizeByte: 100,
scrapeLoadOptions: scrapeLoadOptions{
numberOfMetrics: 1000,
scrapeIntervalMilliSeconds: 1000,
},
},
processors: metricProcessors,
},
{
name: "Prometheus Prometheus 10kDPS - 1 Prometheus Endpoint",
sender: datasenders.NewPrometheusDataSender(testbed.DefaultHost, testutil.GetAvailablePort(t)),
receiver: testbed.NewOTLPDataReceiver(testutil.GetAvailablePort(t)),
extendedLoadOptions: ExtendedLoadOptions{
resourceSpec: testbed.ResourceSpec{
ExpectedMaxCPU: 70,
ExpectedMaxRAM: 280,
},
loadOptions: &testbed.LoadOptions{
DataItemsPerSecond: 1,
ItemsPerBatch: 1,
Parallel: 1,
},
attrCount: 25,
attrSizeByte: 20,
attrKeySizeByte: 100,
scrapeLoadOptions: scrapeLoadOptions{
numberOfMetrics: 10_000,
scrapeIntervalMilliSeconds: 1000,
},
},
processors: metricProcessors,
},
{
name: "Prometheus Prometheus 1kDPS - 5 Prometheus Endpoints",
sender: datasenders2.NewMultiHostPrometheusDataSender(testbed.DefaultHost, getAvailablePorts(t, 5)),
receiver: testbed.NewOTLPDataReceiver(testutil.GetAvailablePort(t)),
extendedLoadOptions: ExtendedLoadOptions{
resourceSpec: testbed.ResourceSpec{
ExpectedMaxCPU: 150,
ExpectedMaxRAM: 280,
},
loadOptions: &testbed.LoadOptions{
DataItemsPerSecond: 1,
ItemsPerBatch: 1,
Parallel: 1,
},
attrCount: 25,
attrSizeByte: 20,
attrKeySizeByte: 100,
scrapeLoadOptions: scrapeLoadOptions{
numberOfMetrics: 1000,
scrapeIntervalMilliSeconds: 1000,
},
},
processors: metricProcessors,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
PullBasedSenderScenario(
t,
test.sender,
test.receiver,
performanceResultsSummary,
test.processors,
nil,
test.extendedLoadOptions,
)
})
}
}

func getAvailablePorts(t *testing.T, numberOfPorts int) []int {
ports := make([]int, numberOfPorts)

for i := 0; i < numberOfPorts; i++ {
ports[i] = testutil.GetAvailablePort(t)
}
return ports
}
104 changes: 99 additions & 5 deletions internal/testbed/load/tests/scenarios.go
@@ -1,9 +1,12 @@
package loadtest

import (
"context"
"fmt"
"path"
"path/filepath"
"strings"
"sync/atomic"
"testing"
"time"

@@ -13,12 +16,18 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/testbed/testbed"
)

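// scrapeLoadOptions describes the pull-based load shape: how many distinct metrics
// are generated up front and how often the collector scrapes them.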
type scrapeLoadOptions struct {
numberOfMetrics int
scrapeIntervalMilliSeconds int
}

type ExtendedLoadOptions struct {
loadOptions *testbed.LoadOptions
resourceSpec testbed.ResourceSpec
attrCount int
attrSizeByte int
attrKeySizeByte int
scrapeLoadOptions scrapeLoadOptions
}

// createConfigYaml creates a collector config file that corresponds to the
@@ -172,6 +181,80 @@ func GenericScenario(
tc.ValidateData()
}

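// PullBasedSenderScenario drives a scrape-based (pull) sender: a fixed number of
// metrics is generated into the sender before the agent starts, the collector then
// scrapes them for the duration of the test, and the test waits until the backend
// has received at least as many data items as were generated.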
func PullBasedSenderScenario(
t *testing.T,
sender testbed.DataSender,
receiver testbed.DataReceiver,
resultsSummary testbed.TestResultsSummary,
processors map[string]string,
extensions map[string]string,
loadOptions ExtendedLoadOptions,
) {
resultDir, err := filepath.Abs(path.Join("results", t.Name()))
require.NoError(t, err)

agentProc := testbed.NewChildProcessCollector(testbed.WithEnvVar("GOMAXPROCS", "2"))

configStr := createConfigYaml(t, sender, receiver, resultDir, processors, extensions)

// replace the default scrape interval duration with the interval defined in the load options
configStr = strings.Replace(
configStr,
"scrape_interval: 100ms",
fmt.Sprintf("scrape_interval: %dms", loadOptions.scrapeLoadOptions.scrapeIntervalMilliSeconds),
1,
)
configCleanup, err := agentProc.PrepareConfig(configStr)
require.NoError(t, err)
defer configCleanup()

dataProvider := testbed.NewPerfTestDataProvider(*loadOptions.loadOptions)
tc := testbed.NewTestCase(
t,
dataProvider,
sender,
receiver,
agentProc,
&simpleTestcaseValidator{
perfTestValidator: &testbed.PerfTestValidator{},
},
resultsSummary,
testbed.WithResourceLimits(loadOptions.resourceSpec),
)
t.Cleanup(tc.Stop)

tc.StartBackend()

// first generate a fixed number of metrics
err = sender.Start()
require.NoError(t, err)

providerSender, ok := tc.LoadGenerator.(*testbed.ProviderSender)
require.True(t, ok)
metricSender, ok := sender.(testbed.MetricDataSender)
require.True(t, ok)

for i := 0; i < loadOptions.scrapeLoadOptions.numberOfMetrics; i++ {
dataItemsSent := atomic.Uint64{}
providerSender.Provider.SetLoadGeneratorCounters(&dataItemsSent)
metrics, _ := providerSender.Provider.GenerateMetrics()
require.NoError(t, metricSender.ConsumeMetrics(context.Background(), metrics))
tc.LoadGenerator.IncDataItemsSent()
}

tc.StartAgent()

tc.Sleep(tc.Duration)

tc.StopLoad()

tc.WaitForN(func() bool { return tc.LoadGenerator.DataItemsSent() <= tc.MockBackend.DataItemsReceived() },
time.Second*300,
"all data items received")

tc.ValidateData()
}

func constructAttributes(loadOptions ExtendedLoadOptions) ExtendedLoadOptions {
loadOptions.loadOptions.Attributes = make(map[string]string)

@@ -190,3 +273,14 @@ func genRandByteString(length int) string {
}
return string(b)
}

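// simpleTestcaseValidator skips per-item validation (delivery is asserted via
// WaitForN in PullBasedSenderScenario) and only records performance results
// through the wrapped PerfTestValidator.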
type simpleTestcaseValidator struct {
perfTestValidator *testbed.PerfTestValidator
}

func (simpleTestcaseValidator) Validate(tc *testbed.TestCase) {
}

func (s simpleTestcaseValidator) RecordResults(tc *testbed.TestCase) {
s.perfTestValidator.RecordResults(tc)
}