Skip to content

Commit

Permalink
update workflow only at exit
Browse files Browse the repository at this point in the history
  • Loading branch information
vincentclaes committed Aug 13, 2021
1 parent 3bb3660 commit 971505e
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 6 deletions.
2 changes: 1 addition & 1 deletion datajob/stepfunctions/stepfunctions_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ def __enter__(self):

def __exit__(self, exc_type, exc_value, traceback) -> None:
"""steps we have to do when exiting the context manager."""
self.build_workflow()
_set_workflow(None)
logger.info(f"step functions workflow {self.unique_name} created")

Expand Down Expand Up @@ -270,4 +271,3 @@ def _get_workflow():
def connect(self, other: DataJobBase) -> None:
work_flow = _get_workflow()
work_flow.directed_graph[other].add(self)
work_flow.build_workflow()
13 changes: 8 additions & 5 deletions datajob_tests/stepfunctions/test_stepfunctions_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,11 +166,15 @@ def test_update_stepfunctions_continuously(self):
test written based on ticket
https://github.com/vincentclaes/datajob/issues/116
Update:
this continous update causes duplicate states. removing it for now.
"""

task1 = stepfunctions_workflow.task(SomeMockedClass("task1"))
task2 = stepfunctions_workflow.task(SomeMockedClass("task2"))
task3 = stepfunctions_workflow.task(SomeMockedClass("task3"))
task4 = stepfunctions_workflow.task(SomeMockedClass("task4"))

djs = DataJobStack(
scope=self.app,
Expand All @@ -181,13 +185,12 @@ def test_update_stepfunctions_continuously(self):
account="3098726354",
)
with StepfunctionsWorkflow(djs, "some-name") as a_step_functions_workflow:
self.assertIsNone(a_step_functions_workflow.workflow)
self.assertIsNone(a_step_functions_workflow.chain_of_tasks)
task1 >> task2
self.assertIsNotNone(a_step_functions_workflow.workflow)
self.assertEqual(len(a_step_functions_workflow.chain_of_tasks.steps), 2)
task2 >> task3
self.assertEqual(len(a_step_functions_workflow.chain_of_tasks.steps), 3)
self.assertIsNone(a_step_functions_workflow.workflow)
self.assertIsNone(a_step_functions_workflow.chain_of_tasks)
self.assertIsNotNone(a_step_functions_workflow.workflow)
self.assertEqual(len(a_step_functions_workflow.chain_of_tasks.steps), 3)

expected_workflow_definition = {
"StartAt": "task1",
Expand Down

0 comments on commit 971505e

Please sign in to comment.