Skip to content

Commit

Permalink
try to reenable a discovery docker test
Browse files Browse the repository at this point in the history
  • Loading branch information
atoulme committed Jan 10, 2025
1 parent 9badcfe commit 911ba27
Show file tree
Hide file tree
Showing 4 changed files with 193 additions and 35 deletions.
52 changes: 40 additions & 12 deletions tests/general/discoverymode/docker_observer_discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ import (
"fmt"
"os"
"path/filepath"
"runtime"
"testing"
"time"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/pmetrictest"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go"

Expand All @@ -34,15 +36,17 @@ import (
// starting a collector with the daemon domain socket mounted and the container running with its group id
// to detect a prometheus container with a test.id label the receiver creator rule matches against.
func TestDockerObserver(t *testing.T) {
t.Skip("discovery receivers host_observer and docker observer is already being tested elsewhere implicitly")
testutils.SkipIfNotContainerTest(t)
if runtime.GOOS == "darwin" {
t.Skip("unable to share sockets between mac and d4m vm: https://github.com/docker/for-mac/issues/483#issuecomment-758836836")
}
tc := testutils.NewTestcase(t)
defer tc.PrintLogsOnFailure()
defer tc.ShutdownOTLPReceiverSink()

dockerSocketProxy := testutils.CreateDockerSocketProxy(t)
require.NoError(t, dockerSocketProxy.Start())
t.Cleanup(func() {
dockerSocketProxy.Stop()
})

cntrs, shutdownPrometheus := tc.Containers(
testutils.NewContainer().WithImage("bitnami/prometheus").WithLabel("test.id", tc.ID).WillWaitForLogs("Server is ready to receive web requests."),
)
Expand All @@ -59,7 +63,6 @@ func TestDockerObserver(t *testing.T) {
properties, err := filepath.Abs(filepath.Join(".", "testdata", "docker-observer-properties.yaml"))
require.NoError(t, err)
cc.Container = cc.Container.WithMount(testcontainers.BindMount(properties, "/opt/properties.yaml"))
cc.Container = cc.Container.WithBinds("/var/run/docker.sock:/var/run/dock.e.r.sock:ro")
cc.Container = cc.Container.WillWaitForLogs("Discovering for next")
// uid check is for basic collector functionality not using the splunk-otel-collector user
// but the docker gid is required to reach the daemon
Expand All @@ -72,7 +75,7 @@ func TestDockerObserver(t *testing.T) {
"SPLUNK_DISCOVERY_DURATION": "20s",
// confirm that debug logging doesn't affect runtime
"SPLUNK_DISCOVERY_LOG_LEVEL": "debug",
"DOCKER_DOMAIN_SOCKET": "unix:///var/run/dock.e.r.sock",
"DOCKER_DOMAIN_SOCKET": fmt.Sprintf("tcp://%s", dockerSocketProxy.ContainerEndpoint),
"LABEL_ONE_VALUE": "actual.label.one.value",
"LABEL_TWO_VALUE": "actual.label.two.value",
"SPLUNK_DISCOVERY_RECEIVERS_prometheus_x5f_simple_CONFIG_labels_x3a__x3a_label_x5f_three": "overwritten by --set property",
Expand All @@ -90,8 +93,35 @@ func TestDockerObserver(t *testing.T) {
)
defer shutdown()

expectedResourceMetrics := tc.ResourceMetrics("docker-observer-internal-prometheus.yaml")
require.NoError(t, tc.OTLPReceiverSink.AssertAllMetricsReceived(t, *expectedResourceMetrics, 30*time.Second))
expected, err := golden.ReadMetrics(filepath.Join("testdata", "expected", "docker-observer-internal-prometheus-expected.yaml"))
require.NoError(t, err)
require.EventuallyWithT(t, func(tt *assert.CollectT) {
if len(tc.OTLPReceiverSink.AllMetrics()) == 0 {
assert.Fail(tt, "No metrics collected")
return
}
err := pmetrictest.CompareMetrics(expected, tc.OTLPReceiverSink.AllMetrics()[len(tc.OTLPReceiverSink.AllMetrics())-1],
pmetrictest.IgnoreResourceAttributeValue("service.instance.id"),
pmetrictest.IgnoreResourceAttributeValue("net.host.port"),
pmetrictest.IgnoreResourceAttributeValue("net.host.name"),
pmetrictest.IgnoreResourceAttributeValue("server.address"),
pmetrictest.IgnoreResourceAttributeValue("container.name"),
pmetrictest.IgnoreResourceAttributeValue("server.port"),
pmetrictest.IgnoreResourceAttributeValue("service.name"),
pmetrictest.IgnoreResourceAttributeValue("service_instance_id"),
pmetrictest.IgnoreResourceAttributeValue("service_version"),
pmetrictest.IgnoreMetricAttributeValue("service_version"),
pmetrictest.IgnoreMetricAttributeValue("service_instance_id"),
pmetrictest.IgnoreTimestamp(),
pmetrictest.IgnoreStartTimestamp(),
pmetrictest.IgnoreMetricDataPointsOrder(),
pmetrictest.IgnoreScopeMetricsOrder(),
pmetrictest.IgnoreScopeVersion(),
pmetrictest.IgnoreResourceMetricsOrder(),
pmetrictest.IgnoreMetricValues(),
)
assert.NoError(tt, err)
}, 30*time.Second, 1*time.Second)

expectedInitial := map[string]any{
"file": map[string]any{
Expand Down Expand Up @@ -167,8 +197,6 @@ func TestDockerObserver(t *testing.T) {
"receivers/splunk.discovery": []any{"receiver_creator/discovery"},
},
},
"splunk.properties": map[string]any{},
"splunk.property": map[string]any{},
}
require.Equal(t, expectedInitial, cc.InitialConfig(t, 55554))

Expand Down Expand Up @@ -214,7 +242,7 @@ func TestDockerObserver(t *testing.T) {
},
"extensions": map[string]any{
"docker_observer": map[string]any{
"endpoint": "unix:///var/run/dock.e.r.sock",
"endpoint": fmt.Sprintf("tcp://%s", dockerSocketProxy.ContainerEndpoint),
},
},
"receivers": map[string]any{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
resourceMetrics:
- resource:
attributes:
- key: container.image.name
value:
stringValue: bitnami/prometheus
- key: container.name
value:
stringValue: nervous_brahmagupta
- key: http.scheme
value:
stringValue: http
- key: net.host.name
value:
stringValue: 172.17.0.3
- key: net.host.port
value:
stringValue: "9090"
- key: server.address
value:
stringValue: 172.17.0.3
- key: server.port
value:
stringValue: "9090"
- key: service.instance.id
value:
stringValue: 172.17.0.3:9090
- key: service.name
value:
stringValue: prometheus_simple/172.17.0.3:9090
- key: url.scheme
value:
stringValue: http
scopeMetrics:
- metrics:
- description: Number of exemplars currently in circular storage.
gauge:
dataPoints:
- asDouble: 0
attributes:
- key: label_five
value:
stringValue: actual.label.five.value
- key: label_four
value:
stringValue: actual.label.four.value
- key: label_one
value:
stringValue: actual.label.one.value
- key: label_three
value:
stringValue: actual.label.three.value
- key: label_two
value:
stringValue: actual.label.two.value
timeUnixNano: "1000000"
metadata:
- key: prometheus.type
value:
stringValue: gauge
name: prometheus_tsdb_exemplar_exemplars_in_storage
scope:
name: github.com/open-telemetry/opentelemetry-collector-contrib/receiver/prometheusreceiver
version: v0.116.0-1-g0f3e01e7

This file was deleted.

89 changes: 89 additions & 0 deletions tests/testutils/socket_proxy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Copyright Splunk, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package testutils

import (
"fmt"
"io"
"net"
"runtime"
"sync"
"sync/atomic"
"testing"
)

type SocketProxy struct {
Path string
Endpoint string
ContainerEndpoint string
listener net.Listener
wg sync.WaitGroup
active atomic.Bool
t testing.TB
}

func CreateDockerSocketProxy(t testing.TB) *SocketProxy {
port := GetAvailablePort(t)

dockerHost := "0.0.0.0"
if runtime.GOOS == "darwin" {
dockerHost = "host.docker.internal"
}

return &SocketProxy{
Path: "/var/run/docker.sock",
Endpoint: fmt.Sprintf("0.0.0.0:%d", port),
ContainerEndpoint: fmt.Sprintf("%s:%d", dockerHost, port),
t: t,
}
}

func (s *SocketProxy) Start() error {
l, err := net.Listen("tcp", s.Endpoint)
if err != nil {
return err
}
s.listener = l
s.wg.Add(1)
s.active.Store(true)
go func() {
for s.active.Load() {
conn, err := l.Accept()
if err != nil {
break
}
go func(c net.Conn) {
socketConn, err := net.Dial("unix", s.Path)
if err != nil {
s.t.Log("Error dialing", err)
return
}
// Echo all incoming data.
go func() {
_, _ = io.Copy(c, socketConn)
_ = socketConn.Close()
_ = c.Close()
}()
_, _ = io.Copy(socketConn, c)
}(conn)
}
s.wg.Done()
}()
return nil
}

func (s *SocketProxy) Stop() {
s.active.Store(false)
}

0 comments on commit 911ba27

Please sign in to comment.