Skip to content

Commit

Permalink
[docathon] External assets guide (#23893)
Browse files Browse the repository at this point in the history
## Summary & Motivation

Docathon guide for external assets

## How I Tested These Changes

## Changelog [New | Bug | Docs]

NOCHANGELOG

---------

Co-authored-by: Pedram Navid <[email protected]>
  • Loading branch information
petehunt and PedramNavid authored Aug 26, 2024
1 parent b2eb696 commit 6e6b506
Show file tree
Hide file tree
Showing 11 changed files with 181 additions and 34 deletions.
1 change: 1 addition & 0 deletions .github/styles
9 changes: 6 additions & 3 deletions .github/workflows/vale.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ name: Vale Docs
on:
pull_request:
paths:
- 'docs/**'
- 'docs/docs-beta/**'
- .github/workflows/vale.yml
push:
branches:
- master
- docs-prod
paths:
- 'docs/**'
- 'docs/docs-beta/**'
- .github/workflows/vale.yml

concurrency:
Expand All @@ -29,6 +29,9 @@ jobs:
env:
REVIEWDOG_GITHUB_API_TOKEN: ${{ secrets.GITHUB_TOKEN }}
with:
files: '["docs-beta/docs"]'
files: '["docs/docs-beta/docs"]'
vale_flags: "--config=docs/.vale.ini"
fail_on_error: true
reporter: github-pr-review


101 changes: 99 additions & 2 deletions docs/docs-beta/docs/guides/data-modeling/external-assets.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,102 @@
---
title: Representing external data sources with external assets
sidebar_position: 80
sidebar_label: "Representing external data sources"
---
sidebar_label: 'Representing external data sources'
---

One of Dagster's goals is to present a single unified lineage of all of the data assets in an organization. This can include assets orchestrated by Dagster and assets orchestrated by other systems.

**External assets** enable you to model assets orchestrated by other systems natively within Dagster's Asset catalog, and create new data assets downstream of these external assets.

External assets differ from native Dagster assets in that Dagster can't materialize them directly or put them on a schedule. Instead, an external system must inform Dagster of when an external asset is updated.

Examples of external assets could be files in a data lake that are populated by a bespoke internal tool, a CSV file delivered daily by SFTP from a partner, or a table in a data warehouse populated by another orchestrator.

## What you'll learn

- How to create external assets
- How to create assets that depend on external assets
- How to record materializations and metadata
- How to model a DAG of multiple external assets

---

<details>
<summary>Prerequisites</summary>

To follow the steps in this guide, you'll need:

- A basic understanding of Dagster and assets. See the [Quick Start](/tutorial/quick-start) tutorial for an overview.
- Familiarity with [Sensors](/guides/automation/sensors)
</details>

---

## Creating and depending on external assets

Let's imagine that we have a partner that sends us some raw transaction data by SFTP on, roughly, a daily basis, that's later cleaned and stored in an internal data lake. Because the raw transaction data isn't materialized by Dagster, it makes sense to model it as an external asset.

<CodeExample filePath="guides/data-modeling/external-assets/creating-external-assets.py" language="python" title="Creating an external asset" />

See the [AssetSpec API docs](/todo) for all the potential parameters you can provide to an external asset.

## Recording materializations and metadata

In the preceding example, we modeled the external asset in the asset graph. We also need to inform Dagster whenever an external asset is updated, and include any relevant metadata about the asset.

There are two main ways to do this: "pulling" external assets events with sensors, and "pushing" external asset events using the REST API.

### "Pulling" with sensors

You can use a Dagster [sensor](/guides/automation/sensors) to regularly poll the external system and "pull" information about the external asset into Dagster.

For example, here's how you would poll an external system (like an SFTP server) to update an external asset whenever the file is changed.

<CodeExample filePath="guides/data-modeling/external-assets/pulling-with-sensors.py" language="python" title="Pulling external asset events with sensors" />

See the [sensors guide](/guides/automation/sensors) for more information about sensors.

### "Pushing" with the REST API

You can inform Dagster that an external asset has materialized by "pushing" the event from an external system to the REST API.

For example, here's how we would inform Dagster of a materialization of the `raw_transactions` external asset in Dagster+:

```shell
curl \
-X POST \
-H 'Content-Type: application/json' \
-H 'Dagster-Cloud-Api-Token: [YOUR API TOKEN]' \
'https://[YOUR ORG NAME].dagster.cloud/[YOUR DEPLOYMENT NAME]/report_asset_materialization/' \
-d '
{
"asset_key": "raw_transactions",
"metadata": {
"file_last_modified_at_ms": 1724614700266
}
}'
```

If you're using open source, you don't need the authentication headers and should point it at your open source URL (in this example, `http://localhost:3000`):

```shell
curl \
-X POST \
-H 'Content-Type: application/json' \
'http://localhost:3000/report_asset_materialization/' \
-d '
{
"asset_key": "raw_transactions",
"metadata": {
"file_last_modified_at_ms": 1724614700266
}
}'
```

See the [external assets REST API docs](/todo) for more information.

## Modeling a DAG of external assets

Like regular Dagster assets, external assets can have dependencies. This is useful when you want to model an entire data pipeline orchestrated by another system.

<CodeExample filePath="guides/data-modeling/external-assets/dag-of-external-assets.py" language="python" title="External assets with dependencies" />
10 changes: 0 additions & 10 deletions docs/vale/styles/Dagster/headings-gerund.yml

This file was deleted.

7 changes: 0 additions & 7 deletions docs/vale/styles/Google/Periods.yml

This file was deleted.

10 changes: 0 additions & 10 deletions docs/vale/styles/Google/Spacing.yml

This file was deleted.

4 changes: 3 additions & 1 deletion docs/vale/styles/Terms/dagster.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,6 @@ swap:
dagster plus: Dagster+
'[Dd]agit': "Dagster UI"
'[Ss]oftware-defined [Aa]sset': &asset-def asset definition|asset
'[Ss][Dd][Aa]': *asset-def
'[Ss][Dd][Aa]': *asset-def
'[Rr][Ee][Ss][Tt]\s*[Aa][Pp][Ii]': REST API

7 changes: 6 additions & 1 deletion docs/vale/styles/config/vocabularies/Dagster/accept.txt
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
[Cc]onfig
\bDagster\b
DAG
DataFrame
Declarative Automation
dagster-.*
gRPC
[Mm]aterializations
[Mm]emoization
REST API
[Ss]ubprocess
Serverless

Expand All @@ -31,4 +33,7 @@ PingOne
Postgres
S3
Snowflake
Twilio
Twilio

We have
we have
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
import dagster as dg

# We define a new external asset with the key "raw_transactions".
# This will appear in the Dagster asset catalog, but cannot
# be materialized by Dagster itself.
raw_transactions = dg.AssetSpec("raw_transactions")


# This asset is materialized by Dagster and depends on the
# external asset.
@dg.asset(deps=[raw_transactions])
def cleaned_transactions(): ...


defs = dg.Definitions(assets=[raw_transactions, cleaned_transactions])
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import dagster as dg

# Three external assets that depend on each other
raw_data = dg.AssetSpec("raw_data")
stg_data = dg.AssetSpec("stg_data", deps=[raw_data])
cleaned_data = dg.AssetSpec("cleaned_data", deps=[stg_data])


@dg.asset(deps=[cleaned_data])
def derived_data(): ...


defs = dg.Definitions(assets=[raw_data, stg_data, cleaned_data, derived_data])
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import dagster as dg

raw_transactions = dg.AssetSpec("raw_transactions")


@dg.sensor(minimum_interval_seconds=30)
def raw_transactions_sensor(
context: dg.SensorEvaluationContext,
) -> dg.SensorResult:
# This sensor polls the external system every 30 seconds
# for the last time the file was modified.
file_last_modified_at_ms = ...

# We can use the cursor to store the last time the sensor updated the asset
if context.cursor is not None:
external_asset_last_updated_at_ms = float(context.cursor)
else:
external_asset_last_updated_at_ms = 0

if file_last_modified_at_ms > external_asset_last_updated_at_ms:
# The external asset has been modified since we last updated it,
# so record a materialization and update the cursor.
return dg.SensorResult(
asset_events=[
dg.AssetMaterialization(
asset_key=raw_transactions.key,
# You can optionally attach metadata
metadata={"file_last_modified_at_ms": file_last_modified_at_ms},
)
],
cursor=str(file_last_modified_at_ms),
)
else:
# Nothing has happened since the last time we checked
return dg.SensorResult()


defs = dg.Definitions(assets=[raw_transactions], sensors=[raw_transactions_sensor])

2 comments on commit 6e6b506

@github-actions
Copy link

@github-actions github-actions bot commented on 6e6b506 Aug 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deploy preview for dagster-docs-beta ready!

✅ Preview
https://dagster-docs-beta-3ctg79hcl-elementl.vercel.app
https://dagster-docs-beta.dagster-docs.io

Built with commit 6e6b506.
This pull request is being automatically deployed with vercel-action

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deploy preview for dagster-docs ready!

✅ Preview
https://dagster-docs-coo3iphx0-elementl.vercel.app
https://master.dagster.dagster-docs.io

Built with commit 6e6b506.
This pull request is being automatically deployed with vercel-action

Please sign in to comment.