Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[operator] Use contexts instead of channels for Run methods #484

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 4 additions & 12 deletions examples/operator/opinionated/reconciler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"log"
"os"
"os/signal"
"syscall"
"time"

"github.com/grafana/grafana-app-sdk/k8s"
Expand Down Expand Up @@ -125,21 +124,14 @@ func main() {
// Informers also implement Controller, so we can use them as a controller directly if there's no need for an intermediary.
op.AddController(informerController)

// Create the stop channel
stopCh := make(chan struct{}, 1)

// Set up a signal handler
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
go func() {
<-sigChan
close(stopCh)
}()
// Create the run context
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill)
defer cancel()

log.Printf("%sStarting Operator%s", green, nc)

// Run the controller (will block until stopCh receives a message or is closed)
err = op.Run(stopCh)
err = op.Run(ctx)
if err != nil {
panic(fmt.Errorf("error running operator: %w", err))
}
Expand Down
14 changes: 3 additions & 11 deletions examples/operator/opinionated/watcher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"log"
"os"
"os/signal"
"syscall"
"time"

"github.com/grafana/grafana-app-sdk/k8s"
Expand Down Expand Up @@ -120,21 +119,14 @@ func main() {
// Informers also implement Controller, so we can use them as a controller directly if there's no need for an intermediary.
op.AddController(informer)

// Create the stop channel
stopCh := make(chan struct{}, 1)

// Set up a signal handler
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
go func() {
<-sigChan
close(stopCh)
}()
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill)
defer cancel()

log.Printf("%sStarting Operator%s", green, nc)

// Run the controller (will block until stopCh receives a message or is closed)
op.Run(stopCh)
op.Run(ctx)
}

type OpinionatedModel struct {
Expand Down
14 changes: 3 additions & 11 deletions examples/operator/simple/reconciler/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"log"
"os"
"os/signal"
"syscall"
"time"

"github.com/grafana/grafana-app-sdk/k8s"
Expand Down Expand Up @@ -89,21 +88,14 @@ func main() {
panic(fmt.Errorf("unable to reconcile kind: %w", err))
}

// Create the stop channel
stopCh := make(chan struct{}, 1)

// Set up a signal handler
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
go func() {
<-sigChan
close(stopCh)
}()
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill)
defer cancel()

log.Print("\u001B[1;32mStarting Operator\u001B[0m")

// Run the controller (will block until stopCh receives a message or is closed)
err = simpleOperator.Run(stopCh)
err = simpleOperator.Run(ctx)
if err != nil {
panic(fmt.Errorf("error running operator: %w", err))
}
Expand Down
14 changes: 3 additions & 11 deletions examples/operator/simple/watcher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"log"
"os"
"os/signal"
"syscall"
"time"

"github.com/grafana/grafana-app-sdk/k8s"
Expand Down Expand Up @@ -96,21 +95,14 @@ func main() {
panic(fmt.Errorf("unable to watch kind: %w", err))
}

// Create the stop channel
stopCh := make(chan struct{}, 1)

// Set up a signal handler
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
go func() {
<-sigChan
close(stopCh)
}()
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill)
defer cancel()

log.Print("\u001B[1;32mStarting Operator\u001B[0m")

// Run the controller (will block until stopCh receives a message or is closed)
err = simpleOperator.Run(stopCh)
err = simpleOperator.Run(ctx)
if err != nil {
panic(fmt.Errorf("error running operator: %w", err))
}
Expand Down
19 changes: 12 additions & 7 deletions operator/informer_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,14 @@ import (

"github.com/prometheus/client_golang/prometheus"

"github.com/grafana/grafana-app-sdk/app"
"github.com/grafana/grafana-app-sdk/logging"
"github.com/grafana/grafana-app-sdk/metrics"
"github.com/grafana/grafana-app-sdk/resource"
)

var _ Controller = &InformerController{}

type ResourceAction string

const (
Expand All @@ -38,8 +41,8 @@ var DefaultErrorHandler = func(ctx context.Context, err error) {

// Informer is an interface describing an informer which can be managed by InformerController
type Informer interface {
app.Runnable
AddEventHandler(handler ResourceWatcher) error
Run(stopCh <-chan struct{}) error
}

// ResourceWatcher describes an object which handles Add/Update/Delete actions for a resource
Expand Down Expand Up @@ -283,14 +286,16 @@ func (c *InformerController) RemoveAllReconcilersForResource(resourceKind string
// Run runs the controller, which starts all informers, until stopCh is closed
//
//nolint:errcheck
func (c *InformerController) Run(stopCh <-chan struct{}) error {
func (c *InformerController) Run(ctx context.Context) error {
derivedCtx, cancel := context.WithCancel(ctx)
defer cancel()
c.informers.RangeAll(func(_ string, _ int, inf Informer) {
go inf.Run(stopCh)
go inf.Run(derivedCtx)
})

go c.retryTicker(stopCh)
go c.retryTicker(derivedCtx)

<-stopCh
<-ctx.Done()

return nil
}
Expand Down Expand Up @@ -542,7 +547,7 @@ func (c *InformerController) doReconcile(ctx context.Context, reconciler Reconci
// retryTicker blocks until stopCh is closed or receives a message.
// It checks if there are function calls to be retried every second, and, if there are any, calls the function.
// If the function returns an error, it schedules a new retry according to the RetryPolicy.
func (c *InformerController) retryTicker(stopCh <-chan struct{}) {
func (c *InformerController) retryTicker(ctx context.Context) {
ticker := time.NewTicker(c.retryTickerInterval)
defer ticker.Stop()
for {
Expand Down Expand Up @@ -583,7 +588,7 @@ func (c *InformerController) retryTicker(stopCh <-chan struct{}) {
c.toRetry.AddItem(key, inf)
}
}
case <-stopCh:
case <-ctx.Done():
return
}
}
Expand Down
Loading
Loading