Skip to content

Commit

Permalink
updates 2024-11-10 - small fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
CHRISCARLON committed Nov 10, 2024
1 parent 1d7ab8b commit 48738dc
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,11 @@ def ea_flood_areas_silver(context: AssetExecutionContext, ea_flood_areas_bronze)

data = ea_flood_areas_bronze

if data:
try:
df = pl.DataFrame(data)
context.log.info(f"Success: {df.head(25)}, {df.columns}, {df.shape}")
context.log.info(f"Success: {df.columns}, {df.shape}")
context.log.info(f"Success: {df.shape}")
return df
except Exception as e:
raise e
try:
df = pl.DataFrame(data)
context.log.info(f"Success: {df.head(25)}, {df.columns}, {df.shape}")
context.log.info(f"Success: {df.columns}, {df.shape}")
context.log.info(f"Success: {df.shape}")
return df
except Exception as e:
raise e
Original file line number Diff line number Diff line change
Expand Up @@ -61,37 +61,36 @@ def ea_flood_public_forecast_silver(context: AssetExecutionContext, ea_flood_pub
# Load data into model - to use dot notation further down
flood_risk_data = FloodRiskData.model_validate(data)

if flood_risk_data:
statement = flood_risk_data.statement

# Retrieve values using dot notation from the model
output = {
"area_of_concern": statement.area_of_concern_url,
"headline": statement.headline,
"pdf_report": statement.pdf_url,
"issued_at": statement.issued_at,
"next_issue_due": statement.next_issue_due_at,
"flood_risk_day1": statement.flood_risk_trend.day1,
"flood_risk_day2": statement.flood_risk_trend.day2,
"flood_risk_day3": statement.flood_risk_trend.day3,
"flood_risk_day4": statement.flood_risk_trend.day4,
"flood_risk_day5": statement.flood_risk_trend.day5,
"england_forecast": statement.public_forecast.england_forecast,
"wales_forecast_english": statement.public_forecast.wales_forecast_english,
"risks_areas": statement.risk_areas
}

# Access nested values
for source in statement.sources:
if source.coastal:
output["coastal_risk"] = source.coastal
if source.surface:
output["surface_risk"] = source.surface
if source.river:
output["river_risk"] = source.river
if source.ground:
output["ground_risk"] = source.ground

df = pl.DataFrame(output)

return df
statement = flood_risk_data.statement

# Retrieve values using dot notation from the model
output = {
"area_of_concern": statement.area_of_concern_url,
"headline": statement.headline,
"pdf_report": statement.pdf_url,
"issued_at": statement.issued_at,
"next_issue_due": statement.next_issue_due_at,
"flood_risk_day1": statement.flood_risk_trend.day1,
"flood_risk_day2": statement.flood_risk_trend.day2,
"flood_risk_day3": statement.flood_risk_trend.day3,
"flood_risk_day4": statement.flood_risk_trend.day4,
"flood_risk_day5": statement.flood_risk_trend.day5,
"england_forecast": statement.public_forecast.england_forecast,
"wales_forecast_english": statement.public_forecast.wales_forecast_english,
"risks_areas": statement.risk_areas
}

# Access nested values
for source in statement.sources:
if source.coastal:
output["coastal_risk"] = source.coastal
if source.surface:
output["surface_risk"] = source.surface
if source.river:
output["river_risk"] = source.river
if source.ground:
output["ground_risk"] = source.ground

df = pl.DataFrame(output)

return df
Original file line number Diff line number Diff line change
Expand Up @@ -68,5 +68,5 @@ def green_belt_silver(context: AssetExecutionContext, green_belt_bronze) -> pl.
"""
Write green belt data directly to S3 using Polars
"""
df = pl.DataFrame(green_belt_bronze)
df = green_belt_bronze
return df
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def ukpn_live_faults_bronze():

@asset(
group_name="infrastructure_assets",
io_manager_key="DeltaLake",
io_manager_key="PolarsDeltaLake",
metadata={"mode": "overwrite"},
ins={"ukpn_live_faults_bronze": AssetIn("ukpn_live_faults_bronze")},
required_resource_keys={"slack"}
Expand All @@ -79,10 +79,6 @@ def ukpn_live_faults_silver(context: AssetExecutionContext, ukpn_live_faults_bro
"""
data = ukpn_live_faults_bronze

# Add date_processed column
current_time = datetime.now().strftime("%Y%m%d_%H%M%S")
data["date_processed"] = current_time

# Check info
context.log.info(f"{data.head(10)}")
context.log.info(f"{data.columns}")
Expand Down

0 comments on commit 48738dc

Please sign in to comment.