Modified data sync to use an informer
Signed-off-by: rg2011 <[email protected]>
rg2011 committed Mar 10, 2022
1 parent c71de33 commit c421f3b
Showing 2 changed files with 147 additions and 157 deletions.
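
For context before reading the diff: the old implementation drove replication with its own List/Watch loop, while the new one feeds a client-go informer into a delaying work queue and looks the current object state up in the informer's store. Below is a minimal, self-contained sketch of that pattern; the resource (cluster-wide ConfigMaps), the function names, and the print statement standing in for the OPA sync step are illustrative assumptions, not the exact kube-mgmt code.

package sketch

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"
)

// runSketch shows the informer + delaying-queue shape of the new sync loop.
func runSketch(ctx context.Context, cfg *rest.Config) error {
	client, err := dynamic.NewForConfig(cfg)
	if err != nil {
		return err
	}
	// Cluster-wide ConfigMaps as an illustrative resource.
	resource := client.Resource(schema.GroupVersionResource{
		Version:  "v1",
		Resource: "configmaps",
	}).Namespace(metav1.NamespaceAll)

	queue := workqueue.NewNamedDelayingQueue("configmaps")
	defer queue.ShutDown()

	// The informer keeps a local store up to date and only enqueues keys;
	// the consumer loop below fetches the object from the store when the
	// key is processed, so events never carry stale payloads.
	store, controller := cache.NewInformer(
		&cache.ListWatch{
			ListFunc: func(opts metav1.ListOptions) (runtime.Object, error) {
				return resource.List(ctx, opts)
			},
			WatchFunc: func(opts metav1.ListOptions) (watch.Interface, error) {
				return resource.Watch(ctx, opts)
			},
		},
		&unstructured.Unstructured{},
		0, // no periodic resync
		cache.ResourceEventHandlerFuncs{
			AddFunc: func(obj interface{}) {
				if key, err := cache.MetaNamespaceKeyFunc(obj); err == nil {
					queue.Add(key)
				}
			},
			UpdateFunc: func(_, newObj interface{}) {
				if key, err := cache.MetaNamespaceKeyFunc(newObj); err == nil {
					queue.Add(key)
				}
			},
			DeleteFunc: func(obj interface{}) {
				if key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj); err == nil {
					queue.Add(key)
				}
			},
		},
	)

	go controller.Run(ctx.Done())
	cache.WaitForCacheSync(ctx.Done(), controller.HasSynced)

	for {
		key, shuttingDown := queue.Get()
		if shuttingDown {
			return nil
		}
		obj, exists, err := store.GetByKey(key.(string))
		fmt.Println(key, exists, obj != nil, err) // stand-in for the OPA put/patch step
		queue.Done(key)
	}
}

The enqueue-keys-only design is what lets the commit collapse the add/modify/delete cases into a single "does the key still exist in the store?" check in processNext, as the diff below shows.
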
290 changes: 145 additions & 145 deletions pkg/data/generic.go
@@ -10,19 +10,22 @@ import (
"strings"
"time"

"github.com/sirupsen/logrus"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"

opa_client "github.com/open-policy-agent/kube-mgmt/pkg/opa"
"github.com/open-policy-agent/kube-mgmt/pkg/types"

"github.com/sirupsen/logrus"

"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"

"k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
)

// GenericSync replicates Kubernetes resources into OPA as raw JSON.
@@ -69,7 +72,12 @@ func (s *GenericSync) Run() (chan struct{}, error) {
}

quit := make(chan struct{})
go s.loop(quit)
ctx, cancel := context.WithCancel(context.Background())
go func() { // propagate cancel signal from channel to context
<-quit
cancel()
}()
go s.RunContext(ctx)
return quit, nil
}

@@ -79,167 +87,163 @@ func (s *GenericSync) RunContext(ctx context.Context) error {
if s.createError != nil {
return s.createError
}
s.loop(ctx.Done())

store, queue := s.setup(ctx)
defer queue.ShutDown()

go s.loop(ctx, store, queue)
<-ctx.Done()
return nil
}

func (s *GenericSync) loop(quit <-chan struct{}) {

defer func() {
logrus.Infof("Sync for %v finished. Exiting.", s.ns)
}()
// setup the store and queue for this GenericSync instance
func (s *GenericSync) setup(ctx context.Context) (cache.Store, workqueue.DelayingInterface) {

resource := s.client.Resource(schema.GroupVersionResource{
baseResource := s.client.Resource(schema.GroupVersionResource{
Group: s.ns.Group,
Version: s.ns.Version,
Resource: s.ns.Resource,
})
var resource dynamic.ResourceInterface = baseResource
if s.ns.Namespaced {
resource = baseResource.Namespace(metav1.NamespaceAll)
}

delay := backoffMin
queue := workqueue.NewNamedDelayingQueue(s.ns.String())
store, controller := cache.NewInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return resource.List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return resource.Watch(ctx, options)
},
},
&unstructured.Unstructured{},
0,
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
if key, err := cache.MetaNamespaceKeyFunc(obj); err == nil {
var _ string = key // Tiny sanity check. The code depends on queue keys being strings.
queue.Add(key)
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
if newMeta, err := meta.Accessor(newObj); err == nil {
if oldMeta, err := meta.Accessor(oldObj); err == nil {
if newMeta.GetResourceVersion() == oldMeta.GetResourceVersion() {
return // Avoid sync flood on relist. We don't use resync.
}
}
}
if key, err := cache.MetaNamespaceKeyFunc(newObj); err == nil {
queue.Add(key)
}
},
DeleteFunc: func(obj interface{}) {
if key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj); err == nil {
queue.Add(key)
}
},
},
)

start, quit := time.Now(), ctx.Done()
go controller.Run(quit)
for !cache.WaitForCacheSync(quit, controller.HasSynced) {
logrus.Warnf("Failed to sync cache for %v, retrying...", s.ns)
}
if controller.HasSynced() {
logrus.Infof("Initial informer sync for %v completed, took %v", s.ns, time.Since(start))
}

for {
return store, queue
}

err := s.sync(resource, quit)
if err == nil {
return
}
const initPath = ""

switch err.(type) {
// loop starts replicating Kubernetes resources into OPA. If an error occurs
// during the replication process, this function will backoff and reload
// all resources into OPA from scratch.
func (s *GenericSync) loop(ctx context.Context, store cache.Store, queue workqueue.DelayingInterface) {

case errChannelClosed:
logrus.Infof("Sync channel for %v closed. Restarting immediately.", s.ns)
delay = backoffMin
logrus.Infof("Syncing %v.", s.ns)
defer func() {
logrus.Infof("Sync for %v finished. Exiting.", s.ns)
}()

case errOPA:
logrus.Errorf("Sync for %v failed due to OPA error. Trying again in %v. Reason: %v", s.ns, delay, err)
delay = backoffMin
t := time.NewTimer(delay)
select {
case <-t.C:
break
case <-quit:
return
}
rateLimiter := workqueue.NewItemExponentialFailureRateLimiter(backoffMin, backoffMax)
var delay time.Duration
for !queue.ShuttingDown() {

case errKubernetes:
logrus.Errorf("Sync for %v failed due to Kubernetes error. Trying again in %v. Reason: %v", s.ns, delay, err)
delay *= 2
if delay > backoffMax {
delay = backoffMax
}
t := time.NewTimer(delay)
select {
case <-t.C:
break
case <-quit:
queue.AddAfter(initPath, delay) // this special path will trigger a full load
syncDone := false // discard everything until initPath

var err error
for err == nil {
key, shuttingDown := queue.Get()
if shuttingDown {
return
}
err = s.processNext(store, key, &syncDone)
if key == initPath && syncDone {
rateLimiter.Forget(initPath)
}
queue.Done(key)
}
}
}

type errKubernetes struct{ error }

type errOPA struct{ error }

type errChannelClosed struct{}

func (err errKubernetes) Unwrap() error {
return err.error
}

func (err errOPA) Unwrap() error {
return err.error
}

func (errChannelClosed) Error() string {
return "channel closed"
delay := rateLimiter.When(initPath)
logrus.Errorf("Sync for %v failed, trying again in %v. Reason: %v", s.ns, delay, err)
}
}

// sync starts replicating Kubernetes resources into OPA. If an error occurs
// during the replication process this function returns and indicates whether
// the synchronizer should backoff. The synchronizer will backoff whenever the
// Kubernetes API returns an error.
func (s *GenericSync) sync(resource dynamic.NamespaceableResourceInterface, quit <-chan struct{}) error {

logrus.Infof("Syncing %v.", s.ns)
tList := time.Now()
result, err := resource.List(context.TODO(), metav1.ListOptions{})
func (s *GenericSync) processNext(store cache.Store, key interface{}, syncDone *bool) error {
path, err := objPath(key, s.ns.Namespaced)
if err != nil {
return errKubernetes{fmt.Errorf("list: %w", err)}
return err
}

dList := time.Since(tList)
resourceVersion := result.GetResourceVersion()
logrus.Infof("Listed %v and got %v resources with resourceVersion %v. Took %v.", s.ns, len(result.Items), resourceVersion, dList)

tLoad := time.Now()

if err := s.syncAll(result.Items); err != nil {
return errOPA{fmt.Errorf("reset: %w", err)}
// On receiving the initPath, load a full dump of the data store
if path == initPath {
start, list := time.Now(), store.List()
if err := s.syncAll(list); err != nil {
return err
}
logrus.Infof("Loaded %d resources of kind %v into OPA. Took %v", len(list), s.ns, time.Since(start))
*syncDone = true // sync is now Done
return nil
}

dLoad := time.Since(tLoad)
logrus.Infof("Loaded %v resources into OPA. Took %v. Starting watch at resourceVersion %v.", s.ns, dLoad, resourceVersion)
// Ignore updates queued before the initial load
if !*syncDone {
return nil
}

w, err := resource.Watch(context.TODO(), metav1.ListOptions{
ResourceVersion: resourceVersion,
})
obj, exists, err := store.Get(key)
if err != nil {
return errKubernetes{fmt.Errorf("watch: %w", err)}
return fmt.Errorf("store error: %w", err)
}

defer w.Stop()

ch := w.ResultChan()

for {
select {
case evt := <-ch:
switch evt.Type {
case watch.Added:
err := s.syncAdd(evt.Object)
if err != nil {
return errOPA{fmt.Errorf("add event: %w", err)}
}
case watch.Modified:
err := s.syncAdd(evt.Object)
if err != nil {
return errOPA{fmt.Errorf("modify event: %w", err)}
}
case watch.Deleted:
err := s.syncRemove(evt.Object)
if err != nil {
return errOPA{fmt.Errorf("delete event: %w", err)}
}
case watch.Error:
return errKubernetes{fmt.Errorf("error event: %v", evt.Object)}
default:
return errChannelClosed{}
}
case <-quit:
return nil
if exists {
if err := s.syncAdd(path, obj); err != nil {
return fmt.Errorf("add event: %w", err)
}
} else {
if err := s.syncRemove(path); err != nil {
return fmt.Errorf("delete event: %w", err)
}
}
return nil
}

func (s *GenericSync) syncAdd(obj runtime.Object) error {
path, err := objPath(obj, s.ns.Namespaced)
if err != nil {
return err
}
func (s *GenericSync) syncAdd(path string, obj interface{}) error {
return s.opa.PutData(path, obj)
}

func (s *GenericSync) syncRemove(obj runtime.Object) error {
path, err := objPath(obj, s.ns.Namespaced)
if err != nil {
return err
}
func (s *GenericSync) syncRemove(path string) error {
return s.opa.PatchData(path, "remove", nil)
}

func (s *GenericSync) syncAll(objs []unstructured.Unstructured) error {
func (s *GenericSync) syncAll(objs []interface{}) error {

// Build a list of patches to apply.
payload, err := generateSyncPayload(objs, s.ns.Namespaced)
@@ -250,15 +254,19 @@ func (s *GenericSync) syncAll(objs []unstructured.Unstructured) error {
return s.opa.PutData("/", payload)
}

func generateSyncPayload(objs []unstructured.Unstructured, namespaced bool) (map[string]interface{}, error) {
func generateSyncPayload(objs []interface{}, namespaced bool) (map[string]interface{}, error) {
combined := make(map[string]interface{}, len(objs))
for _, obj := range objs {
path, err := objPath(&obj, namespaced)
key, err := cache.MetaNamespaceKeyFunc(obj)
if err != nil {
return nil, err
}
path, err := objPath(key, namespaced)
if err != nil {
return nil, err
}

// Ensure the path in thee map up to our value exists
// Ensure the path in the map up to our value exists
// We make some assumptions about the paths that do exist
// being the correct types due to the expected uniform
// objPath's for each of the similar object types being
@@ -273,23 +281,15 @@ func generateSyncPayload(objs []unstructured.Unstructured, namespaced bool) (map
}
dir = next.(map[string]interface{})
}
dir[segments[len(segments)-1]] = obj.Object
dir[segments[len(segments)-1]] = obj
}

return combined, nil
}

func objPath(obj runtime.Object, namespaced bool) (string, error) {
m, err := meta.Accessor(obj)
if err != nil {
return "", err
}
name := m.GetName()
var path string
if namespaced {
path = m.GetNamespace() + "/" + name
} else {
path = name
}
return path, nil
// objPath transforms queue key into OPA path
func objPath(key interface{}, namespaced bool) (string, error) {
// OPA keys actually match current kubernetes cache key format
// ("namespace/name" or "name" if resource is not namespaced)
return key.(string), nil
}
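
One more note on the new loop: instead of the removed hand-rolled delay doubling, the retry delay now comes from client-go's per-item exponential failure rate limiter, and a successful full load calls Forget to reset it. A tiny sketch of that behaviour, using 1s/30s as illustrative stand-ins for backoffMin/backoffMax (those constants live elsewhere in the package and are not part of this diff):

package sketch

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
)

func backoffSketch() {
	rl := workqueue.NewItemExponentialFailureRateLimiter(1*time.Second, 30*time.Second)
	const initPath = ""            // same special key the loop uses to request a full reload
	fmt.Println(rl.When(initPath)) // 1s: first failure waits the base delay
	fmt.Println(rl.When(initPath)) // 2s: each further failure doubles the delay...
	fmt.Println(rl.When(initPath)) // 4s: ...capped at the 30s maximum
	rl.Forget(initPath)            // a successful full load resets the backoff
	fmt.Println(rl.When(initPath)) // back to 1s
}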