-
Notifications
You must be signed in to change notification settings - Fork 19
/
gen_func.go
313 lines (288 loc) · 8.93 KB
/
gen_func.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
// Code generated by gen.go; DO NOT EDIT.
package skipset
import (
"sync"
"sync/atomic"
"unsafe"
)
// FuncSet is a set backed by a skip list whose element order is defined by a
// caller-supplied less function.
//
// Two values are treated as the same element when neither is less than the
// other.
type FuncSet[T any] struct {
	// length is the element count; it is adjusted with atomic.AddInt64 by
	// Add/Remove and read with atomic.LoadInt64 by Len.
	length int64
	// highestLevel is the level from which searches start descending. It is
	// read with atomic.LoadUint64 and raised via compare-and-swap in randomlevel.
	highestLevel uint64 // highest level for now
	// header is the node from which every traversal starts; its value is
	// never compared against.
	header *funcnode[T]
	// less reports whether a sorts strictly before b.
	less func(a, b T) bool
}
// funcnode is a single skip-list node of a FuncSet.
type funcnode[T any] struct {
	// flags carries the fullyLinked and marked bits consulted by
	// Add, Remove, Contains and the Range functions.
	flags bitflag
	// value is the element stored in this node.
	value T
	// next holds the per-level successor pointers; newFuncNode allocates its
	// extra storage when the node's level exceeds op1.
	next optionalArray // [level]*funcnode
	// mu is locked by Add/Remove while this node's links are validated and
	// rewritten, and by Remove while the node itself is being deleted.
	mu sync.Mutex
	// level is the number of levels this node participates in.
	level uint32
}
// newFuncNode allocates a node holding value that spans the given number of
// levels. When level exceeds op1 the additional successor storage
// (next.extra) is allocated up front.
func newFuncNode[T any](value T, level int) *funcnode[T] {
	node := new(funcnode[T])
	node.value = value
	node.level = uint32(level)
	if level > op1 {
		node.next.extra = new([op2]unsafe.Pointer)
	}
	return node
}
// loadNext returns the successor stored in slot i of n.next, read through
// optionalArray.load (the counterpart of atomicLoadNext that goes through load
// rather than atomicLoad).
func (n *funcnode[T]) loadNext(i int) *funcnode[T] {
	p := n.next.load(i)
	return (*funcnode[T])(p)
}
// storeNext writes next into slot i of n.next through optionalArray.store
// (the counterpart of atomicStoreNext that goes through store rather than
// atomicStore).
func (n *funcnode[T]) storeNext(i int, next *funcnode[T]) {
	p := unsafe.Pointer(next)
	n.next.store(i, p)
}
// atomicLoadNext returns the successor stored in slot i of n.next, read
// through optionalArray.atomicLoad.
func (n *funcnode[T]) atomicLoadNext(i int) *funcnode[T] {
	p := n.next.atomicLoad(i)
	return (*funcnode[T])(p)
}
// atomicStoreNext writes next into slot i of n.next through
// optionalArray.atomicStore.
func (n *funcnode[T]) atomicStoreNext(i int, next *funcnode[T]) {
	p := unsafe.Pointer(next)
	n.next.atomicStore(i, p)
}
// findNodeRemove searches for value the way a sequential skip list would,
// descending from a snapshot of s.highestLevel down to level 0.
//
// For every level i it visits, preds[i] receives the last node passed on that
// level (the header, or a node whose value is less than value) and succs[i]
// receives that node's successor, which is either nil or holds a value that is
// not less than value. Entries above the snapshotted level are left untouched.
//
// The search always continues to level 0. The result is the highest level at
// which succs[i] holds an element equivalent to value, or -1 if there is none.
func (s *FuncSet[T]) findNodeRemove(value T, preds *[maxLevel]*funcnode[T], succs *[maxLevel]*funcnode[T]) int {
	found := -1
	cur := s.header
	level := int(atomic.LoadUint64(&s.highestLevel))
	for level > 0 {
		level--
		next := cur.atomicLoadNext(level)
		for next != nil && s.less(next.value, value) {
			cur, next = next, next.atomicLoadNext(level)
		}
		preds[level], succs[level] = cur, next
		// Only the first (highest) match is recorded.
		if found != -1 || next == nil {
			continue
		}
		if !s.less(value, next.value) {
			found = level
		}
	}
	return found
}
// findNodeAdd searches for value the way a sequential skip list would,
// descending from a snapshot of s.highestLevel down to level 0.
//
// For every level i it visits, preds[i] receives the last node passed on that
// level (the header, or a node whose value is less than value) and succs[i]
// receives that node's successor, which is either nil or holds a value that is
// not less than value.
//
// The search stops as soon as an element equivalent to value is seen and
// returns that level; preds/succs below it are not updated in that case.
// It returns -1 when no such element was found, in which case every level
// below the snapshot has been filled in.
func (s *FuncSet[T]) findNodeAdd(value T, preds *[maxLevel]*funcnode[T], succs *[maxLevel]*funcnode[T]) int {
	cur := s.header
	top := int(atomic.LoadUint64(&s.highestLevel))
	for lvl := top - 1; lvl >= 0; lvl-- {
		next := cur.atomicLoadNext(lvl)
		for {
			if next == nil || !s.less(next.value, value) {
				break
			}
			cur = next
			next = cur.atomicLoadNext(lvl)
		}
		preds[lvl], succs[lvl] = cur, next
		if next == nil {
			continue
		}
		// Neither value < next.value nor next.value < value: already present.
		if !s.less(value, next.value) {
			return lvl
		}
	}
	return -1
}
// unlockfunc releases the mutexes of preds[highestLevel] down to preds[0].
// Adjacent levels that share the same predecessor are unlocked only once,
// mirroring how Add and Remove lock each distinct predecessor a single time.
// Passing highestLevel == -1 unlocks nothing.
func unlockfunc[T any](preds [maxLevel]*funcnode[T], highestLevel int) {
	var last *funcnode[T]
	for lvl := highestLevel; lvl >= 0; lvl-- {
		p := preds[lvl]
		if p == last {
			// Same node as the level above: already unlocked.
			continue
		}
		p.mu.Unlock()
		last = p
	}
}
// Add inserts value into the skip set. It returns true if this call inserted
// the value, and false if an equivalent value was already present (for
// example because another goroutine inserted it first).
//
// If an equivalent node exists but is not yet fully linked, Add waits until
// it is before returning false. If the node found is marked for deletion,
// Add retries the whole operation.
func (s *FuncSet[T]) Add(value T) bool {
	// The level is chosen once; retries below reuse it.
	level := s.randomlevel()
	var preds, succs [maxLevel]*funcnode[T]
	for {
		lFound := s.findNodeAdd(value, &preds, &succs)
		if lFound != -1 { // indicating the value is already in the skip-list
			nodeFound := succs[lFound]
			if !nodeFound.flags.Get(marked) {
				// NOTE(review): this is an empty spin loop; it does not yield
				// or sleep while waiting for the inserter to finish.
				for !nodeFound.flags.Get(fullyLinked) {
					// The node is not yet fully linked, just wait until it is.
				}
				return false
			}
			// The node is marked, which means another goroutine is in the process of deleting it;
			// retry the insertion in the next iteration.
			continue
		}
		// Add this node into skip list.
		var (
			highestLocked        = -1 // the highest level being locked by this process
			valid                = true
			pred, succ, prevPred *funcnode[T]
		)
		// Lock the predecessors bottom-up and validate each level while holding the lock.
		for layer := 0; valid && layer < level; layer++ {
			pred = preds[layer]   // target node's previous node
			succ = succs[layer]   // target node's next node
			if pred != prevPred { // the node in this layer could be locked by previous loop
				pred.mu.Lock()
				highestLocked = layer
				prevPred = pred
			}
			// valid reports whether the snapshot taken by findNodeAdd still holds for this layer.
			// It is valid if:
			// 1. The previous node and next node both are not marked.
			// 2. The previous node's next node is succ in this layer.
			valid = !pred.flags.Get(marked) && (succ == nil || !succ.flags.Get(marked)) && pred.loadNext(layer) == succ
		}
		if !valid {
			// Something changed under us: release every lock taken so far and search again.
			unlockfunc(preds, highestLocked)
			continue
		}
		nn := newFuncNode(value, level)
		// Link bottom-up: set the new node's successor first, then publish it
		// through the predecessor with an atomic store.
		for layer := 0; layer < level; layer++ {
			nn.storeNext(layer, succs[layer])
			preds[layer].atomicStoreNext(layer, nn)
		}
		// Only now is the node visible as present to Contains/Range/Remove.
		nn.flags.SetTrue(fullyLinked)
		unlockfunc(preds, highestLocked)
		atomic.AddInt64(&s.length, 1)
		return true
	}
}
// randomlevel draws a level from randomLevel and, if that level is above the
// set's current highestLevel, raises highestLevel to it with a
// compare-and-swap retry loop. The drawn level is returned either way.
func (s *FuncSet[T]) randomlevel() int {
	level := randomLevel()
	for {
		cur := atomic.LoadUint64(&s.highestLevel)
		// Done when the set is already tall enough, or when our CAS wins;
		// a lost CAS means another goroutine changed highestLevel, so re-check.
		if int(cur) >= level || atomic.CompareAndSwapUint64(&s.highestLevel, cur, uint64(level)) {
			return level
		}
	}
}
// Contains reports whether value is in the skip set.
//
// It descends from a snapshot of s.highestLevel without taking any locks. On
// the first level where an element equivalent to value is found, the answer
// is that node's flags.MGet(fullyLinked|marked, fullyLinked) — presumably
// "fully linked and not marked for deletion". If no level yields a match the
// result is false.
func (s *FuncSet[T]) Contains(value T) bool {
	cur := s.header
	for lvl := int(atomic.LoadUint64(&s.highestLevel)) - 1; lvl >= 0; lvl-- {
		next := cur.atomicLoadNext(lvl)
		for next != nil && s.less(next.value, value) {
			cur = next
			next = cur.atomicLoadNext(lvl)
		}
		// No candidate on this level, or the candidate is greater: go down.
		if next == nil || s.less(value, next.value) {
			continue
		}
		return next.flags.MGet(fullyLinked|marked, fullyLinked)
	}
	return false
}
// Remove deletes value from the skip set. It returns true if this call
// removed the value, and false if no removable node was found or if another
// goroutine had already marked the node for deletion.
func (s *FuncSet[T]) Remove(value T) bool {
	var (
		nodeToRemove *funcnode[T]
		isMarked     bool // represents if this operation mark the node
		topLayer     = -1
		preds, succs [maxLevel]*funcnode[T]
	)
	for {
		lFound := s.findNodeRemove(value, &preds, &succs)
		// Proceed if this call has already marked the node (retry path), or if
		// the node was found at its own top level and passes
		// MGet(fullyLinked|marked, fullyLinked).
		if isMarked || // this process mark this node or we can find this node in the skip list
			lFound != -1 && succs[lFound].flags.MGet(fullyLinked|marked, fullyLinked) && (int(succs[lFound].level)-1) == lFound {
			if !isMarked { // we don't mark this node for now
				nodeToRemove = succs[lFound]
				topLayer = lFound
				// This lock is held until the physical deletion succeeds,
				// including across retries of the outer loop.
				nodeToRemove.mu.Lock()
				if nodeToRemove.flags.Get(marked) {
					// The node is marked by another process,
					// the physical deletion will be accomplished by another process.
					nodeToRemove.mu.Unlock()
					return false
				}
				nodeToRemove.flags.SetTrue(marked)
				isMarked = true
			}
			// Accomplish the physical deletion.
			var (
				highestLocked        = -1 // the highest level being locked by this process
				valid                = true
				pred, succ, prevPred *funcnode[T]
			)
			// Lock the predecessors bottom-up and validate each level while holding the lock.
			for layer := 0; valid && (layer <= topLayer); layer++ {
				pred, succ = preds[layer], succs[layer]
				if pred != prevPred { // the node in this layer could be locked by previous loop
					pred.mu.Lock()
					highestLocked = layer
					prevPred = pred
				}
				// valid reports whether the snapshot taken by findNodeRemove still holds for
				// this layer, i.e. no node was inserted here and the predecessor was not removed.
				// It is valid if:
				// 1. the previous node is not marked.
				// 2. the previous node's next node is still succ in this layer.
				valid = !pred.flags.Get(marked) && pred.loadNext(layer) == succ
			}
			if !valid {
				// Release only the predecessor locks; nodeToRemove stays locked and marked.
				unlockfunc(preds, highestLocked)
				continue
			}
			// Unlink top-down by pointing each predecessor past the node.
			for i := topLayer; i >= 0; i-- {
				// Now we own the nodeToRemove, no other goroutine will modify it.
				// So we don't need nodeToRemove.atomicLoadNext; loadNext is used instead.
				preds[i].atomicStoreNext(i, nodeToRemove.loadNext(i))
			}
			nodeToRemove.mu.Unlock()
			unlockfunc(preds, highestLocked)
			atomic.AddInt64(&s.length, -1)
			return true
		}
		// Nothing removable was found and this call has not marked any node.
		return false
	}
}
// Range walks level 0 of the skip set from the first node after the header
// and calls f with each value in order. Nodes for which
// flags.MGet(fullyLinked|marked, fullyLinked) is false are skipped.
// Iteration stops early as soon as f returns false.
func (s *FuncSet[T]) Range(f func(value T) bool) {
	for node := s.header.atomicLoadNext(0); node != nil; node = node.atomicLoadNext(0) {
		if !node.flags.MGet(fullyLinked|marked, fullyLinked) {
			continue
		}
		if !f(node.value) {
			return
		}
	}
}
// RangeFrom calls f, in order, for every value in the skip set that is not
// less than start. Iteration stops early as soon as f returns false.
//
// It first descends from a snapshot of s.highestLevel to locate the first
// node whose value is not less than start (stopping the descent early on an
// equivalent value), then walks level 0 from there, skipping nodes for which
// flags.MGet(fullyLinked|marked, fullyLinked) is false.
func (s *FuncSet[T]) RangeFrom(start T, f func(value T) bool) {
	cur := s.header
	var node *funcnode[T]
	for lvl := int(atomic.LoadUint64(&s.highestLevel)) - 1; lvl >= 0; lvl-- {
		node = cur.atomicLoadNext(lvl)
		for node != nil && s.less(node.value, start) {
			cur = node
			node = cur.atomicLoadNext(lvl)
		}
		// An element equivalent to start was found: no need to descend further.
		if node != nil && !s.less(start, node.value) {
			break
		}
	}
	for ; node != nil; node = node.atomicLoadNext(0) {
		if !node.flags.MGet(fullyLinked|marked, fullyLinked) {
			continue
		}
		if !f(node.value) {
			return
		}
	}
}
// Len returns the current number of elements, read atomically from the
// counter that Add and Remove maintain.
func (s *FuncSet[T]) Len() int {
	n := atomic.LoadInt64(&s.length)
	return int(n)
}