-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbuild_dictionary_for_term.py
68 lines (53 loc) · 2.22 KB
/
build_dictionary_for_term.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
import sqlite3
from typing import Any
import pandas as pd
from utils.env_variables import TERMS_DICTIONARY_TABLE, MERGED_STREET_DATA_TABLE, MPI_TAGS
def term_exists(country: str, term: str, conn: sqlite3.Connection) -> bool:
    """Return True if *term* already has an entry in the per-country
    terms dictionary table.

    The table name is built as ``{country}_{TERMS_DICTIONARY_TABLE}``.
    NOTE(review): *country* is interpolated into the SQL — assumed to come
    from trusted internal config, not user input; confirm at call sites.

    A missing table raises sqlite3.OperationalError, which is treated as
    "term not present" so the first run (before any table exists) works.
    """
    try:
        # SELECT 1 avoids fetching every column just to test existence.
        row = conn.execute(
            f"SELECT 1 FROM {country}_{TERMS_DICTIONARY_TABLE} WHERE term = ? LIMIT 1",
            (term,),
        ).fetchone()
    except sqlite3.OperationalError:
        # Table not created yet -> the term cannot exist.
        return False
    return row is not None
def match_term(term: str, conn: sqlite3.Connection):
    """Return a DataFrame of all rows in the merged street-data table
    whose ``term`` column equals *term* exactly.

    Rows belonging to the country currently being processed are
    deliberately NOT filtered out (matching a country to itself is
    expected downstream).
    """
    query = f"SELECT * FROM {MERGED_STREET_DATA_TABLE} WHERE term = ?"
    return pd.read_sql(query, conn, params=[term])
def write_df_to_sql(
        country: str,
        df: pd.DataFrame,
        conn: sqlite3.Connection,
        mpi_comm: Any = None) -> None:
    """Persist *df* into the per-country terms dictionary table, or, when
    running under MPI, ship it to rank 0 for writing.

    Args:
        country: Country prefix of the destination table.
        df: Rows to append; ``None`` is silently ignored.
        conn: Open SQLite connection (used only in the non-MPI path).
        mpi_comm: Optional MPI communicator; when given, the frame is
            sent to rank 0 tagged ``MPI_TAGS.DONE`` instead of written.
    """
    if df is None:
        return
    if mpi_comm is not None:
        # Worker process: hand the frame to rank 0 instead of writing.
        mpi_comm.send(df, dest=0, tag=MPI_TAGS.DONE)
        return
    destination = f"{country}_{TERMS_DICTIONARY_TABLE}"
    df.to_sql(destination, conn, if_exists='append', index=False)
def build_dictionary_for_term(
        country: str, term: str, conn: sqlite3.Connection, mpi_comm=None) -> None:
    """Build and persist the dictionary entry for a single *term*.

    Looks the term up in the merged street data and writes every exact
    match, annotated with a ``likelihood`` percentage proportional to its
    ``frequency``, to the per-country terms dictionary.

    Args:
        country: Country whose dictionary table is being built.
        term: Term to look up.
        conn: Open SQLite connection.
        mpi_comm: Optional MPI communicator; when given, results are sent
            to rank 0 instead of written directly (see write_df_to_sql).
    """
    # Already processed on a previous run -- nothing to do.
    if term_exists(country, term, conn):
        return

    exact_match = match_term(term, conn)
    if exact_match.empty:
        # Should be impossible: the country's own data always matches itself.
        return

    # Frequency-proportional likelihood in percent. A single match yields
    # exactly 100.0, so one formula covers both the 1-match and N-match
    # cases and keeps the stored column uniformly float (the original wrote
    # int 100 for single matches and floats otherwise).
    total_frequency = exact_match["frequency"].sum()
    exact_match = exact_match.assign(
        likelihood=exact_match["frequency"] * 100 / total_frequency)

    write_df_to_sql(country, exact_match, conn, mpi_comm)