From c363d341eaa9d23dc8f0a7dcb6083c4cfdcdb3af Mon Sep 17 00:00:00 2001 From: vincent Date: Sat, 3 Apr 2021 08:25:31 +0200 Subject: [PATCH 01/21] update readme --- README.md | 205 +++++++++++++++--- datajob/datajob_stack.py | 2 +- .../datajob_stack_explicit.py | 33 +++ 3 files changed, 203 insertions(+), 37 deletions(-) create mode 100644 examples/data_pipeline_with_packaged_project/datajob_stack_explicit.py diff --git a/README.md b/README.md index 0bc9b1d..44a3bf6 100644 --- a/README.md +++ b/README.md @@ -23,27 +23,38 @@ We have a simple data pipeline composed of 2 glue jobs orchestrated sequentially. ```python +import pathlib from aws_cdk import core from datajob.datajob_stack import DataJobStack from datajob.glue.glue_job import GlueJob from datajob.stepfunctions.stepfunctions_workflow import StepfunctionsWorkflow + +current_dir = pathlib.Path(__file__).parent.absolute() + app = core.App() -# The datajob_stack is the instance that will result in a cloudformation stack. -# We inject the datajob_stack object through all the resources that we want to add. -with DataJobStack(scope=app, id="data-pipeline-simple") as datajob_stack: +# the datajob_stack is the instance that will result in a cloudformation stack. +# the path project_root helps datajob_stack package the project as a wheel +# by default datajob_stack will provision 2 s3 buckets. +# one s3 bucket for deploying packages, files, ... and one s3 bucket to be used by the data pipeline. +with DataJobStack( + scope=app, id="data-pipeline-pkg", project_root=current_dir +) as datajob_stack: - # We define 2 glue jobs with the relative path to the source code. + # here we define 2 glue jobs with the path to the source code. + # we inject the datajob_stack object through all the resources that we want to add. task1 = GlueJob( datajob_stack=datajob_stack, name="task1", job_path="glue_jobs/task1.py" ) + task2 = GlueJob( datajob_stack=datajob_stack, name="task2", job_path="glue_jobs/task2.py" ) - # We instantiate a step functions workflow and orchestrate the glue jobs. + # we instantiate a step functions workflow + # and orchestrate the glue jobs. with StepfunctionsWorkflow(datajob_stack=datajob_stack, name="workflow") as sfn: task1 >> task2 @@ -51,10 +62,11 @@ app.synth() ``` -We add the code in a file called `datajob_stack.py` in the [root of the project](./examples/data_pipeline_simple/). +We add the code in a file called `datajob_stack.py` in the [root of the project](./examples/data_pipeline_with_packaged_project/). ### Configure CDK +Follow the steps [here](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-quickstart.html#cli-configure-quickstart-config) to configure your credentials. ```shell script export AWS_PROFILE=my-profile # e.g. default @@ -66,27 +78,29 @@ cdk bootstrap aws://$AWS_ACCOUNT/$AWS_DEFAULT_REGION ``` ### Deploy -_cdk cli_ +We want to create a unique stack containing 2 glue jobs +with our project and it's depenencies available as a wheel. + +_cdk cli_ ```shell script -cd examples/data_pipeline_simple +cd examples/data_pipeline_with_packaged_project python setup.py bdist_wheel cdk deploy --app "python datajob_stack.py" ``` _datajob cli_ - ```shell script -cd examples/data_pipeline_simple +cd examples/data_pipeline_with_packaged_project datajob deploy --config datajob_stack.py --package setuppy ``` -After running the `deploy` command, the code of glue jobs are deployed and orchestrated. +After running the `deploy` command, the glue jobs are deployed and the orchestration is configured. 
### Run ```shell script -datajob execute --state-machine data-pipeline-simple-dev-workflow +datajob execute --state-machine data-pipeline-pkg-dev-workflow ``` ### Destroy @@ -103,46 +117,160 @@ datajob destroy --config datajob_stack.py # Functionality
-Pass arguments to a glue job -#todo implemented not documented +Using datajob's S3 data bucket to pass arguments to a glue job + +Pass arguments to your Glue job using the `arguments` parameter and +dynamically reference the `datajob_stack` data bucket name for this `stage` to the arguments. + +```python +import pathlib + +from aws_cdk import core +from datajob.datajob_stack import DataJobStack +from datajob.glue.glue_job import GlueJob +from datajob.stepfunctions.stepfunctions_workflow import StepfunctionsWorkflow + +current_dir = str(pathlib.Path(__file__).parent.absolute()) + +app = core.App() + +with DataJobStack( + scope=app, id="datajob-python-pyspark", project_root=current_dir +) as datajob_stack: + + pyspark_job = GlueJob( + datajob_stack=datajob_stack, + name="pyspark-job", + job_path="glue_job/glue_pyspark_example.py", + job_type="glueetl", + glue_version="2.0", # we only support glue 2.0 + python_version="3", + worker_type="Standard", # options are Standard / G.1X / G.2X + number_of_workers=1, + arguments={ + "--source": f"s3://{datajob_stack.context.data_bucket_name}/raw/iris_dataset.csv", + "--destination": f"s3://{datajob_stack.context.data_bucket_name}/target/pyspark_job/iris_dataset.parquet", + }, + ) + + with StepfunctionsWorkflow(datajob_stack=datajob_stack, name="workflow") as sfn: + pyspark_job >> ... + +``` + +deploy to stage `my-stage`: + +```shell +datajob deploy --config datajob_stack.py --stage my-stage --package setuppy +``` + +`datajob_stack.context.data_bucket_name` will evaluate to `datajob-python-pyspark-my-stage # -` + +you can find this example [here](./examples/data_pipeline_pyspark/glue_job/glue_pyspark_example.py) +
Deploy files to deployment bucket -#todo implemented not documented + +Specify the path to the folder we would like to include in the deployment bucket. + +```python + +from aws_cdk import core +from datajob.datajob_stack import DataJobStack + +app = core.App() + +with DataJobStack( + scope=app, id="some-stack-name", include_folder="path/to/folder/" +) as datajob_stack: + + ... + +``` +
Package project
-#todo implemented not documented
+
+Package your project using [poetry](https://python-poetry.org/)
+
+```shell
+datajob deploy --config datajob_stack.py --package poetry
+```
+Package your project using [setup.py](./examples/data_pipeline_with_packaged_project)
+```shell
+datajob deploy --config datajob_stack.py --package setuppy
+```

Using Pyspark ```python -pyspark_job = GlueJob( - datajob_stack=datajob_stack, - name="pyspark-job", - job_path="glue_job/glue_pyspark_example.py", - job_type="glueetl", - glue_version="2.0", # we only support glue 2.0 - python_version="3", - worker_type="Standard", # options are Standard / G.1X / G.2X - number_of_workers=1, - arguments={ - "--source": f"s3://{datajob_stack.context.data_bucket_name}/raw/iris_dataset.csv", - "--destination": f"s3://{datajob_stack.context.data_bucket_name}/target/pyspark_job/iris_dataset.parquet", - } -) +import pathlib + +from aws_cdk import core +from datajob.datajob_stack import DataJobStack +from datajob.glue.glue_job import GlueJob +from datajob.stepfunctions.stepfunctions_workflow import StepfunctionsWorkflow + +current_dir = str(pathlib.Path(__file__).parent.absolute()) + +app = core.App() + +with DataJobStack( + scope=app, id="datajob-python-pyspark", project_root=current_dir +) as datajob_stack: + + pyspark_job = GlueJob( + datajob_stack=datajob_stack, + name="pyspark-job", + job_path="glue_job/glue_pyspark_example.py", + job_type="glueetl", + glue_version="2.0", # we only support glue 2.0 + python_version="3", + worker_type="Standard", # options are Standard / G.1X / G.2X + number_of_workers=1, + arguments={ + "--source": f"s3://{datajob_stack.context.data_bucket_name}/raw/iris_dataset.csv", + "--destination": f"s3://{datajob_stack.context.data_bucket_name}/target/pyspark_job/iris_dataset.parquet", + }, + ) ``` full example can be found in [examples/data_pipeline_pyspark](examples/data_pipeline_pyspark]).
-Using S3 bucket to dump data -#todo implemented not documented -# create an example that dumps and reads from s3 +Using datajob's S3 data bucket + + +```python + +with DataJobStack( + scope=app, id="datajob-python-pyspark", project_root=current_dir +) as datajob_stack: + + pyspark_job = GlueJob( + datajob_stack=datajob_stack, + name="pyspark-job", + job_path="glue_job/glue_pyspark_example.py", + job_type="glueetl", + glue_version="2.0", # we only support glue 2.0 + python_version="3", + worker_type="Standard", # options are Standard / G.1X / G.2X + number_of_workers=1, + arguments={ + "--source": f"s3://{datajob_stack.context.data_bucket_name}/raw/iris_dataset.csv", + "--destination": f"s3://{datajob_stack.context.data_bucket_name}/target/pyspark_job/iris_dataset.parquet", + }, + ) + +``` + +
@@ -150,7 +278,7 @@ full example can be found in [examples/data_pipeline_pyspark](examples/data_pipe ```python # task1 and task2 are orchestrated in parallel. -# task 3 will only start when both task1 and task2 have succeeded. +# task3 will only start when both task1 and task2 have succeeded. [task1, task2] >> task3 ``` @@ -159,7 +287,7 @@ full example can be found in [examples/data_pipeline_pyspark](examples/data_pipe
Orchestrate 1 stepfunction task -Use the [Ellipsis](https://docs.python.org/dev/library/constants.html#Ellipsis) object to be able to orchestrate a job via step functions. +Use the [Ellipsis](https://docs.python.org/dev/library/constants.html#Ellipsis) object to be able to orchestrate 1 job via step functions. ```python some_task >> ... @@ -195,6 +323,7 @@ when __exiting the context manager__ all the resources of our DataJobStack objec We can write the above example more explicitly... ```python +import pathlib from aws_cdk import core from datajob.datajob_stack import DataJobStack @@ -203,7 +332,11 @@ from datajob.stepfunctions.stepfunctions_workflow import StepfunctionsWorkflow app = core.App() -datajob_stack = DataJobStack(scope=app, id="data-pipeline-simple") +current_dir = pathlib.Path(__file__).parent.absolute() + +app = core.App() + +datajob_stack = DataJobStack(scope=app, id="data-pipeline-pkg", project_root=current_dir) datajob_stack.init_datajob_context() task1 = GlueJob(datajob_stack=datajob_stack, name="task1", job_path="glue_jobs/task1.py") diff --git a/datajob/datajob_stack.py b/datajob/datajob_stack.py index d251ed2..175b940 100644 --- a/datajob/datajob_stack.py +++ b/datajob/datajob_stack.py @@ -25,7 +25,7 @@ def __init__( :param id: a name for this stack. :param stage: the stage name to which we are deploying :param project_root: the path to the root of this project - :param include_folder: specify the name of the folder we would like to include in the deployment bucket. + :param include_folder: specify the path to the folder we would like to include in the deployment bucket. :param account: AWS account number :param region: AWS region where we want to deploy our datajob to :param kwargs: any extra kwargs for the core.Construct diff --git a/examples/data_pipeline_with_packaged_project/datajob_stack_explicit.py b/examples/data_pipeline_with_packaged_project/datajob_stack_explicit.py new file mode 100644 index 0000000..6d5c88a --- /dev/null +++ b/examples/data_pipeline_with_packaged_project/datajob_stack_explicit.py @@ -0,0 +1,33 @@ +""" +same as ./datajob_stack.py but more explicit +""" +import pathlib +from aws_cdk import core + +from datajob.datajob_stack import DataJobStack +from datajob.glue.glue_job import GlueJob +from datajob.stepfunctions.stepfunctions_workflow import StepfunctionsWorkflow + +app = core.App() + +current_dir = pathlib.Path(__file__).parent.absolute() + +app = core.App() + +datajob_stack = DataJobStack( + scope=app, id="data-pipeline-pkg", project_root=current_dir +) +datajob_stack.init_datajob_context() + +task1 = GlueJob( + datajob_stack=datajob_stack, name="task1", job_path="glue_jobs/task1.py" +) +task2 = GlueJob( + datajob_stack=datajob_stack, name="task2", job_path="glue_jobs/task2.py" +) + +with StepfunctionsWorkflow(datajob_stack=datajob_stack, name="workflow") as sfn: + task1 >> task2 + +datajob_stack.create_resources() +app.synth() From f5140e41b9becd4ec8805c354ad3e6a82c0be9b4 Mon Sep 17 00:00:00 2001 From: vincent Date: Sat, 3 Apr 2021 12:46:12 +0200 Subject: [PATCH 02/21] update readme --- README.md | 59 ++++++++++++++++++++++++++++++++++--------------------- 1 file changed, 37 insertions(+), 22 deletions(-) diff --git a/README.md b/README.md index 44a3bf6..7052790 100644 --- a/README.md +++ b/README.md @@ -35,10 +35,7 @@ current_dir = pathlib.Path(__file__).parent.absolute() app = core.App() -# the datajob_stack is the instance that will result in a cloudformation stack. 
-# the path project_root helps datajob_stack package the project as a wheel -# by default datajob_stack will provision 2 s3 buckets. -# one s3 bucket for deploying packages, files, ... and one s3 bucket to be used by the data pipeline. + with DataJobStack( scope=app, id="data-pipeline-pkg", project_root=current_dir ) as datajob_stack: @@ -79,20 +76,23 @@ cdk bootstrap aws://$AWS_ACCOUNT/$AWS_DEFAULT_REGION ### Deploy -We want to create a unique stack containing 2 glue jobs -with our project and it's depenencies available as a wheel. +Create a stack containing 2 glue jobs with our packaged project and its dependencies, orchestrated using step functions. -_cdk cli_ +Datajob will create s3 buckets based on the datajob stack id and the stage variable. +The stage variable will typically be something like "dev", "stg", "prd", ... +but since S3 buckets need to be globally unique we will use our $AWS_ACCOUNT for the `--stage` parameter. + +_datajob cli_ ```shell script cd examples/data_pipeline_with_packaged_project -python setup.py bdist_wheel -cdk deploy --app "python datajob_stack.py" +datajob deploy --config datajob_stack.py --stage $AWS_ACCOUNT --package setuppy ``` -_datajob cli_ +_cdk cli_ ```shell script cd examples/data_pipeline_with_packaged_project -datajob deploy --config datajob_stack.py --package setuppy +python setup.py bdist_wheel +cdk deploy --app "python datajob_stack.py" -c stage=$AWS_ACCOUNT ``` After running the `deploy` command, the glue jobs are deployed and the orchestration is configured. @@ -100,20 +100,22 @@ After running the `deploy` command, the glue jobs are deployed and the orchestra ### Run ```shell script -datajob execute --state-machine data-pipeline-pkg-dev-workflow +datajob execute --state-machine data-pipeline-pkg-$AWS_ACCOUNT-workflow ``` ### Destroy -_cdk cli_ + +_datajob cli_ ```shell script -cdk destroy --app "python datajob_stack.py" +datajob destroy --config datajob_stack.py --stage $AWS_ACCOUNT ``` -_datajob cli_ +_cdk cli_ ```shell script -datajob destroy --config datajob_stack.py +cdk destroy --app "python datajob_stack.py" -c stage=$AWS_ACCOUNT ``` + # Functionality
@@ -296,10 +298,25 @@ some_task >> ...
-# The magic behind datajob +# Datajob in depth + +The `datajob_stack` is the instance that will result in a cloudformation stack. +The path in `project_root` helps datajob_stack locate the root of the project where +the setup.py/poetry pyproject.toml file can be found as well as the `dist/` folder with the wheel of your project . ```python -with DataJobStack(scope=app, id="data-pipeline-simple") as datajob_stack: +import pathlib +from aws_cdk import core + +from datajob.datajob_stack import DataJobStack + +current_dir = pathlib.Path(__file__).parent.absolute() +app = core.App() + +with DataJobStack( + scope=app, id="data-pipeline-pkg", project_root=current_dir +) as datajob_stack: + ... ``` @@ -366,10 +383,8 @@ These are the ideas, we find interesting to implement; - processing jobs - hyperparameter tuning jobs - training jobs - - create sagemaker model - - create sagemaker endpoint - - expose sagemaker endpoint to the internet by levering lambda + api gateway - +- implement lambda +- implement ECS Fargate - create a serverless UI that follows up on the different pipelines deployed on possibly different AWS accounts using Datajob > [Feedback](https://github.com/vincentclaes/datajob/discussions) is much appreciated! From 25c18a2c51311a898dd0ef69ef0d627238cc5c02 Mon Sep 17 00:00:00 2001 From: vincent Date: Sat, 3 Apr 2021 12:46:58 +0200 Subject: [PATCH 03/21] update readme --- README.md | 4 ---- 1 file changed, 4 deletions(-) diff --git a/README.md b/README.md index 7052790..a7107fe 100644 --- a/README.md +++ b/README.md @@ -40,8 +40,6 @@ with DataJobStack( scope=app, id="data-pipeline-pkg", project_root=current_dir ) as datajob_stack: - # here we define 2 glue jobs with the path to the source code. - # we inject the datajob_stack object through all the resources that we want to add. task1 = GlueJob( datajob_stack=datajob_stack, name="task1", job_path="glue_jobs/task1.py" ) @@ -50,8 +48,6 @@ with DataJobStack( datajob_stack=datajob_stack, name="task2", job_path="glue_jobs/task2.py" ) - # we instantiate a step functions workflow - # and orchestrate the glue jobs. 
with StepfunctionsWorkflow(datajob_stack=datajob_stack, name="workflow") as sfn: task1 >> task2 From a122832b09036358a0864732423d4b9061bc2e6e Mon Sep 17 00:00:00 2001 From: vincent Date: Sat, 3 Apr 2021 12:47:51 +0200 Subject: [PATCH 04/21] update readme --- README.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index a7107fe..8a6b2c5 100644 --- a/README.md +++ b/README.md @@ -41,12 +41,12 @@ with DataJobStack( ) as datajob_stack: task1 = GlueJob( - datajob_stack=datajob_stack, name="task1", job_path="glue_jobs/task1.py" - ) + datajob_stack=datajob_stack, name="task1", job_path="glue_jobs/task1.py" + ) task2 = GlueJob( - datajob_stack=datajob_stack, name="task2", job_path="glue_jobs/task2.py" - ) + datajob_stack=datajob_stack, name="task2", job_path="glue_jobs/task2.py" + ) with StepfunctionsWorkflow(datajob_stack=datajob_stack, name="workflow") as sfn: task1 >> task2 From 0689a87bf88c0da061bdebaf41908910b5d90c86 Mon Sep 17 00:00:00 2001 From: vincent Date: Sat, 3 Apr 2021 12:48:34 +0200 Subject: [PATCH 05/21] update readme --- README.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 8a6b2c5..a7107fe 100644 --- a/README.md +++ b/README.md @@ -41,12 +41,12 @@ with DataJobStack( ) as datajob_stack: task1 = GlueJob( - datajob_stack=datajob_stack, name="task1", job_path="glue_jobs/task1.py" - ) + datajob_stack=datajob_stack, name="task1", job_path="glue_jobs/task1.py" + ) task2 = GlueJob( - datajob_stack=datajob_stack, name="task2", job_path="glue_jobs/task2.py" - ) + datajob_stack=datajob_stack, name="task2", job_path="glue_jobs/task2.py" + ) with StepfunctionsWorkflow(datajob_stack=datajob_stack, name="workflow") as sfn: task1 >> task2 From d884ee2fd6c8537661e528f8137267c2bf7c3e58 Mon Sep 17 00:00:00 2001 From: vincent Date: Sat, 3 Apr 2021 12:53:15 +0200 Subject: [PATCH 06/21] update readme --- README.md | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index a7107fe..21f181d 100644 --- a/README.md +++ b/README.md @@ -78,37 +78,44 @@ Datajob will create s3 buckets based on the datajob stack id and the stage varia The stage variable will typically be something like "dev", "stg", "prd", ... but since S3 buckets need to be globally unique we will use our $AWS_ACCOUNT for the `--stage` parameter. +```shell +export STAGE=$AWS_ACCOUNT +``` + _datajob cli_ ```shell script cd examples/data_pipeline_with_packaged_project -datajob deploy --config datajob_stack.py --stage $AWS_ACCOUNT --package setuppy +datajob deploy --config datajob_stack.py --stage $STAGE --package setuppy ``` _cdk cli_ ```shell script cd examples/data_pipeline_with_packaged_project python setup.py bdist_wheel -cdk deploy --app "python datajob_stack.py" -c stage=$AWS_ACCOUNT +cdk deploy --app "python datajob_stack.py" -c stage=$STAGE ``` After running the `deploy` command, the glue jobs are deployed and the orchestration is configured. 
### Run +The step function statemachine name is constructed as `--` +To run it execute: + ```shell script -datajob execute --state-machine data-pipeline-pkg-$AWS_ACCOUNT-workflow +datajob execute --state-machine data-pipeline-pkg-$STAGE-workflow ``` ### Destroy _datajob cli_ ```shell script -datajob destroy --config datajob_stack.py --stage $AWS_ACCOUNT +datajob destroy --config datajob_stack.py --stage $STAGE ``` _cdk cli_ ```shell script -cdk destroy --app "python datajob_stack.py" -c stage=$AWS_ACCOUNT +cdk destroy --app "python datajob_stack.py" -c stage=$STAGE ``` From 2493f55cbee7fad7b0e9555e15721e47f302af5b Mon Sep 17 00:00:00 2001 From: vincent Date: Sat, 3 Apr 2021 13:02:54 +0200 Subject: [PATCH 07/21] update readme --- README.md | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index 21f181d..dc6a73b 100644 --- a/README.md +++ b/README.md @@ -74,26 +74,28 @@ cdk bootstrap aws://$AWS_ACCOUNT/$AWS_DEFAULT_REGION Create a stack containing 2 glue jobs with our packaged project and its dependencies, orchestrated using step functions. -Datajob will create s3 buckets based on the datajob stack id and the stage variable. -The stage variable will typically be something like "dev", "stg", "prd", ... -but since S3 buckets need to be globally unique we will use our $AWS_ACCOUNT for the `--stage` parameter. - ```shell export STAGE=$AWS_ACCOUNT ``` -_datajob cli_ +Datajob will create s3 buckets based on the `` and the `stage` variable. +The stage variable will typically be something like "dev", "stg", "prd", ... +but since S3 buckets need to be globally unique we will use our `$AWS_ACCOUNT` for the `--stage` parameter. + ```shell script cd examples/data_pipeline_with_packaged_project datajob deploy --config datajob_stack.py --stage $STAGE --package setuppy ``` -_cdk cli_ +
+use cdk cli + ```shell script cd examples/data_pipeline_with_packaged_project python setup.py bdist_wheel cdk deploy --app "python datajob_stack.py" -c stage=$STAGE ``` +
After running the `deploy` command, the glue jobs are deployed and the orchestration is configured. @@ -108,16 +110,18 @@ datajob execute --state-machine data-pipeline-pkg-$STAGE-workflow ### Destroy -_datajob cli_ ```shell script datajob destroy --config datajob_stack.py --stage $STAGE ``` -_cdk cli_ +
+use cdk cli ```shell script cdk destroy --app "python datajob_stack.py" -c stage=$STAGE ``` +
+> Note: you can use any cdk arguments in the datajob cli # Functionality From 94e566140769827b8446dc1371add77a1e86be26 Mon Sep 17 00:00:00 2001 From: vincent Date: Sat, 3 Apr 2021 14:16:18 +0200 Subject: [PATCH 08/21] update readme --- README.md | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index dc6a73b..5cbbf51 100644 --- a/README.md +++ b/README.md @@ -74,14 +74,14 @@ cdk bootstrap aws://$AWS_ACCOUNT/$AWS_DEFAULT_REGION Create a stack containing 2 glue jobs with our packaged project and its dependencies, orchestrated using step functions. -```shell -export STAGE=$AWS_ACCOUNT -``` - Datajob will create s3 buckets based on the `` and the `stage` variable. The stage variable will typically be something like "dev", "stg", "prd", ... but since S3 buckets need to be globally unique we will use our `$AWS_ACCOUNT` for the `--stage` parameter. +```shell +export STAGE=$AWS_ACCOUNT +``` + ```shell script cd examples/data_pipeline_with_packaged_project datajob deploy --config datajob_stack.py --stage $STAGE --package setuppy @@ -173,7 +173,7 @@ deploy to stage `my-stage`: datajob deploy --config datajob_stack.py --stage my-stage --package setuppy ``` -`datajob_stack.context.data_bucket_name` will evaluate to `datajob-python-pyspark-my-stage # -` +`datajob_stack.context.data_bucket_name` will evaluate to `datajob-python-pyspark-my-stage you can find this example [here](./examples/data_pipeline_pyspark/glue_job/glue_pyspark_example.py) From 3938d37dd3a2644e8d904ceb09e2089893c75ed0 Mon Sep 17 00:00:00 2001 From: vincent Date: Sat, 3 Apr 2021 14:17:51 +0200 Subject: [PATCH 09/21] update readme --- README.md | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/README.md b/README.md index 5cbbf51..de009fa 100644 --- a/README.md +++ b/README.md @@ -18,9 +18,7 @@ # Quickstart -### Configuration - -We have a simple data pipeline composed of 2 glue jobs orchestrated sequentially. +We have a simple data pipeline composed of [2 glue jobs](./examples/data_pipeline_with_packaged_project/glue_jobs/) orchestrated sequentially. ```python import pathlib From 6a38144fcef90e8f9ad14d7f53064bf8d8b67621 Mon Sep 17 00:00:00 2001 From: vincent Date: Sat, 3 Apr 2021 14:22:34 +0200 Subject: [PATCH 10/21] update readme --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index de009fa..acb70e9 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # Datajob -#### Build and deploy a serverless data pipeline with no effort on AWS. +#### Build and deploy a serverless data pipeline on AWS with no effort. - We support python shell / pyspark Glue jobs. - Orchestrate using stepfunctions as simple as `task1 >> [task2,task3] >> task4` @@ -53,7 +53,7 @@ app.synth() ``` -We add the code in a file called `datajob_stack.py` in the [root of the project](./examples/data_pipeline_with_packaged_project/). +We add the above code in a file called `datajob_stack.py` in the [root of the project](./examples/data_pipeline_with_packaged_project/). 
### Configure CDK From 50666065cfdc3dcaa0dc17f2cc622404fde0064e Mon Sep 17 00:00:00 2001 From: vincent Date: Sat, 3 Apr 2021 14:25:05 +0200 Subject: [PATCH 11/21] update readme --- README.md | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index acb70e9..911e452 100644 --- a/README.md +++ b/README.md @@ -18,7 +18,7 @@ # Quickstart -We have a simple data pipeline composed of [2 glue jobs](./examples/data_pipeline_with_packaged_project/glue_jobs/) orchestrated sequentially. +We have a simple data pipeline composed of [2 glue jobs](./examples/data_pipeline_with_packaged_project/glue_jobs/) orchestrated sequentially using step functions. ```python import pathlib @@ -70,11 +70,9 @@ cdk bootstrap aws://$AWS_ACCOUNT/$AWS_DEFAULT_REGION ### Deploy -Create a stack containing 2 glue jobs with our packaged project and its dependencies, orchestrated using step functions. - -Datajob will create s3 buckets based on the `` and the `stage` variable. +Datajob will create s3 buckets based on the `datajob_stack.id` and the `stage` variable. The stage variable will typically be something like "dev", "stg", "prd", ... -but since S3 buckets need to be globally unique we will use our `$AWS_ACCOUNT` for the `--stage` parameter. +but since S3 buckets need to be globally unique, for this example we will use our `$AWS_ACCOUNT` for the `--stage` parameter. ```shell export STAGE=$AWS_ACCOUNT From 0dd814ce77fc785e6825a6ea28920a4eb38cede8 Mon Sep 17 00:00:00 2001 From: vincent Date: Sat, 3 Apr 2021 14:30:47 +0200 Subject: [PATCH 12/21] update readme --- README.md | 6 +++--- .../data_pipeline_with_packaged_project/datajob_stack.py | 4 +++- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 911e452..b259e2c 100644 --- a/README.md +++ b/README.md @@ -46,7 +46,7 @@ with DataJobStack( datajob_stack=datajob_stack, name="task2", job_path="glue_jobs/task2.py" ) - with StepfunctionsWorkflow(datajob_stack=datajob_stack, name="workflow") as sfn: + with StepfunctionsWorkflow(datajob_stack=datajob_stack, name="workflow") as step_functions_workflow: task1 >> task2 app.synth() @@ -97,7 +97,7 @@ After running the `deploy` command, the glue jobs are deployed and the orchestra ### Run -The step function statemachine name is constructed as `--` +The step function state machine name is constructed as `--` To run it execute: ```shell script @@ -362,7 +362,7 @@ datajob_stack.init_datajob_context() task1 = GlueJob(datajob_stack=datajob_stack, name="task1", job_path="glue_jobs/task1.py") task2 = GlueJob(datajob_stack=datajob_stack, name="task2", job_path="glue_jobs/task2.py") -with StepfunctionsWorkflow(datajob_stack=datajob_stack, name="workflow") as sfn: +with StepfunctionsWorkflow(datajob_stack=datajob_stack, name="workflow") as step_functions_workflow: task1 >> task2 datajob_stack.create_resources() diff --git a/examples/data_pipeline_with_packaged_project/datajob_stack.py b/examples/data_pipeline_with_packaged_project/datajob_stack.py index 3f96557..69b9723 100644 --- a/examples/data_pipeline_with_packaged_project/datajob_stack.py +++ b/examples/data_pipeline_with_packaged_project/datajob_stack.py @@ -27,7 +27,9 @@ # we instantiate a step functions workflow # and orchestrate the glue jobs. 
- with StepfunctionsWorkflow(datajob_stack=datajob_stack, name="workflow") as sfn: + with StepfunctionsWorkflow( + datajob_stack=datajob_stack, name="workflow" + ) as step_functions_workflow: task1 >> task2 app.synth() From b383c8320f0cb707fe62e5c6d6b336d8a4eebe36 Mon Sep 17 00:00:00 2001 From: vincent Date: Sat, 3 Apr 2021 14:33:29 +0200 Subject: [PATCH 13/21] update readme --- README.md | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index b259e2c..8f0458d 100644 --- a/README.md +++ b/README.md @@ -93,16 +93,18 @@ cdk deploy --app "python datajob_stack.py" -c stage=$STAGE ```
-After running the `deploy` command, the glue jobs are deployed and the orchestration is configured. +Your glue jobs are deployed and the orchestration is configured. ### Run -The step function state machine name is constructed as `--` +The step function state machine name is constructed as `--`. + To run it execute: ```shell script datajob execute --state-machine data-pipeline-pkg-$STAGE-workflow ``` +The terminal will output a link to the step functions page to follow up on your pipeline run. ### Destroy From 12bdbc68d080429e034bb418dbe2ff2be9ba83eb Mon Sep 17 00:00:00 2001 From: vincent Date: Sat, 3 Apr 2021 14:35:49 +0200 Subject: [PATCH 14/21] update readme --- README.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/README.md b/README.md index 8f0458d..9335e61 100644 --- a/README.md +++ b/README.md @@ -78,6 +78,8 @@ but since S3 buckets need to be globally unique, for this example we will use ou export STAGE=$AWS_ACCOUNT ``` +Navigate to `datajob_stack.py` file and deploy the data pipeline. + ```shell script cd examples/data_pipeline_with_packaged_project datajob deploy --config datajob_stack.py --stage $STAGE --package setuppy From 7202fc1a040391a0591a32e3bbe062c2e8c246a8 Mon Sep 17 00:00:00 2001 From: vincent Date: Sat, 3 Apr 2021 14:37:56 +0200 Subject: [PATCH 15/21] update readme --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 9335e61..17eea25 100644 --- a/README.md +++ b/README.md @@ -101,7 +101,7 @@ Your glue jobs are deployed and the orchestration is configured. The step function state machine name is constructed as `--`. -To run it execute: +To run your data pipeline execute: ```shell script datajob execute --state-machine data-pipeline-pkg-$STAGE-workflow From fb29bd426537fd304c5b240a3d4733f5c8b760a8 Mon Sep 17 00:00:00 2001 From: vincent Date: Sat, 3 Apr 2021 14:38:28 +0200 Subject: [PATCH 16/21] update readme --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 17eea25..05be0d9 100644 --- a/README.md +++ b/README.md @@ -126,7 +126,7 @@ cdk destroy --app "python datajob_stack.py" -c stage=$STAGE # Functionality
-Using datajob's S3 data bucket to pass arguments to a glue job +Using datajob's S3 data bucket Pass arguments to your Glue job using the `arguments` parameter and dynamically reference the `datajob_stack` data bucket name for this `stage` to the arguments. From 713cff6b02afebef30706db7b9bdbcbefcdb7204 Mon Sep 17 00:00:00 2001 From: vincent Date: Sat, 3 Apr 2021 14:39:56 +0200 Subject: [PATCH 17/21] update readme --- README.md | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/README.md b/README.md index 05be0d9..a85a94c 100644 --- a/README.md +++ b/README.md @@ -128,8 +128,7 @@ cdk destroy --app "python datajob_stack.py" -c stage=$STAGE
Using datajob's S3 data bucket -Pass arguments to your Glue job using the `arguments` parameter and -dynamically reference the `datajob_stack` data bucket name for this `stage` to the arguments. +Dynamically reference the `datajob_stack` data bucket name for this `stage` to the arguments. ```python import pathlib From 1049fcd2c9b1d411995cf057ee6b443d24cc837b Mon Sep 17 00:00:00 2001 From: vincent Date: Sat, 3 Apr 2021 14:43:13 +0200 Subject: [PATCH 18/21] update readme --- README.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index a85a94c..4ccbe2b 100644 --- a/README.md +++ b/README.md @@ -128,7 +128,8 @@ cdk destroy --app "python datajob_stack.py" -c stage=$STAGE
Using datajob's S3 data bucket -Dynamically reference the `datajob_stack` data bucket name for this `stage` to the arguments. +Dynamically reference the `datajob_stack` data bucket name to the arguments of your GlueJob by calling +`datajob_stack.context.data_bucket_name`. ```python import pathlib From ed644f46b94ce6f5a8de279ac24ce5a4b7a99502 Mon Sep 17 00:00:00 2001 From: vincent Date: Sat, 3 Apr 2021 14:43:51 +0200 Subject: [PATCH 19/21] update readme --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 4ccbe2b..7bec06d 100644 --- a/README.md +++ b/README.md @@ -173,7 +173,7 @@ deploy to stage `my-stage`: datajob deploy --config datajob_stack.py --stage my-stage --package setuppy ``` -`datajob_stack.context.data_bucket_name` will evaluate to `datajob-python-pyspark-my-stage +`datajob_stack.context.data_bucket_name` will evaluate to `datajob-python-pyspark-my-stage` you can find this example [here](./examples/data_pipeline_pyspark/glue_job/glue_pyspark_example.py) From 5310b537a40b8e0864c6ce380e941f3fb2c2ebff Mon Sep 17 00:00:00 2001 From: vincent Date: Sat, 3 Apr 2021 14:48:42 +0200 Subject: [PATCH 20/21] update readme --- README.md | 30 ------------------------------ 1 file changed, 30 deletions(-) diff --git a/README.md b/README.md index 7bec06d..de540bd 100644 --- a/README.md +++ b/README.md @@ -252,36 +252,6 @@ with DataJobStack( full example can be found in [examples/data_pipeline_pyspark](examples/data_pipeline_pyspark]).
-
-Using datajob's S3 data bucket - - -```python - -with DataJobStack( - scope=app, id="datajob-python-pyspark", project_root=current_dir -) as datajob_stack: - - pyspark_job = GlueJob( - datajob_stack=datajob_stack, - name="pyspark-job", - job_path="glue_job/glue_pyspark_example.py", - job_type="glueetl", - glue_version="2.0", # we only support glue 2.0 - python_version="3", - worker_type="Standard", # options are Standard / G.1X / G.2X - number_of_workers=1, - arguments={ - "--source": f"s3://{datajob_stack.context.data_bucket_name}/raw/iris_dataset.csv", - "--destination": f"s3://{datajob_stack.context.data_bucket_name}/target/pyspark_job/iris_dataset.parquet", - }, - ) - -``` - - -
-
Orchestrate stepfunctions tasks in parallel From ae71c554a90013b8f58c35fb0c2c5a3ba911e1fb Mon Sep 17 00:00:00 2001 From: vincent Date: Sat, 3 Apr 2021 14:50:03 +0200 Subject: [PATCH 21/21] update readme --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index de540bd..eab6c2c 100644 --- a/README.md +++ b/README.md @@ -278,8 +278,8 @@ some_task >> ... # Datajob in depth The `datajob_stack` is the instance that will result in a cloudformation stack. -The path in `project_root` helps datajob_stack locate the root of the project where -the setup.py/poetry pyproject.toml file can be found as well as the `dist/` folder with the wheel of your project . +The path in `project_root` helps `datajob_stack` locate the root of the project where +the setup.py/poetry pyproject.toml file can be found, as well as the `dist/` folder with the wheel of your project . ```python import pathlib