Skip to content

Commit 91eb505

Browse files
authored
feature: add TCC three-phase hooks (apache#6766)
1 parent 77c81d5 commit 91eb505

File tree

8 files changed

+570
-3
lines changed

8 files changed

+570
-3
lines changed

changes/en-us/2.x.md

+1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ Add changes here for all PR submitted to the 2.x branch.
77
- [[#6226](https://github.com/apache/incubator-seata/pull/6226)] multi-version seata protocol support
88
- [[#6537](https://github.com/apache/incubator-seata/pull/6537)] support Namingserver
99
- [[#6538](https://github.com/apache/incubator-seata/pull/6538)] Integration of naming server on the Seata server side
10+
- [[#6766](https://github.com/apache/incubator-seata/pull/6766)] add TCC three-phase hooks
1011

1112
### bugfix:
1213
- [[#6592](https://github.com/apache/incubator-seata/pull/6592)] fix @Async annotation not working in ClusterWatcherManager

changes/zh-cn/2.x.md

+1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
- [[#6226](https://github.com/apache/incubator-seata/pull/6226)] 支持seata私有协议多版本兼容
88
- [[#6537](https://github.com/apache/incubator-seata/pull/6537)] 支持 Namingserver
99
- [[#6538](https://github.com/apache/incubator-seata/pull/6538)] seata server端集成naming server
10+
- [[#6766](https://github.com/apache/incubator-seata/pull/6766)] 添加tcc三阶段钩子函数,方便用户拓展业务逻辑(比如多数据源情况下可以用于切换数据源)
1011

1112
### bugfix:
1213
- [[#6592](https://github.com/apache/incubator-seata/pull/6592)] fix @Async注解ClusterWatcherManager中不生效的问题
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.seata.integration.tx.api.fence.hook;
18+
19+
20+
import org.apache.seata.rm.tcc.api.BusinessActionContext;
21+
22+
public interface TccHook {
23+
24+
/**
25+
* before tcc prepare
26+
*/
27+
void beforeTccPrepare(String xid, Long branchId, String actionName, BusinessActionContext context);
28+
29+
/**
30+
* after tcc prepare
31+
*/
32+
void afterTccPrepare(String xid, Long branchId, String actionName, BusinessActionContext context);
33+
34+
/**
35+
* before tcc commit
36+
*/
37+
void beforeTccCommit(String xid, Long branchId, String actionName, BusinessActionContext context);
38+
39+
/**
40+
* after tcc commit
41+
*/
42+
void afterTccCommit(String xid, Long branchId, String actionName, BusinessActionContext context);
43+
44+
/**
45+
* before tcc rollback
46+
*/
47+
void beforeTccRollback(String xid, Long branchId, String actionName, BusinessActionContext context);
48+
49+
/**
50+
* after tcc rollback
51+
*/
52+
void afterTccRollback(String xid, Long branchId, String actionName, BusinessActionContext context);
53+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.seata.integration.tx.api.fence.hook;
18+
19+
import java.util.Collections;
20+
import java.util.List;
21+
import java.util.concurrent.CopyOnWriteArrayList;
22+
23+
import org.slf4j.Logger;
24+
import org.slf4j.LoggerFactory;
25+
26+
public final class TccHookManager {
27+
private static final Logger LOGGER = LoggerFactory.getLogger(TccHookManager.class);
28+
29+
private TccHookManager() {
30+
31+
}
32+
33+
private static final List<TccHook> TCC_HOOKS = new CopyOnWriteArrayList<>();
34+
// Cache unmodifiable lists
35+
private volatile static List<TccHook> CACHED_UNMODIFIABLE_HOOKS = null;
36+
37+
/**
38+
* get the hooks
39+
* @return tccHook list
40+
*/
41+
public static List<TccHook> getHooks() {
42+
if (CACHED_UNMODIFIABLE_HOOKS == null) {
43+
synchronized (TccHookManager.class) {
44+
if (CACHED_UNMODIFIABLE_HOOKS == null) {
45+
CACHED_UNMODIFIABLE_HOOKS = Collections.unmodifiableList(TCC_HOOKS);
46+
}
47+
}
48+
}
49+
return CACHED_UNMODIFIABLE_HOOKS;
50+
}
51+
52+
/**
53+
* add new hook
54+
* @param tccHook tccHook
55+
*/
56+
public static void registerHook(TccHook tccHook) {
57+
if (tccHook == null) {
58+
throw new NullPointerException("tccHook must not be null");
59+
}
60+
TCC_HOOKS.add(tccHook);
61+
CACHED_UNMODIFIABLE_HOOKS = null;
62+
LOGGER.info("TccHook registered succeeded! TccHooks size: {}", TCC_HOOKS.size());
63+
}
64+
65+
/**
66+
* clear hooks
67+
*/
68+
public static void clear() {
69+
TCC_HOOKS.clear();
70+
CACHED_UNMODIFIABLE_HOOKS = null;
71+
LOGGER.info("All TccHooks have been cleared.");
72+
}
73+
}

integration-tx-api/src/main/java/org/apache/seata/integration/tx/api/interceptor/ActionInterceptorHandler.java

+39-1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.lang.reflect.UndeclaredThrowableException;
2323
import java.util.Collections;
2424
import java.util.HashMap;
25+
import java.util.List;
2526
import java.util.Map;
2627

2728
import org.apache.seata.common.Constants;
@@ -32,6 +33,8 @@
3233
import org.apache.seata.common.util.NetUtil;
3334
import org.apache.seata.core.context.RootContext;
3435
import org.apache.seata.integration.tx.api.fence.DefaultCommonFenceHandler;
36+
import org.apache.seata.integration.tx.api.fence.hook.TccHook;
37+
import org.apache.seata.integration.tx.api.fence.hook.TccHookManager;
3538
import org.apache.seata.integration.tx.api.util.JsonUtil;
3639
import org.apache.seata.rm.DefaultResourceManager;
3740
import org.apache.seata.rm.tcc.api.BusinessActionContext;
@@ -87,7 +90,7 @@ public Object proceed(Method method, Object[] arguments, String xid, TwoPhaseBus
8790
try {
8891
//share actionContext implicitly
8992
BusinessActionContextUtil.setContext(actionContext);
90-
93+
doBeforeTccPrepare(xid, branchId, actionName, actionContext);
9194
if (businessActionParam.getUseCommonFence()) {
9295
try {
9396
// Use common Fence, and return the business result
@@ -105,6 +108,7 @@ public Object proceed(Method method, Object[] arguments, String xid, TwoPhaseBus
105108
}
106109
} finally {
107110
try {
111+
doAfterTccPrepare(xid, branchId, actionName, actionContext);
108112
//to report business action context finally if the actionContext.getUpdated() is true
109113
BusinessActionContextUtil.reportContext(actionContext);
110114
} finally {
@@ -119,6 +123,40 @@ public Object proceed(Method method, Object[] arguments, String xid, TwoPhaseBus
119123
}
120124
}
121125

126+
/**
127+
* to do some business operations before tcc prepare
128+
* @param xid the xid
129+
* @param branchId the branchId
130+
* @param actionName the actionName
131+
* @param context the business action context
132+
*/
133+
private void doBeforeTccPrepare(String xid, String branchId, String actionName, BusinessActionContext context) {
134+
List<TccHook> hooks = TccHookManager.getHooks();
135+
if (hooks.isEmpty()) {
136+
return;
137+
}
138+
for (TccHook hook : hooks) {
139+
hook.beforeTccPrepare(xid, Long.valueOf(branchId), actionName, context);
140+
}
141+
}
142+
143+
/**
144+
* to do some business operations after tcc prepare
145+
* @param xid the xid
146+
* @param branchId the branchId
147+
* @param actionName the actionName
148+
* @param context the business action context
149+
*/
150+
private void doAfterTccPrepare(String xid, String branchId, String actionName, BusinessActionContext context) {
151+
List<TccHook> hooks = TccHookManager.getHooks();
152+
if (hooks.isEmpty()) {
153+
return;
154+
}
155+
for (TccHook hook : hooks) {
156+
hook.afterTccPrepare(xid, Long.valueOf(branchId), actionName, context);
157+
}
158+
}
159+
122160
/**
123161
* Get or create action context, and reset to arguments
124162
*
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.seata.integration.tx.api.fence.hook;
18+
19+
20+
import java.util.List;
21+
22+
import org.junit.jupiter.api.BeforeEach;
23+
import org.junit.jupiter.api.Test;
24+
25+
import static org.junit.jupiter.api.Assertions.assertEquals;
26+
import static org.junit.jupiter.api.Assertions.assertTrue;
27+
import static org.junit.jupiter.api.Assertions.assertThrows;
28+
import static org.mockito.Mockito.mock;
29+
30+
31+
public class TccHookManagerTest {
32+
33+
@BeforeEach
34+
public void setUp() {
35+
TccHookManager.clear();
36+
}
37+
38+
@Test
39+
public void testRegisterHook() {
40+
TccHook hook = mock(TccHook.class);
41+
TccHookManager.registerHook(hook);
42+
43+
List<TccHook> hooks = TccHookManager.getHooks();
44+
assertEquals(1, hooks.size());
45+
assertTrue(hooks.contains(hook));
46+
}
47+
48+
@Test
49+
public void testClear() {
50+
TccHook hook = mock(TccHook.class);
51+
TccHookManager.registerHook(hook);
52+
List<TccHook> hooks = TccHookManager.getHooks();
53+
assertEquals(1, hooks.size());
54+
assertTrue(hooks.contains(hook));
55+
56+
TccHookManager.clear();
57+
58+
assertTrue(TccHookManager.getHooks().isEmpty());
59+
}
60+
61+
@Test
62+
public void testGetHooks() {
63+
TccHook hook1 = mock(TccHook.class);
64+
TccHook hook2 = mock(TccHook.class);
65+
TccHookManager.registerHook(hook1);
66+
TccHookManager.registerHook(hook2);
67+
68+
List<TccHook> hooks = TccHookManager.getHooks();
69+
assertEquals(2, hooks.size());
70+
assertTrue(hooks.contains(hook1));
71+
assertTrue(hooks.contains(hook2));
72+
73+
// Check unmodifiable list
74+
assertThrows(UnsupportedOperationException.class, () -> hooks.add(mock(TccHook.class)));
75+
}
76+
}

0 commit comments

Comments
 (0)