From 2c664233ed51653b01f955be15c08bcc4249590c Mon Sep 17 00:00:00 2001
From: Chris Carlon
Date: Wed, 25 Dec 2024 19:43:35 +0000
Subject: [PATCH] feat(dataloader)!: change dataloader method names to make
 them more user friendly and add more methods for the French data catalogue
 [2024-12-25]

BREAKING CHANGE: changed dataloader method names to make them more user
friendly and added more methods for the French data catalogue
---
 HerdingCats/endpoints/api_endpoints.py |   2 +
 HerdingCats/explorer/cat_explore.py    | 205 +++++++++++++++++++++----
 2 files changed, 174 insertions(+), 33 deletions(-)

diff --git a/HerdingCats/endpoints/api_endpoints.py b/HerdingCats/endpoints/api_endpoints.py
index 1cc2997..ebbcd69 100644
--- a/HerdingCats/endpoints/api_endpoints.py
+++ b/HerdingCats/endpoints/api_endpoints.py
@@ -68,6 +68,8 @@ class FrenchGouvApiPaths:
     BASE_PATH = "/api/1/{}"
     SHOW_DATASETS = BASE_PATH.format("datasets")
     SHOW_DATASETS_BY_ID = BASE_PATH.format("datasets/{}")
+    SHOW_DATASET_RESOURCE_BY_ID = BASE_PATH.format("datasets/{}/resources/")
+    CATALOGUE = "https://object.files.data.gouv.fr/hydra-parquet/hydra-parquet/b06842f8ee27a0302ebbaaa344d35e4c.parquet"
 
 class FrenchGouvCatalogue(Enum):
     GOUV_FR = "https://www.data.gouv.fr"
diff --git a/HerdingCats/explorer/cat_explore.py b/HerdingCats/explorer/cat_explore.py
index b14a06e..125c37c 100644
--- a/HerdingCats/explorer/cat_explore.py
+++ b/HerdingCats/explorer/cat_explore.py
@@ -121,7 +121,7 @@ def get_package_count(self) -> int:
             logger.error(f"Failed to get package count: {e}")
             raise CatExplorerError(f"Failed to get package count: {str(e)}")
 
-    def package_list_dictionary(self) -> dict:
+    def get_package_list(self) -> dict:
         """
         Explore all packages that are available to query as a dictionary.
 
@@ -145,7 +145,7 @@ def package_list_dictionary(self) -> dict:
         if __name__ == "__main__":
             with CatSession("data.london.gov.uk") as session:
                 explore = CkanCatExplorer(session)
-                all_packages = explore.package_list_dictionary()
+                all_packages = explore.get_package_list()
                 pprint(all_packages)
         """
@@ -161,7 +161,7 @@ def package_list_dictionary(self) -> dict:
         except requests.RequestException as e:
             raise CatExplorerError(f"Failed to search datasets: {str(e)}")
 
-    def package_list_dataframe(
+    def get_package_list_dataframe(
         self, df_type: Literal["pandas", "polars"]
     ) -> Union[pd.DataFrame, "pl.DataFrame"]:
         """
@@ -198,7 +198,7 @@ def package_list_dataframe(
         if __name__ == "__main__":
             with CatSession("uk gov") as session:
                 explorer = CkanCatExplorer(session)
-                results = explorer.package_list_dataframe('polars')
+                results = explorer.get_package_list_dataframe('polars')
                 print(results)
         """
@@ -236,7 +236,7 @@ def package_list_dataframe(
         except (requests.RequestException, Exception) as e:
             raise CatExplorerError(f"Failed to search datasets: {str(e)}")
 
-    def package_list_dictionary_extra(self):
+    def get_package_list_extra(self):
         """
         Explore all packages that are available to query.
 
@@ -244,11 +244,13 @@ def package_list_dictionary_extra(self):
 
         Sorted by most recently updated dataset first.
 
+        This is sometimes implemented differently depending on the organisation.
+
         # Example usage...
         if __name__ == "__main__":
             with CatSession("data.london.gov.uk") as session:
                 explore = CkanCatExplorer(session)
-                info_extra = package_list_dictionary_extra()
+                info_extra = explore.get_package_list_extra()
                 pprint(info_extra)
         """
         url = (
@@ -277,7 +279,7 @@ def package_list_dictionary_extra(self):
             raise CatExplorerError(f"Failed to search datasets: {str(e)}")
             return
 
-    def package_list_dataframe_extra(
+    def get_package_list_dataframe_extra(
         self, df_type: Literal["pandas", "polars"]
     ) -> Union[pd.DataFrame, "pl.DataFrame"]:
         """
@@ -327,7 +329,7 @@ def package_list_dataframe_extra(
         if __name__ == "__main__":
             with CatSession("data.london.gov.uk") as session:
                 explore = CkanCatExplorer(session)
-                info_extra = package_list_dataframe_extra('pandas')
+                info_extra = explore.get_package_list_dataframe_extra('pandas')
                 pprint(info_extra)
         """
@@ -407,13 +409,13 @@ def get_organisation_list(self) -> Tuple[int, list]:
             return length, maintainers
 
         except (requests.RequestException, Exception) as e:
-            logger.error(f"Both organization list methods failed: {e}")
+            logger.error(f"Both organisation list methods failed: {e}")
             raise
 
     # ----------------------------
     # Show metadata using a package name
     # ----------------------------
-    def package_show_info_json(self, package_name: Union[str, dict, Any]) -> List[Dict]:
+    def show_package_info(self, package_name: Union[str, dict, Any]) -> List[Dict]:
         """
         Pass in a package name as a string or as a value from a dictionary.
 
@@ -425,7 +427,7 @@ def package_show_info_json(self, package_name: Union[str, dict, Any]) -> List[Di
             explore = CkanCatExplorer(session)
-            all_packages = explore.package_list_dictionary()
+            all_packages = explore.get_package_list()
             census = all_packages.get("2011-boundary-files")
-            census_info = explore.package_show_info_json(census)
+            census_info = explore.show_package_info(census)
             pprint(census_info)
         """
@@ -450,12 +452,53 @@ def package_show_info_json(self, package_name: Union[str, dict, Any]) -> List[Di
         except requests.RequestException as e:
             raise CatExplorerError(f"Failed to search datasets: {str(e)}")
 
+    def show_package_info_dataframe(self, package_name: Union[str, dict, Any], df_type: Literal["pandas", "polars"]) -> pd.DataFrame | pl.DataFrame:
+        """
+        Pass in a package name as a string or as a value from a dictionary.
+
+        This will return package metadata, including resource information and
+        download links for the data, as a dataframe.
+
+        # Example usage...
+        if __name__ == "__main__":
+            with CkanCatSession("data.london.gov.uk") as session:
+                explore = CkanCatExplorer(session)
+                all_packages = explore.get_package_list()
+                census = all_packages.get("2011-boundary-files")
+                census_info = explore.show_package_info_dataframe(census, "pandas")
+                pprint(census_info)
+        """
+
+        if package_name is None:
+            raise ValueError("Package name cannot be None")
+
+        base_url = self.cat_session.base_url + CkanApiPaths.PACKAGE_INFO
+        params = {}
+        if package_name:
+            params["id"] = package_name
+        url = f"{base_url}?{urlencode(params)}" if params else base_url
+
+        try:
+            response = self.cat_session.session.get(url)
+            response.raise_for_status()
+            data = response.json()
+            result_data = data["result"]
+            results = self._extract_resource_data(result_data)
+
+            match df_type:
+                case "pandas":
+                    return pd.DataFrame(results)
+                case "polars":
+                    return pl.DataFrame(results)
+
+        except requests.RequestException as e:
+            raise CatExplorerError(f"Failed to search datasets: {str(e)}")
+
     # ----------------------------
     # Search Packages and store in DataFrames / or keep as Dicts.
     # Unpack data or keep it packed (e.g. don't split out resources into own columns)
     # ----------------------------
-    def package_search_json(self, search_query: str, num_rows: int):
+    def package_search(self, search_query: str, num_rows: int):
         """
         Returns all available data for a particular search query
 
@@ -467,7 +510,7 @@ def main():
         def main():
             with hc.CatSession(hc.CkanDataCatalogues.LONDON_DATA_STORE) as session:
                 explore = hc.CkanCatExplorer(session)
-                packages_search = explore.package_search_json("police", 50)
+                packages_search = explore.package_search("police", 50)
                 print(packages_search)
 
         if __name__ =="__main__":
@@ -492,7 +535,7 @@ def main():
         except requests.RequestException as e:
             raise CatExplorerError(f"Failed to search datasets: {str(e)}")
 
-    def package_search_condense_json_unpacked(
+    def package_search_condense(
         self, search_query: str, num_rows: int
     ) -> Optional[List[Dict]]:
         """
@@ -517,7 +560,7 @@ def main():
         def main():
             with hc.CatSession(hc.CkanDataCatalogues.LONDON_DATA_STORE) as session:
                 explore = hc.CkanCatExplorer(session)
-                packages_search = explore.package_search_condense_json_unpacked("police", 50)
+                packages_search = explore.package_search_condense("police", 50)
                 pprint(packages_search)
 
         if __name__ =="__main__":
@@ -558,7 +601,7 @@ def main():
         except requests.RequestException as e:
             raise CatExplorerError(f"Failed to search datasets: {str(e)}")
 
-    def package_search_condense_dataframe_packed(
+    def package_search_condense_dataframe(
         self,
         search_query: str,
         num_rows: int,
@@ -606,7 +649,7 @@ def package_search_condense_dataframe_packed(
         if __name__ == "__main__":
             with CkanCatSession("uk gov") as session:
                 explorer = CkanCatExplorer(session)
-                results = explorer.package_search_condense_dataframe_packed('police', 500, "polars")
+                results = explorer.package_search_condense_dataframe('police', 500, "polars")
                 print(results)
         """
@@ -653,7 +696,7 @@ def package_search_condense_dataframe_packed(
         except requests.RequestException as e:
             raise CatExplorerError(f"Failed to search datasets: {str(e)}")
 
-    def package_search_condense_dataframe_unpacked(
+    def package_search_condense_dataframe_unpack(
         self,
         search_query: str,
         num_rows: int,
@@ -1058,7 +1101,7 @@ def fetch_all_datasets(self) -> dict | None:
     # ----------------------------
     # Get metadata about specific datasets in the catalogue
     # ----------------------------
-    def show_dataset_info_dict(self, dataset_id):
+    def show_dataset_info(self, dataset_id):
         urls = [
             self.cat_session.base_url + OpenDataSoftApiPaths.SHOW_DATASET_INFO.format(dataset_id),
             self.cat_session.base_url + OpenDataSoftApiPaths.SHOW_DATASET_INFO.format(dataset_id),
@@ -1079,7 +1122,7 @@ def show_dataset_info_dict(self, dataset_id):
     # ----------------------------
     # Show what export file types are available for a particular dataset
     # ----------------------------
-    def show_dataset_export_options_dict(self, dataset_id):
+    def show_dataset_export_options(self, dataset_id):
         urls = [
             self.cat_session.base_url + OpenDataSoftApiPaths.SHOW_DATASET_EXPORTS.format(dataset_id),
             self.cat_session.base_url + OpenDataSoftApiPaths.SHOW_DATASET_EXPORTS_2.format(dataset_id),
@@ -1184,7 +1227,10 @@ def get_all_datasets(self) -> dict:
         Returns:
             dict: Dictionary with slugs as keys and dataset IDs as values
         """
+        try:
+            catalogue = FrenchGouvApiPaths.CATALOGUE
+
             with duckdb.connect(':memory:') as con:
                 # Install and load httpfs extension
                 con.execute("INSTALL httpfs;")
                 con.execute("LOAD httpfs;")
                 # Query to select only id and slug, converting to dict format
                 query = """
                     SELECT DISTINCT slug, id
-                    FROM read_parquet('https://object.files.data.gouv.fr/hydra-parquet/hydra-parquet/b06842f8ee27a0302ebbaaa344d35e4c.parquet')
+                    FROM read_parquet(?)
                     WHERE slug IS NOT NULL AND id IS NOT NULL
                 """
                 # Execute query and fetch results
-                result = con.execute(query).fetchall()
+                result = con.execute(query, parameters=[catalogue]).fetchall()
                 # Convert results to dictionary
                 datasets = {slug: id for slug, id in result}
                 return datasets
@@ -1204,7 +1250,7 @@ def get_all_datasets(self) -> dict:
             logger.error(f"Error processing parquet file: {str(e)}")
             return {}
 
-    def get_dataset_by_identifier(self, identifier: str) -> dict:
+    def get_dataset_meta(self, identifier: str) -> dict:
         """
-        Fetches a metadata for a specific dataset using either its ID or slug.
+        Fetches metadata for a specific dataset using either its ID or slug.
 
@@ -1240,6 +1286,41 @@ def get_dataset_by_identifier(self, identifier: str) -> dict:
             logger.error(f"Error fetching dataset {identifier}: {str(e)}")
             return {}
 
+    def get_dataset_meta_dataframe(self, identifier: str, df_type: Literal["pandas", "polars"]) -> pd.DataFrame | pl.DataFrame:
+        """
+        Fetches metadata for a specific dataset using either its ID or slug.
+
+        Args:
+            identifier (str): Dataset ID or slug to fetch
+
+        Returns:
+            pd.DataFrame | pl.DataFrame: Dataset details, or an empty dataframe if not found
+
+        Example identifier:
+            ID: "674de63d05a9bbeddc66bdc1"
+        """
+        try:
+            url = self.cat_session.base_url + FrenchGouvApiPaths.SHOW_DATASETS_BY_ID.format(identifier)
+            response = self.cat_session.session.get(url)
+
+            if response.status_code == 200:
+                data = response.json()
+                logger.success(f"Successfully retrieved dataset: {identifier}")
+                match df_type:
+                    case "pandas":
+                        return pd.DataFrame([data])
+                    case "polars":
+                        return pl.DataFrame([data])
+            elif response.status_code == 404:
+                logger.warning(f"Dataset not found: {identifier}")
+                return pd.DataFrame() if df_type == "pandas" else pl.DataFrame()
+            else:
+                logger.error(f"Failed to fetch dataset {identifier} with status code {response.status_code}")
+                return pd.DataFrame() if df_type == "pandas" else pl.DataFrame()
+        except Exception as e:
+            logger.error(f"Error fetching dataset {identifier}: {str(e)}")
+            return pd.DataFrame() if df_type == "pandas" else pl.DataFrame()
+
     def get_datasets_by_identifiers(self, identifiers: list) -> dict:
         """
         Fetches multiple datasets using a list of IDs or slugs.
@@ -1254,7 +1335,7 @@ def get_datasets_by_identifiers(self, identifiers: list) -> dict:
         for identifier in identifiers:
             try:
-                dataset = self.get_dataset_by_identifier(identifier)
+                dataset = self.get_dataset_meta(identifier)
 
                 if dataset:
                     results[identifier] = dataset
@@ -1271,13 +1352,71 @@ def get_datasets_by_identifiers(self, identifiers: list) -> dict:
     # ----------------------------
     # Show available resources for a particular dataset
     # ----------------------------
-    def get_dataset_resources(self, dataset) -> list:
-
+    def get_dataset_resource(self, dataset_id: str, resource_id: str) -> dict:
+        """
+        Fetches metadata for a specific resource within a dataset.
+
+        Args:
+            dataset_id (str): Dataset ID or slug containing the resource
+            resource_id (str): Resource ID to fetch
+
+        Returns:
+            dict: Resource details or empty dict if not found
+        """
         try:
-            resources = [resource for resource in dataset["resources"]]
-            return resources
-        except Exception:
-            raise
+            url = self.cat_session.base_url + FrenchGouvApiPaths.SHOW_DATASET_RESOURCE_BY_ID.format(
+                dataset_id) + resource_id
+
+            response = self.cat_session.session.get(url)
+
+            if response.status_code == 200:
+                data = response.json()
+                logger.success(f"Successfully retrieved resource: {resource_id}")
+                return data
+            elif response.status_code == 404:
+                logger.warning(f"Resource not found: {resource_id}")
+                return {}
+            else:
+                logger.error(f"Failed to fetch resource {resource_id} with status {response.status_code}")
+                return {}
+
+        except Exception as e:
+            logger.error(f"Error fetching resource {resource_id}: {str(e)}")
+            return {}
+
+    def get_dataset_resource_dataframe(self, dataset_id: str, resource_id: str, df_type: Literal["pandas", "polars"]) -> pd.DataFrame | pl.DataFrame:
+        """
+        Fetches metadata for a specific resource within a dataset.
+
+        Args:
+            dataset_id (str): Dataset ID or slug containing the resource
+            resource_id (str): Resource ID to fetch
+
+        Returns:
+            pd.DataFrame | pl.DataFrame: Resource details, or an empty dataframe if not found
+        """
+        try:
+            url = self.cat_session.base_url + FrenchGouvApiPaths.SHOW_DATASET_RESOURCE_BY_ID.format(
+                dataset_id) + resource_id
+            response = self.cat_session.session.get(url)
+
+            if response.status_code == 200:
+                data = response.json()
+                logger.success(f"Successfully retrieved resource: {resource_id}")
+                match df_type:
+                    case "pandas":
+                        return pd.DataFrame([data])
+                    case "polars":
+                        return pl.DataFrame([data])
+            elif response.status_code == 404:
+                logger.warning(f"Resource not found: {resource_id}")
+                return pd.DataFrame() if df_type == "pandas" else pl.DataFrame()
+            else:
+                logger.error(f"Failed to fetch resource {resource_id} with status {response.status_code}")
+                return pd.DataFrame() if df_type == "pandas" else pl.DataFrame()
+        except Exception as e:
+            logger.error(f"Error fetching resource {resource_id}: {str(e)}")
+            return pd.DataFrame() if df_type == "pandas" else pl.DataFrame()
 
     # ----------------------------
-    # Show all organisation available
+    # Show all organisations available
     # ----------------------------
@@ -1289,8 +1428,8 @@ def get_all_orgs(self) -> dict:
         Returns:
             dict: Dictionary with orgs as keys and org IDs as values
         """
-
         try:
+            catalogue = FrenchGouvApiPaths.CATALOGUE
             with duckdb.connect(':memory:') as con:
                 # Install and load httpfs extension
                 con.execute("INSTALL httpfs;")
                 con.execute("LOAD httpfs;")
                 # Query to select only id and slug, converting to dict format
                 query = """
                     SELECT DISTINCT organization, organization_id
-                    FROM read_parquet('https://object.files.data.gouv.fr/hydra-parquet/hydra-parquet/b06842f8ee27a0302ebbaaa344d35e4c.parquet')
+                    FROM read_parquet(?)
                     WHERE organization IS NOT NULL AND organization_id IS NOT NULL
                 """
                 # Execute query and fetch results
-                result = con.execute(query).fetchall()
+                result = con.execute(query, parameters=[catalogue]).fetchall()
                 # Convert results to dictionary
                 organisations = {organization: organization_id for organization, organization_id in result}
                 return organisations
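
Below is a quick usage sketch of the renamed and newly added methods, not part of the patch itself. It follows the import style used in the docstring examples above; the explorer class for the French data catalogue does not appear in this diff, so FrenchGouvCatExplorer, the "data.gouv.fr" session string, and the resource ID are illustrative placeholders.

# Usage sketch only -- not part of the patch. The CKAN names come from the
# docstring examples above; FrenchGouvCatExplorer, the "data.gouv.fr" session
# string, and the resource ID are hypothetical placeholders.
import HerdingCats as hc
from pprint import pprint

def main():
    # CKAN catalogues: renamed methods
    with hc.CatSession("data.london.gov.uk") as session:
        explore = hc.CkanCatExplorer(session)
        all_packages = explore.get_package_list()      # was package_list_dictionary()
        census = all_packages.get("2011-boundary-files")
        pprint(explore.show_package_info(census))      # was package_show_info_json()
        # New in this patch: the same metadata as a dataframe
        print(explore.show_package_info_dataframe(census, "polars"))

    # French data catalogue: new metadata and resource methods
    with hc.CatSession("data.gouv.fr") as session:     # placeholder session target
        explore = hc.FrenchGouvCatExplorer(session)    # placeholder class name
        datasets = explore.get_all_datasets()          # slug -> dataset id
        meta = explore.get_dataset_meta("674de63d05a9bbeddc66bdc1")  # was get_dataset_by_identifier()
        resource = explore.get_dataset_resource(
            "674de63d05a9bbeddc66bdc1",
            "example-resource-id",                     # placeholder resource id
        )
        pprint((datasets, meta, resource))

if __name__ == "__main__":
    main()

Since this is a breaking change, any callers still using the old names (package_list_dictionary, package_show_info_json, package_search_json, get_dataset_by_identifier, get_dataset_resources, and so on) will need to move to the new names when upgrading, which is why the commit is marked with ! and a BREAKING CHANGE footer.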