Skip to content

Commit

Permalink
add queue package
Browse files Browse the repository at this point in the history
  • Loading branch information
kevwan committed Aug 13, 2020
1 parent 5f084fb commit 6fdee77
Show file tree
Hide file tree
Showing 11 changed files with 610 additions and 0 deletions.
44 changes: 44 additions & 0 deletions core/queue/balancedqueuepusher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package queue

import (
"errors"
"sync/atomic"

"github.com/tal-tech/go-zero/core/logx"
)

var ErrNoAvailablePusher = errors.New("no available pusher")

type BalancedQueuePusher struct {
name string
pushers []Pusher
index uint64
}

func NewBalancedQueuePusher(pushers []Pusher) Pusher {
return &BalancedQueuePusher{
name: generateName(pushers),
pushers: pushers,
}
}

func (pusher *BalancedQueuePusher) Name() string {
return pusher.name
}

func (pusher *BalancedQueuePusher) Push(message string) error {
size := len(pusher.pushers)

for i := 0; i < size; i++ {
index := atomic.AddUint64(&pusher.index, 1) % uint64(size)
target := pusher.pushers[index]

if err := target.Push(message); err != nil {
logx.Error(err)
} else {
return nil
}
}

return ErrNoAvailablePusher
}
43 changes: 43 additions & 0 deletions core/queue/balancedqueuepusher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package queue

import (
"fmt"
"strconv"
"testing"

"github.com/stretchr/testify/assert"
)

func TestBalancedQueuePusher(t *testing.T) {
const numPushers = 100
var pushers []Pusher
var mockedPushers []*mockedPusher
for i := 0; i < numPushers; i++ {
p := &mockedPusher{
name: "pusher:" + strconv.Itoa(i),
}
pushers = append(pushers, p)
mockedPushers = append(mockedPushers, p)
}

pusher := NewBalancedQueuePusher(pushers)
assert.True(t, len(pusher.Name()) > 0)

for i := 0; i < numPushers*1000; i++ {
assert.Nil(t, pusher.Push("item"))
}

var counts []int
for _, p := range mockedPushers {
counts = append(counts, p.count)
}
mean := calcMean(counts)
variance := calcVariance(mean, counts)
assert.True(t, variance < 100, fmt.Sprintf("too big variance - %.2f", variance))
}

func TestBalancedQueuePusher_NoAvailable(t *testing.T) {
pusher := NewBalancedQueuePusher(nil)
assert.True(t, len(pusher.Name()) == 0)
assert.Equal(t, ErrNoAvailablePusher, pusher.Push("item"))
}
10 changes: 10 additions & 0 deletions core/queue/consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package queue

type (
Consumer interface {
Consume(string) error
OnEvent(event interface{})
}

ConsumerFactory func() (Consumer, error)
)
6 changes: 6 additions & 0 deletions core/queue/messagequeue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package queue

type MessageQueue interface {
Start()
Stop()
}
31 changes: 31 additions & 0 deletions core/queue/multiqueuepusher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package queue

import "github.com/tal-tech/go-zero/core/errorx"

type MultiQueuePusher struct {
name string
pushers []Pusher
}

func NewMultiQueuePusher(pushers []Pusher) Pusher {
return &MultiQueuePusher{
name: generateName(pushers),
pushers: pushers,
}
}

func (pusher *MultiQueuePusher) Name() string {
return pusher.name
}

func (pusher *MultiQueuePusher) Push(message string) error {
var batchError errorx.BatchError

for _, each := range pusher.pushers {
if err := each.Push(message); err != nil {
batchError.Add(err)
}
}

return batchError.Err()
}
39 changes: 39 additions & 0 deletions core/queue/multiqueuepusher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package queue

import (
"fmt"
"math"
"strconv"
"testing"

"github.com/stretchr/testify/assert"
)

func TestMultiQueuePusher(t *testing.T) {
const numPushers = 100
var pushers []Pusher
var mockedPushers []*mockedPusher
for i := 0; i < numPushers; i++ {
p := &mockedPusher{
name: "pusher:" + strconv.Itoa(i),
}
pushers = append(pushers, p)
mockedPushers = append(mockedPushers, p)
}

pusher := NewMultiQueuePusher(pushers)
assert.True(t, len(pusher.Name()) > 0)

for i := 0; i < 1000; i++ {
_ = pusher.Push("item")
}

var counts []int
for _, p := range mockedPushers {
counts = append(counts, p.count)
}
mean := calcMean(counts)
variance := calcVariance(mean, counts)
assert.True(t, math.Abs(mean-1000*(1-failProba)) < 10)
assert.True(t, variance < 100, fmt.Sprintf("too big variance - %.2f", variance))
}
15 changes: 15 additions & 0 deletions core/queue/producer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package queue

type (
Producer interface {
AddListener(listener ProduceListener)
Produce() (string, bool)
}

ProduceListener interface {
OnProducerPause()
OnProducerResume()
}

ProducerFactory func() (Producer, error)
)
Loading

0 comments on commit 6fdee77

Please sign in to comment.