[KED-1963] Add option to run asynchronously via the Kedro CLI (#747)
deepyaman authored Sep 8, 2020
1 parent d895a29 commit 8e6e8b2
Showing 6 changed files with 54 additions and 23 deletions.
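In short, `kedro run --async` now constructs the selected runner with `is_async=True` (see the project-template change below), so node inputs are loaded and outputs are saved asynchronously with threads while node execution itself is unchanged. A minimal programmatic sketch of the same behaviour, assuming a Kedro 0.16.x environment; the toy pipeline and the names `double`, `a` and `b` are illustrative, not part of this commit:

```python
# Hedged sketch: the programmatic equivalent of `kedro run --async`.
# The CLI ends up calling runner_class(is_async=True); here we do the same
# with the default SequentialRunner on a toy in-memory pipeline.
from kedro.io import DataCatalog, MemoryDataSet
from kedro.pipeline import Pipeline, node
from kedro.runner import SequentialRunner


def double(x):
    return x * 2


pipeline = Pipeline([node(double, "a", "b")])
catalog = DataCatalog({"a": MemoryDataSet(21)})

# is_async=True loads inputs and saves outputs in background threads.
runner = SequentialRunner(is_async=True)
print(runner.run(pipeline, catalog))  # {'b': 42}
```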
10 changes: 5 additions & 5 deletions RELEASE.md
@@ -27,14 +27,15 @@ These can be defined in `src/<package-name>/hooks.py` and added to `.kedro.yml`
* Added ability to disable auto-registered Hooks using `.kedro.yml` (or `pyproject.toml`) configuration file.

## Bug fixes and other changes
* `project_name`, `project_version` and `package_name` now have to be defined in `.kedro.yml` for the projects generated using Kedro 0.16.5+.
* Added option to run asynchronously via the Kedro CLI.
* Absorbed `.isort.cfg` settings into `setup.cfg`.
* `project_name`, `project_version` and `package_name` now have to be defined in `.kedro.yml` for projects generated using Kedro 0.16.5+.
* Packaging a modular pipeline raises an error if the pipeline directory is empty or non-existent.

## Breaking changes to the API

## Thanks for supporting contributions

[Sebastian Bertoli](https://github.com/sebastianbertoli), [Deepyaman Datta](https://github.com/deepyaman)
[Deepyaman Datta](https://github.com/deepyaman), [Bas Nijholt](https://github.com/basnijholt), [Sebastian Bertoli](https://github.com/sebastianbertoli)

# Release 0.16.4

@@ -46,12 +47,11 @@ These can be defined in `src/<package-name>/hooks.py` and added to `.kedro.yml`
* Fixed a bug for using `ParallelRunner` on Windows.
* Modified `GBQTableDataSet` to load customized results using customized queries from Google Big Query tables.
* Documentation improvements.
* Absorbed `.isort.cfg` settings into `setup.cfg`

## Breaking changes to the API

## Thanks for supporting contributions
[Ajay Bisht](https://github.com/ajb7), [Vijay Sajjanar](https://github.com/vjkr), [Deepyaman Datta](https://github.com/deepyaman), [Sebastian Bertoli](https://github.com/sebastianbertoli), [Shahil Mawjee](https://github.com/s-mawjee), [Louis Guitton](https://github.com/louisguitton), [Emanuel Ferm](https://github.com/eferm), [Bas Nijholt](https://github.com/basnijholt)
[Ajay Bisht](https://github.com/ajb7), [Vijay Sajjanar](https://github.com/vjkr), [Deepyaman Datta](https://github.com/deepyaman), [Sebastian Bertoli](https://github.com/sebastianbertoli), [Shahil Mawjee](https://github.com/s-mawjee), [Louis Guitton](https://github.com/louisguitton), [Emanuel Ferm](https://github.com/eferm)

# Release 0.16.3

2 changes: 1 addition & 1 deletion kedro/runner/parallel_runner.py
@@ -150,7 +150,7 @@ def __init__(self, max_workers: int = None, is_async: bool = False):
and CPU core count. On Windows machines, the max_workers value
cannot be larger than 61 and will be set to min(61, max_workers).
is_async: If True, the node inputs and outputs are loaded and saved
asynchronously with threads. Defaults to False.
asynchronously with threads. Defaults to False.
Raises:
ValueError: bad parameters passed
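The `ParallelRunner` docstring above documents the pre-existing `is_async` argument that the new CLI flag forwards. A hedged sketch of passing it directly, assuming a Kedro 0.16.x environment; the toy pipeline is illustrative, and node functions must live at module level so the worker processes can pickle them:

```python
# Hedged sketch: ParallelRunner(is_async=True) runs nodes in separate
# processes and, within each worker, loads/saves that node's datasets
# asynchronously with threads.
from kedro.io import DataCatalog, MemoryDataSet
from kedro.pipeline import Pipeline, node
from kedro.runner import ParallelRunner


def double(x):  # module-level so it is picklable for the worker processes
    return x * 2


if __name__ == "__main__":  # guard needed because ParallelRunner uses multiprocessing
    pipeline = Pipeline([node(double, "a", "b")])
    catalog = DataCatalog({"a": MemoryDataSet(21)})
    print(ParallelRunner(is_async=True).run(pipeline, catalog))  # {'b': 42}
```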
13 changes: 11 additions & 2 deletions kedro/runner/thread_runner.py
@@ -29,6 +29,7 @@
be used to run the ``Pipeline`` in parallel groups formed by toposort
using threads.
"""
import warnings
from collections import Counter
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
from itertools import chain
@@ -46,19 +47,27 @@ class ThreadRunner(AbstractRunner):
using threads.
"""

def __init__(self, max_workers: int = None):
def __init__(self, max_workers: int = None, is_async: bool = False):
"""
Instantiates the runner.
Args:
max_workers: Number of worker threads to spawn. If not set,
calculated automatically based on the pipeline configuration
and CPU core count.
is_async: If True, it is ignored and reset to False, because
`ThreadRunner` doesn't support loading and saving the node inputs
and outputs asynchronously with threads. Defaults to False.
Raises:
ValueError: bad parameters passed
"""
# `ThreadRunner` doesn't support asynchronous loading and saving of node inputs and outputs
if is_async:
warnings.warn(
"`ThreadRunner` doesn't support loading and saving the "
"node inputs and outputs asynchronously with threads. "
"Setting `is_async` to False."
)
super().__init__(is_async=False)

if max_workers is not None and max_workers <= 0:
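A quick sketch of the fallback added above: asking `ThreadRunner` for `is_async=True` only emits a `UserWarning`, and the runner is still created with `is_async=False` (assuming a Kedro 0.16.x environment):

```python
# Hedged sketch: ThreadRunner ignores is_async=True, warning and falling
# back to synchronous dataset loading and saving.
import warnings

from kedro.runner import ThreadRunner

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    runner = ThreadRunner(is_async=True)

print(runner._is_async)    # False -- the flag was overridden
print(caught[0].category)  # <class 'UserWarning'>
```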
@@ -62,6 +62,8 @@
PARALLEL_ARG_HELP = """Run the pipeline using the `ParallelRunner`.
If not specified, use the `SequentialRunner`. This flag cannot be used together
with --runner."""
ASYNC_ARG_HELP = """Load and save node inputs and outputs asynchronously
with threads. If not specified, load and save datasets synchronously."""
TAG_ARG_HELP = """Construct the pipeline using only nodes which have this tag
attached. The option can be used multiple times, which results in a
pipeline constructed from nodes having any of those tags."""
@@ -169,6 +171,7 @@ def cli():
"--runner", "-r", type=str, default=None, multiple=False, help=RUNNER_ARG_HELP
)
@click.option("--parallel", "-p", is_flag=True, multiple=False, help=PARALLEL_ARG_HELP)
@click.option("--async", "is_async", is_flag=True, multiple=False, help=ASYNC_ARG_HELP)
@env_option
@click.option("--tag", "-t", type=str, multiple=True, help=TAG_ARG_HELP)
@click.option(
@@ -195,6 +198,7 @@ def run(
env,
parallel,
runner,
is_async,
node_names,
to_nodes,
from_nodes,
@@ -221,7 +225,7 @@
context = load_context(Path.cwd(), env=env, extra_params=params)
context.run(
tags=tag,
runner=runner_class(),
runner=runner_class(is_async=is_async),
node_names=node_names,
from_nodes=from_nodes,
to_nodes=to_nodes,
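One wiring detail worth noting: the flag is spelled `--async` on the command line but is bound to a parameter named `is_async`, because `async` is a reserved keyword in Python 3.7+ and cannot be used as an argument name. A standalone sketch of the same click pattern; the command and message below are illustrative only:

```python
# Hedged sketch of the click pattern used above: the second positional
# argument to click.option sets the Python parameter name for --async.
import click


@click.command()
@click.option("--async", "is_async", is_flag=True, help="Use asynchronous dataset I/O.")
def run(is_async):
    click.echo("is_async={}".format(is_async))


if __name__ == "__main__":
    run()
```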
14 changes: 14 additions & 0 deletions tests/runner/test_thread_runner.py
@@ -134,6 +134,20 @@ def test_init_with_negative_process_count(self):
ThreadRunner(max_workers=-1)


class TestIsAsync:
def test_thread_run(self, fan_out_fan_in, catalog):
catalog.add_feed_dict(dict(A=42))
pattern = (
"`ThreadRunner` doesn't support loading and saving the "
"node inputs and outputs asynchronously with threads. "
"Setting `is_async` to False."
)
with pytest.warns(UserWarning, match=pattern):
result = ThreadRunner(is_async=True).run(fan_out_fan_in, catalog)
assert "Z" in result
assert result["Z"] == (42, 42, 42)


class TestInvalidThreadRunner:
def test_task_exception(self, fan_out_fan_in, catalog):
catalog.add_feed_dict(feed_dict=dict(A=42))
32 changes: 18 additions & 14 deletions tests/template/test_kedro_cli.py
@@ -80,27 +80,25 @@ def test_run_successfully(self, fake_kedro_cli, fake_load_context, mocker):
pipeline_name=None,
)

assert isinstance(
fake_load_context.return_value.run.call_args_list[0][1]["runner"],
SequentialRunner,
)
runner = fake_load_context.return_value.run.call_args_list[0][1]["runner"]
assert isinstance(runner, SequentialRunner)
assert not runner._is_async

def test_with_sequential_runner_and_parallel_flag(
self, fake_kedro_cli, fake_load_context
):
result = CliRunner().invoke(
fake_kedro_cli.cli, ["run", "--parallel", "--runner=SequentialRunner"]
)

assert result.exit_code
assert "Please use either --parallel or --runner" in result.stdout

fake_load_context.return_value.run.assert_not_called()

def test_run_successfully_parallel_via_flag(
self, fake_kedro_cli, fake_load_context, mocker
):
result = CliRunner().invoke(fake_kedro_cli.cli, ["run", "--parallel"])

assert not result.exit_code

fake_load_context.return_value.run.assert_called_once_with(
@@ -114,23 +112,29 @@ def test_run_successfully_parallel_via_flag(
pipeline_name=None,
)

assert isinstance(
fake_load_context.return_value.run.call_args_list[0][1]["runner"],
ParallelRunner,
)
runner = fake_load_context.return_value.run.call_args_list[0][1]["runner"]
assert isinstance(runner, ParallelRunner)
assert not runner._is_async

def test_run_successfully_parallel_via_name(
self, fake_kedro_cli, fake_load_context
):
result = CliRunner().invoke(
fake_kedro_cli.cli, ["run", "--runner=ParallelRunner"]
)
assert not result.exit_code

runner = fake_load_context.return_value.run.call_args_list[0][1]["runner"]
assert isinstance(runner, ParallelRunner)
assert not runner._is_async

def test_run_async(self, fake_kedro_cli, fake_load_context, mocker):
result = CliRunner().invoke(fake_kedro_cli.cli, ["run", "--async"])
assert not result.exit_code
assert isinstance(
fake_load_context.return_value.run.call_args_list[0][1]["runner"],
ParallelRunner,
)

runner = fake_load_context.return_value.run.call_args_list[0][1]["runner"]
assert isinstance(runner, SequentialRunner)
assert runner._is_async

@pytest.mark.parametrize("config_flag", ["--config", "-c"])
def test_run_with_config(
