Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HDXDSYS-1796 Update HAPI Pipelines poverty rate to read from HAPI dataset #230

Merged
merged 2 commits into from
Feb 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).

## [0.10.42] - 2025-02-24

### Changed

- Poverty rate reads from HAPI dataset

## [0.10.41] - 2025-02-20

### Changed
Expand Down
8 changes: 4 additions & 4 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ hdx-python-utilities==3.8.4
# hdx-python-scraper
humanize==4.12.1
# via frictionless
identify==2.6.7
identify==2.6.8
# via pre-commit
idna==3.10
# via
Expand Down Expand Up @@ -146,9 +146,9 @@ pockets==0.9.1
# via sphinxcontrib-napoleon
pre-commit==4.1.0
# via hapi-pipelines (pyproject.toml)
psycopg==3.2.4
psycopg==3.2.5
# via hdx-python-database
psycopg-binary==3.2.4
psycopg-binary==3.2.5
# via psycopg
pyasn1==0.6.1
# via
Expand Down Expand Up @@ -218,7 +218,7 @@ rfc3986==2.0.0
# via frictionless
rich==13.9.4
# via typer
rpds-py==0.23.0
rpds-py==0.23.1
# via
# jsonschema
# referencing
Expand Down
2 changes: 1 addition & 1 deletion src/hapi/pipelines/database/conflict_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,5 @@ def populate(self) -> None:
self.hapi_populate(
"conflict-event",
DBConflictEvent,
end_resource=29,
end_resource=None,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for catching this

)
52 changes: 35 additions & 17 deletions src/hapi/pipelines/database/hapi_dataset_uploader.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ def hapi_populate(
name_suffix: str,
hapi_table: Type[Base],
end_resource: Optional[int] = 1,
max_admin_level: int = 2,
):
log_name = name_suffix.replace("-", " ")
pipeline = []
Expand All @@ -60,7 +61,7 @@ def hapi_populate(
hxltag_to_header = invert_dictionary(next(rows))

for row in rows:
if row["error"]:
if row.get("error"):
continue
resource_id = row["resource_hdx_id"]
if resource_id in resources_to_ignore:
Expand All @@ -71,21 +72,8 @@ def hapi_populate(
output_str = dataset_name
else:
output_str = dataset_id
admin_level = self._admins.get_admin_level_from_row(
hxltag_to_header, row, 2
)
admin2_ref = self._admins.get_admin2_ref_from_row(
hxltag_to_header,
row,
output_str,
pipeline,
admin_level,
)

countryiso3 = row["location_code"]
provider_admin1_name = row["provider_admin1_name"] or ""
provider_admin2_name = row["provider_admin2_name"] or ""

resource_name = self._metadata.get_resource_name(resource_id)
if not resource_name:
dataset = reader.read_dataset(
Expand All @@ -108,18 +96,48 @@ def hapi_populate(
resources_to_ignore.append(resource_id)
continue

admin_level = self._admins.get_admin_level_from_row(
hxltag_to_header, row, max_admin_level
)
output_row = {
"resource_hdx_id": resource_id,
"admin2_ref": admin2_ref,
"provider_admin1_name": provider_admin1_name,
"provider_admin2_name": provider_admin2_name,
"reference_period_start": parse_date(
row["reference_period_start"]
),
"reference_period_end": parse_date(
row["reference_period_end"], max_time=True
),
}
if max_admin_level == 2:
admin2_ref = self._admins.get_admin2_ref_from_row(
hxltag_to_header,
row,
output_str,
pipeline,
admin_level,
)
output_row["admin2_ref"] = admin2_ref
output_row["provider_admin1_name"] = (
row["provider_admin1_name"] or ""
)
output_row["provider_admin2_name"] = (
row["provider_admin2_name"] or ""
)
elif max_admin_level == 1:
admin1_ref = self._admins.get_admin1_ref_from_row(
hxltag_to_header,
row,
output_str,
pipeline,
admin_level,
)
output_row["admin1_ref"] = admin1_ref
output_row["provider_admin1_name"] = (
row["provider_admin1_name"] or ""
)
else:
output_row["location_ref"] = countryiso3

self.populate_row(output_row, row)
output_rows.append(output_row)
logger.info(f"Writing to {log_name} table")
Expand Down
126 changes: 12 additions & 114 deletions src/hapi/pipelines/database/poverty_rate.py
Original file line number Diff line number Diff line change
@@ -1,122 +1,20 @@
"""Functions specific to the poverty rate theme."""

from logging import getLogger
from typing import Dict

from hapi_schema.db_poverty_rate import DBPovertyRate
from hdx.api.configuration import Configuration
from hdx.api.utilities.hdx_error_handler import HDXErrorHandler
from hdx.scraper.framework.utilities.reader import Read
from hdx.utilities.dateparse import parse_date
from hdx.utilities.dictandlist import dict_of_lists_add, invert_dictionary
from hdx.utilities.text import get_numeric_if_possible
from sqlalchemy.orm import Session

from ..utilities.provider_admin_names import get_provider_name
from . import admins
from .base_uploader import BaseUploader
from .metadata import Metadata

logger = getLogger(__name__)
from hapi.pipelines.database.hapi_dataset_uploader import HapiDatasetUploader


class PovertyRate(BaseUploader):
def __init__(
self,
session: Session,
metadata: Metadata,
admins: admins.Admins,
configuration: Configuration,
error_handler: HDXErrorHandler,
):
super().__init__(session)
self._metadata = metadata
self._admins = admins
self._configuration = configuration
self._error_handler = error_handler
class PovertyRate(HapiDatasetUploader):
def populate_row(self, output_row: Dict, row: Dict) -> None:
    """Copy the poverty-rate indicator columns from *row* into *output_row*.

    Values are passed through unchanged, except intensity_of_deprivation,
    which falls back to 0.0 when the input value is falsy (None/empty).
    """
    output_row.update(
        mpi=row["mpi"],
        headcount_ratio=row["headcount_ratio"],
        # TODO: Remove 0.0 fallback once schema is updated
        intensity_of_deprivation=row["intensity_of_deprivation"] or 0.0,
        vulnerable_to_poverty=row["vulnerable_to_poverty"],
        in_severe_poverty=row["in_severe_poverty"],
    )

def populate(self) -> None:
logger.info("Populating poverty rate table")
reader = Read.get_reader("hdx")
dataset = reader.read_dataset("global-mpi", self._configuration)
self._metadata.add_dataset(dataset)
dataset_id = dataset["id"]
dataset_name = dataset["name"]
null_values_by_iso3 = {}

def get_value(row: Dict, in_col: str) -> float:
countryiso3 = row["Country ISO3"]
value = row[in_col]
admin_name = row["Admin 1 Name"]
if not admin_name:
admin_name = countryiso3
if value is None:
dict_of_lists_add(null_values_by_iso3, countryiso3, admin_name)
return 0.0
return get_numeric_if_possible(value)

output_rows = {}
for resource in list(reversed(dataset.get_resources()))[-2:]:
resource_id = resource["id"]
self._metadata.add_resource(dataset_id, resource)
url = resource["url"]
header, rows = reader.get_tabular_rows(url, dict_form=True)
hxltag_to_header = invert_dictionary(next(rows))
for row in rows:
admin_level = self._admins.get_admin_level_from_row(
hxltag_to_header, row, 1
)
admin1_ref = self._admins.get_admin1_ref_from_row(
hxltag_to_header,
row,
dataset_name,
"PovertyRate",
admin_level,
)
if not admin1_ref:
continue
provider_admin1_name = get_provider_name(row, "Admin 1 Name")
reference_period_start = parse_date(row["Start Date"])
reference_period_end = parse_date(row["End Date"])
key = (
admin1_ref,
provider_admin1_name,
reference_period_start,
reference_period_end,
)
existing_resource_name = output_rows.get(key)
if existing_resource_name:
if existing_resource_name != resource["name"]:
continue
else:
raise ValueError(
f"Duplicate row in resource {existing_resource_name} with key {key}!"
)
else:
output_rows[key] = resource["name"]
row = DBPovertyRate(
resource_hdx_id=resource_id,
admin1_ref=admin1_ref,
provider_admin1_name=provider_admin1_name,
reference_period_start=reference_period_start,
reference_period_end=reference_period_end,
mpi=get_value(row, "MPI"),
headcount_ratio=get_value(row, "Headcount Ratio"),
intensity_of_deprivation=get_value(
row, "Intensity of Deprivation"
),
vulnerable_to_poverty=get_value(
row, "Vulnerable to Poverty"
),
in_severe_poverty=get_value(row, "In Severe Poverty"),
)
self._session.add(row)
self._session.commit()

for countryiso3, values in null_values_by_iso3.items():
self._error_handler.add_multi_valued_message(
"PovertyRate",
dataset_name,
f"null values set to 0.0 in {countryiso3}",
values,
)
self.hapi_populate("poverty-rate", DBPovertyRate, max_admin_level=1)

Large diffs are not rendered by default.

Loading