
Commit a0c9c39

Add databricks version of module 3
Signed-off-by: Danny Chiao <[email protected]>
1 parent 802de24 commit a0c9c39

Note: this is a large commit, so some content (including some file names and diffs) is hidden by default.

46 files changed: +604 -9 lines

.gitignore

+3 -1

@@ -8,4 +8,6 @@ terraform.tfstate.backup
 *.iml
 **/feast-postgres-data/*
 **/airflow_demo/airflow_home/*
-.vscode/*
+.vscode/*
+**/derby.log
+**/metastore_db/*

README.md

+7-6

module_3_db/README.md

+312

module_3_db/airflow_demo/dag.py

+70
@@ -0,0 +1,70 @@
import os
from airflow.decorators import task
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator
from feast import RepoConfig, FeatureStore
from feast.infra.offline_stores.contrib.spark_offline_store.spark import (
    SparkOfflineStoreConfig,
)
from feast.repo_config import RegistryConfig
from feast.infra.online_stores.dynamodb import DynamoDBOnlineStoreConfig
import pendulum

with DAG(
    dag_id="feature_dag",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    description="A dbt + Feast DAG",
    schedule="@daily",
    catchup=False,
    tags=["feast"],
) as dag:
    dbt_test = BashOperator(
        task_id="dbt_test",
        bash_command="""
            cd ${AIRFLOW_HOME}; dbt test --models "aggregate_transaction_features"
            """,
        dag=dag,
    )

    # Generate new transformed feature values
    dbt_run = BashOperator(
        task_id="dbt_run",
        bash_command="""
            cd ${AIRFLOW_HOME}; dbt run --models "aggregate_transaction_features"
            """,
        dag=dag,
    )

    # Use Feast to make these feature values available in a low latency store
    @task()
    def materialize(data_interval_start=None, data_interval_end=None):
        repo_config = RepoConfig(
            registry=RegistryConfig(
                registry_type="sql",
                path="postgresql://postgres:mysecretpassword@[YOUR-RDS-ENDPOINT:PORT]/feast",
            ),
            project="feast_demo",
            provider="local",
            offline_store=SparkOfflineStoreConfig(
                spark_conf={
                    "spark.ui.enabled": "false",
                    "spark.eventLog.enabled": "false",
                    "spark.sql.catalogImplementation": "hive",
                    "spark.sql.parser.quotedRegexColumnNames": "true",
                    "spark.sql.session.timeZone": "UTC",
                }
            ),
            online_store=DynamoDBOnlineStoreConfig(region="us-west-1"),
            entity_key_serialization_version=2,
        )
        # Needed for Mac OS users because of a seg fault in requests for standalone Airflow (not needed in prod)
        os.environ["NO_PROXY"] = "*"
        os.environ["SPARK_LOCAL_IP"] = "127.0.0.1"
        store = FeatureStore(config=repo_config)
        # Add 1 hr overlap to account for late data
        # Note: normally, you'll probably have different feature views with different freshness
        # requirements, instead of materializing all feature views every day.
        store.materialize(data_interval_start.subtract(hours=1), data_interval_end)

    # Setup DAG
    dbt_test >> dbt_run >> materialize()
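
Once the materialize task has run, the freshly computed values should be readable from the DynamoDB online store. A minimal sketch of such a check, assuming it runs from the feature_repo directory with the placeholders in feature_store.yaml filled in (the user_id value is made up for illustration):

from feast import FeatureStore

# Loads feature_store.yaml from the current directory
store = FeatureStore(repo_path=".")

online_response = store.get_online_features(
    features=[
        "user_transaction_amount_metrics:amt_sum_1d_10m",
        "user_transaction_amount_metrics:amt_mean_1d_10m",
    ],
    entity_rows=[{"user_id": "user_268308151877"}],  # hypothetical user id
)
print(online_response.to_dict())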
+33

@@ -0,0 +1,33 @@
# Airflow needs a home. `~/airflow` is the default, but you can put it
# somewhere else if you prefer (optional)
export AIRFLOW_HOME=$(pwd)/airflow_home
export AIRFLOW__CORE__LOAD_EXAMPLES=False
# TODO: UPDATE THIS
export AIRFLOW_CONN_DATABRICKS_DEFAULT='databricks://@host-url?token=yourtoken'

# Clean up previous state, if it exists
rm -rf $AIRFLOW_HOME

# Install Airflow using the constraints file
AIRFLOW_VERSION=2.4.0
PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
# For example: 3.7
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
# For example: https://raw.githubusercontent.com/apache/airflow/constraints-2.4.0/constraints-3.7.txt
pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"

# Set up the Feast DAG
mkdir -p $AIRFLOW_HOME/dags
cp dag.py $AIRFLOW_HOME/dags

# Set up the dbt project files
cd ../dbt/feast_demo
cp -R * $AIRFLOW_HOME
cd $AIRFLOW_HOME

# The standalone command will initialise the database, make a user,
# and start all components for you.
airflow standalone

# Visit localhost:8080 in the browser and use the admin account details
# shown on the terminal to log in.

module_3_db/architecture.png

164 KB
+29

@@ -0,0 +1,29 @@

# Name your project! Project names should contain only lowercase characters
# and underscores. A good package name should reflect your organization's
# name or the intended use of these models
name: 'feast_demo'
version: '1.0.0'
config-version: 2

# This setting configures which "profile" dbt uses for this project.
profile: 'feast_demo'

# These configurations specify where dbt should look for different types of files.
# The `model-paths` config, for example, states that models in this project can be
# found in the "models/" directory. You probably won't need to change these!
model-paths: ["models"]
macro-paths: ["macros"]


target-path: "target"  # directory which will store compiled SQL files
clean-targets:  # directories to be removed by `dbt clean`
  - "target"
  - "dbt_packages"

models:
  feast_demo:
    example:
      materialized: view
      location_root: '[YOUR S3 BUCKET/FOLDER]'
      file_format: 'delta'
@@ -0,0 +1,7 @@
{% macro prev_day_partition() %}
(SELECT MAX(timestamp)::date FROM {{ this }})
{% endmacro %}

{% macro next_day_partition() %}
(SELECT date_add(MAX(timestamp)::date, 1) FROM {{ this }})
{% endmacro %}
@@ -0,0 +1,42 @@
{{
    config(
        materialized='incremental',
        file_format='parquet',
        incremental_strategy='append'
    )
}}

SELECT *
FROM
    (
        SELECT
            user_id,
            to_timestamp(timestamp) AS timestamp,
            SUM(amt) OVER (
                PARTITION BY user_id
                ORDER BY to_timestamp(timestamp)
                RANGE BETWEEN INTERVAL 1 day PRECEDING AND CURRENT ROW
            ) AS amt_sum_1d_10m,
            AVG(amt) OVER (
                PARTITION BY user_id
                ORDER BY to_timestamp(timestamp)
                RANGE BETWEEN INTERVAL 1 day PRECEDING AND CURRENT ROW
            ) AS amt_mean_1d_10m
        FROM demo_fraud_v2.transactions
        {% if is_incremental() %}
        WHERE
            partition_0 BETWEEN date_format({{ prev_day_partition() }}, "yyyy") AND date_format({{ next_day_partition() }}, "yyyy") AND
            partition_1 BETWEEN date_format({{ prev_day_partition() }}, "MM") AND date_format({{ next_day_partition() }}, "MM") AND
            partition_2 BETWEEN date_format({{ prev_day_partition() }}, "dd") AND date_format({{ next_day_partition() }}, "dd")
        {% else %}
        -- Hack to simulate we started on 2022-04-20
        WHERE
            partition_0 = "2022" AND
            partition_1 = "04" AND
            partition_2 = "20"
        {% endif %}
    )
{% if is_incremental() %}
-- Need to only produce values in this window
WHERE timestamp > (SELECT MAX(timestamp) FROM {{ this }})
{% endif %}
@@ -0,0 +1,11 @@

version: 2

models:
  - name: aggregate_transaction_features
    description: ""
    columns:
      - name: "user_id"
        description: "The primary key for this table"
        tests:
          - not_null
File renamed without changes.

module_3_db/demo_images/db_ghf.png

494 KB

module_3_db/demo_images/db_gof.png

245 KB
201 KB
450 KB

module_3_db/demo_images/dbt.png

122 KB
558 KB
+22

@@ -0,0 +1,22 @@
from feast import PushSource
from feast.infra.offline_stores.contrib.spark_offline_store.spark_source import (
    SparkSource,
)

# Historical log of transactions stream
transactions_source = SparkSource(
    name="transactions_source",
    table="demo_fraud_v2.transactions",
    timestamp_field="timestamp",
)

# Precomputed aggregate transaction feature values (batch / stream)
aggregate_transactions_source = PushSource(
    name="transactions_1d",
    batch_source=SparkSource(
        name="transactions_1d_batch",
        table="demo_fraud_v2.aggregate_transaction_features",
        timestamp_field="timestamp",
        tags={"dbtModel": "models/example/aggregate_transaction_features.sql"},
    ),
)
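
Because transactions_1d is a PushSource, freshly computed aggregates can also be pushed straight into the store rather than waiting for a scheduled materialization. A minimal sketch, assuming a pandas DataFrame whose columns match the dbt model output (the values are made up):

from datetime import datetime

import pandas as pd
from feast import FeatureStore
from feast.data_source import PushMode

store = FeatureStore(repo_path=".")

# Columns must match the batch source: user_id, timestamp, amt_sum_1d_10m, amt_mean_1d_10m
df = pd.DataFrame(
    {
        "user_id": ["user_268308151877"],  # hypothetical user id
        "timestamp": [datetime.utcnow()],
        "amt_sum_1d_10m": [142.50],
        "amt_mean_1d_10m": [35.63],
    }
)

# PushMode.ONLINE writes to DynamoDB; ONLINE_AND_OFFLINE also appends to the batch source
store.push("transactions_1d", df, to=PushMode.ONLINE)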

module_3_db/feature_repo/entities.py

+9
@@ -0,0 +1,9 @@
from feast import (
    Entity,
)

user = Entity(
    name="user",
    join_keys=["user_id"],
    description="user id",
)
@@ -0,0 +1,9 @@
from feast import FeatureService

from features import *

feature_service_1 = FeatureService(
    name="model_v1",
    features=[user_transaction_amount_metrics],
)
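
The feature service pins down exactly which features the model_v1 model consumes, so training data can be generated against it with point-in-time joins. A minimal sketch, assuming an entity DataFrame with user_id and event_timestamp columns (rows are made up for illustration):

from datetime import datetime

import pandas as pd
from feast import FeatureStore

store = FeatureStore(repo_path=".")

# Which entities / timestamps to build training rows for (hypothetical values)
entity_df = pd.DataFrame(
    {
        "user_id": ["user_268308151877", "user_268308151877"],
        "event_timestamp": [datetime(2022, 4, 20, 10), datetime(2022, 4, 21, 10)],
    }
)

training_df = store.get_historical_features(
    entity_df=entity_df,
    features=store.get_feature_service("model_v1"),
).to_df()
print(training_df.head())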
@@ -0,0 +1,17 @@
project: feast_demo
provider: aws
registry: # where this repo's metadata is stored
  registry_type: sql
  path: postgresql://postgres:mysecretpassword@[your-rds-instance]:5432/feast
online_store: # low latency online storage
  type: dynamodb
  region: us-west-1
offline_store:
  type: spark
  spark_conf: # Note: pip install -U "databricks-connect"
    spark.ui.enabled: "false"
    spark.eventLog.enabled: "false"
    spark.sql.catalogImplementation: "hive"
    spark.sql.parser.quotedRegexColumnNames: "true"
    spark.sql.session.timeZone: "UTC"
entity_key_serialization_version: 2
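
This feature_store.yaml wires the pieces together: a SQL registry in Postgres, a Spark (Databricks Connect) offline store, and DynamoDB online. A quick sanity check after `feast apply` is to load the store and list what got registered; a minimal sketch, assuming it runs from the feature_repo directory with the placeholders filled in:

from feast import FeatureStore

# Reads feature_store.yaml from the current directory
store = FeatureStore(repo_path=".")

print([fv.name for fv in store.list_feature_views()])      # expect ['user_transaction_amount_metrics']
print([ds.name for ds in store.list_data_sources()])       # registered Spark / push sources
print([fs.name for fs in store.list_feature_services()])   # expect ['model_v1']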

module_3_db/feature_repo/features.py

+26
@@ -0,0 +1,26 @@
from datetime import timedelta

from feast import (
    FeatureView,
    Field,
)
from feast.types import String, Float64

from data_sources import *
from entities import *

user_transaction_amount_metrics = FeatureView(
    name="user_transaction_amount_metrics",
    description="User transaction features",
    entities=[user],
    ttl=timedelta(seconds=8640000000),
    schema=[
        Field(name="user_id", dtype=String),
        Field(name="amt_sum_1d_10m", dtype=Float64),
        Field(name="amt_mean_1d_10m", dtype=Float64),
    ],
    online=True,
    source=aggregate_transactions_source,
    tags={"production": "True"},
)
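
The usual way to register everything in feature_repo is to run `feast apply` from that directory; the same can be done programmatically from a notebook. A minimal sketch, assuming the repo modules are importable (the feature_services module name is an assumption, since that file's name is hidden in this diff):

from feast import FeatureStore

from data_sources import aggregate_transactions_source, transactions_source
from entities import user
from features import user_transaction_amount_metrics
from feature_services import feature_service_1  # assumed module name

store = FeatureStore(repo_path=".")

# Roughly what `feast apply` does: write these definitions to the SQL registry
store.apply(
    [
        user,
        transactions_source,
        aggregate_transactions_source,
        user_transaction_amount_metrics,
        feature_service_1,
    ]
)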

module_3_db/orchestrate.png

217 KB

module_3_db/sample_db_notebook.ipynb

+1
Large diffs are not rendered by default.

module_3/README.md renamed to module_3_sf/README.md

+2-2

module_3_sf/airflow.png

247 KB
File renamed without changes.
File renamed without changes.

module_3_sf/dbt/feast_demo/.gitignore

+4
@@ -0,0 +1,4 @@

target/
dbt_packages/
logs/
File renamed without changes.
File renamed without changes.

0 commit comments
