diff --git a/spring-integration-core/src/main/java/org/springframework/integration/store/AbstractKeyValueMessageStore.java b/spring-integration-core/src/main/java/org/springframework/integration/store/AbstractKeyValueMessageStore.java index b74be6ca439..fa415bf19f8 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/store/AbstractKeyValueMessageStore.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/store/AbstractKeyValueMessageStore.java @@ -38,13 +38,12 @@ * @author Gary Russell * @author Artem Bilan * @author Ngoc Nhan + * @author Youbin Wu * * @since 2.1 */ public abstract class AbstractKeyValueMessageStore extends AbstractMessageGroupStore implements MessageStore { - private static final String GROUP_ID_MUST_NOT_BE_NULL = "'groupId' must not be null"; - protected static final String MESSAGE_KEY_PREFIX = "MESSAGE_"; protected static final String MESSAGE_GROUP_KEY_PREFIX = "GROUP_OF_MESSAGES_"; @@ -206,7 +205,7 @@ public MessageGroupMetadata getGroupMetadata(Object groupId) { } @Override - public void addMessagesToGroup(Object groupId, Message... messages) { + protected void doAddMessagesToGroup(Object groupId, Message... messages) { Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL); Assert.notNull(messages, "'messages' must not be null"); @@ -240,7 +239,7 @@ public void addMessagesToGroup(Object groupId, Message... messages) { } @Override - public void removeMessagesFromGroup(Object groupId, Collection> messages) { + protected void doRemoveMessagesFromGroup(Object groupId, Collection> messages) { Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL); Assert.notNull(messages, "'messages' must not be null"); @@ -283,7 +282,7 @@ public Message getMessageFromGroup(Object groupId, UUID messageId) { } @Override - public boolean removeMessageFromGroupById(Object groupId, UUID messageId) { + protected boolean doRemoveMessageFromGroupById(Object groupId, UUID messageId) { Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL); Assert.notNull(messageId, "'messageId' must not be null"); Object mgm = doRetrieve(this.groupPrefix + groupId); @@ -305,7 +304,7 @@ public boolean removeMessageFromGroupById(Object groupId, UUID messageId) { } @Override - public void completeGroup(Object groupId) { + protected void doCompleteGroup(Object groupId) { Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL); MessageGroupMetadata metadata = getGroupMetadata(groupId); if (metadata != null) { @@ -319,7 +318,7 @@ public void completeGroup(Object groupId) { * Remove the MessageGroup with the provided group ID. */ @Override - public void removeMessageGroup(Object groupId) { + protected void doRemoveMessageGroup(Object groupId) { Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL); Object mgm = doRemove(this.groupPrefix + groupId); if (mgm != null) { @@ -337,7 +336,7 @@ public void removeMessageGroup(Object groupId) { } @Override - public void setGroupCondition(Object groupId, String condition) { + protected void doSetGroupCondition(Object groupId, String condition) { MessageGroupMetadata metadata = getGroupMetadata(groupId); if (metadata != null) { metadata.setCondition(condition); @@ -346,7 +345,7 @@ public void setGroupCondition(Object groupId, String condition) { } @Override - public void setLastReleasedSequenceNumberForGroup(Object groupId, int sequenceNumber) { + protected void doSetLastReleasedSequenceNumberForGroup(Object groupId, int sequenceNumber) { Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL); MessageGroupMetadata metadata = getGroupMetadata(groupId); if (metadata == null) { @@ -359,7 +358,7 @@ public void setLastReleasedSequenceNumberForGroup(Object groupId, int sequenceNu } @Override - public Message pollMessageFromGroup(Object groupId) { + protected Message doPollMessageFromGroup(Object groupId) { MessageGroupMetadata groupMetadata = getGroupMetadata(groupId); if (groupMetadata != null) { UUID firstId = groupMetadata.firstId(); diff --git a/spring-integration-core/src/main/java/org/springframework/integration/store/AbstractMessageGroupStore.java b/spring-integration-core/src/main/java/org/springframework/integration/store/AbstractMessageGroupStore.java index 8496a51bf1a..9dcae51836a 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/store/AbstractMessageGroupStore.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/store/AbstractMessageGroupStore.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2023 the original author or authors. + * Copyright 2002-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,16 +19,23 @@ import java.util.Arrays; import java.util.Collection; import java.util.LinkedHashSet; +import java.util.UUID; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.springframework.integration.support.locks.DefaultLockRegistry; +import org.springframework.integration.support.locks.LockRegistry; +import org.springframework.integration.util.CheckedCallable; +import org.springframework.integration.util.CheckedRunnable; import org.springframework.jmx.export.annotation.ManagedAttribute; import org.springframework.jmx.export.annotation.ManagedOperation; import org.springframework.jmx.export.annotation.ManagedResource; import org.springframework.messaging.Message; +import org.springframework.messaging.MessagingException; +import org.springframework.util.Assert; /** * @author Dave Syer @@ -36,6 +43,7 @@ * @author Gary Russell * @author Artem Bilan * @author Christian Tzolov + * @author Youbin Wu * * @since 2.0 */ @@ -43,6 +51,10 @@ public abstract class AbstractMessageGroupStore extends AbstractBatchingMessageGroupStore implements MessageGroupStore, Iterable { + protected static final String INTERRUPTED_WHILE_OBTAINING_LOCK = "Interrupted while obtaining lock"; + + protected static final String GROUP_ID_MUST_NOT_BE_NULL = "'groupId' must not be null"; + protected final Log logger = LogFactory.getLog(getClass()); // NOSONAR final private final Lock lock = new ReentrantLock(); @@ -56,11 +68,15 @@ public abstract class AbstractMessageGroupStore extends AbstractBatchingMessageG private boolean timeoutOnIdle; + protected LockRegistry lockRegistry; + protected AbstractMessageGroupStore() { + this.lockRegistry = new DefaultLockRegistry(); } protected AbstractMessageGroupStore(boolean lazyLoadMessageGroups) { this.lazyLoadMessageGroups = lazyLoadMessageGroups; + this.lockRegistry = new DefaultLockRegistry(); } @Override @@ -109,6 +125,16 @@ public void setLazyLoadMessageGroups(boolean lazyLoadMessageGroups) { this.lazyLoadMessageGroups = lazyLoadMessageGroups; } + /** + * Specify the type of the {@link LockRegistry} to ensure atomic operations + * @param lockRegistry lockRegistryType + * @since 6.5 + */ + public void setLockRegistry(LockRegistry lockRegistry) { + Assert.notNull(lockRegistry, "The LockRegistry cannot be null"); + this.lockRegistry = lockRegistry; + } + @Override public void registerMessageGroupExpiryCallback(MessageGroupCallback callback) { if (callback instanceof UniqueExpiryCallback) { @@ -195,12 +221,98 @@ public void removeMessagesFromGroup(Object key, Message... messages) { removeMessagesFromGroup(key, Arrays.asList(messages)); } + @Override + public void removeMessagesFromGroup(Object key, Collection> messages) { + Assert.notNull(key, GROUP_ID_MUST_NOT_BE_NULL); + executeLocked(key, () -> doRemoveMessagesFromGroup(key, messages)); + } + + protected abstract void doRemoveMessagesFromGroup(Object key, Collection> messages); + + @Override + public void addMessagesToGroup(Object groupId, Message... messages) { + Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL); + executeLocked(groupId, () -> doAddMessagesToGroup(groupId, messages)); + } + + protected abstract void doAddMessagesToGroup(Object groupId, Message... messages); + @Override public MessageGroup addMessageToGroup(Object groupId, Message message) { addMessagesToGroup(groupId, message); return getMessageGroup(groupId); } + @Override + public void removeMessageGroup(Object groupId) { + Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL); + executeLocked(groupId, () -> doRemoveMessageGroup(groupId)); + } + + protected abstract void doRemoveMessageGroup(Object groupId); + + @Override + public boolean removeMessageFromGroupById(Object groupId, UUID messageId) { + Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL); + return executeLocked(groupId, () -> doRemoveMessageFromGroupById(groupId, messageId)); + } + + protected boolean doRemoveMessageFromGroupById(Object groupId, UUID messageId) { + throw new UnsupportedOperationException("Not supported for this store"); + } + + @Override + public void setLastReleasedSequenceNumberForGroup(Object groupId, int sequenceNumber) { + Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL); + executeLocked(groupId, () -> doSetLastReleasedSequenceNumberForGroup(groupId, sequenceNumber)); + } + + protected abstract void doSetLastReleasedSequenceNumberForGroup(Object groupId, int sequenceNumber); + + @Override + public void completeGroup(Object groupId) { + Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL); + executeLocked(groupId, () -> doCompleteGroup(groupId)); + } + + protected abstract void doCompleteGroup(Object groupId); + + @Override + public void setGroupCondition(Object groupId, String condition) { + Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL); + executeLocked(groupId, () -> doSetGroupCondition(groupId, condition)); + } + + protected abstract void doSetGroupCondition(Object groupId, String condition); + + @Override + public Message pollMessageFromGroup(Object groupId) { + Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL); + return executeLocked(groupId, () -> doPollMessageFromGroup(groupId)); + } + + protected abstract Message doPollMessageFromGroup(Object groupId); + + protected T executeLocked(Object groupId, CheckedCallable runnable) { + try { + return this.lockRegistry.executeLocked(groupId, runnable); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new MessagingException(INTERRUPTED_WHILE_OBTAINING_LOCK, e); + } + } + + protected void executeLocked(Object groupId, CheckedRunnable runnable) { + try { + this.lockRegistry.executeLocked(groupId, runnable); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new MessagingException(INTERRUPTED_WHILE_OBTAINING_LOCK, e); + } + } + private void expire(MessageGroup group) { RuntimeException exception = null; diff --git a/spring-integration-core/src/main/java/org/springframework/integration/store/SimpleMessageStore.java b/spring-integration-core/src/main/java/org/springframework/integration/store/SimpleMessageStore.java index 5f6cd4be70a..cc72cca7ade 100644 --- a/spring-integration-core/src/main/java/org/springframework/integration/store/SimpleMessageStore.java +++ b/spring-integration-core/src/main/java/org/springframework/integration/store/SimpleMessageStore.java @@ -45,6 +45,7 @@ * @author Gary Russell * @author Ryan Barker * @author Artem Bilan + * @author Youbin Wu * * @since 2.0 */ @@ -55,8 +56,6 @@ public class SimpleMessageStore extends AbstractMessageGroupStore private static final String UPPER_BOUND_MUST_NOT_BE_NULL = "'upperBound' must not be null."; - private static final String INTERRUPTED_WHILE_OBTAINING_LOCK = "Interrupted while obtaining lock"; - private final ConcurrentMap> idToMessage = new ConcurrentHashMap<>(); private final ConcurrentMap groupIdToMessageGroup = new ConcurrentHashMap<>(); @@ -71,8 +70,6 @@ public class SimpleMessageStore extends AbstractMessageGroupStore private final long upperBoundTimeout; - private LockRegistry lockRegistry; - private boolean copyOnGet = false; private volatile boolean isUsed; @@ -162,6 +159,7 @@ public void setCopyOnGet(boolean copyOnGet) { this.copyOnGet = copyOnGet; } + @Override public void setLockRegistry(LockRegistry lockRegistry) { Assert.notNull(lockRegistry, "The LockRegistry cannot be null"); Assert.isTrue(!(this.isUsed), "Cannot change the lock registry after the store has been used"); @@ -268,56 +266,37 @@ protected MessageGroup copy(MessageGroup group) { } @Override - public void addMessagesToGroup(Object groupId, Message... messages) { + protected void doAddMessagesToGroup(Object groupId, Message... messages) { Assert.notNull(groupId, "'groupId' must not be null"); Assert.notNull(messages, "'messages' must not be null"); - Lock lock = this.lockRegistry.obtain(groupId); - try { - lock.lockInterruptibly(); - boolean unlocked = false; - try { - UpperBound upperBound; - MessageGroup group = this.groupIdToMessageGroup.get(groupId); - if (group == null) { - if (this.groupCapacity > 0 && messages.length > this.groupCapacity) { - throw outOfCapacityException(groupId); - } - group = getMessageGroupFactory().create(groupId); - this.groupIdToMessageGroup.put(groupId, group); - upperBound = new UpperBound(this.groupCapacity); - for (Message message : messages) { - upperBound.tryAcquire(-1); - group.add(message); - } - this.groupToUpperBound.put(groupId, upperBound); - } - else { - upperBound = this.groupToUpperBound.get(groupId); - Assert.state(upperBound != null, UPPER_BOUND_MUST_NOT_BE_NULL); - for (Message message : messages) { - lock.unlock(); - if (!upperBound.tryAcquire(this.upperBoundTimeout)) { - unlocked = true; - throw outOfCapacityException(groupId); - } - lock.lockInterruptibly(); - group.add(message); - } - } - - group.setLastModified(System.currentTimeMillis()); + UpperBound upperBound; + MessageGroup group = this.groupIdToMessageGroup.get(groupId); + if (group == null) { + if (this.groupCapacity > 0 && messages.length > this.groupCapacity) { + throw outOfCapacityException(groupId); } - finally { - if (!unlocked) { - lock.unlock(); - } + group = getMessageGroupFactory().create(groupId); + this.groupIdToMessageGroup.put(groupId, group); + upperBound = new UpperBound(this.groupCapacity); + for (Message message : messages) { + upperBound.tryAcquire(-1); + group.add(message); } + this.groupToUpperBound.put(groupId, upperBound); } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new MessagingException(INTERRUPTED_WHILE_OBTAINING_LOCK, e); + else { + upperBound = this.groupToUpperBound.get(groupId); + Assert.state(upperBound != null, UPPER_BOUND_MUST_NOT_BE_NULL); + for (Message message : messages) { + if (!upperBound.tryAcquire(this.upperBoundTimeout)) { + throw outOfCapacityException(groupId); + } + group.add(message); + } } + + group.setLastModified(System.currentTimeMillis()); } private MessagingException outOfCapacityException(Object groupId) { @@ -327,58 +306,32 @@ private MessagingException outOfCapacityException(Object groupId) { } @Override - public void removeMessageGroup(Object groupId) { - Lock lock = this.lockRegistry.obtain(groupId); - try { - lock.lockInterruptibly(); - try { - MessageGroup messageGroup = this.groupIdToMessageGroup.remove(groupId); - if (messageGroup != null) { - UpperBound upperBound = this.groupToUpperBound.remove(groupId); - Assert.state(upperBound != null, UPPER_BOUND_MUST_NOT_BE_NULL); - upperBound.release(this.groupCapacity); - } - } - finally { - lock.unlock(); - } - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new MessagingException(INTERRUPTED_WHILE_OBTAINING_LOCK, e); + protected void doRemoveMessageGroup(Object groupId) { + MessageGroup messageGroup = this.groupIdToMessageGroup.remove(groupId); + if (messageGroup != null) { + UpperBound upperBound = this.groupToUpperBound.remove(groupId); + Assert.state(upperBound != null, UPPER_BOUND_MUST_NOT_BE_NULL); + upperBound.release(this.groupCapacity); } } @Override - public void removeMessagesFromGroup(Object groupId, Collection> messages) { - Lock lock = this.lockRegistry.obtain(groupId); - try { - lock.lockInterruptibly(); - try { - MessageGroup group = this.groupIdToMessageGroup.get(groupId); - Assert.notNull(group, - () -> MESSAGE_GROUP_FOR_GROUP_ID + groupId + "' " + - "can not be located while attempting to remove Message(s) from the MessageGroup"); - UpperBound upperBound = this.groupToUpperBound.get(groupId); - Assert.state(upperBound != null, UPPER_BOUND_MUST_NOT_BE_NULL); - boolean modified = false; - for (Message messageToRemove : messages) { - if (group.remove(messageToRemove)) { - upperBound.release(); - modified = true; - } - } - if (modified) { - group.setLastModified(System.currentTimeMillis()); - } - } - finally { - lock.unlock(); + protected void doRemoveMessagesFromGroup(Object groupId, Collection> messages) { + MessageGroup group = this.groupIdToMessageGroup.get(groupId); + Assert.notNull(group, + () -> MESSAGE_GROUP_FOR_GROUP_ID + groupId + "' " + + "can not be located while attempting to remove Message(s) from the MessageGroup"); + UpperBound upperBound = this.groupToUpperBound.get(groupId); + Assert.state(upperBound != null, UPPER_BOUND_MUST_NOT_BE_NULL); + boolean modified = false; + for (Message messageToRemove : messages) { + if (group.remove(messageToRemove)) { + upperBound.release(); + modified = true; } } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new MessagingException(INTERRUPTED_WHILE_OBTAINING_LOCK, e); + if (modified) { + group.setLastModified(System.currentTimeMillis()); } } @@ -397,35 +350,22 @@ public Message getMessageFromGroup(Object groupId, UUID messageId) { } @Override - public boolean removeMessageFromGroupById(Object groupId, UUID messageId) { - Lock lock = this.lockRegistry.obtain(groupId); - try { - lock.lockInterruptibly(); - try { - MessageGroup group = this.groupIdToMessageGroup.get(groupId); - Assert.notNull(group, - () -> MESSAGE_GROUP_FOR_GROUP_ID + groupId + "' " + - "can not be located while attempting to remove Message from the MessageGroup"); - UpperBound upperBound = this.groupToUpperBound.get(groupId); - Assert.state(upperBound != null, UPPER_BOUND_MUST_NOT_BE_NULL); - for (Message message : group.getMessages()) { - if (messageId.equals(message.getHeaders().getId())) { - group.remove(message); - upperBound.release(); - group.setLastModified(System.currentTimeMillis()); - return true; - } - } - return false; - } - finally { - lock.unlock(); + protected boolean doRemoveMessageFromGroupById(Object groupId, UUID messageId) { + MessageGroup group = this.groupIdToMessageGroup.get(groupId); + Assert.notNull(group, + () -> MESSAGE_GROUP_FOR_GROUP_ID + groupId + "' " + + "can not be located while attempting to remove Message from the MessageGroup"); + UpperBound upperBound = this.groupToUpperBound.get(groupId); + Assert.state(upperBound != null, UPPER_BOUND_MUST_NOT_BE_NULL); + for (Message message : group.getMessages()) { + if (messageId.equals(message.getHeaders().getId())) { + group.remove(message); + upperBound.release(); + group.setLastModified(System.currentTimeMillis()); + return true; } } - catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - throw new MessagingException(INTERRUPTED_WHILE_OBTAINING_LOCK, ex); - } + return false; } @Override @@ -434,7 +374,7 @@ public Iterator iterator() { } @Override - public void setGroupCondition(Object groupId, String condition) { + protected void doSetGroupCondition(Object groupId, String condition) { MessageGroup group = this.groupIdToMessageGroup.get(groupId); if (group != null) { group.setCondition(condition); @@ -442,53 +382,27 @@ public void setGroupCondition(Object groupId, String condition) { } @Override - public void setLastReleasedSequenceNumberForGroup(Object groupId, int sequenceNumber) { - Lock lock = this.lockRegistry.obtain(groupId); - try { - lock.lockInterruptibly(); - try { - MessageGroup group = this.groupIdToMessageGroup.get(groupId); - Assert.notNull(group, - () -> MESSAGE_GROUP_FOR_GROUP_ID + groupId + "' " + - "can not be located while attempting to set 'lastReleasedSequenceNumber'"); - group.setLastReleasedMessageSequenceNumber(sequenceNumber); - group.setLastModified(System.currentTimeMillis()); - } - finally { - lock.unlock(); - } - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new MessagingException(INTERRUPTED_WHILE_OBTAINING_LOCK, e); - } + protected void doSetLastReleasedSequenceNumberForGroup(Object groupId, int sequenceNumber) { + MessageGroup group = this.groupIdToMessageGroup.get(groupId); + Assert.notNull(group, + () -> MESSAGE_GROUP_FOR_GROUP_ID + groupId + "' " + + "can not be located while attempting to set 'lastReleasedSequenceNumber'"); + group.setLastReleasedMessageSequenceNumber(sequenceNumber); + group.setLastModified(System.currentTimeMillis()); } @Override - public void completeGroup(Object groupId) { - Lock lock = this.lockRegistry.obtain(groupId); - try { - lock.lockInterruptibly(); - try { - MessageGroup group = this.groupIdToMessageGroup.get(groupId); - Assert.notNull(group, - () -> MESSAGE_GROUP_FOR_GROUP_ID + groupId + "' " + - "can not be located while attempting to complete the MessageGroup"); - group.complete(); - group.setLastModified(System.currentTimeMillis()); - } - finally { - lock.unlock(); - } - } - catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new MessagingException(INTERRUPTED_WHILE_OBTAINING_LOCK, e); - } + protected void doCompleteGroup(Object groupId) { + MessageGroup group = this.groupIdToMessageGroup.get(groupId); + Assert.notNull(group, + () -> MESSAGE_GROUP_FOR_GROUP_ID + groupId + "' " + + "can not be located while attempting to complete the MessageGroup"); + group.complete(); + group.setLastModified(System.currentTimeMillis()); } @Override - public Message pollMessageFromGroup(Object groupId) { + protected Message doPollMessageFromGroup(Object groupId) { Collection> messageList = getMessageGroup(groupId).getMessages(); Message message = null; if (!CollectionUtils.isEmpty(messageList)) { diff --git a/spring-integration-core/src/test/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandlerTests.java b/spring-integration-core/src/test/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandlerTests.java index 757ed23a328..71b53c2bd5a 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandlerTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/aggregator/AbstractCorrelatingMessageHandlerTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -56,6 +56,7 @@ * @author Gary Russell * @author Artem Bilan * @author Meherzad Lahewala + * @author Youbin Wu * * @since 2.2 * @@ -350,7 +351,7 @@ public void testInt3483DeadlockOnMessageStoreRemoveMessageGroup() throws Interru SimpleMessageStore messageStore = new SimpleMessageStore() { @Override - public void removeMessageGroup(Object groupId) { + protected void doRemoveMessageGroup(Object groupId) { throw new RuntimeException("intentional"); } }; diff --git a/spring-integration-core/src/test/java/org/springframework/integration/store/MessageStoreTests.java b/spring-integration-core/src/test/java/org/springframework/integration/store/MessageStoreTests.java index 988de554510..5710f2a6075 100644 --- a/spring-integration-core/src/test/java/org/springframework/integration/store/MessageStoreTests.java +++ b/spring-integration-core/src/test/java/org/springframework/integration/store/MessageStoreTests.java @@ -35,6 +35,7 @@ * @author Dave Syer * @author Gary Russell * @author Artem Bilan + * @author Youbin Wu */ public class MessageStoreTests { @@ -91,7 +92,7 @@ public Iterator iterator() { } @Override - public void addMessagesToGroup(Object groupId, Message... messages) { + protected void doAddMessagesToGroup(Object groupId, Message... messages) { throw new UnsupportedOperationException(); } @@ -101,30 +102,30 @@ public MessageGroup getMessageGroup(Object correlationKey) { } @Override - public void removeMessagesFromGroup(Object key, Collection> messages) { + protected void doRemoveMessagesFromGroup(Object key, Collection> messages) { throw new UnsupportedOperationException(); } @Override - public void removeMessageGroup(Object correlationKey) { + protected void doRemoveMessageGroup(Object correlationKey) { if (correlationKey.equals(testMessages.getGroupId())) { removed = true; } } @Override - public void setLastReleasedSequenceNumberForGroup(Object groupId, int sequenceNumber) { + protected void doSetLastReleasedSequenceNumberForGroup(Object groupId, int sequenceNumber) { throw new UnsupportedOperationException(); } @Override - public void completeGroup(Object groupId) { + protected void doCompleteGroup(Object groupId) { throw new UnsupportedOperationException(); } @Override - public Message pollMessageFromGroup(Object groupId) { + protected Message doPollMessageFromGroup(Object groupId) { return null; } diff --git a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/JdbcMessageStore.java b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/JdbcMessageStore.java index db47f54235b..10dfd3ac3b0 100644 --- a/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/JdbcMessageStore.java +++ b/spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/store/JdbcMessageStore.java @@ -80,6 +80,7 @@ * @author Gary Russell * @author Artem Bilan * @author Ngoc Nhan + * @author Youbin Wu * * @since 2.0 */ @@ -472,7 +473,7 @@ public Message addMessage(final Message message) { } @Override - public void addMessagesToGroup(Object groupId, Message... messages) { + protected void doAddMessagesToGroup(Object groupId, Message... messages) { String groupKey = getKey(groupId); MessageGroupMetadata groupMetadata = getGroupMetadata(groupKey); @@ -576,7 +577,7 @@ public MessageGroupMetadata getGroupMetadata(Object groupId) { } @Override - public void removeMessagesFromGroup(Object groupId, Collection> messages) { + protected void doRemoveMessagesFromGroup(Object groupId, Collection> messages) { Assert.notNull(groupId, "'groupId' must not be null"); Assert.notNull(messages, "'messages' must not be null"); @@ -621,7 +622,7 @@ public Message getMessageFromGroup(Object groupId, UUID messageId) { } @Override - public boolean removeMessageFromGroupById(Object groupId, UUID messageId) { + protected boolean doRemoveMessageFromGroupById(Object groupId, UUID messageId) { String groupKey = getKey(groupId); String messageKey = getKey(messageId); int messageToGroupRemoved = @@ -634,7 +635,7 @@ public boolean removeMessageFromGroupById(Object groupId, UUID messageId) { } @Override - public void removeMessageGroup(Object groupId) { + protected void doRemoveMessageGroup(Object groupId) { String groupKey = getKey(groupId); this.jdbcTemplate.update(getQuery(Query.DELETE_MESSAGES_FROM_GROUP), @@ -653,7 +654,7 @@ public void removeMessageGroup(Object groupId) { } @Override - public void completeGroup(Object groupId) { + protected void doCompleteGroup(Object groupId) { final String groupKey = getKey(groupId); if (logger.isDebugEnabled()) { @@ -664,7 +665,7 @@ public void completeGroup(Object groupId) { } @Override - public void setGroupCondition(Object groupId, String condition) { + protected void doSetGroupCondition(Object groupId, String condition) { Assert.notNull(groupId, "'groupId' must not be null"); String groupKey = getKey(groupId); Timestamp updatedDate = new Timestamp(System.currentTimeMillis()); @@ -675,7 +676,7 @@ public void setGroupCondition(Object groupId, String condition) { } @Override - public void setLastReleasedSequenceNumberForGroup(Object groupId, int sequenceNumber) { + protected void doSetLastReleasedSequenceNumberForGroup(Object groupId, int sequenceNumber) { Assert.notNull(groupId, "'groupId' must not be null"); String groupKey = getKey(groupId); @@ -687,7 +688,7 @@ public void setLastReleasedSequenceNumberForGroup(Object groupId, int sequenceNu } @Override - public Message pollMessageFromGroup(Object groupId) { + protected Message doPollMessageFromGroup(Object groupId) { String key = getKey(groupId); Message polledMessage = doPollForMessage(key); diff --git a/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/AbstractConfigurableMongoDbMessageStore.java b/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/AbstractConfigurableMongoDbMessageStore.java index d7cbd4259f6..dce4c00d4b5 100644 --- a/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/AbstractConfigurableMongoDbMessageStore.java +++ b/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/AbstractConfigurableMongoDbMessageStore.java @@ -61,6 +61,7 @@ * * @author Artem Bilan * @author Adama Sorho + * @author Youbin Wu * * @since 4.0 */ @@ -201,7 +202,7 @@ public MessageMetadata getMessageMetadata(UUID id) { } @Override - public void removeMessageGroup(Object groupId) { + protected void doRemoveMessageGroup(Object groupId) { this.mongoTemplate.remove(groupIdQuery(groupId), this.collectionName); } @@ -250,17 +251,17 @@ protected static Query groupIdQuery(Object groupId) { } @Override - public void removeMessagesFromGroup(Object key, Collection> messages) { + protected void doRemoveMessagesFromGroup(Object key, Collection> messages) { throw NOT_IMPLEMENTED; } @Override - public void setGroupCondition(Object groupId, String condition) { + protected void doSetGroupCondition(Object groupId, String condition) { throw NOT_IMPLEMENTED; } @Override - public void setLastReleasedSequenceNumberForGroup(Object groupId, int sequenceNumber) { + protected void doSetLastReleasedSequenceNumberForGroup(Object groupId, int sequenceNumber) { throw NOT_IMPLEMENTED; } @@ -270,7 +271,7 @@ public Iterator iterator() { } @Override - public void completeGroup(Object groupId) { + protected void doCompleteGroup(Object groupId) { throw NOT_IMPLEMENTED; } @@ -280,7 +281,7 @@ public Message getOneMessageFromGroup(Object groupId) { } @Override - public void addMessagesToGroup(Object groupId, Message... messages) { + protected void doAddMessagesToGroup(Object groupId, Message... messages) { throw NOT_IMPLEMENTED; } diff --git a/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/ConfigurableMongoDbMessageStore.java b/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/ConfigurableMongoDbMessageStore.java index 6176a2ccafa..426d622c53f 100644 --- a/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/ConfigurableMongoDbMessageStore.java +++ b/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/ConfigurableMongoDbMessageStore.java @@ -54,6 +54,7 @@ * @author Artem Bilan * @author Gary Russell * @author Ngoc Nhan + * @author Youbin Wu * * @since 3.0 */ @@ -147,7 +148,7 @@ public MessageGroup addMessageToGroup(Object groupId, Message message) { } @Override - public void addMessagesToGroup(Object groupId, Message... messages) { + protected void doAddMessagesToGroup(Object groupId, Message... messages) { Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL); Assert.notNull(messages, "'message' must not be null"); @@ -183,7 +184,7 @@ public void addMessagesToGroup(Object groupId, Message... messages) { } @Override - public void removeMessagesFromGroup(Object groupId, Collection> messages) { + protected void doRemoveMessagesFromGroup(Object groupId, Collection> messages) { Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL); Assert.notNull(messages, "'messageToRemove' must not be null"); @@ -215,7 +216,7 @@ public Message getMessageFromGroup(Object groupId, UUID messageId) { } @Override - public boolean removeMessageFromGroupById(Object groupId, UUID messageId) { + protected boolean doRemoveMessageFromGroupById(Object groupId, UUID messageId) { Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL); Assert.notNull(messageId, "'messageId' must not be null"); Query query = @@ -234,7 +235,7 @@ private void removeMessages(Object groupId, Collection ids) { } @Override - public Message pollMessageFromGroup(final Object groupId) { + protected Message doPollMessageFromGroup(final Object groupId) { Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL); Sort sort = Sort.by(MessageDocumentFields.LAST_MODIFIED_TIME, MessageDocumentFields.SEQUENCE); @@ -249,17 +250,17 @@ public Message pollMessageFromGroup(final Object groupId) { } @Override - public void setLastReleasedSequenceNumberForGroup(Object groupId, int sequenceNumber) { + protected void doSetLastReleasedSequenceNumberForGroup(Object groupId, int sequenceNumber) { updateGroup(groupId, lastModifiedUpdate().set(MessageDocumentFields.LAST_RELEASED_SEQUENCE, sequenceNumber)); } @Override - public void setGroupCondition(Object groupId, String condition) { + protected void doSetGroupCondition(Object groupId, String condition) { updateGroup(groupId, lastModifiedUpdate().set("condition", condition)); } @Override - public void completeGroup(Object groupId) { + protected void doCompleteGroup(Object groupId) { updateGroup(groupId, lastModifiedUpdate().set(MessageDocumentFields.COMPLETE, true)); } diff --git a/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/MongoDbChannelMessageStore.java b/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/MongoDbChannelMessageStore.java index 76ee7da77ad..485fff046bf 100644 --- a/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/MongoDbChannelMessageStore.java +++ b/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/MongoDbChannelMessageStore.java @@ -1,5 +1,5 @@ /* - * Copyright 2014-2023 the original author or authors. + * Copyright 2014-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -44,6 +44,7 @@ * * @author Artem Bilan * @author Adama Sorho + * @author Youbin Wu * * @since 4.0 */ @@ -132,7 +133,7 @@ public MessageGroup getMessageGroup(Object groupId) { } @Override - public Message pollMessageFromGroup(Object groupId) { + protected Message doPollMessageFromGroup(Object groupId) { Assert.notNull(groupId, "'groupId' must not be null"); Sort sort = Sort.by(MessageDocumentFields.LAST_MODIFIED_TIME, MessageDocumentFields.SEQUENCE); diff --git a/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/MongoDbMessageStore.java b/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/MongoDbMessageStore.java index 617d7e4c113..24717e0955e 100644 --- a/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/MongoDbMessageStore.java +++ b/spring-integration-mongodb/src/main/java/org/springframework/integration/mongodb/store/MongoDbMessageStore.java @@ -97,6 +97,7 @@ * @author Jodie StJohn * @author Gary Russell * @author Artem Bilan + * @author Youbin Wu * * @since 2.1 */ @@ -294,7 +295,7 @@ public MessageGroup getMessageGroup(Object groupId) { } @Override - public void addMessagesToGroup(Object groupId, Message... messages) { + protected void doAddMessagesToGroup(Object groupId, Message... messages) { Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL); Assert.notNull(messages, "'message' must not be null"); Query query = whereGroupIdOrder(groupId); @@ -328,7 +329,7 @@ public void addMessagesToGroup(Object groupId, Message... messages) { } @Override - public void removeMessagesFromGroup(Object groupId, Collection> messages) { + protected void doRemoveMessagesFromGroup(Object groupId, Collection> messages) { Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL); Assert.notNull(messages, "'messageToRemove' must not be null"); @@ -367,7 +368,7 @@ public Message getMessageFromGroup(Object groupId, UUID messageId) { } @Override - public boolean removeMessageFromGroupById(Object groupId, UUID messageId) { + protected boolean doRemoveMessageFromGroupById(Object groupId, UUID messageId) { Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL); Assert.notNull(messageId, "'messageId' must not be null"); return this.template.remove(whereMessageIdIsAndGroupIdIs(messageId, groupId), this.collectionName) @@ -375,7 +376,7 @@ public boolean removeMessageFromGroupById(Object groupId, UUID messageId) { } @Override - public void removeMessageGroup(Object groupId) { + protected void doRemoveMessageGroup(Object groupId) { this.template.remove(whereGroupIdIs(groupId), this.collectionName); } @@ -395,7 +396,7 @@ public Iterator iterator() { } @Override - public Message pollMessageFromGroup(final Object groupId) { + protected Message doPollMessageFromGroup(final Object groupId) { Assert.notNull(groupId, GROUP_ID_MUST_NOT_BE_NULL); Query query = whereGroupIdIs(groupId).with(Sort.by(GROUP_UPDATE_TIMESTAMP_KEY, SEQUENCE)); MessageWrapper messageWrapper = this.template.findAndRemove(query, MessageWrapper.class, this.collectionName); @@ -415,17 +416,17 @@ public int messageGroupSize(Object groupId) { } @Override - public void setGroupCondition(Object groupId, String condition) { + protected void doSetGroupCondition(Object groupId, String condition) { updateGroup(groupId, lastModifiedUpdate().set("_condition", condition)); } @Override - public void setLastReleasedSequenceNumberForGroup(Object groupId, int sequenceNumber) { + protected void doSetLastReleasedSequenceNumberForGroup(Object groupId, int sequenceNumber) { updateGroup(groupId, lastModifiedUpdate().set(LAST_RELEASED_SEQUENCE_NUMBER, sequenceNumber)); } @Override - public void completeGroup(Object groupId) { + protected void doCompleteGroup(Object groupId) { this.updateGroup(groupId, lastModifiedUpdate().set(GROUP_COMPLETE_KEY, true)); } diff --git a/spring-integration-redis/src/test/java/org/springframework/integration/redis/store/RedisMessageGroupStoreTests.java b/spring-integration-redis/src/test/java/org/springframework/integration/redis/store/RedisMessageGroupStoreTests.java index 68bd2d112ce..e9b76d80dff 100644 --- a/spring-integration-redis/src/test/java/org/springframework/integration/redis/store/RedisMessageGroupStoreTests.java +++ b/spring-integration-redis/src/test/java/org/springframework/integration/redis/store/RedisMessageGroupStoreTests.java @@ -493,6 +493,15 @@ public void removeMessagesFromGroupDontRemoveSameMessageInOtherGroup() { assertThat(store.messageGroupSize("2")).isEqualTo(1); } + @Test + public void testMessageGroupCondition() { + String groupId = "X"; + Message message = MessageBuilder.withPayload("foo").build(); + store.addMessagesToGroup(groupId, message); + store.setGroupCondition(groupId, "testCondition"); + assertThat(store.getMessageGroup(groupId).getCondition()).isEqualTo("testCondition"); + } + private record Foo(String foo) { } diff --git a/src/reference/antora/modules/ROOT/pages/message-store.adoc b/src/reference/antora/modules/ROOT/pages/message-store.adoc index 3688e452bdf..e44c256bdc0 100644 --- a/src/reference/antora/modules/ROOT/pages/message-store.adoc +++ b/src/reference/antora/modules/ROOT/pages/message-store.adoc @@ -158,3 +158,12 @@ It also allows the end marker to arrive at the aggregator before all the other r In addition, for configuration convenience, a `GroupConditionProvider` contract has been introduced. The `AbstractCorrelatingMessageHandler` checks if the provided `ReleaseStrategy` implements this interface and extracts a `conditionSupplier` for group condition evaluation logic. + +[[use-lock-registry]] +== Use LockRegistry + +Starting with version 6.5, the `MessageStore` abstraction operate the metadata of message group with lock. +This lock acquires the groupId and generated by `LockRegister`. +Its purpose is to operate on the atomicity of messages and message groups. +In multiple threads, adding or removing messages or updating metadata at the same time, some implementations may have message group errors if the lock is missing. +By default, `LockRegister` uses `DefaultLockRegistry`, and developers can set it to the implementation they need through `setLockRegistry` \ No newline at end of file