Skip to content

Commit a629cd8

Browse files
authored
Merge pull request #182 from spotify/fire-at-least-once-atomic
Rewrote FireAtLeastOnceObserver to ensure stronger ordering guarantees.
2 parents bdd6db2 + 46de35b commit a629cd8

File tree

1 file changed

+41
-47
lines changed

1 file changed

+41
-47
lines changed

mobius-core/src/main/java/com/spotify/mobius/FireAtLeastOnceObserver.java

+41-47
Original file line numberDiff line numberDiff line change
@@ -23,16 +23,13 @@
2323

2424
import com.spotify.mobius.functions.Consumer;
2525
import java.util.concurrent.ConcurrentLinkedQueue;
26+
import java.util.concurrent.atomic.AtomicBoolean;
27+
import java.util.concurrent.atomic.AtomicReference;
2628

2729
class FireAtLeastOnceObserver<V> implements Consumer<V> {
28-
private enum State {
29-
UNFIRED,
30-
FIRING,
31-
READY,
32-
}
3330

3431
private final Consumer<V> delegate;
35-
private volatile State state = State.UNFIRED;
32+
private volatile boolean hasStartedEmitting = false;
3633
private final ConcurrentLinkedQueue<V> queue = new ConcurrentLinkedQueue<>();
3734

3835
public FireAtLeastOnceObserver(Consumer<V> delegate) {
@@ -41,58 +38,55 @@ public FireAtLeastOnceObserver(Consumer<V> delegate) {
4138

4239
@Override
4340
public void accept(V value) {
44-
// this is a bit racy, with three threads handling values with order 1, 2 and 3, respectively:
45-
// 1. thread 1 has called accceptIfUnfired and is in safeConsume, having published its value to
46-
// observers, and having just set the state to READY
47-
// 2. thread 2 has called accept, and is in safeConsume, before the first synchronized section
48-
// 3. thread 3 has called accept and is about to check the current state.
49-
//
50-
// now, if thread 3 reads READY and calls the delegate's accept method directly, before
51-
// thread 2 sets the state to FIRING and publishes its data, the observer will see 1, 3, 2.
52-
// this means that this class isn't safe for racing calls to accept(), but given that it's
53-
// only intended to be used within the event processing, which is sequential, that is not a
54-
// risk.
55-
// do note that this class isn't generally useful outside the specific context of event
56-
// processing.
57-
if (state != State.READY) {
58-
safeConsume(value, true);
59-
} else {
60-
delegate.accept(value);
61-
}
41+
queue.add(value);
42+
drainQueue();
6243
}
6344

45+
private final AtomicReference<AtomicReference<V>> firstValue = new AtomicReference<>(null);
46+
6447
public void acceptIfFirst(V value) {
65-
if (state == State.UNFIRED) {
66-
safeConsume(value, false);
48+
// Wrap the value, so that we are able to represent having a `null` value vs. not having a value
49+
// at all.
50+
AtomicReference<V> wrappedValue = new AtomicReference<>(value);
51+
if (firstValue.compareAndSet(null, wrappedValue)) {
52+
drainQueue();
6753
}
6854
}
6955

70-
private void safeConsume(V value, boolean acceptAlways) {
71-
// this synchronized section mustn't call unsafe external code like the delegate's accept
72-
// method to avoid the risk of deadlocks. It's synchronized because it's changing two stateful
73-
// fields: the 'state' and the 'queue', and those need to go together to guarantee ordering
74-
// of the emitted values.
75-
synchronized (this) {
76-
// add this item to the queue if we haven't fired, or if it should be added anyway
77-
if (state == State.UNFIRED || acceptAlways) {
78-
queue.add(value);
79-
}
56+
private final AtomicBoolean processing = new AtomicBoolean(false);
8057

81-
// set state to FIRING to prevent acceptIfUnfired from adding items to the queue and messing
82-
// ordering up - regular accept mustn't invoke the delegate consumer directly until we've
83-
// processed the queue and entered READY state.
84-
state = State.FIRING;
58+
private void drainQueue() {
59+
if (!processing.compareAndSet(false, true)) {
60+
// already draining the queue
61+
return;
8562
}
8663

87-
for (V toSend = queue.poll(); toSend != null; toSend = queue.poll()) {
88-
delegate.accept(value);
64+
// We are now in a safe section that can only execute on one thread at the time.
65+
66+
// If this is the first time, try to emit a value that only can be emitted if it is first.
67+
if (!hasStartedEmitting) {
68+
hasStartedEmitting = true;
69+
AtomicReference<V> wrappedValue = firstValue.get();
70+
if (wrappedValue != null) {
71+
delegate.accept(wrappedValue.get());
72+
}
8973
}
9074

91-
synchronized (this) {
92-
// it's possible for a racing 'accept' call to add an item to the queue after the last poll
93-
// above, so check in an exclusive way that the queue is in fact empty TODO: not correct.
94-
if (queue.isEmpty()) {
95-
state = State.READY;
75+
boolean done = false;
76+
77+
while (!done) {
78+
try {
79+
for (V toSend = queue.poll(); toSend != null; toSend = queue.poll()) {
80+
delegate.accept(toSend);
81+
}
82+
83+
} finally {
84+
processing.set(false); // leave the safe section
85+
86+
// If the queue is empty or if we can't reacquire the processing lock, we're done,
87+
// because either there is nothing to do, or someone else will process the queue.
88+
// Note: it's important that we check the queue first, otherwise we might leak the lock.
89+
done = queue.isEmpty() || !processing.compareAndSet(false, true);
9690
}
9791
}
9892
}

0 commit comments

Comments
 (0)