diff --git a/docs/content/_navigation.json b/docs/content/_navigation.json index dc30c816dd041..f2e9292d3a142 100644 --- a/docs/content/_navigation.json +++ b/docs/content/_navigation.json @@ -934,6 +934,10 @@ "title": "Airlift", "path": "/integrations/airlift", "children": [ + { + "title": "Mapping Airflow Concepts to Dagster", + "path": "/integrations/airlift/airflow-dagster-equivalents" + }, { "title": "Airflow Migration Tutorial", "path": "/integrations/airlift/tutorial/overview", @@ -985,6 +989,32 @@ { "title": "DAG-level migration", "path": "/integrations/airlift/full_dag" + }, + { + "title": "Migration with Airflow 1", + "path": "/integrations/airlift/airflow-1-migration" + }, + { + "title": "Operator Migration Reference", + "path": "/integrations/airlift/operator-migration/overview", + "children": [ + { + "title": "Migrating a BashOperator for dbt", + "path": "/integrations/airlift/operator-migration/bash-operator-dbt" + }, + { + "title": "Migrating a KubernetesPodOperator", + "path": "/integrations/airlift/operator-migration/kubernetes-pod-operator" + }, + { + "title": "Migrating a PythonOperator", + "path": "/integrations/airlift/operator-migration/python-operator" + }, + { + "title": "Migrating a BashOperator (general)", + "path": "/integrations/airlift/operator-migration/bash-operator-general" + } + ] } ] }, diff --git a/docs/content/integrations/airlift.mdx b/docs/content/integrations/airlift.mdx index a3b5e15e954d7..ff632972e869e 100644 --- a/docs/content/integrations/airlift.mdx +++ b/docs/content/integrations/airlift.mdx @@ -54,6 +54,12 @@ In this tutorial, we'll use `dagster-airlift` to observe DAGs from multiple Airf [Click here to get started](/integrations/airlift/federation-tutorial/overview). +## Airlift Operator Migration Reference + +In this reference, we'll explain how to migrate common Airflow operators to Dagster. + +[Click here to get started](/integrations/airlift/operator-migration/overview). 
+ ## References @@ -69,4 +75,8 @@ In this tutorial, we'll use `dagster-airlift` to observe DAGs from multiple Airf title="Additional Airlift Functionality" href="/integrations/airlift/reference" > + diff --git a/docs/content/integrations/airlift/airflow-1-migration.mdx b/docs/content/integrations/airlift/airflow-1-migration.mdx new file mode 100644 index 0000000000000..b99b294ee7d0e --- /dev/null +++ b/docs/content/integrations/airlift/airflow-1-migration.mdx @@ -0,0 +1,278 @@ +# Airflow 1.X Migration Tutorial + +## Overview + +This guide covers using `dagster-airlift` to migrate an Airflow DAG to Dagster on `apache-airflow` below version 2. + +Many APIs within the `dagster-airlift` package make use of Airflow's stable REST API, which was added in Airflow 2.0. However, we still enable a migration process for Airflow 1.x users. + +This guide will cover the migration process using the same base example as the [tutorial](/integrations/airlift/tutorial/overview). + +We recommend following the tutorial in order to understand the concepts and steps involved in the migration process, and then using this guide to apply those steps to an Airflow 1.x environment. + +## Prerequisites + +Before continuing, you should have: + +- Basic familiarity with Dagster concepts such as [assets](/concepts/assets/software-defined-assets) and [code locations](/concepts/code-locations). +- Skimmed the [Airlift migration tutorial](/integrations/airlift/tutorial/overview) to understand the general process of migration. + +### Setup + + + +If you previously ran the Airlift tutorial, you can follow along by doing the following: + +- clear `tutorial_example/dagster_defs/definitions.py`, and mark all tasks as unproxied in the proxied state YAML file. + + + + +Start by following the [setup](/integrations/airlift/tutorial/setup) step of the migration tutorial, and we'll diverge from there. 
+ +With Airflow 1.x, we won't [peer](/integrations/airlift/tutorial/peer) or [observe](/integrations/airlift/tutorial/observe) Airflow DAGs first - we'll immediately skip to the migration step and proxy execution to Dagster. + +### Scaffolding proxied state + +To begin proxying tasks in a DAG, first you will need a file to track proxying state. In your Airflow DAG directory, create a `proxied_state` folder, and in it create a yaml file with the same name as your DAG. The included example at `airflow_dags/proxied_state` is used by `make airflow_run`, and can be used as a template for your own proxied state files. + +Given our example DAG `rebuild_customers_list` with three tasks, `load_raw_customers`, `run_dbt_model`, and `export_customers`, `proxied_state/rebuild_customers_list.yaml` should look like the following: + +```yaml file=../../airlift-migration-tutorial/tutorial_example/airflow_dags/proxied_state/rebuild_customers_list.yaml +tasks: + - id: load_raw_customers + proxied: False + - id: build_dbt_models + proxied: False + - id: export_customers + proxied: False +``` + +Next, you will need to modify your Airflow DAG to make it aware of the proxied state. This is already done in the example DAG: + +```python file=../../airlift-migration-tutorial/tutorial_example/snippets/dags_truncated.py +# Dags file can be found at tutorial_example/airflow_dags/dags.py +from pathlib import Path + +from airflow import DAG +from dagster_airlift.in_airflow import proxying_to_dagster +from dagster_airlift.in_airflow.proxied_state import load_proxied_state_from_yaml + +dag = DAG("rebuild_customers_list", ...) + +... + +# Set this to True to begin the proxying process +PROXYING = False + +if PROXYING: + proxying_to_dagster( + global_vars=globals(), + proxied_state=load_proxied_state_from_yaml(Path(__file__).parent / "proxied_state"), + ) +``` + +Set `PROXYING` to `True` or eliminate the `if` statement. + +The DAG will now display its proxied state in the Airflow UI. 
(There is some latency as Airflow evaluates the Python file periodically.) + +

+ + + +

+ +### Migrating `build_dbt_models` + +We'll now create Dagster assets that correspond to each Airflow task. First, since Dagster provides out of the box integration with dbt, we'll use some utilities from `dagster-dbt` to create assets for the `build_dbt_models` task. In our `tutorial_example/dagster_defs/definitions.py` file: + +```python file=../../airlift-migration-tutorial/snippets/airflow_1_dbt.py startafter=start_dbt_project_assets endbefore=end_dbt_project_assets +import os +from pathlib import Path + +from dagster import AssetExecutionContext +from dagster_dbt import DbtCliResource, DbtProject, dbt_assets + + +def dbt_project_path() -> Path: + env_val = os.getenv("TUTORIAL_DBT_PROJECT_DIR") + assert env_val, "TUTORIAL_DBT_PROJECT_DIR must be set" + return Path(env_val) + + +@dbt_assets( + manifest=dbt_project_path() / "target" / "manifest.json", + project=DbtProject(dbt_project_path()), +) +def dbt_project_assets(context: AssetExecutionContext, dbt: DbtCliResource): + yield from dbt.cli(["build"], context=context).stream() +``` + +We used the decorator to create assets from the dbt project. The `manifest` argument points to the manifest file generated by dbt, and the `project` argument points to the dbt project directory. The resource is a wrapper around the dbt CLI, which we use to execute dbt commands. + +Now, we'll mark our `dbt_project_assets` as being mapped from Airflow: + +```python file=../../airlift-migration-tutorial/snippets/airflow_1_dbt.py startafter=start_mapping endbefore=end_mapping +from dagster_airlift.core import assets_with_task_mappings + +mapped_assets = assets_with_task_mappings( + dag_id="rebuild_customers_list", + task_mappings={ + "build_dbt_models": + # load rich set of assets from dbt project + [dbt_project_assets], + }, +) +``` + +The `assets_with_task_mappings` function adds some metadata to each passed-in asset which, over the wire in Airflow, we'll use to determine which assets to execute in Dagster. 
+ +We'll provide the mapped assets to a `Definitions` object in our `tutorial_example/dagster_defs/definitions.py` file: + +```python file=../../airlift-migration-tutorial/snippets/airflow_1_dbt.py startafter=start_defs endbefore=end_defs +from dagster import Definitions + +defs = Definitions( + assets=mapped_assets, resources={"dbt": DbtCliResource(project_dir=dbt_project_path())} +) +``` + +Note how this differs from the original migration tutorial; we're not using `build_defs_from_airflow_instance`, which relies on the REST API. + +Finally, we'll mark the `build_dbt_models` task as proxied in the proxied state YAML file: + +```yaml file=../../airlift-migration-tutorial/tutorial_example/snippets/dbt_proxied.yaml +tasks: + - id: load_raw_customers + proxied: False + - id: build_dbt_models + proxied: True + - id: export_customers + proxied: False +``` + +**Important**: It may take up to 30 seconds for the proxied state in the Airflow UI to reflect this change. You must subsequently reload the definitions in Dagster via the UI or by restarting `dagster dev`. + +You can now run the `rebuild_customers_list` DAG in Airflow, and the `build_dbt_models` task will be executed in a Dagster run: + +

+ + + +

+ +### Completed code + +Migrating the other tasks should follow the same pattern as in the [migration tutorial](/integrations/airlift/tutorial/migrate#migrating-the-remaining-custom-operators). When you're done, your code should look like this: + +```python file=../../airlift-migration-tutorial/snippets/airflow_1_migrated.py +import os +from pathlib import Path + +from dagster import ( + AssetExecutionContext, + AssetsDefinition, + AssetSpec, + DailyPartitionsDefinition, + Definitions, + multi_asset, +) +from dagster._time import get_current_datetime_midnight +from dagster_airlift.core import assets_with_task_mappings +from dagster_dbt import DbtCliResource, DbtProject, dbt_assets + +# Code also invoked from Airflow +from tutorial_example.shared.export_duckdb_to_csv import ExportDuckDbToCsvArgs, export_duckdb_to_csv +from tutorial_example.shared.load_csv_to_duckdb import LoadCsvToDuckDbArgs, load_csv_to_duckdb + +PARTITIONS_DEF = DailyPartitionsDefinition(start_date=get_current_datetime_midnight()) + + +def dbt_project_path() -> Path: + env_val = os.getenv("TUTORIAL_DBT_PROJECT_DIR") + assert env_val, "TUTORIAL_DBT_PROJECT_DIR must be set" + return Path(env_val) + + +def airflow_dags_path() -> Path: + return Path(os.environ["TUTORIAL_EXAMPLE_DIR"]) / "tutorial_example" / "airflow_dags" + + +def load_csv_to_duckdb_asset(spec: AssetSpec, args: LoadCsvToDuckDbArgs) -> AssetsDefinition: + @multi_asset(name=f"load_{args.table_name}", specs=[spec]) + def _multi_asset() -> None: + load_csv_to_duckdb(args) + + return _multi_asset + + +def export_duckdb_to_csv_defs(spec: AssetSpec, args: ExportDuckDbToCsvArgs) -> AssetsDefinition: + @multi_asset(name=f"export_{args.table_name}", specs=[spec]) + def _multi_asset() -> None: + export_duckdb_to_csv(args) + + return _multi_asset + + +@dbt_assets( + manifest=dbt_project_path() / "target" / "manifest.json", + project=DbtProject(dbt_project_path()), + partitions_def=PARTITIONS_DEF, +) +def dbt_project_assets(context: 
AssetExecutionContext, dbt: DbtCliResource): + yield from dbt.cli(["build"], context=context).stream() + + +mapped_assets = assets_with_task_mappings( + dag_id="rebuild_customers_list", + task_mappings={ + "load_raw_customers": [ + load_csv_to_duckdb_asset( + AssetSpec(key=["raw_data", "raw_customers"], partitions_def=PARTITIONS_DEF), + LoadCsvToDuckDbArgs( + table_name="raw_customers", + csv_path=airflow_dags_path() / "raw_customers.csv", + duckdb_path=Path(os.environ["AIRFLOW_HOME"]) / "jaffle_shop.duckdb", + names=["id", "first_name", "last_name"], + duckdb_schema="raw_data", + duckdb_database_name="jaffle_shop", + ), + ) + ], + "build_dbt_models": + # load rich set of assets from dbt project + [dbt_project_assets], + "export_customers": [ + export_duckdb_to_csv_defs( + AssetSpec(key="customers_csv", deps=["customers"], partitions_def=PARTITIONS_DEF), + ExportDuckDbToCsvArgs( + table_name="customers", + csv_path=Path(os.environ["TUTORIAL_EXAMPLE_DIR"]) / "customers.csv", + duckdb_path=Path(os.environ["AIRFLOW_HOME"]) / "jaffle_shop.duckdb", + duckdb_database_name="jaffle_shop", + ), + ) + ], + }, +) + + +defs = Definitions( + assets=mapped_assets, + resources={"dbt": DbtCliResource(project_dir=dbt_project_path())}, +) +``` + +### Conclusion + +To recap, we've covered the process of migrating an Airflow 1.x DAG to Dagster using `dagster-airlift`. We've clarified which functionality works with Airflow < 2.0 and which does not. We've shown how to create Dagster assets that correspond to Airflow tasks, and how to mark those tasks as proxied in the proxied state YAML file. 
diff --git a/docs/content/integrations/airlift/airflow-dagster-equivalents.mdx b/docs/content/integrations/airlift/airflow-dagster-equivalents.mdx new file mode 100644 index 0000000000000..3a89b8f529eee --- /dev/null +++ b/docs/content/integrations/airlift/airflow-dagster-equivalents.mdx @@ -0,0 +1,617 @@ +# Airflow to Dagster concepts + +In this page, we'll explain how concepts map between Airflow and Dagster. + +### Airflow Tasks + +An Airflow Task is a single computation within a DAG, which executes an `execute` method of its given Operator. Here's an example of an Airflow Task using the `PythonOperator` to upload a file to S3, as well as the taskflow-based `@task` decorator (which achieves the same thing): + +```python file=/integrations/airlift/equivalents/airflow_task.py +from pathlib import Path + +import boto3 +from airflow.decorators import task +from airflow.operators.python import PythonOperator + + +def write_file_to_s3(path: Path) -> None: + boto3.client("s3").upload_file(str(path), "bucket", path.name) + + +def _write_customers_data(): + write_file_to_s3(Path("path/to/customers.csv")) + + +# Defining a task using the PythonOperator syntax +PythonOperator( + python_callable=_write_customers_data, task_id="write_customers_data", dag=... +) + + +# Defining a task using the task decorator +@task(task_id="write_customers_data") +def write_customers_data(): + write_file_to_s3(Path("path/to/customers.csv")) +``` + +Dagster in comparison uses software-defined assets to represent what a task produces. You define the data asset you want to exist, and then define a computation for _how_ to create that asset. If you're familiar with Airflow's Datasets, you can think of a software-defined asset as defining a task and a dataset at once. 
Here's an example of a Dagster asset that uploads a file to S3: + +```python file=/integrations/airlift/equivalents/task_equiv_asset.py +from pathlib import Path + +import boto3 + +from dagster import asset + + +def write_file_to_s3(path: Path) -> None: + boto3.client("s3").upload_file(str(path), "bucket", path.name) + + +# We define in terms of the "physical" asset - the uploaded file +@asset +def customers_data(): + write_file_to_s3(Path("path/to/customers.csv")) +``` + +Learn more about Dagster's asset-oriented approach in our [software-defined assets reference](/concepts/assets/software-defined-assets). + +### Airflow Operators + +An Airflow Operator is a "building block" which can be used to define tasks from. Here's an example of a `PythonOperator` subclass which takes a file as an input and uploads it to S3: + +```python file=/integrations/airlift/equivalents/custom_operator.py startafter=start_custom_op endbefore=end_custom_op +from pathlib import Path + +import boto3 +from airflow.operators.python import PythonOperator + + +class UploadToS3Operator(PythonOperator): + def __init__(self, path: str, *args, **kwargs) -> None: + super().__init__( + python_callable=self.upload_to_s3, op_args=[path], *args, **kwargs + ) + + def upload_to_s3(self, path: str) -> None: + boto3.client("s3").upload_file( + Filepath=path, Bucket="my_bucket", Key=Path(path).name + ) +``` + +Then, you would invoke this operator to create a task like so: + +```python file=/integrations/airlift/equivalents/custom_operator.py startafter=start_task endbefore=end_task +task = UploadToS3Operator(task_id="write_customers_data", path="path/to/customers.csv") +``` + +The Dagster equivalent would be to create a factory method that defines an asset for a given parameter. 
+ +```python file=/integrations/airlift/equivalents/factory_asset.py startafter=start_factory endbefore=end_factory +import boto3 + +from dagster import asset + + +def build_s3_asset(path: str): + @asset(key=path) + def _s3_file(): + boto3.client("s3").upload_file(path, "my-bucket", path) + + return _s3_file +``` + +Then, you would invoke this factory method to create an asset like so: + +```python file=/integrations/airlift/equivalents/factory_asset.py startafter=start_usage endbefore=end_usage +customers_data = build_s3_asset("path/to/customers.csv") +``` + +### Airflow DAGs & Schedules + +An Airflow DAG is a collection of tasks with dependencies between them, and some scheduling information. Here's an example of an Airflow DAG which runs on a daily schedule: + +```python file=/integrations/airlift/equivalents/airflow_dag.py +from datetime import datetime, timedelta + +from airflow import DAG +from airflow.decorators import task + +dag = DAG( + "simple_dag", + start_date=datetime(2024, 1, 1), + schedule_interval="@daily", +) + + +@task(task_id="write_customers_data", dag=dag) +def write_customers_data(): ... +``` + +You can define a schedule on an arbitrary set of assets in Dagster. Here's an example of a Dagster Asset that runs on a daily schedule: + +```python file=/integrations/airlift/equivalents/scheduled_asset.py +from dagster import ScheduleDefinition, asset, define_asset_job + + +@asset +def customers_data(): ... + + +ScheduleDefinition( + "daily_schedule", + cron_schedule="0 0 * * *", + target=customers_data, +) +``` + +### Airflow DagBags + +An Airflow DagBag is a collection of dags parsed from a particular file tree. The closest equivalent in Dagster is a code location, where you can set up collections of assets, schedules, sensors, etc. See our [code locations guide](/concepts/code-locations) for more information. 
+ +### Airflow data interval, Logical Date, and Execution Date + +In Airflow, data interval is the range of data being processed by the dag, with logical date and execution date being synonymous with the "start" of the data interval. + +```python file=/integrations/airlift/equivalents/airflow_data_interval.py +import boto3 +from airflow.decorators import task + + +@task(task_id="write_customers_data") +def write_partitioned_customers_data(context): + prefix = context["data_interval_start"] + # or + prefix = context["logical_date"] + # or + prefix = context["execution_date"] + + # write data to S3 with the prefix + boto3.client("s3").upload_file( + "path/to/customers.csv", f"bucket/{prefix}/customers.csv" + ) +``` + +The equivalent concept in Dagster is partitions - where you can define a partitioning scheme to represent the data being processed by your computations. + +```python file=/integrations/airlift/equivalents/dagster_partition.py +import boto3 + +from dagster import AssetExecutionContext, DailyPartitionsDefinition, asset + + +@asset(partitions_def=DailyPartitionsDefinition(...)) +def customers_data(context: AssetExecutionContext): + prefix = context.partition_key + boto3.client("s3").upload_file( + "path/to/customers.csv", f"bucket/{prefix}/customers.csv" + ) +``` + +To learn more, see our [partitions guide](/concepts/partitions-schedules-sensors/partitions). + +### Airflow Sensors & Triggers + +Both Airflow and Dagster have the concept of sensors, but they function very differently. In Airflow, sensors are used to wait for a certain condition to be met before proceeding with the execution of a task. Let's take the example scenario of waiting for new files to exist before proceeding. 
+ +```python file=/integrations/airlift/equivalents/airflow_sensor.py +from datetime import datetime + +from airflow import DAG +from airflow.sensors.filesystem import FileSensor + +dag = DAG("file_sensor_example", start_date=datetime(2024, 1, 1)) + +wait_for_file = FileSensor( + task_id="wait_for_new_customer_files", + filepath="/path/to/customer_files/*.csv", + poke_interval=60, # Check every minute + timeout=3600, # Timeout after 1 hour + mode="poke", + dag=dag, +) +``` + +Triggers in Airflow are an event-based way to accomplish a similar task: waiting for some condition to be true. They run asynchronously in a separate process. + +Dagster sensors combine the best of both worlds of Airflow sensors and triggers, while also providing additional capabilities. Here's an example of a Dagster sensor that will kick off computation of an asset whenever a new file is added to a directory: + +```python file=/integrations/airlift/equivalents/dagster_sensor.py +import json +from pathlib import Path + +from dagster import RunRequest, SensorEvaluationContext, SkipReason, asset, sensor + + +@asset +def uploaded_customers_data(): + pass + + +# Implementing the FileSensor from Airflow directly in Dagster +@sensor(target=uploaded_customers_data) +def wait_for_new_files(context: SensorEvaluationContext): + seen_files: list = json.loads(context.cursor) if context.cursor else [] + should_trigger = False + for file in Path("path/to/customer_files").iterdir(): + if file.name not in seen_files: + seen_files.append(file.name) + should_trigger = True + yield RunRequest() if should_trigger else SkipReason("No new files") +``` + +Key features of Dagster sensors: + +- In Dagster, sensors run _continuously_ and _independently_. This means that they don't use up a task slot in the scheduler, and can poll much more frequently than Airflow sensors. +- Sensors in Dagster can track state between evaluations using cursors. 
+- Sensors in Dagster can do more than trigger downstream computations - they can also be used to set run tags, trigger external APIs, and more. They are ultimately arbitrary Python functions that can do anything you want (although we generally recommend keeping them lightweight, since they are designed to poll often and not run on heavy infrastructure). + +#### A note on Airflow's ExternalTaskSensor + +Dagster can handle execution of downstreams automatically using [Declarative Automation](/concepts/automation/declarative-automation) - so you don't need to worry about cross-dag dependencies like you do in Airflow. So, for example, if you were previously using the ExternalTaskSensor in Airflow to wait for a task in another DAG to complete, you don't need to do that in Dagster. Instead, you would just define a dependency between those assets in the asset graph. + +### Airflow Hooks & Connections + +Hooks in Airflow are used to interact with external systems, and Connections are used to authenticate to those systems. 
For example, you would first set up a connection to AWS using an environment variable: + +```bash +export AIRFLOW_CONN_AWS_DEFAULT='aws://YOUR_AWS_ACCESS_KEY_ID:YOUR_AWS_SECRET_ACCESS_KEY@/?region_name=us-east-1' +``` + +Here's an example of using an Airflow Hook with a set-up Airflow connection to interact with S3: + +```python file=/integrations/airlift/equivalents/airflow_hook.py startafter=start_ex endbefore=end_ex +from datetime import datetime, timedelta + +from airflow import DAG +from airflow.operators.python import PythonOperator +from airflow.providers.amazon.aws.hooks.s3 import S3Hook + + +def upload_customers_data() -> None: + S3Hook(aws_conn_id="aws_default").load_file( + filename="path/to/customers.csv", + key="customers.csv", + bucket_name="my-cool-bucket", + replace=True, + ) + + +s3_task = PythonOperator( + task_id="s3_operations", + python_callable=upload_customers_data, +) +``` + +In Dagster, you interact with external systems using resources, and _configure_ those resources with specific connection information. Here's an example of a Dagster Resource to interact with S3, set up with S3 credentials: + +```python file=/integrations/airlift/equivalents/dagster_resource.py +import os +from pathlib import Path + +from dagster_aws.s3 import S3Resource + +from dagster import Definitions, asset + + +@asset +def customers_s3(s3: S3Resource): + local_file_data = Path("path/to/customers_data.csv").read_bytes() + s3_client = s3.get_client() + + s3_client.put_object( + Bucket="my-cool-bucket", + Key="customers.csv", + Body=local_file_data, + ) + + +defs = Definitions( + assets=[customers_s3], + resources={ + "s3": S3Resource(aws_access_key_id=os.environ["DEFAULT_AWS_ACCESS_KEY_ID"]) + }, +) +``` + +### Airflow XComs + +Airflow XComs are used to pass data between tasks. 
Here's an example of an Airflow XCom: + +```python file=/integrations/airlift/equivalents/airflow_xcom.py +import boto3 +import pandas as pd +from airflow.operators.python import PythonOperator + + +def upload_customers_data(**context): + raw_customers_data = pd.read_csv("path/to/customers.csv") + avg_revenue = raw_customers_data["revenue"].mean() + task_instance = context["task_instance"] + task_instance.xcom_push(key="avg_revenue", value=avg_revenue) + boto3.client("s3").upload_file("path/to/customers.csv", "bucket/customers.csv") + + +PythonOperator( + task_id="generate_stats", + python_callable=upload_customers_data, + provide_context=True, +) +``` + +In Dagster, you can automatically pass small amounts of data using `Asset metadata`. Here's an example of a Dagster Asset that passes data between tasks: + +```python file=/integrations/airlift/equivalents/dagster_asset.py +import boto3 +import pandas as pd + +from dagster import FloatMetadataValue, MaterializeResult, asset + + +@asset +def customers_data_s3(): + raw_customers_data = pd.read_csv("path/to/customers.csv") + avg_revenue = raw_customers_data["revenue"].mean() + boto3.client("s3").upload_file("path/to/customers.csv", "bucket/customers.csv") + return MaterializeResult(metadata={"avg_revenue": FloatMetadataValue(avg_revenue)}) +``` + +For larger amounts of data, you can use Dagster's [IOManager](/concepts/io-management/io-managers) to manage the data flow between tasks. + +### Airflow Templates & Macros + +Airflow allows templating variables into your DAGs using Jinja. Dagster doesn't have a direct jinja templating feature, instead you're encouraged to use Python functions and interact with Dagster's richly typed API to pass information. The context information available to each asset's execution is on the object. + +### Airflow Executors + +Airflow Executors are used to determine where your tasks are executed. You can use the equivalent Dagster executors concept for this purpose. 
Learn more about [Dagster executors](/deployment/executors). + +### Airflow Pools + +Airflow Pools allow users to limit the number of concurrent tasks that can be run. Dagster provides concurrency at various levels of the stack to limit the number of computations that can run at a given time, and also limit the number of runs that are in progress at a given time. View our full concurrency guide [here](/guides/limiting-concurrency-in-data-pipelines). + +### Airflow Task Groups + +Airflow task groups allow you to organize tasks into hierarchical groups within the Airflow UI for a particular DAG. Dagster has _global_ asset groups which can be applied to any asset. Learn more about [asset groups](/concepts/assets/software-defined-assets#assigning-assets-to-groups). + +## Cheatsheet + +Here's a cheatsheet for Airflow users migrating to Dagster: + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
+ Airflow concept + + Dagster concept + Notes
Directed Acyclic Graphs (DAG) + Jobs +
Tasks & Datasets + Assets + + Dagster assets combine the concepts of tasks and datasets into a single + abstraction that can be used to define both the computation and the data + produced by that computation. They include support for things like{" "} + + partitioning + +
Connections/Hooks + + + Dagster's resource system allows you to define and configure connections + to external systems. Dagster+ also has a dedicated secrets management + system. +
Variables + + + Dagster's rich config system allows you to define configuration + schematics in code, and pass configuration to computations at runtime. +
DagBags + Code locations + + Multiple isolated code locations with different system and Python + dependencies can exist within the same Dagster deployment. +
DAG runsAsset Runs
+ depends_on_past + + + + An asset can{" "} + + depend on earlier partitions of itself + + . When this is the case, + backfills + and + Declarative Automation + will only materialize later partitions after earlier partitions have + completed. +
Executors + Executors +
Instances + Instances +
OperatorsAsset Factories + Dagster uses normal Python functions instead of framework-specific + operator classes. To define multiple assets from a shared + implementation, you can use a factory function. +
Pools & Task Concurrency + + Asset & Run Concurrency + +
Plugins/Providers + Integrations +
Schedulers + Schedules +
Sensors & Triggers + Sensors, + + Declarative Automation + + + Dagster Sensors provide the best of both worlds of Airflow's sensors and + triggers, while also providing additional functionality. Declarative + Automation removes the need for the cross-DAG dependency sensors used in + Airflow. +
SubDAGs/TaskGroups + + + Dagster provides rich, searchable metadata and{" "} + tagging support beyond what's + offered by Airflow. +
XComs + Runtime metadata + I/O managers + + For small data, you can use Dagster's rich metadata abstractions to make + data available between assets, and have it show up in the UI. For larger + datasets, I/O managers are more powerful than XComs and allow + passing large datasets between jobs. +
diff --git a/docs/content/integrations/airlift/operator-migration/bash-operator-dbt.mdx b/docs/content/integrations/airlift/operator-migration/bash-operator-dbt.mdx new file mode 100644 index 0000000000000..30ab8f9ef47e2 --- /dev/null +++ b/docs/content/integrations/airlift/operator-migration/bash-operator-dbt.mdx @@ -0,0 +1,53 @@ +# Operator migration guides: Migrating usage of `BashOperator` for `dbt` + +In this page, we'll explain migrating an Airflow `BashOperator` that runs a `dbt` command to Dagster. + +### Background + +In Airflow, you might have a `BashOperator` that runs a `dbt` command. For example, you might have a task that runs `dbt run` to build your dbt models. + +```python file=/integrations/airlift/operator_migration/bash_operator_dbt.py +from airflow.operators.bash import BashOperator + +run_dbt_model = BashOperator(task_id="build_dbt_models", bash_command="dbt run") +``` + +### Dagster equivalent + +The Dagster equivalent is to instead use the `dagster-dbt` library to run commands against your dbt project. Here is the equivalent code in Dagster: + +```python file=/integrations/airlift/operator_migration/using_dbt_assets.py +from dagster_dbt import DbtCliResource, DbtProject, dbt_assets + +from dagster import AssetExecutionContext + +project = DbtProject(project_dir="path/to/dbt_project") + + +@dbt_assets(manifest=project.manifest_path) +def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource): + yield from dbt.cli(["run"], context=context).stream() +``` + +### Migrating the operator + +Migrating the operator breaks down into a few steps: + +1. Making the dbt project available to both your Airflow and Dagster deployments. +2. Writing a `@dbt_assets`-decorated function which runs your dbt commands. +3. Using `dagster-airlift` to proxy execution of the original task to Dagster. 
+ +### Step 1: Making the dbt project available & building manifest + +First, you'll need to make the dbt project available to the Dagster runtime and build the manifest. + +- If you're building your Dagster deployment in a monorepo alongside your dbt and Airflow projects, you can follow this guide: [Monorepo setup](/integrations/dbt/reference#deploying-a-dagster-project-with-a-dbt-project). +- If you're deploying within a separate repository, you can follow this guide: [Separate repository setup](/integrations/dbt/reference#deploying-a-dbt-project-from-a-separate-git-repository). + +### Step 2: Writing a `@dbt_assets`-decorated function + +Once your dbt project is available, you can write a function that runs your dbt commands using the `@dbt_assets` decorator and `DbtCliResource`. Most dbt CLI commands and flags are supported - to learn more about using `@dbt_assets`, check out the [dagster-dbt quickstart](/integrations/dbt/quickstart) and [reference](/integrations/dbt/reference). + +### Step 3: Using `dagster-airlift` to proxy execution + +Finally, you can use `dagster-airlift` to proxy the execution of the original task to Dagster. The [dagster-airlift migration guide](/integrations/airlift/tutorial/overview) details this process. diff --git a/docs/content/integrations/airlift/operator-migration/bash-operator-general.mdx b/docs/content/integrations/airlift/operator-migration/bash-operator-general.mdx new file mode 100644 index 0000000000000..f37b13c9e2602 --- /dev/null +++ b/docs/content/integrations/airlift/operator-migration/bash-operator-general.mdx @@ -0,0 +1,77 @@ +# Operator migration guides: Migrating generalized usage of BashOperator + +In this page, we'll explain migrating an Airflow `BashOperator` to Dagster. + + + If using the `BashOperator` to execute dbt commands, see the [dbt migration + guide](/integrations/airlift/operator-migration/bash-operator-dbt). 
+ + +### Background + +The Airflow `BashOperator` is a common operator used to execute bash commands as part of a data pipeline. + +```python file=/integrations/airlift/operator_migration/bash_operator_general.py +from airflow.operators.bash import BashOperator + +execute_script = BashOperator( + task_id="execute_script", + bash_command="python /path/to/script.py", +) +``` + +The `BashOperator`'s functionality is very general since it can be used to run any bash command, and there exist richer integrations in Dagster for many common BashOperator use cases. We'll explain how to perform a 1-1 migration of the BashOperator to execute a bash command in Dagster, and how to use the `dagster-airlift` library to proxy the execution of the original task to Dagster. We'll also provide a reference for richer integrations in Dagster for common BashOperator use cases. + +### Dagster equivalent + +The direct Dagster equivalent to the `BashOperator` is to use the `PipesSubprocessClient` to execute a bash command in a subprocess. + +### Migrating the operator + +Migrating the operator breaks down into a few steps: + +1. Ensure that the resources necessary for your bash command are available to both your Airflow and Dagster deployments. +2. Write an `@asset` that executes the bash command using the `PipesSubprocessClient`. +3. Use `dagster-airlift` to proxy execution of the original task to Dagster. +4. \[Optional] Implement a richer integration for common BashOperator use cases. + +### Step 1: Ensure shared bash command access + +First, you'll need to ensure that the bash command you're running is available for use in both your Airflow and Dagster deployments. What this entails will vary depending on the command you're running. For example, if you're running a python script, it's as simple as ensuring the python script exists in a shared location accessible to both Airflow and Dagster, and all necessary env vars are set in both environments.
+ +### Step 2: Writing an `@asset`-decorated function + +You can write a Dagster `@asset`-decorated function that runs your bash command. This is quite straightforward using the `PipesSubprocessClient`. + +```python file=/integrations/airlift/operator_migration/using_pipes_subprocess.py +from dagster import AssetExecutionContext, PipesSubprocessClient, asset + + +@asset +def script_result(context: AssetExecutionContext): + return ( + PipesSubprocessClient() + .run(context=context, command="python /path/to/script.py") + .get_results() + ) +``` + +### Step 3: Using `dagster-airlift` to proxy execution + +Finally, you can use `dagster-airlift` to proxy the execution of the original task to Dagster. The [dagster-airlift migration guide](/integrations/airlift/tutorial/overview) details this process. + +### Step 4: Implementing richer integrations + +For many of the use cases that you might be using the BashOperator for, Dagster might have better options. We'll detail some of those here. + +#### Running a python script + +As mentioned above, you can use the `PipesSubprocessClient` to run a python script in a subprocess. But you can also modify this script to send additional information and logging back to Dagster. See the [Dagster Pipes tutorial](/concepts/dagster-pipes/subprocess) for more information. + +#### Running a dbt command + +We have a whole guide for switching from the `BashOperator` to the `dbt` integration in Dagster. See the [dbt migration guide](/integrations/airlift/operator-migration/bash-operator-dbt) for more information. + +#### Running S3 Sync or other AWS CLI commands + +Dagster has a rich set of integrations for AWS services. For example, you can use the `S3Resource` to interact with S3 directly.
diff --git a/docs/content/integrations/airlift/operator-migration/kubernetes-pod-operator.mdx b/docs/content/integrations/airlift/operator-migration/kubernetes-pod-operator.mdx new file mode 100644 index 0000000000000..90590c4050a43 --- /dev/null +++ b/docs/content/integrations/airlift/operator-migration/kubernetes-pod-operator.mdx @@ -0,0 +1,159 @@ +# Operator migration guides: Migrating usage of KubernetesPodOperator + +In this page, we'll explain migrating an Airflow `KubernetesPodOperator` to Dagster. + +### Background + +The KubernetesPodOperator in Apache Airflow enables users to execute containerized tasks within Kubernetes pods as part of their data pipelines. + +```python file=/integrations/airlift/operator_migration/kubernetes_pod_operator.py +from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator + +k8s_hello_world = KubernetesPodOperator( + task_id="hello_world_task", + name="hello-world-pod", + image="bash:latest", + cmds=["bash", "-cx"], + arguments=['echo "Hello World!"'], +) +``` + +### Dagster equivalent + +The Dagster equivalent is to use the `PipesK8sClient` to execute a task within a Kubernetes pod. + +```python file=/integrations/airlift/operator_migration/using_k8s_pipes.py +from dagster_k8s import PipesK8sClient + +from dagster import AssetExecutionContext, asset + +container_cfg = { + "name": "hello-world-pod", + "image": "bash:latest", + "command": ["bash", "-cx"], + "args": ['echo "Hello World!"'], +} + + +@asset +def execute_hello_world_task(context: AssetExecutionContext): + return ( + PipesK8sClient() + .run( + context=context, + base_pod_meta={"name": "hello-world-pod"}, + base_pod_spec={"containers": [container_cfg]}, + ) + .get_results() + ) +``` + +### Migrating the operator + +Migrating the operator breaks down into a few steps: + +1. Ensure that your Dagster deployment has access to the Kubernetes cluster. +2. Write an `@asset` that executes the task within a Kubernetes pod using the `PipesK8sClient`. +3.
Use `dagster-airlift` to proxy execution of the original task to Dagster. + +### Step 1: Ensure access to the Kubernetes cluster + +First, you need to ensure that your Dagster deployment has access to the Kubernetes cluster where you want to run your tasks. The `PipesK8sClient` accepts `kubeconfig_file`, `kube_context`, and `env` arguments to configure the Kubernetes client. + +Here's an example of what this might look like when configuring the client to access an EKS cluster: + +```python file=/integrations/airlift/operator_migration/k8s_eks_fake_example.py startafter=start_client endbefore=end_client +from dagster_k8s import PipesK8sClient + +eks_client = PipesK8sClient( + # The client will have automatic access to all + # environment variables in the execution context. + env={**AWS_CREDENTIALS, "AWS_REGION": "us-west-2"}, + kubeconfig_file="path/to/kubeconfig", + kube_context="my-eks-cluster", +) +``` + +### Step 2: Writing an asset that executes the task within a Kubernetes pod + +Once you have access to the Kubernetes cluster, you can write an asset that executes the task within a Kubernetes pod using the `PipesK8sClient`. In comparison to the KubernetesPodOperator, the PipesK8sClient allows you to define the pod spec directly in your Python code. + +In the [parameter comparison](/integrations/airlift/operator-migration/kubernetes-pod-operator#parameter-comparison) section of this doc, you'll find a detailed comparison describing how to map the KubernetesPodOperator parameters to the PipesK8sClient parameters.
+ +```python file=/integrations/airlift/operator_migration/k8s_eks_fake_example.py startafter=start_asset endbefore=end_asset +from dagster import AssetExecutionContext, asset + +container_cfg = { + "name": "hello-world-pod", + "image": "bash:latest", + "command": ["bash", "-cx"], + "args": ['echo "Hello World!"'], +} + + +@asset +def execute_hello_world_task(context: AssetExecutionContext): + return eks_client.run( + context=context, + base_pod_meta={"name": "hello-world-pod"}, + base_pod_spec={"containers": [container_cfg]}, + ).get_results() +``` + +This is just a snippet of what the PipesK8sClient can do. Take a look at our full guide on the [dagster-k8s PipesK8sClient](/concepts/dagster-pipes/kubernetes) for more information. + +### Step 3: Using `dagster-airlift` to proxy execution + +Finally, you can use `dagster-airlift` to proxy the execution of the original task to Dagster. The [dagster-airlift migration guide](/integrations/airlift/tutorial/overview) details this process. + +### Parameter comparison + +Here's a comparison of the parameters between the KubernetesPodOperator and the PipesK8sClient: Directly supported arguments: + +- in_cluster (named load_incluster_config in the PipesK8sClient) +- cluster_context (named kube_context in the PipesK8sClient) +- config_file (named kubeconfig_file in the PipesK8sClient) + +Many arguments are supported indirectly via the `base_pod_spec` argument. 
+ +- volumes: Volumes to be used by the Pod (key `volumes`) +- affinity: Node affinity/anti-affinity rules for the Pod (key `affinity`) +- node_selector: Node selection constraints for the Pod (key `nodeSelector`) +- hostnetwork: Enable host networking for the Pod (key `hostNetwork`) +- dns_config: DNS settings for the Pod (key `dnsConfig`) +- dnspolicy: DNS policy for the Pod (key `dnsPolicy`) +- hostname: Hostname of the Pod (key `hostname`) +- subdomain: Subdomain for the Pod (key `subdomain`) +- schedulername: Scheduler to be used for the Pod (key `schedulerName`) +- service_account_name: Service account to be used by the Pod (key `serviceAccountName`) +- priority_class_name: Priority class for the Pod (key `priorityClassName`) +- security_context: Security context for the entire Pod (key `securityContext`) +- tolerations: Tolerations for the Pod (key `tolerations`) +- image_pull_secrets: Secrets for pulling container images (key `imagePullSecrets`) +- termination_grace_period: Grace period for Pod termination (key `terminationGracePeriodSeconds`) +- active_deadline_seconds: Deadline for the Pod's execution (key `activeDeadlineSeconds`) +- host_aliases: Additional entries for the Pod's /etc/hosts (key `hostAliases`) +- init_containers: Initialization containers for the Pod (key `initContainers`) + +The following arguments are supported under the nested `containers` key of the `base_pod_spec` argument of the PipesK8sClient: + +- image: Docker image for the container (key `image`) +- cmds: Entrypoint command for the container (key `command`) +- arguments: Arguments for the entrypoint command (key `args`) +- ports: List of ports to expose from the container (key `ports`) +- volume_mounts: List of volume mounts for the container (key `volumeMounts`) +- env_vars: Environment variables for the container (key `env`) +- env_from: List of sources to populate environment variables (key `envFrom`) +- image_pull_policy: Policy for pulling the container image (key
`imagePullPolicy`) +- container_resources: Resource requirements for the container (key `resources`) +- container_security_context: Security context for the container (key `securityContext`) +- termination_message_policy: Policy for the termination message (key `terminationMessagePolicy`) + +For a full list, see the [kubernetes container spec documentation](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.26/#container-v1-core). The following arguments are supported under the `base_pod_meta` argument, which configures the metadata of the pod: + +- name: `name` +- namespace: `namespace` +- labels: `labels` +- annotations: `annotations` + +For a full list, see the [kubernetes objectmeta spec documentation](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.26/#objectmeta-v1-meta). diff --git a/docs/content/integrations/airlift/operator-migration/overview.mdx b/docs/content/integrations/airlift/operator-migration/overview.mdx new file mode 100644 index 0000000000000..2829d757d3e4f --- /dev/null +++ b/docs/content/integrations/airlift/operator-migration/overview.mdx @@ -0,0 +1,24 @@ +# Operator Migration Reference + +This page contains a collection of reference materials for migrating usage of common Airflow operator types to Dagster. + +## References + + + + + + + diff --git a/docs/content/integrations/airlift/operator-migration/python-operator.mdx b/docs/content/integrations/airlift/operator-migration/python-operator.mdx new file mode 100644 index 0000000000000..d11b37b45b2e4 --- /dev/null +++ b/docs/content/integrations/airlift/operator-migration/python-operator.mdx @@ -0,0 +1,124 @@ +# Migrating the PythonOperator to Dagster + +In this page, we'll explain migrating an Airflow `PythonOperator` to Dagster. + +### Background + +In Airflow, the `PythonOperator` runs arbitrary python functions. For example, you might have a task that runs function `write_to_db`, which combs a directory for files, and writes each one to a db table. 
+ +```python file=/integrations/airlift/operator_migration/python_operator.py startafter=start_op endbefore=end_op +from airflow.operators.python import PythonOperator + + +def write_to_db() -> None: + for raw_file in RAW_DATA_DIR.iterdir(): + df = contents_as_df(raw_file) + upload_to_db(df) + + +PythonOperator(python_callable=write_to_db, task_id="db_upload", dag=...) +``` + +### Dagster equivalent + +The Dagster equivalent is instead to construct an `@asset`- or `@multi_asset`-decorated function, which materializes assets corresponding to what your python function is doing. + +```python file=/integrations/airlift/operator_migration/pyop_multi_asset_complete.py startafter=start_asset endbefore=end_asset +from dagster import asset + + +@asset(key=TABLE_URI) +def write_to_db() -> None: + for raw_file in RAW_DATA_DIR.iterdir(): + df = contents_as_df(raw_file) + upload_to_db(df) +``` + +### Migrating the operator + +Migrating the operator breaks down into a few steps: + +1. Making a shared library available to both Airflow and Dagster with your python function. +2. Writing an `@asset`-decorated function which runs the python function shared between both modules. +3. Using `dagster-airlift` to proxy execution of the original task to Dagster. + +### Step 1: Building a shared library + +We recommend a monorepo setup for migration; this allows you to keep all your code in one place and easily share code between Airflow and Dagster, without complex CI/CD coordination. + +First, we recommend factoring out a shared package to be available to both the Dagster runtime and the Airflow runtime which contains your python function. The process is as follows: + +1. Scaffold out a new python project which will contain your shared infrastructure. +2. Ensure that the shared library is available to both your Airflow and Dagster deployments. This can be done by adding an editable requirement to your `setup.py` or `pyproject.toml` file in your Airflow/Dagster package. +3.
Include the python dependencies relevant to your particular function in your new package. Write your python function in the shared package, and change your Airflow code to import the function from the shared library. + +To illustrate what this might look like, let's say you originally have this project structure in Airflow: + +```plaintext +airflow_repo/ +├── airflow_package/ +│ └── dags/ +│ └── my_dag.py # Contains your Python function +``` + +With dag code that looks like this: + +```python file=/integrations/airlift/operator_migration/python_operator.py startafter=start_op endbefore=end_op +from airflow.operators.python import PythonOperator + + +def write_to_db() -> None: + for raw_file in RAW_DATA_DIR.iterdir(): + df = contents_as_df(raw_file) + upload_to_db(df) + + +PythonOperator(python_callable=write_to_db, task_id="db_upload", dag=...) +``` + +You might create a new top-level package to contain the shared code: + +```plaintext +airflow_repo/ +├── airflow_package/ +│ └── dags/ +│ └── my_dag.py # Imports the python function from shared module. +├── shared-package/ +│ └── shared_package/ +│ └── shared_module.py # Contains your Python function +``` + +And then import the function from the shared package in Airflow: + +```python file=/integrations/airlift/operator_migration/python_operator.py startafter=start_shared endbefore=end_shared +from airflow.operators.python import PythonOperator +from shared_module import write_to_db + +PythonOperator(python_callable=write_to_db, task_id="db_upload", dag=...) +``` + +The reason we recommend using a separate `shared` package is to help ensure that there aren't dependency conflicts between Airflow and Dagster as you migrate. Airflow has very complex dependency management, and migrating to Dagster gives you an opportunity to clean up and isolate your dependencies. You can do this with a series of shared packages in the monorepo, which will eventually be isolated code locations in Dagster.
+ +### Step 2: Writing an `@asset`-decorated function + +Next, you can write a Dagster `@asset`- or `@multi_asset`-decorated function that runs your python function. This will generally be pretty straightforward for a `PythonOperator` migration, as you can generally just invoke the shared function inside the `asset` function. + +```python file=/integrations/airlift/operator_migration/pyop_asset_shared.py +# start_asset +# This would be the python code living in a shared module. +from shared_module import my_shared_python_callable + +from dagster import asset + + +@asset +def my_shared_asset(): + return my_shared_python_callable() + + +# end_asset +``` + +### Step 3: Using `dagster-airlift` to proxy execution + +Finally, you can use `dagster-airlift` to proxy the execution of the original task to Dagster. The [dagster-airlift migration guide](/integrations/airlift/tutorial/overview) details this process. diff --git a/docs/content/integrations/airlift/reference.mdx b/docs/content/integrations/airlift/reference.mdx index f1adda58eb2ca..24cc925b99427 100644 --- a/docs/content/integrations/airlift/reference.mdx +++ b/docs/content/integrations/airlift/reference.mdx @@ -12,6 +12,14 @@ description: "dagster-airlift is a toolkit for observing and migrating Airflow D - [Dealing with changing Airflow](#dealing-with-changing-airflow) - [Automating changes to code locations](#automating-changes-to-code-locations) +### Migration best practices + +When migrating Airflow DAGs to Dagster, we recommend a few best practices: + +- **Create separate packages for the Airflow and Dagster deployments.** Airflow has complex dependencies and can be difficult to install in the same environment as Dagster. +- **Create user acceptance tests in Dagster before migrating.** This will help you catch issues easily during migration. +- **Understand the rollback procedure for your migration.** When proxying execution to Dagster from Airflow, you can always roll back with a single line-of-code change in the Airflow DAG.
+ ### Supporting custom authorization If your Dagster deployment lives behind a custom auth backend, you can customize the Airflow-to-Dagster proxying behavior to authenticate to your backend. `proxying_to_dagster` can take a parameter `dagster_operator_klass`, which allows you to define a custom `BaseProxyTasktoDagsterOperator` class. This allows you to override how a session is created. Let's say for example, your Dagster installation requires an access key to be set whenever a request is made, and that access key is set in an Airflow `Variable` called `my_api_key`. We can create a custom `BaseProxyTasktoDagsterOperator` subclass which will retrieve that variable value and set it on the session, so that any requests to Dagster's graphql API will be made using that api key. diff --git a/examples/airlift-migration-tutorial/snippets/airflow_1_dbt.py b/examples/airlift-migration-tutorial/snippets/airflow_1_dbt.py new file mode 100644 index 0000000000000..804d8412cb308 --- /dev/null +++ b/examples/airlift-migration-tutorial/snippets/airflow_1_dbt.py @@ -0,0 +1,44 @@ +# start_dbt_project_assets +import os +from pathlib import Path + +from dagster import AssetExecutionContext +from dagster_dbt import DbtCliResource, DbtProject, dbt_assets + + +def dbt_project_path() -> Path: + env_val = os.getenv("TUTORIAL_DBT_PROJECT_DIR") + assert env_val, "TUTORIAL_DBT_PROJECT_DIR must be set" + return Path(env_val) + + +@dbt_assets( + manifest=dbt_project_path() / "target" / "manifest.json", + project=DbtProject(dbt_project_path()), +) +def dbt_project_assets(context: AssetExecutionContext, dbt: DbtCliResource): + yield from dbt.cli(["build"], context=context).stream() + + +# end_dbt_project_assets + +# start_mapping +from dagster_airlift.core import assets_with_task_mappings + +mapped_assets = assets_with_task_mappings( + dag_id="rebuild_customers_list", + task_mappings={ + "build_dbt_models": + # load rich set of assets from dbt project + [dbt_project_assets], + }, +) +# 
end_mapping + +# start_defs +from dagster import Definitions + +defs = Definitions( + assets=mapped_assets, resources={"dbt": DbtCliResource(project_dir=dbt_project_path())} +) +# end_defs diff --git a/examples/airlift-migration-tutorial/snippets/airflow_1_migrated.py b/examples/airlift-migration-tutorial/snippets/airflow_1_migrated.py new file mode 100644 index 0000000000000..a60482bd31bd5 --- /dev/null +++ b/examples/airlift-migration-tutorial/snippets/airflow_1_migrated.py @@ -0,0 +1,95 @@ +import os +from pathlib import Path + +from dagster import ( + AssetExecutionContext, + AssetsDefinition, + AssetSpec, + DailyPartitionsDefinition, + Definitions, + multi_asset, +) +from dagster._time import get_current_datetime_midnight +from dagster_airlift.core import assets_with_task_mappings +from dagster_dbt import DbtCliResource, DbtProject, dbt_assets + +# Code also invoked from Airflow +from tutorial_example.shared.export_duckdb_to_csv import ExportDuckDbToCsvArgs, export_duckdb_to_csv +from tutorial_example.shared.load_csv_to_duckdb import LoadCsvToDuckDbArgs, load_csv_to_duckdb + +PARTITIONS_DEF = DailyPartitionsDefinition(start_date=get_current_datetime_midnight()) + + +def dbt_project_path() -> Path: + env_val = os.getenv("TUTORIAL_DBT_PROJECT_DIR") + assert env_val, "TUTORIAL_DBT_PROJECT_DIR must be set" + return Path(env_val) + + +def airflow_dags_path() -> Path: + return Path(os.environ["TUTORIAL_EXAMPLE_DIR"]) / "tutorial_example" / "airflow_dags" + + +def load_csv_to_duckdb_asset(spec: AssetSpec, args: LoadCsvToDuckDbArgs) -> AssetsDefinition: + @multi_asset(name=f"load_{args.table_name}", specs=[spec]) + def _multi_asset() -> None: + load_csv_to_duckdb(args) + + return _multi_asset + + +def export_duckdb_to_csv_defs(spec: AssetSpec, args: ExportDuckDbToCsvArgs) -> AssetsDefinition: + @multi_asset(name=f"export_{args.table_name}", specs=[spec]) + def _multi_asset() -> None: + export_duckdb_to_csv(args) + + return _multi_asset + + +@dbt_assets( + 
manifest=dbt_project_path() / "target" / "manifest.json", + project=DbtProject(dbt_project_path()), + partitions_def=PARTITIONS_DEF, +) +def dbt_project_assets(context: AssetExecutionContext, dbt: DbtCliResource): + yield from dbt.cli(["build"], context=context).stream() + + +mapped_assets = assets_with_task_mappings( + dag_id="rebuild_customers_list", + task_mappings={ + "load_raw_customers": [ + load_csv_to_duckdb_asset( + AssetSpec(key=["raw_data", "raw_customers"], partitions_def=PARTITIONS_DEF), + LoadCsvToDuckDbArgs( + table_name="raw_customers", + csv_path=airflow_dags_path() / "raw_customers.csv", + duckdb_path=Path(os.environ["AIRFLOW_HOME"]) / "jaffle_shop.duckdb", + names=["id", "first_name", "last_name"], + duckdb_schema="raw_data", + duckdb_database_name="jaffle_shop", + ), + ) + ], + "build_dbt_models": + # load rich set of assets from dbt project + [dbt_project_assets], + "export_customers": [ + export_duckdb_to_csv_defs( + AssetSpec(key="customers_csv", deps=["customers"], partitions_def=PARTITIONS_DEF), + ExportDuckDbToCsvArgs( + table_name="customers", + csv_path=Path(os.environ["TUTORIAL_EXAMPLE_DIR"]) / "customers.csv", + duckdb_path=Path(os.environ["AIRFLOW_HOME"]) / "jaffle_shop.duckdb", + duckdb_database_name="jaffle_shop", + ), + ) + ], + }, +) + + +defs = Definitions( + assets=mapped_assets, + resources={"dbt": DbtCliResource(project_dir=dbt_project_path())}, +) diff --git a/examples/docs_snippets/docs_snippets/integrations/airlift/equivalents/airflow_dag.py b/examples/docs_snippets/docs_snippets/integrations/airlift/equivalents/airflow_dag.py new file mode 100644 index 0000000000000..e5553c57ae82e --- /dev/null +++ b/examples/docs_snippets/docs_snippets/integrations/airlift/equivalents/airflow_dag.py @@ -0,0 +1,14 @@ +from datetime import datetime, timedelta + +from airflow import DAG +from airflow.decorators import task + +dag = DAG( + "simple_dag", + start_date=datetime(2024, 1, 1), + schedule_interval="@daily", +) + + 
+@task(task_id="write_customers_data", dag=dag) +def write_customers_data(): ... diff --git a/examples/docs_snippets/docs_snippets/integrations/airlift/equivalents/airflow_data_interval.py b/examples/docs_snippets/docs_snippets/integrations/airlift/equivalents/airflow_data_interval.py new file mode 100644 index 0000000000000..7cb0e6fa974ba --- /dev/null +++ b/examples/docs_snippets/docs_snippets/integrations/airlift/equivalents/airflow_data_interval.py @@ -0,0 +1,16 @@ +import boto3 +from airflow.decorators import task + + +@task(task_id="write_customers_data") +def write_partitioned_customers_data(context): + prefix = context["data_interval_start"] + # or + prefix = context["logical_date"] + # or + prefix = context["execution_date"] + + # write data to S3 with the prefix + boto3.client("s3").upload_file( + "path/to/customers.csv", f"bucket/{prefix}/customers.csv" + ) diff --git a/examples/docs_snippets/docs_snippets/integrations/airlift/equivalents/airflow_hook.py b/examples/docs_snippets/docs_snippets/integrations/airlift/equivalents/airflow_hook.py new file mode 100644 index 0000000000000..bee33967dffac --- /dev/null +++ b/examples/docs_snippets/docs_snippets/integrations/airlift/equivalents/airflow_hook.py @@ -0,0 +1,23 @@ +# type: ignore +# start_ex +from datetime import datetime, timedelta + +from airflow import DAG +from airflow.operators.python import PythonOperator +from airflow.providers.amazon.aws.hooks.s3 import S3Hook + + +def upload_customers_data() -> None: + S3Hook(aws_conn_id="aws_default").load_file( + filename="path/to/customers.csv", + key="customers.csv", + bucket_name="my-cool-bucket", + replace=True, + ) + + +s3_task = PythonOperator( + task_id="s3_operations", + python_callable=upload_customers_data, +) +# end_ex diff --git a/examples/docs_snippets/docs_snippets/integrations/airlift/equivalents/airflow_sensor.py b/examples/docs_snippets/docs_snippets/integrations/airlift/equivalents/airflow_sensor.py new file mode 100644 index 
0000000000000..7b7a4872f5aa4 --- /dev/null +++ b/examples/docs_snippets/docs_snippets/integrations/airlift/equivalents/airflow_sensor.py @@ -0,0 +1,15 @@ +from datetime import datetime + +from airflow import DAG +from airflow.sensors.filesystem import FileSensor + +dag = DAG("file_sensor_example", start_date=datetime(2024, 1, 1)) + +wait_for_file = FileSensor( + task_id="wait_for_new_customer_files", + filepath="/path/to/customer_files/*.csv", + poke_interval=60, # Check every minute + timeout=3600, # Timeout after 1 hour + mode="poke", + dag=dag, +) diff --git a/examples/docs_snippets/docs_snippets/integrations/airlift/equivalents/airflow_task.py b/examples/docs_snippets/docs_snippets/integrations/airlift/equivalents/airflow_task.py new file mode 100644 index 0000000000000..a8f8b8941e19c --- /dev/null +++ b/examples/docs_snippets/docs_snippets/integrations/airlift/equivalents/airflow_task.py @@ -0,0 +1,25 @@ +from pathlib import Path + +import boto3 +from airflow.decorators import task +from airflow.operators.python import PythonOperator + + +def write_file_to_s3(path: Path) -> None: + boto3.client("s3").upload_file(str(path), "bucket", path.name) + + +def _write_customers_data(): + write_file_to_s3(Path("path/to/customers.csv")) + + +# Defining a task using the PythonOperator syntax +PythonOperator( + python_callable=_write_customers_data, task_id="write_customers_data", dag=... 
+) + + +# Defining a task using the task decorator +@task(task_id="write_customers_data") +def write_customers_data(): + write_file_to_s3(Path("path/to/customers.csv")) diff --git a/examples/docs_snippets/docs_snippets/integrations/airlift/equivalents/airflow_xcom.py b/examples/docs_snippets/docs_snippets/integrations/airlift/equivalents/airflow_xcom.py new file mode 100644 index 0000000000000..712cdcfaf0ed0 --- /dev/null +++ b/examples/docs_snippets/docs_snippets/integrations/airlift/equivalents/airflow_xcom.py @@ -0,0 +1,18 @@ +import boto3 +import pandas as pd +from airflow.operators.python import PythonOperator + + +def upload_customers_data(**context): + raw_customers_data = pd.read_csv("path/to/customers.csv") + avg_revenue = raw_customers_data["revenue"].mean() + task_instance = context["task_instance"] + task_instance.xcom_push(key="avg_revenue", value=avg_revenue) + boto3.client("s3").upload_file("path/to/customers.csv", "bucket/customers.csv") + + +PythonOperator( + task_id="generate_stats", + python_callable=upload_customers_data, + provide_context=True, +) diff --git a/examples/docs_snippets/docs_snippets/integrations/airlift/equivalents/custom_operator.py b/examples/docs_snippets/docs_snippets/integrations/airlift/equivalents/custom_operator.py new file mode 100644 index 0000000000000..b114238c40e37 --- /dev/null +++ b/examples/docs_snippets/docs_snippets/integrations/airlift/equivalents/custom_operator.py @@ -0,0 +1,24 @@ +# start_custom_op +from pathlib import Path + +import boto3 +from airflow.operators.python import PythonOperator + + +class UploadToS3Operator(PythonOperator): + def __init__(self, path: str, *args, **kwargs) -> None: + super().__init__( + python_callable=self.upload_to_s3, op_args=[path], *args, **kwargs + ) + + def upload_to_s3(self, path: str) -> None: + boto3.client("s3").upload_file( + Filepath=path, Bucket="my_bucket", Key=Path(path).name + ) + + +# end_custom_op + +# start_task +task = 
UploadToS3Operator(task_id="write_customers_data", path="path/to/customers.csv") +# end_task diff --git a/examples/docs_snippets/docs_snippets/integrations/airlift/equivalents/dagster_asset.py b/examples/docs_snippets/docs_snippets/integrations/airlift/equivalents/dagster_asset.py new file mode 100644 index 0000000000000..73a644b139a04 --- /dev/null +++ b/examples/docs_snippets/docs_snippets/integrations/airlift/equivalents/dagster_asset.py @@ -0,0 +1,12 @@ +import boto3 +import pandas as pd + +from dagster import FloatMetadataValue, MaterializeResult, asset + + +@asset +def customers_data_s3(): + raw_customers_data = pd.read_csv("path/to/customers.csv") + avg_revenue = raw_customers_data["revenue"].mean() + boto3.client("s3").upload_file("path/to/customers.csv", "bucket/customers.csv") + return MaterializeResult(metadata={"avg_revenue": FloatMetadataValue(avg_revenue)}) diff --git a/examples/docs_snippets/docs_snippets/integrations/airlift/equivalents/dagster_partition.py b/examples/docs_snippets/docs_snippets/integrations/airlift/equivalents/dagster_partition.py new file mode 100644 index 0000000000000..ebcbb2dda5dba --- /dev/null +++ b/examples/docs_snippets/docs_snippets/integrations/airlift/equivalents/dagster_partition.py @@ -0,0 +1,11 @@ +import boto3 + +from dagster import AssetExecutionContext, DailyPartitionsDefinition, asset + + +@asset(partitions_def=DailyPartitionsDefinition(...)) +def customers_data(context: AssetExecutionContext): + prefix = context.partition_key + boto3.client("s3").upload_file( + "path/to/customers.csv", f"bucket/{prefix}/customers.csv" + ) diff --git a/examples/docs_snippets/docs_snippets/integrations/airlift/equivalents/dagster_resource.py b/examples/docs_snippets/docs_snippets/integrations/airlift/equivalents/dagster_resource.py new file mode 100644 index 0000000000000..9d0ec547a3d12 --- /dev/null +++ b/examples/docs_snippets/docs_snippets/integrations/airlift/equivalents/dagster_resource.py @@ -0,0 +1,26 @@ +import os +from 
pathlib import Path
+
+from dagster_aws.s3 import S3Resource
+
+from dagster import Definitions, asset
+
+
+@asset
+def customers_s3(s3: S3Resource):
+    local_file_data = Path("path/to/customers_data.csv").read_bytes()
+    s3_client = s3.get_client()
+
+    s3_client.put_object(
+        Bucket="my-cool-bucket",
+        Key="customers.csv",
+        Body=local_file_data,
+    )
+
+
+defs = Definitions(
+    assets=[customers_s3],
+    resources={
+        "s3": S3Resource(aws_access_key_id=os.environ["DEFAULT_AWS_ACCESS_KEY_ID"])
+    },
+)
diff --git a/examples/docs_snippets/docs_snippets/integrations/airlift/equivalents/dagster_sensor.py b/examples/docs_snippets/docs_snippets/integrations/airlift/equivalents/dagster_sensor.py
new file mode 100644
index 0000000000000..1d698b405c6c1
--- /dev/null
+++ b/examples/docs_snippets/docs_snippets/integrations/airlift/equivalents/dagster_sensor.py
@@ -0,0 +1,25 @@
+import json
+from pathlib import Path
+
+from dagster import RunRequest, SensorEvaluationContext, SkipReason, asset, sensor
+
+
+@asset
+def uploaded_customers_data():
+    pass
+
+
+# Implementing the FileSensor from Airflow directly in Dagster
+@sensor(target=uploaded_customers_data)
+def wait_for_new_files(context: SensorEvaluationContext):
+    """Request a run when the directory holds files not recorded in the cursor."""
+    seen_files: list = json.loads(context.cursor) if context.cursor else []
+    should_trigger = False
+    for file in Path("path/to/customer_files").iterdir():
+        if file.name not in seen_files:
+            seen_files.append(file.name)
+            should_trigger = True
+    # Persist the seen file names in the cursor; otherwise every evaluation
+    # starts from an empty list and re-triggers on files it already handled.
+    context.update_cursor(json.dumps(seen_files))
+    yield RunRequest() if should_trigger else SkipReason("No new files")
diff --git a/examples/docs_snippets/docs_snippets/integrations/airlift/equivalents/factory_asset.py b/examples/docs_snippets/docs_snippets/integrations/airlift/equivalents/factory_asset.py
new file mode 100644
index 0000000000000..2dc779ec52aa8
--- /dev/null
+++ b/examples/docs_snippets/docs_snippets/integrations/airlift/equivalents/factory_asset.py
@@ -0,0 +1,19 @@
+# start_factory
+import boto3
+
+from dagster import asset
+
+
+def build_s3_asset(path: str):
+    @asset(key=path)
+    def _s3_file():
+        boto3.client("s3").upload_file(path, "my-bucket", path)
+
+    return _s3_file
+
+
+# end_factory
+
+# start_usage
+customers_data = build_s3_asset("path/to/customers.csv")
+# end_usage
diff --git a/examples/docs_snippets/docs_snippets/integrations/airlift/equivalents/scheduled_asset.py b/examples/docs_snippets/docs_snippets/integrations/airlift/equivalents/scheduled_asset.py
new file mode 100644
index 0000000000000..0f81181b8d78d
--- /dev/null
+++ b/examples/docs_snippets/docs_snippets/integrations/airlift/equivalents/scheduled_asset.py
@@ -0,0 +1,12 @@
+from dagster import ScheduleDefinition, asset, define_asset_job
+
+
+@asset
+def customers_data(): ...
+
+
+ScheduleDefinition(
+    "daily_schedule",
+    cron_schedule="0 0 * * *",
+    target=customers_data,
+)
diff --git a/examples/docs_snippets/docs_snippets/integrations/airlift/equivalents/task_equiv_asset.py b/examples/docs_snippets/docs_snippets/integrations/airlift/equivalents/task_equiv_asset.py
new file mode 100644
index 0000000000000..cd368682109c2
--- /dev/null
+++ b/examples/docs_snippets/docs_snippets/integrations/airlift/equivalents/task_equiv_asset.py
@@ -0,0 +1,15 @@
+from pathlib import Path
+
+import boto3
+
+from dagster import asset
+
+
+def write_file_to_s3(path: Path) -> None:
+    boto3.client("s3").upload_file(str(path), "bucket", path.name)
+
+
+# We define in terms of the "physical" asset - the uploaded file
+@asset
+def customers_data():
+    write_file_to_s3(Path("path/to/customers.csv"))
diff --git a/examples/docs_snippets/docs_snippets/integrations/airlift/operator_migration/bash_operator_dbt.py b/examples/docs_snippets/docs_snippets/integrations/airlift/operator_migration/bash_operator_dbt.py
new file mode 100644
index 0000000000000..7f500a680aebd
--- /dev/null
+++ b/examples/docs_snippets/docs_snippets/integrations/airlift/operator_migration/bash_operator_dbt.py
@@ -0,0 +1,3 @@
+from airflow.operators.bash import BashOperator
+
+run_dbt_model = BashOperator(task_id="build_dbt_models", bash_command="dbt run")
diff --git a/examples/docs_snippets/docs_snippets/integrations/airlift/operator_migration/bash_operator_general.py b/examples/docs_snippets/docs_snippets/integrations/airlift/operator_migration/bash_operator_general.py
new file mode 100644
index 0000000000000..1a45e386c8567
--- /dev/null
+++ b/examples/docs_snippets/docs_snippets/integrations/airlift/operator_migration/bash_operator_general.py
@@ -0,0 +1,6 @@
+from airflow.operators.bash import BashOperator
+
+execute_script = BashOperator(
+    task_id="execute_script",
+    bash_command="python /path/to/script.py",
+)
diff --git a/examples/docs_snippets/docs_snippets/integrations/airlift/operator_migration/k8s_eks_fake_example.py b/examples/docs_snippets/docs_snippets/integrations/airlift/operator_migration/k8s_eks_fake_example.py
new file mode 100644
index 0000000000000..8ff590044138f
--- /dev/null
+++ b/examples/docs_snippets/docs_snippets/integrations/airlift/operator_migration/k8s_eks_fake_example.py
@@ -0,0 +1,35 @@
+AWS_CREDENTIALS = {}
+
+# start_client
+from dagster_k8s import PipesK8sClient
+
+eks_client = PipesK8sClient(
+    # The client will have automatic access to all
+    # environment variables in the execution context.
+    env={**AWS_CREDENTIALS, "AWS_REGION": "us-west-2"},
+    kubeconfig_file="path/to/kubeconfig",
+    kube_context="my-eks-cluster",
+)
+# end_client
+
+# start_asset
+from dagster import AssetExecutionContext, asset
+
+container_cfg = {
+    "name": "hello-world-pod",
+    "image": "bash:latest",
+    "command": ["bash", "-cx"],
+    "args": ['echo "Hello World!"'],
+}
+
+
+@asset
+def execute_hello_world_task(context: AssetExecutionContext):
+    return eks_client.run(
+        context=context,
+        base_pod_meta={"name": "hello-world-pod"},
+        base_pod_spec={"containers": [container_cfg]},
+    ).get_results()
+
+
+# end_asset
diff --git a/examples/docs_snippets/docs_snippets/integrations/airlift/operator_migration/kubernetes_pod_operator.py b/examples/docs_snippets/docs_snippets/integrations/airlift/operator_migration/kubernetes_pod_operator.py
new file mode 100644
index 0000000000000..bde2a3e5b1a86
--- /dev/null
+++ b/examples/docs_snippets/docs_snippets/integrations/airlift/operator_migration/kubernetes_pod_operator.py
@@ -0,0 +1,9 @@
+from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
+
+k8s_hello_world = KubernetesPodOperator(
+    task_id="hello_world_task",
+    name="hello-world-pod",
+    image="bash:latest",
+    cmds=["bash", "-cx"],
+    arguments=['echo "Hello World!"'],
+)
diff --git a/examples/docs_snippets/docs_snippets/integrations/airlift/operator_migration/pyop_asset_shared.py b/examples/docs_snippets/docs_snippets/integrations/airlift/operator_migration/pyop_asset_shared.py
new file mode 100644
index 0000000000000..7cac49711e3b1
--- /dev/null
+++ b/examples/docs_snippets/docs_snippets/integrations/airlift/operator_migration/pyop_asset_shared.py
@@ -0,0 +1,14 @@
+# type: ignore
+# start_asset
+# This would be the python code living in a shared module.
+from shared_module import my_shared_python_callable
+
+from dagster import asset
+
+
+@asset
+def my_shared_asset():
+    return my_shared_python_callable()
+
+
+# end_asset
diff --git a/examples/docs_snippets/docs_snippets/integrations/airlift/operator_migration/pyop_multi_asset_complete.py b/examples/docs_snippets/docs_snippets/integrations/airlift/operator_migration/pyop_multi_asset_complete.py
new file mode 100644
index 0000000000000..91a1768e2dd13
--- /dev/null
+++ b/examples/docs_snippets/docs_snippets/integrations/airlift/operator_migration/pyop_multi_asset_complete.py
@@ -0,0 +1,27 @@
+from pathlib import Path
+from typing import Any
+
+RAW_DATA_DIR = Path("path")
+TABLE_URI = "blah"
+
+
+def contents_as_df(path: Path) -> Any:
+    ...
+
+
+def upload_to_db(df):
+    ...
+
+
+# start_asset
+from dagster import asset
+
+
+@asset(key=TABLE_URI)
+def write_to_db() -> None:
+    for raw_path in RAW_DATA_DIR.iterdir():
+        frame = contents_as_df(raw_path)
+        upload_to_db(frame)
+
+
+# end_asset
diff --git a/examples/docs_snippets/docs_snippets/integrations/airlift/operator_migration/python_operator.py b/examples/docs_snippets/docs_snippets/integrations/airlift/operator_migration/python_operator.py
new file mode 100644
index 0000000000000..014e23a4bb7fc
--- /dev/null
+++ b/examples/docs_snippets/docs_snippets/integrations/airlift/operator_migration/python_operator.py
@@ -0,0 +1,35 @@
+# type: ignore
+from pathlib import Path
+from typing import Any
+
+RAW_DATA_DIR = Path("path")
+
+
+def contents_as_df(path: Path) -> Any:
+    ...
+
+
+def upload_to_db(df: Any):
+    ...
+
+
+# start_op
+from airflow.operators.python import PythonOperator
+
+
+def write_to_db() -> None:
+    for raw_path in RAW_DATA_DIR.iterdir():
+        frame = contents_as_df(raw_path)
+        upload_to_db(frame)
+
+
+PythonOperator(python_callable=write_to_db, task_id="db_upload", dag=...)
+
+# end_op
+
+# start_shared
+from airflow.operators.python import PythonOperator
+from shared_module import write_to_db
+
+PythonOperator(python_callable=write_to_db, task_id="db_upload", dag=...)
+# end_shared
diff --git a/examples/docs_snippets/docs_snippets/integrations/airlift/operator_migration/shared.py b/examples/docs_snippets/docs_snippets/integrations/airlift/operator_migration/shared.py
new file mode 100644
index 0000000000000..83089bcd9c742
--- /dev/null
+++ b/examples/docs_snippets/docs_snippets/integrations/airlift/operator_migration/shared.py
@@ -0,0 +1,2 @@
+def my_shared_python_callable():
+    ...
diff --git a/examples/docs_snippets/docs_snippets/integrations/airlift/operator_migration/using_dbt_assets.py b/examples/docs_snippets/docs_snippets/integrations/airlift/operator_migration/using_dbt_assets.py
new file mode 100644
index 0000000000000..ea9937de0d4db
--- /dev/null
+++ b/examples/docs_snippets/docs_snippets/integrations/airlift/operator_migration/using_dbt_assets.py
@@ -0,0 +1,11 @@
+from dagster_dbt import DbtCliResource, DbtProject, dbt_assets
+
+from dagster import AssetExecutionContext
+
+project = DbtProject(project_dir="path/to/dbt_project")
+
+
+@dbt_assets(manifest=project.manifest_path)
+def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
+    dbt_run = dbt.cli(["run"], context=context)
+    yield from dbt_run.stream()
diff --git a/examples/docs_snippets/docs_snippets/integrations/airlift/operator_migration/using_k8s_pipes.py b/examples/docs_snippets/docs_snippets/integrations/airlift/operator_migration/using_k8s_pipes.py
new file mode 100644
index 0000000000000..986125ea5daa4
--- /dev/null
+++ b/examples/docs_snippets/docs_snippets/integrations/airlift/operator_migration/using_k8s_pipes.py
@@ -0,0 +1,23 @@
+from dagster_k8s import PipesK8sClient
+
+from dagster import AssetExecutionContext, asset
+
+container_cfg = {
+    "name": "hello-world-pod",
+    "image": "bash:latest",
+    "command": ["bash", "-cx"],
+    "args": ['echo "Hello World!"'],
+}
+
+
+@asset
+def execute_hello_world_task(context: AssetExecutionContext):
+    return (
+        PipesK8sClient()
+        .run(
+            context=context,
+            base_pod_meta={"name": "hello-world-pod"},
+            base_pod_spec={"containers": [container_cfg]},
+        )
+        .get_results()
+    )
diff --git a/examples/docs_snippets/docs_snippets/integrations/airlift/operator_migration/using_pipes_subprocess.py b/examples/docs_snippets/docs_snippets/integrations/airlift/operator_migration/using_pipes_subprocess.py
new file mode 100644
index 0000000000000..29418f3aa6b38
--- /dev/null
+++ b/examples/docs_snippets/docs_snippets/integrations/airlift/operator_migration/using_pipes_subprocess.py
@@ -0,0 +1,13 @@
+from dagster import AssetExecutionContext, PipesSubprocessClient, asset
+
+
+@asset
+def script_result(context: AssetExecutionContext):
+    """Run the external script through Dagster Pipes and return its results."""
+    return (
+        PipesSubprocessClient()
+        # Use an argument list rather than one shell-style string so the
+        # interpreter and the script path are passed as separate arguments.
+        .run(context=context, command=["python", "/path/to/script.py"])
+        .get_results()
+    )