[NYS2AWS-134] fixed bug that caused entire alf_transaction table to be fetched
pvriel committed Feb 10, 2025
1 parent 27208a8 commit 85707a9
Showing 5 changed files with 44 additions and 44 deletions.
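In short: the transaction fetcher no longer pulls full Transaction objects through SearchTrackingComponent, which is what (per the commit message) ended up fetching the entire alf_transaction table. Instead, it queries the database directly for just the transaction IDs in the configured range, bounded, ordered, and limited, via a plain JdbcTemplate. Below is a minimal sketch of the new fetch step; the query text and bounds are taken from ThresholdIndexingStrategyTransactionIdFetcher further down, while the class and method names of the sketch itself are illustrative only.

import org.springframework.jdbc.core.JdbcTemplate;

import javax.sql.DataSource;
import java.util.List;

class TransactionIdFetchSketch {

    // Fetch at most 'limit' transaction IDs in [fromTxnId, toTxnId], smallest first.
    static List<Long> fetchIds(DataSource dataSource, long fromTxnId, long toTxnId, int limit) {
        JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
        String query = String.format(
                "SELECT txn.id as id FROM alf_transaction txn WHERE txn.id BETWEEN %d AND %d ORDER BY txn.id ASC LIMIT %d",
                fromTxnId, toTxnId, limit);
        // Only the id column is selected, so only IDs are materialized, never whole rows.
        return jdbcTemplate.queryForList(query, Long.class);
    }
}

This is also why the factory bean and the Spring XML below now require a DataSource: the fetcher needs direct JDBC access instead of going through SearchTrackingComponent.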
Changed file: Spring bean definitions (XML)
@@ -66,5 +66,6 @@
<constructor-arg name="attributeStore" ref="eu.xenit.alfresco.healthprocessor.util.AlfrescoAttributeStore" />
<constructor-arg name="searchTrackingComponent" ref="searchTrackingComponent" />
<constructor-arg name="nodeDAO" ref="nodeDAO" />
<constructor-arg name="dataSource" ref="dataSource" />
</bean>
</beans>
Changed file: IndexingStrategyFactoryBean.java
@@ -12,6 +12,8 @@
import org.alfresco.repo.search.SearchTrackingComponent;
import org.springframework.beans.factory.config.AbstractFactoryBean;

import javax.sql.DataSource;

@AllArgsConstructor
public final class IndexingStrategyFactoryBean extends AbstractFactoryBean<IndexingStrategy> {

@@ -20,6 +22,7 @@ public final class IndexingStrategyFactoryBean extends AbstractFactoryBean<IndexingStrategy> {
private final AttributeStore attributeStore;
private final SearchTrackingComponent searchTrackingComponent;
private final AbstractNodeDAOImpl nodeDAO;
private final DataSource dataSource;

@Override
public Class<?> getObjectType() {
@@ -38,7 +41,7 @@ private IndexingStrategy createIndexingStrategy(IndexingStrategy.IndexingStrateg
case LAST_TXNS:
return new LastTxnsBasedIndexingStrategy((LastTxnsIndexingConfiguration) configuration, trackingComponent);
case THRESHOLD:
return new ThresholdIndexingStrategy((ThresholdIndexingStrategyConfiguration) configuration, nodeDAO, searchTrackingComponent);
return new ThresholdIndexingStrategy((ThresholdIndexingStrategyConfiguration) configuration, nodeDAO, searchTrackingComponent, dataSource);
default:
throw new IllegalArgumentException("Unknown indexing strategy: "+ indexingStrategy);
}
Changed file: ThresholdIndexingStrategy.java
@@ -11,6 +11,7 @@
import org.alfresco.repo.search.SearchTrackingComponent;
import org.alfresco.service.cmr.repository.NodeRef;

import javax.sql.DataSource;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
@@ -35,7 +36,8 @@ public class ThresholdIndexingStrategy implements IndexingStrategy {

public ThresholdIndexingStrategy(@NonNull ThresholdIndexingStrategyConfiguration configuration,
@NonNull AbstractNodeDAOImpl nodeDAO,
@NonNull SearchTrackingComponent searchTrackingComponent) {
@NonNull SearchTrackingComponent searchTrackingComponent,
@NonNull DataSource dataSource) {
if (configuration.getTransactionsBackgroundWorkers() <= 0)
throw new IllegalArgumentException(String.format("The amount of background workers must be greater than zero (%d provided).", configuration.getTransactionsBackgroundWorkers()));

@@ -44,7 +46,7 @@ public ThresholdIndexingStrategy(@NonNull ThresholdIndexingStrategyConfiguration
this.nodeDAO = nodeDAO;

this.runningThreads = new HashSet<>(configuration.getTransactionsBackgroundWorkers() + 1);
this.transactionIdFetcher = new ThresholdIndexingStrategyTransactionIdFetcher(configuration, searchTrackingComponent, state);
this.transactionIdFetcher = new ThresholdIndexingStrategyTransactionIdFetcher(configuration, dataSource, state);
this.queuedNodes = new LinkedBlockingDeque<>(configuration.getTransactionsBackgroundWorkers());

this.transactionIdMergers = new ThresholdIndexingStrategyTransactionIdMerger[configuration.getTransactionsBackgroundWorkers()];
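For completeness, constructing the strategy by hand now looks as follows. This is a hedged sketch in which thresholdConfiguration, nodeDAO, searchTrackingComponent and dataSource are assumed to come from the surrounding Alfresco/Spring context; in the repository this wiring is done by IndexingStrategyFactoryBean and the XML above.

ThresholdIndexingStrategy strategy = new ThresholdIndexingStrategy(
        thresholdConfiguration,   // ThresholdIndexingStrategyConfiguration
        nodeDAO,                  // AbstractNodeDAOImpl
        searchTrackingComponent,  // SearchTrackingComponent
        dataSource);              // javax.sql.DataSource, the argument added by this commit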
Changed file: ThresholdIndexingStrategyTransactionIdFetcher.java
@@ -2,50 +2,47 @@

import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.alfresco.repo.search.SearchTrackingComponent;
import org.alfresco.repo.solr.Transaction;
import org.springframework.jdbc.core.JdbcTemplate;

import javax.sql.DataSource;
import java.util.List;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;

/**
* In case the performance ever becomes an issue: this part can be easily multi-threaded too,
* by dividing the transaction range into multiple parts and fetching them in parallel (using multiple fetchers).
* However, this is not necessary at the moment. We're already at 200.000 transactions per minute on an underpowered laptop.
* I'm just leaving this note here, in case this might help another developer in the future.
*/
// TODO: add an explanation about why we directly contact the DB.
@Slf4j
public class ThresholdIndexingStrategyTransactionIdFetcher implements Runnable {

private final @NonNull BlockingDeque<@NonNull List<@NonNull Transaction>> queuedTransactions;
private final static @NonNull String QUERY = "SELECT txn.id as id FROM alf_transaction txn WHERE txn.id BETWEEN %d AND %d ORDER BY txn.id ASC LIMIT %d";

private final @NonNull SearchTrackingComponent searchTrackingComponent;
private final @NonNull BlockingDeque<@NonNull List<@NonNull Long>> queuedTransactionIDs;

private final @NonNull JdbcTemplate jdbcTemplate;
private final @NonNull ThresholdIndexingStrategyState state;
private final @NonNull ThresholdIndexingStrategyConfiguration configuration;

public ThresholdIndexingStrategyTransactionIdFetcher(@NonNull ThresholdIndexingStrategyConfiguration configuration,
@NonNull SearchTrackingComponent searchTrackingComponent,
@NonNull DataSource dataSource,
@NonNull ThresholdIndexingStrategyState state) {
// No more required than the amount of background workers.
// If the queue is full, it means that the background workers can not keep up with the transaction fetcher anyway.
// Slow down in this case.
this(configuration, searchTrackingComponent, state, new LinkedBlockingDeque<>(configuration.getTransactionsBackgroundWorkers()));
this(configuration, dataSource, state, new LinkedBlockingDeque<>(configuration.getTransactionsBackgroundWorkers()));
}

ThresholdIndexingStrategyTransactionIdFetcher(@NonNull ThresholdIndexingStrategyConfiguration configuration,
@NonNull SearchTrackingComponent searchTrackingComponent,
@NonNull DataSource dataSource,
@NonNull ThresholdIndexingStrategyState state,
@NonNull LinkedBlockingDeque<@NonNull List<@NonNull Transaction>> queuedTransactions) {
@NonNull LinkedBlockingDeque<@NonNull List<@NonNull Long>> queuedTransactionIDs) {
if (configuration.getTransactionsBackgroundWorkers() <= 0)
throw new IllegalArgumentException(String.format("The amount of background workers must be greater than zero (%d provided).", configuration.getTransactionsBackgroundWorkers()));
if (configuration.getTransactionsBatchSize() <= 0)
throw new IllegalArgumentException(String.format("The batch size must be greater than zero (%d provided).", configuration.getTransactionsBatchSize()));

this.searchTrackingComponent = searchTrackingComponent;
this.jdbcTemplate = new JdbcTemplate(dataSource);
this.state = state;
this.configuration = configuration;
this.queuedTransactions = queuedTransactions;
this.queuedTransactionIDs = queuedTransactionIDs;
}

@Override
@@ -56,21 +53,20 @@ public void run() {
long maxTransactionId = state.getMaxTransactionId();
int amountOfTransactionsToFetch = configuration.getTransactionsBackgroundWorkers() * configuration.getTransactionsBatchSize();

List<Transaction> fetchedTransactions;
List<Long> fetchedTransactionsIDs;
do {
String query = String.format(QUERY, currentTransactionId, maxTransactionId, amountOfTransactionsToFetch);
log.trace("Fetching transactions from ({}) to ({}).", currentTransactionId, Math.min(currentTransactionId + amountOfTransactionsToFetch, maxTransactionId));
fetchedTransactions = searchTrackingComponent.getTransactions(currentTransactionId, Long.MIN_VALUE,
maxTransactionId, Long.MAX_VALUE, amountOfTransactionsToFetch);
log.trace("Fetched ({}) transactions.", fetchedTransactions.size());
if (fetchedTransactions.isEmpty()) break;
fetchedTransactionsIDs = jdbcTemplate.queryForList(query, Long.class);
log.trace("Fetched ({}) transaction ID(s).", fetchedTransactionsIDs.size());
if (fetchedTransactionsIDs.isEmpty()) break;

queueTransactions(fetchedTransactions);
currentTransactionId = fetchedTransactions.get(fetchedTransactions.size() - 1).getId() + 1;
queueTransactions(fetchedTransactionsIDs);
currentTransactionId = fetchedTransactionsIDs.get(fetchedTransactionsIDs.size() - 1) + 1;
state.setCurrentTransactionId(currentTransactionId); // UI update; nice to have.
} while (currentTransactionId < maxTransactionId); // maxTransactionId is exclusive.
} catch (InterruptedException e) {
log.warn("The ThresholdIndexingStrategyTransactionIdFetcher has been interrupted. This is unexpected behavior. " +
"Trying to recover by signaling the end to the transaction merger(s).", e);
} catch (Exception e) {
log.warn("An exception occurred while fetching transactions. Trying to signal the end to the transaction merger(s).", e);
} finally {
try {
signalEnd();
@@ -84,26 +80,26 @@ public void run() {
private void signalEnd() throws InterruptedException {
// Signal to each of the transaction mergers that the end has been reached.
log.trace("Signaling the end to ({}) transaction merger(s).", configuration.getTransactionsBackgroundWorkers());
for (int i = 0; i < configuration.getTransactionsBackgroundWorkers(); i++) queuedTransactions.putLast(List.of());
for (int i = 0; i < configuration.getTransactionsBackgroundWorkers(); i++) queuedTransactionIDs.putLast(List.of());
}

public @NonNull List<@NonNull Transaction> getNextTransactions() throws InterruptedException {
List<Transaction> transactions = queuedTransactions.takeFirst();
if (!transactions.isEmpty()) state.getTransactionBatchesQueueSize().decrementAndGet();
public @NonNull List<@NonNull Long> getNextTransactionIDs() throws InterruptedException {
List<Long> transactionIDs = queuedTransactionIDs.takeFirst();
if (!transactionIDs.isEmpty()) state.getTransactionBatchesQueueSize().decrementAndGet();
else log.trace("One of the transaction mergers is receiving the end signal from the transaction fetcher.");
return transactions;
return transactionIDs;
}

private void queueTransactions(@NonNull List<@NonNull Transaction> transactions) throws InterruptedException {
int transactionsSize = transactions.size();
private void queueTransactions(@NonNull List<@NonNull Long> transactionIDs) throws InterruptedException {
int transactionsSize = transactionIDs.size();

for (int i = 0; i < configuration.getTransactionsBackgroundWorkers(); i ++) {
List<Transaction> workerBatch = transactions.subList(i * configuration.getTransactionsBatchSize(),
List<Long> workerBatch = transactionIDs.subList(i * configuration.getTransactionsBatchSize(),
Math.min((i + 1) * configuration.getTransactionsBatchSize(), transactionsSize));

if (!workerBatch.isEmpty()) {
log.trace("Queuing a batch of ({}) transactions for transaction merger ({}).", workerBatch.size(), i);
queuedTransactions.putLast(workerBatch);
queuedTransactionIDs.putLast(workerBatch);
state.getTransactionBatchesQueueSize().incrementAndGet();
}
if (workerBatch.size() < configuration.getTransactionsBatchSize()) return;
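The splitting rule in queueTransactions(...) above is what keeps the workers balanced: each round, the fetched IDs (at most workers * batchSize of them, thanks to the LIMIT in the query) are handed out as one sub-list per background worker, and a short or empty batch means the current fetch round is exhausted. A hedged, self-contained restatement of that rule follows; BatchSplitSketch and split are illustrative names, not part of the commit.

import java.util.ArrayList;
import java.util.List;

class BatchSplitSketch {

    // Split 'ids' into one batch per worker, at most 'batchSize' IDs each.
    // A batch shorter than 'batchSize' signals that this fetch round is exhausted.
    static List<List<Long>> split(List<Long> ids, int workers, int batchSize) {
        List<List<Long>> batches = new ArrayList<>(workers);
        for (int i = 0; i < workers; i++) {
            int from = i * batchSize;
            int to = Math.min(from + batchSize, ids.size());
            if (from >= to) break;            // nothing left for this worker
            batches.add(ids.subList(from, to));
            if (to - from < batchSize) break; // short batch: stop handing out more
        }
        return batches;
    }
}

In the fetcher itself the batches are put on the shared LinkedBlockingDeque rather than returned, and that deque is capped at the number of background workers so the fetcher slows down when the mergers cannot keep up, as the comment in the public constructor above explains.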
Changed file: ThresholdIndexingStrategyTransactionIdMerger.java
@@ -6,15 +6,13 @@
import org.alfresco.repo.domain.node.Node;
import org.alfresco.repo.search.SearchTrackingComponent;
import org.alfresco.repo.solr.NodeParameters;
import org.alfresco.repo.solr.Transaction;
import org.alfresco.service.cmr.repository.NodeRef;

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingDeque;
import java.util.stream.Collectors;

@Slf4j
@RequiredArgsConstructor
@@ -32,15 +30,15 @@ public void run() {
log.debug("Starting ({}).", Thread.currentThread().getName());
HashSet<NodeRef> bucket = new HashSet<>(configuration.getThreshold());

List<Transaction> newTransactions;
while (!(newTransactions = fetcher.getNextTransactions()).isEmpty()) {
List<Long> newTransactionIDs;
while (!(newTransactionIDs = fetcher.getNextTransactionIDs()).isEmpty()) {
// Fetch the nodes of the new transactions.
// Even though we fetch multiple transactions at once for IO-efficiency, we still need to keep track
// of which nodes belong to which transaction.
// Transactions that already contain #threshold nodes shouldn't be part of the merging process, since
// there is no point (they are already large enough).
log.trace("Fetched a new batch of ({}) transaction(s) from the transaction fetcher to process.", newTransactions.size());
nodeParameters.setTransactionIds(newTransactions.stream().map(Transaction::getId).collect(Collectors.toList()));
log.trace("Fetched a new batch of ({}) transaction(s) from the transaction fetcher to process.", newTransactionIDs.size());
nodeParameters.setTransactionIds(newTransactionIDs);
HashMap<Long, HashSet<Node>> fetchedTransactionNodes = new HashMap<>(nodeParameters.getTransactionIds().size());
searchTrackingComponent.getNodes(nodeParameters, node ->
fetchedTransactionNodes.computeIfAbsent(node.getTransaction().getId(), ignored -> new HashSet<>()).add(node));
