-
Notifications
You must be signed in to change notification settings - Fork 1.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
GH-8778: Fix KafkaMessageSource deadlock #8780
Conversation
Fixes spring-projects#8778 The `KafkaMessageSource.doReceive()` have a lock around its whole body. That includes the `pollRecord()` which can be blocked on the `KafkaConsumer.poll()`. This way the rest of lifecycle management callbacks can be blocked until `KafkaConsumer.poll()` returns. * Rework lifecycle management flags to `AtomicBoolean` since there is not too much work in their respective callbacks * Decrease a locking block in the `doReceive()` just to consumer setup part. Leave `pollRecord()` outside of the lock * Add `this.consumer.wakeup()` into `stopConsumer()` to break a `poll()` cycle and return immediately for the next `close()` call **Cherry-pick to `6.1.x` & `6.0.x`**
CC @sobychacko |
LGTM, @artembilan |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This won't cherry-pick because of the ReentrantLock
.
finally { | ||
this.lock.unlock(); | ||
} | ||
stopConsumer(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if (this.running.compareAndSet(true, false)) {...}
finally { | ||
this.lock.unlock(); | ||
} | ||
this.running.set(true); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if (this.running.compareAndSet(false, true)) {...}
I have back-ported today such a fix: 991d29e |
* GH-8778: Fix KafkaMessageSource deadlock Fixes #8778 The `KafkaMessageSource.doReceive()` have a lock around its whole body. That includes the `pollRecord()` which can be blocked on the `KafkaConsumer.poll()`. This way the rest of lifecycle management callbacks can be blocked until `KafkaConsumer.poll()` returns. * Rework lifecycle management flags to `AtomicBoolean` since there is not too much work in their respective callbacks * Decrease a locking block in the `doReceive()` just to consumer setup part. Leave `pollRecord()` outside of the lock * Add `this.consumer.wakeup()` into `stopConsumer()` to break a `poll()` cycle and return immediately for the next `close()` call **Cherry-pick to `6.1.x` & `6.0.x`** * * Use `compareAndSet` in `start` & `stop`
* GH-8778: Fix KafkaMessageSource deadlock Fixes #8778 The `KafkaMessageSource.doReceive()` have a lock around its whole body. That includes the `pollRecord()` which can be blocked on the `KafkaConsumer.poll()`. This way the rest of lifecycle management callbacks can be blocked until `KafkaConsumer.poll()` returns. * Rework lifecycle management flags to `AtomicBoolean` since there is not too much work in their respective callbacks * Decrease a locking block in the `doReceive()` just to consumer setup part. Leave `pollRecord()` outside of the lock * Add `this.consumer.wakeup()` into `stopConsumer()` to break a `poll()` cycle and return immediately for the next `close()` call **Cherry-pick to `6.1.x` & `6.0.x`** * * Use `compareAndSet` in `start` & `stop`
...and cherry-picked. |
Fixes #8778
The
KafkaMessageSource.doReceive()
have a lock around its whole body. That includes thepollRecord()
which can be blocked on theKafkaConsumer.poll()
. This way the rest of lifecycle management callbacks can be blocked untilKafkaConsumer.poll()
returns.AtomicBoolean
since there is not too much work in their respective callbacksdoReceive()
just to consumer setup part. LeavepollRecord()
outside of the lockthis.consumer.wakeup()
intostopConsumer()
to break apoll()
cycle and return immediately for the nextclose()
callCherry-pick to
6.1.x
&6.0.x