Skip to content

Commit

Permalink
Apply suggestions from code review
Browse files Browse the repository at this point in the history
Co-authored-by: Erin Cochran <[email protected]>
  • Loading branch information
danielgafni and erinkcochran87 committed Aug 16, 2024
1 parent d389069 commit 1a2da02
Show file tree
Hide file tree
Showing 7 changed files with 15 additions and 10 deletions.
Binary file modified docs/content/api/modules.json.gz
Binary file not shown.
Binary file modified docs/content/api/searchindex.json.gz
Binary file not shown.
Binary file modified docs/content/api/sections.json.gz
Binary file not shown.
8 changes: 6 additions & 2 deletions docs/content/concepts/dagster-pipes/aws-glue.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ description: "Learn to integrate Dagster Pipes with AWS Glue to launch external

This tutorial gives a short overview on how to use [Dagster Pipes](/concepts/dagster-pipes) with [AWS Glue](https://aws.amazon.com/glue/).

## The [dagster-aws](/\_apidocs/libraries/dagster-aws) integration library provides the <PyObject object="PipesGlueClient" module="dagster_aws.pipes" /> resource which can be used to launch AWS Glue jobs from Dagster assets and ops. Dagster can receive regular events like logs, asset checks, or asset materializations from jobs launched with this client. Using it requires minimal code changes on the job side.
The [dagster-aws](/\_apidocs/libraries/dagster-aws) integration library provides the <PyObject object="PipesGlueClient" module="dagster_aws.pipes" /> resource which can be used to launch AWS Glue jobs from Dagster assets and ops. Dagster can receive regular events like logs, asset checks, or asset materializations from jobs launched with this client. Using it requires minimal code changes on the job side.

---

## Prerequisites

Expand Down Expand Up @@ -129,7 +131,9 @@ defs = Definitions(

Dagster will now be able to launch the AWS Glue job from the `glue_pipes_asset` asset.

By default the client will be using the CloudWatch log stream (`.../output/<job-run-id>`) created by the Glue job to receive Dagster events and will forward this stream to `stdout`. If this is undesired, instead, the client can be configured to use <PyObject object="PipesS3MessageReader" module="dagster_aws.pipes" />, and the Glue job can use <PyObject object="PipesS3MessageWriter" module="dagster_pipes" /> .
By default, the client uses the CloudWatch log stream (`.../output/<job-run-id>`) created by the Glue job to receive Dagster events. The client will also forward the stream to `stdout`.

To customize this behavior, the client can be configured to use <PyObject object="PipesS3MessageReader" module="dagster_aws.pipes" />, and the Glue job to use <PyObject object="PipesS3MessageWriter" module="dagster_pipes" /> .

---

Expand Down
Binary file modified docs/next/public/objects.inv
Binary file not shown.
1 change: 0 additions & 1 deletion docs/sphinx/sections/api/apidocs/libraries/dagster-aws.rst
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,6 @@ Clients
.. autoclass:: dagster_aws.pipes.PipesLambdaClient

.. autoclass:: dagster_aws.pipes.PipesGlueClient
:members: run

Legacy
--------
Expand Down
16 changes: 9 additions & 7 deletions python_modules/libraries/dagster-aws/dagster_aws/pipes.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import dagster._check as check
from botocore.exceptions import ClientError
from dagster import PipesClient
from dagster._annotations import experimental
from dagster._annotations import experimental, public
from dagster._core.definitions.resource_annotation import TreatAsResourceParam
from dagster._core.errors import DagsterExecutionInterruptedError
from dagster._core.execution.context.compute import OpExecutionContext
Expand Down Expand Up @@ -198,6 +198,7 @@ def read_messages(
finally:
self._handler = None

@public
def consume_cloudwatch_logs(
self,
log_group: str,
Expand All @@ -211,11 +212,10 @@ def consume_cloudwatch_logs(
log_group (str): CloudWatch log group name
log_stream (str): CLoudWatch log stream name
start_time (Optional[int]): The start of the time range, expressed as the number of
milliseconds after Jan 1, 1970 00:00:00 UTC. Events with a timestamp equal to this
time or later than this time are included.
Events with a timestamp earlier than this time are not included.
milliseconds after ``Jan 1, 1970 00:00:00 UTC``. Only events with a timestamp equal to this
time or later are included.
end_time (Optional[int]): The end of the time range, expressed as the number of
milliseconds after Jan 1, 1970 00:00:00 UTC. Events with a timestamp equal to or
milliseconds after ``Jan 1, 1970 00:00:00 UTC``. Events with a timestamp equal to or
later than this time are not included.
"""
handler = check.not_none(
Expand Down Expand Up @@ -297,6 +297,7 @@ def __init__(
def _is_dagster_maintained(cls) -> bool:
return True

@public
def run(
self,
*,
Expand Down Expand Up @@ -362,9 +363,9 @@ class PipesGlueClient(PipesClient, TreatAsResourceParam):
message_reader (Optional[PipesMessageReader]): A message reader to use to read messages
from the glue job run. Defaults to :py:class:`PipesCloudWatchsMessageReader`.
When provided with :py:class:`PipesCloudWatchMessageReader`,
it will be used to recieve logs and events from the `.../output/<job-run-id>`
it will be used to recieve logs and events from the ``.../output/<job-run-id>``
CloudWatch log stream created by AWS Glue. Note that AWS Glue routes both
`stderr` and `stdout` from the main job process into this LogStream.
``stderr`` and ``stdout`` from the main job process into this LogStream.
client (Optional[boto3.client]): The boto Glue client used to launch the Glue job
forward_termination (bool): Whether to cancel the Glue job run when the Dagster process receives a termination signal.
"""
Expand All @@ -385,6 +386,7 @@ def __init__(
def _is_dagster_maintained(cls) -> bool:
return True

@public
def run(
self,
*,
Expand Down

0 comments on commit 1a2da02

Please sign in to comment.