Skip to content

Commit

Permalink
Fail fast when bundle and discovery plugins faced with unrecoverable …
Browse files Browse the repository at this point in the history
…errors.

Signed-off-by: Pushpalanka Jayawardhana <[email protected]>
  • Loading branch information
Pushpalanka committed Oct 11, 2024
1 parent 42517c6 commit 07ebb8c
Showing 1 changed file with 85 additions and 44 deletions.
129 changes: 85 additions & 44 deletions filters/openpolicyagent/openpolicyagent.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,61 +481,78 @@ func (registry *OpenPolicyAgentRegistry) new(store storage.Store, configBytes []
// Start asynchronously starts the policy engine's plugins that download
// policies, report status, etc.
//
// It registers status listeners, starts the plugin manager and then waits in
// a select for one of: the done channel, an error on the failed channel, or
// ctx being done.
//
// NOTE(review): this span is a rendered diff without +/- markers, so lines of
// the previous and of the new implementation appear interleaved (for example
// `failed` is declared twice and the select cases are listed twice). Treat it
// as a change record rather than as a compilable function body.
func (opa *OpenPolicyAgentInstance) Start(ctx context.Context, timeout time.Duration) error {

discoveryPlugin := discovery.Lookup(opa.manager)
bundlePlugin := bundle.Lookup(opa.manager)

// done is closed by the plugin status listener below once no reported plugin
// is in a state other than plugins.StateOK.
done := make(chan struct{})
defer close(done)
// failed carries the first unrecoverable startup error to the select below.
failed := make(chan error)
defer close(failed)
failed := make(chan error, 1)

discoveryPlugin.RegisterListener("startuplistener", func(status bundle.Status) {
if len(status.Errors) > 0 {
failed <- fmt.Errorf("discovery download failed: %w", errors.Join(status.Errors...))
}
var registerDiscoveryListenerOnce sync.Once
registerDiscoveryListenerOnce.Do(func() {
discoveryPlugin.RegisterListener("skipper-instance-startup-discovery", func(status bundle.Status) {
handleStatusErrors(status, failed, "discovery plugin")
})
})

bundlePlugin.Register("startuplistener", func(status bundle.Status) {
if len(status.Errors) > 0 {
failed <- fmt.Errorf("bundle activation failed: %w", errors.Join(status.Errors...))
defer discoveryPlugin.Unregister("skipper-instance-startup-discovery")

// Register listener for "bundle" status
var registerBundleListenerOnce sync.Once
opa.manager.RegisterPluginStatusListener("skipper-instance-startup-plugin", func(status map[string]*plugins.Status) {
if _, exists := status["bundle"]; exists {
// The bundle plugin is looked up again inside the callback and may be nil,
// hence the nil check before registering on it.
bundlePlugin := bundle.Lookup(opa.manager)
if bundlePlugin != nil {
registerBundleListenerOnce.Do(func() {
bundlePlugin.Register("skipper-instance-startup-bundle", func(status bundle.Status) {
handleStatusErrors(status, failed, "bundle plugin")
})
})
// NOTE(review): this defer sits inside the status-listener callback, so it
// runs when the callback returns, not when Start returns - confirm that the
// bundle listener is meant to be unregistered that early.
defer bundlePlugin.Unregister("skipper-instance-startup-bundle")
}
}
})
defer bundlePlugin.Unregister("startuplistener")
defer opa.manager.UnregisterPluginStatusListener("skipper-instance-startup-plugin")

opa.manager.RegisterPluginStatusListener("startuplistener", func(status map[string]*plugins.Status) {
for _, pluginstatus := range status {
if pluginstatus != nil && pluginstatus.State != plugins.StateOK {
// Register listener for general plugin status checks
opa.manager.RegisterPluginStatusListener("generalStartUpListener", func(status map[string]*plugins.Status) {
for _, pluginStatus := range status {
// Any plugin that is not yet OK means startup has not completed.
if pluginStatus != nil && pluginStatus.State != plugins.StateOK {
return
}
}
// NOTE(review): done also has a deferred close near the top of this span,
// and nothing visible here prevents this callback from running more than
// once; if both closes belong to the same version the channel could be
// closed twice - confirm against the applied file.
close(done)
})
defer opa.manager.UnregisterPluginStatusListener("startuplistener")
defer opa.manager.UnregisterPluginStatusListener("generalStartUpListener")

err := opa.manager.Start(ctx)
if err != nil {
return err
}

select {
case <-done:
return nil
case err := <-failed:
opa.Close(ctx)

return err
case <-ctx.Done():
timeoutErr := ctx.Err()

// Log every plugin that has not reached StateOK before giving up.
for pluginName, status := range opa.manager.PluginStatus() {
if status != nil && status.State != plugins.StateOK {
opa.Logger().WithFields(map[string]interface{}{
"plugin_name": pluginName,
"plugin_state": status.State,
"error_message": status.Message,
}).Error("Open policy agent plugin did not start in time")
}).Error("Open policy agent plugin: %v did not start in time", pluginName)
}
}
opa.Close(ctx)
// NOTE(review): the next line wraps `err`, while the lines after it wrap
// timeoutErr (ctx.Err()); presumably the former is the replaced version -
// confirm.
return fmt.Errorf("one or more open policy agent plugins failed to start in %v with error: %w", timeout, err)
if timeoutErr != nil {
return fmt.Errorf("one or more open policy agent plugins failed to start in %v with error: %w", timeout, timeoutErr)
}
return fmt.Errorf("one or more open policy agent plugins failed to start in %v", timeout)

case <-done:
return nil
case err := <-failed:
opa.Close(ctx)

return err
}
}

Expand All @@ -546,25 +563,6 @@ func (opa *OpenPolicyAgentInstance) Close(ctx context.Context) {
})
}

// waitFunc polls fun until it reports true or ctx is done.
//
// fun is evaluated once immediately and then once per interval tick. The
// function returns nil as soon as fun returns true. If ctx is done first, the
// context error is returned wrapped in a "timed out while starting" error.
func waitFunc(ctx context.Context, fun func() bool, interval time.Duration) error {
	// Fast path: nothing to wait for.
	if fun() {
		return nil
	}

	poll := time.NewTicker(interval)
	defer poll.Stop()

	for {
		select {
		case <-poll.C:
			if !fun() {
				continue
			}
			return nil
		case <-ctx.Done():
			return fmt.Errorf("timed out while starting: %w", ctx.Err())
		}
	}
}

func configLabelsInfo(opaConfig config.Config) func(*plugins.Manager) {
info := ast.NewObject()
labels := ast.NewObject()
Expand Down Expand Up @@ -821,3 +819,46 @@ func (l *QuietLogger) Error(fmt string, a ...interface{}) {
// Warn forwards a warning message to the wrapped target logger. fmt is the
// format string and a holds the values for its verbs.
func (l *QuietLogger) Warn(fmt string, a ...interface{}) {
	// Expand the variadic arguments: passing the slice itself would hand the
	// target one []interface{} value, so the format verbs would not line up
	// with the caller's arguments.
	l.target.Warn(fmt, a...)
}

// temporaryClientErrorHTTPCodes is the set of 4xx HTTP status codes that
// isTemporaryError reports as temporary. The map is used purely as a set, so
// the values carry no data.
var temporaryClientErrorHTTPCodes = map[int64]struct{}{
	408: {}, // request timeout
	429: {}, // too many requests
}

// isTemporaryError reports whether code is listed in
// temporaryClientErrorHTTPCodes.
func isTemporaryError(code int64) (temporary bool) {
	_, temporary = temporaryClientErrorHTTPCodes[code]
	return temporary
}

// handleStatusErrors inspects a plugin status update and, when it describes an
// error that should abort startup, sends a formatted error on failed.
//
// Only statuses whose Code is "bundle_error" are considered. An error is sent
// when the status has no HTTP code, when the HTTP code cannot be parsed as an
// integer, or when it is a 4xx code that isTemporaryError does not report as
// temporary. Every other parsable code (5xx, temporary 4xx, anything below
// 400) sends nothing, so the caller keeps waiting.
//
// prefix names the reporting plugin and is used as the start of the error
// message.
func handleStatusErrors(
	status bundle.Status,
	failed chan error,
	prefix string,
) {
	if status.Code != "bundle_error" {
		return
	}

	if status.HTTPCode != "" {
		httpCode, parseErr := status.HTTPCode.Int64()
		if parseErr == nil {
			isClientError := httpCode >= 400 && httpCode < 500
			if !isClientError || isTemporaryError(httpCode) {
				// Do not fail for 5xx or temporary errors and keep retrying.
				return
			}
		}
	}

	// Reached for: missing HTTP code, unparsable HTTP code, or a
	// non-temporary error code in the range 400-500.
	failed <- formatStatusError(prefix, status)
}

// formatStatusError builds an error describing status. The message starts with
// prefix and lists the status Name, Code, Message, HTTPCode and Errors fields.
func formatStatusError(prefix string, status bundle.Status) error {
	const layout = "%s failed: Name: %s, Code: %s, Message: %s, HTTPCode: %s, Errors: %v"

	return fmt.Errorf(
		layout,
		prefix,
		status.Name,
		status.Code,
		status.Message,
		status.HTTPCode,
		status.Errors,
	)
}

0 comments on commit 07ebb8c

Please sign in to comment.