Skip to content

Commit

Permalink
updates 2024-11-15 - flood data errors
Browse files (browse the repository at this point in the history)
  • Loading branch information
CHRISCARLON committed Nov 15, 2024
1 parent 703b0e3 commit 1acd6d7
Showing 1 changed file with 28 additions and 18 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -56,37 +56,47 @@ def ea_flood_public_forecast_silver(context: AssetExecutionContext, ea_flood_pub
"""
EA Public Forecast flooding data silver bucket - flattens nested JSON structure
"""
# Extract the statement data
statement = ea_flood_public_forecast_bronze['statement'].to_dict()
# Debug print to see what we're working with
context.log.info("Input type:", type(ea_flood_public_forecast_bronze))
context.log.info("Input data:", ea_flood_public_forecast_bronze)

# For Polars DataFrame, extract the statement column
if isinstance(ea_flood_public_forecast_bronze, pl.DataFrame):
# Get the first row since we expect only one record
statement = ea_flood_public_forecast_bronze.select('statement').item()
context.log.info("Statement data:", statement)
else:
statement = ea_flood_public_forecast_bronze['statement']
context.log.info("Statement data:", statement)

# Create base output dictionary with top-level fields
output = {
"area_of_concern_url": statement.get('area_of_concern_url'),
"headline": statement.get('headline'),
"pdf_url": statement.get('pdf_url'),
"issued_at": statement.get('issued_at'),
"next_issue_due_at": statement.get('next_issue_due_at'),
"area_of_concern_url": statement['area_of_concern_url'],
"headline": statement['headline'],
"pdf_url": statement['pdf_url'],
"issued_at": statement['issued_at'],
"next_issue_due_at": statement['next_issue_due_at'],
}

# Add flood risk trend data
risk_trend = statement.get('flood_risk_trend', {})
risk_trend = statement['flood_risk_trend']
output.update({
"flood_risk_day1": risk_trend.get('day1'),
"flood_risk_day2": risk_trend.get('day2'),
"flood_risk_day3": risk_trend.get('day3'),
"flood_risk_day4": risk_trend.get('day4'),
"flood_risk_day5": risk_trend.get('day5')
"flood_risk_day1": risk_trend['day1'],
"flood_risk_day2": risk_trend['day2'],
"flood_risk_day3": risk_trend['day3'],
"flood_risk_day4": risk_trend['day4'],
"flood_risk_day5": risk_trend['day5']
})

# Add public forecast data
public_forecast = statement.get('public_forecast', {})
public_forecast = statement['public_forecast']
output.update({
"england_forecast": public_forecast.get('england_forecast'),
"wales_forecast_english": public_forecast.get('wales_forecast_english')
"england_forecast": public_forecast['england_forecast'],
"wales_forecast_english": public_forecast['wales_forecast_english']
})

# Add source data
for source in statement.get('sources', []):
for source in statement['sources']:
if 'coastal' in source:
output['coastal_risk'] = source['coastal']
if 'surface' in source:
Expand All @@ -97,7 +107,7 @@ def ea_flood_public_forecast_silver(context: AssetExecutionContext, ea_flood_pub
output['ground_risk'] = source['ground']

# Add risk areas
output['risk_areas'] = statement.get('risk_areas')
output['risk_areas'] = statement['risk_areas']

# Convert to DataFrame
df = pl.DataFrame([output])
Expand Down

0 comments on commit 1acd6d7

Please sign in to comment.