diff --git a/agent/backend/pktvisor/policy.go b/agent/backend/pktvisor/policy.go index 5e4a3dd6a..7e7eef0fc 100644 --- a/agent/backend/pktvisor/policy.go +++ b/agent/backend/pktvisor/policy.go @@ -8,6 +8,7 @@ import ( "bytes" "fmt" "net/http" + "strings" "github.com/orb-community/orb/agent/policies" "go.uber.org/zap" @@ -60,8 +61,11 @@ func (p *pktvisorBackend) RemovePolicy(data policies.PolicyData) error { } else { name = data.Name } - err := p.request(fmt.Sprintf("policies/%s", name), &resp, http.MethodDelete, http.NoBody, "application/json", RemovePolicyTimeout) - if err != nil { + if err := p.request(fmt.Sprintf("policies/%s", name), &resp, http.MethodDelete, http.NoBody, "application/json", RemovePolicyTimeout); err != nil { + if strings.Contains(err.Error(), "404") { + p.logger.Warn("ignoring error from removing a policy not found", zap.String("policy_id", data.ID), zap.String("policy_name", name)) + return nil + } return err } return nil diff --git a/agent/comms.go b/agent/comms.go index 448c19527..e2d2aecd3 100644 --- a/agent/comms.go +++ b/agent/comms.go @@ -8,6 +8,7 @@ import ( "context" "crypto/tls" "fmt" + "strings" "time" mqtt "github.com/eclipse/paho.mqtt.golang" @@ -27,8 +28,13 @@ func (a *orbAgent) connect(ctx context.Context, config config.MQTTConfig) (mqtt. }) opts.SetConnectionLostHandler(func(client mqtt.Client, err error) { a.logger.Error("connection to mqtt lost", zap.Error(err)) + // If it is a bug on the mqttclient, stop the agent + if strings.Contains(err.Error(), "BUG") { + a.Stop(ctx) + return + } a.logger.Info("reconnecting....") - client.Connect() + a.requestReconnection(ctx, a.client, config) }) opts.SetPingTimeout(5 * time.Second) opts.SetAutoReconnect(false) diff --git a/agent/otel/otlpmqttexporter/factory.go b/agent/otel/otlpmqttexporter/factory.go index 57f609c78..5c460c4fc 100644 --- a/agent/otel/otlpmqttexporter/factory.go +++ b/agent/otel/otlpmqttexporter/factory.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/trace/noop" "github.com/orb-community/orb/agent/otel" "go.uber.org/zap" @@ -13,7 +14,6 @@ import ( "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/exporterhelper" - "go.opentelemetry.io/otel/trace" ) const ( @@ -55,7 +55,7 @@ func CreateDefaultSettings(logger *zap.Logger) exporter.CreateSettings { return exporter.CreateSettings{ TelemetrySettings: component.TelemetrySettings{ Logger: logger, - TracerProvider: trace.NewNoopTracerProvider(), + TracerProvider: noop.NewTracerProvider(), MeterProvider: metric.NewMeterProvider(), }, BuildInfo: component.NewDefaultBuildInfo(), @@ -134,7 +134,8 @@ func CreateMetricsExporter( // explicitly disable since we rely on http.Client timeout logic. exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}), exporterhelper.WithRetry(oCfg.RetrySettings), - exporterhelper.WithQueue(oCfg.QueueSettings)) + exporterhelper.WithQueue(oCfg.QueueSettings), + exporterhelper.WithShutdown(oce.shutdown)) } func CreateLogsExporter( diff --git a/agent/otel/otlpmqttexporter/otlp.go b/agent/otel/otlpmqttexporter/otlp.go index f2f0cb9f0..beedd373b 100644 --- a/agent/otel/otlpmqttexporter/otlp.go +++ b/agent/otel/otlpmqttexporter/otlp.go @@ -196,6 +196,13 @@ func (e *baseExporter) injectScopeLogsAttribute(logsScope plog.ScopeLogs, attrib return logsScope } +func (e *baseExporter) shutdown(_ context.Context) error { + if e.config.Client != nil && (*e.config.Client).IsConnected() { + (*e.config.Client).Disconnect(0) + } + return nil +} + func (e *baseExporter) pushLogs(ctx context.Context, ld plog.Logs) error { tr := plogotlp.NewExportRequest() ref := tr.Logs().ResourceLogs().AppendEmpty()