From dd83235aa142ea17414f6d6cecb507f2c2d7bded Mon Sep 17 00:00:00 2001 From: NaccOll Date: Sun, 24 Nov 2024 02:58:23 +0800 Subject: [PATCH 1/4] fix(RedisMessageStore): RedisMessageStore add lock I found that when RedisMessageStore adds and removes messages, it operates on two keys separately, which may cause problems in multi-threading due to non-atomic operations. Although using redis to delay messages is not a good idea, the abnormal loss of messages in the logs alerted me when the number of requests was not large. By comparing the logs, I found the problem that the message group representing the metadata is not consistent with the actual message. A simple solution is to add lock like SimpleMessageStore, which is also the approach taken in this pull request. This will bring some performance loss, and I am not sure whether a configable switch is needed. --- .../redis/store/RedisMessageStore.java | 105 ++++++++++++++++++ .../store/RedisMessageGroupStoreTests.java | 9 ++ 2 files changed, 114 insertions(+) diff --git a/spring-integration-redis/src/main/java/org/springframework/integration/redis/store/RedisMessageStore.java b/spring-integration-redis/src/main/java/org/springframework/integration/redis/store/RedisMessageStore.java index f5b05ae5728..c78464082f0 100644 --- a/spring-integration-redis/src/main/java/org/springframework/integration/redis/store/RedisMessageStore.java +++ b/spring-integration-redis/src/main/java/org/springframework/integration/redis/store/RedisMessageStore.java @@ -17,6 +17,9 @@ package org.springframework.integration.redis.store; import java.util.Collection; +import java.util.UUID; +import java.util.concurrent.locks.Lock; +import java.util.function.Supplier; import org.springframework.beans.factory.BeanClassLoaderAware; import org.springframework.data.redis.connection.RedisConnectionFactory; @@ -27,6 +30,11 @@ import org.springframework.data.redis.serializer.SerializationException; import org.springframework.data.redis.serializer.StringRedisSerializer; import org.springframework.integration.store.AbstractKeyValueMessageStore; +import org.springframework.integration.store.MessageGroup; +import org.springframework.integration.support.locks.DefaultLockRegistry; +import org.springframework.integration.support.locks.LockRegistry; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessagingException; import org.springframework.util.Assert; /** @@ -37,6 +45,7 @@ * @author Oleg Zhurakousky * @author Gary Russell * @author Artem Bilan + * @author Youbin Wu * * @since 2.1 */ @@ -44,12 +53,16 @@ public class RedisMessageStore extends AbstractKeyValueMessageStore implements B private static final String ID_MUST_NOT_BE_NULL = "'id' must not be null"; + private static final String INTERRUPTED_WHILE_OBTAINING_LOCK = "Interrupted while obtaining lock"; + private final RedisTemplate redisTemplate; private boolean valueSerializerSet; private volatile boolean unlinkAvailable = true; + private LockRegistry lockRegistry; + /** * Construct {@link RedisMessageStore} based on the provided * {@link RedisConnectionFactory} and default empty prefix. @@ -69,12 +82,27 @@ public RedisMessageStore(RedisConnectionFactory connectionFactory) { * @see AbstractKeyValueMessageStore#AbstractKeyValueMessageStore(String) */ public RedisMessageStore(RedisConnectionFactory connectionFactory, String prefix) { + this(connectionFactory, prefix, new DefaultLockRegistry()); + } + + /** + * Construct {@link RedisMessageStore} based on the provided + * {@link RedisConnectionFactory} and prefix. + * @param connectionFactory the RedisConnectionFactory to use + * @param prefix the key prefix to use, allowing the same broker to be used for + * multiple stores. + * @param lockRegistry the {@link LockRegistry} to use. + * @since 6.4.1 + * @see AbstractKeyValueMessageStore#AbstractKeyValueMessageStore(String) + */ + public RedisMessageStore(RedisConnectionFactory connectionFactory, String prefix, LockRegistry lockRegistry) { super(prefix); this.redisTemplate = new RedisTemplate<>(); this.redisTemplate.setConnectionFactory(connectionFactory); this.redisTemplate.setKeySerializer(new StringRedisSerializer()); this.redisTemplate.setValueSerializer(new JdkSerializationRedisSerializer()); this.redisTemplate.afterPropertiesSet(); + this.lockRegistry = lockRegistry; } @Override @@ -183,6 +211,83 @@ protected Collection doListKeys(String keyPattern) { return this.redisTemplate.keys(keyPattern); } + @Override + protected MessageGroup copy(MessageGroup group) { + return lockExecute(group.getGroupId(), () -> super.copy(group)); + } + + @Override + public void addMessagesToGroup(Object groupId, Message... messages) { + Assert.notNull(groupId, "'groupId' must not be null"); + + lockExecute(groupId, () -> { + super.addMessagesToGroup(groupId, messages); + return null; + }); + } + + @Override + public void removeMessageGroup(Object groupId) { + lockExecute(groupId, () -> { + super.removeMessageGroup(groupId); + return null; + }); + } + + @Override + public void removeMessagesFromGroup(Object groupId, Collection> messages) { + lockExecute(groupId, () -> { + super.removeMessagesFromGroup(groupId, messages); + return null; + }); + } + + @Override + public boolean removeMessageFromGroupById(Object groupId, UUID messageId) { + return lockExecute(groupId, () -> super.removeMessageFromGroupById(groupId, messageId)); + } + + @Override + public void setLastReleasedSequenceNumberForGroup(Object groupId, int sequenceNumber) { + lockExecute(groupId, () -> { + super.setLastReleasedSequenceNumberForGroup(groupId, sequenceNumber); + return null; + }); + } + + @Override + public void completeGroup(Object groupId) { + lockExecute(groupId, () -> { + super.completeGroup(groupId); + return null; + }); + } + + @Override + public void setGroupCondition(Object groupId, String condition) { + lockExecute(groupId, () -> { + super.setGroupCondition(groupId, condition); + return null; + }); + } + + public T lockExecute(Object groupId, Supplier supplier) { + Lock lock = this.lockRegistry.obtain(groupId); + try { + lock.lockInterruptibly(); + try { + return supplier.get(); + } + finally { + lock.unlock(); + } + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new MessagingException(INTERRUPTED_WHILE_OBTAINING_LOCK, e); + } + } + private void rethrowAsIllegalArgumentException(SerializationException e) { throw new IllegalArgumentException("If relying on the default RedisSerializer " + "(JdkSerializationRedisSerializer) the Object must be Serializable. " + diff --git a/spring-integration-redis/src/test/java/org/springframework/integration/redis/store/RedisMessageGroupStoreTests.java b/spring-integration-redis/src/test/java/org/springframework/integration/redis/store/RedisMessageGroupStoreTests.java index 68bd2d112ce..e9b76d80dff 100644 --- a/spring-integration-redis/src/test/java/org/springframework/integration/redis/store/RedisMessageGroupStoreTests.java +++ b/spring-integration-redis/src/test/java/org/springframework/integration/redis/store/RedisMessageGroupStoreTests.java @@ -493,6 +493,15 @@ public void removeMessagesFromGroupDontRemoveSameMessageInOtherGroup() { assertThat(store.messageGroupSize("2")).isEqualTo(1); } + @Test + public void testMessageGroupCondition() { + String groupId = "X"; + Message message = MessageBuilder.withPayload("foo").build(); + store.addMessagesToGroup(groupId, message); + store.setGroupCondition(groupId, "testCondition"); + assertThat(store.getMessageGroup(groupId).getCondition()).isEqualTo("testCondition"); + } + private record Foo(String foo) { } From af5a737b3074a458e749b3f17b65d644142c3306 Mon Sep 17 00:00:00 2001 From: NaccOll Date: Mon, 9 Dec 2024 11:19:57 +0800 Subject: [PATCH 2/4] AbstractMessageGroupStore add lock when operate group metadata --- .../store/AbstractKeyValueMessageStore.java | 19 +- .../store/AbstractMessageGroupStore.java | 114 ++++++++- .../integration/store/SimpleMessageStore.java | 238 ++++++------------ ...bstractCorrelatingMessageHandlerTests.java | 5 +- .../integration/store/MessageStoreTests.java | 13 +- .../jdbc/store/JdbcMessageStore.java | 17 +- ...stractConfigurableMongoDbMessageStore.java | 13 +- .../ConfigurableMongoDbMessageStore.java | 15 +- .../store/MongoDbChannelMessageStore.java | 5 +- .../mongodb/store/MongoDbMessageStore.java | 17 +- .../redis/store/RedisMessageStore.java | 104 -------- 11 files changed, 244 insertions(+), 316 deletions(-) diff --git a/spring-integration-core/src/main/java/org/springframework/integration/store/AbstractKeyValueMessageStore.java b/spring-integration-core/src/main/java/org/springframework/integration/store/AbstractKeyValueMessageStore.java index b74be6ca439..5b738a4bb6c 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/store/AbstractKeyValueMessageStore.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/store/AbstractKeyValueMessageStore.java @@ -38,13 +38,12 @@ * @author Gary Russell * @author Artem Bilan * @author Ngoc Nhan + * @author Youbin Wu * * @since 2.1 */ public abstract class AbstractKeyValueMessageStore extends AbstractMessageGroupStore implements MessageStore { - private static final String GROUP_ID_MUST_NOT_BE_NULL = "'groupId' must not be null"; - protected static final String MESSAGE_KEY_PREFIX = "MESSAGE_"; protected static final String MESSAGE_GROUP_KEY_PREFIX = "GROUP_OF_MESSAGES_"; @@ -206,7 +205,7 @@ public MessageGroupMetadata getGroupMetadata(Object groupId) { } @Override - public void addMessagesToGroup(Object groupId, Message... messages) { + public void addMessagesToGroupInner(Object groupId, Message... messages) { Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL); Assert.notNull(messages, "'messages' must not be null"); @@ -240,7 +239,7 @@ public void addMessagesToGroup(Object groupId, Message... messages) { } @Override - public void removeMessagesFromGroup(Object groupId, Collection> messages) { + public void removeMessagesFromGroupInner(Object groupId, Collection> messages) { Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL); Assert.notNull(messages, "'messages' must not be null"); @@ -283,7 +282,7 @@ public Message getMessageFromGroup(Object groupId, UUID messageId) { } @Override - public boolean removeMessageFromGroupById(Object groupId, UUID messageId) { + public boolean removeMessageFromGroupByIdInner(Object groupId, UUID messageId) { Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL); Assert.notNull(messageId, "'messageId' must not be null"); Object mgm = doRetrieve(this.groupPrefix + groupId); @@ -305,7 +304,7 @@ public boolean removeMessageFromGroupById(Object groupId, UUID messageId) { } @Override - public void completeGroup(Object groupId) { + public void completeGroupInner(Object groupId) { Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL); MessageGroupMetadata metadata = getGroupMetadata(groupId); if (metadata != null) { @@ -319,7 +318,7 @@ public void completeGroup(Object groupId) { * Remove the MessageGroup with the provided group ID. */ @Override - public void removeMessageGroup(Object groupId) { + public void removeMessageGroupInner(Object groupId) { Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL); Object mgm = doRemove(this.groupPrefix + groupId); if (mgm != null) { @@ -337,7 +336,7 @@ public void removeMessageGroup(Object groupId) { } @Override - public void setGroupCondition(Object groupId, String condition) { + public void setGroupConditionInner(Object groupId, String condition) { MessageGroupMetadata metadata = getGroupMetadata(groupId); if (metadata != null) { metadata.setCondition(condition); @@ -346,7 +345,7 @@ public void setGroupCondition(Object groupId, String condition) { } @Override - public void setLastReleasedSequenceNumberForGroup(Object groupId, int sequenceNumber) { + public void setLastReleasedSequenceNumberForGroupInner(Object groupId, int sequenceNumber) { Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL); MessageGroupMetadata metadata = getGroupMetadata(groupId); if (metadata == null) { @@ -359,7 +358,7 @@ public void setLastReleasedSequenceNumberForGroup(Object groupId, int sequenceNu } @Override - public Message pollMessageFromGroup(Object groupId) { + public Message pollMessageFromGroupInner(Object groupId) { MessageGroupMetadata groupMetadata = getGroupMetadata(groupId); if (groupMetadata != null) { UUID firstId = groupMetadata.firstId(); diff --git a/spring-integration-core/src/main/java/org/springframework/integration/store/AbstractMessageGroupStore.java b/spring-integration-core/src/main/java/org/springframework/integration/store/AbstractMessageGroupStore.java index 8496a51bf1a..d1d8f513c89 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/store/AbstractMessageGroupStore.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/store/AbstractMessageGroupStore.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2023 the original author or authors. + * Copyright 2002-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,16 +19,23 @@ import java.util.Arrays; import java.util.Collection; import java.util.LinkedHashSet; +import java.util.UUID; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.springframework.integration.support.locks.DefaultLockRegistry; +import org.springframework.integration.support.locks.LockRegistry; +import org.springframework.integration.util.CheckedCallable; +import org.springframework.integration.util.CheckedRunnable; import org.springframework.jmx.export.annotation.ManagedAttribute; import org.springframework.jmx.export.annotation.ManagedOperation; import org.springframework.jmx.export.annotation.ManagedResource; import org.springframework.messaging.Message; +import org.springframework.messaging.MessagingException; +import org.springframework.util.Assert; /** * @author Dave Syer @@ -36,6 +43,7 @@ * @author Gary Russell * @author Artem Bilan * @author Christian Tzolov + * @author Youbin Wu * * @since 2.0 */ @@ -43,6 +51,10 @@ public abstract class AbstractMessageGroupStore extends AbstractBatchingMessageGroupStore implements MessageGroupStore, Iterable { + protected static final String INTERRUPTED_WHILE_OBTAINING_LOCK = "Interrupted while obtaining lock"; + + protected static final String GROUP_ID_MUST_NOT_BE_NULL = "'groupId' must not be null"; + protected final Log logger = LogFactory.getLog(getClass()); // NOSONAR final private final Lock lock = new ReentrantLock(); @@ -56,11 +68,15 @@ public abstract class AbstractMessageGroupStore extends AbstractBatchingMessageG private boolean timeoutOnIdle; + protected LockRegistry lockRegistry; + protected AbstractMessageGroupStore() { + this.lockRegistry = new DefaultLockRegistry(); } protected AbstractMessageGroupStore(boolean lazyLoadMessageGroups) { this.lazyLoadMessageGroups = lazyLoadMessageGroups; + this.lockRegistry = new DefaultLockRegistry(); } @Override @@ -109,6 +125,16 @@ public void setLazyLoadMessageGroups(boolean lazyLoadMessageGroups) { this.lazyLoadMessageGroups = lazyLoadMessageGroups; } + /** + * Specify the type of the {@link LockRegistry} to ensure atomic operations + * @param lockRegistry lockRegistryType + * @since 6.5 + */ + public void setLockRegistry(LockRegistry lockRegistry) { + Assert.notNull(lockRegistry, "The LockRegistry cannot be null"); + this.lockRegistry = lockRegistry; + } + @Override public void registerMessageGroupExpiryCallback(MessageGroupCallback callback) { if (callback instanceof UniqueExpiryCallback) { @@ -195,12 +221,98 @@ public void removeMessagesFromGroup(Object key, Message... messages) { removeMessagesFromGroup(key, Arrays.asList(messages)); } + @Override + public void removeMessagesFromGroup(Object key, Collection> messages) { + Assert.notNull(key, GROUP_ID_MUST_NOT_BE_NULL); + lockExecute(key, () -> removeMessagesFromGroupInner(key, messages)); + } + + protected abstract void removeMessagesFromGroupInner(Object key, Collection> messages); + + @Override + public void addMessagesToGroup(Object groupId, Message... messages) { + Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL); + lockExecute(groupId, () -> addMessagesToGroupInner(groupId, messages)); + } + + protected abstract void addMessagesToGroupInner(Object groupId, Message... messages); + @Override public MessageGroup addMessageToGroup(Object groupId, Message message) { addMessagesToGroup(groupId, message); return getMessageGroup(groupId); } + @Override + public void removeMessageGroup(Object groupId) { + Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL); + lockExecute(groupId, () -> removeMessageGroupInner(groupId)); + } + + protected abstract void removeMessageGroupInner(Object groupId); + + @Override + public boolean removeMessageFromGroupById(Object groupId, UUID messageId) { + Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL); + return lockExecute(groupId, () -> removeMessageFromGroupByIdInner(groupId, messageId)); + } + + protected boolean removeMessageFromGroupByIdInner(Object groupId, UUID messageId) { + throw new UnsupportedOperationException("Not supported for this store"); + } + + @Override + public void setLastReleasedSequenceNumberForGroup(Object groupId, int sequenceNumber) { + Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL); + lockExecute(groupId, () -> setLastReleasedSequenceNumberForGroupInner(groupId, sequenceNumber)); + } + + protected abstract void setLastReleasedSequenceNumberForGroupInner(Object groupId, int sequenceNumber); + + @Override + public void completeGroup(Object groupId) { + Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL); + lockExecute(groupId, () -> completeGroupInner(groupId)); + } + + protected abstract void completeGroupInner(Object groupId); + + @Override + public void setGroupCondition(Object groupId, String condition) { + Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL); + lockExecute(groupId, () -> setGroupConditionInner(groupId, condition)); + } + + protected abstract void setGroupConditionInner(Object groupId, String condition); + + @Override + public Message pollMessageFromGroup(Object groupId) { + Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL); + return lockExecute(groupId, () -> pollMessageFromGroupInner(groupId)); + } + + protected abstract Message pollMessageFromGroupInner(Object groupId); + + protected T lockExecute(Object groupId, CheckedCallable runnable) { + try { + return this.lockRegistry.executeLocked(groupId, runnable); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new MessagingException(INTERRUPTED_WHILE_OBTAINING_LOCK, e); + } + } + + protected void lockExecute(Object groupId, CheckedRunnable runnable) { + try { + this.lockRegistry.executeLocked(groupId, runnable); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new MessagingException(INTERRUPTED_WHILE_OBTAINING_LOCK, e); + } + } + private void expire(MessageGroup group) { RuntimeException exception = null; diff --git a/spring-integration-core/src/main/java/org/springframework/integration/store/SimpleMessageStore.java b/spring-integration-core/src/main/java/org/springframework/integration/store/SimpleMessageStore.java index 5f6cd4be70a..7c298f8f401 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/store/SimpleMessageStore.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/store/SimpleMessageStore.java @@ -45,6 +45,7 @@ * @author Gary Russell * @author Ryan Barker * @author Artem Bilan + * @author Youbin Wu * * @since 2.0 */ @@ -55,8 +56,6 @@ public class SimpleMessageStore extends AbstractMessageGroupStore private static final String UPPER_BOUND_MUST_NOT_BE_NULL = "'upperBound' must not be null."; - private static final String INTERRUPTED_WHILE_OBTAINING_LOCK = "Interrupted while obtaining lock"; - private final ConcurrentMap> idToMessage = new ConcurrentHashMap<>(); private final ConcurrentMap groupIdToMessageGroup = new ConcurrentHashMap<>(); @@ -71,8 +70,6 @@ public class SimpleMessageStore extends AbstractMessageGroupStore private final long upperBoundTimeout; - private LockRegistry lockRegistry; - private boolean copyOnGet = false; private volatile boolean isUsed; @@ -162,6 +159,7 @@ public void setCopyOnGet(boolean copyOnGet) { this.copyOnGet = copyOnGet; } + @Override public void setLockRegistry(LockRegistry lockRegistry) { Assert.notNull(lockRegistry, "The LockRegistry cannot be null"); Assert.isTrue(!(this.isUsed), "Cannot change the lock registry after the store has been used"); @@ -268,56 +266,37 @@ protected MessageGroup copy(MessageGroup group) { } @Override - public void addMessagesToGroup(Object groupId, Message... messages) { + protected void addMessagesToGroupInner(Object groupId, Message... messages) { Assert.notNull(groupId, "'groupId' must not be null"); Assert.notNull(messages, "'messages' must not be null"); - Lock lock = this.lockRegistry.obtain(groupId); - try { - lock.lockInterruptibly(); - boolean unlocked = false; - try { - UpperBound upperBound; - MessageGroup group = this.groupIdToMessageGroup.get(groupId); - if (group == null) { - if (this.groupCapacity > 0 && messages.length > this.groupCapacity) { - throw outOfCapacityException(groupId); - } - group = getMessageGroupFactory().create(groupId); - this.groupIdToMessageGroup.put(groupId, group); - upperBound = new UpperBound(this.groupCapacity); - for (Message message : messages) { - upperBound.tryAcquire(-1); - group.add(message); - } - this.groupToUpperBound.put(groupId, upperBound); - } - else { - upperBound = this.groupToUpperBound.get(groupId); - Assert.state(upperBound != null, UPPER_BOUND_MUST_NOT_BE_NULL); - for (Message message : messages) { - lock.unlock(); - if (!upperBound.tryAcquire(this.upperBoundTimeout)) { - unlocked = true; - throw outOfCapacityException(groupId); - } - lock.lockInterruptibly(); - group.add(message); - } - } - - group.setLastModified(System.currentTimeMillis()); + UpperBound upperBound; + MessageGroup group = this.groupIdToMessageGroup.get(groupId); + if (group == null) { + if (this.groupCapacity > 0 && messages.length > this.groupCapacity) { + throw outOfCapacityException(groupId); } - finally { - if (!unlocked) { - lock.unlock(); - } + group = getMessageGroupFactory().create(groupId); + this.groupIdToMessageGroup.put(groupId, group); + upperBound = new UpperBound(this.groupCapacity); + for (Message message : messages) { + upperBound.tryAcquire(-1); + group.add(message); } + this.groupToUpperBound.put(groupId, upperBound); } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new MessagingException(INTERRUPTED_WHILE_OBTAINING_LOCK, e); + else { + upperBound = this.groupToUpperBound.get(groupId); + Assert.state(upperBound != null, UPPER_BOUND_MUST_NOT_BE_NULL); + for (Message message : messages) { + if (!upperBound.tryAcquire(this.upperBoundTimeout)) { + throw outOfCapacityException(groupId); + } + group.add(message); + } } + + group.setLastModified(System.currentTimeMillis()); } private MessagingException outOfCapacityException(Object groupId) { @@ -327,58 +306,32 @@ private MessagingException outOfCapacityException(Object groupId) { } @Override - public void removeMessageGroup(Object groupId) { - Lock lock = this.lockRegistry.obtain(groupId); - try { - lock.lockInterruptibly(); - try { - MessageGroup messageGroup = this.groupIdToMessageGroup.remove(groupId); - if (messageGroup != null) { - UpperBound upperBound = this.groupToUpperBound.remove(groupId); - Assert.state(upperBound != null, UPPER_BOUND_MUST_NOT_BE_NULL); - upperBound.release(this.groupCapacity); - } - } - finally { - lock.unlock(); - } - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new MessagingException(INTERRUPTED_WHILE_OBTAINING_LOCK, e); + protected void removeMessageGroupInner(Object groupId) { + MessageGroup messageGroup = this.groupIdToMessageGroup.remove(groupId); + if (messageGroup != null) { + UpperBound upperBound = this.groupToUpperBound.remove(groupId); + Assert.state(upperBound != null, UPPER_BOUND_MUST_NOT_BE_NULL); + upperBound.release(this.groupCapacity); } } @Override - public void removeMessagesFromGroup(Object groupId, Collection> messages) { - Lock lock = this.lockRegistry.obtain(groupId); - try { - lock.lockInterruptibly(); - try { - MessageGroup group = this.groupIdToMessageGroup.get(groupId); - Assert.notNull(group, - () -> MESSAGE_GROUP_FOR_GROUP_ID + groupId + "' " + - "can not be located while attempting to remove Message(s) from the MessageGroup"); - UpperBound upperBound = this.groupToUpperBound.get(groupId); - Assert.state(upperBound != null, UPPER_BOUND_MUST_NOT_BE_NULL); - boolean modified = false; - for (Message messageToRemove : messages) { - if (group.remove(messageToRemove)) { - upperBound.release(); - modified = true; - } - } - if (modified) { - group.setLastModified(System.currentTimeMillis()); - } - } - finally { - lock.unlock(); + protected void removeMessagesFromGroupInner(Object groupId, Collection> messages) { + MessageGroup group = this.groupIdToMessageGroup.get(groupId); + Assert.notNull(group, + () -> MESSAGE_GROUP_FOR_GROUP_ID + groupId + "' " + + "can not be located while attempting to remove Message(s) from the MessageGroup"); + UpperBound upperBound = this.groupToUpperBound.get(groupId); + Assert.state(upperBound != null, UPPER_BOUND_MUST_NOT_BE_NULL); + boolean modified = false; + for (Message messageToRemove : messages) { + if (group.remove(messageToRemove)) { + upperBound.release(); + modified = true; } } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new MessagingException(INTERRUPTED_WHILE_OBTAINING_LOCK, e); + if (modified) { + group.setLastModified(System.currentTimeMillis()); } } @@ -397,35 +350,22 @@ public Message getMessageFromGroup(Object groupId, UUID messageId) { } @Override - public boolean removeMessageFromGroupById(Object groupId, UUID messageId) { - Lock lock = this.lockRegistry.obtain(groupId); - try { - lock.lockInterruptibly(); - try { - MessageGroup group = this.groupIdToMessageGroup.get(groupId); - Assert.notNull(group, - () -> MESSAGE_GROUP_FOR_GROUP_ID + groupId + "' " + - "can not be located while attempting to remove Message from the MessageGroup"); - UpperBound upperBound = this.groupToUpperBound.get(groupId); - Assert.state(upperBound != null, UPPER_BOUND_MUST_NOT_BE_NULL); - for (Message message : group.getMessages()) { - if (messageId.equals(message.getHeaders().getId())) { - group.remove(message); - upperBound.release(); - group.setLastModified(System.currentTimeMillis()); - return true; - } - } - return false; - } - finally { - lock.unlock(); + protected boolean removeMessageFromGroupByIdInner(Object groupId, UUID messageId) { + MessageGroup group = this.groupIdToMessageGroup.get(groupId); + Assert.notNull(group, + () -> MESSAGE_GROUP_FOR_GROUP_ID + groupId + "' " + + "can not be located while attempting to remove Message from the MessageGroup"); + UpperBound upperBound = this.groupToUpperBound.get(groupId); + Assert.state(upperBound != null, UPPER_BOUND_MUST_NOT_BE_NULL); + for (Message message : group.getMessages()) { + if (messageId.equals(message.getHeaders().getId())) { + group.remove(message); + upperBound.release(); + group.setLastModified(System.currentTimeMillis()); + return true; } } - catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - throw new MessagingException(INTERRUPTED_WHILE_OBTAINING_LOCK, ex); - } + return false; } @Override @@ -434,7 +374,7 @@ public Iterator iterator() { } @Override - public void setGroupCondition(Object groupId, String condition) { + protected void setGroupConditionInner(Object groupId, String condition) { MessageGroup group = this.groupIdToMessageGroup.get(groupId); if (group != null) { group.setCondition(condition); @@ -442,53 +382,27 @@ public void setGroupCondition(Object groupId, String condition) { } @Override - public void setLastReleasedSequenceNumberForGroup(Object groupId, int sequenceNumber) { - Lock lock = this.lockRegistry.obtain(groupId); - try { - lock.lockInterruptibly(); - try { - MessageGroup group = this.groupIdToMessageGroup.get(groupId); - Assert.notNull(group, - () -> MESSAGE_GROUP_FOR_GROUP_ID + groupId + "' " + - "can not be located while attempting to set 'lastReleasedSequenceNumber'"); - group.setLastReleasedMessageSequenceNumber(sequenceNumber); - group.setLastModified(System.currentTimeMillis()); - } - finally { - lock.unlock(); - } - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new MessagingException(INTERRUPTED_WHILE_OBTAINING_LOCK, e); - } + protected void setLastReleasedSequenceNumberForGroupInner(Object groupId, int sequenceNumber) { + MessageGroup group = this.groupIdToMessageGroup.get(groupId); + Assert.notNull(group, + () -> MESSAGE_GROUP_FOR_GROUP_ID + groupId + "' " + + "can not be located while attempting to set 'lastReleasedSequenceNumber'"); + group.setLastReleasedMessageSequenceNumber(sequenceNumber); + group.setLastModified(System.currentTimeMillis()); } @Override - public void completeGroup(Object groupId) { - Lock lock = this.lockRegistry.obtain(groupId); - try { - lock.lockInterruptibly(); - try { - MessageGroup group = this.groupIdToMessageGroup.get(groupId); - Assert.notNull(group, - () -> MESSAGE_GROUP_FOR_GROUP_ID + groupId + "' " + - "can not be located while attempting to complete the MessageGroup"); - group.complete(); - group.setLastModified(System.currentTimeMillis()); - } - finally { - lock.unlock(); - } - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new MessagingException(INTERRUPTED_WHILE_OBTAINING_LOCK, e); - } + protected void completeGroupInner(Object groupId) { + MessageGroup group = this.groupIdToMessageGroup.get(groupId); + Assert.notNull(group, + () -> MESSAGE_GROUP_FOR_GROUP_ID + groupId + "' " + + "can not be located while attempting to complete the MessageGroup"); + group.complete(); + group.setLastModified(System.currentTimeMillis()); } @Override - public Message pollMessageFromGroup(Object groupId) { + protected Message pollMessageFromGroupInner(Object groupId) { Collection> messageList = getMessageGroup(groupId).getMessages(); Message message = null; if (!CollectionUtils.isEmpty(messageList)) { diff --git a/spring-integration-core/src/test/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandlerTests.java b/spring-integration-core/src/test/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandlerTests.java index 757ed23a328..c0fa3ec3e26 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandlerTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandlerTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -56,6 +56,7 @@ * @author Gary Russell * @author Artem Bilan * @author Meherzad Lahewala + * @author Youbin Wu * * @since 2.2 * @@ -350,7 +351,7 @@ public void testInt3483DeadlockOnMessageStoreRemoveMessageGroup() throws Interru SimpleMessageStore messageStore = new SimpleMessageStore() { @Override - public void removeMessageGroup(Object groupId) { + public void removeMessageGroupInner(Object groupId) { throw new RuntimeException("intentional"); } }; diff --git a/spring-integration-core/src/test/java/org/springframework/integration/store/MessageStoreTests.java b/spring-integration-core/src/test/java/org/springframework/integration/store/MessageStoreTests.java index 988de554510..96b1a54172a 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/store/MessageStoreTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/store/MessageStoreTests.java @@ -35,6 +35,7 @@ * @author Dave Syer * @author Gary Russell * @author Artem Bilan + * @author Youbin Wu */ public class MessageStoreTests { @@ -91,7 +92,7 @@ public Iterator iterator() { } @Override - public void addMessagesToGroup(Object groupId, Message... messages) { + protected void addMessagesToGroupInner(Object groupId, Message... messages) { throw new UnsupportedOperationException(); } @@ -101,30 +102,30 @@ public MessageGroup getMessageGroup(Object correlationKey) { } @Override - public void removeMessagesFromGroup(Object key, Collection> messages) { + protected void removeMessagesFromGroupInner(Object key, Collection> messages) { throw new UnsupportedOperationException(); } @Override - public void removeMessageGroup(Object correlationKey) { + protected void removeMessageGroupInner(Object correlationKey) { if (correlationKey.equals(testMessages.getGroupId())) { removed = true; } } @Override - public void setLastReleasedSequenceNumberForGroup(Object groupId, int sequenceNumber) { + protected void setLastReleasedSequenceNumberForGroupInner(Object groupId, int sequenceNumber) { throw new UnsupportedOperationException(); } @Override - public void completeGroup(Object groupId) { + protected void completeGroupInner(Object groupId) { throw new UnsupportedOperationException(); } @Override - public Message pollMessageFromGroup(Object groupId) { + protected Message pollMessageFromGroupInner(Object groupId) { return null; } diff --git a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/JdbcMessageStore.java b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/JdbcMessageStore.java index db47f54235b..f7df701521a 100644 --- a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/JdbcMessageStore.java +++ b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/JdbcMessageStore.java @@ -80,6 +80,7 @@ * @author Gary Russell * @author Artem Bilan * @author Ngoc Nhan + * @author Youbin Wu * * @since 2.0 */ @@ -472,7 +473,7 @@ public Message addMessage(final Message message) { } @Override - public void addMessagesToGroup(Object groupId, Message... messages) { + protected void addMessagesToGroupInner(Object groupId, Message... messages) { String groupKey = getKey(groupId); MessageGroupMetadata groupMetadata = getGroupMetadata(groupKey); @@ -576,7 +577,7 @@ public MessageGroupMetadata getGroupMetadata(Object groupId) { } @Override - public void removeMessagesFromGroup(Object groupId, Collection> messages) { + protected void removeMessagesFromGroupInner(Object groupId, Collection> messages) { Assert.notNull(groupId, "'groupId' must not be null"); Assert.notNull(messages, "'messages' must not be null"); @@ -621,7 +622,7 @@ public Message getMessageFromGroup(Object groupId, UUID messageId) { } @Override - public boolean removeMessageFromGroupById(Object groupId, UUID messageId) { + protected boolean removeMessageFromGroupByIdInner(Object groupId, UUID messageId) { String groupKey = getKey(groupId); String messageKey = getKey(messageId); int messageToGroupRemoved = @@ -634,7 +635,7 @@ public boolean removeMessageFromGroupById(Object groupId, UUID messageId) { } @Override - public void removeMessageGroup(Object groupId) { + protected void removeMessageGroupInner(Object groupId) { String groupKey = getKey(groupId); this.jdbcTemplate.update(getQuery(Query.DELETE_MESSAGES_FROM_GROUP), @@ -653,7 +654,7 @@ public void removeMessageGroup(Object groupId) { } @Override - public void completeGroup(Object groupId) { + protected void completeGroupInner(Object groupId) { final String groupKey = getKey(groupId); if (logger.isDebugEnabled()) { @@ -664,7 +665,7 @@ public void completeGroup(Object groupId) { } @Override - public void setGroupCondition(Object groupId, String condition) { + protected void setGroupConditionInner(Object groupId, String condition) { Assert.notNull(groupId, "'groupId' must not be null"); String groupKey = getKey(groupId); Timestamp updatedDate = new Timestamp(System.currentTimeMillis()); @@ -675,7 +676,7 @@ public void setGroupCondition(Object groupId, String condition) { } @Override - public void setLastReleasedSequenceNumberForGroup(Object groupId, int sequenceNumber) { + protected void setLastReleasedSequenceNumberForGroupInner(Object groupId, int sequenceNumber) { Assert.notNull(groupId, "'groupId' must not be null"); String groupKey = getKey(groupId); @@ -687,7 +688,7 @@ public void setLastReleasedSequenceNumberForGroup(Object groupId, int sequenceNu } @Override - public Message pollMessageFromGroup(Object groupId) { + protected Message pollMessageFromGroupInner(Object groupId) { String key = getKey(groupId); Message polledMessage = doPollForMessage(key); diff --git a/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/AbstractConfigurableMongoDbMessageStore.java b/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/AbstractConfigurableMongoDbMessageStore.java index d7cbd4259f6..29b1222fad9 100644 --- a/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/AbstractConfigurableMongoDbMessageStore.java +++ b/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/AbstractConfigurableMongoDbMessageStore.java @@ -61,6 +61,7 @@ * * @author Artem Bilan * @author Adama Sorho + * @author Youbin Wu * * @since 4.0 */ @@ -201,7 +202,7 @@ public MessageMetadata getMessageMetadata(UUID id) { } @Override - public void removeMessageGroup(Object groupId) { + protected void removeMessageGroupInner(Object groupId) { this.mongoTemplate.remove(groupIdQuery(groupId), this.collectionName); } @@ -250,17 +251,17 @@ protected static Query groupIdQuery(Object groupId) { } @Override - public void removeMessagesFromGroup(Object key, Collection> messages) { + protected void removeMessagesFromGroupInner(Object key, Collection> messages) { throw NOT_IMPLEMENTED; } @Override - public void setGroupCondition(Object groupId, String condition) { + protected void setGroupConditionInner(Object groupId, String condition) { throw NOT_IMPLEMENTED; } @Override - public void setLastReleasedSequenceNumberForGroup(Object groupId, int sequenceNumber) { + protected void setLastReleasedSequenceNumberForGroupInner(Object groupId, int sequenceNumber) { throw NOT_IMPLEMENTED; } @@ -270,7 +271,7 @@ public Iterator iterator() { } @Override - public void completeGroup(Object groupId) { + protected void completeGroupInner(Object groupId) { throw NOT_IMPLEMENTED; } @@ -280,7 +281,7 @@ public Message getOneMessageFromGroup(Object groupId) { } @Override - public void addMessagesToGroup(Object groupId, Message... messages) { + protected void addMessagesToGroupInner(Object groupId, Message... messages) { throw NOT_IMPLEMENTED; } diff --git a/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/ConfigurableMongoDbMessageStore.java b/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/ConfigurableMongoDbMessageStore.java index 6176a2ccafa..23405f47e05 100644 --- a/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/ConfigurableMongoDbMessageStore.java +++ b/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/ConfigurableMongoDbMessageStore.java @@ -54,6 +54,7 @@ * @author Artem Bilan * @author Gary Russell * @author Ngoc Nhan + * @author Youbin Wu * * @since 3.0 */ @@ -147,7 +148,7 @@ public MessageGroup addMessageToGroup(Object groupId, Message message) { } @Override - public void addMessagesToGroup(Object groupId, Message... messages) { + protected void addMessagesToGroupInner(Object groupId, Message... messages) { Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL); Assert.notNull(messages, "'message' must not be null"); @@ -183,7 +184,7 @@ public void addMessagesToGroup(Object groupId, Message... messages) { } @Override - public void removeMessagesFromGroup(Object groupId, Collection> messages) { + protected void removeMessagesFromGroupInner(Object groupId, Collection> messages) { Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL); Assert.notNull(messages, "'messageToRemove' must not be null"); @@ -215,7 +216,7 @@ public Message getMessageFromGroup(Object groupId, UUID messageId) { } @Override - public boolean removeMessageFromGroupById(Object groupId, UUID messageId) { + protected boolean removeMessageFromGroupByIdInner(Object groupId, UUID messageId) { Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL); Assert.notNull(messageId, "'messageId' must not be null"); Query query = @@ -234,7 +235,7 @@ private void removeMessages(Object groupId, Collection ids) { } @Override - public Message pollMessageFromGroup(final Object groupId) { + protected Message pollMessageFromGroupInner(final Object groupId) { Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL); Sort sort = Sort.by(MessageDocumentFields.LAST_MODIFIED_TIME, MessageDocumentFields.SEQUENCE); @@ -249,17 +250,17 @@ public Message pollMessageFromGroup(final Object groupId) { } @Override - public void setLastReleasedSequenceNumberForGroup(Object groupId, int sequenceNumber) { + protected void setLastReleasedSequenceNumberForGroupInner(Object groupId, int sequenceNumber) { updateGroup(groupId, lastModifiedUpdate().set(MessageDocumentFields.LAST_RELEASED_SEQUENCE, sequenceNumber)); } @Override - public void setGroupCondition(Object groupId, String condition) { + protected void setGroupConditionInner(Object groupId, String condition) { updateGroup(groupId, lastModifiedUpdate().set("condition", condition)); } @Override - public void completeGroup(Object groupId) { + protected void completeGroupInner(Object groupId) { updateGroup(groupId, lastModifiedUpdate().set(MessageDocumentFields.COMPLETE, true)); } diff --git a/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/MongoDbChannelMessageStore.java b/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/MongoDbChannelMessageStore.java index 76ee7da77ad..af396f266e8 100644 --- a/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/MongoDbChannelMessageStore.java +++ b/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/MongoDbChannelMessageStore.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2023 the original author or authors. + * Copyright 2014-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -44,6 +44,7 @@ * * @author Artem Bilan * @author Adama Sorho + * @author Youbin Wu * * @since 4.0 */ @@ -132,7 +133,7 @@ public MessageGroup getMessageGroup(Object groupId) { } @Override - public Message pollMessageFromGroup(Object groupId) { + protected Message pollMessageFromGroupInner(Object groupId) { Assert.notNull(groupId, "'groupId' must not be null"); Sort sort = Sort.by(MessageDocumentFields.LAST_MODIFIED_TIME, MessageDocumentFields.SEQUENCE); diff --git a/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/MongoDbMessageStore.java b/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/MongoDbMessageStore.java index 617d7e4c113..119a2d71f50 100644 --- a/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/MongoDbMessageStore.java +++ b/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/MongoDbMessageStore.java @@ -97,6 +97,7 @@ * @author Jodie StJohn * @author Gary Russell * @author Artem Bilan + * @author Youbin Wu * * @since 2.1 */ @@ -294,7 +295,7 @@ public MessageGroup getMessageGroup(Object groupId) { } @Override - public void addMessagesToGroup(Object groupId, Message... messages) { + protected void addMessagesToGroupInner(Object groupId, Message... messages) { Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL); Assert.notNull(messages, "'message' must not be null"); Query query = whereGroupIdOrder(groupId); @@ -328,7 +329,7 @@ public void addMessagesToGroup(Object groupId, Message... messages) { } @Override - public void removeMessagesFromGroup(Object groupId, Collection> messages) { + protected void removeMessagesFromGroupInner(Object groupId, Collection> messages) { Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL); Assert.notNull(messages, "'messageToRemove' must not be null"); @@ -367,7 +368,7 @@ public Message getMessageFromGroup(Object groupId, UUID messageId) { } @Override - public boolean removeMessageFromGroupById(Object groupId, UUID messageId) { + protected boolean removeMessageFromGroupByIdInner(Object groupId, UUID messageId) { Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL); Assert.notNull(messageId, "'messageId' must not be null"); return this.template.remove(whereMessageIdIsAndGroupIdIs(messageId, groupId), this.collectionName) @@ -375,7 +376,7 @@ public boolean removeMessageFromGroupById(Object groupId, UUID messageId) { } @Override - public void removeMessageGroup(Object groupId) { + protected void removeMessageGroupInner(Object groupId) { this.template.remove(whereGroupIdIs(groupId), this.collectionName); } @@ -395,7 +396,7 @@ public Iterator iterator() { } @Override - public Message pollMessageFromGroup(final Object groupId) { + protected Message pollMessageFromGroupInner(final Object groupId) { Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL); Query query = whereGroupIdIs(groupId).with(Sort.by(GROUP_UPDATE_TIMESTAMP_KEY, SEQUENCE)); MessageWrapper messageWrapper = this.template.findAndRemove(query, MessageWrapper.class, this.collectionName); @@ -415,17 +416,17 @@ public int messageGroupSize(Object groupId) { } @Override - public void setGroupCondition(Object groupId, String condition) { + protected void setGroupConditionInner(Object groupId, String condition) { updateGroup(groupId, lastModifiedUpdate().set("_condition", condition)); } @Override - public void setLastReleasedSequenceNumberForGroup(Object groupId, int sequenceNumber) { + public void setLastReleasedSequenceNumberForGroupInner(Object groupId, int sequenceNumber) { updateGroup(groupId, lastModifiedUpdate().set(LAST_RELEASED_SEQUENCE_NUMBER, sequenceNumber)); } @Override - public void completeGroup(Object groupId) { + protected void completeGroupInner(Object groupId) { this.updateGroup(groupId, lastModifiedUpdate().set(GROUP_COMPLETE_KEY, true)); } diff --git a/spring-integration-redis/src/main/java/org/springframework/integration/redis/store/RedisMessageStore.java b/spring-integration-redis/src/main/java/org/springframework/integration/redis/store/RedisMessageStore.java index c78464082f0..dc422b94664 100644 --- a/spring-integration-redis/src/main/java/org/springframework/integration/redis/store/RedisMessageStore.java +++ b/spring-integration-redis/src/main/java/org/springframework/integration/redis/store/RedisMessageStore.java @@ -17,9 +17,6 @@ package org.springframework.integration.redis.store; import java.util.Collection; -import java.util.UUID; -import java.util.concurrent.locks.Lock; -import java.util.function.Supplier; import org.springframework.beans.factory.BeanClassLoaderAware; import org.springframework.data.redis.connection.RedisConnectionFactory; @@ -30,11 +27,6 @@ import org.springframework.data.redis.serializer.SerializationException; import org.springframework.data.redis.serializer.StringRedisSerializer; import org.springframework.integration.store.AbstractKeyValueMessageStore; -import org.springframework.integration.store.MessageGroup; -import org.springframework.integration.support.locks.DefaultLockRegistry; -import org.springframework.integration.support.locks.LockRegistry; -import org.springframework.messaging.Message; -import org.springframework.messaging.MessagingException; import org.springframework.util.Assert; /** @@ -53,16 +45,12 @@ public class RedisMessageStore extends AbstractKeyValueMessageStore implements B private static final String ID_MUST_NOT_BE_NULL = "'id' must not be null"; - private static final String INTERRUPTED_WHILE_OBTAINING_LOCK = "Interrupted while obtaining lock"; - private final RedisTemplate redisTemplate; private boolean valueSerializerSet; private volatile boolean unlinkAvailable = true; - private LockRegistry lockRegistry; - /** * Construct {@link RedisMessageStore} based on the provided * {@link RedisConnectionFactory} and default empty prefix. @@ -82,27 +70,12 @@ public RedisMessageStore(RedisConnectionFactory connectionFactory) { * @see AbstractKeyValueMessageStore#AbstractKeyValueMessageStore(String) */ public RedisMessageStore(RedisConnectionFactory connectionFactory, String prefix) { - this(connectionFactory, prefix, new DefaultLockRegistry()); - } - - /** - * Construct {@link RedisMessageStore} based on the provided - * {@link RedisConnectionFactory} and prefix. - * @param connectionFactory the RedisConnectionFactory to use - * @param prefix the key prefix to use, allowing the same broker to be used for - * multiple stores. - * @param lockRegistry the {@link LockRegistry} to use. - * @since 6.4.1 - * @see AbstractKeyValueMessageStore#AbstractKeyValueMessageStore(String) - */ - public RedisMessageStore(RedisConnectionFactory connectionFactory, String prefix, LockRegistry lockRegistry) { super(prefix); this.redisTemplate = new RedisTemplate<>(); this.redisTemplate.setConnectionFactory(connectionFactory); this.redisTemplate.setKeySerializer(new StringRedisSerializer()); this.redisTemplate.setValueSerializer(new JdkSerializationRedisSerializer()); this.redisTemplate.afterPropertiesSet(); - this.lockRegistry = lockRegistry; } @Override @@ -211,83 +184,6 @@ protected Collection doListKeys(String keyPattern) { return this.redisTemplate.keys(keyPattern); } - @Override - protected MessageGroup copy(MessageGroup group) { - return lockExecute(group.getGroupId(), () -> super.copy(group)); - } - - @Override - public void addMessagesToGroup(Object groupId, Message... messages) { - Assert.notNull(groupId, "'groupId' must not be null"); - - lockExecute(groupId, () -> { - super.addMessagesToGroup(groupId, messages); - return null; - }); - } - - @Override - public void removeMessageGroup(Object groupId) { - lockExecute(groupId, () -> { - super.removeMessageGroup(groupId); - return null; - }); - } - - @Override - public void removeMessagesFromGroup(Object groupId, Collection> messages) { - lockExecute(groupId, () -> { - super.removeMessagesFromGroup(groupId, messages); - return null; - }); - } - - @Override - public boolean removeMessageFromGroupById(Object groupId, UUID messageId) { - return lockExecute(groupId, () -> super.removeMessageFromGroupById(groupId, messageId)); - } - - @Override - public void setLastReleasedSequenceNumberForGroup(Object groupId, int sequenceNumber) { - lockExecute(groupId, () -> { - super.setLastReleasedSequenceNumberForGroup(groupId, sequenceNumber); - return null; - }); - } - - @Override - public void completeGroup(Object groupId) { - lockExecute(groupId, () -> { - super.completeGroup(groupId); - return null; - }); - } - - @Override - public void setGroupCondition(Object groupId, String condition) { - lockExecute(groupId, () -> { - super.setGroupCondition(groupId, condition); - return null; - }); - } - - public T lockExecute(Object groupId, Supplier supplier) { - Lock lock = this.lockRegistry.obtain(groupId); - try { - lock.lockInterruptibly(); - try { - return supplier.get(); - } - finally { - lock.unlock(); - } - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new MessagingException(INTERRUPTED_WHILE_OBTAINING_LOCK, e); - } - } - private void rethrowAsIllegalArgumentException(SerializationException e) { throw new IllegalArgumentException("If relying on the default RedisSerializer " + "(JdkSerializationRedisSerializer) the Object must be Serializable. " + From ab0519d6960376804e7135601ebab0b9a8fc3496 Mon Sep 17 00:00:00 2001 From: NaccOll Date: Tue, 10 Dec 2024 08:26:33 +0800 Subject: [PATCH 3/4] Normalize access levels and method name about the lock of MessageGroupStore --- .../store/AbstractKeyValueMessageStore.java | 16 ++++----- .../store/AbstractMessageGroupStore.java | 36 +++++++++---------- .../integration/store/SimpleMessageStore.java | 16 ++++----- ...bstractCorrelatingMessageHandlerTests.java | 2 +- .../integration/store/MessageStoreTests.java | 12 +++---- .../jdbc/store/JdbcMessageStore.java | 16 ++++----- ...stractConfigurableMongoDbMessageStore.java | 12 +++---- .../ConfigurableMongoDbMessageStore.java | 14 ++++---- .../store/MongoDbChannelMessageStore.java | 2 +- .../mongodb/store/MongoDbMessageStore.java | 16 ++++----- .../redis/store/RedisMessageStore.java | 1 - 11 files changed, 71 insertions(+), 72 deletions(-) diff --git a/spring-integration-core/src/main/java/org/springframework/integration/store/AbstractKeyValueMessageStore.java b/spring-integration-core/src/main/java/org/springframework/integration/store/AbstractKeyValueMessageStore.java index 5b738a4bb6c..fa415bf19f8 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/store/AbstractKeyValueMessageStore.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/store/AbstractKeyValueMessageStore.java @@ -205,7 +205,7 @@ public MessageGroupMetadata getGroupMetadata(Object groupId) { } @Override - public void addMessagesToGroupInner(Object groupId, Message... messages) { + protected void doAddMessagesToGroup(Object groupId, Message... messages) { Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL); Assert.notNull(messages, "'messages' must not be null"); @@ -239,7 +239,7 @@ public void addMessagesToGroupInner(Object groupId, Message... messages) { } @Override - public void removeMessagesFromGroupInner(Object groupId, Collection> messages) { + protected void doRemoveMessagesFromGroup(Object groupId, Collection> messages) { Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL); Assert.notNull(messages, "'messages' must not be null"); @@ -282,7 +282,7 @@ public Message getMessageFromGroup(Object groupId, UUID messageId) { } @Override - public boolean removeMessageFromGroupByIdInner(Object groupId, UUID messageId) { + protected boolean doRemoveMessageFromGroupById(Object groupId, UUID messageId) { Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL); Assert.notNull(messageId, "'messageId' must not be null"); Object mgm = doRetrieve(this.groupPrefix + groupId); @@ -304,7 +304,7 @@ public boolean removeMessageFromGroupByIdInner(Object groupId, UUID messageId) { } @Override - public void completeGroupInner(Object groupId) { + protected void doCompleteGroup(Object groupId) { Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL); MessageGroupMetadata metadata = getGroupMetadata(groupId); if (metadata != null) { @@ -318,7 +318,7 @@ public void completeGroupInner(Object groupId) { * Remove the MessageGroup with the provided group ID. */ @Override - public void removeMessageGroupInner(Object groupId) { + protected void doRemoveMessageGroup(Object groupId) { Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL); Object mgm = doRemove(this.groupPrefix + groupId); if (mgm != null) { @@ -336,7 +336,7 @@ public void removeMessageGroupInner(Object groupId) { } @Override - public void setGroupConditionInner(Object groupId, String condition) { + protected void doSetGroupCondition(Object groupId, String condition) { MessageGroupMetadata metadata = getGroupMetadata(groupId); if (metadata != null) { metadata.setCondition(condition); @@ -345,7 +345,7 @@ public void setGroupConditionInner(Object groupId, String condition) { } @Override - public void setLastReleasedSequenceNumberForGroupInner(Object groupId, int sequenceNumber) { + protected void doSetLastReleasedSequenceNumberForGroup(Object groupId, int sequenceNumber) { Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL); MessageGroupMetadata metadata = getGroupMetadata(groupId); if (metadata == null) { @@ -358,7 +358,7 @@ public void setLastReleasedSequenceNumberForGroupInner(Object groupId, int seque } @Override - public Message pollMessageFromGroupInner(Object groupId) { + protected Message doPollMessageFromGroup(Object groupId) { MessageGroupMetadata groupMetadata = getGroupMetadata(groupId); if (groupMetadata != null) { UUID firstId = groupMetadata.firstId(); diff --git a/spring-integration-core/src/main/java/org/springframework/integration/store/AbstractMessageGroupStore.java b/spring-integration-core/src/main/java/org/springframework/integration/store/AbstractMessageGroupStore.java index d1d8f513c89..9dcae51836a 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/store/AbstractMessageGroupStore.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/store/AbstractMessageGroupStore.java @@ -224,18 +224,18 @@ public void removeMessagesFromGroup(Object key, Message... messages) { @Override public void removeMessagesFromGroup(Object key, Collection> messages) { Assert.notNull(key, GROUP_ID_MUST_NOT_BE_NULL); - lockExecute(key, () -> removeMessagesFromGroupInner(key, messages)); + executeLocked(key, () -> doRemoveMessagesFromGroup(key, messages)); } - protected abstract void removeMessagesFromGroupInner(Object key, Collection> messages); + protected abstract void doRemoveMessagesFromGroup(Object key, Collection> messages); @Override public void addMessagesToGroup(Object groupId, Message... messages) { Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL); - lockExecute(groupId, () -> addMessagesToGroupInner(groupId, messages)); + executeLocked(groupId, () -> doAddMessagesToGroup(groupId, messages)); } - protected abstract void addMessagesToGroupInner(Object groupId, Message... messages); + protected abstract void doAddMessagesToGroup(Object groupId, Message... messages); @Override public MessageGroup addMessageToGroup(Object groupId, Message message) { @@ -246,54 +246,54 @@ public MessageGroup addMessageToGroup(Object groupId, Message message) { @Override public void removeMessageGroup(Object groupId) { Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL); - lockExecute(groupId, () -> removeMessageGroupInner(groupId)); + executeLocked(groupId, () -> doRemoveMessageGroup(groupId)); } - protected abstract void removeMessageGroupInner(Object groupId); + protected abstract void doRemoveMessageGroup(Object groupId); @Override public boolean removeMessageFromGroupById(Object groupId, UUID messageId) { Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL); - return lockExecute(groupId, () -> removeMessageFromGroupByIdInner(groupId, messageId)); + return executeLocked(groupId, () -> doRemoveMessageFromGroupById(groupId, messageId)); } - protected boolean removeMessageFromGroupByIdInner(Object groupId, UUID messageId) { + protected boolean doRemoveMessageFromGroupById(Object groupId, UUID messageId) { throw new UnsupportedOperationException("Not supported for this store"); } @Override public void setLastReleasedSequenceNumberForGroup(Object groupId, int sequenceNumber) { Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL); - lockExecute(groupId, () -> setLastReleasedSequenceNumberForGroupInner(groupId, sequenceNumber)); + executeLocked(groupId, () -> doSetLastReleasedSequenceNumberForGroup(groupId, sequenceNumber)); } - protected abstract void setLastReleasedSequenceNumberForGroupInner(Object groupId, int sequenceNumber); + protected abstract void doSetLastReleasedSequenceNumberForGroup(Object groupId, int sequenceNumber); @Override public void completeGroup(Object groupId) { Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL); - lockExecute(groupId, () -> completeGroupInner(groupId)); + executeLocked(groupId, () -> doCompleteGroup(groupId)); } - protected abstract void completeGroupInner(Object groupId); + protected abstract void doCompleteGroup(Object groupId); @Override public void setGroupCondition(Object groupId, String condition) { Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL); - lockExecute(groupId, () -> setGroupConditionInner(groupId, condition)); + executeLocked(groupId, () -> doSetGroupCondition(groupId, condition)); } - protected abstract void setGroupConditionInner(Object groupId, String condition); + protected abstract void doSetGroupCondition(Object groupId, String condition); @Override public Message pollMessageFromGroup(Object groupId) { Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL); - return lockExecute(groupId, () -> pollMessageFromGroupInner(groupId)); + return executeLocked(groupId, () -> doPollMessageFromGroup(groupId)); } - protected abstract Message pollMessageFromGroupInner(Object groupId); + protected abstract Message doPollMessageFromGroup(Object groupId); - protected T lockExecute(Object groupId, CheckedCallable runnable) { + protected T executeLocked(Object groupId, CheckedCallable runnable) { try { return this.lockRegistry.executeLocked(groupId, runnable); } @@ -303,7 +303,7 @@ protected T lockExecute(Object groupId, CheckedC } } - protected void lockExecute(Object groupId, CheckedRunnable runnable) { + protected void executeLocked(Object groupId, CheckedRunnable runnable) { try { this.lockRegistry.executeLocked(groupId, runnable); } diff --git a/spring-integration-core/src/main/java/org/springframework/integration/store/SimpleMessageStore.java b/spring-integration-core/src/main/java/org/springframework/integration/store/SimpleMessageStore.java index 7c298f8f401..cc72cca7ade 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/store/SimpleMessageStore.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/store/SimpleMessageStore.java @@ -266,7 +266,7 @@ protected MessageGroup copy(MessageGroup group) { } @Override - protected void addMessagesToGroupInner(Object groupId, Message... messages) { + protected void doAddMessagesToGroup(Object groupId, Message... messages) { Assert.notNull(groupId, "'groupId' must not be null"); Assert.notNull(messages, "'messages' must not be null"); @@ -306,7 +306,7 @@ private MessagingException outOfCapacityException(Object groupId) { } @Override - protected void removeMessageGroupInner(Object groupId) { + protected void doRemoveMessageGroup(Object groupId) { MessageGroup messageGroup = this.groupIdToMessageGroup.remove(groupId); if (messageGroup != null) { UpperBound upperBound = this.groupToUpperBound.remove(groupId); @@ -316,7 +316,7 @@ protected void removeMessageGroupInner(Object groupId) { } @Override - protected void removeMessagesFromGroupInner(Object groupId, Collection> messages) { + protected void doRemoveMessagesFromGroup(Object groupId, Collection> messages) { MessageGroup group = this.groupIdToMessageGroup.get(groupId); Assert.notNull(group, () -> MESSAGE_GROUP_FOR_GROUP_ID + groupId + "' " + @@ -350,7 +350,7 @@ public Message getMessageFromGroup(Object groupId, UUID messageId) { } @Override - protected boolean removeMessageFromGroupByIdInner(Object groupId, UUID messageId) { + protected boolean doRemoveMessageFromGroupById(Object groupId, UUID messageId) { MessageGroup group = this.groupIdToMessageGroup.get(groupId); Assert.notNull(group, () -> MESSAGE_GROUP_FOR_GROUP_ID + groupId + "' " + @@ -374,7 +374,7 @@ public Iterator iterator() { } @Override - protected void setGroupConditionInner(Object groupId, String condition) { + protected void doSetGroupCondition(Object groupId, String condition) { MessageGroup group = this.groupIdToMessageGroup.get(groupId); if (group != null) { group.setCondition(condition); @@ -382,7 +382,7 @@ protected void setGroupConditionInner(Object groupId, String condition) { } @Override - protected void setLastReleasedSequenceNumberForGroupInner(Object groupId, int sequenceNumber) { + protected void doSetLastReleasedSequenceNumberForGroup(Object groupId, int sequenceNumber) { MessageGroup group = this.groupIdToMessageGroup.get(groupId); Assert.notNull(group, () -> MESSAGE_GROUP_FOR_GROUP_ID + groupId + "' " + @@ -392,7 +392,7 @@ protected void setLastReleasedSequenceNumberForGroupInner(Object groupId, int se } @Override - protected void completeGroupInner(Object groupId) { + protected void doCompleteGroup(Object groupId) { MessageGroup group = this.groupIdToMessageGroup.get(groupId); Assert.notNull(group, () -> MESSAGE_GROUP_FOR_GROUP_ID + groupId + "' " + @@ -402,7 +402,7 @@ protected void completeGroupInner(Object groupId) { } @Override - protected Message pollMessageFromGroupInner(Object groupId) { + protected Message doPollMessageFromGroup(Object groupId) { Collection> messageList = getMessageGroup(groupId).getMessages(); Message message = null; if (!CollectionUtils.isEmpty(messageList)) { diff --git a/spring-integration-core/src/test/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandlerTests.java b/spring-integration-core/src/test/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandlerTests.java index c0fa3ec3e26..71b53c2bd5a 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandlerTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandlerTests.java @@ -351,7 +351,7 @@ public void testInt3483DeadlockOnMessageStoreRemoveMessageGroup() throws Interru SimpleMessageStore messageStore = new SimpleMessageStore() { @Override - public void removeMessageGroupInner(Object groupId) { + protected void doRemoveMessageGroup(Object groupId) { throw new RuntimeException("intentional"); } }; diff --git a/spring-integration-core/src/test/java/org/springframework/integration/store/MessageStoreTests.java b/spring-integration-core/src/test/java/org/springframework/integration/store/MessageStoreTests.java index 96b1a54172a..5710f2a6075 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/store/MessageStoreTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/store/MessageStoreTests.java @@ -92,7 +92,7 @@ public Iterator iterator() { } @Override - protected void addMessagesToGroupInner(Object groupId, Message... messages) { + protected void doAddMessagesToGroup(Object groupId, Message... messages) { throw new UnsupportedOperationException(); } @@ -102,30 +102,30 @@ public MessageGroup getMessageGroup(Object correlationKey) { } @Override - protected void removeMessagesFromGroupInner(Object key, Collection> messages) { + protected void doRemoveMessagesFromGroup(Object key, Collection> messages) { throw new UnsupportedOperationException(); } @Override - protected void removeMessageGroupInner(Object correlationKey) { + protected void doRemoveMessageGroup(Object correlationKey) { if (correlationKey.equals(testMessages.getGroupId())) { removed = true; } } @Override - protected void setLastReleasedSequenceNumberForGroupInner(Object groupId, int sequenceNumber) { + protected void doSetLastReleasedSequenceNumberForGroup(Object groupId, int sequenceNumber) { throw new UnsupportedOperationException(); } @Override - protected void completeGroupInner(Object groupId) { + protected void doCompleteGroup(Object groupId) { throw new UnsupportedOperationException(); } @Override - protected Message pollMessageFromGroupInner(Object groupId) { + protected Message doPollMessageFromGroup(Object groupId) { return null; } diff --git a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/JdbcMessageStore.java b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/JdbcMessageStore.java index f7df701521a..10dfd3ac3b0 100644 --- a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/JdbcMessageStore.java +++ b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/JdbcMessageStore.java @@ -473,7 +473,7 @@ public Message addMessage(final Message message) { } @Override - protected void addMessagesToGroupInner(Object groupId, Message... messages) { + protected void doAddMessagesToGroup(Object groupId, Message... messages) { String groupKey = getKey(groupId); MessageGroupMetadata groupMetadata = getGroupMetadata(groupKey); @@ -577,7 +577,7 @@ public MessageGroupMetadata getGroupMetadata(Object groupId) { } @Override - protected void removeMessagesFromGroupInner(Object groupId, Collection> messages) { + protected void doRemoveMessagesFromGroup(Object groupId, Collection> messages) { Assert.notNull(groupId, "'groupId' must not be null"); Assert.notNull(messages, "'messages' must not be null"); @@ -622,7 +622,7 @@ public Message getMessageFromGroup(Object groupId, UUID messageId) { } @Override - protected boolean removeMessageFromGroupByIdInner(Object groupId, UUID messageId) { + protected boolean doRemoveMessageFromGroupById(Object groupId, UUID messageId) { String groupKey = getKey(groupId); String messageKey = getKey(messageId); int messageToGroupRemoved = @@ -635,7 +635,7 @@ protected boolean removeMessageFromGroupByIdInner(Object groupId, UUID messageId } @Override - protected void removeMessageGroupInner(Object groupId) { + protected void doRemoveMessageGroup(Object groupId) { String groupKey = getKey(groupId); this.jdbcTemplate.update(getQuery(Query.DELETE_MESSAGES_FROM_GROUP), @@ -654,7 +654,7 @@ protected void removeMessageGroupInner(Object groupId) { } @Override - protected void completeGroupInner(Object groupId) { + protected void doCompleteGroup(Object groupId) { final String groupKey = getKey(groupId); if (logger.isDebugEnabled()) { @@ -665,7 +665,7 @@ protected void completeGroupInner(Object groupId) { } @Override - protected void setGroupConditionInner(Object groupId, String condition) { + protected void doSetGroupCondition(Object groupId, String condition) { Assert.notNull(groupId, "'groupId' must not be null"); String groupKey = getKey(groupId); Timestamp updatedDate = new Timestamp(System.currentTimeMillis()); @@ -676,7 +676,7 @@ protected void setGroupConditionInner(Object groupId, String condition) { } @Override - protected void setLastReleasedSequenceNumberForGroupInner(Object groupId, int sequenceNumber) { + protected void doSetLastReleasedSequenceNumberForGroup(Object groupId, int sequenceNumber) { Assert.notNull(groupId, "'groupId' must not be null"); String groupKey = getKey(groupId); @@ -688,7 +688,7 @@ protected void setLastReleasedSequenceNumberForGroupInner(Object groupId, int se } @Override - protected Message pollMessageFromGroupInner(Object groupId) { + protected Message doPollMessageFromGroup(Object groupId) { String key = getKey(groupId); Message polledMessage = doPollForMessage(key); diff --git a/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/AbstractConfigurableMongoDbMessageStore.java b/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/AbstractConfigurableMongoDbMessageStore.java index 29b1222fad9..dce4c00d4b5 100644 --- a/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/AbstractConfigurableMongoDbMessageStore.java +++ b/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/AbstractConfigurableMongoDbMessageStore.java @@ -202,7 +202,7 @@ public MessageMetadata getMessageMetadata(UUID id) { } @Override - protected void removeMessageGroupInner(Object groupId) { + protected void doRemoveMessageGroup(Object groupId) { this.mongoTemplate.remove(groupIdQuery(groupId), this.collectionName); } @@ -251,17 +251,17 @@ protected static Query groupIdQuery(Object groupId) { } @Override - protected void removeMessagesFromGroupInner(Object key, Collection> messages) { + protected void doRemoveMessagesFromGroup(Object key, Collection> messages) { throw NOT_IMPLEMENTED; } @Override - protected void setGroupConditionInner(Object groupId, String condition) { + protected void doSetGroupCondition(Object groupId, String condition) { throw NOT_IMPLEMENTED; } @Override - protected void setLastReleasedSequenceNumberForGroupInner(Object groupId, int sequenceNumber) { + protected void doSetLastReleasedSequenceNumberForGroup(Object groupId, int sequenceNumber) { throw NOT_IMPLEMENTED; } @@ -271,7 +271,7 @@ public Iterator iterator() { } @Override - protected void completeGroupInner(Object groupId) { + protected void doCompleteGroup(Object groupId) { throw NOT_IMPLEMENTED; } @@ -281,7 +281,7 @@ public Message getOneMessageFromGroup(Object groupId) { } @Override - protected void addMessagesToGroupInner(Object groupId, Message... messages) { + protected void doAddMessagesToGroup(Object groupId, Message... messages) { throw NOT_IMPLEMENTED; } diff --git a/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/ConfigurableMongoDbMessageStore.java b/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/ConfigurableMongoDbMessageStore.java index 23405f47e05..426d622c53f 100644 --- a/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/ConfigurableMongoDbMessageStore.java +++ b/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/ConfigurableMongoDbMessageStore.java @@ -148,7 +148,7 @@ public MessageGroup addMessageToGroup(Object groupId, Message message) { } @Override - protected void addMessagesToGroupInner(Object groupId, Message... messages) { + protected void doAddMessagesToGroup(Object groupId, Message... messages) { Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL); Assert.notNull(messages, "'message' must not be null"); @@ -184,7 +184,7 @@ protected void addMessagesToGroupInner(Object groupId, Message... messages) { } @Override - protected void removeMessagesFromGroupInner(Object groupId, Collection> messages) { + protected void doRemoveMessagesFromGroup(Object groupId, Collection> messages) { Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL); Assert.notNull(messages, "'messageToRemove' must not be null"); @@ -216,7 +216,7 @@ public Message getMessageFromGroup(Object groupId, UUID messageId) { } @Override - protected boolean removeMessageFromGroupByIdInner(Object groupId, UUID messageId) { + protected boolean doRemoveMessageFromGroupById(Object groupId, UUID messageId) { Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL); Assert.notNull(messageId, "'messageId' must not be null"); Query query = @@ -235,7 +235,7 @@ private void removeMessages(Object groupId, Collection ids) { } @Override - protected Message pollMessageFromGroupInner(final Object groupId) { + protected Message doPollMessageFromGroup(final Object groupId) { Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL); Sort sort = Sort.by(MessageDocumentFields.LAST_MODIFIED_TIME, MessageDocumentFields.SEQUENCE); @@ -250,17 +250,17 @@ protected Message pollMessageFromGroupInner(final Object groupId) { } @Override - protected void setLastReleasedSequenceNumberForGroupInner(Object groupId, int sequenceNumber) { + protected void doSetLastReleasedSequenceNumberForGroup(Object groupId, int sequenceNumber) { updateGroup(groupId, lastModifiedUpdate().set(MessageDocumentFields.LAST_RELEASED_SEQUENCE, sequenceNumber)); } @Override - protected void setGroupConditionInner(Object groupId, String condition) { + protected void doSetGroupCondition(Object groupId, String condition) { updateGroup(groupId, lastModifiedUpdate().set("condition", condition)); } @Override - protected void completeGroupInner(Object groupId) { + protected void doCompleteGroup(Object groupId) { updateGroup(groupId, lastModifiedUpdate().set(MessageDocumentFields.COMPLETE, true)); } diff --git a/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/MongoDbChannelMessageStore.java b/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/MongoDbChannelMessageStore.java index af396f266e8..485fff046bf 100644 --- a/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/MongoDbChannelMessageStore.java +++ b/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/MongoDbChannelMessageStore.java @@ -133,7 +133,7 @@ public MessageGroup getMessageGroup(Object groupId) { } @Override - protected Message pollMessageFromGroupInner(Object groupId) { + protected Message doPollMessageFromGroup(Object groupId) { Assert.notNull(groupId, "'groupId' must not be null"); Sort sort = Sort.by(MessageDocumentFields.LAST_MODIFIED_TIME, MessageDocumentFields.SEQUENCE); diff --git a/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/MongoDbMessageStore.java b/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/MongoDbMessageStore.java index 119a2d71f50..24717e0955e 100644 --- a/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/MongoDbMessageStore.java +++ b/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/MongoDbMessageStore.java @@ -295,7 +295,7 @@ public MessageGroup getMessageGroup(Object groupId) { } @Override - protected void addMessagesToGroupInner(Object groupId, Message... messages) { + protected void doAddMessagesToGroup(Object groupId, Message... messages) { Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL); Assert.notNull(messages, "'message' must not be null"); Query query = whereGroupIdOrder(groupId); @@ -329,7 +329,7 @@ protected void addMessagesToGroupInner(Object groupId, Message... messages) { } @Override - protected void removeMessagesFromGroupInner(Object groupId, Collection> messages) { + protected void doRemoveMessagesFromGroup(Object groupId, Collection> messages) { Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL); Assert.notNull(messages, "'messageToRemove' must not be null"); @@ -368,7 +368,7 @@ public Message getMessageFromGroup(Object groupId, UUID messageId) { } @Override - protected boolean removeMessageFromGroupByIdInner(Object groupId, UUID messageId) { + protected boolean doRemoveMessageFromGroupById(Object groupId, UUID messageId) { Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL); Assert.notNull(messageId, "'messageId' must not be null"); return this.template.remove(whereMessageIdIsAndGroupIdIs(messageId, groupId), this.collectionName) @@ -376,7 +376,7 @@ protected boolean removeMessageFromGroupByIdInner(Object groupId, UUID messageId } @Override - protected void removeMessageGroupInner(Object groupId) { + protected void doRemoveMessageGroup(Object groupId) { this.template.remove(whereGroupIdIs(groupId), this.collectionName); } @@ -396,7 +396,7 @@ public Iterator iterator() { } @Override - protected Message pollMessageFromGroupInner(final Object groupId) { + protected Message doPollMessageFromGroup(final Object groupId) { Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL); Query query = whereGroupIdIs(groupId).with(Sort.by(GROUP_UPDATE_TIMESTAMP_KEY, SEQUENCE)); MessageWrapper messageWrapper = this.template.findAndRemove(query, MessageWrapper.class, this.collectionName); @@ -416,17 +416,17 @@ public int messageGroupSize(Object groupId) { } @Override - protected void setGroupConditionInner(Object groupId, String condition) { + protected void doSetGroupCondition(Object groupId, String condition) { updateGroup(groupId, lastModifiedUpdate().set("_condition", condition)); } @Override - public void setLastReleasedSequenceNumberForGroupInner(Object groupId, int sequenceNumber) { + protected void doSetLastReleasedSequenceNumberForGroup(Object groupId, int sequenceNumber) { updateGroup(groupId, lastModifiedUpdate().set(LAST_RELEASED_SEQUENCE_NUMBER, sequenceNumber)); } @Override - protected void completeGroupInner(Object groupId) { + protected void doCompleteGroup(Object groupId) { this.updateGroup(groupId, lastModifiedUpdate().set(GROUP_COMPLETE_KEY, true)); } diff --git a/spring-integration-redis/src/main/java/org/springframework/integration/redis/store/RedisMessageStore.java b/spring-integration-redis/src/main/java/org/springframework/integration/redis/store/RedisMessageStore.java index dc422b94664..f5b05ae5728 100644 --- a/spring-integration-redis/src/main/java/org/springframework/integration/redis/store/RedisMessageStore.java +++ b/spring-integration-redis/src/main/java/org/springframework/integration/redis/store/RedisMessageStore.java @@ -37,7 +37,6 @@ * @author Oleg Zhurakousky * @author Gary Russell * @author Artem Bilan - * @author Youbin Wu * * @since 2.1 */ From 5f5d40c7f2e90849fd428e66ed687315002d6518 Mon Sep 17 00:00:00 2001 From: NaccOll Date: Tue, 10 Dec 2024 08:27:25 +0800 Subject: [PATCH 4/4] Add document about the lock of AbstractMessageGroupStore --- .../antora/modules/ROOT/pages/message-store.adoc | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/reference/antora/modules/ROOT/pages/message-store.adoc b/src/reference/antora/modules/ROOT/pages/message-store.adoc index 3688e452bdf..e44c256bdc0 100644 --- a/src/reference/antora/modules/ROOT/pages/message-store.adoc +++ b/src/reference/antora/modules/ROOT/pages/message-store.adoc @@ -158,3 +158,12 @@ It also allows the end marker to arrive at the aggregator before all the other r In addition, for configuration convenience, a `GroupConditionProvider` contract has been introduced. The `AbstractCorrelatingMessageHandler` checks if the provided `ReleaseStrategy` implements this interface and extracts a `conditionSupplier` for group condition evaluation logic. + +[[use-lock-registry]] +== Use LockRegistry + +Starting with version 6.5, the `MessageStore` abstraction operate the metadata of message group with lock. +This lock acquires the groupId and generated by `LockRegister`. +Its purpose is to operate on the atomicity of messages and message groups. +In multiple threads, adding or removing messages or updating metadata at the same time, some implementations may have message group errors if the lock is missing. +By default, `LockRegister` uses `DefaultLockRegistry`, and developers can set it to the implementation they need through `setLockRegistry` \ No newline at end of file