From 1acd6d72f657a802fc502813b0b202e29eebbfce Mon Sep 17 00:00:00 2001 From: Chris Carlon Date: Fri, 15 Nov 2024 12:38:41 +0000 Subject: [PATCH] updates 2024-11-15 - flood data errors --- ...ytics_platform_ea_flood_public_forecast.py | 46 +++++++++++-------- 1 file changed, 28 insertions(+), 18 deletions(-) diff --git a/analytics_platform_dagster/assets/environment_data_assets/analytics_platform_ea_flood_public_forecast.py b/analytics_platform_dagster/assets/environment_data_assets/analytics_platform_ea_flood_public_forecast.py index 54b6d38..0267540 100644 --- a/analytics_platform_dagster/assets/environment_data_assets/analytics_platform_ea_flood_public_forecast.py +++ b/analytics_platform_dagster/assets/environment_data_assets/analytics_platform_ea_flood_public_forecast.py @@ -56,37 +56,47 @@ def ea_flood_public_forecast_silver(context: AssetExecutionContext, ea_flood_pub """ EA Public Forecast flooding data silver bucket - flattens nested JSON structure """ - # Extract the statement data - statement = ea_flood_public_forecast_bronze['statement'].to_dict() + # Debug print to see what we're working with + context.log.info("Input type:", type(ea_flood_public_forecast_bronze)) + context.log.info("Input data:", ea_flood_public_forecast_bronze) + + # For Polars DataFrame, extract the statement column + if isinstance(ea_flood_public_forecast_bronze, pl.DataFrame): + # Get the first row since we expect only one record + statement = ea_flood_public_forecast_bronze.select('statement').item() + context.log.info("Statement data:", statement) + else: + statement = ea_flood_public_forecast_bronze['statement'] + context.log.info("Statement data:", statement) # Create base output dictionary with top-level fields output = { - "area_of_concern_url": statement.get('area_of_concern_url'), - "headline": statement.get('headline'), - "pdf_url": statement.get('pdf_url'), - "issued_at": statement.get('issued_at'), - "next_issue_due_at": statement.get('next_issue_due_at'), + "area_of_concern_url": statement['area_of_concern_url'], + "headline": statement['headline'], + "pdf_url": statement['pdf_url'], + "issued_at": statement['issued_at'], + "next_issue_due_at": statement['next_issue_due_at'], } # Add flood risk trend data - risk_trend = statement.get('flood_risk_trend', {}) + risk_trend = statement['flood_risk_trend'] output.update({ - "flood_risk_day1": risk_trend.get('day1'), - "flood_risk_day2": risk_trend.get('day2'), - "flood_risk_day3": risk_trend.get('day3'), - "flood_risk_day4": risk_trend.get('day4'), - "flood_risk_day5": risk_trend.get('day5') + "flood_risk_day1": risk_trend['day1'], + "flood_risk_day2": risk_trend['day2'], + "flood_risk_day3": risk_trend['day3'], + "flood_risk_day4": risk_trend['day4'], + "flood_risk_day5": risk_trend['day5'] }) # Add public forecast data - public_forecast = statement.get('public_forecast', {}) + public_forecast = statement['public_forecast'] output.update({ - "england_forecast": public_forecast.get('england_forecast'), - "wales_forecast_english": public_forecast.get('wales_forecast_english') + "england_forecast": public_forecast['england_forecast'], + "wales_forecast_english": public_forecast['wales_forecast_english'] }) # Add source data - for source in statement.get('sources', []): + for source in statement['sources']: if 'coastal' in source: output['coastal_risk'] = source['coastal'] if 'surface' in source: @@ -97,7 +107,7 @@ def ea_flood_public_forecast_silver(context: AssetExecutionContext, ea_flood_pub output['ground_risk'] = source['ground'] # Add risk areas - output['risk_areas'] = statement.get('risk_areas') + output['risk_areas'] = statement['risk_areas'] # Convert to DataFrame df = pl.DataFrame([output])