diff --git a/cmd/kail/main.go b/cmd/kail/main.go index e32c4ac..597a1ad 100644 --- a/cmd/kail/main.go +++ b/cmd/kail/main.go @@ -2,6 +2,7 @@ package main import ( "context" + "flag" "fmt" "os" "os/signal" @@ -20,6 +21,7 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/kubernetes" _ "k8s.io/client-go/plugin/pkg/client/auth" + "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" ) @@ -57,6 +59,14 @@ var ( PlaceHolder("DURATION"). Default("1s"). Duration() + + flagGlogV = kingpin.Flag("glog-v", "glog -v value"). + Default("0"). + String() + + flagGlogVmodule = kingpin.Flag("glog-vmodule", "glog -vmodule flag"). + Default(""). + String() ) func main() { @@ -72,7 +82,7 @@ func main() { log := createLog() - cs := createKubeClient() + cs, rc := createKubeClient() dsb := createDSBuilder() @@ -84,13 +94,15 @@ func main() { ds := createDS(ctx, cs, dsb) + filter := kail.NewContainerFilter(*flagContainers) + if *flagDryRun { - listPods(ds) + listPods(ds, filter) } else { - streamLogs(createController(ctx, cs, ds)) + streamLogs(createController(ctx, cs, rc, ds, filter)) } @@ -123,23 +135,31 @@ func createLog() logutil.Log { parent.Level = lvl parent.Out = *flagLogFile + // XXX: fucking glog. + os.Args = []string{os.Args[0], + "-logtostderr=true", + "-v=" + *flagGlogV, + "-vmodule=" + *flagGlogVmodule, + } + flag.Parse() + return logutil_logrus.New(parent).WithComponent("kail.main") } -func createKubeClient() kubernetes.Interface { +func createKubeClient() (kubernetes.Interface, *rest.Config) { overrides := &clientcmd.ConfigOverrides{} if flagContext != nil { overrides.CurrentContext = *flagContext } - cs, _, err := util.KubeClient(overrides) + cs, rc, err := util.KubeClient(overrides) kingpin.FatalIfError(err, "Error configuring kubernetes connection") _, err = cs.CoreV1().Namespaces().List(metav1.ListOptions{}) kingpin.FatalIfError(err, "Can't connnect to kubernetes") - return cs + return cs, rc } func createDSBuilder() kail.DSBuilder { @@ -204,27 +224,28 @@ func createDS(ctx context.Context, cs kubernetes.Interface, dsb kail.DSBuilder) return ds } -func listPods(ds kail.DS) { +func listPods(ds kail.DS, filter kail.ContainerFilter) { pods, err := ds.Pods().Cache().List() kingpin.FatalIfError(err, "Error fetching pods") w := tabwriter.NewWriter(os.Stdout, 0, 0, 1, ' ', 0) - fmt.Fprintln(w, "NAMESPACE\tNAME\tNODE") + fmt.Fprintln(w, "NAMESPACE\tNAME\tCONTAINER\tNODE") for _, pod := range pods { - fmt.Fprintf(w, "%v\t%v\t%v\n", pod.GetNamespace(), pod.GetName(), pod.Spec.NodeName) + _, sources := kail.SourcesForPod(filter, pod) + for _, source := range sources { + fmt.Fprintf(w, "%v\t%v\t%v\t%v\n", source.Namespace(), source.Name(), source.Container(), source.Node()) + } } w.Flush() } func createController( - ctx context.Context, cs kubernetes.Interface, ds kail.DS) kail.Controller { - - filter := kail.NewContainerFilter(*flagContainers) + ctx context.Context, cs kubernetes.Interface, rc *rest.Config, ds kail.DS, filter kail.ContainerFilter) kail.Controller { - controller, err := kail.NewController(ctx, cs, ds.Pods(), filter, *flagSince) + controller, err := kail.NewController(ctx, cs, rc, ds.Pods(), filter, *flagSince) kingpin.FatalIfError(err, "Error creating controller") return controller diff --git a/controller.go b/controller.go index 69c0c4b..86532a6 100644 --- a/controller.go +++ b/controller.go @@ -6,6 +6,7 @@ import ( "k8s.io/api/core/v1" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" lifecycle "github.com/boz/go-lifecycle" logutil "github.com/boz/go-logutil" @@ -27,6 +28,7 @@ type Controller interface { func NewController( ctx context.Context, cs kubernetes.Interface, + rc *rest.Config, pcontroller pod.Controller, filter ContainerFilter, since time.Duration) (Controller, error) { @@ -50,6 +52,7 @@ func NewController( c := &controller{ cs: cs, + rc: rc, pods: pods, filter: filter, mconfig: monitorConfig{since: since}, @@ -68,6 +71,7 @@ func NewController( type controller struct { cs kubernetes.Interface + rc *rest.Config pods pod.Subscription filter ContainerFilter @@ -183,15 +187,7 @@ func (c *controller) handlePodEvent(ev pod.Event) { } func (c *controller) ensureMonitorsForPod(pod *v1.Pod) { - id := nsname.ForObject(pod) - sources := make(map[eventSource]bool) - - for _, cstatus := range pod.Status.ContainerStatuses { - if c.filter.Accept(cstatus) { - source := eventSource{id, cstatus.Name, pod.Spec.NodeName} - sources[source] = true - } - } + id, sources := sourcesForPod(c.filter, pod) c.log.Debugf("pod %v/%v: %v containers ready", pod.GetNamespace(), pod.GetName(), len(sources)) diff --git a/filter.go b/filter.go index e196ce0..e80877c 100644 --- a/filter.go +++ b/filter.go @@ -1,6 +1,11 @@ package kail -import "k8s.io/api/core/v1" +import ( + "sort" + + "github.com/boz/kcache/nsname" + "k8s.io/api/core/v1" +) type ContainerFilter interface { Accept(cs v1.ContainerStatus) bool @@ -26,3 +31,34 @@ func (cf containerFilter) Accept(cs v1.ContainerStatus) bool { } return false } + +func sourcesForPod(filter ContainerFilter, pod *v1.Pod) (nsname.NSName, map[eventSource]bool) { + id := nsname.ForObject(pod) + sources := make(map[eventSource]bool) + + for _, cstatus := range pod.Status.ContainerStatuses { + if filter.Accept(cstatus) { + source := eventSource{id, cstatus.Name, pod.Spec.NodeName} + sources[source] = true + } + } + + return id, sources +} + +func SourcesForPod( + filter ContainerFilter, pod *v1.Pod) (nsname.NSName, []EventSource) { + + id, internal := sourcesForPod(filter, pod) + sources := make([]EventSource, 0, len(internal)) + + for source, _ := range internal { + sources = append(sources, source) + } + + sort.Slice(sources, func(a, b int) bool { + return sources[a].Name() < sources[b].Name() + }) + + return id, sources +} diff --git a/monitor.go b/monitor.go index 4d2532a..81cf063 100644 --- a/monitor.go +++ b/monitor.go @@ -1,13 +1,16 @@ package kail import ( + "bytes" "context" "fmt" "io" "time" "k8s.io/api/core/v1" + "k8s.io/client-go/kubernetes" corev1 "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/rest" lifecycle "github.com/boz/go-lifecycle" logutil "github.com/boz/go-logutil" @@ -17,6 +20,10 @@ const ( logBufsiz = 1024 ) +var ( + canaryLog = []byte("unexpected stream type \"\"") +) + type monitorConfig struct { since time.Duration } @@ -34,7 +41,7 @@ func newMonitor(c *controller, source EventSource, config monitorConfig) monitor fmt.Sprintf("monitor [%v]", source)) m := &_monitor{ - core: c.cs.CoreV1(), + rc: c.rc, source: source, config: config, eventch: c.eventch, @@ -49,7 +56,7 @@ func newMonitor(c *controller, source EventSource, config monitorConfig) monitor } type _monitor struct { - core corev1.CoreV1Interface + rc *rest.Config source EventSource config monitorConfig eventch chan<- Event @@ -72,18 +79,34 @@ func (m *_monitor) run() { ctx, cancel := context.WithCancel(m.ctx) + client, err := m.makeClient(ctx) + if err != nil { + m.lc.ShutdownInitiated(err) + cancel() + return + } + donech := make(chan struct{}) - go m.mainloop(ctx, donech) + go m.mainloop(ctx, client, donech) - err := <-m.lc.ShutdownRequest() + err = <-m.lc.ShutdownRequest() m.lc.ShutdownInitiated(err) cancel() <-donech } -func (m *_monitor) mainloop(ctx context.Context, donech chan struct{}) { +func (m *_monitor) makeClient(ctx context.Context) (corev1.CoreV1Interface, error) { + cs, err := kubernetes.NewForConfig(m.rc) + if err != nil { + return nil, err + } + return cs.CoreV1(), nil +} + +func (m *_monitor) mainloop( + ctx context.Context, client corev1.CoreV1Interface, donech chan struct{}) { defer m.log.Un(m.log.Trace("mainloop")) defer close(donech) @@ -94,8 +117,11 @@ func (m *_monitor) mainloop(ctx context.Context, donech chan struct{}) { m.log.Debugf("displaying logs since %v seconds", sinceSecs) - for ctx.Err() == nil { - err := m.readloop(ctx, since) + for i := 0; ctx.Err() == nil; i++ { + + m.log.Debugf("readloop count: %v", i) + + err := m.readloop(ctx, client, since) switch { case err == io.EOF: case err == nil: @@ -111,7 +137,9 @@ func (m *_monitor) mainloop(ctx context.Context, donech chan struct{}) { } } -func (m *_monitor) readloop(ctx context.Context, since *int64) error { +func (m *_monitor) readloop( + ctx context.Context, client corev1.CoreV1Interface, since *int64) error { + defer m.log.Un(m.log.Trace("readloop")) opts := &v1.PodLogOptions{ @@ -120,11 +148,10 @@ func (m *_monitor) readloop(ctx context.Context, since *int64) error { SinceSeconds: since, } - req := m.core. + req := client. Pods(m.source.Namespace()). - GetLogs(m.source.Name(), opts) - - req = req.Context(ctx) + GetLogs(m.source.Name(), opts). + Context(ctx) stream, err := req.Stream() if err != nil { @@ -148,7 +175,13 @@ func (m *_monitor) readloop(ctx context.Context, since *int64) error { return io.EOF } - event := newEvent(m.source, logbuf[0:nread]) + log := logbuf[0:nread] + + if bytes.Compare(canaryLog, log) == 0 { + continue + } + + event := newEvent(m.source, log) select { case m.eventch <- event: diff --git a/vendor/vendor.json b/vendor/vendor.json index f6db0ce..336701c 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -299,7 +299,7 @@ "revisionTime": "2017-08-08T02:16:21Z" }, { - "checksumSHA1": "kHrNY4ktruLxWd+qxbMw90KfO1Y=", + "checksumSHA1": "mygGPzw0Q81XnyUgA7+YYDIjYD8=", "path": "github.com/google/btree", "revision": "316fb6d3f031ae8f4d457c6c5186b9e3ded70435", "revisionTime": "2016-12-17T18:35:37Z" @@ -543,7 +543,7 @@ "revisionTime": "2017-07-25T16:55:14Z" }, { - "checksumSHA1": "9/T0BIuAN9HJmAYwQM+0l2Wi7R4=", + "checksumSHA1": "pLUgsMw7bn9d4FrZaL/RUSCfNwQ=", "path": "golang.org/x/sys/unix", "revision": "2d3e384235de683634e9080b58f757466840aa48", "revisionTime": "2017-08-14T10:24:55Z" @@ -567,19 +567,19 @@ "revisionTime": "2017-07-06T13:46:35Z" }, { - "checksumSHA1": "MRLtTu/vpd18le8/HPLxOdjO5RE=", + "checksumSHA1": "KG+XZAbxdkpBm3Fa3bJ3Ylq8CKI=", "path": "golang.org/x/text/unicode/bidi", "revision": "b19bf474d317b857955b12035d2c5acb57ce8b01", "revisionTime": "2017-07-06T13:46:35Z" }, { - "checksumSHA1": "kKylzIrLEnH8NKyeVAL0dq5gjVQ=", + "checksumSHA1": "Anof4bt0AU+Sa3R8Rq0KBnlpbaQ=", "path": "golang.org/x/text/unicode/norm", "revision": "b19bf474d317b857955b12035d2c5acb57ce8b01", "revisionTime": "2017-07-06T13:46:35Z" }, { - "checksumSHA1": "mfeW9NCKg58o6w5bbXPvpixpIw0=", + "checksumSHA1": "U1OTBlgTRUe9ZdMsbISL1E+eMm8=", "path": "golang.org/x/text/width", "revision": "b19bf474d317b857955b12035d2c5acb57ce8b01", "revisionTime": "2017-07-06T13:46:35Z"