From 6250cc6b3332112d74aab3bf9d4ba2186396df90 Mon Sep 17 00:00:00 2001 From: Ryan Sundberg Date: Sat, 4 May 2019 13:12:59 -0700 Subject: [PATCH] Abort HTTP retry if we stop the peer #6 --- src/onyx/plugin/http_output.clj | 32 +++++++++++++++++++++----------- 1 file changed, 21 insertions(+), 11 deletions(-) diff --git a/src/onyx/plugin/http_output.clj b/src/onyx/plugin/http_output.clj index 1dd9052..c0a3bce 100644 --- a/src/onyx/plugin/http_output.clj +++ b/src/onyx/plugin/http_output.clj @@ -37,7 +37,7 @@ :exception (pr-str e)}])))) -(defn process-message [message success? post-process ack-fn async-exception-fn retry-params] +(defn process-message [message success? post-process ack-fn async-exception-fn retry-params run-state] "Retry params: - allow-retry? - if we will retry or not - initial-request-time - time of first request @@ -62,11 +62,15 @@ (ack-fn)) next-backoff-ms - (do - (log/debugf "Backing off HTTP request: %S %s %.30s next retry in %d ms" - (name method) url args next-backoff-ms) - (Thread/sleep next-backoff-ms) - (d/recur (inc attempt))) + (if @run-state + (do + (log/debugf "Backing off HTTP request: %S %s %.30s next retry in %d ms" + (name method) url args next-backoff-ms) + (Thread/sleep next-backoff-ms) + (d/recur (inc attempt))) + (do + (log/warnf "Aborting HTTP %s request to %s due to stopped peer" (name method) url) + (async-exception-fn response))) :else (async-exception-fn response)))))))) @@ -77,10 +81,16 @@ (deftype HttpOutput [success? post-process retry-params ^:unsynchronized-mutable async-exception-info - ^:unsynchronized-mutable in-flight-writes] + ^:unsynchronized-mutable in-flight-writes + run-state] p/Plugin - (start [this event] this) - (stop [this event] this) + (start [this event] + (reset! run-state true) + this) + + (stop [this event] + (reset! run-state false) + this) p/BarrierSynchronization (synced? [this epoch] @@ -112,7 +122,7 @@ (run! (fn [message] (swap! in-flight-writes inc) (process-message message - success? post-process ack-fn async-exception-fn retry)) + success? post-process ack-fn async-exception-fn retry run-state)) write-batch)) true)) @@ -132,4 +142,4 @@ ::post-process-default)) retry-params (:http-output/retry-params task-map) retry-params (assoc retry-params :allow-retry? (some? retry-params))] - (->HttpOutput success? post-process retry-params (atom nil) (atom 0)))) + (->HttpOutput success? post-process retry-params (atom nil) (atom 0) (atom false))))