diff --git a/README.md b/README.md
index 0bc9b1d..eab6c2c 100644
--- a/README.md
+++ b/README.md
@@ -1,6 +1,6 @@
 # Datajob

-#### Build and deploy a serverless data pipeline with no effort on AWS.
+#### Build and deploy a serverless data pipeline on AWS with no effort.

 - We support python shell / pyspark Glue jobs.
 - Orchestrate using stepfunctions as simple as `task1 >> [task2,task3] >> task4`
@@ -18,43 +18,46 @@
 # Quickstart

-### Configuration
-
-We have a simple data pipeline composed of 2 glue jobs orchestrated sequentially.
+We have a simple data pipeline composed of [2 glue jobs](./examples/data_pipeline_with_packaged_project/glue_jobs/) orchestrated sequentially using step functions.

 ```python
+import pathlib
 from aws_cdk import core

 from datajob.datajob_stack import DataJobStack
 from datajob.glue.glue_job import GlueJob
 from datajob.stepfunctions.stepfunctions_workflow import StepfunctionsWorkflow
+
+current_dir = pathlib.Path(__file__).parent.absolute()
+
 app = core.App()

-# The datajob_stack is the instance that will result in a cloudformation stack.
-# We inject the datajob_stack object through all the resources that we want to add.
-with DataJobStack(scope=app, id="data-pipeline-simple") as datajob_stack:
-    # We define 2 glue jobs with the relative path to the source code.
+with DataJobStack(
+    scope=app, id="data-pipeline-pkg", project_root=current_dir
+) as datajob_stack:
+
     task1 = GlueJob(
         datajob_stack=datajob_stack, name="task1", job_path="glue_jobs/task1.py"
     )
+
     task2 = GlueJob(
         datajob_stack=datajob_stack, name="task2", job_path="glue_jobs/task2.py"
     )

-    # We instantiate a step functions workflow and orchestrate the glue jobs.
-    with StepfunctionsWorkflow(datajob_stack=datajob_stack, name="workflow") as sfn:
+    with StepfunctionsWorkflow(
+        datajob_stack=datajob_stack, name="workflow"
+    ) as step_functions_workflow:
         task1 >> task2

 app.synth()
 ```

-We add the code in a file called `datajob_stack.py` in the [root of the project](./examples/data_pipeline_simple/).
+We add the above code in a file called `datajob_stack.py` in the [root of the project](./examples/data_pipeline_with_packaged_project/).

 ### Configure CDK

+Follow the steps [here](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-quickstart.html#cli-configure-quickstart-config) to configure your credentials.
+
 ```shell script
 export AWS_PROFILE=my-profile # e.g. default
@@ -66,91 +69,195 @@ cdk bootstrap aws://$AWS_ACCOUNT/$AWS_DEFAULT_REGION
 ```

 ### Deploy
-_cdk cli_
+
+Datajob will create S3 buckets based on the `datajob_stack.id` and the `stage` variable.
+The stage variable will typically be something like "dev", "stg", "prd", ...
+but since S3 buckets need to be globally unique, for this example we will use our `$AWS_ACCOUNT` for the `--stage` parameter.
+
+```shell
+export STAGE=$AWS_ACCOUNT
+```
+
+Navigate to the `datajob_stack.py` file and deploy the data pipeline.

 ```shell script
-cd examples/data_pipeline_simple
-python setup.py bdist_wheel
-cdk deploy --app "python datajob_stack.py"
+cd examples/data_pipeline_with_packaged_project
+datajob deploy --config datajob_stack.py --stage $STAGE --package setuppy
 ```

-_datajob cli_
+<details>
+<summary>use cdk cli</summary>
+
 ```shell script
-cd examples/data_pipeline_simple
-datajob deploy --config datajob_stack.py --package setuppy
+cd examples/data_pipeline_with_packaged_project
+python setup.py bdist_wheel
+cdk deploy --app "python datajob_stack.py" -c stage=$STAGE
 ```
+
+</details>
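+
+For reference, a python shell glue job is a plain python script. A minimal sketch of what
+`glue_jobs/task1.py` could contain — the actual code lives in
+[examples/data_pipeline_with_packaged_project/glue_jobs](./examples/data_pipeline_with_packaged_project/glue_jobs/):
+
+```python
+# a python shell glue job is executed as a regular python script
+print("executing task1 ...")
+```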
-
-After running the `deploy` command, the code of glue jobs are deployed and orchestrated.
+Your glue jobs are deployed and the orchestration is configured.

 ### Run

+The step function state machine name is constructed as `<datajob_stack.id>-<stage>-<step_functions_workflow.name>`.
+
+To run your data pipeline, execute:
+
 ```shell script
-datajob execute --state-machine data-pipeline-simple-dev-workflow
+datajob execute --state-machine data-pipeline-pkg-$STAGE-workflow
 ```
+
+The terminal will output a link to the step functions page to follow up on your pipeline run.

 ### Destroy
-_cdk cli_
+
 ```shell script
-cdk destroy --app "python datajob_stack.py"
+datajob destroy --config datajob_stack.py --stage $STAGE
 ```

-_datajob cli_
+<details>
+<summary>use cdk cli</summary>
+
 ```shell script
-datajob destroy --config datajob_stack.py
+cdk destroy --app "python datajob_stack.py" -c stage=$STAGE
 ```
+
+</details>
+
+> Note: you can use any cdk arguments in the datajob cli.

 # Functionality

 <details>
-<summary>Pass arguments to a glue job</summary>
-#todo implemented not documented
+<summary>Using datajob's S3 data bucket</summary>
+
+Dynamically pass the name of the `datajob_stack` data bucket to the arguments of your GlueJob by referencing
+`datajob_stack.context.data_bucket_name`.
+
+```python
+import pathlib
+
+from aws_cdk import core
+from datajob.datajob_stack import DataJobStack
+from datajob.glue.glue_job import GlueJob
+from datajob.stepfunctions.stepfunctions_workflow import StepfunctionsWorkflow
+
+current_dir = str(pathlib.Path(__file__).parent.absolute())
+
+app = core.App()
+
+with DataJobStack(
+    scope=app, id="datajob-python-pyspark", project_root=current_dir
+) as datajob_stack:
+
+    pyspark_job = GlueJob(
+        datajob_stack=datajob_stack,
+        name="pyspark-job",
+        job_path="glue_job/glue_pyspark_example.py",
+        job_type="glueetl",
+        glue_version="2.0",  # we only support glue 2.0
+        python_version="3",
+        worker_type="Standard",  # options are Standard / G.1X / G.2X
+        number_of_workers=1,
+        arguments={
+            "--source": f"s3://{datajob_stack.context.data_bucket_name}/raw/iris_dataset.csv",
+            "--destination": f"s3://{datajob_stack.context.data_bucket_name}/target/pyspark_job/iris_dataset.parquet",
+        },
+    )
+
+    with StepfunctionsWorkflow(
+        datajob_stack=datajob_stack, name="workflow"
+    ) as step_functions_workflow:
+        pyspark_job >> ...
+```
+
+Deploy to stage `my-stage`:
+
+```shell
+datajob deploy --config datajob_stack.py --stage my-stage --package setuppy
+```
+
+`datajob_stack.context.data_bucket_name` will evaluate to `datajob-python-pyspark-my-stage`.
+
+You can find this example [here](./examples/data_pipeline_pyspark/glue_job/glue_pyspark_example.py).
+
 </details>
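
+<details>
+<summary>Reading the arguments inside your glue job</summary>
+
+The key/value pairs in `arguments` are handed to the glue job as job parameters.
+Inside the job they can be read back with `getResolvedOptions` from the aws glue utilities.
+A minimal sketch, not part of the shipped examples:
+
+```python
+import sys
+
+from awsglue.utils import getResolvedOptions
+
+# resolve the --source and --destination arguments defined on the GlueJob
+args = getResolvedOptions(sys.argv, ["source", "destination"])
+print(args["source"], args["destination"])
+```
+
+</details>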

 <details>
 <summary>Deploy files to deployment bucket</summary>
-#todo implemented not documented
+
+Specify the path to the folder we would like to include in the deployment bucket.
+
+```python
+from aws_cdk import core
+
+from datajob.datajob_stack import DataJobStack
+
+app = core.App()
+
+with DataJobStack(
+    scope=app, id="some-stack-name", include_folder="path/to/folder/"
+) as datajob_stack:
+
+    ...
+```
+
 </details>

 <details>
 <summary>Package project</summary>
-#todo implemented not documented
+
+Package your project using [poetry](https://python-poetry.org/):
+
+```shell
+datajob deploy --config datajob_stack.py --package poetry
+```
+
+Package your project using [setup.py](./examples/data_pipeline_with_packaged_project):
+
+```shell
+datajob deploy --config datajob_stack.py --package setuppy
+```
+
 </details>
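
+<details>
+<summary>Minimal setup.py for --package setuppy</summary>
+
+`--package setuppy` expects a `setup.py` in your `project_root` from which a wheel can be built
+(the cdk alternative above runs `python setup.py bdist_wheel` manually). A minimal sketch —
+the project name below is hypothetical, use your own project metadata:
+
+```python
+from setuptools import find_packages, setup
+
+setup(
+    name="data-pipeline-pkg",  # hypothetical project name
+    version="0.1.0",
+    packages=find_packages(),  # picks up e.g. the glue_jobs package
+)
+```
+
+</details>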

 <details>
 <summary>Using Pyspark</summary>

 ```python
-pyspark_job = GlueJob(
-    datajob_stack=datajob_stack,
-    name="pyspark-job",
-    job_path="glue_job/glue_pyspark_example.py",
-    job_type="glueetl",
-    glue_version="2.0", # we only support glue 2.0
-    python_version="3",
-    worker_type="Standard", # options are Standard / G.1X / G.2X
-    number_of_workers=1,
-    arguments={
-        "--source": f"s3://{datajob_stack.context.data_bucket_name}/raw/iris_dataset.csv",
-        "--destination": f"s3://{datajob_stack.context.data_bucket_name}/target/pyspark_job/iris_dataset.parquet",
-    }
-)
+import pathlib
+
+from aws_cdk import core
+from datajob.datajob_stack import DataJobStack
+from datajob.glue.glue_job import GlueJob
+from datajob.stepfunctions.stepfunctions_workflow import StepfunctionsWorkflow
+
+current_dir = str(pathlib.Path(__file__).parent.absolute())
+
+app = core.App()
+
+with DataJobStack(
+    scope=app, id="datajob-python-pyspark", project_root=current_dir
+) as datajob_stack:
+
+    pyspark_job = GlueJob(
+        datajob_stack=datajob_stack,
+        name="pyspark-job",
+        job_path="glue_job/glue_pyspark_example.py",
+        job_type="glueetl",
+        glue_version="2.0",  # we only support glue 2.0
+        python_version="3",
+        worker_type="Standard",  # options are Standard / G.1X / G.2X
+        number_of_workers=1,
+        arguments={
+            "--source": f"s3://{datajob_stack.context.data_bucket_name}/raw/iris_dataset.csv",
+            "--destination": f"s3://{datajob_stack.context.data_bucket_name}/target/pyspark_job/iris_dataset.parquet",
+        },
+    )
 ```

-full example can be found in [examples/data_pipeline_pyspark](examples/data_pipeline_pyspark]).
+The full example can be found in [examples/data_pipeline_pyspark](./examples/data_pipeline_pyspark).

 </details>
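
+<details>
+<summary>Sketch of the pyspark script</summary>
+
+The script referenced by `job_path` is a regular pyspark script. A minimal sketch of what
+`glue_job/glue_pyspark_example.py` could look like — the actual implementation lives in
+[examples/data_pipeline_pyspark](./examples/data_pipeline_pyspark):
+
+```python
+import sys
+
+from awsglue.utils import getResolvedOptions
+from pyspark.sql import SparkSession
+
+# resolve the --source and --destination arguments defined on the GlueJob
+args = getResolvedOptions(sys.argv, ["source", "destination"])
+
+spark = SparkSession.builder.appName("pyspark-job").getOrCreate()
+
+# read the raw csv dataset and write it back out as parquet
+df = spark.read.csv(args["source"], header=True)
+df.write.parquet(args["destination"])
+```
+
+</details>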

-<details>
-<summary>Using S3 bucket to dump data</summary>
-#todo implemented not documented
-# create an example that dumps and reads from s3
-</details>
-
 <details>
 <summary>Orchestrate stepfunctions tasks in parallel</summary>

 ```python
 # task1 and task2 are orchestrated in parallel.
-# task 3 will only start when both task1 and task2 have succeeded.
+# task3 will only start when both task1 and task2 have succeeded.
 [task1, task2] >> task3
 ```
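+
+Parallel and sequential orchestration can be combined in a single statement, as in the
+one-liner at the top of this README (an illustrative snippet; task1 through task4 are GlueJob instances):
+
+```python
+# task2 and task3 start in parallel once task1 has succeeded;
+# task4 starts when both task2 and task3 have succeeded.
+task1 >> [task2, task3] >> task4
+```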
@@ -159,7 +266,7 @@ full example can be found in [examples/data_pipeline_pyspark](examples/data_pipe
 <summary>Orchestrate 1 stepfunction task</summary>

-Use the [Ellipsis](https://docs.python.org/dev/library/constants.html#Ellipsis) object to be able to orchestrate a job via step functions.
+Use the [Ellipsis](https://docs.python.org/dev/library/constants.html#Ellipsis) object to orchestrate a single job via step functions.

 ```python
 some_task >> ...
 ```

@@ -168,10 +275,25 @@
-# The magic behind datajob
+# Datajob in depth
+
+The `datajob_stack` is the instance that will result in a cloudformation stack.
+The path in `project_root` helps `datajob_stack` locate the root of the project where
+the `setup.py` or poetry `pyproject.toml` file can be found, as well as the `dist/` folder with the wheel of your project.

 ```python
-with DataJobStack(scope=app, id="data-pipeline-simple") as datajob_stack:
+import pathlib
+from aws_cdk import core
+
+from datajob.datajob_stack import DataJobStack
+
+current_dir = pathlib.Path(__file__).parent.absolute()
+app = core.App()
+
+with DataJobStack(
+    scope=app, id="data-pipeline-pkg", project_root=current_dir
+) as datajob_stack:
+
     ...
 ```

@@ -195,6 +317,7 @@ when __exiting the context manager__ all the resources of our DataJobStack objec
 We can write the above example more explicitly...

 ```python
+import pathlib
 from aws_cdk import core

 from datajob.datajob_stack import DataJobStack
@@ -203,13 +326,15 @@ from datajob.stepfunctions.stepfunctions_workflow import StepfunctionsWorkflow

 app = core.App()

-datajob_stack = DataJobStack(scope=app, id="data-pipeline-simple")
+current_dir = pathlib.Path(__file__).parent.absolute()
+
+datajob_stack = DataJobStack(scope=app, id="data-pipeline-pkg", project_root=current_dir)
 datajob_stack.init_datajob_context()

 task1 = GlueJob(datajob_stack=datajob_stack, name="task1", job_path="glue_jobs/task1.py")
 task2 = GlueJob(datajob_stack=datajob_stack, name="task2", job_path="glue_jobs/task2.py")

-with StepfunctionsWorkflow(datajob_stack=datajob_stack, name="workflow") as sfn:
+with StepfunctionsWorkflow(datajob_stack=datajob_stack, name="workflow") as step_functions_workflow:
     task1 >> task2

 datajob_stack.create_resources()
@@ -233,10 +360,8 @@ These are the ideas, we find interesting to implement;

   - processing jobs
   - hyperparameter tuning jobs
   - training jobs
-  - create sagemaker model
-  - create sagemaker endpoint
-  - expose sagemaker endpoint to the internet by levering lambda + api gateway
-
+- implement AWS Lambda
+- implement ECS Fargate
 - create a serverless UI that follows up on the different pipelines deployed on possibly different AWS accounts using Datajob

 > [Feedback](https://github.com/vincentclaes/datajob/discussions) is much appreciated!
diff --git a/datajob/datajob_stack.py b/datajob/datajob_stack.py
index d251ed2..175b940 100644
--- a/datajob/datajob_stack.py
+++ b/datajob/datajob_stack.py
@@ -25,7 +25,7 @@ def __init__(
         :param id: a name for this stack.
         :param stage: the stage name to which we are deploying
         :param project_root: the path to the root of this project
-        :param include_folder: specify the name of the folder we would like to include in the deployment bucket.
+        :param include_folder: specify the path to the folder we would like to include in the deployment bucket.
         :param account: AWS account number
         :param region: AWS region where we want to deploy our datajob to
         :param kwargs: any extra kwargs for the core.Construct
diff --git a/examples/data_pipeline_with_packaged_project/datajob_stack.py b/examples/data_pipeline_with_packaged_project/datajob_stack.py
index 3f96557..69b9723 100644
--- a/examples/data_pipeline_with_packaged_project/datajob_stack.py
+++ b/examples/data_pipeline_with_packaged_project/datajob_stack.py
@@ -27,7 +27,9 @@
     # we instantiate a step functions workflow
     # and orchestrate the glue jobs.
-    with StepfunctionsWorkflow(datajob_stack=datajob_stack, name="workflow") as sfn:
+    with StepfunctionsWorkflow(
+        datajob_stack=datajob_stack, name="workflow"
+    ) as step_functions_workflow:
         task1 >> task2

 app.synth()
diff --git a/examples/data_pipeline_with_packaged_project/datajob_stack_explicit.py b/examples/data_pipeline_with_packaged_project/datajob_stack_explicit.py
new file mode 100644
index 0000000..6d5c88a
--- /dev/null
+++ b/examples/data_pipeline_with_packaged_project/datajob_stack_explicit.py
@@ -0,0 +1,33 @@
+"""
+Same as ./datajob_stack.py, but more explicit.
+"""
+import pathlib
+from aws_cdk import core
+
+from datajob.datajob_stack import DataJobStack
+from datajob.glue.glue_job import GlueJob
+from datajob.stepfunctions.stepfunctions_workflow import StepfunctionsWorkflow
+
+current_dir = pathlib.Path(__file__).parent.absolute()
+
+app = core.App()
+
+datajob_stack = DataJobStack(
+    scope=app, id="data-pipeline-pkg", project_root=current_dir
+)
+datajob_stack.init_datajob_context()
+
+task1 = GlueJob(
+    datajob_stack=datajob_stack, name="task1", job_path="glue_jobs/task1.py"
+)
+task2 = GlueJob(
+    datajob_stack=datajob_stack, name="task2", job_path="glue_jobs/task2.py"
+)
+
+with StepfunctionsWorkflow(
+    datajob_stack=datajob_stack, name="workflow"
+) as step_functions_workflow:
+    task1 >> task2
+
+datajob_stack.create_resources()
+app.synth()