Skip to content

Commit 98148f8

Browse files
wisinghenwickramasin
and
nwickramasin
authored
OPA: Improve opa startup delay handling (zalando#2633)
* Improve opa startup delay handling Signed-off-by: nwickramasin <[email protected]> * Cleanup test file Signed-off-by: nwickramasin <[email protected]> * Add clearer messages Signed-off-by: nwickramasin <[email protected]> * Fix OpenPolicyAgentCleanerInterval description Signed-off-by: nwickramasin <[email protected]> * Refactor and fix check validations Signed-off-by: nwickramasin <[email protected]> * Refactoring error message Signed-off-by: nwickramasin <[email protected]> * Set default timeout in config initialization Signed-off-by: nwickramasin <[email protected]> * Fix Test_NewConfigWithArgs for the new flag Signed-off-by: nwickramasin <[email protected]> * Revert go sum changes Signed-off-by: nwickramasin <[email protected]> * Use context with timeout Signed-off-by: nwickramasin <[email protected]> * Introduce instance stopped property Signed-off-by: nwickramasin <[email protected]> * Fix closing Signed-off-by: nwickramasin <[email protected]> --------- Signed-off-by: nwickramasin <[email protected]> Co-authored-by: nwickramasin <[email protected]>
1 parent 40f4634 commit 98148f8

File tree

6 files changed

+200
-34
lines changed

6 files changed

+200
-34
lines changed

config/config.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,7 @@ type Config struct {
282282
OpenPolicyAgentConfigTemplate string `yaml:"open-policy-agent-config-template"`
283283
OpenPolicyAgentEnvoyMetadata string `yaml:"open-policy-agent-envoy-metadata"`
284284
OpenPolicyAgentCleanerInterval time.Duration `yaml:"open-policy-agent-cleaner-interval"`
285+
OpenPolicyAgentStartupTimeout time.Duration `yaml:"open-policy-agent-startup-timeout"`
285286
}
286287

287288
const (
@@ -498,7 +499,8 @@ func NewConfig() *Config {
498499
flag.BoolVar(&cfg.EnableOpenPolicyAgent, "enable-open-policy-agent", false, "enables Open Policy Agent filters")
499500
flag.StringVar(&cfg.OpenPolicyAgentConfigTemplate, "open-policy-agent-config-template", "", "file containing a template for an Open Policy Agent configuration file that is interpolated for each OPA filter instance")
500501
flag.StringVar(&cfg.OpenPolicyAgentEnvoyMetadata, "open-policy-agent-envoy-metadata", "", "JSON file containing meta-data passed as input for compatibility with Envoy policies in the format")
501-
flag.DurationVar(&cfg.OpenPolicyAgentCleanerInterval, "open-policy-agent-cleaner-interval", openpolicyagent.DefaultCleanIdlePeriod, "JSON file containing meta-data passed as input for compatibility with Envoy policies in the format")
502+
flag.DurationVar(&cfg.OpenPolicyAgentCleanerInterval, "open-policy-agent-cleaner-interval", openpolicyagent.DefaultCleanerInterval, "Duration in seconds to wait before cleaning up unused opa instances")
503+
flag.DurationVar(&cfg.OpenPolicyAgentStartupTimeout, "open-policy-agent-startup-timeout", openpolicyagent.DefaultOpaStartupTimeout, "Maximum duration in seconds to wait for the open policy agent to start up")
502504

503505
// TLS client certs
504506
flag.StringVar(&cfg.ClientKeyFile, "client-tls-key", "", "TLS Key file for backend connections, multiple keys may be given comma separated - the order must match the certs")
@@ -903,6 +905,7 @@ func (c *Config) ToOptions() skipper.Options {
903905
OpenPolicyAgentConfigTemplate: c.OpenPolicyAgentConfigTemplate,
904906
OpenPolicyAgentEnvoyMetadata: c.OpenPolicyAgentEnvoyMetadata,
905907
OpenPolicyAgentCleanerInterval: c.OpenPolicyAgentCleanerInterval,
908+
OpenPolicyAgentStartupTimeout: c.OpenPolicyAgentStartupTimeout,
906909
}
907910
for _, rcci := range c.CloneRoute {
908911
eskipClone := eskip.NewClone(rcci.Reg, rcci.Repl)

config/config_test.go

+1
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@ func defaultConfig() *Config {
161161
LuaModules: commaListFlag(),
162162
LuaSources: commaListFlag(),
163163
OpenPolicyAgentCleanerInterval: 10 * time.Second,
164+
OpenPolicyAgentStartupTimeout: 30 * time.Second,
164165
}
165166
}
166167

filters/openpolicyagent/openpolicyagent.go

+63-26
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ import (
2323
"github.com/open-policy-agent/opa/storage/inmem"
2424
iCache "github.com/open-policy-agent/opa/topdown/cache"
2525
opatracing "github.com/open-policy-agent/opa/tracing"
26-
opautil "github.com/open-policy-agent/opa/util"
2726
"github.com/opentracing/opentracing-go"
2827
"google.golang.org/protobuf/encoding/protojson"
2928

@@ -36,7 +35,8 @@ import (
3635
const (
3736
defaultReuseDuration = 30 * time.Second
3837
defaultShutdownGracePeriod = 30 * time.Second
39-
DefaultCleanIdlePeriod = 10 * time.Second
38+
DefaultCleanerInterval = 10 * time.Second
39+
DefaultOpaStartupTimeout = 30 * time.Second
4040
)
4141

4242
type OpenPolicyAgentRegistry struct {
@@ -78,7 +78,7 @@ func WithCleanInterval(interval time.Duration) func(*OpenPolicyAgentRegistry) er
7878
func NewOpenPolicyAgentRegistry(opts ...func(*OpenPolicyAgentRegistry) error) *OpenPolicyAgentRegistry {
7979
registry := &OpenPolicyAgentRegistry{
8080
reuseDuration: defaultReuseDuration,
81-
cleanInterval: DefaultCleanIdlePeriod,
81+
cleanInterval: DefaultCleanerInterval,
8282
instances: make(map[string]*OpenPolicyAgentInstance),
8383
lastused: make(map[*OpenPolicyAgentInstance]time.Time),
8484
quit: make(chan struct{}),
@@ -96,6 +96,7 @@ func NewOpenPolicyAgentRegistry(opts ...func(*OpenPolicyAgentRegistry) error) *O
9696
type OpenPolicyAgentInstanceConfig struct {
9797
envoyMetadata *ext_authz_v3_core.Metadata
9898
configTemplate []byte
99+
startupTimeout time.Duration
99100
}
100101

101102
func WithConfigTemplate(configTemplate []byte) func(*OpenPolicyAgentInstanceConfig) error {
@@ -144,12 +145,21 @@ func WithEnvoyMetadataFile(file string) func(*OpenPolicyAgentInstanceConfig) err
144145
}
145146
}
146147

148+
func WithStartupTimeout(timeout time.Duration) func(*OpenPolicyAgentInstanceConfig) error {
149+
return func(cfg *OpenPolicyAgentInstanceConfig) error {
150+
cfg.startupTimeout = timeout
151+
return nil
152+
}
153+
}
154+
147155
func (cfg *OpenPolicyAgentInstanceConfig) GetEnvoyMetadata() *ext_authz_v3_core.Metadata {
148156
return cfg.envoyMetadata
149157
}
150158

151159
func NewOpenPolicyAgentConfig(opts ...func(*OpenPolicyAgentInstanceConfig) error) (*OpenPolicyAgentInstanceConfig, error) {
152-
cfg := OpenPolicyAgentInstanceConfig{}
160+
cfg := OpenPolicyAgentInstanceConfig{
161+
startupTimeout: DefaultOpaStartupTimeout,
162+
}
153163

154164
for _, opt := range opts {
155165
if err := opt(&cfg); err != nil {
@@ -285,14 +295,10 @@ func (registry *OpenPolicyAgentRegistry) newOpenPolicyAgentInstance(bundleName s
285295
return nil, err
286296
}
287297

288-
ctx := context.Background()
289-
if err = engine.Start(ctx); err != nil {
290-
return nil, err
291-
}
298+
ctx, cancel := context.WithTimeout(context.Background(), config.startupTimeout)
299+
defer cancel()
292300

293-
err = engine.waitPluginsReady(100*time.Millisecond, 30*time.Second)
294-
if err != nil {
295-
engine.Logger().WithFields(map[string]interface{}{"err": err}).Error("Failed to wait for plugins activation.")
301+
if err = engine.Start(ctx, config.startupTimeout); err != nil {
296302
return nil, err
297303
}
298304

@@ -308,6 +314,7 @@ type OpenPolicyAgentInstance struct {
308314
preparedQueryDoOnce *sync.Once
309315
interQueryBuiltinCache iCache.InterQueryCache
310316
once sync.Once
317+
stopped bool
311318
}
312319

313320
func envVariablesMap() map[string]string {
@@ -343,7 +350,6 @@ func interpolateConfigTemplate(configTemplate []byte, bundleName string) ([]byte
343350
// New returns a new OPA object.
344351
func New(store storage.Store, configBytes []byte, instanceConfig OpenPolicyAgentInstanceConfig, filterName string, bundleName string) (*OpenPolicyAgentInstance, error) {
345352
id := uuid.New().String()
346-
347353
opaConfig, err := config.ParseConfig(configBytes, id)
348354
if err != nil {
349355
return nil, err
@@ -382,19 +388,11 @@ func New(store storage.Store, configBytes []byte, instanceConfig OpenPolicyAgent
382388

383389
// Start asynchronously starts the policy engine's plugins that download
384390
// policies, report status, etc.
385-
func (opa *OpenPolicyAgentInstance) Start(ctx context.Context) error {
386-
return opa.manager.Start(ctx)
387-
}
391+
func (opa *OpenPolicyAgentInstance) Start(ctx context.Context, timeout time.Duration) error {
392+
err := opa.manager.Start(ctx)
388393

389-
func (opa *OpenPolicyAgentInstance) Close(ctx context.Context) {
390-
opa.once.Do(func() {
391-
opa.manager.Stop(ctx)
392-
})
393-
}
394-
395-
func (opa *OpenPolicyAgentInstance) waitPluginsReady(checkInterval, timeout time.Duration) error {
396-
if timeout <= 0 {
397-
return nil
394+
if err != nil {
395+
return err
398396
}
399397

400398
// check readiness of all plugins
@@ -407,9 +405,48 @@ func (opa *OpenPolicyAgentInstance) waitPluginsReady(checkInterval, timeout time
407405
return true
408406
}
409407

410-
opa.Logger().Debug("Waiting for plugins activation (%v).", timeout)
408+
err = waitFunc(ctx, pluginsReady, 100*time.Millisecond)
411409

412-
return opautil.WaitFunc(pluginsReady, checkInterval, timeout)
410+
if err != nil {
411+
for pluginName, status := range opa.manager.PluginStatus() {
412+
if status != nil && status.State != plugins.StateOK {
413+
opa.Logger().WithFields(map[string]interface{}{
414+
"plugin_name": pluginName,
415+
"plugin_state": status.State,
416+
"error_message": status.Message,
417+
}).Error("Open policy agent plugin did not start in time")
418+
}
419+
}
420+
opa.Close(ctx)
421+
return fmt.Errorf("one or more open policy agent plugins failed to start in %v with error: %w", timeout, err)
422+
}
423+
return nil
424+
}
425+
426+
func (opa *OpenPolicyAgentInstance) Close(ctx context.Context) {
427+
opa.once.Do(func() {
428+
opa.manager.Stop(ctx)
429+
opa.stopped = true
430+
})
431+
}
432+
433+
func waitFunc(ctx context.Context, fun func() bool, interval time.Duration) error {
434+
if fun() {
435+
return nil
436+
}
437+
ticker := time.NewTicker(interval)
438+
defer ticker.Stop()
439+
440+
for {
441+
select {
442+
case <-ctx.Done():
443+
return fmt.Errorf("timed out while starting: %w", ctx.Err())
444+
case <-ticker.C:
445+
if fun() {
446+
return nil
447+
}
448+
}
449+
}
413450
}
414451

415452
func (opa *OpenPolicyAgentInstance) InstanceConfig() *OpenPolicyAgentInstanceConfig {

filters/openpolicyagent/openpolicyagent_test.go

+127-6
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"encoding/json"
66
"fmt"
7+
"github.com/open-policy-agent/opa/storage/inmem"
78
"io"
89
"net/http"
910
"os"
@@ -89,7 +90,49 @@ func TestLoadEnvoyMetadata(t *testing.T) {
8990
assert.Equal(t, expected, cfg.envoyMetadata)
9091
}
9192

92-
func mockControlPlane() (*opasdktest.Server, []byte) {
93+
func mockControlPlaneWithDiscoveryBundle(discoveryBundle string) (*opasdktest.Server, []byte) {
94+
opaControlPlane := opasdktest.MustNewServer(
95+
opasdktest.MockBundle("/bundles/test", map[string]string{
96+
"main.rego": `
97+
package envoy.authz
98+
99+
default allow = false
100+
`,
101+
}),
102+
opasdktest.MockBundle("/bundles/discovery", map[string]string{
103+
"data.json": `
104+
{"discovery":{"bundles":{"bundles/test":{"persist":false,"resource":"bundles/test","service":"test"}}}}
105+
`,
106+
}),
107+
opasdktest.MockBundle("/bundles/discovery-with-wrong-bundle", map[string]string{
108+
"data.json": `
109+
{"discovery":{"bundles":{"bundles/non-existing-bundle":{"persist":false,"resource":"bundles/non-existing-bundle","service":"test"}}}}
110+
`,
111+
}),
112+
opasdktest.MockBundle("/bundles/discovery-with-parsing-error", map[string]string{
113+
"data.json": `
114+
{unparsable : json}
115+
`,
116+
}),
117+
)
118+
119+
config := []byte(fmt.Sprintf(`{
120+
"services": {
121+
"test": {
122+
"url": %q
123+
}
124+
},
125+
"discovery": {
126+
"name": "discovery",
127+
"resource": %q,
128+
"service": "test"
129+
}
130+
}`, opaControlPlane.URL(), discoveryBundle))
131+
132+
return opaControlPlane, config
133+
}
134+
135+
func mockControlPlaneWithResourceBundle() (*opasdktest.Server, []byte) {
93136
opaControlPlane := opasdktest.MustNewServer(
94137
opasdktest.MockBundle("/bundles/test", map[string]string{
95138
"main.rego": `
@@ -130,7 +173,7 @@ func mockControlPlane() (*opasdktest.Server, []byte) {
130173
}
131174

132175
func TestRegistry(t *testing.T) {
133-
_, config := mockControlPlane()
176+
_, config := mockControlPlaneWithResourceBundle()
134177

135178
registry := NewOpenPolicyAgentRegistry(WithReuseDuration(1*time.Second), WithCleanInterval(1*time.Second))
136179

@@ -182,8 +225,86 @@ func TestRegistry(t *testing.T) {
182225
assert.Error(t, err, "should not work after close")
183226
}
184227

228+
func TestOpaEngineStartFailureWithTimeout(t *testing.T) {
229+
_, config := mockControlPlaneWithDiscoveryBundle("bundles/discovery-with-wrong-bundle")
230+
231+
cfg, err := NewOpenPolicyAgentConfig(WithConfigTemplate(config), WithStartupTimeout(1*time.Second))
232+
assert.NoError(t, err)
233+
234+
engine, err := New(inmem.New(), config, *cfg, "testfilter", "test")
235+
assert.NoError(t, err)
236+
237+
ctx, cancel := context.WithTimeout(context.Background(), cfg.startupTimeout)
238+
defer cancel()
239+
240+
err = engine.Start(ctx, cfg.startupTimeout)
241+
assert.True(t, engine.stopped)
242+
assert.Contains(t, err.Error(), "one or more open policy agent plugins failed to start in 1s")
243+
}
244+
245+
func TestOpaActivationSuccessWithDiscovery(t *testing.T) {
246+
_, config := mockControlPlaneWithDiscoveryBundle("bundles/discovery")
247+
248+
registry := NewOpenPolicyAgentRegistry(WithReuseDuration(1*time.Second), WithCleanInterval(1*time.Second))
249+
250+
cfg, err := NewOpenPolicyAgentConfig(WithConfigTemplate(config))
251+
assert.NoError(t, err)
252+
253+
instance, err := registry.NewOpenPolicyAgentInstance("test", *cfg, "testfilter")
254+
assert.NotNil(t, instance)
255+
assert.NoError(t, err)
256+
assert.Equal(t, 1, len(registry.instances))
257+
}
258+
259+
func TestOpaActivationFailureWithWrongServiceConfig(t *testing.T) {
260+
configWithUnknownService := []byte(`{
261+
"discovery": {
262+
"name": "discovery",
263+
"resource": "discovery",
264+
"service": "test"
265+
}}`)
266+
267+
registry := NewOpenPolicyAgentRegistry(WithReuseDuration(1*time.Second), WithCleanInterval(1*time.Second))
268+
269+
cfg, err := NewOpenPolicyAgentConfig(WithConfigTemplate(configWithUnknownService), WithStartupTimeout(1*time.Second))
270+
assert.NoError(t, err)
271+
272+
instance, err := registry.NewOpenPolicyAgentInstance("test", *cfg, "testfilter")
273+
assert.Nil(t, instance)
274+
assert.Contains(t, err.Error(), "invalid configuration for discovery")
275+
assert.Equal(t, 0, len(registry.instances))
276+
}
277+
278+
func TestOpaActivationTimeOutWithDiscoveryPointingWrongBundle(t *testing.T) {
279+
_, config := mockControlPlaneWithDiscoveryBundle("/bundles/discovery-with-wrong-bundle")
280+
281+
registry := NewOpenPolicyAgentRegistry(WithReuseDuration(1*time.Second), WithCleanInterval(1*time.Second))
282+
283+
cfg, err := NewOpenPolicyAgentConfig(WithConfigTemplate(config), WithStartupTimeout(1*time.Second))
284+
assert.NoError(t, err)
285+
286+
instance, err := registry.NewOpenPolicyAgentInstance("test", *cfg, "testfilter")
287+
assert.Nil(t, instance)
288+
assert.Contains(t, err.Error(), "one or more open policy agent plugins failed to start in 1s with error: timed out while starting: context deadline exceeded")
289+
assert.Equal(t, 0, len(registry.instances))
290+
}
291+
292+
func TestOpaActivationTimeOutWithDiscoveryParsingError(t *testing.T) {
293+
_, config := mockControlPlaneWithDiscoveryBundle("/bundles/discovery-with-parsing-error")
294+
295+
registry := NewOpenPolicyAgentRegistry(WithReuseDuration(1*time.Second), WithCleanInterval(1*time.Second))
296+
297+
cfg, err := NewOpenPolicyAgentConfig(WithConfigTemplate(config), WithStartupTimeout(1*time.Second))
298+
assert.NoError(t, err)
299+
300+
instance, err := registry.NewOpenPolicyAgentInstance("test", *cfg, "testfilter")
301+
assert.Nil(t, instance)
302+
assert.Contains(t, err.Error(), "one or more open policy agent plugins failed to start in 1s with error: timed out while starting: context deadline exceeded")
303+
assert.Equal(t, 0, len(registry.instances))
304+
}
305+
185306
func TestStartup(t *testing.T) {
186-
_, config := mockControlPlane()
307+
_, config := mockControlPlaneWithResourceBundle()
187308

188309
registry := NewOpenPolicyAgentRegistry(WithReuseDuration(1*time.Second), WithCleanInterval(1*time.Second))
189310

@@ -199,7 +320,7 @@ func TestStartup(t *testing.T) {
199320
}
200321

201322
func TestTracing(t *testing.T) {
202-
_, config := mockControlPlane()
323+
_, config := mockControlPlaneWithResourceBundle()
203324

204325
registry := NewOpenPolicyAgentRegistry(WithReuseDuration(1*time.Second), WithCleanInterval(1*time.Second))
205326

@@ -222,7 +343,7 @@ func TestTracing(t *testing.T) {
222343
}
223344

224345
func TestEval(t *testing.T) {
225-
_, config := mockControlPlane()
346+
_, config := mockControlPlaneWithResourceBundle()
226347

227348
registry := NewOpenPolicyAgentRegistry(WithReuseDuration(1*time.Second), WithCleanInterval(1*time.Second))
228349

@@ -250,7 +371,7 @@ func TestEval(t *testing.T) {
250371
}
251372

252373
func TestResponses(t *testing.T) {
253-
_, config := mockControlPlane()
374+
_, config := mockControlPlaneWithResourceBundle()
254375

255376
registry := NewOpenPolicyAgentRegistry(WithReuseDuration(1*time.Second), WithCleanInterval(1*time.Second))
256377

go.mod

+1
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ require (
7373
github.com/agnivade/levenshtein v1.1.1 // indirect
7474
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da // indirect
7575
github.com/beorn7/perks v1.0.1 // indirect
76+
github.com/biter777/processex v0.0.0-20210102170504-01bb369eda71 // indirect
7677
github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b // indirect
7778
github.com/bytecodealliance/wasmtime-go/v3 v3.0.2 // indirect
7879
github.com/cenkalti/backoff/v4 v4.2.1 // indirect

0 commit comments

Comments
 (0)