Merge pull request #62 from vincentclaes/update-readme

update readme

vincentclaes authored Apr 3, 2021
2 parents 4509616 + ae71c55 commit abf1fac
Showing 4 changed files with 220 additions and 60 deletions.
241 changes: 183 additions & 58 deletions README.md
# Datajob

#### Build and deploy a serverless data pipeline on AWS with no effort.

- We support Python shell / PySpark Glue jobs.
- Orchestrate using step functions as simply as `task1 >> [task2,task3] >> task4`

# Quickstart

### Configuration

We have a simple data pipeline composed of [2 glue jobs](./examples/data_pipeline_with_packaged_project/glue_jobs/) orchestrated sequentially using step functions.

```python
import pathlib
from aws_cdk import core

from datajob.datajob_stack import DataJobStack
from datajob.glue.glue_job import GlueJob
from datajob.stepfunctions.stepfunctions_workflow import StepfunctionsWorkflow


current_dir = pathlib.Path(__file__).parent.absolute()

app = core.App()

# The datajob_stack is the instance that will result in a cloudformation stack.
# We inject the datajob_stack object through all the resources that we want to add.
with DataJobStack(
    scope=app, id="data-pipeline-pkg", project_root=current_dir
) as datajob_stack:

    # We define 2 glue jobs with the relative path to the source code.
    task1 = GlueJob(
        datajob_stack=datajob_stack, name="task1", job_path="glue_jobs/task1.py"
    )
    task2 = GlueJob(
        datajob_stack=datajob_stack, name="task2", job_path="glue_jobs/task2.py"
    )

    # We instantiate a step functions workflow and orchestrate the glue jobs.
    with StepfunctionsWorkflow(datajob_stack=datajob_stack, name="workflow") as step_functions_workflow:
        task1 >> task2

app.synth()

```

We add the above code in a file called `datajob_stack.py` in the [root of the project](./examples/data_pipeline_with_packaged_project/).


### Configure CDK
Follow the steps [here](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-quickstart.html#cli-configure-quickstart-config) to configure your credentials.

```shell script
export AWS_PROFILE=my-profile # e.g. default
export AWS_DEFAULT_REGION=eu-west-1 # the region you want to deploy to
export AWS_ACCOUNT=$(aws sts get-caller-identity --query Account --output text --profile $AWS_PROFILE)
cdk bootstrap aws://$AWS_ACCOUNT/$AWS_DEFAULT_REGION
```

### Deploy

Datajob will create s3 buckets based on the `datajob_stack.id` and the `stage` variable.
The stage variable will typically be something like "dev", "stg", "prd", ...
but since S3 buckets need to be globally unique, for this example we will use our `$AWS_ACCOUNT` for the `--stage` parameter.

```shell
export STAGE=$AWS_ACCOUNT
```
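
To make the naming concrete, here is a sketch of the data bucket name that results from this; the pattern `<datajob_stack.id>-<stage>` comes from the data bucket example further down, and the `grep` check is just an illustration:

```shell script
# With datajob_stack.id "data-pipeline-pkg" and the stage set above, the data
# bucket follows the pattern <datajob_stack.id>-<stage>:
#   data-pipeline-pkg-$STAGE
# After deploying you can check the buckets of your stack, for example with:
aws s3 ls | grep data-pipeline-pkg
```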

Navigate to the `datajob_stack.py` file and deploy the data pipeline.

```shell script
cd examples/data_pipeline_with_packaged_project
datajob deploy --config datajob_stack.py --stage $STAGE --package setuppy
```

<details>
<summary>use cdk cli</summary>

```shell script
cd examples/data_pipeline_with_packaged_project
python setup.py bdist_wheel
cdk deploy --app "python datajob_stack.py" -c stage=$STAGE
```
</details>

Your glue jobs are deployed and the orchestration is configured.

### Run

The step function state machine name is constructed as `<datajob_stack.id>-<stage>-<step_functions_workflow.name>`.

To run your data pipeline, execute:

```shell script
datajob execute --state-machine data-pipeline-pkg-$STAGE-workflow
```
The terminal will output a link to the step functions page to follow up on your pipeline run.
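
If you prefer to follow up from the terminal instead of the console, here is a sketch using the plain AWS CLI; the state machine name follows the pattern above, and profile/region are as configured earlier:

```shell script
# Look up the ARN of the workflow deployed above and list its latest executions.
STATE_MACHINE_ARN=$(aws stepfunctions list-state-machines \
  --query "stateMachines[?name=='data-pipeline-pkg-${STAGE}-workflow'].stateMachineArn" \
  --output text)
aws stepfunctions list-executions --state-machine-arn "$STATE_MACHINE_ARN" --max-results 5
```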

### Destroy

```shell script
datajob destroy --config datajob_stack.py --stage $STAGE
```

_datajob cli_
<details>
<summary>use cdk cli</summary>

```shell script
cdk destroy --app "python datajob_stack.py" -c stage=$STAGE
```
</details>

> Note: you can use any cdk arguments in the datajob cli.

# Functionality

<details>
<summary>Using datajob's S3 data bucket</summary>

Dynamically reference the `datajob_stack` data bucket name in the arguments of your GlueJob via
`datajob_stack.context.data_bucket_name`.

```python
import pathlib

from aws_cdk import core
from datajob.datajob_stack import DataJobStack
from datajob.glue.glue_job import GlueJob
from datajob.stepfunctions.stepfunctions_workflow import StepfunctionsWorkflow

current_dir = str(pathlib.Path(__file__).parent.absolute())

app = core.App()

with DataJobStack(
    scope=app, id="datajob-python-pyspark", project_root=current_dir
) as datajob_stack:

    pyspark_job = GlueJob(
        datajob_stack=datajob_stack,
        name="pyspark-job",
        job_path="glue_job/glue_pyspark_example.py",
        job_type="glueetl",
        glue_version="2.0",  # we only support glue 2.0
        python_version="3",
        worker_type="Standard",  # options are Standard / G.1X / G.2X
        number_of_workers=1,
        arguments={
            "--source": f"s3://{datajob_stack.context.data_bucket_name}/raw/iris_dataset.csv",
            "--destination": f"s3://{datajob_stack.context.data_bucket_name}/target/pyspark_job/iris_dataset.parquet",
        },
    )

    with StepfunctionsWorkflow(datajob_stack=datajob_stack, name="workflow") as sfn:
        pyspark_job >> ...

```

Deploy to stage `my-stage`:

```shell
datajob deploy --config datajob_stack.py --stage my-stage --package setuppy
```

`datajob_stack.context.data_bucket_name` will evaluate to `datajob-python-pyspark-my-stage`.

You can find this example [here](./examples/data_pipeline_pyspark/glue_job/glue_pyspark_example.py).
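
For completeness, here is a sketch of how the glue script itself could pick up these arguments; it uses the standard `awsglue` helper and the argument names from the job definition above, but it is not part of datajob:

```python
import sys

from awsglue.utils import getResolvedOptions

# "--source" and "--destination" are passed in via the GlueJob arguments above;
# getResolvedOptions expects the names without the leading dashes.
args = getResolvedOptions(sys.argv, ["source", "destination"])
source_path = args["source"]            # s3://datajob-python-pyspark-<stage>/raw/iris_dataset.csv
destination_path = args["destination"]  # s3://datajob-python-pyspark-<stage>/target/pyspark_job/iris_dataset.parquet
```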

</details>

<details>
<summary>Deploy files to deployment bucket</summary>

Specify the path to the folder we would like to include in the deployment bucket.

```python

from aws_cdk import core
from datajob.datajob_stack import DataJobStack

app = core.App()

with DataJobStack(
    scope=app, id="some-stack-name", include_folder="path/to/folder/"
) as datajob_stack:

    ...

```

</details>

<details>
<summary>Package project</summary>

Package your project using [poetry](https://python-poetry.org/):

```shell
datajob deploy --config datajob_stack.py --package poetry
```
Package your project using [setup.py](./examples/data_pipeline_with_packaged_project):
```shell
datajob deploy --config datajob_stack.py --package setuppy
```
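
For the `setuppy` option, a minimal `setup.py` sketch is enough for datajob to build a wheel of your project (the project name below is hypothetical):

```python
from setuptools import find_packages, setup

setup(
    name="my_data_pipeline",  # hypothetical project name
    version="0.1.0",
    packages=find_packages(),
)
```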
</details>

<details>
<summary>Using Pyspark</summary>

```python
import pathlib

from aws_cdk import core
from datajob.datajob_stack import DataJobStack
from datajob.glue.glue_job import GlueJob
from datajob.stepfunctions.stepfunctions_workflow import StepfunctionsWorkflow

current_dir = str(pathlib.Path(__file__).parent.absolute())

app = core.App()

with DataJobStack(
    scope=app, id="datajob-python-pyspark", project_root=current_dir
) as datajob_stack:

    pyspark_job = GlueJob(
        datajob_stack=datajob_stack,
        name="pyspark-job",
        job_path="glue_job/glue_pyspark_example.py",
        job_type="glueetl",
        glue_version="2.0",  # we only support glue 2.0
        python_version="3",
        worker_type="Standard",  # options are Standard / G.1X / G.2X
        number_of_workers=1,
        arguments={
            "--source": f"s3://{datajob_stack.context.data_bucket_name}/raw/iris_dataset.csv",
            "--destination": f"s3://{datajob_stack.context.data_bucket_name}/target/pyspark_job/iris_dataset.parquet",
        },
    )
```
The full example can be found in [examples/data_pipeline_pyspark](./examples/data_pipeline_pyspark).
</details>


<details>
<summary>Orchestrate stepfunctions tasks in parallel</summary>

```python
# task1 and task2 are orchestrated in parallel.
# task3 will only start when both task1 and task2 have succeeded.
[task1, task2] >> task3
```
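
Parallel and sequential steps can be mixed in one workflow; the pattern from the intro bullet, for example, reads as a sketch like this:

```python
# task1 runs first, task2 and task3 then run in parallel,
# and task4 starts once both have succeeded.
with StepfunctionsWorkflow(datajob_stack=datajob_stack, name="workflow") as step_functions_workflow:
    task1 >> [task2, task3] >> task4
```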

</details>
<details>
<summary>Orchestrate 1 stepfunction task</summary>

Use the [Ellipsis](https://docs.python.org/dev/library/constants.html#Ellipsis) object to orchestrate a single job via step functions.

```python
some_task >> ...
```
</details>


# Datajob in depth

The `datajob_stack` is the instance that will result in a cloudformation stack.
The path in `project_root` helps `datajob_stack` locate the root of the project where
the setup.py / poetry pyproject.toml file can be found, as well as the `dist/` folder that contains the wheel of your project.

```python
import pathlib
from aws_cdk import core

from datajob.datajob_stack import DataJobStack

current_dir = pathlib.Path(__file__).parent.absolute()
app = core.App()

with DataJobStack(
    scope=app, id="data-pipeline-pkg", project_root=current_dir
) as datajob_stack:

    ...
```
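
As a sketch, the project layout that `project_root` is expected to point at looks roughly like this; the file names follow the packaged-project example above, and the wheel name is just an illustration:

```
data_pipeline_pkg/
├── datajob_stack.py      # the stack definition shown above
├── setup.py              # or pyproject.toml when you package with poetry
├── glue_jobs/
│   ├── task1.py
│   └── task2.py
└── dist/
    └── data_pipeline_pkg-0.1.0-py3-none-any.whl   # wheel picked up at deploy time
```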

when __exiting the context manager__ all the resources of our DataJobStack object are created.
<details>
<summary>We can write the above example more explicitly...</summary>

```python
import pathlib
from aws_cdk import core

from datajob.datajob_stack import DataJobStack
Expand All @@ -203,13 +326,17 @@ from datajob.stepfunctions.stepfunctions_workflow import StepfunctionsWorkflow

app = core.App()

datajob_stack = DataJobStack(scope=app, id="data-pipeline-simple")
current_dir = pathlib.Path(__file__).parent.absolute()

app = core.App()

datajob_stack = DataJobStack(scope=app, id="data-pipeline-pkg", project_root=current_dir)
datajob_stack.init_datajob_context()

task1 = GlueJob(datajob_stack=datajob_stack, name="task1", job_path="glue_jobs/task1.py")
task2 = GlueJob(datajob_stack=datajob_stack, name="task2", job_path="glue_jobs/task2.py")

with StepfunctionsWorkflow(datajob_stack=datajob_stack, name="workflow") as step_functions_workflow:
    task1 >> task2

datajob_stack.create_resources()
app.synth()
```
</details>

These are the ideas we find interesting to implement:
- processing jobs
- hyperparameter tuning jobs
- training jobs
- create sagemaker model
- create sagemaker endpoint
- expose sagemaker endpoint to the internet by leveraging lambda + api gateway

- implement lambda
- implement ECS Fargate
- create a serverless UI that follows up on the different pipelines deployed on possibly different AWS accounts using Datajob

> [Feedback](https://github.com/vincentclaes/datajob/discussions) is much appreciated!
2 changes: 1 addition & 1 deletion datajob/datajob_stack.py
```python
def __init__(
    ...
    :param id: a name for this stack.
    :param stage: the stage name to which we are deploying
    :param project_root: the path to the root of this project
    :param include_folder: specify the path to the folder we would like to include in the deployment bucket.
    :param account: AWS account number
    :param region: AWS region where we want to deploy our datajob to
    :param kwargs: any extra kwargs for the core.Construct
```

```python
# we instantiate a step functions workflow
# and orchestrate the glue jobs.
with StepfunctionsWorkflow(
    datajob_stack=datajob_stack, name="workflow"
) as step_functions_workflow:
    task1 >> task2

app.synth()
```
