Skip to content

Commit 068cf0f

Browse files
Merge pull request #57 from oracle/async-lock-2
Eliminating JDBC Lock Contention
2 parents 938b999 + e30de88 commit 068cf0f

10 files changed

+1675
-1392
lines changed

src/main/java/oracle/r2dbc/impl/AsyncLock.java

Lines changed: 514 additions & 0 deletions
Large diffs are not rendered by default.

src/main/java/oracle/r2dbc/impl/OracleBatchImpl.java

Lines changed: 3 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
package oracle.r2dbc.impl;
2323

2424
import java.sql.Connection;
25+
import java.sql.Statement;
2526
import java.time.Duration;
2627
import java.util.LinkedList;
2728
import java.util.Queue;
@@ -114,18 +115,6 @@ public Batch add(String sql) {
114115
* are executed in the order they were added. Calling this method clears all
115116
* statements that have been added to the current batch.
116117
* </p><p>
117-
* A {@code Result} emitted by the returned {@code Publisher} must be
118-
* <a href="OracleStatementImpl.html#fully-consumed-result">
119-
* fully-consumed
120-
* </a>
121-
* before the next {@code Result} is emitted. This ensures that a command in
122-
* the batch can not be executed while the {@code Result} of a previous
123-
* command is consumed concurrently. It is a known limitation of the Oracle
124-
* R2DBC Driver that concurrent operations on a single {@code Connection}
125-
* will result in blocked threads. Deferring {@code Statement} execution
126-
* until full consumption of the previous {@code Statement}'s {@code Result}
127-
* is necessary in order to avoid blocked threads.
128-
* </p><p>
129118
* If the execution of any statement in the sequence results in a failure,
130119
* then the returned publisher emits {@code onError} with an
131120
* {@link R2dbcException} that describes the failure, and all subsequent
@@ -147,42 +136,8 @@ public Publisher<OracleResultImpl> execute() {
147136
requireOpenConnection(jdbcConnection);
148137
Queue<OracleStatementImpl> currentStatements = statements;
149138
statements = new LinkedList<>();
150-
return publishBatch(currentStatements);
151-
}
152-
153-
/**
154-
* <p>
155-
* Publish a batch of {@code Result}s from {@code statements}. Each
156-
* {@code Result} is published serially with the consumption of the
157-
* previous {@code Result}.
158-
* </p><p>
159-
* This method returns an empty {@code Publisher} if {@code statements} is
160-
* empty. Otherwise, this method dequeues the next {@code Statement} and
161-
* executes it for a {@code Result}. After the {@code Result} has been
162-
* fully consumed, this method is invoked recursively to publish the {@code
163-
* Result}s of remaining {@code statements}.
164-
* </p>
165-
* @param statements A batch to executed.
166-
* @return {@code Publisher} of {@code statements} {@code Result}s
167-
*/
168-
private static Publisher<OracleResultImpl> publishBatch(
169-
Queue<OracleStatementImpl> statements) {
170-
171-
OracleStatementImpl next = statements.poll();
172-
173-
if (next != null) {
174-
AtomicReference<OracleResultImpl> lastResult =
175-
new AtomicReference<>(null);
176-
177-
return Flux.from(next.execute())
178-
.doOnNext(lastResult::set)
179-
.concatWith(Flux.defer(() ->
180-
Mono.from(lastResult.get().onConsumed())
181-
.thenMany(publishBatch(statements))));
182-
}
183-
else {
184-
return Mono.empty();
185-
}
139+
return Flux.fromIterable(currentStatements)
140+
.concatMap(OracleStatementImpl::execute);
186141
}
187142

188143
}

src/main/java/oracle/r2dbc/impl/OracleConnectionImpl.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -470,6 +470,7 @@ public boolean isAutoCommit() {
470470
@Override
471471
public ConnectionMetadata getMetadata() {
472472
requireOpenConnection(jdbcConnection);
473+
// TODO: Initialize this on construction, to avoid lock contention
473474
return new OracleConnectionMetadataImpl(
474475
fromJdbc(jdbcConnection::getMetaData));
475476
}
@@ -575,7 +576,7 @@ public Publisher<Void> rollbackTransactionToSavepoint(String name) {
575576
@Override
576577
public Publisher<Void> setAutoCommit(boolean autoCommit) {
577578
requireOpenConnection(jdbcConnection);
578-
return Mono.defer(() -> fromJdbc(() -> {
579+
return Mono.from(adapter.getLock().flatMap(() -> {
579580
if (autoCommit == jdbcConnection.getAutoCommit()) {
580581
return Mono.empty(); // No change
581582
}
@@ -589,7 +590,7 @@ else if (! autoCommit) {
589590
// Changing auto-commit from disabled to enabled. Commit in case
590591
// there is an active transaction.
591592
return Mono.from(commitTransaction())
592-
.doOnSuccess(nil -> runJdbc(() ->
593+
.concatWith(adapter.getLock().run(() ->
593594
jdbcConnection.setAutoCommit(true)));
594595
}
595596
}))

src/main/java/oracle/r2dbc/impl/OracleR2dbcExceptions.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,7 @@ else if (sqlException instanceof SQLRecoverableException) {
240240
* null.
241241
* @throws R2dbcException If the supplier throws a {@code SQLException}.
242242
*/
243-
static void runJdbc(ThrowingRunnable runnable)
243+
static void runJdbc(JdbcRunnable runnable)
244244
throws R2dbcException {
245245
try {
246246
runnable.runOrThrow();
@@ -273,7 +273,7 @@ static void runJdbc(ThrowingRunnable runnable)
273273
* @return The output of the specified {@code supplier}.
274274
* @throws R2dbcException If the supplier throws a {@code SQLException}.
275275
*/
276-
static <T> T fromJdbc(ThrowingSupplier<T> supplier)
276+
static <T> T fromJdbc(JdbcSupplier<T> supplier)
277277
throws R2dbcException {
278278
try {
279279
return supplier.getOrThrow();
@@ -330,7 +330,7 @@ private static String getSql(SQLException sqlException) {
330330
* </p>
331331
*/
332332
@FunctionalInterface
333-
interface ThrowingRunnable extends Runnable {
333+
interface JdbcRunnable extends Runnable {
334334
/**
335335
* Runs to completion and returns normally, or throws a {@code SQLException}
336336
* if an error is encountered.
@@ -343,8 +343,7 @@ interface ThrowingRunnable extends Runnable {
343343
* R2dbcException} if an error is encountered.
344344
* @throws R2dbcException If the run does not complete due to an error.
345345
* @implNote The default implementation invokes
346-
* {@link #runJdbc(ThrowingRunnable)} with this {@code
347-
* ThrowingRunnable}.
346+
* {@link #runJdbc(JdbcRunnable)} with this {@code JdbcRunnable}.
348347
*/
349348
@Override
350349
default void run() throws R2dbcException {
@@ -362,7 +361,7 @@ default void run() throws R2dbcException {
362361
* @param <T> the type of values supplied by this supplier.
363362
*/
364363
@FunctionalInterface
365-
interface ThrowingSupplier<T> extends Supplier<T> {
364+
interface JdbcSupplier<T> extends Supplier<T> {
366365
/**
367366
* Returns a value, or throws a {@code SQLException} if an error is
368367
* encountered.
@@ -376,8 +375,7 @@ interface ThrowingSupplier<T> extends Supplier<T> {
376375
* encountered.
377376
* @throws R2dbcException If a value is not returned due to an error.
378377
* @implNote The default implementation invokes
379-
* {@link #fromJdbc(ThrowingSupplier)} (ThrowingRunnable)}
380-
* with this {@code ThrowingSupplier}.
378+
* {@link #fromJdbc(JdbcSupplier)} with this {@code JdbcSupplier}.
381379
*/
382380
@Override
383381
default T get() throws R2dbcException {

0 commit comments

Comments
 (0)