Skip to content

fugue-project/prefect-fugue

Repository files navigation

Prefect Fugue Integration

GitHub release PyPI pyversions PyPI license PyPI version codecov Doc

Slack Status

This project provides the Fugue tasks, context and blocks for Prefect.

Getting Started

Fugue is a unified interface for distributed computing that lets users execute Python, pandas, and SQL code on Spark, Dask and Ray without rewrites.

The most common use cases are:

  • Accelerating or scaling existing Python and pandas code by bringing it to Spark or Dask with minimal rewrites. Using FugueSQL to define end-to-end workflows on top of pandas, Spark, and Dask DataFrames. FugueSQL is an enhanced SQL interface that can invoke Python code with added keywords.
  • Maintaining one codebase for pandas, Spark, Dask and Ray projects. Logic and execution are decoupled through Fugue, enabling users to be focused on their business logic rather than writing framework-specific code.
  • Improving iteration speed of big data projects. Fugue seamlessly scales execution to big data after local development and testing. By removing PySpark code, unit tests can be written in Python or pandas and ran locally without spinning up a cluster.

img

The best way to get started with Fugue is to work through the 10 minute tutorials:

Python setup

Requires an installation of Python 3.8+.

We recommend using a Python virtual environment manager such as pipenv, conda or virtualenv.

Installation

Install prefect-fugue with pip:

pip install prefect-fugue

It's also recommended to register Fugue blocks into your current Prefect workspace:

prefect block register -m prefect_fugue

Creating a Block

This will allow a creation of the Fugue Engine block

img

There are 4 items that need to be filled to create a block.

  • Block Name - name that will be used to use the block.
  • Engine Name - one of the Fugue supported backends (spark, dask, ray, duckdb)
  • Engine Config - configurations related to the cluster
  • Secret Config - credentials to connect to a cluster

For example, a Databricks Block could look like:

  • Block Name - databricks
  • Engine Name - spark
  • Engine Config - None
  • Secret Config - seen below
{
    "host": "https://dbc-38aaa459-faaf.cloud.databricks.com",
    "token": "dapiecaaae64a727498daaaaafe1bace968a",
    "cluster_id": "0612-191111-6fopaaaa"
}

As long as you installed prefect_fugue, Fugue is able to recognize and convert a block expression to a FugueExecutionEngine. For example if you have a block with path fugue/databricks, then the expression prefect:fugue/databricks becomes a valid execution engine expression. When fugue parse this expression, it will load the Block from fugue/databricks, then base on the fields of the block, it will construct a DatabricksExecutionEngine for your Fugue operations.

Using a Spark Cluster Inside a Flow

Let’s start by running code on top of Databricks. databricks-connect is already installed in this environment. This section may have a lot of logs because of the monitoring provided by Prefect. This section also assumes that the user has Prefect configured to the right workspace.

Below we have one task that takes in a SparkSession and uses it to run some Spark code. We can then use this in the Prefect Flow with the fugue.api.engine_context. This will create an ephemeral cluster to run the code underneath, and then turn off when finished.

from prefect import task, flow
import fugue.api as fa

@task
def my_spark_task(spark, n=1):
    df = spark.createDataFrame([[f"hello spark {n}"]], "a string")
    df.show()

@flow
def spark_flow(engine):
    with fa.engine_context(engine) as spark_engine:
        my_spark_task(spark_engine.spark_session, 1)

spark_flow("prefect:fugue/databricks")  # pay attention to the engine expression

Similarly, if you don’t use Databricks but have your own way to get a SparkSession, you can directly pass the SparkSession into the Flow.

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

spark_flow(spark)

More Flexibility

fugue.api.engine_context creates a context under which all fugue operations will use the context engine by default. This works either the fugue code is directly under the context or inside the tasks or flows that are invoked under this context.

from prefect import task, flow
import fugue.api as fa

def my_transformer(df:pd.DataFrame) -> pd.DataFrame:
    return df

@task
def sub_task(path:str):
    df = fa.load(path)
    df = fa.transform(df, my_transformer, schema="*")
    df.save(path+".output.parquet")

@flow
def sub_flow(path:str):
    df = fa.load(path)
    df = fa.transform(df, my_transformer, schema="*")
    df.save(path+".output.parquet")

@flow
def main_flow(path, engine=None):
    with fa.engine_context(engine) as spark_engine:
        sub_task(path)
        sub_flow(path)

main_flow("<local path>")  # test locally, all tasks and flows run without Spark
main_flow("<dbfs path>", "prefect:fugue/databricks")  # all tasks and flows will run on Databrickes

Testing Locally Before Running Map Jobs on Spark, Dask, and Ray

We showed how to run Spark code on top of a Spark cluster, but the strength of Fugue is decoupling from distributed framework code such as Spark and Dask. Decoupling from these frameworks allows us to test code locally before scaling out to a cluster. In the example below, we simulate having a pandas DataFrame where each row is a job.

When testing the Flow, we can pass None as the engine so everything runs on Pandas. When ready to scale out, we can pass in our Block or SparkSession. Fugue’s transform() task will use the engine provided by the fugue_engine context.

from time import sleep
import pandas as pd
import fugue.api as fa
from prefect import task, flow

@task
def create_jobs(n) -> pd.DataFrame:
    return pd.DataFrame(dict(jobs=range(n)))

# schema: *,batch_size:str
def run_one_job(df:pd.DataFrame) -> pd.DataFrame:
    sleep(len(df)*5)
    return df.assign(batch_size=len(df))

@flow
def run_all_jobs(n, engine=None):
    jobs = create_jobs(n)
    with fa.engine_context(engine):
        return transform(jobs, run_one_job, partition="per_row", as_local=True)

We can test the Flow above on a local machine without Spark. We run on one job first.

run_all_jobs(1) # run locally on Pandas

Becasue it succeeded, we can now attach our Fugue Databricks Block to run on Databricks. Now we run on 8 jobs, and we’ll see that parallelization from the Spark cluster will make this Flow execute faster.

run_all_jobs(8, "prefect:fugue/databricks") # run on databricks

There is still some overhead with sending the work, but the time is decreased compared to the expected execution time if ran sequentially (40 seconds).

We can also use local Dask by passing the string "dask". We can also pass a Dask Client() or use the Fugue Engine Block with Coiled. More information can be found in the Coiled cloudprovider docs.

run_all_jobs(4, dask_client)

Running SQL on any Spark, Dask, and Duckdb

Prototyping locally, and then running the full job on the cluster is also possible with FugueSQL. DuckDB is a good engine to run SQL queries on flat files or Pandas DataFrames. When ready, we can bring it to SparkSQL on the cluster. Similar to the transform() task shown above, there is also an fsql() task.

Here we can load in data and perform a query with FugueSQL. FugueSQL has additional keywords such as LOAD and SAVE so we can run everything from loading, processing, and saving all on DuckDB or SparkSQL. More information on FugueSQL can be found in the FugueSQL tutorials.

import fugue.api as fa

@flow
def run_sql(top, engine):
    with fa.engine_context(engine):
        fa.fugue_sql_flow("""
        df = LOAD "https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2022-01.parquet"

        SELECT PULocationID, COUNT(*) AS ct FROM df
        GROUP BY 1 ORDER BY 2 DESC LIMIT {{top}}
        PRINT
        """, top=top).run()

To debug locally without SparkSQL, we can use DuckDB as the engine.

run_sql(2, "duckdb"); # debug/develop without spark

Again to run on the cluster, we can use the Databricks Block.

run_sql(10, "prefect:fugue/databricks")

Resources

If you encounter any bugs while using prefect-fugue, feel free to open an issue in the prefect-fugue repository.

If you have any questions or issues while using prefect-fugue, you can find help in the Fugue Slack community.

Development

If you'd like to install a version of prefect-fugue for development, clone the repository and perform an editable install with pip:

git clone https://github.com/fugue-project/prefect-fugue.git

cd prefect-fugue/

pip install -e ".[dev]"

# Install linting pre-commit hooks
pre-commit install