Skip to content

Commit

Permalink
feat(french cat): added duckdb loader for french cat data [2024-12-26]
Browse files Browse the repository at this point in the history
  • Loading branch information
CHRISCARLON committed Dec 26, 2024
1 parent 2eadf75 commit de0045a
Showing 1 changed file with 68 additions and 11 deletions.
79 changes: 68 additions & 11 deletions HerdingCats/data_loader/data_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -329,12 +329,8 @@ def validate_inputs(func):
"""Decorator to validate resource data containing download URLs and formats."""
@wraps(func)
def wrapper(self, resource_data: Optional[List[Dict]], *args, **kwargs):
# Check if resource data exists and is non-empty
if not resource_data:
logger.error("No resource data provided")
raise ValueError("Resource data must be provided")

if not isinstance(resource_data, list):
# Check if resource data exists and is non-empty
if not resource_data or not isinstance(resource_data, list):
logger.error("Resource data must be a list")
raise ValueError("Resource data must be a list of dictionaries")
return func(self, resource_data, *args, **kwargs)
Expand Down Expand Up @@ -631,7 +627,8 @@ class FrenchGouvResourceLoader:
"""A class to load French Gouv data resources into various formats and storage systems."""

SUPPORTED_FORMATS = {
"spreadsheet": ["xls", "xlsx"],
"xls": ["xls"],
"xlsx": ["xlsx"],
"csv": ["csv"],
"parquet": ["parquet"],
"geopackage": ["gpkg", "geopackage"]
Expand All @@ -653,7 +650,19 @@ def _validate_dependencies(self):
if missing:
raise ImportError(f"Missing required dependencies: {', '.join(missing)}")

def validate_resource_data(
@staticmethod
def validate_inputs(func):
"""Decorator to validate resource data containing download URLs and formats."""
@wraps(func)
def wrapper(self, resource_data: Optional[List[Dict]], *args, **kwargs):
# Check if resource data exists and is non-empty
if not resource_data or not isinstance(resource_data, list):
logger.error("Resource data must be a list")
raise ValueError("Resource data must be a list of dictionaries")
return func(self, resource_data, *args, **kwargs)
return wrapper

def _validate_resource_data(
self,
resource_data: Optional[List[Dict[str, str]]],
format_type: str
Expand All @@ -670,6 +679,8 @@ def validate_resource_data(
if format_type in self.SUPPORTED_FORMATS
else [format_type])

print(valid_formats)

# Validate format type
if format_type not in self.SUPPORTED_FORMATS and format_type not in all_formats:
raise OpenDataSoftExplorerError(
Expand All @@ -679,17 +690,63 @@ def validate_resource_data(

# Find matching resource
url = next(
(r.get('resource_latest') for r in resource_data
(r.get('resource_url') for r in resource_data
if r.get('resource_format', '').lower() in valid_formats),
None
)

# If format provided does not have a url provide the formats that do
if not url:
available_formats = [r['resource_format'] for r in resource_data]
available_formats = [r['resource_url'] for r in resource_data]
raise OpenDataSoftExplorerError(
f"No resource found with format: {format_type}. "
f"Available formats: {', '.join(available_formats)}"
)

return url
return url

@validate_inputs
def duckdb_data_loader(
self,
resource_data: Optional[List[Dict[str, str]]],
format_type: Literal["csv", "parquet", "xls", "xlsx"],
api_key: Optional[str] = None,
sheet_name: Optional[str] = None
) -> duckdb.DuckDBPyConnection:
"""Load data from a resource URL directly into DuckDB."""
url = self._validate_resource_data(resource_data, format_type)

if api_key:
url = f"{url}?apikey={api_key}"

con = duckdb.connect(':memory:')
con.execute("SET force_download=true")
con.execute("INSTALL spatial")
con.execute("LOAD spatial")

try:
# Use match statement for format handling
match format_type:
case "parquet":
con.execute("CREATE TABLE data AS SELECT * FROM read_parquet(?)", [url])
case "csv":
con.execute("CREATE TABLE data AS SELECT * FROM read_csv(?)", [url])
case "xls" | "xlsx" | "spreadsheet":
if sheet_name:
con.execute("CREATE TABLE data AS SELECT * FROM st_read(?, sheet_name=?)", [url, sheet_name])
else:
con.execute("CREATE TABLE data AS SELECT * FROM st_read(?)", [url])
case _:
raise ValueError(f"Unsupported format type: {format_type}")

# Verify data was loaded
sample_data = con.execute("SELECT * FROM data LIMIT 10").fetchall()
if not sample_data and not api_key:
raise OpenDataSoftExplorerError(
"Received empty dataset. This likely means an API key is required."
)

return con

except duckdb.Error as e:
raise OpenDataSoftExplorerError(f"Failed to load {format_type} resource into DuckDB", e)

0 comments on commit de0045a

Please sign in to comment.