From 3e285bfdbcd7c979a375ae4e9144a719d87f3054 Mon Sep 17 00:00:00 2001 From: Adam Bozanich Date: Mon, 27 Nov 2017 23:21:35 -0800 Subject: [PATCH] line-buffer logs * emit event for every line, buffering if necessary. * fix memory corruption (stop sharing bufer) * delivery timeout after 1ms fixes #16 --- _example/prod.yml | 9 ++++++- buffer.go | 52 ++++++++++++++++++++++++++++++++++++ buffer_test.go | 66 ++++++++++++++++++++++++++++++++++++++++++++++ monitor.go | 27 +++++++++++++++---- vendor/vendor.json | 12 +++++++++ 5 files changed, 160 insertions(+), 6 deletions(-) create mode 100644 buffer.go create mode 100644 buffer_test.go diff --git a/_example/prod.yml b/_example/prod.yml index cb5ba14..99fc6ba 100644 --- a/_example/prod.yml +++ b/_example/prod.yml @@ -27,7 +27,14 @@ spec: command: - /bin/sh - "-c" - - i=0; while true; do echo "GET /users/$i"; sleep 1; i=$((i + 1)); done + - | + i=0; + while true; do + echo "GET /users/$i + no-cache"; + sleep 1; + i=$((i + 1)); + done - name: cache image: busybox command: diff --git a/buffer.go b/buffer.go new file mode 100644 index 0000000..0bc66d6 --- /dev/null +++ b/buffer.go @@ -0,0 +1,52 @@ +package kail + +import "bytes" + +const bufferMaxRetainSize = logBufsiz + +type buffer interface { + process([]byte) []Event +} + +type _buffer struct { + source EventSource + prev *bytes.Buffer +} + +func newBuffer(source EventSource) buffer { + return &_buffer{source, new(bytes.Buffer)} +} + +func (b *_buffer) process(log []byte) []Event { + + var events []Event + + for end := bytes.IndexRune(log, '\n'); end >= 0 && len(log) > 0; end = bytes.IndexRune(log, '\n') { + var ebuf []byte + + if plen := b.prev.Len(); plen > 0 { + ebuf = make([]byte, plen+end) + copy(ebuf, b.prev.Bytes()) + copy(ebuf[plen:], log[:end]) + b.prev.Reset() + } else { + ebuf = make([]byte, end) + copy(ebuf, log[:end]) + } + + events = append(events, newEvent(b.source, ebuf)) + log = log[end+1:] + } + + if sz := len(log); sz > 0 { + b.prev.Write(log) + if plen := b.prev.Len(); plen >= bufferMaxRetainSize { + ebuf := make([]byte, plen) + copy(ebuf, b.prev.Bytes()) + events = append(events, newEvent(b.source, ebuf)) + b.prev.Reset() + } + } + + return events +} diff --git a/buffer_test.go b/buffer_test.go new file mode 100644 index 0000000..d6b74d2 --- /dev/null +++ b/buffer_test.go @@ -0,0 +1,66 @@ +package kail + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestBuffer(t *testing.T) { + source := eventSource{} + + { + buffer := newBuffer(source) + events := buffer.process([]byte("")) + assert.Empty(t, events) + + events = buffer.process([]byte("foo\n")) + assert.Len(t, events, 1) + assert.Equal(t, "foo", string(events[0].Log())) + } + + { + buffer := newBuffer(source) + events := buffer.process([]byte("foo")) + assert.Empty(t, events) + + events = buffer.process([]byte("\n")) + assert.Len(t, events, 1) + assert.Equal(t, "foo", string(events[0].Log())) + } + + { + buffer := newBuffer(source) + events := buffer.process([]byte("foo\n")) + assert.Len(t, events, 1) + assert.Equal(t, "foo", string(events[0].Log())) + + events = buffer.process([]byte("bar\n")) + assert.Len(t, events, 1) + assert.Equal(t, "bar", string(events[0].Log())) + } + + { + buffer := newBuffer(source) + events := buffer.process([]byte("foo\nbar\n")) + assert.Len(t, events, 2) + assert.Equal(t, "foo", string(events[0].Log())) + assert.Equal(t, "bar", string(events[1].Log())) + + events = buffer.process([]byte("baz\n")) + assert.Len(t, events, 1) + assert.Equal(t, "baz", string(events[0].Log())) + } + + { + buffer := newBuffer(source) + events := buffer.process([]byte("foo\nbar")) + assert.Len(t, events, 1) + assert.Equal(t, "foo", string(events[0].Log())) + + events = buffer.process([]byte("baz\n")) + assert.Len(t, events, 1) + assert.Equal(t, "barbaz", string(events[0].Log())) + } + +} diff --git a/monitor.go b/monitor.go index 81cf063..c9c7719 100644 --- a/monitor.go +++ b/monitor.go @@ -17,7 +17,8 @@ import ( ) const ( - logBufsiz = 1024 + logBufsiz = 1024 * 16 // 16k max message size + monitorDeliverWait = time.Millisecond ) var ( @@ -161,6 +162,8 @@ func (m *_monitor) readloop( defer stream.Close() logbuf := make([]byte, logBufsiz) + buffer := newBuffer(m.source) + for ctx.Err() == nil { nread, err := stream.Read(logbuf) @@ -178,16 +181,30 @@ func (m *_monitor) readloop( log := logbuf[0:nread] if bytes.Compare(canaryLog, log) == 0 { + m.log.Warnf("received 'unexpect stream type'") continue } - event := newEvent(m.source, log) + if events := buffer.process(log); len(events) > 0 { + m.deliverEvents(ctx, events) + } + + } + return nil +} +func (m *_monitor) deliverEvents(ctx context.Context, events []Event) { + t := time.NewTimer(monitorDeliverWait) + defer t.Stop() + + for i, event := range events { select { case m.eventch <- event: - default: - m.log.Warnf("event buffer full. dropping logs %v", nread) + case <-t.C: + m.log.Warnf("event buffer full. dropping %v logs", len(events)-i) + return + case <-ctx.Done(): + return } } - return nil } diff --git a/vendor/vendor.json b/vendor/vendor.json index 18f3ce0..cce15f0 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -452,6 +452,12 @@ "revision": "2b3a18b5f0fb6b4f9190549597d3f962c02bc5eb", "revisionTime": "2017-09-10T13:46:14Z" }, + { + "checksumSHA1": "LuFv4/jlrmFNnDb/5SCSEPAM9vU=", + "path": "github.com/pmezard/go-difflib/difflib", + "revision": "792786c7400a136282c1664665ae0a8db921c6c2", + "revisionTime": "2016-01-10T10:55:54Z" + }, { "checksumSHA1": "I5DMrLM0FpmvG81tAWZ3zoydBIo=", "path": "github.com/sirupsen/logrus", @@ -464,6 +470,12 @@ "revision": "e57e3eeb33f795204c1ca35f56c44f83227c6e66", "revisionTime": "2017-05-08T18:43:26Z" }, + { + "checksumSHA1": "mGbTYZ8dHVTiPTTJu3ktp+84pPI=", + "path": "github.com/stretchr/testify/assert", + "revision": "2aa2c176b9dab406a6970f6a55f513e8a8c8b18f", + "revisionTime": "2017-08-14T20:04:35Z" + }, { "checksumSHA1": "9Zw986fuQM/hCoVd8vmHoSM+8sU=", "path": "github.com/ugorji/go/codec",