Skip to content

Commit

Permalink
move arguments from Start() to Source objects
Browse files Browse the repository at this point in the history
Signed-off-by: Tim Ramlot <[email protected]>
  • Loading branch information
inteon committed Jan 6, 2024
1 parent 91f642b commit 916d5db
Show file tree
Hide file tree
Showing 17 changed files with 695 additions and 445 deletions.
9 changes: 6 additions & 3 deletions examples/builtins/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,17 @@ func main() {
}

// Watch ReplicaSets and enqueue ReplicaSet object key
if err := c.Watch(source.Kind(mgr.GetCache(), &appsv1.ReplicaSet{}), &handler.EnqueueRequestForObject{}); err != nil {
if err := c.Watch(source.Kind(mgr.GetCache(), &appsv1.ReplicaSet{}, &handler.EnqueueRequestForObject{})); err != nil {
entryLog.Error(err, "unable to watch ReplicaSets")
os.Exit(1)
}

// Watch Pods and enqueue owning ReplicaSet key
if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}),
handler.EnqueueRequestForOwner(mgr.GetScheme(), mgr.GetRESTMapper(), &appsv1.ReplicaSet{}, handler.OnlyControllerOwner())); err != nil {
if err := c.Watch(source.Kind(
mgr.GetCache(),
&corev1.Pod{},
handler.EnqueueRequestForOwner(mgr.GetScheme(), mgr.GetRESTMapper(), &appsv1.ReplicaSet{}, handler.OnlyControllerOwner()),
)); err != nil {
entryLog.Error(err, "unable to watch Pods")
os.Exit(1)
}
Expand Down
100 changes: 58 additions & 42 deletions pkg/builder/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
internalsource "sigs.k8s.io/controller-runtime/pkg/internal/source"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
Expand All @@ -54,14 +53,15 @@ const (

// Builder builds a Controller.
type Builder struct {
forInput ForInput
ownsInput []OwnsInput
watchesInput []WatchesInput
mgr manager.Manager
globalPredicates []predicate.Predicate
ctrl controller.Controller
ctrlOptions controller.Options
name string
forInput ForInput
ownsInput []OwnsInput
watchesObjectInput []WatchesObjectInput
watchesSourceInput []WatchesSourceInput
mgr manager.Manager
globalPredicates []predicate.Predicate
ctrl controller.Controller
ctrlOptions controller.Options
name string
}

// ControllerManagedBy returns a new controller builder that will be started by the provided Manager.
Expand Down Expand Up @@ -121,10 +121,10 @@ func (blder *Builder) Owns(object client.Object, opts ...OwnsOption) *Builder {
return blder
}

// WatchesInput represents the information set by Watches method.
type WatchesInput struct {
src source.Source
eventHandler handler.EventHandler
// WatchesObjectInput represents the information set by Watches method.
type WatchesObjectInput struct {
object client.Object
eventhandler handler.EventHandler
predicates []predicate.Predicate
objectProjection objectProjection
}
Expand All @@ -133,10 +133,15 @@ type WatchesInput struct {
// update events by *reconciling the object* with the given EventHandler.
//
// This is the equivalent of calling
// WatchesRawSource(source.Kind(cache, object), eventHandler, opts...).
func (blder *Builder) Watches(object client.Object, eventHandler handler.EventHandler, opts ...WatchesOption) *Builder {
src := source.Kind(blder.mgr.GetCache(), object)
return blder.WatchesRawSource(src, eventHandler, opts...)
// WatchesRawSource(source.Kind(scheme, object), eventhandler, opts...).
func (blder *Builder) Watches(object client.Object, eventhandler handler.EventHandler, opts ...WatchesObjectOption) *Builder {
input := WatchesObjectInput{object: object, eventhandler: eventhandler}
for _, opt := range opts {
opt.ApplyToObjectWatches(&input)
}

blder.watchesObjectInput = append(blder.watchesObjectInput, input)
return blder
}

// WatchesMetadata is the same as Watches, but forces the internal cache to only watch PartialObjectMetadata.
Expand Down Expand Up @@ -166,23 +171,24 @@ func (blder *Builder) Watches(object client.Object, eventHandler handler.EventHa
// In the first case, controller-runtime will create another cache for the
// concrete type on top of the metadata cache; this increases memory
// consumption and leads to race conditions as caches are not in sync.
func (blder *Builder) WatchesMetadata(object client.Object, eventHandler handler.EventHandler, opts ...WatchesOption) *Builder {
func (blder *Builder) WatchesMetadata(object client.Object, eventhandler handler.EventHandler, opts ...WatchesObjectOption) *Builder {
opts = append(opts, OnlyMetadata)
return blder.Watches(object, eventHandler, opts...)
return blder.Watches(object, eventhandler, opts...)
}

// WatchesSourceInput represents the information set by Watches method.
type WatchesSourceInput struct {
src source.Source
}

// WatchesRawSource exposes the lower-level ControllerManagedBy Watches functions through the builder.
// Specified predicates are registered only for given source.
//
// STOP! Consider using For(...), Owns(...), Watches(...), WatchesMetadata(...) instead.
// This method is only exposed for more advanced use cases, most users should use one of the higher level functions.
func (blder *Builder) WatchesRawSource(src source.Source, eventHandler handler.EventHandler, opts ...WatchesOption) *Builder {
input := WatchesInput{src: src, eventHandler: eventHandler}
for _, opt := range opts {
opt.ApplyToWatches(&input)
}

blder.watchesInput = append(blder.watchesInput, input)
// This method is only exposed for more advanced use cases, most users should use higher level functions.
// This method does generally disregard all the global configuration set by the builder.
func (blder *Builder) WatchesRawSource(src source.Source) *Builder {
blder.watchesSourceInput = append(blder.watchesSourceInput, WatchesSourceInput{src: src})
return blder
}

Expand Down Expand Up @@ -272,11 +278,15 @@ func (blder *Builder) doWatch() error {
if err != nil {
return err
}
src := source.Kind(blder.mgr.GetCache(), obj)
hdler := &handler.EnqueueRequestForObject{}
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
allPredicates = append(allPredicates, blder.forInput.predicates...)
if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
src := source.Kind(
blder.mgr.GetCache(),
obj,
&handler.EnqueueRequestForObject{},
allPredicates...,
)
if err := blder.ctrl.Watch(src); err != nil {
return err
}
}
Expand All @@ -290,7 +300,6 @@ func (blder *Builder) doWatch() error {
if err != nil {
return err
}
src := source.Kind(blder.mgr.GetCache(), obj)
opts := []handler.OwnerOption{}
if !own.matchEveryOwner {
opts = append(opts, handler.OnlyControllerOwner())
Expand All @@ -302,30 +311,37 @@ func (blder *Builder) doWatch() error {
)
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
allPredicates = append(allPredicates, own.predicates...)
if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
src := source.Kind(blder.mgr.GetCache(), obj, hdler, allPredicates...)
if err := blder.ctrl.Watch(src); err != nil {
return err
}
}

// Do the watch requests
if len(blder.watchesInput) == 0 && blder.forInput.object == nil {
if len(blder.watchesObjectInput) == 0 && len(blder.watchesSourceInput) == 0 && blder.forInput.object == nil {
return errors.New("there are no watches configured, controller will never get triggered. Use For(), Owns() or Watches() to set them up")
}
for _, w := range blder.watchesInput {
// If the source of this watch is of type Kind, project it.
if srcKind, ok := w.src.(*internalsource.Kind); ok {
typeForSrc, err := blder.project(srcKind.Type, w.objectProjection)
if err != nil {
return err
}
srcKind.Type = typeForSrc

for _, w := range blder.watchesObjectInput {
obj, err := blder.project(w.object, w.objectProjection)
if err != nil {
return err
}

allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
allPredicates = append(allPredicates, w.predicates...)
if err := blder.ctrl.Watch(w.src, w.eventHandler, allPredicates...); err != nil {
src := source.Kind(blder.mgr.GetCache(), obj, w.eventhandler, allPredicates...)
if err := blder.ctrl.Watch(src); err != nil {
return err
}
}

for _, w := range blder.watchesSourceInput {
if err := blder.ctrl.Watch(w.src); err != nil {
return err
}
}

return nil
}

Expand Down
22 changes: 11 additions & 11 deletions pkg/builder/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ type OwnsOption interface {
ApplyToOwns(*OwnsInput)
}

// WatchesOption is some configuration that modifies options for a watches request.
type WatchesOption interface {
// WatchesObjectOption is some configuration that modifies options for a watches request.
type WatchesObjectOption interface {
// ApplyToWatches applies this configuration to the given watches options.
ApplyToWatches(*WatchesInput)
ApplyToObjectWatches(*WatchesObjectInput)
}

// }}}
Expand Down Expand Up @@ -66,14 +66,14 @@ func (w Predicates) ApplyToOwns(opts *OwnsInput) {
opts.predicates = w.predicates
}

// ApplyToWatches applies this configuration to the given WatchesInput options.
func (w Predicates) ApplyToWatches(opts *WatchesInput) {
// ApplyToObjectWatches applies this configuration to the given WatchesInput options.
func (w Predicates) ApplyToObjectWatches(opts *WatchesObjectInput) {
opts.predicates = w.predicates
}

var _ ForOption = &Predicates{}
var _ OwnsOption = &Predicates{}
var _ WatchesOption = &Predicates{}
var _ WatchesObjectOption = &Predicates{}

// }}}

Expand All @@ -94,8 +94,8 @@ func (p projectAs) ApplyToOwns(opts *OwnsInput) {
opts.objectProjection = objectProjection(p)
}

// ApplyToWatches applies this configuration to the given WatchesInput options.
func (p projectAs) ApplyToWatches(opts *WatchesInput) {
// ApplyToWatches applies this configuration to the given WatchesObjectInput options.
func (p projectAs) ApplyToObjectWatches(opts *WatchesObjectInput) {
opts.objectProjection = objectProjection(p)
}

Expand Down Expand Up @@ -132,9 +132,9 @@ var (
// consumption and leads to race conditions as caches are not in sync.
OnlyMetadata = projectAs(projectAsMetadata)

_ ForOption = OnlyMetadata
_ OwnsOption = OnlyMetadata
_ WatchesOption = OnlyMetadata
_ ForOption = OnlyMetadata
_ OwnsOption = OnlyMetadata
_ WatchesObjectOption = OnlyMetadata
)

// }}}
Expand Down
12 changes: 3 additions & 9 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,8 @@ import (
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"

"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/internal/controller"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/ratelimiter"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
Expand Down Expand Up @@ -72,13 +70,9 @@ type Controller interface {
// Reconciler is called to reconcile an object by Namespace/Name
reconcile.Reconciler

// Watch takes events provided by a Source and uses the EventHandler to
// enqueue reconcile.Requests in response to the events.
//
// Watch may be provided one or more Predicates to filter events before
// they are given to the EventHandler. Events will be passed to the
// EventHandler if all provided Predicates evaluate to true.
Watch(src source.Source, eventhandler handler.EventHandler, predicates ...predicate.Predicate) error
// Watch takes events provided by a Source and enqueues reconcile.Requests
// in response to the events.
Watch(src source.Source) error

// Start starts the controller. Start blocks until the context is closed or a
// controller has an error starting.
Expand Down
9 changes: 5 additions & 4 deletions pkg/controller/controller_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,14 @@ var _ = Describe("controller", func() {
Expect(err).NotTo(HaveOccurred())

By("Watching Resources")
err = instance.Watch(
source.Kind(cm.GetCache(), &appsv1.ReplicaSet{}),
err = instance.Watch(source.Kind(
cm.GetCache(),
&appsv1.ReplicaSet{},
handler.EnqueueRequestForOwner(cm.GetScheme(), cm.GetRESTMapper(), &appsv1.Deployment{}),
)
))
Expect(err).NotTo(HaveOccurred())

err = instance.Watch(source.Kind(cm.GetCache(), &appsv1.Deployment{}), &handler.EnqueueRequestForObject{})
err = instance.Watch(source.Kind(cm.GetCache(), &appsv1.Deployment{}, &handler.EnqueueRequestForObject{}))
Expect(err).NotTo(HaveOccurred())

err = cm.GetClient().Get(ctx, types.NamespacedName{Name: "foo"}, &corev1.Namespace{})
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ var _ = Describe("controller.Controller", func() {

ctx, cancel := context.WithCancel(context.Background())
watchChan := make(chan event.GenericEvent, 1)
watch := &source.Channel{Source: watchChan}
watch := source.Channel(source.NewChannelBroadcaster(watchChan), nil, &handler.EnqueueRequestForObject{})
watchChan <- event.GenericEvent{Object: &corev1.Pod{}}

reconcileStarted := make(chan struct{})
Expand All @@ -99,7 +99,7 @@ var _ = Describe("controller.Controller", func() {
Expect(err).NotTo(HaveOccurred())

c, err := controller.New("new-controller", m, controller.Options{Reconciler: rec})
Expect(c.Watch(watch, &handler.EnqueueRequestForObject{})).To(Succeed())
Expect(c.Watch(watch)).To(Succeed())
Expect(err).NotTo(HaveOccurred())

go func() {
Expand Down
6 changes: 3 additions & 3 deletions pkg/controller/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func ExampleController() {
}

// Watch for Pod create / update / delete events and call Reconcile
err = c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}), &handler.EnqueueRequestForObject{})
err = c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}, &handler.EnqueueRequestForObject{}))
if err != nil {
log.Error(err, "unable to watch pods")
os.Exit(1)
Expand Down Expand Up @@ -108,7 +108,7 @@ func ExampleController_unstructured() {
Version: "v1",
})
// Watch for Pod create / update / delete events and call Reconcile
err = c.Watch(source.Kind(mgr.GetCache(), u), &handler.EnqueueRequestForObject{})
err = c.Watch(source.Kind(mgr.GetCache(), u, &handler.EnqueueRequestForObject{}))
if err != nil {
log.Error(err, "unable to watch pods")
os.Exit(1)
Expand Down Expand Up @@ -139,7 +139,7 @@ func ExampleNewUnmanaged() {
os.Exit(1)
}

if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}), &handler.EnqueueRequestForObject{}); err != nil {
if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}, &handler.EnqueueRequestForObject{})); err != nil {
log.Error(err, "unable to watch pods")
os.Exit(1)
}
Expand Down
Loading

0 comments on commit 916d5db

Please sign in to comment.