Skip to content

Commit

Permalink
Support shutdown watches dynamically
Browse files Browse the repository at this point in the history
Co-authored-by: FillZpp <[email protected]>
Signed-off-by: Tim Ramlot <[email protected]>
  • Loading branch information
inteon and FillZpp committed Feb 13, 2024
1 parent 10faf3c commit 556c6df
Show file tree
Hide file tree
Showing 12 changed files with 871 additions and 102 deletions.
8 changes: 3 additions & 5 deletions pkg/cache/informertest/fake_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/kubernetes/scheme"
toolscache "k8s.io/client-go/tools/cache"
"k8s.io/utils/ptr"

"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -89,10 +90,7 @@ func (c *FakeInformers) RemoveInformer(ctx context.Context, obj client.Object) e

// WaitForCacheSync implements Informers.
func (c *FakeInformers) WaitForCacheSync(ctx context.Context) bool {
if c.Synced == nil {
return true
}
return *c.Synced
return ptr.Deref(c.Synced, true)
}

// FakeInformerFor implements Informers.
Expand All @@ -116,7 +114,7 @@ func (c *FakeInformers) informerFor(gvk schema.GroupVersionKind, _ runtime.Objec
return informer, nil
}

c.InformersByGVK[gvk] = &controllertest.FakeInformer{}
c.InformersByGVK[gvk] = &controllertest.FakeInformer{Synced: ptr.Deref(c.Synced, true)}
return c.InformersByGVK[gvk], nil
}

Expand Down
6 changes: 6 additions & 0 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@ type Controller interface {
// in response to the events.
Watch(src source.Source) error

// StopWatch stops watching a source that was previously registered by Watch().
//
// StopWatch may be called multiple times, even concurrently. All such calls will
// block until all goroutines have terminated.
StopWatch(src source.Source) error

// Start starts the controller. Start blocks until the context is closed or a
// controller has an error starting.
Start(ctx context.Context) error
Expand Down
96 changes: 96 additions & 0 deletions pkg/controller/controller_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,102 @@ var _ = Describe("controller", func() {
List(context.Background(), &controllertest.UnconventionalListTypeList{})
Expect(err).NotTo(HaveOccurred())
})

It("should not reconcile after watch is stopped", func() {
By("Creating the Manager")
cm, err := manager.New(cfg, manager.Options{})
Expect(err).NotTo(HaveOccurred())

By("Creating the Controller")
instance, err := controller.New("foo-controller", cm, controller.Options{
Reconciler: reconcile.Func(
func(_ context.Context, request reconcile.Request) (reconcile.Result, error) {
reconciled <- request
return reconcile.Result{}, nil
}),
})
Expect(err).NotTo(HaveOccurred())

By("Watching Resources")
deploySource := source.Kind(cm.GetCache(), &appsv1.Deployment{}, &handler.EnqueueRequestForObject{})
err = instance.Watch(
deploySource,
)
Expect(err).NotTo(HaveOccurred())

By("Starting the Manager")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
defer GinkgoRecover()
Expect(cm.Start(ctx)).NotTo(HaveOccurred())
}()

deploymentDefinition := &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{Name: "deployment-name"},
Spec: appsv1.DeploymentSpec{
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{"foo": "bar"},
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"foo": "bar"}},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "nginx",
Image: "nginx",
SecurityContext: &corev1.SecurityContext{
Privileged: truePtr(),
},
},
},
},
},
},
}
deployment := deploymentDefinition.DeepCopy()
expectedReconcileRequest := reconcile.Request{NamespacedName: types.NamespacedName{
Namespace: "default",
Name: "deployment-name",
}}

By("Invoking Reconciling for Create")
deployment, err = clientset.AppsV1().Deployments("default").Create(ctx, deployment, metav1.CreateOptions{})
Expect(err).NotTo(HaveOccurred())
Expect(<-reconciled).To(Equal(expectedReconcileRequest))

By("Stopping the deployment watch")
Expect(instance.StopWatch(deploySource)).NotTo(HaveOccurred())

By("Test No Reconciling for Update")
newDeployment := deployment.DeepCopy()
newDeployment.Labels = map[string]string{"foo": "bar"}
_, err = clientset.AppsV1().Deployments("default").Update(ctx, newDeployment, metav1.UpdateOptions{})
Expect(err).NotTo(HaveOccurred())
Consistently(reconciled).ShouldNot(Receive())

By("Test No Reconciling for Delete")
err = clientset.AppsV1().Deployments("default").
Delete(ctx, "deployment-name", metav1.DeleteOptions{})
Expect(err).NotTo(HaveOccurred())
Consistently(reconciled).ShouldNot(Receive())

By("Try starting the old deployment watch")
Expect(instance.Watch(deploySource)).To(MatchError("cannot start an already started Kind source"))

By("Starting a new deployment watch")
deploySource = source.Kind(cm.GetCache(), &appsv1.Deployment{}, &handler.EnqueueRequestForObject{})
Expect(instance.Watch(deploySource)).NotTo(HaveOccurred())

deployment = deploymentDefinition.DeepCopy()

By("Invoking Reconciling for Update")
newDeployment = deployment.DeepCopy()
newDeployment.Labels = map[string]string{"foo": "bar"}
_, err = clientset.AppsV1().Deployments("default").Create(ctx, newDeployment, metav1.CreateOptions{})
Expect(err).NotTo(HaveOccurred())
Expect(<-reconciled).To(Equal(expectedReconcileRequest))
})
})
})

Expand Down
46 changes: 40 additions & 6 deletions pkg/controller/controllertest/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package controllertest

import (
"fmt"
"sync"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -33,7 +35,8 @@ type FakeInformer struct {
// RunCount is incremented each time RunInformersAndControllers is called
RunCount int

handlers []eventHandlerWrapper
mu sync.RWMutex
handlers []*eventHandlerWrapper
}

type modernResourceEventHandler interface {
Expand All @@ -51,7 +54,8 @@ type legacyResourceEventHandler interface {
// eventHandlerWrapper wraps a ResourceEventHandler in a manner that is compatible with client-go 1.27+ and older.
// The interface was changed in these versions.
type eventHandlerWrapper struct {
handler any
handler any
hasSynced bool
}

func (e eventHandlerWrapper) OnAdd(obj interface{}) {
Expand All @@ -78,6 +82,10 @@ func (e eventHandlerWrapper) OnDelete(obj interface{}) {
e.handler.(legacyResourceEventHandler).OnDelete(obj)
}

func (e eventHandlerWrapper) HasSynced() bool {
return e.hasSynced
}

// AddIndexers does nothing. TODO(community): Implement this.
func (f *FakeInformer) AddIndexers(indexers cache.Indexers) error {
return nil
Expand All @@ -98,10 +106,13 @@ func (f *FakeInformer) HasSynced() bool {
return f.Synced
}

// AddEventHandler implements the Informer interface. Adds an EventHandler to the fake Informers. TODO(community): Implement Registration.
// AddEventHandler implements the Informer interface. Adds an EventHandler to the fake Informers.
func (f *FakeInformer) AddEventHandler(handler cache.ResourceEventHandler) (cache.ResourceEventHandlerRegistration, error) {
f.handlers = append(f.handlers, eventHandlerWrapper{handler})
return nil, nil
f.mu.Lock()
defer f.mu.Unlock()
eh := &eventHandlerWrapper{handler, f.Synced}
f.handlers = append(f.handlers, eh)
return eh, nil
}

// Run implements the Informer interface. Increments f.RunCount.
Expand All @@ -111,20 +122,26 @@ func (f *FakeInformer) Run(<-chan struct{}) {

// Add fakes an Add event for obj.
func (f *FakeInformer) Add(obj metav1.Object) {
f.mu.RLock()
defer f.mu.RUnlock()
for _, h := range f.handlers {
h.OnAdd(obj)
}
}

// Update fakes an Update event for obj.
func (f *FakeInformer) Update(oldObj, newObj metav1.Object) {
f.mu.RLock()
defer f.mu.RUnlock()
for _, h := range f.handlers {
h.OnUpdate(oldObj, newObj)
}
}

// Delete fakes an Delete event for obj.
func (f *FakeInformer) Delete(obj metav1.Object) {
f.mu.RLock()
defer f.mu.RUnlock()
for _, h := range f.handlers {
h.OnDelete(obj)
}
Expand All @@ -135,8 +152,25 @@ func (f *FakeInformer) AddEventHandlerWithResyncPeriod(handler cache.ResourceEve
return nil, nil
}

// RemoveEventHandler does nothing. TODO(community): Implement this.
// RemoveEventHandler removes an EventHandler to the fake Informers.
func (f *FakeInformer) RemoveEventHandler(handle cache.ResourceEventHandlerRegistration) error {
eh, ok := handle.(*eventHandlerWrapper)
if !ok {
return fmt.Errorf("invalid registration type %t", handle)
}

f.mu.Lock()
defer f.mu.Unlock()

handlers := make([]*eventHandlerWrapper, 0, len(f.handlers))
for _, h := range f.handlers {
if h == eh {
continue
}
handlers = append(handlers, h)
}
f.handlers = handlers

return nil
}

Expand Down
Loading

0 comments on commit 556c6df

Please sign in to comment.