Skip to content

Commit

Permalink
APPENG-656: add support for doing database queries with multiple thre…
Browse files Browse the repository at this point in the history
…ads in parallel within one entry point. (#40)
  • Loading branch information
tw-peeterkarolin authored Sep 26, 2023
1 parent 951c582 commit 5dc0317
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 6 deletions.
7 changes: 6 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,15 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres
to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [2.13.4] - 2023-08-01
## [2.14.0] - 2023-09-26

### Added
* Added support for multiple threads doing database queries in parallel related to gathering database access statistics.
Previously it wasn't possible as it had the potential to throw a `ConcurrentModificationException`.

## [2.13.4] - 2023-08-01

### Added
* Support for Spring Boot 3.1

### Bumped
Expand Down
4 changes: 2 additions & 2 deletions build.libraries.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ ext {
springBootDependencies : "org.springframework.boot:spring-boot-dependencies:${springBootVersion}",
testContainersMariaDb : "org.testcontainers:mariadb:1.17.6",
twBaseUtils : "com.transferwise.common:tw-base-utils:1.10.1",
twContext : "com.transferwise.common:tw-context:0.12.0",
twContextStarter : "com.transferwise.common:tw-context-starter:0.12.0",
twContext : "com.transferwise.common:tw-context:1.0.0",
twContextStarter : "com.transferwise.common:tw-context-starter:1.0.0",
twGracefulShutdown : 'com.transferwise.common:tw-graceful-shutdown:2.11.0',
twGracefulShutdownInterfaces : 'com.transferwise.common:tw-graceful-shutdown-interfaces:2.11.0',
twSpyqlCore : "com.transferwise.common:tw-spyql-core:1.6.0",
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=2.13.4
version=2.14.0
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,13 @@
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Meter;
import io.micrometer.core.instrument.Timer;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.sql.DataSource;
Expand Down Expand Up @@ -111,6 +117,42 @@ void rowsStatisticsAreGathered() {

}

// This test doesn't test so much the potential concurrent modification exceptions, but it's more about documenting that it's an expected behaviour
// and how it should work.
@Test
void multipleThreadsAreAbleToGatherStatisticsWithinOneEntryPointConcurrently() {
TwContext entryPointContext = TwContext.current().createSubContext().asEntryPoint("Test", "myConcurrentEntryPoint");
List<Callable<Integer>> queries = Arrays.asList(
createQueryCallable(entryPointContext),
createQueryCallable(entryPointContext),
createQueryCallable(entryPointContext),
createQueryCallable(entryPointContext)
);

ExecutorService executorService = Executors.newFixedThreadPool(2);
try {
entryPointContext.execute(() -> {
try {
List<Future<Integer>> futures = executorService.invokeAll(queries);
for (Future<Integer> future : futures) {
future.get(1000, TimeUnit.MILLISECONDS);
}
} catch (Throwable e) {
throw new RuntimeException(e);
}
});
} finally {
executorService.shutdown();
}

DatabaseAccessStatistics stats = (DatabaseAccessStatistics) ((Map) entryPointContext.get(DatabaseAccessStatistics.TW_CONTEXT_KEY)).get("mydb");
assertThat(stats.getNonTransactionalQueriesCount()).isEqualTo(4);
}

private Callable<Integer> createQueryCallable(TwContext twContext) {
return () -> twContext.createSubContext().execute(() -> jdbcTemplate.queryForObject("select count(*) from table_a", Integer.class));
}

private Map<String, Meter> metersAsMap() {
return meterRegistry.getMeters().stream().filter(m -> m.getId().getName().startsWith("EntryPoints_Das")).collect(Collectors
.toMap(m -> StringUtils.substringAfter(m.getId().getName(), "EntryPoints_Das_"), m -> m));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
import com.transferwise.common.entrypoints.EntryPointsMetrics;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Timer;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -40,7 +40,7 @@ public DatabaseAccessStatisticsEntryPointInterceptor(IMeterCache meterCache) {

@Override
public <T> T intercept(TwContext context, Supplier<T> supplier) {
Map<String, DatabaseAccessStatistics> dbDasMap = new HashMap<>();
Map<String, DatabaseAccessStatistics> dbDasMap = new ConcurrentHashMap<>();
try {
context.put(DatabaseAccessStatistics.TW_CONTEXT_KEY, dbDasMap);

Expand Down

0 comments on commit 5dc0317

Please sign in to comment.