diff --git a/pkg/data/generic.go b/pkg/data/generic.go
index 266033fe5..2a805dadc 100644
--- a/pkg/data/generic.go
+++ b/pkg/data/generic.go
@@ -10,19 +10,22 @@ import (
 	"strings"
 	"time"
 
-	"github.com/sirupsen/logrus"
-
-	"k8s.io/apimachinery/pkg/runtime"
-	"k8s.io/apimachinery/pkg/runtime/schema"
-
 	opa_client "github.com/open-policy-agent/kube-mgmt/pkg/opa"
 	"github.com/open-policy-agent/kube-mgmt/pkg/types"
+
+	"github.com/sirupsen/logrus"
+
 	"k8s.io/apimachinery/pkg/api/meta"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/apimachinery/pkg/watch"
+
 	"k8s.io/client-go/dynamic"
 	"k8s.io/client-go/rest"
+	"k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/util/workqueue"
 )
 
 // GenericSync replicates Kubernetes resources into OPA as raw JSON.
@@ -69,7 +72,12 @@ func (s *GenericSync) Run() (chan struct{}, error) {
 	}
 
 	quit := make(chan struct{})
-	go s.loop(quit)
+	ctx, cancel := context.WithCancel(context.Background())
+	go func() { // propagate cancel signal from channel to context
+		<-quit
+		cancel()
+	}()
+	go s.RunContext(ctx)
 	return quit, nil
 }
 
@@ -79,167 +87,163 @@ func (s *GenericSync) RunContext(ctx context.Context) error {
 	if s.createError != nil {
 		return s.createError
 	}
-	s.loop(ctx.Done())
+
+	store, queue := s.setup(ctx)
+	defer queue.ShutDown()
+
+	go s.loop(ctx, store, queue)
+	<-ctx.Done()
 	return nil
 }
 
-func (s *GenericSync) loop(quit <-chan struct{}) {
-
-	defer func() {
-		logrus.Infof("Sync for %v finished. Exiting.", s.ns)
-	}()
+// setup the store and queue for this GenericSync instance
+func (s *GenericSync) setup(ctx context.Context) (cache.Store, workqueue.DelayingInterface) {
 
-	resource := s.client.Resource(schema.GroupVersionResource{
+	baseResource := s.client.Resource(schema.GroupVersionResource{
 		Group:    s.ns.Group,
 		Version:  s.ns.Version,
 		Resource: s.ns.Resource,
 	})
+	var resource dynamic.ResourceInterface = baseResource
+	if s.ns.Namespaced {
+		resource = baseResource.Namespace(metav1.NamespaceAll)
+	}
 
-	delay := backoffMin
+	queue := workqueue.NewNamedDelayingQueue(s.ns.String())
+	store, controller := cache.NewInformer(
+		&cache.ListWatch{
+			ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
+				return resource.List(ctx, options)
+			},
+			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
+				return resource.Watch(ctx, options)
+			},
+		},
+		&unstructured.Unstructured{},
+		0,
+		cache.ResourceEventHandlerFuncs{
+			AddFunc: func(obj interface{}) {
+				if key, err := cache.MetaNamespaceKeyFunc(obj); err == nil {
+					var _ string = key // Tiny sanity check. The code depends on queue keys being strings.
+					queue.Add(key)
+				}
+			},
+			UpdateFunc: func(oldObj, newObj interface{}) {
+				if newMeta, err := meta.Accessor(newObj); err == nil {
+					if oldMeta, err := meta.Accessor(oldObj); err == nil {
+						if newMeta.GetResourceVersion() == oldMeta.GetResourceVersion() {
+							return // Avoid sync flood on relist. We don't use resync.
+						}
+					}
+				}
+				if key, err := cache.MetaNamespaceKeyFunc(newObj); err == nil {
+					queue.Add(key)
+				}
+			},
+			DeleteFunc: func(obj interface{}) {
+				if key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj); err == nil {
+					queue.Add(key)
+				}
+			},
+		},
+	)
+
+	start, quit := time.Now(), ctx.Done()
+	go controller.Run(quit)
+	for !cache.WaitForCacheSync(quit, controller.HasSynced) {
+		logrus.Warnf("Failed to sync cache for %v, retrying...", s.ns)
+	}
+	if controller.HasSynced() {
+		logrus.Infof("Initial informer sync for %v completed, took %v", s.ns, time.Since(start))
+	}
 
-	for {
+	return store, queue
+}
 
-		err := s.sync(resource, quit)
-		if err == nil {
-			return
-		}
+const initPath = ""
 
-		switch err.(type) {
+// loop starts replicating Kubernetes resources into OPA. If an error occurs
+// during the replication process, this function will backoff and reload
+// all resources into OPA from scratch.
+func (s *GenericSync) loop(ctx context.Context, store cache.Store, queue workqueue.DelayingInterface) {
 
-		case errChannelClosed:
-			logrus.Infof("Sync channel for %v closed. Restarting immediately.", s.ns)
-			delay = backoffMin
+	logrus.Infof("Syncing %v.", s.ns)
+	defer func() {
+		logrus.Infof("Sync for %v finished. Exiting.", s.ns)
+	}()
 
-		case errOPA:
-			logrus.Errorf("Sync for %v failed due to OPA error. Trying again in %v. Reason: %v", s.ns, delay, err)
-			delay = backoffMin
-			t := time.NewTimer(delay)
-			select {
-			case <-t.C:
-				break
-			case <-quit:
-				return
-			}
+	rateLimiter := workqueue.NewItemExponentialFailureRateLimiter(backoffMin, backoffMax)
+	var delay time.Duration
+	for !queue.ShuttingDown() {
 
-		case errKubernetes:
-			logrus.Errorf("Sync for %v failed due to Kubernetes error. Trying again in %v. Reason: %v", s.ns, delay, err)
-			delay *= 2
-			if delay > backoffMax {
-				delay = backoffMax
-			}
-			t := time.NewTimer(delay)
-			select {
-			case <-t.C:
-				break
-			case <-quit:
+		queue.AddAfter(initPath, delay) // this special path will trigger a full load
+		syncDone := false               // discard everything until initPath
+
+		var err error
+		for err == nil {
+			key, shuttingDown := queue.Get()
+			if shuttingDown {
 				return
 			}
+			err = s.processNext(store, key, &syncDone)
+			if key == initPath && syncDone {
+				rateLimiter.Forget(initPath)
+			}
+			queue.Done(key)
 		}
-	}
-}
-
-type errKubernetes struct{ error }
-type errOPA struct{ error }
-
-type errChannelClosed struct{}
-
-func (err errKubernetes) Unwrap() error {
-	return err.error
-}
-
-func (err errOPA) Unwrap() error {
-	return err.error
-}
-
-func (errChannelClosed) Error() string {
-	return "channel closed"
+
+		delay = rateLimiter.When(initPath)
+		logrus.Errorf("Sync for %v failed, trying again in %v. Reason: %v", s.ns, delay, err)
+	}
 }
 
-// sync starts replicating Kubernetes resources into OPA. If an error occurs
-// during the replication process this function returns and indicates whether
-// the synchronizer should backoff. The synchronizer will backoff whenever the
-// Kubernetes API returns an error.
-func (s *GenericSync) sync(resource dynamic.NamespaceableResourceInterface, quit <-chan struct{}) error {
-
-	logrus.Infof("Syncing %v.", s.ns)
-	tList := time.Now()
-	result, err := resource.List(context.TODO(), metav1.ListOptions{})
+func (s *GenericSync) processNext(store cache.Store, key interface{}, syncDone *bool) error {
+	path, err := objPath(key, s.ns.Namespaced)
 	if err != nil {
-		return errKubernetes{fmt.Errorf("list: %w", err)}
+		return err
 	}
-	dList := time.Since(tList)
-	resourceVersion := result.GetResourceVersion()
-	logrus.Infof("Listed %v and got %v resources with resourceVersion %v. Took %v.", s.ns, len(result.Items), resourceVersion, dList)
-
-	tLoad := time.Now()
-
-	if err := s.syncAll(result.Items); err != nil {
-		return errOPA{fmt.Errorf("reset: %w", err)}
+	// On receiving the initPath, load a full dump of the data store
+	if path == initPath {
+		start, list := time.Now(), store.List()
+		if err := s.syncAll(list); err != nil {
+			return err
+		}
+		logrus.Infof("Loaded %d resources of kind %v into OPA. Took %v", len(list), s.ns, time.Since(start))
+		*syncDone = true // sync is now Done
+		return nil
 	}
-	dLoad := time.Since(tLoad)
-	logrus.Infof("Loaded %v resources into OPA. Took %v. Starting watch at resourceVersion %v.", s.ns, dLoad, resourceVersion)
+	// Ignore updates queued before the initial load
+	if !*syncDone {
+		return nil
+	}
 
-	w, err := resource.Watch(context.TODO(), metav1.ListOptions{
-		ResourceVersion: resourceVersion,
-	})
+	obj, exists, err := store.GetByKey(path)
 	if err != nil {
-		return errKubernetes{fmt.Errorf("watch: %w", err)}
+		return fmt.Errorf("store error: %w", err)
 	}
-
-	defer w.Stop()
-
-	ch := w.ResultChan()
-
-	for {
-		select {
-		case evt := <-ch:
-			switch evt.Type {
-			case watch.Added:
-				err := s.syncAdd(evt.Object)
-				if err != nil {
-					return errOPA{fmt.Errorf("add event: %w", err)}
-				}
-			case watch.Modified:
-				err := s.syncAdd(evt.Object)
-				if err != nil {
-					return errOPA{fmt.Errorf("modify event: %w", err)}
-				}
-			case watch.Deleted:
-				err := s.syncRemove(evt.Object)
-				if err != nil {
-					return errOPA{fmt.Errorf("delete event: %w", err)}
-				}
-			case watch.Error:
-				return errKubernetes{fmt.Errorf("error event: %v", evt.Object)}
-			default:
-				return errChannelClosed{}
-			}
-		case <-quit:
-			return nil
+	if exists {
+		if err := s.syncAdd(path, obj); err != nil {
+			return fmt.Errorf("add event: %w", err)
+		}
+	} else {
+		if err := s.syncRemove(path); err != nil {
+			return fmt.Errorf("delete event: %w", err)
 		}
 	}
+	return nil
 }
 
-func (s *GenericSync) syncAdd(obj runtime.Object) error {
-	path, err := objPath(obj, s.ns.Namespaced)
-	if err != nil {
-		return err
-	}
+func (s *GenericSync) syncAdd(path string, obj interface{}) error {
 	return s.opa.PutData(path, obj)
 }
 
-func (s *GenericSync) syncRemove(obj runtime.Object) error {
-	path, err := objPath(obj, s.ns.Namespaced)
-	if err != nil {
-		return err
-	}
+func (s *GenericSync) syncRemove(path string) error {
 	return s.opa.PatchData(path, "remove", nil)
 }
 
-func (s *GenericSync) syncAll(objs []unstructured.Unstructured) error {
+func (s *GenericSync) syncAll(objs []interface{}) error {
 	// Build a list of patches to apply.
 	payload, err := generateSyncPayload(objs, s.ns.Namespaced)
@@ -250,15 +254,19 @@ func (s *GenericSync) syncAll(objs []interface{}) error {
 	return s.opa.PutData("/", payload)
 }
 
-func generateSyncPayload(objs []unstructured.Unstructured, namespaced bool) (map[string]interface{}, error) {
+func generateSyncPayload(objs []interface{}, namespaced bool) (map[string]interface{}, error) {
 	combined := make(map[string]interface{}, len(objs))
 	for _, obj := range objs {
-		path, err := objPath(&obj, namespaced)
+		key, err := cache.MetaNamespaceKeyFunc(obj)
+		if err != nil {
+			return nil, err
+		}
+		path, err := objPath(key, namespaced)
 		if err != nil {
 			return nil, err
 		}
 
-		// Ensure the path in thee map up to our value exists
+		// Ensure the path in the map up to our value exists
 		// We make some assumptions about the paths that do exist
 		// being the correct types due to the expected uniform
 		// objPath's for each of the similar object types being
@@ -273,23 +281,15 @@ func generateSyncPayload(objs []unstructured.Unstructured, namespaced bool) (map
 			}
 			dir = next.(map[string]interface{})
 		}
-		dir[segments[len(segments)-1]] = obj.Object
+		dir[segments[len(segments)-1]] = obj
 	}
 
 	return combined, nil
 }
 
-func objPath(obj runtime.Object, namespaced bool) (string, error) {
-	m, err := meta.Accessor(obj)
-	if err != nil {
-		return "", err
-	}
-	name := m.GetName()
-	var path string
-	if namespaced {
-		path = m.GetNamespace() + "/" + name
-	} else {
-		path = name
-	}
-	return path, nil
+// objPath transforms queue key into OPA path
+func objPath(key interface{}, namespaced bool) (string, error) {
+	// OPA keys actually match current kubernetes cache key format
+	// ("namespace/name" or "name" if resource is not namespaced)
+	return key.(string), nil
 }
diff --git a/pkg/data/generic_test.go b/pkg/data/generic_test.go
index e663e3953..5c425274b 100644
--- a/pkg/data/generic_test.go
+++ b/pkg/data/generic_test.go
@@ -11,7 +11,6 @@ import (
 	"github.com/open-policy-agent/kube-mgmt/pkg/types"
 	apiv1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/client-go/dynamic/fake"
 	"k8s.io/client-go/kubernetes/scheme"
@@ -474,9 +473,9 @@ func TestGenericSync(t *testing.T) {
 func mustGenerateSyncPayload(t *testing.T, resourceType types.ResourceType, objs []runtime.Object) map[string]interface{} {
 	t.Helper()
 
-	data := make([]unstructured.Unstructured, 0, len(objs))
+	data := make([]interface{}, 0, len(objs))
 	for _, obj := range objs {
-		data = append(data, mustUnstructure(t, obj))
+		data = append(data, obj)
 	}
 	patches, err := generateSyncPayload(data, resourceType.Namespaced)
 	if err != nil {
@@ -534,12 +533,3 @@ func mustJSONRoundTrip(t *testing.T, obj interface{}) map[string]interface{} {
 
 	return out
 }
-
-func mustUnstructure(t *testing.T, obj runtime.Object) unstructured.Unstructured {
-	t.Helper()
-	current, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj)
-	if err != nil {
-		t.Fatalf("Failed to convert runtime Object to unstructured: %v", err)
-	}
-	return unstructured.Unstructured{Object: current}
-}
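Aside, not part of the patch: the retry behaviour of the new `loop` comes down to a `workqueue.DelayingInterface` driven by an `ItemExponentialFailureRateLimiter`, both keyed on one special "full reload" item (`initPath` above). The standalone sketch below illustrates that pattern in isolation using only client-go; the queue name, the `processKey` worker, and the 10ms/5s backoff bounds are illustrative placeholders, not kube-mgmt code.

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
)

// initKey mirrors initPath in generic.go: the empty key schedules a full reload.
const initKey = ""

// processKey is a hypothetical stand-in for GenericSync.processNext.
func processKey(key string) error {
	fmt.Println("processing", key)
	return nil
}

func main() {
	queue := workqueue.NewNamedDelayingQueue("example")
	defer queue.ShutDown()

	// Exponential backoff between failed full reloads (bounds picked arbitrarily).
	limiter := workqueue.NewItemExponentialFailureRateLimiter(10*time.Millisecond, 5*time.Second)

	// Simulate an event source: enqueue one object key, then shut the queue down.
	go func() {
		queue.Add("default/example-pod")
		time.Sleep(50 * time.Millisecond)
		queue.ShutDown()
	}()

	var delay time.Duration
	for !queue.ShuttingDown() {
		queue.AddAfter(initKey, delay) // schedule the next full reload
		var err error
		for err == nil {
			key, shuttingDown := queue.Get()
			if shuttingDown {
				return
			}
			err = processKey(key.(string))
			if err == nil && key == initKey {
				limiter.Forget(initKey) // a successful reload resets the backoff
			}
			queue.Done(key)
		}
		delay = limiter.When(initKey)
		fmt.Printf("worker failed (%v); retrying full reload in %v\n", err, delay)
	}
}
```

On a worker error the inner loop exits, `When` stretches the delay, and the next iteration re-queues the reload key after that delay — the same shape as `loop` and `processNext` in the patch, minus the informer store and the OPA calls.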