Nondeterministically off by one #16

Closed
luminusian opened this issue Dec 31, 2016 · 2 comments
luminusian commented Dec 31, 2016

In the output of the code below, on-value-total is on-value plus or minus one.

(def subject (rx/subject))

(def stream
  (rx/take 1000 subject))

(rx/subscribe stream
              #(println "on-value:" %))

(def total-stream
  (->> stream
       (rx/filter number?)
       (rx/map (constantly 1))
       (rx/reduce + 0)))

(rx/subscribe total-stream
              #(println "on-value-total:" %))

(do (future (run! (partial rx/push! subject)
                  (range 1 1000)))
    (future (run! (partial rx/push! subject)
                  (repeat 1000 :foo))))

One possible output:

on-value: 458
on-value-total: 459

Another possible output:

on-value: 580
on-value-total: 579

niwinz (Member) commented Jan 30, 2017

This looks like an inconsistency in how RxJava handles subjects: it seems the subject does not push the same number of items down to each subscription.

I plan to update to RxJava 2.x in the near future; that may fix it.

niwinz added the bug label Jan 30, 2017

jespino (Member) commented Mar 10, 2017

Hi, this is caused by incorrect usage of the Subject object. The RxJava documentation (https://github.com/ReactiveX/RxJava/wiki/Subject#serializing) states that the Subject implementation isn't thread-safe. Only SerializedSubject is thread-safe, so to make this easier I have added the to-serialized function in PR #23.

SerializedSubject adds some buffering to the push, so if you want similar behavior in your example code, you have to take a lock around each push, for example:

(do 
  (future (run! (fn [item]
                  (locking subject
                    (rx/push! subject item)))
                (range 1 1000)))
  (future (run! (fn [item]
                  (locking subject
                    (rx/push! subject item)))
                (repeat 1000 :foo))))
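
The same locking pattern can be tried without beicon. The following is only a minimal sketch using clojure.core and a plain java.util.ArrayList as a stand-in for a non-thread-safe sink; `sink`, `push-locked!` and `number-count` are hypothetical names chosen for illustration and are not part of beicon or RxJava.

```clojure
;; Hypothetical stand-in for a non-thread-safe target such as a plain Subject.
(def sink (java.util.ArrayList.))

(defn push-locked!
  "Adds item to sink while holding the sink's monitor, so that pushes
  coming from different threads never overlap."
  [item]
  (locking sink
    (.add sink item)))

;; Two concurrent producers, mirroring the example from the issue.
(let [numbers  (future (run! push-locked! (range 1 1000)))
      keywords (future (run! push-locked! (repeat 1000 :foo)))]
  ;; Deref both futures so the sink is only read after both producers finish.
  @numbers
  @keywords)

;; (range 1 1000) yields 999 numbers, so none should be lost.
(def number-count (count (filter number? sink)))
(println "numbers seen:" number-count)
```

Because every push holds the same lock, all 1999 items should end up in the sink in some interleaved order, with no lost or duplicated writes. Removing the `locking` form makes the result depend on thread timing.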

jespino closed this as completed Mar 10, 2017