
Apache Airflow <> Databricks Example Pipeline

A simple, scalable use case utilizing Apache Airflow, Databricks, Delta Tables, & PySpark!
Check out the Airflow code »
Check out the Databricks code »

Download · Report Bug · Request Feature


About The Project

(Project architecture diagram)

Databricks is powerful, as is Apache Airflow. Together they make a well-rounded, all-you-need stack for many of your data pipeline needs. This project showcases using Databricks to extract, transform, and upsert data into its Delta Table infrastructure, all nicely wrapped (and automated!) with Apache Airflow.

Built With

  • Apache Airflow
  • Databricks
  • Delta Tables
  • PySpark
  • yfinance

Built For

  • Astronomer as one of my many projects during my 2022 Winter/Spring internship.

Important Files

Use Case

Imagine a Data Analyst who works for an investment management firm, helping clients make good decisions about their investment portfolios. To do so, the Data Analyst retrieves market data regularly, and for each client provides an analysis of how the industries they are invested in perform.

The Data Analyst persists the transformed data from analyses, sends automated notifications to clients to take action when relevant, and keeps a dashboard up to date for them to check their investment health at a glance.

Let’s look into this Data Analyst’s workflow.

Process

Part 1: Airflow Triggers Databricks Notebook While Passing Parameters.

Step 1: Pass parameters from Airflow using notebook_params = portfolio

    portfolio = {
        "stocks": "MSFT AAPL IBM WMT SHOP GOOGL TSLA GME AMZN COST COKE CBRE NVDA AMD PG"
    }

    # Run the Databricks job and retrieve the job Run ID
    run_databricks_job = DatabricksRunNowOperator(
        task_id="Run_Databricks_Job",
        databricks_conn_id=DATABRICKS_CONNECTION_ID,
        job_id=137122987688189,
        do_xcom_push=True,
        notebook_params=portfolio,
    )

Step 2: Use dbutils.widgets.text(param, default_value) to load params pushed by Airflow into the Databricks notebook.
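For reference, a minimal sketch of the notebook side (the default value and the split into a ticker list are illustrative, not necessarily what the notebook in this repo does):

    # Register a widget whose name matches the "stocks" key in notebook_params,
    # then read whatever Airflow passed in.
    dbutils.widgets.text("stocks", "MSFT AAPL")        # default used when run manually
    stocks = dbutils.widgets.get("stocks").split()     # e.g. ['MSFT', 'AAPL', ...]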

Part 2: Data ingestion & Transformations

Step 1: Pull today's market cap data from Yahoo Finance using the yfinance Python package.
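A minimal sketch of what that ingestion could look like, assuming the notebook loops over the tickers from the widget and reads each ticker's marketCap and sector from yfinance (field handling in the actual notebook may differ):

    import pandas as pd
    import yfinance as yf

    rows = []
    for symbol in stocks:                      # ticker list from the widget above
        info = yf.Ticker(symbol).info          # per-ticker metadata from Yahoo Finance
        rows.append({
            "ticker": symbol,
            "sector": info.get("sector"),
            "market_cap": info.get("marketCap"),
        })

    market_caps = pd.DataFrame(rows)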

Step 2: Aggregate today's market cap data by Industry Sector
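Continuing the sketch above, the aggregation might look like this (column names are assumptions):

    # Sum today's market caps by sector and stamp the rows with today's date
    sector_mkt_cap = market_caps.groupby("sector", as_index=False)["market_cap"].sum()
    sector_mkt_cap["date"] = pd.Timestamp.today().normalize()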

Part 3: Enjoying the View: A (Delta) Table on a (Delta) Lake

Step 1: Transform the pandas dataframe into a Spark dataframe. Write that dataframe into a temporary Delta Table.
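A sketch of that step, under the same assumptions as above (the temp table name is hypothetical):

    # Convert the pandas dataframe to Spark and stage it as a temporary Delta table
    sector_sdf = spark.createDataFrame(sector_mkt_cap)
    (sector_sdf.write
        .format("delta")
        .mode("overwrite")
        .saveAsTable("daily_sector_mkt_cap_temp"))     # hypothetical table name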

Step 2: Upsert the temp Delta Table (containing today's data) into the permanent Delta Table containing all previous historic data.
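An upsert like this is typically a Delta Lake MERGE; here is a sketch using the DeltaTable Python API (the table name and join keys are assumptions):

    from delta.tables import DeltaTable

    # Upsert today's rows into the historical table, keyed on sector + date
    historic = DeltaTable.forName(spark, "sector_mkt_cap_history")   # hypothetical name

    (historic.alias("h")
        .merge(sector_sdf.alias("t"), "h.sector = t.sector AND h.date = t.date")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())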


Going forward, you can now link this table to Tableau for analysis.

Part 4: Monitoring Portfolio Performance & Email Notifications

Step 1: Determine the percentage change from the prior day.
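One way this could be computed, sketched in pandas against the history table (the notebook may well do it in Spark, and the 5% threshold is purely illustrative):

    # Compare today's sector totals against the prior day's values
    history = spark.table("sector_mkt_cap_history").toPandas()
    dates = sorted(history["date"].unique())
    today_df = history[history["date"] == dates[-1]].set_index("sector")
    prior_df = history[history["date"] == dates[-2]].set_index("sector")

    pct_change = (today_df["market_cap"] - prior_df["market_cap"]) / prior_df["market_cap"] * 100
    big_movers = pct_change[pct_change.abs() >= 5]     # flag sectors that moved 5% or more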

Step 2: Exit the Databricks Notebook with output data, which is subsequently captured by Airflow and passed around via XCom.
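A sketch of the exit call, continuing the assumptions above; the "No Email Required" string is what the Airflow branch later checks for:

    # dbutils.notebook.exit() sets the notebook_output that Airflow captures
    if big_movers.empty:
        dbutils.notebook.exit("No Email Required")
    else:
        dbutils.notebook.exit(big_movers.to_frame("pct_change").to_html())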

Step 3: Ingesting results in Airflow: This data is picked up using the DatabricksHook and assigned to the variable model_uri.

    @task
    def Retreive_Databricks_Output(id):
        # Retrieve the notebook output from the Databricks run via XCom
        databricks_hook = DatabricksHook()
        model_uri = databricks_hook.get_run_output(id)['notebook_output']['result']
        return model_uri

    # retreive_databricks_output holds the notebook output returned by Databricks
    retreive_databricks_output = Retreive_Databricks_Output(run_databricks_job.output['run_id'])

Step 4: Using the BranchPythonOperator to decide whether to notify

    # Branch on the Databricks output: send an email only when there are big movers
    def _split(data):
        if data == "No Email Required":
            print("LOG: No big movers, no email was sent")
            return 'No_Email_Required'
        else:
            return 'Send_Email'

    branching = BranchPythonOperator(
        task_id='Check_if_Email_is_Needed',
        python_callable=_split,
        op_args=[retreive_databricks_output],
    )

Step 5: Send email notification.

    # Send an email containing the Databricks output
    mail = EmailOperator(
        task_id='Send_Email',
        to='[email protected]',
        subject='Daily Movers',
        html_content=retreive_databricks_output,
    )
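The branch also needs a task whose task_id matches 'No_Email_Required'. A minimal sketch of that no-op task and the dependency wiring follows (not necessarily how the repo wires it; on older Airflow versions DummyOperator plays the same role as EmptyOperator):

    from airflow.operators.empty import EmptyOperator

    # No-op target for the branch where no big movers were found
    no_email = EmptyOperator(task_id='No_Email_Required')

    # Wire the branch decision to its two possible downstream tasks
    branching >> [mail, no_email]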

License

Distributed under Apache License 2.0. See LICENSE for more information.

Contact

Amir Zahreddine - [email protected]

Project Link: https://github.com/AmirZahre/Data_Analyst_DAG

Acknowledgements

  • Santona Tuli for being an awesome mentor towards my introduction to DevOps!
  • The team @ Astronomer for help with any questions that arose while learning Airflow.
  • Ran Aroussi for creating a fabulous Yahoo Finance API library for Python.
