Skip to content

Commit ee428ff

Browse files
authored
Merge pull request #166 from pettermahlen/observe-race
solve race in MobiusLoop.observe()
2 parents 94e3515 + e5baa3d commit ee428ff

File tree

4 files changed

+188
-7
lines changed

4 files changed

+188
-7
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
/*
2+
* -\-\-
3+
* Mobius
4+
* --
5+
* Copyright (c) 2017-2020 Spotify AB
6+
* --
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
* -/-/-
19+
*/
20+
package com.spotify.mobius;
21+
22+
import static com.spotify.mobius.internal_util.Preconditions.checkNotNull;
23+
24+
import com.spotify.mobius.functions.Consumer;
25+
import java.util.concurrent.ConcurrentLinkedQueue;
26+
27+
class FireAtLeastOnceObserver<V> implements Consumer<V> {
28+
private enum State {
29+
UNFIRED,
30+
FIRING,
31+
READY,
32+
}
33+
34+
private final Consumer<V> delegate;
35+
private volatile State state = State.UNFIRED;
36+
private final ConcurrentLinkedQueue<V> queue = new ConcurrentLinkedQueue<>();
37+
38+
public FireAtLeastOnceObserver(Consumer<V> delegate) {
39+
this.delegate = checkNotNull(delegate);
40+
}
41+
42+
@Override
43+
public void accept(V value) {
44+
// this is a bit racy, with three threads handling values with order 1, 2 and 3, respectively:
45+
// 1. thread 1 has called accceptIfUnfired and is in safeConsume, having published its value to
46+
// observers, and having just set the state to READY
47+
// 2. thread 2 has called accept, and is in safeConsume, before the first synchronized section
48+
// 3. thread 3 has called accept and is about to check the current state.
49+
//
50+
// now, if thread 3 reads READY and calls the delegate's accept method directly, before
51+
// thread 2 sets the state to FIRING and publishes its data, the observer will see 1, 3, 2.
52+
// this means that this class isn't safe for racing calls to accept(), but given that it's
53+
// only intended to be used within the event processing, which is sequential, that is not a
54+
// risk.
55+
// do note that this class isn't generally useful outside the specific context of event
56+
// processing.
57+
if (state != State.READY) {
58+
safeConsume(value, true);
59+
} else {
60+
delegate.accept(value);
61+
}
62+
}
63+
64+
public void acceptIfFirst(V value) {
65+
if (state == State.UNFIRED) {
66+
safeConsume(value, false);
67+
}
68+
}
69+
70+
private void safeConsume(V value, boolean acceptAlways) {
71+
// this synchronized section mustn't call unsafe external code like the delegate's accept
72+
// method to avoid the risk of deadlocks. It's synchronized because it's changing two stateful
73+
// fields: the 'state' and the 'queue', and those need to go together to guarantee ordering
74+
// of the emitted values.
75+
synchronized (this) {
76+
// add this item to the queue if we haven't fired, or if it should be added anyway
77+
if (state == State.UNFIRED || acceptAlways) {
78+
queue.add(value);
79+
}
80+
81+
// set state to FIRING to prevent acceptIfUnfired from adding items to the queue and messing
82+
// ordering up - regular accept mustn't invoke the delegate consumer directly until we've
83+
// processed the queue and entered READY state.
84+
state = State.FIRING;
85+
}
86+
87+
for (V toSend = queue.poll(); toSend != null; toSend = queue.poll()) {
88+
delegate.accept(value);
89+
}
90+
91+
synchronized (this) {
92+
// it's possible for a racing 'accept' call to add an item to the queue after the last poll
93+
// above, so check in an exclusive way that the queue is in fact empty TODO: not correct.
94+
if (queue.isEmpty()) {
95+
state = State.READY;
96+
}
97+
}
98+
}
99+
}

mobius-core/src/main/java/com/spotify/mobius/Loop.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,8 @@ public interface Loop<M, E, F> extends Disposable {
4343
* notified of future changes to the model until the loop or the returned {@link Disposable} is
4444
* disposed.
4545
*
46-
* <p>If the addition of the observer races with an event that changes the current model, the
47-
* observer may get notified twice of the same model.
46+
* <p>The observer is guaranteed to get notified of models in the same order that they are emitted
47+
* by the loop, but there is no guarantee about which model will be the first one observed.
4848
*
4949
* @param observer a non-null observer of model changes
5050
* @return a {@link Disposable} that can be used to stop further notifications to the observer

mobius-core/src/main/java/com/spotify/mobius/MobiusLoop.java

+8-5
Original file line numberDiff line numberDiff line change
@@ -194,18 +194,21 @@ public Disposable observe(final Consumer<M> observer) {
194194
return () -> {};
195195
}
196196

197+
FireAtLeastOnceObserver<M> wrapped = new FireAtLeastOnceObserver<>(observer);
198+
199+
modelObservers.add(wrapped);
200+
197201
final M currentModel = mostRecentModel;
198202
if (currentModel != null) {
199-
// Start by emitting the most recently received model.
200-
observer.accept(currentModel);
203+
// Start by emitting the most recently received model, if one hasn't already been emitted
204+
// because of a racing model update
205+
wrapped.acceptIfFirst(currentModel);
201206
}
202207

203-
modelObservers.add(checkNotNull(observer));
204-
205208
return new Disposable() {
206209
@Override
207210
public void dispose() {
208-
modelObservers.remove(observer);
211+
modelObservers.remove(wrapped);
209212
}
210213
};
211214
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/*
2+
* -\-\-
3+
* Mobius
4+
* --
5+
* Copyright (c) 2017-2020 Spotify AB
6+
* --
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
* -/-/-
19+
*/
20+
package com.spotify.mobius;
21+
22+
import static org.assertj.core.api.Assertions.assertThat;
23+
24+
import java.util.ArrayList;
25+
import java.util.List;
26+
import org.junit.Before;
27+
import org.junit.Test;
28+
29+
public class FireAtLeastOnceObserverTest {
30+
List<Integer> observed;
31+
32+
FireAtLeastOnceObserver<Integer> observer;
33+
34+
@Before
35+
public void setUp() throws Exception {
36+
observed = new ArrayList<>();
37+
38+
observer = new FireAtLeastOnceObserver<>(observed::add);
39+
}
40+
41+
@Test
42+
public void shouldForwardAcceptValuesNormally() {
43+
observer.accept(1);
44+
observer.accept(875);
45+
46+
assertThat(observed).containsExactly(1, 875);
47+
}
48+
49+
@Test
50+
public void shouldForwardAcceptFirstOnce() {
51+
observer.acceptIfFirst(98);
52+
53+
assertThat(observed).containsExactly(98);
54+
}
55+
56+
@Test
57+
public void shouldForwardAcceptNormallyAfterAcceptFirst() {
58+
observer.acceptIfFirst(87);
59+
observer.accept(87678);
60+
61+
assertThat(observed).containsExactly(87, 87678);
62+
}
63+
64+
@Test
65+
public void shouldNotForwardAcceptFirstTwice() {
66+
observer.acceptIfFirst(87);
67+
observer.acceptIfFirst(7767);
68+
69+
assertThat(observed).containsExactly(87);
70+
}
71+
72+
@Test
73+
public void shouldNotForwardAcceptFirstAfterNormalAccept() {
74+
observer.acceptIfFirst(987987);
75+
observer.acceptIfFirst(7767);
76+
77+
assertThat(observed).containsExactly(987987);
78+
}
79+
}

0 commit comments

Comments
 (0)