
Commit

EODC
romainsacchi committed Mar 19, 2024
1 parent 2a5afd6 commit e5fe184
Showing 7 changed files with 287 additions and 338 deletions.
2 changes: 1 addition & 1 deletion conda/meta.yaml
@@ -18,7 +18,7 @@ requirements:
- python
- setuptools
run:
- numpy
- numpy==1.24.0
- pandas
- xarray
- scipy
225 changes: 72 additions & 153 deletions dev/generate datapackages.ipynb

Large diffs are not rendered by default.

145 changes: 92 additions & 53 deletions pathways/lca.py
@@ -1,15 +1,16 @@
import csv
import sys
import warnings
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import numpy as np
import scipy
import scipy.sparse
import xarray as xr
from scipy import sparse
from scipy.sparse import csr_matrix
import bw_processing as bwp
import numpy as np

from .lcia import get_lcia_methods

# Attempt to import pypardiso's spsolve function.
# If it isn't available, fall back on scipy's spsolve.
@@ -84,58 +85,40 @@ def read_indices_csv(file_path: Path) -> Dict[Tuple[str, str, str, str], str]:

def load_matrix_and_index(
file_path: Path,
num_indices: int,
sign: int = 1,
transpose: bool = False,
extra_indices: Optional[int] = None,
data_type: np.dtype = np.float32,
) -> csr_matrix:
) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
Reads a CSV file and returns its contents as a CSR sparse matrix.
:param file_path: The path to the CSV file.
:type file_path: Path
:param num_indices: The number of indices in the data.
:type num_indices: int
:param transpose: Whether or not to transpose the matrix. Default is False.
:type transpose: bool
:param extra_indices: Optional parameter to adjust the shape of the matrix. Default is None.
:type extra_indices: Optional[int]
:return: The data from the CSV file as a CSR sparse matrix.
:rtype: csr_matrix
:return: A tuple containing the data, indices, and sign of the data.
:rtype: Tuple[np.ndarray, np.ndarray, np.ndarray]
"""
# Load the data from the CSV file
coords = np.genfromtxt(file_path, delimiter=";", skip_header=1)

# Extract the row and column indices and the data
I = coords[:, 0].astype(int)
J = coords[:, 1].astype(int)
data = coords[:, 2] * sign
# build `indices_array` as an array of (row, column) index tuples
indices_array = np.array(
list(zip(coords[:, 0].astype(int), coords[:, 1].astype(int))),
dtype=bwp.INDICES_DTYPE,
)

# Determine the shape of the matrix
if transpose:
shape = (num_indices, I.max() + 1)
idx = (J, I)
else:
shape = (
extra_indices if extra_indices is not None else I.max() + 1,
num_indices,
)
idx = (I, J)
data_array = coords[:, 2]
# build a boolean array marking which values are negative (their sign is stored separately)
sign_array = np.where(data_array < 0, True, False)
# make values of data_array all positive
data_array = np.abs(data_array)

# Create and return the CSR matrix
matrix = csr_matrix((data, idx), shape=shape, dtype=data_type)
return matrix
return data_array, indices_array, sign_array
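
As a side note (not part of the commit), here is a minimal sketch with made-up values of how the unsigned data_array and boolean sign_array returned above relate back to the signed values read from the CSV; as far as I can tell, this is the flip convention that bw_processing expects for flip_array.

import numpy as np

signed_values = np.array([1.0, -0.5, 2.0])   # values as they would appear in the CSV (made up)
data_array = np.abs(signed_values)           # unsigned data, as returned by load_matrix_and_index
sign_array = signed_values < 0               # True where the original value was negative

# the signed values can be recovered by flipping the flagged entries
recovered = np.where(sign_array, -data_array, data_array)
assert np.allclose(recovered, signed_values)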


def get_lca_matrices(
datapackage: str,
model: str,
scenario: str,
year: int,
data_type: np.dtype = np.float32,
) -> Tuple[sparse.csr_matrix, sparse.csr_matrix, Dict, Dict]:
methods: list,
) -> Tuple[bwp.datapackage, Dict, Dict]:
"""
Retrieve Life Cycle Assessment (LCA) matrices from disk.
@@ -154,29 +137,85 @@ def get_lca_matrices(
A_inds = read_indices_csv(dirpath / "A_matrix_index.csv")
B_inds = read_indices_csv(dirpath / "B_matrix_index.csv")

A = load_matrix_and_index(
dirpath / "A_matrix.csv", len(A_inds), transpose=True, data_type=data_type
)
# create brightway datapackage
dp = bwp.create_datapackage()

# if A is not square, raise an error
if A.shape[0] != A.shape[1]:
raise ValueError(f"A must be a square matrix. Current shape is {A.shape}.")
a_data, a_indices, a_sign = load_matrix_and_index(
dirpath / "A_matrix.csv",
)

B = load_matrix_and_index(
b_data, b_indices, b_sign = load_matrix_and_index(
dirpath / "B_matrix.csv",
len(B_inds),
sign=-1,
extra_indices=len(A_inds),
data_type=data_type,
)

if B.shape[0] != A.shape[0]:
raise ValueError(
f"Incompatible dimensions between A and B. A shape: {A.shape}, B shape: {B.shape}"
)
dp.add_persistent_vector(
matrix='technosphere_matrix',
indices_array=a_indices,
data_array=a_data,
flip_array=a_sign,
)
dp.add_persistent_vector(
matrix='biosphere_matrix',
indices_array=b_indices,
data_array=b_data,
flip_array=b_sign,
)

# c_data, c_indices, c_sign = fill_characterization_factors_matrix(
# B_inds, methods
# )
#
#
# # if c_data is multidimensional
# if len(c_data.shape) > 1:
# dp.add_persistent_array(
# matrix='characterization_matrix',
# indices_array=c_indices,
# data_array=c_data,
# flip_array=c_sign,
# )
# else:
# dp.add_persistent_vector(
# matrix='characterization_matrix',
# indices_array=c_indices,
# data_array=c_data,
# flip_array=c_sign,
# )

return dp, A_inds, B_inds
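
For context, a hypothetical usage sketch of the new return value (all argument values below are placeholders, and it assumes bw2calc >= 2.0, which accepts bw_processing datapackages through data_objs); only the inventory step is shown, since the characterization part is still commented out in this commit.

import bw2calc as bc
from pathways.lca import get_lca_matrices

dp, A_inds, B_inds = get_lca_matrices(
    datapackage="some/datapackage.json",                 # placeholder path
    model="remind",                                      # placeholder model name
    scenario="SSP2-Base",                                # placeholder scenario name
    year=2050,
    methods=["IPCC 2021 - climate change - GWP 100a"],   # placeholder method name
)

# A_inds maps activity description tuples to matrix indices; any index can serve
# as a demand key (cast to int in case the CSV reader returned strings)
some_index = int(next(iter(A_inds.values())))
lca = bc.LCA(demand={some_index: 1}, data_objs=[dp])
lca.lci()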


def fill_characterization_factors_matrix(
biosphere_flows: dict,
methods
) -> np.ndarray:
"""
Create a characterization matrix based on the list of biosphere flows
given.
:param biosphere_flows: dictionary keyed by biosphere flow tuples, as returned by read_indices_csv.
:param methods: names of the LCIA methods to use.
:return: characterization matrix of shape (len(biosphere_flows), len(methods)).
"""

lcia_data = get_lcia_methods(methods=methods)

print(lcia_data)


# create a numpy array filled with zeros
# of size equal to biosphere_flows and lcia methods

cf_matrix = np.zeros((len(biosphere_flows), len(methods)))

return A, B, A_inds, B_inds
# fill the matrix
for i, flow in enumerate(biosphere_flows):
for j, method in enumerate(methods):
try:
cf_matrix[i, j] = lcia_data[method][flow[:3]]
except KeyError:
continue

return cf_matrix
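
As an aside (not from the commit), a small sketch with made-up factors of how a matrix of this shape (one row per biosphere flow, one column per method) is typically applied to an inventory vector:

import numpy as np

cf_matrix = np.array([[1.0, 0.0],
                      [25.0, 0.3]])      # 2 flows x 2 methods, made-up factors
inventory = np.array([10.0, 2.0])        # made-up inventory amounts, one per flow

scores = inventory @ cf_matrix           # one characterized score per method
print(scores)                            # [60.   0.6]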

def remove_double_counting(A: csr_matrix, vars_info: dict) -> csr_matrix:
"""
29 changes: 4 additions & 25 deletions pathways/lcia.py
@@ -36,36 +36,15 @@ def format_lcia_method_exchanges(method):
}


def get_lcia_methods():
def get_lcia_methods(methods: list = None):
"""Get a list of available LCIA methods."""
with open(LCIA_METHODS, "r") as f:
data = json.load(f)

return {" - ".join(x["name"]): format_lcia_method_exchanges(x) for x in data}

if methods:
data = [x for x in data if " - ".join(x["name"]) in methods]

def fill_characterization_factors_matrix(biosphere_flows: list, methods) -> np.ndarray:
"""
Create a characterization matrix based on the list of biosphere flows
given.
:param biosphere_flows:
:param methods: contains names of the methods to use.
:return:
"""

lcia_data = get_lcia_methods()

# create a numpy array filled with zeros
# of size equal to biosphere_flows and lcia methods
return {" - ".join(x["name"]): format_lcia_method_exchanges(x) for x in data}

cf_matrix = np.zeros((len(biosphere_flows), len(methods)))

# fill the matrix
for i, flow in enumerate(biosphere_flows):
for j, method in enumerate(methods):
try:
cf_matrix[i, j] = lcia_data[method][flow[:3]]
except KeyError:
continue

return cf_matrix
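
For illustration, a hypothetical call to the updated get_lcia_methods; the method name below is a placeholder, as real names come from the bundled LCIA methods JSON file.

from pathways.lcia import get_lcia_methods

# returns a dict mapping "joined - method - name" strings to their characterization exchanges
cfs = get_lcia_methods(methods=["IPCC 2021 - climate change - GWP 100a"])   # placeholder name
for name, exchanges in cfs.items():
    print(name, len(exchanges))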