diff --git a/flags.go b/flags.go index 818ea06..a657040 100644 --- a/flags.go +++ b/flags.go @@ -103,6 +103,7 @@ var ( exitOnComplete bool isStrict bool useNumber bool + multiQueue bool ) // Namespace returns the namespace flag for goworker. You @@ -138,6 +139,8 @@ func init() { flag.BoolVar(&exitOnComplete, "exit-on-complete", false, "exit when the queue is empty") flag.BoolVar(&useNumber, "use-number", false, "use json.Number instead of float64 when decoding numbers in JSON. will default to true soon") + + flag.BoolVar(&multiQueue, "multi-queue", false, "use the multi-queue failure backend") } func flags() error { diff --git a/worker.go b/worker.go index 4ec3e8f..fc122ea 100644 --- a/worker.go +++ b/worker.go @@ -4,6 +4,8 @@ import ( "encoding/json" "errors" "fmt" + "runtime" + "strings" "sync" "time" ) @@ -12,6 +14,15 @@ type worker struct { process } +type stacktraceError struct { + Err error + Stacktrace []string +} + +func (e *stacktraceError) Error() string { + return e.Err.Error() +} + func newWorker(id string, queues []string) (*worker, error) { process, err := newProcess(id, queues) if err != nil { @@ -44,20 +55,25 @@ func (w *worker) start(conn *RedisConn, job *Job) error { return w.process.start(conn) } -func (w *worker) fail(conn *RedisConn, job *Job, err error) error { +func (w *worker) fail(conn *RedisConn, job *Job, traceErr *stacktraceError) error { failure := &failure{ FailedAt: time.Now(), Payload: job.Payload, Exception: "Error", - Error: err.Error(), + Error: traceErr.Error(), Worker: w, Queue: job.Queue, + Backtrace: traceErr.Stacktrace, } buffer, err := json.Marshal(failure) if err != nil { return err } - conn.Send("RPUSH", fmt.Sprintf("%sfailed", namespace), buffer) + if multiQueue { + conn.Send("RPUSH", fmt.Sprintf("%s%s_failed", namespace, job.Queue), buffer) + } else { + conn.Send("RPUSH", fmt.Sprintf("%sfailed", namespace), buffer) + } return w.process.fail(conn) } @@ -69,8 +85,8 @@ func (w *worker) succeed(conn *RedisConn, job *Job) error { return nil } -func (w *worker) finish(conn *RedisConn, job *Job, err error) error { - if err != nil { +func (w *worker) finish(conn *RedisConn, job *Job, err *stacktraceError) error { + if err.Err != nil { w.fail(conn, job, err) } else { w.succeed(conn, job) @@ -117,7 +133,10 @@ func (w *worker) work(jobs <-chan *Job, monitor *sync.WaitGroup) { logger.Criticalf("Error on getting connection in worker %v", w) return } else { - w.finish(conn, job, errors.New(errorLog)) + stackErr := &stacktraceError{ + Err: errors.New(errorLog), + } + w.finish(conn, job, stackErr) PutConn(conn) } } @@ -126,30 +145,35 @@ func (w *worker) work(jobs <-chan *Job, monitor *sync.WaitGroup) { } func (w *worker) run(job *Job, workerFunc workerFunc) { - var err error + var err stacktraceError defer func() { conn, errCon := GetConn() if errCon != nil { logger.Criticalf("Error on getting connection in worker %v", w) return } else { - w.finish(conn, job, err) + w.finish(conn, job, &err) PutConn(conn) } }() defer func() { if r := recover(); r != nil { - err = errors.New(fmt.Sprint(r)) + stackBuf := make([]byte, 2048) + runtime.Stack(stackBuf, false) + stack := string(stackBuf[:]) + err.Err = errors.New(fmt.Sprint(r)) + err.Stacktrace = strings.Split(stack, "\n") } }() - conn, err := GetConn() - if err != nil { + var conn *RedisConn + conn, err.Err = GetConn() + if err.Err != nil { logger.Criticalf("Error on getting connection in worker %v", w) return } else { w.start(conn, job) PutConn(conn) } - err = workerFunc(job.Queue, job.Payload.Args...) + err.Err = workerFunc(job.Queue, job.Payload.Args...) } diff --git a/workers.go b/workers.go index d7be9d8..e46a4c8 100644 --- a/workers.go +++ b/workers.go @@ -44,6 +44,11 @@ func Enqueue(job *Job) error { logger.Criticalf("Cant push to queue") return err } + err = conn.Send("SADD", fmt.Sprintf("%squeues", namespace), job.Queue) + if err != nil { + logger.Criticalf("Can't watch queue") + return err + } return conn.Flush() }