diff --git a/docs/content/api/modules.json.gz b/docs/content/api/modules.json.gz
index 8e14474c7a64c..cf6c2f8907aa1 100644
Binary files a/docs/content/api/modules.json.gz and b/docs/content/api/modules.json.gz differ
diff --git a/docs/content/api/searchindex.json.gz b/docs/content/api/searchindex.json.gz
index d95058769efc5..5aa13c7d4a733 100644
Binary files a/docs/content/api/searchindex.json.gz and b/docs/content/api/searchindex.json.gz differ
diff --git a/docs/content/api/sections.json.gz b/docs/content/api/sections.json.gz
index 4e9e183b3e74b..6de86d3fc15d9 100644
Binary files a/docs/content/api/sections.json.gz and b/docs/content/api/sections.json.gz differ
diff --git a/docs/content/integrations/airbyte.mdx b/docs/content/integrations/airbyte.mdx
index ee6dc64856358..3a11e5c35ecaf 100644
--- a/docs/content/integrations/airbyte.mdx
+++ b/docs/content/integrations/airbyte.mdx
@@ -94,55 +94,6 @@ airbyte_assets = load_assets_from_airbyte_instance(airbyte_instance)
 The `load_assets_from_airbyte_instance` function retrieves all of the connections you have defined in the Airbyte interface, creating asset definitions for each data stream. Each connection has an associated [op](https://docs.dagster.io/concepts/ops-jobs-graphs/ops#ops) which triggers a sync of that connection.
-
-
-
-
-### Loading Airbyte asset definitions from YAML config
-
-  {" "}
-  has been deprecated as the Octavia CLI is no longer maintained. Consider using{" "}
-  {" "}
-  instead.
-
-
-To load Airbyte assets into Dagster from a set of YAML configuration files, specify the Octavia project directory, which contains the `sources`, `destinations`, and `connections` subfolders. This is the directory where you first ran `octavia init`. Here, the YAML files are treated as the source of truth for building Dagster assets.
-
-```python startafter=start_load_assets_from_airbyte_project endbefore=end_load_assets_from_airbyte_project file=/integrations/airbyte/airbyte.py dedent=4
-from dagster_airbyte import load_assets_from_airbyte_project
-
-airbyte_assets = load_assets_from_airbyte_project(
-    project_dir="path/to/airbyte/project",
-)
-```
-
-The `load_assets_from_airbyte_project` function parses the YAML metadata, generating a set of asset definitions which reflect each of the data streams synced by your connections. Each connection has an associated [op](https://docs.dagster.io/concepts/ops-jobs-graphs/ops#ops) which triggers a sync of that connection.
-
-#### Adding a resource
-
-Assets loaded from Airbyte require an `AirbyteResource`, which defines how to connect and interact with your Airbyte instance.
-
-We can add the Airbyte resource we configured above to our Airbyte assets by doing the following:
-
-```python startafter=start_airbyte_project_config endbefore=end_airbyte_project_config file=/integrations/airbyte/airbyte.py dedent=4
-from dagster_airbyte import load_assets_from_airbyte_project
-
-from dagster import with_resources
-
-# Use the airbyte_instance resource we defined in Step 1
-airbyte_assets = with_resources(
-    [load_assets_from_airbyte_project(project_dir="path/to/airbyte/project")],
-    {"airbyte": airbyte_instance},
-)
-```
-
diff --git a/docs/next/public/objects.inv b/docs/next/public/objects.inv
index 6973c693d3c7a..d0ce4de5c8d98 100644
Binary files a/docs/next/public/objects.inv and b/docs/next/public/objects.inv differ
diff --git a/docs/sphinx/sections/api/apidocs/libraries/dagster-airbyte.rst b/docs/sphinx/sections/api/apidocs/libraries/dagster-airbyte.rst
index afaff2a405b6b..7e0024acb3f87 100644
--- a/docs/sphinx/sections/api/apidocs/libraries/dagster-airbyte.rst
+++ b/docs/sphinx/sections/api/apidocs/libraries/dagster-airbyte.rst
@@ -21,12 +21,10 @@ Assets
 .. autofunction:: load_assets_from_airbyte_instance
-
-.. autofunction:: load_assets_from_airbyte_project
-
 .. autofunction:: build_airbyte_assets
 Ops
 ===
-.. autoconfigurable:: airbyte_sync_op
\ No newline at end of file
+.. autoconfigurable:: airbyte_sync_op
diff --git a/docs/sphinx/sections/api/apidocs/libraries/dagster-embedded-elt.rst b/docs/sphinx/sections/api/apidocs/libraries/dagster-embedded-elt.rst
index 8bb49d282b2ca..eb49ce237d67d 100644
--- a/docs/sphinx/sections/api/apidocs/libraries/dagster-embedded-elt.rst
+++ b/docs/sphinx/sections/api/apidocs/libraries/dagster-embedded-elt.rst
@@ -35,8 +35,6 @@ Resources (Sling)
 .. autoclass:: SlingConnectionResource
-----
-
 *******************************
 dlt (dagster-embedded-elt.dlt)
 *******************************
diff --git a/docs/tox.ini b/docs/tox.ini
index 53199468dd971..557c740995cc8 100644
--- a/docs/tox.ini
+++ b/docs/tox.ini
@@ -49,6 +49,8 @@ deps =
     -e ../python_modules/libraries/dagster-tableau
     -e ../python_modules/libraries/dagster-powerbi
+    sling
+
 commands =
     make --directory=sphinx clean
     make --directory=sphinx json SPHINXOPTS="-W --keep-going"
@@ -85,6 +87,7 @@ deps =
     -e ../python_modules/libraries/dagster-sigma
     -e ../python_modules/libraries/dagster-tableau
     -e ../python_modules/libraries/dagster-powerbi
+    sling
 commands =
     make --directory=sphinx clean
diff --git a/examples/docs_snippets/docs_snippets/integrations/airbyte/airbyte.py b/examples/docs_snippets/docs_snippets/integrations/airbyte/airbyte.py
index 4f69a699ae955..20ffee7016912 100644
--- a/examples/docs_snippets/docs_snippets/integrations/airbyte/airbyte.py
+++ b/examples/docs_snippets/docs_snippets/integrations/airbyte/airbyte.py
@@ -28,16 +28,6 @@ def scope_define_cloud_instance() -> None:
     # end_define_cloud_instance
 
 
-def scope_load_assets_from_airbyte_project():
-    # start_load_assets_from_airbyte_project
-    from dagster_airbyte import load_assets_from_airbyte_project
-
-    airbyte_assets = load_assets_from_airbyte_project(
-        project_dir="path/to/airbyte/project",
-    )
-    # end_load_assets_from_airbyte_project
-
-
 def scope_load_assets_from_airbyte_instance():
     from dagster_airbyte import AirbyteResource
     from dagster import EnvVar
@@ -57,26 +47,6 @@ def scope_load_assets_from_airbyte_instance():
     # end_load_assets_from_airbyte_instance
 
 
-def scope_airbyte_project_config():
-    from dagster_airbyte import AirbyteResource
-
-    airbyte_instance = AirbyteResource(
-        host="localhost",
port="8000", - ) - # start_airbyte_project_config - from dagster_airbyte import load_assets_from_airbyte_project - - from dagster import with_resources - - # Use the airbyte_instance resource we defined in Step 1 - airbyte_assets = with_resources( - [load_assets_from_airbyte_project(project_dir="path/to/airbyte/project")], - {"airbyte": airbyte_instance}, - ) - # end_airbyte_project_config - - def scope_manually_define_airbyte_assets(): # start_manually_define_airbyte_assets from dagster_airbyte import build_airbyte_assets diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte/__init__.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte/__init__.py index 1471fd16d8b9e..2a4161961ea24 100644 --- a/python_modules/libraries/dagster-airbyte/dagster_airbyte/__init__.py +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte/__init__.py @@ -17,7 +17,6 @@ from dagster_airbyte.asset_defs import ( build_airbyte_assets as build_airbyte_assets, load_assets_from_airbyte_instance as load_assets_from_airbyte_instance, - load_assets_from_airbyte_project as load_assets_from_airbyte_project, ) from dagster_airbyte.ops import airbyte_sync_op as airbyte_sync_op from dagster_airbyte.resources import ( diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte/asset_defs.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte/asset_defs.py index 9b64a59c0d44f..f982d71748a10 100644 --- a/python_modules/libraries/dagster-airbyte/dagster_airbyte/asset_defs.py +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte/asset_defs.py @@ -33,7 +33,6 @@ SourceAsset, _check as check, ) -from dagster._annotations import deprecated from dagster._core.definitions import AssetsDefinition, multi_asset from dagster._core.definitions.cacheable_assets import ( AssetsDefinitionCacheableData, @@ -1033,129 +1032,3 @@ def load_assets_from_airbyte_instance( connection_to_freshness_policy_fn=connection_to_freshness_policy_fn, connection_to_auto_materialize_policy_fn=connection_to_auto_materialize_policy_fn, ) - - -@deprecated( - breaking_version="1.9", - additional_warn_text="The Airbyte Octavia CLI has been deprecated. Consider using load_assets_from_airbyte_instance instead.", -) -def load_assets_from_airbyte_project( - project_dir: str, - workspace_id: Optional[str] = None, - key_prefix: Optional[CoercibleToAssetKeyPrefix] = None, - create_assets_for_normalization_tables: bool = True, - connection_to_group_fn: Optional[Callable[[str], Optional[str]]] = _clean_name, - connection_meta_to_group_fn: Optional[ - Callable[[AirbyteConnectionMetadata], Optional[str]] - ] = None, - io_manager_key: Optional[str] = None, - connection_to_io_manager_key_fn: Optional[Callable[[str], Optional[str]]] = None, - connection_filter: Optional[Callable[[AirbyteConnectionMetadata], bool]] = None, - connection_directories: Optional[Sequence[str]] = None, - connection_to_asset_key_fn: Optional[ - Callable[[AirbyteConnectionMetadata, str], AssetKey] - ] = None, - connection_to_freshness_policy_fn: Optional[ - Callable[[AirbyteConnectionMetadata], Optional[FreshnessPolicy]] - ] = None, - connection_to_auto_materialize_policy_fn: Optional[ - Callable[[AirbyteConnectionMetadata], Optional[AutoMaterializePolicy]] - ] = None, -) -> CacheableAssetsDefinition: - """Loads an Airbyte project into a set of Dagster assets. - - Point to the root folder of an Airbyte project synced using the Octavia CLI. For - more information, see https://airbyte.com/tutorials/version-control-airbyte-configurations. 
-
-    Args:
-        project_dir (str): The path to the root of your Airbyte project, containing sources, destinations,
-            and connections folders.
-        workspace_id (Optional[str]): The ID of the Airbyte workspace to load connections from. Only
-            required if multiple workspace state YAMLfiles exist in the project.
-        key_prefix (Optional[CoercibleToAssetKeyPrefix]): A prefix for the asset keys created.
-        create_assets_for_normalization_tables (bool): If True, assets will be created for tables
-            created by Airbyte's normalization feature. If False, only the destination tables
-            will be created. Defaults to True.
-        connection_to_group_fn (Optional[Callable[[str], Optional[str]]]): Function which returns an asset
-            group name for a given Airbyte connection name. If None, no groups will be created. Defaults
-            to a basic sanitization function.
-        connection_meta_to_group_fn (Optional[Callable[[AirbyteConnectionMetadata], Optional[str]]]): Function
-            which returns an asset group name for a given Airbyte connection metadata. If None and connection_to_group_fn
-            is None, no groups will be created. Defaults to None.
-        io_manager_key (Optional[str]): The I/O manager key to use for all assets. Defaults to "io_manager".
-            Use this if all assets should be loaded from the same source, otherwise use connection_to_io_manager_key_fn.
-        connection_to_io_manager_key_fn (Optional[Callable[[str], Optional[str]]]): Function which returns an
-            I/O manager key for a given Airbyte connection name. When other ops are downstream of the loaded assets,
-            the IOManager specified determines how the inputs to those ops are loaded. Defaults to "io_manager".
-        connection_filter (Optional[Callable[[AirbyteConnectionMetadata], bool]]): Optional function which
-            takes in connection metadata and returns False if the connection should be excluded from the output assets.
-        connection_directories (Optional[List[str]]): Optional list of connection directories to load assets from.
-            If omitted, all connections in the Airbyte project are loaded. May be faster than connection_filter
-            if the project has many connections or if the connection yaml files are large.
-        connection_to_asset_key_fn (Optional[Callable[[AirbyteConnectionMetadata, str], AssetKey]]): Optional function which
-            takes in connection metadata and table name and returns an asset key for the table. If None, the default asset
-            key is based on the table name. Any asset key prefix will be applied to the output of this function.
-        connection_to_freshness_policy_fn (Optional[Callable[[AirbyteConnectionMetadata], Optional[FreshnessPolicy]]]):
-            Optional function which takes in connection metadata and returns a freshness policy for the connection's assets.
-            If None, no freshness policies will be applied to the assets.
-        connection_to_auto_materialize_policy_fn (Optional[Callable[[AirbyteConnectionMetadata], Optional[AutoMaterializePolicy]]]):
-            Optional function which takes in connection metadata and returns an auto materialization policy for the connection's assets.
-            If None, no auto materialization policies will be applied to the assets.
-
-    **Examples:**
-
-    Loading all Airbyte connections as assets:
-
-    .. code-block:: python
-
-        from dagster_airbyte import load_assets_from_airbyte_project
-
-        airbyte_assets = load_assets_from_airbyte_project(
-            project_dir="path/to/airbyte/project",
-        )
-
-    Filtering the set of loaded connections:
-
-    .. code-block:: python
-
-        from dagster_airbyte import load_assets_from_airbyte_project
-
-        airbyte_assets = load_assets_from_airbyte_project(
-            project_dir="path/to/airbyte/project",
-            connection_filter=lambda meta: "snowflake" in meta.name,
-        )
-    """
-    if isinstance(key_prefix, str):
-        key_prefix = [key_prefix]
-    key_prefix = check.list_param(key_prefix or [], "key_prefix", of_type=str)
-
-    check.invariant(
-        not io_manager_key or not connection_to_io_manager_key_fn,
-        "Cannot specify both io_manager_key and connection_to_io_manager_key_fn",
-    )
-    if not connection_to_io_manager_key_fn:
-        connection_to_io_manager_key_fn = lambda _: io_manager_key
-
-    check.invariant(
-        not connection_meta_to_group_fn
-        or not connection_to_group_fn
-        or connection_to_group_fn == _clean_name,
-        "Cannot specify both connection_meta_to_group_fn and connection_to_group_fn",
-    )
-
-    if not connection_meta_to_group_fn and connection_to_group_fn:
-        connection_meta_to_group_fn = lambda meta: connection_to_group_fn(meta.name)
-
-    return AirbyteYAMLCacheableAssetsDefinition(
-        project_dir=project_dir,
-        workspace_id=workspace_id,
-        key_prefix=key_prefix,
-        create_assets_for_normalization_tables=create_assets_for_normalization_tables,
-        connection_meta_to_group_fn=connection_meta_to_group_fn,
-        connection_to_io_manager_key_fn=connection_to_io_manager_key_fn,
-        connection_filter=connection_filter,
-        connection_directories=connection_directories,
-        connection_to_asset_key_fn=connection_to_asset_key_fn,
-        connection_to_freshness_policy_fn=connection_to_freshness_policy_fn,
-        connection_to_auto_materialize_policy_fn=connection_to_auto_materialize_policy_fn,
-    )
diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/test_load_from_project.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/test_load_from_project.py
deleted file mode 100644
index 58df98d541514..0000000000000
--- a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/test_load_from_project.py
+++ /dev/null
@@ -1,205 +0,0 @@
-import pytest
-import responses
-import yaml
-from dagster import AssetKey, build_init_resource_context, materialize, with_resources
-from dagster._utils import file_relative_path
-from dagster_airbyte import AirbyteResource, airbyte_resource, load_assets_from_airbyte_project
-from dagster_airbyte.asset_defs import AirbyteConnectionMetadata
-
-from dagster_airbyte_tests.utils import get_project_connection_json, get_project_job_json
-
-
-@pytest.fixture(name="airbyte_instance", params=[True, False], scope="module")
-def airbyte_instance_fixture(request) -> AirbyteResource:
-    if request.param:
-        return AirbyteResource(host="some_host", port="8000", poll_interval=0)
-    else:
-        return airbyte_resource(
-            build_init_resource_context({"host": "some_host", "port": "8000", "poll_interval": 0})
-        )
-
-
-@responses.activate
-@pytest.mark.parametrize("use_normalization_tables", [True, False])
-@pytest.mark.parametrize(
-    "connection_to_group_fn, connection_meta_to_group_fn",
-    [(None, lambda meta: f"{meta.name[0]}_group"), (None, None), (lambda x: f"{x[0]}_group", None)],
-)
-@pytest.mark.parametrize("filter_connection", [None, "filter_fn", "dirs"])
-@pytest.mark.parametrize(
-    "connection_to_asset_key_fn", [None, lambda conn, name: AssetKey([f"{conn.name[0]}_{name}"])]
-)
-def test_load_from_project(
-    use_normalization_tables,
-    connection_to_group_fn,
-    connection_meta_to_group_fn,
-    filter_connection,
-    connection_to_asset_key_fn,
-    airbyte_instance,
-):
-    if connection_to_group_fn:
-        ab_cacheable_assets = load_assets_from_airbyte_project(
-            file_relative_path(__file__, "./test_airbyte_project"),
-            create_assets_for_normalization_tables=use_normalization_tables,
-            connection_to_group_fn=connection_to_group_fn,
-            connection_meta_to_group_fn=connection_meta_to_group_fn,
-            connection_filter=(lambda _: False) if filter_connection == "filter_fn" else None,
-            connection_directories=(
-                ["github_snowflake_ben"] if filter_connection == "dirs" else None
-            ),
-            connection_to_asset_key_fn=connection_to_asset_key_fn,
-        )
-    else:
-        ab_cacheable_assets = load_assets_from_airbyte_project(
-            file_relative_path(__file__, "./test_airbyte_project"),
-            create_assets_for_normalization_tables=use_normalization_tables,
-            connection_meta_to_group_fn=connection_meta_to_group_fn,
-            connection_filter=(lambda _: False) if filter_connection == "filter_fn" else None,
-            connection_directories=(
-                ["github_snowflake_ben"] if filter_connection == "dirs" else None
-            ),
-            connection_to_asset_key_fn=connection_to_asset_key_fn,
-        )
-    ab_assets = ab_cacheable_assets.build_definitions(ab_cacheable_assets.compute_cacheable_data())
-
-    if filter_connection == "filter_fn":
-        assert len(ab_assets) == 0
-        return
-
-    tables = {
-        "dagster_releases",
-        "dagster_tags",
-        "dagster_teams",
-        "dagster_array_test",
-        "dagster_unknown_test",
-    } | (
-        {
-            "dagster_releases_assets",
-            "dagster_releases_author",
-            "dagster_tags_commit",
-            "dagster_releases_foo",
-            "dagster_array_test_author",
-        }
-        if use_normalization_tables
-        else set()
-    )
-
-    with open(
-        file_relative_path(
-            __file__, "./test_airbyte_project/destinations/snowflake_ben/configuration.yaml"
-        ),
-        encoding="utf-8",
-    ) as f:
-        destination_data = yaml.safe_load(f.read())
-
-    if connection_to_asset_key_fn:
-        tables = {
-            connection_to_asset_key_fn(
-                AirbyteConnectionMetadata(
-                    "Github <> snowflake-ben", "", use_normalization_tables, [], destination_data
-                ),
-                t,
-            ).path[0]
-            for t in tables
-        }
-
-    # Check metadata is added correctly to asset def
-    assets_def = ab_assets[0]
-
-    table_names = {
-        "AIRBYTE.BEN_DEMO.releases",
-        "AIRBYTE.BEN_DEMO.tags",
-        "AIRBYTE.BEN_DEMO.teams",
-        "AIRBYTE.BEN_DEMO.array_test",
-        "AIRBYTE.BEN_DEMO.unknown_test",
-    } | (
-        {
-            "AIRBYTE.BEN_DEMO.releases.assets",
-            "AIRBYTE.BEN_DEMO.releases.author",
-            "AIRBYTE.BEN_DEMO.tags.commit",
-            "AIRBYTE.BEN_DEMO.releases.foo",
-            "AIRBYTE.BEN_DEMO.array_test.author",
-        }
-        if use_normalization_tables
-        else set()
-    )
-
-    for key, metadata in assets_def.metadata_by_key.items():
-        # Extract the table name from the asset key
-        table_name = (
-            key.path[-1]
-            .replace("dagster_", "")
-            .replace("G_", "")
-            .replace("_test", "")
-            .split("_")[-1]
-        )
-        assert metadata["dagster/table_name"] in table_names
-        assert table_name in metadata["dagster/table_name"]
-
-    assert assets_def.keys == {AssetKey(t) for t in tables}
-    assert all(
-        [
-            assets_def.group_names_by_key.get(AssetKey(t))
-            == (
-                connection_meta_to_group_fn(
-                    AirbyteConnectionMetadata(
-                        "GitHub <> snowflake-ben",
-                        "",
-                        use_normalization_tables,
-                        [],
-                        destination_data,
-                    )
-                )
-                if connection_meta_to_group_fn
-                else (
-                    connection_to_group_fn("GitHub <> snowflake-ben")
-                    if connection_to_group_fn
-                    else "github_snowflake_ben"
-                )
-            )
-            for t in tables
-        ]
-    )
-    assert len(assets_def.op.output_defs) == len(tables)
-
-    responses.add(
-        method=responses.POST,
-        url=airbyte_instance.api_base_url + "/connections/get",
-        json=get_project_connection_json(),
-        status=200,
-    )
-    responses.add(
-        method=responses.POST,
-        url=airbyte_instance.api_base_url + "/connections/sync",
-        json={"job": {"id": 1}},
-        status=200,
-    )
-    responses.add(
-        method=responses.POST,
-        url=airbyte_instance.api_base_url + "/jobs/get",
-        json=get_project_job_json(),
-        status=200,
-    )
-
-    res = materialize(
-        with_resources(
-            ab_assets,
-            resource_defs={
-                "airbyte": airbyte_resource.configured(
-                    {
-                        "host": "some_host",
-                        "port": "8000",
-                        "poll_interval": 0,
-                    }
-                )
-            },
-        )
-    )
-
-    materializations = [
-        event.event_specific_data.materialization
-        for event in res.events_for_node("airbyte_sync_87b7fe85_a22c_420e_8d74_b30e7ede77df")
-        if event.event_type_value == "ASSET_MATERIALIZATION"
-    ]
-    assert len(materializations) == len(tables)
-    assert {m.asset_key for m in materializations} == {AssetKey(t) for t in tables}
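Below is a minimal sketch of the replacement path this change points to: instead of loading assets from an Octavia project directory with the removed `load_assets_from_airbyte_project`, connections are loaded directly from a running Airbyte instance with `load_assets_from_airbyte_instance`, as the deprecation text above suggests. The host, port, and `AIRBYTE_PASSWORD` environment variable name are illustrative placeholders, not values taken from this patch.

```python
from dagster import Definitions, EnvVar
from dagster_airbyte import AirbyteResource, load_assets_from_airbyte_instance

# Placeholder connection details for a self-hosted Airbyte deployment.
airbyte_instance = AirbyteResource(
    host="localhost",
    port="8000",
    password=EnvVar("AIRBYTE_PASSWORD"),
)

# Builds one Dagster asset per stream for every connection defined in the
# Airbyte instance, replacing the YAML/Octavia-based loader removed here.
airbyte_assets = load_assets_from_airbyte_instance(airbyte_instance)

defs = Definitions(assets=[airbyte_assets])
```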