Skip to content

Commit

Permalink
update all assets invoked with build_op_context to use build_asset_co…
Browse files Browse the repository at this point in the history
…ntext
  • Loading branch information
jamiedemaria committed Sep 21, 2023
1 parent 02e1483 commit 8ef0346
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 38 deletions.
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import pandas as pd
from dagster import build_op_context
from dagster import build_asset_context

from development_to_production.assets import comments, items, stories
from development_to_production.resources import StubHNClient


def test_items():
context = build_op_context(
context = build_asset_context(
resources={"hn_client": StubHNClient()}, op_config={"N": StubHNClient().fetch_max_item_id()}
)
hn_dataset = items(context)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
Out,
op,
graph,
build_op_context,
)


Expand Down Expand Up @@ -330,7 +331,7 @@ def test_asset_with_inputs():


# start_test_resource_asset
from dagster import asset, ConfigurableResource, build_op_context, with_resources
from dagster import asset, ConfigurableResource


class BarResource(ConfigurableResource):
Expand All @@ -351,7 +352,7 @@ def test_asset_requires_bar():


# start_test_config_asset
from dagster import asset, Config, build_op_context
from dagster import asset, Config


class MyAssetConfig(Config):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import pandas as pd

from dagster import build_op_context
from dagster import build_asset_context

from .assets_v2 import items
from .resources.resources_v2 import StubHNClient
Expand All @@ -10,7 +10,7 @@


def test_items():
context = build_op_context(
context = build_asset_context(
resources={"hn_client": StubHNClient()},
op_config={"N": StubHNClient().fetch_max_item_id()},
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import mock
import pytest

from dagster import build_init_resource_context, build_op_context
from dagster._core.definitions.run_config import RunConfig
from dagster._core.errors import DagsterInvalidConfigError

Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,17 @@
import pytest
from dagster import (
ConfigurableResource,
asset,
op,
)
from dagster import AssetExecutionContext, ConfigurableResource, OpExecutionContext, asset, op
from dagster._core.errors import (
DagsterInvalidInvocationError,
)
from dagster._core.execution.context.invocation import build_op_context
from dagster._core.execution.context.invocation import build_asset_context, build_op_context


def test_direct_op_invocation() -> None:
class MyResource(ConfigurableResource):
a_str: str

@op
def my_op(context, my_resource: MyResource) -> str:
def my_op(context: OpExecutionContext, my_resource: MyResource) -> str:
assert my_resource.a_str == "foo"
return my_resource.a_str

Expand Down Expand Up @@ -61,7 +57,9 @@ class MyResource(ConfigurableResource):
a_str: str

@op
def my_op(context, my_resource: MyResource, my_other_resource: MyResource) -> str:
def my_op(
context: OpExecutionContext, my_resource: MyResource, my_other_resource: MyResource
) -> str:
assert my_resource.a_str == "foo"
assert my_other_resource.a_str == "bar"
return my_resource.a_str
Expand Down Expand Up @@ -122,7 +120,9 @@ class MyResource(ConfigurableResource):
z: int

@op
def my_wacky_addition_op(context, x: int, y: int, my_resource: MyResource) -> int:
def my_wacky_addition_op(
context: OpExecutionContext, x: int, y: int, my_resource: MyResource
) -> int:
return x + y + my_resource.z

# Just providing context is ok, we'll use the resource from the context
Expand Down Expand Up @@ -192,12 +192,14 @@ class MyResource(ConfigurableResource):
a_str: str

@asset
def my_asset(context, my_resource: MyResource) -> str:
def my_asset(context: AssetExecutionContext, my_resource: MyResource) -> str:
assert my_resource.a_str == "foo"
return my_resource.a_str

# Just providing context is ok, we'll use the resource from the context
assert my_asset(build_op_context(resources={"my_resource": MyResource(a_str="foo")})) == "foo"
assert (
my_asset(build_asset_context(resources={"my_resource": MyResource(a_str="foo")})) == "foo"
)

# Providing both context and resource is not ok, because we don't know which one to use
with pytest.raises(
Expand All @@ -206,17 +208,17 @@ def my_asset(context, my_resource: MyResource) -> str:
):
assert (
my_asset(
context=build_op_context(resources={"my_resource": MyResource(a_str="foo")}),
context=build_asset_context(resources={"my_resource": MyResource(a_str="foo")}),
my_resource=MyResource(a_str="foo"),
)
== "foo"
)

# Providing resource only as kwarg is ok, we'll use that (we still need a context though)
assert my_asset(context=build_op_context(), my_resource=MyResource(a_str="foo")) == "foo"
assert my_asset(context=build_asset_context(), my_resource=MyResource(a_str="foo")) == "foo"

# Providing resource as arg is ok, we'll use that (we still need a context though)
assert my_asset(build_op_context(), MyResource(a_str="foo")) == "foo"
assert my_asset(build_asset_context(), MyResource(a_str="foo")) == "foo"

@asset
def my_asset_no_context(my_resource: MyResource) -> str:
Expand All @@ -225,7 +227,7 @@ def my_asset_no_context(my_resource: MyResource) -> str:

# Providing context is ok, we just discard it and use the resource from the context
assert (
my_asset_no_context(build_op_context(resources={"my_resource": MyResource(a_str="foo")}))
my_asset_no_context(build_asset_context(resources={"my_resource": MyResource(a_str="foo")}))
== "foo"
)

Expand All @@ -238,28 +240,36 @@ class MyResource(ConfigurableResource):
z: int

@asset
def my_wacky_addition_asset(context, x: int, y: int, my_resource: MyResource) -> int:
def my_wacky_addition_asset(
context: AssetExecutionContext, x: int, y: int, my_resource: MyResource
) -> int:
return x + y + my_resource.z

# Just providing context is ok, we'll use the resource from the context
# We are successfully able to input x and y as args
assert (
my_wacky_addition_asset(build_op_context(resources={"my_resource": MyResource(z=2)}), 4, 5)
my_wacky_addition_asset(
build_asset_context(resources={"my_resource": MyResource(z=2)}), 4, 5
)
== 11
)
# We can also input x and y as kwargs
assert (
my_wacky_addition_asset(
build_op_context(resources={"my_resource": MyResource(z=3)}), y=1, x=2
build_asset_context(resources={"my_resource": MyResource(z=3)}), y=1, x=2
)
== 6
)

# Providing resource only as kwarg is ok, we'll use that (we still need a context though)
# We can input x and y as args
assert my_wacky_addition_asset(build_op_context(), 10, 20, my_resource=MyResource(z=30)) == 60
assert (
my_wacky_addition_asset(build_asset_context(), 10, 20, my_resource=MyResource(z=30)) == 60
)
# We can also input x and y as kwargs in this case
assert my_wacky_addition_asset(build_op_context(), y=1, x=2, my_resource=MyResource(z=3)) == 6
assert (
my_wacky_addition_asset(build_asset_context(), y=1, x=2, my_resource=MyResource(z=3)) == 6
)

@asset
def my_wacky_addition_asset_no_context(x: int, y: int, my_resource: MyResource) -> int:
Expand All @@ -269,14 +279,14 @@ def my_wacky_addition_asset_no_context(x: int, y: int, my_resource: MyResource)
# We can input x and y as args
assert (
my_wacky_addition_asset_no_context(
build_op_context(resources={"my_resource": MyResource(z=2)}), 4, 5
build_asset_context(resources={"my_resource": MyResource(z=2)}), 4, 5
)
== 11
)
# We can also input x and y as kwargs
assert (
my_wacky_addition_asset_no_context(
build_op_context(resources={"my_resource": MyResource(z=3)}), y=1, x=2
build_asset_context(resources={"my_resource": MyResource(z=3)}), y=1, x=2
)
== 6
)
Expand Down Expand Up @@ -387,27 +397,29 @@ class NumResource(ConfigurableResource):
executed = {}

@asset
def an_asset(context, my_resource: NumResource, my_other_resource: NumResource) -> None:
def an_asset(
context: AssetExecutionContext, my_resource: NumResource, my_other_resource: NumResource
) -> None:
assert context.resources.my_resource.num == 1
assert context.resources.my_other_resource.num == 2
assert my_resource.num == 1
assert my_other_resource.num == 2
executed["yes"] = True

an_asset(build_op_context(), NumResource(num=1), NumResource(num=2))
an_asset(build_asset_context(), NumResource(num=1), NumResource(num=2))
assert executed["yes"]
executed.clear()

an_asset(
build_op_context(), my_resource=NumResource(num=1), my_other_resource=NumResource(num=2)
build_asset_context(), my_resource=NumResource(num=1), my_other_resource=NumResource(num=2)
)
assert executed["yes"]
executed.clear()

an_asset(
my_other_resource=NumResource(num=2),
my_resource=NumResource(num=1),
context=build_op_context(),
context=build_asset_context(),
)
assert executed["yes"]
executed.clear()
Original file line number Diff line number Diff line change
Expand Up @@ -1240,7 +1240,7 @@ def my_asset(context):
def non_asset_op(context):
context.assets_def # noqa: B018

with build_op_context(
with build_asset_context(
partition_key="2023-02-02",
) as context:
my_asset(context)
Expand All @@ -1260,7 +1260,7 @@ def partitioned_asset(context):
assert start == pendulum.instance(datetime(2023, 2, 2), tz=partitions_def.timezone)
assert end == pendulum.instance(datetime(2023, 2, 3), tz=partitions_def.timezone)

context = build_op_context(
context = build_asset_context(
partition_key="2023-02-02",
)
partitioned_asset(context)
Expand Down Expand Up @@ -1289,7 +1289,7 @@ def my_asset(context):
assert context.asset_partitions_time_window_for_output() == time_window
return 1

context = build_op_context(
context = build_asset_context(
partition_key="2020-01-01|a",
)
my_asset(context)
Expand All @@ -1309,7 +1309,7 @@ def static_multipartitioned_asset(context):
):
context.asset_partitions_time_window_for_output()

context = build_op_context(
context = build_asset_context(
partition_key="a|a",
)
static_multipartitioned_asset(context)
Expand All @@ -1323,7 +1323,7 @@ def foo(context: AssetExecutionContext):
keys = partitions_def.get_partition_keys_in_range(context.partition_key_range)
return {k: True for k in keys}

context = build_op_context(
context = build_asset_context(
partition_key_range=PartitionKeyRange("2023-01-01", "2023-01-02"),
)
assert foo(context) == {"2023-01-01": True, "2023-01-02": True}
Expand Down

0 comments on commit 8ef0346

Please sign in to comment.