Skip to content

Commit

Permalink
graceful shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
boz committed Aug 16, 2017
1 parent e150bda commit 9084348
Show file tree
Hide file tree
Showing 6 changed files with 121 additions and 45 deletions.
31 changes: 21 additions & 10 deletions _example/demo.sh
100644 → 100755
Original file line number Diff line number Diff line change
@@ -1,15 +1,26 @@
for file in _example/{prod,demo,test}.yml; do
kubectl create -f "$file"
done
#!/bin/sh

./kail
# ./kail
# ./kail --svc api
# ./kail --svc prod/api -c nginx
# ./kail --rs workers --ns test --ns demo
# ./kail --deploy api -c cache
# ./kail -l 'app=api,component != worker' -c nginx

./kail --svc api
start() {
for file in $(dirname $0)/{prod,demo,test}.yml; do
kubectl create -f "$file"
done
}

./kail --svc prod/api -c nginx
stop() {
for file in $(dirname $0)/{prod,demo,test}.yml; do
kubectl delete -f "$file"
done
}

./kail --rs workers --ns test --ns demo
case "$1" in
start) start;;
stop) stop;;
esac

./kail --deploy api -c cache

./kail -l 'app=api,component != worker' -c nginx
41 changes: 36 additions & 5 deletions cmd/kail/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"context"
"fmt"
"os"
"os/signal"
"strings"
"syscall"
"text/tabwriter"

logutil "github.com/boz/go-logutil"
Expand Down Expand Up @@ -66,16 +68,39 @@ func main() {

ctx := logutil.NewContext(context.Background(), log)

ctx, cancel := context.WithCancel(ctx)

go watchSignals(ctx, cancel)

ds := createDS(ctx, cs, dsb)

if *flagDryRun {
listPods(ds)
} else {
streamLogs(ctx, cs, ds)
ds.Close()
cancel()
<-ds.Done()
return
}

ds.Shutdown()
controller := createController(ctx, cs, ds)
streamLogs(controller)
cancel()

<-ds.Done()
<-controller.Done()
}

func watchSignals(ctx context.Context, cancel context.CancelFunc) {
// NOTE: ignoring SIGINT to improve responsiveness Ctrl-C responsiveness
sigch := make(chan os.Signal, 1)
signal.Notify(sigch, syscall.SIGHUP, syscall.SIGQUIT)
go func() {
select {
case <-ctx.Done():
case <-sigch:
cancel()
}
}()
}

func createLog() logutil.Log {
Expand All @@ -86,7 +111,7 @@ func createLog() logutil.Log {
parent.Level = lvl
parent.Out = *flagLogFile

return logutil_logrus.New(parent)
return logutil_logrus.New(parent).WithComponent("kail.main")
}

func createKubeClient() kubernetes.Interface {
Expand Down Expand Up @@ -172,13 +197,19 @@ func listPods(ds kail.DS) {
w.Flush()
}

func streamLogs(ctx context.Context, cs kubernetes.Interface, ds kail.DS) {
func createController(
ctx context.Context, cs kubernetes.Interface, ds kail.DS) kail.Controller {

filter := kail.NewContainerFilter(*flagContainers)

controller, err := kail.NewController(ctx, cs, ds.Pods(), filter)
kingpin.FatalIfError(err, "Error creating controller")

return controller
}

func streamLogs(controller kail.Controller) {

writer := kail.NewWriter(os.Stdout)

for {
Expand Down
57 changes: 35 additions & 22 deletions controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ const (

type Controller interface {
Events() <-chan Event
Stop()
Close()
Done() <-chan struct{}
}

Expand All @@ -44,6 +44,7 @@ func NewController(
go lc.WatchContext(ctx)

log := logutil.FromContextOrDefault(ctx)
log = log.WithComponent("kail.controller")

c := &controller{
cs: cs,
Expand Down Expand Up @@ -77,6 +78,9 @@ type controller struct {
lc lifecycle.Lifecycle
}

type podMonitors map[eventSource]monitor
type monitors map[nsname.NSName]podMonitors

func (c *controller) Events() <-chan Event {
return c.eventch
}
Expand All @@ -85,7 +89,7 @@ func (c *controller) Done() <-chan struct{} {
return c.lc.Done()
}

func (c *controller) Stop() {
func (c *controller) Close() {
c.lc.Shutdown()
}

Expand All @@ -102,30 +106,52 @@ func (c *controller) run(initial []*v1.Pod) {

for {

c.log.Debugf("loop draining:%v monitors:%v", draining, len(c.monitors))

if draining && len(c.monitors) == 0 {
return
}

select {

case <-shutdownch:
c.log.Debugf("shutdown requested")
shutdownch = nil
draining = true
c.shutdownMonitors()

for _, pms := range c.monitors {
for _, pm := range pms {
go pm.Shutdown()
}
}

case ev, ok := <-peventch:
if !ok {
c.lc.Shutdown()
c.log.Debugf("pods closed")

go c.lc.Shutdown()
peventch = nil
break
}
c.handlePodEvent(ev)

if !draining {
c.handlePodEvent(ev)
break
}

case source := <-c.monitorch:
c.log.Debugf("removing source %v", source)
if pms, ok := c.monitors[source.id]; ok {
delete(pms, source)
if _, ok := pms[source]; ok {
c.log.Debugf("removing source %v", source)
delete(pms, source)
if len(pms) == 0 {
c.log.Debugf("removing pod %v", source.id)
delete(c.monitors, source.id)
}
break
}
}
c.log.Warnf("attempted to remove unknown source: %v", source)
}
}
}
Expand All @@ -140,7 +166,7 @@ func (c *controller) handlePodEvent(ev pod.Event) {
if ev.Type() == kcache.EventTypeDelete {
if pms, ok := c.monitors[id]; ok {
for _, pm := range pms {
pm.Shutdown()
go pm.Shutdown()
}
}
return
Expand All @@ -167,8 +193,7 @@ func (c *controller) ensureMonitorsForPod(pod *v1.Pod) {
if pms, ok := c.monitors[id]; ok {
for source, pm := range pms {
if !sources[source] {
c.log.Debugf("shutting down %v", source)
pm.Shutdown()
go pm.Shutdown()
}
}
}
Expand All @@ -192,14 +217,6 @@ func (c *controller) ensureMonitorsForPod(pod *v1.Pod) {
c.monitors[id] = pms
}

func (c *controller) shutdownMonitors() {
for _, pms := range c.monitors {
for _, pm := range pms {
pm.Shutdown()
}
}
}

func (c *controller) createMonitor(source eventSource) monitor {
defer c.log.Un(c.log.Trace("createMonitor(%v)", source))
m := newMonitor(c, &source)
Expand All @@ -220,7 +237,3 @@ func (c *controller) createInitialMonitors(pods []*v1.Pod) {
c.ensureMonitorsForPod(pod)
}
}

type podMonitors map[eventSource]monitor

type monitors map[nsname.NSName]podMonitors
21 changes: 19 additions & 2 deletions ds.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package kail

import (
"context"

logutil "github.com/boz/go-logutil"
"github.com/boz/kcache/types/daemonset"
"github.com/boz/kcache/types/deployment"
"github.com/boz/kcache/types/node"
Expand All @@ -14,7 +17,7 @@ type DS interface {
Pods() pod.Controller
Ready() <-chan struct{}
Done() <-chan struct{}
Shutdown()
Close()
}

type datastore struct {
Expand All @@ -36,6 +39,7 @@ type datastore struct {

readych chan struct{}
donech chan struct{}
log logutil.Log
}

type cacheController interface {
Expand All @@ -56,10 +60,23 @@ func (ds *datastore) Done() <-chan struct{} {
return ds.donech
}

func (ds *datastore) Shutdown() {
func (ds *datastore) Close() {
ds.closeAll()
}

func (ds *datastore) run(ctx context.Context) {
go func() {
select {
case <-ctx.Done():
ds.Close()
case <-ds.Done():
}
}()

go ds.waitReadyAll()
go ds.waitDoneAll()
}

func (ds *datastore) waitReadyAll() {
for _, c := range ds.controllers() {
select {
Expand Down
6 changes: 4 additions & 2 deletions ds_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,11 @@ func (b *dsBuilder) Create(ctx context.Context, cs kubernetes.Interface) (DS, er
ds := &datastore{
readych: make(chan struct{}),
donech: make(chan struct{}),
log: log.WithComponent("kail.ds"),
}

log = log.WithComponent("kail.ds.builder")

base, err := pod.NewController(ctx, log, cs, "")
if err != nil {
return nil, log.Err(err, "base pod controller")
Expand Down Expand Up @@ -272,8 +275,7 @@ func (b *dsBuilder) Create(ctx context.Context, cs kubernetes.Interface) (DS, er
}
}

go ds.waitReadyAll()
go ds.waitDoneAll()
ds.run(ctx)

return ds, nil
}
10 changes: 6 additions & 4 deletions monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,11 @@ func (m *_monitor) run() {

go m.mainloop(ctx, donech)

<-m.lc.ShutdownRequest()
m.lc.ShutdownInitiated()
cancel()
go func() {
<-m.lc.ShutdownRequest()
m.lc.ShutdownInitiated()
cancel()
}()

<-donech
}
Expand Down Expand Up @@ -120,7 +122,7 @@ func (m *_monitor) readloop(ctx context.Context) error {

stream, err := req.Stream()
if err != nil {
return m.log.Err(err, "error opening stream")
return err
}

defer stream.Close()
Expand Down

0 comments on commit 9084348

Please sign in to comment.