diff --git a/dev/generate datapackages.ipynb b/dev/generate datapackages.ipynb index 4c5c735..c4b927d 100644 --- a/dev/generate datapackages.ipynb +++ b/dev/generate datapackages.ipynb @@ -198,7 +198,7 @@ }, { "cell_type": "code", - "execution_count": 1, + "execution_count": null, "id": "9d57850c-04fa-4ef7-89e8-3e9de62f92e3", "metadata": {}, "outputs": [ @@ -222,216 +222,7 @@ "execution_count": null, "id": "a081c764-a942-4627-9197-fcd373e98b62", "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "Calculating LCA results for image...\n", - "--- Calculating LCA results for SSP2-RCP19...\n", - "------ Calculating LCA results for 2005...\n" - ] - }, - { - "name": "stderr", - "output_type": "stream", - "text": [ - "0% [# ] 100% | ETA: 00:00:01/Users/romain/anaconda3/envs/pathways/lib/python3.11/site-packages/scikits/umfpack/umfpack.py:736: UmfpackWarning: (almost) singular matrix! (estimated cond. number: 1.21e+13)\n", - " warnings.warn(msg, UmfpackWarning)\n" - ] - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "0\n", - "1\n", - "2\n", - "3\n" - ] - }, - { - "name": "stderr", - "output_type": "stream", - "text": [ - "/Users/romain/anaconda3/envs/pathways/lib/python3.11/site-packages/scikits/umfpack/umfpack.py:736: UmfpackWarning: (almost) singular matrix! (estimated cond. number: 1.21e+13)\n", - " warnings.warn(msg, UmfpackWarning)\n" - ] - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "0\n" - ] - }, - { - "name": "stderr", - "output_type": "stream", - "text": [ - "0% [### ] 100% | ETA: 00:02:10/Users/romain/anaconda3/envs/pathways/lib/python3.11/site-packages/scikits/umfpack/umfpack.py:736: UmfpackWarning: (almost) singular matrix! (estimated cond. number: 1.21e+13)\n", - " warnings.warn(msg, UmfpackWarning)\n" - ] - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "0\n" - ] - }, - { - "name": "stderr", - "output_type": "stream", - "text": [ - "/Users/romain/anaconda3/envs/pathways/lib/python3.11/site-packages/scikits/umfpack/umfpack.py:736: UmfpackWarning: (almost) singular matrix! (estimated cond. number: 1.21e+13)\n", - " warnings.warn(msg, UmfpackWarning)\n" - ] - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "0\n", - "1\n", - "2\n" - ] - }, - { - "name": "stderr", - "output_type": "stream", - "text": [ - "0% [#### ] 100% | ETA: 00:02:07/Users/romain/anaconda3/envs/pathways/lib/python3.11/site-packages/scikits/umfpack/umfpack.py:736: UmfpackWarning: (almost) singular matrix! (estimated cond. number: 1.21e+13)\n", - " warnings.warn(msg, UmfpackWarning)\n" - ] - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "0\n", - "1\n", - "2\n" - ] - }, - { - "name": "stderr", - "output_type": "stream", - "text": [ - "0% [###### ] 100% | ETA: 00:02:12/Users/romain/anaconda3/envs/pathways/lib/python3.11/site-packages/scikits/umfpack/umfpack.py:736: UmfpackWarning: (almost) singular matrix! (estimated cond. number: 1.21e+13)\n", - " warnings.warn(msg, UmfpackWarning)\n" - ] - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "0\n", - "1\n", - "2\n" - ] - }, - { - "name": "stderr", - "output_type": "stream", - "text": [ - "/Users/romain/anaconda3/envs/pathways/lib/python3.11/site-packages/scikits/umfpack/umfpack.py:736: UmfpackWarning: (almost) singular matrix! (estimated cond. number: 1.21e+13)\n", - " warnings.warn(msg, UmfpackWarning)\n" - ] - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "0\n", - "1\n" - ] - }, - { - "name": "stderr", - "output_type": "stream", - "text": [ - "0% [######## ] 100% | ETA: 00:02:05/Users/romain/anaconda3/envs/pathways/lib/python3.11/site-packages/scikits/umfpack/umfpack.py:736: UmfpackWarning: (almost) singular matrix! (estimated cond. number: 1.21e+13)\n", - " warnings.warn(msg, UmfpackWarning)\n" - ] - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "0\n", - "1\n" - ] - }, - { - "name": "stderr", - "output_type": "stream", - "text": [ - "0% [######### ] 100% | ETA: 00:01:57/Users/romain/anaconda3/envs/pathways/lib/python3.11/site-packages/scikits/umfpack/umfpack.py:736: UmfpackWarning: (almost) singular matrix! (estimated cond. number: 1.21e+13)\n", - " warnings.warn(msg, UmfpackWarning)\n" - ] - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "0\n", - "1\n", - "2\n" - ] - }, - { - "name": "stderr", - "output_type": "stream", - "text": [ - "/Users/romain/anaconda3/envs/pathways/lib/python3.11/site-packages/scikits/umfpack/umfpack.py:736: UmfpackWarning: (almost) singular matrix! (estimated cond. number: 1.21e+13)\n", - " warnings.warn(msg, UmfpackWarning)\n" - ] - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "0\n", - "1\n", - "2\n", - "3\n" - ] - }, - { - "name": "stderr", - "output_type": "stream", - "text": [ - "0% [########### ] 100% | ETA: 00:01:55/Users/romain/anaconda3/envs/pathways/lib/python3.11/site-packages/scikits/umfpack/umfpack.py:736: UmfpackWarning: (almost) singular matrix! (estimated cond. number: 1.21e+13)\n", - " warnings.warn(msg, UmfpackWarning)\n" - ] - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "0\n", - "1\n", - "2\n" - ] - }, - { - "name": "stderr", - "output_type": "stream", - "text": [ - "/Users/romain/anaconda3/envs/pathways/lib/python3.11/site-packages/scikits/umfpack/umfpack.py:736: UmfpackWarning: (almost) singular matrix! (estimated cond. number: 1.21e+13)\n", - " warnings.warn(msg, UmfpackWarning)\n" - ] - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "0\n", - "1\n" - ] - } - ], + "outputs": [], "source": [ "import numpy as np\n", "dp = p.calculate(\n", @@ -446,12 +237,11 @@ " v for v in p.scenarios.coords[\"variables\"].values\n", " if any(i in v for i in [\"Industry\", \"Transport\", \"Heating\"])\n", " ],\n", - " #years=[2005, 2020, 2050, 2070, 2100],\n", + " years=[2005, 2010, 2020],\n", " characterization=True,\n", " #flows=[\"Lithium - natural resource - in ground - kilogram\",],\n", - " #data_type=np.float32,\n", " multiprocessing=False,\n", - " demand_cutoff=0.1,\n", + " demand_cutoff=0.01,\n", ")" ] }, diff --git a/dev/timing.py b/dev/timing.py index 9e3b7c5..c01ccaa 100644 --- a/dev/timing.py +++ b/dev/timing.py @@ -9,14 +9,14 @@ p.calculate( methods=[ "RELICS - metals extraction - Lithium", - "RELICS - metals extraction - Molybdenum", + #"RELICS - metals extraction - Molybdenum", ], regions=[ - "WEU", + "WEU", "USA", ], scenarios=[scenario], years=[ - 2020, + 2010, 2020, 2030, ], variables=[ v @@ -24,6 +24,7 @@ if any(i in v for i in ["Industry", "Transport", "Heating"]) ], demand_cutoff=0.01, + multiprocessing=True ) arr = p.display_results() arr.to_netcdf(f"results_image_{scenario}.nc") diff --git a/pathways/pathways.py b/pathways/pathways.py index a446c8a..d5d2f9b 100644 --- a/pathways/pathways.py +++ b/pathways/pathways.py @@ -16,6 +16,8 @@ import pyprind import xarray as xr import yaml +from numpy import ndarray, dtype +from scipy import sparse from datapackage import DataPackage from premise.geomap import Geomap @@ -204,8 +206,8 @@ def generate_A_indices(A_index, reverse_classifications, lca_results_coords): return np.swapaxes(acts_idx, 0, 1) -def process_region(data: Tuple) -> Union[None, Dict[str, Any]]: - global impact_categories +def process_region(data: Tuple) -> dict[str, ndarray[Any, dtype[Any]] | list[int]]: + ( model, scenario, @@ -221,49 +223,19 @@ def process_region(data: Tuple) -> Union[None, Dict[str, Any]]: B_index, reverse_classifications, lca_results_coords, - flows, demand_cutoff, + locations, + location_to_index, + rev_A_index, + lca ) = data - locations = lca_results_coords["location"].values.tolist() - location_to_index = {location: index for index, location in enumerate(locations)} - rev_A_index = {int(v): k for k, v in A_index.items()} - - # Generate a list of activity indices for each activity category - category_idx = [] - for cat in lca_results_coords["act_category"].values: - category_idx.append( - [int(A_index[a]) for a in reverse_classifications[cat] if a in A_index] - ) - - act_categories = lca_results_coords["act_category"].values - - # if lcia_matrix is not None: - impact_categories = lca_results_coords["impact_category"].values - target = np.zeros( - ( - len(act_categories), - len(list(vars_idx)), - len(locations), - len(impact_categories), - ) - ) - # else: - # if flows is not None: - # target = np.zeros( - # (len(act_categories), len(list(vars_idx)), len(locations), len(flows)) - # ) - # else: - # target = np.zeros( - # (len(act_categories), len(list(vars_idx)), len(locations), len(B_index)) - # ) - FU = [] + variables_demand = [] + variables_idx = [] - vars_list = [] for v, variable in enumerate(variables): idx, dataset = vars_idx[variable]["idx"], vars_idx[variable]["dataset"] - # Compute the unit conversion vector for the given activities dataset_unit = dataset[2] unit_vector = get_unit_conversion_factors( @@ -298,90 +270,163 @@ def process_region(data: Tuple) -> Union[None, Dict[str, Any]]: idx: demand.values * float(unit_vector), } ) + variables_demand.append(variable) + variables_idx.append(v) - vars_list.append(v) + d = [] + for f, fu in enumerate(FU): + lca.lcia(fu) + d.append(lca.supply_array) - lca = bc.LCA( - demand=FU[0], - data_objs=[dp], + # concatenate the list of sparse matrices and + # add a third dimension and concatenate along it + return {"data": np.column_stack(d), "variables": variables_demand, "variables_idx": variables_idx} + + +def _calculate_year(args): + + ( + model, + scenario, + year, + regions, + variables, + methods, + demand_cutoff, + datapackage, + mapping, + units, + lca_results, + classifications, + scenarios, + reverse_classifications, + ) = args + + print(f"------ Calculating LCA results for {year}...") + + geo = Geomap(model=model) + + # Try to load LCA matrices for the given model, scenario, and year + try: + bw_datapackage, technosphere_indices, biosphere_indices = get_lca_matrices( + datapackage, model, scenario, year, methods + ) + + except FileNotFoundError: + # If LCA matrices can't be loaded, skip to the next iteration + print( + "LCA matrices not found for the given model, scenario, and year." + ) + return + + # Fetch indices + vars_info = fetch_indices( + mapping, regions, variables, technosphere_indices, geo ) - lca.lci() - # Populate the result array - act_locs = [a[-1] for a in rev_A_index.values()] + # Remove contribution from activities in other activities + # A = remove_double_counting(A, vars_info) - # Initialize the new array with zeros for missing data - E = np.zeros((len(A_index), len(locations), len(impact_categories))) + # check unclassified activities + check_unclassified_activities(technosphere_indices, classifications) + + # Initialize list to store activities not found in classifications + missing_class = [] + + # Check for activities not found in classifications + for act in technosphere_indices: + if act not in classifications: + missing_class.append(list(act)) + + locations = lca_results.coords["location"].values.tolist() + location_to_index = {location: index for index, location in enumerate(locations)} + rev_technosphere_index = {int(v): k for k, v in technosphere_indices.items()} + + lca = bc.LCA( + demand={0: 1}, + data_objs=[bw_datapackage,], + ) + lca.lci(factorize=True) + results = {} + # use pyprind + bar = pyprind.ProgBar(len(regions)) + for region in regions: + print(region) + bar.update() + # Iterate over each region + results[region] = process_region( + ( + model, + scenario, + year, + region, + variables, + vars_info[region], + scenarios, + mapping, + units, + bw_datapackage, + technosphere_indices, + biosphere_indices, + reverse_classifications, + lca_results.coords, + demand_cutoff, + locations, + location_to_index, + rev_technosphere_index, + lca + ) + ) + + # Populate the result array + act_locs = [a[-1] for a in rev_technosphere_index.values()] acts_idx = generate_A_indices( - A_index, + technosphere_indices, reverse_classifications, - lca_results_coords, + lca_results.coords, ) + act_categories = lca_results.coords["act_category"].values + impact_categories = lca_results.coords["impact_category"].values - for f, fu in enumerate(FU): - print(f) - lca.redo_lcia(fu) - - # Sum along the first axis of D to get final result - D = lca.characterized_inventory.sum(axis=0) - - for i, act in enumerate(rev_A_index.values()): - if act[-1] in act_locs: - loc_idx = location_to_index[act[-1]] - E[i, loc_idx] = D[:, i] - - # Sum over the first axis of D, - # using acts_idx for advanced indexing - target[:, vars_list[f]] = E[acts_idx, ...].sum(axis=0) - # - # else: - # # else, just sum the results of the inventory - # acts_idx = generate_A_indices( - # A_index, - # reverse_classifications, - # lca_results_coords, - # ) - # if flows is not None: - # - # def transf_flow(f): - # return tuple(f.split(" - ")) - # - # flows_idx = [int(B_index[transf_flow(f)]) for f in flows] - # C = C[:, flows_idx] - # - # # Initialize the new array with zeros for missing data - # E = np.zeros((len(A_index), len(locations), len(flows_idx))) - # - # # Populate the result array - # act_locs = [a[-1] for a in rev_A_index.values()] - # - # for i, act in enumerate(rev_A_index.values()): - # if act[-1] in act_locs: - # loc_idx = location_to_index[act[-1]] - # E[i, loc_idx, :] = C[i, :] - # - # target[:, v] = E[acts_idx, ...].sum(axis=0) - # else: - # target[:, v] = C[acts_idx, ...].sum(axis=0) - - # Return a dictionary containing the processed LCA data for the given region - def get_indices(): - if flows is not None: - return [a for a in flows] - return [" - ".join(a) for a in list(B_index.keys())] - - return { - "act_category": act_categories, - "variable": list(vars_idx.keys()), - "impact_category": impact_categories, - "year": year, - "region": region, - "D": target, - "scenario": scenario, - "model": model, - } + # Generate a list of activity indices for each activity category + category_idx = [] + for cat in lca_results.coords["act_category"].values: + category_idx.append( + [int(technosphere_indices[a]) for a in reverse_classifications[cat] if a in technosphere_indices] + ) + mask = np.array([act[-1] in act_locs for act in rev_technosphere_index.values()]) + loc_idx = np.array( + [location_to_index[act[-1]] for act in rev_technosphere_index.values() if act[-1] in act_locs]) + + # Initialize the new array with zeros for missing data + E = np.zeros((len(technosphere_indices), len(locations), len(impact_categories), len(variables))) + print("E.shape", E.shape) + + for region, result in results.items(): + data = result["data"] + variables_demand = result["variables"] + variables_idx = result["variables_idx"] + E[mask, loc_idx][..., variables_idx] = data[mask][:, None] + + # using acts_idx for advanced indexing + E = E[acts_idx, ...].sum(axis=0) + print("final E.shape", E.shape) + + lca_results.loc[ + { + "act_category": act_categories, + "variable": variables_demand, + "impact_category": impact_categories, + "year": year, + "region": region, + "model": model, + "scenario": scenario, + } + ] = E.transpose(0, -1, 1, 2) + + return lca_results class Pathways: """The Pathways class reads in a datapackage that contains scenario data, @@ -559,7 +604,6 @@ def calculate( characterization: bool = True, flows: Optional[List[str]] = None, multiprocessing: bool = False, - data_type: np.dtype = np.float64, demand_cutoff: float = 1e-3, ) -> None: """ @@ -596,9 +640,6 @@ def calculate( self.scenarios = harmonize_units(self.scenarios, variables) - # Initialize list to store activities not found in classifications - missing_class = [] - if characterization is False: methods = None @@ -616,6 +657,7 @@ def calculate( years = self.scenarios.coords["year"].values if variables is None: variables = self.scenarios.coords["variables"].values + variables = [str(v) for v in variables] # resize self.scenarios array to fit the given arguments self.scenarios = resize_scenario_data( @@ -631,132 +673,79 @@ def calculate( if k in self.scenarios.coords["variables"].values } + # Create xarray for storing LCA results if not already present + if self.lca_results is None: + dp, A_index, B_index = get_lca_matrices( + self.datapackage, models[0], scenarios[0], years[0], methods + ) + locations = fetch_inventories_locations(A_index) + + self.lca_results = create_lca_results_array( + methods=methods, + B_indices=B_index, + years=years, + regions=regions, + locations=locations, + models=models, + scenarios=scenarios, + classifications=self.classifications, + mapping=self.mapping, + flows=flows, + ) + # Iterate over each combination of model, scenario, and year for model in models: print(f"Calculating LCA results for {model}...") - geo = Geomap(model=model) for scenario in scenarios: print(f"--- Calculating LCA results for {scenario}...") - for year in years: - print(f"------ Calculating LCA results for {year}...") - - # Try to load LCA matrices for the given model, scenario, and year - try: - dp, A_index, B_index = get_lca_matrices( - self.datapackage, model, scenario, year, methods - ) - - except FileNotFoundError: - # If LCA matrices can't be loaded, skip to the next iteration - print( - "LCA matrices not found for the given model, scenario, and year." + if multiprocessing: + args = [ + ( + model, + scenario, + year, + regions, + variables, + methods, + demand_cutoff, + self.datapackage, + self.mapping, + self.units, + self.lca_results, + self.classifications, + self.scenarios, + self.reverse_classifications, ) - continue - - # Fetch indices - vars_info = fetch_indices( - self.mapping, regions, variables, A_index, geo - ) - - # Remove contribution from activities in other activities - # A = remove_double_counting(A, vars_info) - - # check unclassified activities - check_unclassified_activities(A_index, self.classifications) - - # Create xarray for storing LCA results if not already present - if self.lca_results is None: - locations = fetch_inventories_locations(A_index) - - self.lca_results = create_lca_results_array( - methods=methods, - B_indices=B_index, - years=years, - regions=regions, - locations=locations, - models=models, - scenarios=scenarios, - classifications=self.classifications, - mapping=self.mapping, - flows=flows, - ) - - # Check for activities not found in classifications - for act in A_index: - if act not in self.classifications: - missing_class.append(list(act)) - - if multiprocessing is True: - # Prepare data for each region - data_for_regions = [ - ( - model, - scenario, - year, - region, - variables, - vars_info[region], - self.scenarios, - self.mapping, - self.units, - dp, - A_index, - B_index, - self.reverse_classifications, - self.lca_results.coords, - flows, - demand_cutoff, - ) - for region in regions - ] - - # Process each region in parallel - with Pool(cpu_count()) as p: - results = p.map(process_region, data_for_regions) - else: - results = [] - # use pyprind - bar = pyprind.ProgBar(len(regions)) - for region in regions: - bar.update() - # Iterate over each region - results.append( - process_region( - ( - model, - scenario, - year, - region, - variables, - vars_info[region], - self.scenarios, - self.mapping, - self.units, - dp, - A_index, - B_index, - self.reverse_classifications, - self.lca_results.coords, - flows, - demand_cutoff, - ) + for year in years + ] + # Process each region in parallel + with Pool(cpu_count()) as p: + results = p.map(_calculate_year, args) + else: + results = [] + for year in years: + results.append( + _calculate_year( + ( + model, + scenario, + year, + regions, + variables, + methods, + demand_cutoff, + self.datapackage, + self.mapping, + self.units, + self.lca_results, + self.classifications, + self.scenarios, + self.reverse_classifications, ) ) + ) - # Store results in the LCA results xarray - for result in results: - if result is not None: - self.lca_results.loc[ - { - "act_category": result["act_category"], - "variable": result["variable"], - "impact_category": result["impact_category"], - "year": result["year"], - "region": result["region"], - "model": result["model"], - "scenario": result["scenario"], - } - ] = result["D"] + self.lca_results = xr.concat(results, dim="year") def characterize_planetary_boundaries( self,