From 966338e95a1e63fddda0b8fef2c95c7597516be2 Mon Sep 17 00:00:00 2001 From: raylax Date: Wed, 17 Jan 2024 11:23:13 -0500 Subject: [PATCH] GH-2602: Fix `x-delay` header to `Long` Fixes: #2602 * Update deprecated API * Fix code style * Remove deprecated API usage * Some code clean of the affected classes --- .../amqp/core/MessageProperties.java | 82 ++++++++++++++++--- .../amqp/support/SimpleAmqpHeaderMapper.java | 9 +- .../amqp/core/MessagePropertiesTests.java | 9 +- .../support/SimpleAmqpHeaderMapperTests.java | 44 ++++++++-- .../DefaultMessagePropertiesConverter.java | 30 ++++--- .../core/RabbitAdminIntegrationTests.java | 32 ++++---- 6 files changed, 149 insertions(+), 57 deletions(-) diff --git a/spring-amqp/src/main/java/org/springframework/amqp/core/MessageProperties.java b/spring-amqp/src/main/java/org/springframework/amqp/core/MessageProperties.java index eba31fb895..f999acc0a3 100644 --- a/spring-amqp/src/main/java/org/springframework/amqp/core/MessageProperties.java +++ b/spring-amqp/src/main/java/org/springframework/amqp/core/MessageProperties.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,6 +24,8 @@ import java.util.List; import java.util.Map; +import org.springframework.util.Assert; + /** * Message Properties for an AMQP message. * @@ -33,6 +35,7 @@ * @author Dmitry Chernyshov * @author Artem Bilan * @author Csaba Soti + * @author Raylax Grey */ public class MessageProperties implements Serializable { @@ -66,6 +69,12 @@ public class MessageProperties implements Serializable { public static final Integer DEFAULT_PRIORITY = 0; + /** + * The maximum value of x-delay header. + * @since 3.1.2 + */ + public static final long X_DELAY_MAX = 0xffffffffL; + private final Map headers = new HashMap<>(); private Date timestamp; @@ -118,7 +127,7 @@ public class MessageProperties implements Serializable { private String consumerQueue; - private Integer receivedDelay; + private Long receivedDelay; private MessageDeliveryMode receivedDeliveryMode; @@ -352,10 +361,13 @@ public String getReceivedRoutingKey() { * received message contains the delay. * @return the received delay. * @since 1.6 + * @deprecated in favor of {@link #getReceivedDelayLong()} * @see #getDelay() */ + @Deprecated(since = "3.1.2", forRemoval = true) public Integer getReceivedDelay() { - return this.receivedDelay; + Long receivedDelay = getReceivedDelayLong(); + return receivedDelay != null ? Math.toIntExact(receivedDelay) : null; } /** @@ -363,8 +375,32 @@ public Integer getReceivedDelay() { * received message contains the delay. * @param receivedDelay the received delay. * @since 1.6 + * @deprecated in favor of {@link #setReceivedDelayLong(Long)} */ + @Deprecated(since = "3.1.2", forRemoval = true) public void setReceivedDelay(Integer receivedDelay) { + setReceivedDelayLong(receivedDelay != null ? receivedDelay.longValue() : null); + } + + /** + * When a delayed message exchange is used the x-delay header on a + * received message contains the delay. + * @return the received delay. + * @since 3.1.2 + * @see #getDelayLong() + */ + public Long getReceivedDelayLong() { + return this.receivedDelay; + } + + /** + * When a delayed message exchange is used the x-delay header on a + * received message contains the delay. + * @param receivedDelay the received delay. + * @since 3.1.2 + * @see #setDelayLong(Long) + */ + public void setReceivedDelayLong(Long receivedDelay) { this.receivedDelay = receivedDelay; } @@ -434,12 +470,35 @@ public void setConsumerQueue(String consumerQueue) { * The x-delay header (outbound). * @return the delay. * @since 1.6 + * @deprecated in favor of {@link #getDelayLong()} * @see #getReceivedDelay() */ + @Deprecated(since = "3.1.2", forRemoval = true) public Integer getDelay() { + Long delay = getDelayLong(); + return delay != null ? Math.toIntExact(delay) : null; + } + + /** + * Set the x-delay header. + * @param delay the delay. + * @since 1.6 + * @deprecated in favor of {@link #setDelayLong(Long)} + */ + @Deprecated(since = "3.1.2", forRemoval = true) + public void setDelay(Integer delay) { + setDelayLong(delay != null ? delay.longValue() : null); + } + + /** + * Get the x-delay header long value. + * @return the delay. + * @since 3.1.2 + */ + public Long getDelayLong() { Object delay = this.headers.get(X_DELAY); - if (delay instanceof Integer) { - return (Integer) delay; + if (delay instanceof Long) { + return (Long) delay; } else { return null; @@ -447,17 +506,18 @@ public Integer getDelay() { } /** - * Set the x-delay header. + * Set the x-delay header to a long value. * @param delay the delay. - * @since 1.6 + * @since 3.1.2 */ - public void setDelay(Integer delay) { + public void setDelayLong(Long delay) { if (delay == null || delay < 0) { this.headers.remove(X_DELAY); + return; } - else { - this.headers.put(X_DELAY, delay); - } + + Assert.isTrue(delay <= X_DELAY_MAX, "Delay cannot exceed " + X_DELAY_MAX); + this.headers.put(X_DELAY, delay); } public boolean isFinalRetryForMessageWithNoId() { diff --git a/spring-amqp/src/main/java/org/springframework/amqp/support/SimpleAmqpHeaderMapper.java b/spring-amqp/src/main/java/org/springframework/amqp/support/SimpleAmqpHeaderMapper.java index 3031d3d87c..51af201fa8 100644 --- a/spring-amqp/src/main/java/org/springframework/amqp/support/SimpleAmqpHeaderMapper.java +++ b/spring-amqp/src/main/java/org/springframework/amqp/support/SimpleAmqpHeaderMapper.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -48,6 +48,7 @@ * @author Gary Russell * @author Artem Bilan * @author Stephane Nicoll + * @author Raylax Grey * @since 1.4 */ public class SimpleAmqpHeaderMapper extends AbstractHeaderMapper implements AmqpHeaderMapper { @@ -69,8 +70,8 @@ public void fromHeaders(MessageHeaders headers, MessageProperties amqpMessagePro amqpMessageProperties.setCorrelationId((String) correlationId); } javaUtils - .acceptIfNotNull(getHeaderIfAvailable(headers, AmqpHeaders.DELAY, Integer.class), - amqpMessageProperties::setDelay) + .acceptIfNotNull(getHeaderIfAvailable(headers, AmqpHeaders.DELAY, Long.class), + amqpMessageProperties::setDelayLong) .acceptIfNotNull(getHeaderIfAvailable(headers, AmqpHeaders.DELIVERY_MODE, MessageDeliveryMode.class), amqpMessageProperties::setDeliveryMode) .acceptIfNotNull(getHeaderIfAvailable(headers, AmqpHeaders.DELIVERY_TAG, Long.class), @@ -150,7 +151,7 @@ public MessageHeaders toHeaders(MessageProperties amqpMessageProperties) { javaUtils .acceptIfCondition(priority != null && priority > 0, AmqpMessageHeaderAccessor.PRIORITY, priority, putObject) - .acceptIfNotNull(AmqpHeaders.RECEIVED_DELAY, amqpMessageProperties.getReceivedDelay(), putObject) + .acceptIfNotNull(AmqpHeaders.RECEIVED_DELAY, amqpMessageProperties.getReceivedDelayLong(), putObject) .acceptIfHasText(AmqpHeaders.RECEIVED_EXCHANGE, amqpMessageProperties.getReceivedExchange(), putString) .acceptIfHasText(AmqpHeaders.RECEIVED_ROUTING_KEY, amqpMessageProperties.getReceivedRoutingKey(), diff --git a/spring-amqp/src/test/java/org/springframework/amqp/core/MessagePropertiesTests.java b/spring-amqp/src/test/java/org/springframework/amqp/core/MessagePropertiesTests.java index 6a1df9d995..2d080e8ea4 100644 --- a/spring-amqp/src/test/java/org/springframework/amqp/core/MessagePropertiesTests.java +++ b/spring-amqp/src/test/java/org/springframework/amqp/core/MessagePropertiesTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -30,6 +30,7 @@ * @author Artem Bilan * @author Gary Russell * @author Csaba Soti + * @author Raylax Grey * */ public class MessagePropertiesTests { @@ -53,10 +54,10 @@ public void testReplyToNullByDefault() { @Test public void testDelayHeader() { MessageProperties properties = new MessageProperties(); - Integer delay = 100; - properties.setDelay(delay); + Long delay = 100L; + properties.setDelayLong(delay); assertThat(properties.getHeaders().get(MessageProperties.X_DELAY)).isEqualTo(delay); - properties.setDelay(null); + properties.setDelayLong(null); assertThat(properties.getHeaders().containsKey(MessageProperties.X_DELAY)).isFalse(); } diff --git a/spring-amqp/src/test/java/org/springframework/amqp/support/SimpleAmqpHeaderMapperTests.java b/spring-amqp/src/test/java/org/springframework/amqp/support/SimpleAmqpHeaderMapperTests.java index 2e9c768279..2e4d846783 100644 --- a/spring-amqp/src/test/java/org/springframework/amqp/support/SimpleAmqpHeaderMapperTests.java +++ b/spring-amqp/src/test/java/org/springframework/amqp/support/SimpleAmqpHeaderMapperTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2019 the original author or authors. + * Copyright 2002-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,6 +17,7 @@ package org.springframework.amqp.support; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.assertj.core.api.Assertions.fail; import java.util.Date; @@ -37,13 +38,14 @@ * @author Mark Fisher * @author Gary Russell * @author Oleg Zhurakousky + * @author Raylax Grey */ public class SimpleAmqpHeaderMapperTests { @Test public void fromHeaders() { SimpleAmqpHeaderMapper headerMapper = new SimpleAmqpHeaderMapper(); - Map headerMap = new HashMap(); + Map headerMap = new HashMap<>(); headerMap.put(AmqpHeaders.APP_ID, "test.appId"); headerMap.put(AmqpHeaders.CLUSTER_ID, "test.clusterId"); headerMap.put(AmqpHeaders.CONTENT_ENCODING, "test.contentEncoding"); @@ -51,7 +53,7 @@ public void fromHeaders() { headerMap.put(AmqpHeaders.CONTENT_TYPE, "test.contentType"); String testCorrelationId = "foo"; headerMap.put(AmqpHeaders.CORRELATION_ID, testCorrelationId); - headerMap.put(AmqpHeaders.DELAY, 1234); + headerMap.put(AmqpHeaders.DELAY, 1234L); headerMap.put(AmqpHeaders.DELIVERY_MODE, MessageDeliveryMode.NON_PERSISTENT); headerMap.put(AmqpHeaders.DELIVERY_TAG, 1234L); headerMap.put(AmqpHeaders.EXPIRATION, "test.expiration"); @@ -92,13 +94,39 @@ public void fromHeaders() { assertThat(amqpProperties.getTimestamp()).isEqualTo(testTimestamp); assertThat(amqpProperties.getType()).isEqualTo("test.type"); assertThat(amqpProperties.getUserId()).isEqualTo("test.userId"); - assertThat(amqpProperties.getDelay()).isEqualTo(Integer.valueOf(1234)); + assertThat(amqpProperties.getDelayLong()).isEqualTo(Long.valueOf(1234)); } + @Test + public void fromHeadersWithLongDelay() { + SimpleAmqpHeaderMapper headerMapper = new SimpleAmqpHeaderMapper(); + Map headerMap = new HashMap<>(); + headerMap.put(AmqpHeaders.DELAY, 1234L); + MessageHeaders messageHeaders = new MessageHeaders(headerMap); + MessageProperties amqpProperties = new MessageProperties(); + headerMapper.fromHeaders(messageHeaders, amqpProperties); + assertThat(amqpProperties.getDelayLong()).isEqualTo(Long.valueOf(1234)); + + amqpProperties.setDelayLong(5678L); + assertThat(amqpProperties.getDelayLong()).isEqualTo(Long.valueOf(5678)); + + amqpProperties.setDelayLong(null); + assertThat(amqpProperties.getHeaders().containsKey(AmqpHeaders.DELAY)).isFalse(); + + amqpProperties.setDelayLong(MessageProperties.X_DELAY_MAX); + assertThat(amqpProperties.getDelayLong()).isEqualTo(Long.valueOf(MessageProperties.X_DELAY_MAX)); + + assertThatThrownBy(() -> amqpProperties.setDelayLong(MessageProperties.X_DELAY_MAX + 1)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Delay cannot exceed"); + + } + + @Test public void fromHeadersWithContentTypeAsMediaType() { SimpleAmqpHeaderMapper headerMapper = new SimpleAmqpHeaderMapper(); - Map headerMap = new HashMap(); + Map headerMap = new HashMap<>(); headerMap.put(AmqpHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_HTML); @@ -126,7 +154,7 @@ public void toHeaders() { amqpProperties.setMessageCount(42); amqpProperties.setMessageId("test.messageId"); amqpProperties.setPriority(22); - amqpProperties.setReceivedDelay(1234); + amqpProperties.setReceivedDelayLong(1234L); amqpProperties.setReceivedExchange("test.receivedExchange"); amqpProperties.setReceivedRoutingKey("test.receivedRoutingKey"); amqpProperties.setRedelivered(true); @@ -151,7 +179,7 @@ public void toHeaders() { assertThat(headerMap.get(AmqpHeaders.EXPIRATION)).isEqualTo("test.expiration"); assertThat(headerMap.get(AmqpHeaders.MESSAGE_COUNT)).isEqualTo(42); assertThat(headerMap.get(AmqpHeaders.MESSAGE_ID)).isEqualTo("test.messageId"); - assertThat(headerMap.get(AmqpHeaders.RECEIVED_DELAY)).isEqualTo(1234); + assertThat(headerMap.get(AmqpHeaders.RECEIVED_DELAY)).isEqualTo(1234L); assertThat(headerMap.get(AmqpHeaders.RECEIVED_EXCHANGE)).isEqualTo("test.receivedExchange"); assertThat(headerMap.get(AmqpHeaders.RECEIVED_ROUTING_KEY)).isEqualTo("test.receivedRoutingKey"); assertThat(headerMap.get(AmqpHeaders.REPLY_TO)).isEqualTo("test.replyTo"); @@ -170,7 +198,7 @@ public void jsonTypeIdNotOverwritten() { Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter(); MessageProperties amqpProperties = new MessageProperties(); converter.toMessage("123", amqpProperties); - Map headerMap = new HashMap(); + Map headerMap = new HashMap<>(); headerMap.put("__TypeId__", "java.lang.Integer"); MessageHeaders messageHeaders = new MessageHeaders(headerMap); headerMapper.fromHeaders(messageHeaders, amqpProperties); diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/support/DefaultMessagePropertiesConverter.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/support/DefaultMessagePropertiesConverter.java index 72053d2846..a90fae67c1 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/support/DefaultMessagePropertiesConverter.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/support/DefaultMessagePropertiesConverter.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -40,6 +40,7 @@ * @author Mark Fisher * @author Gary Russell * @author Soeren Unruh + * @author Raylax Grey * @since 1.0 */ public class DefaultMessagePropertiesConverter implements MessagePropertiesConverter { @@ -83,8 +84,7 @@ public DefaultMessagePropertiesConverter(int longStringLimit, boolean convertLon } @Override - public MessageProperties toMessageProperties(final BasicProperties source, final Envelope envelope, - final String charset) { + public MessageProperties toMessageProperties(BasicProperties source, @Nullable Envelope envelope, String charset) { MessageProperties target = new MessageProperties(); Map headers = source.getHeaders(); if (!CollectionUtils.isEmpty(headers)) { @@ -92,8 +92,14 @@ public MessageProperties toMessageProperties(final BasicProperties source, final String key = entry.getKey(); if (MessageProperties.X_DELAY.equals(key)) { Object value = entry.getValue(); - if (value instanceof Integer integ) { - target.setReceivedDelay(integ); + if (value instanceof Integer intValue) { + long receivedDelayLongValue = intValue.longValue(); + target.setReceivedDelayLong(receivedDelayLongValue); + target.setHeader(key, receivedDelayLongValue); + } + else if (value instanceof Long longVal) { + target.setReceivedDelayLong(longVal); + target.setHeader(key, longVal); } } else { @@ -164,9 +170,9 @@ public BasicProperties fromMessageProperties(final MessageProperties source, fin private Map convertHeadersIfNecessary(Map headers) { if (CollectionUtils.isEmpty(headers)) { - return Collections.emptyMap(); + return Collections.emptyMap(); } - Map writableHeaders = new HashMap(); + Map writableHeaders = new HashMap<>(); for (Map.Entry entry : headers.entrySet()) { writableHeaders.put(entry.getKey(), this.convertHeaderValueIfNecessary(entry.getValue())); } @@ -200,7 +206,7 @@ else if (value instanceof Object[] array) { value = writableArray; } else if (value instanceof List) { - List writableList = new ArrayList(((List) value).size()); + List writableList = new ArrayList<>(((List) value).size()); for (Object listValue : (List) value) { writableList.add(convertHeaderValueIfNecessary(listValue)); } @@ -209,7 +215,7 @@ else if (value instanceof List) { else if (value instanceof Map) { @SuppressWarnings("unchecked") Map originalMap = (Map) value; - Map writableMap = new HashMap(originalMap.size()); + Map writableMap = new HashMap<>(originalMap.size()); for (Map.Entry entry : originalMap.entrySet()) { writableMap.put(entry.getKey(), this.convertHeaderValueIfNecessary(entry.getValue())); } @@ -222,7 +228,7 @@ else if (value instanceof Class clazz) { } /** - * Converts a LongString value to either a String or DataInputStream based on a + * Convert a LongString value to either a String or DataInputStream based on a * length-driven threshold. If the length is {@link #longStringLimit} bytes or less, a * String will be returned, otherwise a DataInputStream is returned or the {@link LongString} * is returned unconverted if {@link #convertLongLongStrings} is true. @@ -257,7 +263,7 @@ private Object convertLongStringIfNecessary(Object valueArg, String charset) { value = convertLongString(longStr, charset); } else if (value instanceof List) { - List convertedList = new ArrayList(((List) value).size()); + List convertedList = new ArrayList<>(((List) value).size()); for (Object listValue : (List) value) { convertedList.add(this.convertLongStringIfNecessary(listValue, charset)); } @@ -266,7 +272,7 @@ else if (value instanceof List) { else if (value instanceof Map) { @SuppressWarnings("unchecked") Map originalMap = (Map) value; - Map convertedMap = new HashMap(); + Map convertedMap = new HashMap<>(); for (Map.Entry entry : originalMap.entrySet()) { convertedMap.put(entry.getKey(), this.convertLongStringIfNecessary(entry.getValue(), charset)); } diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitAdminIntegrationTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitAdminIntegrationTests.java index abf8f76cb3..aa2f95f648 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitAdminIntegrationTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/core/RabbitAdminIntegrationTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2022 the original author or authors. + * Copyright 2002-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,12 +17,14 @@ package org.springframework.amqp.rabbit.core; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatNoException; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.awaitility.Awaitility.await; import java.io.IOException; import java.time.Duration; import java.util.Map; +import java.util.Objects; import java.util.UUID; import org.junit.jupiter.api.AfterEach; @@ -59,6 +61,7 @@ * @author Gary Russell * @author Gunnar Hillert * @author Artem Bilan + * @author Raylax Grey */ @RabbitAvailable(management = true) public class RabbitAdminIntegrationTests extends NeedsManagementTests { @@ -130,7 +133,7 @@ public void testDoubleDeclarationOfExclusiveQueue() { @Test public void testDoubleDeclarationOfAutodeleteQueue() { - // No error expected here: the queue is autodeleted when the last consumer is cancelled, but this one never has + // No error expected here: the queue is auto-deleted when the last consumer is cancelled, but this one never has // any consumers. CachingConnectionFactory connectionFactory1 = new CachingConnectionFactory(); connectionFactory1.setHost("localhost"); @@ -236,9 +239,7 @@ public void testSpringWithDefaultExchange() { context.getBeanFactory().registerSingleton("foo", exchange); rabbitAdmin.afterPropertiesSet(); - rabbitAdmin.initialize(); - - // Pass by virtue of RabbitMQ not firing a 403 reply code + assertThatNoException().isThrownBy(rabbitAdmin::initialize); } @Test @@ -367,7 +368,6 @@ public void testQueueDeclareBad() { this.rabbitAdmin.deleteQueue(queue.getName()); } - @SuppressWarnings("unchecked") @Test public void testDeclareDelayedExchange() throws Exception { DirectExchange exchange = new DirectExchange("test.delayed.exchange"); @@ -397,20 +397,20 @@ public void testDeclareDelayedExchange() throws Exception { RabbitTemplate template = new RabbitTemplate(this.connectionFactory); template.setReceiveTimeout(10000); template.convertAndSend(exchangeName, queue.getName(), "foo", message -> { - message.getMessageProperties().setDelay(1000); + message.getMessageProperties().setDelayLong(1000L); return message; }); MessageProperties properties = new MessageProperties(); - properties.setDelay(500); + properties.setDelayLong(500L); template.send(exchangeName, queue.getName(), MessageBuilder.withBody("foo".getBytes()).andProperties(properties).build()); long t1 = System.currentTimeMillis(); Message received = template.receive(queue.getName()); assertThat(received).isNotNull(); - assertThat(received.getMessageProperties().getReceivedDelay()).isEqualTo(Integer.valueOf(500)); + assertThat(received.getMessageProperties().getDelayLong()).isEqualTo(500L); received = template.receive(queue.getName()); assertThat(received).isNotNull(); - assertThat(received.getMessageProperties().getReceivedDelay()).isEqualTo(Integer.valueOf(1000)); + assertThat(received.getMessageProperties().getDelayLong()).isEqualTo(1000L); assertThat(System.currentTimeMillis() - t1).isGreaterThan(950L); Map exchange2 = getExchange(exchangeName); @@ -421,9 +421,9 @@ public void testDeclareDelayedExchange() throws Exception { this.rabbitAdmin.deleteExchange(exchangeName); } - private Map getExchange(String exchangeName) throws Exception { + private Map getExchange(String exchangeName) { return await().pollDelay(Duration.ZERO) - .until(() -> exchangeInfo(exchangeName), exch -> exch != null); + .until(() -> exchangeInfo(exchangeName), Objects::nonNull); } /** @@ -437,18 +437,14 @@ private boolean queueExists(final Queue queue) throws Exception { ConnectionFactory cf = new ConnectionFactory(); cf.setHost("localhost"); cf.setPort(BrokerTestUtils.getPort()); - Connection connection = cf.newConnection(); - Channel channel = connection.createChannel(); - try { + try (Connection connection = cf.newConnection()) { + Channel channel = connection.createChannel(); DeclareOk result = channel.queueDeclarePassive(queue.getName()); return result != null; } catch (IOException e) { return e.getCause().getMessage().contains("RESOURCE_LOCKED"); } - finally { - connection.close(); - } } }