Commit

Read from hapi dataset
b-j-mills committed Feb 20, 2025
1 parent ab29b24 commit 43bb1f1
Showing 9 changed files with 80,065 additions and 209 deletions.
4 changes: 2 additions & 2 deletions requirements.txt
@@ -8,7 +8,7 @@ attrs==25.1.0
# jsonlines
# jsonschema
# referencing
cachetools==5.5.1
cachetools==5.5.2
# via google-auth
certifi==2025.1.31
# via requests
@@ -218,7 +218,7 @@ rfc3986==2.0.0
# via frictionless
rich==13.9.4
# via typer
rpds-py==0.22.3
rpds-py==0.23.0
# via
# jsonschema
# referencing
1 change: 0 additions & 1 deletion src/hapi/pipelines/app/__main__.py
@@ -201,7 +201,6 @@ def main(
project_configs = [
"core.yaml",
"food_security.yaml",
"idps.yaml",
"national_risk.yaml",
"refugees_and_returnees.yaml",
"wfp.yaml",
12 changes: 1 addition & 11 deletions src/hapi/pipelines/app/pipelines.py
@@ -171,13 +171,6 @@ def _create_configurable_scrapers(

_create_configurable_scrapers("national_risk", "national")
_create_configurable_scrapers("refugees_and_returnees", "national")
_create_configurable_scrapers("idps", "national")
_create_configurable_scrapers(
"idps", "adminone", adminlevel=self._adminone
)
_create_configurable_scrapers(
"idps", "admintwo", adminlevel=self._admintwo
)

def run(self):
self._runner.run()
@@ -272,14 +265,11 @@ def output_refugees_and_returnees(self):

def output_idps(self):
if not self._themes_to_run or "idps" in self._themes_to_run:
results = self._runner.get_hapi_results(
self._configurable_scrapers["idps"]
)
idps = IDPs(
session=self._session,
metadata=self._metadata,
admins=self._admins,
results=results,
configuration=self._configuration,
error_handler=self._error_handler,
)
idps.populate()
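Because the hunk above interleaves removed and added lines, the following is a sketch of output_idps as it presumably reads after this commit (assuming the untouched context lines are unchanged): the get_hapi_results lookup and the results argument are dropped, and the uploader now receives the configuration instead.

def output_idps(self):
    if not self._themes_to_run or "idps" in self._themes_to_run:
        idps = IDPs(
            session=self._session,
            metadata=self._metadata,
            admins=self._admins,
            configuration=self._configuration,
            error_handler=self._error_handler,
        )
        idps.populate()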
86 changes: 0 additions & 86 deletions src/hapi/pipelines/configs/idps.yaml

This file was deleted.

118 changes: 11 additions & 107 deletions src/hapi/pipelines/database/idps.py
@@ -4,117 +4,21 @@
from typing import Dict

from hapi_schema.db_idps import DBIDPs
from hdx.api.utilities.hdx_error_handler import HDXErrorHandler
from hdx.utilities.dateparse import parse_date
from sqlalchemy.orm import Session

from ..utilities.provider_admin_names import get_provider_name
from . import admins
from .base_uploader import BaseUploader
from .metadata import Metadata
from .hapi_dataset_uploader import HapiDatasetUploader

logger = getLogger(__name__)


class IDPs(BaseUploader):
def __init__(
self,
session: Session,
metadata: Metadata,
admins: admins.Admins,
results: Dict,
error_handler: HDXErrorHandler,
):
super().__init__(session)
self._metadata = metadata
self._admins = admins
self._results = results
self._error_handler = error_handler
class IDPs(HapiDatasetUploader):
def populate_row(self, output_row: Dict, row: Dict) -> None:
output_row["assessment_type"] = row["assessment_type"]
output_row["reporting_round"] = row["reporting_round"]
output_row["operation"] = row["operation"]
output_row["population"] = row["population"]

def populate(self) -> None:
# TODO: This might be better suited to just work with the DTM resource
# directly as done with HNO, rather than using a configurable scraper
logger.info("Populating IDPs table")
# self._results is a dictionary where the keys are the HDX dataset ID and the
# values are a dictionary with keys containing HDX metadata plus a "results" key
# containing the results, stored in a dictionary with admin levels as keys.
# There is only one dataset for now in the results dictionary, take first value
# (popitem returns a tuple with (key, value) so take the value)
dataset = self._results.popitem()[1]
dataset_name = dataset["hdx_stub"]
for admin_level, admin_results in dataset["results"].items():
# admin_results contains the keys "headers", "values", and "hapi_resource_metadata".
# admin_results["values"] is a list of dictionaries of the format:
# [{AFG: [1, 2], BFA: [3, 4]}, {AFG: [A, B], BFA: [C, D]}, etc.]
# So the way to get info from it is values[i_hdx_key][pcode][i] where
# i is just an iterator for the number of rows for that particular p-code
resource_id = admin_results["hapi_resource_metadata"]["hdx_id"]
hxl_tags = admin_results["headers"][1]
values = admin_results["values"]
admin_codes = values[0].keys()
for admin_code in admin_codes:
admin2_code = admins.get_admin2_code_based_on_level(
admin_code=admin_code, admin_level=admin_level
)
duplicate_rows = set()
for row in zip(
*[
values[hxl_tags.index(tag)][admin_code]
for tag in hxl_tags
]
):
# Keeping these defined outside of the row for now
# as we may need to check for duplicates in the future
admin2_ref = self._admins.admin2_data[admin2_code]
assessment_type = row[hxl_tags.index("#assessment+type")]
date_reported = row[hxl_tags.index("#date+reported")]
reporting_round = row[hxl_tags.index("#round+code")]
operation = row[hxl_tags.index("#operation+name")]
# Ignore rows with no reporting date since it is part of
# the primary key of DBIDPs
if date_reported is None:
text = (
f"No reportingDate for admin code {admin_code}, assessment type {assessment_type}, "
f"reporting round {reporting_round}, operation {operation}"
)
self._error_handler.add_message(
"IDPs", dataset_name, text
)
continue
duplicate_row_check = (
admin2_ref,
assessment_type,
date_reported,
reporting_round,
operation,
)
if duplicate_row_check in duplicate_rows:
text = (
f"Duplicate row for admin code {admin_code}, assessment type {assessment_type}, "
f"reporting round {reporting_round}, operation {operation}"
)
self._error_handler.add_message(
"IDPs", dataset_name, text
)
continue
provider_admin1_name = get_provider_name(
row, "#adm1+name", hxl_tags
)
provider_admin2_name = get_provider_name(
row, "#adm2+name", hxl_tags
)
idps_row = DBIDPs(
resource_hdx_id=resource_id,
admin2_ref=admin2_ref,
provider_admin1_name=provider_admin1_name,
provider_admin2_name=provider_admin2_name,
assessment_type=assessment_type,
reporting_round=reporting_round,
operation=operation,
population=row[hxl_tags.index("#affected+idps")],
reference_period_start=parse_date(date_reported),
reference_period_end=parse_date(date_reported),
)
self._session.add(idps_row)
duplicate_rows.add(duplicate_row_check)
self._session.commit()
self.hapi_populate(
"idps",
DBIDPs,
)
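Putting the added and unchanged lines together, idps.py presumably reduces to the consolidated sketch below: IDPs subclasses HapiDatasetUploader, populate_row copies only the four theme-specific columns, and everything the old populate method did by hand (resource lookup, admin references, provider names, reference periods, duplicate and missing-date checks, session commits) is assumed to be handled by the base class's hapi_populate, which is not shown in this diff. The leading imports are inferred from the unchanged context lines.

from logging import getLogger
from typing import Dict

from hapi_schema.db_idps import DBIDPs

from .hapi_dataset_uploader import HapiDatasetUploader

logger = getLogger(__name__)


class IDPs(HapiDatasetUploader):
    def populate_row(self, output_row: Dict, row: Dict) -> None:
        # Copy the IDPs-specific columns; the shared columns are assumed to be
        # filled in by HapiDatasetUploader.hapi_populate
        output_row["assessment_type"] = row["assessment_type"]
        output_row["reporting_round"] = row["reporting_round"]
        output_row["operation"] = row["operation"]
        output_row["population"] = row["population"]

    def populate(self) -> None:
        logger.info("Populating IDPs table")
        self.hapi_populate(
            "idps",
            DBIDPs,
        )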

Large diffs are not rendered by default.

