Skip to content

Commit ce2b7a6

Browse files
committed
feat: implements concurrent queue
1 parent 6a9a269 commit ce2b7a6

File tree

1 file changed

+66
-0
lines changed

1 file changed

+66
-0
lines changed
+66
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Copyright (c) 2025. protobox
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package collections
18+
19+
import (
20+
"sync"
21+
"sync/atomic"
22+
)
23+
24+
type ConcurrentQueue[T any] struct {
25+
items []T
26+
mu sync.Mutex
27+
size atomic.Uint64
28+
}
29+
30+
func NewConcurrentQueue[T any]() *ConcurrentQueue[T] {
31+
return &ConcurrentQueue[T]{
32+
items: make([]T, 0),
33+
}
34+
}
35+
36+
func (q *ConcurrentQueue[T]) Enqueue(item T) {
37+
q.mu.Lock()
38+
q.items = append(q.items, item)
39+
q.mu.Unlock()
40+
q.size.Add(1)
41+
}
42+
43+
func (q *ConcurrentQueue[T]) Dequeue() T {
44+
q.mu.Lock()
45+
defer q.mu.Unlock()
46+
47+
item := q.items[0]
48+
q.items = q.items[1:]
49+
q.size.Add(^uint64(0))
50+
return item
51+
}
52+
53+
func (q *ConcurrentQueue[T]) DequeueAll() []T {
54+
q.mu.Lock()
55+
defer q.mu.Unlock()
56+
57+
items := make([]T, len(q.items))
58+
copy(items, q.items)
59+
q.items = make([]T, 0)
60+
q.size.Store(0)
61+
return items
62+
}
63+
64+
func (q *ConcurrentQueue[T]) IsEmpty() bool {
65+
return q.size.Load() == 0
66+
}

0 commit comments

Comments
 (0)