diff --git a/docs/content/_navigation.json b/docs/content/_navigation.json
index 899bb2d4f4c25..47c44aa9dcae7 100644
--- a/docs/content/_navigation.json
+++ b/docs/content/_navigation.json
@@ -374,6 +374,10 @@
}
]
},
+ {
+ "title": "Dagster Pipes + AWS ECS",
+ "path": "/concepts/dagster-pipes/aws-ecs"
+ },
{
"title": "Dagster Pipes + AWS Glue",
"path": "/concepts/dagster-pipes/aws-glue"
diff --git a/docs/content/api/modules.json.gz b/docs/content/api/modules.json.gz
index e0a38edcd36e5..65af61ebb3024 100644
Binary files a/docs/content/api/modules.json.gz and b/docs/content/api/modules.json.gz differ
diff --git a/docs/content/api/searchindex.json.gz b/docs/content/api/searchindex.json.gz
index 6e05a2c4a4ba1..e08650b0d922f 100644
Binary files a/docs/content/api/searchindex.json.gz and b/docs/content/api/searchindex.json.gz differ
diff --git a/docs/content/api/sections.json.gz b/docs/content/api/sections.json.gz
index 5f130c0849c26..37e2629d6bdd3 100644
Binary files a/docs/content/api/sections.json.gz and b/docs/content/api/sections.json.gz differ
diff --git a/docs/content/concepts.mdx b/docs/content/concepts.mdx
index dbbf94a6129f8..ec9302948e48a 100644
--- a/docs/content/concepts.mdx
+++ b/docs/content/concepts.mdx
@@ -224,6 +224,10 @@ Dagster Pipes is a toolkit for building integrations between Dagster and externa
title="Dagster Pipes tutorial"
href="/concepts/dagster-pipes/subprocess"
>
+
resource which can be used to launch AWS ECS tasks from Dagster assets and ops. Dagster can receive regular events like logs, asset checks, or asset materializations from jobs launched with this client. Using it requires minimal code changes on the task side.
+
+---
+
+## Prerequisites
+
+- **In the orchestration environment**, you'll need to:
+
+ - Install the following packages:
+
+ ```shell
+ pip install dagster dagster-webserver dagster-aws
+ ```
+
+ Refer to the [Dagster installation guide](/getting-started/install) for more info.
+
+ - **An existing boto3 client that can authenticate to AWS.** If you don't have this set up already, refer to the [boto3 quickstart](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/quickstart.html).
+
+- **In AWS**:
+
+ - An existing AWS account
+ - An AWS ECS task. To receive logs and events from a task container, it must have `"logDriver"` set to `"awslogs"` in `"logConfiguration"`.
+
+`dagster-pipes` can be called from multiple ECS containers at the same time. For the sake of this tutorial, we will be referring to only one container.
+
+---
+
+## Step 1: Provide the dagster-pipes module
+
+Install `dagster-pipes` module in the image used for your ECS task. For example:
+
+```Dockerfile
+FROM python:3.11-slim
+
+RUN python -m pip install dagster-pipes
+
+# copy the task script
+COPY . .
+```
+
+---
+
+## Step 2: Add dagster-pipes to the ECS task script
+
+Call `open_dagster_pipes` in the ECS task script to create a context that can be used to send messages to Dagster:
+
+```python file=/guides/dagster/dagster_pipes/ecs/task.py
+import boto3
+from dagster_pipes import (
+ PipesEnvVarParamsLoader,
+ PipesS3ContextLoader,
+ open_dagster_pipes,
+)
+
+client = boto3.client("s3")
+
+
+def main():
+ with open_dagster_pipes() as pipes:
+ pipes.log.info("Hello from AWS ECS task!")
+ pipes.report_asset_materialization(
+ metadata={"some_metric": {"raw_value": 0, "type": "int"}},
+ data_version="alpha",
+ )
+
+
+if __name__ == "__main__":
+ main()
+```
+
+---
+
+## Step 3: Add the PipesECSClient to Dagster code
+
+In the Dagster asset/op code, use the `PipesECSClient` resource to launch the job:
+
+```python file=/guides/dagster/dagster_pipes/ecs/dagster_code.py startafter=start_asset_marker endbefore=end_asset_marker
+import os
+
+# dagster_glue_pipes.py
+import boto3
+from dagster_aws.pipes import PipesECSClient
+from docutils.nodes import entry
+
+from dagster import AssetExecutionContext, asset
+
+
+@asset
+def ecs_pipes_asset(context: AssetExecutionContext, pipes_ecs_client: PipesECSClient):
+ return pipes_ecs_client.run(
+ context=context,
+ taskDefinition="my-task",
+ count=1,
+ ).get_materialize_result()
+```
+
+This will launch the AWS ECS task and wait until it reaches `"STOPPED"` status. If any of the tasks's containers fail, the Dagster process will raise an exception. If the Dagster process is interrupted while the task is still running, the task will be terminated.
+
+---
+
+## Step 4: Create Dagster definitions
+
+Next, add the `PipesECSClient` resource to your project's object:
+
+```python file=/guides/dagster/dagster_pipes/ecs/dagster_code.py startafter=start_definitions_marker endbefore=end_definitions_marker
+from dagster import Definitions # noqa
+from dagster_aws.pipes import PipesS3MessageReader
+
+
+defs = Definitions(
+ assets=[ecs_pipes_asset],
+ resources={"pipes_ecs_client": PipesECSClient()},
+)
+```
+
+Dagster will now be able to launch the AWS ECS task from the `ecs_pipes_asset` asset, and receive logs and events from the task. If using the default `message_reader` `PipesCloudwatchLogReader`, logs will be read from the Cloudwatch log group specified in the container `"logConfiguration"` field definition. Logs from all containers in the task will be read.
+
+---
+
+## Related
+
+
+
+
+
diff --git a/docs/next/public/objects.inv b/docs/next/public/objects.inv
index b52155dd004d7..560545610de19 100644
Binary files a/docs/next/public/objects.inv and b/docs/next/public/objects.inv differ
diff --git a/docs/sphinx/sections/api/apidocs/libraries/dagster-aws.rst b/docs/sphinx/sections/api/apidocs/libraries/dagster-aws.rst
index 078144abceb26..c08acda013753 100644
--- a/docs/sphinx/sections/api/apidocs/libraries/dagster-aws.rst
+++ b/docs/sphinx/sections/api/apidocs/libraries/dagster-aws.rst
@@ -120,6 +120,8 @@ Clients
.. autoclass:: dagster_aws.pipes.PipesGlueClient
+.. autoclass:: dagster_aws.pipes.PipesECSClient
+
Legacy
--------
diff --git a/examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/ecs/dagster_code.py b/examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/ecs/dagster_code.py
new file mode 100644
index 0000000000000..6140aaa861064
--- /dev/null
+++ b/examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/ecs/dagster_code.py
@@ -0,0 +1,34 @@
+# start_asset_marker
+import os
+
+# dagster_glue_pipes.py
+import boto3
+from dagster_aws.pipes import PipesECSClient
+from docutils.nodes import entry
+
+from dagster import AssetExecutionContext, asset
+
+
+@asset
+def ecs_pipes_asset(context: AssetExecutionContext, pipes_ecs_client: PipesECSClient):
+ return pipes_ecs_client.run(
+ context=context,
+ taskDefinition="my-task",
+ count=1,
+ ).get_materialize_result()
+
+
+# end_asset_marker
+
+# start_definitions_marker
+
+from dagster import Definitions # noqa
+from dagster_aws.pipes import PipesS3MessageReader
+
+
+defs = Definitions(
+ assets=[ecs_pipes_asset],
+ resources={"pipes_ecs_client": PipesECSClient()},
+)
+
+# end_definitions_marker
diff --git a/examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/ecs/dev.Dockerfile b/examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/ecs/dev.Dockerfile
new file mode 100644
index 0000000000000..a83e1f046db54
--- /dev/null
+++ b/examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/ecs/dev.Dockerfile
@@ -0,0 +1,12 @@
+# this Dockerfile can be used for ECS Pipes development
+
+FROM python:3.11-slim
+
+RUN --mount=type=cache,target=/root/.cache/pip python -m pip install boto3
+
+COPY python_modules/dagster-pipes /src/dagster-pipes
+
+RUN pip install -e /src/dagster-pipes
+
+WORKDIR /app
+COPY examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/ecs/task.py .
diff --git a/examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/ecs/task.py b/examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/ecs/task.py
new file mode 100644
index 0000000000000..bea2c0314e6c3
--- /dev/null
+++ b/examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/ecs/task.py
@@ -0,0 +1,18 @@
+from dagster_pipes import (
+ PipesEnvVarParamsLoader,
+ PipesS3ContextLoader,
+ open_dagster_pipes,
+)
+
+
+def main():
+ with open_dagster_pipes() as pipes:
+ pipes.log.info("Hello from AWS ECS task!")
+ pipes.report_asset_materialization(
+ metadata={"some_metric": {"raw_value": 0, "type": "int"}},
+ data_version="alpha",
+ )
+
+
+if __name__ == "__main__":
+ main()
diff --git a/python_modules/libraries/dagster-aws/dagster_aws/pipes/clients/ecs.py b/python_modules/libraries/dagster-aws/dagster_aws/pipes/clients/ecs.py
index dbed30bec2001..598c955c8707c 100644
--- a/python_modules/libraries/dagster-aws/dagster_aws/pipes/clients/ecs.py
+++ b/python_modules/libraries/dagster-aws/dagster_aws/pipes/clients/ecs.py
@@ -5,7 +5,7 @@
import botocore
import dagster._check as check
from dagster import PipesClient
-from dagster._annotations import experimental
+from dagster._annotations import experimental, public
from dagster._core.definitions.resource_annotation import TreatAsResourceParam
from dagster._core.errors import DagsterExecutionInterruptedError
from dagster._core.execution.context.compute import OpExecutionContext
@@ -71,6 +71,7 @@ def __init__(
def _is_dagster_maintained(cls) -> bool:
return True
+ @public
def run(
self,
*,
@@ -80,7 +81,12 @@ def run(
) -> PipesClientCompletedInvocation:
"""Start a ECS task, enriched with the pipes protocol.
- See also: `AWS API Documentation `_
+ Args:
+ context (OpExecutionContext): The Dagster op context.
+ extras (Optional[Dict[str, Any]]): Additional information to pass to the external process.
+ **params: Keyword arguments to pass to the `boto3` ECS client.
+ See `boto3 API Documentation `_
+ for more info.
Returns:
PipesClientCompletedInvocation: Wrapper containing results reported by the external