Skip to content

Commit

Permalink
Use hapi dataset uploader
Browse files Browse the repository at this point in the history
  • Loading branch information
b-j-mills committed Feb 20, 2025
1 parent 923c82d commit 12353e2
Showing 1 changed file with 10 additions and 82 deletions.
92 changes: 10 additions & 82 deletions src/hapi/pipelines/database/conflict_event.py
Original file line number Diff line number Diff line change
@@ -1,95 +1,23 @@
"""Functions specific to the conflict event theme."""

from logging import getLogger
from typing import Dict

from hapi_schema.db_conflict_event import DBConflictEvent
from hdx.api.configuration import Configuration
from hdx.api.utilities.hdx_error_handler import HDXErrorHandler
from hdx.scraper.framework.utilities.reader import Read
from hdx.utilities.dateparse import parse_date
from hdx.utilities.dictandlist import invert_dictionary
from sqlalchemy.orm import Session

from ..utilities.batch_populate import batch_populate
from . import admins
from .base_uploader import BaseUploader
from .metadata import Metadata
from .hapi_dataset_uploader import HapiDatasetUploader

logger = getLogger(__name__)


class ConflictEvent(BaseUploader):
def __init__(
self,
session: Session,
metadata: Metadata,
admins: admins.Admins,
configuration: Configuration,
error_handler: HDXErrorHandler,
):
super().__init__(session)
self._metadata = metadata
self._admins = admins
self._configuration = configuration
self._error_handler = error_handler
class ConflictEvent(HapiDatasetUploader):
def populate_row(self, output_row: Dict, row: Dict) -> None:
output_row["event_type"] = row["event_type"]
output_row["events"] = row["events"]
output_row["fatalities"] = row["fatalities"]

def populate(self) -> None:
logger.info("Populating conflict event table")
reader = Read.get_reader("hdx")
dataset = reader.read_dataset(
"hdx-hapi-conflict-event", self._configuration
self.hapi_populate(
"conflict-event",
DBConflictEvent,
)
resources = dataset.get_resources()
for resource in resources:
url = resource["url"]
headers, rows = reader.get_tabular_rows(url, dict_form=True)
hxltag_to_header = invert_dictionary(next(rows))
conflict_event_rows = []

for row in rows:
if row["error"]:
continue
resource_id = row["resource_hdx_id"]
dataset_id = row["dataset_hdx_id"]
dataset_name = self._metadata.get_dataset_name(dataset_id)
resource_name = self._metadata.get_resource_name(resource_id)
if not resource_name:
dataset = reader.read_dataset(
dataset_id, self._configuration
)
for r in dataset.get_resources():
if r["id"] == resource_id:
self._metadata.add_dataset(dataset)
self._metadata.add_resource(dataset_id, r)

admin_level = self._admins.get_admin_level_from_row(
hxltag_to_header, row, 2
)
admin2_ref = self._admins.get_admin2_ref_from_row(
hxltag_to_header,
row,
dataset_name,
"ConflictEvent",
admin_level,
)
provider_admin1_name = row["provider_admin1_name"] or ""
provider_admin2_name = row["provider_admin2_name"] or ""

conflict_event_row = {
"resource_hdx_id": resource_id,
"admin2_ref": admin2_ref,
"provider_admin1_name": provider_admin1_name,
"provider_admin2_name": provider_admin2_name,
"event_type": row["event_type"],
"fatalities": row["fatalities"],
"reference_period_start": parse_date(
row["reference_period_start"]
),
"reference_period_end": parse_date(
row["reference_period_end"],
max_time=True,
),
}
conflict_event_rows.append(conflict_event_row)

batch_populate(conflict_event_rows, self._session, DBConflictEvent)

0 comments on commit 12353e2

Please sign in to comment.