Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

partition methods on AssetExecutionContext #16625

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
bdf4d78
start interface to begin discussion
jamiedemaria Sep 13, 2023
712da7d
revert trying to install dagster ext
jamiedemaria Sep 21, 2023
1a84005
dagster ext dep attempt 2
jamiedemaria Sep 21, 2023
e77fce0
revert
jamiedemaria Sep 21, 2023
9b7a187
pr comments, cleanup
jamiedemaria Sep 22, 2023
819d90c
test fix
jamiedemaria Sep 22, 2023
c278d0a
fix conflict
jamiedemaria Sep 22, 2023
65b6c1e
fix conflixt
jamiedemaria Sep 22, 2023
b89018e
subclass the context
jamiedemaria Sep 18, 2023
09c0e84
collapse lines
jamiedemaria Sep 18, 2023
c289ff9
lines
jamiedemaria Sep 18, 2023
eb04828
update context with methods found during testing
jamiedemaria Sep 18, 2023
5e7855e
change how instance check works
jamiedemaria Sep 18, 2023
57dae89
remove unneeded comments
jamiedemaria Sep 19, 2023
c9c3fd9
update step execution context methods
jamiedemaria Sep 19, 2023
0e37585
small pr comments
jamiedemaria Sep 20, 2023
ecdefc1
small changes from testing
jamiedemaria Sep 20, 2023
11273c8
un-deprecate partition methods for the moment
jamiedemaria Sep 20, 2023
d96360c
make asset provenance and code version populate on init
jamiedemaria Sep 20, 2023
5f5eae7
update check specs method
jamiedemaria Sep 20, 2023
bf29203
remove public marker on deprecated methods
jamiedemaria Sep 20, 2023
519576d
tiny cleanup
jamiedemaria Sep 20, 2023
38ed7f5
disable experimental warning
jamiedemaria Sep 20, 2023
f55bc05
add context interface to asset context
jamiedemaria Sep 21, 2023
9ca0fbf
remove interface for now
jamiedemaria Sep 21, 2023
89f7433
update with interface changes
jamiedemaria Sep 22, 2023
d01d5d2
add interface back
jamiedemaria Sep 22, 2023
b129fbf
update selected asset keys to asset keys
jamiedemaria Sep 22, 2023
2e71fd1
random bk
jamiedemaria Sep 22, 2023
fded8b2
add partition methods
jamiedemaria Sep 19, 2023
737c656
small updates and testing
jamiedemaria Sep 19, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,10 @@ def build_sql_schema_check_steps() -> List[CommandStep]:
return [
CommandStepBuilder(":mysql: mysql-schema")
.on_test_image(AvailablePythonVersion.get_default())
.run("pip install -e python_modules/dagster", "python scripts/check_schemas.py")
.run(
"pip install -e python_modules/dagster -e python_modules/dagster-ext",
"python scripts/check_schemas.py",
)
.with_skip(skip_mysql_if_no_changes_to_dependencies(["dagster"]))
.build()
]
Expand All @@ -151,8 +154,8 @@ def build_graphql_python_client_backcompat_steps() -> List[CommandStep]:
CommandStepBuilder(":graphql: GraphQL Python Client backcompat")
.on_test_image(AvailablePythonVersion.get_default())
.run(
"pip install -e python_modules/dagster[test] -e python_modules/dagster-graphql -e"
" python_modules/automation",
"pip install -e python_modules/dagster[test] -e python_modules/dagster-graphql -e "
" python_modules/automation -e python_modules/dagster-ext",
"dagster-graphql-client query check",
)
.with_skip(
Expand Down
68 changes: 66 additions & 2 deletions python_modules/dagster-ext/dagster_ext/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,70 @@ def make_channel(
# ########################


class IContext(ABC):
"""Base class for asset context implemented by AssetExecutionContext and ExtContext."""

@property
@abstractmethod
def asset_key(self):
"""The AssetKey for the asset being materialized. If no asset is being materialized, errors. If
multiple assets are being materialized (as in a @multi_asset), errors.
"""

@property
@abstractmethod
def asset_keys(self):
"""The AssetKeys for the asset being materialized. If no asset is being materialized, errors."""

@property
@abstractmethod
def provenance(self):
"""The data provenance for the asset being materialized. If no asset is being materialized, errors. If
multiple assets are being materialized (as in a @multi_asset), errors.
"""

@property
@abstractmethod
def provenance_by_asset_key(self):
"""A dictionary of data provenance for the assets being materialized, keyed by asset key.
If no asset is being materialized, errors.
"""

@property
@abstractmethod
def code_version(self):
"""The code version for the asset being materialized. If no asset is being materialized, errors. If
multiple assets are being materialized (as in a @multi_asset), errors.
"""

@property
@abstractmethod
def code_version_by_asset_key(self):
"""A dictionary of code versions for the assets being materialized, keyed by asset key.
If no asset is being materialized, errors.
"""

@property
@abstractmethod
def is_partitioned(self) -> bool:
"""True if the current execution is partitioned."""

@property
@abstractmethod
def run_id(self) -> str:
"""The run id of the current execution."""

@property
@abstractmethod
def job_name(self) -> Optional[str]:
"""The name of the job that is executing."""

@property
@abstractmethod
def retry_number(self) -> int:
"""The number of retries of this execution."""


def init_dagster_ext(
*,
context_loader: Optional[ExtContextLoader] = None,
Expand Down Expand Up @@ -648,7 +712,7 @@ def init_dagster_ext(
return context


class ExtContext:
class ExtContext(IContext):
_instance: ClassVar[Optional["ExtContext"]] = None

@classmethod
Expand Down Expand Up @@ -730,7 +794,7 @@ def code_version_by_asset_key(self) -> Mapping[str, Optional[str]]:
return code_version_by_asset_key

@property
def is_partition_step(self) -> bool:
def is_partitioned(self) -> bool:
return self._data["partition_key_range"] is not None

@property
Expand Down
6 changes: 3 additions & 3 deletions python_modules/dagster-ext/dagster_ext_tests/test_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ def test_multi_asset_context():
def test_no_partition_context():
context = _make_external_execution_context()

assert not context.is_partition_step
assert not context.is_partitioned
_assert_undefined(context, "partition_key")
_assert_undefined(context, "partition_key_range")
_assert_undefined(context, "partition_time_window")
Expand All @@ -135,7 +135,7 @@ def test_single_partition_context():
partition_time_window=None,
)

assert context.is_partition_step
assert context.is_partitioned
assert context.partition_key == "foo"
assert context.partition_key_range == partition_key_range
assert context.partition_time_window is None
Expand All @@ -151,7 +151,7 @@ def test_multiple_partition_context():
partition_time_window=time_window,
)

assert context.is_partition_step
assert context.is_partitioned
_assert_undefined(context, "partition_key")
assert context.partition_key_range == partition_key_range
assert context.partition_time_window == time_window
Expand Down
Loading