Skip to content

Commit

Permalink
Fix typos in KafkaMLContainer & ContainerProps
Browse files Browse the repository at this point in the history
Fix typos in the `KafkaMessageListenerContainer` and `ContainerProperties`
  • Loading branch information
Wzy19930507 authored Dec 14, 2023
1 parent edfd8e8 commit 8e564bf
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ public enum EOSMode {
/**
* The time (ms) after which outstanding offsets should be committed when
* {@link AckMode#TIME} or {@link AckMode#COUNT_TIME} is being used. Should be
* larger than
* larger than zero.
*/
private long ackTime = DEFAULT_ACK_TIME;

Expand Down Expand Up @@ -388,7 +388,7 @@ public void setAckCount(int count) {
/**
* Set the time (ms) after which outstanding offsets should be committed when
* {@link AckMode#TIME} or {@link AckMode#COUNT_TIME} is being used. Should be
* larger than
* larger than zero.
* @param ackTime the time
*/
public void setAckTime(long ackTime) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ else if (partitionsListenerConsumer.assignedPartitions != null) {
@Nullable
public Map<String, Collection<TopicPartition>> getAssignmentsByClientId() {
ListenerConsumer partitionsListenerConsumer = this.listenerConsumer;
if (this.listenerConsumer != null) {
if (partitionsListenerConsumer != null) {
return Collections.singletonMap(partitionsListenerConsumer.getClientId(), getAssignedPartitions());
}
else {
Expand Down Expand Up @@ -324,7 +324,7 @@ public void resume() {
super.resume();
KafkaMessageListenerContainer<K, V>.ListenerConsumer consumer = this.listenerConsumer;
if (consumer != null) {
this.listenerConsumer.wakeIfNecessary();
consumer.wakeIfNecessary();
}
}

Expand All @@ -333,7 +333,7 @@ public void resumePartition(TopicPartition topicPartition) {
super.resumePartition(topicPartition);
KafkaMessageListenerContainer<K, V>.ListenerConsumer consumer = this.listenerConsumer;
if (consumer != null) {
this.listenerConsumer.wakeIfNecessary();
consumer.wakeIfNecessary();
}
}

Expand Down Expand Up @@ -926,7 +926,6 @@ else if (listener instanceof MessageListener) {
if (this.containerProperties.isLogContainerConfig()) {
this.logger.info(toString());
}
Map<String, Object> props = KafkaMessageListenerContainer.this.consumerFactory.getConfigurationProperties();
ApplicationContext applicationContext = getApplicationContext();
this.checkNullKeyForExceptions = this.containerProperties.isCheckDeserExWhenKeyNull()
|| ErrorHandlingUtils.checkDeserializer(KafkaMessageListenerContainer.this.consumerFactory,
Expand All @@ -942,7 +941,7 @@ else if (listener instanceof MessageListener) {
: applicationContext.getClassLoader());
this.syncCommitTimeout = determineSyncCommitTimeout();
if (this.containerProperties.getSyncCommitTimeout() == null) {
// update the property so we can use it directly from code elsewhere
// update the property, so we can use it directly from code elsewhere
this.containerProperties.setSyncCommitTimeout(this.syncCommitTimeout);
if (KafkaMessageListenerContainer.this.thisOrParentContainer != null) {
KafkaMessageListenerContainer.this.thisOrParentContainer
Expand Down Expand Up @@ -1019,15 +1018,7 @@ private void obtainClusterId() {

@Nullable
private ThreadStateProcessor setUpPollProcessor(boolean batch) {
if (batch) {
if (this.commonBatchInterceptor != null) {
return this.commonBatchInterceptor;
}
}
else if (this.commonRecordInterceptor != null) {
return this.commonRecordInterceptor;
}
return null;
return batch ? this.commonBatchInterceptor : this.commonRecordInterceptor;
}

@Nullable
Expand Down Expand Up @@ -3050,14 +3041,14 @@ private void timedAcks(AckMode ackMode) {
long now;
now = System.currentTimeMillis();
boolean elapsed = now - this.last > this.containerProperties.getAckTime();
if (ackMode.equals(AckMode.TIME) && elapsed) {
if (AckMode.TIME.equals(ackMode) && elapsed) {
this.logger.debug(() -> "Committing in AckMode.TIME " +
"because time elapsed exceeds configured limit of " +
this.containerProperties.getAckTime());
commitIfNecessary();
this.last = now;
}
else if (ackMode.equals(AckMode.COUNT_TIME) && elapsed) {
else if (AckMode.COUNT_TIME.equals(ackMode) && elapsed) {
this.logger.debug(() -> "Committing in AckMode.COUNT_TIME " +
"because time elapsed exceeds configured limit of " +
this.containerProperties.getAckTime());
Expand Down Expand Up @@ -3433,8 +3424,8 @@ public void acknowledge() {
acknowledge(this.partial + 1);
return;
}
Map<TopicPartition, List<Long>> offs = ListenerConsumer.this.offsetsInThisBatch;
if (!this.acked) {
Map<TopicPartition, List<Long>> offs = ListenerConsumer.this.offsetsInThisBatch;
Map<TopicPartition, List<ConsumerRecord<K, V>>> deferred = ListenerConsumer.this.deferredOffsets;
for (ConsumerRecord<K, V> cRecord : getHighestOffsetRecords(this.records)) {
if (offs != null) {
Expand Down Expand Up @@ -3871,24 +3862,21 @@ private class StopCallback implements BiConsumer<Object, Throwable> {

private final Runnable callback;

StopCallback(Runnable callback) {
StopCallback(@Nullable Runnable callback) {
this.callback = callback;
}

@Override
public void accept(Object result, @Nullable Throwable throwable) {
if (throwable != null) {
KafkaMessageListenerContainer.this.logger.error(throwable, "Error while stopping the container");
if (this.callback != null) {
this.callback.run();
}
}
else {
KafkaMessageListenerContainer.this.logger
.debug(() -> KafkaMessageListenerContainer.this + " stopped normally");
if (this.callback != null) {
this.callback.run();
}
}
if (this.callback != null) {
this.callback.run();
}
}

Expand Down

0 comments on commit 8e564bf

Please sign in to comment.