Skip to content

Commit

Permalink
[NYS2AWS-134] preparation before PR
Browse files Browse the repository at this point in the history
  • Loading branch information
pvriel committed Feb 11, 2025
1 parent a13ce9c commit f66caf7
Show file tree
Hide file tree
Showing 10 changed files with 277 additions and 128 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import org.alfresco.repo.domain.node.AbstractNodeDAOImpl;
import org.alfresco.repo.search.SearchTrackingComponent;
import org.alfresco.service.cmr.repository.NodeRef;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.lang.Nullable;

import javax.sql.DataSource;
import java.util.HashSet;
Expand Down Expand Up @@ -40,7 +42,22 @@ public ThresholdIndexingStrategy(@NonNull ThresholdIndexingStrategyConfiguration
@NonNull AbstractNodeDAOImpl nodeDAO,
@NonNull SearchTrackingComponent searchTrackingComponent,
@NonNull DataSource dataSource,
@NonNull MeterRegistry meterRegistry) {
@Nullable MeterRegistry meterRegistry) {
this(configuration, nodeDAO, searchTrackingComponent, new JdbcTemplate(dataSource), meterRegistry);
}

ThresholdIndexingStrategy(@NonNull ThresholdIndexingStrategyConfiguration configuration,
@NonNull AbstractNodeDAOImpl nodeDAO,
@NonNull SearchTrackingComponent searchTrackingComponent,
@NonNull JdbcTemplate jdbcTemplate) {
this(configuration, nodeDAO, searchTrackingComponent, jdbcTemplate, null);
}

ThresholdIndexingStrategy(@NonNull ThresholdIndexingStrategyConfiguration configuration,
@NonNull AbstractNodeDAOImpl nodeDAO,
@NonNull SearchTrackingComponent searchTrackingComponent,
@NonNull JdbcTemplate jdbcTemplate,
@Nullable MeterRegistry meterRegistry) {
if (configuration.getTransactionsBackgroundWorkers() <= 0)
throw new IllegalArgumentException(String.format("The amount of background workers must be greater than zero (%d provided).", configuration.getTransactionsBackgroundWorkers()));

Expand All @@ -49,14 +66,14 @@ public ThresholdIndexingStrategy(@NonNull ThresholdIndexingStrategyConfiguration
this.nodeDAO = nodeDAO;

this.runningThreads = new HashSet<>(configuration.getTransactionsBackgroundWorkers() + 1);
this.transactionIdFetcher = new ThresholdIndexingStrategyTransactionIdFetcher(configuration, dataSource, state);
this.transactionIdFetcher = new ThresholdIndexingStrategyTransactionIdFetcher(configuration, jdbcTemplate, state);
this.queuedNodes = new LinkedBlockingDeque<>(configuration.getTransactionsBackgroundWorkers());

this.transactionIdMergers = new ThresholdIndexingStrategyTransactionIdMerger[configuration.getTransactionsBackgroundWorkers()];
for (int i = 0; i < configuration.getTransactionsBackgroundWorkers(); i++)
this.transactionIdMergers[i] = new ThresholdIndexingStrategyTransactionIdMerger(transactionIdFetcher, queuedNodes, configuration, searchTrackingComponent);

bindTo(meterRegistry);
if (meterRegistry != null) bindTo(meterRegistry);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,17 @@
import lombok.extern.slf4j.Slf4j;
import org.springframework.jdbc.core.JdbcTemplate;

import javax.sql.DataSource;
import java.util.List;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;

// TODO: add an explanation about why we directly contact the DB.
/**
* Problem we discovered while implementing this: you would think that you can use the
* searchTrackingComponent.getTransactions(...) method to fetch the transaction IDs.
* DO NOT USE THIS APPROACH UNLESS YOU ALSO SPECIFY A RANGE (TRANSACTION ID OR DATE) IN THE QUERY.
* Alfresco does not send a LIMIT to the database, so it will fetch all transactions from the database
* and then apply the limit in Java. This is not efficient and can cause memory issues.
*/
@Slf4j
public class ThresholdIndexingStrategyTransactionIdFetcher implements Runnable, MeterBinder {

Expand All @@ -26,24 +31,24 @@ public class ThresholdIndexingStrategyTransactionIdFetcher implements Runnable,
private boolean isRunning = false;

public ThresholdIndexingStrategyTransactionIdFetcher(@NonNull ThresholdIndexingStrategyConfiguration configuration,
@NonNull DataSource dataSource,
@NonNull JdbcTemplate jdbcTemplate,
@NonNull ThresholdIndexingStrategyState state) {
// No more required than the amount of background workers.
// Queue size: no more required than the amount of background workers.
// If the queue is full, it means that the background workers can not keep up with the transaction fetcher anyway.
// Slow down in this case.
this(configuration, dataSource, state, new LinkedBlockingDeque<>(configuration.getTransactionsBackgroundWorkers()));
this(configuration, jdbcTemplate, state, new LinkedBlockingDeque<>(configuration.getTransactionsBackgroundWorkers()));
}

ThresholdIndexingStrategyTransactionIdFetcher(@NonNull ThresholdIndexingStrategyConfiguration configuration,
@NonNull DataSource dataSource,
@NonNull JdbcTemplate jdbcTemplate,
@NonNull ThresholdIndexingStrategyState state,
@NonNull LinkedBlockingDeque<@NonNull List<@NonNull Long>> queuedTransactionIDs) {
@NonNull BlockingDeque<@NonNull List<@NonNull Long>> queuedTransactionIDs) {
if (configuration.getTransactionsBackgroundWorkers() <= 0)
throw new IllegalArgumentException(String.format("The amount of background workers must be greater than zero (%d provided).", configuration.getTransactionsBackgroundWorkers()));
if (configuration.getTransactionsBatchSize() <= 0)
throw new IllegalArgumentException(String.format("The batch size must be greater than zero (%d provided).", configuration.getTransactionsBatchSize()));

this.jdbcTemplate = new JdbcTemplate(dataSource);
this.jdbcTemplate = jdbcTemplate;
this.state = state;
this.configuration = configuration;
this.queuedTransactionIDs = queuedTransactionIDs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.alfresco.service.cmr.repository.NodeRef;

import org.alfresco.util.Pair;
import org.springframework.lang.Nullable;

import javax.annotation.Nonnull;
import java.util.HashMap;
Expand Down Expand Up @@ -42,11 +43,19 @@ public class SolrUndersizedTransactionsHealthProcessorPlugin extends ToggleableH
private final @NonNull AbstractNodeDAOImpl nodeDAO;
private final @Getter @NonNull Map<@NonNull String, @NonNull String> configuration;

// Used for tests.
SolrUndersizedTransactionsHealthProcessorPlugin(boolean enabled, int mergerThreads,
@NonNull Properties properties,
@NonNull TransactionHelper transactionHelper,
@NonNull AbstractNodeDAOImpl nodeDAO) {
this(enabled, mergerThreads, properties, transactionHelper, nodeDAO, null);
}

public SolrUndersizedTransactionsHealthProcessorPlugin(boolean enabled, int mergerThreads,
@NonNull Properties properties,
@NonNull TransactionHelper transactionHelper,
@NonNull AbstractNodeDAOImpl nodeDAO,
@NonNull MeterRegistry meterRegistry) {
@Nullable MeterRegistry meterRegistry) {
super(enabled);
if (enabled) guaranteeThresholdIndexerIsUsed(properties);

Expand All @@ -58,7 +67,7 @@ public SolrUndersizedTransactionsHealthProcessorPlugin(boolean enabled, int merg
this.configuration = new HashMap<>(super.getConfiguration());
this.configuration.put(MERGER_THREADS_CONFIGURATION_KEY, String.valueOf(mergerThreads));

bindTo(meterRegistry);
if (meterRegistry != null) bindTo(meterRegistry);
}

@Nonnull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,16 @@
import eu.xenit.alfresco.healthprocessor.indexing.txnid.TxnIdIndexingConfiguration;
import eu.xenit.alfresco.healthprocessor.util.AttributeStore;
import eu.xenit.alfresco.healthprocessor.util.InMemoryAttributeStore;
import io.micrometer.core.instrument.MeterRegistry;
import org.alfresco.repo.domain.node.AbstractNodeDAOImpl;
import org.alfresco.repo.search.SearchTrackingComponent;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

import javax.sql.DataSource;

@ExtendWith(MockitoExtension.class)
public class IndexingStrategyFactoryBeanTest {

Expand Down Expand Up @@ -58,6 +61,6 @@ private IndexingStrategyFactoryBean factoryBean() {

private IndexingStrategyFactoryBean factoryBean(IndexingConfiguration configuration) {
return new IndexingStrategyFactoryBean(configuration, trackingComponent, attributeStore,
mock(SearchTrackingComponent.class), mock(AbstractNodeDAOImpl.class));
mock(SearchTrackingComponent.class), mock(AbstractNodeDAOImpl.class), mock(DataSource.class), mock(MeterRegistry.class));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,22 @@
import org.alfresco.service.cmr.repository.NodeRef;
import org.alfresco.service.cmr.repository.StoreRef;
import org.junit.jupiter.api.Test;
import org.springframework.jdbc.core.JdbcTemplate;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

import static eu.xenit.alfresco.healthprocessor.indexing.threshold.ThresholdIndexingStrategyTransactionIdFetcherTest.QUERY_PATTERN;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.*;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
Expand All @@ -45,8 +47,9 @@ class ThresholdIndexingStrategyTest {
= new ThresholdIndexingStrategyConfiguration(AMOUNT_OF_BACKGROUND_WORKERS, TRANSACTION_BATCHES, THRESHOLD,
0, AMOUNT_OF_TRANSACTIONS);

private final @NonNull AbstractNodeDAOImpl nodeDAO;
private final @NonNull SearchTrackingComponent searchTrackingComponent;
private final @NonNull AbstractNodeDAOImpl nodeDAO = mock(AbstractNodeDAOImpl.class);
private final @NonNull JdbcTemplate dummyJdbcTemplate = mock(JdbcTemplate.class);
private final @NonNull SearchTrackingComponent searchTrackingComponent = mock(SearchTrackingComponent.class);
private final @NonNull ArrayList<Node> nodes = new ArrayList<>();
private final @NonNull ThresholdIndexingStrategy indexingStrategy;

Expand All @@ -61,23 +64,9 @@ public ThresholdIndexingStrategyTest() {
nodes.add(node);
}

this.nodeDAO = mock(AbstractNodeDAOImpl.class);
when(nodeDAO.getMinTxnId()).thenReturn(0L);

this.searchTrackingComponent = mock(SearchTrackingComponent.class);
when(searchTrackingComponent.getMaxTxnId()).thenReturn((long) AMOUNT_OF_TRANSACTIONS);
when(searchTrackingComponent.getTransactions(anyLong(), anyLong(), anyLong(), anyLong(), anyInt())).thenAnswer(invocation -> {
long start = invocation.getArgument(0);
long end = Math.min(invocation.getArgument(2), Math.min(start + (int) invocation.getArgument(4),
AMOUNT_OF_TRANSACTIONS));

return LongStream.range(start, end)
.mapToObj(txnId -> {
Transaction transaction = mock(Transaction.class);
when(transaction.getId()).thenReturn(txnId);
return transaction;
}).collect(Collectors.toList());
});
doAnswer(invocation -> {
NodeParameters nodeParameters = invocation.getArgument(0);
SearchTrackingComponent.NodeQueryCallback callback = invocation.getArgument(1);
Expand All @@ -89,7 +78,18 @@ public ThresholdIndexingStrategyTest() {
return null;
}).when(searchTrackingComponent).getNodes(any(), any());

this.indexingStrategy = new ThresholdIndexingStrategy(CONFIGURATION, nodeDAO, searchTrackingComponent);
when(dummyJdbcTemplate.queryForList(anyString(), eq(Long.class))).thenAnswer(invocation -> {
Matcher matcher = QUERY_PATTERN.matcher(invocation.getArgument(0));
if (!matcher.matches()) throw new IllegalArgumentException(String.format("unexpected query: %s", invocation.getArgument(0)));
int startTxnId = Integer.parseInt(matcher.group(1));
int endTxnId = Integer.parseInt(matcher.group(2));
int limit = Integer.parseInt(matcher.group(3));

int end = Math.min(startTxnId + limit, Math.min(endTxnId, AMOUNT_OF_TRANSACTIONS));
return LongStream.range(startTxnId, end).boxed().collect(Collectors.toList());
});

this.indexingStrategy = new ThresholdIndexingStrategy(CONFIGURATION, nodeDAO, searchTrackingComponent, dummyJdbcTemplate);
}

@Test
Expand All @@ -115,7 +115,7 @@ void getNextNodeIds() {
@Test
public void testArguments() {
ThresholdIndexingStrategyConfiguration configuration = new ThresholdIndexingStrategyConfiguration(0, 0, 0, 0, 0);
assertThrows(IllegalArgumentException.class, () -> new ThresholdIndexingStrategy(configuration, nodeDAO, searchTrackingComponent));
assertThrows(IllegalArgumentException.class, () -> new ThresholdIndexingStrategy(configuration, nodeDAO, searchTrackingComponent, dummyJdbcTemplate));
}

}
Loading

0 comments on commit f66caf7

Please sign in to comment.