Skip to content

Commit

Permalink
[dagster-aws] [docs] add documentation for ECS Pipes
Browse files Browse the repository at this point in the history
  • Loading branch information
danielgafni committed Aug 21, 2024
1 parent 2914e92 commit a74b96e
Show file tree
Hide file tree
Showing 12 changed files with 222 additions and 2 deletions.
4 changes: 4 additions & 0 deletions docs/content/_navigation.json
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,10 @@
}
]
},
{
"title": "Dagster Pipes + AWS ECS",
"path": "/concepts/dagster-pipes/aws-ecs"
},
{
"title": "Dagster Pipes + AWS Glue",
"path": "/concepts/dagster-pipes/aws-glue"
Expand Down
Binary file modified docs/content/api/modules.json.gz
Binary file not shown.
Binary file modified docs/content/api/searchindex.json.gz
Binary file not shown.
Binary file modified docs/content/api/sections.json.gz
Binary file not shown.
4 changes: 4 additions & 0 deletions docs/content/concepts.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,10 @@ Dagster Pipes is a toolkit for building integrations between Dagster and externa
title="Dagster Pipes tutorial"
href="/concepts/dagster-pipes/subprocess"
></ArticleListItem>
<ArticleListItem
title="Dagster Pipes + AWS ECS"
href="/concepts/dagster-pipes/ecs"
></ArticleListItem>
<ArticleListItem
title="Dagster Pipes + AWS Glue"
href="/concepts/dagster-pipes/aws-glue"
Expand Down
140 changes: 140 additions & 0 deletions docs/content/concepts/dagster-pipes/aws-ecs.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
---
title: "Integrating AWS ECS with Dagster Pipes | Dagster Docs"
description: "Learn to integrate Dagster Pipes with AWS ECS to launch external code from Dagster assets."
---

# AWS ECS & Dagster Pipes

This tutorial gives a short overview on how to use [Dagster Pipes](/concepts/dagster-pipes) with [AWS ECS](https://aws.amazon.com/ecs/).

The [dagster-aws](/\_apidocs/libraries/dagster-aws) integration library provides the <PyObject object="PipesECSClient" module="dagster_aws.pipes" /> resource which can be used to launch AWS ECS tasks from Dagster assets and ops. Dagster can receive regular events like logs, asset checks, or asset materializations from jobs launched with this client. Using it requires minimal code changes on the task side.

---

## Prerequisites

- **In the orchestration environment**, you'll need to:

- Install the following packages:

```shell
pip install dagster dagster-webserver dagster-aws
```

Refer to the [Dagster installation guide](/getting-started/install) for more info.

- **An existing boto3 client that can authenticate to AWS.** If you don't have this set up already, refer to the [boto3 quickstart](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/quickstart.html).

- **In AWS**:

- An existing AWS account
- An AWS ECS task. To receive logs and events from a task container, it must have `"logDriver"` set to `"awslogs"` in `"logConfiguration"`.

`dagster-pipes` can be called from multiple ECS containers at the same time. For the sake of this tutorial, we will be referring to only one container.

---

## Step 1: Provide the dagster-pipes module

Install `dagster-pipes` module in the image used for your ECS task. For example:

```Dockerfile
FROM python:3.11-slim

RUN python -m pip install dagster-pipes

# copy the task script
COPY . .
```

---

## Step 2: Add dagster-pipes to the ECS task script

Call `open_dagster_pipes` in the ECS task script to create a context that can be used to send messages to Dagster:

```python file=/guides/dagster/dagster_pipes/ecs/task.py
import boto3
from dagster_pipes import (
PipesEnvVarParamsLoader,
PipesS3ContextLoader,
open_dagster_pipes,
)

client = boto3.client("s3")


def main():
with open_dagster_pipes() as pipes:
pipes.log.info("Hello from AWS ECS task!")
pipes.report_asset_materialization(
metadata={"some_metric": {"raw_value": 0, "type": "int"}},
data_version="alpha",
)


if __name__ == "__main__":
main()
```

---

## Step 3: Add the PipesECSClient to Dagster code

In the Dagster asset/op code, use the `PipesECSClient` resource to launch the job:

```python file=/guides/dagster/dagster_pipes/ecs/dagster_code.py startafter=start_asset_marker endbefore=end_asset_marker
import os

# dagster_glue_pipes.py
import boto3
from dagster_aws.pipes import PipesECSClient
from docutils.nodes import entry

from dagster import AssetExecutionContext, asset


@asset
def ecs_pipes_asset(context: AssetExecutionContext, pipes_ecs_client: PipesECSClient):
return pipes_ecs_client.run(
context=context,
taskDefinition="my-task",
count=1,
).get_materialize_result()
```

This will launch the AWS ECS task and wait until it reaches `"STOPPED"` status. If any of the tasks's containers fail, the Dagster process will raise an exception. If the Dagster process is interrupted while the task is still running, the task will be terminated.

---

## Step 4: Create Dagster definitions

Next, add the `PipesECSClient` resource to your project's <PyObject object="Definitions" /> object:

```python file=/guides/dagster/dagster_pipes/ecs/dagster_code.py startafter=start_definitions_marker endbefore=end_definitions_marker
from dagster import Definitions # noqa
from dagster_aws.pipes import PipesS3MessageReader


defs = Definitions(
assets=[ecs_pipes_asset],
resources={"pipes_ecs_client": PipesECSClient()},
)
```

Dagster will now be able to launch the AWS ECS task from the `ecs_pipes_asset` asset, and receive logs and events from the task. If using the default `message_reader` `PipesCloudwatchLogReader`, logs will be read from the Cloudwatch log group specified in the container `"logConfiguration"` field definition. Logs from all containers in the task will be read.

---

## Related

<ArticleList>
<ArticleListItem
title="Dagster Pipes"
href="/concepts/dagster-pipes"
></ArticleListItem>
<ArticleListItem
title="AWS ECS Pipes API reference"
href="/_apidocs/libraries/dagster-aws#dagster_aws.pipes.PipesECSClient"
></ArticleListItem>
</ArticleList>
Binary file modified docs/next/public/objects.inv
Binary file not shown.
2 changes: 2 additions & 0 deletions docs/sphinx/sections/api/apidocs/libraries/dagster-aws.rst
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ Clients

.. autoclass:: dagster_aws.pipes.PipesGlueClient

.. autoclass:: dagster_aws.pipes.PipesECSClient

Legacy
--------

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# start_asset_marker
import os

# dagster_glue_pipes.py
import boto3
from dagster_aws.pipes import PipesECSClient
from docutils.nodes import entry

from dagster import AssetExecutionContext, asset


@asset
def ecs_pipes_asset(context: AssetExecutionContext, pipes_ecs_client: PipesECSClient):
return pipes_ecs_client.run(
context=context,
taskDefinition="my-task",
count=1,
).get_materialize_result()


# end_asset_marker

# start_definitions_marker

from dagster import Definitions # noqa
from dagster_aws.pipes import PipesS3MessageReader


defs = Definitions(
assets=[ecs_pipes_asset],
resources={"pipes_ecs_client": PipesECSClient()},
)

# end_definitions_marker
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# this Dockerfile can be used for ECS Pipes development

FROM python:3.11-slim

RUN --mount=type=cache,target=/root/.cache/pip python -m pip install boto3

COPY python_modules/dagster-pipes /src/dagster-pipes

RUN pip install -e /src/dagster-pipes

WORKDIR /app
COPY examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/ecs/task.py .
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from dagster_pipes import (
PipesEnvVarParamsLoader,
PipesS3ContextLoader,
open_dagster_pipes,
)


def main():
with open_dagster_pipes() as pipes:
pipes.log.info("Hello from AWS ECS task!")
pipes.report_asset_materialization(
metadata={"some_metric": {"raw_value": 0, "type": "int"}},
data_version="alpha",
)


if __name__ == "__main__":
main()
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import botocore
import dagster._check as check
from dagster import PipesClient
from dagster._annotations import experimental
from dagster._annotations import experimental, public
from dagster._core.definitions.resource_annotation import TreatAsResourceParam
from dagster._core.errors import DagsterExecutionInterruptedError
from dagster._core.execution.context.compute import OpExecutionContext
Expand Down Expand Up @@ -71,6 +71,7 @@ def __init__(
def _is_dagster_maintained(cls) -> bool:
return True

@public
def run(
self,
*,
Expand All @@ -80,7 +81,12 @@ def run(
) -> PipesClientCompletedInvocation:
"""Start a ECS task, enriched with the pipes protocol.
See also: `AWS API Documentation <https://docs.aws.amazon.com/AmazonECS/latest/APIReference/API_RunTask.html>`_
Args:
context (OpExecutionContext): The Dagster op context.
extras (Optional[Dict[str, Any]]): Additional information to pass to the external process.
**params: Keyword arguments to pass to the `boto3` ECS client.
See `boto3 API Documentation <https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/ecs/client/run_task.html>`_
for more info.
Returns:
PipesClientCompletedInvocation: Wrapper containing results reported by the external
Expand Down

0 comments on commit a74b96e

Please sign in to comment.