From 19f456d0b160a2d6589680d553f7295d112fb951 Mon Sep 17 00:00:00 2001 From: Alex Boten <223565+codeboten@users.noreply.github.com> Date: Wed, 8 Jan 2025 15:33:14 -0800 Subject: [PATCH 1/2] [cmd/opampsupervisor] update types This change addresses the breaking changes in https://github.com/open-telemetry/opamp-go/releases/tag/v0.18.0 Signed-off-by: Alex Boten <223565+codeboten@users.noreply.github.com> --- cmd/opampsupervisor/e2e_test.go | 80 +++++++++---------- cmd/opampsupervisor/supervisor/server.go | 30 ++++--- cmd/opampsupervisor/supervisor/server_test.go | 8 +- cmd/opampsupervisor/supervisor/supervisor.go | 28 +++---- extension/opampextension/opamp_agent.go | 12 +-- 5 files changed, 81 insertions(+), 77 deletions(-) diff --git a/cmd/opampsupervisor/e2e_test.go b/cmd/opampsupervisor/e2e_test.go index 96935b4bd0d0..b18b298b42bd 100644 --- a/cmd/opampsupervisor/e2e_test.go +++ b/cmd/opampsupervisor/e2e_test.go @@ -64,7 +64,7 @@ func (tl testLogger) Errorf(_ context.Context, format string, args ...any) { tl.t.Logf(format, args...) } -func defaultConnectingHandler(connectionCallbacks server.ConnectionCallbacksStruct) func(request *http.Request) types.ConnectionResponse { +func defaultConnectingHandler(connectionCallbacks types.Callbacks) func(request *http.Request) types.ConnectionResponse { return func(_ *http.Request) types.ConnectionResponse { return types.ConnectionResponse{ Accept: true, @@ -73,11 +73,11 @@ func defaultConnectingHandler(connectionCallbacks server.ConnectionCallbacksStru } } -// onConnectingFuncFactory is a function that will be given to server.CallbacksStruct as +// onConnectingFuncFactory is a function that will be given to types.Callbacks as // OnConnectingFunc. This allows changing the ConnectionCallbacks both from the newOpAMPServer // caller and inside of newOpAMP Server, and for custom implementations of the value for `Accept` // in types.ConnectionResponse. -type onConnectingFuncFactory func(connectionCallbacks server.ConnectionCallbacksStruct) func(request *http.Request) types.ConnectionResponse +type onConnectingFuncFactory func(connectionCallbacks types.Callbacks) func(request *http.Request) types.ConnectionResponse type testingOpAMPServer struct { addr string @@ -87,13 +87,13 @@ type testingOpAMPServer struct { shutdown func() } -func newOpAMPServer(t *testing.T, connectingCallback onConnectingFuncFactory, callbacks server.ConnectionCallbacksStruct) *testingOpAMPServer { +func newOpAMPServer(t *testing.T, connectingCallback onConnectingFuncFactory, callbacks types.Callbacks) *testingOpAMPServer { s := newUnstartedOpAMPServer(t, connectingCallback, callbacks) s.start() return s } -func newUnstartedOpAMPServer(t *testing.T, connectingCallback onConnectingFuncFactory, callbacks server.ConnectionCallbacksStruct) *testingOpAMPServer { +func newUnstartedOpAMPServer(t *testing.T, connectingCallback onConnectingFuncFactory, callbacks types.Callbacks) *testingOpAMPServer { var agentConn atomic.Value var isAgentConnected atomic.Bool var didShutdown atomic.Bool @@ -117,8 +117,8 @@ func newUnstartedOpAMPServer(t *testing.T, connectingCallback onConnectingFuncFa } } handler, _, err := s.Attach(server.Settings{ - Callbacks: server.CallbacksStruct{ - OnConnectingFunc: connectingCallback(callbacks), + Callbacks: types.Callbacks{ + OnConnecting: connectingCallback(callbacks), }, }) require.NoError(t, err) @@ -211,8 +211,8 @@ func TestSupervisorStartsCollectorWithRemoteConfig(t *testing.T) { server := newOpAMPServer( t, defaultConnectingHandler, - server.ConnectionCallbacksStruct{ - OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { + types.Callbacks{ + OnMessage: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { if message.EffectiveConfig != nil { config := message.EffectiveConfig.ConfigMap.ConfigMap[""] if config != nil { @@ -287,8 +287,8 @@ func TestSupervisorStartsCollectorWithNoOpAMPServer(t *testing.T) { require.NoError(t, os.WriteFile(remoteConfigFilePath, marshalledRemoteConfig, 0o600)) connected := atomic.Bool{} - server := newUnstartedOpAMPServer(t, defaultConnectingHandler, server.ConnectionCallbacksStruct{ - OnConnectedFunc: func(ctx context.Context, conn types.Connection) { + server := newUnstartedOpAMPServer(t, defaultConnectingHandler, types.Callbacks{ + OnConnected: func(ctx context.Context, conn types.Connection) { connected.Store(true) }, }) @@ -331,11 +331,11 @@ func TestSupervisorStartsWithNoOpAMPServer(t *testing.T) { configuredChan := make(chan struct{}) connected := atomic.Bool{} - server := newUnstartedOpAMPServer(t, defaultConnectingHandler, server.ConnectionCallbacksStruct{ - OnConnectedFunc: func(ctx context.Context, conn types.Connection) { + server := newUnstartedOpAMPServer(t, defaultConnectingHandler, types.Callbacks{ + OnConnected: func(ctx context.Context, conn types.Connection) { connected.Store(true) }, - OnMessageFunc: func(ctx context.Context, conn types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { + OnMessage: func(ctx context.Context, conn types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { lastCfgHash := message.GetRemoteConfigStatus().GetLastRemoteConfigHash() if bytes.Equal(lastCfgHash, hash) { close(configuredChan) @@ -415,8 +415,8 @@ func TestSupervisorRestartsCollectorAfterBadConfig(t *testing.T) { server := newOpAMPServer( t, defaultConnectingHandler, - server.ConnectionCallbacksStruct{ - OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { + types.Callbacks{ + OnMessage: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { if message.Health != nil { healthReport.Store(message.Health) } @@ -501,8 +501,8 @@ func TestSupervisorConfiguresCapabilities(t *testing.T) { server := newOpAMPServer( t, defaultConnectingHandler, - server.ConnectionCallbacksStruct{ - OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { + types.Callbacks{ + OnMessage: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { capabilities.Store(message.Capabilities) return &protobufs.ServerToAgent{} @@ -556,8 +556,8 @@ func TestSupervisorBootstrapsCollector(t *testing.T) { server := newOpAMPServer( t, defaultConnectingHandler, - server.ConnectionCallbacksStruct{ - OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { + types.Callbacks{ + OnMessage: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { if message.AgentDescription != nil { agentDescription.Store(message.AgentDescription) } @@ -602,8 +602,8 @@ func TestSupervisorReportsEffectiveConfig(t *testing.T) { server := newOpAMPServer( t, defaultConnectingHandler, - server.ConnectionCallbacksStruct{ - OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { + types.Callbacks{ + OnMessage: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { if message.EffectiveConfig != nil { config := message.EffectiveConfig.ConfigMap.ConfigMap[""] if config != nil { @@ -713,8 +713,8 @@ func TestSupervisorAgentDescriptionConfigApplies(t *testing.T) { server := newOpAMPServer( t, defaultConnectingHandler, - server.ConnectionCallbacksStruct{ - OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { + types.Callbacks{ + OnMessage: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { if message.AgentDescription != nil { select { case agentDescMessageChan <- message: @@ -866,8 +866,8 @@ func TestSupervisorRestartCommand(t *testing.T) { server := newOpAMPServer( t, defaultConnectingHandler, - server.ConnectionCallbacksStruct{ - OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { + types.Callbacks{ + OnMessage: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { if message.Health != nil { healthReport.Store(message.Health) } @@ -999,8 +999,8 @@ func TestSupervisorRestartsWithLastReceivedConfig(t *testing.T) { initialServer := newOpAMPServer( t, defaultConnectingHandler, - server.ConnectionCallbacksStruct{ - OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { + types.Callbacks{ + OnMessage: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { if message.EffectiveConfig != nil { config := message.EffectiveConfig.ConfigMap.ConfigMap[""] if config != nil { @@ -1043,8 +1043,8 @@ func TestSupervisorRestartsWithLastReceivedConfig(t *testing.T) { newServer := newOpAMPServer( t, defaultConnectingHandler, - server.ConnectionCallbacksStruct{ - OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { + types.Callbacks{ + OnMessage: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { if message.EffectiveConfig != nil { config := message.EffectiveConfig.ConfigMap.ConfigMap[""] if config != nil { @@ -1087,8 +1087,8 @@ func TestSupervisorPersistsInstanceID(t *testing.T) { server := newOpAMPServer( t, defaultConnectingHandler, - server.ConnectionCallbacksStruct{ - OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { + types.Callbacks{ + OnMessage: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { select { case agentIDChan <- message.InstanceUid: default: @@ -1163,8 +1163,8 @@ func TestSupervisorPersistsNewInstanceID(t *testing.T) { server := newOpAMPServer( t, defaultConnectingHandler, - server.ConnectionCallbacksStruct{ - OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { + types.Callbacks{ + OnMessage: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { select { case agentIDChan <- message.InstanceUid: default: @@ -1270,8 +1270,8 @@ func TestSupervisorStopsAgentProcessWithEmptyConfigMap(t *testing.T) { server := newOpAMPServer( t, defaultConnectingHandler, - server.ConnectionCallbacksStruct{ - OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { + types.Callbacks{ + OnMessage: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { if message.EffectiveConfig != nil { config := message.EffectiveConfig.ConfigMap.ConfigMap[""] if config != nil { @@ -1449,8 +1449,8 @@ func TestSupervisorRemoteConfigApplyStatus(t *testing.T) { server := newOpAMPServer( t, defaultConnectingHandler, - server.ConnectionCallbacksStruct{ - OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { + types.Callbacks{ + OnMessage: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { if message.EffectiveConfig != nil { config := message.EffectiveConfig.ConfigMap.ConfigMap[""] if config != nil { @@ -1586,8 +1586,8 @@ func TestSupervisorOpAmpServerPort(t *testing.T) { server := newOpAMPServer( t, defaultConnectingHandler, - server.ConnectionCallbacksStruct{ - OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { + types.Callbacks{ + OnMessage: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { if message.EffectiveConfig != nil { config := message.EffectiveConfig.ConfigMap.ConfigMap[""] if config != nil { diff --git a/cmd/opampsupervisor/supervisor/server.go b/cmd/opampsupervisor/supervisor/server.go index cd54f96336eb..4ecfd9afa617 100644 --- a/cmd/opampsupervisor/supervisor/server.go +++ b/cmd/opampsupervisor/supervisor/server.go @@ -13,24 +13,26 @@ import ( ) type flattenedSettings struct { - onMessageFunc func(conn serverTypes.Connection, message *protobufs.AgentToServer) - onConnectingFunc func(request *http.Request) (shouldConnect bool, rejectStatusCode int) - onConnectionCloseFunc func(conn serverTypes.Connection) - endpoint string + onMessage func(conn serverTypes.Connection, message *protobufs.AgentToServer) + onConnecting func(request *http.Request) (shouldConnect bool, rejectStatusCode int) + onConnectionClose func(conn serverTypes.Connection) + endpoint string } func (fs flattenedSettings) toServerSettings() server.StartSettings { return server.StartSettings{ Settings: server.Settings{ - Callbacks: fs, + Callbacks: serverTypes.Callbacks{ + OnConnecting: fs.OnConnecting, + }, }, ListenEndpoint: fs.endpoint, } } func (fs flattenedSettings) OnConnecting(request *http.Request) serverTypes.ConnectionResponse { - if fs.onConnectingFunc != nil { - shouldConnect, rejectStatusCode := fs.onConnectingFunc(request) + if fs.onConnecting != nil { + shouldConnect, rejectStatusCode := fs.onConnecting(request) if !shouldConnect { return serverTypes.ConnectionResponse{ Accept: false, @@ -40,23 +42,25 @@ func (fs flattenedSettings) OnConnecting(request *http.Request) serverTypes.Conn } return serverTypes.ConnectionResponse{ - Accept: true, - ConnectionCallbacks: fs, + Accept: true, + ConnectionCallbacks: serverTypes.ConnectionCallbacks{ + OnMessage: fs.OnMessage, + }, } } func (fs flattenedSettings) OnConnected(_ context.Context, _ serverTypes.Connection) {} func (fs flattenedSettings) OnMessage(_ context.Context, conn serverTypes.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { - if fs.onMessageFunc != nil { - fs.onMessageFunc(conn, message) + if fs.onMessage != nil { + fs.onMessage(conn, message) } return &protobufs.ServerToAgent{} } func (fs flattenedSettings) OnConnectionClose(conn serverTypes.Connection) { - if fs.onConnectionCloseFunc != nil { - fs.onConnectionCloseFunc(conn) + if fs.onConnectionClose != nil { + fs.onConnectionClose(conn) } } diff --git a/cmd/opampsupervisor/supervisor/server_test.go b/cmd/opampsupervisor/supervisor/server_test.go index e35c11ab186e..e4272270300a 100644 --- a/cmd/opampsupervisor/supervisor/server_test.go +++ b/cmd/opampsupervisor/supervisor/server_test.go @@ -28,7 +28,7 @@ func Test_flattenedSettings_OnConnecting(t *testing.T) { t.Run("accept connection", func(t *testing.T) { onConnectingFuncCalled := false fs := flattenedSettings{ - onConnectingFunc: func(_ *http.Request) (shouldConnect bool, rejectStatusCode int) { + onConnecting: func(_ *http.Request) (shouldConnect bool, rejectStatusCode int) { onConnectingFuncCalled = true return true, 0 }, @@ -43,7 +43,7 @@ func Test_flattenedSettings_OnConnecting(t *testing.T) { t.Run("do not accept connection", func(t *testing.T) { onConnectingFuncCalled := false fs := flattenedSettings{ - onConnectingFunc: func(_ *http.Request) (shouldConnect bool, rejectStatusCode int) { + onConnecting: func(_ *http.Request) (shouldConnect bool, rejectStatusCode int) { onConnectingFuncCalled = true return false, 500 }, @@ -60,7 +60,7 @@ func Test_flattenedSettings_OnConnecting(t *testing.T) { func Test_flattenedSettings_OnMessage(t *testing.T) { onMessageFuncCalled := false fs := flattenedSettings{ - onMessageFunc: func(_ serverTypes.Connection, _ *protobufs.AgentToServer) { + onMessage: func(_ serverTypes.Connection, _ *protobufs.AgentToServer) { onMessageFuncCalled = true }, } @@ -74,7 +74,7 @@ func Test_flattenedSettings_OnMessage(t *testing.T) { func Test_flattenedSettings_OnConnectionClose(t *testing.T) { onConnectionCloseFuncCalled := false fs := flattenedSettings{ - onConnectionCloseFunc: func(_ serverTypes.Connection) { + onConnectionClose: func(_ serverTypes.Connection) { onConnectionCloseFuncCalled = true }, } diff --git a/cmd/opampsupervisor/supervisor/supervisor.go b/cmd/opampsupervisor/supervisor/supervisor.go index 4607aa6d5958..894854273c6a 100644 --- a/cmd/opampsupervisor/supervisor/supervisor.go +++ b/cmd/opampsupervisor/supervisor/supervisor.go @@ -305,11 +305,11 @@ func (s *Supervisor) getBootstrapInfo() (err error) { // using the Collector's OpAMP extension. err = srv.Start(flattenedSettings{ endpoint: fmt.Sprintf("localhost:%d", s.opampServerPort), - onConnectingFunc: func(_ *http.Request) (bool, int) { + onConnecting: func(_ *http.Request) (bool, int) { connected.Store(true) return true, http.StatusOK }, - onMessageFunc: func(_ serverTypes.Connection, message *protobufs.AgentToServer) { + onMessage: func(_ serverTypes.Connection, message *protobufs.AgentToServer) { if message.AgentDescription != nil { instanceIDSeen := false s.setAgentDescription(message.AgentDescription) @@ -415,33 +415,33 @@ func (s *Supervisor) startOpAMPClient() error { Header: s.config.Server.Headers, TLSConfig: tlsConfig, InstanceUid: types.InstanceUid(s.persistentState.InstanceID), - Callbacks: types.CallbacksStruct{ - OnConnectFunc: func(_ context.Context) { + Callbacks: types.Callbacks{ + OnConnect: func(_ context.Context) { s.logger.Debug("Connected to the server.") }, - OnConnectFailedFunc: func(_ context.Context, err error) { + OnConnectFailed: func(_ context.Context, err error) { s.logger.Error("Failed to connect to the server", zap.Error(err)) }, - OnErrorFunc: func(_ context.Context, err *protobufs.ServerErrorResponse) { + OnError: func(_ context.Context, err *protobufs.ServerErrorResponse) { s.logger.Error("Server returned an error response", zap.String("message", err.ErrorMessage)) }, - OnMessageFunc: s.onMessage, - OnOpampConnectionSettingsFunc: func(ctx context.Context, settings *protobufs.OpAMPConnectionSettings) error { + OnMessage: s.onMessage, + OnOpampConnectionSettings: func(ctx context.Context, settings *protobufs.OpAMPConnectionSettings) error { //nolint:errcheck go s.onOpampConnectionSettings(ctx, settings) return nil }, - OnCommandFunc: func(_ context.Context, command *protobufs.ServerToAgentCommand) error { + OnCommand: func(_ context.Context, command *protobufs.ServerToAgentCommand) error { cmdType := command.GetType() if *cmdType.Enum() == protobufs.CommandType_CommandType_Restart { return s.handleRestartCommand() } return nil }, - SaveRemoteConfigStatusFunc: func(_ context.Context, _ *protobufs.RemoteConfigStatus) { + SaveRemoteConfigStatus: func(_ context.Context, _ *protobufs.RemoteConfigStatus) { // TODO: https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/21079 }, - GetEffectiveConfigFunc: func(_ context.Context) (*protobufs.EffectiveConfig, error) { + GetEffectiveConfig: func(_ context.Context) (*protobufs.EffectiveConfig, error) { return s.createEffectiveConfigMsg(), nil }, }, @@ -486,13 +486,13 @@ func (s *Supervisor) startOpAMPServer() error { err = s.opampServer.Start(flattenedSettings{ endpoint: fmt.Sprintf("localhost:%d", s.opampServerPort), - onConnectingFunc: func(_ *http.Request) (bool, int) { + onConnecting: func(_ *http.Request) (bool, int) { // Only allow one agent to be connected the this server at a time. alreadyConnected := connected.Swap(true) return !alreadyConnected, http.StatusConflict }, - onMessageFunc: s.handleAgentOpAMPMessage, - onConnectionCloseFunc: func(_ serverTypes.Connection) { + onMessage: s.handleAgentOpAMPMessage, + onConnectionClose: func(_ serverTypes.Connection) { connected.Store(false) }, }.toServerSettings()) diff --git a/extension/opampextension/opamp_agent.go b/extension/opampextension/opamp_agent.go index 3e26d407fd80..cca5f762ebe3 100644 --- a/extension/opampextension/opamp_agent.go +++ b/extension/opampextension/opamp_agent.go @@ -118,20 +118,20 @@ func (o *opampAgent) Start(ctx context.Context, host component.Host) error { TLSConfig: tls, OpAMPServerURL: o.cfg.Server.GetEndpoint(), InstanceUid: types.InstanceUid(o.instanceID), - Callbacks: types.CallbacksStruct{ - OnConnectFunc: func(_ context.Context) { + Callbacks: types.Callbacks{ + OnConnect: func(_ context.Context) { o.logger.Debug("Connected to the OpAMP server") }, - OnConnectFailedFunc: func(_ context.Context, err error) { + OnConnectFailed: func(_ context.Context, err error) { o.logger.Error("Failed to connect to the OpAMP server", zap.Error(err)) }, - OnErrorFunc: func(_ context.Context, err *protobufs.ServerErrorResponse) { + OnError: func(_ context.Context, err *protobufs.ServerErrorResponse) { o.logger.Error("OpAMP server returned an error response", zap.String("message", err.ErrorMessage)) }, - GetEffectiveConfigFunc: func(_ context.Context) (*protobufs.EffectiveConfig, error) { + GetEffectiveConfig: func(_ context.Context) (*protobufs.EffectiveConfig, error) { return o.composeEffectiveConfig(), nil }, - OnMessageFunc: o.onMessage, + OnMessage: o.onMessage, }, Capabilities: o.capabilities.toAgentCapabilities(), } From c27be187644bb861f0a218a985b4bd6b38b34594 Mon Sep 17 00:00:00 2001 From: Alex Boten <223565+codeboten@users.noreply.github.com> Date: Thu, 9 Jan 2025 15:26:25 -0800 Subject: [PATCH 2/2] fix e2e test references Signed-off-by: Alex Boten <223565+codeboten@users.noreply.github.com> --- cmd/opampsupervisor/e2e_test.go | 87 +++++++++++++++++---------------- 1 file changed, 44 insertions(+), 43 deletions(-) diff --git a/cmd/opampsupervisor/e2e_test.go b/cmd/opampsupervisor/e2e_test.go index b18b298b42bd..b4f80e9624c8 100644 --- a/cmd/opampsupervisor/e2e_test.go +++ b/cmd/opampsupervisor/e2e_test.go @@ -64,7 +64,7 @@ func (tl testLogger) Errorf(_ context.Context, format string, args ...any) { tl.t.Logf(format, args...) } -func defaultConnectingHandler(connectionCallbacks types.Callbacks) func(request *http.Request) types.ConnectionResponse { +func defaultConnectingHandler(connectionCallbacks types.ConnectionCallbacks) func(request *http.Request) types.ConnectionResponse { return func(_ *http.Request) types.ConnectionResponse { return types.ConnectionResponse{ Accept: true, @@ -73,11 +73,11 @@ func defaultConnectingHandler(connectionCallbacks types.Callbacks) func(request } } -// onConnectingFuncFactory is a function that will be given to types.Callbacks as +// onConnectingFuncFactory is a function that will be given to types.ConnectionCallbacks as // OnConnectingFunc. This allows changing the ConnectionCallbacks both from the newOpAMPServer // caller and inside of newOpAMP Server, and for custom implementations of the value for `Accept` // in types.ConnectionResponse. -type onConnectingFuncFactory func(connectionCallbacks types.Callbacks) func(request *http.Request) types.ConnectionResponse +type onConnectingFuncFactory func(connectionCallbacks types.ConnectionCallbacks) func(request *http.Request) types.ConnectionResponse type testingOpAMPServer struct { addr string @@ -87,20 +87,20 @@ type testingOpAMPServer struct { shutdown func() } -func newOpAMPServer(t *testing.T, connectingCallback onConnectingFuncFactory, callbacks types.Callbacks) *testingOpAMPServer { +func newOpAMPServer(t *testing.T, connectingCallback onConnectingFuncFactory, callbacks types.ConnectionCallbacks) *testingOpAMPServer { s := newUnstartedOpAMPServer(t, connectingCallback, callbacks) s.start() return s } -func newUnstartedOpAMPServer(t *testing.T, connectingCallback onConnectingFuncFactory, callbacks types.Callbacks) *testingOpAMPServer { +func newUnstartedOpAMPServer(t *testing.T, connectingCallback onConnectingFuncFactory, callbacks types.ConnectionCallbacks) *testingOpAMPServer { var agentConn atomic.Value var isAgentConnected atomic.Bool var didShutdown atomic.Bool connectedChan := make(chan bool) s := server.New(testLogger{t: t}) - onConnectedFunc := callbacks.OnConnectedFunc - callbacks.OnConnectedFunc = func(ctx context.Context, conn types.Connection) { + onConnectedFunc := callbacks.OnConnected + callbacks.OnConnected = func(ctx context.Context, conn types.Connection) { if onConnectedFunc != nil { onConnectedFunc(ctx, conn) } @@ -108,8 +108,8 @@ func newUnstartedOpAMPServer(t *testing.T, connectingCallback onConnectingFuncFa isAgentConnected.Store(true) connectedChan <- true } - onConnectionCloseFunc := callbacks.OnConnectionCloseFunc - callbacks.OnConnectionCloseFunc = func(conn types.Connection) { + onConnectionCloseFunc := callbacks.OnConnectionClose + callbacks.OnConnectionClose = func(conn types.Connection) { isAgentConnected.Store(false) connectedChan <- false if onConnectionCloseFunc != nil { @@ -211,7 +211,7 @@ func TestSupervisorStartsCollectorWithRemoteConfig(t *testing.T) { server := newOpAMPServer( t, defaultConnectingHandler, - types.Callbacks{ + types.ConnectionCallbacks{ OnMessage: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { if message.EffectiveConfig != nil { config := message.EffectiveConfig.ConfigMap.ConfigMap[""] @@ -287,7 +287,7 @@ func TestSupervisorStartsCollectorWithNoOpAMPServer(t *testing.T) { require.NoError(t, os.WriteFile(remoteConfigFilePath, marshalledRemoteConfig, 0o600)) connected := atomic.Bool{} - server := newUnstartedOpAMPServer(t, defaultConnectingHandler, types.Callbacks{ + server := newUnstartedOpAMPServer(t, defaultConnectingHandler, types.ConnectionCallbacks{ OnConnected: func(ctx context.Context, conn types.Connection) { connected.Store(true) }, @@ -331,19 +331,20 @@ func TestSupervisorStartsWithNoOpAMPServer(t *testing.T) { configuredChan := make(chan struct{}) connected := atomic.Bool{} - server := newUnstartedOpAMPServer(t, defaultConnectingHandler, types.Callbacks{ - OnConnected: func(ctx context.Context, conn types.Connection) { - connected.Store(true) - }, - OnMessage: func(ctx context.Context, conn types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { - lastCfgHash := message.GetRemoteConfigStatus().GetLastRemoteConfigHash() - if bytes.Equal(lastCfgHash, hash) { - close(configuredChan) - } + server := newUnstartedOpAMPServer(t, defaultConnectingHandler, + types.ConnectionCallbacks{ + OnConnected: func(ctx context.Context, conn types.Connection) { + connected.Store(true) + }, + OnMessage: func(ctx context.Context, conn types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { + lastCfgHash := message.GetRemoteConfigStatus().GetLastRemoteConfigHash() + if bytes.Equal(lastCfgHash, hash) { + close(configuredChan) + } - return &protobufs.ServerToAgent{} - }, - }) + return &protobufs.ServerToAgent{} + }, + }) defer server.shutdown() // The supervisor is started without a running OpAMP server. @@ -415,7 +416,7 @@ func TestSupervisorRestartsCollectorAfterBadConfig(t *testing.T) { server := newOpAMPServer( t, defaultConnectingHandler, - types.Callbacks{ + types.ConnectionCallbacks{ OnMessage: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { if message.Health != nil { healthReport.Store(message.Health) @@ -501,7 +502,7 @@ func TestSupervisorConfiguresCapabilities(t *testing.T) { server := newOpAMPServer( t, defaultConnectingHandler, - types.Callbacks{ + types.ConnectionCallbacks{ OnMessage: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { capabilities.Store(message.Capabilities) @@ -556,7 +557,7 @@ func TestSupervisorBootstrapsCollector(t *testing.T) { server := newOpAMPServer( t, defaultConnectingHandler, - types.Callbacks{ + types.ConnectionCallbacks{ OnMessage: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { if message.AgentDescription != nil { agentDescription.Store(message.AgentDescription) @@ -602,7 +603,7 @@ func TestSupervisorReportsEffectiveConfig(t *testing.T) { server := newOpAMPServer( t, defaultConnectingHandler, - types.Callbacks{ + types.ConnectionCallbacks{ OnMessage: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { if message.EffectiveConfig != nil { config := message.EffectiveConfig.ConfigMap.ConfigMap[""] @@ -713,7 +714,7 @@ func TestSupervisorAgentDescriptionConfigApplies(t *testing.T) { server := newOpAMPServer( t, defaultConnectingHandler, - types.Callbacks{ + types.ConnectionCallbacks{ OnMessage: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { if message.AgentDescription != nil { select { @@ -866,7 +867,7 @@ func TestSupervisorRestartCommand(t *testing.T) { server := newOpAMPServer( t, defaultConnectingHandler, - types.Callbacks{ + types.ConnectionCallbacks{ OnMessage: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { if message.Health != nil { healthReport.Store(message.Health) @@ -948,7 +949,7 @@ func TestSupervisorOpAMPConnectionSettings(t *testing.T) { initialServer := newOpAMPServer( t, defaultConnectingHandler, - server.ConnectionCallbacksStruct{}) + types.ConnectionCallbacks{}) s := newSupervisor(t, "accepts_conn", map[string]string{"url": initialServer.addr}) @@ -960,11 +961,11 @@ func TestSupervisorOpAMPConnectionSettings(t *testing.T) { newServer := newOpAMPServer( t, defaultConnectingHandler, - server.ConnectionCallbacksStruct{ - OnConnectedFunc: func(_ context.Context, _ types.Connection) { + types.ConnectionCallbacks{ + OnConnected: func(_ context.Context, _ types.Connection) { connectedToNewServer.Store(true) }, - OnMessageFunc: func(_ context.Context, _ types.Connection, _ *protobufs.AgentToServer) *protobufs.ServerToAgent { + OnMessage: func(_ context.Context, _ types.Connection, _ *protobufs.AgentToServer) *protobufs.ServerToAgent { return &protobufs.ServerToAgent{} }, }) @@ -999,7 +1000,7 @@ func TestSupervisorRestartsWithLastReceivedConfig(t *testing.T) { initialServer := newOpAMPServer( t, defaultConnectingHandler, - types.Callbacks{ + types.ConnectionCallbacks{ OnMessage: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { if message.EffectiveConfig != nil { config := message.EffectiveConfig.ConfigMap.ConfigMap[""] @@ -1043,7 +1044,7 @@ func TestSupervisorRestartsWithLastReceivedConfig(t *testing.T) { newServer := newOpAMPServer( t, defaultConnectingHandler, - types.Callbacks{ + types.ConnectionCallbacks{ OnMessage: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { if message.EffectiveConfig != nil { config := message.EffectiveConfig.ConfigMap.ConfigMap[""] @@ -1087,7 +1088,7 @@ func TestSupervisorPersistsInstanceID(t *testing.T) { server := newOpAMPServer( t, defaultConnectingHandler, - types.Callbacks{ + types.ConnectionCallbacks{ OnMessage: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { select { case agentIDChan <- message.InstanceUid: @@ -1163,7 +1164,7 @@ func TestSupervisorPersistsNewInstanceID(t *testing.T) { server := newOpAMPServer( t, defaultConnectingHandler, - types.Callbacks{ + types.ConnectionCallbacks{ OnMessage: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { select { case agentIDChan <- message.InstanceUid: @@ -1242,7 +1243,7 @@ func TestSupervisorWritesAgentFilesToStorageDir(t *testing.T) { server := newOpAMPServer( t, defaultConnectingHandler, - server.ConnectionCallbacksStruct{}, + types.ConnectionCallbacks{}, ) s := newSupervisor(t, "basic", map[string]string{ @@ -1270,7 +1271,7 @@ func TestSupervisorStopsAgentProcessWithEmptyConfigMap(t *testing.T) { server := newOpAMPServer( t, defaultConnectingHandler, - types.Callbacks{ + types.ConnectionCallbacks{ OnMessage: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { if message.EffectiveConfig != nil { config := message.EffectiveConfig.ConfigMap.ConfigMap[""] @@ -1386,8 +1387,8 @@ func TestSupervisorLogging(t *testing.T) { require.NoError(t, os.WriteFile(remoteCfgFilePath, marshalledRemoteCfg, 0o600)) connected := atomic.Bool{} - server := newUnstartedOpAMPServer(t, defaultConnectingHandler, server.ConnectionCallbacksStruct{ - OnConnectedFunc: func(ctx context.Context, conn types.Connection) { + server := newUnstartedOpAMPServer(t, defaultConnectingHandler, types.ConnectionCallbacks{ + OnConnected: func(ctx context.Context, conn types.Connection) { connected.Store(true) }, }) @@ -1449,7 +1450,7 @@ func TestSupervisorRemoteConfigApplyStatus(t *testing.T) { server := newOpAMPServer( t, defaultConnectingHandler, - types.Callbacks{ + types.ConnectionCallbacks{ OnMessage: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { if message.EffectiveConfig != nil { config := message.EffectiveConfig.ConfigMap.ConfigMap[""] @@ -1586,7 +1587,7 @@ func TestSupervisorOpAmpServerPort(t *testing.T) { server := newOpAMPServer( t, defaultConnectingHandler, - types.Callbacks{ + types.ConnectionCallbacks{ OnMessage: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent { if message.EffectiveConfig != nil { config := message.EffectiveConfig.ConfigMap.ConfigMap[""]