Skip to content

HSEARCH-5309 Limit the time that the outbox polling processor spends trying to lock/load event entities after processing #4672

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,33 +6,42 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.hibernate.search.util.impl.integrationtest.mapper.orm.OrmUtils.with;
import static org.junit.jupiter.api.Assumptions.assumeTrue;

import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import jakarta.persistence.Entity;
import jakarta.persistence.Id;
import jakarta.persistence.OneToMany;
import jakarta.persistence.OneToOne;

import org.hibernate.LockMode;
import org.hibernate.SessionFactory;
import org.hibernate.engine.spi.SessionFactoryImplementor;
import org.hibernate.search.integrationtest.mapper.orm.outboxpolling.testsupport.util.OutboxEventFilter;
import org.hibernate.search.integrationtest.mapper.orm.outboxpolling.testsupport.util.TestingOutboxPollingInternalConfigurer;
import org.hibernate.search.mapper.orm.outboxpolling.cfg.HibernateOrmMapperOutboxPollingSettings;
import org.hibernate.search.mapper.orm.outboxpolling.cfg.impl.HibernateOrmMapperOutboxPollingImplSettings;
import org.hibernate.search.mapper.orm.outboxpolling.event.impl.OutboxEvent;
import org.hibernate.search.mapper.pojo.mapping.definition.annotation.Indexed;
import org.hibernate.search.mapper.pojo.mapping.definition.annotation.IndexedEmbedded;
import org.hibernate.search.mapper.pojo.mapping.definition.annotation.KeywordField;
import org.hibernate.search.util.impl.integrationtest.common.extension.BackendMock;
import org.hibernate.search.util.impl.integrationtest.mapper.orm.CoordinationStrategyExpectations;
import org.hibernate.search.util.impl.integrationtest.mapper.orm.OrmSetupHelper;
import org.hibernate.search.util.impl.test.extension.ExpectedLog4jLog;

import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.extension.RegisterExtension;

import org.awaitility.Awaitility;

/**
* Extensive tests with edge cases for automatic indexing with the outbox-polling strategy.
*/
Expand All @@ -44,6 +53,9 @@ class OutboxPollingAutomaticIndexingEdgeCasesIT {
@RegisterExtension
public static BackendMock backendMock = BackendMock.create();

@RegisterExtension
public ExpectedLog4jLog logged = ExpectedLog4jLog.create();

@RegisterExtension
public static OrmSetupHelper ormSetupHelper =
OrmSetupHelper.withCoordinationStrategy( CoordinationStrategyExpectations.outboxPolling() )
Expand All @@ -65,6 +77,8 @@ void setup() {
HibernateOrmMapperOutboxPollingImplSettings.COORDINATION_INTERNAL_CONFIGURER,
new TestingOutboxPollingInternalConfigurer().outboxEventFilter( eventFilter )
)
.withProperty( HibernateOrmMapperOutboxPollingSettings.COORDINATION_EVENT_PROCESSOR_EVENT_LOCK_RETRY_DELAY, 0 )
.withProperty( HibernateOrmMapperOutboxPollingSettings.COORDINATION_EVENT_PROCESSOR_EVENT_LOCK_RETRY_MAX, 1 )
.withAnnotatedTypes( IndexedEntity.class, IndexedAndContainedEntity.class )
.setup();
}
Expand Down Expand Up @@ -184,6 +198,41 @@ void addIndexedAndContained_addAndUpdateEventsProcessedInDifferentBatches() {
backendMock.verifyExpectationsMet();
}

/**
 * Checks that the event processor stops retrying to lock an outbox event held by another transaction
 * once the configured retry limit is reached, logs a warning about it,
 * and still processes the event after the competing lock is gone.
 * <p>
 * Relies on the retry limit of {@code 1} and the retry delay of {@code 0} configured in the test setup.
 */
@Test
void lockedEventRowRetries() {
    assumeTrue(
            sessionFactory.unwrap( SessionFactoryImplementor.class ).getJdbcServices().getDialect().supportsSkipLocked(),
            "This test only makes sense if skip locked rows is supported by the underlying DB. " +
                    "Otherwise the locking will throw an exception and the batch will be just reprocessed without a retry of locking events." );

    // Hide the events so that the processor does not pick the event up before the row is locked below.
    eventFilter.hideAllEvents();
    with( sessionFactory ).runInTransaction( session -> {
        session.persist( new IndexedEntity( 1, "initialValue" ) );
    } );

    backendMock.expectWorks( IndexedEntity.NAME )
            .addOrUpdate( "1", b -> b
                    .field( "text", "initialValue" ) );

    with( sessionFactory ).runInTransaction( session -> {
        List<OutboxEvent> events = eventFilter.findOutboxEventsNoFilter( session );
        assertThat( events ).hasSize( 1 );
        session.lock( events.get( 0 ), LockMode.PESSIMISTIC_WRITE );

        // Let the processor see the event: as that single event is locked by this transaction,
        // the processor is expected to skip it and reach the maximum number of retries, repeatedly.
        eventFilter.showAllEvents();
        logged.expectMessage( "after 1 retries, failed to acquire a lock on the following outbox events" )
                .atLeast( 5 );

        Awaitility.await().timeout( 5, TimeUnit.SECONDS )
                .untilAsserted( () -> logged.expectationsMet() );
    } );

    // No wrapping transaction here: the session it used to provide was never used.
    eventFilter.awaitUntilNoMoreVisibleEvents( sessionFactory );

    // The indexing work registered above must have been executed by now,
    // same final check as in the other tests of this class.
    backendMock.verifyExpectationsMet();
}

@Entity(name = IndexedEntity.NAME)
@Indexed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,13 @@ public void awaitUntilNoMoreVisibleEvents(SessionFactory sessionFactory) {
} ) );
}

/**
 * Waits until the events returned by {@code visibleEventsAllShardsFinder} number exactly {@code count}.
 * <p>
 * Each attempt runs in its own transaction on the given session factory.
 *
 * @param sessionFactory the session factory used to open the transaction of each attempt.
 * @param count the exact number of events expected.
 */
public void awaitUntilNumberOfVisibleEvents(SessionFactory sessionFactory, int count) {
    // Ask for one event more than expected, so that a surplus of events makes the size assertion fail.
    int fetchLimit = count + 1;
    await().untilAsserted( () -> with( sessionFactory ).runInTransaction( session -> {
        assertThat( visibleEventsAllShardsFinder.findOutboxEvents( session, fetchLimit ) )
                .hasSize( count );
    } ) );
}

private class FilterById implements OutboxEventPredicate {
@Override
public String queryPart(String eventAlias) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,41 @@ private HibernateOrmMapperOutboxPollingSettings() {
public static final String COORDINATION_EVENT_PROCESSOR_RETRY_DELAY =
PREFIX + Radicals.COORDINATION_EVENT_PROCESSOR_RETRY_DELAY;

/**
* How many times the event processor must retry locking the outbox event database records for processing,
* after a first unsuccessful attempt, before leaving them to be processed in another batch.
* <p>
* Only available when {@value HibernateOrmMapperSettings#COORDINATION_STRATEGY} is
* {@value #COORDINATION_STRATEGY_NAME}.
* <p>
* Only applicable when the database supports skipping locked rows when locking.
* <p>
* Expects a positive or zero integer value, such as {@code 10},
* or a String that can be parsed into such Integer value.
* <p>
* Use the value {@code 0} to never retry: events that could not be locked on the first attempt
* are left to be processed in another batch.
* <p>
* Defaults to {@link Defaults#COORDINATION_EVENT_PROCESSOR_EVENT_LOCK_RETRY_MAX}.
*/
public static final String COORDINATION_EVENT_PROCESSOR_EVENT_LOCK_RETRY_MAX =
PREFIX + Radicals.COORDINATION_EVENT_PROCESSOR_EVENT_LOCK_RETRY_MAX;

/**
* How long the event processor must wait, in seconds, before retrying to lock the outbox event database records.
* <p>
* Only available when {@value HibernateOrmMapperSettings#COORDINATION_STRATEGY} is
* {@value #COORDINATION_STRATEGY_NAME}.
* <p>
* Only applicable when the database supports skipping locked rows when locking.
* <p>
* Expects a positive or zero integer value in seconds, such as {@code 5},
* or a String that can be parsed into such Integer value.
* <p>
* Use the value {@code 0} to retry locking events as soon as possible, with no delay.
* <p>
* Defaults to {@link Defaults#COORDINATION_EVENT_PROCESSOR_EVENT_LOCK_RETRY_DELAY}.
*/
public static final String COORDINATION_EVENT_PROCESSOR_EVENT_LOCK_RETRY_DELAY =
PREFIX + Radicals.COORDINATION_EVENT_PROCESSOR_EVENT_LOCK_RETRY_DELAY;

/**
* In the mass indexer, how long to wait for another query to the agent table
* when actively waiting for event processors to suspend themselves, in milliseconds.
Expand Down Expand Up @@ -516,6 +551,10 @@ private Radicals() {
COORDINATION_PREFIX + CoordinationRadicals.EVENT_PROCESSOR_TRANSACTION_TIMEOUT;
public static final String COORDINATION_EVENT_PROCESSOR_RETRY_DELAY =
COORDINATION_PREFIX + CoordinationRadicals.EVENT_PROCESSOR_RETRY_DELAY;
public static final String COORDINATION_EVENT_PROCESSOR_EVENT_LOCK_RETRY_MAX =
COORDINATION_PREFIX + CoordinationRadicals.EVENT_PROCESSOR_EVENT_LOCK_RETRY_MAX;
public static final String COORDINATION_EVENT_PROCESSOR_EVENT_LOCK_RETRY_DELAY =
COORDINATION_PREFIX + CoordinationRadicals.EVENT_PROCESSOR_EVENT_LOCK_RETRY_DELAY;
public static final String COORDINATION_MASS_INDEXER_POLLING_INTERVAL =
COORDINATION_PREFIX + CoordinationRadicals.MASS_INDEXER_POLLING_INTERVAL;
public static final String COORDINATION_MASS_INDEXER_PULSE_INTERVAL =
Expand Down Expand Up @@ -564,6 +603,8 @@ private CoordinationRadicals() {
public static final String EVENT_PROCESSOR_BATCH_SIZE = EVENT_PROCESSOR_PREFIX + "batch_size";
public static final String EVENT_PROCESSOR_TRANSACTION_TIMEOUT = EVENT_PROCESSOR_PREFIX + "transaction_timeout";
public static final String EVENT_PROCESSOR_RETRY_DELAY = EVENT_PROCESSOR_PREFIX + "retry_delay";
public static final String EVENT_PROCESSOR_EVENT_LOCK_RETRY_MAX = EVENT_PROCESSOR_PREFIX + "event_lock_retry_max";
public static final String EVENT_PROCESSOR_EVENT_LOCK_RETRY_DELAY = EVENT_PROCESSOR_PREFIX + "event_lock_retry_delay";
public static final String MASS_INDEXER_PREFIX = "mass_indexer.";
public static final String MASS_INDEXER_POLLING_INTERVAL = MASS_INDEXER_PREFIX + "polling_interval";
public static final String MASS_INDEXER_PULSE_INTERVAL = MASS_INDEXER_PREFIX + "pulse_interval";
Expand Down Expand Up @@ -600,6 +641,8 @@ private Defaults() {
public static final OutboxEventProcessingOrder COORDINATION_EVENT_PROCESSOR_ORDER = OutboxEventProcessingOrder.AUTO;
public static final int COORDINATION_EVENT_PROCESSOR_BATCH_SIZE = 50;
public static final int COORDINATION_EVENT_PROCESSOR_RETRY_DELAY = 30;
public static final int COORDINATION_EVENT_PROCESSOR_EVENT_LOCK_RETRY_MAX = 20;
public static final int COORDINATION_EVENT_PROCESSOR_EVENT_LOCK_RETRY_DELAY = 2;
public static final int COORDINATION_MASS_INDEXER_POLLING_INTERVAL = 100;
public static final int COORDINATION_MASS_INDEXER_PULSE_INTERVAL = 2000;
public static final int COORDINATION_MASS_INDEXER_PULSE_EXPIRATION = 30000;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.UUID;
Expand Down Expand Up @@ -48,6 +49,10 @@ public boolean thereAreStillEventsToProcess() {
return !eventsIds.isEmpty();
}

/**
 * @return a read-only view of the identifiers of the events that are still to be processed;
 * empty when {@link #thereAreStillEventsToProcess()} returns {@code false}.
 */
public Set<UUID> eventsToProcess() {
    // A view rather than a copy: callers cannot modify it, but it reflects later changes to the underlying set.
    Set<UUID> readOnlyIds = Collections.unmodifiableSet( eventsIds );
    return readOnlyIds;
}

public void process() {
List<OutboxEvent> lockedEvents = loader.loadLocking( session, eventsIds, processorName );
List<OutboxEvent> eventToDelete = new ArrayList<>( lockedEvents );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,22 @@ public static String namePrefix(String tenantId) {
.withDefault( HibernateOrmMapperOutboxPollingSettings.Defaults.COORDINATION_EVENT_PROCESSOR_RETRY_DELAY )
.build();

// Maximum number of retries when locking outbox events within one batch.
// Parsed as a positive-or-zero integer; falls back to the default declared in the settings class.
private static final ConfigurationProperty<Integer> EVENT_PROCESSOR_EVENT_LOCK_RETRY_MAX =
ConfigurationProperty
.forKey( HibernateOrmMapperOutboxPollingSettings.CoordinationRadicals.EVENT_PROCESSOR_EVENT_LOCK_RETRY_MAX )
.asIntegerPositiveOrZero()
.withDefault(
HibernateOrmMapperOutboxPollingSettings.Defaults.COORDINATION_EVENT_PROCESSOR_EVENT_LOCK_RETRY_MAX )
.build();

// Delay between two attempts at locking outbox events, expressed in seconds
// (the factory method converts it to milliseconds).
// Parsed as a positive-or-zero integer; falls back to the default declared in the settings class.
private static final ConfigurationProperty<Integer> EVENT_PROCESSOR_EVENT_LOCK_RETRY_DELAY =
ConfigurationProperty
.forKey( HibernateOrmMapperOutboxPollingSettings.CoordinationRadicals.EVENT_PROCESSOR_EVENT_LOCK_RETRY_DELAY )
.asIntegerPositiveOrZero()
.withDefault(
HibernateOrmMapperOutboxPollingSettings.Defaults.COORDINATION_EVENT_PROCESSOR_EVENT_LOCK_RETRY_DELAY )
.build();

public static Factory factory(AutomaticIndexingMappingContext mapping, Clock clock, String tenantId,
ConfigurationPropertySource configurationSource) {
OutboxEventLoader loader = new OutboxEventLoader( mapping.sessionFactory().getJdbcServices().getDialect() );
Expand All @@ -102,8 +118,12 @@ public static Factory factory(AutomaticIndexingMappingContext mapping, Clock clo
Integer transactionTimeout = TRANSACTION_TIMEOUT.get( configurationSource )
.orElse( null );

int lockEventsMaxRetry = EVENT_PROCESSOR_EVENT_LOCK_RETRY_MAX.get( configurationSource );
long lockEventsInterval =
Duration.ofSeconds( EVENT_PROCESSOR_EVENT_LOCK_RETRY_DELAY.get( configurationSource ) ).toMillis();

return new Factory( mapping, clock, tenantId, loader, pollingInterval, pulseInterval, pulseExpiration,
batchSize, retryDelay, transactionTimeout );
batchSize, retryDelay, transactionTimeout, lockEventsMaxRetry, lockEventsInterval );
}

public static class Factory {
Expand All @@ -117,10 +137,13 @@ public static class Factory {
private final int batchSize;
private final int retryDelay;
private final Integer transactionTimeout;
private final int lockEventsMaxRetry;
private final long lockEventsInterval;

private Factory(AutomaticIndexingMappingContext mapping, Clock clock, String tenantId,
OutboxEventLoader loader, Duration pollingInterval, Duration pulseInterval, Duration pulseExpiration,
int batchSize, int retryDelay, Integer transactionTimeout) {
int batchSize, int retryDelay, Integer transactionTimeout,
int lockEventsMaxRetry, long lockEventsInterval) {
this.mapping = mapping;
this.clock = clock;
this.tenantId = tenantId;
Expand All @@ -131,6 +154,8 @@ private Factory(AutomaticIndexingMappingContext mapping, Clock clock, String ten
this.batchSize = batchSize;
this.retryDelay = retryDelay;
this.transactionTimeout = transactionTimeout;
this.lockEventsMaxRetry = lockEventsMaxRetry;
this.lockEventsInterval = lockEventsInterval;
}

public OutboxPollingEventProcessor create(ScheduledExecutorService scheduledExecutor,
Expand Down Expand Up @@ -159,6 +184,8 @@ private enum Status {
private final long pollingInterval;
private final int batchSize;
private final int retryDelay;
private final int lockEventsMaxRetry;
private final long lockEventsInterval;

private final AtomicReference<Status> status = new AtomicReference<>( Status.STOPPED );
private final OutboxPollingEventProcessorClusterLink clusterLink;
Expand All @@ -180,6 +207,8 @@ public OutboxPollingEventProcessor(String name, Factory factory,
this.pollingInterval = factory.pollingInterval.toMillis();
this.batchSize = factory.batchSize;
this.retryDelay = factory.retryDelay;
this.lockEventsMaxRetry = factory.lockEventsMaxRetry;
this.lockEventsInterval = factory.lockEventsInterval;
this.clusterLink = clusterLink;

transactionHelper = new TransactionHelper( mapping.sessionFactory(), factory.transactionTimeout );
Expand Down Expand Up @@ -317,8 +346,28 @@ public CompletableFuture<?> work() {
// it locked a page instead of just a row.
// For more information, see
// org.hibernate.search.mapper.orm.outboxpolling.impl.OutboxEventLoader.tryLoadLocking
while ( eventUpdater.thereAreStillEventsToProcess() ) {
int retryCount = 0;
while ( true ) {
if ( retryCount > lockEventsMaxRetry ) {
OutboxPollingEventsLog.INSTANCE.eventLockingRetryLimitReached( clusterLink.selfReference(),
lockEventsMaxRetry, eventUpdater.eventsToProcess() );
// not failing to not produce an error log:
return CompletableFuture.completedFuture( null );
}
transactionHelper.inTransaction( session, eventUpdater::process );
if ( eventUpdater.thereAreStillEventsToProcess() ) {
try {
Thread.sleep( lockEventsInterval );
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
return CompletableFuture.failedFuture( e );
}
retryCount++;
}
else {
break;
}
}

return CompletableFuture.completedFuture( null );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,4 +266,10 @@ void clusterMembersAreInClusterReachedExpectedStates(AgentReference agentReferen
@LogMessage(level = TRACE)
@Message(id = ID_OFFSET + 67, value = "Persisted %d outbox events: '%s'")
void eventPlanNumberOfPersistedEvents(int size, List<OutboxEvent> events);

/**
 * Logs, at WARN level, that the event processor gave up locking some outbox events
 * because the configured maximum number of lock retries was exceeded.
 *
 * @param agentReference the reference of the agent (event processor) that gave up.
 * @param numberOfRetries the configured maximum number of lock retries.
 * @param events the identifiers of the events that could not be locked.
 */
@LogMessage(level = WARN)
@Message(id = ID_OFFSET + 70,
value = "Agent '%s': after %d retries, failed to acquire a lock on the following outbox events: %s. " +
"Events will be re-processed at a later time.")
void eventLockingRetryLimitReached(AgentReference agentReference, int numberOfRetries, Set<UUID> events);
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,6 @@ public interface OutboxPollingLog extends ConfigurationLog, DeprecationLog, Outb
* here to the next value.
*/
@LogMessage(level = TRACE)
@Message(id = ID_OFFSET + 70, value = "")
@Message(id = ID_OFFSET + 71, value = "")
void nextLoggerIdForConvenience();
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
*/
package org.hibernate.search.util.impl.test.extension;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;

import java.util.ArrayList;
Expand Down Expand Up @@ -234,4 +235,8 @@ private static String buildFailureMessage(Set<LogChecker> failingCheckers) {
}
return description.toString();
}

/**
 * Asserts that the current appender reports no failing log checker at the time of the call.
 * <p>
 * Throws an assertion error otherwise, which makes this method usable as a polled condition.
 */
public void expectationsMet() {
    var failingCheckers = currentAppender.getFailingCheckers();
    assertThat( failingCheckers ).isEmpty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
public class LogExpectation {

private final Matcher<?> matcher;
private Integer expectedCount;
private Integer expectedMinCount;
private Integer expectedMaxCount;

public LogExpectation(Matcher<?> matcher) {
this.matcher = matcher;
Expand All @@ -24,10 +25,16 @@ public void once() {
}

public void times(int expectedCount) {
if ( this.expectedCount != null ) {
if ( this.expectedMinCount != null || this.expectedMaxCount != null ) {
throw new IllegalStateException( "Can only set log expectations once" );
}
this.expectedCount = expectedCount;
this.expectedMinCount = expectedCount;
this.expectedMaxCount = expectedCount;
}

/**
 * Expects the message to be logged at least {@code expectedCount} times.
 * <p>
 * The upper bound is set to {@link Integer#MAX_VALUE}.
 *
 * @param expectedCount the minimum number of matching log events.
 * @throws IllegalStateException if a count expectation was already set on this instance.
 */
public void atLeast(int expectedCount) {
    // Same guard as in times(int): silently overwriting an earlier expectation would hide test mistakes.
    if ( this.expectedMinCount != null || this.expectedMaxCount != null ) {
        throw new IllegalStateException( "Can only set log expectations once" );
    }
    this.expectedMinCount = expectedCount;
    this.expectedMaxCount = Integer.MAX_VALUE;
}

public LogChecker createChecker() {
Expand All @@ -39,10 +46,10 @@ Matcher<?> getMatcher() {
}

int getMinExpectedCount() {
return expectedCount == null ? 1 : expectedCount;
return expectedMinCount == null ? 1 : expectedMinCount;
}

Integer getMaxExpectedCount() {
return expectedCount;
return expectedMaxCount;
}
}