Skip to content

Commit

Permalink
fix defects in DefaultKafkaProducerFactory#updateConfigs()
Browse files Browse the repository at this point in the history
  • Loading branch information
NathanQingyangXu committed Nov 16, 2023
1 parent f768874 commit 03e8d20
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -626,23 +626,31 @@ public boolean removePostProcessor(ProducerPostProcessor<K, V> postProcessor) {

@Override
public void updateConfigs(Map<String, Object> updates) {
updates.entrySet().forEach(entry -> {
if (entry.getKey().equals(ProducerConfig.TRANSACTIONAL_ID_CONFIG)) {
Assert.isTrue(entry.getValue() instanceof String, () -> "'" + ProducerConfig.TRANSACTIONAL_ID_CONFIG
+ "' must be a String, not a " + entry.getClass().getName());
Assert.isTrue(this.transactionIdPrefix != null
? entry.getValue() != null
: entry.getValue() == null,
"Cannot change transactional capability");
this.transactionIdPrefix = (String) entry.getValue();
updates.forEach((key, value) -> {
if (key == null) {
return;
}
else if (entry.getKey().equals(ProducerConfig.CLIENT_ID_CONFIG)) {
Assert.isTrue(entry.getValue() instanceof String, () -> "'" + ProducerConfig.CLIENT_ID_CONFIG
+ "' must be a String, not a " + entry.getClass().getName());
this.clientIdPrefix = (String) entry.getValue();
if (key.equals(ProducerConfig.TRANSACTIONAL_ID_CONFIG)) {
Assert.isTrue(
value == null || value instanceof String,
() -> "'" + ProducerConfig.TRANSACTIONAL_ID_CONFIG
+ "' must be null or a String, not " + value.getClass().getName()
);
Assert.isTrue(
(this.transactionIdPrefix != null) == (value != null),
"Cannot change transactional capability"
);
this.transactionIdPrefix = (String) value;
}
else {
this.configs.put(entry.getKey(), entry.getValue());
else if (key.equals(ProducerConfig.CLIENT_ID_CONFIG)) {
Assert.isTrue(
value == null || value instanceof String,
() -> "'" + ProducerConfig.CLIENT_ID_CONFIG
+ "' must be null or a String, not " + value.getClass().getName());
this.clientIdPrefix = (String) value;
}
else if (value != null) {
this.configs.put(key, value);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.springframework.kafka.core;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
import static org.mockito.ArgumentMatchers.any;
Expand Down Expand Up @@ -545,6 +546,7 @@ void configUpdates() {
assertThat(KafkaTestUtils.getPropertyValue(pf1, "transactionIdPrefix")).isEqualTo("tx2-");
configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, null);
assertThatIllegalArgumentException().isThrownBy(() -> pf1.updateConfigs(configs));
assertThatCode(() -> pf1.updateConfigs(Collections.singletonMap(null, null))).doesNotThrowAnyException();
}

@SuppressWarnings("unchecked")
Expand Down

0 comments on commit 03e8d20

Please sign in to comment.