
feat: Initial support for cooperative-sticky rebalancing #407

Merged: 6 commits into main from incremental-rebalancing on Dec 17, 2024

Conversation

untitaker (Member):

Fix one bug in StreamProcessor where it assumed the passed assignments replace the old ones.

Our consumer backends mostly work as-is and already pass the right values in their callbacks.

untitaker requested review from a team as code owners on December 13, 2024 at 01:06.

lynnagara (Member) left a comment:

Looks good!

Were you planning to merge + publish as-is, or do more testing against this branch?

Is it worth including a test that fails against the current cluster but works with newer versions? Attempting to commit on an existing partition during a rebalance might do it.

arroyo/processing/processor.py (outdated review thread, resolved)
@@ -245,6 +256,10 @@ def test_consumer_polls_when_paused(self) -> None:
assert consumer.paused() == []


class TestKafkaStreamsIncrementalRebalancing(TestKafkaStreams):

Member:
unused?

untitaker (Member Author):

no, this actually re-declares all the tests in TestKafkaStreams, just re-running them with cooperative-sticky rebalancing
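
For illustration, a sketch of that subclassing pattern; the get_consumer_config helper is assumed for the example, not the real fixture:

```python
# Illustrative sketch only. Because the subclass inherits every test method
# and only overrides the configuration, pytest collects and runs the whole
# TestKafkaStreams suite a second time with the cooperative assignor enabled.
class TestKafkaStreams:
    def get_consumer_config(self) -> dict:
        return {"group.id": "test-group", "auto.offset.reset": "earliest"}

    def test_consumer_polls_when_paused(self) -> None:
        ...  # builds a consumer from get_consumer_config() and exercises it


class TestKafkaStreamsIncrementalRebalancing(TestKafkaStreams):
    def get_consumer_config(self) -> dict:
        return {
            **super().get_consumer_config(),
            "partition.assignment.strategy": "cooperative-sticky",
        }
```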

# Second partition assigned
offsets_p1 = {Partition(topic, 1): 0}
assignment_callback(offsets_p1)

create_args, _ = factory.create_with_partitions.call_args
assert factory.create_with_partitions.call_count == 2
- assert create_args[1] == offsets_p1
+ assert create_args[1] == {**offsets_p1, **offsets_p0}

Member:
was this test change related to your other changes? since there's no cooperative rebalancing here, seems like the assertions should stay the same?

untitaker (Member Author):

the mocked return value for consumer.tell was wrong, so this had the wrong value. the assignments in this test are actually incremental: first p1 is assigned, then p0, and there's no revocation.

@@ -161,6 +161,7 @@ def __init__(
)

configuration = dict(configuration)
self.__assignment_strategy = configuration.get("partition.assignment.strategy")

Member:

sorry i said the wrong thing earlier, this should be group.protocol

untitaker (Member Author):

after discussing offline i think we can support KIP-848 (group.protocol) as well as cooperative-sticky rebalancing. they're the same as far as rdkafka API is concerned. i just can't get it to work right now and might scope it out of this PR if it takes too much time.
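
For reference, a rough sketch of the two configuration knobs being compared, written against confluent-kafka directly; whether a given librdkafka/broker combination supports KIP-848 is an assumption to verify:

```python
from confluent_kafka import Consumer

# Cooperative incremental rebalancing on the classic group protocol:
cooperative = Consumer(
    {
        "bootstrap.servers": "localhost:9092",
        "group.id": "my-group",
        "partition.assignment.strategy": "cooperative-sticky",
    }
)

# KIP-848 consumer group protocol; partition assignment is negotiated through
# the new protocol, so no client-side assignment strategy is set here:
kip848 = Consumer(
    {
        "bootstrap.servers": "localhost:9092",
        "group.id": "my-group",
        "group.protocol": "consumer",
    }
)
```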

untitaker (Member Author):

> Is it worth including a test that fails against the current cluster but works with newer versions? Attempting to commit on an existing partition during a rebalance might do it.

can you elaborate on this?

Comment on lines +274 to +275
logger.info("skipping empty assignment")
return

Contributor:

Why do you need different logic for partition assignment between cooperative and standard rebalancing in the case of an empty assignment?
I assume you can get an empty assignment with cooperative rebalancing when, after a rebalance, your assignments do not change. Is that the scenario where you do not want to touch the existing assignments?

untitaker (Member Author):

after sleeping on it i agree. i only added this because it made cooperative rebalancing more comprehensible, and wasn't sure of the implications on regular rebalancing. i think we can skip empty assignments regardless of the assignment strategy.
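
A minimal sketch of that direction, with illustrative names rather than the actual callback:

```python
import logging
from typing import Mapping

logger = logging.getLogger(__name__)


# Hypothetical callback sketch: skip empty assignments up front, independent
# of the assignment strategy, instead of special-casing the cooperative path.
def on_partitions_assigned(offsets: Mapping[str, int]) -> None:
    if not offsets:
        # Nothing new was assigned; leave the current assignment untouched.
        logger.info("skipping empty assignment")
        return

    # ...otherwise merge the new partitions into the running assignment and
    # (re)build the processing strategy for them.
```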

@@ -107,7 +107,7 @@ def test_dlq_policy_wrapper() -> None:
)
partition = Partition(topic, 0)
wrapper = DlqPolicyWrapper(dlq_policy)
- wrapper.reset_offsets({partition: 0})
+ wrapper.reset_dlq_limits({partition: 0})

untitaker (Member Author):

this rename is just to align with rust btw

untitaker merged commit 8ba2e54 into main on Dec 17, 2024 (14 checks passed).
untitaker deleted the incremental-rebalancing branch on December 17, 2024 at 00:27.