
Commit

fixed error - changed filepaths
claireboyd committed May 6, 2024
1 parent 99269f7 commit 781309c
Showing 3 changed files with 80 additions and 133 deletions.
Binary file added .DS_Store
Binary file not shown.
46 changes: 26 additions & 20 deletions fa-etl.py
@@ -10,6 +10,13 @@ def mem_profile() -> str:
    mem_use = str(round(100 - psutil.virtual_memory().percent,4))+'% of '+str(round(psutil.virtual_memory().total/1e+9,3))+' GB RAM'
    return mem_use
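Note that 100 - psutil.virtual_memory().percent is the share of memory still available, not the share in use, so the returned string reports free RAM. A quick usage sketch (illustrative only, not part of the commit):

```python
import logging
import psutil

logging.basicConfig(level=logging.INFO)
logging.info(f"Memory available: {mem_profile()}")  # e.g. "37.25% of 16.789 GB RAM"
```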

+def is_lazydataframe_empty(ldf):
+    """
+    Check whether a polars lazy dataframe is empty.
+    Returns: boolean (True, False)
+    """
+    return ((ldf.describe().filter(pl.col("statistic") == "count")["PropertyID"])[0] == 0)
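A minimal sketch of how this helper behaves, on throwaway typed frames (illustrative, not part of the commit):

```python
import polars as pl

def is_lazydataframe_empty(ldf):
    # Empty means the "count" statistic for PropertyID is zero.
    return ((ldf.describe().filter(pl.col("statistic") == "count")["PropertyID"])[0] == 0)

empty_ldf = pl.LazyFrame(schema={"PropertyID": pl.Int64, "Year": pl.Int64})
full_ldf = pl.LazyFrame({"PropertyID": [1, 2], "Year": [2020, 2021]})

print(is_lazydataframe_empty(empty_ldf))  # True
print(is_lazydataframe_empty(full_ldf))   # False
```

Since describe() computes summary statistics for every column, a cheaper check on large frames would be ldf.limit(1).collect().is_empty().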

def convert_sales(filename, input_dir):
    '''
    Convert zipped txt sales (deed) file into parquet format.
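The body of this function is collapsed in this view. For context, one plausible shape of such a conversion (a sketch with an assumed pipe delimiter and helper name, not the commit's actual implementation):

```python
import zipfile
import polars as pl

def zipped_txt_to_parquet(input_zip: str, output_parquet: str) -> None:
    # Read the first .txt member of the archive as delimited text.
    with zipfile.ZipFile(input_zip) as zf:
        member = zf.namelist()[0]
        with zf.open(member) as f:
            # Pipe delimiter is an assumption about the vendor extract.
            df = pl.read_csv(f, separator="|", infer_schema_length=10000)
    df.write_parquet(output_parquet, compression="snappy")
```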
@@ -266,6 +273,8 @@ def convert_valhist(filename, input_dir):
    input_filepath = input_dir + "/raw/" + filename
    output_dir = input_dir + "/" + "staging"
    output_filepath = output_dir + "/" + filename.replace(".txt.zip", ".parquet")
+    output_filepath_temp1 = output_dir + "/rankedtemp1_" + filename.replace(".txt.zip", ".parquet")
+    output_filepath_temp2 = output_dir + "/rankedtemp2_" + filename.replace(".txt.zip", ".parquet")
    output_filepath_ranked = output_dir + "/ranked_" + filename.replace(".txt.zip", ".parquet")

    # check if parquet already exists, if it does, skip
@@ -303,7 +312,7 @@ def convert_valhist(filename, input_dir):
logging.info(f"{output_filepath} already exists. Moving on...")

if not os.path.exists(output_filepath_ranked):
logging.info(f"Creating {output_filepath_ranked}...")
logging.info(f"Creating {output_filepath_temp1}...")

#temp filepaths
assd_filepath = output_dir+"/assd.parquet"
@@ -312,11 +321,7 @@
logging.info(f"filepaths: {assd_filepath}, {market_filepath} and {appr_filepath}...")

if not os.path.exists(assd_filepath) & os.path.exists(market_filepath) & os.path.exists(appr_filepath):

logging.info(f"Creating assd parquet...")
#split val hist into three separate datasets with PropertyID, Year as consistent
##TODO: this seems super repetitive, but it might be the best option given the size of the dataset.

(pl.scan_parquet(Path(output_filepath), low_memory = True, use_statistics=True, hive_partitioning=True)
.with_columns([pl.col('AssdYear').cast(pl.Int64).alias('Year')])
.filter(
@@ -342,50 +347,51 @@

#write checks - make sure there are no duplicates in the above (by propID/year)
# if so, raise error and don't proceed
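A sketch of what that duplicate check could look like (not implemented in this commit; the helper name is illustrative):

```python
import polars as pl

def assert_unique_propid_year(ldf: pl.LazyFrame, label: str) -> None:
    # Count PropertyID/Year pairs that appear more than once; fail fast if any exist.
    dupes = (
        ldf.group_by(["PropertyID", "Year"])
        .agg(pl.len().alias("n"))
        .filter(pl.col("n") > 1)
        .collect()
    )
    if not dupes.is_empty():
        raise ValueError(f"{label}: {dupes.height} duplicate PropertyID/Year pairs")
```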
-assd = pl.scan_parquet(Path(assd_filepath), low_memory = True, parallel='auto')
-appr = pl.scan_parquet(Path(appr_filepath), low_memory = True, parallel='auto')
-market = pl.scan_parquet(Path(market_filepath), low_memory = True, parallel='auto')
+assd = pl.scan_parquet(Path(assd_filepath), low_memory = True)
+appr = pl.scan_parquet(Path(appr_filepath), low_memory = True)
+market = pl.scan_parquet(Path(market_filepath), low_memory = True)

logging.info(f"Joining assessed values and market values on propid/year...")
# join with market data
assd.join(
other=market,
how="left",
on=['PropertyID', 'Year'],
-).sink_parquet(Path(output_filepath_ranked), compression="snappy")
+).sink_parquet(Path(output_filepath_temp1), compression="snappy")

logging.info(f"val/market join on propid/year complete. Starting second join...")

+rankedtemp1_valhist = pl.scan_parquet(Path(output_filepath_temp1), low_memory = True)
+logging.info(f"is ranked_valhist empty? {is_lazydataframe_empty(rankedtemp1_valhist)}")

# check if the length of the output of a ldf is 0 (aka dataframe is empty)
logging.info(f"Check if appraisal dataframe is empty...")
-if (appr.describe().filter(pl.col("statistic") == "count")["PropertyID"])[0] != 0:
+if not is_lazydataframe_empty(appr):
logging.info(f"Appraisal dataframe is not empty! Joining with val/market...")

-(pl.scan_parquet(Path(output_filepath_ranked), low_memory = True, parallel='row_groups', use_statistics=False, hive_partitioning=False)
+(rankedtemp1_valhist
# # join with appr data
).join(
other=appr,
how="left",
on=['PropertyID', 'Year'],
).sink_parquet(
-Path(output_filepath_ranked),
+Path(output_filepath_temp2),
compression="snappy"
)
-else:
+else:
logging.info(f"Appraisal dataframe is empty! Adding a col of nulls for appraisal col...")
-(pl.scan_parquet(Path(output_filepath_ranked), low_memory = True, parallel='row_groups', use_statistics=False, hive_partitioning=False)
-# # join with appr data
+(rankedtemp1_valhist
+# add col of nulls for ApprTotalValue because not present for any PropIDs
).with_columns([
-# add col of nulls for ApprTotalValue because not present for any PropIDs
pl.when(True).then(None).alias("ApprTotalValue")
]).sink_parquet(
-Path(output_filepath_ranked),
+Path(output_filepath_temp2),
compression="snappy"
)
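As an aside, the pl.when(True).then(None) idiom above can be spelled more directly with a null literal; both add an all-null ApprTotalValue column:

```python
# Equivalent spelling of the all-null column:
rankedtemp1_valhist.with_columns(pl.lit(None).alias("ApprTotalValue"))
```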

logging.info(f"val/market/appr join on propid/year complete. Doing with_cols operations...")
-(pl.scan_parquet(Path(output_filepath_ranked), low_memory = True, parallel='row_groups', use_statistics=False, hive_partitioning=False)
+(pl.scan_parquet(Path(output_filepath_temp2), low_memory = True)
.with_columns([
#value conditional
pl.when((pl.col("AssdTotalValue").is_not_null()) & (pl.col("AssdTotalValue") != 0))
