Skip to content

Commit dbeba61

Browse files
authored
optimize: expand unit test coverage for the [rocketmq] module. (apache#6927)
feature: add unit-test for rocketmq module
1 parent 4223b85 commit dbeba61

File tree

5 files changed

+629
-16
lines changed

5 files changed

+629
-16
lines changed

changes/en-us/2.x.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ Add changes here for all PR submitted to the 2.x branch.
3838
### security:
3939

4040
### test:
41-
41+
- [[#6927](https://github.com/apache/incubator-seata/pull/6927)] Add unit tests for the `seata-rocketmq` module
4242

4343
Thanks to these contributors for their code commits. Please report an unintended omission.
4444

changes/zh-cn/2.x.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@
4141
### security:
4242

4343
### test:
44-
44+
- [[#6927](https://github.com/apache/incubator-seata/pull/6927)] 增加`seata-rocketmq`模块的测试用例
4545

4646
非常感谢以下 contributors 的代码贡献。若有无意遗漏,请报告。
4747

rocketmq/src/main/java/org/apache/seata/integration/rocketmq/SeataMQProducer.java

+22-12
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,12 @@
1616
*/
1717
package org.apache.seata.integration.rocketmq;
1818

19-
import org.apache.rocketmq.client.producer.SendStatus;
20-
import org.apache.seata.common.util.StringUtils;
21-
import org.apache.seata.core.context.RootContext;
22-
import org.apache.seata.core.model.GlobalStatus;
23-
import org.apache.seata.rm.DefaultResourceManager;
2419
import org.apache.rocketmq.client.Validators;
2520
import org.apache.rocketmq.client.exception.MQBrokerException;
2621
import org.apache.rocketmq.client.exception.MQClientException;
2722
import org.apache.rocketmq.client.producer.LocalTransactionState;
2823
import org.apache.rocketmq.client.producer.SendResult;
24+
import org.apache.rocketmq.client.producer.SendStatus;
2925
import org.apache.rocketmq.client.producer.TransactionListener;
3026
import org.apache.rocketmq.client.producer.TransactionMQProducer;
3127
import org.apache.rocketmq.common.message.Message;
@@ -34,6 +30,10 @@
3430
import org.apache.rocketmq.common.message.MessageExt;
3531
import org.apache.rocketmq.remoting.RPCHook;
3632
import org.apache.rocketmq.remoting.exception.RemotingException;
33+
import org.apache.seata.common.util.StringUtils;
34+
import org.apache.seata.core.context.RootContext;
35+
import org.apache.seata.core.model.GlobalStatus;
36+
import org.apache.seata.rm.DefaultResourceManager;
3737
import org.slf4j.Logger;
3838
import org.slf4j.LoggerFactory;
3939

@@ -47,8 +47,10 @@ public class SeataMQProducer extends TransactionMQProducer {
4747

4848
private static final Logger LOGGER = LoggerFactory.getLogger(SeataMQProducer.class);
4949

50-
private static final List<GlobalStatus> COMMIT_STATUSES = Arrays.asList(GlobalStatus.Committed, GlobalStatus.Committing, GlobalStatus.CommitRetrying);
51-
private static final List<GlobalStatus> ROLLBACK_STATUSES = Arrays.asList(GlobalStatus.Rollbacked, GlobalStatus.Rollbacking, GlobalStatus.RollbackRetrying);
50+
private static final List<GlobalStatus> COMMIT_STATUSES =
51+
Arrays.asList(GlobalStatus.Committed, GlobalStatus.Committing, GlobalStatus.CommitRetrying);
52+
private static final List<GlobalStatus> ROLLBACK_STATUSES =
53+
Arrays.asList(GlobalStatus.Rollbacked, GlobalStatus.Rollbacking, GlobalStatus.RollbackRetrying);
5254

5355
public static String PROPERTY_SEATA_XID = RootContext.KEY_XID;
5456
public static String PROPERTY_SEATA_BRANCHID = RootContext.KEY_BRANCHID;
@@ -75,7 +77,8 @@ public LocalTransactionState checkLocalTransaction(MessageExt msg) {
7577
LOGGER.error("msg has no xid, msgTransactionId: {}, msg will be rollback", msg.getTransactionId());
7678
return LocalTransactionState.ROLLBACK_MESSAGE;
7779
}
78-
GlobalStatus globalStatus = DefaultResourceManager.get().getGlobalStatus(SeataMQProducerFactory.ROCKET_BRANCH_TYPE, xid);
80+
GlobalStatus globalStatus =
81+
DefaultResourceManager.get().getGlobalStatus(SeataMQProducerFactory.ROCKET_BRANCH_TYPE, xid);
7982
if (COMMIT_STATUSES.contains(globalStatus)) {
8083
return LocalTransactionState.COMMIT_MESSAGE;
8184
} else if (ROLLBACK_STATUSES.contains(globalStatus) || GlobalStatus.isOnePhaseTimeout(globalStatus)) {
@@ -90,12 +93,14 @@ public LocalTransactionState checkLocalTransaction(MessageExt msg) {
9093
}
9194

9295
@Override
93-
public SendResult send(Message msg) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
96+
public SendResult send(Message msg)
97+
throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
9498
return send(msg, this.getSendMsgTimeout());
9599
}
96100

97101
@Override
98-
public SendResult send(Message msg, long timeout) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
102+
public SendResult send(Message msg, long timeout)
103+
throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
99104
if (RootContext.inGlobalTransaction()) {
100105
if (tccRocketMQ == null) {
101106
throw new RuntimeException("TCCRocketMQ is not initialized");
@@ -106,7 +111,8 @@ public SendResult send(Message msg, long timeout) throws MQClientException, MQBr
106111
}
107112
}
108113

109-
public SendResult doSendMessageInTransaction(final Message msg, long timeout, String xid, long branchId) throws MQClientException {
114+
public SendResult doSendMessageInTransaction(final Message msg, long timeout, String xid, long branchId)
115+
throws MQClientException {
110116
msg.setTopic(withNamespace(msg.getTopic()));
111117
if (msg.getDelayTimeLevel() != 0) {
112118
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
@@ -119,7 +125,7 @@ public SendResult doSendMessageInTransaction(final Message msg, long timeout, St
119125
MessageAccessor.putProperty(msg, PROPERTY_SEATA_XID, xid);
120126
MessageAccessor.putProperty(msg, PROPERTY_SEATA_BRANCHID, String.valueOf(branchId));
121127
try {
122-
sendResult = super.send(msg, timeout);
128+
sendResult = superSend(msg, timeout);
123129
} catch (Exception e) {
124130
throw new MQClientException("send message Exception", e);
125131
}
@@ -137,6 +143,10 @@ public SendResult doSendMessageInTransaction(final Message msg, long timeout, St
137143
return sendResult;
138144
}
139145

146+
public SendResult superSend(Message msg, long timeout)
147+
throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
148+
return super.send(msg, timeout);
149+
}
140150

141151
@Override
142152
public TransactionListener getTransactionListener() {

0 commit comments

Comments
 (0)