Skip to content

Commit

Permalink
line-buffer logs
Browse files Browse the repository at this point in the history
 * emit event for every line, buffering if necessary.
 * fix memory corruption (stop sharing bufer)
 * delivery timeout after 1ms

fixes #16
  • Loading branch information
boz committed Nov 28, 2017
1 parent 67ab5fd commit 3e285bf
Show file tree
Hide file tree
Showing 5 changed files with 160 additions and 6 deletions.
9 changes: 8 additions & 1 deletion _example/prod.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,14 @@ spec:
command:
- /bin/sh
- "-c"
- i=0; while true; do echo "GET /users/$i"; sleep 1; i=$((i + 1)); done
- |
i=0;
while true; do
echo "GET /users/$i
no-cache";
sleep 1;
i=$((i + 1));
done
- name: cache
image: busybox
command:
Expand Down
52 changes: 52 additions & 0 deletions buffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package kail

import "bytes"

const bufferMaxRetainSize = logBufsiz

type buffer interface {
process([]byte) []Event
}

type _buffer struct {
source EventSource
prev *bytes.Buffer
}

func newBuffer(source EventSource) buffer {
return &_buffer{source, new(bytes.Buffer)}
}

func (b *_buffer) process(log []byte) []Event {

var events []Event

for end := bytes.IndexRune(log, '\n'); end >= 0 && len(log) > 0; end = bytes.IndexRune(log, '\n') {
var ebuf []byte

if plen := b.prev.Len(); plen > 0 {
ebuf = make([]byte, plen+end)
copy(ebuf, b.prev.Bytes())
copy(ebuf[plen:], log[:end])
b.prev.Reset()
} else {
ebuf = make([]byte, end)
copy(ebuf, log[:end])
}

events = append(events, newEvent(b.source, ebuf))
log = log[end+1:]
}

if sz := len(log); sz > 0 {
b.prev.Write(log)
if plen := b.prev.Len(); plen >= bufferMaxRetainSize {
ebuf := make([]byte, plen)
copy(ebuf, b.prev.Bytes())
events = append(events, newEvent(b.source, ebuf))
b.prev.Reset()
}
}

return events
}
66 changes: 66 additions & 0 deletions buffer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package kail

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestBuffer(t *testing.T) {
source := eventSource{}

{
buffer := newBuffer(source)
events := buffer.process([]byte(""))
assert.Empty(t, events)

events = buffer.process([]byte("foo\n"))
assert.Len(t, events, 1)
assert.Equal(t, "foo", string(events[0].Log()))
}

{
buffer := newBuffer(source)
events := buffer.process([]byte("foo"))
assert.Empty(t, events)

events = buffer.process([]byte("\n"))
assert.Len(t, events, 1)
assert.Equal(t, "foo", string(events[0].Log()))
}

{
buffer := newBuffer(source)
events := buffer.process([]byte("foo\n"))
assert.Len(t, events, 1)
assert.Equal(t, "foo", string(events[0].Log()))

events = buffer.process([]byte("bar\n"))
assert.Len(t, events, 1)
assert.Equal(t, "bar", string(events[0].Log()))
}

{
buffer := newBuffer(source)
events := buffer.process([]byte("foo\nbar\n"))
assert.Len(t, events, 2)
assert.Equal(t, "foo", string(events[0].Log()))
assert.Equal(t, "bar", string(events[1].Log()))

events = buffer.process([]byte("baz\n"))
assert.Len(t, events, 1)
assert.Equal(t, "baz", string(events[0].Log()))
}

{
buffer := newBuffer(source)
events := buffer.process([]byte("foo\nbar"))
assert.Len(t, events, 1)
assert.Equal(t, "foo", string(events[0].Log()))

events = buffer.process([]byte("baz\n"))
assert.Len(t, events, 1)
assert.Equal(t, "barbaz", string(events[0].Log()))
}

}
27 changes: 22 additions & 5 deletions monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ import (
)

const (
logBufsiz = 1024
logBufsiz = 1024 * 16 // 16k max message size
monitorDeliverWait = time.Millisecond
)

var (
Expand Down Expand Up @@ -161,6 +162,8 @@ func (m *_monitor) readloop(
defer stream.Close()

logbuf := make([]byte, logBufsiz)
buffer := newBuffer(m.source)

for ctx.Err() == nil {
nread, err := stream.Read(logbuf)

Expand All @@ -178,16 +181,30 @@ func (m *_monitor) readloop(
log := logbuf[0:nread]

if bytes.Compare(canaryLog, log) == 0 {
m.log.Warnf("received 'unexpect stream type'")
continue
}

event := newEvent(m.source, log)
if events := buffer.process(log); len(events) > 0 {
m.deliverEvents(ctx, events)
}

}
return nil
}

func (m *_monitor) deliverEvents(ctx context.Context, events []Event) {
t := time.NewTimer(monitorDeliverWait)
defer t.Stop()

for i, event := range events {
select {
case m.eventch <- event:
default:
m.log.Warnf("event buffer full. dropping logs %v", nread)
case <-t.C:
m.log.Warnf("event buffer full. dropping %v logs", len(events)-i)
return
case <-ctx.Done():
return
}
}
return nil
}
12 changes: 12 additions & 0 deletions vendor/vendor.json
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,12 @@
"revision": "2b3a18b5f0fb6b4f9190549597d3f962c02bc5eb",
"revisionTime": "2017-09-10T13:46:14Z"
},
{
"checksumSHA1": "LuFv4/jlrmFNnDb/5SCSEPAM9vU=",
"path": "github.com/pmezard/go-difflib/difflib",
"revision": "792786c7400a136282c1664665ae0a8db921c6c2",
"revisionTime": "2016-01-10T10:55:54Z"
},
{
"checksumSHA1": "I5DMrLM0FpmvG81tAWZ3zoydBIo=",
"path": "github.com/sirupsen/logrus",
Expand All @@ -464,6 +470,12 @@
"revision": "e57e3eeb33f795204c1ca35f56c44f83227c6e66",
"revisionTime": "2017-05-08T18:43:26Z"
},
{
"checksumSHA1": "mGbTYZ8dHVTiPTTJu3ktp+84pPI=",
"path": "github.com/stretchr/testify/assert",
"revision": "2aa2c176b9dab406a6970f6a55f513e8a8c8b18f",
"revisionTime": "2017-08-14T20:04:35Z"
},
{
"checksumSHA1": "9Zw986fuQM/hCoVd8vmHoSM+8sU=",
"path": "github.com/ugorji/go/codec",
Expand Down

0 comments on commit 3e285bf

Please sign in to comment.