From 792c6803434dc73e23efdb03bf5703d770e525b8 Mon Sep 17 00:00:00 2001 From: Maxime Armstrong Date: Thu, 26 Dec 2024 11:38:47 -0500 Subject: [PATCH 1/2] [dagster-airbyte][docs] Migrate Airbyte cloud docs to new doc site --- .../integrations/airbyte/airbyte-cloud.mdx | 10 +-- .../libraries/airbyte/airbyte-cloud.md | 83 +++++++++++++++++++ .../libraries/{ => airbyte}/airbyte.md | 0 .../integrations/airbyte_cloud}/__init__.py | 0 .../customize_airbyte_cloud_asset_defs.py | 0 ...ize_airbyte_cloud_translator_asset_spec.py | 0 .../multiple_airbyte_cloud_workspaces.py | 0 .../representing_airbyte_cloud_assets.py | 0 ...nc_and_materialize_airbyte_cloud_assets.py | 0 .../integrations/airbyte_cloud/__init__.py | 0 .../customize_airbyte_cloud_asset_defs.py | 29 +++++++ ...ize_airbyte_cloud_translator_asset_spec.py | 33 ++++++++ .../multiple_airbyte_cloud_workspaces.py | 27 ++++++ .../representing_airbyte_cloud_assets.py | 13 +++ ...nc_and_materialize_airbyte_cloud_assets.py | 16 ++++ 15 files changed, 206 insertions(+), 5 deletions(-) create mode 100644 docs/docs-beta/docs/integrations/libraries/airbyte/airbyte-cloud.md rename docs/docs-beta/docs/integrations/libraries/{ => airbyte}/airbyte.md (100%) rename examples/{docs_snippets/docs_snippets/integrations/airbyte-cloud => docs_beta_snippets/docs_beta_snippets/integrations/airbyte_cloud}/__init__.py (100%) rename examples/{docs_snippets/docs_snippets/integrations/airbyte-cloud => docs_beta_snippets/docs_beta_snippets/integrations/airbyte_cloud}/customize_airbyte_cloud_asset_defs.py (100%) rename examples/{docs_snippets/docs_snippets/integrations/airbyte-cloud => docs_beta_snippets/docs_beta_snippets/integrations/airbyte_cloud}/customize_airbyte_cloud_translator_asset_spec.py (100%) rename examples/{docs_snippets/docs_snippets/integrations/airbyte-cloud => docs_beta_snippets/docs_beta_snippets/integrations/airbyte_cloud}/multiple_airbyte_cloud_workspaces.py (100%) rename 
examples/{docs_snippets/docs_snippets/integrations/airbyte-cloud => docs_beta_snippets/docs_beta_snippets/integrations/airbyte_cloud}/representing_airbyte_cloud_assets.py (100%) rename examples/{docs_snippets/docs_snippets/integrations/airbyte-cloud => docs_beta_snippets/docs_beta_snippets/integrations/airbyte_cloud}/sync_and_materialize_airbyte_cloud_assets.py (100%) create mode 100644 examples/docs_snippets/docs_snippets/integrations/airbyte_cloud/__init__.py create mode 100644 examples/docs_snippets/docs_snippets/integrations/airbyte_cloud/customize_airbyte_cloud_asset_defs.py create mode 100644 examples/docs_snippets/docs_snippets/integrations/airbyte_cloud/customize_airbyte_cloud_translator_asset_spec.py create mode 100644 examples/docs_snippets/docs_snippets/integrations/airbyte_cloud/multiple_airbyte_cloud_workspaces.py create mode 100644 examples/docs_snippets/docs_snippets/integrations/airbyte_cloud/representing_airbyte_cloud_assets.py create mode 100644 examples/docs_snippets/docs_snippets/integrations/airbyte_cloud/sync_and_materialize_airbyte_cloud_assets.py diff --git a/docs/content/integrations/airbyte/airbyte-cloud.mdx b/docs/content/integrations/airbyte/airbyte-cloud.mdx index 99314c90ac976..e876786cc4302 100644 --- a/docs/content/integrations/airbyte/airbyte-cloud.mdx +++ b/docs/content/integrations/airbyte/airbyte-cloud.mdx @@ -40,7 +40,7 @@ To load Airbyte Cloud assets into the Dagster asset graph, you must first constr Dagster can automatically load all connection tables from your Airbyte Cloud workspace as asset specs. Call the function, which returns list of s representing your Airbyte Cloud assets. 
You can then include these asset specs in your object: -```python file=/integrations/airbyte-cloud/representing_airbyte_cloud_assets.py +```python file=/integrations/airbyte_cloud/representing_airbyte_cloud_assets.py from dagster_airbyte import AirbyteCloudWorkspace, load_airbyte_cloud_asset_specs import dagster as dg @@ -60,7 +60,7 @@ defs = dg.Definitions(assets=airbyte_cloud_specs) You can use Dagster to sync Airbyte Cloud connections and materialize Airbyte Cloud connection tables. You can use the factory to create all assets definitions for your Airbyte Cloud workspace. -```python file=/integrations/airbyte-cloud/sync_and_materialize_airbyte_cloud_assets.py +```python file=/integrations/airbyte_cloud/sync_and_materialize_airbyte_cloud_assets.py from dagster_airbyte import AirbyteCloudWorkspace, build_airbyte_assets_definitions import dagster as dg @@ -83,7 +83,7 @@ defs = dg.Definitions( If you want to customize the sync of your connections, you can use the decorator to do so. This allows you to execute custom code before and after the call to the Airbyte Cloud sync. -```python file=/integrations/airbyte-cloud/customize_airbyte_cloud_asset_defs.py +```python file=/integrations/airbyte_cloud/customize_airbyte_cloud_asset_defs.py from dagster_airbyte import AirbyteCloudWorkspace, airbyte_assets import dagster as dg @@ -119,7 +119,7 @@ defs = dg.Definitions( By default, Dagster will generate asset specs for each Airbyte Cloud asset and populate default metadata. You can further customize asset properties by passing an instance of the custom to the function. -```python file=/integrations/airbyte-cloud/customize_airbyte_cloud_translator_asset_spec.py +```python file=/integrations/airbyte_cloud/customize_airbyte_cloud_translator_asset_spec.py from dagster_airbyte import ( AirbyteCloudWorkspace, AirbyteConnectionTableProps, @@ -163,7 +163,7 @@ You can pass an instance of the custom resources and merging their specs. 
This lets you view all your Airbyte Cloud assets in a single asset graph: -```python file=/integrations/airbyte-cloud/multiple_airbyte_cloud_workspaces.py +```python file=/integrations/airbyte_cloud/multiple_airbyte_cloud_workspaces.py from dagster_airbyte import AirbyteCloudWorkspace, load_airbyte_cloud_asset_specs import dagster as dg diff --git a/docs/docs-beta/docs/integrations/libraries/airbyte/airbyte-cloud.md b/docs/docs-beta/docs/integrations/libraries/airbyte/airbyte-cloud.md new file mode 100644 index 0000000000000..c846fe1337cb8 --- /dev/null +++ b/docs/docs-beta/docs/integrations/libraries/airbyte/airbyte-cloud.md @@ -0,0 +1,83 @@ +--- +layout: Integration +status: published +name: Airbyte Cloud +title: Using Dagster with Airbyte Cloud +sidebar_label: Airbyte Cloud +excerpt: Orchestrate Airbyte Cloud connections and schedule syncs alongside upstream or downstream dependencies. +date: 2022-11-07 +apireflink: https://docs.dagster.io/_apidocs/libraries/dagster-airbyte +docslink: https://docs.dagster.io/integrations/airbyte-cloud +partnerlink: https://airbyte.com/tutorials/orchestrate-data-ingestion-and-transformation-pipelines +logo: /integrations/airbyte.svg +categories: + - ETL +enabledBy: +enables: +tags: [dagster-supported, etl] +--- + +This guide provides instructions for using Dagster with Airbyte Cloud using the `dagster-airbyte` library. Your Airbyte Cloud connection tables can be represented as assets in the Dagster asset graph, allowing you to track lineage and dependencies between Airbyte Cloud assets and data assets you are already modeling in Dagster. You can also use Dagster to orchestrate Airbyte Cloud connections, allowing you to trigger syncs for these on a cadence or based on upstream data changes. + +## What you'll learn + +- How to represent Airbyte Cloud assets in the Dagster asset graph, including lineage to other Dagster assets. +- How to customize asset definition metadata for these Airbyte Cloud assets. 
+- How to materialize Airbyte Cloud connection tables from Dagster. +- How to customize how Airbyte Cloud connection tables are materialized. + +
+ Prerequisites + +- The `dagster` and `dagster-airbyte` libraries installed in your environment +- Familiarity with asset definitions and the Dagster asset graph +- Familiarity with Dagster resources +- Familiarity with Airbyte Cloud concepts, like connections and connection tables +- An Airbyte Cloud workspace +- An Airbyte Cloud client ID and client secret. For more information, see [Configuring API Access](https://docs.airbyte.com/using-airbyte/configuring-api-access) in the Airbyte Cloud REST API documentation. + +
+ +## Set up your environment + +To get started, you'll need to install the `dagster` and `dagster-airbyte` Python packages: + +```bash +pip install dagster dagster-airbyte +``` + +## Represent Airbyte Cloud assets in the asset graph + +To load Airbyte Cloud assets into the Dagster asset graph, you must first construct an `AirbyteCloudWorkspace` resource, which allows Dagster to communicate with your Airbyte Cloud workspace. You'll need to supply your workspace ID, client ID and client secret. See [Configuring API Access](https://docs.airbyte.com/using-airbyte/configuring-api-access) in the Airbyte Cloud REST API documentation for more information on how to create your client ID and client secret. + +Dagster can automatically load all connection tables from your Airbyte Cloud workspace as asset specs. Call the `load_airbyte_cloud_asset_specs` function, which returns a list of `AssetSpec`s representing your Airbyte Cloud assets. You can then include these asset specs in your `Definitions` object: + + + +### Sync and materialize Airbyte Cloud assets + +You can use Dagster to sync Airbyte Cloud connections and materialize Airbyte Cloud connection tables. You can use the `build_airbyte_assets_definitions` factory to create all asset definitions for your Airbyte Cloud workspace. + + + +### Customize the materialization of Airbyte Cloud assets + +If you want to customize the sync of your connections, you can use the `@airbyte_assets` decorator to do so. This allows you to execute custom code before and after the call to the Airbyte Cloud sync. + + + +### Customize asset definition metadata for Airbyte Cloud assets + +By default, Dagster will generate asset specs for each Airbyte Cloud asset and populate default metadata. You can further customize asset properties by passing an instance of a custom `DagsterAirbyteTranslator` to the `load_airbyte_cloud_asset_specs` function. + + + +Note that `super()` is called in each of the overridden methods to generate the default asset spec. It is best practice to generate the default asset spec before customizing it. + +You can pass an instance of the custom `DagsterAirbyteTranslator` to the `@airbyte_assets` decorator or the `build_airbyte_assets_definitions` factory. 
+ +### Load Airbyte Cloud assets from multiple workspaces + +Definitions from multiple Airbyte Cloud workspaces can be combined by instantiating multiple resources and merging their specs. This lets you view all your Airbyte Cloud assets in a single asset graph: + + diff --git a/docs/docs-beta/docs/integrations/libraries/airbyte.md b/docs/docs-beta/docs/integrations/libraries/airbyte/airbyte.md similarity index 100% rename from docs/docs-beta/docs/integrations/libraries/airbyte.md rename to docs/docs-beta/docs/integrations/libraries/airbyte/airbyte.md diff --git a/examples/docs_snippets/docs_snippets/integrations/airbyte-cloud/__init__.py b/examples/docs_beta_snippets/docs_beta_snippets/integrations/airbyte_cloud/__init__.py similarity index 100% rename from examples/docs_snippets/docs_snippets/integrations/airbyte-cloud/__init__.py rename to examples/docs_beta_snippets/docs_beta_snippets/integrations/airbyte_cloud/__init__.py diff --git a/examples/docs_snippets/docs_snippets/integrations/airbyte-cloud/customize_airbyte_cloud_asset_defs.py b/examples/docs_beta_snippets/docs_beta_snippets/integrations/airbyte_cloud/customize_airbyte_cloud_asset_defs.py similarity index 100% rename from examples/docs_snippets/docs_snippets/integrations/airbyte-cloud/customize_airbyte_cloud_asset_defs.py rename to examples/docs_beta_snippets/docs_beta_snippets/integrations/airbyte_cloud/customize_airbyte_cloud_asset_defs.py diff --git a/examples/docs_snippets/docs_snippets/integrations/airbyte-cloud/customize_airbyte_cloud_translator_asset_spec.py b/examples/docs_beta_snippets/docs_beta_snippets/integrations/airbyte_cloud/customize_airbyte_cloud_translator_asset_spec.py similarity index 100% rename from examples/docs_snippets/docs_snippets/integrations/airbyte-cloud/customize_airbyte_cloud_translator_asset_spec.py rename to examples/docs_beta_snippets/docs_beta_snippets/integrations/airbyte_cloud/customize_airbyte_cloud_translator_asset_spec.py diff --git 
a/examples/docs_snippets/docs_snippets/integrations/airbyte-cloud/multiple_airbyte_cloud_workspaces.py b/examples/docs_beta_snippets/docs_beta_snippets/integrations/airbyte_cloud/multiple_airbyte_cloud_workspaces.py similarity index 100% rename from examples/docs_snippets/docs_snippets/integrations/airbyte-cloud/multiple_airbyte_cloud_workspaces.py rename to examples/docs_beta_snippets/docs_beta_snippets/integrations/airbyte_cloud/multiple_airbyte_cloud_workspaces.py diff --git a/examples/docs_snippets/docs_snippets/integrations/airbyte-cloud/representing_airbyte_cloud_assets.py b/examples/docs_beta_snippets/docs_beta_snippets/integrations/airbyte_cloud/representing_airbyte_cloud_assets.py similarity index 100% rename from examples/docs_snippets/docs_snippets/integrations/airbyte-cloud/representing_airbyte_cloud_assets.py rename to examples/docs_beta_snippets/docs_beta_snippets/integrations/airbyte_cloud/representing_airbyte_cloud_assets.py diff --git a/examples/docs_snippets/docs_snippets/integrations/airbyte-cloud/sync_and_materialize_airbyte_cloud_assets.py b/examples/docs_beta_snippets/docs_beta_snippets/integrations/airbyte_cloud/sync_and_materialize_airbyte_cloud_assets.py similarity index 100% rename from examples/docs_snippets/docs_snippets/integrations/airbyte-cloud/sync_and_materialize_airbyte_cloud_assets.py rename to examples/docs_beta_snippets/docs_beta_snippets/integrations/airbyte_cloud/sync_and_materialize_airbyte_cloud_assets.py diff --git a/examples/docs_snippets/docs_snippets/integrations/airbyte_cloud/__init__.py b/examples/docs_snippets/docs_snippets/integrations/airbyte_cloud/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/examples/docs_snippets/docs_snippets/integrations/airbyte_cloud/customize_airbyte_cloud_asset_defs.py b/examples/docs_snippets/docs_snippets/integrations/airbyte_cloud/customize_airbyte_cloud_asset_defs.py new file mode 100644 index 0000000000000..6ebac1f5125e9 --- /dev/null +++ 
b/examples/docs_snippets/docs_snippets/integrations/airbyte_cloud/customize_airbyte_cloud_asset_defs.py @@ -0,0 +1,29 @@ +from dagster_airbyte import AirbyteCloudWorkspace, airbyte_assets + +import dagster as dg + +airbyte_workspace = AirbyteCloudWorkspace( + workspace_id=dg.EnvVar("AIRBYTE_CLOUD_WORKSPACE_ID"), + client_id=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_ID"), + client_secret=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_SECRET"), +) + + +@airbyte_assets( + connection_id="airbyte_connection_id", + workspace=airbyte_workspace, + name="airbyte_connection_name", + group_name="airbyte_connection_name", +) +def airbyte_connection_assets( + context: dg.AssetExecutionContext, airbyte: AirbyteCloudWorkspace +): + # Do something before the materialization... + yield from airbyte.sync_and_poll(context=context) + # Do something after the materialization... + + +defs = dg.Definitions( + assets=[airbyte_connection_assets], + resources={"airbyte": airbyte_workspace}, +) diff --git a/examples/docs_snippets/docs_snippets/integrations/airbyte_cloud/customize_airbyte_cloud_translator_asset_spec.py b/examples/docs_snippets/docs_snippets/integrations/airbyte_cloud/customize_airbyte_cloud_translator_asset_spec.py new file mode 100644 index 0000000000000..f507038b7be5b --- /dev/null +++ b/examples/docs_snippets/docs_snippets/integrations/airbyte_cloud/customize_airbyte_cloud_translator_asset_spec.py @@ -0,0 +1,33 @@ +from dagster_airbyte import ( + AirbyteCloudWorkspace, + AirbyteConnectionTableProps, + DagsterAirbyteTranslator, + load_airbyte_cloud_asset_specs, +) + +import dagster as dg + +airbyte_workspace = AirbyteCloudWorkspace( + workspace_id=dg.EnvVar("AIRBYTE_CLOUD_WORKSPACE_ID"), + client_id=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_ID"), + client_secret=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_SECRET"), +) + + +# A translator class lets us customize properties of the built +# Airbyte Cloud assets, such as the owners or asset key +class MyCustomAirbyteTranslator(DagsterAirbyteTranslator): + def 
get_asset_spec(self, props: AirbyteConnectionTableProps) -> dg.AssetSpec: + # We create the default asset spec using super() + default_spec = super().get_asset_spec(props) + # We customize the metadata and asset key prefix for all assets + return default_spec.replace_attributes( + key=default_spec.key.with_prefix("prefix"), + ).merge_attributes(metadata={"custom": "metadata"}) + + +airbyte_cloud_specs = load_airbyte_cloud_asset_specs( + airbyte_workspace, dagster_airbyte_translator=MyCustomAirbyteTranslator() +) + +defs = dg.Definitions(assets=airbyte_cloud_specs) diff --git a/examples/docs_snippets/docs_snippets/integrations/airbyte_cloud/multiple_airbyte_cloud_workspaces.py b/examples/docs_snippets/docs_snippets/integrations/airbyte_cloud/multiple_airbyte_cloud_workspaces.py new file mode 100644 index 0000000000000..f55cd96012131 --- /dev/null +++ b/examples/docs_snippets/docs_snippets/integrations/airbyte_cloud/multiple_airbyte_cloud_workspaces.py @@ -0,0 +1,27 @@ +from dagster_airbyte import AirbyteCloudWorkspace, load_airbyte_cloud_asset_specs + +import dagster as dg + +sales_airbyte_workspace = AirbyteCloudWorkspace( + workspace_id=dg.EnvVar("AIRBYTE_CLOUD_SALES_WORKSPACE_ID"), + client_id=dg.EnvVar("AIRBYTE_CLOUD_SALES_CLIENT_ID"), + client_secret=dg.EnvVar("AIRBYTE_CLOUD_SALES_CLIENT_SECRET"), +) + +marketing_airbyte_workspace = AirbyteCloudWorkspace( + workspace_id=dg.EnvVar("AIRBYTE_CLOUD_MARKETING_WORKSPACE_ID"), + client_id=dg.EnvVar("AIRBYTE_CLOUD_MARKETING_CLIENT_ID"), + client_secret=dg.EnvVar("AIRBYTE_CLOUD_MARKETING_CLIENT_SECRET"), +) + +sales_airbyte_cloud_specs = load_airbyte_cloud_asset_specs( + workspace=sales_airbyte_workspace +) +marketing_airbyte_cloud_specs = load_airbyte_cloud_asset_specs( + workspace=marketing_airbyte_workspace +) + +# Merge the specs into a single set of definitions +defs = dg.Definitions( + assets=[*sales_airbyte_cloud_specs, *marketing_airbyte_cloud_specs], +) diff --git 
a/examples/docs_snippets/docs_snippets/integrations/airbyte_cloud/representing_airbyte_cloud_assets.py b/examples/docs_snippets/docs_snippets/integrations/airbyte_cloud/representing_airbyte_cloud_assets.py new file mode 100644 index 0000000000000..0ab32622df43f --- /dev/null +++ b/examples/docs_snippets/docs_snippets/integrations/airbyte_cloud/representing_airbyte_cloud_assets.py @@ -0,0 +1,13 @@ +from dagster_airbyte import AirbyteCloudWorkspace, load_airbyte_cloud_asset_specs + +import dagster as dg + +airbyte_workspace = AirbyteCloudWorkspace( + workspace_id=dg.EnvVar("AIRBYTE_CLOUD_WORKSPACE_ID"), + client_id=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_ID"), + client_secret=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_SECRET"), +) + + +airbyte_cloud_specs = load_airbyte_cloud_asset_specs(airbyte_workspace) +defs = dg.Definitions(assets=airbyte_cloud_specs) diff --git a/examples/docs_snippets/docs_snippets/integrations/airbyte_cloud/sync_and_materialize_airbyte_cloud_assets.py b/examples/docs_snippets/docs_snippets/integrations/airbyte_cloud/sync_and_materialize_airbyte_cloud_assets.py new file mode 100644 index 0000000000000..5545cf553c50b --- /dev/null +++ b/examples/docs_snippets/docs_snippets/integrations/airbyte_cloud/sync_and_materialize_airbyte_cloud_assets.py @@ -0,0 +1,16 @@ +from dagster_airbyte import AirbyteCloudWorkspace, build_airbyte_assets_definitions + +import dagster as dg + +airbyte_workspace = AirbyteCloudWorkspace( + workspace_id=dg.EnvVar("AIRBYTE_CLOUD_WORKSPACE_ID"), + client_id=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_ID"), + client_secret=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_SECRET"), +) + +all_airbyte_assets = build_airbyte_assets_definitions(workspace=airbyte_workspace) + +defs = dg.Definitions( + assets=all_airbyte_assets, + resources={"airbyte": airbyte_workspace}, +) From a38796fcab9cc939371cb04daca4df99245757fd Mon Sep 17 00:00:00 2001 From: Maxime Armstrong Date: Thu, 26 Dec 2024 11:56:40 -0500 Subject: [PATCH 2/2] Exclude airbyte cloud snippets in tests --- 
.../docs_beta_snippets_tests/test_integration_files_load.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/examples/docs_beta_snippets/docs_beta_snippets_tests/test_integration_files_load.py b/examples/docs_beta_snippets/docs_beta_snippets_tests/test_integration_files_load.py index 89c5039563a98..dafb3a55714eb 100644 --- a/examples/docs_beta_snippets/docs_beta_snippets_tests/test_integration_files_load.py +++ b/examples/docs_beta_snippets/docs_beta_snippets_tests/test_integration_files_load.py @@ -24,6 +24,11 @@ f"{snippets_folder}/fivetran/multiple_fivetran_workspaces.py", f"{snippets_folder}/fivetran/representing_fivetran_assets.py", f"{snippets_folder}/fivetran/sync_and_materialize_fivetran_assets.py", + f"{snippets_folder}/airbyte_cloud/customize_airbyte_cloud_asset_defs.py", + f"{snippets_folder}/airbyte_cloud/customize_airbyte_cloud_translator_asset_spec.py", + f"{snippets_folder}/airbyte_cloud/multiple_airbyte_cloud_workspaces.py", + f"{snippets_folder}/airbyte_cloud/representing_airbyte_cloud_assets.py", + f"{snippets_folder}/airbyte_cloud/sync_and_materialize_airbyte_cloud_assets.py", # FIXME: this breaks on py3.8 and seems related to the non-dagster dependencies f"{snippets_folder}/pandera.py", }