diff --git a/.travis.yml b/.travis.yml
index 3d6cdb0..c70013a 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,6 +1,7 @@
 language: go
 go:
   - 1.1
+  - 1.2
   - tip
 install: make deps
 script: go test -v -bench .
diff --git a/Makefile b/Makefile
index f57a138..511bf23 100644
--- a/Makefile
+++ b/Makefile
@@ -10,3 +10,4 @@ deps:
 dev-deps:
 	go get github.com/nsf/gocode
 	go get code.google.com/p/rog-go/exp/cmd/godef
+	go install code.google.com/p/rog-go/exp/cmd/godef
diff --git a/entry.go b/entry.go
index 7e43fe8..7e57e2a 100644
--- a/entry.go
+++ b/entry.go
@@ -2,18 +2,91 @@ package gonx
 
 import (
 	"fmt"
+	"strconv"
+	"strings"
 )
 
+// Shortcut for a map of strings
+type Fields map[string]string
+
-// Parsed log record. Use Get method to retrieve a value by name instead of
-// threating this as a map, because inner representation is in design.
-type Entry map[string]string
+// Parsed log record. Use the Field method to retrieve a value by name instead
+// of treating this as a map, because the inner representation may change.
+type Entry struct {
+	fields Fields
+}
+
+// Creates an empty Entry to be filled later
+func NewEmptyEntry() *Entry {
+	return &Entry{make(Fields)}
+}
+
+// Creates an Entry with given fields
+func NewEntry(fields Fields) *Entry {
+	return &Entry{fields}
+}
 
 // Return entry field value by name or empty string and error if it
 // does not exist.
-func (entry *Entry) Get(name string) (value string, err error) {
-	value, ok := (*entry)[name]
+func (entry *Entry) Field(name string) (value string, err error) {
+	value, ok := entry.fields[name]
 	if !ok {
-		err = fmt.Errorf("Field '%v' does not found in record %+v", name, *entry)
+		err = fmt.Errorf("Field '%v' not found in record %+v", name, *entry)
 	}
 	return
 }
+
+// Return entry field value as float64. Returns the zero value if the field
+// does not exist and a conversion error if the value cannot be parsed.
+func (entry *Entry) FloatField(name string) (value float64, err error) {
+	tmp, err := entry.Field(name)
+	if err == nil {
+		value, err = strconv.ParseFloat(tmp, 64)
+	}
+	return
+}
+
+// Field value setter
+func (entry *Entry) SetField(name string, value string) {
+	entry.fields[name] = value
+}
+
+// Float field value setter. It accepts float64, but still stores it as a
+// string in the same fields map. The precision is 2; it is enough for log
+// parsing tasks.
+func (entry *Entry) SetFloatField(name string, value float64) {
+	entry.SetField(name, strconv.FormatFloat(value, 'f', 2, 64))
+}
+
+// Integer field value setter. It accepts uint64, but still stores it as a
+// string in the same fields map.
+func (entry *Entry) SetUintField(name string, value uint64) {
+	entry.SetField(name, strconv.FormatUint(value, 10))
+}
+
+// Merge two entries by updating the master entry's fields with the given values.
+func (master *Entry) Merge(entry *Entry) {
+	for name, value := range entry.fields {
+		master.SetField(name, value)
+	}
+}
+
+func (entry *Entry) FieldsHash(fields []string) string {
+	var key []string
+	for _, name := range fields {
+		value, err := entry.Field(name)
+		if err != nil {
+			value = "NULL"
+		}
+		key = append(key, fmt.Sprintf("'%v'=%v", name, value))
+	}
+	return strings.Join(key, ";")
+}
+
+func (entry *Entry) Partial(fields []string) *Entry {
+	partial := NewEmptyEntry()
+	for _, name := range fields {
+		value, _ := entry.Field(name)
+		partial.SetField(name, value)
+	}
+	return partial
+}
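For context, here is a minimal sketch of how the reworked Entry API fits together: map-style indexing is gone, getters/setters go through the struct, FieldsHash builds a grouping key, and Partial extracts a sub-entry. This is illustrative only, not part of the diff, and the gonx import path is assumed:

package main

import (
	"fmt"

	"github.com/satyrius/gonx" // assumed import path; adjust for your checkout
)

func main() {
	// Build an entry the way the parser does.
	entry := gonx.NewEntry(gonx.Fields{"host": "example.com", "request_time": "0.123"})

	// String getter and typed getter.
	host, _ := entry.Field("host")
	rt, _ := entry.FloatField("request_time")
	fmt.Println(host, rt) // example.com 0.123

	// Setters keep the internal string representation; floats are rounded to 2 digits.
	entry.SetFloatField("avg", 0.456789)
	avg, _ := entry.Field("avg")
	fmt.Println(avg) // 0.46

	// FieldsHash builds a grouping key used by the GroupBy reducer below.
	fmt.Println(entry.FieldsHash([]string{"host"})) // 'host'=example.com
}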
diff --git a/entry_test.go b/entry_test.go
index a5823c0..e50286f 100644
--- a/entry_test.go
+++ b/entry_test.go
@@ -5,16 +5,113 @@ import (
 	"testing"
 )
 
-func TestEntry(t *testing.T) {
-	entry := Entry{"foo": "1"}
+func TestGetEntry(t *testing.T) {
+	entry := NewEntry(Fields{"foo": "1"})
 
 	// Get existings field
-	val, err := entry.Get("foo")
+	val, err := entry.Field("foo")
 	assert.NoError(t, err)
 	assert.Equal(t, val, "1")
 
 	// Get field that does not exist
-	val, err = entry.Get("bar")
+	val, err = entry.Field("bar")
 	assert.Error(t, err)
 	assert.Equal(t, val, "")
 }
+
+func TestEntryFloatField(t *testing.T) {
+	entry := NewEntry(Fields{"foo": "1", "bar": "not a number"})
+
+	// Get existing field
+	val, err := entry.FloatField("foo")
+	assert.NoError(t, err)
+	assert.Equal(t, val, 1.0)
+
+	// Type casting error
+	val, err = entry.FloatField("bar")
+	assert.Error(t, err)
+	assert.Equal(t, val, 0.0)
+
+	// Get field that does not exist
+	val, err = entry.FloatField("baz")
+	assert.Error(t, err)
+	assert.Equal(t, val, 0.0)
+}
+
+func TestSetEntryField(t *testing.T) {
+	entry := NewEmptyEntry()
+	assert.Equal(t, len(entry.fields), 0)
+
+	entry.SetField("foo", "123")
+	value, err := entry.Field("foo")
+	assert.NoError(t, err)
+	assert.Equal(t, value, "123")
+
+	entry.SetField("foo", "234")
+	value, err = entry.Field("foo")
+	assert.NoError(t, err)
+	assert.Equal(t, value, "234")
+}
+
+func TestSetEntryFloatField(t *testing.T) {
+	entry := NewEmptyEntry()
+	entry.SetFloatField("foo", 123.4567)
+	value, err := entry.Field("foo")
+	assert.NoError(t, err)
+	assert.Equal(t, value, "123.46")
+}
+
+func TestSetEntryUintField(t *testing.T) {
+	entry := NewEmptyEntry()
+	entry.SetUintField("foo", 123)
+	value, err := entry.Field("foo")
+	assert.NoError(t, err)
+	assert.Equal(t, value, "123")
+}
+
+func TestMergeEntries(t *testing.T) {
+	entry1 := NewEntry(Fields{"foo": "1", "bar": "hello"})
+	entry2 := NewEntry(Fields{"foo": "2", "bar": "hello", "name": "alpha"})
+	entry1.Merge(entry2)
+
+	val, err := entry1.Field("foo")
+	assert.NoError(t, err)
+	assert.Equal(t, val, "2")
+
+	val, err = entry1.Field("bar")
+	assert.NoError(t, err)
+	assert.Equal(t, val, "hello")
+
+	val, err = entry1.Field("name")
+	assert.NoError(t, err)
+	assert.Equal(t, val, "alpha")
+}
+
+func TestGetEntryGroupHash(t *testing.T) {
+	entry1 := NewEntry(Fields{"foo": "1", "bar": "Hello world #1", "name": "alpha"})
+	entry2 := NewEntry(Fields{"foo": "2", "bar": "Hello world #2", "name": "alpha"})
+	entry3 := NewEntry(Fields{"foo": "2", "bar": "Hello world #3", "name": "alpha"})
+	entry4 := NewEntry(Fields{"foo": "3", "bar": "Hello world #4", "name": "beta"})
+
+	fields := []string{"name"}
+	assert.Equal(t, entry1.FieldsHash(fields), entry2.FieldsHash(fields))
+	assert.Equal(t, entry1.FieldsHash(fields), entry3.FieldsHash(fields))
+	assert.NotEqual(t, entry1.FieldsHash(fields), entry4.FieldsHash(fields))
+
+	fields = []string{"name", "foo"}
+	assert.NotEqual(t, entry1.FieldsHash(fields), entry2.FieldsHash(fields))
+	assert.Equal(t, entry2.FieldsHash(fields), entry3.FieldsHash(fields))
+	assert.NotEqual(t, entry1.FieldsHash(fields), entry4.FieldsHash(fields))
+	assert.NotEqual(t, entry2.FieldsHash(fields), entry4.FieldsHash(fields))
+}
+
+func TestPartialEntry(t *testing.T) {
+	entry := NewEntry(Fields{"foo": "1", "bar": "Hello world #1", "name": "alpha"})
+	partial := entry.Partial([]string{"name", "foo"})
+
+	assert.Equal(t, len(partial.fields), 2)
+	val, _ := partial.Field("name")
+	assert.Equal(t, val, "alpha")
+	val, _ = partial.Field("foo")
+	assert.Equal(t, val, "1")
+}
diff --git a/mapreduce.go b/mapreduce.go
index b88c845..30e3fd8 100644
--- a/mapreduce.go
+++ b/mapreduce.go
@@ -15,13 +15,13 @@ func handleError(err error) {
 // when result will be readed from reducer's output channel, but the mapper
 // works and fills input Entries channel until all lines will be read from
 // the fiven file.
-func MapReduce(file io.Reader, parser *Parser, reducer Reducer) interface{} {
+func MapReduce(file io.Reader, parser *Parser, reducer Reducer) chan *Entry {
 	// Input file lines. This channel is unbuffered to publish
 	// next line to handle only when previous is taken by mapper.
 	var lines = make(chan string)
 
 	// Host thread to spawn new mappers
-	var entries = make(chan Entry, 10)
+	var entries = make(chan *Entry, 10)
 	go func(topLoad int) {
 		// Create semafore channel with capacity equal to the output channel
 		// capacity. Use it to control mapper goroutines spawn.
@@ -68,7 +68,7 @@ func MapReduce(file io.Reader, parser *Parser, reducer Reducer) interface{} {
 	}(cap(entries))
 
 	// Run reducer routine.
-	var output = make(chan interface{})
+	var output = make(chan *Entry)
 	go reducer.Reduce(entries, output)
 
 	go func() {
@@ -84,5 +84,5 @@ func MapReduce(file io.Reader, parser *Parser, reducer Reducer) interface{} {
 	}
 	}()
 
-	return <-output
+	return output
 }
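MapReduce now returns the reducer's output channel instead of blocking on a single value, so callers range over it and stop when the reducer closes the channel. A hedged sketch (assumed import path as above; the format string mirrors the test fixtures, adjust to your real log_format):

package main

import (
	"fmt"
	"log"
	"os"

	"github.com/satyrius/gonx" // assumed import path
)

func main() {
	file, err := os.Open("access.log") // hypothetical input file
	if err != nil {
		log.Fatal(err)
	}
	defer file.Close()

	parser := gonx.NewParser(`$remote_addr [$time_local] "$request"`)

	// The returned channel is closed by the reducer when it is done.
	for entry := range gonx.MapReduce(file, parser, new(gonx.ReadAll)) {
		if addr, err := entry.Field("remote_addr"); err == nil {
			fmt.Println(addr)
		}
	}
}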
diff --git a/parser.go b/parser.go
index 0c16e4e..d9be163 100644
--- a/parser.go
+++ b/parser.go
@@ -23,7 +23,7 @@ func NewParser(format string) *Parser {
 
 // Parse log file line using internal format regexp. If line do not match
 // given format an error will be returned.
-func (parser *Parser) ParseString(line string) (entry Entry, err error) {
+func (parser *Parser) ParseString(line string) (entry *Entry, err error) {
 	re := parser.regexp
 	fields := re.FindStringSubmatch(line)
 	if fields == nil {
@@ -32,12 +32,12 @@ func (parser *Parser) ParseString(line string) (entry Entry, err error) {
 	}
 
 	// Iterate over subexp foung and fill the map record
-	entry = make(Entry)
+	entry = NewEmptyEntry()
 	for i, name := range re.SubexpNames() {
 		if i == 0 {
 			continue
 		}
-		entry[name] = fields[i]
+		entry.SetField(name, fields[i])
 	}
 	return
 }
diff --git a/parser_test.go b/parser_test.go
index b44051a..b85650a 100644
--- a/parser_test.go
+++ b/parser_test.go
@@ -34,11 +34,11 @@ func (suite *ParserTestSuite) TestRegexp() {
 
 func (suite *ParserTestSuite) TestParseString() {
 	line := `89.234.89.123 [08/Nov/2013:13:39:18 +0000] "GET /api/foo/bar HTTP/1.1"`
-	expected := Entry{
+	expected := NewEntry(Fields{
 		"remote_addr": "89.234.89.123",
 		"time_local":  "08/Nov/2013:13:39:18 +0000",
 		"request":     "GET /api/foo/bar HTTP/1.1",
-	}
+	})
 	entry, err := suite.parser.ParseString(line)
 	assert.NoError(suite.T(), err)
 	assert.Equal(suite.T(), entry, expected)
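ParseString now returns *Entry. A usage sketch built from the test fixture above (same assumed imports as the earlier sketches):

parser := gonx.NewParser(`$remote_addr [$time_local] "$request"`)
entry, err := parser.ParseString(`89.234.89.123 [08/Nov/2013:13:39:18 +0000] "GET /api/foo/bar HTTP/1.1"`)
if err != nil {
	log.Fatal(err)
}
request, _ := entry.Field("request")
fmt.Println(request) // GET /api/foo/bar HTTP/1.1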
diff --git a/reader.go b/reader.go
index 293482d..2adeede 100644
--- a/reader.go
+++ b/reader.go
@@ -8,7 +8,7 @@ import (
 type Reader struct {
 	file    io.Reader
 	parser  *Parser
-	entries chan Entry
+	entries chan *Entry
 }
 
 // Creates reader for custom log format.
@@ -34,11 +34,10 @@ func NewNginxReader(logFile io.Reader, nginxConf io.Reader, formatName string) (
 }
 
 // Get next parsed Entry from the log file. Return EOF if there is no Entries to read.
-func (r *Reader) Read() (entry Entry, err error) {
+func (r *Reader) Read() (entry *Entry, err error) {
 	if r.entries == nil {
-		r.entries = MapReduce(r.file, r.parser, new(ReadAll)).(chan Entry)
+		r.entries = MapReduce(r.file, r.parser, new(ReadAll))
 	}
-	// TODO return Entry reference instead of instance
 	entry, ok := <-r.entries
 	if !ok {
 		err = io.EOF
diff --git a/reader_test.go b/reader_test.go
index fca5cef..434a9f3 100644
--- a/reader_test.go
+++ b/reader_test.go
@@ -13,11 +13,11 @@ func TestRead(t *testing.T) {
 	reader := NewReader(file, format)
 	assert.Nil(t, reader.entries)
 
-	expected := Entry{
+	expected := NewEntry(Fields{
 		"remote_addr": "89.234.89.123",
 		"time_local":  "08/Nov/2013:13:39:18 +0000",
 		"request":     "GET /api/foo/bar HTTP/1.1",
-	}
+	})
 
 	// Read entry from incoming channel
 	entry, err := reader.Read()
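With the type assertion gone and Read yielding *Entry, a minimal consumption loop might look like this (sketch; assumes the imports used above plus "io"):

func printRequests(logFile io.Reader, format string) error {
	reader := gonx.NewReader(logFile, format)
	for {
		entry, err := reader.Read()
		if err == io.EOF {
			return nil // channel drained, no more entries
		}
		if err != nil {
			return err
		}
		// Read now returns *gonx.Entry, so there is no per-record copying.
		if request, e := entry.Field("request"); e == nil {
			fmt.Println(request)
		}
	}
}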
diff --git a/reducer.go b/reducer.go
index 04e5689..eb2380d 100644
--- a/reducer.go
+++ b/reducer.go
@@ -8,7 +8,7 @@ package gonx
 // It does not return values because usually it runs in a separate
 // goroutine and it is handy to use channel for reduced data retrieval.
 type Reducer interface {
-	Reduce(input chan Entry, output chan interface{})
+	Reduce(input chan *Entry, output chan *Entry)
 }
 
 // Implements Reducer interface for simple input entries redirection to
@@ -19,6 +19,165 @@ type ReadAll struct {
 // Redirect input Entries channel directly to the output without any
 // modifications. It is useful when you want jast to read file fast
 // using asynchronous with mapper routines.
-func (r *ReadAll) Reduce(input chan Entry, output chan interface{}) {
-	output <- input
+func (r *ReadAll) Reduce(input chan *Entry, output chan *Entry) {
+	for entry := range input {
+		output <- entry
+	}
+	close(output)
+}
+
+// Implements Reducer interface to count entries
+type Count struct {
+}
+
+// Simply count entries and write the sum to the output channel
+func (r *Count) Reduce(input chan *Entry, output chan *Entry) {
+	var count uint64
+	for {
+		_, ok := <-input
+		if !ok {
+			break
+		}
+		count++
+	}
+	entry := NewEmptyEntry()
+	entry.SetUintField("count", count)
+	output <- entry
+	close(output)
+}
+
+// Implements Reducer interface for summarizing Entry values for the given fields
+type Sum struct {
+	Fields []string
+}
+
+// Summarize the given Entry fields and write a single result Entry.
+func (r *Sum) Reduce(input chan *Entry, output chan *Entry) {
+	sum := make(map[string]float64)
+	for entry := range input {
+		for _, name := range r.Fields {
+			val, err := entry.FloatField(name)
+			if err == nil {
+				sum[name] += val
+			}
+		}
+	}
+	entry := NewEmptyEntry()
+	for name, val := range sum {
+		entry.SetFloatField(name, val)
+	}
+	output <- entry
+	close(output)
+}
+
+// Implements Reducer interface for average entry values calculation
+type Avg struct {
+	Fields []string
+}
+
+// Calculate average value for input channel Entries, using configured Fields
+// of the struct. Write the result to the output channel as a single Entry.
+func (r *Avg) Reduce(input chan *Entry, output chan *Entry) {
+	avg := make(map[string]float64)
+	count := 0.0
+	for entry := range input {
+		for _, name := range r.Fields {
+			val, err := entry.FloatField(name)
+			if err == nil {
+				avg[name] = (avg[name]*count + val) / (count + 1)
+			}
+		}
+		count++
+	}
+	entry := NewEmptyEntry()
+	for name, val := range avg {
+		entry.SetFloatField(name, val)
+	}
+	output <- entry
+	close(output)
+}
+
+// Implements Reducer interface for chaining other reducers
+type Chain struct {
+	reducers []Reducer
+}
+
+func NewChain(reducers ...Reducer) *Chain {
+	return &Chain{
+		reducers: reducers,
+	}
+}
+
+// Apply chain of reducers to the input channel of entries and merge results
+func (r *Chain) Reduce(input chan *Entry, output chan *Entry) {
+	// Make input and output channel for each reducer
+	subInput := make([]chan *Entry, len(r.reducers))
+	subOutput := make([]chan *Entry, len(r.reducers))
+	for i, reducer := range r.reducers {
+		subInput[i] = make(chan *Entry, cap(input))
+		subOutput[i] = make(chan *Entry, cap(output))
+		go reducer.Reduce(subInput[i], subOutput[i])
+	}
+
+	// Read reducer master input channel
+	for entry := range input {
+		// Publish input entry for each sub-reducer to process
+		for _, sub := range subInput {
+			sub <- entry
+		}
+	}
+	for _, ch := range subInput {
+		close(ch)
+	}
+
+	// Merge all results
+	entry := NewEmptyEntry()
+	for _, result := range subOutput {
+		entry.Merge(<-result)
+	}
+
+	output <- entry
+	close(output)
+}
+
+// Implements Reducer interface to apply other reducers and get data grouped by
+// given fields.
+type GroupBy struct {
+	Fields   []string
+	reducers []Reducer
+}
+
+func NewGroupBy(fields []string, reducers ...Reducer) *GroupBy {
+	return &GroupBy{
+		Fields:   fields,
+		reducers: reducers,
+	}
+}
+
+// Apply related reducers and group data by Fields.
+func (r *GroupBy) Reduce(input chan *Entry, output chan *Entry) {
+	subInput := make(map[string]chan *Entry)
+	subOutput := make(map[string]chan *Entry)
+
+	// Read the master input channel and create a distinct input channel
+	// for each entry key we group by
+	for entry := range input {
+		key := entry.FieldsHash(r.Fields)
+		if _, ok := subInput[key]; !ok {
+			subInput[key] = make(chan *Entry, cap(input))
+			subOutput[key] = make(chan *Entry, cap(output)+1)
+			subOutput[key] <- entry.Partial(r.Fields)
+			go NewChain(r.reducers...).Reduce(subInput[key], subOutput[key])
+		}
+		subInput[key] <- entry
+	}
+	for _, ch := range subInput {
+		close(ch)
+	}
+	for _, ch := range subOutput {
+		entry := <-ch
+		entry.Merge(<-ch)
+		output <- entry
+	}
+	close(output)
 }
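Putting the new reducers together, a hedged sketch of grouped aggregation over a log file: GroupBy hashes the key fields, runs a Chain of sub-reducers per group, and merges each group's Partial entry into the result. The $host and $request_time fields are hypothetical, adjust to your format:

func sumPerHost(logFile io.Reader, parser *gonx.Parser) {
	reducer := gonx.NewGroupBy(
		[]string{"host"}, // group key
		&gonx.Sum{Fields: []string{"request_time"}}, // per-group sum
		new(gonx.Count), // per-group entry count
	)
	for entry := range gonx.MapReduce(logFile, parser, reducer) {
		host, _ := entry.Field("host")
		total, _ := entry.FloatField("request_time")
		count, _ := entry.Field("count")
		fmt.Printf("%v: %.2f over %v entries\n", host, total, count)
	}
}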
diff --git a/reducer_test.go b/reducer_test.go
index 8abd33a..2817148 100644
--- a/reducer_test.go
+++ b/reducer_test.go
@@ -9,16 +9,224 @@ func TestReadAllReducer(t *testing.T) {
 	reducer := new(ReadAll)
 	assert.Implements(t, (*Reducer)(nil), reducer)
 
-	// Prepare import chanel
-	input := make(chan Entry, 1)
-	input <- Entry{}
+	// Prepare input channel
+	input := make(chan *Entry, 1)
+	entry := NewEmptyEntry()
+	input <- entry
+	close(input)
 
-	output := make(chan interface{}, 1) // Make it buffered to avoid deadlock
+	output := make(chan *Entry, 1) // Make it buffered to avoid deadlock
 	reducer.Reduce(input, output)
 
 	// ReadAll reducer writes input channel to the output
-	result, opened := <-output
-	assert.True(t, opened)
-	_, ok := result.(chan Entry)
+	result, ok := <-output
 	assert.True(t, ok)
+	assert.Equal(t, result, entry)
 }
+
+func TestCountReducer(t *testing.T) {
+	reducer := new(Count)
+	assert.Implements(t, (*Reducer)(nil), reducer)
+
+	// Prepare input channel
+	input := make(chan *Entry, 2)
+	input <- NewEmptyEntry()
+	input <- NewEmptyEntry()
+	close(input)
+
+	output := make(chan *Entry, 1) // Make it buffered to avoid deadlock
+	reducer.Reduce(input, output)
+
+	result, ok := <-output
+	assert.True(t, ok)
+	count, err := result.Field("count")
+	assert.NoError(t, err)
+	assert.Equal(t, count, "2")
+}
+
+func TestSumReducer(t *testing.T) {
+	reducer := &Sum{[]string{"foo", "bar"}}
+	assert.Implements(t, (*Reducer)(nil), reducer)
+
+	// Prepare input channel
+	input := make(chan *Entry, 2)
+	input <- NewEntry(Fields{
+		"uri": "/asd/fgh",
+		"foo": "123",
+		"bar": "234",
+		"baz": "345",
+	})
+	input <- NewEntry(Fields{
+		"uri": "/zxc/vbn",
+		"foo": "456",
+		"bar": "567",
+		"baz": "678",
+	})
+	close(input)
+	output := make(chan *Entry, 1) // Make it buffered to avoid deadlock
+	reducer.Reduce(input, output)
+
+	result, ok := <-output
+	assert.True(t, ok)
+	value, err := result.FloatField("foo")
+	assert.NoError(t, err)
+	assert.Equal(t, value, 123.0+456)
+	value, err = result.FloatField("bar")
+	assert.NoError(t, err)
+	assert.Equal(t, value, 234.0+567.0)
+	_, err = result.Field("buz")
+	assert.Error(t, err)
+}
+
+func TestAvgReducer(t *testing.T) {
+	reducer := &Avg{[]string{"foo", "bar"}}
+	assert.Implements(t, (*Reducer)(nil), reducer)
+
+	// Prepare input channel
+	input := make(chan *Entry, 2)
+	input <- NewEntry(Fields{
+		"uri": "/asd/fgh",
+		"foo": "123",
+		"bar": "234",
+		"baz": "345",
+	})
+	input <- NewEntry(Fields{
+		"uri": "/zxc/vbn",
+		"foo": "456",
+		"bar": "567",
+		"baz": "678",
+	})
+	close(input)
+	output := make(chan *Entry, 1) // Make it buffered to avoid deadlock
+	reducer.Reduce(input, output)
+
+	result, ok := <-output
+	assert.True(t, ok)
+	value, err := result.FloatField("foo")
+	assert.NoError(t, err)
+	assert.Equal(t, value, (123.0+456)/2.0)
+	value, err = result.FloatField("bar")
+	assert.NoError(t, err)
+	assert.Equal(t, value, (234.0+567.0)/2.0)
+	_, err = result.Field("buz")
+	assert.Error(t, err)
+}
+
+func TestChainReducer(t *testing.T) {
+	reducer := NewChain(&Avg{[]string{"foo", "bar"}}, &Count{})
+	assert.Implements(t, (*Reducer)(nil), reducer)
+
+	// Prepare input channel
+	input := make(chan *Entry, 2)
+	input <- NewEntry(Fields{
+		"uri": "/asd/fgh",
+		"foo": "123",
+		"bar": "234",
+		"baz": "345",
+	})
+	input <- NewEntry(Fields{
+		"uri": "/zxc/vbn",
+		"foo": "456",
+		"bar": "567",
+		"baz": "678",
+	})
+	close(input)
+	output := make(chan *Entry, 1) // Make it buffered to avoid deadlock
+	reducer.Reduce(input, output)
+
+	result, ok := <-output
+	assert.True(t, ok)
+
+	value, err := result.FloatField("foo")
+	assert.NoError(t, err)
+	assert.Equal(t, value, (123.0+456)/2.0)
+
+	value, err = result.FloatField("bar")
+	assert.NoError(t, err)
+	assert.Equal(t, value, (234.0+567.0)/2.0)
+
+	count, err := result.Field("count")
+	assert.NoError(t, err)
+	assert.Equal(t, count, "2")
+
+	_, err = result.Field("buz")
+	assert.Error(t, err)
+}
+
+func TestGroupByReducer(t *testing.T) {
+	reducer := NewGroupBy(
+		// Fields to group by
+		[]string{"host"},
+		// Result reducers
+		&Sum{[]string{"foo", "bar"}},
+		new(Count),
+	)
+	assert.Implements(t, (*Reducer)(nil), reducer)
+
+	// Prepare input channel
+	input := make(chan *Entry, 10)
+	input <- NewEntry(Fields{
+		"uri":  "/asd/fgh",
+		"host": "alpha.example.com",
+		"foo":  "1",
+		"bar":  "2",
+		"baz":  "3",
+	})
+	input <- NewEntry(Fields{
+		"uri":  "/zxc/vbn",
+		"host": "beta.example.com",
+		"foo":  "4",
+		"bar":  "5",
+		"baz":  "6",
+	})
+	input <- NewEntry(Fields{
+		"uri":  "/ijk/lmn",
+		"host": "beta.example.com",
+		"foo":  "7",
+		"bar":  "8",
+		"baz":  "9",
+	})
+	close(input)
+	output := make(chan *Entry, 2) // Make it buffered to avoid deadlock
+	reducer.Reduce(input, output)
+
+	// Read and assert first group result
+	result, ok := <-output
+	assert.True(t, ok)
+
+	value, err := result.Field("host")
+	assert.NoError(t, err)
+	assert.Equal(t, value, "alpha.example.com")
+
+	floatVal, err := result.FloatField("foo")
+	assert.NoError(t, err)
+	assert.Equal(t, floatVal, 1.0)
+
+	floatVal, err = result.FloatField("bar")
+	assert.NoError(t, err)
+	assert.Equal(t, floatVal, 2.0)
+
+	value, err = result.Field("count")
+	assert.NoError(t, err)
+	assert.Equal(t, value, "1")
+
+	// Read and assert second group result
+	result, ok = <-output
+	assert.True(t, ok)
+
+	value, err = result.Field("host")
+	assert.NoError(t, err)
+	assert.Equal(t, value, "beta.example.com")
+
+	floatVal, err = result.FloatField("foo")
+	assert.NoError(t, err)
+	assert.Equal(t, floatVal, 4.0+7.0)
+
+	floatVal, err = result.FloatField("bar")
+	assert.NoError(t, err)
+	assert.Equal(t, floatVal, 5.0+8.0)
+
+	value, err = result.Field("count")
+	assert.NoError(t, err)
+	assert.Equal(t, value, "2")
 }