Skip to content

Commit

Permalink
green belt asset added
Browse files Browse the repository at this point in the history
  • Loading branch information
CHRISCARLON committed Nov 6, 2024
1 parent 1ec6e61 commit fa0b371
Show file tree
Hide file tree
Showing 6 changed files with 131 additions and 27 deletions.
11 changes: 7 additions & 4 deletions analytics_platform_dagster/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
from dagster import Definitions, load_assets_from_modules
from dagster_slack import SlackResource, make_slack_on_run_failure_sensor

from .assets.location_data_assets import analytics_platform_os_assets
from .assets.environment_data_assets import (
analytics_platform_ea_flood_areas,
analytics_platform_ea_flood_public_forecast,
green_belt
)
from .assets.trade_data_assets import analytics_platform_dbt_trade_barrier_assets
from .assets.energy_data_assets import (
Expand All @@ -29,6 +29,8 @@
from .jobs.analytics_platfom_jobs import (
environment_job_1,
environment_job_1_daily,
environment_job_2,
environment_job_2_monthly,
trade_job_1,
trade_job_1_daily,
metadata_job_1,
Expand Down Expand Up @@ -69,20 +71,20 @@ def get_env_var(var_name: str) -> str:
assets=load_assets_from_modules(
[
analytics_platform_datastore_assets,
analytics_platform_os_assets,
analytics_platform_ea_flood_areas,
analytics_platform_ea_flood_public_forecast,
analytics_platform_dbt_trade_barrier_assets,
analytics_platform_carbon_intensity_assets,
national_charge_points_london_assets,
entsog_uk_gas_assets,
uk_power_networks_live_faults,
national_charge_points_uk_assets

national_charge_points_uk_assets,
green_belt
]
),
jobs=[
environment_job_1,
environment_job_2,
trade_job_1,
metadata_job_1,
energy_job_1,
Expand All @@ -92,6 +94,7 @@ def get_env_var(var_name: str) -> str:
schedules=[
energy_job_1_daily,
environment_job_1_daily,
environment_job_2_monthly,
trade_job_1_daily,
infrastructure_job_1_weekly,
metadata_job_1_weekly,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
import requests
import pyarrow as pa
import pandas as pd
import io

from typing import List, Dict, Any
from pydantic import ValidationError
from ...utils.variables_helper.url_links import asset_urls
from dagster import AssetExecutionContext, AssetIn, asset
from ...utils.slack_messages.slack_message import with_slack_notification
from ...models.environment_data_models.green_belt_model import (
GreenBeltResponse,
)

@asset(
group_name="environment_data",
io_manager_key="S3Parquet"
)
def green_belt_bronze(context: AssetExecutionContext):
"""
Write green belt data out to raw staging area
Returns:
Parquet in S3.
"""
url = asset_urls.get("green_belt")

if url is None:
raise ValueError("No url!")

validation_errors = []
try:
response = requests.get(url)
response.raise_for_status()
data = response.json()
try:
GreenBeltResponse.model_validate(data)
except ValidationError as e:
validation_errors = e.errors()

df = pd.DataFrame(data)
df = df.astype(str)
context.log.info(f"Processed {len(df)} records with {len(validation_errors)} validation errors")

parquet_buffer = io.BytesIO()
df.to_parquet(parquet_buffer, engine="pyarrow")
parquet_bytes = parquet_buffer.getvalue()

context.log.info("Successfully processed batch into Parquet format")
return parquet_bytes

except Exception as e:
context.log.error(f"Error processing data: {str(e)}")
raise e

@asset(
group_name="environment_data",
io_manager_key="DeltaLake",
metadata={"mode": "overwrite"},
ins={
"green_belt_bronze": AssetIn(
"green_belt_bronze"
)
},
required_resource_keys={"slack"}
)
@with_slack_notification("GB Green Belt")
def green_belt_silver(
context: AssetExecutionContext, green_belt_bronze
) -> pd.DataFrame:
"""
Write green belt data out to Delta Lake
Returns:
Delta Lake table in S3.
"""
try:
df = pd.DataFrame(green_belt_bronze)
return df

except Exception as e:
context.log.error(f"Error processing data: {e}")
raise
Original file line number Diff line number Diff line change
Expand Up @@ -13,26 +13,6 @@
ChargeDeviceResponse,
)

# Get json helper - return as .json() and not .content
def return_json(url_link: str) -> dict:
"""
Simple json get request
Args:
Url (e.g. api endpoint): str
Returns:
json as bytes
"""
try:
response = requests.get(url_link)
response.raise_for_status()
data = response.json()
return data
except requests.RequestException as error:
print(f"An error occurred: {error}")
raise

# Create Arrow Schema creation of arrow table
def create_arrow_schema() -> pa.Schema:
"""
Expand Down Expand Up @@ -234,7 +214,6 @@ def national_charge_point_uk_bronze(context: AssetExecutionContext):
response = requests.get(url)
response.raise_for_status()
data = response.json()

try:
ChargeDeviceResponse.model_validate(data)
except ValidationError as e:
Expand Down
16 changes: 15 additions & 1 deletion analytics_platform_dagster/jobs/analytics_platfom_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
"ea_flood_areas_bronze",
"ea_flood_areas_silver",
"ea_flood_public_forecast_bronze",
"ea_flood_public_forecast_silver"
"ea_flood_public_forecast_silver",
]
)

Expand All @@ -18,6 +18,20 @@
name="environment_daily_schedule",
)

environment_job_2 = define_asset_job(
name="environment_job_2",
selection=[
"green_belt_bronze",
"green_belt_silver",
]
)
environment_job_2_monthly = ScheduleDefinition(
job=environment_job_2,
cron_schedule="0 5 1 * *",
execution_timezone="Europe/London",
name="environment_monthly_schedule",
)

# TRADE
trade_job_1 = define_asset_job(
name="trade_job_1",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from pydantic import BaseModel, Field
from typing import Optional, List

class GreenBeltEntity(BaseModel):
dataset: Optional[str] = None
end_date: Optional[str] = Field(None, alias='end-date')
entity: Optional[str] = None
entry_date: Optional[str] = Field(None, alias='entry-date')
geometry: Optional[str] = None
green_belt_core: Optional[str] = Field(None, alias='green-belt-core')
local_authority_district: Optional[str] = Field(None, alias='local-authority-district')
name: Optional[str] = None
organisation_entity: Optional[str] = Field(None, alias='organisation-entity')
point: Optional[str] = None
prefix: Optional[str] = None
reference: Optional[str] = None
start_date: Optional[str] = Field(None, alias='start-date')
typology: Optional[str] = None

class GreenBeltResponse(BaseModel):
entities: List[GreenBeltEntity]

class Config:
allow_populate_by_name = True
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,6 @@
"national_charge_point_london": "https://ukpowernetworks.opendatasoft.com/api/explore/v2.1/catalog/datasets/ozev-ukpn-national-chargepoint-register/exports/json?lang=en&timezone=Europe%2FLondon",
"national_charge_point_uk": "http://chargepoints.dft.gov.uk/api/retrieve/registry/format/json",
"carbon_intensity_api": "https://api.carbonintensity.org.uk/regional/regionid/",
"ukpn_live_faults": "https://ukpowernetworks.opendatasoft.com/api/explore/v2.1/catalog/datasets/ukpn-live-faults/exports/xlsx"
"ukpn_live_faults": "https://ukpowernetworks.opendatasoft.com/api/explore/v2.1/catalog/datasets/ukpn-live-faults/exports/xlsx",
"green_belt": "https://files.planning.data.gov.uk/dataset/green-belt.json",
}

0 comments on commit fa0b371

Please sign in to comment.