Commit 3b1123a
feat(open datasoft): added more dataloader methods for opendata soft catalogues [2024-11-27]
CHRISCARLON committed Nov 27, 2024
1 parent 64dc7fe commit 3b1123a
Showing 4 changed files with 235 additions and 4 deletions.
197 changes: 196 additions & 1 deletion HerdingCats/data_loader/data_loader.py
@@ -421,7 +421,6 @@ def polars_data_loader(
        # Example usage
        import HerdingCats as hc
        from pprint import pprint

        def main():
            with hc.CatSession(hc.OpenDataSoftDataCatalogues.UK_POWER_NETWORKS) as session:
@@ -465,3 +464,199 @@ def main():
raise OpenDataSoftExplorerError("Failed to download resource", e)

raise OpenDataSoftExplorerError("No parquet format resource found")

    def pandas_data_loader(
        self, resource_data: Optional[List[Dict]], format_type: Literal["parquet"], api_key: Optional[str] = None
    ) -> pd.DataFrame:
        """
        Load data from a resource URL into a Pandas DataFrame.

        Args:
            resource_data: List of dictionaries containing resource information
            format_type: Expected format type (currently only supports 'parquet')
            api_key: Optional API key for authentication with OpenDataSoft

        Returns:
            Pandas DataFrame

        Raises:
            OpenDataSoftExplorerError: If resource data is missing or download fails

        # Example usage
        import HerdingCats as hc

        def main():
            with hc.CatSession(hc.OpenDataSoftDataCatalogues.UK_POWER_NETWORKS) as session:
                explore = hc.OpenDataSoftCatExplorer(session)
                data_loader = hc.OpenDataSoftResourceLoader()
                data = explore.show_dataset_export_options_dict("ukpn-smart-meter-installation-volumes")
                pd_df = data_loader.pandas_data_loader(data, "parquet", "api_key")
                print(pd_df.head(10))

        if __name__ == "__main__":
            main()
        """
        if not resource_data:
            raise OpenDataSoftExplorerError("No resource data provided")

        # Ask the OpenDataSoft export endpoint for parquet output
        headers = {'Accept': 'application/parquet'}
        if api_key:
            headers['Authorization'] = f'apikey {api_key}'

        for resource in resource_data:
            if resource.get('format', '').lower() == 'parquet':
                url = resource.get('download_url')
                if not url:
                    continue
                try:
                    response = requests.get(url, headers=headers)
                    response.raise_for_status()
                    binary_data = BytesIO(response.content)
                    df = pd.read_parquet(binary_data)

                    if df.empty and not api_key:
                        raise OpenDataSoftExplorerError(
                            "Received empty DataFrame. This likely means an API key is required for this dataset. "
                            "Please provide an API key and try again. You can usually do this by creating an account with the datastore you are trying to access"
                        )
                    return df

                except (requests.RequestException, pd.errors.EmptyDataError) as e:
                    raise OpenDataSoftExplorerError("Failed to download resource", e)

        raise OpenDataSoftExplorerError("No parquet format resource found")

    def duckdb_data_loader(
        self, resource_data: Optional[List[Dict]], format_type: Literal["parquet"], api_key: Optional[str] = None
    ) -> duckdb.DuckDBPyConnection:
        """
        Load data from a resource URL into a DuckDB in-memory DB via pandas.

        Args:
            resource_data: List of dictionaries containing resource information
            format_type: Expected format type (currently only supports 'parquet')
            api_key: Optional API key for authentication with OpenDataSoft

        Returns:
            DuckDB connection with loaded data

        Raises:
            OpenDataSoftExplorerError: If resource data is missing or download fails

        # Example usage:
        import HerdingCats as hc

        def main():
            with hc.CatSession(hc.OpenDataSoftDataCatalogues.ELIA_BELGIAN_ENERGY) as session:
                explore = hc.OpenDataSoftCatExplorer(session)
                loader = hc.OpenDataSoftResourceLoader()
                data = explore.show_dataset_export_options_dict("ods036")
                con = loader.duckdb_data_loader(data, "parquet")
                df = con.execute("SELECT * FROM data LIMIT 10").fetchdf()
                print(df)

        if __name__ == "__main__":
            main()
        """
        if not resource_data:
            raise OpenDataSoftExplorerError("No resource data provided")

        headers = {'Accept': 'application/parquet'}
        if api_key:
            headers['Authorization'] = f'apikey {api_key}'

        # Create in-memory DuckDB connection
        con = duckdb.connect(':memory:')

        for resource in resource_data:
            if resource.get('format', '').lower() == 'parquet':
                url = resource.get('download_url')
                if not url:
                    continue
                try:
                    # Download parquet file to memory
                    response = requests.get(url, headers=headers)
                    response.raise_for_status()
                    binary_data = BytesIO(response.content)

                    # First read into pandas DataFrame
                    df = pd.read_parquet(binary_data)

                    # Check if DataFrame is empty
                    if df.empty and not api_key:
                        raise OpenDataSoftExplorerError(
                            "Received empty DataFrame. This likely means an API key is required for this dataset. "
                            "Please provide an API key and try again. You can usually do this by creating an account with the datastore you are trying to access"
                        )

                    # Load DataFrame into DuckDB
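                    # (DuckDB's replacement scans resolve the local pandas
                    # DataFrame `df` by name inside the SQL string)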
                    con.execute("CREATE TABLE data AS SELECT * FROM df")
                    return con

                except (requests.RequestException, pd.errors.EmptyDataError, duckdb.Error) as e:
                    raise OpenDataSoftExplorerError("Failed to download or load resource", e)

        raise OpenDataSoftExplorerError("No parquet format resource found")

    def aws_s3_data_loader(
        self,
        resource_data: Optional[List[Dict]],
        bucket_name: str,
        custom_name: str,
        api_key: Optional[str] = None,
    ) -> None:
        """
        Load resource data into remote S3 storage as a parquet file.

        Args:
            resource_data: List of dictionaries containing resource information
            bucket_name: S3 bucket name
            custom_name: Custom prefix for the filename
            api_key: Optional API key for authentication

        Raises:
            OpenDataSoftExplorerError: If resource data is missing or the download fails
            ValueError: If no bucket name is provided
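
        # Example usage (illustrative sketch: the bucket name and file prefix
        # below are placeholders, and boto3 is assumed to find AWS credentials
        # in the environment)
        import HerdingCats as hc

        def main():
            with hc.CatSession(hc.OpenDataSoftDataCatalogues.UK_POWER_NETWORKS) as session:
                explore = hc.OpenDataSoftCatExplorer(session)
                loader = hc.OpenDataSoftResourceLoader()
                data = explore.show_dataset_export_options_dict("ukpn-smart-meter-installation-volumes")
                loader.aws_s3_data_loader(data, "my-example-bucket", "ukpn-smart-meters")

        if __name__ == "__main__":
            main()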
"""
        if not resource_data:
            raise OpenDataSoftExplorerError("No resource data provided")

        if not bucket_name:
            raise ValueError("No bucket name provided")

        # Create an S3 client
        s3_client = boto3.client("s3")
        logger.success("S3 Client Created")

        # Check if the bucket exists
        try:
            s3_client.head_bucket(Bucket=bucket_name)
            logger.success("Bucket Found")
        except ClientError as e:
            error_code = e.response["Error"]["Code"]
            if error_code == "404":
                logger.error(f"Bucket '{bucket_name}' does not exist.")
            else:
                logger.error(f"Error checking bucket '{bucket_name}': {e}")
            return

        headers = {'Accept': 'application/parquet'}
        if api_key:
            headers['Authorization'] = f'apikey {api_key}'

        for resource in resource_data:
            if resource.get('format', '').lower() == 'parquet':
                url = resource.get('download_url')
                if not url:
                    continue

                try:
                    response = requests.get(url, headers=headers)
                    response.raise_for_status()
                    binary_data = BytesIO(response.content)

                    # Generate a unique filename
                    filename = f"{custom_name}-{uuid.uuid4()}.parquet"

                    # Upload the parquet file directly
                    s3_client.upload_fileobj(binary_data, bucket_name, filename)
                    logger.success("Parquet file uploaded successfully to S3")
                    return

                except requests.RequestException as e:
                    raise OpenDataSoftExplorerError("Failed to download resource", e)
                except ClientError as e:
                    logger.error(f"Error: {e}")
                    return

        raise OpenDataSoftExplorerError("No parquet format resource found")
9 changes: 9 additions & 0 deletions HerdingCats/explorer/cat_explore.py
@@ -1020,6 +1020,9 @@ def __init__(self, cat_session: CatSession):
"""
self.cat_session = cat_session

    # ----------------------------
    # Get all datasets available on the catalogue
    # ----------------------------
    def fetch_all_datasets(self) -> dict | None:
        urls = [
            self.cat_session.base_url + OpenDataSoftApiPaths.SHOW_DATASETS,
@@ -1095,6 +1098,9 @@ def fetch_all_datasets(self) -> dict | None:
        logger.warning("No datasets were retrieved.")
        return None

    # ----------------------------
    # Get metadata about specific datasets in the catalogue
    # ----------------------------
    def show_dataset_info_dict(self, dataset_id):
        urls = [
            self.cat_session.base_url + OpenDataSoftApiPaths.SHOW_DATASET_INFO.format(dataset_id),
@@ -1113,6 +1119,9 @@ def show_dataset_info_dict(self, dataset_id):
        error_msg = f"\033[91mFailed to fetch dataset: {str(last_error)}. Are you sure this dataset exists? Check again.\033[0m"
        raise CatExplorerError(error_msg)

    # ----------------------------
    # Show what export file types are available for a particular dataset
    # ----------------------------
    def show_dataset_export_options_dict(self, dataset_id):
        urls = [
            self.cat_session.base_url + OpenDataSoftApiPaths.SHOW_DATASET_EXPORTS.format(dataset_id),
3 changes: 0 additions & 3 deletions tests/ckan/test_ckan_package_list.py
@@ -22,9 +22,6 @@ def test_package_list_dictionary(catalogue_url):
            # Assert that we got a result
            assert results is not None, f"No results returned for {catalogue_url}"

            # Check if we got the expected number of rows
            assert len(results) > 100, "There could be a problem - check manually"

            logger.info(f"Package search test passed for {catalogue_url}")
        except requests.RequestException as e:
            pytest.fail(
30 changes: 30 additions & 0 deletions tests/open_data_soft/test_ods_get_datasets.py
@@ -0,0 +1,30 @@
import pytest
from HerdingCats.session.cat_session import CatSession
from HerdingCats.explorer.cat_explore import OpenDataSoftCatExplorer
import requests
from loguru import logger

CATALOGUES = ["https://ukpowernetworks.opendatasoft.com"]

@pytest.mark.parametrize("catalogue_url", CATALOGUES)
def test_package_list_dictionary(catalogue_url):
"""
Test the package list functionality for predefined data catalogues
"""
with CatSession(catalogue_url) as cat_session:
explorer = OpenDataSoftCatExplorer(cat_session)
try:
results = explorer.fetch_all_datasets()

print(results)

# Assert that we got a result
assert results is not None, f"No results returned for {catalogue_url}"

logger.info(f"Package search test passed for {catalogue_url}")
except requests.RequestException as e:
pytest.fail(
f"Failed to perform package search for {catalogue_url}: {str(e)}"
)
except AssertionError as e:
pytest.fail(str(e))
