diff --git a/docs/content/_navigation.json b/docs/content/_navigation.json
index 0653bca3b7a19..9449264d25ac3 100644
--- a/docs/content/_navigation.json
+++ b/docs/content/_navigation.json
@@ -889,10 +889,6 @@
           {
             "title": "Airbyte Cloud & Dagster",
             "path": "/integrations/airbyte-cloud"
-          },
-          {
-            "title": "Ingestion as code",
-            "path": "/guides/dagster/airbyte-ingestion-as-code"
-          }
         ]
       },
@@ -902,7 +898,7 @@
       },
       {
         "title": "Databricks",
-        "path": "/integrations/databricks"
+        "path": "/concepts/dagster-pipes/databricks"
       },
       {
         "title": "dbt",
@@ -1004,10 +1000,6 @@
         "title": "Fivetran",
         "path": "/integrations/fivetran"
       },
-      {
-        "title": "Great Expectations",
-        "path": "/integrations/great-expectations"
-      },
       {
         "title": "Google BigQuery",
         "path": "/integrations/bigquery",
diff --git a/docs/content/guides/dagster/airbyte-ingestion-as-code.mdx b/docs/content/guides/dagster/airbyte-ingestion-as-code.mdx
deleted file mode 100644
index 7fbc55672a98a..0000000000000
--- a/docs/content/guides/dagster/airbyte-ingestion-as-code.mdx
+++ /dev/null
@@ -1,223 +0,0 @@
----
-title: Airbyte ingestion as code | Dagster Docs
----
-
-# Airbyte ingestion as code
-
-  This feature is experimental and deprecated and will be removed in a future release. We suggest using the Airbyte Terraform provider instead.
-
-This guide provides an introduction to using Dagster to configure your [Airbyte](/integrations/airbyte) connections. This allows you to centralize the configuration for your data stack by specifying it in Python code. You can check in and version your config with version control, or programmatically generate config for more complex use cases.
-
----
-
-## Prerequisites
-
-To use this feature, you'll need to install the `dagster-airbyte` and `dagster-managed-elements` libraries:
-
-```bash
-pip install dagster-airbyte dagster-managed-elements
-```
-
-The `dagster-managed-elements` library includes the base config reconciliation APIs and a CLI.
-
----
-
-## Step 1: Define a reconciler
-
-The config for your Airbyte instance is specified in an `AirbyteManagedElementReconciler`, which is pointed at a specific Airbyte instance using an Airbyte resource. The reconciler is also provided with a list of connections to reconcile, which we'll set up later in the guide.
-
-```python startafter=start_define_reconciler endbefore=end_define_reconciler file=/guides/dagster/ingestion_as_code/airbyte.py dedent=4
-from dagster_airbyte import AirbyteManagedElementReconciler, airbyte_resource
-
-airbyte_instance = airbyte_resource.configured(
-    {
-        "host": "localhost",
-        "port": "8000",
-        # If using basic auth, include username and password:
-        "username": "airbyte",
-        "password": {"env": "AIRBYTE_PASSWORD"},
-    }
-)
-
-airbyte_reconciler = AirbyteManagedElementReconciler(
-    airbyte=airbyte_instance,
-    connections=[],
-)
-```
-
-For more info on setting up an Airbyte resource, refer to [the Airbyte guide](/integrations/airbyte#step-1-connecting-to-airbyte). Additional configuration options for the reconciler are [detailed below](#additional-configuration-options).
-
-For more information on setting up secrets from the environment, refer to the [Environment variables and secrets guide](/guides/dagster/using-environment-variables-and-secrets).
-
----
-
-## Step 2: Define sources and destinations
-
-Next, we'll define our sources and destinations. Sources and destinations can be constructed manually using the `AirbyteSource` and `AirbyteDestination` classes, respectively, but `dagster-airbyte` also provides generated, typed classes for specific source and destination types. Refer to the [Airbyte API docs](/\_apidocs/libraries/dagster-airbyte#managed-config-generated-sources) for the properties required to configure each source and destination type.
-
-In this example, we'll configure a source that reads from a hosted CSV file and a destination that writes it to a local JSON file. To do this, we'll import the generated classes for the `File` source and `Local JSON` destination:
-
-```python startafter=start_define_sources endbefore=end_define_sources file=/guides/dagster/ingestion_as_code/airbyte.py dedent=4
-from dagster_airbyte.managed.generated.sources import FileSource
-from dagster_airbyte.managed.generated.destinations import LocalJsonDestination
-
-cereals_csv_source = FileSource(
-    name="cereals-csv",
-    url="https://docs.dagster.io/assets/cereal.csv",
-    format="csv",
-    provider=FileSource.HTTPSPublicWeb(),
-    dataset_name="cereals",
-)
-
-local_json_destination = LocalJsonDestination(
-    name="local-json",
-    destination_path="/local/cereals_out.json",
-)
-```
-
----
-
-## Step 3: Define a connection
-
-Next, we'll define a connection between the source and destination using the [`AirbyteConnection`](/\_apidocs/libraries/dagster-airbyte#dagster_airbyte.AirbyteConnection) class:
-
-```python startafter=start_define_connection endbefore=end_define_connection file=/guides/dagster/ingestion_as_code/airbyte.py dedent=4
-from dagster_airbyte import AirbyteConnection, AirbyteSyncMode
-
-cereals_connection = AirbyteConnection(
-    name="download-cereals",
-    source=cereals_csv_source,
-    destination=local_json_destination,
-    stream_config={"cereals": AirbyteSyncMode.full_refresh_overwrite()},
-)
-```
-
-Then, we'll supply the new connection to the reconciler we defined in [Step 1](#step-1-define-a-reconciler):
-
-```python startafter=start_new_reconciler endbefore=end_new_reconciler file=/guides/dagster/ingestion_as_code/airbyte.py dedent=4
-airbyte_reconciler = AirbyteManagedElementReconciler(
-    airbyte=airbyte_instance,
-    connections=[cereals_connection],
-)
-```
-
----
-
-## Step 4. Validate changes
-
-Next, we'll use the `dagster-airbyte` CLI to sanity-check our reconciler and apply any changes.
-
-The `check` command prints out differences between the current state of the Airbyte instance and the desired state specified in the reconciler. To invoke the CLI, point it at a module containing the reconciler:
-
-```bash
-dagster-airbyte check --module my_python_module.my_submodule:reconciler
-
-Found 1 reconciler, checking...
-+ cereals-csv:
-  + url: https://docs.dagster.io/assets/cereal.csv
-  + format: csv
-  + dataset_name: cereals
-  + provider:
-    + user_agent: False
-    + storage: HTTPS
-+ local-json:
-  + destination_path: /local/cereals_out.json
-+ download-cereals:
-  + source: cereals-csv
-  + destination: local-json
-  + normalize data: None
-  + streams:
-    + cereals: FULL_REFRESH_OVERWRITE
-```
-
----
-
-## Step 5. Apply changes
-
-The changes printed out by the `check` command in the previous step look like what we expect, so we can now apply them:
-
-```bash
-dagster-airbyte apply --module my_python_module.my_submodule:reconciler
-```
-
-Now, we should see our new connection in the Airbyte UI.
-
----
-
-## Step 6. Load connections into Dagster
-
-To load managed connections into Dagster, use the `load_assets_from_connections` utility method. This functions similarly to [`load_assets_from_airbyte_instance`](/integrations/airbyte#loading-airbyte-asset-definitions-from-an-airbyte-instance), but validates that the connections passed in match the connections defined in your Airbyte instance:
-
-```python startafter=start_load_assets endbefore=end_load_assets file=/guides/dagster/ingestion_as_code/airbyte.py dedent=4
-from dagster_airbyte import load_assets_from_connections, airbyte_resource
-
-airbyte_instance = airbyte_resource.configured(
-    {
-        "host": "localhost",
-        "port": 8000,
-        # If using basic auth, include username and password:
-        "username": "airbyte",
-        "password": {"env": "AIRBYTE_PASSWORD"},
-    }
-)
-
-airbyte_assets = load_assets_from_connections(
-    airbyte=airbyte_instance, connections=[cereals_connection]
-)
-```
-
-For more info on managing Airbyte assets in Dagster, refer to the [Airbyte guide](/integrations/airbyte).
-
----
-
-## Additional configuration options
-
-The Airbyte reconciler also supports some additional configuration options, which can be passed to the `AirbyteManagedElementReconciler` constructor.
-
-By default, the reconciler will not modify any sources, destinations, or connections other than those specified in the reconciler. This allows you to adopt the reconciler incrementally, without having to reconcile all of your existing Airbyte configuration.
-
-To reconcile all of your existing Airbyte configuration, pass `delete_unmentioned_resources=True` to the reconciler constructor:
-
-```python startafter=start_new_reconciler_delete endbefore=end_new_reconciler_delete file=/guides/dagster/ingestion_as_code/airbyte.py dedent=4
-airbyte_reconciler = AirbyteManagedElementReconciler(
-    airbyte=airbyte_instance, connections=[...], delete_unmentioned_resources=True
-)
-```
-
-This tells the reconciler to clean up any sources, destinations, or connections which are not explicitly defined in Python code.
-
----
-
-## Related
diff --git a/docs/content/integrations.mdx b/docs/content/integrations.mdx
index d0b8cf1dcf639..122dd9934f6d3 100644
--- a/docs/content/integrations.mdx
+++ b/docs/content/integrations.mdx
@@ -11,11 +11,47 @@ Using our integration guides and libraries, you can extend Dagster to interoperate
 ## Guides
 
+Explore guides for integrations with external services.
+
+  Looking for deployment options?
+
diff --git a/docs/content/integrations/databricks.mdx b/docs/content/integrations/databricks.mdx
deleted file mode 100644
index bea80caeff022..0000000000000
--- a/docs/content/integrations/databricks.mdx
+++ /dev/null
@@ -1,162 +0,0 @@
----
-title: "Databricks & Dagster | Dagster Docs"
-description: Dagster can orchestrate Databricks jobs alongside other technologies.
----
-
-# Databricks & Dagster
-
-Dagster can orchestrate your Databricks jobs and other Databricks API calls, making it easy to chain together multiple Databricks jobs and orchestrate Databricks alongside your other technologies.
-
----
-
-## Prerequisites
-
-To get started, you will need to install the `dagster` and `dagster-databricks` Python packages:
-
-```bash
-pip install dagster dagster-databricks
-```
-
-You'll also need a Databricks workspace with an existing project that is deployed as a Databricks job. If you don't have this, [follow the Databricks quickstart](https://docs.databricks.com/workflows/jobs/jobs-quickstart.html) to set one up.
-
-To manage your Databricks job from Dagster, you'll need three values, which can be set as [environment variables in Dagster](/guides/dagster/using-environment-variables-and-secrets):
-
-1. A `host` for connecting with your Databricks workspace, starting with `https://`, stored in an environment variable `DATABRICKS_HOST`,
-2. A `token` corresponding to a personal access token for your Databricks workspace, stored in an environment variable `DATABRICKS_TOKEN`, and
-3. A `DATABRICKS_JOB_ID` for the Databricks job you want to run.
-
-You can follow the [Databricks API authentication instructions](https://docs.databricks.com/dev-tools/python-api.html#step-1-set-up-authentication) to retrieve these values.
-
----
-
-## Step 1: Connecting to Databricks
-
-The first step in using Databricks with Dagster is to tell Dagster how to connect to your Databricks workspace using a Databricks [resource](/concepts/resources). This resource contains information on the location of your Databricks workspace and any credentials sourced from environment variables that are required to access it. By configuring the resource, you can access the underlying [Databricks API client](https://docs.databricks.com/dev-tools/python-api.html) to communicate with your Databricks workspace.
-
-For more information about the Databricks resource, see the [API reference](/\_apidocs/libraries/dagster-databricks).
-
-```python startafter=start_define_databricks_client_instance endbefore=end_define_databricks_client_instance file=/integrations/databricks/databricks.py dedent=4
-from dagster_databricks import databricks_client
-
-databricks_client_instance = databricks_client.configured(
-    {
-        "host": {"env": "DATABRICKS_HOST"},
-        "token": {"env": "DATABRICKS_TOKEN"},
-    }
-)
-```
-
----
-
-## Step 2: Create an op/asset that connects to Databricks
-
-In this step, we'll demonstrate several ways to model a Databricks API call as either a Dagster [op](/concepts/ops-jobs-graphs/ops) or the computation backing an [asset definition](/concepts/assets/software-defined-assets). You can either:
-
-- Use the `dagster-databricks` op factories, which create ops that invoke the Databricks Jobs' [Run Now](https://docs.databricks.com/api/workspace/jobs/runnow) ([`create_databricks_run_now_op`](/\_apidocs/libraries/dagster-databricks)) or [Submit Run](https://docs.databricks.com/api/workspace/jobs/submit) ([`create_databricks_submit_run_op`](/\_apidocs/libraries/dagster-databricks)) APIs, or
-- Manually create a Dagster op or asset that connects to Databricks using the configured Databricks resource.
-
-Afterward, we create a Dagster [job](/concepts/ops-jobs-graphs/jobs) that invokes the op or selects the asset to run the Databricks API call.
-
-For guidance on deciding whether to use an op or asset, refer to the [Understanding how assets relate to ops guide](/guides/dagster/how-assets-relate-to-ops-and-graphs).
-
-**Using the op factories:**
-
-```python startafter=start_define_databricks_op_factories endbefore=end_define_databricks_op_factories file=/integrations/databricks/databricks.py dedent=4
-from dagster_databricks import create_databricks_run_now_op
-
-my_databricks_run_now_op = create_databricks_run_now_op(
-    databricks_job_id=DATABRICKS_JOB_ID,
-)
-
-@job(resource_defs={"databricks": databricks_client_instance})
-def my_databricks_job():
-    my_databricks_run_now_op()
-```
-
-**Creating a custom op:**
-
-```python startafter=start_define_databricks_custom_op endbefore=end_define_databricks_custom_op file=/integrations/databricks/databricks.py dedent=4
-from databricks_cli.sdk import DbfsService
-
-from dagster import OpExecutionContext, job, op
-
-@op(required_resource_keys={"databricks"})
-def my_databricks_op(context: OpExecutionContext) -> None:
-    databricks_api_client = context.resources.databricks.api_client
-    dbfs_service = DbfsService(databricks_api_client)
-
-    dbfs_service.read(path="/tmp/HelloWorld.txt")
-
-@job(resource_defs={"databricks": databricks_client_instance})
-def my_databricks_job():
-    my_databricks_op()
-```
-
-**Creating a custom asset:**
-
-```python startafter=start_define_databricks_custom_asset endbefore=end_define_databricks_custom_asset file=/integrations/databricks/databricks.py dedent=4
-from databricks_cli.sdk import JobsService
-
-from dagster import AssetExecutionContext, AssetSelection, asset, define_asset_job
-
-@asset(required_resource_keys={"databricks"})
-def my_databricks_table(context: AssetExecutionContext) -> None:
-    databricks_api_client = context.resources.databricks.api_client
-    jobs_service = JobsService(databricks_api_client)
-
-    jobs_service.run_now(job_id=DATABRICKS_JOB_ID)
-
-materialize_databricks_table = define_asset_job(
-    name="materialize_databricks_table",
-    selection=AssetSelection.assets(my_databricks_table),
-)
-```
-
----
-
-## Step 3: Schedule your Databricks computation
-
-Now that your Databricks API calls are modeled within Dagster, you can [schedule](/concepts/automation/schedules) them to run regularly.
-
-In the example below, we schedule the `materialize_databricks_table` and `my_databricks_job` jobs to run daily:
-
-```python startafter=start_schedule_databricks endbefore=end_schedule_databricks file=/integrations/databricks/databricks.py dedent=4
-from dagster import Definitions, ScheduleDefinition
-
-defs = Definitions(
-    assets=[my_databricks_table],
-    schedules=[
-        ScheduleDefinition(
-            job=materialize_databricks_table,
-            cron_schedule="@daily",
-        ),
-        ScheduleDefinition(
-            job=my_databricks_job,
-            cron_schedule="@daily",
-        ),
-    ],
-    jobs=[my_databricks_job],
-    resources={"databricks": databricks_client_instance},
-)
-```
-
----
-
-## What's next?
-
-By now, you should have a working Databricks and Dagster integration!
-
-What's next? From here, you can:
-
-- Learn more about [asset definitions](/concepts/assets/software-defined-assets)
-- Check out the [`dagster-databricks` API docs](/\_apidocs/libraries/dagster-databricks)
diff --git a/docs/content/integrations/great-expectations.mdx b/docs/content/integrations/great-expectations.mdx
deleted file mode 100644
index e7a97e8cbf708..0000000000000
--- a/docs/content/integrations/great-expectations.mdx
+++ /dev/null
@@ -1,44 +0,0 @@
----
-title: "Great Expectations & Dagster | Dagster Docs"
----
-
-# Great Expectations & Dagster
-
-This example demonstrates how to use the GE op factory from [`dagster-ge`](/\_apidocs/libraries/dagster-ge) to test incoming data against a set of expectations built through [Great Expectations](https://docs.greatexpectations.io/en/latest/)' tooling.
-
-For this example, we'll be using two versions of a dataset of baseball team payroll and wins, with one version modified to hold incorrect data.
-
-You can use `ge_validation_op_factory` to generate Dagster ops that integrate with Great Expectations. For example, here we show a basic call to this GE op factory, with two required arguments: `datasource_name` and the expectation `suite_name`.
-
-```python file=../../with_great_expectations/with_great_expectations/ge_demo.py startafter=start_ge_demo_marker_factory endbefore=end_ge_demo_marker_factory
-payroll_expectations = ge_validation_op_factory(
-    name="ge_validation_op", datasource_name="getest", suite_name="basic.warning"
-)
-```
-
-The GE validations will happen inside the ops created above. Each of the ops will yield an `ExpectationResult` with a structured dict of metadata from the GE suite. The structured metadata contains both summary stats from the suite and expectation-by-expectation results. The op will also output the full result in case you want to process it differently. Here's how a downstream op could use the full result, where `expectation` is that result:
-
-```python file=../../with_great_expectations/with_great_expectations/ge_demo.py startafter=start_ge_demo_marker_op endbefore=end_ge_demo_marker_op
-@op
-def postprocess_payroll(numrows, expectation):
-    if expectation["success"]:
-        return numrows
-    else:
-        raise ValueError
-```
-
-You can configure the GE Data Context via the `ge_data_context` resource from the `dagster-ge` integration package. All we need to do to expose GE to Dagster is to provide the root of the GE directory (the path to the `great_expectations` directory on your machine).
-
-Finally, here's the full job definition using the GE op, with a default run configuration to use the correct set of data:
-
-```python file=../../with_great_expectations/with_great_expectations/ge_demo.py startafter=start_ge_demo_marker_job endbefore=end_ge_demo_marker_job
-@job
-def payroll_data():
-    output_df = read_in_datafile()
-
-    postprocess_payroll(process_payroll(output_df), payroll_expectations(output_df))
-```
-
-We can easily swap the `succeed.csv` path for `fail.csv` in the run configuration to exercise our job with incorrect data.
diff --git a/docs/next/util/redirectUrls.json b/docs/next/util/redirectUrls.json index cc165ba7e036d..7bc9309b657cb 100644 --- a/docs/next/util/redirectUrls.json +++ b/docs/next/util/redirectUrls.json @@ -695,24 +695,24 @@ "statusCode": 302 }, { - "source": "/dagster-cloud", - "destination": "/dagster-plus", - "statusCode": 302 + "source": "/dagster-cloud", + "destination": "/dagster-plus", + "statusCode": 302 }, { - "source": "/dagster-cloud/managing-deployments/setting-up-alerts", - "destination": "/dagster-plus/managing-deployments/alerts", - "statusCode": 302 + "source": "/dagster-cloud/managing-deployments/setting-up-alerts", + "destination": "/dagster-plus/managing-deployments/alerts", + "statusCode": 302 }, { - "source": "/dagster-cloud/managing-deployments/dagster-cloud-cli", - "destination": "/dagster-plus/managing-deployments/dagster-plus-cli", - "statusCode": 302 + "source": "/dagster-cloud/managing-deployments/dagster-cloud-cli", + "destination": "/dagster-plus/managing-deployments/dagster-plus-cli", + "statusCode": 302 }, { - "source": "/dagster-cloud/:slug*", - "destination": "/dagster-plus/:slug*", - "statusCode": 302 + "source": "/dagster-cloud/:slug*", + "destination": "/dagster-plus/:slug*", + "statusCode": 302 }, { "source": "/guides/dagster-pipes/:slug*", @@ -743,5 +743,20 @@ "source": "/_apidocs/libraries/dagster-airflow", "destination": "/integrations/airflow", "statusCode": 302 + }, + { + "source": "/integrations/great-expectations", + "destination": "https://dagster.io/blog/ensuring-data-quality-with-dagster-and-great-expectations", + "statusCode": 302 + }, + { + "source": "/guides/dagster/airbyte-ingestion-as-code", + "destination": "/integrations/airbyte", + "statusCode": 302 + }, + { + "source": "/integrations/databricks", + "destination": "/concepts/dagster-pipes/databricks", + "statusCode": 302 } ]
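The new `redirectUrls.json` entries above use the same `source`/`destination`/`statusCode` shape as the existing ones. Below is a minimal sketch of how such a list is commonly consumed by a Next.js site; the `next.config.ts` shown here is an assumption for illustration, not the docs site's actual config, and only the JSON field names come from this diff.

```ts
// next.config.ts - minimal sketch, assuming the redirect list is loaded at build time.
// The surrounding config is hypothetical; only source/destination/statusCode come from
// the redirectUrls.json entries in this diff.
import redirectUrls from "./util/redirectUrls.json";

const nextConfig = {
  async redirects() {
    // Next.js accepts `statusCode` in place of `permanent`; 302 keeps these redirects
    // temporary, matching the existing entries, so destinations can change later.
    return redirectUrls.map(({ source, destination, statusCode }) => ({
      source,
      destination,
      statusCode,
    }));
  },
};

export default nextConfig;
```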