Deadlock in KafkaMessageSource - between isRunning and doReceive #8778

Closed
sobychacko opened this issue Oct 26, 2023 · 1 comment · Fixed by #8780

Comments

@sobychacko
Contributor

For more context: spring-cloud/spring-cloud-stream#2675

@artembilan
Member

Also good suggestion: spring-cloud/spring-cloud-stream#2838 (comment)

artembilan added a commit that referenced this issue Oct 26, 2023
Related to #8778

**Cherry-pick to `6.0.x`**
@artembilan artembilan self-assigned this Oct 26, 2023
artembilan added a commit to artembilan/spring-integration that referenced this issue Oct 26, 2023
Fixes spring-projects#8778

`KafkaMessageSource.doReceive()` holds a lock around its whole body.
That includes `pollRecord()`, which can block in `KafkaConsumer.poll()`.
As a result, the other lifecycle management callbacks are blocked until `KafkaConsumer.poll()` returns.

* Rework the lifecycle management flags to `AtomicBoolean`, since their respective callbacks
do very little work
* Reduce the locked block in `doReceive()` to just the consumer setup part.
Leave `pollRecord()` outside of the lock
* Add `this.consumer.wakeup()` to `stopConsumer()` to break the `poll()` cycle
so that it returns immediately for the subsequent `close()` call

**Cherry-pick to `6.1.x` & `6.0.x`**
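
For illustration only, the locking change described in this commit message can be sketched without any Kafka dependency. Everything here is an assumption made for the example: `PollingSourceSketch` and `FakeConsumer` are hypothetical names, and `FakeConsumer` only imitates the blocking `poll()` / `wakeup()` behaviour of `KafkaConsumer`. It is not the actual `KafkaMessageSource` code.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch; not the Spring Integration KafkaMessageSource.
class PollingSourceSketch {

    // Hypothetical stand-in for KafkaConsumer: poll() blocks, wakeup() unblocks it.
    static class FakeConsumer {

        // A distinct String instance, so it can be recognised by identity.
        private static final String WAKEUP = new String("wakeup");

        private final BlockingQueue<String> records = new LinkedBlockingQueue<>();

        String poll(long timeoutMillis) throws InterruptedException {
            String record = this.records.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            // The real KafkaConsumer throws WakeupException; this sketch returns null.
            return record == WAKEUP ? null : record;
        }

        void wakeup() {
            this.records.offer(WAKEUP);
        }
    }

    private final Lock lock = new ReentrantLock();

    // Lifecycle flag is atomic, so isRunning() never needs the lock.
    private final AtomicBoolean running = new AtomicBoolean();

    private FakeConsumer consumer; // guarded by lock

    boolean isRunning() {
        return this.running.get();
    }

    void start() {
        this.running.set(true);
    }

    void stop() {
        this.running.set(false);
        this.lock.lock();
        try {
            if (this.consumer != null) {
                // Break a blocked poll() so the poller thread returns promptly.
                this.consumer.wakeup();
                this.consumer = null;
            }
        }
        finally {
            this.lock.unlock();
        }
    }

    String doReceive(long timeoutMillis) throws InterruptedException {
        FakeConsumer current;
        this.lock.lock();
        try {
            if (!this.running.get()) {
                return null;
            }
            // Only the consumer setup happens under the lock.
            if (this.consumer == null) {
                this.consumer = new FakeConsumer();
            }
            current = this.consumer;
        }
        finally {
            this.lock.unlock();
        }
        // The potentially long, blocking poll runs outside the lock.
        return current.poll(timeoutMillis);
    }
}
```

With this shape, a thread parked in `doReceive()` no longer holds the lock, so `isRunning()` and `stop()` on another thread are not held up by the poll timeout.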
garyrussell pushed a commit that referenced this issue Oct 26, 2023
* GH-8778: Fix KafkaMessageSource deadlock

Fixes #8778

* Use `compareAndSet` in `start` & `stop`
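
As a rough illustration of why `compareAndSet` suits `start` and `stop`: only the caller that actually flips the flag performs the transition work, so repeated or concurrent calls become no-ops. The class name `LifecycleFlagSketch` and the `stopSideEffects` counter are hypothetical and exist only to make the effect observable; this is not the code from the commit.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of an idempotent start/stop pair.
class LifecycleFlagSketch {

    private final AtomicBoolean running = new AtomicBoolean();

    // Counts how often the stop-side work ran (stands in for closing a consumer).
    final AtomicInteger stopSideEffects = new AtomicInteger();

    void start() {
        // Succeeds only for the caller that moves false -> true.
        this.running.compareAndSet(false, true);
    }

    void stop() {
        // Only the caller that moves true -> false does the cleanup.
        if (this.running.compareAndSet(true, false)) {
            this.stopSideEffects.incrementAndGet();
        }
    }

    boolean isRunning() {
        return this.running.get();
    }
}
```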