Skip to content

Commit

Permalink
Checking in DAGs and Docker CWL image
Browse files Browse the repository at this point in the history
  • Loading branch information
LucaCinquini committed Feb 23, 2024
1 parent 450a631 commit 4798c3e
Show file tree
Hide file tree
Showing 8 changed files with 362 additions and 3 deletions.
25 changes: 25 additions & 0 deletions .github/workflows/build_docker_images.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ env:
REGISTRY: ghcr.io
TAG: ${{ github.event.inputs.tag }}
SPS_AIRFLOW: ${{ github.repository }}/sps-airflow
SPS_DOCKER_CWL: ${{ github.repository }}/sps-docker-cwl

jobs:
build-sps-airflow:
Expand All @@ -37,3 +38,27 @@ jobs:
push: true
tags: ${{ env.REGISTRY }}/${{ env.SPS_AIRFLOW }}:${{ env.TAG }}
labels: ${{ steps.metascheduler.outputs.labels }}
build-sps-docker-cwl:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Log in to the Container registry
uses: docker/login-action@f054a8b539a109f9f41c372932f1ae047eff08c9
with:
registry: ${{ env.REGISTRY }}
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Extract metadata (tags, labels) for SPS Docker CWL image
id: metascheduler
uses: docker/metadata-action@98669ae865ea3cffbcbaa878cf57c20bbf1c6c38
with:
images: ${{ env.REGISTRY }}/${{ env.SPS_DOCKER_CWL }}
- name: Build and push SPS Docker CWL image
uses: docker/build-push-action@ad44023a93711e3deb337508980b4b5e9bcdc5dc
with:
context: ./airflow/docker/cwl
file: airflow/docker/cwl/Dockerfile
push: true
tags: ${{ env.REGISTRY }}/${{ env.SPS_DOCKER_CWL }}:${{ env.TAG }}
labels: ${{ steps.metascheduler.outputs.labels }}

63 changes: 63 additions & 0 deletions airflow/dags/cwl_dag.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# DAG to execute a generic CWL workflow.
# The Airflow KubernetesPodOperator starts a Docker container that includes the Docker engine and the CWL libraries.
# The "cwl-runner" tool is invoked to execute the CWL workflow.
# Parameter cwl_workflow: the URL of the CWL workflow to execute.
# Parameter args_as_json: JSON string contained the specific values for the workflow specific inputs.
from datetime import datetime
from airflow import DAG
from kubernetes.client import models as k8s
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from airflow.models.param import Param
import uuid

# The Kubernetes Pod that executes the CWL-Docker container
# Must use elevated privileges to start/stop the Docker engine
POD_TEMPLATE_FILE = "/opt/airflow/dags/docker_cwl_pod.yaml"

# The Kubernetes namespace within which the Pod is run (it must already exist)
POD_NAMESPACE = "airflow"

# Example arguments
default_cwl_workflow = "https://raw.githubusercontent.com/unity-sds/unity-sps-prototype/cwl-docker/cwl/cwl_workflows/echo_from_docker.cwl"
default_args_as_json = '{ "greeting": "Ciao", "name": "Terra" }'

# Default DAG configuration
dag_default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2024, 1, 1, 0, 0)
}

# The DAG
dag = DAG(dag_id='cwl-dag',
description='DAG to execute a generic CWL workflow',
tags=['cwl', 'unity-sps', "docker"],
is_paused_upon_creation=True,
catchup=False,
schedule_interval=None,
max_active_runs=1,
default_args=dag_default_args,
params={
"cwl_workflow": Param(default_cwl_workflow, type="string"),
"args_as_json": Param(default_args_as_json, type="string"),
})

# Environment variables
default_env_vars = {}

# This section defines KubernetesPodOperator
cwl_task = KubernetesPodOperator(
namespace=POD_NAMESPACE,
name="cwl-task",
is_delete_operator_pod=True,
hostnetwork=False,
startup_timeout_seconds=1000,
get_logs=True,
task_id="docker-cwl-task",
full_pod_spec=k8s.V1Pod(
metadata=k8s.V1ObjectMeta(
name='docker-cwl-pod-' + uuid.uuid4().hex),
),
pod_template_file=POD_TEMPLATE_FILE,
arguments=["{{ params.cwl_workflow }}", "{{ params.args_as_json }}"],
dag=dag)
15 changes: 15 additions & 0 deletions airflow/dags/docker_cwl_pod.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
apiVersion: v1
kind: Pod
metadata:
name: docker-cwl-pod
spec:

restartPolicy: Never

containers:
- name: cwl-docker
image: ghcr.io/unity-sds/unity-sps/sps-docker-cwl:development-tag2
command: ["/usr/share/cwl/docker_cwl_entrypoint.sh"]
securityContext:
privileged: true

82 changes: 82 additions & 0 deletions airflow/dags/say_hello_from_cwl_and_docker_dag.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
from datetime import datetime
from airflow import DAG
from kubernetes.client import models as k8s
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.models.param import Param
import json
import uuid


# Default DAG configuration
dag_default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2024, 1, 1, 0, 0)
}

# The DAG
CWL_WORKFLOW = "https://raw.githubusercontent.com/unity-sds/unity-sps-prototype/cwl-docker/cwl/cwl_workflows/echo_from_docker.cwl"
dag = DAG(dag_id='say-hello-from-cwl-and-docker',
description='Workflow to greet anybody, anytime',
tags=["CWL", "World Peace", "The United Nations"],
is_paused_upon_creation=True,
catchup=False,
schedule=None,
max_active_runs=1,
default_args=dag_default_args,
params={
"cwl_workflow": Param(CWL_WORKFLOW, type="string"),
"greeting": Param("Hello", type="string"),
"name": Param("World", type="string"),
})

# Environment variables
default_env_vars = {}


# Task that captures the DAG specific arguments
# and creates a json-formatted string for the downstream Tasks
def setup(ti=None, **context):
task_dict = {
'greeting': context['params']['greeting'],
'name': context['params']['name']
}
ti.xcom_push(key='cwl_args', value=json.dumps(task_dict))


setup_task = PythonOperator(task_id="Setup",
python_callable=setup,
dag=dag)


stage_in_task = BashOperator(
task_id="Stage_In",
dag=dag,
bash_command="echo Downloading data")

# This section defines KubernetesPodOperator
cwl_task = KubernetesPodOperator(
namespace="airflow",
name="CWL_Workflow",
on_finish_action="delete_pod",
hostnetwork=False,
startup_timeout_seconds=1000,
get_logs=True,
task_id="CWL_Workflow",
full_pod_spec=k8s.V1Pod(
k8s.V1ObjectMeta(name=('docker-cwl-pod-' + uuid.uuid4().hex))),
pod_template_file="/opt/airflow/dags/docker_cwl_pod.yaml",
#image="ghcr.io/unity-sds/unity-sps-prototype/unity-sps-docker-cwl:latest",
arguments=["{{ params.cwl_workflow }}", "{{ti.xcom_pull(task_ids='Setup', key='cwl_args')}}"],
# resources={"request_memory": "512Mi", "limit_memory": "1024Mi"},
dag=dag)

stage_out_task = BashOperator(
task_id="Stage_Out",
dag=dag,
bash_command="echo Uploading data")

setup_task >> stage_in_task >> cwl_task >> stage_out_task

87 changes: 87 additions & 0 deletions airflow/dags/sbg_preprocess_cwl_dag.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
# DAG for executing the SBG Preprocess Workflow
# See https://github.com/unity-sds/sbg-workflows/blob/main/preprocess/sbg-preprocess-workflow.cwl
from datetime import datetime
from airflow import DAG
from kubernetes.client import models as k8s
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from airflow.operators.python import PythonOperator
from airflow.models.param import Param
import json
import uuid

# The Kubernetes Pod that executes the CWL-Docker container
# Must use elevated privileges to start/stop the Docker engine
POD_TEMPLATE_FILE = "/opt/airflow/dags/docker_cwl_pod.yaml"

# The Kubernetes namespace within which the Pod is run (it must already exist)
POD_NAMESPACE = "airflow"


# Default DAG configuration
dag_default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2024, 1, 1, 0, 0)
}
CWL_WORKFLOW = "https://raw.githubusercontent.com/unity-sds/sbg-workflows/1.0/preprocess/sbg-preprocess-workflow.cwl"

dag = DAG(dag_id='sbg-preprocess-cwl-dag',
description='SBG Preprocess Workflow as CWL',
tags=["SBG", "Unity", "SPS", "NASA", "JPL"],
is_paused_upon_creation=True,
catchup=False,
schedule=None,
max_active_runs=1,
default_args=dag_default_args,
params={
"cwl_workflow": Param(CWL_WORKFLOW, type="string"),
# "input_processing_labels": Param(["label1", "label2"], type="string[]"),
"input_cmr_collection_name": Param("C2408009906-LPCLOUD", type="string"),
"input_cmr_search_start_time": Param("2024-01-03T13:19:36.000Z", type="string"),
"input_cmr_search_stop_time": Param("2024-01-03T13:19:36.000Z", type="string"),
"input_unity_dapa_api": Param("https://d3vc8w9zcq658.cloudfront.net", type="string"),
"input_unity_dapa_client": Param("40c2s0ulbhp9i0fmaph3su9jch", type="string"),
"input_crid": Param("001", type="string"),
"output_collection_id": Param("urn:nasa:unity:unity:dev:SBG-L1B_PRE___1", type="string"),
"output_data_bucket": Param("sps-dev-ds-storage", type="string"),

})

# Task that serializes the job arguments into a JSON string
def setup(ti=None, **context):
task_dict = {
'input_processing_labels': ["label1", "label2"],
'input_cmr_collection_name': context['params']['input_cmr_collection_name'],
'input_cmr_search_start_time': context['params']['input_cmr_search_start_time'],
'input_cmr_search_stop_time': context['params']['input_cmr_search_stop_time'],
'input_unity_dapa_api': context['params']['input_unity_dapa_api'],
'input_unity_dapa_client': context['params']['input_unity_dapa_client'],
'input_crid': context['params']['input_crid'],
'output_collection_id': context['params']['output_collection_id'],
'output_data_bucket': context['params']['output_data_bucket']
}
ti.xcom_push(key='cwl_args', value=json.dumps(task_dict))


setup_task = PythonOperator(task_id="Setup",
python_callable=setup,
dag=dag)


# Task that executes the specific CWL workflow with the previous arguments
cwl_task = KubernetesPodOperator(
namespace=POD_NAMESPACE,
name="SBG_Preprocess_CWL",
on_finish_action="delete_pod",
hostnetwork=False,
startup_timeout_seconds=1000,
get_logs=True,
task_id="SBG_Preprocess_CWL",
full_pod_spec=k8s.V1Pod(
k8s.V1ObjectMeta(name=('sbg-preprocess-cwl-pod-' + uuid.uuid4().hex))),
pod_template_file=POD_TEMPLATE_FILE,
arguments=["{{ params.cwl_workflow }}", "{{ti.xcom_pull(task_ids='Setup', key='cwl_args')}}"],
# resources={"request_memory": "512Mi", "limit_memory": "1024Mi"},
dag=dag)

setup_task >> cwl_task
32 changes: 29 additions & 3 deletions airflow/docker/custom_airflow/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,31 @@
FROM apache/airflow:2.8.1-python3.11
# FROM apache/airflow:2.8.1-python3.11
# cat /etc/os-release
# "Debian GNU/Linux 12 (bookworm)"
FROM apache/airflow

COPY ./airflow/dags/ ${AIRFLOW_HOME}/dags/
# add editor
USER root
RUN apt-get update \
&& apt-get install -y --no-install-recommends \
vim \
&& apt-get autoremove -yqq --purge \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/*

# add git
RUN apt-get update \
&& apt-get install -y git \
&& git --version

USER airflow

# add Python libraries
# RUN pip install cwltool==3.1.20240112164112
RUN pip install cwltool cwl-runner \
apache-airflow-providers-docker \
apache-airflow-providers-cncf-kubernetes \
kubernetes-client

# add DAGs
COPY ./airflow/dags ${AIRFLOW_HOME}/dags/

RUN pip install cwltool==3.1.20240112164112
25 changes: 25 additions & 0 deletions airflow/docker/cwl/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# docker:dind Dockerfile: https://github.com/docker-library/docker/blob/master/Dockerfile-dind.template
# FROM docker:dind
FROM docker:25.0.3-dind

# install Python
RUN apk add --update --no-cache python3 && ln -sf python3 /usr/bin/python
RUN apk add gcc musl-dev linux-headers python3-dev
RUN apk add --no-cache python3 py3-pip
RUN apk add vim

# install CWL libraries
RUN mkdir /usr/share/cwl \
&& cd /usr/share/cwl \
&& python -m venv venv \
&& source venv/bin/activate \
&& pip install cwltool cwl-runner docker

# install nodejs to parse Javascript in CWL files
RUN apk add --no-cache nodejs npm

# script to execute a generic CWL workflow with arguments
COPY docker_cwl_entrypoint.sh /usr/share/cwl/docker_cwl_entrypoint.sh

WORKDIR /usr/share/cwl
ENTRYPOINT ["/usr/share/cwl/docker_cwl_entrypoint.sh"]
36 changes: 36 additions & 0 deletions airflow/docker/cwl/docker_cwl_entrypoint.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
#!/bin/sh
# Script to execute a CWL workflow that includes Docker containers
# The Docker engine is started before the CWL execution, and stopped afterwards.
# $1: the CWL workflow URL (example: https://raw.githubusercontent.com/unity-sds/unity-sps-prototype/cwl-docker/cwl/cwl_workflows/echo_from_docker.cwl)
# $2: the CWL job parameters as a JSON fomatted string (example: { name: John Doe })
# $3: optional output directory, defaults to the current directory
# Note: $output_dir must be accessible by the Docker container that executes this script

set -ex
cwl_workflow=$1
job_args=$2
output_dir=${3:-.}
echo "Executing CWL workflow: $cwl_workflow with json arguments: $job_args and output directory: $output_dir"
echo $job_args > /tmp/job_args.json
cat /tmp/job_args.json

# create output directory if it doesn't exist
mkdir -p $output_dir

# Start Docker engine
dockerd &> dockerd-logfile &

# Wait until Docker engine is running
# Loop until 'docker version' exits with 0.
until docker version > /dev/null 2>&1
do
sleep 1
done

# Execute CWL workflow
source /usr/share/cwl/venv/bin/activate
cwl-runner --outdir $output_dir --no-match-user --no-read-only $cwl_workflow /tmp/job_args.json
deactivate

# Stop Docker engine
pkill -f dockerd

0 comments on commit 4798c3e

Please sign in to comment.