FirstRate 30min data ETL.py
# Databricks notebook source
#Mount an Azure Blob Storage container
#If you have already mounted it, keep the following code commented out
"""
dbutils.fs.mount(
source = "wasbs://[email protected]",
mount_point = "/mnt/finance",
extra_configs = {"fs.azure.account.key.finstorage6ef5xpkr7mo3s.blob.core.windows.net":"n1cT5j8fFP+qHHI6ve/K2rWAIT/xf/yrTA19WmMZSneFYKYvHt3ux2KRcvIfqZ365meXDXzAOqMX+AStJdrpEA=="})
"""
# COMMAND ----------
#Unmount a mount point
#dbutils.fs.unmount("/mnt/finance")
# COMMAND ----------
# MAGIC %sql
# MAGIC CREATE DATABASE IF NOT EXISTS deltabase
# COMMAND ----------
# MAGIC %sql
# MAGIC USE deltabase
# COMMAND ----------
# MAGIC %sql
# MAGIC
# MAGIC --Enable Auto Optimization
# MAGIC set spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite = true;
# MAGIC set spark.databricks.delta.properties.defaults.autoOptimize.autoCompact = true;
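# COMMAND ----------
#Equivalent session-level settings from Python, shown as a sketch of the same auto optimization configuration
spark.conf.set("spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite", "true")
spark.conf.set("spark.databricks.delta.properties.defaults.autoOptimize.autoCompact", "true")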
# COMMAND ----------
import os
from pyspark import SparkFiles
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import functions
from pyspark.sql.functions import *   #lit() is the only function used below
from pyspark.sql import SQLContext
from pyspark.sql import DataFrame
from pyspark.sql.types import *
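# COMMAND ----------
#Optional sanity check (a sketch): confirm the 30-minute csv files are visible on the mount before the ETL loop below
for file_info in dbutils.fs.ls("/mnt/finance/FirstRate30min"):
    print(file_info.name)
# COMMAND ----------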
#LIST, RENAME, AND SAVE ALL FILES AS DELTA LAKE AUTOMATICALLY
#Data written to mount point paths (/mnt) is stored outside of the DBFS root
path_30min = '/dbfs/mnt/finance/FirstRate30min'
filename_lists_30min = os.listdir(path_30min)
df_30min_ = {}
delta_30min = {}
for filename_30min in filename_lists_30min:
    #split the file name to get the ticker symbol
    rawname_30min = filename_30min.split('_')[0]
    name_30min = rawname_30min.split('-')[0]
    #create column header names
    temp_30min = StructType([
        StructField(name_30min+"_dateTime", TimestampType(), True),
        StructField(name_30min+"_adjOpen", FloatType(), True),
        StructField(name_30min+"_adjHigh", FloatType(), True),
        StructField(name_30min+"_adjLow", FloatType(), True),
        StructField(name_30min+"_adjClose", FloatType(), True),
        StructField(name_30min+"_adjVolume", IntegerType(), True)])
    #read each csv file into a dataframe with the ticker-specific schema
    temp_df_30min = (spark.read.format("csv")
                     .option("header", "false")
                     .schema(temp_30min)
                     .load("/mnt/finance/FirstRate30min/"+filename_30min)
                     .withColumn("Ticker", lit(name_30min)))
    #name each dataframe
    df_30min_[name_30min] = temp_df_30min
    #name each table
    table_name_30min = name_30min+'_30min_delta'
    print(table_name_30min)
    #create a delta lake table for each dataframe
    df_30min_[name_30min].write.format("delta").mode("overwrite").option("overwriteSchema","True").saveAsTable(table_name_30min)
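# COMMAND ----------
#Quick sanity check (a sketch): read one of the Delta tables back from the deltabase database
#"AAPL_30min_delta" is only an example name; substitute any table name printed by the loop above
sample_df = spark.read.table("deltabase.AAPL_30min_delta")
sample_df.printSchema()
sample_df.show(5)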