Skip to content

Commit

Permalink
GH-2568: Get rid of synchronized consumersMonitor
Browse files Browse the repository at this point in the history
Fixes: #2568

Rework all the `synchronized (this.consumersMonitor)` in the `AbstractMessageListenerContainer`
hierarchy for the `consumersLock.lock()/unlock()`
  • Loading branch information
artembilan committed Dec 15, 2023
1 parent ed2cddb commit c3672dd
Show file tree
Hide file tree
Showing 5 changed files with 153 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;

import org.aopalliance.aop.Advice;
Expand Down Expand Up @@ -131,7 +133,7 @@ public abstract class AbstractMessageListenerContainer extends ObservableListene

private final ContainerDelegate delegate = this::actualInvokeListener;

protected final Object consumersMonitor = new Object(); //NOSONAR
protected final Lock consumersLock = new ReentrantLock(); //NOSONAR

private final Map<String, Object> consumerArgs = new HashMap<>();

Expand Down Expand Up @@ -253,6 +255,7 @@ public void setApplicationEventPublisher(ApplicationEventPublisher applicationEv
this.applicationEventPublisher = applicationEventPublisher;
}

@Nullable
protected ApplicationEventPublisher getApplicationEventPublisher() {
return this.applicationEventPublisher;
}
Expand Down Expand Up @@ -686,10 +689,14 @@ protected ConsumerTagStrategy getConsumerTagStrategy() {
* @since 1.3
*/
public void setConsumerArguments(Map<String, Object> args) {
synchronized (this.consumersMonitor) {
this.consumersLock.lock();
try {
this.consumerArgs.clear();
this.consumerArgs.putAll(args);
}
finally {
this.consumersLock.unlock();
}
}

/**
Expand All @@ -698,9 +705,13 @@ public void setConsumerArguments(Map<String, Object> args) {
* @since 2.0
*/
public Map<String, Object> getConsumerArguments() {
synchronized (this.consumersMonitor) {
this.consumersLock.lock();
try {
return new HashMap<>(this.consumerArgs);
}
finally {
this.consumersLock.unlock();
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ public void setLocallyTransacted(boolean locallyTransacted) {
this.locallyTransacted = locallyTransacted;
}

public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
public void setApplicationEventPublisher(@Nullable ApplicationEventPublisher applicationEventPublisher) {
this.applicationEventPublisher = applicationEventPublisher;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.springframework.amqp.rabbit.listener.support.ContainerUtils;
import org.springframework.amqp.rabbit.support.ActiveObjectCounter;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.lang.Nullable;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
Expand Down Expand Up @@ -279,7 +280,8 @@ public void addQueues(Queue... queues) {

private void addQueues(Stream<String> queueNameStream) {
if (isRunning()) {
synchronized (this.consumersMonitor) {
this.consumersLock.lock();
try {
checkStartState();
Set<String> current = getQueueNamesAsSet();
queueNameStream.forEach(queue -> {
Expand All @@ -292,6 +294,9 @@ private void addQueues(Stream<String> queueNameStream) {
}
});
}
finally {
this.consumersLock.unlock();
}
}
}

Expand All @@ -309,7 +314,8 @@ public boolean removeQueues(Queue... queues) {

private void removeQueues(Stream<String> queueNames) {
if (isRunning()) {
synchronized (this.consumersMonitor) {
this.consumersLock.lock();
try {
checkStartState();
queueNames.map(queue -> {
this.removedQueues.add(queue);
Expand All @@ -319,11 +325,15 @@ private void removeQueues(Stream<String> queueNames) {
.flatMap(Collection::stream)
.forEach(this::cancelConsumer);
}
finally {
this.consumersLock.unlock();
}
}
}

private void adjustConsumers(int newCount) {
synchronized (this.consumersMonitor) {
this.consumersLock.lock();
try {
checkStartState();
this.consumersToRestart.clear();
for (String queue : getQueueNames()) {
Expand All @@ -334,9 +344,9 @@ private void adjustConsumers(int newCount) {
if (cBQ != null) {
// find a gap or set the index to the end
List<Integer> indices = cBQ.stream()
.map(cons -> cons.getIndex())
.map(SimpleConsumer::getIndex)
.sorted()
.collect(Collectors.toList());
.toList();
for (index = 0; index < indices.size(); index++) {
if (index < indices.get(index)) {
break;
Expand All @@ -348,6 +358,9 @@ private void adjustConsumers(int newCount) {
reduceConsumersIfIdle(newCount, queue);
}
}
finally {
this.consumersLock.unlock();
}
}

private void reduceConsumersIfIdle(int newCount, String queue) {
Expand All @@ -367,9 +380,8 @@ private void reduceConsumersIfIdle(int newCount, String queue) {
}

/**
* When adjusting down, return a consumer that can be canceled. Called while
* synchronized on consumersMonitor.
* @return the consumer index or -1 if non idle.
* When adjusting down, return a consumer that can be canceled. Called while locked on {@link #consumersLock}.
* @return the consumer index or -1 if non-idle.
* @since 2.0.6
*/
protected int findIdleConsumer() {
Expand Down Expand Up @@ -482,11 +494,12 @@ private void startMonitor(long idleEventInterval, final Map<String, Queue> names
checkIdle(idleEventInterval, now);
checkConsumers(now);
if (this.lastRestartAttempt + getFailedDeclarationRetryInterval() < now) {
synchronized (this.consumersMonitor) {
this.consumersLock.lock();
try {
if (this.started) {
List<SimpleConsumer> restartableConsumers = new ArrayList<>(this.consumersToRestart);
this.consumersToRestart.clear();
if (restartableConsumers.size() > 0) {
if (!restartableConsumers.isEmpty()) {
doRedeclareElementsIfNecessary();
}
Iterator<SimpleConsumer> iterator = restartableConsumers.iterator();
Expand All @@ -509,6 +522,9 @@ private void startMonitor(long idleEventInterval, final Map<String, Queue> names
this.lastRestartAttempt = now;
}
}
finally {
this.consumersLock.unlock();
}
}
processMonitorTask();
}, Duration.ofMillis(this.monitorInterval));
Expand All @@ -524,7 +540,8 @@ private void checkIdle(long idleEventInterval, long now) {

private void checkConsumers(long now) {
final List<SimpleConsumer> consumersToCancel;
synchronized (this.consumersMonitor) {
this.consumersLock.lock();
try {
consumersToCancel = this.consumers.stream()
.filter(consumer -> {
boolean open = consumer.getChannel().isOpen() && !consumer.isAckFailed()
Expand All @@ -541,6 +558,9 @@ private void checkConsumers(long now) {
})
.collect(Collectors.toList());
}
finally {
this.consumersLock.unlock();
}
consumersToCancel
.forEach(consumer -> {
try {
Expand Down Expand Up @@ -591,7 +611,8 @@ private boolean restartConsumer(final Map<String, Queue> namesToQueues, List<Sim
}

private void startConsumers(final String[] queueNames) {
synchronized (this.consumersMonitor) {
this.consumersLock.lock();
try {
if (this.hasStopped) { // container stopped before we got the lock
if (this.logger.isDebugEnabled()) {
this.logger.debug("Consumer start aborted - container stopping");
Expand Down Expand Up @@ -631,6 +652,9 @@ private void startConsumers(final String[] queueNames) {
}
}
}
finally {
this.consumersLock.unlock();
}
}

protected void doRedeclareElementsIfNecessary() {
Expand Down Expand Up @@ -731,19 +755,24 @@ private void doConsumeFromQueue(String queue, int index) {
}
}
SimpleConsumer consumer = consume(queue, index, connection);
synchronized (this.consumersMonitor) {
this.consumersLock.lock();
try {
if (consumer != null) {
this.cancellationLock.add(consumer);
this.consumers.add(consumer);
this.consumersByQueue.add(queue, consumer);
if (this.logger.isInfoEnabled()) {
this.logger.info(consumer + " started");
}
if (getApplicationEventPublisher() != null) {
getApplicationEventPublisher().publishEvent(new AsyncConsumerStartedEvent(this, consumer));
ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher();
if (applicationEventPublisher != null) {
applicationEventPublisher.publishEvent(new AsyncConsumerStartedEvent(this, consumer));
}
}
}
finally {
this.consumersLock.unlock();
}
}

@Nullable
Expand Down Expand Up @@ -814,7 +843,8 @@ else if (this.logger.isWarnEnabled()) {
protected void shutdownAndWaitOrCallback(@Nullable Runnable callback) {
LinkedList<SimpleConsumer> canceledConsumers = null;
boolean waitForConsumers = false;
synchronized (this.consumersMonitor) {
this.consumersLock.lock();
try {
if (this.started || this.aborted) {
// Copy in the same order to avoid ConcurrentModificationException during remove in the
// cancelConsumer().
Expand All @@ -823,6 +853,9 @@ protected void shutdownAndWaitOrCallback(@Nullable Runnable callback) {
waitForConsumers = true;
}
}
finally {
this.consumersLock.unlock();
}
if (waitForConsumers) {
LinkedList<SimpleConsumer> consumersToWait = canceledConsumers;
Runnable awaitShutdown = () -> {
Expand Down Expand Up @@ -872,7 +905,7 @@ private void runCallbackIfNotNull(@Nullable Runnable callback) {
}

/**
* Must hold this.consumersMonitor.
* Must hold this.consumersLock.
* @param consumers a copy of this.consumers.
*/
private void actualShutDown(List<SimpleConsumer> consumers) {
Expand Down Expand Up @@ -1005,7 +1038,7 @@ public String getConsumerTag() {
}

/**
* Return the current epoch for this consumer; consumersMonitor must be held.
* Return the current epoch for this consumer; consumersLock must be held.
* @return the epoch.
*/
int getEpoch() {
Expand Down Expand Up @@ -1039,7 +1072,7 @@ boolean targetChanged() {
}

/**
* Increment and return the current epoch for this consumer; consumersMonitor must
* Increment and return the current epoch for this consumer; consumersLock must
* be held.
* @return the epoch.
*/
Expand Down Expand Up @@ -1323,14 +1356,18 @@ public void handleCancel(String consumerTag) {

void cancelConsumer(final String eventMessage) {
publishConsumerFailedEvent(eventMessage, true, null);
synchronized (DirectMessageListenerContainer.this.consumersMonitor) {
DirectMessageListenerContainer.this.consumersLock.lock();
try {
List<SimpleConsumer> list = DirectMessageListenerContainer.this.consumersByQueue.get(this.queue);
if (list != null) {
list.remove(this);
}
DirectMessageListenerContainer.this.consumers.remove(this);
addConsumerToRestart(this);
}
finally {
DirectMessageListenerContainer.this.consumersLock.unlock();
}
finalizeConsumer();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2019 the original author or authors.
* Copyright 2016-2023 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -118,7 +118,8 @@ protected void doStart() {
@Override
protected void processMonitorTask() {
long now = System.currentTimeMillis();
synchronized (this.consumersMonitor) {
this.consumersLock.lock();
try {
long reduce = this.consumers.stream()
.filter(c -> this.whenUsed.containsKey(c) && !this.inUseConsumerChannels.containsValue(c)
&& this.whenUsed.get(c) < now - getIdleEventInterval())
Expand All @@ -131,6 +132,9 @@ protected void processMonitorTask() {
super.setConsumersPerQueue(this.consumerCount);
}
}
finally {
this.consumersLock.unlock();
}
}

@Override
Expand All @@ -155,7 +159,8 @@ protected void consumerRemoved(SimpleConsumer consumer) {
* @return the channel holder.
*/
public ChannelHolder getChannelHolder() {
synchronized (this.consumersMonitor) {
this.consumersLock.lock();
try {
ChannelHolder channelHolder = null;
while (channelHolder == null) {
if (!isRunning()) {
Expand All @@ -177,6 +182,9 @@ public ChannelHolder getChannelHolder() {
}
return channelHolder;
}
finally {
this.consumersLock.unlock();
}
}

/**
Expand All @@ -188,7 +196,8 @@ public ChannelHolder getChannelHolder() {
* @param message a message to be included in the cancel event if cancelConsumer is true.
*/
public void releaseConsumerFor(ChannelHolder channelHolder, boolean cancelConsumer, @Nullable String message) {
synchronized (this.consumersMonitor) {
this.consumersLock.lock();
try {
SimpleConsumer consumer = this.inUseConsumerChannels.get(channelHolder.getChannel());
if (consumer != null && consumer.getEpoch() == channelHolder.getConsumerEpoch()) {
this.inUseConsumerChannels.remove(channelHolder.getChannel());
Expand All @@ -198,6 +207,9 @@ public void releaseConsumerFor(ChannelHolder channelHolder, boolean cancelConsum
}
}
}
finally {
this.consumersLock.unlock();
}
}

/**
Expand Down
Loading

0 comments on commit c3672dd

Please sign in to comment.