diff --git a/docs/content/api/modules.json.gz b/docs/content/api/modules.json.gz index 8b6d7eef297cc..e0a38edcd36e5 100644 Binary files a/docs/content/api/modules.json.gz and b/docs/content/api/modules.json.gz differ diff --git a/docs/content/api/searchindex.json.gz b/docs/content/api/searchindex.json.gz index e5cd77e15a6d3..6e05a2c4a4ba1 100644 Binary files a/docs/content/api/searchindex.json.gz and b/docs/content/api/searchindex.json.gz differ diff --git a/docs/content/api/sections.json.gz b/docs/content/api/sections.json.gz index e4261752b4208..5f130c0849c26 100644 Binary files a/docs/content/api/sections.json.gz and b/docs/content/api/sections.json.gz differ diff --git a/docs/content/concepts/dagster-pipes/aws-glue.mdx b/docs/content/concepts/dagster-pipes/aws-glue.mdx index 82737f6586600..406143be913ad 100644 --- a/docs/content/concepts/dagster-pipes/aws-glue.mdx +++ b/docs/content/concepts/dagster-pipes/aws-glue.mdx @@ -47,23 +47,24 @@ import boto3 from dagster_pipes import ( PipesCliArgsParamsLoader, PipesS3ContextLoader, - PipesS3MessageWriter, open_dagster_pipes, ) client = boto3.client("s3") context_loader = PipesS3ContextLoader(client) -message_writer = PipesS3MessageWriter(client) params_loader = PipesCliArgsParamsLoader() def main(): with open_dagster_pipes( context_loader=context_loader, - message_writer=message_writer, params_loader=params_loader, ) as pipes: pipes.log.info("Hello from AWS Glue job!") + pipes.report_asset_materialization( + metadata={"some_metric": {"raw_value": 0, "type": "int"}}, + data_version="alpha", + ) if __name__ == "__main__": @@ -107,7 +108,8 @@ Next, add the `PipesGlueClient` resource to your project's `) created by the Glue job to receive Dagster events. The client will also forward the stream to `stdout`. + +To customize this behavior, the client can be configured to use , and the Glue job to use . + --- ## Related diff --git a/docs/next/public/objects.inv b/docs/next/public/objects.inv index d9e1537489ec5..b52155dd004d7 100644 Binary files a/docs/next/public/objects.inv and b/docs/next/public/objects.inv differ diff --git a/docs/sphinx/sections/api/apidocs/libraries/dagster-aws.rst b/docs/sphinx/sections/api/apidocs/libraries/dagster-aws.rst index a0b72ecb195f0..078144abceb26 100644 --- a/docs/sphinx/sections/api/apidocs/libraries/dagster-aws.rst +++ b/docs/sphinx/sections/api/apidocs/libraries/dagster-aws.rst @@ -98,6 +98,24 @@ Resources which surface SecretsManager secrets for use in Dagster resources and Pipes -------------- +Context Injectors +^^^^^^^^^^^^^^^^^ + +.. autoclass:: dagster_aws.pipes.PipesS3ContextInjector + +.. autoclass:: dagster_aws.pipes.PipesLambdaEventContextInjector + +Message Readers +^^^^^^^^^^^^^^^ + +.. autoclass:: dagster_aws.pipes.PipesS3MessageReader + +.. autoclass:: dagster_aws.pipes.PipesCloudWatchMessageReader + :members: consume_cloudwatch_logs + +Clients +^^^^^^^ + .. autoclass:: dagster_aws.pipes.PipesLambdaClient .. autoclass:: dagster_aws.pipes.PipesGlueClient diff --git a/examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/glue/dagster_code.py b/examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/glue/dagster_code.py index 7e224fb480893..92bb51092d023 100644 --- a/examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/glue/dagster_code.py +++ b/examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/glue/dagster_code.py @@ -24,7 +24,8 @@ def glue_pipes_asset( # start_definitions_marker from dagster import Definitions # noqa -from dagster_aws.pipes import PipesS3ContextInjector, PipesS3MessageReader +from dagster_aws.pipes import PipesS3ContextInjector, PipesCloudWatchMessageReader + bucket = os.environ["DAGSTER_GLUE_S3_CONTEXT_BUCKET"] @@ -38,9 +39,7 @@ def glue_pipes_asset( client=boto3.client("s3"), bucket=bucket, ), - message_reader=PipesS3MessageReader( - client=boto3.client("s3"), bucket=bucket - ), + message_reader=PipesCloudWatchMessageReader(client=boto3.client("logs")), ) }, ) diff --git a/examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/glue/glue_script.py b/examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/glue/glue_script.py index 8402f980f1866..df7c61a10fd4d 100644 --- a/examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/glue/glue_script.py +++ b/examples/docs_snippets/docs_snippets/guides/dagster/dagster_pipes/glue/glue_script.py @@ -2,23 +2,24 @@ from dagster_pipes import ( PipesCliArgsParamsLoader, PipesS3ContextLoader, - PipesS3MessageWriter, open_dagster_pipes, ) client = boto3.client("s3") context_loader = PipesS3ContextLoader(client) -message_writer = PipesS3MessageWriter(client) params_loader = PipesCliArgsParamsLoader() def main(): with open_dagster_pipes( context_loader=context_loader, - message_writer=message_writer, params_loader=params_loader, ) as pipes: pipes.log.info("Hello from AWS Glue job!") + pipes.report_asset_materialization( + metadata={"some_metric": {"raw_value": 0, "type": "int"}}, + data_version="alpha", + ) if __name__ == "__main__": diff --git a/python_modules/libraries/dagster-aws/dagster_aws/pipes.py b/python_modules/libraries/dagster-aws/dagster_aws/pipes.py index fb3f7b230dfed..d50bd0e3315ef 100644 --- a/python_modules/libraries/dagster-aws/dagster_aws/pipes.py +++ b/python_modules/libraries/dagster-aws/dagster_aws/pipes.py @@ -23,7 +23,7 @@ import dagster._check as check from botocore.exceptions import ClientError from dagster import PipesClient -from dagster._annotations import experimental +from dagster._annotations import experimental, public from dagster._core.definitions.resource_annotation import TreatAsResourceParam from dagster._core.errors import DagsterExecutionInterruptedError from dagster._core.execution.context.compute import OpExecutionContext @@ -198,6 +198,7 @@ def read_messages( finally: self._handler = None + @public def consume_cloudwatch_logs( self, log_group: str, @@ -205,6 +206,18 @@ def consume_cloudwatch_logs( start_time: Optional[int] = None, end_time: Optional[int] = None, ) -> None: + """Reads logs from AWS CloudWatch and forwards them to Dagster for events extraction and logging. + + Args: + log_group (str): CloudWatch log group name + log_stream (str): CLoudWatch log stream name + start_time (Optional[int]): The start of the time range, expressed as the number of + milliseconds after ``Jan 1, 1970 00:00:00 UTC``. Only events with a timestamp equal to this + time or later are included. + end_time (Optional[int]): The end of the time range, expressed as the number of + milliseconds after ``Jan 1, 1970 00:00:00 UTC``. Events with a timestamp equal to or + later than this time are not included. + """ handler = check.not_none( self._handler, "Can only consume logs within context manager scope." ) @@ -217,7 +230,7 @@ def consume_cloudwatch_logs( extract_message_or_forward_to_stdout(handler, log_line) def no_messages_debug_text(self) -> str: - return "Attempted to read messages by extracting them from the tail of CloudWatch logs directly." + return "Attempted to read messages by extracting them from CloudWatch logs directly." def _get_all_cloudwatch_events( self, @@ -248,6 +261,10 @@ def _get_all_cloudwatch_events( class PipesLambdaEventContextInjector(PipesEnvContextInjector): + """Injects context via AWS Lambda event input. + Should be paired with :py:class`~dagster_pipes.PipesMappingParamsLoader` on the Lambda side. + """ + def no_messages_debug_text(self) -> str: return "Attempted to inject context via the lambda event input." @@ -256,7 +273,7 @@ class PipesLambdaClient(PipesClient, TreatAsResourceParam): """A pipes client for invoking AWS lambda. By default context is injected via the lambda input event and messages are parsed out of the - 4k tail of logs. S3 + 4k tail of logs. Args: client (boto3.client): The boto lambda client used to call invoke. @@ -280,6 +297,7 @@ def __init__( def _is_dagster_maintained(cls) -> bool: return True + @public def run( self, *, @@ -344,6 +362,10 @@ class PipesGlueClient(PipesClient, TreatAsResourceParam): context into the Glue job, for example, :py:class:`PipesS3ContextInjector`. message_reader (Optional[PipesMessageReader]): A message reader to use to read messages from the glue job run. Defaults to :py:class:`PipesCloudWatchsMessageReader`. + When provided with :py:class:`PipesCloudWatchMessageReader`, + it will be used to recieve logs and events from the ``.../output/`` + CloudWatch log stream created by AWS Glue. Note that AWS Glue routes both + ``stderr`` and ``stdout`` from the main job process into this LogStream. client (Optional[boto3.client]): The boto Glue client used to launch the Glue job forward_termination (bool): Whether to cancel the Glue job run when the Dagster process receives a termination signal. """ @@ -364,6 +386,7 @@ def __init__( def _is_dagster_maintained(cls) -> bool: return True + @public def run( self, *,