Kafka binder used with pollables makes the bindings actuator endpoint fail #2675

Closed
JordiMartinezVicent opened this issue Mar 23, 2023 · 2 comments


JordiMartinezVicent commented Mar 23, 2023

Describe the issue
Given an application that runs as two instances and that:

  • Uses the Kafka binder
  • Declares a pollable consumer

A call to the actuator endpoint /actuator/bindings takes around 100s when it is made to the instance that has no assigned partitions.

To Reproduce

scs-pollables.zip

The behavior can be reproduced with the attached application. To simulate two instances of the same application, it declares two pollables with the same group, both pointing to the same topic.

A Kafka broker is needed to start the application.

To reproduce:

  • Start the attached application
  • Call the endpoint http://localhost:8080/actuator/bindings

Expected behavior
The call to the endpoint should return promptly instead of taking around 100s.

Version of the framework

Spring Boot: 3.0.4
spring-cloud-dependencies: 2022.0.1

Additional context

The issue arises because the methods of the class org.springframework.integration.kafka.inbound.KafkaMessageSource are synchronized, and the same instance is used both for polling and for retrieving the binding state from the bindings endpoint.

On the one hand, it defines the following timeouts:

this.pollTimeout = Duration.ofMillis(consumerProperties.getPollTimeout());
this.assignTimeout = Duration.ofMillis(Math.max(this.pollTimeout.toMillis() * 20, MIN_ASSIGN_TIMEOUT)); // NOSONAR - magic 		

Note that the default pollTimeout is 5 seconds, so the assignTimeout will be 100s.
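
As a quick check of that arithmetic, here is a minimal standalone sketch of the same computation. The class name and the value used for MIN_ASSIGN_TIMEOUT (2000 ms) are assumptions for illustration only; with a 5 s poll timeout the result is the same for any minimum below 100,000 ms.

```java
import java.time.Duration;

public class AssignTimeoutExample {

    // Assumed value, for illustration only; the real constant lives in KafkaMessageSource.
    static final long MIN_ASSIGN_TIMEOUT = 2000L;

    // Mirrors the expression quoted above: 20 x pollTimeout, with a lower bound.
    static Duration assignTimeout(Duration pollTimeout) {
        return Duration.ofMillis(Math.max(pollTimeout.toMillis() * 20, MIN_ASSIGN_TIMEOUT));
    }

    public static void main(String[] args) {
        Duration pollTimeout = Duration.ofMillis(5000); // the default mentioned above
        System.out.println(assignTimeout(pollTimeout).toSeconds() + "s"); // prints "100s"
    }
}
```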

It uses them in the pollRecord method:

ConsumerRecords<K, V> records = this.consumer
  .poll(this.assignedPartitions.isEmpty() ? this.assignTimeout : this.pollTimeout);

Note that the assignTimeout is used when there are no assigned partitions.

On the other hand, calling the endpoint invokes the isRunning method of the same class, which is also synchronized, so it has to wait until the ongoing poll releases the lock.
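
The contention can be illustrated without Kafka. The sketch below is not the Spring Integration code: BlockingSource is a hypothetical stand-in whose poll method sleeps while holding the object's monitor, and a second thread then measures how long a synchronized isRunning call is blocked.

```java
import java.util.concurrent.CountDownLatch;

public class SynchronizedContentionDemo {

    // Hypothetical stand-in for a message source whose methods share one monitor.
    static class BlockingSource {
        private final CountDownLatch polling = new CountDownLatch(1);
        private boolean running = true;

        synchronized void poll(long millis) throws InterruptedException {
            polling.countDown();   // signal that the monitor is now held
            Thread.sleep(millis);  // simulates a long consumer.poll(assignTimeout)
        }

        synchronized boolean isRunning() {
            return running;
        }
    }

    // Returns how long isRunning() was blocked, in milliseconds.
    static long measureIsRunningDelay(long pollMillis) throws InterruptedException {
        BlockingSource source = new BlockingSource();
        Thread poller = new Thread(() -> {
            try {
                source.poll(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        poller.start();
        source.polling.await();    // wait until the poller holds the monitor
        long start = System.nanoTime();
        source.isRunning();        // blocks until poll() releases the monitor
        long elapsedMillis = (System.nanoTime() - start) / 1_000_000;
        poller.join();
        return elapsedMillis;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("isRunning() waited about " + measureIsRunningDelay(500) + " ms");
    }
}
```

With a 500 ms simulated poll, the state query waits for roughly the whole poll; with a 100s assignTimeout the same effect would produce the delay described in this issue.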

To avoid this synchronization point, the running and stopped states could perhaps be managed in the class DefaultBinding, the same way it already handles the "paused" state. That way, polling and retrieving the state would not collide.
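
A minimal sketch of that idea, under stated assumptions: BindingState and BlockingSource are hypothetical classes written for this example, not the actual DefaultBinding or KafkaMessageSource. The wrapper keeps its own volatile flag, so a state query never touches the monitor that the poll holds.

```java
import java.util.concurrent.CountDownLatch;

public class SeparateStateDemo {

    // Hypothetical source whose poll holds the object's monitor for a long time.
    static class BlockingSource {
        final CountDownLatch polling = new CountDownLatch(1);

        synchronized void poll(long millis) throws InterruptedException {
            polling.countDown();   // signal that the monitor is now held
            Thread.sleep(millis);  // simulates a long blocking poll
        }
    }

    // Hypothetical wrapper that tracks the running state itself (assumption, not the real fix).
    static class BindingState {
        private volatile boolean running;

        void start() { this.running = true; }

        void stop() { this.running = false; }

        // Reads the flag only; never acquires the source's monitor.
        boolean isRunning() { return this.running; }
    }

    // Returns how long the state query took while a poll was in progress, in milliseconds.
    static long measureStateQueryDelay(long pollMillis) throws InterruptedException {
        BlockingSource source = new BlockingSource();
        BindingState binding = new BindingState();
        binding.start();
        Thread poller = new Thread(() -> {
            try {
                source.poll(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        poller.start();
        source.polling.await();    // the poller now holds the source's monitor
        long start = System.nanoTime();
        boolean running = binding.isRunning();
        long elapsedMillis = (System.nanoTime() - start) / 1_000_000;
        poller.join();
        if (!running) {
            throw new IllegalStateException("binding should report running");
        }
        return elapsedMillis;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("state query took about " + measureStateQueryDelay(500) + " ms");
    }
}
```

The trade-off of this design is that the flag reflects what the binding last requested (start or stop), not necessarily what the underlying source is doing at that instant.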

@JordiMartinezVicent
Author

Hi guys, any update with this?
@olegz @garyrussell

sobychacko added a commit to sobychacko/spring-cloud-stream that referenced this issue Oct 25, 2023
 - Instead of relying on the lifecycle object's isRunning method,
   which could possibly be synchronized and cause contention for
   the lock, maintain the running state separately inside DefaultBinding
   via a field (similar to how it tracks the pausable state).

   Without this change, in pollable Kafka consumer based applications,
   there is a possibility of a longer wait when querying the binding status
   in DefaultBinding (for example via the bindings actuator endpoint) due to
   the aforementioned lock contention.

Resolves spring-cloud#2675
@sobychacko
Contributor

sobychacko commented Oct 26, 2023

Closing this in lieu of spring-projects/spring-integration#8778

The fix will be available in Spring Cloud Stream 4.1.0 and other patch releases.
