From 07ebb8c56661063491a39632605f2094c319f729 Mon Sep 17 00:00:00 2001 From: Pushpalanka Jayawardhana Date: Tue, 2 Jul 2024 14:14:00 +0200 Subject: [PATCH] Fail fast when bundle and discovery plugins faced with unrecoverable errors. Signed-off-by: Pushpalanka Jayawardhana --- filters/openpolicyagent/openpolicyagent.go | 129 ++++++++++++++------- 1 file changed, 85 insertions(+), 44 deletions(-) diff --git a/filters/openpolicyagent/openpolicyagent.go b/filters/openpolicyagent/openpolicyagent.go index 9280350096..2d77001fb2 100644 --- a/filters/openpolicyagent/openpolicyagent.go +++ b/filters/openpolicyagent/openpolicyagent.go @@ -481,36 +481,47 @@ func (registry *OpenPolicyAgentRegistry) new(store storage.Store, configBytes [] // Start asynchronously starts the policy engine's plugins that download // policies, report status, etc. func (opa *OpenPolicyAgentInstance) Start(ctx context.Context, timeout time.Duration) error { + discoveryPlugin := discovery.Lookup(opa.manager) - bundlePlugin := bundle.Lookup(opa.manager) done := make(chan struct{}) - defer close(done) - failed := make(chan error) - defer close(failed) + failed := make(chan error, 1) - discoveryPlugin.RegisterListener("startuplistener", func(status bundle.Status) { - if len(status.Errors) > 0 { - failed <- fmt.Errorf("discovery download failed: %w", errors.Join(status.Errors...)) - } + var registerDiscoveryListenerOnce sync.Once + registerDiscoveryListenerOnce.Do(func() { + discoveryPlugin.RegisterListener("skipper-instance-startup-discovery", func(status bundle.Status) { + handleStatusErrors(status, failed, "discovery plugin") + }) }) - - bundlePlugin.Register("startuplistener", func(status bundle.Status) { - if len(status.Errors) > 0 { - failed <- fmt.Errorf("bundle activation failed: %w", errors.Join(status.Errors...)) + defer discoveryPlugin.Unregister("skipper-instance-startup-discovery") + + // Register listener for "bundle" status + var registerBundleListenerOnce sync.Once + opa.manager.RegisterPluginStatusListener("skipper-instance-startup-plugin", func(status map[string]*plugins.Status) { + if _, exists := status["bundle"]; exists { + bundlePlugin := bundle.Lookup(opa.manager) + if bundlePlugin != nil { + registerBundleListenerOnce.Do(func() { + bundlePlugin.Register("skipper-instance-startup-bundle", func(status bundle.Status) { + handleStatusErrors(status, failed, "bundle plugin") + }) + }) + defer bundlePlugin.Unregister("skipper-instance-startup-bundle") + } } }) - defer bundlePlugin.Unregister("startuplistener") + defer opa.manager.UnregisterPluginStatusListener("skipper-instance-startup-plugin") - opa.manager.RegisterPluginStatusListener("startuplistener", func(status map[string]*plugins.Status) { - for _, pluginstatus := range status { - if pluginstatus != nil && pluginstatus.State != plugins.StateOK { + // Register listener for general plugin status checks + opa.manager.RegisterPluginStatusListener("generalStartUpListener", func(status map[string]*plugins.Status) { + for _, pluginStatus := range status { + if pluginStatus != nil && pluginStatus.State != plugins.StateOK { return } } close(done) }) - defer opa.manager.UnregisterPluginStatusListener("startuplistener") + defer opa.manager.UnregisterPluginStatusListener("generalStartUpListener") err := opa.manager.Start(ctx) if err != nil { @@ -518,24 +529,30 @@ func (opa *OpenPolicyAgentInstance) Start(ctx context.Context, timeout time.Dura } select { - case <-done: - return nil - case err := <-failed: - opa.Close(ctx) - - return err case <-ctx.Done(): + timeoutErr := ctx.Err() + for pluginName, status := range opa.manager.PluginStatus() { if status != nil && status.State != plugins.StateOK { opa.Logger().WithFields(map[string]interface{}{ "plugin_name": pluginName, "plugin_state": status.State, "error_message": status.Message, - }).Error("Open policy agent plugin did not start in time") + }).Error("Open policy agent plugin: %v did not start in time", pluginName) } } opa.Close(ctx) - return fmt.Errorf("one or more open policy agent plugins failed to start in %v with error: %w", timeout, err) + if timeoutErr != nil { + return fmt.Errorf("one or more open policy agent plugins failed to start in %v with error: %w", timeout, timeoutErr) + } + return fmt.Errorf("one or more open policy agent plugins failed to start in %v", timeout) + + case <-done: + return nil + case err := <-failed: + opa.Close(ctx) + + return err } } @@ -546,25 +563,6 @@ func (opa *OpenPolicyAgentInstance) Close(ctx context.Context) { }) } -func waitFunc(ctx context.Context, fun func() bool, interval time.Duration) error { - if fun() { - return nil - } - ticker := time.NewTicker(interval) - defer ticker.Stop() - - for { - select { - case <-ctx.Done(): - return fmt.Errorf("timed out while starting: %w", ctx.Err()) - case <-ticker.C: - if fun() { - return nil - } - } - } -} - func configLabelsInfo(opaConfig config.Config) func(*plugins.Manager) { info := ast.NewObject() labels := ast.NewObject() @@ -821,3 +819,46 @@ func (l *QuietLogger) Error(fmt string, a ...interface{}) { func (l *QuietLogger) Warn(fmt string, a ...interface{}) { l.target.Warn(fmt, a) } + +var temporaryClientErrorHTTPCodes = map[int64]struct{}{ + 429: {}, // too many requests + 408: {}, // request timeout +} + +func isTemporaryError(code int64) bool { + _, exists := temporaryClientErrorHTTPCodes[code] + return exists +} + +func handleStatusErrors( + status bundle.Status, + failed chan error, + prefix string, +) { + if status.Code == "bundle_error" { + if status.HTTPCode == "" { + failed <- formatStatusError(prefix, status) + return + } + code, err := status.HTTPCode.Int64() + if err == nil { + if code >= 400 && code < 500 && !isTemporaryError(code) { + // Fail for error codes in the range 400-500 excluding temporary errors + failed <- formatStatusError(prefix, status) + return + } else if code >= 500 { + // Do not fail for 5xx errors and keep retrying + return + } + } + if err != nil { + failed <- formatStatusError(prefix, status) + return + } + } +} + +func formatStatusError(prefix string, status bundle.Status) error { + return fmt.Errorf("%s failed: Name: %s, Code: %s, Message: %s, HTTPCode: %s, Errors: %v", + prefix, status.Name, status.Code, status.Message, status.HTTPCode, status.Errors) +}