Subshares dev
alvarojhahn committed Apr 11, 2024
1 parent fb679aa commit ffbb8c2
Showing 3 changed files with 204 additions and 95 deletions.
2 changes: 1 addition & 1 deletion pathways/data/technologies_shares.yaml
@@ -334,7 +334,7 @@ Battery-Stationary: ### Home storage uses NMC811 (w/o hyphen) in the current premise
2020:
value: 0.05
2050:
min: 0.0.05
min: 0.05
max: 0.33
distribution: uniform
create copy: False
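
For reference, a minimal sketch of how an entry like the one above might be read and sampled, assuming PyYAML; the key path is hypothetical and only the corrected 2050 min/max values come from this file:

import numpy as np
import yaml

with open("pathways/data/technologies_shares.yaml") as f:
    shares = yaml.safe_load(f)

# Hypothetical key path for one technology under Battery-Stationary:
entry = shares["Battery-Stationary"]["NMC811"]
low, high = entry[2050]["min"], entry[2050]["max"]  # 0.05 and 0.33 after this fix
sample = np.random.uniform(low, high)               # 'distribution: uniform'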
230 changes: 180 additions & 50 deletions pathways/lca.py
@@ -110,11 +110,14 @@ def get_matrix_arrays(

matrix_filename = f"{matrix_type}_matrix.csv"

data, indices, sign, distributions = load_matrix_and_index(
dirpath / matrix_filename,
)
if matrix_type == "A":
    # technosphere (A) matrix: keep the sign (flip) array for the exchanges
    data, indices, sign, distributions = load_matrix_and_index(dirpath / matrix_filename)
    arrays = [data, indices, sign, distributions]
elif matrix_type == "B":
    # biosphere (B) matrix: the sign array is not needed
    data, indices, _, distributions = load_matrix_and_index(dirpath / matrix_filename)
    arrays = [data, indices, distributions]

return [data, indices, sign, distributions]
return arrays


def get_indices(
@@ -185,7 +188,7 @@ def get_subshares_matrix(
dp_correlated.add_persistent_array(
matrix='technosphere_matrix',
indices_array=a_indices,
data_array=a_data_samples.reshape((1, -1)),
data_array=a_data_samples,
flip_array=a_sign,
)
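
# For context, a minimal self-contained sketch of the bw_processing pattern used
# above; the index and sample values are invented for illustration only.
import bw_processing as bwp
import numpy as np

example_indices = np.array([(10, 20), (11, 20)], dtype=bwp.INDICES_DTYPE)  # (row, col) pairs
example_samples = np.array([[0.2, 0.25, 0.3],   # one row per exchange,
                            [0.8, 0.75, 0.7]])  # one column per Monte Carlo iteration
example_flip = np.array([True, True])           # True flips the sign of consumed inputs

example_dp = bwp.create_datapackage()
example_dp.add_persistent_array(
    matrix="technosphere_matrix",
    indices_array=example_indices,
    data_array=example_samples,
    flip_array=example_flip,
)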

@@ -202,7 +205,6 @@ def adjust_matrix_based_on_shares(A_arrays, shares_dict, use_distributions, year
"""

data_array, indices_array, sign_array, _ = A_arrays

index_lookup = {(row["row"], row["col"]): i for i, row in enumerate(indices_array)}

modified_data = []
@@ -223,58 +225,186 @@ def find_index(activity_idx, product_idx):

for tech_category, regions in shares_dict.items():
for region, techs in regions.items():
all_tech_indices = [
techs[tech]["idx"] for tech in techs if techs[tech]["idx"] is not None
]
all_product_indices = set()

tech_indices = np.isin(indices_array['row'], all_tech_indices)
all_product_indices.update(indices_array['col'][tech_indices])

all_tech_indices = [details["idx"] for details in techs.values() if details.get("idx") is not None]
all_product_indices = set(indices_array['col'][np.isin(indices_array['row'], all_tech_indices)])

tech_group_ranges = {}
tech_group_defaults = {}

for tech, details in techs.items():
    if year != 2020:
        if details[2050]['distribution'] == 'uniform':
            tech_group_ranges[tech] = (details[2050]['min'], details[2050]['max'])
            tech_group_defaults[tech] = details.get(2020, {}).get('value', 0)
        else:
            print('At this point, only uniform distributions are supported. Exiting.')
            exit(1)

if year != 2020 and tech_group_ranges:
    group_shares = correlated_uniform_samples(tech_group_ranges, tech_group_defaults)
else:
    group_shares = {tech: details.get(year, {}).get("value", 0) for tech, details in techs.items()}
print('Tech group', tech_category, 'shares:', group_shares)

for product_idx in all_product_indices:
# Vectorized operation to calculate total_output for each product_idx
relevant_indices = [
find_index(tech_idx, product_idx)
for tech_idx in all_tech_indices
if find_index(tech_idx, product_idx) is not None
and tech_idx != product_idx
]
relevant_indices = [find_index(idx, product_idx) for idx in all_tech_indices if find_index(idx, product_idx) is not None]
total_output = np.sum(data_array[relevant_indices])

for tech, details in techs.items():
share = details.get(year, {}).get("value", 0)
idx = details["idx"]
if idx is None or share == 0:
continue

# Calculate new amount
new_amount = total_output * share
index = find_index(idx, product_idx)

# Adjust value or add new exchange
if (
index is not None
and product_idx not in unique_product_indices_from_dict
): # Exclude diagonal and undesired exchanges
data_array[index] = new_amount
# Append to modified_indices regardless of whether it's a new addition or an adjustment
modified_indices.append((idx, product_idx))
modified_data.append(new_amount)
modified_signs.append(sign_array[index])
elif product_idx not in unique_product_indices_from_dict: # Exclude diagonal and undesired exchanges
elif (
product_idx not in unique_product_indices_from_dict
): # Exclude diagonal and undesired exchanges
modified_data.append(new_amount)
modified_indices.append((idx, product_idx))
modified_signs.append(True) # CHECK: I am assuming new exchanges are always positive

modified_data_array = np.array(modified_data, dtype=np.float64)
for tech, share in group_shares.items():
    if tech in techs and 'idx' in techs[tech] and techs[tech]['idx'] is not None:
        idx = techs[tech]['idx']

        if year == 2020:
            share_value = techs[tech].get(year, {}).get("value", 0)
            new_amounts = np.array([total_output * share_value]).reshape((1, -1))
        else:
            new_amounts = np.array([total_output * share for _ in range(use_distributions)]).reshape((1, -1))
        index = find_index(idx, product_idx)

        # Adjust an existing exchange or add a new one, excluding product
        # indices that come from the shares dictionary itself
        if index is not None and product_idx not in unique_product_indices_from_dict:
            modified_indices.append((idx, product_idx))
            modified_data.append(new_amounts)
            modified_signs.append(sign_array[index])
        elif index is None and product_idx not in unique_product_indices_from_dict:
            modified_data.append(new_amounts)
            modified_indices.append((idx, product_idx))
            modified_signs.append(True)  # assuming new exchanges are positive

modified_data_array = np.concatenate(modified_data, axis=0)
modified_indices_array = np.array(modified_indices, dtype=bwp.INDICES_DTYPE)
modified_signs_array = np.array(modified_signs, dtype=bool)

return [modified_data_array, modified_indices_array, modified_signs_array]
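
# Toy numeric check of the scaling rule applied above (numbers invented for
# illustration): each technology exchange becomes its share of the group's
# total output for a given product column, so the column total is preserved.
example_total_output = 2.0
example_shares = {"tech_a": 0.6, "tech_b": 0.4}  # shares sum to 1
example_new_amounts = {t: example_total_output * s for t, s in example_shares.items()}
assert abs(sum(example_new_amounts.values()) - example_total_output) < 1e-9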


def correlated_uniform_samples(ranges, defaults, iterations=1000):
"""
Adjusts randomly selected shares for parameters to sum to 1 while respecting their specified ranges.
:param ranges: Dict with parameter names as keys and (min, max) tuples as values.
:param defaults: Dict with default values for each parameter.
:param iterations: Number of iterations to attempt to find a valid distribution.
:return: A dict with the adjusted shares for each parameter.
"""
for _ in range(iterations):
shares = {param: np.random.uniform(low, high) for param, (low, high) in ranges.items()}
total_share = sum(shares.values())
shares = {param: share / total_share for param, share in shares.items()}
if all(ranges[param][0] <= share <= ranges[param][1] for param, share in shares.items()):
return shares

print("Failed to find a valid distribution after {} iterations".format(iterations))
return defaults
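
# Usage sketch for correlated_uniform_samples. The first (min, max) range
# mirrors the YAML entry fixed in this commit; the technology names and the
# second range are illustrative only.
example_ranges = {"NMC811": (0.05, 0.33), "other_tech": (0.5, 0.9)}
example_defaults = {"NMC811": 0.05, "other_tech": 0.95}
example_shares = correlated_uniform_samples(example_ranges, example_defaults)
# e.g. {'NMC811': 0.27, 'other_tech': 0.73}: normalised to sum to 1, each within its range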


def fill_characterization_factors_matrices(
biosphere_flows: dict, methods, biosphere_dict, debug=False
67 changes: 23 additions & 44 deletions pathways/pathways.py
@@ -532,29 +532,31 @@ def _calculate_year(args):
],
)
lca.lci(factorize=True)
elif use_distributions != 0 and subshares == False:
lca = MonteCarloLCA(
demand={0: 1},
data_objs=[
bw_datapackage,
],
use_distributions=True,
)
lca.lci()
else:
    if subshares is False:

        lca = MonteCarloLCA(
            demand={0: 1},
            data_objs=[
                bw_datapackage,
            ],
            use_distributions=True,
        )
        lca.lci()
    else:

        shares_indices = subshares_indices(regions, technosphere_indices, geo)
        correlated_arrays = adjust_matrix_based_on_shares(A_arrays, shares_indices, use_distributions, year)

        bw_correlated = get_subshares_matrix(correlated_arrays)

        lca = MonteCarloLCA(
            demand={0: 1},
            data_objs=[bw_datapackage, bw_correlated],
            use_distributions=True,
            use_arrays=True,
        )
        lca.lci()

characterization_matrix = fill_characterization_factors_matrices(
biosphere_indices, methods, lca.dicts.biosphere, debug
@@ -598,25 +600,6 @@ def _calculate_year(args):

return results


def correlated_uniform_montecarlo(ranges, defaults, iterations):
"""
Adjusts randomly selected shares for parameters to sum to 1 while respecting their specified ranges.
:param ranges: Dict with parameter names as keys and (min, max) tuples as values.
:param defaults: Dict with default values for each parameter.
:param iterations: Number of iterations to attempt to find a valid distribution.
:return: A dict with the adjusted shares for each parameter.
"""
for _ in range(iterations):
shares = {param: np.random.uniform(low, high) for param, (low, high) in ranges.items()}
total_share = sum(shares.values())
shares = {param: share / total_share for param, share in shares.items()}
if all(ranges[param][0] <= share <= ranges[param][1] for param, share in shares.items()):
return shares
return defaults


class Pathways:
"""The Pathways class reads in a datapackage that contains scenario data,
mapping between scenario variables and LCA datasets, and LCA matrices.
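
A hypothetical usage sketch of the class: the constructor and the calculate method appear in this diff, but their full signatures are not shown, so the arguments below are illustrative assumptions only.

from pathways import Pathways

p = Pathways(datapackage="example_datapackage.zip")  # path is illustrative
p.calculate(
    methods=["some LCIA method"],  # illustrative arguments; the full
    years=[2030],                  # signature is not shown in this diff
)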
Expand Down Expand Up @@ -894,9 +877,8 @@ def calculate(

# Create xarray for storing LCA results if not already present
if self.lca_results is None:
_, technosphere_index, biosphere_index = get_lca_matrices(
self.datapackage, models[0], scenarios[0], years[0]
)
dirpath = get_dirpath(self.datapackage, models[0], scenarios[0], years[0])
technosphere_index, biosphere_index = get_indices(dirpath)
locations = fetch_inventories_locations(technosphere_index)

self.lca_results = create_lca_results_array(
@@ -980,9 +962,6 @@ def calculate(
# remove None values in results
results = {k: v for k, v in results.items() if v is not None}

test = subshares_indices(regions, technosphere_index, Geomap(model=model))
print(test)

self.fill_in_result_array(results)

def fill_in_result_array(self, results: dict):
