-
Hi @sryza, thanks for this, really useful feature! Just a sanity check: am I right in thinking that this won't work with multi-dimensional partitions, since they don't have partition ranges but use subsets instead? I just want to make sure before I start throwing weird strings into the tag to try to trick it into working!
-
Thanks for providing this example! Is this warning expected when running a partition range using this method? This is with a custom IO manager, and I don't get this warning when I run a single partition.
-
@sryza, do they even support non-contiguous ranges?
-
When using this feature, how does one handle AssetMaterializations and Outputs? Yield one Output for each partition? One AssetMaterialization for each partition?
-
An example of how you could programmatically launch runs across a partition range, e.g. from within a schedule:
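A minimal sketch of the tag-building part, assuming the schedule requests a single run over the range through the `dagster/asset_partition_range_start` and `dagster/asset_partition_range_end` run tags (check these names against your Dagster version), and assuming hourly partition keys in the `%Y-%m-%d-%H:%M` format used by the `HourlyPartitionsDefinition` example in this thread. `previous_day_range_tags` is a hypothetical helper that covers every hourly partition of the previous day; both range tags name partition keys, so the end key is the last hour itself rather than midnight.

```python
from datetime import datetime, timedelta

# Assumed run tag names for a single-run partition range; verify for your Dagster version.
RANGE_START_TAG = "dagster/asset_partition_range_start"
RANGE_END_TAG = "dagster/asset_partition_range_end"

# Assumed key format, matching HourlyPartitionsDefinition(start_date="2023-04-11-00:00").
HOURLY_KEY_FORMAT = "%Y-%m-%d-%H:%M"


def previous_day_range_tags(scheduled_time: datetime) -> dict[str, str]:
    """Hypothetical helper: tags covering every hourly partition of the day before scheduled_time."""
    # Midnight at the start of the previous day is the first partition key.
    day_start = scheduled_time.replace(hour=0, minute=0, second=0, microsecond=0) - timedelta(days=1)
    # The end tag names the last partition key in the range (23:00), not the window end.
    last_hour = day_start + timedelta(hours=23)
    return {
        RANGE_START_TAG: day_start.strftime(HOURLY_KEY_FORMAT),
        RANGE_END_TAG: last_hour.strftime(HOURLY_KEY_FORMAT),
    }
```

Inside a `@schedule` function, the result could then be passed along as `RunRequest(tags=previous_day_range_tags(context.scheduled_execution_time))`, so each scheduled tick launches one run for the whole day instead of 24 separate runs.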
-
An example of a project that implements this capability, specifically for two assets:

```python
from datetime import timedelta

import pandas as pd
from dagster import HourlyPartitionsDefinition, OpExecutionContext, asset

hourly_partitions = HourlyPartitionsDefinition(start_date="2023-04-11-00:00")


def _hourly_partition_seq(start, end):
    # Expand the [start, end) time window into one timestamp string per hour
    start = pd.to_datetime(start)
    end = pd.to_datetime(end)
    hourly_diffs = int((end - start) / timedelta(hours=1))
    return [str(start + timedelta(hours=i)) for i in range(hourly_diffs)]


@asset(
    compute_kind="api",
    required_resource_keys={"data_api"},
    partitions_def=hourly_partitions,
    metadata={"partition_expr": "created_at"},
)
def users(context: OpExecutionContext) -> pd.DataFrame:
    """A table containing all users data"""
    api = context.resources.data_api
    # during a backfill the partition range will span multiple hours
    # during a single run the partition range will be for a single hour
    window_start, window_end = context.asset_partitions_time_window_for_output()
    partition_seq = _hourly_partition_seq(window_start, window_end)

    all_users = []
    for partition in partition_seq:
        # one API call per hourly partition in the window
        resp = api.get_users(partition)
        users_df = pd.read_json(resp.json())
        all_users.append(users_df)
    return pd.concat(all_users)
```

In this example the asset is written so that it always uses …

The example also includes a unit test to ensure the asset works whether it is called with a single partition (regular incremental runs) or a backfill (multiple partitions supplied in one run): https://github.com/dagster-io/hooli-data-eng-pipelines/blob/master/hooli_data_eng_tests/test_assets.py
-
Refer to the Dagster documentation: https://docs.dagster.io/concepts/partitions-schedules-sensors/backfills#single-run-backfills