diff --git a/pool/node_test.go b/pool/node_test.go index ac0c6fb..ccc2605 100644 --- a/pool/node_test.go +++ b/pool/node_test.go @@ -5,6 +5,7 @@ import ( "context" "os" "strings" + "sync" "testing" "time" @@ -119,9 +120,9 @@ func testContext(t *testing.T) context.Context { return log.Context(context.Background(), log.WithDebug()) } -func testLogContext(t *testing.T) (context.Context, *bytes.Buffer) { +func testLogContext(t *testing.T) (context.Context, *buffer) { t.Helper() - var buf bytes.Buffer + var buf buffer return log.Context(context.Background(), log.WithOutput(&buf), log.WithFormat(log.FormatText), log.WithDebug()), &buf } @@ -159,3 +160,21 @@ type workerMock struct { func (w *workerMock) Start(job *Job) error { return w.startFunc(job) } func (w *workerMock) Stop(key string) error { return w.stopFunc(key) } func (w *workerMock) Notify(p []byte) error { return w.notifyFunc(p) } + +// buffer is a goroutine safe bytes.Buffer +type buffer struct { + buffer bytes.Buffer + mutex sync.Mutex +} + +func (s *buffer) Write(p []byte) (n int, err error) { + s.mutex.Lock() + defer s.mutex.Unlock() + return s.buffer.Write(p) +} + +func (s *buffer) String() string { + s.mutex.Lock() + defer s.mutex.Unlock() + return s.buffer.String() +} diff --git a/pool/scheduler.go b/pool/scheduler.go index e356692..8617fa5 100644 --- a/pool/scheduler.go +++ b/pool/scheduler.go @@ -71,16 +71,17 @@ var ErrScheduleStop = fmt.Errorf("stop") // returns ErrScheduleStop. Plan is called on only one of the nodes that // scheduled the same producer. func (node *Node) Schedule(ctx context.Context, producer JobProducer, interval time.Duration) error { - jobMap, err := rmap.Join(ctx, producer.Name(), node.rdb, rmap.WithLogger(node.logger)) + name := node.Name + ":" + producer.Name() + jobMap, err := rmap.Join(ctx, name, node.rdb, rmap.WithLogger(node.logger)) if err != nil { - return fmt.Errorf("failed to join job map: %w", err) + return fmt.Errorf("failed to join job map %s: %w", name, err) } ticker, err := node.NewTicker(ctx, producer.Name(), interval) if err != nil { - return fmt.Errorf("failed to create ticker: %w", err) + return fmt.Errorf("failed to create ticker %s: %w", name, err) } sched := &scheduler{ - name: producer.Name(), + name: name, interval: interval, producer: producer, node: node, diff --git a/pool/scheduler_test.go b/pool/scheduler_test.go index 2285f44..145e853 100644 --- a/pool/scheduler_test.go +++ b/pool/scheduler_test.go @@ -1,6 +1,7 @@ package pool import ( + "sync" "testing" "time" @@ -20,12 +21,16 @@ func TestSchedule(t *testing.T) { worker = newTestWorker(t, ctx, node) d = 10 * time.Millisecond iter = 0 + lock sync.Mutex ) defer cleanup(t, rdb, false, testName) + inc := func() { lock.Lock(); iter++; lock.Unlock() } + it := func() int { lock.Lock(); defer lock.Unlock(); return iter } + producer := newTestProducer(testName, func() (*JobPlan, error) { - iter++ - switch iter { + inc() + switch it() { case 1: assert.Equal(t, 0, numJobs(t, worker), "unexpected number of jobs") // First iteration: start a job @@ -60,17 +65,19 @@ func TestSchedule(t *testing.T) { // Seventh iteration: stop schedule return nil, ErrScheduleStop } - t.Errorf("unexpected iteration %d", iter) + t.Errorf("unexpected iteration %d", it()) return nil, nil }) // Observe call to reset - jobMap, err := rmap.Join(ctx, testName, rdb) + jobMap, err := rmap.Join(ctx, testName+":"+testName, rdb) require.NoError(t, err) var reset bool c := jobMap.Subscribe() defer jobMap.Unsubscribe(c) + done := make(chan struct{}) go func() { + defer close(done) for ev := range c { if ev == rmap.EventReset { reset = true @@ -82,9 +89,14 @@ func TestSchedule(t *testing.T) { err = node.Schedule(ctx, producer, d) require.NoError(t, err) - jobMap.Subscribe() - assert.Eventually(t, func() bool { return iter == 7 }, max, delay, "schedule should have stopped") - assert.Eventually(t, func() bool { return reset }, max, delay, "job map should have been reset") + assert.Eventually(t, func() bool { return it() == 7 }, max, delay, "schedule should have stopped") + select { + case <-done: + reset = true + case <-time.After(time.Second): + break + } + assert.True(t, reset, "job map should have been reset") assert.NotContains(t, buf.String(), "level=error", "unexpected logged error") } diff --git a/pool/ticker.go b/pool/ticker.go index 5d94ccc..94930b8 100644 --- a/pool/ticker.go +++ b/pool/ticker.go @@ -36,6 +36,7 @@ func (node *Node) NewTicker(ctx context.Context, name string, d time.Duration, o if node.clientOnly { return nil, fmt.Errorf("cannot create ticker on client-only node") } + name = node.Name + ":" + name o := parseTickerOptions(opts...) logger := o.logger if logger == nil {