diff --git a/docs/content/concepts/assets/asset-checks/define-execute-asset-checks.mdx b/docs/content/concepts/assets/asset-checks/define-execute-asset-checks.mdx index 917a686a2878e..fe95c1eab2d47 100644 --- a/docs/content/concepts/assets/asset-checks/define-execute-asset-checks.mdx +++ b/docs/content/concepts/assets/asset-checks/define-execute-asset-checks.mdx @@ -319,7 +319,6 @@ When `blocking` is enabled, downstream assets will wait to execute until the che This feature has the following limitations: - **`blocking` is currently only supported with .** [For checks defined in the same operation as assets](#defining-checks-and-assets-together), you can explicitly raise an exception to block downstream execution. -- **Assets with an currently do not respect blocking asset checks** and will execute even if a blocking check on an upstream asset failed. --- diff --git a/docs/content/concepts/assets/asset-observations.mdx b/docs/content/concepts/assets/asset-observations.mdx index 37f8fda98565e..4cebdae12fe5e 100644 --- a/docs/content/concepts/assets/asset-observations.mdx +++ b/docs/content/concepts/assets/asset-observations.mdx @@ -119,7 +119,7 @@ observed to have a newer data version than the data version it had when a downstream asset was materialized, then the downstream asset will be given a label in the Dagster UI that indicates that upstream data has changed. - can be used to automatically + can be used to automatically materialize downstream assets when this occurs. The decorator provides a convenient way to define source assets with observation functions. The below observable source asset takes a file hash and returns it as the data version. Every time you run the observation function, a new observation will be generated with this hash set as its data version. diff --git a/docs/content/guides/dagster/managing-ml.mdx b/docs/content/guides/dagster/managing-ml.mdx index 4df5b51bca934..a7f9dc8b6f09a 100644 --- a/docs/content/guides/dagster/managing-ml.mdx +++ b/docs/content/guides/dagster/managing-ml.mdx @@ -16,7 +16,7 @@ Machine learning models are highly dependent on data at a point in time and must ## Prerequisites -Before proceeding, it is recommended to review [Building machine learning pipelines with Dagster ](/guides/dagster/ml-pipeline) which provides background on using Dagster's assets for machine learning. +Before proceeding, it is recommended to review [Building machine learning pipelines with Dagster](/guides/dagster/ml-pipeline) which provides background on using Dagster's assets for machine learning. --- @@ -28,23 +28,21 @@ You might have thought about your data sources, feature sets, and the best model Whether you have a large or small model, Dagster can help automate data refreshes and model training based on your business needs. -Auto-materializing assets can be used to update a machine learning model when the upstream data is updated. This can be done by setting the `AutoMaterializePolicy` to `eager`, which means that our machine learning model asset will be refreshed anytime our data asset is updated. +Declarative Automation can be used to update a machine learning model when the upstream data is updated. This can be done by setting the `AutomationCondition` to `eager`, which means that our machine learning model asset will be refreshed anytime our data asset is updated. ```python file=/guides/dagster/managing_ml/managing_ml_code.py startafter=eager_materilization_start endbefore=eager_materilization_end -from dagster import AutoMaterializePolicy, asset +from dagster import AutomationCondition, asset @asset def my_data(): ... -@asset( - auto_materialize_policy=AutoMaterializePolicy.eager(), -) +@asset(automation_condition=AutomationCondition.eager()) def my_ml_model(my_data): ... ``` -Some machine learning models might more be cumbersome to retrain; it also might be less important to update them as soon as new data arrives. For this, a lazy auto-materialization policy which can be used in two different ways. The first, by using it with a `freshness_policy` as shown below. In this case, `my_ml_model` will only be auto-materialized once a week. +Some machine learning models might be more cumbersome to retrain; it also might be less important to update them as soon as new data arrives. For this, the `on_cron` condition may be used, which will cause the asset to be updated on a given cron schedule, but only after all of its upstream dependencies have been updated. ```python file=/guides/dagster/managing_ml/managing_ml_code.py startafter=lazy_materlization_start endbefore=lazy_materlization_end from dagster import AutoMaterializePolicy, asset, FreshnessPolicy @@ -54,46 +52,10 @@ from dagster import AutoMaterializePolicy, asset, FreshnessPolicy def my_other_data(): ... -@asset( - auto_materialize_policy=AutoMaterializePolicy.lazy(), - freshness_policy=FreshnessPolicy(maximum_lag_minutes=7 * 24 * 60), -) +@asset(automation_condition=AutomationCondition.on_cron("0 9 * * *")) def my_other_ml_model(my_other_data): ... ``` -This can be useful if you know that you want your machine learning model retrained at least once a week. While Dagster allows you to refresh a machine learning model as often as you like, best practice is to re-train as seldomly as possible. Model retraining can be costly to compute and having a minimal number of model versions can reduce the complexity of reproducing results at a later point in time. In this case, the model is updated once a week for `predictions`, ensuring that `my_ml_model` is retrained before it is used. - -```python file=/guides/dagster/managing_ml/managing_ml_code.py startafter=without_policy_start endbefore=without_policy_end -from dagster import AutoMaterializePolicy, FreshnessPolicy, asset - - -@asset -def some_data(): ... - - -@asset(auto_materialize_policy=AutoMaterializePolicy.lazy()) -def some_ml_model(some_data): ... - - -@asset( - auto_materialize_policy=AutoMaterializePolicy.lazy(), - freshness_policy=FreshnessPolicy(maximum_lag_minutes=7 * 24 * 60), -) -def predictions(some_ml_model): ... -``` - -A more traditional schedule can also be used to update machine learning assets, causing them to be re-materialized or retrained on the latest data. For example, setting up a [cron schedule on a daily basis](/concepts/automation/schedules). - -This can be useful if you have data that is also being scheduled on a cron schedule and want to add your machine model jobs to run on a schedule as well. - -```python file=/guides/dagster/managing_ml/managing_ml_code.py startafter=basic_schedule_start endbefore=basic_schedule_end -from dagster import AssetSelection, define_asset_job, ScheduleDefinition - -ml_asset_job = define_asset_job("ml_asset_job", AssetSelection.groups("ml_asset_group")) - -basic_schedule = ScheduleDefinition(job=ml_asset_job, cron_schedule="0 9 * * *") -``` - ### Monitoring Integrating your machine learning models into Dagster allows you to see when the model and its data dependencies were refreshed, or when a refresh process has failed. By using Dagster to monitor performance changes and process failures on your ML model, it becomes possible to set up remediation paths, such as automated model retraining, that can help resolve issues like model drift. diff --git a/examples/docs_beta_snippets/docs_beta_snippets/guides/tutorials/etl_tutorial/etl_tutorial/definitions.py b/examples/docs_beta_snippets/docs_beta_snippets/guides/tutorials/etl_tutorial/etl_tutorial/definitions.py index 59e6d14dee6f7..a2c5b7bb397fe 100644 --- a/examples/docs_beta_snippets/docs_beta_snippets/guides/tutorials/etl_tutorial/etl_tutorial/definitions.py +++ b/examples/docs_beta_snippets/docs_beta_snippets/guides/tutorials/etl_tutorial/etl_tutorial/definitions.py @@ -176,7 +176,7 @@ def missing_dimension_check(duckdb: DuckDBResource) -> dg.AssetCheckResult: compute_kind="duckdb", group_name="analysis", deps=[joined_data], - auto_materialize_policy=dg.AutoMaterializePolicy.eager(), + automation_condition=dg.AutomationCondition.eager(), ) def monthly_sales_performance( context: dg.AssetExecutionContext, duckdb: DuckDBResource @@ -237,7 +237,7 @@ def monthly_sales_performance( partitions_def=product_category_partition, group_name="analysis", compute_kind="duckdb", - auto_materialize_policy=dg.AutoMaterializePolicy.eager(), + automation_condition=dg.AutomationCondition.eager(), ) def product_performance(context: dg.AssetExecutionContext, duckdb: DuckDBResource): product_category_str = context.partition_key diff --git a/examples/docs_snippets/docs_snippets/concepts/assets/auto_materialize_after_all_parents.py b/examples/docs_snippets/docs_snippets/concepts/assets/auto_materialize_after_all_parents.py deleted file mode 100644 index bc2361ea1ba52..0000000000000 --- a/examples/docs_snippets/docs_snippets/concepts/assets/auto_materialize_after_all_parents.py +++ /dev/null @@ -1,9 +0,0 @@ -from dagster import AutoMaterializePolicy, AutoMaterializeRule, asset - -wait_for_all_parents_policy = AutoMaterializePolicy.eager().with_rules( - AutoMaterializeRule.skip_on_not_all_parents_updated() -) - - -@asset(auto_materialize_policy=wait_for_all_parents_policy) -def asset1(upstream1, upstream2): ... diff --git a/examples/docs_snippets/docs_snippets/concepts/assets/auto_materialize_eager.py b/examples/docs_snippets/docs_snippets/concepts/assets/auto_materialize_eager.py deleted file mode 100644 index b1add0fadf224..0000000000000 --- a/examples/docs_snippets/docs_snippets/concepts/assets/auto_materialize_eager.py +++ /dev/null @@ -1,9 +0,0 @@ -from dagster import AutoMaterializePolicy, asset - - -@asset -def asset1(): ... - - -@asset(auto_materialize_policy=AutoMaterializePolicy.eager(), deps=[asset1]) -def asset2(): ... diff --git a/examples/docs_snippets/docs_snippets/concepts/assets/auto_materialize_lazy.py b/examples/docs_snippets/docs_snippets/concepts/assets/auto_materialize_lazy.py deleted file mode 100644 index f591287106493..0000000000000 --- a/examples/docs_snippets/docs_snippets/concepts/assets/auto_materialize_lazy.py +++ /dev/null @@ -1,13 +0,0 @@ -from dagster import AutoMaterializePolicy, FreshnessPolicy, asset - - -@asset -def asset1(): ... - - -@asset( - auto_materialize_policy=AutoMaterializePolicy.lazy(), - freshness_policy=FreshnessPolicy(maximum_lag_minutes=24 * 60), - deps=[asset1], -) -def asset2(): ... diff --git a/examples/docs_snippets/docs_snippets/concepts/assets/auto_materialize_lazy_transitive.py b/examples/docs_snippets/docs_snippets/concepts/assets/auto_materialize_lazy_transitive.py deleted file mode 100644 index a26fba228f3c6..0000000000000 --- a/examples/docs_snippets/docs_snippets/concepts/assets/auto_materialize_lazy_transitive.py +++ /dev/null @@ -1,17 +0,0 @@ -from dagster import AutoMaterializePolicy, FreshnessPolicy, asset - - -@asset -def asset1(): ... - - -@asset(auto_materialize_policy=AutoMaterializePolicy.lazy(), deps=[asset1]) -def asset2(): ... - - -@asset( - auto_materialize_policy=AutoMaterializePolicy.lazy(), - freshness_policy=FreshnessPolicy(maximum_lag_minutes=24 * 60), - deps=[asset2], -) -def asset3(): ... diff --git a/examples/docs_snippets/docs_snippets/concepts/assets/auto_materialize_max_materializations_per_minute.py b/examples/docs_snippets/docs_snippets/concepts/assets/auto_materialize_max_materializations_per_minute.py deleted file mode 100644 index 8182eb0f8ba1e..0000000000000 --- a/examples/docs_snippets/docs_snippets/concepts/assets/auto_materialize_max_materializations_per_minute.py +++ /dev/null @@ -1,10 +0,0 @@ -from dagster import AutoMaterializePolicy, DailyPartitionsDefinition, asset - - -@asset( - partitions_def=DailyPartitionsDefinition(start_date="2020-10-10"), - auto_materialize_policy=AutoMaterializePolicy.eager( - max_materializations_per_minute=7 - ), -) -def asset1(): ... diff --git a/examples/docs_snippets/docs_snippets/concepts/assets/auto_materialize_multiple.py b/examples/docs_snippets/docs_snippets/concepts/assets/auto_materialize_multiple.py deleted file mode 100644 index adbe982a7f246..0000000000000 --- a/examples/docs_snippets/docs_snippets/concepts/assets/auto_materialize_multiple.py +++ /dev/null @@ -1,21 +0,0 @@ -from dagster import ( - AutoMaterializePolicy, - Definitions, - asset, - load_assets_from_current_module, -) - - -@asset -def asset1(): ... - - -@asset(deps=[asset1]) -def asset2(): ... - - -defs = Definitions( - assets=load_assets_from_current_module( - auto_materialize_policy=AutoMaterializePolicy.eager(), - ) -) diff --git a/examples/docs_snippets/docs_snippets/concepts/assets/auto_materialize_observable_source_asset.py b/examples/docs_snippets/docs_snippets/concepts/assets/auto_materialize_observable_source_asset.py deleted file mode 100644 index 0cfd6f1e57652..0000000000000 --- a/examples/docs_snippets/docs_snippets/concepts/assets/auto_materialize_observable_source_asset.py +++ /dev/null @@ -1,15 +0,0 @@ -import os - -from dagster import AutoMaterializePolicy, DataVersion, asset, observable_source_asset - - -@observable_source_asset(auto_observe_interval_minutes=1) -def source_file(): - return DataVersion(str(os.path.getmtime("source_file.csv"))) - - -@asset( - deps=[source_file], - auto_materialize_policy=AutoMaterializePolicy.eager(), -) -def asset1(): ... diff --git a/examples/docs_snippets/docs_snippets/concepts/assets/auto_materialize_on_cron.py b/examples/docs_snippets/docs_snippets/concepts/assets/auto_materialize_on_cron.py deleted file mode 100644 index 2a83f793484a9..0000000000000 --- a/examples/docs_snippets/docs_snippets/concepts/assets/auto_materialize_on_cron.py +++ /dev/null @@ -1,10 +0,0 @@ -from dagster import AutoMaterializePolicy, AutoMaterializeRule, asset - -materialize_on_cron_policy = AutoMaterializePolicy.eager().with_rules( - # try to materialize this asset if it hasn't been materialized since the last cron tick - AutoMaterializeRule.materialize_on_cron("0 9 * * *", timezone="US/Central"), -) - - -@asset(auto_materialize_policy=materialize_on_cron_policy) -def root_asset(): ... diff --git a/examples/docs_snippets/docs_snippets/concepts/assets/auto_materialize_sensor.py b/examples/docs_snippets/docs_snippets/concepts/assets/auto_materialize_sensor.py deleted file mode 100644 index 7a2bac6a80c1c..0000000000000 --- a/examples/docs_snippets/docs_snippets/concepts/assets/auto_materialize_sensor.py +++ /dev/null @@ -1,9 +0,0 @@ -from dagster import AssetSelection, AutomationConditionSensorDefinition, Definitions - -my_custom_auto_materialize_sensor = AutomationConditionSensorDefinition( - "my_custom_auto_materialize_sensor", - asset_selection=AssetSelection.all(include_sources=True), - minimum_interval_seconds=60 * 15, -) - -defs = Definitions(sensors=[my_custom_auto_materialize_sensor]) diff --git a/examples/docs_snippets/docs_snippets/concepts/assets/auto_materialize_time_partitions.py b/examples/docs_snippets/docs_snippets/concepts/assets/auto_materialize_time_partitions.py deleted file mode 100644 index 837691d9fee1a..0000000000000 --- a/examples/docs_snippets/docs_snippets/concepts/assets/auto_materialize_time_partitions.py +++ /dev/null @@ -1,16 +0,0 @@ -from dagster import AutoMaterializePolicy, DailyPartitionsDefinition, asset - - -@asset( - partitions_def=DailyPartitionsDefinition(start_date="2020-10-10"), - auto_materialize_policy=AutoMaterializePolicy.eager(), -) -def asset1(): ... - - -@asset( - partitions_def=DailyPartitionsDefinition(start_date="2020-10-10"), - auto_materialize_policy=AutoMaterializePolicy.eager(), - deps=[asset1], -) -def asset2(): ... diff --git a/examples/docs_snippets/docs_snippets/concepts/assets/auto_materialize_with_missing_parents.py b/examples/docs_snippets/docs_snippets/concepts/assets/auto_materialize_with_missing_parents.py deleted file mode 100644 index ca00b76e006a3..0000000000000 --- a/examples/docs_snippets/docs_snippets/concepts/assets/auto_materialize_with_missing_parents.py +++ /dev/null @@ -1,9 +0,0 @@ -from dagster import AutoMaterializePolicy, AutoMaterializeRule, asset - -allow_missing_parents_policy = AutoMaterializePolicy.eager().without_rules( - AutoMaterializeRule.skip_on_parent_missing(), -) - - -@asset(auto_materialize_policy=allow_missing_parents_policy) -def asset1(upstream1, upstream2): ... diff --git a/examples/docs_snippets/docs_snippets/guides/dagster/managing_ml/managing_ml_code.py b/examples/docs_snippets/docs_snippets/guides/dagster/managing_ml/managing_ml_code.py index 529934b5048e7..b8c77ce8f25bf 100644 --- a/examples/docs_snippets/docs_snippets/guides/dagster/managing_ml/managing_ml_code.py +++ b/examples/docs_snippets/docs_snippets/guides/dagster/managing_ml/managing_ml_code.py @@ -2,16 +2,14 @@ ## eager_materilization_start -from dagster import AutoMaterializePolicy, asset +from dagster import AutomationCondition, asset @asset def my_data(): ... -@asset( - auto_materialize_policy=AutoMaterializePolicy.eager(), -) +@asset(automation_condition=AutomationCondition.eager()) def my_ml_model(my_data): ... @@ -26,47 +24,13 @@ def my_ml_model(my_data): ... def my_other_data(): ... -@asset( - auto_materialize_policy=AutoMaterializePolicy.lazy(), - freshness_policy=FreshnessPolicy(maximum_lag_minutes=7 * 24 * 60), -) +@asset(automation_condition=AutomationCondition.on_cron("0 9 * * *")) def my_other_ml_model(my_other_data): ... ## lazy_materlization_end -## without_policy_start -from dagster import AutoMaterializePolicy, FreshnessPolicy, asset - - -@asset -def some_data(): ... - - -@asset(auto_materialize_policy=AutoMaterializePolicy.lazy()) -def some_ml_model(some_data): ... - - -@asset( - auto_materialize_policy=AutoMaterializePolicy.lazy(), - freshness_policy=FreshnessPolicy(maximum_lag_minutes=7 * 24 * 60), -) -def predictions(some_ml_model): ... - - -## without_policy_end - -## basic_schedule_start - -from dagster import AssetSelection, define_asset_job, ScheduleDefinition - -ml_asset_job = define_asset_job("ml_asset_job", AssetSelection.groups("ml_asset_group")) - -basic_schedule = ScheduleDefinition(job=ml_asset_job, cron_schedule="0 9 * * *") - -## basic_schedule_end - ## conditional_monitoring_start from sklearn import linear_model diff --git a/examples/docs_snippets/docs_snippets_tests/concepts_tests/assets_tests/test_auto_materialize.py b/examples/docs_snippets/docs_snippets_tests/concepts_tests/assets_tests/test_auto_materialize.py deleted file mode 100644 index 7de4b47ea8a7d..0000000000000 --- a/examples/docs_snippets/docs_snippets_tests/concepts_tests/assets_tests/test_auto_materialize.py +++ /dev/null @@ -1,39 +0,0 @@ -from dagster import Definitions, load_assets_from_modules -from docs_snippets.concepts.assets import ( - auto_materialize_eager, - auto_materialize_lazy, - auto_materialize_lazy_transitive, - auto_materialize_observable_source_asset, - auto_materialize_sensor, - auto_materialize_time_partitions, -) - - -def test_auto_materialize_eager_asset_defs(): - Definitions(assets=load_assets_from_modules([auto_materialize_eager])) - - -def test_auto_materialize_lazy_asset_defs(): - Definitions(assets=load_assets_from_modules([auto_materialize_lazy])) - - -def test_auto_materialize_lazy_transitive_asset_defs(): - Definitions(assets=load_assets_from_modules([auto_materialize_lazy_transitive])) - - -def test_auto_materialize_observable_source_asset(): - Definitions( - assets=load_assets_from_modules([auto_materialize_observable_source_asset]) - ) - - -def test_auto_materialize_time_partitions(): - Definitions(assets=load_assets_from_modules([auto_materialize_time_partitions])) - - -def test_auto_materialize_sensor(): - assert auto_materialize_sensor.defs - assert ( - auto_materialize_sensor.my_custom_auto_materialize_sensor.name - == "my_custom_auto_materialize_sensor" - )