From 85707a929a21c5a16227e6479dc8b4f20d5e2fe0 Mon Sep 17 00:00:00 2001
From: Pieter-Jan Vrielynck <43749206+pvriel@users.noreply.github.com>
Date: Mon, 10 Feb 2025 11:02:31 +0100
Subject: [PATCH] [NYS2AWS-134] fixed bug that caused entire alf_transaction
table to be fetched
---
.../default/indexing-context.xml | 1 +
.../indexing/IndexingStrategyFactoryBean.java | 5 +-
.../threshold/ThresholdIndexingStrategy.java | 6 +-
...dIndexingStrategyTransactionIdFetcher.java | 66 +++++++++----------
...ldIndexingStrategyTransactionIdMerger.java | 10 ++-
5 files changed, 44 insertions(+), 44 deletions(-)
diff --git a/alfresco-health-processor-platform/src/main/amp/config/alfresco/subsystems/HealthProcessor/default/indexing-context.xml b/alfresco-health-processor-platform/src/main/amp/config/alfresco/subsystems/HealthProcessor/default/indexing-context.xml
index aca867c..c006b0b 100644
--- a/alfresco-health-processor-platform/src/main/amp/config/alfresco/subsystems/HealthProcessor/default/indexing-context.xml
+++ b/alfresco-health-processor-platform/src/main/amp/config/alfresco/subsystems/HealthProcessor/default/indexing-context.xml
@@ -66,5 +66,6 @@
+
diff --git a/alfresco-health-processor-platform/src/main/java/eu/xenit/alfresco/healthprocessor/indexing/IndexingStrategyFactoryBean.java b/alfresco-health-processor-platform/src/main/java/eu/xenit/alfresco/healthprocessor/indexing/IndexingStrategyFactoryBean.java
index a6ae376..814cfe5 100644
--- a/alfresco-health-processor-platform/src/main/java/eu/xenit/alfresco/healthprocessor/indexing/IndexingStrategyFactoryBean.java
+++ b/alfresco-health-processor-platform/src/main/java/eu/xenit/alfresco/healthprocessor/indexing/IndexingStrategyFactoryBean.java
@@ -12,6 +12,8 @@
import org.alfresco.repo.search.SearchTrackingComponent;
import org.springframework.beans.factory.config.AbstractFactoryBean;
+import javax.sql.DataSource;
+
@AllArgsConstructor
public final class IndexingStrategyFactoryBean extends AbstractFactoryBean {
@@ -20,6 +22,7 @@ public final class IndexingStrategyFactoryBean extends AbstractFactoryBean getObjectType() {
@@ -38,7 +41,7 @@ private IndexingStrategy createIndexingStrategy(IndexingStrategy.IndexingStrateg
case LAST_TXNS:
return new LastTxnsBasedIndexingStrategy((LastTxnsIndexingConfiguration) configuration, trackingComponent);
case THRESHOLD:
- return new ThresholdIndexingStrategy((ThresholdIndexingStrategyConfiguration) configuration, nodeDAO, searchTrackingComponent);
+ return new ThresholdIndexingStrategy((ThresholdIndexingStrategyConfiguration) configuration, nodeDAO, searchTrackingComponent, dataSource);
default:
throw new IllegalArgumentException("Unknown indexing strategy: "+ indexingStrategy);
}
diff --git a/alfresco-health-processor-platform/src/main/java/eu/xenit/alfresco/healthprocessor/indexing/threshold/ThresholdIndexingStrategy.java b/alfresco-health-processor-platform/src/main/java/eu/xenit/alfresco/healthprocessor/indexing/threshold/ThresholdIndexingStrategy.java
index 1da6325..c55b3cd 100644
--- a/alfresco-health-processor-platform/src/main/java/eu/xenit/alfresco/healthprocessor/indexing/threshold/ThresholdIndexingStrategy.java
+++ b/alfresco-health-processor-platform/src/main/java/eu/xenit/alfresco/healthprocessor/indexing/threshold/ThresholdIndexingStrategy.java
@@ -11,6 +11,7 @@
import org.alfresco.repo.search.SearchTrackingComponent;
import org.alfresco.service.cmr.repository.NodeRef;
+import javax.sql.DataSource;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
@@ -35,7 +36,8 @@ public class ThresholdIndexingStrategy implements IndexingStrategy {
public ThresholdIndexingStrategy(@NonNull ThresholdIndexingStrategyConfiguration configuration,
@NonNull AbstractNodeDAOImpl nodeDAO,
- @NonNull SearchTrackingComponent searchTrackingComponent) {
+ @NonNull SearchTrackingComponent searchTrackingComponent,
+ @NonNull DataSource dataSource) {
if (configuration.getTransactionsBackgroundWorkers() <= 0)
throw new IllegalArgumentException(String.format("The amount of background workers must be greater than zero (%d provided).", configuration.getTransactionsBackgroundWorkers()));
@@ -44,7 +46,7 @@ public ThresholdIndexingStrategy(@NonNull ThresholdIndexingStrategyConfiguration
this.nodeDAO = nodeDAO;
this.runningThreads = new HashSet<>(configuration.getTransactionsBackgroundWorkers() + 1);
- this.transactionIdFetcher = new ThresholdIndexingStrategyTransactionIdFetcher(configuration, searchTrackingComponent, state);
+ this.transactionIdFetcher = new ThresholdIndexingStrategyTransactionIdFetcher(configuration, dataSource, state);
this.queuedNodes = new LinkedBlockingDeque<>(configuration.getTransactionsBackgroundWorkers());
this.transactionIdMergers = new ThresholdIndexingStrategyTransactionIdMerger[configuration.getTransactionsBackgroundWorkers()];
diff --git a/alfresco-health-processor-platform/src/main/java/eu/xenit/alfresco/healthprocessor/indexing/threshold/ThresholdIndexingStrategyTransactionIdFetcher.java b/alfresco-health-processor-platform/src/main/java/eu/xenit/alfresco/healthprocessor/indexing/threshold/ThresholdIndexingStrategyTransactionIdFetcher.java
index 012284d..d8dde91 100644
--- a/alfresco-health-processor-platform/src/main/java/eu/xenit/alfresco/healthprocessor/indexing/threshold/ThresholdIndexingStrategyTransactionIdFetcher.java
+++ b/alfresco-health-processor-platform/src/main/java/eu/xenit/alfresco/healthprocessor/indexing/threshold/ThresholdIndexingStrategyTransactionIdFetcher.java
@@ -2,50 +2,47 @@
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
-import org.alfresco.repo.search.SearchTrackingComponent;
-import org.alfresco.repo.solr.Transaction;
+import org.springframework.jdbc.core.JdbcTemplate;
+import javax.sql.DataSource;
import java.util.List;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
-/**
- * In case the performance ever becomes an issue: this part can be easily multi-threaded too,
- * by dividing the transaction range into multiple parts and fetching them in parallel (using multiple fetchers).
- * However, this is not necessary at the moment. We're already at 200.000 transactions per minute on an underpowered laptop.
- * I'm just leaving this note here, in case this might help another developer in the future.
- */
+// TODO: add an explanation about why we directly contact the DB.
@Slf4j
public class ThresholdIndexingStrategyTransactionIdFetcher implements Runnable {
- private final @NonNull BlockingDeque<@NonNull List<@NonNull Transaction>> queuedTransactions;
+ private final static @NonNull String QUERY = "SELECT txn.id as id FROM alf_transaction txn WHERE txn.id BETWEEN %d AND %d ORDER BY txn.id ASC LIMIT %d";
- private final @NonNull SearchTrackingComponent searchTrackingComponent;
+ private final @NonNull BlockingDeque<@NonNull List<@NonNull Long>> queuedTransactionIDs;
+
+ private final @NonNull JdbcTemplate jdbcTemplate;
private final @NonNull ThresholdIndexingStrategyState state;
private final @NonNull ThresholdIndexingStrategyConfiguration configuration;
public ThresholdIndexingStrategyTransactionIdFetcher(@NonNull ThresholdIndexingStrategyConfiguration configuration,
- @NonNull SearchTrackingComponent searchTrackingComponent,
+ @NonNull DataSource dataSource,
@NonNull ThresholdIndexingStrategyState state) {
// No more required than the amount of background workers.
// If the queue is full, it means that the background workers can not keep up with the transaction fetcher anyway.
// Slow down in this case.
- this(configuration, searchTrackingComponent, state, new LinkedBlockingDeque<>(configuration.getTransactionsBackgroundWorkers()));
+ this(configuration, dataSource, state, new LinkedBlockingDeque<>(configuration.getTransactionsBackgroundWorkers()));
}
ThresholdIndexingStrategyTransactionIdFetcher(@NonNull ThresholdIndexingStrategyConfiguration configuration,
- @NonNull SearchTrackingComponent searchTrackingComponent,
+ @NonNull DataSource dataSource,
@NonNull ThresholdIndexingStrategyState state,
- @NonNull LinkedBlockingDeque<@NonNull List<@NonNull Transaction>> queuedTransactions) {
+ @NonNull LinkedBlockingDeque<@NonNull List<@NonNull Long>> queuedTransactionIDs) {
if (configuration.getTransactionsBackgroundWorkers() <= 0)
throw new IllegalArgumentException(String.format("The amount of background workers must be greater than zero (%d provided).", configuration.getTransactionsBackgroundWorkers()));
if (configuration.getTransactionsBatchSize() <= 0)
throw new IllegalArgumentException(String.format("The batch size must be greater than zero (%d provided).", configuration.getTransactionsBatchSize()));
- this.searchTrackingComponent = searchTrackingComponent;
+ this.jdbcTemplate = new JdbcTemplate(dataSource);
this.state = state;
this.configuration = configuration;
- this.queuedTransactions = queuedTransactions;
+ this.queuedTransactionIDs = queuedTransactionIDs;
}
@Override
@@ -56,21 +53,20 @@ public void run() {
long maxTransactionId = state.getMaxTransactionId();
int amountOfTransactionsToFetch = configuration.getTransactionsBackgroundWorkers() * configuration.getTransactionsBatchSize();
- List fetchedTransactions;
+ List fetchedTransactionsIDs;
do {
+ String query = String.format(QUERY, currentTransactionId, maxTransactionId, amountOfTransactionsToFetch);
log.trace("Fetching transactions from ({}) to ({}).", currentTransactionId, Math.min(currentTransactionId + amountOfTransactionsToFetch, maxTransactionId));
- fetchedTransactions = searchTrackingComponent.getTransactions(currentTransactionId, Long.MIN_VALUE,
- maxTransactionId, Long.MAX_VALUE, amountOfTransactionsToFetch);
- log.trace("Fetched ({}) transactions.", fetchedTransactions.size());
- if (fetchedTransactions.isEmpty()) break;
+ fetchedTransactionsIDs = jdbcTemplate.queryForList(query, Long.class);
+ log.trace("Fetched ({}) transaction ID(s).", fetchedTransactionsIDs.size());
+ if (fetchedTransactionsIDs.isEmpty()) break;
- queueTransactions(fetchedTransactions);
- currentTransactionId = fetchedTransactions.get(fetchedTransactions.size() - 1).getId() + 1;
+ queueTransactions(fetchedTransactionsIDs);
+ currentTransactionId = fetchedTransactionsIDs.get(fetchedTransactionsIDs.size() - 1) + 1;
state.setCurrentTransactionId(currentTransactionId); // UI update; nice to have.
} while (currentTransactionId < maxTransactionId); // maxTransactionId is exclusive.
- } catch (InterruptedException e) {
- log.warn("The ThresholdIndexingStrategyTransactionIdFetcher has been interrupted. This is unexpected behavior. " +
- "Trying to recover by signaling the end to the transaction merger(s).", e);
+ } catch (Exception e) {
+ log.warn("An exception occurred while fetching transactions. Trying to signal the end to the transaction merger(s).", e);
} finally {
try {
signalEnd();
@@ -84,26 +80,26 @@ public void run() {
private void signalEnd() throws InterruptedException {
// Signal to each of the transaction mergers that the end has been reached.
log.trace("Signaling the end to ({}) transaction merger(s).", configuration.getTransactionsBackgroundWorkers());
- for (int i = 0; i < configuration.getTransactionsBackgroundWorkers(); i++) queuedTransactions.putLast(List.of());
+ for (int i = 0; i < configuration.getTransactionsBackgroundWorkers(); i++) queuedTransactionIDs.putLast(List.of());
}
- public @NonNull List<@NonNull Transaction> getNextTransactions() throws InterruptedException {
- List transactions = queuedTransactions.takeFirst();
- if (!transactions.isEmpty()) state.getTransactionBatchesQueueSize().decrementAndGet();
+ public @NonNull List<@NonNull Long> getNextTransactionIDs() throws InterruptedException {
+ List transactionIDs = queuedTransactionIDs.takeFirst();
+ if (!transactionIDs.isEmpty()) state.getTransactionBatchesQueueSize().decrementAndGet();
else log.trace("One of the transaction mergers is receiving the end signal from the transaction fetcher.");
- return transactions;
+ return transactionIDs;
}
- private void queueTransactions(@NonNull List<@NonNull Transaction> transactions) throws InterruptedException {
- int transactionsSize = transactions.size();
+ private void queueTransactions(@NonNull List<@NonNull Long> transactionIDs) throws InterruptedException {
+ int transactionsSize = transactionIDs.size();
for (int i = 0; i < configuration.getTransactionsBackgroundWorkers(); i ++) {
- List workerBatch = transactions.subList(i * configuration.getTransactionsBatchSize(),
+ List workerBatch = transactionIDs.subList(i * configuration.getTransactionsBatchSize(),
Math.min((i + 1) * configuration.getTransactionsBatchSize(), transactionsSize));
if (!workerBatch.isEmpty()) {
log.trace("Queuing a batch of ({}) transactions for transaction merger ({}).", workerBatch.size(), i);
- queuedTransactions.putLast(workerBatch);
+ queuedTransactionIDs.putLast(workerBatch);
state.getTransactionBatchesQueueSize().incrementAndGet();
}
if (workerBatch.size() < configuration.getTransactionsBatchSize()) return;
diff --git a/alfresco-health-processor-platform/src/main/java/eu/xenit/alfresco/healthprocessor/indexing/threshold/ThresholdIndexingStrategyTransactionIdMerger.java b/alfresco-health-processor-platform/src/main/java/eu/xenit/alfresco/healthprocessor/indexing/threshold/ThresholdIndexingStrategyTransactionIdMerger.java
index 4ea1410..dc47fb6 100644
--- a/alfresco-health-processor-platform/src/main/java/eu/xenit/alfresco/healthprocessor/indexing/threshold/ThresholdIndexingStrategyTransactionIdMerger.java
+++ b/alfresco-health-processor-platform/src/main/java/eu/xenit/alfresco/healthprocessor/indexing/threshold/ThresholdIndexingStrategyTransactionIdMerger.java
@@ -6,7 +6,6 @@
import org.alfresco.repo.domain.node.Node;
import org.alfresco.repo.search.SearchTrackingComponent;
import org.alfresco.repo.solr.NodeParameters;
-import org.alfresco.repo.solr.Transaction;
import org.alfresco.service.cmr.repository.NodeRef;
import java.util.HashMap;
@@ -14,7 +13,6 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingDeque;
-import java.util.stream.Collectors;
@Slf4j
@RequiredArgsConstructor
@@ -32,15 +30,15 @@ public void run() {
log.debug("Starting ({}).", Thread.currentThread().getName());
HashSet bucket = new HashSet<>(configuration.getThreshold());
- List newTransactions;
- while (!(newTransactions = fetcher.getNextTransactions()).isEmpty()) {
+ List newTransactionIDs;
+ while (!(newTransactionIDs = fetcher.getNextTransactionIDs()).isEmpty()) {
// Fetch the nodes of the new transactions.
// Even though we fetch multiple transactions at once for IO-efficiency, we still need to keep track
// of which nodes belong to which transaction.
// Transactions that already contain #threshold nodes shouldn't be part of the merging process, since
// there is no point (they are already large enough).
- log.trace("Fetched a new batch of ({}) transaction(s) from the transaction fetcher to process.", newTransactions.size());
- nodeParameters.setTransactionIds(newTransactions.stream().map(Transaction::getId).collect(Collectors.toList()));
+ log.trace("Fetched a new batch of ({}) transaction(s) from the transaction fetcher to process.", newTransactionIDs.size());
+ nodeParameters.setTransactionIds(newTransactionIDs);
HashMap> fetchedTransactionNodes = new HashMap<>(nodeParameters.getTransactionIds().size());
searchTrackingComponent.getNodes(nodeParameters, node ->
fetchedTransactionNodes.computeIfAbsent(node.getTransaction().getId(), ignored -> new HashSet<>()).add(node));