Skip to content

Commit

Permalink
[1/n] Enable materializing ranges of partitions using the asset CLI (#…
Browse files Browse the repository at this point in the history
…24303)

## Summary & Motivation
Enable materializing ranges of partitions using the asset CLI. This PR
only handles the case of a single-run backfill policy.

## How I Tested These Changes
Added tests in
`python_modules/dagster/dagster_tests/cli_tests/command_tests/assets.py`

## Changelog
Enable materializing ranges of partitions using the asset CLI

- [x] `NEW` _(added new feature or capability)_
- [ ] `BUGFIX` _(fixed a bug)_
- [ ] `DOCS` _(added or updated documentation)_
  • Loading branch information
briantu authored Sep 12, 2024
1 parent 4c0a91f commit 431964d
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 2 deletions.
51 changes: 50 additions & 1 deletion python_modules/dagster/dagster/_cli/asset.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,16 @@
python_origin_target_argument,
)
from dagster._core.definitions.asset_selection import AssetSelection
from dagster._core.definitions.backfill_policy import BackfillPolicyType
from dagster._core.definitions.events import AssetKey
from dagster._core.errors import DagsterInvalidSubsetError, DagsterUnknownPartitionError
from dagster._core.execution.api import execute_job
from dagster._core.instance import DagsterInstance
from dagster._core.origin import JobPythonOrigin
from dagster._core.storage.tags import (
ASSET_PARTITION_RANGE_END_TAG,
ASSET_PARTITION_RANGE_START_TAG,
)
from dagster._core.telemetry import telemetry_wrapper
from dagster._utils.hosted_user_process import recon_job_from_origin, recon_repository_from_origin
from dagster._utils.interrupts import capture_interrupts
Expand All @@ -28,6 +33,11 @@ def asset_cli():
@python_origin_target_argument
@click.option("--select", help="Asset selection to target", required=True)
@click.option("--partition", help="Asset partition to target", required=False)
@click.option(
"--partition-range",
help="Asset partition range to target i.e. <start>...<end>",
required=False,
)
def asset_materialize_command(**kwargs):
with capture_interrupts():
with get_possibly_temporary_instance_for_cli(
Expand Down Expand Up @@ -59,6 +69,11 @@ def execute_materialize_command(instance: DagsterInstance, kwargs: Mapping[str,
JobPythonOrigin(implicit_job_def.name, repository_origin=repository_origin)
)
partition = kwargs.get("partition")
partition_range = kwargs.get("partition_range")

if partition and partition_range:
check.failed("Cannot specify both --partition and --partition-range options. Use only one.")

if partition:
if all(
implicit_job_def.asset_layer.get(asset_key).partitions_def is None
Expand All @@ -78,7 +93,41 @@ def execute_materialize_command(instance: DagsterInstance, kwargs: Mapping[str,
"All selected assets must have a PartitionsDefinition containing the passed"
f" partition key `{partition}` or have no PartitionsDefinition."
)

elif partition_range:
if len(partition_range.split("...")) != 2:
check.failed("Invalid partition range format. Expected <start>...<end>.")

partition_range_start, partition_range_end = partition_range.split("...")

for asset_key in asset_keys:
backfill_policy = implicit_job_def.asset_layer.get(asset_key).backfill_policy
if (
backfill_policy is not None
and backfill_policy.policy_type != BackfillPolicyType.SINGLE_RUN
):
check.failed(
"Provided partition range, but not all assets have a single-run backfill policy."
)
try:
implicit_job_def.validate_partition_key(
partition_range_start,
selected_asset_keys=asset_keys,
dynamic_partitions_store=instance,
)
implicit_job_def.validate_partition_key(
check.not_none(partition_range_end),
selected_asset_keys=asset_keys,
dynamic_partitions_store=instance,
)
except DagsterUnknownPartitionError:
raise DagsterInvalidSubsetError(
"All selected assets must have a PartitionsDefinition containing the passed"
f" partition key `{partition_range_start}` or have no PartitionsDefinition."
)
tags = {
ASSET_PARTITION_RANGE_START_TAG: partition_range_start,
ASSET_PARTITION_RANGE_END_TAG: partition_range_end,
}
else:
if any(
implicit_job_def.asset_layer.get(asset_key).partitions_def is not None
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
from dagster import StaticPartitionsDefinition, asset
from dagster._core.definitions.backfill_policy import BackfillPolicy
from dagster._core.definitions.time_window_partitions import DailyPartitionsDefinition


@asset
Expand All @@ -21,6 +23,20 @@ def partitioned_asset() -> None: ...
def differently_partitioned_asset() -> None: ...


# Stub asset with daily partitions starting 2020-01-01 and a single-run
# backfill policy; the body intentionally does nothing.
@asset(
    partitions_def=DailyPartitionsDefinition(start_date="2020-01-01"),
    backfill_policy=BackfillPolicy.single_run(),
)
def single_run_partitioned_asset() -> None: ...


# Stub asset with daily partitions starting 2020-01-01 and a multi-run
# backfill policy; the body intentionally does nothing.
@asset(
    partitions_def=DailyPartitionsDefinition(start_date="2020-01-01"),
    backfill_policy=BackfillPolicy.multi_run(),
)
def multi_run_partitioned_asset() -> None: ...


# Unpartitioned asset whose body always raises `Exception("failure")`.
@asset
def fail_asset() -> None:
    raise Exception("failure")
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ def test_no_selection():
"differently_partitioned_asset",
"downstream_asset",
"fail_asset",
"multi_run_partitioned_asset",
"partitioned_asset",
"single_run_partitioned_asset",
"some/key/prefix/asset_with_prefix",
]
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,17 @@
from dagster._utils import file_relative_path


def invoke_materialize(select: str, partition: Optional[str] = None):
def invoke_materialize(
    select: str,
    partition: Optional[str] = None,
    partition_range: Optional[str] = None,
):
    """Run `asset_materialize_command` through a `CliRunner` and return its result.

    The command is always pointed at the `assets.py` file that sits next to this
    module (`-f`) and given `--select <select>`.

    Args:
        select: Value passed to `--select`.
        partition: If truthy, passed as `--partition`.
        partition_range: If truthy, passed as `--partition-range`.

    Returns:
        Whatever `CliRunner.invoke` returns for the assembled argument list.
    """
    runner = CliRunner()
    options = ["-f", file_relative_path(__file__, "assets.py"), "--select", select]
    # Optional flags are appended only when supplied, so the command sees
    # exactly the arguments a user would have typed.
    if partition:
        options.extend(["--partition", partition])
    if partition_range:
        options.extend(["--partition-range", partition_range])
    return runner.invoke(asset_materialize_command, options)


Expand Down Expand Up @@ -122,6 +128,52 @@ def test_conflicting_partitions():
)


def test_partition_and_partition_range_options():
    """Passing both `partition` and `partition_range` must be rejected with an explanatory error."""
    with instance_for_test():
        result = invoke_materialize(
            "single_run_partitioned_asset",
            partition="2020-01-01",
            partition_range="2020-01-01...2020-01-03",
        )
        # Check the exit code first so an unexpected success fails here, not on a
        # confusing comparison against str(None) below.
        assert result.exit_code != 0
        assert (
            "Cannot specify both --partition and --partition-range options. Use only one."
            in str(result.exception)
        )


def test_partition_range_invalid_format():
    """A `partition_range` value without the `...` separator must be rejected."""
    with instance_for_test():
        result = invoke_materialize(
            "single_run_partitioned_asset",
            partition_range="2020-01-01",
        )
        # Check the exit code first so an unexpected success fails here, not on a
        # confusing comparison against str(None) below.
        assert result.exit_code != 0
        assert "Invalid partition range format. Expected <start>...<end>." in str(result.exception)


def test_partition_range_single_run_backfill_policy():
    """A range on the single-run asset succeeds and materializes both endpoints and the day between."""
    with instance_for_test() as instance:
        result = invoke_materialize(
            "single_run_partitioned_asset",
            partition_range="2020-01-01...2020-01-03",
        )
        assert "RUN_SUCCESS" in result.output
        materialized = instance.get_materialized_partitions(
            AssetKey("single_run_partitioned_asset")
        )
        expected = {"2020-01-01", "2020-01-02", "2020-01-03"}
        # A set difference reports every missing partition in one failure
        # instead of stopping at the first one.
        missing = expected - set(materialized)
        assert not missing, f"partitions not materialized: {sorted(missing)}"


def test_partition_range_multi_run_backfill_policy():
    """A partition range on an asset with a multi-run backfill policy must be rejected."""
    with instance_for_test():
        result = invoke_materialize(
            "multi_run_partitioned_asset",
            partition_range="2020-01-01...2020-01-03",
        )
        # Check the exit code first so an unexpected success fails here, not on a
        # confusing comparison against str(None) below.
        assert result.exit_code != 0
        assert (
            "Provided partition range, but not all assets have a single-run backfill policy."
            in str(result.exception)
        )


def test_failure():
    """Materializing `fail_asset` must make the command exit with code 1."""
    result = invoke_materialize("fail_asset")
    assert result.exit_code == 1

0 comments on commit 431964d

Please sign in to comment.