Skip to content

Commit 5c50660

Browse files
committed
feat: add OwnershipSynchronizer to abstract consumer migration
1 parent 82ca33f commit 5c50660

File tree

4 files changed

+415
-92
lines changed

4 files changed

+415
-92
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package io.lettuce.core.concurrency;
2+
3+
import io.lettuce.core.internal.LettuceAssert;
4+
import io.netty.util.concurrent.EventExecutor;
5+
import io.netty.util.concurrent.SingleThreadEventExecutor;
6+
7+
/**
8+
* @author chenxiaofan
9+
*/
10+
class Owner {
11+
12+
final SingleThreadEventExecutor eventExecutor;
13+
14+
// if positive, no other thread can preempt the ownership.
15+
private final int runningTaskNum;
16+
17+
public Owner(EventExecutor eventExecutor, int runningTaskNum) {
18+
if (runningTaskNum < 0) {
19+
throw new IllegalArgumentException(String.format("negative runningTaskNum: %d", runningTaskNum));
20+
}
21+
LettuceAssert.assertState(eventExecutor instanceof SingleThreadEventExecutor,
22+
() -> String.format("unexpected event executor, expect %s got %s", SingleThreadEventExecutor.class.getName(),
23+
eventExecutor.getClass().getName()));
24+
this.eventExecutor = (SingleThreadEventExecutor) eventExecutor;
25+
this.runningTaskNum = runningTaskNum;
26+
}
27+
28+
public boolean inEventLoop() {
29+
return eventExecutor.inEventLoop();
30+
}
31+
32+
public Owner toAdd(int n) {
33+
return new Owner(eventExecutor, runningTaskNum + n);
34+
}
35+
36+
@SuppressWarnings("BooleanMethodIsAlwaysInverted")
37+
public boolean isDone() {
38+
return runningTaskNum == 0;
39+
}
40+
41+
public String getThreadName() {
42+
return eventExecutor.threadProperties().name();
43+
}
44+
45+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
package io.lettuce.core.concurrency;
2+
3+
import java.time.Duration;
4+
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
5+
import java.util.function.Consumer;
6+
7+
import io.lettuce.core.internal.LettuceAssert;
8+
import io.netty.util.concurrent.EventExecutor;
9+
import io.netty.util.internal.logging.InternalLogger;
10+
11+
/**
12+
* @author chenxiaofan
13+
*/
14+
public class OwnershipSynchronizer<R> {
15+
16+
public static final int LOOP_CHECK_PERIOD = 100_000;
17+
18+
public static class FailedToPreemptOwnershipException extends Exception {
19+
20+
public FailedToPreemptOwnershipException() {
21+
super("failed to preempt ownership");
22+
}
23+
24+
}
25+
26+
private static final AtomicReferenceFieldUpdater<OwnershipSynchronizer, Owner> OWNER = AtomicReferenceFieldUpdater
27+
.newUpdater(OwnershipSynchronizer.class, Owner.class, "owner");
28+
29+
@SuppressWarnings("java:S3077")
30+
private volatile Owner owner;
31+
32+
final InternalLogger logger;
33+
34+
final R protectedResource;
35+
36+
/**
37+
* Create OwnershipSynchronizer instance.
38+
*
39+
* @param protectedResource protected resource, which can only be accessed by the owner thread, e.g. mpsc queue.
40+
* @param initialOwnerEventExecutor initial owner thread.
41+
* @param initialRunningTaskNum initial running task number.
42+
* @param logger logger.
43+
*/
44+
public OwnershipSynchronizer(R protectedResource, EventExecutor initialOwnerEventExecutor, int initialRunningTaskNum,
45+
InternalLogger logger) {
46+
this.protectedResource = protectedResource;
47+
this.owner = new Owner(initialOwnerEventExecutor, initialRunningTaskNum);
48+
this.logger = logger;
49+
}
50+
51+
/**
52+
* Safely run a task in current owner thread and release its memory effect to next owner thread.
53+
*
54+
* @param task task to run
55+
*/
56+
public void execute(Consumer<R> task) {
57+
Owner cur;
58+
do {
59+
cur = this.owner;
60+
if (isOwnerCurrentThreadAndPreemptPrevented(cur)) {
61+
// already prevented preemption, safe to skip expensive add/done calls
62+
task.accept(protectedResource);
63+
return;
64+
}
65+
} while (!OWNER.compareAndSet(this, cur, cur.toAdd(1)));
66+
67+
if (cur.inEventLoop()) {
68+
try {
69+
task.accept(protectedResource);
70+
} finally {
71+
done(1);
72+
}
73+
} else {
74+
try {
75+
cur.eventExecutor.execute(() -> {
76+
try {
77+
task.accept(protectedResource);
78+
} finally {
79+
done(1);
80+
}
81+
});
82+
} catch (Exception e) {
83+
logger.error("failed to execute task in owner thread", e);
84+
done(1);
85+
throw e;
86+
}
87+
}
88+
}
89+
90+
/**
91+
* Preempt ownership only when there is no running tasks in current owner
92+
*
93+
* @param eventExecutor new thread
94+
* @param runningTaskNumber running task number to add
95+
*/
96+
@SuppressWarnings("unused")
97+
public void preempt(EventExecutor eventExecutor, int runningTaskNumber) throws FailedToPreemptOwnershipException {
98+
preempt(eventExecutor, runningTaskNumber, Long.MAX_VALUE);
99+
}
100+
101+
/**
102+
* Preempt ownership only when there is no running tasks in current owner
103+
*
104+
* @param eventExecutor new thread
105+
* @param runningTaskNumber running task number to add
106+
* @param timeout timeout
107+
*/
108+
public void preempt(EventExecutor eventExecutor, int runningTaskNumber, Duration timeout)
109+
throws FailedToPreemptOwnershipException {
110+
preempt(eventExecutor, runningTaskNumber, System.nanoTime() + timeout.toNanos());
111+
}
112+
113+
@SuppressWarnings("java:S3776" /* complexity */)
114+
private void preempt(EventExecutor eventExecutor, int runningTaskNumber, long deadline)
115+
throws FailedToPreemptOwnershipException {
116+
Owner newOwner = null;
117+
int i = 0;
118+
while (true) {
119+
final Owner cur = this.owner;
120+
121+
if (cur.eventExecutor == eventExecutor) {
122+
if (runningTaskNumber == 0 || OWNER.compareAndSet(this, cur, cur.toAdd(runningTaskNumber))) { // prevent preempt
123+
return;
124+
}
125+
} else if (cur.isDone()) {
126+
if (newOwner == null) {
127+
newOwner = new Owner(eventExecutor, runningTaskNumber);
128+
}
129+
130+
if (OWNER.compareAndSet(this, cur, newOwner)) {
131+
logger.debug("ownership preempted by a new thread [{}]", newOwner.getThreadName());
132+
// established happens-before with done()
133+
return;
134+
}
135+
}
136+
137+
// 1. unsafe to preempt, wait for the owner to finish
138+
// 2. CAS failed
139+
if (deadline < Long.MAX_VALUE && ++i > LOOP_CHECK_PERIOD) {
140+
if (System.nanoTime() > deadline) {
141+
throw new FailedToPreemptOwnershipException();
142+
}
143+
i = 0;
144+
}
145+
}
146+
}
147+
148+
/**
149+
* done n tasks in current owner.
150+
*
151+
* @param n number of tasks to be done.
152+
*/
153+
public void done(int n) {
154+
Owner cur;
155+
do {
156+
cur = this.owner;
157+
assertIsOwnerThreadAndPreemptPrevented(cur);
158+
} while (!OWNER.compareAndSet(this, cur, cur.toAdd(-n)));
159+
// create happens-before with preempt()
160+
}
161+
162+
public void assertIsOwnerThreadAndPreemptPrevented() {
163+
assertIsOwnerThreadAndPreemptPrevented(this.owner);
164+
}
165+
166+
private void assertIsOwnerThreadAndPreemptPrevented(Owner cur) {
167+
LettuceAssert.assertState(isOwnerCurrentThreadAndPreemptPrevented(cur),
168+
() -> "[executeInOwnerWithPreemptPrevention] unexpected: "
169+
+ (cur.inEventLoop() ? "preemption not prevented" : "owner is not this thread"));
170+
}
171+
172+
private boolean isOwnerCurrentThreadAndPreemptPrevented(Owner owner) {
173+
return owner.inEventLoop() && !owner.isDone();
174+
}
175+
176+
}

0 commit comments

Comments
 (0)