Skip to content

Commit

Permalink
cloudwatch message reader docs
Browse files Browse the repository at this point in the history
  • Loading branch information
danielgafni committed Aug 6, 2024
1 parent dc0da1c commit a40c740
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 14 deletions.
13 changes: 6 additions & 7 deletions docs/content/concepts/dagster-pipes/aws-glue.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -46,24 +46,26 @@ Call `open_dagster_pipes` in the Glue job script to create a context that can be
import boto3
from dagster_pipes import (
PipesCliArgsParamsLoader,
PipesDefaultMessageWriter,
PipesS3ContextLoader,
PipesS3MessageWriter,
open_dagster_pipes,
)

client = boto3.client("s3")
context_loader = PipesS3ContextLoader(client)
message_writer = PipesS3MessageWriter(client)
params_loader = PipesCliArgsParamsLoader()


def main():
with open_dagster_pipes(
context_loader=context_loader,
message_writer=message_writer,
params_loader=params_loader,
) as pipes:
pipes.log.info("Hello from AWS Glue job!")
pipes.report_asset_materialization(
metadata={"some_metric": {"raw_value": 0, "type": "int"}},
data_version="alpha",
)


if __name__ == "__main__":
Expand Down Expand Up @@ -107,7 +109,7 @@ Next, add the `PipesGlueClient` resource to your project's <PyObject object="Def

```python file=/guides/dagster/dagster_pipes/glue/dagster_code.py startafter=start_definitions_marker endbefore=end_definitions_marker
from dagster import Definitions # noqa
from dagster_aws.pipes import PipesGlueContextInjector, PipesS3MessageReader
from dagster_aws.pipes import PipesGlueContextInjector


bucket = os.environ["DAGSTER_GLUE_S3_CONTEXT_BUCKET"]
Expand All @@ -122,9 +124,6 @@ defs = Definitions(
client=boto3.client("s3"),
bucket=bucket,
),
message_reader=PipesS3MessageReader(
client=boto3.client("s3"), bucket=bucket
),
)
},
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def glue_pipes_asset(
# start_definitions_marker

from dagster import Definitions # noqa
from dagster_aws.pipes import PipesGlueContextInjector, PipesS3MessageReader
from dagster_aws.pipes import PipesGlueContextInjector


bucket = os.environ["DAGSTER_GLUE_S3_CONTEXT_BUCKET"]
Expand All @@ -39,9 +39,6 @@ def glue_pipes_asset(
client=boto3.client("s3"),
bucket=bucket,
),
message_reader=PipesS3MessageReader(
client=boto3.client("s3"), bucket=bucket
),
)
},
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,24 +1,26 @@
import boto3
from dagster_pipes import (
PipesCliArgsParamsLoader,
PipesDefaultMessageWriter,
PipesS3ContextLoader,
PipesS3MessageWriter,
open_dagster_pipes,
)

client = boto3.client("s3")
context_loader = PipesS3ContextLoader(client)
message_writer = PipesS3MessageWriter(client)
params_loader = PipesCliArgsParamsLoader()


def main():
with open_dagster_pipes(
context_loader=context_loader,
message_writer=message_writer,
params_loader=params_loader,
) as pipes:
pipes.log.info("Hello from AWS Glue job!")
pipes.report_asset_materialization(
metadata={"some_metric": {"raw_value": 0, "type": "int"}},
data_version="alpha",
)


if __name__ == "__main__":
Expand Down

0 comments on commit a40c740

Please sign in to comment.