@@ -61,6 +61,7 @@ import org.jooq.impl.DSL.table
61
61
import org.jooq.util.mysql.MySQLDSL
62
62
import org.slf4j.LoggerFactory
63
63
import org.springframework.scheduling.annotation.Scheduled
64
+ import java.util.concurrent.atomic.AtomicInteger
64
65
65
66
@KotlinOpen
66
67
class SqlQueue (
@@ -233,31 +234,12 @@ class SqlQueue(
233
234
*/
234
235
private fun doPoll (maxMessages : Int , callback : (Message , () -> Unit ) -> Unit ) {
235
236
val now = clock.instant().toEpochMilli()
236
- var changed = 0
237
+ val changed = AtomicInteger ()
237
238
238
239
/* *
239
240
* Selects the primary key ulid's of up to ([maxMessages] * 3) ready and unlocked messages,
240
241
* sorted by delivery time.
241
242
*
242
- * To minimize lock contention, this is a non-locking read. The id's returned may be
243
- * locked or removed by another instance before we can acquire them. We read more id's
244
- * than [maxMessages] and shuffle them to decrease the likelihood that multiple instances
245
- * polling concurrently are all competing for the oldest ready messages when many more
246
- * than [maxMessages] are read.
247
- *
248
- * Candidate rows are locked via an autocommit update query by primary key that will
249
- * only modify unlocked rows. When (candidates > maxMessages), a sliding window is used
250
- * to traverse the shuffled candidates, sized to (maxMessages - changed) with up-to 3
251
- * attempts (and update queries) to grab [maxMessages].
252
- *
253
- * I.e. if maxMessage == 5 and
254
- * candidates == [1, 2, 3, 4, 5, 6, 7, 8, 9, 10].shuffle() == [9, 3, 7, 1, 10, 8, 5, 2, 6, 4]
255
- *
256
- * - pass1: attempts to claim [9, 3, 7, 1, 10], locks 3 messages
257
- * - pass2: attempts to claim [8, 5], locks 1 message
258
- * - pass3: attempt to claim [2], succeeds but if not, there are no further attempts
259
- * - proceeds to process 5 messages locked via 3 update queries.
260
- *
261
243
* This makes a trade-off between grabbing the maximum number of ready messages per poll cycle
262
244
* vs. minimizing [poll] runtime which is also critical to throughput. In testing a scenario
263
245
* with up-to 100k ready messages and 7 orca/keiko-sql instances with [fillExecutorEachCycle]
@@ -269,7 +251,7 @@ class SqlQueue(
269
251
* [MaxAttemptsAttribute] has been set to a positive integer. Otherwise,
270
252
* [AttemptsAttribute] is unused.
271
253
*/
272
- var candidates = jooq.select(idField)
254
+ val candidates = jooq.select(idField)
273
255
.from(queueTable)
274
256
.where(deliveryField.le(now), lockedField.eq(" 0" ))
275
257
.orderBy(deliveryField.asc())
@@ -281,27 +263,14 @@ class SqlQueue(
281
263
return
282
264
}
283
265
284
- // Ordering is essential to prevent Deadlock in PostgreSQL datasource.
285
- candidates = candidates.sorted()
286
-
287
- var position = 0
288
- var passes = 0
289
- while (changed < maxMessages && position < candidates.size && passes < 3 ) {
290
- passes++
291
- val sliceNext = min(maxMessages - 1 - changed, candidates.size - 1 - position)
292
- val ids = candidates.slice(IntRange (position, position + sliceNext))
293
- when (sliceNext) {
294
- 0 -> position++
295
- else -> position + = sliceNext
296
- }
297
-
298
- changed + = jooq.update(queueTable)
266
+ candidates.parallelStream().forEach {
267
+ changed.addAndGet(jooq.update(queueTable)
299
268
.set(lockedField, " $lockId :$now " )
300
- .where(idField.` in `( * ids.toTypedArray() ), lockedField.eq(" 0" ))
301
- .execute()
269
+ .where(idField.eq(it ), lockedField.eq(" 0" ))
270
+ .execute())
302
271
}
303
272
304
- if (changed > 0 ) {
273
+ if (changed.get() > 0 ) {
305
274
val rs = withRetry(READ ) {
306
275
jooq.select(
307
276
field(" q.id" ).`as `(" id" ),
0 commit comments