Skip to content

Commit

Permalink
updates 2024-11-15 - flood data errors
Browse files: browse the repository at this point in the history
  • Loading branch information
CHRISCARLON committed Nov 15, 2024
1 parent 72e69d2 commit d0161b1
Showing 1 changed file with 38 additions and 30 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -54,42 +54,50 @@ def ea_flood_public_forecast_bronze(context: AssetExecutionContext):
@with_slack_notification("Environment Agency Public Flood Forecast Data")
def ea_flood_public_forecast_silver(context: AssetExecutionContext, ea_flood_public_forecast_bronze):
"""
EA Public Forecast flooding data silver bucket
EA Public Forecast flooding data silver bucket - flattens nested JSON structure
"""
# Extract the statement data
statement = ea_flood_public_forecast_bronze['statement']

data = ea_flood_public_forecast_bronze.to_dict()
print(data)

# Load data into model
flood_risk_data = FloodRiskData.model_validate(data)

# Create flattened dictionary structure
# Create base output dictionary with top-level fields
output = {
"area_of_concern": flood_risk_data.statement.area_of_concern_url,
"headline": flood_risk_data.statement.headline,
"pdf_report": flood_risk_data.statement.pdf_url,
"issued_at": flood_risk_data.statement.issued_at,
"next_issue_due": flood_risk_data.statement.next_issue_due_at,
"flood_risk_day1": flood_risk_data.statement.flood_risk_trend.day1,
"flood_risk_day2": flood_risk_data.statement.flood_risk_trend.day2,
"flood_risk_day3": flood_risk_data.statement.flood_risk_trend.day3,
"flood_risk_day4": flood_risk_data.statement.flood_risk_trend.day4,
"flood_risk_day5": flood_risk_data.statement.flood_risk_trend.day5,
"england_forecast": flood_risk_data.statement.public_forecast.england_forecast,
"wales_forecast_english": flood_risk_data.statement.public_forecast.wales_forecast_english,
"risks_areas": flood_risk_data.statement.risk_areas
"area_of_concern_url": statement.get('area_of_concern_url'),
"headline": statement.get('headline'),
"pdf_url": statement.get('pdf_url'),
"issued_at": statement.get('issued_at'),
"next_issue_due_at": statement.get('next_issue_due_at'),
}

# Add flood risk trend data
risk_trend = statement.get('flood_risk_trend', {})
output.update({
"flood_risk_day1": risk_trend.get('day1'),
"flood_risk_day2": risk_trend.get('day2'),
"flood_risk_day3": risk_trend.get('day3'),
"flood_risk_day4": risk_trend.get('day4'),
"flood_risk_day5": risk_trend.get('day5')
})

# Add public forecast data
public_forecast = statement.get('public_forecast', {})
output.update({
"england_forecast": public_forecast.get('england_forecast'),
"wales_forecast_english": public_forecast.get('wales_forecast_english')
})

# Add source data
for source in flood_risk_data.statement.sources:
if source.coastal:
output["coastal_risk"] = source.coastal
if source.surface:
output["surface_risk"] = source.surface
if source.river:
output["river_risk"] = source.river
if source.ground:
output["ground_risk"] = source.ground
for source in statement.get('sources', []):
if 'coastal' in source:
output['coastal_risk'] = source['coastal']
if 'surface' in source:
output['surface_risk'] = source['surface']
if 'river' in source:
output['river_risk'] = source['river']
if 'ground' in source:
output['ground_risk'] = source['ground']

# Add risk areas
output['risk_areas'] = statement.get('risk_areas')

# Convert to DataFrame
df = pl.DataFrame([output])
Expand Down

0 comments on commit d0161b1

Please sign in to comment.