diff --git a/pathways/data_validation.py b/pathways/data_validation.py index 98517b7..a2f9e1f 100644 --- a/pathways/data_validation.py +++ b/pathways/data_validation.py @@ -92,7 +92,7 @@ def validate_scenario_data(dataframe: pd.DataFrame) -> bool: - year: integer - value: float - :param resource: Datapackage resource. + :param dataframe: pandas DataFrame containing the scenario data. :return: True if the data is valid, False otherwise. """ @@ -106,7 +106,7 @@ def validate_scenario_data(dataframe: pd.DataFrame) -> bool: return True -def validate_mapping(resource: datapackage.Resource, dataframe: pd.DataFrame): +def validate_mapping(resource: datapackage.Resource): """ Validates the mapping between scenario variables and LCA datasets. The mapping must be a YAML file. @@ -116,7 +116,7 @@ def validate_mapping(resource: datapackage.Resource, dataframe: pd.DataFrame): dataset: string scenario variable: string - :param filepath: relative path to the mapping file. + :param resource: datapackage.Resource :return: boolean """ diff --git a/pathways/lca.py b/pathways/lca.py index 16ddddf..c754f5f 100644 --- a/pathways/lca.py +++ b/pathways/lca.py @@ -1,26 +1,30 @@ +""" +This module contains functions to calculate the Life Cycle Assessment (LCA) results for a given model, scenario, and year. + +""" + + import csv import logging +import uuid from pathlib import Path from typing import Any, Dict, List, Tuple +import bw2calc as bc import bw_processing as bwp import numpy as np +import pyprind +from bw2calc import MonteCarloLCA from bw_processing import Datapackage +from numpy import ndarray, dtype from scipy import sparse from scipy.sparse import csr_matrix -from .lcia import get_lcia_methods - -# Attempt to import pypardiso's spsolve function. -# If it isn't available, we fall back on scipy's spsolve. -try: - from pypardiso import spsolve - - print("Solver: pypardiso") -except ImportError: - from scikits.umfpack import spsolve +from .filesystem_constants import DIR_CACHED_DB +from .lcia import fill_characterization_factors_matrices +from .pathways import _group_technosphere_indices +from .utils import get_unit_conversion_factors, fetch_indices, check_unclassified_activities - print("Solver: scikits.umfpack") logging.basicConfig( level=logging.DEBUG, @@ -31,7 +35,7 @@ ) -def read_indices_csv(file_path: Path) -> Dict[Tuple[str, str, str, str], str]: +def read_indices_csv(file_path: Path) -> dict[tuple[str, str, str, str], int]: """ Reads a CSV file and returns its contents as a dictionary. @@ -42,6 +46,8 @@ def read_indices_csv(file_path: Path) -> Dict[Tuple[str, str, str, str], str]: :type file_path: Path :return: A dictionary mapping tuples of four strings to indices. + For technosphere indices, the four strings are the activity name, product name, location, and unit. + For biosphere indices, the four strings are the flow name, category, subcategory, and unit. :rtype: Dict[Tuple[str, str, str, str], str] """ indices = dict() @@ -104,8 +110,8 @@ def get_lca_matrices( """ Retrieve Life Cycle Assessment (LCA) matrices from disk. - :param datapackage: The path to the datapackage. - :type datapackage: str + :param filepaths: A list of filepaths containing the LCA matrices. + :type filepaths: List[str] :param model: The name of the model. :type model: str :param scenario: The name of the scenario. 
@@ -163,56 +169,6 @@ def select_filepath(keyword: str, fps): return dp, technosphere_inds, biosphere_inds -def fill_characterization_factors_matrices( - methods: list, biosphere_matrix_dict: dict, biosphere_dict: dict, debug=False -) -> csr_matrix: - """ - Create one CSR matrix for all LCIA method, with the last dimension being the index of the method - :param methods: contains names of the methods to use. - :param biosphere_matrix_dict: dictionary with biosphere flows and their indices - :param debug: if True, log debug information - :return: a sparse matrix with the characterization factors - """ - - lcia_data = get_lcia_methods(methods=methods) - - # Prepare data for efficient creation of the sparse matrix - data = [] - rows = [] - cols = [] - cfs = [] - - for m, method in enumerate(methods): - method_data = lcia_data[method] - - for flow_name in method_data: - if flow_name in biosphere_dict: - idx = biosphere_dict[flow_name] - if idx in biosphere_matrix_dict: - data.append(method_data[flow_name]) - rows.append(biosphere_matrix_dict[idx]) - cols.append(m) - cfs.append((method, flow_name, idx, method_data[flow_name])) - - # Efficiently create the sparse matrix - matrix = sparse.csr_matrix( - (data, (cols, rows)), - shape=(len(methods), len(biosphere_matrix_dict)), - dtype=np.float64, - ) - - if debug: - # sort l by method and flow - cfs = sorted(cfs, key=lambda x: (x[0], x[1])) - for x in cfs: - method, flow, f, value = x - logging.info( - f"LCIA method: {method}, Flow: {flow}, Index: {f}, Value: {value}" - ) - - return matrix - - def remove_double_counting( characterized_inventory: csr_matrix, vars_info: dict, activity_idx: int ) -> csr_matrix: @@ -224,6 +180,9 @@ def remove_double_counting( :param vars_info: Dictionary with information about which indices to zero out. :param activity_idx: Index of the activity being evaluated, which should not be zeroed out. :return: Characterized inventory with double counting removed for all but the evaluated activity. + + TODO: This function is not used in the current implementation. It was used in the previous implementation. Needs improvement. + """ print("Removing double counting") @@ -250,3 +209,252 @@ def remove_double_counting( characterized_inventory[:, idx] = 0 return characterized_inventory.tocsr() + + +def process_region(data: Tuple) -> dict[str, ndarray[Any, dtype[Any]] | list[int]]: + """ + Process the region data. + :param data: Tuple containing the model, scenario, year, region, variables, vars_idx, scenarios, units_map, + demand_cutoff, lca, characterization_matrix, debug, use_distributions. + :return: Dictionary containing the region data. 
+    """
+    (
+        model,
+        scenario,
+        year,
+        region,
+        variables,
+        vars_idx,
+        scenarios,
+        units_map,
+        demand_cutoff,
+        lca,
+        characterization_matrix,
+        debug,
+        use_distributions,
+    ) = data
+
+    variables_demand = {}
+    d = []
+
+    for v, variable in enumerate(variables):
+        idx, dataset = vars_idx[variable]["idx"], vars_idx[variable]["dataset"]
+        # Compute the unit conversion vector for the given activities
+        dataset_unit = dataset[2]
+        unit_vector = get_unit_conversion_factors(
+            scenarios.attrs["units"][variable],
+            dataset_unit,
+            units_map,
+        )
+
+        # Fetch the demand for the given region, model, pathway, and year
+        demand = scenarios.sel(
+            variables=variable,
+            region=region,
+            model=model,
+            pathway=scenario,
+            year=year,
+        )
+
+        # Compute the share of this variable in the total demand of the region
+        share = demand / scenarios.sel(
+            region=region,
+            model=model,
+            pathway=scenario,
+            year=year,
+        ).sum(dim="variables")
+
+        # If the share of the demand is below the cut-off criterion, skip this variable
+        if share < demand_cutoff:
+            continue
+
+        variables_demand[variable] = {
+            "id": idx,
+            "demand": demand.values * float(unit_vector),
+        }
+
+        lca.lci(demand={idx: demand.values * float(unit_vector)})
+
+        if use_distributions == 0:
+            # Regular LCA
+            characterized_inventory = (
+                characterization_matrix @ lca.inventory
+            ).toarray()
+
+        else:
+            # Use distributions for LCA calculations
+            # next(lca) is a generator that yields the inventory matrix
+            results = np.array(
+                [
+                    (characterization_matrix @ lca.inventory).toarray()
+                    for _ in zip(range(use_distributions), lca)
+                ]
+            )
+
+            # calculate quantiles along the first dimension
+            characterized_inventory = np.quantile(results, [0.05, 0.5, 0.95], axis=0)
+
+        d.append(characterized_inventory)
+
+        if debug:
+            logging.info(
+                f"var.: {variable}, name: {dataset[0][:50]}, "
+                f"ref.: {dataset[1]}, unit: {dataset[2][:50]}, idx: {idx},"
+                f"loc.: {dataset[3]}, demand: {round(float(demand.values * float(unit_vector)), 2)}, "
+                f"unit conv.: {unit_vector}, "
+                f"impact: {np.round(characterized_inventory.sum(axis=-1) / (demand.values * float(unit_vector)), 3)}. "
+            )
+
+    # Save the characterization vectors to disk
+    id_array = uuid.uuid4()
+    np.save(file=DIR_CACHED_DB / f"{id_array}.npy", arr=np.stack(d))
+
+    # just making sure that the memory is freed. Maybe not needed - check later
+    del d
+
+    # returning a dictionary containing the id_array and the variables
+    # to be able to fetch them back later
+    return {
+        "id_array": id_array,
+        "variables": {k: v["demand"] for k, v in variables_demand.items()},
+    }
+
+
+def _calculate_year(args: tuple):
+    """
+    Prepares the data for the calculation of LCA results for a given year
+    and calls the process_region function to calculate the results for each region.
+    """
+    (
+        model,
+        scenario,
+        year,
+        regions,
+        variables,
+        methods,
+        demand_cutoff,
+        filepaths,
+        mapping,
+        units,
+        lca_results,
+        classifications,
+        scenarios,
+        reverse_classifications,
+        debug,
+        use_distributions,
+    ) = args
+
+    print(f"------ Calculating LCA results for {year}...")
+    if debug:
+        logging.info(
+            f"############################### "
+            f"{model}, {scenario}, {year} "
+            f"###############################"
+        )
+
+    geo = Geomap(model=model)
+
+    # Try to load LCA matrices for the given model, scenario, and year
+    try:
+        bw_datapackage, technosphere_indices, biosphere_indices = get_lca_matrices(
+            filepaths, model, scenario, year
+        )
+
+    except FileNotFoundError:
+        # If LCA matrices can't be loaded, skip to the next iteration
+        if debug:
+            logging.warning(f"Skipping {model}, {scenario}, {year}, as data not found.")
+        return
+
+    # Fetch indices
+    vars_info = fetch_indices(mapping, regions, variables, technosphere_indices, geo)
+
+    # Remove contribution from activities in other activities
+    # A = remove_double_counting(A, vars_info)
+
+    # check unclassified activities
+    missing_classifications = check_unclassified_activities(
+        technosphere_indices, classifications
+    )
+
+    if missing_classifications:
+        if debug:
+            logging.warning(
+                f"{len(missing_classifications)} activities are not found in the classifications."
+                "See missing_classifications.csv for more details."
+            )
+
+    results = {}
+
+    locations = lca_results.coords["location"].values.tolist()
+
+    acts_category_idx_dict = _group_technosphere_indices(
+        technosphere_indices=technosphere_indices,
+        group_by=lambda x: classifications.get(x[:3], "unclassified"),
+        group_values=list(set(lca_results.coords["act_category"].values)),
+    )
+
+    acts_location_idx_dict = _group_technosphere_indices(
+        technosphere_indices=technosphere_indices,
+        group_by=lambda x: x[-1],
+        group_values=locations,
+    )
+
+    results["other"] = {
+        "acts_category_idx_dict": acts_category_idx_dict,
+        "acts_location_idx_dict": acts_location_idx_dict,
+    }
+
+    if use_distributions == 0:
+        lca = bc.LCA(
+            demand={0: 1},
+            data_objs=[
+                bw_datapackage,
+            ],
+        )
+        lca.lci(factorize=True)
+    else:
+        lca = MonteCarloLCA(
+            demand={0: 1},
+            data_objs=[
+                bw_datapackage,
+            ],
+            use_distributions=True,
+        )
+        lca.lci()
+
+    characterization_matrix = fill_characterization_factors_matrices(
+        methods=methods,
+        biosphere_matrix_dict=lca.dicts.biosphere,
+        biosphere_dict=biosphere_indices,
+        debug=debug,
+    )
+
+    if debug:
+        logging.info(
+            f"Characterization matrix created. Shape: {characterization_matrix.shape}"
+        )
+
+    bar = pyprind.ProgBar(len(regions))
+    for region in regions:
+        bar.update()
+        # Iterate over each region
+        results[region] = process_region(
+            (
+                model,
+                scenario,
+                year,
+                region,
+                variables,
+                vars_info[region],
+                scenarios,
+                units,
+                demand_cutoff,
+                lca,
+                characterization_matrix,
+                debug,
+                use_distributions,
+            )
+        )
+
+    return results
diff --git a/pathways/lcia.py b/pathways/lcia.py
index 085e4f3..ec76178 100644
--- a/pathways/lcia.py
+++ b/pathways/lcia.py
@@ -1,4 +1,13 @@
+"""
+This module contains functions to list LCIA methods and fill the LCIA characterization matrix.
+""" + import json +import logging + +import numpy as np +from scipy import sparse +from scipy.sparse import csr_matrix from .filesystem_constants import DATA_DIR @@ -41,3 +50,54 @@ def get_lcia_methods(methods: list = None): data = [x for x in data if " - ".join(x["name"]) in methods] return {" - ".join(x["name"]): format_lcia_method_exchanges(x) for x in data} + + +def fill_characterization_factors_matrices( + methods: list, biosphere_matrix_dict: dict, biosphere_dict: dict, debug=False +) -> csr_matrix: + """ + Create one CSR matrix for all LCIA method, with the last dimension being the index of the method + :param methods: contains names of the LCIA methods to use (e.g., ["IPCC 2021, Global wArming Potential"]). + :param biosphere_matrix_dict: dictionary with biosphere flows and their indices in bw2calc's matrix + :param biosphere_dict: dictionary with biosphere flows and their indices in the biosphere matrix (not bw2calc's matrix) + :param debug: if True, log debug information + :return: a sparse matrix with the characterization factors + """ + + lcia_data = get_lcia_methods(methods=methods) + + # Prepare data for efficient creation of the sparse matrix + data = [] + rows = [] + cols = [] + cfs = [] + + for m, method in enumerate(methods): + method_data = lcia_data[method] + + for flow_name in method_data: + if flow_name in biosphere_dict: + idx = biosphere_dict[flow_name] + if idx in biosphere_matrix_dict: + data.append(method_data[flow_name]) + rows.append(biosphere_matrix_dict[idx]) + cols.append(m) + cfs.append((method, flow_name, idx, method_data[flow_name])) + + # Efficiently create the sparse matrix + matrix = sparse.csr_matrix( + (data, (cols, rows)), + shape=(len(methods), len(biosphere_matrix_dict)), + dtype=np.float64, + ) + + if debug: + # sort l by method and flow + cfs = sorted(cfs, key=lambda x: (x[0], x[1])) + for x in cfs: + method, flow, f, value = x + logging.info( + f"LCIA method: {method}, Flow: {flow}, Index: {f}, Value: {value}" + ) + + return matrix diff --git a/pathways/pathways.py b/pathways/pathways.py index 0584816..7f28275 100644 --- a/pathways/pathways.py +++ b/pathways/pathways.py @@ -4,70 +4,41 @@ LCA datasets, and LCA matrices. 
""" -import csv import logging -import uuid import warnings from collections import defaultdict from multiprocessing import Pool, cpu_count -from typing import Any, List, Optional, Tuple +from typing import List, Optional -import bw2calc as bc +import datapackage import numpy as np +import pandas import pandas as pd import pyprind import xarray as xr import yaml -from bw2calc.monte_carlo import MonteCarloLCA from datapackage import DataPackage -from numpy import dtype, ndarray -from premise.geomap import Geomap from .data_validation import validate_datapackage from .filesystem_constants import DATA_DIR, DIR_CACHED_DB -from .lca import fill_characterization_factors_matrices, get_lca_matrices +from .lca import get_lca_matrices, _calculate_year from .lcia import get_lcia_method_names from .utils import ( clean_cache_directory, create_lca_results_array, display_results, - fetch_indices, fetch_inventories_locations, - get_unit_conversion_factors, harmonize_units, load_classifications, load_numpy_array_from_disk, load_units_conversion, - resize_scenario_data, -) + resize_scenario_data, ) # remove warnings warnings.filterwarnings("ignore") -def check_unclassified_activities( - technosphere_indices: dict, classifications: dict -) -> List: - """ - Check if there are activities in the technosphere matrix that are not in the classifications. - :param technosphere_indices: List of activities in the technosphere matrix. - :param classifications: Dictionary of activities classifications. - :return: List of activities not found in the classifications. - """ - missing_classifications = [] - for act in technosphere_indices: - if act[:3] not in classifications: - missing_classifications.append(list(act[:3])) - - if missing_classifications: - with open("missing_classifications.csv", "a") as f: - writer = csv.writer(f) - writer.writerows(missing_classifications) - - return missing_classifications - - -def group_technosphere_indices( +def _group_technosphere_indices( technosphere_indices: dict, group_by, group_values: list ) -> dict: """ @@ -93,266 +64,59 @@ def group_technosphere_indices( return acts_dict -def process_region(data: Tuple) -> dict[str, ndarray[Any, dtype[Any]] | list[int]]: - """ - Process the region data. - :param data: Tuple containing the model, scenario, year, region, variables, vars_idx, scenarios, units_map, - demand_cutoff, lca, characterization_matrix, debug, use_distributions. - :return: Dictionary containing the region data. 
+def _get_mapping(data) -> dict: """ - ( - model, - scenario, - year, - region, - variables, - vars_idx, - scenarios, - units_map, - demand_cutoff, - lca, - characterization_matrix, - debug, - use_distributions, - ) = data - - variables_demand = {} - d = [] - - for v, variable in enumerate(variables): - idx, dataset = vars_idx[variable]["idx"], vars_idx[variable]["dataset"] - # Compute the unit conversion vector for the given activities - dataset_unit = dataset[2] - unit_vector = get_unit_conversion_factors( - scenarios.attrs["units"][variable], - dataset_unit, - units_map, - ) - - # Fetch the demand for the given region, model, pathway, and year - demand = scenarios.sel( - variables=variable, - region=region, - model=model, - pathway=scenario, - year=year, - ) - - share = demand / scenarios.sel( - region=region, - model=model, - pathway=scenario, - year=year, - ).sum(dim="variables") - - # If the total demand is zero, return None - if share < demand_cutoff: - continue - - variables_demand[variable] = { - "id": idx, - "demand": demand.values * float(unit_vector), - } - - lca.lci(demand={idx: demand.values * float(unit_vector)}) - - if use_distributions == 0: - characterized_inventory = ( - characterization_matrix @ lca.inventory - ).toarray() - - else: - # Use distributions for LCA calculations - # next(lca) is a generator that yields the inventory matrix - results = np.array( - [ - (characterization_matrix @ lca.inventory).toarray() - for _ in zip(range(use_distributions), lca) - ] - ) - - # calculate quantiles along the first dimension - characterized_inventory = np.quantile(results, [0.05, 0.5, 0.95], axis=0) - - d.append(characterized_inventory) - - if debug: - logging.info( - f"var.: {variable}, name: {dataset[0][:50]}, " - f"ref.: {dataset[1]}, unit: {dataset[2][:50]}, idx: {idx}," - f"loc.: {dataset[3]}, demand: {round(float(demand.values * float(unit_vector)), 2)}, " - f"unit conv.: {unit_vector}, " - f"impact: {np.round(characterized_inventory.sum(axis=-1) / (demand.values * float(unit_vector)), 3)}. " - ) - - id_array = uuid.uuid4() - np.save(file=DIR_CACHED_DB / f"{id_array}.npy", arr=np.stack(d)) - - del d - - # concatenate the list of sparse matrices and - # add a third dimension and concatenate along it - return { - "id_array": id_array, - "variables": {k: v["demand"] for k, v in variables_demand.items()}, - } - - -def _calculate_year(args): - ( - model, - scenario, - year, - regions, - variables, - methods, - demand_cutoff, - filepaths, - mapping, - units, - lca_results, - classifications, - scenarios, - reverse_classifications, - debug, - use_distributions, - ) = args - - print(f"------ Calculating LCA results for {year}...") - if debug: - logging.info( - f"############################### " - f"{model}, {scenario}, {year} " - f"###############################" - ) - - geo = Geomap(model=model) - - # Try to load LCA matrices for the given model, scenario, and year - try: - bw_datapackage, technosphere_indices, biosphere_indices = get_lca_matrices( - filepaths, model, scenario, year - ) + Read the mapping file which maps scenario variables to LCA datasets. + It's a YAML file. 
+ :return: dict - except FileNotFoundError: - # If LCA matrices can't be loaded, skip to the next iteration - if debug: - logging.warning(f"Skipping {model}, {scenario}, {year}, as data not found.") - return - - # Fetch indices - vars_info = fetch_indices(mapping, regions, variables, technosphere_indices, geo) - - # Remove contribution from activities in other activities - # A = remove_double_counting(A, vars_info) + """ + return yaml.safe_load(data.get_resource("mapping").raw_read()) - # check unclassified activities - missing_classifications = check_unclassified_activities( - technosphere_indices, classifications - ) - if missing_classifications: - if debug: - logging.warning( - f"{len(missing_classifications)} activities are not found in the classifications." - "See missing_classifications.csv for more details." - ) +def _read_scenario_data(data: dict, scenario: str): + """ + Read the scenario data. + The scenario data describes scenario variables with production volumes for each time step. + :param scenario: str. Scenario name. + :return: pd.DataFrame - results = {} - - locations = lca_results.coords["location"].values.tolist() - - acts_category_idx_dict = group_technosphere_indices( - technosphere_indices=technosphere_indices, - group_by=lambda x: classifications.get(x[:3], "unclassified"), - group_values=list(set(lca_results.coords["act_category"].values)), - ) - - acts_location_idx_dict = group_technosphere_indices( - technosphere_indices=technosphere_indices, - group_by=lambda x: x[-1], - group_values=locations, - ) - - results["other"] = { - "acts_category_idx_dict": acts_category_idx_dict, - "acts_location_idx_dict": acts_location_idx_dict, - } - - if use_distributions == 0: - lca = bc.LCA( - demand={0: 1}, - data_objs=[ - bw_datapackage, - ], - ) - lca.lci(factorize=True) + """ + filepath = data["scenarios"][scenario]["path"] + # if CSV file + if filepath.endswith(".csv"): + return pd.read_csv(filepath, index_col=0) else: - lca = MonteCarloLCA( - demand={0: 1}, - data_objs=[ - bw_datapackage, - ], - use_distributions=True, - ) - lca.lci() - - characterization_matrix = fill_characterization_factors_matrices( - methods=methods, - biosphere_matrix_dict=lca.dicts.biosphere, - biosphere_dict=biosphere_indices, - debug=debug, - ) - - if debug: - logging.info( - f"Characterization matrix created. Shape: {characterization_matrix.shape}" - ) + # Excel file + return pd.read_excel(filepath, index_col=0) - bar = pyprind.ProgBar(len(regions)) - for region in regions: - bar.update() - # Iterate over each region - results[region] = process_region( - ( - model, - scenario, - year, - region, - variables, - vars_info[region], - scenarios, - units, - demand_cutoff, - lca, - characterization_matrix, - debug, - use_distributions, - ) - ) - return results +def _read_datapackage(datapackage: DataPackage) -> DataPackage: + """Read the datapackage.json file. + + :return: DataPackage + """ + return DataPackage(datapackage) class Pathways: """The Pathways class reads in a datapackage that contains scenario data, mapping between scenario variables and LCA datasets, and LCA matrices. - Parameters - ---------- - datapackage : str - Path to the datapackage.json file. + :param datapackage: Path to the datapackage.zip file. 
+    :type datapackage: str
+
     """
 
     def __init__(self, datapackage, debug=False):
         self.datapackage = datapackage
         self.data, dataframe, self.filepaths = validate_datapackage(
-            self.read_datapackage()
+            _read_datapackage(self.datapackage)
         )
-        self.mapping = self.get_mapping()
-        self.mapping.update(self.get_final_energy_mapping())
+        self.mapping = _get_mapping(self.data)
+        self.mapping.update(self._get_final_energy_mapping())
         self.debug = debug
-        self.scenarios = self.get_scenarios(dataframe)
+        self.scenarios = self._get_scenarios(dataframe)
         self.classifications = load_classifications()
 
         # create a reverse mapping
@@ -378,23 +142,19 @@ def __init__(self, datapackage, debug=False):
             logging.info("#" * 600)
             logging.info(f"Pathways initialized with datapackage: {datapackage}")
 
-    def read_datapackage(self) -> DataPackage:
-        """Read the datapackage.json file.
-
-        Returns
-        -------
-        dict
-            The datapackage as a dictionary.
-        """
-        return DataPackage(self.datapackage)
-
-    def get_final_energy_mapping(self):
+    def _get_final_energy_mapping(self):
         """
         Read the final energy mapping file, which is an Excel file
         :return: dict
         """
 
-        def create_dict_for_specific_model(row, model):
+        def create_dict_for_specific_model(row: dict, model: str) -> dict:
+            """
+            Create a dictionary for a specific model from the row.
+            :param row: dict
+            :param model: str
+            :return: dict
+            """
             # Construct the key from 'sector', 'variable', and 'fuel'
             key = f"{row['sector']}_{row['variable']}_{row['fuel']}"
 
@@ -416,7 +176,13 @@ def create_dict_for_specific_model(row, model):
                 return dict_structure
             return None
 
-        def create_dict_with_specific_model(dataframe, model):
+        def create_dict_with_specific_model(dataframe: pandas.DataFrame, model: str) -> dict:
+            """
+            Create a dictionary for a specific model from the dataframe.
+            :param dataframe: pandas.DataFrame
+            :param model: str
+            :return: dict
+            """
             model_dict = {}
             for index, row in dataframe.iterrows():
                 row_dict = create_dict_for_specific_model(row, model)
@@ -425,47 +191,19 @@ def create_dict_with_specific_model(dataframe, model):
             return model_dict
 
         # Read the Excel file
-        df = pd.read_excel(
+        mapping_dataframe = pd.read_excel(
            DATA_DIR / "final_energy_mapping.xlsx",
        )
        model = self.data.descriptor["scenarios"][0]["name"].split(" - ")[0]
 
-        return create_dict_with_specific_model(df, model)
-
-    def get_mapping(self) -> dict:
-        """
-        Read the mapping file.
-        It's a YAML file.
-        :return: dict
-
-        """
-        return yaml.safe_load(self.data.get_resource("mapping").raw_read())
-
-    def read_scenario_data(self, scenario):
-        """Read the scenario data.
-
-        Parameters
-        ----------
-        scenario : str
-            Scenario name.
+        return create_dict_with_specific_model(mapping_dataframe, model)
 
-        Returns
-        -------
-        pd.DataFrame
-            The scenario data as a pandas DataFrame.
-        """
-        filepath = self.data["scenarios"][scenario]["path"]
-        # if CSV file
-        if filepath.endswith(".csv"):
-            return pd.read_csv(filepath, index_col=0)
-        else:
-            # Excel file
-            return pd.read_excel(filepath, index_col=0)
-
-    def get_scenarios(self, scenario_data: pd.DataFrame) -> xr.DataArray:
+    def _get_scenarios(self, scenario_data: pd.DataFrame) -> xr.DataArray:
         """
         Load scenarios from filepaths as pandas DataFrame.
         Concatenate them into an xarray DataArray.
+ :param scenario_data: pd.DataFrame + :return: xr.DataArray """ mapping_vars = [item["scenario variable"] for item in self.mapping.values()] @@ -525,7 +263,6 @@ def calculate( years: Optional[List[int]] = None, variables: Optional[List[str]] = None, characterization: bool = True, - flows: Optional[List[str]] = None, multiprocessing: bool = False, demand_cutoff: float = 1e-3, use_distributions: int = 0, @@ -552,12 +289,8 @@ def calculate( :type years: Optional[List[int]], default is None :param variables: List of variables. If None, all available variables will be used. :type variables: Optional[List[str]], default is None - :param flows: List of biosphere flows. If None, all available flows will be used. - :type flows: Optional[List[str]], default is None :param multiprocessing: Boolean. If True, process each region in parallel. :type multiprocessing: bool, default is False - :param data_type: Data type to use for storing LCA results. - :type data_type: np.dtype, default is np.float64 :param demand_cutoff: Float. If the total demand for a given variable is less than this value, the variable is skipped. :type demand_cutoff: float, default is 1e-3 :param use_distributions: Integer. If non zero, use distributions for LCA calculations. @@ -696,9 +429,9 @@ def calculate( # remove None values in results results = {k: v for k, v in results.items() if v is not None} - self.fill_in_result_array(results) + self._fill_in_result_array(results) - def fill_in_result_array(self, results: dict): + def _fill_in_result_array(self, results: dict): # Assuming DIR_CACHED_DB, results, and self.lca_results are already defined diff --git a/pathways/utils.py b/pathways/utils.py index c46e765..23e380e 100644 --- a/pathways/utils.py +++ b/pathways/utils.py @@ -1,3 +1,13 @@ +""" +Utilities for the pathways module. + +These utilities include functions for loading activities classifications and units conversion, harmonizing units, +creating an LCA results array, displaying results, loading a numpy array from disk, getting visible files, cleaning the +cache directory, resizing scenario data, fetching indices, fetching inventories locations, converting a CSV file to a +dictionary, checking unclassified activities, and getting activity indices. + +""" + import csv import logging from pathlib import Path @@ -75,7 +85,7 @@ def get_unit_conversion_factors( return np.array(unit_mapping[scenario_unit][dataset_unit]) -def load_units_conversion(): +def load_units_conversion() -> dict: """Load the units conversion.""" with open(UNITS_CONVERSION, "r") as f: @@ -136,6 +146,7 @@ def create_lca_results_array( } if use_distributions is True: + # we calculate the 5th, 50th, and 95th percentiles coords.update({"quantile": [0.05, 0.5, 0.95]}) dims = ( @@ -162,6 +173,15 @@ def display_results( cutoff: float = 0.001, interpolate: bool = False, ) -> xr.DataArray: + """ + Display the LCA results. + Remove results below a cutoff value and aggregate them into a single category. + :param lca_results: The LCA results. + :param cutoff: The cutoff value. + :param interpolate: A boolean indicating whether to interpolate the results. + :return: The LCA results. + :rtype: xr.DataArray + """ if lca_results is None: raise ValueError("No results to display") @@ -206,7 +226,12 @@ def load_numpy_array_from_disk(filepath): return np.load(filepath, allow_pickle=True) -def get_visible_files(path): +def get_visible_files(path: str) -> list[Path]: + """ + Get visible files in a directory. + :param path: The path to the directory. 
+ :return: List of visible files. + """ return [file for file in Path(path).iterdir() if not file.name.startswith(".")] @@ -260,7 +285,7 @@ def resize_scenario_data( return scenario_data -def _get_activity_indices( +def get_activity_indices( activities: List[Tuple[str, str, str, str]], technosphere_index: Dict[Tuple[str, str, str, str], Any], geo: Geomap, @@ -350,7 +375,7 @@ def fetch_indices( ] # Use _get_activity_indices to fetch indices - idxs = _get_activity_indices(activities, technosphere_index, geo) + idxs = get_activity_indices(activities, technosphere_index, geo) # Map variables to their indices and associated dataset information vars_idx[region] = { @@ -403,3 +428,25 @@ def csv_to_dict(filename: str) -> dict[int, tuple[str, ...]]: logging.warning(f"Row {row} has less than 5 items.") return output_dict + + +def check_unclassified_activities( + technosphere_indices: dict, classifications: dict +) -> List: + """ + Check if there are activities in the technosphere matrix that are not in the classifications. + :param technosphere_indices: List of activities in the technosphere matrix. + :param classifications: Dictionary of activities classifications. + :return: List of activities not found in the classifications. + """ + missing_classifications = [] + for act in technosphere_indices: + if act[:3] not in classifications: + missing_classifications.append(list(act[:3])) + + if missing_classifications: + with open("missing_classifications.csv", "a") as f: + writer = csv.writer(f) + writer.writerows(missing_classifications) + + return missing_classifications
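Below is a minimal usage sketch of the public API after this refactor, assuming the package exposes `Pathways` at the top level; the datapackage path, the LCIA method identifier, and the `methods` keyword of `calculate()` are illustrative assumptions rather than values taken from this patch.

# Usage sketch (hypothetical path and method name; `methods` keyword assumed).
from pathways import Pathways

p = Pathways(datapackage="some/datapackage.json", debug=False)  # hypothetical datapackage path
p.calculate(
    methods=["IPCC 2021 - climate change - GWP 100a"],  # assumed method identifier
    years=[2030, 2050],
    multiprocessing=False,   # process years sequentially
    use_distributions=0,     # 0 = deterministic LCA; >0 = number of Monte Carlo iterations
)
print(p.lca_results)  # characterized results as an xarray DataArray (assumed attribute name)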