Skip to content

Commit f83b6ea

Browse files
feature: support grpc protocol (apache#6881)
1 parent 0af9125 commit f83b6ea

File tree

29 files changed

+1026
-18
lines changed

29 files changed

+1026
-18
lines changed

changes/en-us/2.x.md

+2
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ Add changes here for all PR submitted to the 2.x branch.
55
### feature:
66

77
- [[#6876](https://github.com/apache/incubator-seata/pull/6876)]support kingbase
8+
- [[#6881](https://github.com/apache/incubator-seata/pull/6881)]support grpc
89

910
### bugfix:
1011

@@ -35,6 +36,7 @@ Thanks to these contributors for their code commits. Please report an unintended
3536
- [dk2k](https://github.com/dk2k)
3637
- [MaoMaoandSnail](https://github.com/MaoMaoandSnail)
3738
- [yougecn](https://github.com/yougecn)
39+
- [PleaseGiveMeTheCoke](https://github.com/PleaseGiveMeTheCoke)
3840

3941

4042

changes/zh-cn/2.x.md

+2-1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
### feature:
66
[[#6876](https://github.com/apache/incubator-seata/pull/6876)]支持人大金仓数据库(kingbase)
7+
[[#6881](https://github.com/apache/incubator-seata/pull/6881)]全链路支持grpc
78

89
### bugfix:
910

@@ -35,7 +36,7 @@
3536
- [dk2k](https://github.com/dk2k)
3637
- [MaoMaoandSnail](https://github.com/MaoMaoandSnail)
3738
- [yougecn](https://github.com/yougecn)
38-
39+
- [PleaseGiveMeTheCoke](https://github.com/PleaseGiveMeTheCoke)
3940

4041

4142
同时,我们收到了社区反馈的很多有价值的issue和建议,非常感谢大家。

common/src/main/java/org/apache/seata/common/ConfigurationKeys.java

+2
Original file line numberDiff line numberDiff line change
@@ -628,6 +628,8 @@ public interface ConfigurationKeys {
628628
@Deprecated
629629
String ENABLE_CLIENT_BATCH_SEND_REQUEST = TRANSPORT_PREFIX + "enableClientBatchSendRequest";
630630

631+
String TRANSPORT_PROTOCOL = TRANSPORT_PREFIX + "protocol";
632+
631633
/**
632634
* The constant ENABLE_TM_CLIENT_BATCH_SEND_REQUEST
633635
*/

common/src/main/java/org/apache/seata/common/DefaultValues.java

+1
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ public interface DefaultValues {
6363
String DEFAULT_BOSS_THREAD_PREFIX = "NettyBoss";
6464
String DEFAULT_NIO_WORKER_THREAD_PREFIX = "NettyServerNIOWorker";
6565
String DEFAULT_EXECUTOR_THREAD_PREFIX = "NettyServerBizHandler";
66+
String DEFAULT_PROTOCOL = "seata";
6667

6768
boolean DEFAULT_TRANSPORT_HEARTBEAT = true;
6869
boolean DEFAULT_TRANSACTION_UNDO_DATA_VALIDATION = true;

core/pom.xml

+21
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,10 @@
6969
<artifactId>fastjson</artifactId>
7070
<scope>test</scope>
7171
</dependency>
72+
<dependency>
73+
<groupId>com.google.protobuf</groupId>
74+
<artifactId>protobuf-java</artifactId>
75+
</dependency>
7276
</dependencies>
7377

7478
<build>
@@ -90,6 +94,23 @@
9094
</execution>
9195
</executions>
9296
</plugin>
97+
<plugin>
98+
<groupId>org.xolstice.maven.plugins</groupId>
99+
<artifactId>protobuf-maven-plugin</artifactId>
100+
<configuration>
101+
<protoSourceRoot>${project.basedir}/src/main/resources/protobuf/org/apache/seata/protocol/transcation/</protoSourceRoot>
102+
<protocArtifact>
103+
com.google.protobuf:protoc:3.25.4:exe:${os.detected.classifier}
104+
</protocArtifact>
105+
</configuration>
106+
<executions>
107+
<execution>
108+
<goals>
109+
<goal>compile</goal>
110+
</goals>
111+
</execution>
112+
</executions>
113+
</plugin>
93114
</plugins>
94115
</build>
95116
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.seata.core.protocol;
18+
19+
/**
20+
* seata transport protocol
21+
*/
22+
public enum Protocol {
23+
24+
/**
25+
* grpc
26+
*/
27+
GPRC("grpc"),
28+
29+
/**
30+
* seata
31+
*/
32+
SEATA("seata");
33+
34+
public final String value;
35+
36+
Protocol(String value) {
37+
this.value = value;
38+
}
39+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.seata.core.protocol.detector;
18+
19+
import io.netty.buffer.ByteBuf;
20+
import io.netty.channel.ChannelHandler;
21+
import io.netty.channel.ChannelInitializer;
22+
import io.netty.channel.ChannelPipeline;
23+
import io.netty.handler.codec.http2.Http2FrameCodecBuilder;
24+
import io.netty.handler.codec.http2.Http2MultiplexHandler;
25+
import io.netty.handler.codec.http2.Http2StreamChannel;
26+
import io.netty.util.CharsetUtil;
27+
import org.apache.seata.core.rpc.netty.grpc.GrpcDecoder;
28+
import org.apache.seata.core.rpc.netty.grpc.GrpcEncoder;
29+
30+
public class Http2Detector implements ProtocolDetector {
31+
private static final byte[] HTTP2_PREFIX_BYTES = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n".getBytes(CharsetUtil.UTF_8);
32+
private ChannelHandler[] serverHandlers;
33+
34+
public Http2Detector(ChannelHandler[] serverHandlers) {
35+
this.serverHandlers = serverHandlers;
36+
}
37+
38+
@Override
39+
public boolean detect(ByteBuf in) {
40+
if (in.readableBytes() < HTTP2_PREFIX_BYTES.length) {
41+
return false;
42+
}
43+
for (int i = 0; i < HTTP2_PREFIX_BYTES.length; i++) {
44+
if (in.getByte(i) != HTTP2_PREFIX_BYTES[i]) {
45+
return false;
46+
}
47+
}
48+
return true;
49+
}
50+
51+
@Override
52+
public ChannelHandler[] getHandlers() {
53+
return new ChannelHandler[]{
54+
Http2FrameCodecBuilder.forServer().build(),
55+
new Http2MultiplexHandler(new ChannelInitializer<Http2StreamChannel>() {
56+
@Override
57+
protected void initChannel(Http2StreamChannel ch) {
58+
final ChannelPipeline p = ch.pipeline();
59+
p.addLast(new GrpcDecoder());
60+
p.addLast(new GrpcEncoder());
61+
p.addLast(serverHandlers);
62+
}
63+
})
64+
};
65+
}
66+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.seata.core.protocol.detector;
18+
19+
import io.netty.buffer.ByteBuf;
20+
import io.netty.channel.ChannelHandler;
21+
22+
public interface ProtocolDetector {
23+
boolean detect(ByteBuf in);
24+
25+
ChannelHandler[] getHandlers();
26+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.seata.core.protocol.detector;
18+
19+
import io.netty.buffer.ByteBuf;
20+
import io.netty.channel.ChannelHandler;
21+
import org.apache.seata.core.rpc.netty.MultiProtocolDecoder;
22+
23+
public class SeataDetector implements ProtocolDetector {
24+
private static final byte[] MAGIC_CODE_BYTES = {(byte) 0xda, (byte) 0xda};
25+
private ChannelHandler[] serverHandlers;
26+
27+
public SeataDetector(ChannelHandler[] serverHandlers) {
28+
this.serverHandlers = serverHandlers;
29+
}
30+
31+
@Override
32+
public boolean detect(ByteBuf in) {
33+
if (in.readableBytes() < MAGIC_CODE_BYTES.length) {
34+
return false;
35+
}
36+
for (int i = 0; i < MAGIC_CODE_BYTES.length; i++) {
37+
if (in.getByte(i) != MAGIC_CODE_BYTES[i]) {
38+
return false;
39+
}
40+
}
41+
return true;
42+
}
43+
44+
@Override
45+
public ChannelHandler[] getHandlers() {
46+
MultiProtocolDecoder multiProtocolDecoder = new MultiProtocolDecoder(serverHandlers);
47+
48+
return new ChannelHandler[]{multiProtocolDecoder};
49+
}
50+
}

core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientBootstrap.java

+42-8
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,11 @@
1818

1919
import io.netty.bootstrap.Bootstrap;
2020
import io.netty.channel.Channel;
21+
import io.netty.channel.ChannelDuplexHandler;
2122
import io.netty.channel.ChannelFuture;
2223
import io.netty.channel.ChannelHandler;
24+
import io.netty.channel.ChannelHandlerContext;
25+
import io.netty.channel.ChannelInboundHandlerAdapter;
2326
import io.netty.channel.ChannelInitializer;
2427
import io.netty.channel.ChannelOption;
2528
import io.netty.channel.ChannelPipeline;
@@ -28,13 +31,19 @@
2831
import io.netty.channel.epoll.EpollMode;
2932
import io.netty.channel.nio.NioEventLoopGroup;
3033
import io.netty.channel.socket.SocketChannel;
34+
import io.netty.handler.codec.http2.Http2FrameCodecBuilder;
35+
import io.netty.handler.codec.http2.Http2MultiplexHandler;
36+
import io.netty.handler.codec.http2.Http2StreamChannelBootstrap;
3137
import io.netty.handler.timeout.IdleStateHandler;
3238
import io.netty.util.concurrent.DefaultEventExecutorGroup;
3339
import io.netty.util.concurrent.EventExecutorGroup;
3440
import io.netty.util.internal.PlatformDependent;
3541
import org.apache.seata.common.exception.FrameworkException;
3642
import org.apache.seata.common.thread.NamedThreadFactory;
43+
import org.apache.seata.core.protocol.Protocol;
3744
import org.apache.seata.core.rpc.RemotingBootstrap;
45+
import org.apache.seata.core.rpc.netty.grpc.GrpcDecoder;
46+
import org.apache.seata.core.rpc.netty.grpc.GrpcEncoder;
3847
import org.apache.seata.core.rpc.netty.v1.ProtocolDecoderV1;
3948
import org.apache.seata.core.rpc.netty.v1.ProtocolEncoderV1;
4049
import org.slf4j.Logger;
@@ -130,14 +139,18 @@ public void start() {
130139
@Override
131140
public void initChannel(SocketChannel ch) {
132141
ChannelPipeline pipeline = ch.pipeline();
133-
pipeline
134-
.addLast(new IdleStateHandler(nettyClientConfig.getChannelMaxReadIdleSeconds(),
135-
nettyClientConfig.getChannelMaxWriteIdleSeconds(),
136-
nettyClientConfig.getChannelMaxAllIdleSeconds()))
137-
.addLast(new ProtocolDecoderV1())
138-
.addLast(new ProtocolEncoderV1());
139-
if (channelHandlers != null) {
140-
addChannelPipelineLast(ch, channelHandlers);
142+
if (nettyClientConfig.getProtocol().equals(Protocol.GPRC.value)) {
143+
pipeline.addLast(Http2FrameCodecBuilder.forClient().build())
144+
.addLast(new Http2MultiplexHandler(new ChannelDuplexHandler()));
145+
} else {
146+
pipeline.addLast(new IdleStateHandler(nettyClientConfig.getChannelMaxReadIdleSeconds(),
147+
nettyClientConfig.getChannelMaxWriteIdleSeconds(),
148+
nettyClientConfig.getChannelMaxAllIdleSeconds()));
149+
pipeline.addLast(new ProtocolDecoderV1())
150+
.addLast(new ProtocolEncoderV1());
151+
if (channelHandlers != null) {
152+
addChannelPipelineLast(ch, channelHandlers);
153+
}
141154
}
142155
}
143156
});
@@ -177,9 +190,30 @@ public Channel getNewChannel(InetSocketAddress address) {
177190
} else {
178191
channel = f.channel();
179192
}
193+
194+
if (nettyClientConfig.getProtocol().equals(Protocol.GPRC.value)) {
195+
Http2StreamChannelBootstrap bootstrap = new Http2StreamChannelBootstrap(channel);
196+
bootstrap.handler(new ChannelInboundHandlerAdapter() {
197+
@Override
198+
public void handlerAdded(ChannelHandlerContext ctx) {
199+
Channel channel = ctx.channel();
200+
channel.pipeline().addLast(new IdleStateHandler(nettyClientConfig.getChannelMaxReadIdleSeconds(),
201+
nettyClientConfig.getChannelMaxWriteIdleSeconds(),
202+
nettyClientConfig.getChannelMaxAllIdleSeconds()));
203+
channel.pipeline().addLast(new GrpcDecoder());
204+
channel.pipeline().addLast(new GrpcEncoder());
205+
if (channelHandlers != null) {
206+
addChannelPipelineLast(channel, channelHandlers);
207+
}
208+
}
209+
});
210+
channel = bootstrap.open().get();
211+
}
212+
180213
} catch (Exception e) {
181214
throw new FrameworkException(e, "can not connect to services-server.");
182215
}
216+
183217
return channel;
184218
}
185219

core/src/main/java/org/apache/seata/core/rpc/netty/NettyClientConfig.java

+5
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.seata.core.rpc.TransportServerType;
2222

2323
import static org.apache.seata.common.DefaultValues.DEFAULT_ENABLE_CLIENT_BATCH_SEND_REQUEST;
24+
import static org.apache.seata.common.DefaultValues.DEFAULT_PROTOCOL;
2425
import static org.apache.seata.common.DefaultValues.DEFAULT_RPC_RM_REQUEST_TIMEOUT;
2526
import static org.apache.seata.common.DefaultValues.DEFAULT_RPC_TM_REQUEST_TIMEOUT;
2627
import static org.apache.seata.common.DefaultValues.DEFAULT_SELECTOR_THREAD_PREFIX;
@@ -451,6 +452,10 @@ public String getRmDispatchThreadPrefix() {
451452
return RPC_DISPATCH_THREAD_PREFIX + "_" + NettyPoolKey.TransactionRole.RMROLE.name();
452453
}
453454

455+
public String getProtocol() {
456+
return CONFIG.getConfig(org.apache.seata.common.ConfigurationKeys.TRANSPORT_PROTOCOL, DEFAULT_PROTOCOL);
457+
}
458+
454459
@Deprecated
455460
public static boolean isEnableClientBatchSendRequest() {
456461
return ENABLE_CLIENT_BATCH_SEND_REQUEST;

0 commit comments

Comments
 (0)