From 7e69aaaf18c3d4ce3e86fb1ebbcfb8304a6ba6ab Mon Sep 17 00:00:00 2001 From: romainsacchi Date: Thu, 21 Mar 2024 18:14:48 +0000 Subject: [PATCH] Black reformating --- dev/timing.py | 2 +- pathways/data_validation.py | 4 +- pathways/filesystem_constants.py | 1 + pathways/pathways.py | 144 +++++++++++++++++-------------- 4 files changed, 82 insertions(+), 69 deletions(-) diff --git a/dev/timing.py b/dev/timing.py index 8090d50..5237459 100644 --- a/dev/timing.py +++ b/dev/timing.py @@ -13,7 +13,7 @@ ], regions=[ "WEU", - #"USA", + # "USA", ], scenarios=[scenario], years=[ diff --git a/pathways/data_validation.py b/pathways/data_validation.py index 6c727cb..4c7a846 100644 --- a/pathways/data_validation.py +++ b/pathways/data_validation.py @@ -57,10 +57,10 @@ def validate_datapackage(datapackage: datapackage.DataPackage): dataframe = pd.DataFrame(data, columns=headers) # Check that the scenario data is valid - #validate_scenario_data(dataframe) + # validate_scenario_data(dataframe) # Check that the mapping is valid - #validate_mapping(datapackage.get_resource("mapping"), dataframe) + # validate_mapping(datapackage.get_resource("mapping"), dataframe) # Check that the LCA data is valid # validate_lca_data(datapackage) diff --git a/pathways/filesystem_constants.py b/pathways/filesystem_constants.py index d17411b..6bf5e90 100644 --- a/pathways/filesystem_constants.py +++ b/pathways/filesystem_constants.py @@ -3,6 +3,7 @@ """ from pathlib import Path + import platformdirs # Directories for data which comes with Pathways diff --git a/pathways/pathways.py b/pathways/pathways.py index 8f7620e..9a378db 100644 --- a/pathways/pathways.py +++ b/pathways/pathways.py @@ -5,11 +5,11 @@ """ import csv +import uuid from collections import defaultdict from multiprocessing import Pool, cpu_count from pathlib import Path from typing import Any, Dict, List, Optional, Tuple, Union -import uuid import bw2calc as bc import numpy as np @@ -21,12 +21,9 @@ from numpy import dtype, ndarray from premise.geomap import Geomap -from .filesystem_constants import DATA_DIR, DIR_CACHED_DB from .data_validation import validate_datapackage -from .lca import ( - get_lca_matrices, - remove_double_counting, -) +from .filesystem_constants import DATA_DIR, DIR_CACHED_DB +from .lca import get_lca_matrices, remove_double_counting from .lcia import get_lcia_method_names from .utils import ( _get_activity_indices, @@ -35,8 +32,8 @@ get_unit_conversion_factors, harmonize_units, load_classifications, + load_numpy_array_from_disk, load_units_conversion, - load_numpy_array_from_disk ) @@ -82,12 +79,12 @@ def get_visible_files(path): def resize_scenario_data( - scenario_data: xr.DataArray, - model: List[str], - scenario: List[str], - region: List[str], - year: List[int], - variables: List[str], + scenario_data: xr.DataArray, + model: List[str], + scenario: List[str], + region: List[str], + year: List[int], + variables: List[str], ): """ Resize the scenario data to the given scenario, year, region, and variables. @@ -186,12 +183,18 @@ def fetch_inventories_locations(A_index: Dict[str, Tuple[str, str, str]]) -> Lis return list(set([act[3] for act in A_index])) -def group_technosphere_indices_by_category(technosphere_index, reverse_classifications, lca_results_coords) -> Tuple: +def group_technosphere_indices_by_category( + technosphere_index, reverse_classifications, lca_results_coords +) -> Tuple: # Generate a list of activity indices for each activity category acts_idx = [] acts_dict = {} for cat in lca_results_coords["act_category"].values: - x = [int(technosphere_index[a]) for a in reverse_classifications[cat] if a in technosphere_index] + x = [ + int(technosphere_index[a]) + for a in reverse_classifications[cat] + if a in technosphere_index + ] acts_idx.append(x) acts_dict[cat] = x @@ -204,6 +207,7 @@ def group_technosphere_indices_by_category(technosphere_index, reverse_classific # Swap the axes of acts_idx to align with the dimensionality of D return acts_idx, acts_dict, np.swapaxes(acts_idx, 0, 1) + def group_technosphere_indices_by_location(technosphere_index, locations) -> Tuple: """ Group the technosphere indices by location. @@ -215,11 +219,7 @@ def group_technosphere_indices_by_location(technosphere_index, locations) -> Tup act_indices_list = [] act_indices_dict = {} for loc in locations: - x = [ - int(technosphere_index[a]) - for a in technosphere_index - if a[-1] == loc - ] + x = [int(technosphere_index[a]) for a in technosphere_index if a[-1] == loc] act_indices_list.append(x) act_indices_dict[loc] = x @@ -227,10 +227,7 @@ def group_technosphere_indices_by_location(technosphere_index, locations) -> Tup # Pad each list in act_indices_list with -1 to make them all the same length max_len = max([len(x) for x in act_indices_list]) act_indices_array = np.array( - [ - np.pad(x, (0, max_len - len(x)), constant_values=-1) - for x in act_indices_list - ] + [np.pad(x, (0, max_len - len(x)), constant_values=-1) for x in act_indices_list] ) return act_indices_list, act_indices_dict, act_indices_array @@ -284,13 +281,13 @@ def process_region(data: Tuple) -> dict[str, ndarray[Any, dtype[Any]] | list[int # If the total demand is zero, return None if ( - demand - / scenarios.sel( - region=region, - model=model, - pathway=scenario, - year=year, - ).sum(dim="variables") + demand + / scenarios.sel( + region=region, + model=model, + pathway=scenario, + year=year, + ).sum(dim="variables") ) < demand_cutoff: continue @@ -374,22 +371,26 @@ def _calculate_year(args): location_to_index = {location: index for index, location in enumerate(locations)} reverse_technosphere_index = {int(v): k for k, v in technosphere_indices.items()} loc_idx = np.array( - [ - location_to_index[act[-1]] + [ + location_to_index[act[-1]] for act in reverse_technosphere_index.values() if act[-1] in locations - ] - ) + ] + ) - acts_category_idx_list, acts_category_idx_dict, acts_category_idx_array = group_technosphere_indices_by_category( - technosphere_indices, - reverse_classifications, - lca_results.coords, + acts_category_idx_list, acts_category_idx_dict, acts_category_idx_array = ( + group_technosphere_indices_by_category( + technosphere_indices, + reverse_classifications, + lca_results.coords, + ) ) - acts_location_idx_list, acts_location_idx_dict, acts_location_idx_array = group_technosphere_indices_by_location( - technosphere_indices, - locations, + acts_location_idx_list, acts_location_idx_dict, acts_location_idx_array = ( + group_technosphere_indices_by_location( + technosphere_indices, + locations, + ) ) results["other"] = { @@ -607,24 +608,24 @@ def get_scenarios(self, scenario_data: pd.DataFrame) -> xr.DataArray: units[variable] = scenario_data[ scenario_data["variables"] == self.mapping[variable]["scenario variable"] - ].iloc[0]["unit"] + ].iloc[0]["unit"] data.attrs["units"] = units return data def calculate( - self, - methods: Optional[List[str]] = None, - models: Optional[List[str]] = None, - scenarios: Optional[List[str]] = None, - regions: Optional[List[str]] = None, - years: Optional[List[int]] = None, - variables: Optional[List[str]] = None, - characterization: bool = True, - flows: Optional[List[str]] = None, - multiprocessing: bool = False, - demand_cutoff: float = 1e-3, + self, + methods: Optional[List[str]] = None, + models: Optional[List[str]] = None, + scenarios: Optional[List[str]] = None, + regions: Optional[List[str]] = None, + years: Optional[List[int]] = None, + variables: Optional[List[str]] = None, + characterization: bool = True, + flows: Optional[List[str]] = None, + multiprocessing: bool = False, + demand_cutoff: float = 1e-3, ) -> dict[Any, Any] | dict[int | Any, dict[Any, dict[str, Any]] | None]: """ Calculate Life Cycle Assessment (LCA) results for given methods, models, scenarios, regions, and years. @@ -742,8 +743,14 @@ def calculate( # Process each region in parallel with Pool(cpu_count()) as p: # store th results as a dictionary with years as keys - results.update({(model, scenario, year): result for year, result in - zip(years, p.map(_calculate_year, args))}) + results.update( + { + (model, scenario, year): result + for year, result in zip( + years, p.map(_calculate_year, args) + ) + } + ) else: results = { (model, scenario, year): _calculate_year( @@ -777,9 +784,14 @@ def fill_in_result_array(self, results: dict): # Assuming DIR_CACHED_DB, results, and self.lca_results are already defined # Pre-loading data from disk if possible - cached_data = {data["id_array"]: load_numpy_array_from_disk(DIR_CACHED_DB / f"{data['id_array']}.npy") - for coord, result in results.items() - for region, data in result.items() if region != "other"} + cached_data = { + data["id_array"]: load_numpy_array_from_disk( + DIR_CACHED_DB / f"{data['id_array']}.npy" + ) + for coord, result in results.items() + for region, data in result.items() + if region != "other" + } # use pyprint to display progress bar = pyprind.ProgBar(len(results)) for coord, result in results.items(): @@ -817,17 +829,17 @@ def fill_in_result_array(self, results: dict): "year": year, "act_category": cat, "location": loc, - "variable": list(variables.keys()) + "variable": list(variables.keys()), } ] = summed_data def characterize_planetary_boundaries( - self, - models: Optional[List[str]] = None, - scenarios: Optional[List[str]] = None, - regions: Optional[List[str]] = None, - years: Optional[List[int]] = None, - variables: Optional[List[str]] = None, + self, + models: Optional[List[str]] = None, + scenarios: Optional[List[str]] = None, + regions: Optional[List[str]] = None, + years: Optional[List[int]] = None, + variables: Optional[List[str]] = None, ): self.calculate( models=models,