Loading or adding assets and partitions at run time from config #16524
-
Hi,
-
Hey @dmsfabiano - not sure how this discussion slipped through the cracks. I think I've wrapped my head around part of what you're asking, but I'm still trying to understand it completely. If you want to apply the same logic - e.g.

```python
@asset(partitions_def=my_dynamic_partitions_def)
def datasets_without_nans(context):
    raw_dataset_for_partition = get_raw_dataset_for_partition(partition=context.partition_key)
    write_dataset_without_nans(remove_nans(raw_dataset_for_partition), partition=context.partition_key)
```

Are you looking to have a more complicated dependency graph than this though? E.g. is the idea that you want to only apply
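For context, `my_dynamic_partitions_def` in the snippet above refers to a dynamic partitions definition; a minimal sketch of how such a definition is typically declared (the name is an assumption, and the helper functions in the snippet are placeholders):

```python
from dagster import DynamicPartitionsDefinition

# Each partition key added at runtime (e.g. one per incoming dataset) becomes
# a materializable partition of any asset that uses this definition.
my_dynamic_partitions_def = DynamicPartitionsDefinition(name="raw_datasets")
```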
-
@cimadure @sryza I have created a PR here #18625 to extend the examples of dynamic asset partition usage, which showcases how to load dynamic partitions dynamically through a configurable job, but it essentially comes down to a simple pattern:

```python
from typing import Any

from dagster import (
    AssetKey,
    Definitions,
    OpExecutionContext,
    job,
    load_assets_from_modules,
    op,
)


@op
def dynamic_partition_loader(
    context: OpExecutionContext, asset_key: str, partition_key: str
) -> Any:
    """Dynamically fetches a previous value of asset_key at partition_key.

    Args:
        context (OpExecutionContext): standard op context
        asset_key (str): unique identifier of the asset to load the partition from
        partition_key (str): unique identifier of the partition

    Returns:
        Any: the previously stored value of the dynamic partition
    """
    with defs.get_asset_value_loader(instance=context.instance) as loader:
        partition_value = loader.load_asset_value(
            AssetKey(asset_key),
            partition_key=partition_key,
        )
    return partition_value


@job
def adhoc_partition_load():
    """Job wrapper of dynamic_partition_loader."""
    dynamic_partition_loader()


# `assets`, `release_sensor`, and `duckdb_io_manager` come from the rest of the
# example project referenced in the PR.
defs = Definitions(
    assets=load_assets_from_modules([assets]),
    sensors=[release_sensor],
    jobs=[adhoc_partition_load],
    resources={"warehouse": duckdb_io_manager.configured({"database": "releases.duckdb"})},
)
```
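Since `asset_key` and `partition_key` are unconnected op inputs, they can be supplied through run config when the job is launched. A minimal sketch of what a launch could look like (the asset and partition names are made up, and the exact run-config shape may vary between Dagster versions):

```python
from dagster import DagsterInstance

# Hypothetical invocation: load the "2024-01-01" partition of "raw_datasets".
result = adhoc_partition_load.execute_in_process(
    run_config={
        "ops": {
            "dynamic_partition_loader": {
                "inputs": {
                    "asset_key": {"value": "raw_datasets"},
                    "partition_key": {"value": "2024-01-01"},
                }
            }
        }
    },
    instance=DagsterInstance.get(),
)
```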
-
Background

We are evaluating the feasibility of using Dagster for all our Data/ML pipelines. The use case is simple: we receive datasets, to which we apply arbitrary (but repetitive) rules to clean them and produce clean datasets, which are then used to train our ML models.

To accomplish this, we have:

- Used `DynamicPartitionsDefinition` to define a `raw_datasets` partitioned asset, where each partition is a dataset. They all have the same structure, but we receive them on a schedule from production dumps.
- To populate `raw_datasets`, we use a `sensor` that, based on some arbitrary checks on our SQL datasets, returns a `SensorResult` that yields `run_requests` and `dynamic_partitions_requests` (see the sketch below).
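A minimal sketch of what such a sensor can look like (the sensor name, the job it targets, and the dataset-discovery helper are assumptions, not our actual code):

```python
from dagster import DynamicPartitionsDefinition, RunRequest, SensorResult, sensor

raw_datasets_partitions = DynamicPartitionsDefinition(name="raw_datasets")


@sensor(job=raw_datasets_job)  # `raw_datasets_job` targets the partitioned asset
def release_sensor(context):
    # Hypothetical check against the SQL source for datasets we haven't seen yet.
    new_dataset_names = find_new_production_dumps(context)

    return SensorResult(
        run_requests=[RunRequest(partition_key=name) for name in new_dataset_names],
        dynamic_partitions_requests=[
            raw_datasets_partitions.build_add_request(new_dataset_names)
        ],
    )
```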
This has worked smoothly so far. What we are trying to do next is have a job that references (by parameter) a dataset name (`raw_datasets.partition_key`) and some extra parameters, applies some common data cleaning tasks, and creates a partition for a new asset (say `clean_datasets`). One example argument could be `remove_nan`, where if true it drops all rows containing NaNs. We can have an asset that holds the logic for cleaning the `raw_dataset` into partitions, but `SpecificPartitionMappings` have to be hard coded and declared upfront.
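For illustration, the hard-coding limitation mentioned above (presumably `SpecificPartitionsPartitionMapping`) looks roughly like this; the asset, partition names, and pandas-based cleaning are placeholders:

```python
from dagster import (
    AssetIn,
    DynamicPartitionsDefinition,
    SpecificPartitionsPartitionMapping,
    asset,
)

clean_datasets_partitions = DynamicPartitionsDefinition(name="clean_datasets")


@asset(
    partitions_def=clean_datasets_partitions,
    ins={
        "raw_datasets": AssetIn(
            # The upstream partition keys must be spelled out in code ahead of
            # time, rather than supplied as a parameter at run time.
            partition_mapping=SpecificPartitionsPartitionMapping(["dump_2024_01_01"])
        )
    },
)
def clean_datasets(raw_datasets):
    # Assuming the upstream value is a pandas DataFrame.
    return raw_datasets.dropna()
```

Issue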
I have not found a way to have an op or a job that can yield or create a partition based on parameters. Here is what we have tried:

- I went through Using LastPartitionMapping without an asset #13918 and Partitioned jobs with partitioned source assets as input #13357, where there is a mention of a workaround using `load_asset_value` inside an `op` to get a specific partition. This helps; we can use this concept to fetch `raw_datasets.partition_key`.
- I went through Enable yielding dynamic partitions requests within assets/ops #13955, which mentions that yielding dynamic partitions requests is not currently supported. However, while reading through How do I create dynamic partitions within an op/asset? #15428, I realized that we could use this approach to create a new partition in the parametrized job for `clean_datasets` (see the sketch below). However, how do we tell Dagster to start materializing the new partition, and furthermore, pass the parameters for that run?
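The approach from #15428 amounts to registering the partition on the instance from inside the op; a minimal sketch (the partitions-definition name and the input are assumptions):

```python
from dagster import OpExecutionContext, op


@op
def register_clean_dataset_partition(context: OpExecutionContext, new_partition_key: str):
    # Registers a new key on the "clean_datasets" dynamic partitions definition.
    # This only creates the partition; it does not materialize it or pass any
    # run parameters, which is exactly the open question above.
    context.instance.add_dynamic_partitions(
        partitions_def_name="clean_datasets",
        partition_keys=[new_partition_key],
    )
```

Other solutions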
- We can always go back to the `workflow` approach, where we have a job that creates an entry for a new `clean_dataset` in our SQL dataset along with the parameters to clean, and have a `sensor` that triggers partition runs based on the DB changes. However, we really want to embrace the concept of assets, this would lose the data lineage, and we feel that applying generic data cleansing rules is a common use case.
- Create repetitive assets manually (in code) that apply repetitive cleaning rules.
Desired outcome
Ideally, we would have something like (pseudo-code):

Another way would be to somehow have the ability to manually trigger creation of partitions based on some arbitrary rules (this is exactly what a `sensor` does), but there is a need to trigger manually, and with different parameters; a rough sketch of that is below.
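A rough sketch of what such a manual trigger could look like as a one-off script (the asset, partition key, and config key are assumptions, and this assumes `clean_datasets` exposes a `remove_nan` config field):

```python
from dagster import DagsterInstance, materialize

instance = DagsterInstance.get()

# 1. Register the new partition manually on the "clean_datasets" definition.
instance.add_dynamic_partitions("clean_datasets", ["customer_dump_42"])

# 2. Materialize that partition, passing the cleaning parameters as run config.
#    `clean_datasets` is the dynamically partitioned asset, assumed to be in scope.
materialize(
    [clean_datasets],
    partition_key="customer_dump_42",
    instance=instance,
    run_config={"ops": {"clean_datasets": {"config": {"remove_nan": True}}}},
)
```

Disclaimer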
We are relatively new to Dagster; we may be missing understanding of some concepts.