Skip to content

Commit

Permalink
Tutorial work (#220)
Browse files Browse the repository at this point in the history
Move more of the tutorial to be CLI-driven rather than API-driven
  • Loading branch information
schrockn authored Sep 27, 2018
1 parent dea83df commit ad149be
Show file tree
Hide file tree
Showing 11 changed files with 122 additions and 100 deletions.
17 changes: 17 additions & 0 deletions python_modules/dagster/dagster_tests/tutorials/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Convenience targets for running each tutorial part through the dagster CLI.
# Each recipe loads the named pipeline-definition function (-n) from the given
# tutorial test file (-f) and executes the pipeline it returns.
part_one:
dagster pipeline execute -f test_intro_tutorial_part_one.py -n define_pipeline

part_two:
dagster pipeline execute -f test_intro_tutorial_part_two.py -n define_pipeline

part_three:
dagster pipeline execute -f test_intro_tutorial_part_three.py -n define_pipeline

part_four:
dagster pipeline execute -f test_intro_tutorial_part_four.py -n define_pipeline

# Part five defines multiple pipelines in one file, so each step names a
# different definition function.
part_five_step_one:
dagster pipeline execute -f test_intro_tutorial_part_five.py -n define_step_one_pipeline

part_five_step_two:
dagster pipeline execute -f test_intro_tutorial_part_five.py -n define_step_two_pipeline
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Environment config for the part-five tutorial: raises the pipeline
# context's log level so debug-level messages are emitted during execution.
context:
config:
log_level: DEBUG
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,35 @@ def solid_two(info):
info.context.error('An error occurred.')


def define_step_one_pipeline():
    """Return the unnamed step-one tutorial pipeline with both solids."""
    pipeline = PipelineDefinition(solids=[solid_one, solid_two])
    return pipeline


def test_tutorial_part_five_sample_one():
pipeline_result = execute_pipeline(PipelineDefinition(solids=[solid_one, solid_two]))
pipeline_result = execute_pipeline(define_step_one_pipeline())

assert pipeline_result.success
return pipeline_result


def define_step_two_pipeline():
    """Return the step-two tutorial pipeline, identical to step one but named."""
    pipeline = PipelineDefinition(
        name='part_five_step_two',
        solids=[solid_one, solid_two],
    )
    return pipeline


def test_tutorial_part_five_sample_two():
pipeline_result = execute_pipeline(
PipelineDefinition(name='part_five', solids=[solid_one, solid_two])
)
pipeline_result = execute_pipeline(define_step_two_pipeline())

assert pipeline_result.success
return pipeline_result


def define_step_three_pipeline():
    """Return the step-three tutorial pipeline (run with a DEBUG-level context)."""
    pipeline = PipelineDefinition(
        name='part_five_step_three',
        solids=[solid_one, solid_two],
    )
    return pipeline


def test_tutorial_part_five_sample_three():
pipeline_result = execute_pipeline(
PipelineDefinition(name='part_five', solids=[solid_one, solid_two]),
define_step_two_pipeline(),
config.Environment(context=config.Context(config={'log_level': 'DEBUG'}))
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,25 +37,27 @@ def solid_d(arg_b, arg_c):
print('d: {d}'.format(d=arg_b * arg_c))


def test_tutorial_part_three():
execute_pipeline(
PipelineDefinition(
solids=[solid_d, solid_c, solid_b, solid_a],
dependencies={
'solid_b': {
'arg_a': DependencyDefinition('solid_a'),
},
'solid_c': {
'arg_a': DependencyDefinition('solid_a'),
},
'solid_d': {
'arg_b': DependencyDefinition('solid_b'),
'arg_c': DependencyDefinition('solid_c'),
}
def define_pipeline():
return PipelineDefinition(
solids=[solid_d, solid_c, solid_b, solid_a],
dependencies={
'solid_b': {
'arg_a': DependencyDefinition('solid_a'),
},
'solid_c': {
'arg_a': DependencyDefinition('solid_a'),
},
'solid_d': {
'arg_b': DependencyDefinition('solid_b'),
'arg_c': DependencyDefinition('solid_c'),
}
)
}
)


def test_tutorial_part_three():
    """Execute the diamond-dependency tutorial pipeline end to end."""
    pipeline = define_pipeline()
    execute_pipeline(pipeline)


if __name__ == '__main__':
test_tutorial_part_three()
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,19 @@ def solid_two(arg_one):
print(arg_one * 2)


def test_tutorial_part_two():
pipeline_result = execute_pipeline(
PipelineDefinition(
solids=[solid_one, solid_two],
dependencies={
'solid_two': {
'arg_one': DependencyDefinition('solid_one'),
},
}
)
def define_pipeline():
    """Build the two-solid tutorial pipeline.

    solid_two consumes solid_one's output through its 'arg_one' input.
    """
    deps = {
        'solid_two': {'arg_one': DependencyDefinition('solid_one')},
    }
    return PipelineDefinition(solids=[solid_one, solid_two], dependencies=deps)


def test_tutorial_part_two():
    """Run the part-two pipeline, assert success, and hand back the result."""
    result = execute_pipeline(define_pipeline())
    assert result.success
    return result

Expand Down
45 changes: 21 additions & 24 deletions python_modules/dagster/docs/intro_tutorial/part_five.rst
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,15 @@ Let's use the context for one of its core capabilities: logging.
info.context.error('An error occurred.')
if __name__ == '__main__':
execute_pipeline(
PipelineDefinition(solids=[solid_one, solid_two])
)
def define_step_one_pipeline():
return PipelineDefinition(solids=[solid_one, solid_two])
Save this as part_five.py and run

.. code-block:: sh
$ python3 part_five.py
$ dagster pipeline execute -f part_five.py -n define_step_one_pipeline
...
2018-09-09 07:14:19 - dagster - ERROR - message="An error occurred." pipeline=<<unnamed>> solid=solid_two
2018-09-20 17:44:47 - dagster - INFO - orig_message="Something you should know about occurred." log_message_id="c59070a1-f24c-4ac2-a3d4-42f52122e4c5" pipeline="<<unnamed>>" solid="solid_one" solid_definition="solid_one"
Expand All @@ -48,19 +46,15 @@ For example, let's change the example by adding a name to the pipeline. (Naming
.. code-block:: python
execute_pipeline(
PipelineDefinition(
name='part_five',
solids=[solid_one, solid_two]
)
)
def define_step_two_pipeline():
return PipelineDefinition(name='part_five_step_two', solids=[solid_one, solid_two])
And then run it:
.. code-block:: sh
$ python3 part_five.py
2018-09-09 07:17:31 - dagster - ERROR - message="An error occurred." pipeline=part_five solid=solid_two
$ dagster pipeline execute -f part_five.py -n define_step_two_pipeline
2018-09-09 07:17:31 - dagster - ERROR - message="An error occurred." pipeline=part_five_step_two solid=solid_two
You'll note that the metadata in the log message now has the pipeline name.
Expand All @@ -69,24 +63,27 @@ But what about the info message? The default context provided by dagster logs er
not default. Just like we used the configuration system to configure a particular solid, we also
use that same system to configure a context.
.. code-block:: python
execute_pipeline(
PipelineDefinition(
name='part_five',
solids=[solid_one, solid_two]
),
config.Environment(
context=config.Context(config={'log_level': 'DEBUG'})
),
)
def define_step_three_pipeline():
return PipelineDefinition(name='part_five_step_three', solids=[solid_one, solid_two])
And now we want to configure this. We use a config file:
.. code-block:: yaml
context:
config:
log_level: DEBUG
Save this as ``step_three.yaml``.
If we re-run the pipeline, you'll see a lot more output.
.. code-block:: sh
$ python3 part_five.py
$ dagster pipeline execute -f part_five.py -n define_step_three_pipeline -e step_three.yaml
...
2018-09-09 07:49:51 - dagster - INFO - message="Something you should know about occurred." pipeline=part_five solid=solid_one
2018-09-09 07:49:51 - dagster - INFO - metric:core_transform_time_ms=0.137 pipeline=part_five solid=solid_one
Expand Down
8 changes: 5 additions & 3 deletions python_modules/dagster/docs/intro_tutorial/part_four.rst
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,13 @@ minimal API. ``solid`` is more complicated, and has more capabilities:
print(info.config)
return conf
def define_pipeline():
return PipelineDefinition(solids=[hello_world])
if __name__ == '__main__':
execute_pipeline(
PipelineDefinition(solids=[hello_world]),
define_pipeline(),
config.Environment(
solids={'hello_world': config.Solid('Hello, World!')}
),
Expand All @@ -51,5 +54,4 @@ So save this example as step_four.py

.. code-block:: sh
$ python3 step_four.py
Hello, World!
$ dagster pipeline execute -f part_four.py -n define_pipeline
10 changes: 3 additions & 7 deletions python_modules/dagster/docs/intro_tutorial/part_seven.rst
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
Pipeline Execution
------------------

So far we've been driving execution of pipelines from python APIs. Now it is
time to instead drive these from the command line.

Similar to the part six tutorial, we are going to create a pipeline, a repository,
and a yaml file so that the CLI tool can know about the repository.

Expand Down Expand Up @@ -48,9 +45,7 @@ And now the repository file:
fn: define_part_seven_repo
Now we want to execute it from the command line. In order to do that we need to create a yaml file
with all the elements we need to create an environment. The form of this file is very similar
to the in-memory ``config.Environment`` and related objects that were used in previous steps
in the tutorial.
with all the elements we need to create an environment.

.. code-block:: yaml
Expand All @@ -63,7 +58,8 @@ in the tutorial.
config:
word: bar
With these elements in place we can now drive execution from the CLI
With these elements in place we can now drive execution from the CLI specifying only the pipeline name.
The tool loads the repository using the repository.yml file and looks up the pipeline by name.

.. code-block:: sh
Expand Down
18 changes: 9 additions & 9 deletions python_modules/dagster/docs/intro_tutorial/part_six.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@ Dagster is not just a programming model for pipelines, it is also a platform for
tool-building. Included within dagster is a CLI tool, the first tool you'll use
to interact with pipelines.

In order for a tool to interact with a pipeline, it must be made aware of the pipelines.
In previous examples we have specified the file and function in which a pipeline definition resides
in order to load it. However it can get tedious to load individual pipelines. We will want to
be able to browse many pipelines at once and then be able to launch tools against that set
of pipelines without having to specify arguments. This is where repositories and repository
config files come into play.

Dagster has a concept of repositories, which is used to organize a collection of pipelines
(and eventually other concepts). Dagster tools must be pointed to a repository or many repositories
Expand Down Expand Up @@ -34,7 +38,9 @@ Repositories are declared like this:
Save this file as ``part_six.py``.
For tools to operate they must be able to access and create a `Repository`. The current mechanism is to
Dagster tools are easiest to use when they can operate on a repository. It is particularly convenient
to create a config file containing the location and name of the function to create that repository. This
avoids unnecessary and repetitive typing for CLI tools. The current mechanism is to
create a yaml file (default name: 'repository.yml') where you state the module or file and the
function that creates the repository you want to operate on.
Expand All @@ -57,16 +63,10 @@ Now you should be able to list the pipelines in this repo:
hello_world
For a given project with 1 or more pipelines, you should define a repository for that
project so it is accessible via tools. If you are writing code within a python module
as opposed to a standalone file you can (and should) load the repository via that module.
To do that you can specify a module instead of a file in the repository.yml file.
You can also specify a module instead of a file in the repository.yml file.
.. code-block:: yaml
# To verify this will work, run this in your python environment:
# from some_module import define_some_repository
repository:
module: some_module
fn: define_some_repository
Expand Down
8 changes: 2 additions & 6 deletions python_modules/dagster/docs/intro_tutorial/part_three.rst
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,11 @@ Again it is worth noting how we are connecting *inputs* and *outputs* rather tha
Point your attention to the ``solid_d`` entry in the dependencies dictionary. We are declaring
dependencies on a per-input basis.

Save this to a file named ``step_three.py``
Save this to a file named ``part_three.py``

.. code-block:: sh
$ python3 step_three.py
a: 1
b: 2
c: 3
d: 6
$ dagster pipeline execute -f part_three.py -n define_pipeline
In this case ``solid_b`` happens to execute before ``solid_c``. However ``solid_c`` executing
before ``solid_b`` would also be a valid execution order given this DAG.
38 changes: 18 additions & 20 deletions python_modules/dagster/docs/intro_tutorial/part_two.rst
Original file line number Diff line number Diff line change
Expand Up @@ -8,28 +8,27 @@ We will be building a very simple pipeline where the first step returns a hardco
passes that value to the next solid, which concatenates it to itself, and prints it.


.. code-block:: python
.. code-block:: python
@lambda_solid
def solid_one():
return 'foo'
@lambda_solid
def solid_one():
return 'foo'
@lambda_solid(inputs=[InputDefinition('arg_one')])
def solid_two(arg_one):
print(arg_one * 2)
@lambda_solid(inputs=[InputDefinition('arg_one')])
def solid_two(arg_one):
print(arg_one * 2)
def define_pipeline():
return PipelineDefinition(
solids=[solid_one, solid_two],
dependencies={
'solid_two': {
'arg_one': DependencyDefinition('solid_one'),
},
}
)
if __name__ == '__main__':
pipeline = PipelineDefinition(
solids=[solid_one, solid_two],
dependencies={
'solid_two': {
'arg_one': DependencyDefinition('solid_one'),
},
}
)
pipeline_result = execute_pipeline(pipeline)
We have a couple new concepts here.

Expand All @@ -51,9 +50,8 @@ each step in the pipeline -- the solid -- is a *functional* unit of computation.

Save this file to ``part_two.py``

and run
and run it:

.. code-block:: sh
$ python3 step_two.py
foofoo
$ dagster pipeline execute -f part_two.py -n define_pipeline

0 comments on commit ad149be

Please sign in to comment.