Skip to content

Commit

Permalink
refactor: rewrote the queue for the eventstream
Browse files Browse the repository at this point in the history
  • Loading branch information
Tochemey committed Sep 24, 2024
1 parent 3aebde0 commit c425634
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 295 deletions.
15 changes: 7 additions & 8 deletions eventstream/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type subscriber struct {
// sem represents a lock
sem sync.Mutex
// messages of the subscriber
messages *queue.Unbounded[*Message]
messages *queue.Queue
// topics define the topic the subscriber subscribed to
topics map[string]bool
// states whether the given subscriber is active or not
Expand All @@ -70,7 +70,7 @@ func newSubscriber() *subscriber {
return &subscriber{
id: id,
sem: sync.Mutex{},
messages: queue.NewUnbounded[*Message](),
messages: queue.NewQueue(),
topics: make(map[string]bool),
active: true,
}
Expand Down Expand Up @@ -114,18 +114,17 @@ func (x *subscriber) Shutdown() {
// release the lock once done
defer x.sem.Unlock()
x.active = false
x.messages.Close()
}

func (x *subscriber) Iterator() chan *Message {
out := make(chan *Message, x.messages.Len())
out := make(chan *Message, x.messages.Length())
defer close(out)
for {
msg, ok := x.messages.Pop()
if !ok {
msg := x.messages.Dequeue()
if msg == nil {
break
}
out <- msg
out <- msg.(*Message)
}
return out
}
Expand All @@ -138,7 +137,7 @@ func (x *subscriber) signal(message *Message) {
defer x.sem.Unlock()
// only receive message when active
if x.active {
x.messages.Push(message)
x.messages.Enqueue(message)
}
}

Expand Down
12 changes: 6 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ require (
github.com/google/uuid v1.6.0
github.com/hashicorp/go-memdb v1.3.4
github.com/stretchr/testify v1.9.0
github.com/tochemey/goakt/v2 v2.6.2-0.20240911181406-5ac03f8ba57f
github.com/tochemey/goakt/v2 v2.6.2
github.com/tochemey/gopack v0.0.0-20240704194040-eaa380774969
github.com/travisjeffery/go-dynaport v1.0.0
go.opentelemetry.io/otel v1.30.0
Expand All @@ -21,7 +21,7 @@ require (
)

require (
connectrpc.com/connect v1.16.2 // indirect
connectrpc.com/connect v1.17.0 // indirect
connectrpc.com/otelconnect v0.7.1 // indirect
github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 // indirect
github.com/Microsoft/go-winio v0.6.2 // indirect
Expand Down Expand Up @@ -105,12 +105,12 @@ require (
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/api v0.31.0 // indirect
k8s.io/apimachinery v0.31.0 // indirect
k8s.io/client-go v0.31.0 // indirect
k8s.io/api v0.31.1 // indirect
k8s.io/apimachinery v0.31.1 // indirect
k8s.io/client-go v0.31.1 // indirect
k8s.io/klog/v2 v2.130.1 // indirect
k8s.io/kube-openapi v0.0.0-20240903163716-9e1beecbcb38 // indirect
k8s.io/utils v0.0.0-20240902221715-702e33fdd3c3 // indirect
k8s.io/utils v0.0.0-20240921022957-49e7df575cb6 // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect
sigs.k8s.io/yaml v1.4.0 // indirect
Expand Down
24 changes: 12 additions & 12 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
connectrpc.com/connect v1.16.2 h1:ybd6y+ls7GOlb7Bh5C8+ghA6SvCBajHwxssO2CGFjqE=
connectrpc.com/connect v1.16.2/go.mod h1:n2kgwskMHXC+lVqb18wngEpF95ldBHXjZYJussz5FRc=
connectrpc.com/connect v1.17.0 h1:W0ZqMhtVzn9Zhn2yATuUokDLO5N+gIuBWMOnsQrfmZk=
connectrpc.com/connect v1.17.0/go.mod h1:0292hj1rnx8oFrStN7cB4jjVBeqs+Yx5yDIC2prWDO8=
connectrpc.com/otelconnect v0.7.1 h1:scO5pOb0i4yUE66CnNrHeK1x51yq0bE0ehPg6WvzXJY=
connectrpc.com/otelconnect v0.7.1/go.mod h1:dh3bFgHBTb2bkqGCeVVOtHJreSns7uu9wwL2Tbz17ms=
github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 h1:L/gRVlceqvL25UVaW/CKtUDjefjrs0SPonmDGUVOYP0=
Expand Down Expand Up @@ -315,8 +315,8 @@ github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA=
github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
github.com/tidwall/redcon v1.6.2 h1:5qfvrrybgtO85jnhSravmkZyC0D+7WstbfCs3MmPhow=
github.com/tidwall/redcon v1.6.2/go.mod h1:p5Wbsgeyi2VSTBWOcA5vRXrOb9arFTcU2+ZzFjqV75Y=
github.com/tochemey/goakt/v2 v2.6.2-0.20240911181406-5ac03f8ba57f h1:ji6dtBdZAnF+usGNyRSYwHJowvyGhJfChgSEgNhmBMo=
github.com/tochemey/goakt/v2 v2.6.2-0.20240911181406-5ac03f8ba57f/go.mod h1:gugM4SAFAft136Gwr8dzobIuUUOJ/+mSubY0Z8qCs7A=
github.com/tochemey/goakt/v2 v2.6.2 h1:GbD+aR4h2xUivKCdxMIsOkVe7/Pvdbfc371/4iiHF5s=
github.com/tochemey/goakt/v2 v2.6.2/go.mod h1:uuJtrNQ3OgWZRM5uARRFMSv7U1g+crDKfm/v1ASkz0w=
github.com/tochemey/gopack v0.0.0-20240704194040-eaa380774969 h1:SDUoPcLRz28UitaOlMjOTC2T6XYBqx3qWDLDyA0jX3k=
github.com/tochemey/gopack v0.0.0-20240704194040-eaa380774969/go.mod h1:3Qt0XoTLDsLzcaMXs7tYXd+iKOehoWqykWMQNBAwTdA=
github.com/travisjeffery/go-dynaport v1.0.0 h1:m/qqf5AHgB96CMMSworIPyo1i7NZueRsnwdzdCJ8Ajw=
Expand Down Expand Up @@ -456,18 +456,18 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo=
gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw=
k8s.io/api v0.31.0 h1:b9LiSjR2ym/SzTOlfMHm1tr7/21aD7fSkqgD/CVJBCo=
k8s.io/api v0.31.0/go.mod h1:0YiFF+JfFxMM6+1hQei8FY8M7s1Mth+z/q7eF1aJkTE=
k8s.io/apimachinery v0.31.0 h1:m9jOiSr3FoSSL5WO9bjm1n6B9KROYYgNZOb4tyZ1lBc=
k8s.io/apimachinery v0.31.0/go.mod h1:rsPdaZJfTfLsNJSQzNHQvYoTmxhoOEofxtOsF3rtsMo=
k8s.io/client-go v0.31.0 h1:QqEJzNjbN2Yv1H79SsS+SWnXkBgVu4Pj3CJQgbx0gI8=
k8s.io/client-go v0.31.0/go.mod h1:Y9wvC76g4fLjmU0BA+rV+h2cncoadjvjjkkIGoTLcGU=
k8s.io/api v0.31.1 h1:Xe1hX/fPW3PXYYv8BlozYqw63ytA92snr96zMW9gWTU=
k8s.io/api v0.31.1/go.mod h1:sbN1g6eY6XVLeqNsZGLnI5FwVseTrZX7Fv3O26rhAaI=
k8s.io/apimachinery v0.31.1 h1:mhcUBbj7KUjaVhyXILglcVjuS4nYXiwC+KKFBgIVy7U=
k8s.io/apimachinery v0.31.1/go.mod h1:rsPdaZJfTfLsNJSQzNHQvYoTmxhoOEofxtOsF3rtsMo=
k8s.io/client-go v0.31.1 h1:f0ugtWSbWpxHR7sjVpQwuvw9a3ZKLXX0u0itkFXufb0=
k8s.io/client-go v0.31.1/go.mod h1:sKI8871MJN2OyeqRlmA4W4KM9KBdBUpDLu/43eGemCg=
k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk=
k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE=
k8s.io/kube-openapi v0.0.0-20240903163716-9e1beecbcb38 h1:1dWzkmJrrprYvjGwh9kEUxmcUV/CtNU8QM7h1FLWQOo=
k8s.io/kube-openapi v0.0.0-20240903163716-9e1beecbcb38/go.mod h1:coRQXBK9NxO98XUv3ZD6AK3xzHCxV6+b7lrquKwaKzA=
k8s.io/utils v0.0.0-20240902221715-702e33fdd3c3 h1:b2FmK8YH+QEwq/Sy2uAEhmqL5nPfGYbJOcaqjeYYZoA=
k8s.io/utils v0.0.0-20240902221715-702e33fdd3c3/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
k8s.io/utils v0.0.0-20240921022957-49e7df575cb6 h1:MDF6h2H/h4tbzmtIKTuctcwZmY0tY9mD9fNT47QO6HI=
k8s.io/utils v0.0.0-20240921022957-49e7df575cb6/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd h1:EDPBXCAspyGV4jQlpZSudPeMmr1bNJefnuqLsRAsHZo=
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd/go.mod h1:B8JuhiUyNFVKdsE8h686QcCxMaH6HrOAZj4vswFpcB0=
sigs.k8s.io/structured-merge-diff/v4 v4.4.1 h1:150L+0vs/8DA78h1u02ooW1/fFq/Lwr+sGiqlzvrtq4=
Expand Down
235 changes: 77 additions & 158 deletions internal/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,181 +24,100 @@

package queue

import "sync"

// minQueueLen is the smallest capacity that queue may have.
// Must be power of 2 for bitwise modulus: x % n == x & (n - 1).
const minQueueLen = 16

// Unbounded thread-safe Queue using ring-buffer
// reference: https://blog.dubbelboer.com/2015/04/25/go-faster-queue.html
// https://github.com/eapache/queue
type Unbounded[T any] struct {
mu sync.RWMutex
cond *sync.Cond
nodes []*T
head int
tail int
count int
closed bool
initCap int
import (
"sync/atomic"
"unsafe"
)

// Queue defines a lock-free Queue.
type Queue struct {
head unsafe.Pointer // pointer to the head of the queue
tail unsafe.Pointer // pointer to the tail of the queue
len uint64 // length of the queue
}

// NewUnbounded creates an instance of Unbounded
func NewUnbounded[T any]() *Unbounded[T] {
sq := &Unbounded[T]{
initCap: minQueueLen,
nodes: make([]*T, minQueueLen),
}
sq.cond = sync.NewCond(&sq.mu)
return sq
// item is a single node in the queue.
type item struct {
next unsafe.Pointer // pointer to the next item in the queue
v interface{} // the value stored in the queue item
}

// resize the queue
func (q *Unbounded[T]) resize() {
nodes := make([]*T, q.count<<1)
if q.tail > q.head {
copy(nodes, q.nodes[q.head:q.tail])
} else {
n := copy(nodes, q.nodes[q.head:])
copy(nodes[n:], q.nodes[:q.tail])
// NewQueue creates and returns a new lock-free queue.
func NewQueue() *Queue {
// Initial node is an empty item to act as a sentinel (dummy node).
dummy := &item{}
return &Queue{
head: unsafe.Pointer(dummy), // both head and tail point to the dummy node
tail: unsafe.Pointer(dummy),
}

q.tail = q.count
q.head = 0
q.nodes = nodes
}

// Push adds an item to the back of the queue
// It can be safely called from multiple goroutines
// It will return false if the queue is closed.
// In that case the Item is dropped.
func (q *Unbounded[T]) Push(i T) bool {
q.mu.Lock()
if q.closed {
q.mu.Unlock()
return false
}
if q.count == len(q.nodes) {
q.resize()
// Enqueue adds a value to the tail of the queue.
func (q *Queue) Enqueue(v interface{}) {
xitem := &item{v: v}
for {
last := loadItem(&q.tail) // Load current tail
lastNext := loadItem(&last.next) // Load the next pointer of tail
if last == loadItem(&q.tail) { // Check tail consistency
if lastNext == nil { // Is tail really pointing to the last node?
// Try to link the new item at the end
if casItem(&last.next, nil, xitem) {
// Enqueue successful, now try to move the tail pointer
casItem(&q.tail, last, xitem)
atomic.AddUint64(&q.len, 1)
return
}
} else {
// Tail was pointing to an intermediate node, help move it forward
casItem(&q.tail, last, lastNext)
}
}
}
q.nodes[q.tail] = &i
// bitwise modulus
q.tail = (q.tail + 1) & (len(q.nodes) - 1)
q.count++
q.cond.Signal()
q.mu.Unlock()
return true
}

// Close the queue and discard all entries in the queue
// all goroutines in wait() will return
func (q *Unbounded[T]) Close() {
q.mu.Lock()
defer q.mu.Unlock()
q.closed = true
q.count = 0
q.nodes = nil
q.cond.Broadcast()
}

// CloseRemaining will close the queue and return all entries in the queue.
// All goroutines in wait() will return.
func (q *Unbounded[T]) CloseRemaining() []T {
q.mu.Lock()
defer q.mu.Unlock()
if q.closed {
return []T{}
}
rem := make([]T, 0, q.count)
for q.count > 0 {
i := q.nodes[q.head]
// bitwise modulus
q.head = (q.head + 1) & (len(q.nodes) - 1)
q.count--
rem = append(rem, *i)
// Dequeue removes and returns the value at the head of the queue.
// It returns nil if the queue is empty.
func (q *Queue) Dequeue() interface{} {
for {
head := loadItem(&q.head) // Load the current head
tail := loadItem(&q.tail) // Load the current tail
next := loadItem(&head.next) // Load the next node after head
if head == loadItem(&q.head) { // Check head consistency
if head == tail { // Is the queue empty?
if next == nil { // Confirm that queue is empty
return nil
}
// Tail is lagging behind, move it forward
casItem(&q.tail, tail, next)
} else {
// Get the value before CAS to avoid freeing the node too early
v := next.v
// Try to swing the head to the next node
if casItem(&q.head, head, next) {
atomic.AddUint64(&q.len, ^uint64(0)) // decrement length
return v // return the dequeued value
}
}
}
}
q.closed = true
q.count = 0
q.nodes = nil
q.cond.Broadcast()
return rem
}

// IsClosed returns true if the queue has been closed
// The call cannot guarantee that the queue hasn't been
// closed while the function returns, so only "true" has a definite meaning.
func (q *Unbounded[T]) IsClosed() bool {
q.mu.RLock()
c := q.closed
q.mu.RUnlock()
return c
// Length returns the number of items in the queue.
func (q *Queue) Length() uint64 {
return atomic.LoadUint64(&q.len)
}

// Wait for an item to be added.
// If there is items on the queue the first will
// be returned immediately.
// Will return nil, false if the queue is closed.
// Otherwise, the return value of "remove" is returned.
func (q *Unbounded[T]) Wait() (T, bool) {
q.mu.Lock()
if q.closed {
q.mu.Unlock()
var nilElt T
return nilElt, false
}
if q.count != 0 {
q.mu.Unlock()
return q.Pop()
}
q.cond.Wait()
q.mu.Unlock()
return q.Pop()
}

// Pop removes the item from the front of the queue
// If false is returned, it either means 1) there were no items on the queue
// or 2) the queue is closed.
func (q *Unbounded[T]) Pop() (T, bool) {
q.mu.Lock()
defer q.mu.Unlock()
if q.count == 0 {
var nilElt T
return nilElt, false
}
i := q.nodes[q.head]
q.nodes[q.head] = nil
// bitwise modulus
q.head = (q.head + 1) & (len(q.nodes) - 1)
q.count--
// Resize down if buffer 1/4 full.
if len(q.nodes) > minQueueLen && (q.count<<2) == len(q.nodes) {
q.resize()
}

return *i, true
}

// Cap return the capacity (without allocations)
func (q *Unbounded[T]) Cap() int {
q.mu.RLock()
c := cap(q.nodes)
q.mu.RUnlock()
return c
// IsEmpty returns true when the queue is empty
func (q *Queue) IsEmpty() bool {
return atomic.LoadUint64(&q.len) == 0
}

// Len return the current length of the queue.
func (q *Unbounded[T]) Len() int {
q.mu.RLock()
l := q.count
q.mu.RUnlock()
return l
// loadItem atomically loads an item pointer from the given unsafe pointer.
func loadItem(p *unsafe.Pointer) *item {
return (*item)(atomic.LoadPointer(p))
}

// IsEmpty returns true when the queue is empty
func (q *Unbounded[T]) IsEmpty() bool {
q.mu.Lock()
cnt := q.count
q.mu.Unlock()
return cnt == 0
// casItem performs an atomic compare-and-swap on an unsafe pointer.
func casItem(p *unsafe.Pointer, old, new *item) bool {
return atomic.CompareAndSwapPointer(p, unsafe.Pointer(old), unsafe.Pointer(new))
}
Loading

0 comments on commit c425634

Please sign in to comment.