diff --git a/client/internal/httpsender.go b/client/internal/httpsender.go index 21f475e0..f5a50279 100644 --- a/client/internal/httpsender.go +++ b/client/internal/httpsender.go @@ -134,7 +134,7 @@ func (h *HTTPSender) SetRequestHeader(header http.Header) { func (h *HTTPSender) makeOneRequestRoundtrip(ctx context.Context) { resp, err := h.sendRequestWithRetries(ctx) if err != nil { - h.logger.Errorf("%v", err) + h.logger.Errorf(ctx, "%v", err) return } if resp == nil { @@ -148,9 +148,9 @@ func (h *HTTPSender) sendRequestWithRetries(ctx context.Context) (*http.Response req, err := h.prepareRequest(ctx) if err != nil { if errors.Is(err, context.Canceled) { - h.logger.Debugf("Client is stopped, will not try anymore.") + h.logger.Debugf(ctx, "Client is stopped, will not try anymore.") } else { - h.logger.Errorf("Failed prepare request (%v), will not try anymore.", err) + h.logger.Errorf(ctx, "Failed prepare request (%v), will not try anymore.", err) } return nil, err } @@ -190,16 +190,16 @@ func (h *HTTPSender) sendRequestWithRetries(ctx context.Context) (*http.Response return nil, fmt.Errorf("invalid response from server: %d", resp.StatusCode) } } else if errors.Is(err, context.Canceled) { - h.logger.Debugf("Client is stopped, will not try anymore.") + h.logger.Debugf(ctx, "Client is stopped, will not try anymore.") return nil, err } - h.logger.Errorf("Failed to do HTTP request (%v), will retry", err) + h.logger.Errorf(ctx, "Failed to do HTTP request (%v), will retry", err) h.callbacks.OnConnectFailed(err) } case <-ctx.Done(): - h.logger.Debugf("Client is stopped, will not try anymore.") + h.logger.Debugf(ctx, "Client is stopped, will not try anymore.") return nil, ctx.Err() } } @@ -239,11 +239,11 @@ func (h *HTTPSender) prepareRequest(ctx context.Context) (*requestWrapper, error var buf bytes.Buffer g := gzip.NewWriter(&buf) if _, err = g.Write(data); err != nil { - h.logger.Errorf("Failed to compress message: %v", err) + h.logger.Errorf(ctx, "Failed to compress message: %v", err) return nil, err } if err = g.Close(); err != nil { - h.logger.Errorf("Failed to close the writer: %v", err) + h.logger.Errorf(ctx, "Failed to close the writer: %v", err) return nil, err } req.bodyReader = bodyReader(buf.Bytes()) @@ -262,14 +262,14 @@ func (h *HTTPSender) receiveResponse(ctx context.Context, resp *http.Response) { msgBytes, err := io.ReadAll(resp.Body) if err != nil { _ = resp.Body.Close() - h.logger.Errorf("cannot read response body: %v", err) + h.logger.Errorf(ctx, "cannot read response body: %v", err) return } _ = resp.Body.Close() var response protobufs.ServerToAgent if err := proto.Unmarshal(msgBytes, &response); err != nil { - h.logger.Errorf("cannot unmarshal response: %v", err) + h.logger.Errorf(ctx, "cannot unmarshal response: %v", err) return } diff --git a/client/internal/packagessyncer.go b/client/internal/packagessyncer.go index 4c13308d..f6814401 100644 --- a/client/internal/packagessyncer.go +++ b/client/internal/packagessyncer.go @@ -101,17 +101,17 @@ func (s *packagesSyncer) initStatuses() error { func (s *packagesSyncer) doSync(ctx context.Context) { hash, err := s.localState.AllPackagesHash() if err != nil { - s.logger.Errorf("Package syncing failed: %V", err) + s.logger.Errorf(ctx, "Package syncing failed: %V", err) return } if bytes.Compare(hash, s.available.AllPackagesHash) == 0 { - s.logger.Debugf("All packages are already up to date.") + s.logger.Debugf(ctx, "All packages are already up to date.") return } failed := false - if err := s.deleteUnneededLocalPackages(); err != nil { - s.logger.Errorf("Cannot delete unneeded packages: %v", err) + if err := s.deleteUnneededLocalPackages(ctx); err != nil { + s.logger.Errorf(ctx, "Cannot delete unneeded packages: %v", err) failed = true } @@ -119,7 +119,7 @@ func (s *packagesSyncer) doSync(ctx context.Context) { for name, pkg := range s.available.Packages { err := s.syncPackage(ctx, name, pkg) if err != nil { - s.logger.Errorf("Cannot sync package %s: %v", name, err) + s.logger.Errorf(ctx, "Cannot sync package %s: %v", name, err) failed = true } } @@ -128,15 +128,15 @@ func (s *packagesSyncer) doSync(ctx context.Context) { // Update the "all" hash on success, so that next time Sync() does not thing, // unless a new hash is received from the Server. if err := s.localState.SetAllPackagesHash(s.available.AllPackagesHash); err != nil { - s.logger.Errorf("SetAllPackagesHash failed: %v", err) + s.logger.Errorf(ctx, "SetAllPackagesHash failed: %v", err) } else { - s.logger.Debugf("All packages are synced and up to date.") + s.logger.Debugf(ctx, "All packages are synced and up to date.") } } else { - s.logger.Errorf("Package syncing was not successful.") + s.logger.Errorf(ctx, "Package syncing was not successful.") } - _ = s.reportStatuses(true) + _ = s.reportStatuses(ctx, true) } // syncPackage downloads the package from the server and installs it. @@ -165,7 +165,7 @@ func (s *packagesSyncer) syncPackage( mustCreate := !pkgLocal.Exists if pkgLocal.Exists { if bytes.Equal(pkgLocal.Hash, pkgAvail.Hash) { - s.logger.Debugf("Package %s hash is unchanged, skipping", pkgName) + s.logger.Debugf(ctx, "Package %s hash is unchanged, skipping", pkgName) return nil } if pkgLocal.Type != pkgAvail.Type { @@ -183,7 +183,7 @@ func (s *packagesSyncer) syncPackage( // Report that we are beginning to install it. status.Status = protobufs.PackageStatusEnum_PackageStatusEnum_Installing - _ = s.reportStatuses(true) + _ = s.reportStatuses(ctx, true) if mustCreate { // Make sure the package exists. @@ -213,7 +213,7 @@ func (s *packagesSyncer) syncPackage( status.Status = protobufs.PackageStatusEnum_PackageStatusEnum_InstallFailed status.ErrorMessage = err.Error() } - _ = s.reportStatuses(true) + _ = s.reportStatuses(ctx, true) return err } @@ -224,7 +224,7 @@ func (s *packagesSyncer) syncPackage( func (s *packagesSyncer) syncPackageFile( ctx context.Context, pkgName string, file *protobufs.DownloadableFile, ) error { - shouldDownload, err := s.shouldDownloadFile(pkgName, file) + shouldDownload, err := s.shouldDownloadFile(ctx, pkgName, file) if err == nil && shouldDownload { err = s.downloadFile(ctx, pkgName, file) } @@ -233,21 +233,18 @@ func (s *packagesSyncer) syncPackageFile( } // shouldDownloadFile returns true if the file should be downloaded. -func (s *packagesSyncer) shouldDownloadFile( - packageName string, - file *protobufs.DownloadableFile, -) (bool, error) { +func (s *packagesSyncer) shouldDownloadFile(ctx context.Context, packageName string, file *protobufs.DownloadableFile) (bool, error) { fileContentHash, err := s.localState.FileContentHash(packageName) if err != nil { err := fmt.Errorf("cannot calculate checksum of %s: %v", packageName, err) - s.logger.Errorf(err.Error()) + s.logger.Errorf(ctx, err.Error()) return true, nil } else { // Compare the checksum of the file we have with what // we are offered by the server. if bytes.Compare(fileContentHash, file.ContentHash) != 0 { - s.logger.Debugf("Package %s: file hash mismatch, will download.", packageName) + s.logger.Debugf(ctx, "Package %s: file hash mismatch, will download.", packageName) return true, nil } } @@ -256,7 +253,7 @@ func (s *packagesSyncer) shouldDownloadFile( // downloadFile downloads the file from the server. func (s *packagesSyncer) downloadFile(ctx context.Context, pkgName string, file *protobufs.DownloadableFile) error { - s.logger.Debugf("Downloading package %s file from %s", pkgName, file.DownloadUrl) + s.logger.Debugf(ctx, "Downloading package %s file from %s", pkgName, file.DownloadUrl) req, err := http.NewRequestWithContext(ctx, "GET", file.DownloadUrl, nil) if err != nil { @@ -286,7 +283,7 @@ func (s *packagesSyncer) downloadFile(ctx context.Context, pkgName string, file // deleteUnneededLocalPackages deletes local packages that are not // needed anymore. This is done by comparing the local package state // with the server's package state. -func (s *packagesSyncer) deleteUnneededLocalPackages() error { +func (s *packagesSyncer) deleteUnneededLocalPackages(ctx context.Context) error { // Read the list of packages we have locally. localPackages, err := s.localState.Packages() if err != nil { @@ -297,7 +294,7 @@ func (s *packagesSyncer) deleteUnneededLocalPackages() error { for _, localPkg := range localPackages { // Do we have a package that is not offered? if _, offered := s.available.Packages[localPkg]; !offered { - s.logger.Debugf("Package %s is no longer needed, deleting.", localPkg) + s.logger.Debugf(ctx, "Package %s is no longer needed, deleting.", localPkg) err := s.localState.DeletePackage(localPkg) if err != nil { lastErr = err @@ -318,16 +315,16 @@ func (s *packagesSyncer) deleteUnneededLocalPackages() error { // reportStatuses saves the last reported statuses to provider and client state. // If sendImmediately is true, the statuses are scheduled to be // sent to the server. -func (s *packagesSyncer) reportStatuses(sendImmediately bool) error { +func (s *packagesSyncer) reportStatuses(ctx context.Context, sendImmediately bool) error { // Save it in the user-supplied state provider. if err := s.localState.SetLastReportedStatuses(s.statuses); err != nil { - s.logger.Errorf("Cannot save last reported statuses: %v", err) + s.logger.Errorf(ctx, "Cannot save last reported statuses: %v", err) return err } // Also save it in our internal state (will be needed if the Server asks for it). if err := s.clientSyncedState.SetPackageStatuses(s.statuses); err != nil { - s.logger.Errorf("Cannot save client state: %v", err) + s.logger.Errorf(ctx, "Cannot save client state: %v", err) return err } s.sender.NextMessage().Update( diff --git a/client/internal/receivedprocessor.go b/client/internal/receivedprocessor.go index df80b1d2..a7daa697 100644 --- a/client/internal/receivedprocessor.go +++ b/client/internal/receivedprocessor.go @@ -60,13 +60,13 @@ func (r *receivedProcessor) ProcessReceivedMessage(ctx context.Context, msg *pro // If a command message exists, other messages will be ignored return } else { - r.logger.Debugf("Ignoring Command, agent does not have AcceptsCommands capability") + r.logger.Debugf(ctx, "Ignoring Command, agent does not have AcceptsCommands capability") } } scheduled, err := r.rcvFlags(ctx, protobufs.ServerToAgentFlags(msg.Flags)) if err != nil { - r.logger.Errorf("cannot processed received flags:%v", err) + r.logger.Errorf(ctx, "cannot processed received flags:%v", err) } msgData := &types.MessageData{} @@ -75,7 +75,7 @@ func (r *receivedProcessor) ProcessReceivedMessage(ctx context.Context, msg *pro if r.hasCapability(protobufs.AgentCapabilities_AgentCapabilities_AcceptsRemoteConfig) { msgData.RemoteConfig = msg.RemoteConfig } else { - r.logger.Debugf("Ignoring RemoteConfig, agent does not have AcceptsRemoteConfig capability") + r.logger.Debugf(ctx, "Ignoring RemoteConfig, agent does not have AcceptsRemoteConfig capability") } } @@ -84,7 +84,7 @@ func (r *receivedProcessor) ProcessReceivedMessage(ctx context.Context, msg *pro if r.hasCapability(protobufs.AgentCapabilities_AgentCapabilities_ReportsOwnMetrics) { msgData.OwnMetricsConnSettings = msg.ConnectionSettings.OwnMetrics } else { - r.logger.Debugf("Ignoring OwnMetrics, agent does not have ReportsOwnMetrics capability") + r.logger.Debugf(ctx, "Ignoring OwnMetrics, agent does not have ReportsOwnMetrics capability") } } @@ -92,7 +92,7 @@ func (r *receivedProcessor) ProcessReceivedMessage(ctx context.Context, msg *pro if r.hasCapability(protobufs.AgentCapabilities_AgentCapabilities_ReportsOwnTraces) { msgData.OwnTracesConnSettings = msg.ConnectionSettings.OwnTraces } else { - r.logger.Debugf("Ignoring OwnTraces, agent does not have ReportsOwnTraces capability") + r.logger.Debugf(ctx, "Ignoring OwnTraces, agent does not have ReportsOwnTraces capability") } } @@ -100,7 +100,7 @@ func (r *receivedProcessor) ProcessReceivedMessage(ctx context.Context, msg *pro if r.hasCapability(protobufs.AgentCapabilities_AgentCapabilities_ReportsOwnLogs) { msgData.OwnLogsConnSettings = msg.ConnectionSettings.OwnLogs } else { - r.logger.Debugf("Ignoring OwnLogs, agent does not have ReportsOwnLogs capability") + r.logger.Debugf(ctx, "Ignoring OwnLogs, agent does not have ReportsOwnLogs capability") } } @@ -108,7 +108,7 @@ func (r *receivedProcessor) ProcessReceivedMessage(ctx context.Context, msg *pro if r.hasCapability(protobufs.AgentCapabilities_AgentCapabilities_AcceptsOtherConnectionSettings) { msgData.OtherConnSettings = msg.ConnectionSettings.OtherConnections } else { - r.logger.Debugf("Ignoring OtherConnections, agent does not have AcceptsOtherConnectionSettings capability") + r.logger.Debugf(ctx, "Ignoring OtherConnections, agent does not have AcceptsOtherConnectionSettings capability") } } } @@ -124,7 +124,7 @@ func (r *receivedProcessor) ProcessReceivedMessage(ctx context.Context, msg *pro r.packagesStateProvider, ) } else { - r.logger.Debugf("Ignoring PackagesAvailable, agent does not have AcceptsPackages capability") + r.logger.Debugf(ctx, "Ignoring PackagesAvailable, agent does not have AcceptsPackages capability") } } @@ -164,7 +164,7 @@ func (r *receivedProcessor) rcvFlags( if flags&protobufs.ServerToAgentFlags_ServerToAgentFlags_ReportFullState != 0 { cfg, err := r.callbacks.GetEffectiveConfig(ctx) if err != nil { - r.logger.Errorf("Cannot GetEffectiveConfig: %v", err) + r.logger.Errorf(ctx, "Cannot GetEffectiveConfig: %v", err) cfg = nil } @@ -199,25 +199,25 @@ func (r *receivedProcessor) rcvOpampConnectionSettings(ctx context.Context, sett r.callbacks.OnOpampConnectionSettingsAccepted(settings.Opamp) } } else { - r.logger.Debugf("Ignoring Opamp, agent does not have AcceptsOpAMPConnectionSettings capability") + r.logger.Debugf(ctx, "Ignoring Opamp, agent does not have AcceptsOpAMPConnectionSettings capability") } } func (r *receivedProcessor) processErrorResponse(body *protobufs.ServerErrorResponse) { // TODO: implement this. - r.logger.Errorf("received an error from server: %s", body.ErrorMessage) + r.logger.Errorf(context.Background(), "received an error from server: %s", body.ErrorMessage) } func (r *receivedProcessor) rcvAgentIdentification(agentId *protobufs.AgentIdentification) error { if agentId.NewInstanceUid == "" { err := errors.New("empty instance uid is not allowed") - r.logger.Debugf(err.Error()) + r.logger.Debugf(context.Background(), err.Error()) return err } err := r.sender.SetInstanceUid(agentId.NewInstanceUid) if err != nil { - r.logger.Errorf("Error while setting instance uid: %v", err) + r.logger.Errorf(context.Background(), "Error while setting instance uid: %v", err) return err } diff --git a/client/internal/wsreceiver.go b/client/internal/wsreceiver.go index f4a3be57..30af6979 100644 --- a/client/internal/wsreceiver.go +++ b/client/internal/wsreceiver.go @@ -50,7 +50,7 @@ out: var message protobufs.ServerToAgent if err := r.receiveMessage(&message); err != nil { if ctx.Err() == nil && !websocket.IsCloseError(err, websocket.CloseNormalClosure) { - r.logger.Errorf("Unexpected error while receiving: %v", err) + r.logger.Errorf(ctx, "Unexpected error while receiving: %v", err) } break out } else { diff --git a/client/internal/wsreceiver_test.go b/client/internal/wsreceiver_test.go index fe0786b6..bd79c7ac 100644 --- a/client/internal/wsreceiver_test.go +++ b/client/internal/wsreceiver_test.go @@ -14,14 +14,20 @@ import ( "github.com/open-telemetry/opamp-go/protobufs" ) +var _ types.Logger = &TestLogger{} + type TestLogger struct { *testing.T } -func (logger TestLogger) Debugf(format string, v ...interface{}) { +func (logger TestLogger) Debugf(ctx context.Context, format string, v ...interface{}) { logger.Logf(format, v...) } +func (logger TestLogger) Errorf(ctx context.Context, format string, v ...interface{}) { + logger.Fatalf(format, v...) +} + type commandAction int const ( diff --git a/client/internal/wssender.go b/client/internal/wssender.go index 3442edd7..40ac937b 100644 --- a/client/internal/wssender.go +++ b/client/internal/wssender.go @@ -33,7 +33,7 @@ func NewSender(logger types.Logger) *WSSender { // earlier. To stop the WSSender cancel the ctx. func (s *WSSender) Start(ctx context.Context, conn *websocket.Conn) error { s.conn = conn - err := s.sendNextMessage() + err := s.sendNextMessage(ctx) // Run the sender in the background. s.stopped = make(chan struct{}) @@ -53,7 +53,7 @@ out: for { select { case <-s.hasPendingMessage: - s.sendNextMessage() + s.sendNextMessage(ctx) case <-ctx.Done(): break out @@ -63,18 +63,18 @@ out: close(s.stopped) } -func (s *WSSender) sendNextMessage() error { +func (s *WSSender) sendNextMessage(ctx context.Context) error { msgToSend := s.nextMessage.PopPending() if msgToSend != nil && !proto.Equal(msgToSend, &protobufs.AgentToServer{}) { // There is a pending message and the message has some fields populated. - return s.sendMessage(msgToSend) + return s.sendMessage(ctx, msgToSend) } return nil } -func (s *WSSender) sendMessage(msg *protobufs.AgentToServer) error { +func (s *WSSender) sendMessage(ctx context.Context, msg *protobufs.AgentToServer) error { if err := internal.WriteWSMessage(s.conn, msg); err != nil { - s.logger.Errorf("Cannot write WS message: %v", err) + s.logger.Errorf(ctx, "Cannot write WS message: %v", err) // TODO: check if it is a connection error then propagate error back to Client and reconnect. return err } diff --git a/client/types/logger.go b/client/types/logger.go index 0ba45ac0..1c4ad286 100644 --- a/client/types/logger.go +++ b/client/types/logger.go @@ -1,7 +1,9 @@ package types +import "context" + // Logger is the logging interface used by the OpAMP Client. type Logger interface { - Debugf(format string, v ...interface{}) - Errorf(format string, v ...interface{}) + Debugf(ctx context.Context, format string, v ...interface{}) + Errorf(ctx context.Context, format string, v ...interface{}) } diff --git a/client/wsclient.go b/client/wsclient.go index b40f153a..15b790aa 100644 --- a/client/wsclient.go +++ b/client/wsclient.go @@ -131,7 +131,7 @@ func (c *wsClient) tryConnectOnce(ctx context.Context) (err error, retryAfter sh c.common.Callbacks.OnConnectFailed(err) } if resp != nil { - c.common.Logger.Errorf("Server responded with status=%v", resp.Status) + c.common.Logger.Errorf(ctx, "Server responded with status=%v", resp.Status) duration := sharedinternal.ExtractRetryAfterHeader(resp) return err, duration } @@ -168,10 +168,10 @@ func (c *wsClient) ensureConnected(ctx context.Context) error { { if err, retryAfter := c.tryConnectOnce(ctx); err != nil { if errors.Is(err, context.Canceled) { - c.common.Logger.Debugf("Client is stopped, will not try anymore.") + c.common.Logger.Debugf(ctx, "Client is stopped, will not try anymore.") return err } else { - c.common.Logger.Errorf("Connection failed (%v), will retry.", err) + c.common.Logger.Errorf(ctx, "Connection failed (%v), will retry.", err) } // Retry again a bit later. @@ -189,7 +189,7 @@ func (c *wsClient) ensureConnected(ctx context.Context) error { } case <-ctx.Done(): - c.common.Logger.Debugf("Client is stopped, will not try anymore.") + c.common.Logger.Debugf(ctx, "Client is stopped, will not try anymore.") timer.Stop() return ctx.Err() } @@ -218,7 +218,7 @@ func (c *wsClient) runOneCycle(ctx context.Context) { // Prepare the first status report. err := c.common.PrepareFirstMessage(ctx) if err != nil { - c.common.Logger.Errorf("cannot prepare the first message:%v", err) + c.common.Logger.Errorf(ctx, "cannot prepare the first message:%v", err) return } @@ -228,7 +228,7 @@ func (c *wsClient) runOneCycle(ctx context.Context) { // Connected successfully. Start the sender. This will also send the first // status report. if err := c.sender.Start(procCtx, c.conn); err != nil { - c.common.Logger.Errorf("Failed to send first status report: %v", err) + c.common.Logger.Errorf(procCtx, "Failed to send first status report: %v", err) // We could not send the report, the only thing we can do is start over. _ = c.conn.Close() procCancel() diff --git a/internal/examples/agent/agent/agent.go b/internal/examples/agent/agent/agent.go index 3bef4e9d..514dfd43 100644 --- a/internal/examples/agent/agent/agent.go +++ b/internal/examples/agent/agent/agent.go @@ -81,12 +81,12 @@ func NewAgent(logger types.Logger, agentType string, agentVersion string) *Agent } agent.createAgentIdentity() - agent.logger.Debugf("Agent starting, id=%v, type=%s, version=%s.", + agent.logger.Debugf(context.Background(), "Agent starting, id=%v, type=%s, version=%s.", agent.instanceId.String(), agentType, agentVersion) agent.loadLocalConfig() if err := agent.connect(); err != nil { - agent.logger.Errorf("Cannot connect OpAMP client: %v", err) + agent.logger.Errorf(context.Background(), "Cannot connect OpAMP client: %v", err) return nil } @@ -110,13 +110,13 @@ func (agent *Agent) connect() error { InstanceUid: agent.instanceId.String(), Callbacks: types.CallbacksStruct{ OnConnectFunc: func() { - agent.logger.Debugf("Connected to the server.") + agent.logger.Debugf(context.Background(), "Connected to the server.") }, OnConnectFailedFunc: func(err error) { - agent.logger.Errorf("Failed to connect to the server: %v", err) + agent.logger.Errorf(context.Background(), "Failed to connect to the server: %v", err) }, OnErrorFunc: func(err *protobufs.ServerErrorResponse) { - agent.logger.Errorf("Server returned an error response: %v", err.ErrorMessage) + agent.logger.Errorf(context.Background(), "Server returned an error response: %v", err.ErrorMessage) }, SaveRemoteConfigStatusFunc: func(_ context.Context, status *protobufs.RemoteConfigStatus) { agent.remoteConfigStatus = status @@ -148,20 +148,20 @@ func (agent *Agent) connect() error { // become known and can be checked. agent.requestClientCertificate() - agent.logger.Debugf("Starting OpAMP client...") + agent.logger.Debugf(context.Background(), "Starting OpAMP client...") err = agent.opampClient.Start(context.Background(), settings) if err != nil { return err } - agent.logger.Debugf("OpAMP Client started.") + agent.logger.Debugf(context.Background(), "OpAMP Client started.") return nil } func (agent *Agent) disconnect() { - agent.logger.Debugf("Disconnecting from server...") + agent.logger.Debugf(context.Background(), "Disconnecting from server...") agent.opampClient.Stop(context.Background()) } @@ -210,7 +210,7 @@ func (agent *Agent) createAgentIdentity() { } func (agent *Agent) updateAgentIdentity(instanceId ulid.ULID) { - agent.logger.Debugf("Agent identify is being changed from id=%v to id=%v", + agent.logger.Debugf(context.Background(), "Agent identify is being changed from id=%v to id=%v", agent.instanceId.String(), instanceId.String()) agent.instanceId = instanceId @@ -246,7 +246,7 @@ func (agent *Agent) composeEffectiveConfig() *protobufs.EffectiveConfig { func (agent *Agent) initMeter(settings *protobufs.TelemetryConnectionSettings) { reporter, err := NewMetricReporter(agent.logger, settings, agent.agentType, agent.agentVersion, agent.instanceId) if err != nil { - agent.logger.Errorf("Cannot collect metrics: %v", err) + agent.logger.Errorf(context.Background(), "Cannot collect metrics: %v", err) return } @@ -287,7 +287,7 @@ func (agent *Agent) applyRemoteConfig(config *protobufs.AgentRemoteConfig) (conf return false, nil } - agent.logger.Debugf("Received remote config from server, hash=%x.", config.ConfigHash) + agent.logger.Debugf(context.Background(), "Received remote config from server, hash=%x.", config.ConfigHash) // Begin with local config. We will later merge received configs on top of it. var k = koanf.New(".") @@ -341,7 +341,7 @@ func (agent *Agent) applyRemoteConfig(config *protobufs.AgentRemoteConfig) (conf newEffectiveConfig := string(effectiveConfigBytes) configChanged = false if agent.effectiveConfig != newEffectiveConfig { - agent.logger.Debugf("Effective config changed. Need to report to server.") + agent.logger.Debugf(context.Background(), "Effective config changed. Need to report to server.") agent.effectiveConfig = newEffectiveConfig configChanged = true } @@ -350,7 +350,7 @@ func (agent *Agent) applyRemoteConfig(config *protobufs.AgentRemoteConfig) (conf } func (agent *Agent) Shutdown() { - agent.logger.Debugf("Agent shutting down...") + agent.logger.Debugf(context.Background(), "Agent shutting down...") if agent.opampClient != nil { _ = agent.opampClient.Stop(context.Background()) } @@ -370,7 +370,7 @@ func (agent *Agent) requestClientCertificate() { // Generate a keypair for new client cert. clientCertKeyPair, err := rsa.GenerateKey(cryptorand.Reader, 4096) if err != nil { - agent.logger.Errorf("Cannot generate keypair: %v", err) + agent.logger.Errorf(context.Background(), "Cannot generate keypair: %v", err) return } @@ -401,7 +401,7 @@ func (agent *Agent) requestClientCertificate() { derBytes, err := x509.CreateCertificateRequest(cryptorand.Reader, &template, clientCertKeyPair) if err != nil { - agent.logger.Errorf("Failed to create certificate request: %s", err) + agent.logger.Errorf(context.Background(), "Failed to create certificate request: %s", err) return } @@ -426,7 +426,7 @@ func (agent *Agent) requestClientCertificate() { }, ) if err != nil { - agent.logger.Errorf("Failed to send CSR to server: %s", err) + agent.logger.Errorf(context.Background(), "Failed to send CSR to server: %s", err) return } @@ -461,7 +461,7 @@ func (agent *Agent) onMessage(ctx context.Context, msg *types.MessageData) { if msg.AgentIdentification != nil { newInstanceId, err := ulid.Parse(msg.AgentIdentification.NewInstanceUid) if err != nil { - agent.logger.Errorf(err.Error()) + agent.logger.Errorf(ctx, err.Error()) } agent.updateAgentIdentity(newInstanceId) } @@ -469,7 +469,7 @@ func (agent *Agent) onMessage(ctx context.Context, msg *types.MessageData) { if configChanged { err := agent.opampClient.UpdateEffectiveConfig(ctx) if err != nil { - agent.logger.Errorf(err.Error()) + agent.logger.Errorf(context.Background(), err.Error()) } } @@ -483,22 +483,22 @@ func (agent *Agent) onMessage(ctx context.Context, msg *types.MessageData) { agent.requestClientCertificate() } -func (agent *Agent) tryChangeOpAMPCert(cert *tls.Certificate) { - agent.logger.Debugf("Reconnecting to verify offered client certificate.\n") +func (agent *Agent) tryChangeOpAMPCert(ctx context.Context, cert *tls.Certificate) { + agent.logger.Debugf(ctx, "Reconnecting to verify offered client certificate.\n") agent.disconnect() agent.opampClientCert = cert if err := agent.connect(); err != nil { - agent.logger.Errorf("Cannot connect using offered certificate: %s. Ignoring the offer\n", err) + agent.logger.Errorf(ctx, "Cannot connect using offered certificate: %s. Ignoring the offer\n", err) agent.opampClientCert = nil if err := agent.connect(); err != nil { - agent.logger.Errorf("Unable to reconnect after restoring client certificate: %v\n", err) + agent.logger.Errorf(ctx, "Unable to reconnect after restoring client certificate: %v\n", err) } } - agent.logger.Debugf("Successfully connected to server. Accepting new client certificate.\n") + agent.logger.Debugf(ctx, "Successfully connected to server. Accepting new client certificate.\n") // TODO: we can also persist the successfully accepted certificate and use it when the // agent connects to the server after the restart. @@ -506,7 +506,7 @@ func (agent *Agent) tryChangeOpAMPCert(cert *tls.Certificate) { func (agent *Agent) onOpampConnectionSettings(ctx context.Context, settings *protobufs.OpAMPConnectionSettings) error { if settings == nil || settings.Certificate == nil { - agent.logger.Debugf("Received nil certificate offer, ignoring.\n") + agent.logger.Debugf(ctx, "Received nil certificate offer, ignoring.\n") return nil } @@ -516,7 +516,7 @@ func (agent *Agent) onOpampConnectionSettings(ctx context.Context, settings *pro } // TODO: also use settings.DestinationEndpoint and settings.Headers for future connections. - go agent.tryChangeOpAMPCert(cert) + go agent.tryChangeOpAMPCert(ctx, cert) return nil } @@ -548,7 +548,7 @@ func (agent *Agent) getCertFromSettings(certificate *protobufs.TLSCertificate) ( } if err != nil { - agent.logger.Errorf("Received invalid certificate offer: %s\n", err.Error()) + agent.logger.Errorf(context.Background(), "Received invalid certificate offer: %s\n", err.Error()) return nil, err } @@ -556,10 +556,10 @@ func (agent *Agent) getCertFromSettings(certificate *protobufs.TLSCertificate) ( caCertPB, _ := pem.Decode(certificate.CaPublicKey) caCert, err := x509.ParseCertificate(caCertPB.Bytes) if err != nil { - agent.logger.Errorf("Cannot parse CA cert: %v", err) + agent.logger.Errorf(context.Background(), "Cannot parse CA cert: %v", err) return nil, err } - agent.logger.Debugf("Received offer signed by CA: %v", caCert.Subject) + agent.logger.Debugf(context.Background(), "Received offer signed by CA: %v", caCert.Subject) // TODO: we can verify the CA's identity here (to match our CA as we know it). } diff --git a/internal/examples/agent/agent/logger.go b/internal/examples/agent/agent/logger.go index d161f342..2e5c5ef5 100644 --- a/internal/examples/agent/agent/logger.go +++ b/internal/examples/agent/agent/logger.go @@ -1,15 +1,22 @@ package agent -import "log" +import ( + "context" + "log" + + "github.com/open-telemetry/opamp-go/client/types" +) + +var _ types.Logger = &Logger{} type Logger struct { Logger *log.Logger } -func (l *Logger) Debugf(format string, v ...interface{}) { +func (l *Logger) Debugf(_ context.Context, format string, v ...interface{}) { l.Logger.Printf(format, v...) } -func (l *Logger) Errorf(format string, v ...interface{}) { +func (l *Logger) Errorf(_ context.Context, format string, v ...interface{}) { l.Logger.Printf(format, v...) } diff --git a/internal/examples/agent/agent/metricreporter.go b/internal/examples/agent/agent/metricreporter.go index 524e8dc5..74e06a07 100644 --- a/internal/examples/agent/agent/metricreporter.go +++ b/internal/examples/agent/agent/metricreporter.go @@ -148,10 +148,10 @@ func NewMetricReporter( return reporter, nil } -func (reporter *MetricReporter) processCpuTimeFunc(_ context.Context, result metric.Float64ObserverResult) { +func (reporter *MetricReporter) processCpuTimeFunc(ctx context.Context, result metric.Float64ObserverResult) { times, err := reporter.process.Times() if err != nil { - reporter.logger.Errorf("Cannot get process CPU times: %v", err) + reporter.logger.Errorf(ctx, "Cannot get process CPU times: %v", err) } // Report process CPU times, but also add some randomness to make it interesting for demo. @@ -160,10 +160,10 @@ func (reporter *MetricReporter) processCpuTimeFunc(_ context.Context, result met result.Observe(math.Min(times.Iowait+rand.Float64(), 1), attribute.String("state", "wait")) } -func (reporter *MetricReporter) processMemoryPhysicalFunc(_ context.Context, result metric.Int64ObserverResult) { +func (reporter *MetricReporter) processMemoryPhysicalFunc(ctx context.Context, result metric.Int64ObserverResult) { memory, err := reporter.process.MemoryInfo() if err != nil { - reporter.logger.Errorf("Cannot get process memory information: %v", err) + reporter.logger.Errorf(ctx, "Cannot get process memory information: %v", err) return } diff --git a/internal/examples/server/opampsrv/logger.go b/internal/examples/server/opampsrv/logger.go index 270fdaa1..d65ad9f4 100644 --- a/internal/examples/server/opampsrv/logger.go +++ b/internal/examples/server/opampsrv/logger.go @@ -1,15 +1,22 @@ package opampsrv -import "log" +import ( + "context" + "log" + + "github.com/open-telemetry/opamp-go/client/types" +) + +var _ types.Logger = &Logger{} type Logger struct { logger *log.Logger } -func (l *Logger) Debugf(format string, v ...interface{}) { +func (l *Logger) Debugf(ctx context.Context, format string, v ...interface{}) { l.logger.Printf(format, v...) } -func (l *Logger) Errorf(format string, v ...interface{}) { +func (l *Logger) Errorf(ctx context.Context, format string, v ...interface{}) { l.logger.Printf(format, v...) } diff --git a/internal/examples/server/opampsrv/opampsrv.go b/internal/examples/server/opampsrv/opampsrv.go index 070541ff..f2945d70 100644 --- a/internal/examples/server/opampsrv/opampsrv.go +++ b/internal/examples/server/opampsrv/opampsrv.go @@ -61,12 +61,12 @@ func (srv *Server) Start() { "../../certs/server_certs/server.key.pem", ) if err != nil { - srv.logger.Debugf("Could not load TLS config, working without TLS: %v", err.Error()) + srv.logger.Debugf(context.Background(), "Could not load TLS config, working without TLS: %v", err.Error()) } settings.TLSConfig = tlsConfig if err := srv.opampSrv.Start(settings); err != nil { - srv.logger.Errorf("OpAMP server start fail: %v", err.Error()) + srv.logger.Errorf(context.Background(), "OpAMP server start fail: %v", err.Error()) os.Exit(1) } } diff --git a/internal/examples/supervisor/main.go b/internal/examples/supervisor/main.go index 67f89d4f..998c8262 100644 --- a/internal/examples/supervisor/main.go +++ b/internal/examples/supervisor/main.go @@ -1,6 +1,7 @@ package main import ( + "context" "log" "os" "os/signal" @@ -12,7 +13,7 @@ func main() { logger := &supervisor.Logger{Logger: log.Default()} supervisor, err := supervisor.NewSupervisor(logger) if err != nil { - logger.Errorf(err.Error()) + logger.Errorf(context.Background(), err.Error()) os.Exit(-1) return } diff --git a/internal/examples/supervisor/supervisor/commander/commander.go b/internal/examples/supervisor/supervisor/commander/commander.go index 1d7dc13f..bd2f1f69 100644 --- a/internal/examples/supervisor/supervisor/commander/commander.go +++ b/internal/examples/supervisor/supervisor/commander/commander.go @@ -41,7 +41,7 @@ func NewCommander(logger types.Logger, cfg *config.Agent, args ...string) (*Comm // Start the Agent and begin watching the process. // Agent's stdout and stderr are written to a file. func (c *Commander) Start(ctx context.Context) error { - c.logger.Debugf("Starting agent %s", c.cfg.Executable) + c.logger.Debugf(ctx, "Starting agent %s", c.cfg.Executable) logFilePath := "agent.log" logFile, err := os.Create(logFilePath) @@ -62,7 +62,7 @@ func (c *Commander) Start(ctx context.Context) error { return err } - c.logger.Debugf("Agent process started, PID=%d", c.cmd.Process.Pid) + c.logger.Debugf(ctx, "Agent process started, PID=%d", c.cmd.Process.Pid) atomic.StoreInt64(&c.running, 1) go c.watch() @@ -121,7 +121,7 @@ func (c *Commander) Stop(ctx context.Context) error { return nil } - c.logger.Debugf("Stopping agent process, PID=%v", c.cmd.Process.Pid) + c.logger.Debugf(ctx, "Stopping agent process, PID=%v", c.cmd.Process.Pid) // Gracefully signal process to stop. if err := c.cmd.Process.Signal(syscall.SIGTERM); err != nil { @@ -143,12 +143,12 @@ func (c *Commander) Stop(ctx context.Context) error { break case <-finished: // Process is successfully finished. - c.logger.Debugf("Agent process PID=%v successfully stopped.", c.cmd.Process.Pid) + c.logger.Debugf(ctx, "Agent process PID=%v successfully stopped.", c.cmd.Process.Pid) return } // Time is out. Kill the process. - c.logger.Debugf( + c.logger.Debugf(ctx, "Agent process PID=%d is not responding to SIGTERM. Sending SIGKILL to kill forcedly.", c.cmd.Process.Pid) if innerErr = c.cmd.Process.Signal(syscall.SIGKILL); innerErr != nil { diff --git a/internal/examples/supervisor/supervisor/logger.go b/internal/examples/supervisor/supervisor/logger.go index 5bf6e11f..a9ba75d6 100644 --- a/internal/examples/supervisor/supervisor/logger.go +++ b/internal/examples/supervisor/supervisor/logger.go @@ -1,15 +1,22 @@ package supervisor -import "log" +import ( + "context" + "log" + + "github.com/open-telemetry/opamp-go/client/types" +) + +var _ types.Logger = &Logger{} type Logger struct { Logger *log.Logger } -func (l *Logger) Debugf(format string, v ...interface{}) { +func (l *Logger) Debugf(ctx context.Context, format string, v ...interface{}) { l.Logger.Printf(format, v...) } -func (l *Logger) Errorf(format string, v ...interface{}) { +func (l *Logger) Errorf(ctx context.Context, format string, v ...interface{}) { l.Logger.Printf(format, v...) } diff --git a/internal/examples/supervisor/supervisor/supervisor.go b/internal/examples/supervisor/supervisor/supervisor.go index 489a611f..c51b3e6b 100644 --- a/internal/examples/supervisor/supervisor/supervisor.go +++ b/internal/examples/supervisor/supervisor/supervisor.go @@ -90,7 +90,7 @@ func NewSupervisor(logger types.Logger) (*Supervisor, error) { } s.createInstanceId() - logger.Debugf("Supervisor starting, id=%v, type=%s, version=%s.", + logger.Debugf(context.Background(), "Supervisor starting, id=%v, type=%s, version=%s.", s.instanceId.String(), agentType, agentVersion) s.loadAgentEffectiveConfig() @@ -141,13 +141,13 @@ func (s *Supervisor) startOpAMP() error { InstanceUid: s.instanceId.String(), Callbacks: types.CallbacksStruct{ OnConnectFunc: func() { - s.logger.Debugf("Connected to the server.") + s.logger.Debugf(context.Background(), "Connected to the server.") }, OnConnectFailedFunc: func(err error) { - s.logger.Errorf("Failed to connect to the server: %v", err) + s.logger.Errorf(context.Background(), "Failed to connect to the server: %v", err) }, OnErrorFunc: func(err *protobufs.ServerErrorResponse) { - s.logger.Errorf("Server returned an error response: %v", err.ErrorMessage) + s.logger.Errorf(context.Background(), "Server returned an error response: %v", err.ErrorMessage) }, GetEffectiveConfigFunc: func(ctx context.Context) (*protobufs.EffectiveConfig, error) { return s.createEffectiveConfigMsg(), nil @@ -170,14 +170,14 @@ func (s *Supervisor) startOpAMP() error { return err } - s.logger.Debugf("Starting OpAMP client...") + s.logger.Debugf(context.Background(), "Starting OpAMP client...") err = s.opampClient.Start(context.Background(), settings) if err != nil { return err } - s.logger.Debugf("OpAMP Client started.") + s.logger.Debugf(context.Background(), "OpAMP Client started.") return nil } @@ -283,10 +283,10 @@ func (s *Supervisor) setupOwnMetrics(ctx context.Context, settings *protobufs.Te var cfg string if settings.DestinationEndpoint == "" { // No destination. Disable metric collection. - s.logger.Debugf("Disabling own metrics pipeline in the config") + s.logger.Debugf(ctx, "Disabling own metrics pipeline in the config") cfg = "" } else { - s.logger.Debugf("Enabling own metrics pipeline in the config") + s.logger.Debugf(ctx, "Enabling own metrics pipeline in the config") // TODO: choose the scraping port dynamically instead of hard-coding to 8888. cfg = fmt.Sprintf( @@ -387,7 +387,7 @@ func (s *Supervisor) composeEffectiveConfig(config *protobufs.AgentRemoteConfig) newEffectiveConfig := string(effectiveConfigBytes) configChanged = false if s.effectiveConfig.Load().(string) != newEffectiveConfig { - s.logger.Debugf("Effective config changed.") + s.logger.Debugf(context.Background(), "Effective config changed.") s.effectiveConfig.Store(newEffectiveConfig) configChanged = true } @@ -401,7 +401,7 @@ func (s *Supervisor) recalcEffectiveConfig() (configChanged bool, err error) { configChanged, err = s.composeEffectiveConfig(s.remoteConfig) if err != nil { - s.logger.Errorf("Error composing effective config. Ignoring received config: %v", err) + s.logger.Errorf(context.Background(), "Error composing effective config. Ignoring received config: %v", err) return configChanged, err } @@ -412,7 +412,7 @@ func (s *Supervisor) startAgent() { err := s.commander.Start(context.Background()) if err != nil { errMsg := fmt.Sprintf("Cannot start the agent: %v", err) - s.logger.Errorf(errMsg) + s.logger.Errorf(context.Background(), errMsg) s.opampClient.SetHealth(&protobufs.ComponentHealth{Healthy: false, LastError: errMsg}) return } @@ -455,15 +455,15 @@ func (s *Supervisor) healthCheck() { if err != nil { health.Healthy = false health.LastError = err.Error() - s.logger.Errorf("Agent is not healthy: %s", health.LastError) + s.logger.Errorf(ctx, "Agent is not healthy: %s", health.LastError) } else { health.Healthy = true - s.logger.Debugf("Agent is healthy.") + s.logger.Debugf(ctx, "Agent is healthy.") } // Report via OpAMP. if err2 := s.opampClient.SetHealth(health); err2 != nil { - s.logger.Errorf("Could not report health. SetHealth returned: %v", err2) + s.logger.Errorf(ctx, "Could not report health. SetHealth returned: %v", err2) return } @@ -496,7 +496,7 @@ func (s *Supervisor) runAgentProcess() { "Agent process PID=%d exited unexpectedly, exit code=%d. Will restart in a bit...", s.commander.Pid(), s.commander.ExitCode(), ) - s.logger.Debugf(errMsg) + s.logger.Debugf(context.Background(), errMsg) s.opampClient.SetHealth(&protobufs.ComponentHealth{Healthy: false, LastError: errMsg}) // TODO: decide why the agent stopped. If it was due to bad config, report it to server. @@ -515,7 +515,7 @@ func (s *Supervisor) runAgentProcess() { } func (s *Supervisor) stopAgentApplyConfig() { - s.logger.Debugf("Stopping the agent to apply new config.") + s.logger.Debugf(context.Background(), "Stopping the agent to apply new config.") cfg := s.effectiveConfig.Load().(string) s.commander.Stop(context.Background()) s.writeEffectiveConfigToFile(cfg, s.effectiveConfigFilePath) @@ -524,7 +524,7 @@ func (s *Supervisor) stopAgentApplyConfig() { func (s *Supervisor) writeEffectiveConfigToFile(cfg string, filePath string) { f, err := os.Create(filePath) if err != nil { - s.logger.Errorf("Cannot write effective config file: %v", err) + s.logger.Errorf(context.Background(), "Cannot write effective config file: %v", err) } defer f.Close() @@ -532,7 +532,7 @@ func (s *Supervisor) writeEffectiveConfigToFile(cfg string, filePath string) { } func (s *Supervisor) Shutdown() { - s.logger.Debugf("Supervisor shutting down...") + s.logger.Debugf(context.Background(), "Supervisor shutting down...") if s.commander != nil { s.commander.Stop(context.Background()) } @@ -550,7 +550,7 @@ func (s *Supervisor) onMessage(ctx context.Context, msg *types.MessageData) { configChanged := false if msg.RemoteConfig != nil { s.remoteConfig = msg.RemoteConfig - s.logger.Debugf("Received remote config from server, hash=%x.", s.remoteConfig.ConfigHash) + s.logger.Debugf(ctx, "Received remote config from server, hash=%x.", s.remoteConfig.ConfigHash) var err error configChanged, err = s.recalcEffectiveConfig() @@ -575,10 +575,10 @@ func (s *Supervisor) onMessage(ctx context.Context, msg *types.MessageData) { if msg.AgentIdentification != nil { newInstanceId, err := ulid.Parse(msg.AgentIdentification.NewInstanceUid) if err != nil { - s.logger.Errorf(err.Error()) + s.logger.Errorf(ctx, err.Error()) } - s.logger.Debugf("Agent identify is being changed from id=%v to id=%v", + s.logger.Debugf(ctx, "Agent identify is being changed from id=%v to id=%v", s.instanceId.String(), newInstanceId.String()) s.instanceId = newInstanceId @@ -590,10 +590,10 @@ func (s *Supervisor) onMessage(ctx context.Context, msg *types.MessageData) { if configChanged { err := s.opampClient.UpdateEffectiveConfig(ctx) if err != nil { - s.logger.Errorf(err.Error()) + s.logger.Errorf(ctx, err.Error()) } - s.logger.Debugf("Config is changed. Signal to restart the agent.") + s.logger.Debugf(ctx, "Config is changed. Signal to restart the agent.") // Signal that there is a new config. select { case s.hasNewConfig <- struct{}{}: diff --git a/internal/noplogger.go b/internal/noplogger.go index a575c5d1..a2b2ea27 100644 --- a/internal/noplogger.go +++ b/internal/noplogger.go @@ -1,6 +1,14 @@ package internal +import ( + "context" + + "github.com/open-telemetry/opamp-go/client/types" +) + +var _ types.Logger = &NopLogger{} + type NopLogger struct{} -func (l *NopLogger) Debugf(format string, v ...interface{}) {} -func (l *NopLogger) Errorf(format string, v ...interface{}) {} +func (l *NopLogger) Debugf(ctx context.Context, format string, v ...interface{}) {} +func (l *NopLogger) Errorf(ctx context.Context, format string, v ...interface{}) {} diff --git a/server/serverimpl.go b/server/serverimpl.go index 27a95874..7eef0664 100644 --- a/server/serverimpl.go +++ b/server/serverimpl.go @@ -130,7 +130,7 @@ func (s *server) startHttpServer(listenAddr string, serveFunc func(l net.Listene // ErrServerClosed is expected after successful Stop(), so we won't log that // particular error. if err != nil && err != http.ErrServerClosed { - s.logger.Errorf("Error running HTTP Server: %v", err) + s.logger.Errorf(context.Background(), "Error running HTTP Server: %v", err) } }() @@ -179,7 +179,7 @@ func (s *server) httpHandler(w http.ResponseWriter, req *http.Request) { // No, it is a WebSocket. Upgrade it. conn, err := s.wsUpgrader.Upgrade(w, req, nil) if err != nil { - s.logger.Errorf("Cannot upgrade HTTP connection to WebSocket: %v", err) + s.logger.Errorf(context.Background(), "Cannot upgrade HTTP connection to WebSocket: %v", err) return } @@ -196,7 +196,7 @@ func (s *server) handleWSConnection(wsConn *websocket.Conn, connectionCallbacks defer func() { err := wsConn.Close() if err != nil { - s.logger.Errorf("error closing the WebSocket connection: %v", err) + s.logger.Errorf(context.Background(), "error closing the WebSocket connection: %v", err) } }() @@ -215,15 +215,15 @@ func (s *server) handleWSConnection(wsConn *websocket.Conn, connectionCallbacks mt, bytes, err := wsConn.ReadMessage() if err != nil { if !websocket.IsUnexpectedCloseError(err) { - s.logger.Errorf("Cannot read a message from WebSocket: %v", err) + s.logger.Errorf(context.Background(), "Cannot read a message from WebSocket: %v", err) break } // This is a normal closing of the WebSocket connection. - s.logger.Debugf("Agent disconnected: %v", err) + s.logger.Debugf(context.Background(), "Agent disconnected: %v", err) break } if mt != websocket.BinaryMessage { - s.logger.Errorf("Received unexpected message type from WebSocket: %v", mt) + s.logger.Errorf(context.Background(), "Received unexpected message type from WebSocket: %v", mt) continue } @@ -231,7 +231,7 @@ func (s *server) handleWSConnection(wsConn *websocket.Conn, connectionCallbacks var request protobufs.AgentToServer err = internal.DecodeWSMessage(bytes, &request) if err != nil { - s.logger.Errorf("Cannot decode message from WebSocket: %v", err) + s.logger.Errorf(context.Background(), "Cannot decode message from WebSocket: %v", err) continue } @@ -242,7 +242,7 @@ func (s *server) handleWSConnection(wsConn *websocket.Conn, connectionCallbacks } err = agentConn.Send(context.Background(), response) if err != nil { - s.logger.Errorf("Cannot send message to WebSocket: %v", err) + s.logger.Errorf(context.Background(), "Cannot send message to WebSocket: %v", err) } } } @@ -288,7 +288,7 @@ func compressGzip(data []byte) ([]byte, error) { func (s *server) handlePlainHTTPRequest(req *http.Request, w http.ResponseWriter, connectionCallbacks serverTypes.ConnectionCallbacks) { bytes, err := s.readReqBody(req) if err != nil { - s.logger.Debugf("Cannot read HTTP body: %v", err) + s.logger.Debugf(context.Background(), "Cannot read HTTP body: %v", err) w.WriteHeader(http.StatusBadRequest) return } @@ -297,7 +297,7 @@ func (s *server) handlePlainHTTPRequest(req *http.Request, w http.ResponseWriter var request protobufs.AgentToServer err = proto.Unmarshal(bytes, &request) if err != nil { - s.logger.Debugf("Cannot decode message from HTTP Body: %v", err) + s.logger.Debugf(context.Background(), "Cannot decode message from HTTP Body: %v", err) w.WriteHeader(http.StatusBadRequest) return } @@ -340,7 +340,7 @@ func (s *server) handlePlainHTTPRequest(req *http.Request, w http.ResponseWriter if req.Header.Get(headerAcceptEncoding) == contentEncodingGzip { bytes, err = compressGzip(bytes) if err != nil { - s.logger.Errorf("Cannot compress response: %v", err) + s.logger.Errorf(context.Background(), "Cannot compress response: %v", err) w.WriteHeader(http.StatusInternalServerError) return } @@ -349,6 +349,6 @@ func (s *server) handlePlainHTTPRequest(req *http.Request, w http.ResponseWriter _, err = w.Write(bytes) if err != nil { - s.logger.Debugf("Cannot send HTTP response: %v", err) + s.logger.Debugf(context.Background(), "Cannot send HTTP response: %v", err) } }