Skip to content

Commit 645d0d9

Browse files
committed
feat(multi_read): add partition snapshot manager
Signed-off-by: Robin Han <[email protected]>
1 parent 7bdfc63 commit 645d0d9

File tree

47 files changed

+1387
-218
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

47 files changed

+1387
-218
lines changed

clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java

+1
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,7 @@ public enum ApiKeys {
148148
AUTOMQ_REGISTER_NODE(ApiMessageType.AUTOMQ_REGISTER_NODE, false, false),
149149
AUTOMQ_GET_NODES(ApiMessageType.AUTOMQ_GET_NODES, false, true),
150150
AUTOMQ_ZONE_ROUTER(ApiMessageType.AUTOMQ_ZONE_ROUTER, false, false),
151+
AUTOMQ_GET_PARTITION_SNAPSHOT(ApiMessageType.AUTOMQ_GET_PARTITION_SNAPSHOT, false, false),
151152

152153
GET_NEXT_NODE_ID(ApiMessageType.GET_NEXT_NODE_ID, false, true),
153154
DESCRIBE_STREAMS(ApiMessageType.DESCRIBE_STREAMS, false, true),

clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java

+3
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.kafka.common.protocol.ObjectSerializationCache;
2525
import org.apache.kafka.common.protocol.SendBuilder;
2626
import org.apache.kafka.common.requests.s3.AutomqGetNodesRequest;
27+
import org.apache.kafka.common.requests.s3.AutomqGetPartitionSnapshotRequest;
2728
import org.apache.kafka.common.requests.s3.AutomqRegisterNodeRequest;
2829
import org.apache.kafka.common.requests.s3.AutomqUpdateGroupRequest;
2930
import org.apache.kafka.common.requests.s3.AutomqZoneRouterRequest;
@@ -376,6 +377,8 @@ private static AbstractRequest doParseRequest(ApiKeys apiKey, short apiVersion,
376377
return AutomqGetNodesRequest.parse(buffer, apiVersion);
377378
case AUTOMQ_ZONE_ROUTER:
378379
return AutomqZoneRouterRequest.parse(buffer, apiVersion);
380+
case AUTOMQ_GET_PARTITION_SNAPSHOT:
381+
return AutomqGetPartitionSnapshotRequest.parse(buffer, apiVersion);
379382
case GET_NEXT_NODE_ID:
380383
return GetNextNodeIdRequest.parse(buffer, apiVersion);
381384
case DESCRIBE_STREAMS:

clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java

+3
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.kafka.common.protocol.MessageUtil;
2323
import org.apache.kafka.common.protocol.SendBuilder;
2424
import org.apache.kafka.common.requests.s3.AutomqGetNodesResponse;
25+
import org.apache.kafka.common.requests.s3.AutomqGetPartitionSnapshotResponse;
2526
import org.apache.kafka.common.requests.s3.AutomqRegisterNodeResponse;
2627
import org.apache.kafka.common.requests.s3.AutomqUpdateGroupResponse;
2728
import org.apache.kafka.common.requests.s3.AutomqZoneRouterResponse;
@@ -313,6 +314,8 @@ public static AbstractResponse parseResponse(ApiKeys apiKey, ByteBuffer response
313314
return AutomqGetNodesResponse.parse(responseBuffer, version);
314315
case AUTOMQ_ZONE_ROUTER:
315316
return AutomqZoneRouterResponse.parse(responseBuffer, version);
317+
case AUTOMQ_GET_PARTITION_SNAPSHOT:
318+
return AutomqGetPartitionSnapshotResponse.parse(responseBuffer, version);
316319
case GET_NEXT_NODE_ID:
317320
return GetNextNodeIdResponse.parse(responseBuffer, version);
318321
case DESCRIBE_STREAMS:
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
* Copyright 2024, AutoMQ HK Limited.
3+
*
4+
* The use of this file is governed by the Business Source License,
5+
* as detailed in the file "/LICENSE.S3Stream" included in this repository.
6+
*
7+
* As of the Change Date specified in that file, in accordance with
8+
* the Business Source License, use of this software will be governed
9+
* by the Apache License, Version 2.0
10+
*/
11+
12+
package org.apache.kafka.common.requests.s3;
13+
14+
import org.apache.kafka.common.message.AutomqGetPartitionSnapshotRequestData;
15+
import org.apache.kafka.common.message.AutomqGetPartitionSnapshotResponseData;
16+
import org.apache.kafka.common.protocol.ApiKeys;
17+
import org.apache.kafka.common.protocol.ByteBufferAccessor;
18+
import org.apache.kafka.common.requests.AbstractRequest;
19+
import org.apache.kafka.common.requests.AbstractResponse;
20+
import org.apache.kafka.common.requests.ApiError;
21+
22+
import java.nio.ByteBuffer;
23+
24+
public class AutomqGetPartitionSnapshotRequest extends AbstractRequest {
25+
private final AutomqGetPartitionSnapshotRequestData data;
26+
27+
public AutomqGetPartitionSnapshotRequest(AutomqGetPartitionSnapshotRequestData data, short version) {
28+
super(ApiKeys.AUTOMQ_GET_PARTITION_SNAPSHOT, version);
29+
this.data = data;
30+
}
31+
32+
@Override
33+
public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
34+
ApiError apiError = ApiError.fromThrowable(e);
35+
AutomqGetPartitionSnapshotResponseData response = new AutomqGetPartitionSnapshotResponseData()
36+
.setErrorCode(apiError.error().code())
37+
.setThrottleTimeMs(throttleTimeMs);
38+
return new AutomqGetPartitionSnapshotResponse(response);
39+
}
40+
41+
@Override
42+
public AutomqGetPartitionSnapshotRequestData data() {
43+
return data;
44+
}
45+
46+
public static AutomqGetPartitionSnapshotRequest parse(ByteBuffer buffer, short version) {
47+
return new AutomqGetPartitionSnapshotRequest(new AutomqGetPartitionSnapshotRequestData(new ByteBufferAccessor(buffer), version), version);
48+
}
49+
50+
public static class Builder extends AbstractRequest.Builder<AutomqGetPartitionSnapshotRequest> {
51+
private final AutomqGetPartitionSnapshotRequestData data;
52+
53+
public Builder(AutomqGetPartitionSnapshotRequestData data) {
54+
super(ApiKeys.AUTOMQ_GET_PARTITION_SNAPSHOT);
55+
this.data = data;
56+
}
57+
58+
@Override
59+
public AutomqGetPartitionSnapshotRequest build(short version) {
60+
return new AutomqGetPartitionSnapshotRequest(data, version);
61+
}
62+
63+
@Override
64+
public String toString() {
65+
return data.toString();
66+
}
67+
}
68+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Copyright 2024, AutoMQ HK Limited.
3+
*
4+
* The use of this file is governed by the Business Source License,
5+
* as detailed in the file "/LICENSE.S3Stream" included in this repository.
6+
*
7+
* As of the Change Date specified in that file, in accordance with
8+
* the Business Source License, use of this software will be governed
9+
* by the Apache License, Version 2.0
10+
*/
11+
12+
package org.apache.kafka.common.requests.s3;
13+
14+
import org.apache.kafka.common.message.AutomqGetPartitionSnapshotResponseData;
15+
import org.apache.kafka.common.protocol.ApiKeys;
16+
import org.apache.kafka.common.protocol.ByteBufferAccessor;
17+
import org.apache.kafka.common.protocol.Errors;
18+
import org.apache.kafka.common.requests.AbstractResponse;
19+
20+
import java.nio.ByteBuffer;
21+
import java.util.Map;
22+
23+
public class AutomqGetPartitionSnapshotResponse extends AbstractResponse {
24+
private final AutomqGetPartitionSnapshotResponseData data;
25+
26+
public AutomqGetPartitionSnapshotResponse(AutomqGetPartitionSnapshotResponseData data) {
27+
super(ApiKeys.AUTOMQ_GET_PARTITION_SNAPSHOT);
28+
this.data = data;
29+
}
30+
31+
@Override
32+
public Map<Errors, Integer> errorCounts() {
33+
return errorCounts(Errors.forCode(data.errorCode()));
34+
}
35+
36+
@Override
37+
public int throttleTimeMs() {
38+
return data.throttleTimeMs();
39+
}
40+
41+
@Override
42+
public void maybeSetThrottleTimeMs(int throttleTimeMs) {
43+
data.setThrottleTimeMs(throttleTimeMs);
44+
}
45+
46+
@Override
47+
public AutomqGetPartitionSnapshotResponseData data() {
48+
return data;
49+
}
50+
51+
public static AutomqGetPartitionSnapshotResponse parse(ByteBuffer buffer, short version) {
52+
return new AutomqGetPartitionSnapshotResponse(new AutomqGetPartitionSnapshotResponseData(new ByteBufferAccessor(buffer), version));
53+
}
54+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one or more
2+
// contributor license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright ownership.
4+
// The ASF licenses this file to You under the Apache License, Version 2.0
5+
// (the "License"); you may not use this file except in compliance with
6+
// the License. You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
16+
{
17+
"apiKey": 516,
18+
"type": "request",
19+
"listeners": [
20+
"broker"
21+
],
22+
"name": "AutomqGetPartitionSnapshotRequest",
23+
"validVersions": "0",
24+
"flexibleVersions": "0+",
25+
"fields": [
26+
{
27+
"name": "SessionId",
28+
"type": "int32",
29+
"versions": "0+",
30+
"about": "The get session id"
31+
},
32+
{
33+
"name": "SessionEpoch",
34+
"type": "int32",
35+
"versions": "0+",
36+
"about": "The get session epoch, which is used for ordering requests in a session"
37+
}
38+
]
39+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one or more
2+
// contributor license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright ownership.
4+
// The ASF licenses this file to You under the Apache License, Version 2.0
5+
// (the "License"); you may not use this file except in compliance with
6+
// the License. You may obtain a copy of the License at
7+
//
8+
// http://www.apache.org/licenses/LICENSE-2.0
9+
//
10+
// Unless required by applicable law or agreed to in writing, software
11+
// distributed under the License is distributed on an "AS IS" BASIS,
12+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
// See the License for the specific language governing permissions and
14+
// limitations under the License.
15+
16+
{
17+
"apiKey": 516,
18+
"type": "response",
19+
"name": "AutomqGetPartitionSnapshotResponse",
20+
"validVersions": "0",
21+
"flexibleVersions": "0+",
22+
"fields": [
23+
{ "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The top level response error code" },
24+
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", "about": "Duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
25+
{ "name": "SessionId", "type": "int32", "versions": "0+", "about": "The get session id" },
26+
{ "name": "SessionEpoch", "type": "int32", "versions": "0+", "about": "The next get session epoch" },
27+
{ "name": "Topics", "type": "[]Topic", "versions": "0+", "about": "The topic list", "fields": [
28+
{ "name": "TopicId", "type": "uuid", "versions": "0+", "about": "The topic id", "mapKey": true },
29+
{ "name": "Partitions", "type": "[]PartitionSnapshot", "versions": "0+", "about": "The partition snapshot list", "fields": [
30+
{ "name": "PartitionIndex", "type": "int32", "versions": "0+", "about": "The partition index"},
31+
{ "name": "LeaderEpoch", "type": "int32", "versions": "0+", "about": "The partition leader epoch"},
32+
{ "name": "Operation", "type": "int16", "versions": "0+", "about": "The snapshot operation, 0 -> ADD, 1 -> PATCH, 2 -> REMOVE"},
33+
{ "name": "LogMetadata", "type": "LogMetadata", "versions": "0+", "nullableVersions": "0+", "about": "The log metadata" },
34+
{ "name": "FirstUnstableOffset", "type": "LogOffsetMetadata", "versions": "0+", "nullableVersions": "0+", "about": "The partition first unstable offset" },
35+
{ "name": "LogEndOffset", "type": "LogOffsetMetadata", "versions": "0+", "nullableVersions": "0+", "about": "The partition log end offset" },
36+
{ "name": "StreamMetadata", "type": "[]StreamMetadata", "versions": "0+", "nullableVersions": "0+", "fields": [
37+
{ "name": "StreamId", "type": "int64", "versions": "0+", "about": "The streamId" },
38+
{ "name": "EndOffset", "type": "int64", "versions": "0+", "about": "The stream end offset" }
39+
]}
40+
]}
41+
]}
42+
],
43+
"commonStructs": [
44+
{ "name": "LogMetadata", "versions": "0+", "fields": [
45+
{ "name": "segments", "versions": "0+", "type": "[]SegmentMetadata", "about": "The segment list" },
46+
{ "name": "streamMap", "versions": "0+", "type": "[]StreamMapping" }
47+
]},
48+
{ "name": "StreamMapping", "versions": "0+", "fields": [
49+
{ "name": "name", "type": "string", "versions": "0+", "about": "The streamName", "mapKey": true },
50+
{ "name": "streamId", "type": "int64", "versions": "0+", "about": "The stream id" }
51+
]},
52+
{ "name": "LogOffsetMetadata", "versions": "0+", "fields": [
53+
{ "name": "messageOffset", "type": "int64", "versions": "0+", "about": "The message logic offset" },
54+
{ "name": "relativePositionInSegment", "type": "int32", "versions": "0+", "about": "The message relative physical offset" }
55+
]},
56+
{ "name": "SegmentMetadata", "versions": "0+", "fields": [
57+
{ "name": "baseOffset", "versions": "0+", "type": "int64", "about": "The segment base offset" },
58+
{ "name": "createTimestamp", "versions": "0+", "type": "int64", "about": "The segment create timestamp" },
59+
{ "name": "lastModifiedTimestamp", "versions": "0+", "type": "int64", "about": "The segment last modified timestamp" },
60+
{ "name": "streamSuffix", "versions": "0+", "type": "string", "about": "The segment's stream suffix" },
61+
{ "name": "logSize", "versions": "0+", "type": "int32", "about": "The segment size" },
62+
{ "name": "log", "versions": "0+", "type": "SliceRange", "about": "The segment log stream slice range" },
63+
{ "name": "time", "versions": "0+", "type": "SliceRange", "about": "The segment time stream slice range" },
64+
{ "name": "transaction", "versions": "0+", "type": "SliceRange", "about": "The segment transaction stream slice range" },
65+
{ "name": "firstBatchTimestamp", "versions": "0+", "type": "int64", "about": "The segment first batch timestamp" },
66+
{ "name": "timeIndexLastEntry", "versions": "0+", "type": "TimestampOffsetData", "about": "The segment last timestamp index entry" }
67+
]},
68+
{ "name": "SliceRange", "versions": "0+", "fields": [
69+
{ "name": "start", "versions": "0+", "type": "int64", "about": "The range start offset" },
70+
{ "name": "end", "versions": "0+", "type": "int64", "about": "The range end offset" }
71+
]},
72+
{ "name": "TimestampOffsetData", "versions": "0+", "fields": [
73+
{ "name": "timestamp", "versions": "0+", "type": "int64", "about": "The range start offset" },
74+
{ "name": "offset", "versions": "0+", "type": "int64", "about": "The range end offset" }
75+
]}
76+
]
77+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Copyright 2024, AutoMQ HK Limited.
3+
*
4+
* The use of this file is governed by the Business Source License,
5+
* as detailed in the file "/LICENSE.S3Stream" included in this repository.
6+
*
7+
* As of the Change Date specified in that file, in accordance with
8+
* the Business Source License, use of this software will be governed
9+
* by the Apache License, Version 2.0
10+
*/
11+
12+
package kafka.automq.partition.snapshot;
13+
14+
import java.util.Objects;
15+
16+
public class PartitionSnapshotVersion {
17+
// increment every time when there is a segment change
18+
private int segmentsVersion;
19+
// increment every time when there is a new record
20+
private int recordsVersion;
21+
22+
private PartitionSnapshotVersion(int segmentsVersion, int recordsVersion) {
23+
this.segmentsVersion = segmentsVersion;
24+
this.recordsVersion = recordsVersion;
25+
}
26+
27+
public static PartitionSnapshotVersion create() {
28+
return new PartitionSnapshotVersion(0, 0);
29+
}
30+
31+
public int segmentsVersion() {
32+
return segmentsVersion;
33+
}
34+
35+
public int recordsVersion() {
36+
return recordsVersion;
37+
}
38+
39+
public PartitionSnapshotVersion incrementSegmentsVersion() {
40+
segmentsVersion++;
41+
return this;
42+
}
43+
44+
public PartitionSnapshotVersion incrementRecordsVersion() {
45+
recordsVersion++;
46+
return this;
47+
}
48+
49+
@Override
50+
public boolean equals(Object o) {
51+
if (o == null || getClass() != o.getClass())
52+
return false;
53+
PartitionSnapshotVersion version = (PartitionSnapshotVersion) o;
54+
return segmentsVersion == version.segmentsVersion && recordsVersion == version.recordsVersion;
55+
}
56+
57+
@Override
58+
public int hashCode() {
59+
return Objects.hash(segmentsVersion, recordsVersion);
60+
}
61+
62+
public PartitionSnapshotVersion copy() {
63+
return new PartitionSnapshotVersion(segmentsVersion, recordsVersion);
64+
}
65+
66+
}

0 commit comments

Comments
 (0)