Skip to content
This repository was archived by the owner on Jun 15, 2024. It is now read-only.

Commit 1940089

Browse files
committed
Refactor chaining to use execute-steps instead of using own implementation, add ability to unify step-result from children in execute-steps, use both to fix issue where chaining lost intermediate results (#120). Also prepares for #124 and others
1 parent f75cbab commit 1940089

File tree

8 files changed

+186
-163
lines changed

8 files changed

+186
-163
lines changed

CHANGELOG.md

+5
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,13 @@ The official release will have a defined and more stable API. If you are already
1111
* `lambdacd.steps.support/{chain,always-chain,chaining,always-chaining}` now return outputs of individual chained steps (#122)
1212
* Add `lambdacd.steps.support/last-step-status-wins` to coerce a step result into having the status of the last output
1313
to make an always-chained step successful even though it had a failing step in it (#122)
14+
* Add `:unify-results-fn` to unify the whole step-result, not just the step status from children in `core/execute-steps`
1415
* Bug fixes:
1516
* Refactored merging of step results and resolved overly broad merging behavior (see breaking changes)
17+
* Chaining no longer loses intermediate results (fixes #120)
18+
* API changes:
19+
* The `:unify-status-fn` parameter in `core/execute-steps` is now deprecated and will be removed in subsequent releases.
20+
Use `:unify-results-fn` instead.
1621
* Breaking changes:
1722
* Changed behavior of step-merging in some edge-cases where it was merging with special behavior in cases that were not necessary.
1823
This change should not affect normal pipeline behavior unless they rely on this very edge-case.

src/clj/lambdacd/internal/execution.clj

+58-40
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,18 @@
2929
:step-result step-result}]
3030
(event-bus/publish!! ctx :step-result-updated payload)))
3131

32-
(defn process-channel-result-async [c {step-id :step-id build-number :build-number :as ctx}]
32+
(defn- append-result [cur-result [key value]]
33+
(-> cur-result
34+
(assoc key value)
35+
(attach-wait-indicator-if-necessary key value)))
36+
37+
(defn- process-channel-result-async [c {step-id :step-id build-number :build-number :as ctx}]
3338
(async/go-loop [cur-result {:status :running}]
34-
(let [[key value] (async/<! c)
35-
new-result (-> cur-result
36-
(assoc key value)
37-
(attach-wait-indicator-if-necessary key value))]
38-
(if (and (nil? key) (nil? value))
39+
(let [ev (async/<! c)
40+
new-result (if (map? ev)
41+
ev
42+
(append-result cur-result ev))]
43+
(if (nil? ev)
3944
cur-result
4045
(do
4146
(event-bus/publish! ctx :step-result-updated {:build-number build-number
@@ -140,10 +145,10 @@
140145
(step-output step-id complete-step-result)))
141146

142147
(defn merge-two-step-results [r1 r2]
143-
(step-results/merge-step-results r1 r2 :resolvers [step-results/status-resolver
144-
step-results/merge-nested-maps-resolver
145-
step-results/combine-to-list-resolver
146-
step-results/second-wins-resolver]))
148+
(step-results/merge-two-step-results r1 r2 :resolvers [step-results/status-resolver
149+
step-results/merge-nested-maps-resolver
150+
step-results/combine-to-list-resolver
151+
step-results/second-wins-resolver]))
147152

148153
(defn- to-context-and-step [ctx]
149154
(fn [idx step]
@@ -152,18 +157,18 @@
152157
step-ctx (assoc ctx :step-id new-step-id)]
153158
[step-ctx step])))
154159

155-
(defn- process-inheritance [step-results-channel unify-status-fn]
160+
(defn- process-inheritance [step-results-channel unify-results-fn]
156161
(let [out-ch (async/chan)]
157162
(async/go
158-
(loop [statuses {}]
159-
(if-let [step-result-update (async/<! step-results-channel)]
160-
(let [step-status (get-in step-result-update [:step-result :status])
161-
new-statuses (assoc statuses (:step-id step-result-update) step-status)
162-
old-unified (unify-status-fn (vals statuses))
163-
new-unified (unify-status-fn (vals new-statuses))]
163+
(loop [results {}]
164+
(if-let [{step-id :step-id
165+
step-result :step-result} (async/<! step-results-channel)]
166+
(let [new-results (assoc results step-id step-result)
167+
old-unified (unify-results-fn results)
168+
new-unified (unify-results-fn new-results)]
164169
(if (not= old-unified new-unified)
165-
(async/>! out-ch [:status new-unified]))
166-
(recur new-statuses))
170+
(async/>! out-ch new-unified))
171+
(recur new-results))
167172
(async/close! out-ch))))
168173
out-ch))
169174

@@ -184,25 +189,30 @@
184189
args-with-old-and-new-globals))
185190

186191

187-
(defn- keep-original-args [old-args step-result]
192+
(defn keep-original-args [old-args step-result]
188193
(merge old-args step-result))
189194

190-
(defn- serial-step-result-producer [args s-with-id]
191-
(loop [result ()
192-
remaining-steps-with-id s-with-id
193-
cur-args args]
194-
(if (empty? remaining-steps-with-id)
195-
result
196-
(let [ctx-and-step (first remaining-steps-with-id)
197-
step-result (execute-step cur-args ctx-and-step)
198-
step-output (first (vals (:outputs step-result)))
199-
new-result (cons step-result result)
200-
new-args (->> step-output
201-
(keep-globals cur-args)
202-
(keep-original-args args))]
203-
(if (not= :success (:status step-result))
204-
new-result
205-
(recur (cons step-result result) (rest remaining-steps-with-id) new-args))))))
195+
(defn not-success? [step-result]
196+
(not= :success (:status step-result)))
197+
198+
(defn serial-step-result-producer [& {:keys [stop-predicate]
199+
:or {stop-predicate not-success?}}]
200+
(fn [args s-with-id]
201+
(loop [result ()
202+
remaining-steps-with-id s-with-id
203+
cur-args args]
204+
(if (empty? remaining-steps-with-id)
205+
result
206+
(let [ctx-and-step (first remaining-steps-with-id)
207+
step-result (execute-step cur-args ctx-and-step)
208+
step-output (first (vals (:outputs step-result)))
209+
new-result (cons step-result result)
210+
new-args (->> step-output
211+
(keep-globals cur-args)
212+
(keep-original-args args))]
213+
(if (stop-predicate step-result)
214+
new-result
215+
(recur (cons step-result result) (rest remaining-steps-with-id) new-args)))))))
206216

207217
(defn- inherit-message-from-parent? [parent-ctx]
208218
(fn [msg]
@@ -259,19 +269,27 @@
259269

260270
(def not-nil? (complement nil?))
261271

262-
(defn execute-steps [steps args ctx & {:keys [step-result-producer is-killed unify-status-fn retrigger-predicate]
263-
:or {step-result-producer serial-step-result-producer
272+
(defn unify-only-status [unify-status-fn]
273+
(fn [step-results]
274+
{:status (unify-status-fn (->> step-results
275+
(vals)
276+
(map :status)))}))
277+
278+
(defn execute-steps [steps args ctx & {:keys [step-result-producer is-killed unify-status-fn unify-results-fn retrigger-predicate]
279+
:or {step-result-producer (serial-step-result-producer)
264280
is-killed (atom false)
265281
unify-status-fn status/successful-when-all-successful
282+
unify-results-fn nil ; dependent on unify-status-fn, can't have it here for now
266283
retrigger-predicate sequential-retrigger-predicate}}]
267-
(let [steps (filter not-nil? steps)
284+
(let [unify-results-fn (or unify-results-fn (unify-only-status unify-status-fn))
285+
steps (filter not-nil? steps)
268286
base-ctx-with-kill-switch (assoc ctx :is-killed is-killed)
269287
subscription (event-bus/subscribe ctx :step-result-updated)
270288
children-step-results-channel (->> subscription
271289
(event-bus/only-payload)
272290
(async/filter< (inherit-message-from-parent? ctx)))
273291
step-contexts (contexts-for-steps steps base-ctx-with-kill-switch)
274-
_ (inherit-from children-step-results-channel (:result-channel ctx) unify-status-fn)
292+
_ (inherit-from children-step-results-channel (:result-channel ctx) unify-results-fn)
275293
step-contexts-with-retrigger-mocks (add-retrigger-mocks retrigger-predicate ctx step-contexts)
276294
step-results (step-result-producer args step-contexts-with-retrigger-mocks)
277295
result (reduce merge-two-step-results step-results)]

src/clj/lambdacd/steps/result.clj

+6-5
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,13 @@
3434
(filter (complement nil?))
3535
(first))))
3636

37-
(defn merge-step-results [a b & {:keys [resolvers]
38-
:or {resolvers [status-resolver
39-
merge-nested-maps-resolver
40-
second-wins-resolver]}}]
37+
(defn merge-two-step-results [a b & {:keys [resolvers]
38+
:or {resolvers [status-resolver
39+
merge-nested-maps-resolver
40+
second-wins-resolver]}}]
4141
(utils/merge-with-k-v (resolve-first-matching resolvers) a b))
4242

43-
43+
(defn merge-step-results [step-results merger] ; TODO: test this!
44+
(reduce merger {} step-results))
4445

4546

src/clj/lambdacd/steps/support.clj

+41-48
Original file line numberDiff line numberDiff line change
@@ -1,60 +1,53 @@
11
(ns lambdacd.steps.support
2-
(:require [clojure.string :as s]
3-
[clojure.core.async :as async]
2+
(:require [clojure.core.async :as async]
43
[lambdacd.internal.execution :as execution]
54
[clojure.walk :as walk]
65
[lambdacd.step-id :as step-id]
7-
[lambdacd.steps.status :as status]
86
[lambdacd.steps.result :as step-results])
9-
(:import (java.io PrintWriter Writer StringWriter PrintStream)
10-
(org.apache.commons.io.output WriterOutputStream)))
11-
12-
(defn- do-chain-steps-final-result [merged-result all-outputs]
13-
(assoc merged-result :outputs all-outputs))
14-
15-
(defn merge-step-results-with-joined-output [a b]
16-
(step-results/merge-step-results a b :resolvers [step-results/status-resolver
17-
step-results/merge-nested-maps-resolver
18-
step-results/join-output-resolver
19-
step-results/second-wins-resolver]))
20-
21-
22-
(defn- do-chain-steps [stop-on-step-failure args ctx steps]
23-
"run the given steps one by one until a step fails and merge the results.
24-
results of one step are the inputs for the next one."
25-
(loop [counter 1
26-
x (first steps)
27-
rest (rest steps)
28-
result {}
29-
all-outputs {}
30-
args args]
31-
(if (nil? x)
32-
(do-chain-steps-final-result result all-outputs)
33-
(let [step-result (x args ctx)
34-
complete-result (merge-step-results-with-joined-output result step-result)
35-
next-args (merge args complete-result)
36-
step-failed (and
37-
(not= :success (:status step-result))
38-
(not= nil step-result))
39-
child-step-id (step-id/child-id (:step-id ctx) counter)
40-
new-all-outputs (assoc all-outputs child-step-id step-result)]
41-
(if (and stop-on-step-failure step-failed)
42-
(do-chain-steps-final-result complete-result new-all-outputs)
43-
(recur (inc counter)
44-
(first rest)
45-
(next rest)
46-
complete-result
47-
new-all-outputs
48-
next-args))))))
49-
50-
(defn always-chain-steps
51-
([args ctx & steps]
52-
(do-chain-steps false args ctx steps)))
7+
(:import (java.io Writer StringWriter)))
8+
9+
(defn- merge-step-results-with-joined-output [a b]
10+
(step-results/merge-two-step-results a b :resolvers [step-results/status-resolver
11+
step-results/merge-nested-maps-resolver
12+
step-results/join-output-resolver
13+
step-results/second-wins-resolver]))
14+
15+
(defn- wrap-step-to-allow-nil-values [step]
16+
(fn [args ctx]
17+
(let [result (step args ctx)]
18+
(if (nil? result)
19+
args
20+
result))))
21+
22+
(defn- step-results-sorted-by-id [outputs]
23+
(->> outputs
24+
(into (sorted-map-by step-id/before?))
25+
(vals)))
26+
27+
(defn unify-results [step-results]
28+
(-> step-results
29+
(step-results-sorted-by-id)
30+
(step-results/merge-step-results merge-step-results-with-joined-output)))
31+
32+
(defn- do-chain-steps-with-execute-steps [args ctx steps step-result-producer]
33+
(let [execute-step-result (execution/execute-steps steps args ctx
34+
:step-result-producer step-result-producer
35+
:unify-results-fn unify-results)
36+
sorted-step-results (step-results-sorted-by-id (:outputs execute-step-result))
37+
merged-step-results (step-results/merge-step-results sorted-step-results merge-step-results-with-joined-output)]
38+
(merge merged-step-results execute-step-result)))
5339

5440
(defn chain-steps
5541
([args ctx & steps]
56-
(do-chain-steps true args ctx steps)))
42+
(do-chain-steps-with-execute-steps args ctx
43+
(map wrap-step-to-allow-nil-values steps)
44+
(execution/serial-step-result-producer))))
5745

46+
(defn always-chain-steps
47+
([args ctx & steps]
48+
(do-chain-steps-with-execute-steps args ctx
49+
(map wrap-step-to-allow-nil-values steps)
50+
(execution/serial-step-result-producer :stop-predicate (constantly false)))))
5851

5952
(defn to-fn [form]
6053
(let [f# (first form)

test/clj/lambdacd/internal/execution_test.clj

+16-10
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,13 @@
7373
(Thread/sleep 10) ; wait for a bit so that the success doesn't screw up the order of events tested
7474
{:status :success})
7575

76+
(defn some-step-building-up-result-state-incrementally-and-resetting [_ {c :result-channel}]
77+
(async/>!! c [:out "hello"])
78+
(async/>!! c [:some-value 42])
79+
(async/>!! c {:status :running :other-value 21})
80+
(Thread/sleep 10) ; wait for a bit so that the success doesn't screw up the order of events tested
81+
{:status :success})
82+
7683
(defn some-step-sending-a-wait [_ {c :result-channel}]
7784
(async/>!! c [:status :waiting])
7885
(Thread/sleep 10) ; wait for a bit so that the success doesn't screw up the order of events tested
@@ -171,28 +178,27 @@
171178
(testing "that in doubt, the static output overlays the async output"
172179
(is (= {:outputs {[0 0] {:status :success } } :status :success }
173180
(execute-step {} [(some-ctx-with :step-id [0 0]) some-step-that-sends-failure-on-ch-returns-success]))))
174-
(testing "that we can pass in a step-results-channel that receives messages with the complete, accumulated step result"
175-
(let [ctx (some-ctx-with :step-id [0 0]
176-
:build-number 5
177-
:pipeline-state-component (noop-pipeline-state/new-no-op-pipeline-state))
181+
(testing "that the accumulated step-result is sent over the event-bus"
182+
(let [ctx (some-ctx-with :step-id [0 0]
183+
:build-number 5
184+
:pipeline-state-component (noop-pipeline-state/new-no-op-pipeline-state))
178185
step-results-channel (step-result-updates-for ctx)]
179186
(execute-step {} [ctx some-step-building-up-result-state-incrementally])
180187
(is (= [{:build-number 5 :step-id [0 0] :step-result {:status :running } }
181188
{:build-number 5 :step-id [0 0] :step-result {:status :running :out "hello"} }
182189
{:build-number 5 :step-id [0 0] :step-result {:status :running :out "hello world"} }
183190
{:build-number 5 :step-id [0 0] :step-result {:status :running :some-value 42 :out "hello world"} }
184-
{:build-number 5 :step-id [0 0] :step-result {:status :success :some-value 42 :out "hello world"} }] (slurp-chan step-results-channel)))))
185-
(testing "that the accumulated step-result is sent over the event-bus"
191+
{:build-number 5 :step-id [0 0] :step-result {:status :success :some-value 42 :out "hello world"} }] (slurp-chan step-results-channel)))))(testing "that the accumulated step-result is sent over the event-bus and can be resetted"
186192
(let [ctx (some-ctx-with :step-id [0 0]
187193
:build-number 5
188194
:pipeline-state-component (noop-pipeline-state/new-no-op-pipeline-state))
189195
step-results-channel (step-result-updates-for ctx)]
190-
(execute-step {} [ctx some-step-building-up-result-state-incrementally])
196+
(execute-step {} [ctx some-step-building-up-result-state-incrementally-and-resetting])
191197
(is (= [{:build-number 5 :step-id [0 0] :step-result {:status :running } }
192198
{:build-number 5 :step-id [0 0] :step-result {:status :running :out "hello"} }
193-
{:build-number 5 :step-id [0 0] :step-result {:status :running :out "hello world"} }
194-
{:build-number 5 :step-id [0 0] :step-result {:status :running :some-value 42 :out "hello world"} }
195-
{:build-number 5 :step-id [0 0] :step-result {:status :success :some-value 42 :out "hello world"} }] (slurp-chan step-results-channel)))))
199+
{:build-number 5 :step-id [0 0] :step-result {:status :running :some-value 42 :out "hello"} }
200+
{:build-number 5 :step-id [0 0] :step-result {:status :running :other-value 21} }
201+
{:build-number 5 :step-id [0 0] :step-result {:status :success :other-value 21} }] (slurp-chan step-results-channel)))))
196202
(testing "that the event bus is notified when a step finishes"
197203
(let [ctx (some-ctx-with :build-number 3
198204
:step-id [1 2 3])

0 commit comments

Comments
 (0)