From d0161b13f7b6216eddd221e31c61c66feaf7af04 Mon Sep 17 00:00:00 2001 From: Chris Carlon Date: Fri, 15 Nov 2024 12:17:24 +0000 Subject: [PATCH] updates 2024-11-15 - flood data errors --- ...ytics_platform_ea_flood_public_forecast.py | 68 +++++++++++-------- 1 file changed, 38 insertions(+), 30 deletions(-) diff --git a/analytics_platform_dagster/assets/environment_data_assets/analytics_platform_ea_flood_public_forecast.py b/analytics_platform_dagster/assets/environment_data_assets/analytics_platform_ea_flood_public_forecast.py index a32e7a6..a8e7c26 100644 --- a/analytics_platform_dagster/assets/environment_data_assets/analytics_platform_ea_flood_public_forecast.py +++ b/analytics_platform_dagster/assets/environment_data_assets/analytics_platform_ea_flood_public_forecast.py @@ -54,42 +54,50 @@ def ea_flood_public_forecast_bronze(context: AssetExecutionContext): @with_slack_notification("Environment Agency Public Flood Forecast Data") def ea_flood_public_forecast_silver(context: AssetExecutionContext, ea_flood_public_forecast_bronze): """ - EA Public Forecast flooding data silver bucket + EA Public Forecast flooding data silver bucket - flattens nested JSON structure """ + # Extract the statement data + statement = ea_flood_public_forecast_bronze['statement'] - data = ea_flood_public_forecast_bronze.to_dict() - print(data) - - # Load data into model - flood_risk_data = FloodRiskData.model_validate(data) - - # Create flattened dictionary structure + # Create base output dictionary with top-level fields output = { - "area_of_concern": flood_risk_data.statement.area_of_concern_url, - "headline": flood_risk_data.statement.headline, - "pdf_report": flood_risk_data.statement.pdf_url, - "issued_at": flood_risk_data.statement.issued_at, - "next_issue_due": flood_risk_data.statement.next_issue_due_at, - "flood_risk_day1": flood_risk_data.statement.flood_risk_trend.day1, - "flood_risk_day2": flood_risk_data.statement.flood_risk_trend.day2, - "flood_risk_day3": flood_risk_data.statement.flood_risk_trend.day3, - "flood_risk_day4": flood_risk_data.statement.flood_risk_trend.day4, - "flood_risk_day5": flood_risk_data.statement.flood_risk_trend.day5, - "england_forecast": flood_risk_data.statement.public_forecast.england_forecast, - "wales_forecast_english": flood_risk_data.statement.public_forecast.wales_forecast_english, - "risks_areas": flood_risk_data.statement.risk_areas + "area_of_concern_url": statement.get('area_of_concern_url'), + "headline": statement.get('headline'), + "pdf_url": statement.get('pdf_url'), + "issued_at": statement.get('issued_at'), + "next_issue_due_at": statement.get('next_issue_due_at'), } + # Add flood risk trend data + risk_trend = statement.get('flood_risk_trend', {}) + output.update({ + "flood_risk_day1": risk_trend.get('day1'), + "flood_risk_day2": risk_trend.get('day2'), + "flood_risk_day3": risk_trend.get('day3'), + "flood_risk_day4": risk_trend.get('day4'), + "flood_risk_day5": risk_trend.get('day5') + }) + + # Add public forecast data + public_forecast = statement.get('public_forecast', {}) + output.update({ + "england_forecast": public_forecast.get('england_forecast'), + "wales_forecast_english": public_forecast.get('wales_forecast_english') + }) + # Add source data - for source in flood_risk_data.statement.sources: - if source.coastal: - output["coastal_risk"] = source.coastal - if source.surface: - output["surface_risk"] = source.surface - if source.river: - output["river_risk"] = source.river - if source.ground: - output["ground_risk"] = source.ground + for source in statement.get('sources', []): + if 'coastal' in source: + output['coastal_risk'] = source['coastal'] + if 'surface' in source: + output['surface_risk'] = source['surface'] + if 'river' in source: + output['river_risk'] = source['river'] + if 'ground' in source: + output['ground_risk'] = source['ground'] + + # Add risk areas + output['risk_areas'] = statement.get('risk_areas') # Convert to DataFrame df = pl.DataFrame([output])