diff --git a/python_modules/dagster/dagster_tests/tutorials/Makefile b/python_modules/dagster/dagster_tests/tutorials/Makefile new file mode 100644 index 0000000000000..c68cf7f4b74c9 --- /dev/null +++ b/python_modules/dagster/dagster_tests/tutorials/Makefile @@ -0,0 +1,17 @@ +part_one: + dagster pipeline execute -f test_intro_tutorial_part_one.py -n define_pipeline + +part_two: + dagster pipeline execute -f test_intro_tutorial_part_two.py -n define_pipeline + +part_three: + dagster pipeline execute -f test_intro_tutorial_part_three.py -n define_pipeline + +part_four: + dagster pipeline execute -f test_intro_tutorial_part_four.py -n define_pipeline + +part_five_step_one: + dagster pipeline execute -f test_intro_tutorial_part_five.py -n define_step_one_pipeline + +part_five_step_two: + dagster pipeline execute -f test_intro_tutorial_part_five.py -n define_step_two_pipeline diff --git a/python_modules/dagster/dagster_tests/tutorials/part_five_step_three.yml b/python_modules/dagster/dagster_tests/tutorials/part_five_step_three.yml new file mode 100644 index 0000000000000..fe53e1d44e9d8 --- /dev/null +++ b/python_modules/dagster/dagster_tests/tutorials/part_five_step_three.yml @@ -0,0 +1,3 @@ +context: + config: + log_level: DEBUG diff --git a/python_modules/dagster/dagster_tests/tutorials/test_intro_tutorial_part_five.py b/python_modules/dagster/dagster_tests/tutorials/test_intro_tutorial_part_five.py index 12d88bb98b80e..6095af4f65acc 100644 --- a/python_modules/dagster/dagster_tests/tutorials/test_intro_tutorial_part_five.py +++ b/python_modules/dagster/dagster_tests/tutorials/test_intro_tutorial_part_five.py @@ -18,25 +18,35 @@ def solid_two(info): info.context.error('An error occurred.') +def define_step_one_pipeline(): + return PipelineDefinition(solids=[solid_one, solid_two]) + + def test_tutorial_part_five_sample_one(): - pipeline_result = execute_pipeline(PipelineDefinition(solids=[solid_one, solid_two])) + pipeline_result = 
execute_pipeline(define_step_one_pipeline()) assert pipeline_result.success return pipeline_result +def define_step_two_pipeline(): + return PipelineDefinition(name='part_five_step_two', solids=[solid_one, solid_two]) + + def test_tutorial_part_five_sample_two(): - pipeline_result = execute_pipeline( - PipelineDefinition(name='part_five', solids=[solid_one, solid_two]) - ) + pipeline_result = execute_pipeline(define_step_two_pipeline()) assert pipeline_result.success return pipeline_result +def define_step_three_pipeline(): + return PipelineDefinition(name='part_five_step_three', solids=[solid_one, solid_two]) + + def test_tutorial_part_five_sample_three(): pipeline_result = execute_pipeline( - PipelineDefinition(name='part_five', solids=[solid_one, solid_two]), + define_step_three_pipeline(), config.Environment(context=config.Context(config={'log_level': 'DEBUG'})) ) diff --git a/python_modules/dagster/dagster_tests/tutorials/test_intro_tutorial_part_three.py b/python_modules/dagster/dagster_tests/tutorials/test_intro_tutorial_part_three.py index 8fb8241d247b1..55605239ce6ba 100644 --- a/python_modules/dagster/dagster_tests/tutorials/test_intro_tutorial_part_three.py +++ b/python_modules/dagster/dagster_tests/tutorials/test_intro_tutorial_part_three.py @@ -37,25 +37,27 @@ def solid_d(arg_b, arg_c): print('d: {d}'.format(d=arg_b * arg_c)) -def test_tutorial_part_three(): - execute_pipeline( - PipelineDefinition( - solids=[solid_d, solid_c, solid_b, solid_a], - dependencies={ - 'solid_b': { - 'arg_a': DependencyDefinition('solid_a'), - }, - 'solid_c': { - 'arg_a': DependencyDefinition('solid_a'), - }, - 'solid_d': { - 'arg_b': DependencyDefinition('solid_b'), - 'arg_c': DependencyDefinition('solid_c'), - } +def define_pipeline(): + return PipelineDefinition( + solids=[solid_d, solid_c, solid_b, solid_a], + dependencies={ + 'solid_b': { + 'arg_a': DependencyDefinition('solid_a'), + }, + 'solid_c': { + 'arg_a': DependencyDefinition('solid_a'), + }, + 'solid_d': { + 
'arg_b': DependencyDefinition('solid_b'), + 'arg_c': DependencyDefinition('solid_c'), } - ) + } ) +def test_tutorial_part_three(): + execute_pipeline(define_pipeline()) + + if __name__ == '__main__': test_tutorial_part_three() diff --git a/python_modules/dagster/dagster_tests/tutorials/test_intro_tutorial_part_two.py b/python_modules/dagster/dagster_tests/tutorials/test_intro_tutorial_part_two.py index 8271b4e4435e5..3ea30edef4a35 100644 --- a/python_modules/dagster/dagster_tests/tutorials/test_intro_tutorial_part_two.py +++ b/python_modules/dagster/dagster_tests/tutorials/test_intro_tutorial_part_two.py @@ -18,18 +18,19 @@ def solid_two(arg_one): print(arg_one * 2) -def test_tutorial_part_two(): - pipeline_result = execute_pipeline( - PipelineDefinition( - solids=[solid_one, solid_two], - dependencies={ - 'solid_two': { - 'arg_one': DependencyDefinition('solid_one'), - }, - } - ) +def define_pipeline(): + return PipelineDefinition( + solids=[solid_one, solid_two], + dependencies={ + 'solid_two': { + 'arg_one': DependencyDefinition('solid_one'), + }, + } ) + +def test_tutorial_part_two(): + pipeline_result = execute_pipeline(define_pipeline()) assert pipeline_result.success return pipeline_result diff --git a/python_modules/dagster/docs/intro_tutorial/part_five.rst b/python_modules/dagster/docs/intro_tutorial/part_five.rst index 675af7c4cb464..232db1e6c916c 100644 --- a/python_modules/dagster/docs/intro_tutorial/part_five.rst +++ b/python_modules/dagster/docs/intro_tutorial/part_five.rst @@ -24,17 +24,15 @@ Let's use the context for one of its core capabilities: logging. info.context.error('An error occurred.') - if __name__ == '__main__': - execute_pipeline( - PipelineDefinition(solids=[solid_one, solid_two]) - ) + def define_step_one_pipeline(): + return PipelineDefinition(solids=[solid_one, solid_two]) Save this as part_five.py and run .. code-block:: sh - $ python3 part_five.py + $ dagster pipeline execute -f part_five.py -n define_step_one_pipeline ... 
2018-09-09 07:14:19 - dagster - ERROR - message="An error occurred." pipeline=<> solid=solid_two 2018-09-20 17:44:47 - dagster - INFO - orig_message="Something you should know about occurred." log_message_id="c59070a1-f24c-4ac2-a3d4-42f52122e4c5" pipeline="<>" solid="solid_one" solid_definition="solid_one" @@ -48,19 +46,15 @@ For example, let's change the example by adding a name to the pipeline. (Naming .. code-block:: python - execute_pipeline( - PipelineDefinition( - name='part_five', - solids=[solid_one, solid_two] - ) - ) + def define_step_two_pipeline(): + return PipelineDefinition(name='part_five_step_two', solids=[solid_one, solid_two]) And then run it: .. code-block:: sh - $ python3 part_five.py - 2018-09-09 07:17:31 - dagster - ERROR - message="An error occurred." pipeline=part_five solid=solid_two + $ dagster pipeline execute -f part_five.py -n define_step_two_pipeline + 2018-09-09 07:17:31 - dagster - ERROR - message="An error occurred." pipeline=part_five_step_two solid=solid_two You'll note that the metadata in the log message now has the pipeline name. @@ -69,24 +63,27 @@ But what about the info message? The default context provided by dagster logs er not default. Just like we used the configuration system to configure a particular solid, we also use that same system to configure a context. - .. code-block:: python - execute_pipeline( - PipelineDefinition( - name='part_five', - solids=[solid_one, solid_two] - ), - config.Environment( - context=config.Context(config={'log_level': 'DEBUG'}) - ), - ) + def define_step_three_pipeline(): + return PipelineDefinition(name='part_five_step_three', solids=[solid_one, solid_two]) + +And now we want to configure this. We use a config file: + +.. code-block:: yaml + + context: + config: + log_level: DEBUG + + +Save this as ``part_five_step_three.yml``. If we re-run the pipeline, you'll see a lot more output. .. 
code-block:: sh - $ python3 part_five.py + $ dagster pipeline execute -f part_five.py -n define_step_three_pipeline -e part_five_step_three.yml ... 2018-09-09 07:49:51 - dagster - INFO - message="Something you should know about occurred." pipeline=part_five solid=solid_one 2018-09-09 07:49:51 - dagster - INFO - metric:core_transform_time_ms=0.137 pipeline=part_five solid=solid_one diff --git a/python_modules/dagster/docs/intro_tutorial/part_four.rst b/python_modules/dagster/docs/intro_tutorial/part_four.rst index 4488fff4ce5e8..a78fb86b21ef3 100644 --- a/python_modules/dagster/docs/intro_tutorial/part_four.rst +++ b/python_modules/dagster/docs/intro_tutorial/part_four.rst @@ -23,10 +23,13 @@ minimal API. ``solid`` is more complicated, and has more capabilities: print(info.config) return conf + def define_pipeline(): + return PipelineDefinition(solids=[hello_world]) + if __name__ == '__main__': execute_pipeline( - PipelineDefinition(solids=[hello_world]), + define_pipeline(), config.Environment( solids={'hello_world': config.Solid('Hello, World!')} ), @@ -51,5 +54,4 @@ So save this example as step_four.py .. code-block:: sh - $ python3 step_four.py - Hello, World! + $ dagster pipeline execute -f part_four.py -n define_pipeline diff --git a/python_modules/dagster/docs/intro_tutorial/part_seven.rst b/python_modules/dagster/docs/intro_tutorial/part_seven.rst index 7203c8d995ea0..42bd34d9b94a5 100644 --- a/python_modules/dagster/docs/intro_tutorial/part_seven.rst +++ b/python_modules/dagster/docs/intro_tutorial/part_seven.rst @@ -1,9 +1,6 @@ Pipeline Execution ------------------ -So far we've been driving execution of pipelines from python APIs. Now it is -time to instead drive these from the command line. - Similar to the part six tutorial, we are going to create a pipeline, a repository, and a yaml file so that the CLI tool can know about the repository. @@ -48,9 +45,7 @@ And now the repository file: fn: define_part_seven_repo Now we want to execute it from the command line. 
In order to do that we need to create a yaml file -with all the elements we need to create an environment. The form of this file is very similar -to the in-memory ``config.Environment`` and related objects that were used in previous steps -in the tutorial. +with all the elements we need to create an environment. .. code-block:: yaml @@ -63,7 +58,8 @@ in the tutorial. config: word: bar -With these elements in place we can now drive execution from the CLI +With these elements in place we can now drive execution from the CLI specifying only the pipeline name. +The tool loads the repository using the repository.yml file and looks up the pipeline by name. .. code-block:: sh diff --git a/python_modules/dagster/docs/intro_tutorial/part_six.rst b/python_modules/dagster/docs/intro_tutorial/part_six.rst index 5a468e2154e26..e8f59c1dbdf20 100644 --- a/python_modules/dagster/docs/intro_tutorial/part_six.rst +++ b/python_modules/dagster/docs/intro_tutorial/part_six.rst @@ -5,7 +5,11 @@ Dagster is a not just a programming model for pipelines, it is also a platform f tool-building. Included within dagster is a CLI tool, the first tool you'll use to interact with pipelines. -In order for a tool to interact with a pipeline, it must be made aware of the pipelines. +In previous examples we have specified the file and function in which a pipeline definition resides +in order to load it. However it can get tedious to load individual pipelines. We will want to +be able to browse many pipelines at once and then be able to launch tools against that set +of pipelines without having to specify arguments. This is where repositories and repository +config files come into play. Dagster has a concept of repositories which is used to organize a collection of pipelines (and eventually other concepts). Dagster tools must be pointed to a repository or many repositories @@ -34,7 +38,9 @@ Repostories are declared like this: Save this file as ``part_six.py``. 
-For tools to operate they must be able to access and create a `Repository`. The current mechanism is to +Dagster tools are easiest to use when they can operate on a repository. It is particularly convenient +to create a config file containing the location and name of the function to create that repository. This +avoids unnecessary and repetitive typing for CLI tools. The current mechanism is to create a yaml file (default name: 'repository.yml') where you state the module or file and the function that creates the repository you want to operate on. @@ -57,16 +63,10 @@ Now you should be able to list the pipelines in this repo: hello_world -For a given project with 1 or more pipelines, you should define a repository for that -project so it is accessible via tools. If you are writing code within a python module -as opposed to a standalone file you can (and should) load the repository via that module. - -To do that you can specify a module instead of a file in the repository.yml file. +You can also specify a module instead of a file in the repository.yml file. .. code-block:: yaml - # To verify this will work, run this in your python enviroment: - # from some_module import define_some_repository repository: module: some_module fn: define_some_repository diff --git a/python_modules/dagster/docs/intro_tutorial/part_three.rst b/python_modules/dagster/docs/intro_tutorial/part_three.rst index bb9a217b0f010..15bc78a65a27d 100644 --- a/python_modules/dagster/docs/intro_tutorial/part_three.rst +++ b/python_modules/dagster/docs/intro_tutorial/part_three.rst @@ -65,15 +65,11 @@ Again it is worth noting how we are connecting *inputs* and *outputs* rather tha Point your attention to the ``solid_d`` entry in the dependencies dictionary. We are declaring dependencies on a per-input basis. -Save this to a file named ``step_three.py`` +Save this to a file named ``part_three.py`` .. 
code-block:: sh - $ python3 step_three.py - a: 1 - b: 2 - c: 3 - d: 6 + $ dagster pipeline execute -f part_three.py -n define_pipeline In this case ``solid_b`` happens to execute before ``solid_c``. However ``solid_c`` executing before ``solid_b`` would also be a valid execution order given this DAG. diff --git a/python_modules/dagster/docs/intro_tutorial/part_two.rst b/python_modules/dagster/docs/intro_tutorial/part_two.rst index 4c937ffa4ef36..77310606c67a6 100644 --- a/python_modules/dagster/docs/intro_tutorial/part_two.rst +++ b/python_modules/dagster/docs/intro_tutorial/part_two.rst @@ -8,28 +8,27 @@ We will be building a very simple pipeline where the first step returns a hardco passes that value to the next solid, which concatenates it to itself, and prints it. - .. code-block:: python +.. code-block:: python - @lambda_solid - def solid_one(): - return 'foo' + @lambda_solid + def solid_one(): + return 'foo' - @lambda_solid(inputs=[InputDefinition('arg_one')]) - def solid_two(arg_one): - print(arg_one * 2) + @lambda_solid(inputs=[InputDefinition('arg_one')]) + def solid_two(arg_one): + print(arg_one * 2) + def define_pipeline(): + return PipelineDefinition( + solids=[solid_one, solid_two], + dependencies={ + 'solid_two': { + 'arg_one': DependencyDefinition('solid_one'), + }, + } + ) - if __name__ == '__main__': - pipeline = PipelineDefinition( - solids=[solid_one, solid_two], - dependencies={ - 'solid_two': { - 'arg_one': DependencyDefinition('solid_one'), - }, - } - ) - pipeline_result = execute_pipeline(pipeline) We have a couple new concepts here. @@ -51,9 +50,8 @@ each step in the pipeline -- the solid -- is a *functional* unit of computation. Save this file to ``part_two.py`` -and run +and run it: .. code-block:: sh - $ python3 step_two.py - foofoo \ No newline at end of file + $ dagster pipeline execute -f part_two.py -n define_pipeline