Skip to content

Commit

Permalink
[NYS2AWS-134] 'threshold' -> txn-aggregation indexing strategy
Browse files Browse the repository at this point in the history
  • Loading branch information
pvriel committed Feb 11, 2025
1 parent dc2a22e commit f3dd75b
Show file tree
Hide file tree
Showing 17 changed files with 120 additions and 127 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ Version template:
* Added the SolrUndersizedTransactionsHealthProcessorPlugin,
which merges the transactions before they are being indexed by Solr
to improve performance.
* Added the threshold indexing strategy, which multi-threads the retrieval
* Added the txn-aggregation indexing strategy, which multi-threads the retrieval
and batching process of transactions to improve performance.

## [0.6.1] - 2025-01-08
Expand Down
18 changes: 9 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,9 @@ eu.xenit.alfresco.healthprocessor.indexing.last-txns.lookback-transactions=10000
eu.xenit.alfresco.healthprocessor.indexing.last-txns.txn-batch-size=5000
```

#### Multi-threaded threshold indexing
#### Multi-threaded txn-aggregation strategy

Strategy id: `threshold`
Strategy id: `txn-aggregation`

Creates a single thread that goes over all transactions in the range of `txn-id.start` and `txn-id.end` (-1 can be used to indicate all transactions).
Also creates `transactions-background-workers` threads that process these transactions in batches of `transactions-batch-size` transactions in parallel.
Expand All @@ -163,12 +163,12 @@ is scheduled for delivery by the indexing strategy.
The original bucket is then cleared and can be filled with new nodes.

```properties
eu.xenit.alfresco.healthprocessor.indexing.strategy=threshold
eu.xenit.alfresco.healthprocessor.indexing.threshold.transactions-background-workers=5
eu.xenit.alfresco.healthprocessor.indexing.threshold.transactions-batch-size=1000
eu.xenit.alfresco.healthprocessor.indexing.threshold.threshold=1000
eu.xenit.alfresco.healthprocessor.indexing.txn-id.start=-1
eu.xenit.alfresco.healthprocessor.indexing.txn-id.end=-1
eu.xenit.alfresco.healthprocessor.indexing.strategy=txn-aggregation
eu.xenit.alfresco.healthprocessor.indexing.txn-aggregation.transactions-background-workers=5
eu.xenit.alfresco.healthprocessor.indexing.txn-aggregation.transactions-batch-size=1000
eu.xenit.alfresco.healthprocessor.indexing.txn-aggregation.threshold=1000
eu.xenit.alfresco.healthprocessor.indexing.txn-aggregation.start=-1
eu.xenit.alfresco.healthprocessor.indexing.txn-aggregation.end=-1
```

### HealthProcessorPlugin implementations
Expand Down Expand Up @@ -281,7 +281,7 @@ All nodes for which an `PURGE` and `REINDEX` commands has been succesfully sent

Activation property: `eu.xenit.alfresco.healthprocessor.plugin.solr-transaction-merger.enabled=true`

<b>Note: this plugin can only be used in combination with the threshold indexing strategy.
<b>Note: this plugin can only be used in combination with the `txn-aggregation` indexing strategy.
If this indexing strategy is not selected while the plugin is enabled, the health processor platform will not boot.</b>

This plugin is used to merge transactions that are too small to be indexed by Solr in a performant manner.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@
value="${eu.xenit.alfresco.healthprocessor.indexing.last-txns.txn-batch-size}" />
</bean>

<bean id="eu.xenit.alfresco.healthprocessor.indexing.threshold.ThresholdIndexingStrategyConfiguration"
class="eu.xenit.alfresco.healthprocessor.indexing.threshold.ThresholdIndexingStrategyConfiguration"
<bean id="eu.xenit.alfresco.healthprocessor.indexing.txnaggregation.ThresholdIndexingStrategyConfiguration"
class="eu.xenit.alfresco.healthprocessor.indexing.txnaggregation.TransactionAggregationIndexingStrategyConfiguration"
autowire-candidate="false">
<constructor-arg name="transactionsBackgroundWorkers" value="${eu.xenit.alfresco.healthprocessor.indexing.threshold.transactions-background-workers}" />
<constructor-arg name="transactionsBatchSize" value="${eu.xenit.alfresco.healthprocessor.indexing.threshold.transactions-batch-size}" />
<constructor-arg name="threshold" value="${eu.xenit.alfresco.healthprocessor.indexing.threshold.threshold}" />
<constructor-arg name="minTransactionId" value="${eu.xenit.alfresco.healthprocessor.indexing.txn-id.start}" />
<constructor-arg name="maxTransactionId" value="${eu.xenit.alfresco.healthprocessor.indexing.txn-id.end}" />
<constructor-arg name="transactionsBackgroundWorkers" value="${eu.xenit.alfresco.healthprocessor.indexing.txn-aggregation.transactions-background-workers}" />
<constructor-arg name="transactionsBatchSize" value="${eu.xenit.alfresco.healthprocessor.indexing.txn-aggregation.transactions-batch-size}" />
<constructor-arg name="threshold" value="${eu.xenit.alfresco.healthprocessor.indexing.txn-aggregation.threshold}" />
<constructor-arg name="minTransactionId" value="${eu.xenit.alfresco.healthprocessor.indexing.txn-aggregation.start}" />
<constructor-arg name="maxTransactionId" value="${eu.xenit.alfresco.healthprocessor.indexing.txn-aggregation.end}" />
</bean>

<bean id="eu.xenit.alfresco.healthprocessor.indexing.IndexingConfiguration"
Expand All @@ -44,7 +44,7 @@
<list value-type="eu.xenit.alfresco.healthprocessor.indexing.IndexingConfiguration">
<ref bean="eu.xenit.alfresco.healthprocessor.indexing.txnid.TxnIdIndexingConfiguration" />
<ref bean="eu.xenit.alfresco.healthprocessor.indexing.lasttxns.LastTxnsIndexingConfiguration" />
<ref bean="eu.xenit.alfresco.healthprocessor.indexing.threshold.ThresholdIndexingStrategyConfiguration" />
<ref bean="eu.xenit.alfresco.healthprocessor.indexing.txnaggregation.ThresholdIndexingStrategyConfiguration" />
</list>
</constructor-arg>
</bean>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ default CycleProgress getCycleProgress() {
enum IndexingStrategyKey {
TXNID("txn-id"),
LAST_TXNS("last-txns"),
THRESHOLD("threshold");
TXN_AGGREGATION("txn-aggregation");

@Getter
private final String key;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

import eu.xenit.alfresco.healthprocessor.indexing.lasttxns.LastTxnsBasedIndexingStrategy;
import eu.xenit.alfresco.healthprocessor.indexing.lasttxns.LastTxnsIndexingConfiguration;
import eu.xenit.alfresco.healthprocessor.indexing.threshold.ThresholdIndexingStrategy;
import eu.xenit.alfresco.healthprocessor.indexing.threshold.ThresholdIndexingStrategyConfiguration;
import eu.xenit.alfresco.healthprocessor.indexing.txnaggregation.TransactionAggregationIndexingStrategy;
import eu.xenit.alfresco.healthprocessor.indexing.txnaggregation.TransactionAggregationIndexingStrategyConfiguration;
import eu.xenit.alfresco.healthprocessor.indexing.txnid.TxnIdBasedIndexingStrategy;
import eu.xenit.alfresco.healthprocessor.indexing.txnid.TxnIdIndexingConfiguration;
import eu.xenit.alfresco.healthprocessor.util.AttributeStore;
Expand Down Expand Up @@ -42,8 +42,8 @@ private IndexingStrategy createIndexingStrategy(IndexingStrategy.IndexingStrateg
return new TxnIdBasedIndexingStrategy((TxnIdIndexingConfiguration) configuration, trackingComponent, attributeStore);
case LAST_TXNS:
return new LastTxnsBasedIndexingStrategy((LastTxnsIndexingConfiguration) configuration, trackingComponent);
case THRESHOLD:
return new ThresholdIndexingStrategy((ThresholdIndexingStrategyConfiguration) configuration, nodeDAO,
case TXN_AGGREGATION:
return new TransactionAggregationIndexingStrategy((TransactionAggregationIndexingStrategyConfiguration) configuration, nodeDAO,
searchTrackingComponent, dataSource, meterRegistry);
default:
throw new IllegalArgumentException("Unknown indexing strategy: "+ indexingStrategy);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package eu.xenit.alfresco.healthprocessor.indexing.threshold;
package eu.xenit.alfresco.healthprocessor.indexing.txnaggregation;

import eu.xenit.alfresco.healthprocessor.indexing.IndexingStrategy;
import eu.xenit.alfresco.healthprocessor.indexing.NullCycleProgress;
Expand All @@ -25,39 +25,39 @@
import java.util.function.LongSupplier;

@Slf4j
public class ThresholdIndexingStrategy implements IndexingStrategy, MeterBinder {
public class TransactionAggregationIndexingStrategy implements IndexingStrategy, MeterBinder {

private final @NonNull ThresholdIndexingStrategyConfiguration configuration;
private final @NonNull TransactionAggregationIndexingStrategyConfiguration configuration;
private final @NonNull AbstractNodeDAOImpl nodeDAO;
private final @NonNull SearchTrackingComponent searchTrackingComponent;
private final @NonNull ThresholdIndexingStrategyState state = new ThresholdIndexingStrategyState();
private final @NonNull ThresholdIndexingStrategyTransactionIdFetcher transactionIdFetcher;
private final @NonNull TransactionAggregationIndexingStrategyState state = new TransactionAggregationIndexingStrategyState();
private final @NonNull TransactionAggregationIndexingStrategyTransactionIdFetcher transactionIdFetcher;
private final @NonNull BlockingDeque<@NonNull Set<@NonNull NodeRef>> queuedNodes;
private final @NonNull ThresholdIndexingStrategyTransactionIdMerger @NonNull [] transactionIdMergers;
private final @NonNull TransactionAggregationIndexingStrategyTransactionIdMerger @NonNull [] transactionIdMergers;
private final @NonNull HashSet<@NonNull Thread> runningThreads;
private final @NonNull AtomicReference<@NonNull CycleProgress> cycleProgress = new AtomicReference<>(NullCycleProgress.getInstance());
private final @NonNull LongSupplier progressReporter = state::getCurrentTransactionId;

public ThresholdIndexingStrategy(@NonNull ThresholdIndexingStrategyConfiguration configuration,
@NonNull AbstractNodeDAOImpl nodeDAO,
@NonNull SearchTrackingComponent searchTrackingComponent,
@NonNull DataSource dataSource,
@Nullable MeterRegistry meterRegistry) {
public TransactionAggregationIndexingStrategy(@NonNull TransactionAggregationIndexingStrategyConfiguration configuration,
@NonNull AbstractNodeDAOImpl nodeDAO,
@NonNull SearchTrackingComponent searchTrackingComponent,
@NonNull DataSource dataSource,
@Nullable MeterRegistry meterRegistry) {
this(configuration, nodeDAO, searchTrackingComponent, new JdbcTemplate(dataSource), meterRegistry);
}

ThresholdIndexingStrategy(@NonNull ThresholdIndexingStrategyConfiguration configuration,
@NonNull AbstractNodeDAOImpl nodeDAO,
@NonNull SearchTrackingComponent searchTrackingComponent,
@NonNull JdbcTemplate jdbcTemplate) {
TransactionAggregationIndexingStrategy(@NonNull TransactionAggregationIndexingStrategyConfiguration configuration,
@NonNull AbstractNodeDAOImpl nodeDAO,
@NonNull SearchTrackingComponent searchTrackingComponent,
@NonNull JdbcTemplate jdbcTemplate) {
this(configuration, nodeDAO, searchTrackingComponent, jdbcTemplate, null);
}

ThresholdIndexingStrategy(@NonNull ThresholdIndexingStrategyConfiguration configuration,
@NonNull AbstractNodeDAOImpl nodeDAO,
@NonNull SearchTrackingComponent searchTrackingComponent,
@NonNull JdbcTemplate jdbcTemplate,
@Nullable MeterRegistry meterRegistry) {
TransactionAggregationIndexingStrategy(@NonNull TransactionAggregationIndexingStrategyConfiguration configuration,
@NonNull AbstractNodeDAOImpl nodeDAO,
@NonNull SearchTrackingComponent searchTrackingComponent,
@NonNull JdbcTemplate jdbcTemplate,
@Nullable MeterRegistry meterRegistry) {
if (configuration.getTransactionsBackgroundWorkers() <= 0)
throw new IllegalArgumentException(String.format("The amount of background workers must be greater than zero (%d provided).", configuration.getTransactionsBackgroundWorkers()));

Expand All @@ -66,12 +66,12 @@ public ThresholdIndexingStrategy(@NonNull ThresholdIndexingStrategyConfiguration
this.nodeDAO = nodeDAO;

this.runningThreads = new HashSet<>(configuration.getTransactionsBackgroundWorkers() + 1);
this.transactionIdFetcher = new ThresholdIndexingStrategyTransactionIdFetcher(configuration, jdbcTemplate, state);
this.transactionIdFetcher = new TransactionAggregationIndexingStrategyTransactionIdFetcher(configuration, jdbcTemplate, state);
this.queuedNodes = new LinkedBlockingDeque<>(configuration.getTransactionsBackgroundWorkers());

this.transactionIdMergers = new ThresholdIndexingStrategyTransactionIdMerger[configuration.getTransactionsBackgroundWorkers()];
this.transactionIdMergers = new TransactionAggregationIndexingStrategyTransactionIdMerger[configuration.getTransactionsBackgroundWorkers()];
for (int i = 0; i < configuration.getTransactionsBackgroundWorkers(); i++)
this.transactionIdMergers[i] = new ThresholdIndexingStrategyTransactionIdMerger(transactionIdFetcher, queuedNodes, configuration, searchTrackingComponent);
this.transactionIdMergers[i] = new TransactionAggregationIndexingStrategyTransactionIdMerger(transactionIdFetcher, queuedNodes, configuration, searchTrackingComponent);

if (meterRegistry != null) bindTo(meterRegistry);
}
Expand All @@ -81,15 +81,15 @@ public void onStart() {
state.setCurrentTransactionId(Math.max(configuration.getMinTransactionId(), nodeDAO.getMinTxnId()));
state.setMaxTransactionId(Math.min(configuration.getMaxTransactionId() >= 0? configuration.getMaxTransactionId() : Long.MAX_VALUE, searchTrackingComponent.getMaxTxnId()));
cycleProgress.set(new SimpleCycleProgress(state.getCurrentTransactionId(), state.getMaxTransactionId(), progressReporter));
log.debug("Starting the ThresholdIndexingStrategy with currentTransactionId ({}) and maxTransactionId ({}).", state.getCurrentTransactionId(), state.getMaxTransactionId());
log.debug("Starting the TransactionAggregationIndexingStrategy with currentTransactionId ({}) and maxTransactionId ({}).", state.getCurrentTransactionId(), state.getMaxTransactionId());

Thread fetcherThread = new Thread(transactionIdFetcher);
fetcherThread.setName("ThresholdIndexingStrategyTransactionIdFetcher");
fetcherThread.setName("TransactionAggregationIndexingStrategyTransactionIdFetcher");
runningThreads.add(fetcherThread);
for (int i = 0; i < transactionIdMergers.length; i++) {
ThresholdIndexingStrategyTransactionIdMerger merger = transactionIdMergers[i];
TransactionAggregationIndexingStrategyTransactionIdMerger merger = transactionIdMergers[i];
Thread mergerThread = new Thread(merger);
mergerThread.setName(String.format("ThresholdIndexingStrategyTransactionIdMerger-%d", i));
mergerThread.setName(String.format("TransactionAggregationIndexingStrategyTransactionIdMerger-%d", i));
runningThreads.add(new Thread(merger));
}
for (Thread thread : runningThreads) thread.start();
Expand All @@ -115,7 +115,7 @@ public void onStart() {

@Override
public void onStop() {
log.debug("Stopping the ThresholdIndexingStrategy.");
log.debug("Stopping the TransactionAggregationIndexingStrategy.");
for (Thread thread : runningThreads) thread.interrupt();
runningThreads.clear();

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package eu.xenit.alfresco.healthprocessor.indexing.threshold;
package eu.xenit.alfresco.healthprocessor.indexing.txnaggregation;

import eu.xenit.alfresco.healthprocessor.indexing.IndexingConfiguration;
import eu.xenit.alfresco.healthprocessor.indexing.IndexingStrategy;
Expand All @@ -8,7 +8,7 @@
import java.util.Map;

@Data
public class ThresholdIndexingStrategyConfiguration implements IndexingConfiguration {
public class TransactionAggregationIndexingStrategyConfiguration implements IndexingConfiguration {

private final int transactionsBackgroundWorkers;
private final int transactionsBatchSize;
Expand All @@ -17,8 +17,8 @@ public class ThresholdIndexingStrategyConfiguration implements IndexingConfigura
private final int maxTransactionId;
private final @NonNull Map<@NonNull String, @NonNull String> configuration;

public ThresholdIndexingStrategyConfiguration(int transactionsBackgroundWorkers, int transactionsBatchSize,
int threshold, int minTransactionId, int maxTransactionId) {
public TransactionAggregationIndexingStrategyConfiguration(int transactionsBackgroundWorkers, int transactionsBatchSize,
int threshold, int minTransactionId, int maxTransactionId) {
this.transactionsBackgroundWorkers = transactionsBackgroundWorkers;
this.transactionsBatchSize = transactionsBatchSize;
this.threshold = threshold;
Expand All @@ -36,7 +36,7 @@ public ThresholdIndexingStrategyConfiguration(int transactionsBackgroundWorkers,

@Override
public @NonNull IndexingStrategy.IndexingStrategyKey getIndexingStrategy() {
return IndexingStrategy.IndexingStrategyKey.THRESHOLD;
return IndexingStrategy.IndexingStrategyKey.TXN_AGGREGATION;
}

}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package eu.xenit.alfresco.healthprocessor.indexing.threshold;
package eu.xenit.alfresco.healthprocessor.indexing.txnaggregation;

import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.binder.MeterBinder;
Expand All @@ -13,7 +13,7 @@
@Data
@AllArgsConstructor
@NoArgsConstructor
public class ThresholdIndexingStrategyState implements MeterBinder {
public class TransactionAggregationIndexingStrategyState implements MeterBinder {

private long currentTransactionId = -1;
private long maxTransactionId = -1;
Expand All @@ -35,9 +35,9 @@ public void decrementRunningTransactionMergers() {

@Override
public void bindTo(MeterRegistry registry) {
registry.gauge("eu.xenit.alfresco.healthprocessor.indexing.threshold.current-transaction-id", this, ThresholdIndexingStrategyState::getCurrentTransactionId);
registry.gauge("eu.xenit.alfresco.healthprocessor.indexing.threshold.max-transaction-id", this, ThresholdIndexingStrategyState::getMaxTransactionId);
registry.gauge("eu.xenit.alfresco.healthprocessor.indexing.threshold.running-transaction-mergers", this, ThresholdIndexingStrategyState::getRunningTransactionMergers);
registry.gauge("eu.xenit.alfresco.healthprocessor.indexing.threshold.current-transaction-id", this, TransactionAggregationIndexingStrategyState::getCurrentTransactionId);
registry.gauge("eu.xenit.alfresco.healthprocessor.indexing.threshold.max-transaction-id", this, TransactionAggregationIndexingStrategyState::getMaxTransactionId);
registry.gauge("eu.xenit.alfresco.healthprocessor.indexing.threshold.running-transaction-mergers", this, TransactionAggregationIndexingStrategyState::getRunningTransactionMergers);
registry.gauge("eu.xenit.alfresco.healthprocessor.indexing.threshold.queued-transaction-batches", transactionBatchesQueueSize, AtomicInteger::get);
}
}
Loading

0 comments on commit f3dd75b

Please sign in to comment.