Skip to content

Commit

Permalink
fresh rest client for each monitor
Browse files Browse the repository at this point in the history
 * add glog flags
 * filter out 'unexpected stream type ""' messages.
 * --dry-run: show only active containers
 * --dry-run: show container name

 fixes #10
  • Loading branch information
boz committed Sep 28, 2017
1 parent 15d7ebc commit 7852e59
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 41 deletions.
47 changes: 34 additions & 13 deletions cmd/kail/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"context"
"flag"
"fmt"
"os"
"os/signal"
Expand All @@ -20,6 +21,7 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes"
_ "k8s.io/client-go/plugin/pkg/client/auth"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)

Expand Down Expand Up @@ -57,6 +59,14 @@ var (
PlaceHolder("DURATION").
Default("1s").
Duration()

flagGlogV = kingpin.Flag("glog-v", "glog -v value").
Default("0").
String()

flagGlogVmodule = kingpin.Flag("glog-vmodule", "glog -vmodule flag").
Default("").
String()
)

func main() {
Expand All @@ -72,7 +82,7 @@ func main() {

log := createLog()

cs := createKubeClient()
cs, rc := createKubeClient()

dsb := createDSBuilder()

Expand All @@ -84,13 +94,15 @@ func main() {

ds := createDS(ctx, cs, dsb)

filter := kail.NewContainerFilter(*flagContainers)

if *flagDryRun {

listPods(ds)
listPods(ds, filter)

} else {

streamLogs(createController(ctx, cs, ds))
streamLogs(createController(ctx, cs, rc, ds, filter))

}

Expand Down Expand Up @@ -123,23 +135,31 @@ func createLog() logutil.Log {
parent.Level = lvl
parent.Out = *flagLogFile

// XXX: fucking glog.
os.Args = []string{os.Args[0],
"-logtostderr=true",
"-v=" + *flagGlogV,
"-vmodule=" + *flagGlogVmodule,
}
flag.Parse()

return logutil_logrus.New(parent).WithComponent("kail.main")
}

func createKubeClient() kubernetes.Interface {
func createKubeClient() (kubernetes.Interface, *rest.Config) {
overrides := &clientcmd.ConfigOverrides{}

if flagContext != nil {
overrides.CurrentContext = *flagContext
}

cs, _, err := util.KubeClient(overrides)
cs, rc, err := util.KubeClient(overrides)
kingpin.FatalIfError(err, "Error configuring kubernetes connection")

_, err = cs.CoreV1().Namespaces().List(metav1.ListOptions{})
kingpin.FatalIfError(err, "Can't connnect to kubernetes")

return cs
return cs, rc
}

func createDSBuilder() kail.DSBuilder {
Expand Down Expand Up @@ -204,27 +224,28 @@ func createDS(ctx context.Context, cs kubernetes.Interface, dsb kail.DSBuilder)
return ds
}

func listPods(ds kail.DS) {
func listPods(ds kail.DS, filter kail.ContainerFilter) {
pods, err := ds.Pods().Cache().List()
kingpin.FatalIfError(err, "Error fetching pods")

w := tabwriter.NewWriter(os.Stdout, 0, 0, 1, ' ', 0)

fmt.Fprintln(w, "NAMESPACE\tNAME\tNODE")
fmt.Fprintln(w, "NAMESPACE\tNAME\tCONTAINER\tNODE")

for _, pod := range pods {
fmt.Fprintf(w, "%v\t%v\t%v\n", pod.GetNamespace(), pod.GetName(), pod.Spec.NodeName)
_, sources := kail.SourcesForPod(filter, pod)
for _, source := range sources {
fmt.Fprintf(w, "%v\t%v\t%v\t%v\n", source.Namespace(), source.Name(), source.Container(), source.Node())
}
}

w.Flush()
}

func createController(
ctx context.Context, cs kubernetes.Interface, ds kail.DS) kail.Controller {

filter := kail.NewContainerFilter(*flagContainers)
ctx context.Context, cs kubernetes.Interface, rc *rest.Config, ds kail.DS, filter kail.ContainerFilter) kail.Controller {

controller, err := kail.NewController(ctx, cs, ds.Pods(), filter, *flagSince)
controller, err := kail.NewController(ctx, cs, rc, ds.Pods(), filter, *flagSince)
kingpin.FatalIfError(err, "Error creating controller")

return controller
Expand Down
14 changes: 5 additions & 9 deletions controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"

lifecycle "github.com/boz/go-lifecycle"
logutil "github.com/boz/go-logutil"
Expand All @@ -27,6 +28,7 @@ type Controller interface {
func NewController(
ctx context.Context,
cs kubernetes.Interface,
rc *rest.Config,
pcontroller pod.Controller,
filter ContainerFilter,
since time.Duration) (Controller, error) {
Expand All @@ -50,6 +52,7 @@ func NewController(

c := &controller{
cs: cs,
rc: rc,
pods: pods,
filter: filter,
mconfig: monitorConfig{since: since},
Expand All @@ -68,6 +71,7 @@ func NewController(

type controller struct {
cs kubernetes.Interface
rc *rest.Config
pods pod.Subscription
filter ContainerFilter

Expand Down Expand Up @@ -183,15 +187,7 @@ func (c *controller) handlePodEvent(ev pod.Event) {
}

func (c *controller) ensureMonitorsForPod(pod *v1.Pod) {
id := nsname.ForObject(pod)
sources := make(map[eventSource]bool)

for _, cstatus := range pod.Status.ContainerStatuses {
if c.filter.Accept(cstatus) {
source := eventSource{id, cstatus.Name, pod.Spec.NodeName}
sources[source] = true
}
}
id, sources := sourcesForPod(c.filter, pod)

c.log.Debugf("pod %v/%v: %v containers ready",
pod.GetNamespace(), pod.GetName(), len(sources))
Expand Down
38 changes: 37 additions & 1 deletion filter.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
package kail

import "k8s.io/api/core/v1"
import (
"sort"

"github.com/boz/kcache/nsname"
"k8s.io/api/core/v1"
)

type ContainerFilter interface {
Accept(cs v1.ContainerStatus) bool
Expand All @@ -26,3 +31,34 @@ func (cf containerFilter) Accept(cs v1.ContainerStatus) bool {
}
return false
}

func sourcesForPod(filter ContainerFilter, pod *v1.Pod) (nsname.NSName, map[eventSource]bool) {
id := nsname.ForObject(pod)
sources := make(map[eventSource]bool)

for _, cstatus := range pod.Status.ContainerStatuses {
if filter.Accept(cstatus) {
source := eventSource{id, cstatus.Name, pod.Spec.NodeName}
sources[source] = true
}
}

return id, sources
}

func SourcesForPod(
filter ContainerFilter, pod *v1.Pod) (nsname.NSName, []EventSource) {

id, internal := sourcesForPod(filter, pod)
sources := make([]EventSource, 0, len(internal))

for source, _ := range internal {
sources = append(sources, source)
}

sort.Slice(sources, func(a, b int) bool {
return sources[a].Name() < sources[b].Name()
})

return id, sources
}
59 changes: 46 additions & 13 deletions monitor.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package kail

import (
"bytes"
"context"
"fmt"
"io"
"time"

"k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/rest"

lifecycle "github.com/boz/go-lifecycle"
logutil "github.com/boz/go-logutil"
Expand All @@ -17,6 +20,10 @@ const (
logBufsiz = 1024
)

var (
canaryLog = []byte("unexpected stream type \"\"")
)

type monitorConfig struct {
since time.Duration
}
Expand All @@ -34,7 +41,7 @@ func newMonitor(c *controller, source EventSource, config monitorConfig) monitor
fmt.Sprintf("monitor [%v]", source))

m := &_monitor{
core: c.cs.CoreV1(),
rc: c.rc,
source: source,
config: config,
eventch: c.eventch,
Expand All @@ -49,7 +56,7 @@ func newMonitor(c *controller, source EventSource, config monitorConfig) monitor
}

type _monitor struct {
core corev1.CoreV1Interface
rc *rest.Config
source EventSource
config monitorConfig
eventch chan<- Event
Expand All @@ -72,18 +79,34 @@ func (m *_monitor) run() {

ctx, cancel := context.WithCancel(m.ctx)

client, err := m.makeClient(ctx)
if err != nil {
m.lc.ShutdownInitiated(err)
cancel()
return
}

donech := make(chan struct{})

go m.mainloop(ctx, donech)
go m.mainloop(ctx, client, donech)

err := <-m.lc.ShutdownRequest()
err = <-m.lc.ShutdownRequest()
m.lc.ShutdownInitiated(err)
cancel()

<-donech
}

func (m *_monitor) mainloop(ctx context.Context, donech chan struct{}) {
func (m *_monitor) makeClient(ctx context.Context) (corev1.CoreV1Interface, error) {
cs, err := kubernetes.NewForConfig(m.rc)
if err != nil {
return nil, err
}
return cs.CoreV1(), nil
}

func (m *_monitor) mainloop(
ctx context.Context, client corev1.CoreV1Interface, donech chan struct{}) {
defer m.log.Un(m.log.Trace("mainloop"))
defer close(donech)

Expand All @@ -94,8 +117,11 @@ func (m *_monitor) mainloop(ctx context.Context, donech chan struct{}) {

m.log.Debugf("displaying logs since %v seconds", sinceSecs)

for ctx.Err() == nil {
err := m.readloop(ctx, since)
for i := 0; ctx.Err() == nil; i++ {

m.log.Debugf("readloop count: %v", i)

err := m.readloop(ctx, client, since)
switch {
case err == io.EOF:
case err == nil:
Expand All @@ -111,7 +137,9 @@ func (m *_monitor) mainloop(ctx context.Context, donech chan struct{}) {
}
}

func (m *_monitor) readloop(ctx context.Context, since *int64) error {
func (m *_monitor) readloop(
ctx context.Context, client corev1.CoreV1Interface, since *int64) error {

defer m.log.Un(m.log.Trace("readloop"))

opts := &v1.PodLogOptions{
Expand All @@ -120,11 +148,10 @@ func (m *_monitor) readloop(ctx context.Context, since *int64) error {
SinceSeconds: since,
}

req := m.core.
req := client.
Pods(m.source.Namespace()).
GetLogs(m.source.Name(), opts)

req = req.Context(ctx)
GetLogs(m.source.Name(), opts).
Context(ctx)

stream, err := req.Stream()
if err != nil {
Expand All @@ -148,7 +175,13 @@ func (m *_monitor) readloop(ctx context.Context, since *int64) error {
return io.EOF
}

event := newEvent(m.source, logbuf[0:nread])
log := logbuf[0:nread]

if bytes.Compare(canaryLog, log) == 0 {
continue
}

event := newEvent(m.source, log)

select {
case m.eventch <- event:
Expand Down
Loading

0 comments on commit 7852e59

Please sign in to comment.