Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add test for synchronous replication #135

Merged
merged 1 commit into from
Mar 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 15 additions & 10 deletions cmd/kube-mgmt/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package main

import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
Expand All @@ -22,6 +23,7 @@ import (
"github.com/open-policy-agent/kube-mgmt/pkg/version"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)
Expand Down Expand Up @@ -172,22 +174,25 @@ func run(params *params) {
}
}

for _, gvk := range params.replicateCluster {
sync := data.New(kubeconfig, opa.New(params.opaURL, params.opaAuth).Prefix(params.replicatePath), getResourceType(gvk, false))
_, err := sync.Run()
var client dynamic.Interface
if len(params.replicateCluster)+len(params.replicateNamespace) > 0 {
client, err = dynamic.NewForConfig(kubeconfig)
if err != nil {
logrus.Fatalf("Failed to start data sync for %v: %v", gvk, err)
logrus.Fatalf("Failed to get dynamic client: %v", err)
}
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

for _, gvk := range params.replicateNamespace {
sync := data.New(kubeconfig, opa.New(params.opaURL, params.opaAuth).Prefix(params.replicatePath), getResourceType(gvk, true))
_, err := sync.Run()
if err != nil {
logrus.Fatalf("Failed to start data sync for %v: %v", gvk, err)
}
for _, gvk := range params.replicateCluster {
sync := data.NewFromInterface(client, opa.New(params.opaURL, params.opaAuth).Prefix(params.replicatePath), getResourceType(gvk, false))
go sync.RunContext(ctx)
}

for _, gvk := range params.replicateNamespace {
sync := data.NewFromInterface(client, opa.New(params.opaURL, params.opaAuth).Prefix(params.replicatePath), getResourceType(gvk, true))
go sync.RunContext(ctx)
}
quit := make(chan struct{})
<-quit
}
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ require (

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/evanphx/json-patch v5.6.0+incompatible // indirect
github.com/go-logr/logr v1.2.0 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.2 // indirect
Expand All @@ -23,6 +24,7 @@ require (
github.com/json-iterator/go v1.1.12 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/spf13/pflag v1.0.5 // indirect
golang.org/x/net v0.0.0-20211209124913-491a49abca63 // indirect
golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8 // indirect
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ github.com/envoyproxy/go-control-plane v0.10.1/go.mod h1:AY7fTTXNdv/aJ2O5jwpxAPO
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/envoyproxy/protoc-gen-validate v0.6.2/go.mod h1:2t7qjJNvHPx8IjnBOzl9E9/baC+qXE/TeeyBRzgJDws=
github.com/evanphx/json-patch v4.12.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
github.com/evanphx/json-patch v5.6.0+incompatible h1:jBYDEEiFBPxA0v50tFdvOzQQTCvpL6mnFh5mB2/l16U=
github.com/evanphx/json-patch v5.6.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU=
github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk=
Expand Down Expand Up @@ -345,6 +347,7 @@ github.com/pelletier/go-toml v1.9.4/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCko
github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/sftp v1.10.1/go.mod h1:lYOWFsE0bwd1+KfKJaKeuokY15vzFx25BLbzYYoAxZI=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
Expand Down
68 changes: 45 additions & 23 deletions pkg/data/generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"time"

"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
Expand All @@ -20,16 +19,18 @@ import (
"github.com/open-policy-agent/kube-mgmt/pkg/types"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"
)

// GenericSync replicates Kubernetes resources into OPA as raw JSON.
type GenericSync struct {
kubeconfig *rest.Config
opa opa_client.Data
ns types.ResourceType
client dynamic.Interface
opa opa_client.Data
ns types.ResourceType
createError error // to support deprecated calls to New / Run
}

// The min/max amount of time to wait when resetting the synchronizer.
Expand All @@ -38,36 +39,57 @@ const (
backoffMin = time.Second
)

// New returns a new GenericSync that cna be started.
// New returns a new GenericSync that can be started.
// Deprecated: Please Use NewFromInterface instead.
func New(kubeconfig *rest.Config, opa opa_client.Data, ns types.ResourceType) *GenericSync {
client, err := dynamic.NewForConfig(kubeconfig)
if err != nil {
return &GenericSync{createError: err}
}
return NewFromInterface(client, opa, ns)
}

// NewFromInterface returns a new GenericSync that can be started.
func NewFromInterface(client dynamic.Interface, opa opa_client.Data, ns types.ResourceType) *GenericSync {
return &GenericSync{
kubeconfig: kubeconfig,
ns: ns,
opa: opa.Prefix(ns.Resource),
client: client,
ns: ns,
opa: opa.Prefix(ns.Resource),
}
}

// Run starts the synchronizer. To stop the synchronizer send a message to the
// channel.
// Deprecated: Please use RunContext instead.
func (s *GenericSync) Run() (chan struct{}, error) {

client, err := dynamic.NewForConfig(s.kubeconfig)
if err != nil {
return nil, err
// To support legacy way of creating GenericSync from *rest.Config
if s.createError != nil {
return nil, s.createError
}

quit := make(chan struct{})
go s.loop(client, quit)
go s.loop(quit)
return quit, nil
}

func (s *GenericSync) loop(client dynamic.Interface, quit chan struct{}) {
// RunContext starts the synchronizer in the foreground.
// To stop the synchronizer, cancel the context.
func (s *GenericSync) RunContext(ctx context.Context) error {
if s.createError != nil {
return s.createError
}
s.loop(ctx.Done())
return nil
}

func (s *GenericSync) loop(quit <-chan struct{}) {

defer func() {
logrus.Infof("Sync for %v finished. Exiting.", s.ns)
}()

resource := client.Resource(schema.GroupVersionResource{
resource := s.client.Resource(schema.GroupVersionResource{
Group: s.ns.Group,
Version: s.ns.Version,
Resource: s.ns.Resource,
Expand Down Expand Up @@ -138,7 +160,7 @@ func (errChannelClosed) Error() string {
// during the replication process this function returns and indicates whether
// the synchronizer should backoff. The synchronizer will backoff whenever the
// Kubernetes API returns an error.
func (s *GenericSync) sync(resource dynamic.NamespaceableResourceInterface, quit chan struct{}) error {
func (s *GenericSync) sync(resource dynamic.NamespaceableResourceInterface, quit <-chan struct{}) error {

logrus.Infof("Syncing %v.", s.ns)
tList := time.Now()
Expand Down Expand Up @@ -202,15 +224,15 @@ func (s *GenericSync) sync(resource dynamic.NamespaceableResourceInterface, quit
}

func (s *GenericSync) syncAdd(obj runtime.Object) error {
path, err := s.objPath(obj)
path, err := objPath(obj, s.ns.Namespaced)
if err != nil {
return err
}
return s.opa.PutData(path, obj)
}

func (s *GenericSync) syncRemove(obj runtime.Object) error {
path, err := s.objPath(obj)
path, err := objPath(obj, s.ns.Namespaced)
if err != nil {
return err
}
Expand All @@ -220,18 +242,18 @@ func (s *GenericSync) syncRemove(obj runtime.Object) error {
func (s *GenericSync) syncAll(objs []unstructured.Unstructured) error {

// Build a list of patches to apply.
payload, err := s.generateSyncPayload(objs)
payload, err := generateSyncPayload(objs, s.ns.Namespaced)
if err != nil {
return err
}

return s.opa.PutData("/", payload)
}

func (s *GenericSync) generateSyncPayload(objs []unstructured.Unstructured) (map[string]interface{}, error) {
func generateSyncPayload(objs []unstructured.Unstructured, namespaced bool) (map[string]interface{}, error) {
combined := make(map[string]interface{}, len(objs))
for _, obj := range objs {
objPath, err := s.objPath(&obj)
path, err := objPath(&obj, namespaced)
if err != nil {
return nil, err
}
Expand All @@ -241,7 +263,7 @@ func (s *GenericSync) generateSyncPayload(objs []unstructured.Unstructured) (map
// being the correct types due to the expected uniform
// objPath's for each of the similar object types being
// sync'd with the GenericSync instance.
segments := strings.Split(objPath, "/")
segments := strings.Split(path, "/")
dir := combined
for i := 0; i < len(segments)-1; i++ {
next, ok := combined[segments[i]]
Expand All @@ -257,14 +279,14 @@ func (s *GenericSync) generateSyncPayload(objs []unstructured.Unstructured) (map
return combined, nil
}

func (s *GenericSync) objPath(obj runtime.Object) (string, error) {
func objPath(obj runtime.Object, namespaced bool) (string, error) {
m, err := meta.Accessor(obj)
if err != nil {
return "", err
}
name := m.GetName()
var path string
if s.ns.Namespaced {
if namespaced {
path = m.GetNamespace() + "/" + name
} else {
path = name
Expand Down
Loading