Skip to content

Commit

Permalink
cloudwatch message reader
Browse files Browse the repository at this point in the history
  • Loading branch information
danielgafni committed Aug 5, 2024
1 parent 9cd950a commit d31a46d
Show file tree
Hide file tree
Showing 10 changed files with 225 additions and 55 deletions.
Binary file modified docs/content/api/modules.json.gz
Binary file not shown.
Binary file modified docs/content/api/searchindex.json.gz
Binary file not shown.
Binary file modified docs/content/api/sections.json.gz
Binary file not shown.
13 changes: 6 additions & 7 deletions docs/content/concepts/dagster-pipes/aws-glue.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -46,24 +46,26 @@ Call `open_dagster_pipes` in the Glue job script to create a context that can be
import boto3
from dagster_pipes import (
PipesCliArgsParamsLoader,
PipesDefaultMessageWriter,
PipesS3ContextLoader,
PipesS3MessageWriter,
open_dagster_pipes,
)

client = boto3.client("s3")
context_loader = PipesS3ContextLoader(client)
message_writer = PipesS3MessageWriter(client)
params_loader = PipesCliArgsParamsLoader()


def main():
with open_dagster_pipes(
context_loader=context_loader,
message_writer=message_writer,
params_loader=params_loader,
) as pipes:
pipes.log.info("Hello from AWS Glue job!")
pipes.report_asset_materialization(
metadata={"some_metric": {"raw_value": 0, "type": "int"}},
data_version="alpha",
)


if __name__ == "__main__":
Expand Down Expand Up @@ -107,7 +109,7 @@ Next, add the `PipesGlueClient` resource to your project's <PyObject object="Def

```python file=/guides/dagster/dagster_pipes/glue/dagster_code.py startafter=start_definitions_marker endbefore=end_definitions_marker
from dagster import Definitions # noqa
from dagster_aws.pipes import PipesGlueContextInjector, PipesS3MessageReader
from dagster_aws.pipes import PipesGlueContextInjector


bucket = os.environ["DAGSTER_GLUE_S3_CONTEXT_BUCKET"]
Expand All @@ -122,9 +124,6 @@ defs = Definitions(
client=boto3.client("s3"),
bucket=bucket,
),
message_reader=PipesS3MessageReader(
client=boto3.client("s3"), bucket=bucket
),
)
},
)
Expand Down
Binary file modified docs/next/public/objects.inv
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def glue_pipes_asset(
# start_definitions_marker

from dagster import Definitions # noqa
from dagster_aws.pipes import PipesGlueContextInjector, PipesS3MessageReader
from dagster_aws.pipes import PipesGlueContextInjector


bucket = os.environ["DAGSTER_GLUE_S3_CONTEXT_BUCKET"]
Expand All @@ -39,9 +39,6 @@ def glue_pipes_asset(
client=boto3.client("s3"),
bucket=bucket,
),
message_reader=PipesS3MessageReader(
client=boto3.client("s3"), bucket=bucket
),
)
},
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,24 +1,26 @@
import boto3
from dagster_pipes import (
PipesCliArgsParamsLoader,
PipesDefaultMessageWriter,
PipesS3ContextLoader,
PipesS3MessageWriter,
open_dagster_pipes,
)

client = boto3.client("s3")
context_loader = PipesS3ContextLoader(client)
message_writer = PipesS3MessageWriter(client)
params_loader = PipesCliArgsParamsLoader()


def main():
with open_dagster_pipes(
context_loader=context_loader,
message_writer=message_writer,
params_loader=params_loader,
) as pipes:
pipes.log.info("Hello from AWS Glue job!")
pipes.report_asset_materialization(
metadata={"some_metric": {"raw_value": 0, "type": "int"}},
data_version="alpha",
)


if __name__ == "__main__":
Expand Down
151 changes: 123 additions & 28 deletions python_modules/libraries/dagster-aws/dagster_aws/pipes.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,19 @@
import string
import time
from contextlib import contextmanager
from typing import TYPE_CHECKING, Any, Dict, Iterator, Literal, Mapping, Optional, Sequence
from typing import (
TYPE_CHECKING,
Any,
Dict,
Generator,
Iterator,
List,
Literal,
Mapping,
Optional,
Sequence,
TypedDict,
)

import boto3
import dagster._check as check
Expand Down Expand Up @@ -157,10 +169,22 @@ def no_messages_debug_text(self) -> str:
)


class CloudWatchEvent(TypedDict):
timestamp: int
message: str
ingestionTime: int


@experimental
class PipesCloudWatchMessageReader(PipesMessageReader):
"""Message reader that consumes AWS CloudWatch logs to read pipes messages."""

def __init__(self, client: Optional[boto3.client] = None):
"""Args:
client (boto3.client): boto3 CloudWatch client.
"""
self.client = client or boto3.client("logs")

@contextmanager
def read_messages(
self,
Expand All @@ -174,13 +198,80 @@ def read_messages(
self._handler = None

def consume_cloudwatch_logs(
self, client: boto3.client, log_group: str, log_stream: str
self,
log_group: str,
log_stream: str,
start_time: Optional[int] = None,
end_time: Optional[int] = None,
) -> None:
raise NotImplementedError("CloudWatch logs are not yet supported in the pipes protocol.")
handler = check.not_none(
self._handler, "Can only consume logs within context manager scope."
)

for events_batch in self._get_all_cloudwatch_events(
log_group=log_group, log_stream=log_stream, start_time=start_time, end_time=end_time
):
for event in events_batch:
for log_line in event["message"].splitlines():
extract_message_or_forward_to_stdout(handler, log_line)

def no_messages_debug_text(self) -> str:
return "Attempted to read messages by extracting them from the tail of CloudWatch logs directly."

def _get_all_cloudwatch_events(
self,
log_group: str,
log_stream: str,
start_time: Optional[int] = None,
end_time: Optional[int] = None,
) -> Generator[List[CloudWatchEvent], None, None]:
"""Returns batches of CloudWatch events until the stream is complete or end_time."""
params: Dict[str, Any] = {
"logGroupName": log_group,
"logStreamName": log_stream,
}

if start_time is not None:
params["startTime"] = start_time
if end_time is not None:
params["endTime"] = end_time

response = self.client.get_log_events(**params)

while events := response.get("events"):
yield events

params["nextToken"] = response["nextForwardToken"]

response = self.client.get_log_events(**params)

def _get_all_cloudwatch_events(
self,
log_group: str,
log_stream: str,
start_time: Optional[int] = None,
end_time: Optional[int] = None,
) -> Generator[List[CloudWatchEvent], None, None]:
"""Returns batches of CloudWatch events until the stream is complete or end_time."""
params = {
"logGroupName": log_group,
"logStreamName": log_stream,
}

if start_time is not None:
params["startTime"] = start_time
if end_time is not None:
params["endTime"] = end_time

response = self.client.get_log_events(**params)

while events := response.get("events"):
yield events

params["nextToken"] = response["nextForwardToken"]

response = self.client.get_log_events(**params)


class PipesLambdaEventContextInjector(PipesEnvContextInjector):
def no_messages_debug_text(self) -> str:
Expand All @@ -203,11 +294,11 @@ class PipesLambdaClient(PipesClient, TreatAsResourceParam):

def __init__(
self,
client: boto3.client,
client: Optional[boto3.client] = None,
context_injector: Optional[PipesContextInjector] = None,
message_reader: Optional[PipesMessageReader] = None,
):
self._client = client
self._client = client or boto3.client("lambda")
self._message_reader = message_reader or PipesLambdaLogsMessageReader()
self._context_injector = context_injector or PipesLambdaEventContextInjector()

Expand Down Expand Up @@ -272,35 +363,34 @@ def run(

class PipesGlueContextInjector(PipesS3ContextInjector):
def no_messages_debug_text(self) -> str:
return "Attempted to inject context via Glue job arguments."
return "Attempted to inject context via Glue job Arguments"


class PipesGlueLogsMessageReader(PipesCloudWatchMessageReader):
def no_messages_debug_text(self) -> str:
return "Attempted to read messages by extracting them from the tail of CloudWatch logs directly."
pass


@experimental
class PipesGlueClient(PipesClient, TreatAsResourceParam):
"""A pipes client for invoking AWS Glue jobs.
Args:
client (boto3.client): The boto Glue client used to call invoke.
context_injector (Optional[PipesContextInjector]): A context injector to use to inject
context into the Glue job, for example, :py:class:`PipesGlueContextInjector`.
message_reader (Optional[PipesMessageReader]): A message reader to use to read messages
from the glue job run. Defaults to :py:class:`PipesGlueLogsMessageReader`.
client (Optional[boto3.client]): The boto Glue client used to launch the Glue job
"""

def __init__(
self,
client: boto3.client,
context_injector: PipesContextInjector,
message_reader: Optional[PipesMessageReader] = None,
client: Optional[boto3.client] = None,
):
self._client = client
self._client = client or boto3.client("glue")
self._context_injector = context_injector
self._message_reader = message_reader or PipesCloudWatchMessageReader()
self._message_reader = message_reader or PipesGlueLogsMessageReader()

@classmethod
def _is_dagster_maintained(cls) -> bool:
Expand Down Expand Up @@ -377,19 +467,10 @@ def run(
# so we need to filter them out
params = {k: v for k, v in params.items() if v is not None}

try:
response = self._client.start_job_run(**params)
run_id = response["JobRunId"]
context.log.info(f"Started AWS Glue job {job_name} run: {run_id}")
response = self._wait_for_job_run_completion(job_name, run_id)

if response["JobRun"]["JobRunState"] == "FAILED":
raise RuntimeError(
f"Glue job {job_name} run {run_id} failed:\n{response['JobRun']['ErrorMessage']}"
)
else:
context.log.info(f"Glue job {job_name} run {run_id} completed successfully")
start_timestamp = time.time() * 1000 # unix time in ms

try:
run_id = self._client.start_job_run(**params)["JobRunId"]
except ClientError as err:
context.log.error(
"Couldn't create job %s. Here's why: %s: %s",
Expand All @@ -399,11 +480,25 @@ def run(
)
raise

# TODO: get logs from CloudWatch. there are 2 separate streams for stdout and driver stderr to read from
# the log group can be found in the response from start_job_run, and the log stream is the job run id
# worker logs have log streams like: <job_id>_<worker_id> but we probably don't need to read those
response = self._client.get_job_run(JobName=job_name, RunId=run_id)
log_group = response["JobRun"]["LogGroupName"]
context.log.info(f"Started AWS Glue job {job_name} run: {run_id}")

response = self._wait_for_job_run_completion(job_name, run_id)

if response["JobRun"]["JobRunState"] == "FAILED":
raise RuntimeError(
f"Glue job {job_name} run {run_id} failed:\n{response['JobRun']['ErrorMessage']}"
)
else:
context.log.info(f"Glue job {job_name} run {run_id} completed successfully")

if isinstance(self._message_reader, PipesCloudWatchMessageReader):
# TODO: receive messages from a background thread in real-time
self._message_reader.consume_cloudwatch_logs(
f"{log_group}/output", run_id, start_time=int(start_timestamp)
)

# should probably have a way to return the lambda result payload
return PipesClientCompletedInvocation(session)

def _wait_for_job_run_completion(self, job_name: str, run_id: str) -> Dict[str, Any]:
Expand Down
Loading

0 comments on commit d31a46d

Please sign in to comment.