diff --git a/capstone/rainforest/great_expectations/.gitignore b/capstone/rainforest/great_expectations/.gitignore index b386a5e..966f6d2 100644 --- a/capstone/rainforest/great_expectations/.gitignore +++ b/capstone/rainforest/great_expectations/.gitignore @@ -1,4 +1,5 @@ +uncommitted/ uncommitted/ uncommitted/ uncommitted/ \ No newline at end of file diff --git a/capstone/rainforest/resources/schemas/bronze_schema.py b/capstone/rainforest/resources/schemas/bronze_schema.py deleted file mode 100644 index afd91a2..0000000 --- a/capstone/rainforest/resources/schemas/bronze_schema.py +++ /dev/null @@ -1,254 +0,0 @@ -from pyspark.sql import SparkSession - - -def create_tables( - spark, path="s3a://rainforest/delta", database: str = "rainforest" -): - spark.sql(f"CREATE DATABASE IF NOT EXISTS {database}") - - # User table - spark.sql(f"DROP TABLE IF EXISTS {database}.user") - spark.sql( - f""" - CREATE TABLE {database}.user ( - user_id INT, - username STRING, - email STRING, - is_active BOOLEAN, - created_ts TIMESTAMP, - last_updated_by INT, - last_updated_ts TIMESTAMP, - etl_inserted TIMESTAMP - ) USING DELTA - PARTITIONED BY (etl_inserted) - LOCATION '{path}/user' - """ - ) - - # Seller table - spark.sql(f"DROP TABLE IF EXISTS {database}.seller") - spark.sql( - f""" - CREATE TABLE {database}.seller ( - seller_id INT, - user_id INT, - first_time_sold_timestamp TIMESTAMP, - created_ts TIMESTAMP, - last_updated_by INT, - last_updated_ts TIMESTAMP, - etl_inserted TIMESTAMP - ) USING DELTA - PARTITIONED BY (etl_inserted) - LOCATION '{path}/seller' - """ - ) - - # Buyer table - spark.sql(f"DROP TABLE IF EXISTS {database}.buyer") - spark.sql( - f""" - CREATE TABLE {database}.buyer ( - buyer_id INT, - user_id INT, - first_time_purchased_timestamp TIMESTAMP, - created_ts TIMESTAMP, - last_updated_by INT, - last_updated_ts TIMESTAMP, - etl_inserted TIMESTAMP - ) USING DELTA - PARTITIONED BY (etl_inserted) - LOCATION '{path}/buyer' - """ - ) - - # SellerProduct table - spark.sql(f"DROP TABLE IF EXISTS {database}.seller_product") - spark.sql( - f""" - CREATE TABLE {database}.seller_product ( - seller_id INT, - product_id INT, - etl_inserted TIMESTAMP - ) USING DELTA - PARTITIONED BY (etl_inserted) - LOCATION '{path}/seller_product' - """ - ) - - # Brand table - spark.sql(f"DROP TABLE IF EXISTS {database}.brand") - spark.sql( - f""" - CREATE TABLE {database}.brand ( - brand_id INT, - name STRING, - created_ts TIMESTAMP, - last_updated_by INT, - last_updated_ts TIMESTAMP, - etl_inserted TIMESTAMP - ) USING DELTA - PARTITIONED BY (etl_inserted) - LOCATION '{path}/brand' - """ - ) - - # Manufacturer table - spark.sql(f"DROP TABLE IF EXISTS {database}.manufacturer") - spark.sql( - f""" - CREATE TABLE {database}.manufacturer ( - manufacturer_id INT, - name STRING, - created_ts TIMESTAMP, - last_updated_by INT, - last_updated_ts TIMESTAMP, - etl_inserted TIMESTAMP - ) USING DELTA - PARTITIONED BY (etl_inserted) - LOCATION '{path}/manufacturer' - """ - ) - - # Product table - spark.sql(f"DROP TABLE IF EXISTS {database}.product") - spark.sql( - f""" - CREATE TABLE {database}.product ( - product_id INT, - name STRING, - description STRING, - price DECIMAL(10, 2), - brand_id INT, - manufacturer_id INT, - created_ts TIMESTAMP, - last_updated_by INT, - last_updated_ts TIMESTAMP, - etl_inserted TIMESTAMP - ) USING DELTA - PARTITIONED BY (etl_inserted) - LOCATION '{path}/product' - """ - ) - - # Reviews table - spark.sql(f"DROP TABLE IF EXISTS {database}.reviews") - spark.sql( - f""" - CREATE TABLE {database}.reviews ( - review_id INT, - product_id INT, - rating INT, - comment STRING, - created_ts TIMESTAMP, - last_updated_by INT, - last_updated_ts TIMESTAMP, - etl_inserted TIMESTAMP - ) USING DELTA - PARTITIONED BY (etl_inserted) - LOCATION '{path}/reviews' - """ - ) - - # ProductCategory table - spark.sql(f"DROP TABLE IF EXISTS {database}.product_category") - spark.sql( - f""" - CREATE TABLE {database}.product_category ( - product_id INT, - category_id INT, - etl_inserted TIMESTAMP - ) USING DELTA - PARTITIONED BY (etl_inserted) - LOCATION '{path}/product_category' - """ - ) - - # Category table - spark.sql(f"DROP TABLE IF EXISTS {database}.category") - spark.sql( - f""" - CREATE TABLE {database}.category ( - category_id INT, - name STRING, - created_ts TIMESTAMP, - last_updated_by INT, - last_updated_ts TIMESTAMP, - etl_inserted TIMESTAMP - ) USING DELTA - PARTITIONED BY (etl_inserted) - LOCATION '{path}/category' - """ - ) - - # Order table - spark.sql(f"DROP TABLE IF EXISTS {database}.order") - spark.sql( - f""" - CREATE TABLE {database}.order ( - order_id INT, - buyer_id INT, - order_ts TIMESTAMP, - total_price DECIMAL(10, 2), - created_ts TIMESTAMP, - last_updated_by INT, - last_updated_ts TIMESTAMP, - etl_inserted TIMESTAMP - ) USING DELTA - PARTITIONED BY (etl_inserted) - LOCATION '{path}/order' - """ - ) - - # OrderItem table - spark.sql(f"DROP TABLE IF EXISTS {database}.order_item") - spark.sql( - f""" - CREATE TABLE {database}.order_item ( - order_item_id INT, - order_id INT, - product_id INT, - seller_id INT, - quantity INT, - base_price DECIMAL(10, 2), - tax DECIMAL(10, 2), - created_ts TIMESTAMP, - last_updated_by INT, - last_updated_ts TIMESTAMP, - etl_inserted TIMESTAMP - ) USING DELTA - PARTITIONED BY (etl_inserted) - LOCATION '{path}/order_item' - """ - ) - - # Clickstream table - spark.sql(f"DROP TABLE IF EXISTS {database}.clickstream") - spark.sql( - f""" - CREATE TABLE {database}.clickstream ( - event_id INT, - user_id INT, - event_type STRING, - product_id INT, - order_id INT, - timestamp TIMESTAMP, - created_ts TIMESTAMP, - last_updated_by INT, - last_updated_ts TIMESTAMP, - etl_inserted TIMESTAMP - ) USING DELTA - PARTITIONED BY (etl_inserted) - LOCATION '{path}/clickstream' - """ - ) - - -if __name__ == '__main__': - spark = ( - SparkSession.builder.appName("rainforest_ddl") - .config("spark.executor.cores", "1") - .config("spark.executor.instances", "1") - .enableHiveSupport() - .getOrCreate() - ) - create_tables(spark) diff --git a/capstone/rainforest/resources/schemas/gold_schema.py b/capstone/rainforest/resources/schemas/gold_schema.py deleted file mode 100644 index 5d3e278..0000000 --- a/capstone/rainforest/resources/schemas/gold_schema.py +++ /dev/null @@ -1,89 +0,0 @@ -def create_tables( - spark, path="s3a://rainforest/delta", database: str = "rainforest" -): - spark.sql(f"CREATE DATABASE IF NOT EXISTS {database}") - - # OBT Schema (Order Item Grain) - spark.sql(f"DROP TABLE IF EXISTS {database}.obt_order_item") - spark.sql( - f""" - CREATE TABLE {database}.obt_order_item ( - order_item_id INT, - order_id INT, - order_ts TIMESTAMP, - total_price DECIMAL(10, 2), - buyer_id INT, - buyer_username STRING, - buyer_email STRING, - buyer_is_active BOOLEAN, - buyer_first_time_purchased_timestamp TIMESTAMP, - seller_id INT, - seller_username STRING, - seller_email STRING, - seller_is_active BOOLEAN, - seller_first_time_sold_timestamp TIMESTAMP, - product_id INT, - product_name STRING, - product_description STRING, - product_price DECIMAL(10, 2), - brand_id INT, - brand_name STRING, - manufacturer_id INT, - manufacturer_name STRING, - review_count INT, - avg_rating DECIMAL(3, 2), - category_id INT, - category_name STRING, - quantity INT, - base_price DECIMAL(10, 2), - tax DECIMAL(10, 2), - created_ts TIMESTAMP, - last_updated_by INT, - last_updated_ts TIMESTAMP, - etl_inserted TIMESTAMP - ) USING DELTA - PARTITIONED BY (etl_inserted) - LOCATION '{path}/obt_order_item' - """ - ) - - # OBR Schema (User Event Grain) - spark.sql(f"DROP TABLE IF EXISTS {database}.obr_user_event") - spark.sql( - f""" - CREATE TABLE {database}.obr_user_event ( - event_id INT, - user_id INT, - username STRING, - email STRING, - is_active BOOLEAN, - is_buyer BOOLEAN, - is_seller BOOLEAN, - first_time_purchased_timestamp TIMESTAMP, - first_time_sold_timestamp TIMESTAMP, - event_type STRING, - product_id INT, - product_name STRING, - product_description STRING, - product_price DECIMAL(10, 2), - brand_id INT, - brand_name STRING, - manufacturer_id INT, - manufacturer_name STRING, - review_count INT, - avg_rating DECIMAL(3, 2), - category_id INT, - category_name STRING, - order_id INT, - order_ts TIMESTAMP, - total_price DECIMAL(10, 2), - timestamp TIMESTAMP, - created_ts TIMESTAMP, - last_updated_by INT, - last_updated_ts TIMESTAMP, - etl_inserted TIMESTAMP - ) USING DELTA - PARTITIONED BY (etl_inserted) - LOCATION '{path}/obr_user_event' - """ - ) diff --git a/capstone/rainforest/resources/schemas/interface_schema.py b/capstone/rainforest/resources/schemas/interface_schema.py deleted file mode 100644 index 3f74b68..0000000 --- a/capstone/rainforest/resources/schemas/interface_schema.py +++ /dev/null @@ -1,54 +0,0 @@ -def create_tables( - spark, path="s3a://rainforest/delta", database: str = "rainforest" -): - spark.sql(f"CREATE DATABASE IF NOT EXISTS {database}") - - # Table for event-level metrics - spark.sql(f"DROP TABLE IF EXISTS {database}.event_level_metrics") - spark.sql( - f""" - CREATE TABLE {database}.event_level_metrics ( - product_id INT, - product_name STRING, - category_id INT, - category_name STRING, - brand_id INT, - brand_name STRING, - manufacturer_id INT, - manufacturer_name STRING, - event_month INT, - event_year INT, - total_events INT, - view_events INT, - add_to_cart_events INT, - purchase_events INT, - conversion_rate DECIMAL(5, 4), - etl_inserted TIMESTAMP - ) USING DELTA - PARTITIONED BY (event_year, event_month, etl_inserted) - LOCATION '{path}/event_level_metrics' - """ - ) - - # Table for sales metrics - spark.sql(f"DROP TABLE IF EXISTS {database}.sales_metrics") - spark.sql( - f""" - CREATE TABLE {database}.sales_metrics ( - brand_id INT, - brand_name STRING, - manufacturer_id INT, - manufacturer_name STRING, - category_id INT, - category_name STRING, - sales_month INT, - sales_year INT, - total_sales DECIMAL(18, 2), - total_quantity INT, - avg_price DECIMAL(10, 2), - etl_inserted TIMESTAMP - ) USING DELTA - PARTITIONED BY (sales_year, sales_month, etl_inserted) - LOCATION '{path}/sales_metrics' - """ - ) diff --git a/capstone/rainforest/resources/schemas/silver_schema.py b/capstone/rainforest/resources/schemas/silver_schema.py deleted file mode 100644 index 08cb39b..0000000 --- a/capstone/rainforest/resources/schemas/silver_schema.py +++ /dev/null @@ -1,197 +0,0 @@ -def create_tables( - spark, path="s3a://rainforest/delta", database: str = "rainforest" -): - spark.sql(f"CREATE DATABASE IF NOT EXISTS {database}") - - # Dimensions - spark.sql(f"DROP TABLE IF EXISTS {database}.dim_buyer") - spark.sql( - f""" - CREATE TABLE {database}.dim_buyer ( - buyer_id INT, - user_id INT, - username STRING, - email STRING, - is_active BOOLEAN, - first_time_purchased_timestamp TIMESTAMP, - created_ts TIMESTAMP, - last_updated_by INT, - last_updated_ts TIMESTAMP, - etl_inserted TIMESTAMP - ) USING DELTA - PARTITIONED BY (etl_inserted) - LOCATION '{path}/dim_buyer' - """ - ) - - spark.sql(f"DROP TABLE IF EXISTS {database}.dim_seller") - spark.sql( - f""" - CREATE TABLE {database}.dim_seller ( - seller_id INT, - user_id INT, - username STRING, - email STRING, - is_active BOOLEAN, - first_time_sold_timestamp TIMESTAMP, - created_ts TIMESTAMP, - last_updated_by INT, - last_updated_ts TIMESTAMP, - etl_inserted TIMESTAMP - ) USING DELTA - PARTITIONED BY (etl_inserted) - LOCATION '{path}/dim_seller' - """ - ) - - spark.sql(f"DROP TABLE IF EXISTS {database}.dim_product") - spark.sql( - f""" - CREATE TABLE {database}.dim_product ( - product_id INT, - name STRING, - description STRING, - price DECIMAL(10, 2), - brand_id INT, - brand_name STRING, - manufacturer_id INT, - manufacturer_name STRING, - review_count INT, - avg_rating DECIMAL(3, 2), - created_ts TIMESTAMP, - last_updated_by INT, - last_updated_ts TIMESTAMP, - etl_inserted TIMESTAMP - ) USING DELTA - PARTITIONED BY (etl_inserted) - LOCATION '{path}/dim_product' - """ - ) - - spark.sql(f"DROP TABLE IF EXISTS {database}.dim_category") - spark.sql( - f""" - CREATE TABLE {database}.dim_category ( - category_id INT, - name STRING, - created_ts TIMESTAMP, - last_updated_by INT, - last_updated_ts TIMESTAMP, - etl_inserted TIMESTAMP - ) USING DELTA - PARTITIONED BY (etl_inserted) - LOCATION '{path}/dim_category' - """ - ) - - spark.sql(f"DROP TABLE IF EXISTS {database}.dim_date") - spark.sql( - f""" - CREATE TABLE {database}.dim_date ( - date_key DATE, - date DATE, - year INT, - quarter INT, - month INT, - day INT, - day_of_week INT, - day_name STRING, - day_of_month INT, - day_of_quarter INT, - day_of_year INT, - week_of_month INT, - week_of_year INT, - month_name STRING, - year_month INT, - year_quarter INT - ) USING DELTA - LOCATION '{path}/dim_date' - """ - ) - - # Bridge Tables - spark.sql(f"DROP TABLE IF EXISTS {database}.seller_x_product") - spark.sql( - f""" - CREATE TABLE {database}.seller_x_product ( - seller_id INT, - product_id INT, - etl_inserted TIMESTAMP - ) USING DELTA - PARTITIONED BY (etl_inserted) - LOCATION '{path}/seller_x_product' - """ - ) - - spark.sql(f"DROP TABLE IF EXISTS {database}.product_x_category") - spark.sql( - f""" - CREATE TABLE {database}.product_x_category ( - product_id INT, - category_id INT, - etl_inserted TIMESTAMP - ) USING DELTA - PARTITIONED BY (etl_inserted) - LOCATION '{path}/product_x_category' - """ - ) - - # Facts - spark.sql(f"DROP TABLE IF EXISTS {database}.fact_orders") - spark.sql( - f""" - CREATE TABLE {database}.fact_orders ( - order_id INT, - buyer_id INT, - order_ts TIMESTAMP, - total_price DECIMAL(10, 2), - created_ts TIMESTAMP, - last_updated_by INT, - last_updated_ts TIMESTAMP, - etl_inserted TIMESTAMP - ) USING DELTA - PARTITIONED BY (etl_inserted) - LOCATION '{path}/fact_orders' - """ - ) - - spark.sql(f"DROP TABLE IF EXISTS {database}.fact_order_items") - spark.sql( - f""" - CREATE TABLE {database}.fact_order_items ( - order_item_id INT, - order_id INT, - product_id INT, - seller_id INT, - quantity INT, - base_price DECIMAL(10, 2), - tax DECIMAL(10, 2), - created_ts TIMESTAMP, - last_updated_by INT, - last_updated_ts TIMESTAMP, - etl_inserted TIMESTAMP - ) USING DELTA - PARTITIONED BY (etl_inserted) - LOCATION '{path}/fact_order_items' - """ - ) - - spark.sql(f"DROP TABLE IF EXISTS {database}.fact_clickstream_events") - spark.sql( - f""" - CREATE TABLE {database}.fact_clickstream_events ( - event_id INT, - user_id INT, - event_type STRING, - product_id INT, - order_id INT, - timestamp TIMESTAMP, - created_ts TIMESTAMP, - last_updated_by INT, - last_updated_ts TIMESTAMP, - etl_inserted TIMESTAMP - ) USING DELTA - PARTITIONED BY (etl_inserted) - LOCATION '{path}/fact_clickstream_events' - """ - )