Skip to content

Commit

Permalink
PipesSubprocess --> PipesSubprocessClient (#16813)
Browse files Browse the repository at this point in the history
## Summary & Motivation

Renaming `PipesSubprocess` to `PipesSubprocessClient` in line with the
naming conventions of the other `PipesClient` classes. Flagged prior PR
and followed up on here.

## How I Tested These Changes
  • Loading branch information
schrockn authored Sep 27, 2023
1 parent 0ca6823 commit c0dbfb5
Show file tree
Hide file tree
Showing 8 changed files with 60 additions and 38 deletions.
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
from dagster import Definitions
from dagster._core.pipes.subprocess import PipesSubprocess
from dagster._core.pipes.subprocess import PipesSubprocessClient

from .domain_specific_dsl.stocks_dsl import get_stocks_dsl_example_defs
from .pure_assets_dsl.assets_dsl import get_asset_dsl_example_defs

defs = Definitions(
assets=get_asset_dsl_example_defs() + get_stocks_dsl_example_defs(),
resources={"subprocess_resource": PipesSubprocess()},
resources={"pipes_subprocess_client": PipesSubprocessClient()},
)
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

from dagster import AssetKey, AssetsDefinition, asset, file_relative_path, multi_asset
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.pipes.subprocess import PipesSubprocess
from dagster._core.pipes.subprocess import PipesSubprocessClient


def load_yaml(relative_path: str) -> Dict[str, Any]:
Expand Down Expand Up @@ -86,11 +86,13 @@ def spec_for_stock_info(stock_info: StockInfo) -> AssetSpec:
ticker_specs = [spec_for_stock_info(stock_info) for stock_info in stock_assets.stock_infos]

@multi_asset(specs=ticker_specs)
def fetch_the_tickers(context: AssetExecutionContext, subprocess_resource: PipesSubprocess):
def fetch_the_tickers(
context: AssetExecutionContext, pipes_subprocess_client: PipesSubprocessClient
):
python_executable = shutil.which("python")
assert python_executable is not None
script_path = file_relative_path(__file__, "user_scripts/fetch_the_tickers.py")
subprocess_resource.run(
pipes_subprocess_client.run(
command=[python_executable, script_path], context=context, extras={"tickers": tickers}
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from yaml import Loader

from dagster import AssetKey, asset
from dagster._core.pipes.subprocess import PipesSubprocess
from dagster._core.pipes.subprocess import PipesSubprocessClient


def load_yaml(relative_path) -> Dict[str, Any]:
Expand All @@ -39,13 +39,13 @@ def from_asset_entries(asset_entries: Dict[str, Any]) -> List[AssetsDefinition]:
@asset(key=asset_key, deps=deps, description=description, group_name=group_name)
def _assets_def(
context: AssetExecutionContext,
subprocess_resource: PipesSubprocess,
pipes_subprocess_client: PipesSubprocessClient,
) -> None:
# instead of querying a dummy client, do your real data processing here

python_executable = shutil.which("python")
assert python_executable is not None
subprocess_resource.run(
pipes_subprocess_client.run(
command=[python_executable, file_relative_path(__file__, "sql_script.py"), sql],
context=context,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from dagster import AssetsDefinition
from dagster._core.definitions.events import AssetKey
from dagster._core.execution.context.invocation import build_asset_context
from dagster._core.pipes.subprocess import PipesSubprocess
from dagster._core.pipes.subprocess import PipesSubprocessClient


def assets_defs_from_yaml(yaml_string) -> List[AssetsDefinition]:
Expand Down Expand Up @@ -69,7 +69,7 @@ def test_execution() -> None:
assert assets_defs
assert len(assets_defs) == 1
assets_def = assets_defs[0]
assets_def(context=build_asset_context(), subprocess_resource=PipesSubprocess())
assets_def(context=build_asset_context(), pipes_subprocess_client=PipesSubprocessClient())


def test_basic_group() -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from assets_yaml_dsl.domain_specific_dsl.stocks_dsl import assets_defs_from_stock_assets
from dagster import AssetKey
from dagster._core.definitions import materialize
from dagster._core.pipes.subprocess import PipesSubprocess
from dagster._core.pipes.subprocess import PipesSubprocessClient
from examples.experimental.assets_yaml_dsl.assets_yaml_dsl.domain_specific_dsl.stocks_dsl import (
build_stock_assets_object,
)
Expand Down Expand Up @@ -55,5 +55,5 @@ def test_materialize_stocks_dsl():
stock_assets = build_stock_assets_object(stocks_dsl_document)
assets_defs = assets_defs_from_stock_assets(stock_assets)
assert materialize(
assets=assets_defs, resources={"subprocess_resource": PipesSubprocess()}
assets=assets_defs, resources={"pipes_subprocess_client": PipesSubprocessClient()}
).success
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from dagster import AssetExecutionContext, Config, Definitions, asset
from dagster._core.pipes.subprocess import (
PipesSubprocess,
PipesSubprocessClient,
)
from pydantic import Field

Expand Down Expand Up @@ -32,14 +32,22 @@ class NumberConfig(Config):


@asset
def number_x(context: AssetExecutionContext, ext: PipesSubprocess, config: NumberConfig) -> None:
def number_x(
context: AssetExecutionContext,
pipes_subprocess_client: PipesSubprocessClient,
config: NumberConfig,
) -> None:
extras = {**get_common_extras(context), "multiplier": config.multiplier}
ext.run(command_for_asset("number_x"), context=context, extras=extras)
pipes_subprocess_client.run(command_for_asset("number_x"), context=context, extras=extras)


@asset
def number_y(context: AssetExecutionContext, ext: PipesSubprocess, config: NumberConfig):
ext.run(
def number_y(
context: AssetExecutionContext,
pipes_subprocess_client: PipesSubprocessClient,
config: NumberConfig,
):
pipes_subprocess_client.run(
command_for_asset("number_y"),
context=context,
extras=get_common_extras(context),
Expand All @@ -48,15 +56,21 @@ def number_y(context: AssetExecutionContext, ext: PipesSubprocess, config: Numbe


@asset(deps=[number_x, number_y])
def number_sum(context: AssetExecutionContext, ext: PipesSubprocess) -> None:
ext.run(command_for_asset("number_sum"), context=context, extras=get_common_extras(context))
def number_sum(
context: AssetExecutionContext, pipes_subprocess_client: PipesSubprocessClient
) -> None:
pipes_subprocess_client.run(
command_for_asset("number_sum"), context=context, extras=get_common_extras(context)
)


ext = PipesSubprocess(
pipes_subprocess_client = PipesSubprocessClient(
env=get_env(),
)

defs = Definitions(assets=[number_x, number_y, number_sum], resources={"ext": ext})
defs = Definitions(
assets=[number_x, number_y, number_sum], resources={"ext": pipes_subprocess_client}
)

if __name__ == "__main__":
from dagster import instance_for_test, materialize
Expand All @@ -65,5 +79,5 @@ def number_sum(context: AssetExecutionContext, ext: PipesSubprocess) -> None:
materialize(
[number_x, number_y, number_sum],
instance=instance,
resources={"ext": ext},
resources={"ext": pipes_subprocess_client},
)
2 changes: 1 addition & 1 deletion python_modules/dagster/dagster/_core/pipes/subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,4 +93,4 @@ def run(
yield from pipes_session.get_results()


PipesSubprocess = ResourceParam[_PipesSubprocess]
PipesSubprocessClient = ResourceParam[_PipesSubprocess]
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
from dagster._core.execution.context.invocation import build_asset_context
from dagster._core.instance_for_test import instance_for_test
from dagster._core.pipes.subprocess import (
PipesSubprocess,
PipesSubprocessClient,
)
from dagster._core.pipes.utils import (
PipesEnvContextInjector,
Expand Down Expand Up @@ -159,7 +159,7 @@ def test_ext_subprocess(
assert False, "Unreachable"

@asset(check_specs=[AssetCheckSpec(name="foo_check", asset=AssetKey(["foo"]))])
def foo(context: AssetExecutionContext, ext: PipesSubprocess):
def foo(context: AssetExecutionContext, ext: PipesSubprocessClient):
extras = {"bar": "baz"}
cmd = [_PYTHON_EXECUTABLE, external_script]
yield from ext.run(
Expand All @@ -172,7 +172,9 @@ def foo(context: AssetExecutionContext, ext: PipesSubprocess):
},
)

resource = PipesSubprocess(context_injector=context_injector, message_reader=message_reader)
resource = PipesSubprocessClient(
context_injector=context_injector, message_reader=message_reader
)

with instance_for_test() as instance:
materialize([foo], instance=instance, resources={"ext": resource})
Expand Down Expand Up @@ -207,13 +209,17 @@ def script_fn():
context.report_asset_materialization(data_version="alpha", asset_key="bar")

@multi_asset(specs=[AssetSpec("foo"), AssetSpec("bar")])
def foo_bar(context: AssetExecutionContext, ext: PipesSubprocess):
def foo_bar(context: AssetExecutionContext, pipes_subprocess_client: PipesSubprocessClient):
with temp_script(script_fn) as script_path:
cmd = [_PYTHON_EXECUTABLE, script_path]
yield from ext.run(cmd, context=context)
yield from pipes_subprocess_client.run(cmd, context=context)

with instance_for_test() as instance:
materialize([foo_bar], instance=instance, resources={"ext": PipesSubprocess()})
materialize(
[foo_bar],
instance=instance,
resources={"pipes_subprocess_client": PipesSubprocessClient()},
)
foo_mat = instance.get_latest_materialization_event(AssetKey(["foo"]))
assert foo_mat and foo_mat.asset_materialization
assert foo_mat.asset_materialization.metadata["foo_meta"].value == "ok"
Expand Down Expand Up @@ -249,16 +255,16 @@ def script_fn():
)

@asset
def foo(context: AssetExecutionContext, ext: PipesSubprocess):
def foo(context: AssetExecutionContext, pipes_subprocess_client: PipesSubprocessClient):
with temp_script(script_fn) as script_path:
cmd = [_PYTHON_EXECUTABLE, script_path]
yield from ext.run(cmd, context=context)
yield from pipes_subprocess_client.run(cmd, context=context)

with instance_for_test() as instance:
materialize(
[foo],
instance=instance,
resources={"ext": PipesSubprocess()},
resources={"pipes_subprocess_client": PipesSubprocessClient()},
)
mat = instance.get_latest_materialization_event(foo.key)
assert mat and mat.asset_materialization
Expand Down Expand Up @@ -296,13 +302,13 @@ def script_fn():
raise Exception("foo")

@asset
def foo(context: AssetExecutionContext, ext: PipesSubprocess):
def foo(context: AssetExecutionContext, pipes_subprocess_client: PipesSubprocessClient):
with temp_script(script_fn) as script_path:
cmd = [_PYTHON_EXECUTABLE, script_path]
yield from ext.run(cmd, context=context)
yield from pipes_subprocess_client.run(cmd, context=context)

with pytest.raises(DagsterExternalExecutionError):
materialize([foo], resources={"ext": PipesSubprocess()})
materialize([foo], resources={"pipes_subprocess_client": PipesSubprocessClient()})


def test_ext_asset_invocation():
Expand All @@ -313,12 +319,12 @@ def script_fn():
context.log("hello world")

@asset
def foo(context: AssetExecutionContext, ext: PipesSubprocess):
def foo(context: AssetExecutionContext, pipes_subprocess_client: PipesSubprocessClient):
with temp_script(script_fn) as script_path:
cmd = [_PYTHON_EXECUTABLE, script_path]
ext.run(cmd, context=context)
pipes_subprocess_client.run(cmd, context=context)

foo(context=build_asset_context(), ext=PipesSubprocess())
foo(context=build_asset_context(), pipes_subprocess_client=PipesSubprocessClient())


PATH_WITH_NONEXISTENT_DIR = "/tmp/does-not-exist/foo"
Expand Down

0 comments on commit c0dbfb5

Please sign in to comment.