Skip to content

Commit

Permalink
Merge pull request #29 from satta/alerts-pg
Browse files Browse the repository at this point in the history
Make sure all events are written to database
  • Loading branch information
Robert Haist authored Jan 25, 2019
2 parents 32d0a4e + 151a5c0 commit 048ae3a
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 62 deletions.
2 changes: 1 addition & 1 deletion db/slurper_postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func (s *PostgresSlurper) slurpPostgres(eventchan chan types.Entry) {
s.Logger.WithFields(log.Fields{
"chunksize": s.ChunkSize,
"table": s.CurrentTableName,
}).Info("COPY complete")
}).Debug("COPY complete")
}
copybuf.Reset()
}
Expand Down
2 changes: 2 additions & 0 deletions processing/flow_aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ func makeFlowEvent() types.Entry {
PktsToClient: int64(rand.Intn(100)),
PktsToServer: int64(rand.Intn(100)),
}
jsonBytes, _ := json.Marshal(e)
e.JSONLine = string(jsonBytes)
return e
}

Expand Down
40 changes: 21 additions & 19 deletions processing/handler_dispatcher.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package processing

// DCSO FEVER
// Copyright (c) 2017, DCSO GmbH
// Copyright (c) 2017, 2018, DCSO GmbH

import (
"sync"
Expand All @@ -26,34 +26,33 @@ type HandlerDispatcherPerfStats struct {
type HandlerDispatcher struct {
Lock sync.Mutex
DispatchMap map[string]([]Handler)
DefaultHandler Handler
DBHandler Handler
PerfStats HandlerDispatcherPerfStats
Logger *log.Entry
StatsEncoder *util.PerformanceStatsEncoder
StopCounterChan chan bool
StoppedCounterChan chan bool
}

// DefaultHandler is a built-in default handler which simply passes events on
// unchanged.
type DefaultHandler struct {
DefaultOut chan types.Entry
// DBHandler writes consumed events to a database.
type DBHandler struct {
OutChan chan types.Entry
}

// GetName just returns the name of the default handler
func (h *DefaultHandler) GetName() string {
func (h *DBHandler) GetName() string {
return "Default handler"
}

// GetEventTypes here is a dummy method -- since this handler is never
// registered we don't need to set this to an actual event type
func (h *DefaultHandler) GetEventTypes() []string {
func (h *DBHandler) GetEventTypes() []string {
return []string{"not applicable"}
}

// Consume simply emits ths consumed entry on the default output channel
func (h *DefaultHandler) Consume(e *types.Entry) error {
h.DefaultOut <- *e
func (h *DBHandler) Consume(e *types.Entry) error {
h.OutChan <- *e
return nil
}

Expand Down Expand Up @@ -86,13 +85,15 @@ func (ad *HandlerDispatcher) runCounter() {
func MakeHandlerDispatcher(databaseOut chan types.Entry) *HandlerDispatcher {
ad := &HandlerDispatcher{
DispatchMap: make(map[string]([]Handler)),
DefaultHandler: &DefaultHandler{
DefaultOut: databaseOut,
},
Logger: log.WithFields(log.Fields{
"domain": "dispatch",
}),
}
if databaseOut != nil {
ad.DBHandler = &DBHandler{
OutChan: databaseOut,
}
}
ad.Logger.WithFields(log.Fields{
"type": "*",
"name": "default handler",
Expand All @@ -119,18 +120,19 @@ func (ad *HandlerDispatcher) RegisterHandler(agg Handler) {
// Dispatch applies the set of handlers currently registered in the dispatcher
// to the Entry object passed to it.
func (ad *HandlerDispatcher) Dispatch(e *types.Entry) {
// by default just send entry to database
if _, ok := ad.DispatchMap[e.EventType]; !ok {
ad.DefaultHandler.Consume(e)
}
for _, agg := range ad.DispatchMap[e.EventType] {
agg.Consume(e)
if _, ok := ad.DispatchMap[e.EventType]; ok {
for _, agg := range ad.DispatchMap[e.EventType] {
agg.Consume(e)
}
}
if a, ok := ad.DispatchMap["*"]; ok {
for _, agg := range a {
agg.Consume(e)
}
}
if ad.DBHandler != nil {
ad.DBHandler.Consume(e)
}
ad.Lock.Lock()
ad.PerfStats.DispatchedPerSec++
ad.Lock.Unlock()
Expand Down
42 changes: 0 additions & 42 deletions processing/handler_dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,48 +19,6 @@ import (
"github.com/NeowayLabs/wabbit/amqptest/server"
)

func TestHandlerDispatcherDefaultHandler(t *testing.T) {
outChan := make(chan types.Entry)
closeChan := make(chan bool)
ad := MakeHandlerDispatcher(outChan)
vals := make([]string, 0)

defaultTypes := ad.DefaultHandler.GetEventTypes()
if len(defaultTypes) != 1 {
t.Fatal("default handler should only have one type")
}
if defaultTypes[0] != "not applicable" {
t.Fatal("default handler should only have one type, 'not applicable'")
}
if ad.DefaultHandler.GetName() != "Default handler" {
t.Fatal("default handler has wrong name")
}

go func(closeChan chan bool, inChan chan types.Entry) {
for v := range inChan {
vals = append(vals, v.JSONLine)
}
close(closeChan)
}(closeChan, outChan)

ad.Dispatch(&types.Entry{
JSONLine: "foo",
})
ad.Dispatch(&types.Entry{
JSONLine: "bar",
})

close(outChan)
<-closeChan

if len(vals) != 2 {
t.Fatal("wrong number of entries delivered to default handler")
}
if vals[0] != "foo" || vals[1] != "bar" {
t.Fatalf("wrong entries delivered to default handler: %s/%s", vals[0], vals[1])
}
}

type Test1Handler struct {
Vals []string
}
Expand Down

0 comments on commit 048ae3a

Please sign in to comment.