From aeb88d01e9e37f47d5a332466980cdccb01c6f42 Mon Sep 17 00:00:00 2001
From: Soby Chacko
Date: Thu, 21 Sep 2023 15:34:03 -0400
Subject: [PATCH] KafkaBinderTests cleanup

- Remove deprecations: replace the deprecated DescribeTopicsResult.all()
  with allTopicNames() in the admin-client based assertions
- Other minor cleanup: use local variable type inference (var) for locals
  whose type is already spelled out on the right-hand side
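
For reviewers, a minimal sketch of the all() -> allTopicNames() migration
(the accessor was deprecated in recent kafka-clients releases; both return
the same KafkaFuture<Map<String, TopicDescription>> keyed by topic name, so
the change is mechanical). adminClient and topic come from the test
fixtures; byName is a hypothetical local used only in this sketch:

    // before: deprecated accessor keyed by topic name
    Map<String, TopicDescription> byName = adminClient
            .describeTopics(Collections.singletonList(topic))
            .all()
            .get(10, TimeUnit.SECONDS);

    // after: same future and key type, non-deprecated accessor
    Map<String, TopicDescription> byName = adminClient
            .describeTopics(Collections.singletonList(topic))
            .allTopicNames()
            .get(10, TimeUnit.SECONDS);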
---
 .../stream/binder/kafka/KafkaBinderTests.java | 278 +++++++++---------
 1 file changed, 139 insertions(+), 139 deletions(-)

diff --git a/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderTests.java b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderTests.java
index 701af2102..efd506055 100644
--- a/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderTests.java
+++ b/binders/kafka-binder/spring-cloud-stream-binder-kafka/src/test/java/org/springframework/cloud/stream/binder/kafka/KafkaBinderTests.java
@@ -255,7 +255,7 @@ private KafkaTestBinder getBinder(
             KafkaBinderConfigurationProperties kafkaBinderConfigurationProperties,
             DlqPartitionFunction dlqPartitionFunction,
             DlqDestinationResolver dlqDestinationResolver) {
-        KafkaTopicProvisioner provisioningProvider = new KafkaTopicProvisioner(
+        var provisioningProvider = new KafkaTopicProvisioner(
                 kafkaBinderConfigurationProperties, new TestKafkaProperties(), prop -> { });
         try {
@@ -269,15 +269,15 @@ kafkaBinderConfigurationProperties, new TestKafkaProperties(), prop -> { }

     private KafkaBinderConfigurationProperties createConfigurationProperties() {
-        KafkaBinderConfigurationProperties binderConfiguration = new KafkaBinderConfigurationProperties(
+        var binderConfiguration = new KafkaBinderConfigurationProperties(
                 new TestKafkaProperties());
         BrokerAddress[] brokerAddresses = embeddedKafka.getBrokerAddresses();
         List<String> bAddresses = new ArrayList<>();
-        for (BrokerAddress bAddress : brokerAddresses) {
+        for (var bAddress : brokerAddresses) {
             bAddresses.add(bAddress.toString());
         }
-        String[] foo = new String[bAddresses.size()];
+        var foo = new String[bAddresses.size()];
         binderConfiguration.setBrokers(bAddresses.toArray(foo));
         return binderConfiguration;
     }
@@ -289,7 +289,7 @@ private int partitionSize(String topic) {

     private void invokeCreateTopic(String topic, int partitions, int replicationFactor)
             throws Exception {
-        NewTopic newTopic = new NewTopic(topic, partitions, (short) replicationFactor);
+        var newTopic = new NewTopic(topic, partitions, (short) replicationFactor);
         CreateTopicsResult topics = adminClient
                 .createTopics(Collections.singletonList(newTopic));
         topics.all().get(DEFAULT_OPERATION_TIMEOUT, TimeUnit.SECONDS);
@@ -307,11 +307,11 @@ public void init() {
         }
         BrokerAddress[] brokerAddresses = embeddedKafka.getBrokerAddresses();
-        List<String> bAddresses = new ArrayList<>();
-        for (BrokerAddress bAddress : brokerAddresses) {
+        var bAddresses = new ArrayList<>();
+        for (var bAddress : brokerAddresses) {
             bAddresses.add(bAddress.toString());
         }
-        String[] foo = new String[bAddresses.size()];
+        var foo = new String[bAddresses.size()];
         Map<String, Object> adminConfigs = new HashMap<>();
         adminConfigs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
@@ -323,10 +323,10 @@ private int invokePartitionSize(String topic) throws Throwable {

         DescribeTopicsResult describeTopicsResult = adminClient
                 .describeTopics(Collections.singletonList(topic));
-        KafkaFuture<Map<String, TopicDescription>> all = describeTopicsResult.all();
+        KafkaFuture<Map<String, TopicDescription>> all = describeTopicsResult.allTopicNames();
         Map<String, TopicDescription> stringTopicDescriptionMap = all
                 .get(DEFAULT_OPERATION_TIMEOUT, TimeUnit.SECONDS);
-        TopicDescription topicDescription = stringTopicDescriptionMap.get(topic);
+        var topicDescription = stringTopicDescriptionMap.get(topic);
         return topicDescription.partitions().size();
     }
@@ -417,14 +417,14 @@ void testDefaultHeaderMapper() throws Exception {
                 consumerProperties);
         binderBindUnbindLatency();

-        final Pojo pojoHeader = new Pojo("testing");
-        Message<?> message = org.springframework.integration.support.MessageBuilder
+        var pojoHeader = new Pojo("testing");
+        var message = org.springframework.integration.support.MessageBuilder
                 .withPayload("foo")
                 .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN)
                 .setHeader("foo", pojoHeader).build();
         moduleOutputChannel.send(message);
-        CountDownLatch latch = new CountDownLatch(1);
+        var latch = new CountDownLatch(1);
         AtomicReference<Message<byte[]>> inboundMessageRef = new AtomicReference<>();
         moduleInputChannel.subscribe(message1 -> {
             try {
@@ -445,7 +445,7 @@
                 .isEqualTo(MimeTypeUtils.TEXT_PLAIN);
         Assertions.assertThat(inboundMessageRef.get().getHeaders().get("foo"))
                 .isInstanceOf(Pojo.class);
-        Pojo actual = (Pojo) inboundMessageRef.get().getHeaders().get("foo");
+        var actual = (Pojo) inboundMessageRef.get().getHeaders().get("foo");
         Assertions.assertThat(actual.field).isEqualTo(pojoHeader.field);
         producerBinding.unbind();
         consumerBinding.unbind();
@@ -458,7 +458,7 @@ void testCustomHeaderMapper() throws Exception {
         KafkaBinderConfigurationProperties binderConfiguration = createConfigurationProperties();
         binderConfiguration.setHeaderMapperBeanName("headerMapper");

-        KafkaTopicProvisioner kafkaTopicProvisioner = new KafkaTopicProvisioner(
+        var kafkaTopicProvisioner = new KafkaTopicProvisioner(
                 binderConfiguration, new TestKafkaProperties(), prop -> { });
         try {
@@ -467,7 +467,7 @@ binderConfiguration, new TestKafkaProperties(), prop -> {
         catch (Exception e) {
             throw new RuntimeException(e);
         }
-        KafkaTestBinder binder = new KafkaTestBinder(binderConfiguration, kafkaTopicProvisioner);
+        var binder = new KafkaTestBinder(binderConfiguration, kafkaTopicProvisioner);
         ((GenericApplicationContext) binder.getApplicationContext()).registerBean("headerMapper",
                 KafkaHeaderMapper.class, () -> new KafkaHeaderMapper() {
                     @Override
@@ -502,12 +502,12 @@ public void toHeaders(Headers source, Map<String, Object> target) {
                 consumerProperties);
         binderBindUnbindLatency();

-        Message<?> message = org.springframework.integration.support.MessageBuilder
+        var message = org.springframework.integration.support.MessageBuilder
                 .withPayload("foo")
                 .setHeader("foo", MimeTypeUtils.TEXT_PLAIN).build();
         moduleOutputChannel.send(message);
-        CountDownLatch latch = new CountDownLatch(1);
+        var latch = new CountDownLatch(1);
         AtomicReference<Message<byte[]>> inboundMessageRef = new AtomicReference<>();
         moduleInputChannel.subscribe(message1 -> {
             try {
@@ -536,7 +536,7 @@ void testWellKnownHeaderMapperWithBeanNameKafkaHeaderMapper() throws Exception {

         KafkaBinderConfigurationProperties binderConfiguration = createConfigurationProperties();

-        KafkaTopicProvisioner kafkaTopicProvisioner = new KafkaTopicProvisioner(
+        var kafkaTopicProvisioner = new KafkaTopicProvisioner(
                 binderConfiguration, new TestKafkaProperties(), prop -> { });
         try {
@@ -545,7 +545,7 @@ binderConfiguration, new TestKafkaProperties(), prop -> {
         catch (Exception e) {
             throw new RuntimeException(e);
         }
-        KafkaTestBinder binder = new KafkaTestBinder(binderConfiguration, kafkaTopicProvisioner);
+        var binder = new KafkaTestBinder(binderConfiguration, kafkaTopicProvisioner);
         ((GenericApplicationContext) binder.getApplicationContext()).registerBean("kafkaBinderHeaderMapper",
                 KafkaHeaderMapper.class, () -> new BinderHeaderMapper() {
                     @Override
@@ -581,12 +581,12 @@ public void toHeaders(Headers source, Map<String, Object> target) {
                 consumerProperties);
         binderBindUnbindLatency();

-        Message<?> message = org.springframework.integration.support.MessageBuilder
+        var message = org.springframework.integration.support.MessageBuilder
                 .withPayload("foo")
                 .setHeader("foo", MimeTypeUtils.TEXT_PLAIN).build();
         moduleOutputChannel.send(message);
-        CountDownLatch latch = new CountDownLatch(1);
+        var latch = new CountDownLatch(1);
         AtomicReference<Message<byte[]>> inboundMessageRef = new AtomicReference<>();
         moduleInputChannel.subscribe(message1 -> {
             try {
@@ -635,11 +635,11 @@ public void testSendAndReceiveNoOriginalContentType(TestInfo testInfo) throws Ex
         // TODO: Will have to fix the MimeType to convert to byte array once this issue
         // has been resolved:
         // https://github.com/spring-projects/spring-kafka/issues/424
-        Message<?> message = org.springframework.integration.support.MessageBuilder
+        var message = org.springframework.integration.support.MessageBuilder
                 .withPayload("foo")
                 .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN).build();
         moduleOutputChannel.send(message);
-        CountDownLatch latch = new CountDownLatch(1);
+        var latch = new CountDownLatch(1);
         AtomicReference<Message<byte[]>> inboundMessageRef = new AtomicReference<>();
         moduleInputChannel.subscribe(message1 -> {
             try {
@@ -672,7 +672,7 @@ public void testSendAndReceive(TestInfo testInfo) throws Exception {
         DirectChannel moduleInputChannel = createBindableChannel("input",
                 createConsumerBindingProperties(consumerProperties));

-        MessagingMessageConverter mmc = new MessagingMessageConverter();
+        var mmc = new MessagingMessageConverter();
         ((GenericApplicationContext) ((KafkaTestBinder) binder).getApplicationContext())
                 .registerBean("tSARmmc", MessagingMessageConverter.class, () -> mmc);
         consumerProperties.getExtension().setConverterBeanName("tSARmmc");
@@ -683,7 +683,7 @@ public void testSendAndReceive(TestInfo testInfo) throws Exception {
                 "testSendAndReceive", moduleInputChannel, consumerProperties);
         assertThat(KafkaTestUtils.getPropertyValue(consumerBinding,
                 "lifecycle.messageListenerContainer.applicationContext")).isNotNull();
-        Message<?> message = org.springframework.integration.support.MessageBuilder
+        var message = org.springframework.integration.support.MessageBuilder
                 .withPayload("foo".getBytes(StandardCharsets.UTF_8))
                 .setHeader(MessageHeaders.CONTENT_TYPE,
                         MimeTypeUtils.APPLICATION_OCTET_STREAM)
@@ -692,7 +692,7 @@ public void testSendAndReceive(TestInfo testInfo) throws Exception {
         // Let the consumer actually bind to the producer before sending a msg
         binderBindUnbindLatency();
         moduleOutputChannel.send(message);
-        CountDownLatch latch = new CountDownLatch(1);
+        var latch = new CountDownLatch(1);
         AtomicReference<Message<byte[]>> inboundMessageRef = new AtomicReference<>();
         moduleInputChannel.subscribe(message1 -> {
             try {
@@ -813,7 +813,7 @@ void testSendAndReceiveBatchWithDlqEnabled() throws Exception {
         Binding consumerBinding = binder.bindConsumer("tsarbwde.batching",
                 "testSendAndReceiveBatch", moduleInputChannel, consumerProperties);

-        QueueChannel dlqChannel = new QueueChannel();
+        var dlqChannel = new QueueChannel();
         ExtendedConsumerProperties dlqConsumerProperties = createConsumerProperties();
         Binding dlqConsumerBinding = binder.bindConsumer(
                 "tsarbwde-dlq-topic", null, dlqChannel,
@@ -822,11 +822,11 @@ void testSendAndReceiveBatchWithDlqEnabled() throws Exception {
         // Let the consumer actually bind to the producer before sending a msg
         binderBindUnbindLatency();

-        FailingInvocationCountingMessageHandler handler = new FailingInvocationCountingMessageHandler();
+        var handler = new FailingInvocationCountingMessageHandler();
         moduleInputChannel.subscribe(handler);

-        String testMessagePayload = "test." + UUID.randomUUID();
-        Message<?> message = org.springframework.integration.support.MessageBuilder
+        var testMessagePayload = "test." + UUID.randomUUID();
+        var message = org.springframework.integration.support.MessageBuilder
                 .withPayload(testMessagePayload.getBytes(StandardCharsets.UTF_8))
                 .setHeader(KafkaHeaders.PARTITION, 0)
                 .build();
@@ -885,11 +885,11 @@ void testDlqWithNativeSerializationEnabledOnDlqProducer() throws Exception {
         // Let the consumer actually bind to the producer before sending a msg
         binderBindUnbindLatency();

-        FailingInvocationCountingMessageHandler handler = new FailingInvocationCountingMessageHandler();
+        var handler = new FailingInvocationCountingMessageHandler();
         moduleInputChannel.subscribe(handler);

         // Consumer for the DLQ destination
-        QueueChannel dlqChannel = new QueueChannel();
+        var dlqChannel = new QueueChannel();
         ExtendedConsumerProperties dlqConsumerProperties = createConsumerProperties();
         dlqConsumerProperties.setMaxAttempts(1);
@@ -964,11 +964,11 @@ void testDlqWithNativeDecodingOnConsumerButMissingSerializerOnDlqProducer()
         // Let the consumer actually bind to the producer before sending a msg
         binderBindUnbindLatency();

-        FailingInvocationCountingMessageHandler handler = new FailingInvocationCountingMessageHandler();
+        var handler = new FailingInvocationCountingMessageHandler();
         moduleInputChannel.subscribe(handler);

         // Consumer for the DLQ destination
-        QueueChannel dlqChannel = new QueueChannel();
+        var dlqChannel = new QueueChannel();
         ExtendedConsumerProperties dlqConsumerProperties = createConsumerProperties();
         dlqConsumerProperties.setMaxAttempts(1);
@@ -1034,11 +1034,11 @@ void testDlqWithProducerPropertiesSetAtBinderLevel()
         // Let the consumer actually bind to the producer before sending a msg
         binderBindUnbindLatency();

-        FailingInvocationCountingMessageHandler handler = new FailingInvocationCountingMessageHandler();
+        var handler = new FailingInvocationCountingMessageHandler();
         moduleInputChannel.subscribe(handler);

         // Consumer for the DLQ destination
-        QueueChannel dlqChannel = new QueueChannel();
+        var dlqChannel = new QueueChannel();
         ExtendedConsumerProperties dlqConsumerProperties = createConsumerProperties();
         dlqConsumerProperties.setMaxAttempts(1);
@@ -1149,8 +1149,8 @@ else if (dlqPartitions == null) {
         DirectChannel moduleInputChannel = createBindableChannel("input",
                 createConsumerBindingProperties(consumerProperties));
-        QueueChannel dlqChannel = new QueueChannel();
-        FailingInvocationCountingMessageHandler handler = new FailingInvocationCountingMessageHandler();
+        var dlqChannel = new QueueChannel();
+        var handler = new FailingInvocationCountingMessageHandler();
         moduleInputChannel.subscribe(handler);

         long uniqueBindingId = System.currentTimeMillis();
@@ -1178,12 +1178,12 @@ else if (dlqPartitions == null) {
                         embeddedKafka.getBrokersAsString()))) {
             if (useDlqDestResolver) {
                 List<NewTopic> nonProvisionedDlqTopics = new ArrayList<>();
-                NewTopic nTopic = new NewTopic(dlqTopic, 3, (short) 1);
+                var nTopic = new NewTopic(dlqTopic, 3, (short) 1);
                 nonProvisionedDlqTopics.add(nTopic);
                 admin.createTopics(nonProvisionedDlqTopics);
             }
             Map<String, TopicDescription> topicDescriptions = admin.describeTopics(Collections.singletonList(dlqTopic))
-                    .all()
+                    .allTopicNames()
                     .get(10, TimeUnit.SECONDS);
             assertThat(topicDescriptions).hasSize(1);
             assertThat(topicDescriptions.values().iterator().next().partitions())
@@ -1204,8 +1204,8 @@ else if (dlqPartitions == null) {
                 SubscribableChannel.class);
         final AtomicReference<Message<?>> boundErrorChannelMessage = new AtomicReference<>();
         final AtomicReference<Message<?>> globalErrorChannelMessage = new AtomicReference<>();
-        final AtomicBoolean hasRecovererInCallStack = new AtomicBoolean(!withRetry);
-        final AtomicBoolean hasAfterRollbackProcessorInStack = new AtomicBoolean(!withRetry);
+        final var hasRecovererInCallStack = new AtomicBoolean(!withRetry);
+        final var hasAfterRollbackProcessorInStack = new AtomicBoolean(!withRetry);
         boundErrorChannel.subscribe(message -> {
             boundErrorChannelMessage.set(message);
             String stackTrace = Arrays.toString(new RuntimeException().getStackTrace());
@@ -1350,7 +1350,7 @@ void testDefaultAutoCommitOnErrorWithDlq() throws Exception {
         DirectChannel moduleInputChannel = createBindableChannel("input",
                 createConsumerBindingProperties(consumerProperties));
-        FailingInvocationCountingMessageHandler handler = new FailingInvocationCountingMessageHandler();
+        var handler = new FailingInvocationCountingMessageHandler();
         moduleInputChannel.subscribe(handler);
         long uniqueBindingId = System.currentTimeMillis();
         Binding producerBinding = binder.bindProducer(
@@ -1361,7 +1361,7 @@ void testDefaultAutoCommitOnErrorWithDlq() throws Exception {
                 consumerProperties);
         ExtendedConsumerProperties dlqConsumerProperties = createConsumerProperties();
         dlqConsumerProperties.setMaxAttempts(1);
-        QueueChannel dlqChannel = new QueueChannel();
+        var dlqChannel = new QueueChannel();
         Binding dlqConsumerBinding = binder.bindConsumer(
                 "error.retryTest." + uniqueBindingId + ".0.testGroup", null, dlqChannel,
                 dlqConsumerProperties);
@@ -1390,7 +1390,7 @@ void testDefaultAutoCommitOnErrorWithDlq() throws Exception {
         consumerBinding.unbind();

         // on the second attempt the message is not redelivered because the DLQ is set
-        QueueChannel successfulInputChannel = new QueueChannel();
+        var successfulInputChannel = new QueueChannel();
         consumerBinding = binder.bindConsumer("retryTest." + uniqueBindingId + ".0",
                 "testGroup", successfulInputChannel, consumerProperties);
         String testMessage2Payload = "test." + UUID.randomUUID().toString();
@@ -1426,7 +1426,7 @@ void testRetriesWithoutDlq() throws Exception {
         DirectChannel moduleInputChannel = createBindableChannel("input",
                 createConsumerBindingProperties(consumerProperties));
-        FailingInvocationCountingMessageHandler handler = new FailingInvocationCountingMessageHandler();
+        var handler = new FailingInvocationCountingMessageHandler();
         moduleInputChannel.subscribe(handler);
         long uniqueBindingId = System.currentTimeMillis();
         Binding producerBinding = binder.bindProducer(
@@ -1463,7 +1463,7 @@ void testCommonErrorHandlerBeanNameOnConsumerBinding() throws Exception {
         DirectChannel moduleOutputChannel = createBindableChannel("output",
                 producerBindingProperties);

-        CountDownLatch latch = new CountDownLatch(1);
+        var latch = new CountDownLatch(1);
         CommonErrorHandler commonErrorHandler = new DefaultErrorHandler(new FixedBackOff(0L, 0L)) {
             @Override
             public void handleRemaining(Exception thrownException, List<ConsumerRecord<?, ?>> records,
@@ -1486,7 +1486,7 @@ public void handleRemaining(Exception thrownException, List
         DirectChannel moduleInputChannel = createBindableChannel("input",
                 createConsumerBindingProperties(consumerProperties));
-        FailingInvocationCountingMessageHandler handler = new FailingInvocationCountingMessageHandler();
+        var handler = new FailingInvocationCountingMessageHandler();
         moduleInputChannel.subscribe(handler);
         long uniqueBindingId = System.currentTimeMillis();
         Binding producerBinding = binder.bindProducer(
@@ -1538,7 +1538,7 @@ void testAutoCommitOnErrorWhenManualAcknowledgement() throws Exception {
         DirectChannel moduleInputChannel = createBindableChannel("input",
                 createConsumerBindingProperties(consumerProperties));
-        FailingInvocationCountingMessageHandler handler = new FailingInvocationCountingMessageHandler();
+        var handler = new FailingInvocationCountingMessageHandler();
         moduleInputChannel.subscribe(handler);
         long uniqueBindingId = System.currentTimeMillis();
         Binding producerBinding = binder.bindProducer(
@@ -1549,7 +1549,7 @@ void testAutoCommitOnErrorWhenManualAcknowledgement() throws Exception {
                 consumerProperties);
         ExtendedConsumerProperties dlqConsumerProperties = createConsumerProperties();
         dlqConsumerProperties.setMaxAttempts(1);
-        QueueChannel dlqChannel = new QueueChannel();
+        var dlqChannel = new QueueChannel();
         Binding dlqConsumerBinding = binder.bindConsumer(
                 "error.retryTest." + uniqueBindingId + ".0.testGroup", null, dlqChannel,
                 dlqConsumerProperties);
@@ -1578,7 +1578,7 @@ void testAutoCommitOnErrorWhenManualAcknowledgement() throws Exception {
         consumerBinding.unbind();

         // on the second attempt the message is not redelivered because the DLQ is set and the record in error is already committed.
-        QueueChannel successfulInputChannel = new QueueChannel();
+        var successfulInputChannel = new QueueChannel();
         consumerBinding = binder.bindConsumer("retryTest." + uniqueBindingId + ".0",
                 "testGroup", successfulInputChannel, consumerProperties);
         String testMessage2Payload = "test1." + UUID.randomUUID().toString();
@@ -1618,7 +1618,7 @@ void testConfigurableDlqName() throws Exception {
         DirectChannel moduleInputChannel = createBindableChannel("input",
                 createConsumerBindingProperties(consumerProperties));
-        FailingInvocationCountingMessageHandler handler = new FailingInvocationCountingMessageHandler();
+        var handler = new FailingInvocationCountingMessageHandler();
         moduleInputChannel.subscribe(handler);

         long uniqueBindingId = System.currentTimeMillis();
@@ -1630,7 +1630,7 @@ void testConfigurableDlqName() throws Exception {
                 consumerProperties);
         ExtendedConsumerProperties dlqConsumerProperties = createConsumerProperties();
         dlqConsumerProperties.setMaxAttempts(1);
-        QueueChannel dlqChannel = new QueueChannel();
+        var dlqChannel = new QueueChannel();
         Binding dlqConsumerBinding = binder.bindConsumer(dlqName, null, dlqChannel,
                 dlqConsumerProperties);
@@ -1658,7 +1658,7 @@ void testConfigurableDlqName() throws Exception {
         consumerBinding.unbind();

         // on the second attempt the message is not redelivered because the DLQ is set
-        QueueChannel successfulInputChannel = new QueueChannel();
+        var successfulInputChannel = new QueueChannel();
         consumerBinding = binder.bindConsumer("retryTest." + uniqueBindingId + ".0",
                 "testGroup", successfulInputChannel, consumerProperties);
         String testMessage2Payload = "test." + UUID.randomUUID().toString();
@@ -1697,7 +1697,7 @@ void testCompression() throws Exception {
                 KafkaProducerProperties.CompressionType.gzip,
                 KafkaProducerProperties.CompressionType.snappy,
                 KafkaProducerProperties.CompressionType.zstd};
-        byte[] testPayload = new byte[2048];
+        var testPayload = new byte[2048];
         Arrays.fill(testPayload, (byte) 65);
         Binder binder = getBinder();
         for (KafkaProducerProperties.CompressionType codec : codecs) {
@@ -1722,7 +1722,7 @@ void testCompression() throws Exception {
             // Let the consumer actually bind to the producer before sending a msg
             binderBindUnbindLatency();
             moduleOutputChannel.send(message);
-            CountDownLatch latch = new CountDownLatch(1);
+            var latch = new CountDownLatch(1);
             AtomicReference<Message<byte[]>> inboundMessageRef = new AtomicReference<>();
             moduleInputChannel.subscribe(message1 -> {
@@ -1772,7 +1772,7 @@ void testEarliest() throws Exception {
         consumerBinding = binder.bindConsumer(testTopicName, "startOffsets", input1,
                 consumerProperties);

-        CountDownLatch latch = new CountDownLatch(1);
+        var latch = new CountDownLatch(1);
         AtomicReference<Message<byte[]>> inboundMessageRef1 = new AtomicReference<>();
         MessageHandler messageHandler = message1 -> {
             try {
@@ -1789,7 +1789,7 @@ void testEarliest() throws Exception {
         input1.unsubscribe(messageHandler);

         output.send(new GenericMessage<>(testPayload2.getBytes()));
-        CountDownLatch latch1 = new CountDownLatch(1);
+        var latch1 = new CountDownLatch(1);
         AtomicReference<Message<byte[]>> inboundMessageRef2 = new AtomicReference<>();
         input1.subscribe(message1 -> {
             try {
@@ -1829,7 +1829,7 @@ public void testSendAndReceiveMultipleTopics(TestInfo testInfo) throws Exception
         DirectChannel moduleOutputChannel2 = createBindableChannel("output2",
                 createProducerBindingProperties(createProducerProperties()));

-        QueueChannel moduleInputChannel = new QueueChannel();
+        var moduleInputChannel = new QueueChannel();

         ExtendedProducerProperties producer1Props = createProducerProperties();
         producer1Props.getExtension().setUseTopicHeader(true);
@@ -1897,7 +1897,7 @@ void testManualAckSucceedsWhenAutoCommitOffsetIsTurnedOff() throws Exception {
         DirectChannel moduleOutputChannel = createBindableChannel("output",
                 createProducerBindingProperties(createProducerProperties()));
-        QueueChannel moduleInputChannel = new QueueChannel();
+        var moduleInputChannel = new QueueChannel();

         Binding producerBinding = binder.bindProducer(
                 "testManualAckSucceedsWhenAutoCommitOffsetIsTurnedOff",
@@ -1944,7 +1944,7 @@ void testManualAckIsNotPossibleWhenAutoCommitOffsetIsEnabledOnTheBinder()
         DirectChannel moduleOutputChannel = createBindableChannel("output",
                 createProducerBindingProperties(createProducerProperties()));
-        QueueChannel moduleInputChannel = new QueueChannel();
+        var moduleInputChannel = new QueueChannel();

         Binding producerBinding = binder.bindProducer(
                 "testManualAckIsNotPossibleWhenAutoCommitOffsetIsEnabledOnTheBinder",
@@ -1999,13 +1999,13 @@ public void testTwoRequiredGroups(TestInfo testInfo) throws Exception {
         String testPayload = "foo-" + UUID.randomUUID().toString();
         output.send(new GenericMessage<>(testPayload.getBytes()));

-        QueueChannel inbound1 = new QueueChannel();
+        var inbound1 = new QueueChannel();
         ExtendedConsumerProperties consumerProperties = createConsumerProperties();
         consumerProperties.getExtension().setAutoRebalanceEnabled(false);
         consumerProperties.getExtension().setAckMode(ContainerProperties.AckMode.RECORD);
         Binding consumerBinding1 = binder.bindConsumer(testDestination, "test1",
                 inbound1, consumerProperties);
-        QueueChannel inbound2 = new QueueChannel();
+        var inbound2 = new QueueChannel();
         Binding consumerBinding2 = binder.bindConsumer(testDestination, "test2",
                 inbound2, consumerProperties);
@@ -2041,17 +2041,17 @@ public void testPartitionedModuleSpEL(TestInfo testInfo) throws Exception {
         consumerProperties.setInstanceCount(3);
         consumerProperties.setPartitioned(true);
         consumerProperties.getExtension().setAutoRebalanceEnabled(false);
-        QueueChannel input0 = new QueueChannel();
+        var input0 = new QueueChannel();
         input0.setBeanName("test.input0S");
         Binding input0Binding = binder.bindConsumer("part.0", "test", input0,
                 consumerProperties);
         consumerProperties.setInstanceIndex(1);
-        QueueChannel input1 = new QueueChannel();
+        var input1 = new QueueChannel();
         input1.setBeanName("test.input1S");
         Binding input1Binding = binder.bindConsumer("part.0", "test", input1,
                 consumerProperties);
         consumerProperties.setInstanceIndex(2);
-        QueueChannel input2 = new QueueChannel();
+        var input2 = new QueueChannel();
         input2.setBeanName("test.input2S");
         Binding input2Binding = binder.bindConsumer("part.0", "test", input2,
                 consumerProperties);
@@ -2078,7 +2078,7 @@ public void testPartitionedModuleSpEL(TestInfo testInfo) throws Exception {
         catch (UnsupportedOperationException ignored) {
         }
         List<ChannelInterceptor> interceptors = output.getInterceptors();
-        AtomicInteger count = new AtomicInteger();
+        var count = new AtomicInteger();
         interceptors.forEach(interceptor -> {
             if (interceptor instanceof PartitioningInterceptor) {
                 count.set(TestUtils.getPropertyValue(interceptor,
@@ -2114,7 +2114,7 @@ public boolean matches(Message<?> value) {
             }
         };

-        ObjectMapper om = new ObjectMapper();
+        var om = new ObjectMapper();

         if (usesExplicitRouting()) {
             assertThat(om.readValue((byte[]) receive0.getPayload(), Integer.class))
@@ -2168,22 +2168,22 @@ void testPartitionedModuleJava() throws Exception {
         consumerProperties.setInstanceIndex(0);
         consumerProperties.setPartitioned(true);
         consumerProperties.getExtension().setAutoRebalanceEnabled(false);
-        QueueChannel input0 = new QueueChannel();
+        var input0 = new QueueChannel();
         input0.setBeanName("test.input0J");
         Binding input0Binding = binder.bindConsumer("partJ.0", "test", input0,
                 consumerProperties);
         consumerProperties.setInstanceIndex(1);
-        QueueChannel input1 = new QueueChannel();
+        var input1 = new QueueChannel();
         input1.setBeanName("test.input1J");
         Binding input1Binding = binder.bindConsumer("partJ.0", "test", input1,
                 consumerProperties);
         consumerProperties.setInstanceIndex(2);
-        QueueChannel input2 = new QueueChannel();
+        var input2 = new QueueChannel();
         input2.setBeanName("test.input2J");
         Binding input2Binding = binder.bindConsumer("partJ.0", "test", input2,
                 consumerProperties);
         consumerProperties.setInstanceIndex(3);
-        QueueChannel input3 = new QueueChannel();
+        var input3 = new QueueChannel();
         input3.setBeanName("test.input3J");
         Binding input3Binding = binder.bindConsumer("partJ.0", "test", input3,
                 consumerProperties);
@@ -2222,7 +2222,7 @@ void testPartitionedModuleJava() throws Exception {
         assertThat(receive2).isNotNull();
         Message<?> receive3 = receive(input3);
         assertThat(receive3).isNotNull();
-        ObjectMapper om = new ObjectMapper();
+        var om = new ObjectMapper();

         assertThat(om.readValue((byte[]) receive0.getPayload(), Integer.class))
                 .isEqualTo(0);
@@ -2251,17 +2251,17 @@ public void testAnonymousGroup(TestInfo testInfo) throws Exception {
         Binding producerBinding = binder.bindProducer("defaultGroup.0", output,
                 producerBindingProperties.getProducer());

-        QueueChannel input1 = new QueueChannel();
+        var input1 = new QueueChannel();
         ExtendedConsumerProperties consumerProperties = createConsumerProperties();
         Binding binding1 = binder.bindConsumer("defaultGroup.0", null, input1,
                 consumerProperties);
-        QueueChannel input2 = new QueueChannel();
+        var input2 = new QueueChannel();
         Binding binding2 = binder.bindConsumer("defaultGroup.0", null, input2,
                 consumerProperties);
         // Since we don't provide any topic info, let Kafka bind the consumer successfully
         Thread.sleep(1000);
-        String testPayload1 = "foo-" + UUID.randomUUID().toString();
+        var testPayload1 = "foo-" + UUID.randomUUID();
         output.send(new GenericMessage<>(testPayload1.getBytes()));

         Message<byte[]> receivedMessage1 = (Message<byte[]>) receive(input1);
@@ -2339,17 +2339,17 @@ void testPartitionedModuleJavaWithRawMode() throws Exception {
         consumerProperties.setPartitioned(true);
         consumerProperties.setHeaderMode(HeaderMode.none);
         consumerProperties.getExtension().setAutoRebalanceEnabled(false);
-        QueueChannel input0 = new QueueChannel();
+        var input0 = new QueueChannel();
         input0.setBeanName("test.input0J");
         Binding input0Binding = binder.bindConsumer("partJ.raw.0", "test", input0,
                 consumerProperties);
         consumerProperties.setInstanceIndex(1);
-        QueueChannel input1 = new QueueChannel();
+        var input1 = new QueueChannel();
         input1.setBeanName("test.input1J");
         Binding input1Binding = binder.bindConsumer("partJ.raw.0", "test", input1,
                 consumerProperties);
         consumerProperties.setInstanceIndex(2);
-        QueueChannel input2 = new QueueChannel();
+        var input2 = new QueueChannel();
         input2.setBeanName("test.input2J");
         Binding input2Binding = binder.bindConsumer("partJ.raw.0", "test", input2,
                 consumerProperties);
@@ -2408,17 +2408,17 @@ void testPartitionedModuleSpELWithRawMode() throws Exception {
         consumerProperties.setPartitioned(true);
         consumerProperties.setHeaderMode(HeaderMode.none);
         consumerProperties.getExtension().setAutoRebalanceEnabled(false);
-        QueueChannel input0 = new QueueChannel();
+        var input0 = new QueueChannel();
         input0.setBeanName("test.input0S");
         Binding input0Binding = binder.bindConsumer("part.raw.0", "test", input0,
                 consumerProperties);
         consumerProperties.setInstanceIndex(1);
-        QueueChannel input1 = new QueueChannel();
+        var input1 = new QueueChannel();
         input1.setBeanName("test.input1S");
         Binding input1Binding = binder.bindConsumer("part.raw.0", "test", input1,
                 consumerProperties);
         consumerProperties.setInstanceIndex(2);
-        QueueChannel input2 = new QueueChannel();
+        var input2 = new QueueChannel();
         input2.setBeanName("test.input2S");
         Binding input2Binding = binder.bindConsumer("part.raw.0", "test", input2,
                 consumerProperties);
@@ -2461,7 +2461,7 @@ void testPartitionedNative() throws Exception {
                 output, properties);

         ExtendedConsumerProperties consumerProperties = createConsumerProperties();
-        QueueChannel input0 = new QueueChannel();
+        var input0 = new QueueChannel();
         input0.setBeanName("test.inputNative");
         Binding inputBinding = binder.bindConsumer("partNative.raw.0", "test", input0,
                 consumerProperties);
@@ -2505,7 +2505,7 @@ void testSendAndReceiveWithRawMode() throws Exception {
         binderBindUnbindLatency();

         moduleOutputChannel.send(message);
-        CountDownLatch latch = new CountDownLatch(1);
+        var latch = new CountDownLatch(1);
         AtomicReference<Message<byte[]>> inboundMessageRef = new AtomicReference<>();
         moduleInputChannel.subscribe(message1 -> {
             try {
@@ -2540,7 +2540,7 @@ void testAllowNonTransactionalProducerSetting() throws Exception {
         KafkaProducerMessageHandler endpoint = TestUtils.getPropertyValue(producerBinding,
                 "lifecycle", KafkaProducerMessageHandler.class);
-        final KafkaTemplate kafkaTemplate = (KafkaTemplate) new DirectFieldAccessor(endpoint).getPropertyValue("kafkaTemplate");
+        final var kafkaTemplate = (KafkaTemplate) new DirectFieldAccessor(endpoint).getPropertyValue("kafkaTemplate");

         assertThat(kafkaTemplate.isAllowNonTransactional()).isTrue();
@@ -2564,7 +2564,7 @@ void testProducerErrorChannel() throws Exception {
                 .setHeader(MessageHeaders.CONTENT_TYPE, "application/json").build();
         SubscribableChannel ec = binder.getApplicationContext().getBean(
                 binder.getBinder().getBinderIdentity() + ".foobar.errors",
                 SubscribableChannel.class);
         final AtomicReference<Message<?>> errorMessage = new AtomicReference<>();
-        final CountDownLatch latch = new CountDownLatch(2);
+        final var latch = new CountDownLatch(2);
         ec.subscribe(message1 -> {
             errorMessage.set(message1);
             latch.countDown();
         });
@@ -2575,7 +2575,7 @@ void testProducerErrorChannel() throws Exception {
         globalEc.subscribe(message12 -> latch.countDown());
         KafkaProducerMessageHandler endpoint = TestUtils.getPropertyValue(producerBinding,
                 "lifecycle", KafkaProducerMessageHandler.class);
-        final RuntimeException fooException = new RuntimeException("foo");
+        final var fooException = new RuntimeException("foo");
         final AtomicReference sent = new AtomicReference<>();
         new DirectFieldAccessor(endpoint).setPropertyValue("kafkaTemplate",
                 new KafkaTemplate(mock(ProducerFactory.class)) {
@@ -2631,7 +2631,7 @@ void testAutoCreateTopicsEnabledSucceeds() throws Exception {
     @Test
     @SuppressWarnings("unchecked")
     void testCustomPartitionCountOverridesDefaultIfLarger() throws Exception {
-        byte[] testPayload = new byte[2048];
+        var testPayload = new byte[2048];
         Arrays.fill(testPayload, (byte) 65);
         KafkaBinderConfigurationProperties binderConfiguration = createConfigurationProperties();
         binderConfiguration.setMinPartitionCount(10);
@@ -2669,12 +2669,12 @@ void testCustomPartitionCountOverridesDefaultIfLarger() throws Exception {
     @SuppressWarnings("unchecked")
     void testCustomPartitionCountDoesNotOverridePartitioningIfSmaller()
             throws Exception {
-        byte[] testPayload = new byte[2048];
+        var testPayload = new byte[2048];
         Arrays.fill(testPayload, (byte) 65);
         KafkaBinderConfigurationProperties binderConfiguration = createConfigurationProperties();
         binderConfiguration.setMinPartitionCount(6);
         Binder binder = getBinder(binderConfiguration);
-        QueueChannel moduleInputChannel = new QueueChannel();
+        var moduleInputChannel = new QueueChannel();
         ExtendedProducerProperties producerProperties = createProducerProperties();
         producerProperties.setPartitionCount(5);
         producerProperties.setPartitionKeyExpression(
@@ -2707,7 +2707,7 @@ void testCustomPartitionCountDoesNotOverridePartitioningIfSmaller()
     @SuppressWarnings("unchecked")
     void testDynamicKeyExpression() throws Exception {
         Binder binder = getBinder(createConfigurationProperties());
-        QueueChannel moduleInputChannel = new QueueChannel();
+        var moduleInputChannel = new QueueChannel();
         ExtendedProducerProperties producerProperties = createProducerProperties();
         producerProperties.getExtension().getConfiguration().put("key.serializer",
                 StringSerializer.class.getName());
@@ -2730,7 +2730,7 @@ void testDynamicKeyExpression() throws Exception {
         moduleOutputChannel.send(message);
         Message<?> inbound = receive(moduleInputChannel);
         assertThat(inbound).isNotNull();
-        String receivedKey = new String(inbound.getHeaders()
+        var receivedKey = new String(inbound.getHeaders()
                 .get(KafkaHeaders.RECEIVED_KEY, byte[].class));
         assertThat(receivedKey).isEqualTo("myDynamicKey");
         producerBinding.unbind();
@@ -2740,13 +2740,13 @@ void testDynamicKeyExpression() throws Exception {
     @Test
     @SuppressWarnings("unchecked")
     void testCustomPartitionCountOverridesPartitioningIfLarger() throws Exception {
-        byte[] testPayload = new byte[2048];
+        var testPayload = new byte[2048];
         Arrays.fill(testPayload, (byte) 65);
         KafkaBinderConfigurationProperties binderConfiguration = createConfigurationProperties();
         binderConfiguration.setMinPartitionCount(4);
         Binder binder = getBinder(binderConfiguration);
-        QueueChannel moduleInputChannel = new QueueChannel();
+        var moduleInputChannel = new QueueChannel();
         ExtendedProducerProperties producerProperties = createProducerProperties();
         producerProperties.setPartitionCount(5);
         producerProperties.setPartitionKeyExpression(
@@ -2797,7 +2797,7 @@ void testDefaultConsumerStartsAtEarliest() throws Exception {
         Binding consumerBinding = binder.bindConsumer(testTopicName, "startOffsets",
                 input1, consumerProperties);

-        CountDownLatch latch = new CountDownLatch(1);
+        var latch = new CountDownLatch(1);
         AtomicReference<Message<byte[]>> inboundMessageRef1 = new AtomicReference<>();
         MessageHandler messageHandler = message1 -> {
             try {
@@ -2818,7 +2818,7 @@ void testDefaultConsumerStartsAtEarliest() throws Exception {
         input1.unsubscribe(messageHandler);

         output.send(new GenericMessage<>(testPayload2.getBytes()));
-        CountDownLatch latch1 = new CountDownLatch(1);
+        var latch1 = new CountDownLatch(1);
         AtomicReference<Message<byte[]>> inboundMessageRef2 = new AtomicReference<>();
         input1.subscribe(message1 -> {
             try {
@@ -2863,7 +2863,7 @@ void testResume() throws Exception {
         ExtendedConsumerProperties firstConsumerProperties = createConsumerProperties();
         consumerBinding = binder.bindConsumer(testTopicName, "startOffsets", input1,
                 firstConsumerProperties);
-        CountDownLatch latch = new CountDownLatch(1);
+        var latch = new CountDownLatch(1);
         AtomicReference<Message<byte[]>> inboundMessageRef1 = new AtomicReference<>();
         MessageHandler messageHandler = message1 -> {
             try {
@@ -2881,7 +2881,7 @@ void testResume() throws Exception {
         assertThat(inboundMessageRef1.get()).isNotNull();
         assertThat(inboundMessageRef1.get().getPayload()).isNotNull();
         input1.unsubscribe(messageHandler);
-        CountDownLatch latch1 = new CountDownLatch(1);
+        var latch1 = new CountDownLatch(1);
         AtomicReference<Message<byte[]>> inboundMessageRef2 = new AtomicReference<>();
         MessageHandler messageHandler1 = message1 -> {
             try {
@@ -2905,7 +2905,7 @@ void testResume() throws Exception {
         consumerBinding = binder.bindConsumer(testTopicName, "startOffsets", input1,
                 consumerProperties);
         input1.unsubscribe(messageHandler1);
-        CountDownLatch latch2 = new CountDownLatch(1);
+        var latch2 = new CountDownLatch(1);
         AtomicReference<Message<byte[]>> inboundMessageRef3 = new AtomicReference<>();
         MessageHandler messageHandler2 = message1 -> {
             try {
@@ -2938,13 +2938,13 @@ void testResume() throws Exception {
     @SuppressWarnings("unchecked")
     void testSyncProducerMetadata() throws Exception {
         Binder binder = getBinder(createConfigurationProperties());
-        DirectChannel output = new DirectChannel();
+        var output = new DirectChannel();
         String testTopicName = UUID.randomUUID().toString();
         ExtendedProducerProperties properties = createProducerProperties();
         properties.getExtension().setSync(true);
         Binding producerBinding = binder.bindProducer(testTopicName, output, properties);
-        DirectFieldAccessor accessor = new DirectFieldAccessor(
+        var accessor = new DirectFieldAccessor(
                 extractEndpoint(producerBinding));
         KafkaProducerMessageHandler wrappedInstance = (KafkaProducerMessageHandler) accessor
                 .getWrappedInstance();
@@ -2958,16 +2958,16 @@ void testSyncProducerMetadata() throws Exception {
     @SuppressWarnings("unchecked")
     void testSendTimeoutExpressionProducerMetadata() throws Exception {
         Binder binder = getBinder(createConfigurationProperties());
-        DirectChannel output = new DirectChannel();
+        var output = new DirectChannel();
         String testTopicName = UUID.randomUUID().toString();
         ExtendedProducerProperties properties = createProducerProperties();
         properties.getExtension().setSync(true);
-        SpelExpressionParser parser = new SpelExpressionParser();
+        var parser = new SpelExpressionParser();
         Expression sendTimeoutExpression = parser.parseExpression("5000");
         properties.getExtension().setSendTimeoutExpression(sendTimeoutExpression);
         Binding producerBinding = binder.bindProducer(testTopicName, output, properties);
-        DirectFieldAccessor accessor = new DirectFieldAccessor(
+        var accessor = new DirectFieldAccessor(
                 extractEndpoint(producerBinding));
         KafkaProducerMessageHandler wrappedInstance = (KafkaProducerMessageHandler) accessor
                 .getWrappedInstance();
@@ -3002,7 +3002,7 @@ void testAutoCreateTopicsDisabledOnBinderStillWorksAsLongAsBrokerCreatesTopic()
         Binding consumerBinding = binder.bindConsumer(testTopicName, "test", input,
                 consumerProperties);

-        CountDownLatch latch = new CountDownLatch(1);
+        var latch = new CountDownLatch(1);
         AtomicReference<Message<byte[]>> inboundMessageRef = new AtomicReference<>();
         input.subscribe(message1 -> {
             try {
@@ -3072,7 +3072,7 @@ void testAutoAddPartitionsDisabledSucceedsIfTopicUnderPartitionedAndAutoRebalanc
         invokeCreateTopic(testTopicName, 1, 1);
         configurationProperties.setAutoAddPartitions(false);
         Binder binder = getBinder(configurationProperties);
-        GenericApplicationContext context = new GenericApplicationContext();
+        var context = new GenericApplicationContext();
         context.refresh();

         ExtendedConsumerProperties consumerProperties = createConsumerProperties();
@@ -3099,7 +3099,7 @@ void testAutoAddPartitionsDisabledFailsIfTopicUnderPartitionedAndAutoRebalanceDi
         invokeCreateTopic(testTopicName, 1, 1);
         configurationProperties.setAutoAddPartitions(false);
         Binder binder = getBinder(configurationProperties);
-        GenericApplicationContext context = new GenericApplicationContext();
+        var context = new GenericApplicationContext();
         context.refresh();

         ExtendedConsumerProperties consumerProperties = createConsumerProperties();
@@ -3172,7 +3172,7 @@ void testPartitionCountNotReduced() throws Throwable {
         invokeCreateTopic(testTopicName, 6, 1);
         configurationProperties.setAutoAddPartitions(true);
         Binder binder = getBinder(configurationProperties);
-        GenericApplicationContext context = new GenericApplicationContext();
+        var context = new GenericApplicationContext();
         context.refresh();

         ExtendedConsumerProperties consumerProperties = createConsumerProperties();
@@ -3203,7 +3203,7 @@ void testConsumerDefaultDeserializer() throws Throwable {
         binding = binder.bindConsumer(testTopicName, "test", input,
                 consumerProperties);

-        DirectFieldAccessor consumerAccessor = new DirectFieldAccessor(
+        var consumerAccessor = new DirectFieldAccessor(
                 getKafkaConsumer(binding));
         assertThat(consumerAccessor
                 .getPropertyValue("keyDeserializer") instanceof ByteArrayDeserializer)
@@ -3241,7 +3241,7 @@ void testConsumerCustomDeserializer() throws Exception {
         binding = binder.bindConsumer(testTopicName, "test", input,
                 consumerProperties);

-        DirectFieldAccessor consumerAccessor = new DirectFieldAccessor(
+        var consumerAccessor = new DirectFieldAccessor(
                 getKafkaConsumer(binding));
         assertThat(consumerAccessor
                 .getPropertyValue("keyDeserializer") instanceof StringDeserializer)
@@ -3271,7 +3271,7 @@ void testNativeSerializationWithCustomSerializerDeserializer()
         KafkaBinderConfigurationProperties configurationProperties = createConfigurationProperties();
         configurationProperties.setAutoAddPartitions(true);
         Binder binder = getBinder(configurationProperties);
-        QueueChannel moduleInputChannel = new QueueChannel();
+        var moduleInputChannel = new QueueChannel();
         ExtendedProducerProperties producerProperties = createProducerProperties();
         producerProperties.setUseNativeEncoding(true);
         producerProperties.getExtension().getConfiguration().put("value.serializer",
@@ -3307,13 +3307,13 @@ void testNativeSerializationWithCustomSerializerDeserializer()
     }

     private KafkaConsumer getKafkaConsumer(Binding binding) {
-        DirectFieldAccessor bindingAccessor = new DirectFieldAccessor(binding);
+        var bindingAccessor = new DirectFieldAccessor(binding);
         KafkaMessageDrivenChannelAdapter adapter = (KafkaMessageDrivenChannelAdapter) bindingAccessor
                 .getPropertyValue("lifecycle");
-        DirectFieldAccessor adapterAccessor = new DirectFieldAccessor(adapter);
+        var adapterAccessor = new DirectFieldAccessor(adapter);
         ConcurrentMessageListenerContainer messageListenerContainer = (ConcurrentMessageListenerContainer) adapterAccessor
                 .getPropertyValue("messageListenerContainer");
-        DirectFieldAccessor containerAccessor = new DirectFieldAccessor(
+        var containerAccessor = new DirectFieldAccessor(
                 messageListenerContainer);
         DefaultKafkaConsumerFactory consumerFactory = (DefaultKafkaConsumerFactory) containerAccessor
                 .getPropertyValue("consumerFactory");
@@ -3327,7 +3327,7 @@ void testNativeSerializationWithCustomSerializerDeserializerBytesPayload()
         Binding producerBinding = null;
         Binding consumerBinding = null;
         try {
-            byte[] testPayload = new byte[1];
+            var testPayload = new byte[1];
             Message<?> message = MessageBuilder.withPayload(testPayload)
                     .setHeader(MessageHeaders.CONTENT_TYPE, "something/funky").build();
             SubscribableChannel moduleOutputChannel = new DirectChannel();
@@ -3337,11 +3337,11 @@ void testNativeSerializationWithCustomSerializerDeserializerBytesPayload()
             Binder binder = getBinder(configurationProperties);
             ConfigurableApplicationContext context = TestUtils.getPropertyValue(binder,
                     "binder.applicationContext", ConfigurableApplicationContext.class);
-            MessagingMessageConverter converter = new MessagingMessageConverter();
+            var converter = new MessagingMessageConverter();
             converter.setGenerateMessageId(true);
             converter.setGenerateTimestamp(true);
             context.getBeanFactory().registerSingleton("testConverter", converter);
-            QueueChannel moduleInputChannel = new QueueChannel();
+            var moduleInputChannel = new QueueChannel();
             ExtendedProducerProperties producerProperties = createProducerProperties();
             producerProperties.setUseNativeEncoding(true);
             producerProperties.getExtension().getConfiguration().put(
@@ -3413,7 +3413,7 @@ void testBuiltinSerialization() throws Exception {
         // Let the consumer actually bind to the producer before sending a msg
         binderBindUnbindLatency();
         moduleOutputChannel.send(message);
-        CountDownLatch latch = new CountDownLatch(1);
+        var latch = new CountDownLatch(1);
         AtomicReference<Message<byte[]>> inboundMessageRef = new AtomicReference<>();
         moduleInputChannel.subscribe(message1 -> {
             try {
@@ -3450,19 +3450,19 @@ void testSendAndReceiveWithMixedMode() throws Exception {
         KafkaBinderConfigurationProperties binderConfiguration = createConfigurationProperties();
         binderConfiguration.setHeaders("foo");
         Binder binder = getBinder(binderConfiguration);
-        DirectChannel moduleOutputChannel1 = new DirectChannel();
+        var moduleOutputChannel1 = new DirectChannel();
         ExtendedProducerProperties producerProperties1 = createProducerProperties();
         producerProperties1.setHeaderMode(HeaderMode.embeddedHeaders);
         Binding producerBinding1 = binder.bindProducer("mixed.0",
                 moduleOutputChannel1, producerProperties1);

-        DirectChannel moduleOutputChannel2 = new DirectChannel();
+        var moduleOutputChannel2 = new DirectChannel();
         ExtendedProducerProperties producerProperties2 = createProducerProperties();
         producerProperties2.setHeaderMode(HeaderMode.headers);
         Binding producerBinding2 = binder.bindProducer("mixed.0",
                 moduleOutputChannel2, producerProperties2);

-        DirectChannel moduleOutputChannel3 = new DirectChannel();
+        var moduleOutputChannel3 = new DirectChannel();
         ExtendedProducerProperties producerProperties3 = createProducerProperties();
         producerProperties3.setHeaderMode(HeaderMode.none);
         Binding producerBinding3 = binder.bindProducer("mixed.0",
@@ -3472,8 +3472,8 @@ void testSendAndReceiveWithMixedMode() throws Exception {
         consumerProperties.setHeaderMode(HeaderMode.embeddedHeaders);
         DirectChannel moduleInputChannel = createBindableChannel("input",
                 createConsumerBindingProperties(consumerProperties));
-        QueueChannel bridged = new QueueChannel();
-        BridgeHandler bridge = new BridgeHandler();
+        var bridged = new QueueChannel();
+        var bridge = new BridgeHandler();
         bridge.setOutputChannel(bridged);
         moduleInputChannel.subscribe(bridge);
         Binding consumerBinding = binder.bindConsumer("mixed.0", "test",
@@ -3516,7 +3516,7 @@ void testSendAndReceiveWithMixedMode() throws Exception {
                 ByteArrayDeserializer.class);
         consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                 ByteArrayDeserializer.class);
-        DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(consumerProps);
+        var cf = new DefaultKafkaConsumerFactory<>(consumerProps);
         Consumer consumer = cf.createConsumer();
         consumer.subscribe(Collections.singletonList("mixed.0"));
@@ -3556,7 +3556,7 @@ void testPolledConsumer() throws Exception {
                 consumerProps);
         Map<String, Object> producerProps = KafkaTestUtils
                 .producerProps(embeddedKafka);
-        KafkaTemplate template = new KafkaTemplate(
+        var template = new KafkaTemplate(
                 new DefaultKafkaProducerFactory<>(producerProps));
         template.send("pollable", "testPollable");
         boolean polled = inboundBindTarget.poll(m -> {
@@ -3607,7 +3607,7 @@ void testPolledConsumerRequeue() throws Exception {
                 "pollableRequeue", "group", inboundBindTarget, properties);
         Map<String, Object> producerProps = KafkaTestUtils
                 .producerProps(embeddedKafka);
-        KafkaTemplate template = new KafkaTemplate(
+        var template = new KafkaTemplate(
                 new DefaultKafkaProducerFactory<>(producerProps));
         template.send("pollableRequeue", "testPollable");
         try {
@@ -3645,7 +3645,7 @@ void testPolledConsumerWithDlq() throws Exception {
                 .producerProps(embeddedKafka);
         Binding<PollableSource<MessageHandler>> binding = binder.bindPollableConsumer(
                 "pollableDlq", "group-pcWithDlq", inboundBindTarget, properties);
-        KafkaTemplate template = new KafkaTemplate(
+        var template = new KafkaTemplate(
                 new DefaultKafkaProducerFactory<>(producerProps));
         template.send("pollableDlq", "testPollableDLQ");
         try {
@@ -3689,7 +3689,7 @@ void testTopicPatterns() throws Exception {
         consumerProperties.getExtension().setDestinationIsPattern(true);
         DirectChannel moduleInputChannel = createBindableChannel("input",
                 createConsumerBindingProperties(consumerProperties));
-        final CountDownLatch latch = new CountDownLatch(1);
+        final var latch = new CountDownLatch(1);
         final AtomicReference<String> topic = new AtomicReference<>();
         moduleInputChannel.subscribe(m -> {
             topic.set(m.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC, String.class));
             latch.countDown();
         });
@@ -3698,9 +3698,9 @@ void testTopicPatterns() throws Exception {
         Binding consumerBinding = binder.bindConsumer(
                 "topicPatterns\\..*", "testTopicPatterns", moduleInputChannel,
                 consumerProperties);
-        DefaultKafkaProducerFactory pf = new DefaultKafkaProducerFactory(
+        var pf = new DefaultKafkaProducerFactory(
                 KafkaTestUtils.producerProps(embeddedKafka));
-        KafkaTemplate template = new KafkaTemplate(pf);
+        var template = new KafkaTemplate(pf);
         template.send("topicPatterns.1", "foo");
         assertThat(latch.await(10, TimeUnit.SECONDS)).isTrue();
         assertThat(topic.get()).isEqualTo("topicPatterns.1");
@@ -3711,7 +3711,7 @@ void testTopicPatterns() throws Exception {

     @Test
     void testSameTopicCannotBeProvisionedAgain() throws Throwable {
-        CountDownLatch latch = new CountDownLatch(1);
+        var latch = new CountDownLatch(1);
         try (AdminClient admin = AdminClient.create(
                 Collections.singletonMap(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
                         embeddedKafka.getBrokersAsString()))) {
@@ -3768,10 +3768,10 @@ void testResetOffsets() throws Exception {
                 .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.TEXT_PLAIN)
                 .setHeader(KafkaHeaders.PARTITION, i)
                 .build()));
-        CountDownLatch latch1 = new CountDownLatch(10);
-        CountDownLatch latch2 = new CountDownLatch(20);
+        var latch1 = new CountDownLatch(10);
+        var latch2 = new CountDownLatch(20);
         AtomicReference<Message<byte[]>> inboundMessageRef = new AtomicReference<>();
-        AtomicInteger received = new AtomicInteger();
+        var received = new AtomicInteger();
         moduleInputChannel.subscribe(message1 -> {
             try {
                 inboundMessageRef.set((Message<byte[]>) message1);
@@ -3815,7 +3815,7 @@ void testRecordMetadata() throws Exception {
         ExtendedProducerProperties producerProperties = createProducerProperties();
         producerProperties.getExtension().setRecordMetadataChannel("metaChannel");
-        QueueChannel metaChannel = new QueueChannel();
+        var metaChannel = new QueueChannel();
         DirectChannel moduleOutputChannel = createBindableChannel("output",
                 createProducerBindingProperties(producerProperties));
@@ -3932,7 +3932,7 @@ binderConfiguration, new TestKafkaProperties(), prop -> {
         ExtendedConsumerProperties consumerProperties = createConsumerProperties();
         Binding consumerBinding = binder.bindConsumer(name + ".0", name, input, consumerProperties);
         Map<String, Object> producerProps = KafkaTestUtils.producerProps(embeddedKafka);
-        KafkaTemplate template = new KafkaTemplate(new DefaultKafkaProducerFactory<>(producerProps));
+        var template = new KafkaTemplate(new DefaultKafkaProducerFactory<>(producerProps));
         template.send(MessageBuilder.withPayload("internalHeaderPropagation")
                 .setHeader(KafkaHeaders.TOPIC, name + ".0")
                 .setHeader("someHeader", "someValue")
                 .build());
@@ -3949,7 +3949,7 @@ binderConfiguration, new TestKafkaProperties(), prop -> {
         consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
         consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
-        DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory<>(consumerProps);
+        var cf = new DefaultKafkaConsumerFactory<>(consumerProps);
         Consumer consumer = cf.createConsumer();
         consumer.assign(Collections.singletonList(new TopicPartition(name + ".1", 0)));
         ConsumerRecords records = consumer.poll(Duration.ofSeconds(10));
@@ -4091,7 +4091,7 @@ private static void assertionsOnKafkaTemplate(TestObservationRegistry observatio
         KafkaProducerMessageHandler endpoint = TestUtils.getPropertyValue(producerBinding,
                 "lifecycle", KafkaProducerMessageHandler.class);
-        final KafkaTemplate kafkaTemplate = (KafkaTemplate) new DirectFieldAccessor(endpoint).getPropertyValue("kafkaTemplate");
+        final var kafkaTemplate = (KafkaTemplate) new DirectFieldAccessor(endpoint).getPropertyValue("kafkaTemplate");
         assertThat(kafkaTemplate).isNotNull();
         Boolean observationEnabled = (Boolean) new DirectFieldAccessor(kafkaTemplate).getPropertyValue("observationEnabled");
         assertThat(observationEnabled).isTrue();