diff --git a/db/slurper_postgres.go b/db/slurper_postgres.go index eb13a78..77f2883 100644 --- a/db/slurper_postgres.go +++ b/db/slurper_postgres.go @@ -186,7 +186,7 @@ func (s *PostgresSlurper) slurpPostgres(eventchan chan types.Entry) { s.Logger.WithFields(log.Fields{ "chunksize": s.ChunkSize, "table": s.CurrentTableName, - }).Info("COPY complete") + }).Debug("COPY complete") } copybuf.Reset() } diff --git a/processing/flow_aggregator_test.go b/processing/flow_aggregator_test.go index ab7f8aa..4e96cc6 100644 --- a/processing/flow_aggregator_test.go +++ b/processing/flow_aggregator_test.go @@ -27,6 +27,8 @@ func makeFlowEvent() types.Entry { PktsToClient: int64(rand.Intn(100)), PktsToServer: int64(rand.Intn(100)), } + jsonBytes, _ := json.Marshal(e) + e.JSONLine = string(jsonBytes) return e } diff --git a/processing/handler_dispatcher.go b/processing/handler_dispatcher.go index 671edc0..88983cb 100644 --- a/processing/handler_dispatcher.go +++ b/processing/handler_dispatcher.go @@ -1,7 +1,7 @@ package processing // DCSO FEVER -// Copyright (c) 2017, DCSO GmbH +// Copyright (c) 2017, 2018, DCSO GmbH import ( "sync" @@ -26,7 +26,7 @@ type HandlerDispatcherPerfStats struct { type HandlerDispatcher struct { Lock sync.Mutex DispatchMap map[string]([]Handler) - DefaultHandler Handler + DBHandler Handler PerfStats HandlerDispatcherPerfStats Logger *log.Entry StatsEncoder *util.PerformanceStatsEncoder @@ -34,26 +34,25 @@ type HandlerDispatcher struct { StoppedCounterChan chan bool } -// DefaultHandler is a built-in default handler which simply passes events on -// unchanged. -type DefaultHandler struct { - DefaultOut chan types.Entry +// DBHandler writes consumed events to a database. +type DBHandler struct { + OutChan chan types.Entry } // GetName just returns the name of the default handler -func (h *DefaultHandler) GetName() string { +func (h *DBHandler) GetName() string { return "Default handler" } // GetEventTypes here is a dummy method -- since this handler is never // registered we don't need to set this to an actual event type -func (h *DefaultHandler) GetEventTypes() []string { +func (h *DBHandler) GetEventTypes() []string { return []string{"not applicable"} } // Consume simply emits ths consumed entry on the default output channel -func (h *DefaultHandler) Consume(e *types.Entry) error { - h.DefaultOut <- *e +func (h *DBHandler) Consume(e *types.Entry) error { + h.OutChan <- *e return nil } @@ -86,13 +85,15 @@ func (ad *HandlerDispatcher) runCounter() { func MakeHandlerDispatcher(databaseOut chan types.Entry) *HandlerDispatcher { ad := &HandlerDispatcher{ DispatchMap: make(map[string]([]Handler)), - DefaultHandler: &DefaultHandler{ - DefaultOut: databaseOut, - }, Logger: log.WithFields(log.Fields{ "domain": "dispatch", }), } + if databaseOut != nil { + ad.DBHandler = &DBHandler{ + OutChan: databaseOut, + } + } ad.Logger.WithFields(log.Fields{ "type": "*", "name": "default handler", @@ -119,18 +120,19 @@ func (ad *HandlerDispatcher) RegisterHandler(agg Handler) { // Dispatch applies the set of handlers currently registered in the dispatcher // to the Entry object passed to it. func (ad *HandlerDispatcher) Dispatch(e *types.Entry) { - // by default just send entry to database - if _, ok := ad.DispatchMap[e.EventType]; !ok { - ad.DefaultHandler.Consume(e) - } - for _, agg := range ad.DispatchMap[e.EventType] { - agg.Consume(e) + if _, ok := ad.DispatchMap[e.EventType]; ok { + for _, agg := range ad.DispatchMap[e.EventType] { + agg.Consume(e) + } } if a, ok := ad.DispatchMap["*"]; ok { for _, agg := range a { agg.Consume(e) } } + if ad.DBHandler != nil { + ad.DBHandler.Consume(e) + } ad.Lock.Lock() ad.PerfStats.DispatchedPerSec++ ad.Lock.Unlock() diff --git a/processing/handler_dispatcher_test.go b/processing/handler_dispatcher_test.go index 168c0f5..c46d274 100644 --- a/processing/handler_dispatcher_test.go +++ b/processing/handler_dispatcher_test.go @@ -19,48 +19,6 @@ import ( "github.com/NeowayLabs/wabbit/amqptest/server" ) -func TestHandlerDispatcherDefaultHandler(t *testing.T) { - outChan := make(chan types.Entry) - closeChan := make(chan bool) - ad := MakeHandlerDispatcher(outChan) - vals := make([]string, 0) - - defaultTypes := ad.DefaultHandler.GetEventTypes() - if len(defaultTypes) != 1 { - t.Fatal("default handler should only have one type") - } - if defaultTypes[0] != "not applicable" { - t.Fatal("default handler should only have one type, 'not applicable'") - } - if ad.DefaultHandler.GetName() != "Default handler" { - t.Fatal("default handler has wrong name") - } - - go func(closeChan chan bool, inChan chan types.Entry) { - for v := range inChan { - vals = append(vals, v.JSONLine) - } - close(closeChan) - }(closeChan, outChan) - - ad.Dispatch(&types.Entry{ - JSONLine: "foo", - }) - ad.Dispatch(&types.Entry{ - JSONLine: "bar", - }) - - close(outChan) - <-closeChan - - if len(vals) != 2 { - t.Fatal("wrong number of entries delivered to default handler") - } - if vals[0] != "foo" || vals[1] != "bar" { - t.Fatalf("wrong entries delivered to default handler: %s/%s", vals[0], vals[1]) - } -} - type Test1Handler struct { Vals []string }