HDXDSYS-1312 hapi idps #228

Merged · 3 commits · Feb 21, 2025
requirements.txt — 4 changes: 2 additions & 2 deletions

```diff
@@ -8,7 +8,7 @@ attrs==25.1.0
     # jsonlines
     # jsonschema
     # referencing
-cachetools==5.5.1
+cachetools==5.5.2
     # via google-auth
 certifi==2025.1.31
     # via requests
@@ -218,7 +218,7 @@ rfc3986==2.0.0
     # via frictionless
 rich==13.9.4
     # via typer
-rpds-py==0.22.3
+rpds-py==0.23.0
     # via
     # jsonschema
     # referencing
```
src/hapi/pipelines/app/__main__.py — 1 change: 0 additions & 1 deletion

```diff
@@ -201,7 +201,6 @@ def main(
     project_configs = [
         "core.yaml",
         "food_security.yaml",
-        "idps.yaml",
         "national_risk.yaml",
         "refugees_and_returnees.yaml",
         "wfp.yaml",
```
src/hapi/pipelines/app/pipelines.py — 12 changes: 1 addition & 11 deletions

```diff
@@ -171,13 +171,6 @@ def _create_configurable_scrapers(

         _create_configurable_scrapers("national_risk", "national")
         _create_configurable_scrapers("refugees_and_returnees", "national")
-        _create_configurable_scrapers("idps", "national")
-        _create_configurable_scrapers(
-            "idps", "adminone", adminlevel=self._adminone
-        )
-        _create_configurable_scrapers(
-            "idps", "admintwo", adminlevel=self._admintwo
-        )

     def run(self):
         self._runner.run()
@@ -272,14 +265,11 @@ def output_refugees_and_returnees(self):

     def output_idps(self):
         if not self._themes_to_run or "idps" in self._themes_to_run:
-            results = self._runner.get_hapi_results(
-                self._configurable_scrapers["idps"]
-            )
             idps = IDPs(
                 session=self._session,
                 metadata=self._metadata,
                 admins=self._admins,
-                results=results,
+                configuration=self._configuration,
                 error_handler=self._error_handler,
             )
             idps.populate()
```
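The `results` argument dropped here carried the configurable scraper output. The comments removed from `idps.py` below describe its shape; as a hedged illustration (dataset IDs, p-codes, columns, and values are all invented for the example), it looked roughly like this:

```python
# Sketch of the former `results` dict, per the comments removed from idps.py.
# All identifiers and values below are invented for illustration only.
results = {
    "dataset-hdx-id": {  # keyed by HDX dataset ID
        "hdx_stub": "example-dtm-dataset",  # plus other HDX metadata keys
        "results": {
            "admintwo": {  # one entry per admin level
                "headers": (
                    ["Reported Date", "IDPs"],             # column names
                    ["#date+reported", "#affected+idps"],  # HXL tags
                ),
                # values[i_hdx_key][pcode][i]: one dict per column, mapping
                # each p-code to that column's list of row values
                "values": [
                    {"AFG": ["2024-01-01", "2024-02-01"], "BFA": ["2024-01-15"]},
                    {"AFG": [1000, 1200], "BFA": [500]},
                ],
                "hapi_resource_metadata": {"hdx_id": "resource-hdx-id"},
            },
        },
    },
}
```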
src/hapi/pipelines/configs/idps.yaml — 86 changes: 0 additions & 86 deletions

This file was deleted.
src/hapi/pipelines/database/idps.py — 118 changes: 11 additions & 107 deletions

```diff
@@ -4,117 +4,21 @@
 from typing import Dict

 from hapi_schema.db_idps import DBIDPs
-from hdx.api.utilities.hdx_error_handler import HDXErrorHandler
-from hdx.utilities.dateparse import parse_date
-from sqlalchemy.orm import Session

-from ..utilities.provider_admin_names import get_provider_name
-from . import admins
-from .base_uploader import BaseUploader
-from .metadata import Metadata
+from .hapi_dataset_uploader import HapiDatasetUploader

 logger = getLogger(__name__)


-class IDPs(BaseUploader):
-    def __init__(
-        self,
-        session: Session,
-        metadata: Metadata,
-        admins: admins.Admins,
-        results: Dict,
-        error_handler: HDXErrorHandler,
-    ):
-        super().__init__(session)
-        self._metadata = metadata
-        self._admins = admins
-        self._results = results
-        self._error_handler = error_handler
+class IDPs(HapiDatasetUploader):
+    def populate_row(self, output_row: Dict, row: Dict) -> None:
+        output_row["assessment_type"] = row["assessment_type"]
+        output_row["reporting_round"] = row["reporting_round"]
+        output_row["operation"] = row["operation"]
+        output_row["population"] = row["population"]

     def populate(self) -> None:
-        # TODO: This might be better suited to just work with the DTM resource
-        # directly as done with HNO, rather than using a configurable scraper
-        logger.info("Populating IDPs table")
-        # self._results is a dictionary where the keys are the HDX dataset ID and the
-        # values are a dictionary with keys containing HDX metadata plus a "results" key
-        # containing the results, stored in a dictionary with admin levels as keys.
-        # There is only one dataset for now in the results dictionary, take first value
-        # (popitem returns a tuple with (key, value) so take the value)
-        dataset = self._results.popitem()[1]
-        dataset_name = dataset["hdx_stub"]
-        for admin_level, admin_results in dataset["results"].items():
-            # admin_results contains the keys "headers", "values", and "hapi_resource_metadata".
-            # admin_results["values"] is a list of dictionaries of the format:
-            # [{AFG: [1, 2], BFA: [3, 4]}, {AFG: [A, B], BFA: [C, D]} etc
-            # So the way to get info from it is values[i_hdx_key][pcode][i] where
-            # i is just an iterator for the number of rows for that particular p-code
-            resource_id = admin_results["hapi_resource_metadata"]["hdx_id"]
-            hxl_tags = admin_results["headers"][1]
-            values = admin_results["values"]
-            admin_codes = values[0].keys()
-            for admin_code in admin_codes:
-                admin2_code = admins.get_admin2_code_based_on_level(
-                    admin_code=admin_code, admin_level=admin_level
-                )
-                duplicate_rows = set()
-                for row in zip(
-                    *[
-                        values[hxl_tags.index(tag)][admin_code]
-                        for tag in hxl_tags
-                    ]
-                ):
-                    # Keeping these defined outside of the row for now
-                    # as we may need to check for duplicates in the future
-                    admin2_ref = self._admins.admin2_data[admin2_code]
-                    assessment_type = row[hxl_tags.index("#assessment+type")]
-                    date_reported = row[hxl_tags.index("#date+reported")]
-                    reporting_round = row[hxl_tags.index("#round+code")]
-                    operation = row[hxl_tags.index("#operation+name")]
-                    # Ignore rows with no reporting date since it is part of
-                    # the primary key of DBIDPs
-                    if date_reported is None:
-                        text = (
-                            f"No reportingDate for admin code {admin_code}, assessment type {assessment_type}, "
-                            f"reporting round {reporting_round}, operation {operation}"
-                        )
-                        self._error_handler.add_message(
-                            "IDPs", dataset_name, text
-                        )
-                        continue
-                    duplicate_row_check = (
-                        admin2_ref,
-                        assessment_type,
-                        date_reported,
-                        reporting_round,
-                        operation,
-                    )
-                    if duplicate_row_check in duplicate_rows:
-                        text = (
-                            f"Duplicate row for admin code {admin_code}, assessment type {assessment_type}, "
-                            f"reporting round {reporting_round}, operation {operation}"
-                        )
-                        self._error_handler.add_message(
-                            "IDPs", dataset_name, text
-                        )
-                        continue
-                    provider_admin1_name = get_provider_name(
-                        row, "#adm1+name", hxl_tags
-                    )
-                    provider_admin2_name = get_provider_name(
-                        row, "#adm2+name", hxl_tags
-                    )
-                    idps_row = DBIDPs(
-                        resource_hdx_id=resource_id,
-                        admin2_ref=admin2_ref,
-                        provider_admin1_name=provider_admin1_name,
-                        provider_admin2_name=provider_admin2_name,
-                        assessment_type=assessment_type,
-                        reporting_round=reporting_round,
-                        operation=operation,
-                        population=row[hxl_tags.index("#affected+idps")],
-                        reference_period_start=parse_date(date_reported),
-                        reference_period_end=parse_date(date_reported),
-                    )
-                    self._session.add(idps_row)
-                    duplicate_rows.add(duplicate_row_check)
-        self._session.commit()
+        self.hapi_populate(
+            "idps",
+            DBIDPs,
+        )
```
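The refactor is a template-method pattern: the scaffolding formerly in `populate()` (reading the dataset, admin lookups, duplicate and missing-date checks, session commits) moves into the shared `HapiDatasetUploader` base class, and each theme only maps its theme-specific columns. The base class itself is not shown in this diff, so the sketch below is an assumed contract: only `hapi_populate` and `populate_row` appear in the PR, and every helper name here is invented for illustration.

```python
# Minimal sketch of the assumed HapiDatasetUploader contract; the real class
# in src/hapi/pipelines/database/hapi_dataset_uploader.py presumably also
# wires up the session, metadata, admins, configuration and error_handler
# arguments seen in pipelines.py above.
from abc import ABC, abstractmethod
from typing import Dict, Iterable, List


class HapiDatasetUploader(ABC):
    @abstractmethod
    def populate_row(self, output_row: Dict, row: Dict) -> None:
        """Subclass hook: copy theme-specific columns onto the DB row."""

    def fetch_rows(self, theme: str) -> Iterable[Dict]:
        # Placeholder: the real class reads the HDX HAPI dataset for `theme`
        # directly (per the removed TODO, as done for HNO) rather than going
        # through a configurable scraper.
        return []

    def common_columns(self, row: Dict) -> Dict:
        # Placeholder for shared columns such as resource_hdx_id, admin2_ref,
        # provider admin names and the reference period.
        return {}

    def hapi_populate(self, theme: str, db_table) -> None:
        rows: List = []
        for row in self.fetch_rows(theme):
            output_row = self.common_columns(row)
            self.populate_row(output_row, row)  # theme-specific columns
            rows.append(db_table(**output_row))
        # The real class adds the rows to the session and commits.
```

With this contract, `IDPs` reduces to four column assignments plus `self.hapi_populate("idps", DBIDPs)`, and the same pattern presumably serves the other HAPI themes.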
