From 2e45c6ca9fbc94c9bfd3b80bc70eed1dbab02cf0 Mon Sep 17 00:00:00 2001 From: Lucas Bradstreet Date: Wed, 21 Sep 2016 15:35:40 +0800 Subject: [PATCH 1/4] push stuff up --- project.clj | 6 +++--- src/onyx/plugin/datomic.clj | 31 +++++++++++++++++++++++-------- 2 files changed, 26 insertions(+), 11 deletions(-) diff --git a/project.clj b/project.clj index 123a0ea..b9c7148 100644 --- a/project.clj +++ b/project.clj @@ -13,12 +13,12 @@ :sign-releases false}} :dependencies [[org.clojure/clojure "1.7.0"] ^{:voom {:repo "git@github.com:onyx-platform/onyx.git" :branch "master"}} - [org.onyxplatform/onyx "0.9.10-beta4"] - [aero "0.2.0"]] + [org.onyxplatform/onyx "0.9.10-beta4"]] :test-selectors {:default (complement :ci) :ci :ci :all (constantly true)} - :profiles {:dev {:dependencies [[com.datomic/datomic-free "0.9.5153"]] + :profiles {:dev {:dependencies [[aero "0.2.0"] + [com.datomic/datomic-free "0.9.5153"]] :plugins [[lein-set-version "0.4.1"] [lein-update-dependency "0.1.2"] [lein-pprint "1.1.1"]] diff --git a/src/onyx/plugin/datomic.clj b/src/onyx/plugin/datomic.clj index 8dbcbd2..f5d68cd 100644 --- a/src/onyx/plugin/datomic.clj +++ b/src/onyx/plugin/datomic.clj @@ -501,22 +501,36 @@ (defrecord DatomicWriteBulkDatomsAsync [conn] p-ext/Pipeline (read-batch - [_ event] + [_ event] (function/read-batch event)) (write-batch [_ event] - {;; Transact each tx individually to avoid tempid conflicts. - :datomic/written (->> (mapv (fn [tx] - (d/transact-async conn (:tx (:message tx)))) - (mapcat :leaves (:tree (:onyx.core/results event)))) - (into [] (comp (map deref)))) - :datomic/written? true}) + (let [timeout-ms 10 + written (->> (mapcat :leaves (:tree (:onyx.core/results event))) + (map (fn [tx] + (d/transact-async conn (:tx (:message tx))))) + (doall) + (map #(deref % timeout-ms ::timed-out)) + (doall))] + (when (some #(= ::timed-out written) written) + (throw (ex-info "Timed out, writing async message. Rebooting task" {:restartable? true + :datomic-plugin? true + :timeout timeout-ms}))) + {;; Transact each tx individually to avoid tempid conflicts. + :datomic/written written + :datomic/written? true})) (seal-resource [_ _] {})) +(defn handle-timed-out-exception [event lifecycle lf-kw exception] + (if (and (:datomic-plugin? (ex-data exception)) + (:restartable? (ex-data exception))) + :restartable + :kill)) + (defn write-bulk-datoms-async [pipeline-data] (let [task-map (:onyx.core/task-map pipeline-data) conn (safe-connect task-map)] @@ -537,7 +551,8 @@ {:lifecycle/before-task-start inject-write-bulk-tx-resources}) (def write-bulk-tx-async-calls - {:lifecycle/before-task-start inject-write-bulk-tx-async-resources}) + {:lifecycle/before-task-start inject-write-bulk-tx-async-resources + :lifecycle/handle-exception handle-timed-out-exception}) ;;;;;;;;; ;;; params lifecycles From 81a39a1ade9201d967b60ef51b90e063950175bf Mon Sep 17 00:00:00 2001 From: Lucas Bradstreet Date: Mon, 7 Nov 2016 18:11:08 +0800 Subject: [PATCH 2/4] Manually deref regular transacts --- src/onyx/plugin/datomic.clj | 30 +++++++++++++++++++++--------- 1 file changed, 21 insertions(+), 9 deletions(-) diff --git a/src/onyx/plugin/datomic.clj b/src/onyx/plugin/datomic.clj index f5d68cd..087a799 100644 --- a/src/onyx/plugin/datomic.clj +++ b/src/onyx/plugin/datomic.clj @@ -483,11 +483,21 @@ (write-batch [_ event] - {;; Transact each tx individually to avoid tempid conflicts. - :datomic/written (mapv (fn [tx] - @(d/transact conn (:tx (:message tx)))) - (mapcat :leaves (:tree (:onyx.core/results event)))) - :datomic/written? true}) + + (let [written (->> (mapcat :leaves (:tree (:onyx.core/results event))) + (map (fn [tx] + (d/transact conn (:tx (:message tx))))) + (doall) + (map #(deref % timeout-ms ::timed-out)) + (doall))] + (when (some #(= ::timed-out written) written) + (throw (ex-info "Timed out transacting message to datomic. Rebooting task" + {:restartable? true + :datomic-plugin? true + :timeout timeout-ms}))) + {;; Transact each tx individually to avoid tempid conflicts. + :datomic/written written + :datomic/written? true})) (seal-resource [_ _] @@ -514,9 +524,10 @@ (map #(deref % timeout-ms ::timed-out)) (doall))] (when (some #(= ::timed-out written) written) - (throw (ex-info "Timed out, writing async message. Rebooting task" {:restartable? true - :datomic-plugin? true - :timeout timeout-ms}))) + (throw (ex-info "Timed out writing async message to datomic. Rebooting task" + {:restartable? true + :datomic-plugin? true + :timeout timeout-ms}))) {;; Transact each tx individually to avoid tempid conflicts. :datomic/written written :datomic/written? true})) @@ -548,7 +559,8 @@ {:lifecycle/before-task-start inject-write-tx-resources}) (def write-bulk-tx-calls - {:lifecycle/before-task-start inject-write-bulk-tx-resources}) + {:lifecycle/before-task-start inject-write-bulk-tx-resources + :lifecycle/handle-exception handle-timed-out-exception}) (def write-bulk-tx-async-calls {:lifecycle/before-task-start inject-write-bulk-tx-async-resources From 52176026e9e6816bcd737281572ff8a1df3e2ede Mon Sep 17 00:00:00 2001 From: Lucas Bradstreet Date: Mon, 7 Nov 2016 18:15:27 +0800 Subject: [PATCH 3/4] Deref in regular write-batch --- src/onyx/plugin/datomic.clj | 25 +++++++++++++++++-------- 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/src/onyx/plugin/datomic.clj b/src/onyx/plugin/datomic.clj index 087a799..0ef7a7f 100644 --- a/src/onyx/plugin/datomic.clj +++ b/src/onyx/plugin/datomic.clj @@ -150,7 +150,7 @@ read-ch retry-ch commit-ch] p-ext/Pipeline (write-batch - [this event] + [this event] (function/write-batch event)) (read-batch @@ -457,12 +457,20 @@ (write-batch [_ event] - (let [messages (mapcat :leaves (:tree (:onyx.core/results event)))] - {:datomic/written @(d/transact conn - (map (fn [msg] (if (and partition (not (sequential? msg))) - (assoc msg :db/id (d/tempid partition)) - msg)) - (map :message messages))) + (let [messages (mapcat :leaves (:tree (:onyx.core/results event))) + timeout-ms 10 + written (-> conn + (d/transact (map (fn [msg] (if (and partition (not (sequential? msg))) + (assoc msg :db/id (d/tempid partition)) + msg)) + (map :message messages))) + (deref timeout-ms ::timed-out))] + (when (= ::timed-out written) + (throw (ex-info "Timed out transacting message to datomic. Rebooting task" + {:restartable? true + :datomic-plugin? true + :timeout timeout-ms}))) + {:datomic/written written :datomic/written? true})) (seal-resource @@ -484,7 +492,8 @@ (write-batch [_ event] - (let [written (->> (mapcat :leaves (:tree (:onyx.core/results event))) + (let [timeout-ms 10 + written (->> (mapcat :leaves (:tree (:onyx.core/results event))) (map (fn [tx] (d/transact conn (:tx (:message tx))))) (doall) From e7a053752dfa61c353746da7a14b7da76b04b03f Mon Sep 17 00:00:00 2001 From: Lucas Bradstreet Date: Mon, 7 Nov 2016 21:44:22 +0800 Subject: [PATCH 4/4] Hardcode write timeout to 15000 ms --- src/onyx/plugin/datomic.clj | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/onyx/plugin/datomic.clj b/src/onyx/plugin/datomic.clj index 0ef7a7f..92c6faf 100644 --- a/src/onyx/plugin/datomic.clj +++ b/src/onyx/plugin/datomic.clj @@ -458,7 +458,7 @@ (write-batch [_ event] (let [messages (mapcat :leaves (:tree (:onyx.core/results event))) - timeout-ms 10 + timeout-ms 15000 written (-> conn (d/transact (map (fn [msg] (if (and partition (not (sequential? msg))) (assoc msg :db/id (d/tempid partition)) @@ -492,7 +492,7 @@ (write-batch [_ event] - (let [timeout-ms 10 + (let [timeout-ms 15000 written (->> (mapcat :leaves (:tree (:onyx.core/results event))) (map (fn [tx] (d/transact conn (:tx (:message tx))))) @@ -525,7 +525,7 @@ (write-batch [_ event] - (let [timeout-ms 10 + (let [timeout-ms 15000 written (->> (mapcat :leaves (:tree (:onyx.core/results event))) (map (fn [tx] (d/transact-async conn (:tx (:message tx)))))