Merge pull request #74 from vincentclaes/61-simplify-deployment

61 simplify deployment

vincentclaes authored Jun 19, 2021
2 parents cf63712 + aeaf58f commit a0662d9
Showing 8 changed files with 127 additions and 112 deletions.
5 changes: 5 additions & 0 deletions .pre-commit-config.yaml
@@ -11,3 +11,8 @@ repos:
rev: 21.6b0
hooks:
- id: black
- repo: https://github.com/myint/docformatter
rev: v1.4
hooks:
- id: docformatter
args: [--in-place]
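
This adds the `docformatter` hook alongside `black`, so docstrings get reformatted on commit. To pick the new hook up locally you would typically reinstall and run it once over the repo; a minimal sketch using standard pre-commit commands:

```shell
# install the git hook scripts and run only the new hook against all files
pre-commit install
pre-commit run docformatter --all-files
```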
54 changes: 16 additions & 38 deletions README.md
@@ -25,33 +25,33 @@

# Quickstart

We have a simple data pipeline composed of [2 glue jobs](./examples/data_pipeline_with_packaged_project/glue_jobs/) orchestrated sequentially using step functions.
You can find the full example in [examples/data_pipeline_simple/glue_jobs/](./examples/data_pipeline_simple/glue_jobs/).

We have a simple data pipeline composed of [2 glue jobs](./examples/data_pipeline_simple/glue_jobs/) orchestrated sequentially using step functions.

```python
import pathlib
from aws_cdk import core

from datajob.datajob_stack import DataJobStack
from datajob.glue.glue_job import GlueJob
from datajob.stepfunctions.stepfunctions_workflow import StepfunctionsWorkflow


current_dir = pathlib.Path(__file__).parent.absolute()

app = core.App()

# The datajob_stack is the instance that will result in a cloudformation stack.
# We inject the datajob_stack object through all the resources that we want to add.
with DataJobStack(scope=app, id="data-pipeline-simple") as datajob_stack:

with DataJobStack(scope=app, id="data-pipeline-pkg", project_root=current_dir) as datajob_stack:

# We define 2 glue jobs with the relative path to the source code.
task1 = GlueJob(
datajob_stack=datajob_stack, name="task1", job_path="glue_jobs/task1.py"
)

task2 = GlueJob(
datajob_stack=datajob_stack, name="task2", job_path="glue_jobs/task2.py"
)

with StepfunctionsWorkflow(datajob_stack=datajob_stack, name="workflow") as step_functions_workflow:
# We instantiate a step functions workflow and orchestrate the glue jobs.
with StepfunctionsWorkflow(datajob_stack=datajob_stack, name="workflow") as sfn:
task1 >> task2

app.synth()
@@ -70,53 +70,31 @@ export AWS_PROFILE=default
export AWS_ACCOUNT=$(aws sts get-caller-identity --query Account --output text --profile $AWS_PROFILE)
export AWS_DEFAULT_REGION=us-east-2

# init cdk
cdk bootstrap aws://$AWS_ACCOUNT/$AWS_DEFAULT_REGION
```

### Deploy

```shell
export STAGE=$AWS_ACCOUNT
cd examples/data_pipeline_with_packaged_project
datajob deploy --config datajob_stack.py --stage $STAGE --package setuppy
```
Datajob will create S3 buckets based on the `stage` variable.
The stage will typically be something like "dev", "stg", "prd", ...
but since S3 bucket names need to be globally unique, this example uses `$AWS_ACCOUNT` as the `--stage` value.
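
After a deploy you should see the buckets in your account; a quick way to check is the AWS CLI. The names below are inferred from the `<stack id>-<stage>` naming scheme in `datajob_context.py` further down in this diff, so treat them as illustrative:

```shell
aws s3 ls | grep data-pipeline
# expected along the lines of:
#   data-pipeline-simple-123456789012
#   data-pipeline-simple-123456789012-deployment-bucket
```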

<details>
<summary>use cdk cli</summary>

```shell script
cd examples/data_pipeline_with_packaged_project
python setup.py bdist_wheel
cdk deploy --app "python datajob_stack.py" -c stage=$STAGE
cd examples/data_pipeline_simple
cdk deploy --app "python datajob_stack.py"
```
</details>

### Run

```shell script
datajob execute --state-machine data-pipeline-pkg-$STAGE-workflow
datajob execute --state-machine data-pipeline-simple-workflow
```
The Step Functions state machine name is constructed as `<datajob_stack.id>-<stage>-<step_functions_workflow.name>`.
The terminal will print a link to the Step Functions console so you can follow up on your pipeline run.
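
You can also follow up from the terminal with the AWS CLI; a sketch using standard Step Functions commands, with the state machine name taken from the example above:

```shell
ARN=$(aws stepfunctions list-state-machines \
  --query "stateMachines[?name=='data-pipeline-simple-workflow'].stateMachineArn" --output text)
aws stepfunctions list-executions --state-machine-arn "$ARN" --max-results 1
```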

### Destroy
![sfn](./assets/sfn.png)

```shell script
datajob destroy --config datajob_stack.py --stage $STAGE
```

<details>
<summary>use cdk cli</summary>
### Destroy

```shell script
cdk destroy --app "python datajob_stack.py" -c stage=$STAGE
cdk destroy --app "python datajob_stack.py"
```
</details>

> Note: you can use any cdk arguments in the datajob cli
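
For instance, with the plain cdk invocation shown above you can add standard cdk flags such as `--require-approval` or `--verbose` (a sketch, not taken from this diff; how datajob forwards extra arguments is not shown here):

```shell
# skip the interactive approval prompt and turn on verbose logging
cdk deploy --app "python datajob_stack.py" --require-approval never --verbose
```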

# Functionality

Binary file added assets/sfn.png
10 changes: 5 additions & 5 deletions datajob/datajob.py
@@ -7,7 +7,7 @@
import typer
from stepfunctions.workflow.widgets.utils import create_sfn_execution_url

from datajob import console, DEFAULT_STACK_STAGE
from datajob import console
from datajob.package import wheel
from datajob.stepfunctions import stepfunctions_execute

@@ -16,7 +16,7 @@


def run():
"""entrypoint for datajob"""
"""entrypoint for datajob."""
app()


@@ -25,7 +25,7 @@ def run():
)
def deploy(
stage: str = typer.Option(
DEFAULT_STACK_STAGE,
None,
help="the stage of the data pipeline stack you would like to deploy (dev/stg/prd/ ...)",
),
config: str = typer.Option(
@@ -55,7 +55,7 @@ def deploy(
)
def synthesize(
stage: str = typer.Option(
DEFAULT_STACK_STAGE,
None,
help="the stage of the data pipeline stack you would like to synthesize (dev/stg/prd/ ...)",
),
config: str = typer.Option(
@@ -77,7 +77,7 @@
)
def destroy(
stage: str = typer.Option(
DEFAULT_STACK_STAGE,
None,
help="the stage of the data pipeline stack you would like to destroy (dev/stg/prd/ ...)",
),
config: str = typer.Option(
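
The net change in `datajob/datajob.py` is that the `stage` option's default moves from `DEFAULT_STACK_STAGE` to `None` for `deploy`, `synthesize` and `destroy`, making the stage optional. A minimal standalone sketch of the resulting CLI behaviour (command body and help text simplified for illustration):

```python
import typer

app = typer.Typer()


@app.command()
def deploy(
    stage: str = typer.Option(
        None, help="optional stage of the data pipeline stack (dev/stg/prd/...)"
    )
):
    # when --stage is omitted, stage stays None and the stack falls back to
    # generating unique resource names (see _get_unique_bucket_name below)
    typer.echo(f"deploying with stage={stage}")


if __name__ == "__main__":
    app()
```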
80 changes: 48 additions & 32 deletions datajob/datajob_context.py
@@ -1,3 +1,4 @@
import uuid
from pathlib import Path

from aws_cdk import core, aws_s3_deployment, aws_s3
@@ -15,41 +16,38 @@ class DatajobContextWheelError(Exception):


class DataJobContext(core.Construct):
"""
DataJobContext is a class that creates context in order to deploy and run our
pipeline. You have to instantiate this class once per DatajobStack.
"""DataJobContext is a class that creates context in order to deploy and
run our pipeline. You have to instantiate this class once per DatajobStack.
DataJobContext creates:
- data bucket: this is the bucket that you can use to ingest, dump intermediate results and the final output.
- deployment bucket: this is the bucket that holds you code ( scripts, wheel, config, ...)
"""

def __init__(
self,
scope: core.Construct,
unique_stack_name: str,
project_root: str = None,
include_folder: str = None,
**kwargs,
) -> None:
"""
:param scope: aws cdk core construct object.
:param unique_stack_name: a unique name for this stack. like this the name of our resources will not collide with other deployments.
:param stage: stage from DataJobStack.
:param project_root: the path to the root of this project
:param include_folder: specify the name of the folder we would like to include in the deployment bucket.
"""
logger.info("creating datajob context.")
super().__init__(scope, unique_stack_name, **kwargs)
self.unique_stack_name = scope.unique_stack_name
super().__init__(scope, self.unique_stack_name, **kwargs)
self.stage = scope.stage
self.bucket_suffix = None
self.project_root = project_root
self.unique_stack_name = unique_stack_name
(
self.deployment_bucket,
self.deployment_bucket_name,
) = self._create_deployment_bucket(self.unique_stack_name)
(self.data_bucket, self.data_bucket_name) = self._create_data_bucket(
self.unique_stack_name
)
) = self._create_deployment_bucket()
(self.data_bucket, self.data_bucket_name) = self._create_data_bucket()
self.s3_url_wheel = None
if self.project_root:
self.s3_url_wheel = self._deploy_wheel(
@@ -63,16 +61,17 @@ def __init__(
self._deploy_local_folder(include_folder)
logger.info("datajob context created.")

def _create_data_bucket(self, unique_stack_name: str) -> tuple:
"""
use the unique stack name to create an s3 bucket for your data.
We take an EmptyS3Bucket so that we can remove the stack including the deployment bucket with its contents.
if we take a regular S3 bucket, the bucket will be orphaned from the stack leaving
our account with all orphaned s3 buckets.
def _create_data_bucket(self) -> tuple:
"""use the unique stack name to create an s3 bucket for your data. We
take an EmptyS3Bucket so that we can remove the stack including the
deployment bucket with its contents. if we take a regular S3 bucket,
the bucket will be orphaned from the stack leaving our account with all
orphaned s3 buckets.
:param unique_stack_name: the unique stack name of the datajob stack.
:return: s3 bucket object, name of our bucket
"""
data_bucket_name = f"{unique_stack_name}"
data_bucket_name = self._get_unique_bucket_name()
# todo - can we validate the bucket name?
logger.debug(f"creating deployment bucket {data_bucket_name}")
data_bucket = EmptyS3Bucket(
@@ -85,16 +84,18 @@ def _create_data_bucket(self, unique_stack_name: str) -> tuple:
)
return data_bucket, data_bucket_name

def _create_deployment_bucket(self, unique_stack_name: str) -> tuple:
"""
use the unique stack name to create an s3 bucket for deployment purposes.
We take an EmptyS3Bucket so that we can remove the stack including the deployment bucket with its contents.
if we take a regular S3 bucket, the bucket will be orphaned from the stack leaving
our account with all orphaned s3 buckets.
def _create_deployment_bucket(self) -> tuple:
"""use the unique stack name to create an s3 bucket for deployment
purposes. We take an EmptyS3Bucket so that we can remove the stack
including the deployment bucket with its contents. if we take a regular
S3 bucket, the bucket will be orphaned from the stack leaving our
account with all orphaned s3 buckets.
:param unique_stack_name: the unique stack name of the datajob stack.
:return: s3 bucket object, name of our bucket
"""
deployment_bucket_name = f"{unique_stack_name}-deployment-bucket"
unique_bucket_name = self._get_unique_bucket_name()
deployment_bucket_name = f"{unique_bucket_name}-deployment-bucket"
# todo - can we validate the bucket name?
logger.debug(f"creating deployment bucket {deployment_bucket_name}")
deployment_bucket = EmptyS3Bucket(
@@ -105,15 +106,29 @@ def _create_deployment_bucket(self, unique_stack_name: str) -> tuple:
)
return deployment_bucket, deployment_bucket_name

def _get_unique_bucket_name(self):
"""if a stage is specified we use the unique_stack_name, if no stage is
specified we return some random characters to have a high chance of a
unique name."""
if self.stage:
logger.debug(
"We have a stage therefore we have a unique name for our bucket."
)
return self.unique_stack_name
logger.debug(
"We don't have a stage, therefore we generate a random value for the bucketname."
)
return f"{self.unique_stack_name}-{uuid.uuid4().hex[:4]}"

def _deploy_wheel(
self,
unique_stack_name: str,
project_root: str,
deployment_bucket: aws_s3.Bucket,
deployment_bucket_name: str,
) -> str:
"""
Create a wheel and add the .whl file to the deployment bucket.
"""Create a wheel and add the .whl file to the deployment bucket.
:param unique_stack_name: the unique stack name of the datajob stack.
:param project_root: the absolute path to the root of a project.
:param deployment_bucket: s3 deployment bucket object
@@ -150,8 +165,8 @@ def _get_wheel_name(
project_root: str,
dist_folder="dist",
):
"""
find the name of the wheel we created.
"""find the name of the wheel we created.
:param deployment_bucket_name: s3 deployment bucket name.
:param wheel_deployment_name: name of the wheel of our project.
:param project_root: the absolute path to the root of a project.
Expand All @@ -165,8 +180,9 @@ def _get_wheel_name(
return f"s3://{deployment_bucket_name}/{wheel_deployment_name}/{dist_file_names[0].name}"

def _deploy_local_folder(self, include_folder: str) -> None:
"""
deploy the contents of a local folder from our project to the deployment bucket.
"""deploy the contents of a local folder from our project to the
deployment bucket.
:param include_folder: path to the folder
:return: None
"""