From 5dc0317c2a7f374c07fdf37894f24ba8da73fa8f Mon Sep 17 00:00:00 2001 From: Peeter Karolin <43540664+tw-peeterkarolin@users.noreply.github.com> Date: Tue, 26 Sep 2023 12:08:20 +0300 Subject: [PATCH] APPENG-656: add support for doing database queries with multiple threads in parallel within one entry point. (#40) --- CHANGELOG.md | 7 +++- build.libraries.gradle | 4 +- gradle.properties | 2 +- .../DatabaseAccessStatisticsIntTest.java | 42 +++++++++++++++++++ ...AccessStatisticsEntryPointInterceptor.java | 4 +- 5 files changed, 53 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 76db87f..adcabcb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,10 +5,15 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). -## [2.13.4] - 2023-08-01 +## [2.14.0] - 2023-09-26 ### Added +* Added support for multiple threads doing database queries in parallel related to gathering database access statistics. + Previously it wasn't possible as it had the potential to throw a `ConcurrentModificationException`. + +## [2.13.4] - 2023-08-01 +### Added * Support for Spring Boot 3.1 ### Bumped diff --git a/build.libraries.gradle b/build.libraries.gradle index 6072e42..24ab9e4 100644 --- a/build.libraries.gradle +++ b/build.libraries.gradle @@ -9,8 +9,8 @@ ext { springBootDependencies : "org.springframework.boot:spring-boot-dependencies:${springBootVersion}", testContainersMariaDb : "org.testcontainers:mariadb:1.17.6", twBaseUtils : "com.transferwise.common:tw-base-utils:1.10.1", - twContext : "com.transferwise.common:tw-context:0.12.0", - twContextStarter : "com.transferwise.common:tw-context-starter:0.12.0", + twContext : "com.transferwise.common:tw-context:1.0.0", + twContextStarter : "com.transferwise.common:tw-context-starter:1.0.0", twGracefulShutdown : 'com.transferwise.common:tw-graceful-shutdown:2.11.0', twGracefulShutdownInterfaces : 'com.transferwise.common:tw-graceful-shutdown-interfaces:2.11.0', twSpyqlCore : "com.transferwise.common:tw-spyql-core:1.6.0", diff --git a/gradle.properties b/gradle.properties index 0277f53..77685fc 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1 +1 @@ -version=2.13.4 +version=2.14.0 diff --git a/tw-entrypoints-starter/src/test/java/com/transferwise/common/entrypoints/databaseaccessstatistics/DatabaseAccessStatisticsIntTest.java b/tw-entrypoints-starter/src/test/java/com/transferwise/common/entrypoints/databaseaccessstatistics/DatabaseAccessStatisticsIntTest.java index 5cae60a..954905d 100644 --- a/tw-entrypoints-starter/src/test/java/com/transferwise/common/entrypoints/databaseaccessstatistics/DatabaseAccessStatisticsIntTest.java +++ b/tw-entrypoints-starter/src/test/java/com/transferwise/common/entrypoints/databaseaccessstatistics/DatabaseAccessStatisticsIntTest.java @@ -9,7 +9,13 @@ import io.micrometer.core.instrument.DistributionSummary; import io.micrometer.core.instrument.Meter; import io.micrometer.core.instrument.Timer; +import java.util.Arrays; +import java.util.List; import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import javax.sql.DataSource; @@ -111,6 +117,42 @@ void rowsStatisticsAreGathered() { } + // This test doesn't test so much the potential concurrent modification exceptions, but it's more about documenting that it's an expected behaviour + // and how it should work. + @Test + void multipleThreadsAreAbleToGatherStatisticsWithinOneEntryPointConcurrently() { + TwContext entryPointContext = TwContext.current().createSubContext().asEntryPoint("Test", "myConcurrentEntryPoint"); + List> queries = Arrays.asList( + createQueryCallable(entryPointContext), + createQueryCallable(entryPointContext), + createQueryCallable(entryPointContext), + createQueryCallable(entryPointContext) + ); + + ExecutorService executorService = Executors.newFixedThreadPool(2); + try { + entryPointContext.execute(() -> { + try { + List> futures = executorService.invokeAll(queries); + for (Future future : futures) { + future.get(1000, TimeUnit.MILLISECONDS); + } + } catch (Throwable e) { + throw new RuntimeException(e); + } + }); + } finally { + executorService.shutdown(); + } + + DatabaseAccessStatistics stats = (DatabaseAccessStatistics) ((Map) entryPointContext.get(DatabaseAccessStatistics.TW_CONTEXT_KEY)).get("mydb"); + assertThat(stats.getNonTransactionalQueriesCount()).isEqualTo(4); + } + + private Callable createQueryCallable(TwContext twContext) { + return () -> twContext.createSubContext().execute(() -> jdbcTemplate.queryForObject("select count(*) from table_a", Integer.class)); + } + private Map metersAsMap() { return meterRegistry.getMeters().stream().filter(m -> m.getId().getName().startsWith("EntryPoints_Das")).collect(Collectors .toMap(m -> StringUtils.substringAfter(m.getId().getName(), "EntryPoints_Das_"), m -> m)); diff --git a/tw-entrypoints/src/main/java/com/transferwise/common/entrypoints/databaseaccessstatistics/DatabaseAccessStatisticsEntryPointInterceptor.java b/tw-entrypoints/src/main/java/com/transferwise/common/entrypoints/databaseaccessstatistics/DatabaseAccessStatisticsEntryPointInterceptor.java index ed075c5..e5563c2 100644 --- a/tw-entrypoints/src/main/java/com/transferwise/common/entrypoints/databaseaccessstatistics/DatabaseAccessStatisticsEntryPointInterceptor.java +++ b/tw-entrypoints/src/main/java/com/transferwise/common/entrypoints/databaseaccessstatistics/DatabaseAccessStatisticsEntryPointInterceptor.java @@ -8,8 +8,8 @@ import com.transferwise.common.entrypoints.EntryPointsMetrics; import io.micrometer.core.instrument.DistributionSummary; import io.micrometer.core.instrument.Timer; -import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import lombok.extern.slf4j.Slf4j; @@ -40,7 +40,7 @@ public DatabaseAccessStatisticsEntryPointInterceptor(IMeterCache meterCache) { @Override public T intercept(TwContext context, Supplier supplier) { - Map dbDasMap = new HashMap<>(); + Map dbDasMap = new ConcurrentHashMap<>(); try { context.put(DatabaseAccessStatistics.TW_CONTEXT_KEY, dbDasMap);