Skip to content

Commit

Permalink
Added test for GenerycSync.Run
Browse files Browse the repository at this point in the history
Signed-off-by: warpcomdev <[email protected]>
  • Loading branch information
warpcomdev committed Mar 9, 2022
1 parent cf978af commit e9c29a8
Show file tree
Hide file tree
Showing 37 changed files with 8,296 additions and 429 deletions.
25 changes: 15 additions & 10 deletions cmd/kube-mgmt/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package main

import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
Expand All @@ -22,6 +23,7 @@ import (
"github.com/open-policy-agent/kube-mgmt/pkg/version"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)
Expand Down Expand Up @@ -172,22 +174,25 @@ func run(params *params) {
}
}

for _, gvk := range params.replicateCluster {
sync := data.New(kubeconfig, opa.New(params.opaURL, params.opaAuth).Prefix(params.replicatePath), getResourceType(gvk, false))
_, err := sync.Run()
var client dynamic.Interface
if len(params.replicateCluster)+len(params.replicateNamespace) > 0 {
client, err = dynamic.NewForConfig(kubeconfig)
if err != nil {
logrus.Fatalf("Failed to start data sync for %v: %v", gvk, err)
logrus.Fatalf("Failed to get dynamic client: %v", err)
}
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

for _, gvk := range params.replicateNamespace {
sync := data.New(kubeconfig, opa.New(params.opaURL, params.opaAuth).Prefix(params.replicatePath), getResourceType(gvk, true))
_, err := sync.Run()
if err != nil {
logrus.Fatalf("Failed to start data sync for %v: %v", gvk, err)
}
for _, gvk := range params.replicateCluster {
sync := data.NewFromInterface(client, opa.New(params.opaURL, params.opaAuth).Prefix(params.replicatePath), getResourceType(gvk, false))
go sync.RunContext(ctx)
}

for _, gvk := range params.replicateNamespace {
sync := data.NewFromInterface(client, opa.New(params.opaURL, params.opaAuth).Prefix(params.replicatePath), getResourceType(gvk, true))
go sync.RunContext(ctx)
}
quit := make(chan struct{})
<-quit
}
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ require (

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/evanphx/json-patch v5.6.0+incompatible // indirect
github.com/go-logr/logr v1.2.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.2 // indirect
Expand All @@ -23,6 +24,7 @@ require (
github.com/json-iterator/go v1.1.12 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/spf13/pflag v1.0.5 // indirect
golang.org/x/net v0.0.0-20211209124913-491a49abca63 // indirect
golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8 // indirect
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ github.com/envoyproxy/go-control-plane v0.10.1/go.mod h1:AY7fTTXNdv/aJ2O5jwpxAPO
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/envoyproxy/protoc-gen-validate v0.6.2/go.mod h1:2t7qjJNvHPx8IjnBOzl9E9/baC+qXE/TeeyBRzgJDws=
github.com/evanphx/json-patch v4.12.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
github.com/evanphx/json-patch v5.6.0+incompatible h1:jBYDEEiFBPxA0v50tFdvOzQQTCvpL6mnFh5mB2/l16U=
github.com/evanphx/json-patch v5.6.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU=
github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk=
Expand Down Expand Up @@ -345,6 +347,7 @@ github.com/pelletier/go-toml v1.9.4/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCko
github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/sftp v1.10.1/go.mod h1:lYOWFsE0bwd1+KfKJaKeuokY15vzFx25BLbzYYoAxZI=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
Expand Down
69 changes: 46 additions & 23 deletions pkg/data/generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"time"

"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
Expand All @@ -20,16 +19,19 @@ import (
"github.com/open-policy-agent/kube-mgmt/pkg/types"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"
)

// GenericSync replicates Kubernetes resources into OPA as raw JSON.
type GenericSync struct {
kubeconfig *rest.Config
opa opa_client.Data
ns types.ResourceType
client dynamic.Interface
opa opa_client.Data
ns types.ResourceType
createError error // to support deprecated calls to New / Run

}

// The min/max amount of time to wait when resetting the synchronizer.
Expand All @@ -38,36 +40,57 @@ const (
backoffMin = time.Second
)

// New returns a new GenericSync that cna be started.
// New returns a new GenericSync that can be started.
// Deprecated: Please Use NewFromInterface instead.
func New(kubeconfig *rest.Config, opa opa_client.Data, ns types.ResourceType) *GenericSync {
client, err := dynamic.NewForConfig(kubeconfig)
if err != nil {
return &GenericSync{createError: err}
}
return NewFromInterface(client, opa, ns)
}

// NewFromInterface returns a new GenericSync that can be started.
func NewFromInterface(client dynamic.Interface, opa opa_client.Data, ns types.ResourceType) *GenericSync {
return &GenericSync{
kubeconfig: kubeconfig,
ns: ns,
opa: opa.Prefix(ns.Resource),
client: client,
ns: ns,
opa: opa.Prefix(ns.Resource),
}
}

// Run starts the synchronizer. To stop the synchronizer send a message to the
// channel.
// Deprecated: Please use RunContext instead.
func (s *GenericSync) Run() (chan struct{}, error) {

client, err := dynamic.NewForConfig(s.kubeconfig)
if err != nil {
return nil, err
// To support legacy sync create from *rest.Config
if s.createError != nil {
return nil, s.createError
}

quit := make(chan struct{})
go s.loop(client, quit)
go s.loop(quit)
return quit, nil
}

func (s *GenericSync) loop(client dynamic.Interface, quit chan struct{}) {
// RunContext starts the synchronizer in the foreground.
// To stop the synchronizer, cancel the context.
func (s *GenericSync) RunContext(ctx context.Context) error {
if s.createError != nil {
return s.createError
}
s.loop(ctx.Done())
return nil
}

func (s *GenericSync) loop(quit <-chan struct{}) {

defer func() {
logrus.Infof("Sync for %v finished. Exiting.", s.ns)
}()

resource := client.Resource(schema.GroupVersionResource{
resource := s.client.Resource(schema.GroupVersionResource{
Group: s.ns.Group,
Version: s.ns.Version,
Resource: s.ns.Resource,
Expand Down Expand Up @@ -138,7 +161,7 @@ func (errChannelClosed) Error() string {
// during the replication process this function returns and indicates whether
// the synchronizer should backoff. The synchronizer will backoff whenever the
// Kubernetes API returns an error.
func (s *GenericSync) sync(resource dynamic.NamespaceableResourceInterface, quit chan struct{}) error {
func (s *GenericSync) sync(resource dynamic.NamespaceableResourceInterface, quit <-chan struct{}) error {

logrus.Infof("Syncing %v.", s.ns)
tList := time.Now()
Expand Down Expand Up @@ -202,15 +225,15 @@ func (s *GenericSync) sync(resource dynamic.NamespaceableResourceInterface, quit
}

func (s *GenericSync) syncAdd(obj runtime.Object) error {
path, err := s.objPath(obj)
path, err := objPath(obj, s.ns.Namespaced)
if err != nil {
return err
}
return s.opa.PutData(path, obj)
}

func (s *GenericSync) syncRemove(obj runtime.Object) error {
path, err := s.objPath(obj)
path, err := objPath(obj, s.ns.Namespaced)
if err != nil {
return err
}
Expand All @@ -220,18 +243,18 @@ func (s *GenericSync) syncRemove(obj runtime.Object) error {
func (s *GenericSync) syncAll(objs []unstructured.Unstructured) error {

// Build a list of patches to apply.
payload, err := s.generateSyncPayload(objs)
payload, err := generateSyncPayload(objs, s.ns.Namespaced)
if err != nil {
return err
}

return s.opa.PutData("/", payload)
}

func (s *GenericSync) generateSyncPayload(objs []unstructured.Unstructured) (map[string]interface{}, error) {
func generateSyncPayload(objs []unstructured.Unstructured, namespaced bool) (map[string]interface{}, error) {
combined := make(map[string]interface{}, len(objs))
for _, obj := range objs {
objPath, err := s.objPath(&obj)
path, err := objPath(&obj, namespaced)
if err != nil {
return nil, err
}
Expand All @@ -241,7 +264,7 @@ func (s *GenericSync) generateSyncPayload(objs []unstructured.Unstructured) (map
// being the correct types due to the expected uniform
// objPath's for each of the similar object types being
// sync'd with the GenericSync instance.
segments := strings.Split(objPath, "/")
segments := strings.Split(path, "/")
dir := combined
for i := 0; i < len(segments)-1; i++ {
next, ok := combined[segments[i]]
Expand All @@ -257,14 +280,14 @@ func (s *GenericSync) generateSyncPayload(objs []unstructured.Unstructured) (map
return combined, nil
}

func (s *GenericSync) objPath(obj runtime.Object) (string, error) {
func objPath(obj runtime.Object, namespaced bool) (string, error) {
m, err := meta.Accessor(obj)
if err != nil {
return "", err
}
name := m.GetName()
var path string
if s.ns.Namespaced {
if namespaced {
path = m.GetNamespace() + "/" + name
} else {
path = name
Expand Down
Loading

0 comments on commit e9c29a8

Please sign in to comment.