diff --git a/.DS_Store b/.DS_Store
new file mode 100644
index 0000000..2a847af
Binary files /dev/null and b/.DS_Store differ
diff --git a/fa-etl.py b/fa-etl.py
index e86350e..bbe640a 100644
--- a/fa-etl.py
+++ b/fa-etl.py
@@ -10,6 +10,13 @@ def mem_profile() -> str:
mem_use = str(round(100 - psutil.virtual_memory().percent,4))+'% of '+str(round(psutil.virtual_memory().total/1e+9,3))+' GB RAM'
return mem_use
+def is_lazydataframe_empty(ldf):
+ """
+ Check whether a Polars LazyFrame is empty, based on the row count of its "PropertyID" column.
+ Returns: bool (True if empty, False otherwise)
+ """
+ return ((ldf.describe().filter(pl.col("statistic") == "count")["PropertyID"])[0] == 0)
+
def convert_sales(filename, input_dir):
'''
Convert zipped txt sales (deed) file into parquet format.
@@ -266,6 +273,8 @@ def convert_valhist(filename, input_dir):
input_filepath = input_dir + "/raw/" + filename
output_dir = input_dir + "/" + "staging"
output_filepath = output_dir + "/" + filename.replace(".txt.zip", ".parquet")
+ output_filepath_temp1 = output_dir + "/rankedtemp1_" + filename.replace(".txt.zip", ".parquet")
+ output_filepath_temp2 = output_dir + "/rankedtemp2_" + filename.replace(".txt.zip", ".parquet")
output_filepath_ranked = output_dir + "/ranked_" + filename.replace(".txt.zip", ".parquet")
# check if parquet already exists, if it does, skip
@@ -303,7 +312,7 @@ def convert_valhist(filename, input_dir):
logging.info(f"{output_filepath} already exists. Moving on...")
if not os.path.exists(output_filepath_ranked):
- logging.info(f"Creating {output_filepath_ranked}...")
+ logging.info(f"Creating {output_filepath_temp1}...")
#temp filepaths
assd_filepath = output_dir+"/assd.parquet"
@@ -312,11 +321,7 @@ def convert_valhist(filename, input_dir):
logging.info(f"filepaths: {assd_filepath}, {market_filepath} and {appr_filepath}...")
if not os.path.exists(assd_filepath) & os.path.exists(market_filepath) & os.path.exists(appr_filepath):
-
logging.info(f"Creating assd parquet...")
- #split val hist into three separate datasets with PropertyID, Year as consistent
- ##TODO: this seems super repetitive, but it might be the best option given the size of the dataset.
-
(pl.scan_parquet(Path(output_filepath), low_memory = True, use_statistics=True, hive_partitioning=True)
.with_columns([pl.col('AssdYear').cast(pl.Int64).alias('Year')])
.filter(
@@ -342,9 +347,10 @@ def convert_valhist(filename, input_dir):
#write checks - make sure there are no duplicates in the above (by propID/year)
# if so, raise error and don't proceed
- assd = pl.scan_parquet(Path(assd_filepath), low_memory = True, parallel='auto')
- appr = pl.scan_parquet(Path(appr_filepath), low_memory = True, parallel='auto')
- market = pl.scan_parquet(Path(market_filepath), low_memory = True, parallel='auto')
+
+ assd = pl.scan_parquet(Path(assd_filepath), low_memory = True)
+ appr = pl.scan_parquet(Path(appr_filepath), low_memory = True)
+ market = pl.scan_parquet(Path(market_filepath), low_memory = True)
logging.info(f"Joining assessed values and market values on propid/year...")
# join with market data
@@ -352,40 +358,40 @@ def convert_valhist(filename, input_dir):
other=market,
how="left",
on=['PropertyID', 'Year'],
- ).sink_parquet(Path(output_filepath_ranked), compression="snappy")
-
+ ).sink_parquet(Path(output_filepath_temp1), compression="snappy")
logging.info(f"val/market join on propid/year complete. Starting second join...")
+ rankedtemp1_valhist = pl.scan_parquet(Path(output_filepath_temp1), low_memory = True)
+ logging.info(f"is rankedtemp1_valhist empty? {is_lazydataframe_empty(rankedtemp1_valhist)}")
+
# check if the length of the output of a ldf is 0 (aka dataframe is empty)
logging.info(f"Check if appraisal dataframe is empty...")
- if (appr.describe().filter(pl.col("statistic") == "count")["PropertyID"])[0] != 0:
+ if not is_lazydataframe_empty(appr):
logging.info(f"Appraisal dataframe is not empty! Joining with val/market...")
-
- (pl.scan_parquet(Path(output_filepath_ranked), low_memory = True, parallel='row_groups', use_statistics=False, hive_partitioning=False)
+ (rankedtemp1_valhist
# # join with appr data
).join(
other=appr,
how="left",
on=['PropertyID', 'Year'],
).sink_parquet(
- Path(output_filepath_ranked),
+ Path(output_filepath_temp2),
compression="snappy"
)
- else:
+ else:
logging.info(f"Appraisal dataframe is empty! Adding a col of nulls for appraisal col...")
- (pl.scan_parquet(Path(output_filepath_ranked), low_memory = True, parallel='row_groups', use_statistics=False, hive_partitioning=False)
- # # join with appr data
+ (rankedtemp1_valhist
+ # add col of nulls for ApprTotalValue because not present for any PropIDs
).with_columns([
- # add col of nulls for ApprTotalValue because not present for any PropIDs
pl.when(True).then(None).alias("ApprTotalValue")
]).sink_parquet(
- Path(output_filepath_ranked),
+ Path(output_filepath_temp2),
compression="snappy"
)
logging.info(f"val/market/appr join on propid/year complete. Doing with_cols operations...")
- (pl.scan_parquet(Path(output_filepath_ranked), low_memory = True, parallel='row_groups', use_statistics=False, hive_partitioning=False)
+ (pl.scan_parquet(Path(output_filepath_temp2), low_memory = True)
.with_columns([
#value conditional
pl.when((pl.col("AssdTotalValue").is_not_null()) & (pl.col("AssdTotalValue") != 0))
diff --git a/scratch.ipynb b/scratch.ipynb
index efd9477..d44198f 100644
--- a/scratch.ipynb
+++ b/scratch.ipynb
@@ -2,18 +2,9 @@
"cells": [
{
"cell_type": "code",
- "execution_count": 2,
+ "execution_count": 1,
"metadata": {},
- "outputs": [
- {
- "name": "stdout",
- "output_type": "stream",
- "text": [
- "The autoreload extension is already loaded. To reload it, use:\n",
- " %reload_ext autoreload\n"
- ]
- }
- ],
+ "outputs": [],
"source": [
"#set up autoreload\n",
"%load_ext autoreload\n",
@@ -28,7 +19,7 @@
},
{
"cell_type": "code",
- "execution_count": 69,
+ "execution_count": 9,
"metadata": {},
"outputs": [],
"source": [
@@ -49,7 +40,7 @@
},
{
"cell_type": "code",
- "execution_count": 70,
+ "execution_count": 10,
"metadata": {},
"outputs": [],
"source": [
@@ -57,184 +48,134 @@
" other=market,\n",
" how=\"left\",\n",
" on=['PropertyID', 'Year'],\n",
- ").sink_parquet(Path(output_filepath_ranked), compression=\"snappy\")\n"
+ ").sink_parquet(Path(output_filepath_ranked), compression=\"snappy\")"
]
},
{
"cell_type": "code",
- "execution_count": 71,
+ "execution_count": 12,
"metadata": {},
"outputs": [],
"source": [
- "valhist = pl.read_parquet(path_to_dir+f\"dev/{county}/staging/ranked_ValHist{county}.parquet\")"
+ "ranked_valhist = pl.read_parquet(path_to_dir+f\"dev/{county}/staging/ranked_ValHist{county}.parquet\")"
]
},
{
"cell_type": "code",
- "execution_count": 72,
+ "execution_count": 22,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
- "(3566623, 4)"
+ "False"
]
},
- "execution_count": 72,
+ "execution_count": 22,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
- "valhist.shape"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": 43,
- "metadata": {},
- "outputs": [],
- "source": [
- "lf = pl.LazyFrame(\n",
- " {\n",
- " \"a\": [1, 2, 3, 4],\n",
- " \"b\": [0.5, 4, 10, 13],\n",
- " \"c\": [True, True, False, True],\n",
- " }\n",
- ")"
+ "fa_etl.is_lazydataframe_empty(path_to_dir+f\"dev/{county}/staging/ranked_ValHist{county}.parquet\")"
]
},
{
"cell_type": "code",
- "execution_count": 56,
+ "execution_count": 23,
"metadata": {},
"outputs": [
{
"data": {
- "text/html": [
- "
\n",
- "
shape: (4, 4)a | b | c | Appr |
---|
i64 | f64 | bool | null |
1 | 0.5 | true | null |
2 | 4.0 | true | null |
3 | 10.0 | false | null |
4 | 13.0 | true | null |
"
- ],
"text/plain": [
- "shape: (4, 4)\n",
- "┌─────┬──────┬───────┬──────┐\n",
- "│ a ┆ b ┆ c ┆ Appr │\n",
- "│ --- ┆ --- ┆ --- ┆ --- │\n",
- "│ i64 ┆ f64 ┆ bool ┆ null │\n",
- "╞═════╪══════╪═══════╪══════╡\n",
- "│ 1 ┆ 0.5 ┆ true ┆ null │\n",
- "│ 2 ┆ 4.0 ┆ true ┆ null │\n",
- "│ 3 ┆ 10.0 ┆ false ┆ null │\n",
- "│ 4 ┆ 13.0 ┆ true ┆ null │\n",
- "└─────┴──────┴───────┴──────┘"
+ "(3566623, 4)"
]
},
- "execution_count": 56,
+ "execution_count": 23,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
- "lf.with_columns([\n",
- " pl.when(True).then(None).alias(\"ApprTotalValue\")\n",
- "]).collect()"
+ "ranked_valhist.shape"
]
},
{
"cell_type": "code",
- "execution_count": 12,
+ "execution_count": 8,
"metadata": {},
"outputs": [
{
"data": {
- "text/html": [
- "\n",
- "
shape: (0, 16)PropertyID | SaleAmt | RecordingDate | FIPS | FATimeStamp | FATransactionID | TransactionType | SaleDate | RecordingYearSlice | PropertyID_str | FATransactionID_1 | RecordingYear | SaleYear | FATimeStampYear | SaleFlag | PropIDFlag |
---|
i64 | i64 | date | str | date | i64 | str | date | str | str | str | i64 | i32 | i32 | i32 | i32 |
"
- ],
"text/plain": [
- "shape: (0, 16)\n",
- "┌────────────┬─────────┬──────────────┬──────┬───┬──────────┬──────────────┬──────────┬────────────┐\n",
- "│ PropertyID ┆ SaleAmt ┆ RecordingDat ┆ FIPS ┆ … ┆ SaleYear ┆ FATimeStampY ┆ SaleFlag ┆ PropIDFlag │\n",
- "│ --- ┆ --- ┆ e ┆ --- ┆ ┆ --- ┆ ear ┆ --- ┆ --- │\n",
- "│ i64 ┆ i64 ┆ --- ┆ str ┆ ┆ i32 ┆ --- ┆ i32 ┆ i32 │\n",
- "│ ┆ ┆ date ┆ ┆ ┆ ┆ i32 ┆ ┆ │\n",
- "╞════════════╪═════════╪══════════════╪══════╪═══╪══════════╪══════════════╪══════════╪════════════╡\n",
- "└────────────┴─────────┴──────────────┴──────┴───┴──────────┴──────────────┴──────────┴────────────┘"
+ "OrderedDict([('PropertyID', Int64),\n",
+ " ('AssdTotalValue', Int64),\n",
+ " ('AssdYear', Int64),\n",
+ " ('MarketTotalValue', Int64),\n",
+ " ('MarketValueYear', Int64),\n",
+ " ('ApprTotalValue', Int64),\n",
+ " ('ApprYear', Int64),\n",
+ " ('TaxableYear', Int64)])"
]
},
- "execution_count": 12,
+ "execution_count": 8,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
- "sales.filter(\n",
- " pl.col(\"PropIDFlag\") == 1\n",
- ")"
+ "valhist.schema"
]
},
{
"cell_type": "code",
- "execution_count": 12,
+ "execution_count": 24,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "merged = pl.read_parquet(path_to_dir+f\"dev/{county}/unified/merged.parquet\")"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 27,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"\n",
- "
shape: (44_004, 8)PropertyID | Year | Value | AssessmentUsed | SaleAmt | TaxAmt | TaxAmtAdjusted | ApproxTaxRate |
---|
i64 | i64 | i64 | str | i64 | i64 | f64 | f64 |
91847870 | 2018 | 117677 | "Assd" | 1580000 | 1310424 | 13104.24 | 11.13577 |
91849008 | 2018 | 247162 | "Assd" | 4400000 | 3114052 | 31140.52 | 12.599235 |
91848046 | 2018 | 71155 | "Assd" | 712775 | 773352 | 7733.52 | 10.868555 |
91846549 | 2018 | 149310 | "Assd" | 2475000 | 1527372 | 15273.72 | 10.229536 |
91847537 | 2018 | 128574 | "Assd" | 1625000 | 1496148 | 14961.48 | 11.636474 |
91848333 | 2018 | 356895 | "Assd" | 7400000 | 3958776 | 39587.76 | 11.092271 |
91846649 | 2018 | 219589 | "Assd" | 3100000 | 2518932 | 25189.32 | 11.471121 |
91847215 | 2018 | 299300 | "Assd" | 2825000 | 3327036 | 33270.36 | 11.116057 |
91846134 | 2022 | 718067 | "Assd" | 12500000 | 8808528 | 88085.28 | 12.267 |
91847289 | 2018 | 143322 | "Assd" | 2840000 | 1723200 | 17232.0 | 12.023276 |
91848235 | 2018 | 98891 | "Assd" | 840000 | 1074844 | 10748.44 | 10.868977 |
91847234 | 2018 | 97929 | "Assd" | 2000000 | 1138124 | 11381.24 | 11.62193 |
… | … | … | … | … | … | … | … |
91951526 | 2020 | 43461 | "Assd" | 280000 | null | null | null |
91950328 | 2020 | 791550 | "Assd" | 12000000 | null | null | null |
91951497 | 2023 | 388800 | "Assd" | 2613000 | 4860778 | 48607.78 | 12.502001 |
91951494 | 2023 | 423000 | "Assd" | 2321000 | 5288346 | 52883.46 | 12.502 |
91951499 | 2023 | 376200 | "Assd" | 2566000 | 4703252 | 47032.52 | 12.501999 |
91951505 | 2023 | 418950 | "Assd" | 1942000 | 5237713 | 52377.13 | 12.502 |
91951508 | 2023 | 418950 | "Assd" | 2795000 | 5237713 | 52377.13 | 12.502 |
91950049 | 2023 | 88208 | "Assd" | 580000 | 1102776 | 11027.76 | 12.501995 |
91950055 | 2023 | 90672 | "Assd" | 570000 | 1133581 | 11335.81 | 12.501996 |
91949993 | 2023 | 30010 | "Assd" | 550000 | 375185 | 3751.85 | 12.501999 |
91950231 | 2023 | 30130 | "Assd" | 4601351 | 376685 | 3766.85 | 12.501991 |
91951304 | 2023 | 36494 | "Assd" | 971562 | 704863 | 7048.63 | 19.31449 |
"
+ "shape: (5, 17)PropertyID | Year | Value | MarketTotalValue | ApprTotalValue | SitusLatitude | SitusLongitude | SitusFullStreetAddress | SitusCity | SitusState | SitusZIP5 | FIPS | SitusCensusTract | SitusCensusBlock | SaleAmt | TaxAmt | TaxAmtAdjusted |
---|
i64 | i64 | i64 | i64 | null | f64 | f64 | str | str | str | str | str | str | str | i64 | i64 | f64 |
91847870 | 2018 | 117677 | 261504 | null | 40.750965 | -73.982198 | "425 5TH AVE AP… | "NEW YORK" | "NY" | "10016" | "36061" | "008200" | "2004" | 1580000 | 1310424 | 13104.24 |
91849008 | 2018 | 247162 | 1462000 | null | 40.750447 | -73.997723 | "362 W 30TH ST" | "NEW YORK" | "NY" | "10001" | "36061" | "009700" | "4000" | 4400000 | 3114052 | 31140.52 |
91848046 | 2018 | 71155 | 158123 | null | 40.738643 | -73.987954 | "254 PARK AVE S… | "NEW YORK" | "NY" | "10010" | "36061" | "005200" | "1000" | 712775 | 773352 | 7733.52 |
91846549 | 2018 | 149310 | 331800 | null | 40.729262 | -74.004785 | "63 DOWNING ST … | "NEW YORK" | "NY" | "10014" | "36061" | "006700" | "2002" | 2475000 | 1527372 | 15273.72 |
91847537 | 2018 | 128574 | 285721 | null | 40.73889 | -73.987904 | "260 PARK AVE S… | "NEW YORK" | "NY" | "10010" | "36061" | "005200" | "1000" | 1625000 | 1496148 | 14961.48 |
"
],
"text/plain": [
- "shape: (44_004, 8)\n",
- "┌────────────┬──────┬────────┬────────────────┬─────────┬─────────┬────────────────┬───────────────┐\n",
- "│ PropertyID ┆ Year ┆ Value ┆ AssessmentUsed ┆ SaleAmt ┆ TaxAmt ┆ TaxAmtAdjusted ┆ ApproxTaxRate │\n",
- "│ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- ┆ --- │\n",
- "│ i64 ┆ i64 ┆ i64 ┆ str ┆ i64 ┆ i64 ┆ f64 ┆ f64 │\n",
- "╞════════════╪══════╪════════╪════════════════╪═════════╪═════════╪════════════════╪═══════════════╡\n",
- "│ 91847870 ┆ 2018 ┆ 117677 ┆ Assd ┆ 1580000 ┆ 1310424 ┆ 13104.24 ┆ 11.13577 │\n",
- "│ 91849008 ┆ 2018 ┆ 247162 ┆ Assd ┆ 4400000 ┆ 3114052 ┆ 31140.52 ┆ 12.599235 │\n",
- "│ 91848046 ┆ 2018 ┆ 71155 ┆ Assd ┆ 712775 ┆ 773352 ┆ 7733.52 ┆ 10.868555 │\n",
- "│ 91846549 ┆ 2018 ┆ 149310 ┆ Assd ┆ 2475000 ┆ 1527372 ┆ 15273.72 ┆ 10.229536 │\n",
- "│ … ┆ … ┆ … ┆ … ┆ … ┆ … ┆ … ┆ … │\n",
- "│ 91950055 ┆ 2023 ┆ 90672 ┆ Assd ┆ 570000 ┆ 1133581 ┆ 11335.81 ┆ 12.501996 │\n",
- "│ 91949993 ┆ 2023 ┆ 30010 ┆ Assd ┆ 550000 ┆ 375185 ┆ 3751.85 ┆ 12.501999 │\n",
- "│ 91950231 ┆ 2023 ┆ 30130 ┆ Assd ┆ 4601351 ┆ 376685 ┆ 3766.85 ┆ 12.501991 │\n",
- "│ 91951304 ┆ 2023 ┆ 36494 ┆ Assd ┆ 971562 ┆ 704863 ┆ 7048.63 ┆ 19.31449 │\n",
- "└────────────┴──────┴────────┴────────────────┴─────────┴─────────┴────────────────┴───────────────┘"
+ "shape: (5, 17)\n",
+ "┌────────────┬──────┬────────┬───────────────┬───┬──────────────┬─────────┬─────────┬──────────────┐\n",
+ "│ PropertyID ┆ Year ┆ Value ┆ MarketTotalVa ┆ … ┆ SitusCensusB ┆ SaleAmt ┆ TaxAmt ┆ TaxAmtAdjust │\n",
+ "│ --- ┆ --- ┆ --- ┆ lue ┆ ┆ lock ┆ --- ┆ --- ┆ ed │\n",
+ "│ i64 ┆ i64 ┆ i64 ┆ --- ┆ ┆ --- ┆ i64 ┆ i64 ┆ --- │\n",
+ "│ ┆ ┆ ┆ i64 ┆ ┆ str ┆ ┆ ┆ f64 │\n",
+ "╞════════════╪══════╪════════╪═══════════════╪═══╪══════════════╪═════════╪═════════╪══════════════╡\n",
+ "│ 91847870 ┆ 2018 ┆ 117677 ┆ 261504 ┆ … ┆ 2004 ┆ 1580000 ┆ 1310424 ┆ 13104.24 │\n",
+ "│ 91849008 ┆ 2018 ┆ 247162 ┆ 1462000 ┆ … ┆ 4000 ┆ 4400000 ┆ 3114052 ┆ 31140.52 │\n",
+ "│ 91848046 ┆ 2018 ┆ 71155 ┆ 158123 ┆ … ┆ 1000 ┆ 712775 ┆ 773352 ┆ 7733.52 │\n",
+ "│ 91846549 ┆ 2018 ┆ 149310 ┆ 331800 ┆ … ┆ 2002 ┆ 2475000 ┆ 1527372 ┆ 15273.72 │\n",
+ "│ 91847537 ┆ 2018 ┆ 128574 ┆ 285721 ┆ … ┆ 1000 ┆ 1625000 ┆ 1496148 ┆ 14961.48 │\n",
+ "└────────────┴──────┴────────┴───────────────┴───┴──────────────┴─────────┴─────────┴──────────────┘"
]
},
- "execution_count": 12,
+ "execution_count": 27,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
- "merged.filter([\n",
- " pl.col(\"SaleAmt\").is_not_null(),\n",
- " pl.col(\"Year\") > 2017\n",
- "]).select(\n",
- " ['PropertyID','Year','Value','AssessmentUsed','SaleAmt', 'TaxAmt', 'TaxAmtAdjusted']\n",
- ").with_columns([\n",
- " (pl.col('TaxAmt')/pl.col('Value')).alias(\"ApproxTaxRate\")\n",
- "])"
+ "merged.head()"
]
}
],