Merge pull request #72 from vincentclaes/71-implement-notification
71 implement notification
vincentclaes authored Jun 19, 2021
2 parents 2780b09 + 3b03015 commit cf63712
Showing 10 changed files with 396 additions and 588 deletions.
4 changes: 2 additions & 2 deletions .pre-commit-config.yaml
@@ -2,12 +2,12 @@
# See https://pre-commit.com/hooks.html for more hooks
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
-rev: v2.3.0
+rev: v4.0.1
hooks:
- id: check-yaml
- id: end-of-file-fixer
- id: trailing-whitespace
- repo: https://github.com/psf/black
-rev: 19.3b0
+rev: 21.6b0
hooks:
- id: black
18 changes: 17 additions & 1 deletion README.md
@@ -269,6 +269,23 @@ some_task >> ...

</details>

<details>
<summary>Notify in case of error/success.</summary>

Provide the parameter `notification` in the constructor of a `StepfunctionsWorkflow` object.
This will create an SNS Topic that is triggered on failure or success of the workflow.
The email address is subscribed to the topic and receives the notification in its inbox.

```python
with StepfunctionsWorkflow(datajob_stack=datajob_stack,
name="workflow",
notification="[email protected]") as sfn:
task1 >> task2
```

You can provide a single email address or a list of email addresses.
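
A minimal sketch subscribing two addresses, assuming `task1` and `task2` are defined as in the example above:

```python
with StepfunctionsWorkflow(datajob_stack=datajob_stack,
                           name="workflow",
                           notification=["[email protected]", "[email protected]"]) as sfn:
    task1 >> task2
```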

</details>

# Datajob in depth

@@ -348,7 +365,6 @@ These are the ideas we find interesting to implement:
- add a time based trigger to the step functions workflow.
- add an s3 event trigger to the step functions workflow.
- add a lambda that copies data from one s3 location to another.
-- add an sns that notifies in case of any failure (slack/email)
- version your data pipeline.
- cli command to view the logs / glue jobs / s3 bucket
- implement sagemaker services
11 changes: 6 additions & 5 deletions datajob/datajob_base.py
@@ -13,13 +13,14 @@ def __init__(self, datajob_stack, name, **kwargs):
assert isinstance(
datajob_stack, DataJobStack
), f"we expect the scope argument to be of type {DataJobStack}"
+self.datajob_stack = datajob_stack
self.name = name
-self.project_root = datajob_stack.project_root
-self.stage = datajob_stack.stage
-self.unique_name = f"{datajob_stack.unique_stack_name}-{self.name}"
-self.context = datajob_stack.context
+self.project_root = self.datajob_stack.project_root
+self.stage = self.datajob_stack.stage
+self.unique_name = f"{self.datajob_stack.unique_stack_name}-{self.name}"
+self.context = self.datajob_stack.context
logger.info(f"adding job {self} to stack workflow resources")
-datajob_stack.resources.append(self)
+self.datajob_stack.resources.append(self)

@abstractmethod
def create(self):
6 changes: 4 additions & 2 deletions datajob/datajob_stack.py
@@ -33,8 +33,10 @@ def __init__(
self.scope = scope
self.stage = self.get_stage(stage)
self.unique_stack_name = self._create_unique_stack_name(id, self.stage)
-env = DataJobStack._create_environment_object(account=account, region=region)
-super().__init__(scope=scope, id=self.unique_stack_name, env=env, **kwargs)
+self.env = DataJobStack._create_environment_object(
+    account=account, region=region
+)
+super().__init__(scope=scope, id=self.unique_stack_name, env=self.env, **kwargs)
self.project_root = project_root
self.include_folder = include_folder
self.resources = []
Empty file added datajob/sns/__init__.py
67 changes: 67 additions & 0 deletions datajob/sns/sns.py
@@ -0,0 +1,67 @@
from typing import Union

from datajob.datajob_base import DataJobBase
from aws_cdk import core
from aws_cdk import aws_sns
from aws_cdk import aws_sns_subscriptions
from aws_cdk.core import Arn, ArnComponents


class SnsTopic(DataJobBase):
def __init__(
self,
datajob_stack: core.Construct,
name: str,
notification: Union[str, list],
**kwargs
):
"""
:param datajob_stack: aws cdk core construct object.
:param name: name for the SNS Topic.
:param notification: email address as string or list of email addresses to be subscribed.
:param kwargs:
"""
super().__init__(datajob_stack, name, **kwargs)
self.notification = notification
self.sns_topic = None

def create(self):
self.sns_topic = aws_sns.Topic(
scope=self,
id=self.unique_name,
display_name=self.unique_name,
topic_name=self.unique_name,
)
self.add_email_subscription()

def add_email_subscription(self) -> None:
"""
Add the email address(es) configured in `self.notification` as subscribers to the topic.
:return: None
"""
if isinstance(self.notification, list):
for email in self.notification:
self.sns_topic.add_subscription(
aws_sns_subscriptions.EmailSubscription(email)
)
else:
self.sns_topic.add_subscription(
aws_sns_subscriptions.EmailSubscription(self.notification)
)

def get_topic_arn(self) -> str:
"""
The ARN will be formatted as follows:
arn:{partition}:{service}:{region}:{account}:{resource}{sep}{resource-name}
:return: a well-formatted ARN string
"""
arn_components = ArnComponents(
partition="aws",
service="sns",
region=self.datajob_stack.env.region,
account=self.datajob_stack.env.account,
resource=self.unique_name,
)
return Arn.format(components=arn_components, stack=self.datajob_stack)
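
As a minimal sketch of the resulting ARN, assuming a hypothetical account `123456789012`, region `eu-west-1`, and topic name (none of these values are taken from this diff), `get_topic_arn` assembles the standard SNS shape:

```python
# Hypothetical values for illustration only.
region = "eu-west-1"      # would come from datajob_stack.env.region
account = "123456789012"  # would come from datajob_stack.env.account
topic_name = "my-stack-dev-workflow-notification"

# arn:{partition}:{service}:{region}:{account}:{resource}
topic_arn = f"arn:aws:sns:{region}:{account}:{topic_name}"
assert topic_arn == "arn:aws:sns:eu-west-1:123456789012:my-stack-dev-workflow-notification"
```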
64 changes: 64 additions & 0 deletions datajob/stepfunctions/stepfunctions_workflow.py
@@ -2,19 +2,23 @@
import tempfile
import uuid
from pathlib import Path
from typing import Union

import boto3
import contextvars
from aws_cdk import aws_iam as iam
from aws_cdk import cloudformation_include as cfn_inc
from aws_cdk import core
from stepfunctions.steps import Catch, Pass, Fail
from stepfunctions.steps.service import SnsPublishStep
from stepfunctions import steps
from stepfunctions.steps.compute import GlueStartJobRunStep
from stepfunctions.steps.states import Parallel
from stepfunctions.workflow import Workflow

from datajob import logger
from datajob.datajob_base import DataJobBase
from datajob.sns.sns import SnsTopic

__workflow = contextvars.ContextVar("workflow")

@@ -36,6 +40,7 @@ def __init__(
self,
datajob_stack: core.Construct,
name: str,
notification: Union[str, list] = None,
role: iam.Role = None,
region: str = None,
**kwargs,
@@ -53,6 +58,7 @@
self.region = (
region if region is not None else os.environ.get("AWS_DEFAULT_REGION")
)
self.notification = self._setup_notification(notification)

def add_task(self, task_other):
"""add a task to the workflow we would like to orchestrate."""
@@ -87,6 +93,9 @@ def _build_workflow(self):
f"creating a chain from all the different steps. \n {self.chain_of_tasks}"
)
workflow_definition = steps.Chain(self.chain_of_tasks)
workflow_definition = self._integrate_notification_in_workflow(
workflow_definition=workflow_definition
)
logger.debug(f"creating a workflow with name {self.unique_name}")
self.client = boto3.client("stepfunctions")
self.workflow = Workflow(
@@ -104,6 +113,61 @@ def create(self):
text_file.write(self.workflow.get_cloudformation_template())
cfn_inc.CfnInclude(self, self.unique_name, template_file=sfn_cf_file_path)

def _setup_notification(
self, notification: Union[str, list]
) -> Union[SnsTopic, None]:
"""Create a SnsTopic if the notification parameter is defined.
:param notification: email address as string or list of email addresses to be subscribed.
:return:
"""
if notification is not None:
name = f"{self.name}-notification"
return SnsTopic(self.datajob_stack, name, notification)

def _integrate_notification_in_workflow(
self, workflow_definition: steps.Chain
) -> steps.Chain:
"""If a notification is defined we configure an SNS with email subscription to alert the user
if the stepfunctions workflow failed or succeeded.
:param workflow_definition: the workflow definition that contains all the steps we want to execute.
:return: if notification is set, we adapt the workflow to include an SnsPublishStep on failure or on success.
If notification is not set, we return the workflow as we received it.
"""
if self.notification:
logger.debug(
"A notification is configured, "
"implementing a notification on Error or when the stepfunctions workflow succeeds."
)
failure_notification = SnsPublishStep(
"FailureNotification",
parameters={
"TopicArn": self.notification.get_topic_arn(),
"Message": f"Stepfunctions workflow {self.unique_name} Failed.",
},
)
pass_notification = SnsPublishStep(
"SuccessNotification",
parameters={
"TopicArn": self.notification.get_topic_arn(),
"Message": f"Stepfunctions workflow {self.unique_name} Succeeded.",
},
)

catch_error = Catch(
error_equals=["States.ALL"], next_step=failure_notification
)
workflow_with_notification = Parallel(state_id="notification")
workflow_with_notification.add_branch(workflow_definition)
workflow_with_notification.add_catch(catch_error)
workflow_with_notification.next(pass_notification)
return steps.Chain([workflow_with_notification])
logger.debug(
"No notification is configured, returning the workflow definition."
)
return workflow_definition

def __enter__(self):
"""first steps we have to do when entering the context manager."""
logger.info(f"creating step functions workflow for {self.unique_name}")
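For reference, a sketch of the state machine shape this wrapping produces when a notification is set. The state names match the test assertions below; the task fields are abbreviated and the surrounding values are hypothetical:

```python
# The original chain is wrapped in a Parallel state named "notification".
# A Catch on States.ALL routes errors to FailureNotification; a normal
# exit continues to SuccessNotification.
state_machine = {
    "StartAt": "notification",
    "States": {
        "notification": {
            "Type": "Parallel",
            "Branches": [],  # the original workflow definition goes here
            "Catch": [
                {"ErrorEquals": ["States.ALL"], "Next": "FailureNotification"}
            ],
            "Next": "SuccessNotification",
        },
        "FailureNotification": {"Type": "Task"},  # SnsPublishStep, abbreviated
        "SuccessNotification": {"Type": "Task"},  # SnsPublishStep, abbreviated
    },
}
```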
48 changes: 47 additions & 1 deletion datajob_tests/stepfunctions/test_stepfunctions_workflow.py
@@ -1,6 +1,9 @@
import json
import os
import unittest
import io

import yaml
from aws_cdk import core
from moto import mock_stepfunctions
from stepfunctions.steps.compute import GlueStartJobRunStep
@@ -82,7 +85,9 @@ def test_create_tasks_for_orchestration_starts_with_parallel_flow_successfully(
isinstance(a_step_functions_workflow.chain_of_tasks[1], GlueStartJobRunStep)
)

-def test_orchestrate_1_task_successfully(self,):
+def test_orchestrate_1_task_successfully(
+    self,
+):
task1 = stepfunctions_workflow.task(SomeMockedClass("task1"))
djs = DataJobStack(
scope=self.app,
@@ -98,3 +103,44 @@ def test_orchestrate_1_task_successfully(self,):
self.assertTrue(
isinstance(a_step_functions_workflow.chain_of_tasks[0], GlueStartJobRunStep)
)

@mock_stepfunctions
def test_create_workflow_with_notification_successfully(self):
task1 = stepfunctions_workflow.task(SomeMockedClass("task1"))
task2 = stepfunctions_workflow.task(SomeMockedClass("task2"))

djs = DataJobStack(
scope=self.app,
id="a-unique-name-3",
stage="stage",
project_root="sampleproject/",
region="eu-west-1",
account="3098726354",
)
with StepfunctionsWorkflow(
djs, "some-name", notification="[email protected]"
) as a_step_functions_workflow:
task1 >> task2

with io.StringIO() as f:
f.write(a_step_functions_workflow.workflow.get_cloudformation_template())
f.seek(0)
cf_template = yaml.load(f, Loader=yaml.FullLoader)

sfn_workflow = json.loads(
cf_template.get("Resources")
.get("StateMachineComponent")
.get("Properties")
.get("DefinitionString")
)
# we expect two notifications: one for success and one for failure
self.assertTrue("SuccessNotification" in sfn_workflow.get("States").keys())
self.assertTrue("FailureNotification" in sfn_workflow.get("States").keys())
# there is a catch statement in the statemachine
self.assertTrue(
"Catch" in sfn_workflow.get("States").get("notification").keys()
)
# when implementing a notification we expect a Parallel branch
self.assertEqual(
sfn_workflow.get("States").get("notification").get("Type"), "Parallel"
)
