Skip to content

Commit 7ce96e4

Browse files
authored
feature : RocketMQ transaction are supported (apache#6230)
1 parent e83d49b commit 7ce96e4

File tree

26 files changed

+785
-1
lines changed

26 files changed

+785
-1
lines changed

all/pom.xml

+5
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,11 @@
192192
</exclusion>
193193
</exclusions>
194194
</dependency>
195+
<dependency>
196+
<groupId>org.apache.seata</groupId>
197+
<artifactId>seata-rocketmq</artifactId>
198+
<version>${project.version}</version>
199+
</dependency>
195200
<dependency>
196201
<groupId>org.apache.seata</groupId>
197202
<artifactId>seata-sqlparser-core</artifactId>

bom/pom.xml

+5
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,11 @@
191191
<artifactId>seata-http</artifactId>
192192
<version>${project.version}</version>
193193
</dependency>
194+
<dependency>
195+
<groupId>org.apache.seata</groupId>
196+
<artifactId>seata-rocketmq</artifactId>
197+
<version>${project.version}</version>
198+
</dependency>
194199
<dependency>
195200
<groupId>org.apache.seata</groupId>
196201
<artifactId>seata-rm</artifactId>

changes/en-us/2.x.md

+1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ Add changes here for all PR submitted to the 2.x branch.
66
- [[#6370](https://github.com/seata/seata/pull/6370)] seata saga decouple spring, optimize architecture.
77
- [[#6205](https://github.com/apache/incubator-seata/pull/6205)] mock server
88
- [[#6169](https://github.com/apache/incubator-seata/pull/6169)] full support for states in the refactored state machine designer
9+
- [[#6230](https://github.com/apache/incubator-seata/pull/6230)] RocketMQ transaction are supported
910

1011
### bugfix:
1112
- [[#6090](https://github.com/apache/incubator-seata/pull/6090)] fix the TCC aspect exception handling process, do not wrapping the internal call exceptions

changes/zh-cn/2.x.md

+1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
- [[#6370](https://github.com/seata/seata/pull/6370)] seata saga spring接耦、架构优化。
77
- [[#6205](https://github.com/apache/incubator-seata/pull/6205)] 提供mock server
88
- [[#6169](https://github.com/apache/incubator-seata/pull/6169)] 支持新版本状态机设计器
9+
- [[#6230](https://github.com/apache/incubator-seata/pull/6230)] 支持RocketMQ消息事务
910

1011
### bugfix:
1112
- [[#6090](https://github.com/apache/incubator-seata/pull/6090)] 修复tcc切面异常处理过程,不对内部调用异常做包装处理,直接向外抛出

common/src/main/java/org/apache/seata/common/ConfigurationKeys.java

+5
Original file line numberDiff line numberDiff line change
@@ -1006,4 +1006,9 @@ public interface ConfigurationKeys {
10061006
* The constant SERVER_APPLICATION_DATA_SIZE_CHECK
10071007
*/
10081008
String SERVER_APPLICATION_DATA_SIZE_CHECK = SERVER_PREFIX + "applicationDataLimitCheck";
1009+
1010+
/**
1011+
* The constant ROCKET_MQ_MSG_TIMEOUT
1012+
*/
1013+
String ROCKET_MQ_MSG_TIMEOUT = SERVER_PREFIX + "rocketmqMsgTimeout";
10091014
}

common/src/main/java/org/apache/seata/common/DefaultValues.java

+2
Original file line numberDiff line numberDiff line change
@@ -312,4 +312,6 @@ public interface DefaultValues {
312312
* Default druid location in classpath
313313
*/
314314
String DRUID_LOCATION = "lib/sqlparser/druid.jar";
315+
316+
int DEFAULT_ROCKET_MQ_MSG_TIMEOUT = 60 * 1000;
315317
}

core/src/main/java/org/apache/seata/core/context/RootContext.java

+2
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ private RootContext() {
4848
*/
4949
public static final String KEY_XID = "TX_XID";
5050

51+
public static final String KEY_BRANCHID = "TX_BRANCHID";
52+
5153
/**
5254
* The constant HIDDEN_KEY_XID for sofa-rpc integration.
5355
*/

core/src/main/java/org/apache/seata/core/model/ResourceManager.java

+9
Original file line numberDiff line numberDiff line change
@@ -51,4 +51,13 @@ public interface ResourceManager extends ResourceManagerInbound, ResourceManager
5151
* @return The BranchType of ResourceManager.
5252
*/
5353
BranchType getBranchType();
54+
55+
/**
56+
* Get the GlobalStatus.
57+
*
58+
* @param branchType The BranchType of ResourceManager.
59+
* @param xid The xid of transaction.
60+
* @return The GlobalStatus of transaction.
61+
*/
62+
GlobalStatus getGlobalStatus(BranchType branchType, String xid);
5463
}

dependencies/pom.xml

+7
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,8 @@
114114
<!-- for jdbc driver when package -->
115115
<mysql5.version>${mysql.version}</mysql5.version>
116116
<mysql8.version>8.0.27</mysql8.version>
117+
<!-- rocketmq -->
118+
<rocketmq-version>5.0.0</rocketmq-version>
117119

118120
<!-- # for kotlin -->
119121
<kotlin.version>1.4.32</kotlin.version>
@@ -781,6 +783,11 @@
781783
<artifactId>janino</artifactId>
782784
<version>${janino-version}</version>
783785
</dependency>
786+
<dependency>
787+
<groupId>org.apache.rocketmq</groupId>
788+
<artifactId>rocketmq-client</artifactId>
789+
<version>${rocketmq-version}</version>
790+
</dependency>
784791

785792
<!-- web -->
786793
<dependency>

pom.xml

+1
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
<module>integration/brpc</module>
5858
<module>rm</module>
5959
<module>rm-datasource</module>
60+
<module>rocketmq</module>
6061
<module>spring</module>
6162
<module>tcc</module>
6263
<module>test</module>

rm/src/main/java/org/apache/seata/rm/AbstractResourceManager.java

+15
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,16 @@
2828
import org.apache.seata.core.exception.TransactionExceptionCode;
2929
import org.apache.seata.core.model.BranchStatus;
3030
import org.apache.seata.core.model.BranchType;
31+
import org.apache.seata.core.model.GlobalStatus;
3132
import org.apache.seata.core.model.Resource;
3233
import org.apache.seata.core.model.ResourceManager;
3334
import org.apache.seata.core.protocol.ResultCode;
3435
import org.apache.seata.core.protocol.transaction.BranchRegisterRequest;
3536
import org.apache.seata.core.protocol.transaction.BranchRegisterResponse;
3637
import org.apache.seata.core.protocol.transaction.BranchReportRequest;
3738
import org.apache.seata.core.protocol.transaction.BranchReportResponse;
39+
import org.apache.seata.core.protocol.transaction.GlobalStatusRequest;
40+
import org.apache.seata.core.protocol.transaction.GlobalStatusResponse;
3841
import org.apache.seata.core.rpc.netty.RmNettyRemotingClient;
3942
import org.slf4j.Logger;
4043
import org.slf4j.LoggerFactory;
@@ -140,4 +143,16 @@ public void unregisterResource(Resource resource) {
140143
public void registerResource(Resource resource) {
141144
RmNettyRemotingClient.getInstance().registerResource(resource.getResourceGroupId(), resource.getResourceId());
142145
}
146+
147+
@Override
148+
public GlobalStatus getGlobalStatus(BranchType branchType, String xid) {
149+
GlobalStatusRequest queryGlobalStatus = new GlobalStatusRequest();
150+
queryGlobalStatus.setXid(xid);
151+
try {
152+
GlobalStatusResponse response = (GlobalStatusResponse) RmNettyRemotingClient.getInstance().sendSyncRequest(queryGlobalStatus);
153+
return response.getGlobalStatus();
154+
} catch (TimeoutException e) {
155+
throw new RuntimeException(e);
156+
}
157+
}
143158
}

rm/src/main/java/org/apache/seata/rm/DefaultResourceManager.java

+6
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.seata.core.exception.TransactionException;
2828
import org.apache.seata.core.model.BranchStatus;
2929
import org.apache.seata.core.model.BranchType;
30+
import org.apache.seata.core.model.GlobalStatus;
3031
import org.apache.seata.core.model.Resource;
3132
import org.apache.seata.core.model.ResourceManager;
3233

@@ -150,6 +151,11 @@ public BranchType getBranchType() {
150151
throw new FrameworkException("DefaultResourceManager isn't a real ResourceManager");
151152
}
152153

154+
@Override
155+
public GlobalStatus getGlobalStatus(BranchType branchType, String xid) {
156+
return getResourceManager(branchType).getGlobalStatus(branchType, xid);
157+
}
158+
153159
private static class SingletonHolder {
154160
private static DefaultResourceManager INSTANCE = new DefaultResourceManager();
155161
}

rocketmq/pom.xml

+49
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!--
3+
4+
Licensed to the Apache Software Foundation (ASF) under one or more
5+
contributor license agreements. See the NOTICE file distributed with
6+
this work for additional information regarding copyright ownership.
7+
The ASF licenses this file to You under the Apache License, Version 2.0
8+
(the "License"); you may not use this file except in compliance with
9+
the License. You may obtain a copy of the License at
10+
11+
http://www.apache.org/licenses/LICENSE-2.0
12+
13+
Unless required by applicable law or agreed to in writing, software
14+
distributed under the License is distributed on an "AS IS" BASIS,
15+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
See the License for the specific language governing permissions and
17+
limitations under the License.
18+
19+
-->
20+
<project xmlns="http://maven.apache.org/POM/4.0.0"
21+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
22+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
23+
<parent>
24+
<groupId>org.apache.seata</groupId>
25+
<artifactId>seata-parent</artifactId>
26+
<version>${revision}</version>
27+
<relativePath>../pom.xml</relativePath>
28+
</parent>
29+
<modelVersion>4.0.0</modelVersion>
30+
<artifactId>seata-rocketmq</artifactId>
31+
<packaging>jar</packaging>
32+
<name>seata-rocketmq ${project.version}</name>
33+
<description>rocketmq integration for Seata built with Maven</description>
34+
35+
<dependencies>
36+
<dependency>
37+
<groupId>${project.groupId}</groupId>
38+
<artifactId>seata-tcc</artifactId>
39+
<version>${project.version}</version>
40+
</dependency>
41+
<dependency>
42+
<groupId>org.apache.rocketmq</groupId>
43+
<artifactId>rocketmq-client</artifactId>
44+
<scope>provided</scope>
45+
</dependency>
46+
</dependencies>
47+
48+
49+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.seata.integration.rocketmq;
18+
19+
import org.apache.rocketmq.client.producer.SendStatus;
20+
import org.apache.seata.common.util.StringUtils;
21+
import org.apache.seata.core.context.RootContext;
22+
import org.apache.seata.core.model.GlobalStatus;
23+
import org.apache.seata.rm.DefaultResourceManager;
24+
import org.apache.rocketmq.client.Validators;
25+
import org.apache.rocketmq.client.exception.MQBrokerException;
26+
import org.apache.rocketmq.client.exception.MQClientException;
27+
import org.apache.rocketmq.client.producer.LocalTransactionState;
28+
import org.apache.rocketmq.client.producer.SendResult;
29+
import org.apache.rocketmq.client.producer.TransactionListener;
30+
import org.apache.rocketmq.client.producer.TransactionMQProducer;
31+
import org.apache.rocketmq.common.message.Message;
32+
import org.apache.rocketmq.common.message.MessageAccessor;
33+
import org.apache.rocketmq.common.message.MessageConst;
34+
import org.apache.rocketmq.common.message.MessageExt;
35+
import org.apache.rocketmq.remoting.RPCHook;
36+
import org.apache.rocketmq.remoting.exception.RemotingException;
37+
import org.slf4j.Logger;
38+
import org.slf4j.LoggerFactory;
39+
40+
import java.util.Arrays;
41+
import java.util.List;
42+
43+
/**
44+
* Seata MQ Producer
45+
**/
46+
public class SeataMQProducer extends TransactionMQProducer {
47+
48+
private static final Logger LOGGER = LoggerFactory.getLogger(SeataMQProducer.class);
49+
50+
private static final List<GlobalStatus> COMMIT_STATUSES = Arrays.asList(GlobalStatus.Committed, GlobalStatus.Committing, GlobalStatus.CommitRetrying);
51+
private static final List<GlobalStatus> ROLLBACK_STATUSES = Arrays.asList(GlobalStatus.Rollbacked, GlobalStatus.Rollbacking, GlobalStatus.RollbackRetrying);
52+
53+
public static String PROPERTY_SEATA_XID = RootContext.KEY_XID;
54+
public static String PROPERTY_SEATA_BRANCHID = RootContext.KEY_BRANCHID;
55+
private TransactionListener transactionListener;
56+
57+
private TCCRocketMQ tccRocketMQ;
58+
59+
SeataMQProducer(final String producerGroup) {
60+
this(null, producerGroup, null);
61+
}
62+
63+
SeataMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook) {
64+
super(namespace, producerGroup, rpcHook);
65+
this.transactionListener = new TransactionListener() {
66+
@Override
67+
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
68+
return LocalTransactionState.UNKNOW;
69+
}
70+
71+
@Override
72+
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
73+
String xid = msg.getProperty(PROPERTY_SEATA_XID);
74+
if (StringUtils.isBlank(xid)) {
75+
LOGGER.error("msg has no xid, msgTransactionId: {}, msg will be rollback", msg.getTransactionId());
76+
return LocalTransactionState.ROLLBACK_MESSAGE;
77+
}
78+
GlobalStatus globalStatus = DefaultResourceManager.get().getGlobalStatus(SeataMQProducerFactory.ROCKET_BRANCH_TYPE, xid);
79+
if (COMMIT_STATUSES.contains(globalStatus)) {
80+
return LocalTransactionState.COMMIT_MESSAGE;
81+
} else if (ROLLBACK_STATUSES.contains(globalStatus) || GlobalStatus.isOnePhaseTimeout(globalStatus)) {
82+
return LocalTransactionState.ROLLBACK_MESSAGE;
83+
} else if (GlobalStatus.Finished.equals(globalStatus)) {
84+
LOGGER.error("global transaction finished, msg will be rollback, xid: {}", xid);
85+
return LocalTransactionState.ROLLBACK_MESSAGE;
86+
}
87+
return LocalTransactionState.UNKNOW;
88+
}
89+
};
90+
}
91+
92+
@Override
93+
public SendResult send(Message msg) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
94+
return send(msg, this.getSendMsgTimeout());
95+
}
96+
97+
@Override
98+
public SendResult send(Message msg, long timeout) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
99+
if (RootContext.inGlobalTransaction()) {
100+
if (tccRocketMQ == null) {
101+
throw new RuntimeException("TCCRocketMQ is not initialized");
102+
}
103+
return tccRocketMQ.prepare(msg, timeout);
104+
} else {
105+
return super.send(msg, timeout);
106+
}
107+
}
108+
109+
public SendResult doSendMessageInTransaction(final Message msg, long timeout, String xid, long branchId) throws MQClientException {
110+
msg.setTopic(withNamespace(msg.getTopic()));
111+
if (msg.getDelayTimeLevel() != 0) {
112+
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
113+
}
114+
Validators.checkMessage(msg, this);
115+
116+
SendResult sendResult = null;
117+
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
118+
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.getProducerGroup());
119+
MessageAccessor.putProperty(msg, PROPERTY_SEATA_XID, xid);
120+
MessageAccessor.putProperty(msg, PROPERTY_SEATA_BRANCHID, String.valueOf(branchId));
121+
try {
122+
sendResult = super.send(msg, timeout);
123+
} catch (Exception e) {
124+
throw new MQClientException("send message Exception", e);
125+
}
126+
127+
if (SendStatus.SEND_OK != sendResult.getSendStatus()) {
128+
throw new RuntimeException("Message send fail.status=" + sendResult.getSendStatus());
129+
}
130+
if (sendResult.getTransactionId() != null) {
131+
msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
132+
}
133+
String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
134+
if (null != transactionId && !"".equals(transactionId)) {
135+
msg.setTransactionId(transactionId);
136+
}
137+
return sendResult;
138+
}
139+
140+
141+
@Override
142+
public TransactionListener getTransactionListener() {
143+
return transactionListener;
144+
}
145+
146+
public void setTccRocketMQ(TCCRocketMQ tccRocketMQ) {
147+
this.tccRocketMQ = tccRocketMQ;
148+
}
149+
}

0 commit comments

Comments
 (0)