diff --git a/pkg/source/internal/channel.go b/pkg/source/internal/channel.go new file mode 100644 index 0000000000..4b45347bdc --- /dev/null +++ b/pkg/source/internal/channel.go @@ -0,0 +1,147 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package internal + +import ( + "context" + "errors" + "fmt" + "sync" + + "k8s.io/client-go/util/workqueue" + "k8s.io/utils/ptr" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/predicate" +) + +type Channel[T any] struct { + // once ensures the event distribution goroutine will be performed only once + once sync.Once + + // source is the source channel to fetch GenericEvents + Source <-chan event.TypedGenericEvent[T] + + Handler handler.TypedEventHandler[T] + + Predicates []predicate.TypedPredicate[T] + + BufferSize *int + + // dest is the destination channels of the added event handlers + dest []chan event.TypedGenericEvent[T] + + // destLock is to ensure the destination channels are safely added/removed + destLock sync.Mutex +} + +func (cs *Channel[T]) String() string { + return fmt.Sprintf("channel source: %p", cs) +} + +// Start implements Source and should only be called by the Controller. +func (cs *Channel[T]) Start( + ctx context.Context, + queue workqueue.RateLimitingInterface, +) error { + // Source should have been specified by the user. + if cs.Source == nil { + return fmt.Errorf("must specify Channel.Source") + } + if cs.Handler == nil { + return errors.New("must specify Channel.Handler") + } + + if cs.BufferSize == nil { + cs.BufferSize = ptr.To(1024) + } + + dst := make(chan event.TypedGenericEvent[T], *cs.BufferSize) + + cs.destLock.Lock() + cs.dest = append(cs.dest, dst) + cs.destLock.Unlock() + + cs.once.Do(func() { + // Distribute GenericEvents to all EventHandler / Queue pairs Watching this source + go cs.syncLoop(ctx) + }) + + go func() { + for evt := range dst { + shouldHandle := true + for _, p := range cs.Predicates { + if !p.Generic(evt) { + shouldHandle = false + break + } + } + + if shouldHandle { + func() { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + cs.Handler.Generic(ctx, evt, queue) + }() + } + } + }() + + return nil +} + +func (cs *Channel[T]) doStop() { + cs.destLock.Lock() + defer cs.destLock.Unlock() + + for _, dst := range cs.dest { + close(dst) + } +} + +func (cs *Channel[T]) distribute(evt event.TypedGenericEvent[T]) { + cs.destLock.Lock() + defer cs.destLock.Unlock() + + for _, dst := range cs.dest { + // We cannot make it under goroutine here, or we'll meet the + // race condition of writing message to closed channels. + // To avoid blocking, the dest channels are expected to be of + // proper buffer size. If we still see it blocked, then + // the controller is thought to be in an abnormal state. + dst <- evt + } +} + +func (cs *Channel[T]) syncLoop(ctx context.Context) { + for { + select { + case <-ctx.Done(): + // Close destination channels + cs.doStop() + return + case evt, stillOpen := <-cs.Source: + if !stillOpen { + // if the source channel is closed, we're never gonna get + // anything more on it, so stop & bail + cs.doStop() + return + } + cs.distribute(evt) + } + } +} diff --git a/pkg/internal/source/event_handler.go b/pkg/source/internal/event_handler.go similarity index 100% rename from pkg/internal/source/event_handler.go rename to pkg/source/internal/event_handler.go diff --git a/pkg/source/internal/func.go b/pkg/source/internal/func.go new file mode 100644 index 0000000000..3db10b54fc --- /dev/null +++ b/pkg/source/internal/func.go @@ -0,0 +1,36 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package internal + +import ( + "context" + "fmt" + + "k8s.io/client-go/util/workqueue" +) + +// Func is a function that implements Source. +type Func func(context.Context, workqueue.RateLimitingInterface) error + +// Start implements Source. +func (f Func) Start(ctx context.Context, queue workqueue.RateLimitingInterface) error { + return f(ctx, queue) +} + +func (f Func) String() string { + return fmt.Sprintf("func source: %p", f) +} diff --git a/pkg/source/internal/informer.go b/pkg/source/internal/informer.go new file mode 100644 index 0000000000..db77a37dab --- /dev/null +++ b/pkg/source/internal/informer.go @@ -0,0 +1,58 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package internal + +import ( + "context" + "errors" + "fmt" + + "k8s.io/client-go/util/workqueue" + "sigs.k8s.io/controller-runtime/pkg/cache" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/predicate" +) + +// Informer is used to provide a source of events originating inside the cluster from Watches (e.g. Pod Create). +type Informer struct { + // Informer is the controller-runtime Informer + Informer cache.Informer + Handler handler.EventHandler + Predicates []predicate.Predicate +} + +// Start is internal and should be called only by the Controller to register an EventHandler with the Informer +// to enqueue reconcile.Requests. +func (is *Informer) Start(ctx context.Context, queue workqueue.RateLimitingInterface) error { + // Informer should have been specified by the user. + if is.Informer == nil { + return fmt.Errorf("must specify Informer.Informer") + } + if is.Handler == nil { + return errors.New("must specify Informer.Handler") + } + + _, err := is.Informer.AddEventHandler(NewEventHandler(ctx, queue, is.Handler, is.Predicates).HandlerFuncs()) + if err != nil { + return err + } + return nil +} + +func (is *Informer) String() string { + return fmt.Sprintf("informer source: %p", is.Informer) +} diff --git a/pkg/internal/source/internal_suite_test.go b/pkg/source/internal/internal_suite_test.go similarity index 100% rename from pkg/internal/source/internal_suite_test.go rename to pkg/source/internal/internal_suite_test.go diff --git a/pkg/internal/source/internal_test.go b/pkg/source/internal/internal_test.go similarity index 99% rename from pkg/internal/source/internal_test.go rename to pkg/source/internal/internal_test.go index f71be58424..c62ab827dc 100644 --- a/pkg/internal/source/internal_test.go +++ b/pkg/source/internal/internal_test.go @@ -26,7 +26,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" - internal "sigs.k8s.io/controller-runtime/pkg/internal/source" + internal "sigs.k8s.io/controller-runtime/pkg/source/internal" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" diff --git a/pkg/internal/source/kind.go b/pkg/source/internal/kind.go similarity index 87% rename from pkg/internal/source/kind.go rename to pkg/source/internal/kind.go index 03431d1d24..ae22d0ad9c 100644 --- a/pkg/internal/source/kind.go +++ b/pkg/source/internal/kind.go @@ -1,3 +1,19 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package internal import ( diff --git a/pkg/source/source.go b/pkg/source/source.go index 26e53022bf..08b5abed63 100644 --- a/pkg/source/source.go +++ b/pkg/source/source.go @@ -18,16 +18,12 @@ package source import ( "context" - "errors" - "fmt" - "sync" "k8s.io/client-go/util/workqueue" - "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" - internal "sigs.k8s.io/controller-runtime/pkg/internal/source" + internal "sigs.k8s.io/controller-runtime/pkg/source/internal" "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/predicate" @@ -64,14 +60,25 @@ func Kind[T client.Object](cache cache.Cache, object T, handler handler.TypedEve } } -var _ Source = &channel[string]{} +var _ Source = &internal.Kind[client.Object]{} +var _ SyncingSource = &internal.Kind[client.Object]{} + +// Informer is used to provide a source of events originating inside the cluster from Watches (e.g. Pod Create). +type Informer = internal.Informer + +var _ Source = &internal.Informer{} + +type channelOpts[T any] struct { + bufferSize *int + predicates []predicate.TypedPredicate[T] +} // ChannelOpt allows to configure a source.Channel. -type ChannelOpt[T any] func(*channel[T]) +type ChannelOpt[T any] func(*channelOpts[T]) // WithPredicates adds the configured predicates to a source.Channel. func WithPredicates[T any](p ...predicate.TypedPredicate[T]) ChannelOpt[T] { - return func(c *channel[T]) { + return func(c *channelOpts[T]) { c.predicates = append(c.predicates, p...) } } @@ -79,7 +86,7 @@ func WithPredicates[T any](p ...predicate.TypedPredicate[T]) ChannelOpt[T] { // WithBufferSize configures the buffer size for a source.Channel. By // default, the buffer size is 1024. func WithBufferSize[T any](bufferSize int) ChannelOpt[T] { - return func(c *channel[T]) { + return func(c *channelOpts[T]) { c.bufferSize = &bufferSize } } @@ -88,176 +95,22 @@ func WithBufferSize[T any](bufferSize int) ChannelOpt[T] { // (e.g. GitHub Webhook callback). Channel requires the user to wire the external // source (e.g. http handler) to write GenericEvents to the underlying channel. func Channel[T any](source <-chan event.TypedGenericEvent[T], handler handler.TypedEventHandler[T], opts ...ChannelOpt[T]) Source { - c := &channel[T]{ - source: source, - handler: handler, - } + c := &channelOpts[T]{} for _, opt := range opts { opt(c) } - return c -} - -type channel[T any] struct { - // once ensures the event distribution goroutine will be performed only once - once sync.Once - - // source is the source channel to fetch GenericEvents - source <-chan event.TypedGenericEvent[T] - - handler handler.TypedEventHandler[T] - - predicates []predicate.TypedPredicate[T] - - bufferSize *int - - // dest is the destination channels of the added event handlers - dest []chan event.TypedGenericEvent[T] - - // destLock is to ensure the destination channels are safely added/removed - destLock sync.Mutex -} - -func (cs *channel[T]) String() string { - return fmt.Sprintf("channel source: %p", cs) -} - -// Start implements Source and should only be called by the Controller. -func (cs *channel[T]) Start( - ctx context.Context, - queue workqueue.RateLimitingInterface, -) error { - // Source should have been specified by the user. - if cs.source == nil { - return fmt.Errorf("must specify Channel.Source") - } - if cs.handler == nil { - return errors.New("must specify Channel.Handler") - } - - if cs.bufferSize == nil { - cs.bufferSize = ptr.To(1024) - } - - dst := make(chan event.TypedGenericEvent[T], *cs.bufferSize) - - cs.destLock.Lock() - cs.dest = append(cs.dest, dst) - cs.destLock.Unlock() - - cs.once.Do(func() { - // Distribute GenericEvents to all EventHandler / Queue pairs Watching this source - go cs.syncLoop(ctx) - }) - - go func() { - for evt := range dst { - shouldHandle := true - for _, p := range cs.predicates { - if !p.Generic(evt) { - shouldHandle = false - break - } - } - - if shouldHandle { - func() { - ctx, cancel := context.WithCancel(ctx) - defer cancel() - cs.handler.Generic(ctx, evt, queue) - }() - } - } - }() - - return nil -} - -func (cs *channel[T]) doStop() { - cs.destLock.Lock() - defer cs.destLock.Unlock() - - for _, dst := range cs.dest { - close(dst) - } -} - -func (cs *channel[T]) distribute(evt event.TypedGenericEvent[T]) { - cs.destLock.Lock() - defer cs.destLock.Unlock() - - for _, dst := range cs.dest { - // We cannot make it under goroutine here, or we'll meet the - // race condition of writing message to closed channels. - // To avoid blocking, the dest channels are expected to be of - // proper buffer size. If we still see it blocked, then - // the controller is thought to be in an abnormal state. - dst <- evt - } -} - -func (cs *channel[T]) syncLoop(ctx context.Context) { - for { - select { - case <-ctx.Done(): - // Close destination channels - cs.doStop() - return - case evt, stillOpen := <-cs.source: - if !stillOpen { - // if the source channel is closed, we're never gonna get - // anything more on it, so stop & bail - cs.doStop() - return - } - cs.distribute(evt) - } - } -} - -// Informer is used to provide a source of events originating inside the cluster from Watches (e.g. Pod Create). -type Informer struct { - // Informer is the controller-runtime Informer - Informer cache.Informer - Handler handler.EventHandler - Predicates []predicate.Predicate -} - -var _ Source = &Informer{} - -// Start is internal and should be called only by the Controller to register an EventHandler with the Informer -// to enqueue reconcile.Requests. -func (is *Informer) Start(ctx context.Context, queue workqueue.RateLimitingInterface) error { - // Informer should have been specified by the user. - if is.Informer == nil { - return fmt.Errorf("must specify Informer.Informer") - } - if is.Handler == nil { - return errors.New("must specify Informer.Handler") - } - - _, err := is.Informer.AddEventHandler(internal.NewEventHandler(ctx, queue, is.Handler, is.Predicates).HandlerFuncs()) - if err != nil { - return err + return &internal.Channel[T]{ + Source: source, + Handler: handler, + BufferSize: c.bufferSize, + Predicates: c.predicates, } - return nil -} - -func (is *Informer) String() string { - return fmt.Sprintf("informer source: %p", is.Informer) } -var _ Source = Func(nil) +var _ Source = &internal.Channel[client.Object]{} // Func is a function that implements Source. -type Func func(context.Context, workqueue.RateLimitingInterface) error +type Func = internal.Func -// Start implements Source. -func (f Func) Start(ctx context.Context, queue workqueue.RateLimitingInterface) error { - return f(ctx, queue) -} - -func (f Func) String() string { - return fmt.Sprintf("func source: %p", f) -} +var _ Source = internal.Func(nil)