Skip to content

KAFKA-19268 Missing mocks for SharePartitionManagerTest tests #19786

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
May 26, 2025

Conversation

ji-seung-ryu
Copy link
Contributor

@ji-seung-ryu ji-seung-ryu commented May 22, 2025

jira:
https://issues.apache.org/jira/browse/KAFKA-19268

In jira, there are 5 unit
tests(testAcknowledgeCompletesDelayedShareFetchRequest,
testMultipleConcurrentShareFetches
testCachedTopicPartitionsForValidShareSessions,
testReleaseSessionCompletesDelayedShareFetchRequest,
testReleaseSessionSuccess) that print exceptions.

testAcknowledgeCompletesDelayedShareFetchRequest

[problem]

  • buildLogReadResult was programmed to return list of tp1, tp2 .
  • readFromLog should return only list of topic partitions that will be
    acknowleged, otherwise error occurs.

[solution]

  • Make buildLogReadResult return list of tp1 which will be acknowleged.

testMultipleConcurrentShareFetches

[problem]

  • buildlogReadResult had no records which leads to error in
    maybeSliceFetchRecords.
  • java.util.NoSuchElementException: null
    at
    org.apache.kafka.common.utils.AbstractIterator.next(AbstractIterator.java:52)
    ~[kafka-clients-4.1.0-SNAPSHOT.jar:?]
    at
    kafka.server.share.ShareFetchUtils.maybeSliceFetchRecords(ShareFetchUtils.java:219)
    [main/:?]
    at
    kafka.server.share.ShareFetchUtils.processFetchResponse(ShareFetchUtils.java:132)
    [main/:?]

[solution]

  • I add buildLogReadResultWithFakeRecords which has records.
  • Though above addition makes the test pass well, still not sure
    doAnswer chaning (line 1075 ~ 1105) works well in multi threading test.
    Even though I changed values of assertEquals in doAnswer chaining to
    random value, test passed as well. It needs to be checked that chaining
    works well even in multi thread test.

testCachedTopicPartitionsForValidShareSessions

[problem]

  • ReleaseAcquiredRecords mocks missed.
  • SharePartitionManager didn't have right partitionCache.

[solution]

  • Above two things are added.

testReleaseSessionCompletesDelayedShareFetchRequest

[problem]

  • tp3 is not in sharePartitionManager's partition cache.
  • In test code, it tries to release tp3 by
    'sharePartitionManager.releaseSession(groupId, memberId);' which leads
    to exception.

[solution]

  • I haven't added nothing since this exception was intended.
  • Printing exeption looks bad though.

testReleaseSessionSuccess

[problem]

  • tp3 is not in sharePartitionManager's partition cache.
  • In test code, it tries to release tp3 by
    'sharePartitionManager.releaseSession(groupId, memberId.toString());'
    which leads to exception.

[solution]

  • I haven't added nothing since this exception was intended.
  • Printing exeption looks bad though.

Reviewers: Abhinav Dixit [email protected], Andrew Schofield
[email protected]

@github-actions github-actions bot added triage PRs from the community core Kafka Broker tests Test fixes (including flaky tests) KIP-932 Queues for Kafka small Small PRs labels May 22, 2025
Copy link
Contributor

@adixitconfluent adixitconfluent left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR. Overall looks fine. A few comments.
For testReleaseSessionCompletesDelayedShareFetchRequest -> you can just mock behaviour for releaseAcquiredRecords for tp3. Example - when(sp3.releaseAcquiredRecords(ArgumentMatchers.eq(memberId))).thenReturn(CompletableFuture.completedFuture(null));
that should be enough

For testReleaseSessionSuccess, I agree that getting the exception is intentional. So, we don't need a change there

@AndrewJSchofield AndrewJSchofield removed the triage PRs from the community label May 23, 2025
@ji-seung-ryu
Copy link
Contributor Author

ji-seung-ryu commented May 24, 2025

Thank you for kind code review. I resolved all you mentioned. Also, I want you to see if testMultipleConcurrentShareFetches works as it is intended. I don't think doAnswer chaning (line 1072 ~ 1101 in sharePartitionManagerTest.java) works well in multi threading test.
Even though I changed values of assertEquals in doAnswer chaining to random value, test passed as well. It needs to be checked that chaining works well even in multi thread test.

@adixitconfluent
Copy link
Contributor

@ji-seung-ryu Thanks for the changes. As I mentioned in this comment, we also need to change testReleaseSessionCompletesDelayedShareFetchRequest to mock the behaviour for releaseAcquiredRecords for tp3. Example - when(sp3.releaseAcquiredRecords(ArgumentMatchers.eq(memberId))).thenReturn(CompletableFuture.completedFuture(null));

Copy link
Contributor

@adixitconfluent adixitconfluent left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A small comment

@adixitconfluent
Copy link
Contributor

Thank you for kind code review. I resolved all you mentioned. Also, I want you to see if testMultipleConcurrentShareFetches works as it is intended. I don't think doAnswer chaning (line 1072 ~ 1101 in sharePartitionManagerTest.java) works well in multi threading test. Even though I changed values of assertEquals in doAnswer chaining to random value, test passed as well. It needs to be checked that chaining works well even in multi thread test.

@ji-seung-ryu , hmm, I think you're right. I have created a ticket https://issues.apache.org/jira/browse/KAFKA-19328 for tracking the same.

@ji-seung-ryu
Copy link
Contributor Author

ji-seung-ryu commented May 25, 2025

@ji-seung-ryu Thanks for the changes. As I mentioned in this comment, we also need to change testReleaseSessionCompletesDelayedShareFetchRequest to mock the behaviour for releaseAcquiredRecords for tp3. Example - when(sp3.releaseAcquiredRecords(ArgumentMatchers.eq(memberId))).thenReturn(CompletableFuture.completedFuture(null));

@adixitconfluent Simply adding that kinds of line is not sufficient. Since tp3 is not in sharePartitionManager.partitionCache, but returned in cachedTopicIdPartitionsInShareSession. Currently, In Following codes of SharePartitionManager.java, exceptions occurs.

        List<TopicIdPartition> topicIdPartitions = cachedTopicIdPartitionsInShareSession(
        groupId, memberIdUuid); // tp3 is included.

        ...

       topicIdPartitions.forEach(topicIdPartition -> {
        SharePartitionKey sharePartitionKey = sharePartitionKey(groupId, topicIdPartition);

        SharePartition sharePartition = partitionCache.get(sharePartitionKey); // In partitionCache, no tp3
        if (sharePartition == null) {
            // this exception is called. 
            log.error("No share partition found for groupId {} topicPartition {} while releasing acquired topic partitions", groupId, topicIdPartition);'

There are two options we can choose.

  1. add tp3 in sharePartitionManager.partitionCache and make mock behaviors for it.
  2. do nothing and see if exception is detected and handled well.

I guess 2 is the better choice, since I see this is intentional like testReleaseSessionSuccess.

@adixitconfluent
Copy link
Contributor

@adixitconfluent Simply adding that kinds of line is not sufficient. Since tp3 is not in sharePartitionManager.partitionCache, but returned in cachedTopicIdPartitionsInShareSession. Currently, In Following codes of SharePartitionManager.java, exceptions occurs.

        List<TopicIdPartition> topicIdPartitions = cachedTopicIdPartitionsInShareSession(
        groupId, memberIdUuid); // tp3 is included.

        ...

       topicIdPartitions.forEach(topicIdPartition -> {
        SharePartitionKey sharePartitionKey = sharePartitionKey(groupId, topicIdPartition);

        SharePartition sharePartition = partitionCache.get(sharePartitionKey); // In partitionCache, no tp3
        if (sharePartition == null) {
            // this exception is called. 
            log.error("No share partition found for groupId {} topicPartition {} while releasing acquired topic partitions", groupId, topicIdPartition);'

There are two options we can choose.

  1. add tp3 in sharePartitionManager.partitionCache and make mock behaviors for it.
  2. do nothing and see if exception is detected and handled well.

I guess 2 is the better choice, since I see this is intentional like testReleaseSessionSuccess.

@ji-seung-ryu , the point of this test is to test that delayed share fetch requests are completed for the given watch keys when release session API is called for any group member. So, considering that, I think you should add tp3 to partition cache map and then mock releaseAcquiredRecords for tp3 as I mentioned here. Basically, option 1 of your comment.

@ji-seung-ryu
Copy link
Contributor Author

when(sp3.releaseAcquiredRecords(ArgumentMatchers.eq(memberId))).thenReturn(CompletableFuture.completedFuture(null));

I got your point. I will add it.

Copy link
Contributor

@adixitconfluent adixitconfluent left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the changes. One minor nit comment

Copy link
Contributor

@adixitconfluent adixitconfluent left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Thanks for the changes.

@adixitconfluent
Copy link
Contributor

@AndrewJSchofield can you please review this PR as well?

@AndrewJSchofield AndrewJSchofield merged commit 77aff85 into apache:trunk May 26, 2025
25 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
ci-approved core Kafka Broker KIP-932 Queues for Kafka small Small PRs tests Test fixes (including flaky tests)
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants