diff --git a/examples/experimental/assets_yaml_dsl/assets_yaml_dsl/__init__.py b/examples/experimental/assets_yaml_dsl/assets_yaml_dsl/__init__.py index 7b38cde4f3f3f..07a903cd005f3 100644 --- a/examples/experimental/assets_yaml_dsl/assets_yaml_dsl/__init__.py +++ b/examples/experimental/assets_yaml_dsl/assets_yaml_dsl/__init__.py @@ -1,10 +1,10 @@ from dagster import Definitions -from dagster._core.pipes.subprocess import PipesSubprocess +from dagster._core.pipes.subprocess import PipesSubprocessClient from .domain_specific_dsl.stocks_dsl import get_stocks_dsl_example_defs from .pure_assets_dsl.assets_dsl import get_asset_dsl_example_defs defs = Definitions( assets=get_asset_dsl_example_defs() + get_stocks_dsl_example_defs(), - resources={"subprocess_resource": PipesSubprocess()}, + resources={"pipes_subprocess_client": PipesSubprocessClient()}, ) diff --git a/examples/experimental/assets_yaml_dsl/assets_yaml_dsl/domain_specific_dsl/stocks_dsl.py b/examples/experimental/assets_yaml_dsl/assets_yaml_dsl/domain_specific_dsl/stocks_dsl.py index 79d4002816e87..a3dd2c3cf3677 100644 --- a/examples/experimental/assets_yaml_dsl/assets_yaml_dsl/domain_specific_dsl/stocks_dsl.py +++ b/examples/experimental/assets_yaml_dsl/assets_yaml_dsl/domain_specific_dsl/stocks_dsl.py @@ -12,7 +12,7 @@ from dagster import AssetKey, AssetsDefinition, asset, file_relative_path, multi_asset from dagster._core.definitions.asset_spec import AssetSpec -from dagster._core.pipes.subprocess import PipesSubprocess +from dagster._core.pipes.subprocess import PipesSubprocessClient def load_yaml(relative_path: str) -> Dict[str, Any]: @@ -86,11 +86,13 @@ def spec_for_stock_info(stock_info: StockInfo) -> AssetSpec: ticker_specs = [spec_for_stock_info(stock_info) for stock_info in stock_assets.stock_infos] @multi_asset(specs=ticker_specs) - def fetch_the_tickers(context: AssetExecutionContext, subprocess_resource: PipesSubprocess): + def fetch_the_tickers( + context: AssetExecutionContext, pipes_subprocess_client: PipesSubprocessClient + ): python_executable = shutil.which("python") assert python_executable is not None script_path = file_relative_path(__file__, "user_scripts/fetch_the_tickers.py") - subprocess_resource.run( + pipes_subprocess_client.run( command=[python_executable, script_path], context=context, extras={"tickers": tickers} ) diff --git a/examples/experimental/assets_yaml_dsl/assets_yaml_dsl/pure_assets_dsl/assets_dsl.py b/examples/experimental/assets_yaml_dsl/assets_yaml_dsl/pure_assets_dsl/assets_dsl.py index d343ee82e8c17..6a23f1707b4d7 100644 --- a/examples/experimental/assets_yaml_dsl/assets_yaml_dsl/pure_assets_dsl/assets_dsl.py +++ b/examples/experimental/assets_yaml_dsl/assets_yaml_dsl/pure_assets_dsl/assets_dsl.py @@ -13,7 +13,7 @@ from yaml import Loader from dagster import AssetKey, asset -from dagster._core.pipes.subprocess import PipesSubprocess +from dagster._core.pipes.subprocess import PipesSubprocessClient def load_yaml(relative_path) -> Dict[str, Any]: @@ -39,13 +39,13 @@ def from_asset_entries(asset_entries: Dict[str, Any]) -> List[AssetsDefinition]: @asset(key=asset_key, deps=deps, description=description, group_name=group_name) def _assets_def( context: AssetExecutionContext, - subprocess_resource: PipesSubprocess, + pipes_subprocess_client: PipesSubprocessClient, ) -> None: # instead of querying a dummy client, do your real data processing here python_executable = shutil.which("python") assert python_executable is not None - subprocess_resource.run( + pipes_subprocess_client.run( command=[python_executable, file_relative_path(__file__, "sql_script.py"), sql], context=context, ) diff --git a/examples/experimental/assets_yaml_dsl/assets_yaml_dsl_tests/test_assets_dsl.py b/examples/experimental/assets_yaml_dsl/assets_yaml_dsl_tests/test_assets_dsl.py index 29f0904640b86..32136891fb052 100644 --- a/examples/experimental/assets_yaml_dsl/assets_yaml_dsl_tests/test_assets_dsl.py +++ b/examples/experimental/assets_yaml_dsl/assets_yaml_dsl_tests/test_assets_dsl.py @@ -5,7 +5,7 @@ from dagster import AssetsDefinition from dagster._core.definitions.events import AssetKey from dagster._core.execution.context.invocation import build_asset_context -from dagster._core.pipes.subprocess import PipesSubprocess +from dagster._core.pipes.subprocess import PipesSubprocessClient def assets_defs_from_yaml(yaml_string) -> List[AssetsDefinition]: @@ -69,7 +69,7 @@ def test_execution() -> None: assert assets_defs assert len(assets_defs) == 1 assets_def = assets_defs[0] - assets_def(context=build_asset_context(), subprocess_resource=PipesSubprocess()) + assets_def(context=build_asset_context(), pipes_subprocess_client=PipesSubprocessClient()) def test_basic_group() -> None: diff --git a/examples/experimental/assets_yaml_dsl/assets_yaml_dsl_tests/test_stocks_dsl.py b/examples/experimental/assets_yaml_dsl/assets_yaml_dsl_tests/test_stocks_dsl.py index a80012f879366..edda0f45e44bf 100644 --- a/examples/experimental/assets_yaml_dsl/assets_yaml_dsl_tests/test_stocks_dsl.py +++ b/examples/experimental/assets_yaml_dsl/assets_yaml_dsl_tests/test_stocks_dsl.py @@ -18,7 +18,7 @@ from assets_yaml_dsl.domain_specific_dsl.stocks_dsl import assets_defs_from_stock_assets from dagster import AssetKey from dagster._core.definitions import materialize -from dagster._core.pipes.subprocess import PipesSubprocess +from dagster._core.pipes.subprocess import PipesSubprocessClient from examples.experimental.assets_yaml_dsl.assets_yaml_dsl.domain_specific_dsl.stocks_dsl import ( build_stock_assets_object, ) @@ -55,5 +55,5 @@ def test_materialize_stocks_dsl(): stock_assets = build_stock_assets_object(stocks_dsl_document) assets_defs = assets_defs_from_stock_assets(stock_assets) assert materialize( - assets=assets_defs, resources={"subprocess_resource": PipesSubprocess()} + assets=assets_defs, resources={"pipes_subprocess_client": PipesSubprocessClient()} ).success diff --git a/python_modules/dagster-test/dagster_test/toys/external_execution/__init__.py b/python_modules/dagster-test/dagster_test/toys/external_execution/__init__.py index 87dd74b168427..1c17084de4c1d 100644 --- a/python_modules/dagster-test/dagster_test/toys/external_execution/__init__.py +++ b/python_modules/dagster-test/dagster_test/toys/external_execution/__init__.py @@ -3,7 +3,7 @@ from dagster import AssetExecutionContext, Config, Definitions, asset from dagster._core.pipes.subprocess import ( - PipesSubprocess, + PipesSubprocessClient, ) from pydantic import Field @@ -32,14 +32,22 @@ class NumberConfig(Config): @asset -def number_x(context: AssetExecutionContext, ext: PipesSubprocess, config: NumberConfig) -> None: +def number_x( + context: AssetExecutionContext, + pipes_subprocess_client: PipesSubprocessClient, + config: NumberConfig, +) -> None: extras = {**get_common_extras(context), "multiplier": config.multiplier} - ext.run(command_for_asset("number_x"), context=context, extras=extras) + pipes_subprocess_client.run(command_for_asset("number_x"), context=context, extras=extras) @asset -def number_y(context: AssetExecutionContext, ext: PipesSubprocess, config: NumberConfig): - ext.run( +def number_y( + context: AssetExecutionContext, + pipes_subprocess_client: PipesSubprocessClient, + config: NumberConfig, +): + pipes_subprocess_client.run( command_for_asset("number_y"), context=context, extras=get_common_extras(context), @@ -48,15 +56,21 @@ def number_y(context: AssetExecutionContext, ext: PipesSubprocess, config: Numbe @asset(deps=[number_x, number_y]) -def number_sum(context: AssetExecutionContext, ext: PipesSubprocess) -> None: - ext.run(command_for_asset("number_sum"), context=context, extras=get_common_extras(context)) +def number_sum( + context: AssetExecutionContext, pipes_subprocess_client: PipesSubprocessClient +) -> None: + pipes_subprocess_client.run( + command_for_asset("number_sum"), context=context, extras=get_common_extras(context) + ) -ext = PipesSubprocess( +pipes_subprocess_client = PipesSubprocessClient( env=get_env(), ) -defs = Definitions(assets=[number_x, number_y, number_sum], resources={"ext": ext}) +defs = Definitions( + assets=[number_x, number_y, number_sum], resources={"ext": pipes_subprocess_client} +) if __name__ == "__main__": from dagster import instance_for_test, materialize @@ -65,5 +79,5 @@ def number_sum(context: AssetExecutionContext, ext: PipesSubprocess) -> None: materialize( [number_x, number_y, number_sum], instance=instance, - resources={"ext": ext}, + resources={"ext": pipes_subprocess_client}, ) diff --git a/python_modules/dagster/dagster/_core/pipes/subprocess.py b/python_modules/dagster/dagster/_core/pipes/subprocess.py index 9730418b16db0..0be3b6e5e4dce 100644 --- a/python_modules/dagster/dagster/_core/pipes/subprocess.py +++ b/python_modules/dagster/dagster/_core/pipes/subprocess.py @@ -93,4 +93,4 @@ def run( yield from pipes_session.get_results() -PipesSubprocess = ResourceParam[_PipesSubprocess] +PipesSubprocessClient = ResourceParam[_PipesSubprocess] diff --git a/python_modules/dagster/dagster_tests/execution_tests/pipes_tests/test_subprocess.py b/python_modules/dagster/dagster_tests/execution_tests/pipes_tests/test_subprocess.py index 4d472135e135d..ab8a6d15b0cf6 100644 --- a/python_modules/dagster/dagster_tests/execution_tests/pipes_tests/test_subprocess.py +++ b/python_modules/dagster/dagster_tests/execution_tests/pipes_tests/test_subprocess.py @@ -37,7 +37,7 @@ from dagster._core.execution.context.invocation import build_asset_context from dagster._core.instance_for_test import instance_for_test from dagster._core.pipes.subprocess import ( - PipesSubprocess, + PipesSubprocessClient, ) from dagster._core.pipes.utils import ( PipesEnvContextInjector, @@ -159,7 +159,7 @@ def test_ext_subprocess( assert False, "Unreachable" @asset(check_specs=[AssetCheckSpec(name="foo_check", asset=AssetKey(["foo"]))]) - def foo(context: AssetExecutionContext, ext: PipesSubprocess): + def foo(context: AssetExecutionContext, ext: PipesSubprocessClient): extras = {"bar": "baz"} cmd = [_PYTHON_EXECUTABLE, external_script] yield from ext.run( @@ -172,7 +172,9 @@ def foo(context: AssetExecutionContext, ext: PipesSubprocess): }, ) - resource = PipesSubprocess(context_injector=context_injector, message_reader=message_reader) + resource = PipesSubprocessClient( + context_injector=context_injector, message_reader=message_reader + ) with instance_for_test() as instance: materialize([foo], instance=instance, resources={"ext": resource}) @@ -207,13 +209,17 @@ def script_fn(): context.report_asset_materialization(data_version="alpha", asset_key="bar") @multi_asset(specs=[AssetSpec("foo"), AssetSpec("bar")]) - def foo_bar(context: AssetExecutionContext, ext: PipesSubprocess): + def foo_bar(context: AssetExecutionContext, pipes_subprocess_client: PipesSubprocessClient): with temp_script(script_fn) as script_path: cmd = [_PYTHON_EXECUTABLE, script_path] - yield from ext.run(cmd, context=context) + yield from pipes_subprocess_client.run(cmd, context=context) with instance_for_test() as instance: - materialize([foo_bar], instance=instance, resources={"ext": PipesSubprocess()}) + materialize( + [foo_bar], + instance=instance, + resources={"pipes_subprocess_client": PipesSubprocessClient()}, + ) foo_mat = instance.get_latest_materialization_event(AssetKey(["foo"])) assert foo_mat and foo_mat.asset_materialization assert foo_mat.asset_materialization.metadata["foo_meta"].value == "ok" @@ -249,16 +255,16 @@ def script_fn(): ) @asset - def foo(context: AssetExecutionContext, ext: PipesSubprocess): + def foo(context: AssetExecutionContext, pipes_subprocess_client: PipesSubprocessClient): with temp_script(script_fn) as script_path: cmd = [_PYTHON_EXECUTABLE, script_path] - yield from ext.run(cmd, context=context) + yield from pipes_subprocess_client.run(cmd, context=context) with instance_for_test() as instance: materialize( [foo], instance=instance, - resources={"ext": PipesSubprocess()}, + resources={"pipes_subprocess_client": PipesSubprocessClient()}, ) mat = instance.get_latest_materialization_event(foo.key) assert mat and mat.asset_materialization @@ -296,13 +302,13 @@ def script_fn(): raise Exception("foo") @asset - def foo(context: AssetExecutionContext, ext: PipesSubprocess): + def foo(context: AssetExecutionContext, pipes_subprocess_client: PipesSubprocessClient): with temp_script(script_fn) as script_path: cmd = [_PYTHON_EXECUTABLE, script_path] - yield from ext.run(cmd, context=context) + yield from pipes_subprocess_client.run(cmd, context=context) with pytest.raises(DagsterExternalExecutionError): - materialize([foo], resources={"ext": PipesSubprocess()}) + materialize([foo], resources={"pipes_subprocess_client": PipesSubprocessClient()}) def test_ext_asset_invocation(): @@ -313,12 +319,12 @@ def script_fn(): context.log("hello world") @asset - def foo(context: AssetExecutionContext, ext: PipesSubprocess): + def foo(context: AssetExecutionContext, pipes_subprocess_client: PipesSubprocessClient): with temp_script(script_fn) as script_path: cmd = [_PYTHON_EXECUTABLE, script_path] - ext.run(cmd, context=context) + pipes_subprocess_client.run(cmd, context=context) - foo(context=build_asset_context(), ext=PipesSubprocess()) + foo(context=build_asset_context(), pipes_subprocess_client=PipesSubprocessClient()) PATH_WITH_NONEXISTENT_DIR = "/tmp/does-not-exist/foo"