diff --git a/analytics_platform_dagster/assets/environment_data_assets/analytics_platform_ea_flood_areas.py b/analytics_platform_dagster/assets/environment_data_assets/analytics_platform_ea_flood_areas.py index 3ea6247..1e03115 100644 --- a/analytics_platform_dagster/assets/environment_data_assets/analytics_platform_ea_flood_areas.py +++ b/analytics_platform_dagster/assets/environment_data_assets/analytics_platform_ea_flood_areas.py @@ -60,12 +60,11 @@ def ea_flood_areas_silver(context: AssetExecutionContext, ea_flood_areas_bronze) data = ea_flood_areas_bronze - if data: - try: - df = pl.DataFrame(data) - context.log.info(f"Success: {df.head(25)}, {df.columns}, {df.shape}") - context.log.info(f"Success: {df.columns}, {df.shape}") - context.log.info(f"Success: {df.shape}") - return df - except Exception as e: - raise e + try: + df = pl.DataFrame(data) + context.log.info(f"Success: {df.head(25)}, {df.columns}, {df.shape}") + context.log.info(f"Success: {df.columns}, {df.shape}") + context.log.info(f"Success: {df.shape}") + return df + except Exception as e: + raise e diff --git a/analytics_platform_dagster/assets/environment_data_assets/analytics_platform_ea_flood_public_forecast.py b/analytics_platform_dagster/assets/environment_data_assets/analytics_platform_ea_flood_public_forecast.py index 8e391e1..60b693f 100644 --- a/analytics_platform_dagster/assets/environment_data_assets/analytics_platform_ea_flood_public_forecast.py +++ b/analytics_platform_dagster/assets/environment_data_assets/analytics_platform_ea_flood_public_forecast.py @@ -61,37 +61,36 @@ def ea_flood_public_forecast_silver(context: AssetExecutionContext, ea_flood_pub # Load data into model - to use dot notation further down flood_risk_data = FloodRiskData.model_validate(data) - if flood_risk_data: - statement = flood_risk_data.statement - - # Retrieve values using dot notation from the model - output = { - "area_of_concern": statement.area_of_concern_url, - "headline": statement.headline, - "pdf_report": statement.pdf_url, - "issued_at": statement.issued_at, - "next_issue_due": statement.next_issue_due_at, - "flood_risk_day1": statement.flood_risk_trend.day1, - "flood_risk_day2": statement.flood_risk_trend.day2, - "flood_risk_day3": statement.flood_risk_trend.day3, - "flood_risk_day4": statement.flood_risk_trend.day4, - "flood_risk_day5": statement.flood_risk_trend.day5, - "england_forecast": statement.public_forecast.england_forecast, - "wales_forecast_english": statement.public_forecast.wales_forecast_english, - "risks_areas": statement.risk_areas - } - - # Access nested values - for source in statement.sources: - if source.coastal: - output["coastal_risk"] = source.coastal - if source.surface: - output["surface_risk"] = source.surface - if source.river: - output["river_risk"] = source.river - if source.ground: - output["ground_risk"] = source.ground - - df = pl.DataFrame(output) - - return df + statement = flood_risk_data.statement + + # Retrieve values using dot notation from the model + output = { + "area_of_concern": statement.area_of_concern_url, + "headline": statement.headline, + "pdf_report": statement.pdf_url, + "issued_at": statement.issued_at, + "next_issue_due": statement.next_issue_due_at, + "flood_risk_day1": statement.flood_risk_trend.day1, + "flood_risk_day2": statement.flood_risk_trend.day2, + "flood_risk_day3": statement.flood_risk_trend.day3, + "flood_risk_day4": statement.flood_risk_trend.day4, + "flood_risk_day5": statement.flood_risk_trend.day5, + "england_forecast": statement.public_forecast.england_forecast, + "wales_forecast_english": statement.public_forecast.wales_forecast_english, + "risks_areas": statement.risk_areas + } + + # Access nested values + for source in statement.sources: + if source.coastal: + output["coastal_risk"] = source.coastal + if source.surface: + output["surface_risk"] = source.surface + if source.river: + output["river_risk"] = source.river + if source.ground: + output["ground_risk"] = source.ground + + df = pl.DataFrame(output) + + return df diff --git a/analytics_platform_dagster/assets/environment_data_assets/green_belt.py b/analytics_platform_dagster/assets/environment_data_assets/green_belt.py index 879b584..ed2b7c6 100644 --- a/analytics_platform_dagster/assets/environment_data_assets/green_belt.py +++ b/analytics_platform_dagster/assets/environment_data_assets/green_belt.py @@ -68,5 +68,5 @@ def green_belt_silver(context: AssetExecutionContext, green_belt_bronze) -> pl. """ Write green belt data directly to S3 using Polars """ - df = pl.DataFrame(green_belt_bronze) + df = green_belt_bronze return df diff --git a/analytics_platform_dagster/assets/infrastructure_data_assets/uk_power_networks_live_faults.py b/analytics_platform_dagster/assets/infrastructure_data_assets/uk_power_networks_live_faults.py index 1f3e7d8..fb3a382 100644 --- a/analytics_platform_dagster/assets/infrastructure_data_assets/uk_power_networks_live_faults.py +++ b/analytics_platform_dagster/assets/infrastructure_data_assets/uk_power_networks_live_faults.py @@ -65,7 +65,7 @@ def ukpn_live_faults_bronze(): @asset( group_name="infrastructure_assets", - io_manager_key="DeltaLake", + io_manager_key="PolarsDeltaLake", metadata={"mode": "overwrite"}, ins={"ukpn_live_faults_bronze": AssetIn("ukpn_live_faults_bronze")}, required_resource_keys={"slack"} @@ -79,10 +79,6 @@ def ukpn_live_faults_silver(context: AssetExecutionContext, ukpn_live_faults_bro """ data = ukpn_live_faults_bronze - # Add date_processed column - current_time = datetime.now().strftime("%Y%m%d_%H%M%S") - data["date_processed"] = current_time - # Check info context.log.info(f"{data.head(10)}") context.log.info(f"{data.columns}")