Skip to content

Commit

Permalink
Merge pull request #2 from satyrius/stats
Browse files Browse the repository at this point in the history
Aggregation reducers
  • Loading branch information
satyrius committed Mar 30, 2014
2 parents 485a19d + 150d49f commit c06432f
Show file tree
Hide file tree
Showing 11 changed files with 570 additions and 32 deletions.
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
language: go
go:
- 1.1
- 1.2
- tip
install: make deps
script: go test -v -bench .
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ deps:
dev-deps:
go get github.com/nsf/gocode
go get code.google.com/p/rog-go/exp/cmd/godef
go install code.google.com/p/rog-go/exp/cmd/godef
79 changes: 76 additions & 3 deletions entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,91 @@ package gonx

import (
"fmt"
"strconv"
"strings"
)

// Shortcut for the map of strings
type Fields map[string]string

// Parsed log record. Use Get method to retrieve a value by name instead of
// threating this as a map, because inner representation is in design.
type Entry map[string]string
type Entry struct {
fields Fields
}

// Creates an empty Entry to be filled later
func NewEmptyEntry() *Entry {
return &Entry{make(Fields)}
}

// Creates an Entry with fiven fields
func NewEntry(fields Fields) *Entry {
return &Entry{fields}
}

// Return entry field value by name or empty string and error if it
// does not exist.
func (entry *Entry) Get(name string) (value string, err error) {
value, ok := (*entry)[name]
func (entry *Entry) Field(name string) (value string, err error) {
value, ok := entry.fields[name]
if !ok {
err = fmt.Errorf("Field '%v' does not found in record %+v", name, *entry)
}
return
}

// Return entry field value as float64. Rutuen nil if field does not exists
// and convertion error if cannot cast a type.
func (entry *Entry) FloatField(name string) (value float64, err error) {
tmp, err := entry.Field(name)
if err == nil {
value, err = strconv.ParseFloat(tmp, 64)
}
return
}

// Field value setter
func (entry *Entry) SetField(name string, value string) {
entry.fields[name] = value
}

// Float field value setter. It accepts float64, but still store it as a
// string in the same fields map. The precision is 2, its enough for log
// parsing task
func (entry *Entry) SetFloatField(name string, value float64) {
entry.SetField(name, strconv.FormatFloat(value, 'f', 2, 64))
}

// Integer field value setter. It accepts float64, but still store it as a
// string in the same fields map.
func (entry *Entry) SetUintField(name string, value uint64) {
entry.SetField(name, strconv.FormatUint(uint64(value), 10))
}

// Merge two entries by updating values for master entry with given.
func (master *Entry) Merge(entry *Entry) {
for name, value := range entry.fields {
master.SetField(name, value)
}
}

func (entry *Entry) FieldsHash(fields []string) string {
var key []string
for _, name := range fields {
value, err := entry.Field(name)
if err != nil {
value = "NULL"
}
key = append(key, fmt.Sprintf("'%v'=%v", name, value))
}
return strings.Join(key, ";")
}

func (entry *Entry) Partial(fields []string) *Entry {
partial := NewEmptyEntry()
for _, name := range fields {
value, _ := entry.Field(name)
partial.SetField(name, value)
}
return partial
}
105 changes: 101 additions & 4 deletions entry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,113 @@ import (
"testing"
)

func TestEntry(t *testing.T) {
entry := Entry{"foo": "1"}
func TestGetEntry(t *testing.T) {
entry := NewEntry(Fields{"foo": "1"})

// Get existings field
val, err := entry.Get("foo")
val, err := entry.Field("foo")
assert.NoError(t, err)
assert.Equal(t, val, "1")

// Get field that does not exist
val, err = entry.Get("bar")
val, err = entry.Field("bar")
assert.Error(t, err)
assert.Equal(t, val, "")
}

func TestEntryFloatField(t *testing.T) {
entry := NewEntry(Fields{"foo": "1", "bar": "not a number"})

// Get existings field
val, err := entry.FloatField("foo")
assert.NoError(t, err)
assert.Equal(t, val, 1.0)

// Type casting eror
val, err = entry.FloatField("bar")
assert.Error(t, err)
assert.Equal(t, val, 0.0)

// Get field that does not exist
val, err = entry.FloatField("baz")
assert.Error(t, err)
assert.Equal(t, val, 0.0)
}

func TestSetEntryField(t *testing.T) {
entry := NewEmptyEntry()
assert.Equal(t, len(entry.fields), 0)

entry.SetField("foo", "123")
value, err := entry.Field("foo")
assert.NoError(t, err)
assert.Equal(t, value, "123")

entry.SetField("foo", "234")
value, err = entry.Field("foo")
assert.NoError(t, err)
assert.Equal(t, value, "234")
}

func TestSetEntryFloatField(t *testing.T) {
entry := NewEmptyEntry()
entry.SetFloatField("foo", 123.4567)
value, err := entry.Field("foo")
assert.NoError(t, err)
assert.Equal(t, value, "123.46")
}

func TestSetEntryUintField(t *testing.T) {
entry := NewEmptyEntry()
entry.SetUintField("foo", 123)
value, err := entry.Field("foo")
assert.NoError(t, err)
assert.Equal(t, value, "123")
}

func TestMergeEntries(t *testing.T) {
entry1 := NewEntry(Fields{"foo": "1", "bar": "hello"})
entry2 := NewEntry(Fields{"foo": "2", "bar": "hello", "name": "alpha"})
entry1.Merge(entry2)

val, err := entry1.Field("foo")
assert.NoError(t, err)
assert.Equal(t, val, "2")

val, err = entry1.Field("bar")
assert.NoError(t, err)
assert.Equal(t, val, "hello")

val, err = entry1.Field("name")
assert.NoError(t, err)
assert.Equal(t, val, "alpha")
}

func TestGetEntryGroupHash(t *testing.T) {
entry1 := NewEntry(Fields{"foo": "1", "bar": "Hello world #1", "name": "alpha"})
entry2 := NewEntry(Fields{"foo": "2", "bar": "Hello world #2", "name": "alpha"})
entry3 := NewEntry(Fields{"foo": "2", "bar": "Hello world #3", "name": "alpha"})
entry4 := NewEntry(Fields{"foo": "3", "bar": "Hello world #4", "name": "beta"})

fields := []string{"name"}
assert.Equal(t, entry1.FieldsHash(fields), entry2.FieldsHash(fields))
assert.Equal(t, entry1.FieldsHash(fields), entry3.FieldsHash(fields))
assert.NotEqual(t, entry1.FieldsHash(fields), entry4.FieldsHash(fields))

fields = []string{"name", "foo"}
assert.NotEqual(t, entry1.FieldsHash(fields), entry2.FieldsHash(fields))
assert.Equal(t, entry2.FieldsHash(fields), entry3.FieldsHash(fields))
assert.NotEqual(t, entry1.FieldsHash(fields), entry4.FieldsHash(fields))
assert.NotEqual(t, entry2.FieldsHash(fields), entry4.FieldsHash(fields))
}

func TestPartialEntry(t *testing.T) {
entry := NewEntry(Fields{"foo": "1", "bar": "Hello world #1", "name": "alpha"})
partial := entry.Partial([]string{"name", "foo"})

assert.Equal(t, len(partial.fields), 2)
val, _ := partial.Field("name")
assert.Equal(t, val, "alpha")
val, _ = partial.Field("foo")
assert.Equal(t, val, "1")
}
8 changes: 4 additions & 4 deletions mapreduce.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,13 @@ func handleError(err error) {
// when result will be readed from reducer's output channel, but the mapper
// works and fills input Entries channel until all lines will be read from
// the fiven file.
func MapReduce(file io.Reader, parser *Parser, reducer Reducer) interface{} {
func MapReduce(file io.Reader, parser *Parser, reducer Reducer) chan *Entry {
// Input file lines. This channel is unbuffered to publish
// next line to handle only when previous is taken by mapper.
var lines = make(chan string)

// Host thread to spawn new mappers
var entries = make(chan Entry, 10)
var entries = make(chan *Entry, 10)
go func(topLoad int) {
// Create semafore channel with capacity equal to the output channel
// capacity. Use it to control mapper goroutines spawn.
Expand Down Expand Up @@ -68,7 +68,7 @@ func MapReduce(file io.Reader, parser *Parser, reducer Reducer) interface{} {
}(cap(entries))

// Run reducer routine.
var output = make(chan interface{})
var output = make(chan *Entry)
go reducer.Reduce(entries, output)

go func() {
Expand All @@ -84,5 +84,5 @@ func MapReduce(file io.Reader, parser *Parser, reducer Reducer) interface{} {
}
}()

return <-output
return output
}
6 changes: 3 additions & 3 deletions parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func NewParser(format string) *Parser {

// Parse log file line using internal format regexp. If line do not match
// given format an error will be returned.
func (parser *Parser) ParseString(line string) (entry Entry, err error) {
func (parser *Parser) ParseString(line string) (entry *Entry, err error) {
re := parser.regexp
fields := re.FindStringSubmatch(line)
if fields == nil {
Expand All @@ -32,12 +32,12 @@ func (parser *Parser) ParseString(line string) (entry Entry, err error) {
}

// Iterate over subexp foung and fill the map record
entry = make(Entry)
entry = NewEmptyEntry()
for i, name := range re.SubexpNames() {
if i == 0 {
continue
}
entry[name] = fields[i]
entry.SetField(name, fields[i])
}
return
}
Expand Down
4 changes: 2 additions & 2 deletions parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ func (suite *ParserTestSuite) TestRegexp() {

func (suite *ParserTestSuite) TestParseString() {
line := `89.234.89.123 [08/Nov/2013:13:39:18 +0000] "GET /api/foo/bar HTTP/1.1"`
expected := Entry{
expected := NewEntry(Fields{
"remote_addr": "89.234.89.123",
"time_local": "08/Nov/2013:13:39:18 +0000",
"request": "GET /api/foo/bar HTTP/1.1",
}
})
entry, err := suite.parser.ParseString(line)
assert.NoError(suite.T(), err)
assert.Equal(suite.T(), entry, expected)
Expand Down
7 changes: 3 additions & 4 deletions reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
type Reader struct {
file io.Reader
parser *Parser
entries chan Entry
entries chan *Entry
}

// Creates reader for custom log format.
Expand All @@ -34,11 +34,10 @@ func NewNginxReader(logFile io.Reader, nginxConf io.Reader, formatName string) (
}

// Get next parsed Entry from the log file. Return EOF if there is no Entries to read.
func (r *Reader) Read() (entry Entry, err error) {
func (r *Reader) Read() (entry *Entry, err error) {
if r.entries == nil {
r.entries = MapReduce(r.file, r.parser, new(ReadAll)).(chan Entry)
r.entries = MapReduce(r.file, r.parser, new(ReadAll))
}
// TODO return Entry reference instead of instance
entry, ok := <-r.entries
if !ok {
err = io.EOF
Expand Down
4 changes: 2 additions & 2 deletions reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ func TestRead(t *testing.T) {
reader := NewReader(file, format)
assert.Nil(t, reader.entries)

expected := Entry{
expected := NewEntry(Fields{
"remote_addr": "89.234.89.123",
"time_local": "08/Nov/2013:13:39:18 +0000",
"request": "GET /api/foo/bar HTTP/1.1",
}
})

// Read entry from incoming channel
entry, err := reader.Read()
Expand Down
Loading

0 comments on commit c06432f

Please sign in to comment.