KAFKA-17747: [6/N] Replace subscription metadata with metadata hash in share group #19796

Open · wants to merge 8 commits into base: trunk
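In outline, this change drops the per-topic ShareGroupPartitionMetadata record and instead carries a single 64-bit hash of the subscribed topic metadata on the group epoch record (ShareGroupMetadataValue); a changed hash is what now triggers a group epoch bump. A minimal standalone sketch of that decision flow, with hypothetical names and values rather than the coordinator's own code:

```java
// Sketch: a metadata change is detected by comparing hashes, and the new
// hash rides on the epoch record rather than on a separate metadata record.
// All names and values here are hypothetical.
public class EpochBumpSketch {
    record EpochRecord(String groupId, int epoch, long metadataHash) {}

    public static void main(String[] args) {
        long storedHash = 42L;    // hash currently stored on the group
        long computedHash = 77L;  // hash over the current subscribed topics
        int groupEpoch = 5;

        if (computedHash != storedHash) {
            groupEpoch += 1;      // metadata change -> bump the group epoch
        }
        System.out.println(new EpochRecord("share-group-1", groupEpoch, computedHash));
    }
}
```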
@@ -44,15 +44,12 @@
 import org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataValue;
 import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataKey;
 import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataValue;
-import org.apache.kafka.coordinator.group.generated.ShareGroupPartitionMetadataKey;
-import org.apache.kafka.coordinator.group.generated.ShareGroupPartitionMetadataValue;
 import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataKey;
 import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataValue;
 import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberKey;
 import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberValue;
 import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataKey;
 import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataValue;
-import org.apache.kafka.coordinator.group.modern.TopicMetadata;
 import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember;
 import org.apache.kafka.coordinator.group.modern.consumer.ResolvedRegularExpression;
 import org.apache.kafka.coordinator.group.modern.share.ShareGroup.InitMapValue;
@@ -569,51 +566,6 @@ public static CoordinatorRecord newShareGroupMemberSubscriptionTombstoneRecord(
         );
     }
 
-    /**
-     * Creates a ShareGroupPartitionMetadata record.
-     *
-     * @param groupId                 The group id.
-     * @param newSubscriptionMetadata The subscription metadata.
-     * @return The record.
-     */
-    public static CoordinatorRecord newShareGroupSubscriptionMetadataRecord(
-        String groupId,
-        Map<String, TopicMetadata> newSubscriptionMetadata
-    ) {
-        ShareGroupPartitionMetadataValue value = new ShareGroupPartitionMetadataValue();
-        newSubscriptionMetadata.forEach((topicName, topicMetadata) ->
-            value.topics().add(new ShareGroupPartitionMetadataValue.TopicMetadata()
-                .setTopicId(topicMetadata.id())
-                .setTopicName(topicMetadata.name())
-                .setNumPartitions(topicMetadata.numPartitions())
-            )
-        );
-
-        return CoordinatorRecord.record(
-            new ShareGroupPartitionMetadataKey()
-                .setGroupId(groupId),
-            new ApiMessageAndVersion(
-                value,
-                (short) 0
-            )
-        );
-    }
-
-    /**
-     * Creates a ShareGroupPartitionMetadata tombstone.
-     *
-     * @param groupId The group id.
-     * @return The record.
-     */
-    public static CoordinatorRecord newShareGroupSubscriptionMetadataTombstoneRecord(
-        String groupId
-    ) {
-        return CoordinatorRecord.tombstone(
-            new ShareGroupPartitionMetadataKey()
-                .setGroupId(groupId)
-        );
-    }
-
     /**
      * Creates a ShareGroupMetadata record.
      *
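With the two helpers above removed, the subscription state rides on the existing ShareGroupMetadata record. A hedged sketch of what the epoch-record helper plausibly looks like after this change, reconstructed from its call sites later in this diff (`newShareGroupEpochRecord(groupId, groupEpoch, groupMetadataHash)`); the exact body and the `(short) 0` record version are assumptions, not shown in this diff:

```java
// Sketch reconstructed from the call sites: the ShareGroupMetadataValue
// carries both the epoch and, per this PR, the metadata hash, so no separate
// ShareGroupPartitionMetadata record is written. Record version is assumed.
public static CoordinatorRecord newShareGroupEpochRecord(
    String groupId,
    int newGroupEpoch,
    long metadataHash
) {
    return CoordinatorRecord.record(
        new ShareGroupMetadataKey()
            .setGroupId(groupId),
        new ApiMessageAndVersion(
            new ShareGroupMetadataValue()
                .setEpoch(newGroupEpoch)
                .setMetadataHash(metadataHash),
            (short) 0
        )
    );
}
```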
@@ -95,8 +95,6 @@
 import org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataValue;
 import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataKey;
 import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataValue;
-import org.apache.kafka.coordinator.group.generated.ShareGroupPartitionMetadataKey;
-import org.apache.kafka.coordinator.group.generated.ShareGroupPartitionMetadataValue;
 import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataKey;
 import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataValue;
 import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberKey;
@@ -1233,13 +1231,6 @@ public void replay(
                 );
                 break;
 
-            case SHARE_GROUP_PARTITION_METADATA:
-                groupMetadataManager.replay(
-                    (ShareGroupPartitionMetadataKey) key,
-                    (ShareGroupPartitionMetadataValue) Utils.messageOrNull(value)
-                );
-                break;
-
             case SHARE_GROUP_MEMBER_METADATA:
                 groupMetadataManager.replay(
                     (ShareGroupMemberMetadataKey) key,
@@ -115,8 +115,6 @@
 import org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataValue;
 import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataKey;
 import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataValue;
-import org.apache.kafka.coordinator.group.generated.ShareGroupPartitionMetadataKey;
-import org.apache.kafka.coordinator.group.generated.ShareGroupPartitionMetadataValue;
 import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataKey;
 import org.apache.kafka.coordinator.group.generated.ShareGroupStatePartitionMetadataValue;
 import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberKey;
@@ -143,7 +141,6 @@
 import org.apache.kafka.coordinator.group.modern.ModernGroup;
 import org.apache.kafka.coordinator.group.modern.SubscriptionCount;
 import org.apache.kafka.coordinator.group.modern.TargetAssignmentBuilder;
-import org.apache.kafka.coordinator.group.modern.TopicMetadata;
 import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup;
 import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember;
 import org.apache.kafka.coordinator.group.modern.consumer.CurrentAssignmentBuilder;
@@ -237,7 +234,6 @@
 import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupMemberSubscriptionRecord;
 import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupMemberSubscriptionTombstoneRecord;
 import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupStatePartitionMetadataRecord;
-import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupSubscriptionMetadataRecord;
 import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers.newShareGroupTargetAssignmentTombstoneRecord;
 import static org.apache.kafka.coordinator.group.Utils.assignmentToString;
 import static org.apache.kafka.coordinator.group.Utils.ofSentinel;
@@ -2545,18 +2541,18 @@ private CoordinatorResult<Map.Entry<ShareGroupHeartbeatResponseData, Optional<In
         ) || initializedAssignmentPending(group);
 
         int groupEpoch = group.groupEpoch();
-        Map<String, TopicMetadata> subscriptionMetadata = group.subscriptionMetadata();
+        Map<String, SubscriptionCount> subscribedTopicNamesMap = group.subscribedTopicNames();
         SubscriptionType subscriptionType = group.subscriptionType();
 
         if (bumpGroupEpoch || group.hasMetadataExpired(currentTimeMs)) {
             // The subscription metadata is updated in two cases:
             // 1) The member has updated its subscriptions;
             // 2) The refresh deadline has been reached.
-            Map<String, SubscriptionCount> subscribedTopicNamesMap = group.computeSubscribedTopicNames(member, updatedMember);
-            subscriptionMetadata = group.computeSubscriptionMetadata(
+            subscribedTopicNamesMap = group.computeSubscribedTopicNames(member, updatedMember);
+            long groupMetadataHash = ModernGroup.computeMetadataHash(
                 subscribedTopicNamesMap,
-                metadataImage.topics(),
-                metadataImage.cluster()
+                topicHashCache,
+                metadataImage
             );
 
             int numMembers = group.numMembers();
@@ -2569,16 +2565,15 @@
                 numMembers
             );
 
-            if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
-                log.info("[GroupId {}] Computed new subscription metadata: {}.",
-                    groupId, subscriptionMetadata);
+            if (groupMetadataHash != group.metadataHash()) {
+                log.info("[GroupId {}] Computed new metadata hash: {}.",
+                    groupId, groupMetadataHash);
                 bumpGroupEpoch = true;
-                records.add(newShareGroupSubscriptionMetadataRecord(groupId, subscriptionMetadata));
             }
 
             if (bumpGroupEpoch) {
                 groupEpoch += 1;
-                records.add(newShareGroupEpochRecord(groupId, groupEpoch, 0));
+                records.add(newShareGroupEpochRecord(groupId, groupEpoch, groupMetadataHash));
                 log.info("[GroupId {}] Bumped group epoch to {}.", groupId, groupEpoch);
             }
 
@@ -2635,7 +2630,7 @@ private CoordinatorResult<Map.Entry<ShareGroupHeartbeatResponseData, Optional<In
             records,
             Map.entry(
                 response,
-                maybeCreateInitializeShareGroupStateRequest(groupId, groupEpoch, subscriptionMetadata, records)
+                maybeCreateInitializeShareGroupStateRequest(groupId, groupEpoch, subscribedTopicNamesMap.keySet(), records)
             )
         );
     }
@@ -2668,13 +2663,13 @@ private boolean initializedAssignmentPending(ShareGroup group) {
      * Computes the diff between the subscribed metadata and the initialized share topic
      * partitions corresponding to a share group.
      *
-     * @param groupId              The share group id for which the diff is being calculated.
-     * @param subscriptionMetadata The subscription metadata corresponding to the share group.
+     * @param groupId                The share group id for which the diff is being calculated.
+     * @param subscriptionTopicNames The subscribed topic names of the share group.
      * @return A map of topic partitions which are subscribed by the share group but not initialized yet.
      */
     // Visibility for testing
-    Map<Uuid, InitMapValue> subscribedTopicsChangeMap(String groupId, Map<String, TopicMetadata> subscriptionMetadata) {
-        if (subscriptionMetadata == null || subscriptionMetadata.isEmpty()) {
+    Map<Uuid, InitMapValue> subscribedTopicsChangeMap(String groupId, Set<String> subscriptionTopicNames) {
+        if (subscriptionTopicNames == null || subscriptionTopicNames.isEmpty()) {
             return Map.of();
         }
 
@@ -2693,18 +2688,20 @@ Map<Uuid, InitMapValue> subscribedTopicsChangeMap(String groupId, Map<String, To
                 .filter(entry -> curTimestamp - entry.getValue().timestamp() < delta)
                 .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))
         );
 
         // Here we add any topics which are subscribed but not yet initialized, and initializing
         // topics whose timestamp indicates that more than delta has elapsed.
-        subscriptionMetadata.forEach((topicName, topicMetadata) -> {
-            Set<Integer> alreadyInitializedPartSet = alreadyInitialized.containsKey(topicMetadata.id()) ? alreadyInitialized.get(topicMetadata.id()).partitions() : Set.of();
-            if (alreadyInitializedPartSet.isEmpty() || alreadyInitializedPartSet.size() < topicMetadata.numPartitions()) {
-                Set<Integer> partitionSet = IntStream.range(0, topicMetadata.numPartitions()).boxed().collect(Collectors.toSet());
-                partitionSet.removeAll(alreadyInitializedPartSet);
-                // alreadyInitialized contains all initialized topics and initializing topics which are less than delta old,
-                // which means we are adding subscribed topics which are unseen or have been initializing for more than delta.
-                // We also update the timestamp here, so old initializing topics will not be included repeatedly.
-                topicPartitionChangeMap.computeIfAbsent(topicMetadata.id(), k -> new InitMapValue(topicMetadata.name(), partitionSet, curTimestamp));
+        subscriptionTopicNames.forEach(topicName -> {
+            TopicImage topicImage = metadataImage.topics().getTopic(topicName);
+            if (topicImage != null) {
+                Set<Integer> alreadyInitializedPartSet = alreadyInitialized.containsKey(topicImage.id()) ? alreadyInitialized.get(topicImage.id()).partitions() : Set.of();
+                if (alreadyInitializedPartSet.isEmpty() || alreadyInitializedPartSet.size() < topicImage.partitions().size()) {
+                    Set<Integer> partitionSet = IntStream.range(0, topicImage.partitions().size()).boxed().collect(Collectors.toSet());
+                    partitionSet.removeAll(alreadyInitializedPartSet);
+                    // alreadyInitialized contains all initialized topics and initializing topics which are less than delta old,
+                    // which means we are adding subscribed topics which are unseen or have been initializing for more than delta.
+                    // We also update the timestamp here, so old initializing topics will not be included repeatedly.
+                    topicPartitionChangeMap.computeIfAbsent(topicImage.id(), k -> new InitMapValue(topicImage.name(), partitionSet, curTimestamp));
+                }
             }
         });
         return topicPartitionChangeMap;
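The re-initialization rule described in the comments above is easiest to see with concrete numbers. A standalone illustration; the delta window, timestamps, and partition counts are all hypothetical:

```java
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Illustration of subscribedTopicsChangeMap's rule: an initializing topic is
// only re-requested once its timestamp is at least `delta` old, and only the
// partitions that are still missing are requested again.
public class InitDiffExample {
    public static void main(String[] args) {
        long curTimestamp = 10_000L;
        long delta = 2_000L;               // assumed retry window
        long initializingSince = 7_000L;   // started 3s ago, so older than delta
        int numPartitions = 4;
        Set<Integer> alreadyInitialized = Set.of(0, 1);

        if (curTimestamp - initializingSince >= delta
                && alreadyInitialized.size() < numPartitions) {
            // Request only the partitions that are still uninitialized: {2, 3}.
            Set<Integer> missing = IntStream.range(0, numPartitions)
                .boxed()
                .filter(p -> !alreadyInitialized.contains(p))
                .collect(Collectors.toSet());
            System.out.println("Re-initialize partitions: " + missing);
        }
    }
}
```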
@@ -2714,22 +2711,22 @@ Map<Uuid, InitMapValue> subscribedTopicsChangeMap(String groupId, Map<String, To
      * Based on the diff between the subscribed topic partitions and the initialized topic partitions,
      * creates an initialize request for the non-initialized ones.
      *
-     * @param groupId              The share group id for which partitions need to be initialized.
-     * @param groupEpoch           The group epoch of the share group.
-     * @param subscriptionMetadata The subscription metadata for the share group.
+     * @param groupId                The share group id for which partitions need to be initialized.
+     * @param groupEpoch             The group epoch of the share group.
+     * @param subscriptionTopicNames The subscribed topic names for the share group.
      * @return An optional representing the persister initialize request.
      */
     private Optional<InitializeShareGroupStateParameters> maybeCreateInitializeShareGroupStateRequest(
         String groupId,
         int groupEpoch,
-        Map<String, TopicMetadata> subscriptionMetadata,
+        Set<String> subscriptionTopicNames,
         List<CoordinatorRecord> records
     ) {
-        if (subscriptionMetadata == null || subscriptionMetadata.isEmpty() || metadataImage.isEmpty()) {
+        if (subscriptionTopicNames == null || subscriptionTopicNames.isEmpty() || metadataImage.isEmpty()) {
             return Optional.empty();
         }
 
-        Map<Uuid, InitMapValue> topicPartitionChangeMap = subscribedTopicsChangeMap(groupId, subscriptionMetadata);
+        Map<Uuid, InitMapValue> topicPartitionChangeMap = subscribedTopicsChangeMap(groupId, subscriptionTopicNames);
 
         // Nothing to initialize.
         if (topicPartitionChangeMap.isEmpty()) {
@@ -4077,21 +4074,20 @@ private <T> CoordinatorResult<T, CoordinatorRecord> shareGroupFenceMember(
         records.add(newShareGroupMemberSubscriptionTombstoneRecord(group.groupId(), member.memberId()));
 
         // We update the subscription metadata without the leaving member.
-        Map<String, TopicMetadata> subscriptionMetadata = group.computeSubscriptionMetadata(
+        long groupMetadataHash = ModernGroup.computeMetadataHash(
             group.computeSubscribedTopicNames(member, null),
-            metadataImage.topics(),
-            metadataImage.cluster()
+            topicHashCache,
+            metadataImage
         );
 
-        if (!subscriptionMetadata.equals(group.subscriptionMetadata())) {
-            log.info("[GroupId {}] Computed new subscription metadata: {}.",
-                group.groupId(), subscriptionMetadata);
-            records.add(newShareGroupSubscriptionMetadataRecord(group.groupId(), subscriptionMetadata));
+        if (groupMetadataHash != group.metadataHash()) {
+            log.info("[GroupId {}] Computed new metadata hash: {}.",
+                group.groupId(), groupMetadataHash);
         }
 
         // We bump the group epoch.
         int groupEpoch = group.groupEpoch() + 1;
-        records.add(newShareGroupEpochRecord(group.groupId(), groupEpoch, 0));
+        records.add(newShareGroupEpochRecord(group.groupId(), groupEpoch, groupMetadataHash));
 
         cancelGroupSessionTimeout(group.groupId(), member.memberId());
 
@@ -5387,6 +5383,7 @@ public void replay(
         if (value != null) {
             ShareGroup shareGroup = getOrMaybeCreatePersistedShareGroup(groupId, true);
             shareGroup.setGroupEpoch(value.epoch());
+            shareGroup.setMetadataHash(value.metadataHash());
         } else {
             ShareGroup shareGroup = getOrMaybeCreatePersistedShareGroup(groupId, false);
             if (!shareGroup.members().isEmpty()) {
@@ -5567,32 +5564,6 @@ public void replay(
         }
     }
 
-    /**
-     * Replays ShareGroupPartitionMetadataKey/Value to update the hard state of
-     * the share group. It updates the subscription metadata of the share
-     * group.
-     *
-     * @param key   A ShareGroupPartitionMetadataKey key.
-     * @param value A ShareGroupPartitionMetadataValue record.
-     */
-    public void replay(
-        ShareGroupPartitionMetadataKey key,
-        ShareGroupPartitionMetadataValue value
-    ) {
-        String groupId = key.groupId();
-        ShareGroup group = getOrMaybeCreatePersistedShareGroup(groupId, false);
-
-        if (value != null) {
-            Map<String, TopicMetadata> subscriptionMetadata = new HashMap<>();
-            value.topics().forEach(topicMetadata ->
-                subscriptionMetadata.put(topicMetadata.topicName(), TopicMetadata.fromRecord(topicMetadata))
-            );
-            group.setSubscriptionMetadata(subscriptionMetadata);
-        } else {
-            group.setSubscriptionMetadata(Map.of());
-        }
-    }
-
     /**
      * Replays ShareGroupTargetAssignmentMemberKey/Value to update the hard state of
      * the share group. It updates the target assignment of the member or deletes it.
@@ -341,6 +341,7 @@ static void throwIfRegularExpressionIsInvalid(
      * The computed hash value is stored as the metadata hash in the *GroupMetadataValue.
      * <p>
      * If there is no topic, the hash value is set to 0.
+     * If a topic hash is 0, ignore the topic.
      * The hashing process involves the following steps:
      * 1. Sort the topic hashes by topic name.
      * 2. Write each topic hash in order.
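A standalone sketch of those documented steps; the per-topic hash values and the 31-based mixing function are placeholders, since this diff does not show the actual hash function used:

```java
import java.util.Map;
import java.util.TreeMap;

// Sketch of the documented steps: iterate topic hashes in topic-name order,
// skip topics whose hash is 0, mix the remaining hashes in order, and return
// 0 when nothing was written. The 31-based mixing is illustrative only.
public class GroupHashSketch {
    static long computeGroupHash(Map<String, Long> topicHashes) {
        long result = 0L;
        boolean wroteAny = false;
        for (Map.Entry<String, Long> entry : new TreeMap<>(topicHashes).entrySet()) {
            if (entry.getValue() == 0L) continue; // a topic hash of 0 is ignored
            result = result * 31 + entry.getValue(); // mixed in topic-name order
            wroteAny = true;
        }
        return wroteAny ? result : 0L;            // no topic -> hash value 0
    }

    public static void main(String[] args) {
        // Hypothetical per-topic hashes; "payments" is ignored because its hash is 0.
        System.out.println(computeGroupHash(Map.of("orders", 123L, "payments", 0L)));
    }
}
```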