Skip to content

Commit

Permalink
Merge branch 'main' into feat/export-telemetrygen-funcs-for-tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Erog38 authored Jan 10, 2025
2 parents 0a561e9 + 992d3b0 commit 462faf3
Show file tree
Hide file tree
Showing 117 changed files with 862 additions and 528 deletions.
10 changes: 8 additions & 2 deletions .github/workflows/scoped-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,13 @@ jobs:
- '**/*_test.go'
scoped-tests:
needs: changedfiles
if: needs.changedfiles.outputs.go_sources != '' || needs.changedfiles.outputs.go_tests != ''
strategy:
fail-fast: false
matrix:
os: [ windows-latest ]
runs-on: ${{ matrix.os }}
needs: changedfiles
steps:
- name: Echo changed files
shell: bash
Expand All @@ -59,9 +60,14 @@ jobs:
./.tools
key: go-cache-${{ runner.os }}-${{ hashFiles('**/go.sum') }}

- name: Build test tools
- name: Build gotestsum on Windows
if: runner.os == 'Windows'
run: make "$(${PWD} -replace '\\', '/')/.tools/gotestsum"

- name: Build gotestsum
if: runner.os != 'Windows'
run: make "$PWD/.tools/gotestsum"

- name: Run changed tests
if: needs.changedfiles.outputs.go_tests
env:
Expand Down
36 changes: 36 additions & 0 deletions .github/workflows/update-otel.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
name: 'Update contrib to the latest core source'
on:
workflow_dispatch:
# TODO: Enable schedule once it's verified that the action works as expected.
# schedule:
# - cron: "27 21 * * *" # Run at an arbitrary time on weekdays.

jobs:
update-otel:
runs-on: ubuntu-24.04
if: ${{ github.repository_owner == 'open-telemetry' }}
steps:
- uses: actions/checkout@v4
with:
path: opentelemetry-collector-contrib
- name: Pull the latest collector repo
uses: actions/checkout@v4
with:
path: opentelemetry-collector
repository: open-telemetry/opentelemetry-collector
- name: Update to latest opentelemetry-collector release
run: |
cd opentelemetry-collector-contrib
git config user.name opentelemetrybot
git config user.email [email protected]
make genotelcontribcol
make update-otel
- name: Create pull request against main
uses: peter-evans/create-pull-request@v7
with:
branch: opentelemetrybot/update-otel
token: ${{ secrets.OPENTELEMETRYBOT_GITHUB_TOKEN }}
commit-message: Update to latest opentelemetry-collector release.
title: "[chore] Update to latest opentelemetry-collector"
body: |
This PR updates the opentelemetry-collector dependency to the latest release.
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,8 @@ update-otel:$(MULTIMOD)
$(call updatehelper,$(CORE_VERSIONS),$(GOMOD),./cmd/oteltestbedcol/builder-config.yaml)
$(MAKE) genotelcontribcol
$(MAKE) genoteltestbedcol
$(MAKE) oteltestbedcol
$(MAKE) generate
$(MAKE) crosslink
$(MAKE) remove-toolchain
git add . && git commit -s -m "[chore] mod and toolchain tidy" ; \

Expand Down
2 changes: 1 addition & 1 deletion Makefile.Common
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ for-affected-components:
if [ -z '$${DEPENDENT_PKGS}' ]; then \
echo "No other package depends on the one being changed."; \
else \
DEPENDENT_PKG_DIRS=$$(echo $${DEPENDENT_PKGS} | tr ' ' '\n' | xargs -I {} grep --include=go.mod -rl {} | xargs dirname | uniq); \
DEPENDENT_PKG_DIRS=$$(echo $${DEPENDENT_PKGS} | tr ' ' '\n' | xargs -I {} grep --include=go.mod -rl {} | xargs -r dirname | uniq); \
set -e; for dir in $$(echo $${DEPENDENT_PKG_DIRS}); do \
(cd "$${dir}" && \
echo "running $${CMD} in $${dir}" && \
Expand Down
121 changes: 61 additions & 60 deletions cmd/opampsupervisor/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (tl testLogger) Errorf(_ context.Context, format string, args ...any) {
tl.t.Logf(format, args...)
}

func defaultConnectingHandler(connectionCallbacks server.ConnectionCallbacksStruct) func(request *http.Request) types.ConnectionResponse {
func defaultConnectingHandler(connectionCallbacks types.ConnectionCallbacks) func(request *http.Request) types.ConnectionResponse {
return func(_ *http.Request) types.ConnectionResponse {
return types.ConnectionResponse{
Accept: true,
Expand All @@ -73,11 +73,11 @@ func defaultConnectingHandler(connectionCallbacks server.ConnectionCallbacksStru
}
}

// onConnectingFuncFactory is a function that will be given to server.CallbacksStruct as
// onConnectingFuncFactory is a function that will be given to types.ConnectionCallbacks as
// OnConnectingFunc. This allows changing the ConnectionCallbacks both from the newOpAMPServer
// caller and inside of newOpAMP Server, and for custom implementations of the value for `Accept`
// in types.ConnectionResponse.
type onConnectingFuncFactory func(connectionCallbacks server.ConnectionCallbacksStruct) func(request *http.Request) types.ConnectionResponse
type onConnectingFuncFactory func(connectionCallbacks types.ConnectionCallbacks) func(request *http.Request) types.ConnectionResponse

type testingOpAMPServer struct {
addr string
Expand All @@ -87,38 +87,38 @@ type testingOpAMPServer struct {
shutdown func()
}

func newOpAMPServer(t *testing.T, connectingCallback onConnectingFuncFactory, callbacks server.ConnectionCallbacksStruct) *testingOpAMPServer {
func newOpAMPServer(t *testing.T, connectingCallback onConnectingFuncFactory, callbacks types.ConnectionCallbacks) *testingOpAMPServer {
s := newUnstartedOpAMPServer(t, connectingCallback, callbacks)
s.start()
return s
}

func newUnstartedOpAMPServer(t *testing.T, connectingCallback onConnectingFuncFactory, callbacks server.ConnectionCallbacksStruct) *testingOpAMPServer {
func newUnstartedOpAMPServer(t *testing.T, connectingCallback onConnectingFuncFactory, callbacks types.ConnectionCallbacks) *testingOpAMPServer {
var agentConn atomic.Value
var isAgentConnected atomic.Bool
var didShutdown atomic.Bool
connectedChan := make(chan bool)
s := server.New(testLogger{t: t})
onConnectedFunc := callbacks.OnConnectedFunc
callbacks.OnConnectedFunc = func(ctx context.Context, conn types.Connection) {
onConnectedFunc := callbacks.OnConnected
callbacks.OnConnected = func(ctx context.Context, conn types.Connection) {
if onConnectedFunc != nil {
onConnectedFunc(ctx, conn)
}
agentConn.Store(conn)
isAgentConnected.Store(true)
connectedChan <- true
}
onConnectionCloseFunc := callbacks.OnConnectionCloseFunc
callbacks.OnConnectionCloseFunc = func(conn types.Connection) {
onConnectionCloseFunc := callbacks.OnConnectionClose
callbacks.OnConnectionClose = func(conn types.Connection) {
isAgentConnected.Store(false)
connectedChan <- false
if onConnectionCloseFunc != nil {
onConnectionCloseFunc(conn)
}
}
handler, _, err := s.Attach(server.Settings{
Callbacks: server.CallbacksStruct{
OnConnectingFunc: connectingCallback(callbacks),
Callbacks: types.Callbacks{
OnConnecting: connectingCallback(callbacks),
},
})
require.NoError(t, err)
Expand Down Expand Up @@ -211,8 +211,8 @@ func TestSupervisorStartsCollectorWithRemoteConfig(t *testing.T) {
server := newOpAMPServer(
t,
defaultConnectingHandler,
server.ConnectionCallbacksStruct{
OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
types.ConnectionCallbacks{
OnMessage: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
if message.EffectiveConfig != nil {
config := message.EffectiveConfig.ConfigMap.ConfigMap[""]
if config != nil {
Expand Down Expand Up @@ -287,8 +287,8 @@ func TestSupervisorStartsCollectorWithNoOpAMPServer(t *testing.T) {
require.NoError(t, os.WriteFile(remoteConfigFilePath, marshalledRemoteConfig, 0o600))

connected := atomic.Bool{}
server := newUnstartedOpAMPServer(t, defaultConnectingHandler, server.ConnectionCallbacksStruct{
OnConnectedFunc: func(ctx context.Context, conn types.Connection) {
server := newUnstartedOpAMPServer(t, defaultConnectingHandler, types.ConnectionCallbacks{
OnConnected: func(ctx context.Context, conn types.Connection) {
connected.Store(true)
},
})
Expand Down Expand Up @@ -331,19 +331,20 @@ func TestSupervisorStartsWithNoOpAMPServer(t *testing.T) {

configuredChan := make(chan struct{})
connected := atomic.Bool{}
server := newUnstartedOpAMPServer(t, defaultConnectingHandler, server.ConnectionCallbacksStruct{
OnConnectedFunc: func(ctx context.Context, conn types.Connection) {
connected.Store(true)
},
OnMessageFunc: func(ctx context.Context, conn types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
lastCfgHash := message.GetRemoteConfigStatus().GetLastRemoteConfigHash()
if bytes.Equal(lastCfgHash, hash) {
close(configuredChan)
}
server := newUnstartedOpAMPServer(t, defaultConnectingHandler,
types.ConnectionCallbacks{
OnConnected: func(ctx context.Context, conn types.Connection) {
connected.Store(true)
},
OnMessage: func(ctx context.Context, conn types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
lastCfgHash := message.GetRemoteConfigStatus().GetLastRemoteConfigHash()
if bytes.Equal(lastCfgHash, hash) {
close(configuredChan)
}

return &protobufs.ServerToAgent{}
},
})
return &protobufs.ServerToAgent{}
},
})
defer server.shutdown()

// The supervisor is started without a running OpAMP server.
Expand Down Expand Up @@ -415,8 +416,8 @@ func TestSupervisorRestartsCollectorAfterBadConfig(t *testing.T) {
server := newOpAMPServer(
t,
defaultConnectingHandler,
server.ConnectionCallbacksStruct{
OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
types.ConnectionCallbacks{
OnMessage: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
if message.Health != nil {
healthReport.Store(message.Health)
}
Expand Down Expand Up @@ -501,8 +502,8 @@ func TestSupervisorConfiguresCapabilities(t *testing.T) {
server := newOpAMPServer(
t,
defaultConnectingHandler,
server.ConnectionCallbacksStruct{
OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
types.ConnectionCallbacks{
OnMessage: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
capabilities.Store(message.Capabilities)

return &protobufs.ServerToAgent{}
Expand Down Expand Up @@ -556,8 +557,8 @@ func TestSupervisorBootstrapsCollector(t *testing.T) {
server := newOpAMPServer(
t,
defaultConnectingHandler,
server.ConnectionCallbacksStruct{
OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
types.ConnectionCallbacks{
OnMessage: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
if message.AgentDescription != nil {
agentDescription.Store(message.AgentDescription)
}
Expand Down Expand Up @@ -602,8 +603,8 @@ func TestSupervisorReportsEffectiveConfig(t *testing.T) {
server := newOpAMPServer(
t,
defaultConnectingHandler,
server.ConnectionCallbacksStruct{
OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
types.ConnectionCallbacks{
OnMessage: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
if message.EffectiveConfig != nil {
config := message.EffectiveConfig.ConfigMap.ConfigMap[""]
if config != nil {
Expand Down Expand Up @@ -713,8 +714,8 @@ func TestSupervisorAgentDescriptionConfigApplies(t *testing.T) {
server := newOpAMPServer(
t,
defaultConnectingHandler,
server.ConnectionCallbacksStruct{
OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
types.ConnectionCallbacks{
OnMessage: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
if message.AgentDescription != nil {
select {
case agentDescMessageChan <- message:
Expand Down Expand Up @@ -866,8 +867,8 @@ func TestSupervisorRestartCommand(t *testing.T) {
server := newOpAMPServer(
t,
defaultConnectingHandler,
server.ConnectionCallbacksStruct{
OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
types.ConnectionCallbacks{
OnMessage: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
if message.Health != nil {
healthReport.Store(message.Health)
}
Expand Down Expand Up @@ -948,7 +949,7 @@ func TestSupervisorOpAMPConnectionSettings(t *testing.T) {
initialServer := newOpAMPServer(
t,
defaultConnectingHandler,
server.ConnectionCallbacksStruct{})
types.ConnectionCallbacks{})

s := newSupervisor(t, "accepts_conn", map[string]string{"url": initialServer.addr})

Expand All @@ -960,11 +961,11 @@ func TestSupervisorOpAMPConnectionSettings(t *testing.T) {
newServer := newOpAMPServer(
t,
defaultConnectingHandler,
server.ConnectionCallbacksStruct{
OnConnectedFunc: func(_ context.Context, _ types.Connection) {
types.ConnectionCallbacks{
OnConnected: func(_ context.Context, _ types.Connection) {
connectedToNewServer.Store(true)
},
OnMessageFunc: func(_ context.Context, _ types.Connection, _ *protobufs.AgentToServer) *protobufs.ServerToAgent {
OnMessage: func(_ context.Context, _ types.Connection, _ *protobufs.AgentToServer) *protobufs.ServerToAgent {
return &protobufs.ServerToAgent{}
},
})
Expand Down Expand Up @@ -999,8 +1000,8 @@ func TestSupervisorRestartsWithLastReceivedConfig(t *testing.T) {
initialServer := newOpAMPServer(
t,
defaultConnectingHandler,
server.ConnectionCallbacksStruct{
OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
types.ConnectionCallbacks{
OnMessage: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
if message.EffectiveConfig != nil {
config := message.EffectiveConfig.ConfigMap.ConfigMap[""]
if config != nil {
Expand Down Expand Up @@ -1043,8 +1044,8 @@ func TestSupervisorRestartsWithLastReceivedConfig(t *testing.T) {
newServer := newOpAMPServer(
t,
defaultConnectingHandler,
server.ConnectionCallbacksStruct{
OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
types.ConnectionCallbacks{
OnMessage: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
if message.EffectiveConfig != nil {
config := message.EffectiveConfig.ConfigMap.ConfigMap[""]
if config != nil {
Expand Down Expand Up @@ -1087,8 +1088,8 @@ func TestSupervisorPersistsInstanceID(t *testing.T) {
server := newOpAMPServer(
t,
defaultConnectingHandler,
server.ConnectionCallbacksStruct{
OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
types.ConnectionCallbacks{
OnMessage: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
select {
case agentIDChan <- message.InstanceUid:
default:
Expand Down Expand Up @@ -1163,8 +1164,8 @@ func TestSupervisorPersistsNewInstanceID(t *testing.T) {
server := newOpAMPServer(
t,
defaultConnectingHandler,
server.ConnectionCallbacksStruct{
OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
types.ConnectionCallbacks{
OnMessage: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
select {
case agentIDChan <- message.InstanceUid:
default:
Expand Down Expand Up @@ -1242,7 +1243,7 @@ func TestSupervisorWritesAgentFilesToStorageDir(t *testing.T) {
server := newOpAMPServer(
t,
defaultConnectingHandler,
server.ConnectionCallbacksStruct{},
types.ConnectionCallbacks{},
)

s := newSupervisor(t, "basic", map[string]string{
Expand Down Expand Up @@ -1270,8 +1271,8 @@ func TestSupervisorStopsAgentProcessWithEmptyConfigMap(t *testing.T) {
server := newOpAMPServer(
t,
defaultConnectingHandler,
server.ConnectionCallbacksStruct{
OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
types.ConnectionCallbacks{
OnMessage: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
if message.EffectiveConfig != nil {
config := message.EffectiveConfig.ConfigMap.ConfigMap[""]
if config != nil {
Expand Down Expand Up @@ -1386,8 +1387,8 @@ func TestSupervisorLogging(t *testing.T) {
require.NoError(t, os.WriteFile(remoteCfgFilePath, marshalledRemoteCfg, 0o600))

connected := atomic.Bool{}
server := newUnstartedOpAMPServer(t, defaultConnectingHandler, server.ConnectionCallbacksStruct{
OnConnectedFunc: func(ctx context.Context, conn types.Connection) {
server := newUnstartedOpAMPServer(t, defaultConnectingHandler, types.ConnectionCallbacks{
OnConnected: func(ctx context.Context, conn types.Connection) {
connected.Store(true)
},
})
Expand Down Expand Up @@ -1449,8 +1450,8 @@ func TestSupervisorRemoteConfigApplyStatus(t *testing.T) {
server := newOpAMPServer(
t,
defaultConnectingHandler,
server.ConnectionCallbacksStruct{
OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
types.ConnectionCallbacks{
OnMessage: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
if message.EffectiveConfig != nil {
config := message.EffectiveConfig.ConfigMap.ConfigMap[""]
if config != nil {
Expand Down Expand Up @@ -1586,8 +1587,8 @@ func TestSupervisorOpAmpServerPort(t *testing.T) {
server := newOpAMPServer(
t,
defaultConnectingHandler,
server.ConnectionCallbacksStruct{
OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
types.ConnectionCallbacks{
OnMessage: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {
if message.EffectiveConfig != nil {
config := message.EffectiveConfig.ConfigMap.ConfigMap[""]
if config != nil {
Expand Down
Loading

0 comments on commit 462faf3

Please sign in to comment.