From b25cae46023c985e090e95e9cac5c653a973d820 Mon Sep 17 00:00:00 2001 From: Magnus Jungsbluth Date: Wed, 19 Jun 2024 16:23:36 +0200 Subject: [PATCH 1/6] OPA: Fail fast on discovery or bundle download errors Signed-off-by: Magnus Jungsbluth --- filters/openpolicyagent/openpolicyagent.go | 51 ++++++++++++++++------ 1 file changed, 38 insertions(+), 13 deletions(-) diff --git a/filters/openpolicyagent/openpolicyagent.go b/filters/openpolicyagent/openpolicyagent.go index 1024208ec4..9280350096 100644 --- a/filters/openpolicyagent/openpolicyagent.go +++ b/filters/openpolicyagent/openpolicyagent.go @@ -21,6 +21,7 @@ import ( "github.com/open-policy-agent/opa/config" "github.com/open-policy-agent/opa/logging" "github.com/open-policy-agent/opa/plugins" + "github.com/open-policy-agent/opa/plugins/bundle" "github.com/open-policy-agent/opa/plugins/discovery" "github.com/open-policy-agent/opa/rego" "github.com/open-policy-agent/opa/runtime" @@ -480,25 +481,50 @@ func (registry *OpenPolicyAgentRegistry) new(store storage.Store, configBytes [] // Start asynchronously starts the policy engine's plugins that download // policies, report status, etc. func (opa *OpenPolicyAgentInstance) Start(ctx context.Context, timeout time.Duration) error { - err := opa.manager.Start(ctx) + discoveryPlugin := discovery.Lookup(opa.manager) + bundlePlugin := bundle.Lookup(opa.manager) - if err != nil { - return err - } + done := make(chan struct{}) + defer close(done) + failed := make(chan error) + defer close(failed) - // check readiness of all plugins - pluginsReady := func() bool { - for _, status := range opa.manager.PluginStatus() { - if status != nil && status.State != plugins.StateOK { - return false + discoveryPlugin.RegisterListener("startuplistener", func(status bundle.Status) { + if len(status.Errors) > 0 { + failed <- fmt.Errorf("discovery download failed: %w", errors.Join(status.Errors...)) + } + }) + + bundlePlugin.Register("startuplistener", func(status bundle.Status) { + if len(status.Errors) > 0 { + failed <- fmt.Errorf("bundle activation failed: %w", errors.Join(status.Errors...)) + } + }) + defer bundlePlugin.Unregister("startuplistener") + + opa.manager.RegisterPluginStatusListener("startuplistener", func(status map[string]*plugins.Status) { + for _, pluginstatus := range status { + if pluginstatus != nil && pluginstatus.State != plugins.StateOK { + return } } - return true + close(done) + }) + defer opa.manager.UnregisterPluginStatusListener("startuplistener") + + err := opa.manager.Start(ctx) + if err != nil { + return err } - err = waitFunc(ctx, pluginsReady, 100*time.Millisecond) + select { + case <-done: + return nil + case err := <-failed: + opa.Close(ctx) - if err != nil { + return err + case <-ctx.Done(): for pluginName, status := range opa.manager.PluginStatus() { if status != nil && status.State != plugins.StateOK { opa.Logger().WithFields(map[string]interface{}{ @@ -511,7 +537,6 @@ func (opa *OpenPolicyAgentInstance) Start(ctx context.Context, timeout time.Dura opa.Close(ctx) return fmt.Errorf("one or more open policy agent plugins failed to start in %v with error: %w", timeout, err) } - return nil } func (opa *OpenPolicyAgentInstance) Close(ctx context.Context) { From ed756e62225bdc3d6d0a3094d93b7b7a1c6f2c08 Mon Sep 17 00:00:00 2001 From: Pushpalanka Jayawardhana Date: Tue, 2 Jul 2024 14:14:00 +0200 Subject: [PATCH 2/6] Fail fast when bundle and discovery plugins faced with unrecoverable errors. Signed-off-by: Pushpalanka Jayawardhana --- filters/openpolicyagent/openpolicyagent.go | 129 ++++++++++----- .../openpolicyagent/openpolicyagent_test.go | 153 +++++++++++++++++- 2 files changed, 234 insertions(+), 48 deletions(-) diff --git a/filters/openpolicyagent/openpolicyagent.go b/filters/openpolicyagent/openpolicyagent.go index 9280350096..2d77001fb2 100644 --- a/filters/openpolicyagent/openpolicyagent.go +++ b/filters/openpolicyagent/openpolicyagent.go @@ -481,36 +481,47 @@ func (registry *OpenPolicyAgentRegistry) new(store storage.Store, configBytes [] // Start asynchronously starts the policy engine's plugins that download // policies, report status, etc. func (opa *OpenPolicyAgentInstance) Start(ctx context.Context, timeout time.Duration) error { + discoveryPlugin := discovery.Lookup(opa.manager) - bundlePlugin := bundle.Lookup(opa.manager) done := make(chan struct{}) - defer close(done) - failed := make(chan error) - defer close(failed) + failed := make(chan error, 1) - discoveryPlugin.RegisterListener("startuplistener", func(status bundle.Status) { - if len(status.Errors) > 0 { - failed <- fmt.Errorf("discovery download failed: %w", errors.Join(status.Errors...)) - } + var registerDiscoveryListenerOnce sync.Once + registerDiscoveryListenerOnce.Do(func() { + discoveryPlugin.RegisterListener("skipper-instance-startup-discovery", func(status bundle.Status) { + handleStatusErrors(status, failed, "discovery plugin") + }) }) - - bundlePlugin.Register("startuplistener", func(status bundle.Status) { - if len(status.Errors) > 0 { - failed <- fmt.Errorf("bundle activation failed: %w", errors.Join(status.Errors...)) + defer discoveryPlugin.Unregister("skipper-instance-startup-discovery") + + // Register listener for "bundle" status + var registerBundleListenerOnce sync.Once + opa.manager.RegisterPluginStatusListener("skipper-instance-startup-plugin", func(status map[string]*plugins.Status) { + if _, exists := status["bundle"]; exists { + bundlePlugin := bundle.Lookup(opa.manager) + if bundlePlugin != nil { + registerBundleListenerOnce.Do(func() { + bundlePlugin.Register("skipper-instance-startup-bundle", func(status bundle.Status) { + handleStatusErrors(status, failed, "bundle plugin") + }) + }) + defer bundlePlugin.Unregister("skipper-instance-startup-bundle") + } } }) - defer bundlePlugin.Unregister("startuplistener") + defer opa.manager.UnregisterPluginStatusListener("skipper-instance-startup-plugin") - opa.manager.RegisterPluginStatusListener("startuplistener", func(status map[string]*plugins.Status) { - for _, pluginstatus := range status { - if pluginstatus != nil && pluginstatus.State != plugins.StateOK { + // Register listener for general plugin status checks + opa.manager.RegisterPluginStatusListener("generalStartUpListener", func(status map[string]*plugins.Status) { + for _, pluginStatus := range status { + if pluginStatus != nil && pluginStatus.State != plugins.StateOK { return } } close(done) }) - defer opa.manager.UnregisterPluginStatusListener("startuplistener") + defer opa.manager.UnregisterPluginStatusListener("generalStartUpListener") err := opa.manager.Start(ctx) if err != nil { @@ -518,24 +529,30 @@ func (opa *OpenPolicyAgentInstance) Start(ctx context.Context, timeout time.Dura } select { - case <-done: - return nil - case err := <-failed: - opa.Close(ctx) - - return err case <-ctx.Done(): + timeoutErr := ctx.Err() + for pluginName, status := range opa.manager.PluginStatus() { if status != nil && status.State != plugins.StateOK { opa.Logger().WithFields(map[string]interface{}{ "plugin_name": pluginName, "plugin_state": status.State, "error_message": status.Message, - }).Error("Open policy agent plugin did not start in time") + }).Error("Open policy agent plugin: %v did not start in time", pluginName) } } opa.Close(ctx) - return fmt.Errorf("one or more open policy agent plugins failed to start in %v with error: %w", timeout, err) + if timeoutErr != nil { + return fmt.Errorf("one or more open policy agent plugins failed to start in %v with error: %w", timeout, timeoutErr) + } + return fmt.Errorf("one or more open policy agent plugins failed to start in %v", timeout) + + case <-done: + return nil + case err := <-failed: + opa.Close(ctx) + + return err } } @@ -546,25 +563,6 @@ func (opa *OpenPolicyAgentInstance) Close(ctx context.Context) { }) } -func waitFunc(ctx context.Context, fun func() bool, interval time.Duration) error { - if fun() { - return nil - } - ticker := time.NewTicker(interval) - defer ticker.Stop() - - for { - select { - case <-ctx.Done(): - return fmt.Errorf("timed out while starting: %w", ctx.Err()) - case <-ticker.C: - if fun() { - return nil - } - } - } -} - func configLabelsInfo(opaConfig config.Config) func(*plugins.Manager) { info := ast.NewObject() labels := ast.NewObject() @@ -821,3 +819,46 @@ func (l *QuietLogger) Error(fmt string, a ...interface{}) { func (l *QuietLogger) Warn(fmt string, a ...interface{}) { l.target.Warn(fmt, a) } + +var temporaryClientErrorHTTPCodes = map[int64]struct{}{ + 429: {}, // too many requests + 408: {}, // request timeout +} + +func isTemporaryError(code int64) bool { + _, exists := temporaryClientErrorHTTPCodes[code] + return exists +} + +func handleStatusErrors( + status bundle.Status, + failed chan error, + prefix string, +) { + if status.Code == "bundle_error" { + if status.HTTPCode == "" { + failed <- formatStatusError(prefix, status) + return + } + code, err := status.HTTPCode.Int64() + if err == nil { + if code >= 400 && code < 500 && !isTemporaryError(code) { + // Fail for error codes in the range 400-500 excluding temporary errors + failed <- formatStatusError(prefix, status) + return + } else if code >= 500 { + // Do not fail for 5xx errors and keep retrying + return + } + } + if err != nil { + failed <- formatStatusError(prefix, status) + return + } + } +} + +func formatStatusError(prefix string, status bundle.Status) error { + return fmt.Errorf("%s failed: Name: %s, Code: %s, Message: %s, HTTPCode: %s, Errors: %v", + prefix, status.Name, status.Code, status.Message, status.HTTPCode, status.Errors) +} diff --git a/filters/openpolicyagent/openpolicyagent_test.go b/filters/openpolicyagent/openpolicyagent_test.go index 6c6dc7d923..d80cf82edf 100644 --- a/filters/openpolicyagent/openpolicyagent_test.go +++ b/filters/openpolicyagent/openpolicyagent_test.go @@ -7,6 +7,7 @@ import ( "fmt" "io" "net/http" + "net/http/httptest" "os" "strconv" "testing" @@ -243,7 +244,7 @@ func TestRegistry(t *testing.T) { assert.Error(t, err, "should not work after close") } -func TestOpaEngineStartFailureWithTimeout(t *testing.T) { +func TestOpaEngineStartFailureWithWrongBundle(t *testing.T) { _, config := mockControlPlaneWithDiscoveryBundle("bundles/discovery-with-wrong-bundle") registry := NewOpenPolicyAgentRegistry(WithReuseDuration(1*time.Second), WithCleanInterval(1*time.Second)) @@ -259,7 +260,7 @@ func TestOpaEngineStartFailureWithTimeout(t *testing.T) { err = engine.Start(ctx, cfg.startupTimeout) assert.True(t, engine.stopped) - assert.Contains(t, err.Error(), "one or more open policy agent plugins failed to start in 1s") + assert.Contains(t, err.Error(), "bundle plugin failed: Name: bundles/non-existing-bundle, Code: bundle_error, Message: server replied with Not Found, HTTPCode: 404") } func TestOpaActivationSuccessWithDiscovery(t *testing.T) { @@ -340,7 +341,7 @@ func TestOpaActivationTimeOutWithDiscoveryPointingWrongBundle(t *testing.T) { instance, err := registry.NewOpenPolicyAgentInstance("test", *cfg, "testfilter") assert.Nil(t, instance) - assert.Contains(t, err.Error(), "one or more open policy agent plugins failed to start in 1s with error: timed out while starting: context deadline exceeded") + assert.Contains(t, err.Error(), "bundle plugin failed: Name: bundles/non-existing-bundle, Code: bundle_error, Message: server replied with Not Found, HTTPCode: 404") assert.Equal(t, 0, len(registry.instances)) } @@ -354,7 +355,7 @@ func TestOpaActivationTimeOutWithDiscoveryParsingError(t *testing.T) { instance, err := registry.NewOpenPolicyAgentInstance("test", *cfg, "testfilter") assert.Nil(t, instance) - assert.Contains(t, err.Error(), "one or more open policy agent plugins failed to start in 1s with error: timed out while starting: context deadline exceeded") + assert.Contains(t, err.Error(), "one or more open policy agent plugins failed to start in 1s with error: context deadline exceeded") assert.Equal(t, 0, len(registry.instances)) } @@ -703,3 +704,147 @@ func TestBodyExtractionUnknownBody(t *testing.T) { f1() f2() } + +func TestOpaActivationFailureWithInvalidDiscovery(t *testing.T) { + _, config := mockControlPlaneWithDiscoveryBundle("bundles/invalid-discovery") + + registry := NewOpenPolicyAgentRegistry(WithReuseDuration(1*time.Second), WithCleanInterval(1*time.Second)) + + cfg, err := NewOpenPolicyAgentConfig(WithConfigTemplate(config)) + assert.NoError(t, err) + + _, err = registry.NewOpenPolicyAgentInstance("test", *cfg, "testfilter") + assert.Error(t, err) + assert.Equal(t, "discovery plugin failed: Name: discovery, Code: bundle_error, Message: server replied with Not Found, HTTPCode: 404, Errors: []", err.Error()) +} + +func TestDiscoveryRetryErrors(t *testing.T) { + tests := []struct { + name string + statusCode int + expectedError string + }{ + {"TemporaryError429", http.StatusTooManyRequests, "one or more open policy agent plugins failed to start in 1s with error: context deadline exceeded"}, + {"TemporaryError408", http.StatusRequestTimeout, "one or more open policy agent plugins failed to start in 1s with error: context deadline exceeded"}, + {"Retry5xx", http.StatusInternalServerError, "one or more open policy agent plugins failed to start in 1s with error: context deadline exceeded"}, + {"Failure403BundleError", http.StatusForbidden, "discovery plugin failed: Name: discovery, Code: bundle_error, Message: server replied with Forbidden, HTTPCode: 403"}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + testDiscoveryBundleError(t, tt.statusCode, tt.expectedError) + }) + } +} + +func TestResourceBundleRetryErrors(t *testing.T) { + tests := []struct { + name string + statusCode int + expectedError string + }{ + {"TemporaryError429", http.StatusTooManyRequests, "one or more open policy agent plugins failed to start in 1s with error: context deadline exceeded"}, + {"TemporaryError408", http.StatusRequestTimeout, "one or more open policy agent plugins failed to start in 1s with error: context deadline exceeded"}, + {"Retry5xx", http.StatusInternalServerError, "one or more open policy agent plugins failed to start in 1s with error: context deadline exceeded"}, + {"Failure403BundleError", http.StatusForbidden, "bundle plugin failed: Name: test, Code: bundle_error, Message: server replied with Forbidden, HTTPCode: 403"}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + testResourceBundleError(t, tt.statusCode, tt.expectedError) + }) + } +} + +// Helper function to test discovery bundle temporary errors +func testDiscoveryBundleError(t *testing.T, statusCode int, expectedError string) { + mockDiscoveryBundleServer := mockBundleServerWithStatus("/bundles/discovery", statusCode) + defer mockDiscoveryBundleServer.Close() + + // Create the plugin configuration + config := []byte(fmt.Sprintf(`{ + "services": { + "test": { + "url": %q + } + }, + "labels": { + "environment": "envValue" + }, + "discovery": { + "name": "discovery", + "resource": "/bundles/discovery", + "service": "test" + } + }`, mockDiscoveryBundleServer.URL)) + + registry := NewOpenPolicyAgentRegistry(WithReuseDuration(1*time.Second), WithCleanInterval(1*time.Second)) + + cfg, err := NewOpenPolicyAgentConfig(WithConfigTemplate(config), WithStartupTimeout(1*time.Second)) + assert.NoError(t, err) + + instance, err := registry.NewOpenPolicyAgentInstance("test", *cfg, "testfilter") + assert.Nil(t, instance) + assert.Contains(t, err.Error(), expectedError) +} + +// Helper function to test resource bundle temporary errors +func testResourceBundleError(t *testing.T, statusCode int, expectedError string) { + mockServer := mockBundleServerWithStatus("/bundles/test.tgz", statusCode) + defer mockServer.Close() + + // Create the OPA control plane with the discovery bundle + opaControlPlane := opasdktest.MustNewServer( + opasdktest.MockBundle("/bundles/discovery", map[string]string{ + "data.json": fmt.Sprintf(`{ + "discovery": { + "bundles": { + "test": { + "persist": false, + "resource": "bundles/test.tgz", + "service": "styra-bundles" + } + }, + "services": { + "styra-bundles": { + "url": "%s" + } + } + } + }`, mockServer.URL), + }), + ) + + config := []byte(fmt.Sprintf(`{ + "services": { + "discovery": { + "url": %q + } + }, + "labels": { + "environment": "envValue" + }, + "discovery": { + "name": "discovery", + "resource": "/bundles/discovery", + "service": "discovery" + } + }`, opaControlPlane.URL())) + + registry := NewOpenPolicyAgentRegistry(WithReuseDuration(1*time.Second), WithCleanInterval(1*time.Second)) + + cfg, err := NewOpenPolicyAgentConfig(WithConfigTemplate(config), WithStartupTimeout(1*time.Second)) + assert.NoError(t, err) + + instance, err := registry.NewOpenPolicyAgentInstance("test", *cfg, "testfilter") + assert.Nil(t, instance) + assert.Contains(t, err.Error(), expectedError) +} + +func mockBundleServerWithStatus(path string, statusCode int) *httptest.Server { + handler := http.NewServeMux() + handler.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) { + http.Error(w, http.StatusText(statusCode), statusCode) + }) + return httptest.NewServer(handler) +} From 23df108e051f8dac4ace1977d7d19a5c6a93c31c Mon Sep 17 00:00:00 2001 From: Pushpalanka Jayawardhana Date: Wed, 16 Oct 2024 21:57:26 +0200 Subject: [PATCH 3/6] make sync.Once attached to OPA instance - addressing review comments Signed-off-by: Pushpalanka Jayawardhana --- filters/openpolicyagent/openpolicyagent.go | 33 ++++++++++++---------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/filters/openpolicyagent/openpolicyagent.go b/filters/openpolicyagent/openpolicyagent.go index 2d77001fb2..5c629da06a 100644 --- a/filters/openpolicyagent/openpolicyagent.go +++ b/filters/openpolicyagent/openpolicyagent.go @@ -364,16 +364,18 @@ func (registry *OpenPolicyAgentRegistry) newOpenPolicyAgentInstance(bundleName s } type OpenPolicyAgentInstance struct { - manager *plugins.Manager - instanceConfig OpenPolicyAgentInstanceConfig - opaConfig *config.Config - bundleName string - preparedQuery *rego.PreparedEvalQuery - preparedQueryDoOnce *sync.Once - interQueryBuiltinCache iCache.InterQueryCache - once sync.Once - stopped bool - registry *OpenPolicyAgentRegistry + manager *plugins.Manager + instanceConfig OpenPolicyAgentInstanceConfig + opaConfig *config.Config + bundleName string + preparedQuery *rego.PreparedEvalQuery + preparedQueryDoOnce *sync.Once + interQueryBuiltinCache iCache.InterQueryCache + once sync.Once + stopped bool + registry *OpenPolicyAgentRegistry + registerBundleListenerOnce *sync.Once + registerDiscoveryListenerOnce *sync.Once maxBodyBytes int64 bodyReadBufferSize int64 @@ -470,7 +472,9 @@ func (registry *OpenPolicyAgentRegistry) new(store storage.Store, configBytes [] preparedQueryDoOnce: new(sync.Once), interQueryBuiltinCache: iCache.NewInterQueryCache(manager.InterQueryBuiltinCacheConfig()), - idGenerator: uniqueIDGenerator, + idGenerator: uniqueIDGenerator, + registerDiscoveryListenerOnce: new(sync.Once), + registerBundleListenerOnce: new(sync.Once), } manager.RegisterCompilerTrigger(opa.compilerUpdated) @@ -487,8 +491,7 @@ func (opa *OpenPolicyAgentInstance) Start(ctx context.Context, timeout time.Dura done := make(chan struct{}) failed := make(chan error, 1) - var registerDiscoveryListenerOnce sync.Once - registerDiscoveryListenerOnce.Do(func() { + opa.registerDiscoveryListenerOnce.Do(func() { discoveryPlugin.RegisterListener("skipper-instance-startup-discovery", func(status bundle.Status) { handleStatusErrors(status, failed, "discovery plugin") }) @@ -496,12 +499,12 @@ func (opa *OpenPolicyAgentInstance) Start(ctx context.Context, timeout time.Dura defer discoveryPlugin.Unregister("skipper-instance-startup-discovery") // Register listener for "bundle" status - var registerBundleListenerOnce sync.Once + opa.manager.RegisterPluginStatusListener("skipper-instance-startup-plugin", func(status map[string]*plugins.Status) { if _, exists := status["bundle"]; exists { bundlePlugin := bundle.Lookup(opa.manager) if bundlePlugin != nil { - registerBundleListenerOnce.Do(func() { + opa.registerBundleListenerOnce.Do(func() { bundlePlugin.Register("skipper-instance-startup-bundle", func(status bundle.Status) { handleStatusErrors(status, failed, "bundle plugin") }) From 0b857aa477fa307f2eb86339ff34cda0c5a49b4e Mon Sep 17 00:00:00 2001 From: Pushpalanka Jayawardhana Date: Thu, 17 Oct 2024 10:10:06 +0200 Subject: [PATCH 4/6] temporily disable bundleplugin unregister to check premature unregistering Signed-off-by: Pushpalanka Jayawardhana --- filters/openpolicyagent/openpolicyagent.go | 24 +++++++++++++--------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/filters/openpolicyagent/openpolicyagent.go b/filters/openpolicyagent/openpolicyagent.go index 5c629da06a..27b6122f2e 100644 --- a/filters/openpolicyagent/openpolicyagent.go +++ b/filters/openpolicyagent/openpolicyagent.go @@ -51,6 +51,10 @@ const ( DefaultRequestBodyBufferSize = 8 * 1024 // 8 KB spanNameEval = "open-policy-agent" + + GeneralPluginStatusStartupListener = "general-plugin-status-startup" + DiscoveryPluginStartupListener = "skipper-instance-startup-discovery" + PluginStatusStartupListener = "skipper-instance-startup-plugin" ) type OpenPolicyAgentRegistry struct { @@ -492,31 +496,31 @@ func (opa *OpenPolicyAgentInstance) Start(ctx context.Context, timeout time.Dura failed := make(chan error, 1) opa.registerDiscoveryListenerOnce.Do(func() { - discoveryPlugin.RegisterListener("skipper-instance-startup-discovery", func(status bundle.Status) { + + discoveryPlugin.RegisterListener(DiscoveryPluginStartupListener, func(status bundle.Status) { handleStatusErrors(status, failed, "discovery plugin") }) + //defer discoveryPlugin.Unregister(DiscoveryPluginStartupListener) //ToDo }) - defer discoveryPlugin.Unregister("skipper-instance-startup-discovery") - - // Register listener for "bundle" status - opa.manager.RegisterPluginStatusListener("skipper-instance-startup-plugin", func(status map[string]*plugins.Status) { + opa.manager.RegisterPluginStatusListener(PluginStatusStartupListener, func(status map[string]*plugins.Status) { if _, exists := status["bundle"]; exists { bundlePlugin := bundle.Lookup(opa.manager) if bundlePlugin != nil { opa.registerBundleListenerOnce.Do(func() { - bundlePlugin.Register("skipper-instance-startup-bundle", func(status bundle.Status) { + const BundlePluginStartupListener = "skipper-instance-startup-bundle" + bundlePlugin.Register(BundlePluginStartupListener, func(status bundle.Status) { handleStatusErrors(status, failed, "bundle plugin") + //defer bundlePlugin.Unregister(BundlePluginStartupListener) //ToDo }) }) - defer bundlePlugin.Unregister("skipper-instance-startup-bundle") } } }) - defer opa.manager.UnregisterPluginStatusListener("skipper-instance-startup-plugin") + defer opa.manager.UnregisterPluginStatusListener(PluginStatusStartupListener) // Register listener for general plugin status checks - opa.manager.RegisterPluginStatusListener("generalStartUpListener", func(status map[string]*plugins.Status) { + opa.manager.RegisterPluginStatusListener(GeneralPluginStatusStartupListener, func(status map[string]*plugins.Status) { for _, pluginStatus := range status { if pluginStatus != nil && pluginStatus.State != plugins.StateOK { return @@ -524,7 +528,7 @@ func (opa *OpenPolicyAgentInstance) Start(ctx context.Context, timeout time.Dura } close(done) }) - defer opa.manager.UnregisterPluginStatusListener("generalStartUpListener") + defer opa.manager.UnregisterPluginStatusListener(GeneralPluginStatusStartupListener) err := opa.manager.Start(ctx) if err != nil { From 8b5740107d4814287673f7cd1a85a59c3cb30808 Mon Sep 17 00:00:00 2001 From: Pushpalanka Jayawardhana Date: Fri, 18 Oct 2024 21:30:39 +0200 Subject: [PATCH 5/6] unregister discovery and bundle listeners post successful startup Signed-off-by: Pushpalanka Jayawardhana --- filters/openpolicyagent/openpolicyagent.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/filters/openpolicyagent/openpolicyagent.go b/filters/openpolicyagent/openpolicyagent.go index 27b6122f2e..170708128d 100644 --- a/filters/openpolicyagent/openpolicyagent.go +++ b/filters/openpolicyagent/openpolicyagent.go @@ -55,6 +55,7 @@ const ( GeneralPluginStatusStartupListener = "general-plugin-status-startup" DiscoveryPluginStartupListener = "skipper-instance-startup-discovery" PluginStatusStartupListener = "skipper-instance-startup-plugin" + BundlePluginStartupListener = "skipper-instance-startup-bundle" ) type OpenPolicyAgentRegistry struct { @@ -500,7 +501,6 @@ func (opa *OpenPolicyAgentInstance) Start(ctx context.Context, timeout time.Dura discoveryPlugin.RegisterListener(DiscoveryPluginStartupListener, func(status bundle.Status) { handleStatusErrors(status, failed, "discovery plugin") }) - //defer discoveryPlugin.Unregister(DiscoveryPluginStartupListener) //ToDo }) opa.manager.RegisterPluginStatusListener(PluginStatusStartupListener, func(status map[string]*plugins.Status) { @@ -508,10 +508,8 @@ func (opa *OpenPolicyAgentInstance) Start(ctx context.Context, timeout time.Dura bundlePlugin := bundle.Lookup(opa.manager) if bundlePlugin != nil { opa.registerBundleListenerOnce.Do(func() { - const BundlePluginStartupListener = "skipper-instance-startup-bundle" bundlePlugin.Register(BundlePluginStartupListener, func(status bundle.Status) { handleStatusErrors(status, failed, "bundle plugin") - //defer bundlePlugin.Unregister(BundlePluginStartupListener) //ToDo }) }) } @@ -527,6 +525,13 @@ func (opa *OpenPolicyAgentInstance) Start(ctx context.Context, timeout time.Dura } } close(done) + + //unregister discovery and bundle plugin listeners, post successful plugin startup + discoveryPlugin.Unregister(DiscoveryPluginStartupListener) + bundlePlugin := bundle.Lookup(opa.manager) + if bundlePlugin != nil { + bundlePlugin.Unregister(BundlePluginStartupListener) + } }) defer opa.manager.UnregisterPluginStatusListener(GeneralPluginStatusStartupListener) From b504ce0a6c0552776d794b8f704f8316372f4e89 Mon Sep 17 00:00:00 2001 From: Pushpalanka Jayawardhana Date: Sat, 19 Oct 2024 10:16:08 +0200 Subject: [PATCH 6/6] revert unregister discovery and bundle listeners post successful startup Signed-off-by: Pushpalanka Jayawardhana --- filters/openpolicyagent/openpolicyagent.go | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/filters/openpolicyagent/openpolicyagent.go b/filters/openpolicyagent/openpolicyagent.go index 170708128d..9e6536b32c 100644 --- a/filters/openpolicyagent/openpolicyagent.go +++ b/filters/openpolicyagent/openpolicyagent.go @@ -501,6 +501,7 @@ func (opa *OpenPolicyAgentInstance) Start(ctx context.Context, timeout time.Dura discoveryPlugin.RegisterListener(DiscoveryPluginStartupListener, func(status bundle.Status) { handleStatusErrors(status, failed, "discovery plugin") }) + //defer discoveryPlugin.Unregister(DiscoveryPluginStartupListener) //ToDo }) opa.manager.RegisterPluginStatusListener(PluginStatusStartupListener, func(status map[string]*plugins.Status) { @@ -510,6 +511,7 @@ func (opa *OpenPolicyAgentInstance) Start(ctx context.Context, timeout time.Dura opa.registerBundleListenerOnce.Do(func() { bundlePlugin.Register(BundlePluginStartupListener, func(status bundle.Status) { handleStatusErrors(status, failed, "bundle plugin") + //defer bundlePlugin.Unregister(BundlePluginStartupListener) //ToDo }) }) } @@ -525,13 +527,6 @@ func (opa *OpenPolicyAgentInstance) Start(ctx context.Context, timeout time.Dura } } close(done) - - //unregister discovery and bundle plugin listeners, post successful plugin startup - discoveryPlugin.Unregister(DiscoveryPluginStartupListener) - bundlePlugin := bundle.Lookup(opa.manager) - if bundlePlugin != nil { - bundlePlugin.Unregister(BundlePluginStartupListener) - } }) defer opa.manager.UnregisterPluginStatusListener(GeneralPluginStatusStartupListener)