diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json
index bd98723..89d6e83 100644
--- a/.devcontainer/devcontainer.json
+++ b/.devcontainer/devcontainer.json
@@ -1,12 +1,24 @@
-// For format details, see https://aka.ms/devcontainer.json.
 {
-    "name": "Snowflake Demo Codespace",
-    // Configure tool-specific properties.
-    "customizations": {
-        "vscode": {
-            "extensions": [
-                "snowflake.snowflake-vsc"
-            ]
+    "name": "Snowflake Demo Codespace",
+    "features": {
+        "ghcr.io/devcontainers/features/python:1": {
+            "version": "3.11"
+        }
+    },
+    "customizations": {
+        "vscode": {
+            "extensions": [
+                "ms-python.python",
+                "snowflake.snowflake-vsc"
+            ],
+            "settings": {
+                "snowflake.snowsqlConfigPath": "${containerWorkspaceFolder}/.snowflake/config.toml"
             }
         }
-    }
\ No newline at end of file
+    },
+    "updateContentCommand": "bash .devcontainer/install-dependencies.sh",
+    "postCreateCommand": "chmod 0600 \"${containerWorkspaceFolder}/.snowflake/config.toml\"",
+    "containerEnv": {
+        "SNOWFLAKE_HOME": "${containerWorkspaceFolder}/.snowflake"
+    }
+}
\ No newline at end of file
diff --git a/.devcontainer/install-dependencies.sh b/.devcontainer/install-dependencies.sh
new file mode 100644
index 0000000..b09cb97
--- /dev/null
+++ b/.devcontainer/install-dependencies.sh
@@ -0,0 +1,4 @@
+#!/usr/bin/env bash
+pip install snowflake-snowpark-python
+pip install snowflake.core
+pipx install snowflake-cli --python 3.11
\ No newline at end of file
diff --git a/.github/workflows/deploy_pipeline.yml b/.github/workflows/deploy_pipeline.yml
index 9cd698b..ce06406 100644
--- a/.github/workflows/deploy_pipeline.yml
+++ b/.github/workflows/deploy_pipeline.yml
@@ -27,10 +27,11 @@ jobs:
         uses: actions/checkout@v4
 
       # Install Snowflake CLI GitHub Action and point to config file
-      - uses: Snowflake-Labs/snowflake-cli-action@v1
+      - name: Install snowflake-cli
+        uses: Snowflake-Labs/snowflake-cli-action@v1.5
         with:
           cli-version: "latest"
-          default-config-file-path: "config.toml"
+          default-config-file-path: ".snowflake/config.toml"
 
       # Update Snowflake's copy of the repository
       - name: Fetch repository changes
@@ -42,10 +43,10 @@ jobs:
           BRANCH_NAME=${{ github.ref_name }}
           if [ "${BRANCH_NAME}" == "main" ]; then
             RETENTION_TIME=1
-          elif [ "${BRANCH_NAME}" == "dev" ]; then
+          else
             RETENTION_TIME=0
           fi
           snow git execute \
-            "@${REPO_NAME}/branches/${BRANCH_NAME}/deploy_parametrized_pipeline.sql" \
+            "@${REPO_NAME}/branches/${BRANCH_NAME}/steps/0[134]_*" \
             -D "environment='${BRANCH_NAME}'" \
             -D "retention_time=${RETENTION_TIME}"
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..7ab19c7
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1 @@
+.snowflake/logs
diff --git a/.snowflake/config.toml b/.snowflake/config.toml
new file mode 100644
index 0000000..5988380
--- /dev/null
+++ b/.snowflake/config.toml
@@ -0,0 +1,8 @@
+[connections.default]
+warehouse = "QUICKSTART_WH"
+role = "ACCOUNTADMIN"
+database = "QUICKSTART_COMMON"
+schema = "PUBLIC"
+account = ""
+user = ""
+password = ""
diff --git a/config.toml b/config.toml
deleted file mode 100644
index 57bdc58..0000000
--- a/config.toml
+++ /dev/null
@@ -1,3 +0,0 @@
-[connections.default]
-warehouse = "QUICKSTART_WH"
-role = "ACCOUNTADMIN"
diff --git a/deploy_parametrized_pipeline.sql b/deploy_parametrized_pipeline.sql
deleted file mode 100644
index 9fe030f..0000000
--- a/deploy_parametrized_pipeline.sql
+++ /dev/null
@@ -1,3 +0,0 @@
-execute immediate from 'steps/01_setup_snowflake.sql' using (environment => '{{environment}}');
-execute immediate from 'steps/03_harmonize_data.sql' using (environment => '{{environment}}');
-execute immediate from 'steps/04_orchestrate_jobs.sql' using (environment => '{{environment}}', retention_time => {{retention_time}});
diff --git a/deploy_pipeline.sql b/deploy_pipeline.sql
deleted file mode 100644
index f8c8461..0000000
--- a/deploy_pipeline.sql
+++ /dev/null
@@ -1,3 +0,0 @@
-execute immediate from 'steps/01_setup_snowflake.sql';
-execute immediate from 'steps/03_harmonize_data.sql';
-execute immediate from 'steps/04_orchestrate_jobs.sql';
\ No newline at end of file
diff --git a/steps/01_setup_snowflake.sql b/steps/01_setup_snowflake.sql
index d25e472..7e1bb6a 100644
--- a/steps/01_setup_snowflake.sql
+++ b/steps/01_setup_snowflake.sql
@@ -1,10 +1,13 @@
 USE ROLE ACCOUNTADMIN;
 
-CREATE WAREHOUSE IF NOT EXISTS QUICKSTART_WH WAREHOUSE_SIZE = XSMALL, AUTO_SUSPEND = 300, AUTO_RESUME= TRUE;
+CREATE OR ALTER WAREHOUSE QUICKSTART_WH
+    WAREHOUSE_SIZE = XSMALL
+    AUTO_SUSPEND = 300
+    AUTO_RESUME= TRUE;
 
 -- Separate database for git repository
-CREATE DATABASE IF NOT EXISTS QUICKSTART_COMMON;
+CREATE OR ALTER DATABASE QUICKSTART_COMMON;
 
 
 -- API integration is needed for GitHub integration
 
@@ -20,7 +23,7 @@ CREATE OR REPLACE GIT REPOSITORY quickstart_common.public.quickstart_repo
     ORIGIN = ''; -- INSERT URL OF FORKED REPO HERE
 
-CREATE OR REPLACE DATABASE QUICKSTART_PROD;
+CREATE OR ALTER DATABASE QUICKSTART_PROD;
 
 
 -- To monitor data pipeline's completion
 
@@ -30,14 +33,14 @@ CREATE OR REPLACE NOTIFICATION INTEGRATION email_integration
 
 
 -- Database level objects
-CREATE SCHEMA IF NOT EXISTS bronze;
-CREATE SCHEMA IF NOT EXISTS silver;
-CREATE SCHEMA IF NOT EXISTS gold;
+CREATE OR ALTER SCHEMA bronze;
+CREATE OR ALTER SCHEMA silver;
+CREATE OR ALTER SCHEMA gold;
 
 -- Schema level objects
 CREATE OR REPLACE FILE FORMAT bronze.json_format TYPE = 'json';
-CREATE OR REPLACE STAGE bronze.raw;
+CREATE OR ALTER STAGE bronze.raw;
 
 
 -- Copy file from GitHub to internal stage
 
diff --git a/steps/02_access_marketplace_data.sql b/steps/02_access_marketplace_data.sql
index 4c7a534..5143b18 100644
--- a/steps/02_access_marketplace_data.sql
+++ b/steps/02_access_marketplace_data.sql
@@ -4,5 +4,5 @@ USE ROLE ACCOUNTADMIN;
 SELECT * FROM oag_flight_emissions_data_sample.public.estimated_emissions_schedules_sample LIMIT 100;
 SELECT * FROM oag_flight_status_data_sample.public.flight_status_latest_sample LIMIT 100;
 SELECT * FROM global_weather__climate_data_for_bi.standard_tile.forecast_day LIMIT 100;
-SELECT * FROM government_essentials.cybersyn.datacommons_timeseries LIMIT 100;
-SELECT * FROM us_points_of_interest__addresses.cybersyn.point_of_interest_index LIMIT 100;
+SELECT * FROM global_government.cybersyn.datacommons_timeseries LIMIT 100;
+SELECT * FROM us_addresses__poi.cybersyn.point_of_interest_index LIMIT 100;
diff --git a/steps/03_harmonize_data.py b/steps/03_harmonize_data.py
new file mode 100644
index 0000000..83b62d4
--- /dev/null
+++ b/steps/03_harmonize_data.py
@@ -0,0 +1,247 @@
+# Views to transform marketplace data in pipeline
+
+import os
+
+from snowflake.core import Root, CreateMode
+from snowflake.snowpark import Session
+from snowflake.core.user_defined_function import (
+    Argument,
+    ReturnDataType,
+    PythonFunction,
+    UserDefinedFunction,
+)
+from snowflake.core.view import View, ViewColumn
+
+
+"""
+To join the flight and location focused tables
+we need to cross the gap between the airport and cities domains.
+For this we make use of a Snowpark Python UDF.
+What's really cool is that Snowpark allows us to define a vectorized UDF
+making the processing super efficient as we don’t have to invoke the
+function on each row individually!
+
+To compute the mapping between airports and cities,
+we use SnowflakeFile to read a JSON list from the pyairports package.
+The SnowflakeFile class provides dynamic file access, to stream files of any size.
+"""
+map_city_to_airport = UserDefinedFunction(
+    name="get_city_for_airport",
+    arguments=[Argument(name="iata", datatype="VARCHAR")],
+    return_type=ReturnDataType(datatype="VARCHAR"),
+    language_config=PythonFunction(
+        runtime_version="3.11", packages=["snowflake-snowpark-python"], handler="main"
+    ),
+    body="""
+from snowflake.snowpark.files import SnowflakeFile
+from _snowflake import vectorized
+import pandas
+import json
+
+@vectorized(input=pandas.DataFrame)
+def main(df):
+    airport_list = json.loads(
+        SnowflakeFile.open("@bronze.raw/airport_list.json", 'r', require_scoped_url = False).read()
+    )
+    airports = {airport[3]: airport[1] for airport in airport_list}
+    return df[0].apply(lambda iata: airports.get(iata.upper()))
+""",
+)
+
+
+"""
+To mangle the data into a more usable form,
+we make use of views to not materialize the marketplace data
+and avoid the corresponding storage costs.
+"""
+
+pipeline = [
+    # We are interested in the per seat carbon emissions.
+    # To obtain these, we need to divide the emission data by the number of seats in the airplane.
+    View(
+        name="flight_emissions",
+        columns=[
+            ViewColumn(name="departure_airport"),
+            ViewColumn(name="arrival_airport"),
+            ViewColumn(name="co2_emissions_kg_per_person"),
+        ],
+        query="""
+        select
+            departure_airport,
+            arrival_airport,
+            avg(estimated_co2_total_tonnes / seats) * 1000 as co2_emissions_kg_per_person
+        from oag_flight_emissions_data_sample.public.estimated_emissions_schedules_sample
+        where seats != 0 and estimated_co2_total_tonnes is not null
+        group by departure_airport, arrival_airport
+        """,
+    ),
+    # To avoid unreliable flight connections, we compute the fraction of flights that arrive
+    # early or on time from the flight status data provided by OAG.
+    View(
+        name="flight_punctuality",
+        columns=[
+            ViewColumn(name="departure_iata_airport_code"),
+            ViewColumn(name="arrival_iata_airport_code"),
+            ViewColumn(name="punctual_pct"),
+        ],
+        query="""
+        select
+            departure_iata_airport_code,
+            arrival_iata_airport_code,
+            count(
+                case when arrival_actual_ingate_timeliness IN ('OnTime', 'Early') THEN 1 END
+            ) / COUNT(*) * 100 as punctual_pct
+        from oag_flight_status_data_sample.public.flight_status_latest_sample
+        where arrival_actual_ingate_timeliness is not null
+        group by departure_iata_airport_code, arrival_iata_airport_code
+        """,
+    ),
+    # When joining the flight emissions with the punctuality view,
+    # we filter for flights starting from the airport closest to where we live.
+    # This information is provided in the tiny JSON file data/home.json which we query directly in the view.
+    View(
+        name="flights_from_home",
+        columns=[
+            ViewColumn(name="departure_airport"),
+            ViewColumn(name="arrival_airport"),
+            ViewColumn(name="arrival_city"),
+            ViewColumn(name="co2_emissions_kg_per_person"),
+            ViewColumn(name="punctual_pct"),
+        ],
+        query="""
+        select
+            departure_airport,
+            arrival_airport,
+            get_city_for_airport(arrival_airport) arrival_city,
+            co2_emissions_kg_per_person,
+            punctual_pct,
+        from flight_emissions
+        join flight_punctuality
+            on departure_airport = departure_iata_airport_code
+            and arrival_airport = arrival_iata_airport_code
+        where departure_airport = (
+            select $1:airport
+            from @quickstart_common.public.quickstart_repo/branches/main/data/home.json
+            (FILE_FORMAT => bronze.json_format))
+        """,
+    ),
+    # Weather Source provides a weather forecast for the upcoming two weeks.
+    # As the free versions of the data sets we use do not cover the entire globe,
+    # we limit our pipeline to zip codes inside the US and compute the average
+    # temperature, humidity, precipitation probability and cloud coverage.
+    View(
+        name="weather_forecast",
+        columns=[
+            ViewColumn(name="postal_code"),
+            ViewColumn(name="avg_temperature_air_f"),
+            ViewColumn(name="avg_relative_humidity_pct"),
+            ViewColumn(name="avg_cloud_cover_pct"),
+            ViewColumn(name="precipitation_probability_pct"),
+        ],
+        query="""
+        select
+            postal_code,
+            avg(avg_temperature_air_2m_f) avg_temperature_air_f,
+            avg(avg_humidity_relative_2m_pct) avg_relative_humidity_pct,
+            avg(avg_cloud_cover_tot_pct) avg_cloud_cover_pct,
+            avg(probability_of_precipitation_pct) precipitation_probability_pct
+        from global_weather__climate_data_for_bi.standard_tile.forecast_day
+        where country = 'US'
+        group by postal_code
+        """,
+    ),
+    # We use the data provided by Cybersyn to limit our pipeline to US cities with at least
+    # 100k residents to enjoy all the benefits a big city provides during our vacation.
+    View(
+        name="major_us_cities",
+        columns=[
+            ViewColumn(name="geo_id"),
+            ViewColumn(name="geo_name"),
+            ViewColumn(name="total_population"),
+        ],
+        query="""
+        select
+            geo.geo_id,
+            geo.geo_name,
+            max(ts.value) total_population
+        from global_government.cybersyn.datacommons_timeseries ts
+        join global_government.cybersyn.geography_index geo
+            on ts.geo_id = geo.geo_id
+        join global_government.cybersyn.geography_relationships geo_rel
+            on geo_rel.related_geo_id = geo.geo_id
+        where true
+            and ts.variable_name = 'Total Population, census.gov'
+            and date >= '2020-01-01'
+            and geo.level = 'City'
+            and geo_rel.geo_id = 'country/USA'
+            and value > 100000
+        group by geo.geo_id, geo.geo_name
+        order by total_population desc
+        """,
+    ),
+    # Using the geography relationships provided by Cybersyn we collect all the
+    # zip codes belonging to a city.
+    View(
+        name="zip_codes_in_city",
+        columns=[
+            ViewColumn(name="city_geo_id"),
+            ViewColumn(name="city_geo_name"),
+            ViewColumn(name="zip_geo_id"),
+            ViewColumn(name="zip_geo_name"),
+        ],
+        query="""
+        select
+            city.geo_id city_geo_id,
+            city.geo_name city_geo_name,
+            city.related_geo_id zip_geo_id,
+            city.related_geo_name zip_geo_name
+        from us_addresses__poi.cybersyn.geography_relationships country
+        join us_addresses__poi.cybersyn.geography_relationships city
+            on country.related_geo_id = city.geo_id
+        where true
+            and country.geo_id = 'country/USA'
+            and city.level = 'City'
+            and city.related_level = 'CensusZipCodeTabulationArea'
+        order by city_geo_id
+        """,
+    ),
+    View(
+        name="weather_joined_with_major_cities",
+        columns=[
+            ViewColumn(name="geo_id"),
+            ViewColumn(name="geo_name"),
+            ViewColumn(name="total_population"),
+            ViewColumn(name="avg_temperature_air_f"),
+            ViewColumn(name="avg_relative_humidity_pct"),
+            ViewColumn(name="avg_cloud_cover_pct"),
+            ViewColumn(name="precipitation_probability_pct"),
+        ],
+        query="""
+        select
+            city.geo_id,
+            city.geo_name,
+            city.total_population,
+            avg(avg_temperature_air_f) avg_temperature_air_f,
+            avg(avg_relative_humidity_pct) avg_relative_humidity_pct,
+            avg(avg_cloud_cover_pct) avg_cloud_cover_pct,
+            avg(precipitation_probability_pct) precipitation_probability_pct
+        from major_us_cities city
+        join zip_codes_in_city zip on city.geo_id = zip.city_geo_id
+        join weather_forecast weather on zip.zip_geo_name = weather.postal_code
+        group by city.geo_id, city.geo_name, city.total_population
+        """,
+    ),
+    # Placeholder: Add new view definition here
+]
+
+
+# entry point for PythonAPI
+root = Root(Session.builder.getOrCreate())
+
+# create views in Snowflake
+silver_schema = root.databases["quickstart_prod"].schemas["silver"]
+silver_schema.user_defined_functions.create(
+    map_city_to_airport, mode=CreateMode.or_replace
+)
+for view in pipeline:
+    silver_schema.views.create(view, mode=CreateMode.or_replace)
diff --git a/steps/03_harmonize_data.sql b/steps/03_harmonize_data.sql
deleted file mode 100644
index f5c9546..0000000
--- a/steps/03_harmonize_data.sql
+++ /dev/null
@@ -1,114 +0,0 @@
--- Views to transform marketplace data in pipeline
-use role accountadmin;
-use schema quickstart_prod.silver;
-
-/*
-To join the flight and location focused tables
-we need to cross the gap between the airport and cities domains.
-For this we make use of a Snowpark Python UDF.
-What's really cool is that Snowpark allows us to define a vectorized UDF
-making the processing super efficient as we don’t have to invoke the
-function on each row individually!
-
-To compute the mapping between airports and cities,
-we use SnowflakeFile to read a JSON list from the pyairports package.
-The SnowflakeFile class provides dynamic file access, to stream files of any size.
- */
-create or replace function get_city_for_airport(iata varchar)
-returns varchar
-language python
-runtime_version = '3.11'
-handler = 'get_city_for_airport'
-packages = ('snowflake-snowpark-python')
-as $$
-from snowflake.snowpark.files import SnowflakeFile
-from _snowflake import vectorized
-import pandas
-import json
-@vectorized(input=pandas.DataFrame)
-def get_city_for_airport(df):
-    airport_list = json.loads(SnowflakeFile.open("@bronze.raw/airport_list.json", 'r', require_scoped_url = False).read())
-    airports = {airport[3]: airport[1] for airport in airport_list}
-    return df[0].apply(lambda iata: airports.get(iata.upper()))
-$$;
-
-/*
-To mangle the data into a more usable form,
-we make use of views to not materialize the marketplace data
-and avoid the corresponding storage costs.
- */
-
--- We are interested in the per seat carbon emissions.
--- To obtain these, we need to divide the emission data by the number of seats in the airplane.
-create or replace view flight_emissions as select departure_airport, arrival_airport, avg(estimated_co2_total_tonnes / seats) * 1000 as co2_emissions_kg_per_person
-  from oag_flight_emissions_data_sample.public.estimated_emissions_schedules_sample
-  where seats != 0 and estimated_co2_total_tonnes is not null
-  group by departure_airport, arrival_airport;
-
--- To avoid unreliable flight connections, we compute the fraction of flights that arrive
--- early or on time from the flight status data provided by OAG.
-create or replace view flight_punctuality as select departure_iata_airport_code, arrival_iata_airport_code, count(case when arrival_actual_ingate_timeliness IN ('OnTime', 'Early') THEN 1 END) / COUNT(*) * 100 as punctual_pct
-  from oag_flight_status_data_sample.public.flight_status_latest_sample
-  where arrival_actual_ingate_timeliness is not null
-  group by departure_iata_airport_code, arrival_iata_airport_code;
-
--- When joining the flight emissions with the punctuality view,
--- we filter for flights starting from the airport closest to where we live.
--- This information is provided in the tiny JSON file data/home.json which we query directly in the view.
-create or replace view flights_from_home as
-  select
-    departure_airport,
-    arrival_airport,
-    get_city_for_airport(arrival_airport) arrival_city,
-    co2_emissions_kg_per_person,
-    punctual_pct,
-  from flight_emissions
-  join flight_punctuality on departure_airport = departure_iata_airport_code and arrival_airport = arrival_iata_airport_code
-  where departure_airport = (select $1:airport from @quickstart_common.public.quickstart_repo/branches/main/data/home.json (FILE_FORMAT => bronze.json_format));
-
--- Weather Source provides a weather forecast for the upcoming two weeks.
--- As the free versions of the data sets we use do not cover the entire globe,
--- we limit our pipeline to zip codes inside the US and compute the average
--- temperature, humidity, precipitation probability and cloud coverage.
-create or replace view weather_forecast as select postal_code, avg(avg_temperature_air_2m_f) avg_temperature_air_f, avg(avg_humidity_relative_2m_pct) avg_relative_humidity_pct, avg(avg_cloud_cover_tot_pct) avg_cloud_cover_pct, avg(probability_of_precipitation_pct) precipitation_probability_pct
-  from global_weather__climate_data_for_bi.standard_tile.forecast_day
-  where country = 'US'
-  group by postal_code;
-
--- We use the data provided by Cybersyn to limit our pipeline to
--- US cities with atleast 100k residents to enjoy all the benefits a big city provides during our vacation.
-create or replace view major_us_cities as select geo.geo_id, geo.geo_name, max(ts.value) total_population
-  from government_essentials.cybersyn.datacommons_timeseries ts
-  join government_essentials.cybersyn.geography_index geo on ts.geo_id = geo.geo_id
-  join government_essentials.cybersyn.geography_relationships geo_rel on geo_rel.related_geo_id = geo.geo_id
-  where true
-    and ts.variable_name = 'Total Population, census.gov'
-    and date >= '2020-01-01'
-    and geo.level = 'City'
-    and geo_rel.geo_id = 'country/USA'
-    and value > 100000
-  group by geo.geo_id, geo.geo_name
-  order by total_population desc;
-
--- Using the geography relationships provided by Cybersyn we collect all the zip codes belonging to a city.
-create or replace view zip_codes_in_city as select city.geo_id city_geo_id, city.geo_name city_geo_name, city.related_geo_id zip_geo_id, city.related_geo_name zip_geo_name
-  from us_points_of_interest__addresses.cybersyn.geography_relationships country
-  join us_points_of_interest__addresses.cybersyn.geography_relationships city on country.related_geo_id = city.geo_id
-  where true
-    and country.geo_id = 'country/USA'
-    and city.level = 'City'
-    and city.related_level = 'CensusZipCodeTabulationArea'
-  order by city_geo_id;
-
-create or replace view weather_joined_with_major_cities as
-  select
-    city.geo_id,
-    city.geo_name, city.total_population,
-    avg(avg_temperature_air_f) avg_temperature_air_f,
-    avg(avg_relative_humidity_pct) avg_relative_humidity_pct,
-    avg(avg_cloud_cover_pct) avg_cloud_cover_pct,
-    avg(precipitation_probability_pct) precipitation_probability_pct
-  from major_us_cities city
-  join zip_codes_in_city zip on city.geo_id = zip.city_geo_id
-  join weather_forecast weather on zip.zip_geo_name = weather.postal_code
-  group by city.geo_id, city.geo_name, city.total_population;
diff --git a/steps/04_orchestrate_jobs.sql b/steps/04_orchestrate_jobs.sql
index 3802fb7..e5e1d12 100644
--- a/steps/04_orchestrate_jobs.sql
+++ b/steps/04_orchestrate_jobs.sql
@@ -20,6 +20,7 @@ create or alter table vacation_spots (
 
 create or alter task vacation_spots_update
   schedule = '1440 minute'
   warehouse = 'quickstart_wh'
+  ERROR_ON_NONDETERMINISTIC_MERGE = false
 AS MERGE INTO vacation_spots USING (
   select * from silver.flights_from_home flight
diff --git a/steps/05_database_change_management.sql b/steps/05_database_change_management.sh
similarity index 52%
rename from steps/05_database_change_management.sql
rename to steps/05_database_change_management.sh
index a54c1d5..398c5b7 100644
--- a/steps/05_database_change_management.sql
+++ b/steps/05_database_change_management.sh
@@ -1,23 +1,37 @@
-/*
+: '
 -----------------------------------------------------------------
 Perform the following changes after switching to the "dev" branch
 -----------------------------------------------------------------
 
--- Add this view definition at the end of steps/03_harmonize_data.sql
-create or replace view attractions as select
-    city.geo_id,
-    city.geo_name,
-    count(case when category_main = 'Aquarium' THEN 1 END) aquarium_cnt,
-    count(case when category_main = 'Zoo' THEN 1 END) zoo_cnt,
-    count(case when category_main = 'Korean Restaurant' THEN 1 END) korean_restaurant_cnt,
-from us_points_of_interest__addresses.cybersyn.point_of_interest_index poi
-join us_points_of_interest__addresses.cybersyn.point_of_interest_addresses_relationships poi_add on poi_add.poi_id = poi.poi_id
-join us_points_of_interest__addresses.cybersyn.us_addresses address on address.address_id = poi_add.address_id
-join major_us_cities city on city.geo_id = address.id_city
-where true
-    and category_main in ('Aquarium', 'Zoo', 'Korean Restaurant')
-    and id_country = 'country/USA'
-group by city.geo_id, city.geo_name;
+-- Add this view definition at the placeholder near the end of steps/03_harmonize_data.py
+View(
+    name="attractions",
+    columns=[
+        ViewColumn(name="geo_id"),
+        ViewColumn(name="geo_name"),
+        ViewColumn(name="aquarium_cnt"),
+        ViewColumn(name="zoo_cnt"),
+        ViewColumn(name="korean_restaurant_cnt"),
+    ],
+    query="""
+    select
+        city.geo_id,
+        city.geo_name,
+        count(case when category_main = 'Aquarium' THEN 1 END) aquarium_cnt,
+        count(case when category_main = 'Zoo' THEN 1 END) zoo_cnt,
+        count(case when category_main = 'Korean Restaurant' THEN 1 END) korean_restaurant_cnt,
+    from us_addresses__poi.cybersyn.point_of_interest_index poi
+    join us_addresses__poi.cybersyn.point_of_interest_addresses_relationships poi_add
+        on poi_add.poi_id = poi.poi_id
+    join us_addresses__poi.cybersyn.us_addresses address
+        on address.address_id = poi_add.address_id
+    join major_us_cities city on city.geo_id = address.id_city
+    where true
+        and category_main in ('Aquarium', 'Zoo', 'Korean Restaurant')
+        and id_country = 'country/USA'
+    group by city.geo_id, city.geo_name
+    """,
+),
 
 
 -- Append the following column definitions to the column list of the CREATE OR ALTER TABLE in steps/04_orchestrate_jobs.sql
@@ -50,10 +64,9 @@ and (zoo_cnt > 0 or aquarium_cnt > 0)
 ---------------------------------------------------------
 Commit your changes to the "dev" branch before continuing
 ---------------------------------------------------------
-*/
+'
 
-use role accountadmin;
--- Fetch changes from GitHub
-alter git repository quickstart_common.public.quickstart_repo fetch;
--- Deploy the updated data pipeline
-execute immediate from @quickstart_common.public.quickstart_repo/branches/dev/deploy_pipeline.sql;
+# Fetch changes from GitHub
+snow git fetch quickstart_common.public.quickstart_repo
+# Deploy the updated data pipeline
+snow git execute @quickstart_common.public.quickstart_repo/branches/dev/steps/0[134]_*
diff --git a/steps/06_separate_dev_and_prod_environments.sql b/steps/06_separate_dev_and_prod_environments.sql
index 94d2d0b..7ab98c0 100644
--- a/steps/06_separate_dev_and_prod_environments.sql
+++ b/steps/06_separate_dev_and_prod_environments.sql
@@ -4,11 +4,11 @@ Perform the following changes to parametrize the deployment target of the pipeli
 ----------------------------------------------------------------------------------
 
 -- Parametrize the database name of the CREATE DATABASE command in steps/01_setup_snowflake.sql
-CREATE DATABASE IF NOT EXISTS QUICKSTART_{{environment}};
+CREATE OR ALTER DATABASE QUICKSTART_{{environment}};
 
 
--- Parametrize the USE SCHEMA in steps/03_harmonize_data.sql
-use schema quickstart_{{environment}}.silver;
+-- Use the environment variable in steps/03_harmonize_data.py
+silver_schema = root.databases[f"quickstart_{os.environ['environment']}"].schemas["silver"]
 
 
 -- Parametrize the USE SCHEMA in steps/04_orchestrate_jobs.sql
@@ -17,6 +17,4 @@ use schema quickstart_{{environment}}.gold;
 
 -- Parametrize DATA_RETENTION_TIME_IN_DAYS of CREATE OR ALTER TABLE in steps/04_orchestrate_jobs.sql
 data_retention_time_in_days = {{retention_time}};
-
-
 */