diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 6e6e4e9..c4500c5 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -11,3 +11,8 @@ repos:
rev: 21.6b0
hooks:
- id: black
+- repo: https://github.com/myint/docformatter
+ rev: v1.4
+ hooks:
+ - id: docformatter
+ args: [--in-place]
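The new docformatter hook can be exercised locally before committing; a minimal sketch, assuming pre-commit itself is already installed:

```shell script
# run only the newly added docformatter hook over the whole repo
pre-commit run docformatter --all-files
```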
diff --git a/README.md b/README.md
index 0b12197..7cba821 100644
--- a/README.md
+++ b/README.md
@@ -25,33 +25,33 @@
# Quickstart
-We have a simple data pipeline composed of [2 glue jobs](./examples/data_pipeline_with_packaged_project/glue_jobs/) orchestrated sequentially using step functions.
+You can find the full example in [examples/data_pipeline_simple](./examples/data_pipeline_simple).
+
+We have a simple data pipeline composed of [2 glue jobs](./examples/data_pipeline_simple/glue_jobs/) orchestrated sequentially using step functions.
```python
-import pathlib
from aws_cdk import core
from datajob.datajob_stack import DataJobStack
from datajob.glue.glue_job import GlueJob
from datajob.stepfunctions.stepfunctions_workflow import StepfunctionsWorkflow
-
-current_dir = pathlib.Path(__file__).parent.absolute()
-
app = core.App()
+# The datajob_stack is the instance that will result in a cloudformation stack.
+# We inject the datajob_stack object through all the resources that we want to add.
+with DataJobStack(scope=app, id="data-pipeline-simple") as datajob_stack:
-with DataJobStack(scope=app, id="data-pipeline-pkg", project_root=current_dir) as datajob_stack:
-
+ # We define 2 glue jobs with the relative path to the source code.
task1 = GlueJob(
datajob_stack=datajob_stack, name="task1", job_path="glue_jobs/task1.py"
)
-
task2 = GlueJob(
datajob_stack=datajob_stack, name="task2", job_path="glue_jobs/task2.py"
)
- with StepfunctionsWorkflow(datajob_stack=datajob_stack, name="workflow") as step_functions_workflow:
+ # We instantiate a step functions workflow and orchestrate the glue jobs.
+ with StepfunctionsWorkflow(datajob_stack=datajob_stack, name="workflow") as sfn:
task1 >> task2
app.synth()
@@ -70,53 +70,31 @@ export AWS_PROFILE=default
export AWS_ACCOUNT=$(aws sts get-caller-identity --query Account --output text --profile $AWS_PROFILE)
export AWS_DEFAULT_REGION=us-east-2
+# init cdk
cdk bootstrap aws://$AWS_ACCOUNT/$AWS_DEFAULT_REGION
```
### Deploy
```shell
-export STAGE=$AWS_ACCOUNT
-cd examples/data_pipeline_with_packaged_project
-datajob deploy --config datajob_stack.py --stage $STAGE --package setuppy
-```
-Datajob will create s3 buckets based on the `stage` variable.
-The stage variable will typically be something like "dev", "stg", "prd", ...
-but since S3 buckets need to be globally unique, for this example we will use our `$AWS_ACCOUNT` for the `--stage` parameter.
-
-
-use cdk cli
-
-```shell script
-cd examples/data_pipeline_with_packaged_project
-python setup.py bdist_wheel
-cdk deploy --app "python datajob_stack.py" -c stage=$STAGE
+cd examples/data_pipeline_simple
+cdk deploy --app "python datajob_stack.py"
```
-
### Run
```shell script
-datajob execute --state-machine data-pipeline-pkg-$STAGE-workflow
+datajob execute --state-machine data-pipeline-simple-workflow
```
-The step function state machine name is constructed as `--`.
The terminal will show a link to the step functions page to follow up on your pipeline run.
-### Destroy
+![sfn](./assets/sfn.png)
-```shell script
-datajob destroy --config datajob_stack.py --stage $STAGE
-```
-
-
-use cdk cli
+### Destroy
```shell script
-cdk destroy --app "python datajob_stack.py" -c stage=$STAGE
+cdk destroy --app "python datajob_stack.py"
```
-
-
-> Note: you can use any cdk arguments in the datajob cli
# Functionality
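Even though the Quickstart no longer sets a stage, one can still be supplied through the cdk context; a minimal sketch, with an illustrative stage value of `dev`:

```shell script
# deploy with an explicit stage via the cdk context ("dev" is illustrative)
cd examples/data_pipeline_simple
cdk deploy --app "python datajob_stack.py" -c stage=dev
```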
diff --git a/assets/sfn.png b/assets/sfn.png
new file mode 100644
index 0000000..082b314
Binary files /dev/null and b/assets/sfn.png differ
diff --git a/datajob/datajob.py b/datajob/datajob.py
index e697486..c9d0ba8 100644
--- a/datajob/datajob.py
+++ b/datajob/datajob.py
@@ -7,7 +7,7 @@
import typer
from stepfunctions.workflow.widgets.utils import create_sfn_execution_url
-from datajob import console, DEFAULT_STACK_STAGE
+from datajob import console
from datajob.package import wheel
from datajob.stepfunctions import stepfunctions_execute
@@ -16,7 +16,7 @@
def run():
- """entrypoint for datajob"""
+ """entrypoint for datajob."""
app()
@@ -25,7 +25,7 @@ def run():
)
def deploy(
stage: str = typer.Option(
- DEFAULT_STACK_STAGE,
+ None,
help="the stage of the data pipeline stack you would like to deploy (dev/stg/prd/ ...)",
),
config: str = typer.Option(
@@ -55,7 +55,7 @@ def deploy(
)
def synthesize(
stage: str = typer.Option(
- DEFAULT_STACK_STAGE,
+ None,
help="the stage of the data pipeline stack you would like to synthesize (dev/stg/prd/ ...)",
),
config: str = typer.Option(
@@ -77,7 +77,7 @@ def synthesize(
)
def destroy(
stage: str = typer.Option(
- DEFAULT_STACK_STAGE,
+ None,
help="the stage of the data pipeline stack you would like to destroy (dev/stg/prd/ ...)",
),
config: str = typer.Option(
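With the DEFAULT_STACK_STAGE default removed from the CLI options, --stage becomes optional; a hedged sketch of both invocations (the stage value is illustrative):

```shell script
datajob deploy --config datajob_stack.py              # no stage: bucket names get a random suffix
datajob deploy --config datajob_stack.py --stage dev  # explicit stage
```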
diff --git a/datajob/datajob_context.py b/datajob/datajob_context.py
index 4bc9d67..ff43834 100644
--- a/datajob/datajob_context.py
+++ b/datajob/datajob_context.py
@@ -1,3 +1,4 @@
+import uuid
from pathlib import Path
from aws_cdk import core, aws_s3_deployment, aws_s3
@@ -15,41 +16,38 @@ class DatajobContextWheelError(Exception):
class DataJobContext(core.Construct):
- """
- DataJobContext is a class that creates context in order to deploy and run our
- pipeline. You have to instantiate this class once per DatajobStack.
+ """DataJobContext is a class that creates context in order to deploy and
+ run our pipeline. You have to instantiate this class once per DatajobStack.
DataJobContext creates:
- data bucket: this is the bucket that you can use to ingest, dump intermediate results and the final output.
- deployment bucket: this is the bucket that holds your code (scripts, wheel, config, ...)
-
"""
def __init__(
self,
scope: core.Construct,
- unique_stack_name: str,
project_root: str = None,
include_folder: str = None,
**kwargs,
) -> None:
"""
:param scope: aws cdk core construct object.
- :param unique_stack_name: a unique name for this stack. like this the name of our resources will not collide with other deployments.
+ :param stage: stage from DataJobStack.
:param project_root: the path to the root of this project
:param include_folder: specify the name of the folder we would like to include in the deployment bucket.
"""
logger.info("creating datajob context.")
- super().__init__(scope, unique_stack_name, **kwargs)
+ self.unique_stack_name = scope.unique_stack_name
+ super().__init__(scope, self.unique_stack_name, **kwargs)
+ self.stage = scope.stage
+ self.bucket_suffix = None
self.project_root = project_root
- self.unique_stack_name = unique_stack_name
(
self.deployment_bucket,
self.deployment_bucket_name,
- ) = self._create_deployment_bucket(self.unique_stack_name)
- (self.data_bucket, self.data_bucket_name) = self._create_data_bucket(
- self.unique_stack_name
- )
+ ) = self._create_deployment_bucket()
+ (self.data_bucket, self.data_bucket_name) = self._create_data_bucket()
self.s3_url_wheel = None
if self.project_root:
self.s3_url_wheel = self._deploy_wheel(
@@ -63,16 +61,17 @@ def __init__(
self._deploy_local_folder(include_folder)
logger.info("datajob context created.")
- def _create_data_bucket(self, unique_stack_name: str) -> tuple:
- """
- use the unique stack name to create an s3 bucket for your data.
- We take an EmptyS3Bucket so that we can remove the stack including the deployment bucket with its contents.
- if we take a regular S3 bucket, the bucket will be orphaned from the stack leaving
- our account with all oprhaned s3 buckets.
+ def _create_data_bucket(self) -> tuple:
+ """use the unique stack name to create an s3 bucket for your data. We
+ take an EmptyS3Bucket so that we can remove the stack including the
+ data bucket with its contents. If we take a regular S3 bucket,
+ the bucket will be orphaned from the stack, leaving our account with
+ orphaned s3 buckets.
+
:param unique_stack_name: the unique stack name of the datajob stack.
:return: s3 bucket object, name of our bucket
"""
- data_bucket_name = f"{unique_stack_name}"
+ data_bucket_name = self._get_unique_bucket_name()
# todo - can we validate the bucket name?
logger.debug(f"creating deployment bucket {data_bucket_name}")
data_bucket = EmptyS3Bucket(
@@ -85,16 +84,18 @@ def _create_data_bucket(self, unique_stack_name: str) -> tuple:
)
return data_bucket, data_bucket_name
- def _create_deployment_bucket(self, unique_stack_name: str) -> tuple:
- """
- use the unique stack name to create an s3 bucket for deployment purposes.
- We take an EmptyS3Bucket so that we can remove the stack including the deployment bucket with its contents.
- if we take a regular S3 bucket, the bucket will be orphaned from the stack leaving
- our account with all oprhaned s3 buckets.
+ def _create_deployment_bucket(self) -> tuple:
+ """use the unique stack name to create an s3 bucket for deployment
+ purposes. We take an EmptyS3Bucket so that we can remove the stack
+ including the deployment bucket with its contents. If we take a regular
+ S3 bucket, the bucket will be orphaned from the stack, leaving our
+ account with orphaned s3 buckets.
+
:param unique_stack_name: the unique stack name of the datajob stack.
:return: s3 bucket object, name of our bucket
"""
- deployment_bucket_name = f"{unique_stack_name}-deployment-bucket"
+ unique_bucket_name = self._get_unique_bucket_name()
+ deployment_bucket_name = f"{unique_bucket_name}-deployment-bucket"
# todo - can we validate the bucket name?
logger.debug(f"creating deployment bucket {deployment_bucket_name}")
deployment_bucket = EmptyS3Bucket(
@@ -105,6 +106,20 @@ def _create_deployment_bucket(self, unique_stack_name: str) -> tuple:
)
return deployment_bucket, deployment_bucket_name
+ def _get_unique_bucket_name(self):
+ """if a stage is specified we use the unique_stack_name, if no stage is
+ specified we return some random characters to have a high chance of a
+ unique name."""
+ if self.stage:
+ logger.debug(
+ "We have a stage therefore we have a unique name for our bucket."
+ )
+ return self.unique_stack_name
+ logger.debug(
+ "We don't have a stage, therefore we generate a random value for the bucketname."
+ )
+ return f"{self.unique_stack_name}-{uuid.uuid4().hex[:4]}"
+
def _deploy_wheel(
self,
unique_stack_name: str,
@@ -112,8 +127,8 @@ def _deploy_wheel(
deployment_bucket: aws_s3.Bucket,
deployment_bucket_name: str,
) -> str:
- """
- Create a wheel and add the .whl file to the deployment bucket.
+ """Create a wheel and add the .whl file to the deployment bucket.
+
:param unique_stack_name: the unique stack name of the datajob stack.
:param project_root: the absolute path to the root of a project.
:param deployment_bucket: s3 deployment bucket object
@@ -150,8 +165,8 @@ def _get_wheel_name(
project_root: str,
dist_folder="dist",
):
- """
- find the name of the wheel we created.
+ """find the name of the wheel we created.
+
:param deployment_bucket_name: s3 deployment bucket name.
:param wheel_deployment_name: name of the wheel of our project.
:param project_root: the absolute path to the root of a project.
@@ -165,8 +180,9 @@ def _get_wheel_name(
return f"s3://{deployment_bucket_name}/{wheel_deployment_name}/{dist_file_names[0].name}"
def _deploy_local_folder(self, include_folder: str) -> None:
- """
- deploy the contents of a local folder from our project to the deployment bucket.
+ """deploy the contents of a local folder from our project to the
+ deployment bucket.
+
:param include_folder: path to the folder
:return: None
"""
diff --git a/datajob/datajob_stack.py b/datajob/datajob_stack.py
index cafff00..47650ca 100644
--- a/datajob/datajob_stack.py
+++ b/datajob/datajob_stack.py
@@ -1,4 +1,5 @@
import os
+from typing import Union
from aws_cdk import core
@@ -31,11 +32,11 @@ def __init__(
:param kwargs: any extra kwargs for the core.Construct
"""
self.scope = scope
- self.stage = self.get_stage(stage)
- self.unique_stack_name = self._create_unique_stack_name(id, self.stage)
self.env = DataJobStack._create_environment_object(
account=account, region=region
)
+ self.stage = self.get_stage(stage)
+ self.unique_stack_name = self._create_unique_stack_name(id, self.stage)
super().__init__(scope=scope, id=self.unique_stack_name, env=self.env, **kwargs)
self.project_root = project_root
self.include_folder = include_folder
@@ -43,16 +44,17 @@ def __init__(
self.context = None
def __enter__(self):
- """
- As soon as we enter the contextmanager, we create the datajob context.
+ """As soon as we enter the contextmanager, we create the datajob
+ context.
+
:return: datajob stack.
"""
self.init_datajob_context()
return self
def __exit__(self, exc_type, exc_value, traceback):
- """
- steps we have to do when exiting the context manager.
+ """steps we have to do when exiting the context manager.
+
- we will create the resources we have defined.
- we will synthesize our stack so that we have everything to deploy.
:param exc_type:
@@ -68,26 +70,26 @@ def add(self, task: str) -> None:
task.create()
@staticmethod
- def _create_unique_stack_name(stack_name: str, stage: str) -> str:
- """
- create a unique name for the datajob stack.
+ def _create_unique_stack_name(stack_name: str, stage: Union[str, None]) -> str:
+ """create a unique name for the datajob stack.
+
:param stack_name: a name for the stack.
:param stage: the stage name we give our pipeline.
:return: a unique name.
"""
- return f"{stack_name}-{stage}"
+ if stage:
+ return f"{stack_name}-{stage}"
+ return stack_name
@staticmethod
def _create_environment_object(account, region) -> core.Environment:
- """
- create an aws cdk Environment object.
+ """create an aws cdk Environment object.
Args:
account: AWS account number: 12 numbers
region: AWS region. e.g. eu-west-1
Returns: AWS cdk Environment object.
-
"""
account = (
account if account is not None else os.environ.get("AWS_DEFAULT_ACCOUNT")
@@ -96,35 +98,31 @@ def _create_environment_object(account, region) -> core.Environment:
return core.Environment(account=account, region=region)
def create_resources(self):
- """create each of the resources of this stack"""
+ """create each of the resources of this stack."""
[resource.create() for resource in self.resources]
def get_stage(self, stage):
"""get the stage parameter and return a default if not found."""
- try:
- if stage:
- logger.debug(
- "a stage parameter is passed directly to the stack object, take this value."
- )
- return stage
- else:
- logger.debug(
- "check cdk context if there is not a stage value provided."
- )
- return self.get_context_parameter(DataJobStack.STAGE_NAME)
-
- except ValueError:
+ if stage:
logger.debug(
- "no stage is provided to the datajob stack object or passed via the cli, taking the default one. "
+ "a stage parameter is passed directly to the stack object, take this value."
)
- return DEFAULT_STACK_STAGE
+ return stage
+ else:
+ logger.debug("check cdk context if there is not a stage value provided.")
+ try:
+ return self.get_context_parameter(DataJobStack.STAGE_NAME)
+ except ValueError:
+ logger.debug("no stage is found on the context. Will return None.")
+ return None
def get_context_parameter(self, name: str) -> str:
"""get a cdk context parameter from the cli."""
context_parameter = self.scope.node.try_get_context(name)
if not context_parameter:
raise ValueError(
- "we expect a stage to be set on the cli. e.g 'cdk deploy -c stage=my-stage'"
+ f"we expect a cdk context parameter to be set on the cli with key {name}. "
+ f"e.g 'cdk deploy -c stage=my-stage' where stage is the key and my-stage is the value."
)
logger.debug(f"context parameter {name} found.")
return context_parameter
@@ -132,8 +130,5 @@ def get_context_parameter(self, name: str) -> str:
def init_datajob_context(self) -> None:
"""Initializes a datajob context."""
self.context = DataJobContext(
- self,
- unique_stack_name=self.unique_stack_name,
- project_root=self.project_root,
- include_folder=self.include_folder,
+ self, project_root=self.project_root, include_folder=self.include_folder
)
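Taken together, the stage is now resolved along three paths; a sketch of each (values are illustrative):

```shell script
# 1. stage passed directly to DataJobStack(..., stage="dev") in datajob_stack.py
# 2. stage passed via the cdk context:
cdk deploy --app "python datajob_stack.py" -c stage=dev
# 3. no stage at all: the stack keeps its plain id and bucket names get a random suffix
cdk deploy --app "python datajob_stack.py"
```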
diff --git a/datajob_tests/test_datajob_context.py b/datajob_tests/test_datajob_context.py
index 3349dd0..a48b12c 100644
--- a/datajob_tests/test_datajob_context.py
+++ b/datajob_tests/test_datajob_context.py
@@ -5,12 +5,33 @@
class TestDatajobContext(unittest.TestCase):
- def test_datajob_context_initiates_without_error(self):
+ def test_datajob_context_initiates_without_stage(self):
exception_ = None
try:
app = core.App()
djs = DataJobStack(scope=app, id="some-stack-name")
- DataJobContext(djs, unique_stack_name="some-unique-name")
+ djc = DataJobContext(djs)
+ except Exception as e:
+ exception_ = e
+ self.assertIsNone(exception_)
+ # some random characters (4 hex chars) are appended to the bucket name
+ self.assertIsNone(djc.stage)
+ self.assertEqual(len(djc.data_bucket_name.split("-")[-1]), 4)
+ self.assertEqual(len(djc.deployment_bucket_name.split("-")[-3]), 4)
+
+ def test_datajob_context_with_stage(self):
+ exception_ = None
+ try:
+ stack_name = "some-stack"
+ stage = "some-stage"
+ app = core.App()
+ djs = DataJobStack(scope=app, id=stack_name, stage=stage)
+ djc = DataJobContext(djs)
+ self.assertIsNotNone(djc.stage)
+ self.assertEqual(djc.data_bucket_name, djs.unique_stack_name)
+ self.assertEqual(
+ djc.deployment_bucket_name, f"{djs.unique_stack_name}-deployment-bucket"
+ )
except Exception as e:
exception_ = e
self.assertIsNone(exception_)
diff --git a/datajob_tests/test_datajob_stack.py b/datajob_tests/test_datajob_stack.py
index 120c3fc..daf6fd4 100644
--- a/datajob_tests/test_datajob_stack.py
+++ b/datajob_tests/test_datajob_stack.py
@@ -21,7 +21,7 @@ def test_datajob_stack_initiates_without_error(self):
def test_datajob_stack_with_no_stage(self):
with DataJobStack(scope=self.app, id="datajob-stack-no-stage") as djs:
pass
- self.assertEqual(djs.stage, DEFAULT_STACK_STAGE)
+ self.assertIsNone(djs.stage)
def test_datajob_stack_with_stage_passed_via_cli(self):
stage_value = "some-value"