Skip to content

Commit

Permalink
GH-3526: Configure deserializers against modified configs (#3540)
Browse files Browse the repository at this point in the history
Fixes: #3526
  • Loading branch information
artembilan authored Oct 7, 2024
1 parent 367f40f commit 3b64536
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -169,42 +169,8 @@ public DefaultKafkaConsumerFactory(Map<String, Object> configs,

this.configs = new ConcurrentHashMap<>(configs);
this.configureDeserializers = configureDeserializers;
this.keyDeserializerSupplier = keyDeserializerSupplier(keyDeserializerSupplier);
this.valueDeserializerSupplier = valueDeserializerSupplier(valueDeserializerSupplier);
}

private Supplier<Deserializer<K>> keyDeserializerSupplier(
@Nullable Supplier<Deserializer<K>> keyDeserializerSupplier) {

if (!this.configureDeserializers) {
return keyDeserializerSupplier;
}
return keyDeserializerSupplier == null
? () -> null
: () -> {
Deserializer<K> deserializer = keyDeserializerSupplier.get();
if (deserializer != null) {
deserializer.configure(this.configs, true);
}
return deserializer;
};
}

private Supplier<Deserializer<V>> valueDeserializerSupplier(
@Nullable Supplier<Deserializer<V>> valueDeserializerSupplier) {

if (!this.configureDeserializers) {
return valueDeserializerSupplier;
}
return valueDeserializerSupplier == null
? () -> null
: () -> {
Deserializer<V> deserializer = valueDeserializerSupplier.get();
if (deserializer != null) {
deserializer.configure(this.configs, false);
}
return deserializer;
};
this.keyDeserializerSupplier = keyDeserializerSupplier;
this.valueDeserializerSupplier = valueDeserializerSupplier;
}

@Override
Expand All @@ -219,7 +185,7 @@ public void setBeanName(String name) {
* @param keyDeserializer the deserializer.
*/
public void setKeyDeserializer(@Nullable Deserializer<K> keyDeserializer) {
this.keyDeserializerSupplier = keyDeserializerSupplier(() -> keyDeserializer);
this.keyDeserializerSupplier = () -> keyDeserializer;
}

/**
Expand All @@ -229,7 +195,7 @@ public void setKeyDeserializer(@Nullable Deserializer<K> keyDeserializer) {
* @param valueDeserializer the value deserializer.
*/
public void setValueDeserializer(@Nullable Deserializer<V> valueDeserializer) {
this.valueDeserializerSupplier = valueDeserializerSupplier(() -> valueDeserializer);
this.valueDeserializerSupplier = () -> valueDeserializer;
}

/**
Expand All @@ -240,7 +206,7 @@ public void setValueDeserializer(@Nullable Deserializer<V> valueDeserializer) {
* @since 2.8
*/
public void setKeyDeserializerSupplier(Supplier<Deserializer<K>> keyDeserializerSupplier) {
this.keyDeserializerSupplier = keyDeserializerSupplier(keyDeserializerSupplier);
this.keyDeserializerSupplier = keyDeserializerSupplier;
}

/**
Expand All @@ -251,7 +217,7 @@ public void setKeyDeserializerSupplier(Supplier<Deserializer<K>> keyDeserializer
* @since 2.8
*/
public void setValueDeserializerSupplier(Supplier<Deserializer<V>> valueDeserializerSupplier) {
this.valueDeserializerSupplier = valueDeserializerSupplier(valueDeserializerSupplier);
this.valueDeserializerSupplier = valueDeserializerSupplier;
}

/**
Expand Down Expand Up @@ -499,14 +465,36 @@ public void setApplicationContext(ApplicationContext applicationContext) throws
this.applicationContext = applicationContext;
}

@Nullable
private Deserializer<K> keyDeserializer(Map<String, Object> configs) {
Deserializer<K> deserializer =
this.keyDeserializerSupplier != null
? this.keyDeserializerSupplier.get()
: null;
if (deserializer != null && this.configureDeserializers) {
deserializer.configure(configs, true);
}
return deserializer;
}

@Nullable
private Deserializer<V> valueDeserializer(Map<String, Object> configs) {
Deserializer<V> deserializer =
this.valueDeserializerSupplier != null
? this.valueDeserializerSupplier.get()
: null;
if (deserializer != null && this.configureDeserializers) {
deserializer.configure(configs, false);
}
return deserializer;
}

protected class ExtendedKafkaConsumer extends KafkaConsumer<K, V> {

private String idForListeners;

protected ExtendedKafkaConsumer(Map<String, Object> configProps) {
super(configProps,
DefaultKafkaConsumerFactory.this.keyDeserializerSupplier.get(),
DefaultKafkaConsumerFactory.this.valueDeserializerSupplier.get());
super(configProps, keyDeserializer(configProps), valueDeserializer(configProps));

if (!DefaultKafkaConsumerFactory.this.listeners.isEmpty()) {
Iterator<MetricName> metricIterator = metrics().keySet().iterator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -503,10 +503,11 @@ public void consumerRemoved(String id, Consumer consumer) {
void configDeserializer() {
Deserializer key = mock(Deserializer.class);
Deserializer value = mock(Deserializer.class);
Map<String, Object> config = new HashMap<>();
Map<String, Object> config = KafkaTestUtils.consumerProps("mockGroup", "false", this.embeddedKafka);
DefaultKafkaConsumerFactory cf = new DefaultKafkaConsumerFactory(config, key, value);
Deserializer keyDeserializer = cf.getKeyDeserializer();
assertThat(keyDeserializer).isSameAs(key);
cf.createKafkaConsumer(config);
verify(key).configure(config, true);
Deserializer valueDeserializer = cf.getValueDeserializer();
assertThat(valueDeserializer).isSameAs(value);
Expand Down

0 comments on commit 3b64536

Please sign in to comment.