Skip to content

Commit

Permalink
Updates 2024-08-28 - created new Python classes and project structure
Browse files Browse the repository at this point in the history
  • Loading branch information
CHRISCARLON committed Aug 28, 2024
1 parent 9c339ae commit b819be3
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 100 deletions.
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ ruff = "^0.5.1"
tabulate = "^0.9.0"
boto3 = "^1.34.140"
pytest = "^8.2.2"
polars = "^1.5.0"

[build-system]
requires = ["poetry-core"]
Expand Down
9 changes: 9 additions & 0 deletions src/api_endpoints.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
class CkanApiPaths:
    """Relative URL paths for the CKAN Action API (version 3).

    The constants are paths only (no scheme or host); they are meant to be
    appended to a site's base URL.
    """

    # Template for an action endpoint; ``{}`` is replaced by the action name.
    BASE_PATH = "/api/3/action/{}"
    PACKAGE_LIST = BASE_PATH.format("package_list")
    PACKAGE_SEARCH = BASE_PATH.format("package_search")
    # Add more paths as needed


class DcatApiPaths:
    """Relative URL paths for the DCAT-AP catalogue feed."""

    # Path (no scheme or host) of the DCAT-AP 2.1.1 JSON feed.
    BASE_PATH = "/api/feed/dcat-ap/2.1.1.json"
11 changes: 4 additions & 7 deletions src/cats_errors.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
class CATExploreError(Exception):
    """Base exception for CATExplore"""


class CatSessionError(Exception):
    """Raised when a session with a catalogue domain cannot be started."""


class CKANFetchError(CATExploreError):
    """Raised when CKAN fetch fails"""


class DCATFetchError(CATExploreError):
    """Raised when DCAT fetch fails"""


class CatExplorerError(Exception):
    """Raised when a catalogue request or DataFrame conversion fails."""
201 changes: 108 additions & 93 deletions src/herding_cats.py
Original file line number Diff line number Diff line change
@@ -1,110 +1,125 @@
from pandas.io.clipboards import option_context
import requests
import pandas as pd
import polars as pl

from typing import Any, Dict
from typing import Any, Dict, Optional, Union, Literal
from loguru import logger
from pprint import pprint
from enum import Enum
from urllib.parse import urlencode

from .cats_errors import CATExploreError, CKANFetchError, DCATFetchError
from api_endpoints import CkanApiPaths
from cats_errors import CatExplorerError, CatSessionError

class CatSession:
def __init__(self, domain: str) -> None:
"""Initialise a CatSession for a domain.

Stores ``domain``, creates a ``requests.Session`` and derives ``base_url``
as ``https://<domain>``. No request is sent here.
"""
self.domain = domain
self.session = requests.Session()
self.base_url = f"https://{self.domain}"

def start_session(self) -> None:
    """Start a session with the specified domain.

    Sends a GET request to ``self.base_url`` and treats any HTTP error
    status as a failure.

    Raises:
        CatSessionError: If the request fails or returns an error status.
            The original ``requests`` exception is attached as the cause.
    """
    try:
        response = self.session.get(self.base_url)
        response.raise_for_status()
    except requests.RequestException as e:
        logger.error(f"Failed to start session: {e}")
        # Chain explicitly so the traceback marks the HTTP error as the cause.
        raise CatSessionError(f"Failed to start session: {str(e)}") from e
    else:
        logger.info(f"Session started successfully with {self.domain}")

def close_session(self) -> None:
"""Close the underlying ``requests`` session and log that it was closed."""
self.session.close()
logger.info(f"Session closed for {self.domain}")

class CATExplore:
def __enter__(self) -> "CatSession":
"""Start the session on entering a ``with`` block and return this instance."""
self.start_session()
return self

CKAN_API_PATH = "/api/3/action/{}"
DCAT_API_PATH = "/api/feed/dcat-ap/2.1.1.json"
REQUEST_TIMEOUT = 15
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
"""Close the session on leaving a ``with`` block.

Returns ``None``, so an exception raised inside the block is not suppressed.
"""
self.close_session()

def __init__(self, domain: str) -> None:
"""Initialise CATExplore with a domain.

Args:
domain: Host name stored on the instance; the fetch methods build
their request URLs as ``https://{domain}<api path>``.
"""
self.domain = domain

# DATA SAMPLES
def fetch_sample(self) -> Dict[str, Any]:
"""Fetch a sample from either CKAN or DCAT API."""
class CatExplorer:
def __init__(self, cat_session: CatSession) -> None:
"""Store the CatSession whose ``base_url`` and ``session`` the query methods use."""
self.cat_session = cat_session

def package_list_json(self, search_query: Optional[str]=None):
url = self.cat_session.base_url + CkanApiPaths.PACKAGE_LIST
try:
return self.fetch_ckan_sample()
except CKANFetchError as ckan_error:
logger.error(f"CKAN fetch failed: {ckan_error} - Attempting DCAT")
try:
return self.fetch_dcat_sample()
except DCATFetchError as dcat_error:
logger.error(f"DCAT fetch failed: {dcat_error}")
raise CATExploreError("Both CKAN and DCAT fetches failed") from dcat_error

def fetch_ckan_sample(self, endpoint: str = "package_search") -> Dict[str, Any]:
"""Fetch a sample from CKAN API.

Builds the URL from ``self.domain`` and ``self.CKAN_API_PATH`` formatted
with ``endpoint``, requests it through ``self._make_request`` and returns
whatever ``self._extract_ckan_result_sample`` pulls out of the response.
"""
url = f"https://{self.domain}{self.CKAN_API_PATH.format(endpoint)}"
data = self._make_request(url)
return self._extract_ckan_result_sample(data)

def fetch_dcat_sample(self) -> Dict[str, Any]:
"""Fetch a sample from DCAT API.

Builds the URL from ``self.domain`` and ``self.DCAT_API_PATH``, requests it
through ``self._make_request`` and returns whatever
``self._extract_dcat_result_sample`` pulls out of the response.
"""
url = f"https://{self.domain}{self.DCAT_API_PATH}"
data = self._make_request(url)
return self._extract_dcat_result_sample(data)

# SEARCH DATA
def basic_search_ckan_data(self, user_input: str, endpoint: str = "package_search") -> Dict[str, Any]:
"""Search CKAN with a free-text query and return the response unmodified.

Args:
user_input: Text sent as the ``q`` query parameter.
endpoint: CKAN action name substituted into ``self.CKAN_API_PATH``.

Raises:
requests.exceptions.RequestException: Logged here, then re-raised unchanged.
"""
try:
url = f"https://{self.domain}{self.CKAN_API_PATH.format(endpoint)}"
params = {
"q": user_input
}
return self._make_request(url, params)
except requests.exceptions.RequestException as error:
logger.error(f"An error occurred during the request: {error}")
raise

# UTILITY FUNCTIONS
def _make_request(self, url: str, params: Dict[str, Any] = None) -> Dict[str, Any]:
"""Make a GET request to the specified URL with optional parameters."""
response = self.cat_session.session.get(url)
response.raise_for_status()
data = response.json()
return data['result']
except requests.RequestException as e:
logger.error(f"Failed to search datasets: {e}")
raise CatExplorerError(f"Failed to search datasets: {str(e)}")

def package_list_dataframe(self, df_type: Literal["pandas", "polars"]) -> Union[pd.DataFrame, 'pl.DataFrame']:
if df_type.lower() not in ["pandas", "polars"]:
raise ValueError(f"Invalid df_type: '{df_type}'. Must be either 'pandas' or 'polars'.")

url = self.cat_session.base_url + CkanApiPaths.PACKAGE_LIST
try:
response = requests.get(url, params=params, timeout=self.REQUEST_TIMEOUT)
response = self.cat_session.session.get(url)
response.raise_for_status()
return response.json()
except requests.exceptions.RequestException as error:
logger.error(f"An error occurred during the request: {error}")
raise

@staticmethod
def _extract_ckan_result_sample(data: Dict[str, Any]) -> Dict[str, Any]:
"""Extract the first result from CKAN API response.

Looks under ``data['result']`` for a non-empty ``'results'`` entry, then a
non-empty ``'result'`` entry, and returns element ``[0]`` of whichever is
found first.

Raises:
CKANFetchError: If neither entry is present and non-empty.
"""
if 'result' in data:
# Truthiness check guards the [0] index against an empty list.
if 'results' in data['result'] and data['result']['results']:
return data['result']['results'][0]
elif 'result' in data['result'] and data['result']['result']:
return data['result']['result'][0]
raise CKANFetchError("Expected data structure not found in CKAN response")

@staticmethod
def _extract_dcat_result_sample(data: Dict[str, Any]) -> Dict[str, Any]:
    """Extract the first result from DCAT API response.

    Args:
        data: Decoded JSON response; the datasets are read from its
            ``'dcat:dataset'`` entry.

    Returns:
        The first element of the ``'dcat:dataset'`` list.

    Raises:
        DCATFetchError: If ``'dcat:dataset'`` is missing, is not a list,
            or is an empty list.
    """
    datasets = data['dcat:dataset'] if 'dcat:dataset' in data else None
    # An empty list must be reported as DCATFetchError too; indexing [0] on
    # it would raise IndexError instead.
    if isinstance(datasets, list) and datasets:
        return datasets[0]
    raise DCATFetchError("Expected data structure not found in DCAT response")

@staticmethod
def print_structure(data: Any, indent: int = 0, key: str = "root") -> None:
"""Print the structure of any data type.

Dicts print their key and recurse into every value; lists print their
length and recurse into the first element only; any other value prints
its type name and a preview truncated to 50 characters.

Args:
data: Value to describe.
indent: Current nesting depth, used to prefix each printed line.
key: Label printed for ``data``.
"""
if isinstance(data, dict):
print(f"{' ' * indent}{key}:")
for k, v in data.items():
CATExplore.print_structure(v, indent + 1, k)
elif isinstance(data, list):
print(f"{' ' * indent}{key}: (list of {len(data)} items)")
# Only the first element is shown as a representative of the list.
if data:
CATExplore.print_structure(data[0], indent + 1, f"{key}[0]")
else:
value_type = type(data).__name__
value_preview = str(data)[:50] + "..." if len(str(data)) > 50 else str(data)
print(f"{' ' * indent}{key}: ({value_type}) {value_preview}")

@staticmethod
def pretty_print_helper(data: Any) -> None:
"""Pretty-print ``data`` with ``pprint`` and return its result (``None``)."""
return pprint(data)
data = response.json()
result = data['result']

if df_type.lower() == "polars":
try:
import polars as pl
return pl.DataFrame(result)
except ImportError:
logger.warning("Polars is not installed. Please run 'pip install polars' to use this option.")
raise ImportError("Polars is not installed. Please run 'pip install polars' to use this option.")
else: # df_type.lower() == "pandas"
return pd.DataFrame(result)

except requests.RequestException as e:
logger.error(f"Failed to search datasets: {e}")
raise CatExplorerError(f"Failed to search datasets: {str(e)}")
except Exception as e:
logger.error(f"Failed to create DataFrame: {e}")
raise CatExplorerError(f"Failed to create DataFrame: {str(e)}")

def package_search_json(self, search_query: Optional[str]=None):
    """Call the CKAN ``package_search`` endpoint and return its ``result`` payload.

    Args:
        search_query: Free-text query sent as the ``q`` parameter. When it is
            ``None`` or empty the endpoint is requested without a query string.

    Returns:
        The value stored under the ``'result'`` key of the JSON response.

    Raises:
        CatExplorerError: If the request fails or returns an HTTP error
            status. The original ``requests`` exception is attached as the
            cause.
    """
    base_url = self.cat_session.base_url + CkanApiPaths.PACKAGE_SEARCH

    params = {}
    if search_query:
        params["q"] = search_query

    url = f"{base_url}?{urlencode(params)}" if params else base_url
    # Log rather than print, so callers of the library control the output.
    logger.debug(f"Requesting {url}")

    try:
        response = self.cat_session.session.get(url)
        response.raise_for_status()
        data = response.json()
        return data['result']
    except requests.RequestException as e:
        logger.error(f"Failed to search datasets: {e}")
        raise CatExplorerError(f"Failed to search datasets: {str(e)}") from e

def get_package_count(self) -> int:
    """Return the number of entries in the CKAN ``package_list`` response.

    The full package list is requested and the length of its ``'result'``
    value is returned.

    Raises:
        CatExplorerError: If the request fails or returns an HTTP error
            status. The original ``requests`` exception is attached as the
            cause.
    """
    url = self.cat_session.base_url + CkanApiPaths.PACKAGE_LIST
    try:
        response = self.cat_session.session.get(url)
        response.raise_for_status()
        data = response.json()
        return len(data['result'])
    except requests.RequestException as e:
        logger.error(f"Failed to get package count: {e}")
        raise CatExplorerError(f"Failed to get package count: {str(e)}") from e

# Example usage
if __name__ == "__main__":
explorer = CATExplore("data.london.gov.uk")
result = explorer.basic_search_ckan_data("climate")
explorer.pretty_print_helper(result)
with CatSession("data.london.gov.uk") as session:
explore = CatExplorer(session)
v = explore.package_search_json(search_query="census")
pprint(v)

0 comments on commit b819be3

Please sign in to comment.