Skip to content

Commit

Permalink
OPA: Improve opa startup delay handling (#2633)
Browse files Browse the repository at this point in the history
* Improve opa startup delay handling

Signed-off-by: nwickramasin <[email protected]>

* Cleanup test file

Signed-off-by: nwickramasin <[email protected]>

* Add clearer messages

Signed-off-by: nwickramasin <[email protected]>

* Fix OpenPolicyAgentCleanerInterval description

Signed-off-by: nwickramasin <[email protected]>

* Refactor and fix check validations

Signed-off-by: nwickramasin <[email protected]>

* Refactoring error message

Signed-off-by: nwickramasin <[email protected]>

* Set default timeout in config initialization

Signed-off-by: nwickramasin <[email protected]>

* Fix Test_NewConfigWithArgs for the new flag

Signed-off-by: nwickramasin <[email protected]>

* Revert go sum changes

Signed-off-by: nwickramasin <[email protected]>

* Use context with timeout

Signed-off-by: nwickramasin <[email protected]>

* Introduce instance stopped property

Signed-off-by: nwickramasin <[email protected]>

* Fix closing

Signed-off-by: nwickramasin <[email protected]>

---------

Signed-off-by: nwickramasin <[email protected]>
Co-authored-by: nwickramasin <[email protected]>
  • Loading branch information
wisinghe and nwickramasin authored Oct 9, 2023
1 parent 40f4634 commit 98148f8
Show file tree
Hide file tree
Showing 6 changed files with 200 additions and 34 deletions.
5 changes: 4 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,7 @@ type Config struct {
OpenPolicyAgentConfigTemplate string `yaml:"open-policy-agent-config-template"`
OpenPolicyAgentEnvoyMetadata string `yaml:"open-policy-agent-envoy-metadata"`
OpenPolicyAgentCleanerInterval time.Duration `yaml:"open-policy-agent-cleaner-interval"`
OpenPolicyAgentStartupTimeout time.Duration `yaml:"open-policy-agent-startup-timeout"`
}

const (
Expand Down Expand Up @@ -498,7 +499,8 @@ func NewConfig() *Config {
flag.BoolVar(&cfg.EnableOpenPolicyAgent, "enable-open-policy-agent", false, "enables Open Policy Agent filters")
flag.StringVar(&cfg.OpenPolicyAgentConfigTemplate, "open-policy-agent-config-template", "", "file containing a template for an Open Policy Agent configuration file that is interpolated for each OPA filter instance")
flag.StringVar(&cfg.OpenPolicyAgentEnvoyMetadata, "open-policy-agent-envoy-metadata", "", "JSON file containing meta-data passed as input for compatibility with Envoy policies in the format")
flag.DurationVar(&cfg.OpenPolicyAgentCleanerInterval, "open-policy-agent-cleaner-interval", openpolicyagent.DefaultCleanIdlePeriod, "JSON file containing meta-data passed as input for compatibility with Envoy policies in the format")
flag.DurationVar(&cfg.OpenPolicyAgentCleanerInterval, "open-policy-agent-cleaner-interval", openpolicyagent.DefaultCleanerInterval, "Duration in seconds to wait before cleaning up unused opa instances")
flag.DurationVar(&cfg.OpenPolicyAgentStartupTimeout, "open-policy-agent-startup-timeout", openpolicyagent.DefaultOpaStartupTimeout, "Maximum duration in seconds to wait for the open policy agent to start up")

// TLS client certs
flag.StringVar(&cfg.ClientKeyFile, "client-tls-key", "", "TLS Key file for backend connections, multiple keys may be given comma separated - the order must match the certs")
Expand Down Expand Up @@ -903,6 +905,7 @@ func (c *Config) ToOptions() skipper.Options {
OpenPolicyAgentConfigTemplate: c.OpenPolicyAgentConfigTemplate,
OpenPolicyAgentEnvoyMetadata: c.OpenPolicyAgentEnvoyMetadata,
OpenPolicyAgentCleanerInterval: c.OpenPolicyAgentCleanerInterval,
OpenPolicyAgentStartupTimeout: c.OpenPolicyAgentStartupTimeout,
}
for _, rcci := range c.CloneRoute {
eskipClone := eskip.NewClone(rcci.Reg, rcci.Repl)
Expand Down
1 change: 1 addition & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ func defaultConfig() *Config {
LuaModules: commaListFlag(),
LuaSources: commaListFlag(),
OpenPolicyAgentCleanerInterval: 10 * time.Second,
OpenPolicyAgentStartupTimeout: 30 * time.Second,
}
}

Expand Down
89 changes: 63 additions & 26 deletions filters/openpolicyagent/openpolicyagent.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/open-policy-agent/opa/storage/inmem"
iCache "github.com/open-policy-agent/opa/topdown/cache"
opatracing "github.com/open-policy-agent/opa/tracing"
opautil "github.com/open-policy-agent/opa/util"
"github.com/opentracing/opentracing-go"
"google.golang.org/protobuf/encoding/protojson"

Expand All @@ -36,7 +35,8 @@ import (
const (
defaultReuseDuration = 30 * time.Second
defaultShutdownGracePeriod = 30 * time.Second
DefaultCleanIdlePeriod = 10 * time.Second
DefaultCleanerInterval = 10 * time.Second
DefaultOpaStartupTimeout = 30 * time.Second
)

type OpenPolicyAgentRegistry struct {
Expand Down Expand Up @@ -78,7 +78,7 @@ func WithCleanInterval(interval time.Duration) func(*OpenPolicyAgentRegistry) er
func NewOpenPolicyAgentRegistry(opts ...func(*OpenPolicyAgentRegistry) error) *OpenPolicyAgentRegistry {
registry := &OpenPolicyAgentRegistry{
reuseDuration: defaultReuseDuration,
cleanInterval: DefaultCleanIdlePeriod,
cleanInterval: DefaultCleanerInterval,
instances: make(map[string]*OpenPolicyAgentInstance),
lastused: make(map[*OpenPolicyAgentInstance]time.Time),
quit: make(chan struct{}),
Expand All @@ -96,6 +96,7 @@ func NewOpenPolicyAgentRegistry(opts ...func(*OpenPolicyAgentRegistry) error) *O
// OpenPolicyAgentInstanceConfig holds the settings applied to an Open Policy
// Agent instance. Values are populated through functional options of type
// func(*OpenPolicyAgentInstanceConfig) error.
type OpenPolicyAgentInstanceConfig struct {
// envoyMetadata is the value returned by GetEnvoyMetadata.
envoyMetadata *ext_authz_v3_core.Metadata
// configTemplate is the raw OPA configuration template.
// NOTE(review): presumably set by WithConfigTemplate and interpolated per
// bundle before use — confirm against the option's body.
configTemplate []byte
// startupTimeout is the maximum time allowed for an instance to start up;
// it is set by WithStartupTimeout.
startupTimeout time.Duration
}

func WithConfigTemplate(configTemplate []byte) func(*OpenPolicyAgentInstanceConfig) error {
Expand Down Expand Up @@ -144,12 +145,21 @@ func WithEnvoyMetadataFile(file string) func(*OpenPolicyAgentInstanceConfig) err
}
}

// WithStartupTimeout returns a configuration option that stores timeout as
// the startup timeout of the instance configuration it is applied to.
// The returned option never fails.
func WithStartupTimeout(timeout time.Duration) func(*OpenPolicyAgentInstanceConfig) error {
	setTimeout := func(instanceCfg *OpenPolicyAgentInstanceConfig) error {
		instanceCfg.startupTimeout = timeout
		return nil
	}
	return setTimeout
}

// GetEnvoyMetadata returns the Envoy metadata stored in this configuration.
// The result is nil if no metadata has been set.
func (cfg *OpenPolicyAgentInstanceConfig) GetEnvoyMetadata() *ext_authz_v3_core.Metadata {
return cfg.envoyMetadata
}

func NewOpenPolicyAgentConfig(opts ...func(*OpenPolicyAgentInstanceConfig) error) (*OpenPolicyAgentInstanceConfig, error) {
cfg := OpenPolicyAgentInstanceConfig{}
cfg := OpenPolicyAgentInstanceConfig{
startupTimeout: DefaultOpaStartupTimeout,
}

for _, opt := range opts {
if err := opt(&cfg); err != nil {
Expand Down Expand Up @@ -285,14 +295,10 @@ func (registry *OpenPolicyAgentRegistry) newOpenPolicyAgentInstance(bundleName s
return nil, err
}

ctx := context.Background()
if err = engine.Start(ctx); err != nil {
return nil, err
}
ctx, cancel := context.WithTimeout(context.Background(), config.startupTimeout)
defer cancel()

err = engine.waitPluginsReady(100*time.Millisecond, 30*time.Second)
if err != nil {
engine.Logger().WithFields(map[string]interface{}{"err": err}).Error("Failed to wait for plugins activation.")
if err = engine.Start(ctx, config.startupTimeout); err != nil {
return nil, err
}

Expand All @@ -308,6 +314,7 @@ type OpenPolicyAgentInstance struct {
preparedQueryDoOnce *sync.Once
interQueryBuiltinCache iCache.InterQueryCache
once sync.Once
stopped bool
}

func envVariablesMap() map[string]string {
Expand Down Expand Up @@ -343,7 +350,6 @@ func interpolateConfigTemplate(configTemplate []byte, bundleName string) ([]byte
// New returns a new OPA object.
func New(store storage.Store, configBytes []byte, instanceConfig OpenPolicyAgentInstanceConfig, filterName string, bundleName string) (*OpenPolicyAgentInstance, error) {
id := uuid.New().String()

opaConfig, err := config.ParseConfig(configBytes, id)
if err != nil {
return nil, err
Expand Down Expand Up @@ -382,19 +388,11 @@ func New(store storage.Store, configBytes []byte, instanceConfig OpenPolicyAgent

// Start asynchronously starts the policy engine's plugins that download
// policies, report status, etc.
func (opa *OpenPolicyAgentInstance) Start(ctx context.Context) error {
return opa.manager.Start(ctx)
}
func (opa *OpenPolicyAgentInstance) Start(ctx context.Context, timeout time.Duration) error {
err := opa.manager.Start(ctx)

func (opa *OpenPolicyAgentInstance) Close(ctx context.Context) {
opa.once.Do(func() {
opa.manager.Stop(ctx)
})
}

func (opa *OpenPolicyAgentInstance) waitPluginsReady(checkInterval, timeout time.Duration) error {
if timeout <= 0 {
return nil
if err != nil {
return err
}

// check readiness of all plugins
Expand All @@ -407,9 +405,48 @@ func (opa *OpenPolicyAgentInstance) waitPluginsReady(checkInterval, timeout time
return true
}

opa.Logger().Debug("Waiting for plugins activation (%v).", timeout)
err = waitFunc(ctx, pluginsReady, 100*time.Millisecond)

return opautil.WaitFunc(pluginsReady, checkInterval, timeout)
if err != nil {
for pluginName, status := range opa.manager.PluginStatus() {
if status != nil && status.State != plugins.StateOK {
opa.Logger().WithFields(map[string]interface{}{
"plugin_name": pluginName,
"plugin_state": status.State,
"error_message": status.Message,
}).Error("Open policy agent plugin did not start in time")
}
}
opa.Close(ctx)
return fmt.Errorf("one or more open policy agent plugins failed to start in %v with error: %w", timeout, err)
}
return nil
}

// Close stops the instance's plugin manager and then marks the instance as
// stopped. The shutdown is guarded by a sync.Once, so only the first call
// has any effect; ctx is handed to the manager's Stop call.
func (opa *OpenPolicyAgentInstance) Close(ctx context.Context) {
	shutdown := func() {
		opa.manager.Stop(ctx)
		opa.stopped = true
	}
	opa.once.Do(shutdown)
}

// waitFunc polls fun until it reports true or ctx is done.
//
// fun is evaluated once immediately; if it already returns true no ticker is
// created. Otherwise fun is re-evaluated on every tick of the given interval.
// When ctx is done first, the returned error wraps ctx.Err().
func waitFunc(ctx context.Context, fun func() bool, interval time.Duration) error {
	// Fast path: the condition already holds, nothing to wait for.
	if fun() {
		return nil
	}

	poll := time.NewTicker(interval)
	defer poll.Stop()

	for ready := false; !ready; {
		select {
		case <-poll.C:
			ready = fun()
		case <-ctx.Done():
			return fmt.Errorf("timed out while starting: %w", ctx.Err())
		}
	}
	return nil
}

func (opa *OpenPolicyAgentInstance) InstanceConfig() *OpenPolicyAgentInstanceConfig {
Expand Down
133 changes: 127 additions & 6 deletions filters/openpolicyagent/openpolicyagent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/open-policy-agent/opa/storage/inmem"
"io"
"net/http"
"os"
Expand Down Expand Up @@ -89,7 +90,49 @@ func TestLoadEnvoyMetadata(t *testing.T) {
assert.Equal(t, expected, cfg.envoyMetadata)
}

func mockControlPlane() (*opasdktest.Server, []byte) {
// mockControlPlaneWithDiscoveryBundle starts a mock OPA bundle server and
// returns it together with an OPA configuration that uses discovery.
//
// The server serves one policy bundle ("/bundles/test") and three discovery
// bundles. discoveryBundle selects which resource the returned configuration
// uses as its discovery resource; the configuration's "test" service points
// at the mock server's URL.
func mockControlPlaneWithDiscoveryBundle(discoveryBundle string) (*opasdktest.Server, []byte) {
opaControlPlane := opasdktest.MustNewServer(
// Policy bundle: denies by default.
opasdktest.MockBundle("/bundles/test", map[string]string{
"main.rego": `
package envoy.authz
default allow = false
`,
}),
// Discovery document that references the bundles/test resource served above.
opasdktest.MockBundle("/bundles/discovery", map[string]string{
"data.json": `
{"discovery":{"bundles":{"bundles/test":{"persist":false,"resource":"bundles/test","service":"test"}}}}
`,
}),
// Discovery document that references bundles/non-existing-bundle, a
// resource this server does not register.
opasdktest.MockBundle("/bundles/discovery-with-wrong-bundle", map[string]string{
"data.json": `
{"discovery":{"bundles":{"bundles/non-existing-bundle":{"persist":false,"resource":"bundles/non-existing-bundle","service":"test"}}}}
`,
}),
// Discovery document whose data.json is not valid JSON.
opasdktest.MockBundle("/bundles/discovery-with-parsing-error", map[string]string{
"data.json": `
{unparsable : json}
`,
}),
)

// Both placeholders are filled with %q, so the server URL and the discovery
// resource are emitted as quoted strings.
config := []byte(fmt.Sprintf(`{
"services": {
"test": {
"url": %q
}
},
"discovery": {
"name": "discovery",
"resource": %q,
"service": "test"
}
}`, opaControlPlane.URL(), discoveryBundle))

return opaControlPlane, config
}

func mockControlPlaneWithResourceBundle() (*opasdktest.Server, []byte) {
opaControlPlane := opasdktest.MustNewServer(
opasdktest.MockBundle("/bundles/test", map[string]string{
"main.rego": `
Expand Down Expand Up @@ -130,7 +173,7 @@ func mockControlPlane() (*opasdktest.Server, []byte) {
}

func TestRegistry(t *testing.T) {
_, config := mockControlPlane()
_, config := mockControlPlaneWithResourceBundle()

registry := NewOpenPolicyAgentRegistry(WithReuseDuration(1*time.Second), WithCleanInterval(1*time.Second))

Expand Down Expand Up @@ -182,8 +225,86 @@ func TestRegistry(t *testing.T) {
assert.Error(t, err, "should not work after close")
}

// TestOpaEngineStartFailureWithTimeout verifies that Start marks the engine
// as stopped and returns a timeout error when the discovery bundle points to
// a bundle the control plane does not serve.
func TestOpaEngineStartFailureWithTimeout(t *testing.T) {
	_, config := mockControlPlaneWithDiscoveryBundle("bundles/discovery-with-wrong-bundle")

	cfg, err := NewOpenPolicyAgentConfig(WithConfigTemplate(config), WithStartupTimeout(1*time.Second))
	// assert is non-fatal; bail out explicitly so a failed setup does not
	// turn into a nil-pointer panic on *cfg below.
	if !assert.NoError(t, err) {
		return
	}

	engine, err := New(inmem.New(), config, *cfg, "testfilter", "test")
	if !assert.NoError(t, err) {
		return
	}

	ctx, cancel := context.WithTimeout(context.Background(), cfg.startupTimeout)
	defer cancel()

	err = engine.Start(ctx, cfg.startupTimeout)
	assert.True(t, engine.stopped)
	// Only inspect the message when an error was actually returned; calling
	// err.Error() on a nil error would panic instead of failing the test.
	if assert.Error(t, err) {
		assert.Contains(t, err.Error(), "one or more open policy agent plugins failed to start in 1s")
	}
}

// TestOpaActivationSuccessWithDiscovery checks that an instance configured
// through the "bundles/discovery" discovery bundle is created without error
// and ends up as the single entry in the registry.
func TestOpaActivationSuccessWithDiscovery(t *testing.T) {
	_, opaConfig := mockControlPlaneWithDiscoveryBundle("bundles/discovery")

	instanceCfg, err := NewOpenPolicyAgentConfig(WithConfigTemplate(opaConfig))
	assert.NoError(t, err)

	reg := NewOpenPolicyAgentRegistry(
		WithReuseDuration(1*time.Second),
		WithCleanInterval(1*time.Second),
	)

	inst, err := reg.NewOpenPolicyAgentInstance("test", *instanceCfg, "testfilter")
	assert.NotNil(t, inst)
	assert.NoError(t, err)
	assert.Equal(t, 1, len(reg.instances))
}

// TestOpaActivationFailureWithWrongServiceConfig verifies that creating an
// instance fails, and nothing is registered, when the discovery section
// refers to a service ("test") that the configuration does not declare.
func TestOpaActivationFailureWithWrongServiceConfig(t *testing.T) {
	configWithUnknownService := []byte(`{
"discovery": {
"name": "discovery",
"resource": "discovery",
"service": "test"
}}`)

	registry := NewOpenPolicyAgentRegistry(WithReuseDuration(1*time.Second), WithCleanInterval(1*time.Second))

	cfg, err := NewOpenPolicyAgentConfig(WithConfigTemplate(configWithUnknownService), WithStartupTimeout(1*time.Second))
	// assert is non-fatal; stop here so a failed setup does not become a
	// nil-pointer panic on *cfg below.
	if !assert.NoError(t, err) {
		return
	}

	instance, err := registry.NewOpenPolicyAgentInstance("test", *cfg, "testfilter")
	assert.Nil(t, instance)
	// Only inspect the message when an error was actually returned; calling
	// err.Error() on a nil error would panic instead of failing the test.
	if assert.Error(t, err) {
		assert.Contains(t, err.Error(), "invalid configuration for discovery")
	}
	assert.Equal(t, 0, len(registry.instances))
}

// TestOpaActivationTimeOutWithDiscoveryPointingWrongBundle verifies that
// instance creation times out, and nothing is registered, when the discovery
// bundle references a bundle the control plane does not serve.
func TestOpaActivationTimeOutWithDiscoveryPointingWrongBundle(t *testing.T) {
	_, config := mockControlPlaneWithDiscoveryBundle("/bundles/discovery-with-wrong-bundle")

	registry := NewOpenPolicyAgentRegistry(WithReuseDuration(1*time.Second), WithCleanInterval(1*time.Second))

	cfg, err := NewOpenPolicyAgentConfig(WithConfigTemplate(config), WithStartupTimeout(1*time.Second))
	// assert is non-fatal; stop here so a failed setup does not become a
	// nil-pointer panic on *cfg below.
	if !assert.NoError(t, err) {
		return
	}

	instance, err := registry.NewOpenPolicyAgentInstance("test", *cfg, "testfilter")
	assert.Nil(t, instance)
	// Only inspect the message when an error was actually returned; calling
	// err.Error() on a nil error would panic instead of failing the test.
	if assert.Error(t, err) {
		assert.Contains(t, err.Error(), "one or more open policy agent plugins failed to start in 1s with error: timed out while starting: context deadline exceeded")
	}
	assert.Equal(t, 0, len(registry.instances))
}

// TestOpaActivationTimeOutWithDiscoveryParsingError verifies that instance
// creation times out, and nothing is registered, when the discovery bundle's
// data.json cannot be parsed.
func TestOpaActivationTimeOutWithDiscoveryParsingError(t *testing.T) {
	_, config := mockControlPlaneWithDiscoveryBundle("/bundles/discovery-with-parsing-error")

	registry := NewOpenPolicyAgentRegistry(WithReuseDuration(1*time.Second), WithCleanInterval(1*time.Second))

	cfg, err := NewOpenPolicyAgentConfig(WithConfigTemplate(config), WithStartupTimeout(1*time.Second))
	// assert is non-fatal; stop here so a failed setup does not become a
	// nil-pointer panic on *cfg below.
	if !assert.NoError(t, err) {
		return
	}

	instance, err := registry.NewOpenPolicyAgentInstance("test", *cfg, "testfilter")
	assert.Nil(t, instance)
	// Only inspect the message when an error was actually returned; calling
	// err.Error() on a nil error would panic instead of failing the test.
	if assert.Error(t, err) {
		assert.Contains(t, err.Error(), "one or more open policy agent plugins failed to start in 1s with error: timed out while starting: context deadline exceeded")
	}
	assert.Equal(t, 0, len(registry.instances))
}

func TestStartup(t *testing.T) {
_, config := mockControlPlane()
_, config := mockControlPlaneWithResourceBundle()

registry := NewOpenPolicyAgentRegistry(WithReuseDuration(1*time.Second), WithCleanInterval(1*time.Second))

Expand All @@ -199,7 +320,7 @@ func TestStartup(t *testing.T) {
}

func TestTracing(t *testing.T) {
_, config := mockControlPlane()
_, config := mockControlPlaneWithResourceBundle()

registry := NewOpenPolicyAgentRegistry(WithReuseDuration(1*time.Second), WithCleanInterval(1*time.Second))

Expand All @@ -222,7 +343,7 @@ func TestTracing(t *testing.T) {
}

func TestEval(t *testing.T) {
_, config := mockControlPlane()
_, config := mockControlPlaneWithResourceBundle()

registry := NewOpenPolicyAgentRegistry(WithReuseDuration(1*time.Second), WithCleanInterval(1*time.Second))

Expand Down Expand Up @@ -250,7 +371,7 @@ func TestEval(t *testing.T) {
}

func TestResponses(t *testing.T) {
_, config := mockControlPlane()
_, config := mockControlPlaneWithResourceBundle()

registry := NewOpenPolicyAgentRegistry(WithReuseDuration(1*time.Second), WithCleanInterval(1*time.Second))

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ require (
github.com/agnivade/levenshtein v1.1.1 // indirect
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/biter777/processex v0.0.0-20210102170504-01bb369eda71 // indirect
github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b // indirect
github.com/bytecodealliance/wasmtime-go/v3 v3.0.2 // indirect
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
Expand Down
Loading

0 comments on commit 98148f8

Please sign in to comment.