Merge remote-tracking branch 'origin/main'
romainsacchi authored and romainsacchi committed Mar 21, 2024
2 parents 5874593 + 7e69aaa commit 1ecd518
Showing 4 changed files with 82 additions and 69 deletions.
2 changes: 1 addition & 1 deletion dev/timing.py
@@ -13,7 +13,7 @@
     ],
     regions=[
         "WEU",
-        #"USA",
+        # "USA",
     ],
     scenarios=[scenario],
     years=[
4 changes: 2 additions & 2 deletions pathways/data_validation.py
@@ -57,10 +57,10 @@ def validate_datapackage(datapackage: datapackage.DataPackage):
     dataframe = pd.DataFrame(data, columns=headers)
 
     # Check that the scenario data is valid
-    #validate_scenario_data(dataframe)
+    # validate_scenario_data(dataframe)
 
     # Check that the mapping is valid
-    #validate_mapping(datapackage.get_resource("mapping"), dataframe)
+    # validate_mapping(datapackage.get_resource("mapping"), dataframe)
 
     # Check that the LCA data is valid
     # validate_lca_data(datapackage)
1 change: 1 addition & 0 deletions pathways/filesystem_constants.py
@@ -3,6 +3,7 @@
 """
 
 from pathlib import Path
+
 import platformdirs
 
 # Directories for data which comes with Pathways
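The touched module pairs data shipped with the package against per-user writable paths. A minimal sketch of that pattern (not part of the commit; the `"pathways"` app name, the `user_data_dir` choice, and the directory layout are assumptions — only the `DATA_DIR`/`DIR_CACHED_DB` names come from the diff):

```python
from pathlib import Path

import platformdirs

# Data shipped inside the package itself (assumed layout)
DATA_DIR = Path(__file__).resolve().parent / "data"

# Per-user writable base directory, e.g. ~/.local/share/pathways on Linux
USER_DATA_BASE_DIR = Path(platformdirs.user_data_dir(appname="pathways"))

# Cache directory for pre-computed arrays; created on first import
DIR_CACHED_DB = USER_DATA_BASE_DIR / "cache"
DIR_CACHED_DB.mkdir(parents=True, exist_ok=True)
```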
144 changes: 78 additions & 66 deletions pathways/pathways.py
@@ -5,11 +5,11 @@
 """
 
 import csv
+import uuid
 from collections import defaultdict
 from multiprocessing import Pool, cpu_count
 from pathlib import Path
 from typing import Any, Dict, List, Optional, Tuple, Union
-import uuid
 
 import bw2calc as bc
 import numpy as np
@@ -21,12 +21,9 @@
 from numpy import dtype, ndarray
 from premise.geomap import Geomap
 
-from .filesystem_constants import DATA_DIR, DIR_CACHED_DB
 from .data_validation import validate_datapackage
-from .lca import (
-    get_lca_matrices,
-    remove_double_counting,
-)
+from .filesystem_constants import DATA_DIR, DIR_CACHED_DB
+from .lca import get_lca_matrices, remove_double_counting
 from .lcia import get_lcia_method_names
 from .utils import (
     _get_activity_indices,
@@ -35,8 +32,8 @@
     get_unit_conversion_factors,
     harmonize_units,
     load_classifications,
+    load_numpy_array_from_disk,
     load_units_conversion,
-    load_numpy_array_from_disk
 )
 
 import warnings
@@ -85,12 +82,12 @@ def get_visible_files(path):
 
 
 def resize_scenario_data(
-        scenario_data: xr.DataArray,
-        model: List[str],
-        scenario: List[str],
-        region: List[str],
-        year: List[int],
-        variables: List[str],
+    scenario_data: xr.DataArray,
+    model: List[str],
+    scenario: List[str],
+    region: List[str],
+    year: List[int],
+    variables: List[str],
 ):
     """
     Resize the scenario data to the given scenario, year, region, and variables.
@@ -189,12 +186,18 @@ def fetch_inventories_locations(A_index: Dict[str, Tuple[str, str, str]]) -> List[str]:
     return list(set([act[3] for act in A_index]))
 
 
-def group_technosphere_indices_by_category(technosphere_index, reverse_classifications, lca_results_coords) -> Tuple:
+def group_technosphere_indices_by_category(
+    technosphere_index, reverse_classifications, lca_results_coords
+) -> Tuple:
     # Generate a list of activity indices for each activity category
     acts_idx = []
     acts_dict = {}
     for cat in lca_results_coords["act_category"].values:
-        x = [int(technosphere_index[a]) for a in reverse_classifications[cat] if a in technosphere_index]
+        x = [
+            int(technosphere_index[a])
+            for a in reverse_classifications[cat]
+            if a in technosphere_index
+        ]
         acts_idx.append(x)
         acts_dict[cat] = x
 
@@ -207,6 +210,7 @@ def group_technosphere_indices_by_category(technosphere_index, reverse_classifications, lca_results_coords) -> Tuple:
     # Swap the axes of acts_idx to align with the dimensionality of D
     return acts_idx, acts_dict, np.swapaxes(acts_idx, 0, 1)
 
+
 def group_technosphere_indices_by_location(technosphere_index, locations) -> Tuple:
     """
     Group the technosphere indices by location.
@@ -218,22 +222,15 @@ def group_technosphere_indices_by_location(technosphere_index, locations) -> Tuple:
     act_indices_list = []
     act_indices_dict = {}
     for loc in locations:
-        x = [
-            int(technosphere_index[a])
-            for a in technosphere_index
-            if a[-1] == loc
-        ]
+        x = [int(technosphere_index[a]) for a in technosphere_index if a[-1] == loc]
 
         act_indices_list.append(x)
         act_indices_dict[loc] = x
 
     # Pad each list in act_indices_list with -1 to make them all the same length
     max_len = max([len(x) for x in act_indices_list])
     act_indices_array = np.array(
-        [
-            np.pad(x, (0, max_len - len(x)), constant_values=-1)
-            for x in act_indices_list
-        ]
+        [np.pad(x, (0, max_len - len(x)), constant_values=-1) for x in act_indices_list]
     )
 
     return act_indices_list, act_indices_dict, act_indices_array
@@ -287,13 +284,13 @@ def process_region(data: Tuple) -> dict[str, ndarray[Any, dtype[Any]] | list[int]]:
 
         # If the total demand is zero, return None
         if (
-                demand
-                / scenarios.sel(
-                    region=region,
-                    model=model,
-                    pathway=scenario,
-                    year=year,
-                ).sum(dim="variables")
+            demand
+            / scenarios.sel(
+                region=region,
+                model=model,
+                pathway=scenario,
+                year=year,
+            ).sum(dim="variables")
         ) < demand_cutoff:
             continue
 
@@ -377,22 +374,26 @@ def _calculate_year(args):
     location_to_index = {location: index for index, location in enumerate(locations)}
     reverse_technosphere_index = {int(v): k for k, v in technosphere_indices.items()}
     loc_idx = np.array(
         [
             location_to_index[act[-1]]
             for act in reverse_technosphere_index.values()
             if act[-1] in locations
         ]
     )
 
-    acts_category_idx_list, acts_category_idx_dict, acts_category_idx_array = group_technosphere_indices_by_category(
-        technosphere_indices,
-        reverse_classifications,
-        lca_results.coords,
+    acts_category_idx_list, acts_category_idx_dict, acts_category_idx_array = (
+        group_technosphere_indices_by_category(
+            technosphere_indices,
+            reverse_classifications,
+            lca_results.coords,
+        )
     )
 
-    acts_location_idx_list, acts_location_idx_dict, acts_location_idx_array = group_technosphere_indices_by_location(
-        technosphere_indices,
-        locations,
+    acts_location_idx_list, acts_location_idx_dict, acts_location_idx_array = (
+        group_technosphere_indices_by_location(
+            technosphere_indices,
+            locations,
+        )
     )
 
     results["other"] = {
@@ -610,24 +611,24 @@ def get_scenarios(self, scenario_data: pd.DataFrame) -> xr.DataArray:
             units[variable] = scenario_data[
                 scenario_data["variables"]
                 == self.mapping[variable]["scenario variable"]
-                ].iloc[0]["unit"]
+            ].iloc[0]["unit"]
 
         data.attrs["units"] = units
 
         return data
 
     def calculate(
-            self,
-            methods: Optional[List[str]] = None,
-            models: Optional[List[str]] = None,
-            scenarios: Optional[List[str]] = None,
-            regions: Optional[List[str]] = None,
-            years: Optional[List[int]] = None,
-            variables: Optional[List[str]] = None,
-            characterization: bool = True,
-            flows: Optional[List[str]] = None,
-            multiprocessing: bool = False,
-            demand_cutoff: float = 1e-3,
+        self,
+        methods: Optional[List[str]] = None,
+        models: Optional[List[str]] = None,
+        scenarios: Optional[List[str]] = None,
+        regions: Optional[List[str]] = None,
+        years: Optional[List[int]] = None,
+        variables: Optional[List[str]] = None,
+        characterization: bool = True,
+        flows: Optional[List[str]] = None,
+        multiprocessing: bool = False,
+        demand_cutoff: float = 1e-3,
     ) -> dict[Any, Any] | dict[int | Any, dict[Any, dict[str, Any]] | None]:
         """
         Calculate Life Cycle Assessment (LCA) results for given methods, models, scenarios, regions, and years.
@@ -745,8 +746,14 @@ def calculate(
                 # Process each region in parallel
                 with Pool(cpu_count()) as p:
                     # store the results as a dictionary with years as keys
-                    results.update({(model, scenario, year): result for year, result in
-                                    zip(years, p.map(_calculate_year, args))})
+                    results.update(
+                        {
+                            (model, scenario, year): result
+                            for year, result in zip(
+                                years, p.map(_calculate_year, args)
+                            )
+                        }
+                    )
         else:
             results = {
                 (model, scenario, year): _calculate_year(
@@ -780,9 +787,14 @@ def fill_in_result_array(self, results: dict):
 
         # Assuming DIR_CACHED_DB, results, and self.lca_results are already defined
 
         # Pre-loading data from disk if possible
-        cached_data = {data["id_array"]: load_numpy_array_from_disk(DIR_CACHED_DB / f"{data['id_array']}.npy")
-                       for coord, result in results.items()
-                       for region, data in result.items() if region != "other"}
+        cached_data = {
+            data["id_array"]: load_numpy_array_from_disk(
+                DIR_CACHED_DB / f"{data['id_array']}.npy"
+            )
+            for coord, result in results.items()
+            for region, data in result.items()
+            if region != "other"
+        }
         # use pyprind to display progress
         bar = pyprind.ProgBar(len(results))
         for coord, result in results.items():
@@ -820,17 +832,17 @@ def fill_in_result_array(self, results: dict):
                         "year": year,
                         "act_category": cat,
                         "location": loc,
-                        "variable": list(variables.keys())
+                        "variable": list(variables.keys()),
                     }
                 ] = summed_data
 
     def characterize_planetary_boundaries(
-            self,
-            models: Optional[List[str]] = None,
-            scenarios: Optional[List[str]] = None,
-            regions: Optional[List[str]] = None,
-            years: Optional[List[int]] = None,
-            variables: Optional[List[str]] = None,
+        self,
+        models: Optional[List[str]] = None,
+        scenarios: Optional[List[str]] = None,
+        regions: Optional[List[str]] = None,
+        years: Optional[List[int]] = None,
+        variables: Optional[List[str]] = None,
     ):
         self.calculate(
             models=models,
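Two of the reformatted blocks are worth a standalone illustration. First, the `-1` padding in `group_technosphere_indices_by_location`: ragged per-location index lists are padded with a sentinel so they stack into one rectangular NumPy array. A minimal sketch with made-up indices (only the padding expression mirrors the diff):

```python
import numpy as np

# Ragged lists of technosphere matrix indices, one list per location (made up)
act_indices_list = [[0, 3, 7], [1], [2, 4]]

# Pad each list with the sentinel -1 up to the longest list's length
max_len = max(len(x) for x in act_indices_list)
act_indices_array = np.array(
    [np.pad(x, (0, max_len - len(x)), constant_values=-1) for x in act_indices_list]
)

print(act_indices_array)
# [[ 0  3  7]
#  [ 1 -1 -1]
#  [ 2  4 -1]]
```

Second, the reflowed `results.update(...)` in `calculate()` fans per-year work out to a process pool and keys each result by its `(model, scenario, year)` tuple; `p.map` preserves input order, so `zip(years, ...)` pairs each year with its result. A stripped-down sketch of the same pattern (the worker body and the sample labels are placeholders, not the package's actual logic):

```python
from multiprocessing import Pool, cpu_count


def _calculate_year(args):
    model, scenario, year = args
    return {"year": year}  # placeholder for the real per-year LCA result


if __name__ == "__main__":
    model, scenario = "model-A", "scenario-1"  # made-up labels
    years = [2020, 2030, 2040]
    args = [(model, scenario, year) for year in years]

    results = {}
    with Pool(cpu_count()) as p:
        # one worker call per year, results keyed by (model, scenario, year)
        results.update(
            {
                (model, scenario, year): result
                for year, result in zip(years, p.map(_calculate_year, args))
            }
        )
```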
