diff --git a/pathways/lca.py b/pathways/lca.py index db78274..b7999fb 100644 --- a/pathways/lca.py +++ b/pathways/lca.py @@ -1,11 +1,11 @@ import csv -import yaml import logging from pathlib import Path from typing import Any, Dict, List, Tuple import bw_processing as bwp import numpy as np +import yaml from bw_processing import Datapackage from scipy import sparse from scipy.sparse import csr_matrix @@ -52,6 +52,7 @@ def read_indices_csv(file_path: Path) -> Dict[Tuple[str, str, str, str], str]: indices[(row[0], row[1], row[2], row[3])] = row[4] return indices + def load_matrix_and_index( file_path: Path, ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]: @@ -94,6 +95,7 @@ def load_matrix_and_index( return data_array, indices_array, flip_array, distributions_array + def get_matrix_arrays( dirpath: Path, matrix_type: str, @@ -114,6 +116,7 @@ def get_matrix_arrays( return [data, indices, sign, distributions] + def get_indices( dirpath: Path, ) -> Tuple[Dict, Dict]: @@ -130,6 +133,7 @@ def get_indices( return A_indices, B_indices + def get_lca_matrices( A_arrays: list, B_arrays: list, @@ -165,6 +169,7 @@ def get_lca_matrices( return dp + def get_subshares_matrix( correlated_array: list, ) -> Datapackage: @@ -198,7 +203,7 @@ def adjust_matrix_based_on_shares(A_arrays, shares_dict, use_distributions, year data_array, indices_array, sign_array, _ = A_arrays - index_lookup = {(row['row'], row['col']): i for i, row in enumerate(indices_array)} + index_lookup = {(row["row"], row["col"]): i for i, row in enumerate(indices_array)} modified_data = [] modified_indices = [] @@ -209,8 +214,8 @@ def adjust_matrix_based_on_shares(A_arrays, shares_dict, use_distributions, year for _, regions in shares_dict.items(): for _, techs in regions.items(): for _, details in techs.items(): - if 'idx' in details: - unique_product_indices_from_dict.add(details['idx']) + if "idx" in details: + unique_product_indices_from_dict.add(details["idx"]) # Helper function to find index using the lookup dictionary def find_index(activity_idx, product_idx): @@ -218,21 +223,27 @@ def find_index(activity_idx, product_idx): for tech_category, regions in shares_dict.items(): for region, techs in regions.items(): - all_tech_indices = [techs[tech]['idx'] for tech in techs if techs[tech]['idx'] is not None] + all_tech_indices = [ + techs[tech]["idx"] for tech in techs if techs[tech]["idx"] is not None + ] all_product_indices = set() tech_indices = np.isin(indices_array['row'], all_tech_indices) all_product_indices.update(indices_array['col'][tech_indices]) - + for product_idx in all_product_indices: # Vectorized operation to calculate total_output for each product_idx - relevant_indices = [find_index(tech_idx, product_idx) for tech_idx in all_tech_indices if - find_index(tech_idx, product_idx) is not None and tech_idx != product_idx] + relevant_indices = [ + find_index(tech_idx, product_idx) + for tech_idx in all_tech_indices + if find_index(tech_idx, product_idx) is not None + and tech_idx != product_idx + ] total_output = np.sum(data_array[relevant_indices]) for tech, details in techs.items(): - share = details.get(year, {}).get('value', 0) - idx = details['idx'] + share = details.get(year, {}).get("value", 0) + idx = details["idx"] if idx is None or share == 0: continue @@ -241,13 +252,19 @@ def find_index(activity_idx, product_idx): index = find_index(idx, product_idx) # Adjust value or add new exchange - if index is not None and product_idx not in unique_product_indices_from_dict: # Exclude diagonal and undesired exchanges + if ( + index is not None + and product_idx not in unique_product_indices_from_dict + ): # Exclude diagonal and undesired exchanges data_array[index] = new_amount # Append to modified_indices regardless of whether it's a new addition or an adjustment modified_indices.append((idx, product_idx)) modified_data.append(new_amount) modified_signs.append(sign_array[index]) elif product_idx not in unique_product_indices_from_dict: # Exclude diagonal and undesired exchanges + elif ( + product_idx not in unique_product_indices_from_dict + ): # Exclude diagonal and undesired exchanges modified_data.append(new_amount) modified_indices.append((idx, product_idx)) modified_signs.append(True) # CHECK: I am assuming new exchanges are always positive @@ -258,6 +275,7 @@ def find_index(activity_idx, product_idx): return [modified_data_array, modified_indices_array, modified_signs_array] + def fill_characterization_factors_matrices( biosphere_flows: dict, methods, biosphere_dict, debug=False ) -> csr_matrix: @@ -340,6 +358,7 @@ def fill_characterization_factors_matrices( # # return characterized_inventory.tocsr() + def remove_double_counting(A: csr_matrix, vars_info: dict) -> csr_matrix: """ Remove double counting from a technosphere matrix. @@ -362,4 +381,4 @@ def remove_double_counting(A: csr_matrix, vars_info: dict) -> csr_matrix: A_coo.data[row_mask & ~col_mask] = 0 A_coo.eliminate_zeros() - return A_coo.tocsr() \ No newline at end of file + return A_coo.tocsr() diff --git a/pathways/pathways.py b/pathways/pathways.py index 46e5551..93b2235 100644 --- a/pathways/pathways.py +++ b/pathways/pathways.py @@ -41,13 +41,13 @@ clean_cache_directory, create_lca_results_array, display_results, + get_dirpath, get_unit_conversion_factors, harmonize_units, - load_subshares, load_classifications, load_numpy_array_from_disk, + load_subshares, load_units_conversion, - get_dirpath ) warnings.filterwarnings("ignore") @@ -134,11 +134,11 @@ def resize_scenario_data( return scenario_data + def subshares_indices(regions, A_index, geo): """ Fetch the indices in the technosphere matrix from the activities in technologies_shares.yaml in the given regions. - others shares are resized. :param regions: List of regions :param A_index: Dictionary with the indices of the activities in the technosphere matrix. :param geo: Geomap object. @@ -168,19 +168,18 @@ def subshares_indices(regions, A_index, geo): if region not in indices_dict[tech_category]: indices_dict[tech_category][region] = {} indices_dict[tech_category][region][tech_type] = { - 'idx': activity_index, - 2020: { - "value": value_2020 - }, + "idx": activity_index, + 2020: {"value": value_2020}, 2050: { "min": min_2050, "max": max_2050, - "distribution": distribution_2050 + "distribution": distribution_2050, }, } return indices_dict + def fetch_indices(mapping, regions, variables, A_index, geo): """ Fetch the indices for the given activities in @@ -239,7 +238,6 @@ def fetch_indices(mapping, regions, variables, A_index, geo): return vars_idx - def fetch_inventories_locations(A_index: Dict[str, Tuple[str, str, str]]) -> List[str]: """ Fetch the locations of the inventories. @@ -435,7 +433,7 @@ def _calculate_year(args): reverse_classifications, debug, use_distributions, - subshares + subshares, ) = args print(f"------ Calculating LCA results for {year}...") @@ -552,15 +550,12 @@ def _calculate_year(args): lca = MonteCarloLCA( demand={0: 1}, - data_objs=[ - bw_datapackage, bw_correlated - ], + data_objs=[bw_datapackage, bw_correlated], use_distributions=True, use_arrays=True, ) lca.lci() - characterization_matrix = fill_characterization_factors_matrices( biosphere_indices, methods, lca.dicts.biosphere, debug ) @@ -941,7 +936,7 @@ def calculate( self.reverse_classifications, self.debug, use_distributions, - subshares + subshares, ) for year in years ] @@ -976,7 +971,7 @@ def calculate( self.reverse_classifications, self.debug, use_distributions, - subshares + subshares, ) ) for year in years diff --git a/pathways/utils.py b/pathways/utils.py index 25e0818..2305204 100644 --- a/pathways/utils.py +++ b/pathways/utils.py @@ -1,10 +1,10 @@ +import math from pathlib import Path from typing import Any, Dict, List, Tuple, Union import numpy as np import xarray as xr import yaml -import math from .filesystem_constants import DATA_DIR, DIR_CACHED_DB @@ -14,10 +14,10 @@ def get_dirpath( - datapackage: str, - model: str, - scenario: str, - year: int, + datapackage: str, + model: str, + scenario: str, + year: int, ) -> Path: """ Get the directory path for a specific year. @@ -27,13 +27,16 @@ def get_dirpath( :rtype: Path """ - dirpath = Path(datapackage).parent / "inventories" / model.lower() / scenario / str(year) + dirpath = ( + Path(datapackage).parent / "inventories" / model.lower() / scenario / str(year) + ) if not dirpath.exists(): raise FileNotFoundError(f"Directory {dirpath} does not exist.") return dirpath + def load_classifications(): """Load the activities classifications.""" @@ -302,6 +305,7 @@ def load_numpy_array_from_disk(filepath): return np.load(filepath, allow_pickle=True) + def load_subshares() -> dict: """ Load a YAML file and return its content as a Python dictionary. @@ -314,6 +318,7 @@ def load_subshares() -> dict: adjust_subshares(data) return data + def adjust_subshares(data): """ Adjust the subshares data to ensure that the sum of the 2020 values is equal to 1, after neglecting the technologies @@ -331,9 +336,9 @@ def adjust_subshares(data): for subcategory, tech_list in technologies.items(): for tech in tech_list: if 2020 in tech: - value = tech[2020].get('value', 0) + value = tech[2020].get("value", 0) total_2020_value += value - if tech.get('name') is not None: + if tech.get("name") is not None: total_adjustable_value += value # Skip adjustment if no values or all values are named @@ -346,14 +351,16 @@ def adjust_subshares(data): adjusted_total = 0 for subcategory, tech_list in technologies.items(): for tech in tech_list: - if 2020 in tech and tech.get('name') is not None: - tech[2020]['value'] = tech[2020]['value'] * adjustment_factor - adjusted_total += tech[2020]['value'] + if 2020 in tech and tech.get("name") is not None: + tech[2020]["value"] = tech[2020]["value"] * adjustment_factor + adjusted_total += tech[2020]["value"] # Check if the adjusted total is close to 1.00, allowing a small margin for floating-point arithmetic if not math.isclose(adjusted_total, 1.00, rel_tol=1e-9): print( - f"Warning: Total of adjusted '2020' values in category '{category}' does not add up to 1.00 (Total: {adjusted_total})") + f"Warning: Total of adjusted '2020' values in category '{category}' does not add up to 1.00 (Total: {adjusted_total})" + ) + def get_visible_files(path): return [file for file in Path(path).iterdir() if not file.name.startswith(".")]