From ffbb8c2d1a1834c5675ded75439d54bfcaba1323 Mon Sep 17 00:00:00 2001 From: alvarojhahn Date: Thu, 11 Apr 2024 14:38:09 +0200 Subject: [PATCH] Subshares dev --- pathways/data/technologies_shares.yaml | 2 +- pathways/lca.py | 230 +++++++++++++++++++------ pathways/pathways.py | 67 +++---- 3 files changed, 204 insertions(+), 95 deletions(-) diff --git a/pathways/data/technologies_shares.yaml b/pathways/data/technologies_shares.yaml index aee1da2..2488436 100644 --- a/pathways/data/technologies_shares.yaml +++ b/pathways/data/technologies_shares.yaml @@ -334,7 +334,7 @@ Battery-Stationary: ### Home storage uses NMC811 (w/o hyphen) in the current pre 2020: value: 0.05 2050: - min: 0.0.05 + min: 0.05 max: 0.33 distribution: uniform create copy: False diff --git a/pathways/lca.py b/pathways/lca.py index b7999fb..362cfc5 100644 --- a/pathways/lca.py +++ b/pathways/lca.py @@ -110,11 +110,14 @@ def get_matrix_arrays( matrix_filename = f"{matrix_type}_matrix.csv" - data, indices, sign, distributions = load_matrix_and_index( - dirpath / matrix_filename, - ) + if matrix_type == "A": + data, indices, sign, distributions = load_matrix_and_index(dirpath / matrix_filename,) + arrays = [data, indices, sign, distributions] + elif matrix_type == "B": + data, indices, _, distributions = load_matrix_and_index(dirpath / matrix_filename,) + arrays = [data, indices, distributions] - return [data, indices, sign, distributions] + return arrays def get_indices( @@ -185,7 +188,7 @@ def get_subshares_matrix( dp_correlated.add_persistent_array( matrix='technosphere_matrix', indices_array=a_indices, - data_array=a_data_samples.reshape((1, -1)), + data_array=a_data_samples, flip_array=a_sign, ) @@ -202,7 +205,6 @@ def adjust_matrix_based_on_shares(A_arrays, shares_dict, use_distributions, year """ data_array, indices_array, sign_array, _ = A_arrays - index_lookup = {(row["row"], row["col"]): i for i, row in enumerate(indices_array)} modified_data = [] @@ -223,58 +225,186 @@ def find_index(activity_idx, product_idx): for tech_category, regions in shares_dict.items(): for region, techs in regions.items(): - all_tech_indices = [ - techs[tech]["idx"] for tech in techs if techs[tech]["idx"] is not None - ] - all_product_indices = set() - - tech_indices = np.isin(indices_array['row'], all_tech_indices) - all_product_indices.update(indices_array['col'][tech_indices]) - + all_tech_indices = [details["idx"] for _, details in techs.items() if "idx" in details] + all_product_indices = set(indices_array['col'][np.isin(indices_array['row'], all_tech_indices)]) + + tech_group_ranges = {} + tech_group_defaults = {} + + for tech, details in techs.items(): + if year != 2020: + if details[2050]['distribution'] == 'uniform': + tech_group_ranges[tech] = (details[2050]['min'], details[year]['max']) + tech_group_defaults[tech] = details.get(2020, {}).get('value', 0) + else: + print('At this point, only uniform distributions are supported. Exiting.') + exit(1) + + if year != 2020 and tech_group_ranges: + group_shares = correlated_uniform_samples(tech_group_ranges, tech_group_defaults) + print('Tech group', tech_category, 'shares: ', group_shares) + else: + group_shares = {tech: details.get(year, {}).get("value", 0) for tech, details in techs.items()} + print('Tech group', tech_category, 'shares: ', group_shares) + for product_idx in all_product_indices: - # Vectorized operation to calculate total_output for each product_idx - relevant_indices = [ - find_index(tech_idx, product_idx) - for tech_idx in all_tech_indices - if find_index(tech_idx, product_idx) is not None - and tech_idx != product_idx - ] + relevant_indices = [find_index(idx, product_idx) for idx in all_tech_indices if find_index(idx, product_idx) is not None] total_output = np.sum(data_array[relevant_indices]) - for tech, details in techs.items(): - share = details.get(year, {}).get("value", 0) - idx = details["idx"] - if idx is None or share == 0: - continue - - # Calculate new amount - new_amount = total_output * share - index = find_index(idx, product_idx) - - # Adjust value or add new exchange - if ( - index is not None - and product_idx not in unique_product_indices_from_dict - ): # Exclude diagonal and undesired exchanges - data_array[index] = new_amount - # Append to modified_indices regardless of whether it's a new addition or an adjustment - modified_indices.append((idx, product_idx)) - modified_data.append(new_amount) - modified_signs.append(sign_array[index]) - elif product_idx not in unique_product_indices_from_dict: # Exclude diagonal and undesired exchanges - elif ( - product_idx not in unique_product_indices_from_dict - ): # Exclude diagonal and undesired exchanges - modified_data.append(new_amount) - modified_indices.append((idx, product_idx)) - modified_signs.append(True) # CHECK: I am assuming new exchanges are always positive - - modified_data_array = np.array(modified_data, dtype=np.float64) + for tech, share in group_shares.items(): + if tech in techs and 'idx' in techs[tech] and techs[tech]['idx'] is not None: + idx = techs[tech]['idx'] + + if year == 2020: + share_value = details.get(year, {}).get("value", 0) + new_amounts = np.array([total_output * share_value]).reshape((1, -1)) + else: + new_amounts = np.array([total_output * share for _ in range(use_distributions)]).reshape((1, -1)) + index = find_index(idx, product_idx) + + if index is not None and product_idx not in unique_product_indices_from_dict: + modified_indices.append((idx, product_idx)) + modified_data.append(new_amounts) + modified_signs.append(sign_array[index]) + elif index is None and product_idx not in unique_product_indices_from_dict: + modified_data.append(new_amounts) + modified_indices.append((idx, product_idx)) + modified_signs.append(True) + + # modified_data_array = np.array(modified_data, dtype=object) + modified_data_array = np.concatenate(modified_data, axis=0) modified_indices_array = np.array(modified_indices, dtype=bwp.INDICES_DTYPE) modified_signs_array = np.array(modified_signs, dtype=bool) return [modified_data_array, modified_indices_array, modified_signs_array] +# def adjust_matrix_based_on_shares(A_arrays, shares_dict, use_distributions, year): +# """ +# Adjust the technosphere matrix based on shares. +# :param data_array: +# :param indices_array: +# :param shares_dict: +# :param year: +# :return: +# """ +# +# data_array, indices_array, sign_array, _ = A_arrays +# index_lookup = {(row["row"], row["col"]): i for i, row in enumerate(indices_array)} +# +# modified_data = [] +# modified_indices = [] +# modified_signs = [] +# +# # Determine unique product indices from shares_dict to identify which shouldn't be automatically updated/added +# unique_product_indices_from_dict = set() +# for _, regions in shares_dict.items(): +# for _, techs in regions.items(): +# for _, details in techs.items(): +# if "idx" in details: +# unique_product_indices_from_dict.add(details["idx"]) +# +# # Helper function to find index using the lookup dictionary +# def find_index(activity_idx, product_idx): +# return index_lookup.get((activity_idx, product_idx)) +# +# for tech_category, regions in shares_dict.items(): +# for region, techs in regions.items(): +# if year != 2020: +# tech_group_ranges = {} +# tech_group_defaults = {} +# all_tech_indices = [] +# +# # Collect data for this technology group for correlated sampling +# for tech, details in techs.items(): +# if 'idx' in details and details['idx'] is not None: +# all_tech_indices.append(details['idx']) +# print(f'All tech indices: {all_tech_indices}') +# if details[2050]['distribution'] == 'uniform': +# tech_group_ranges[tech] = (details[2050]['min'], details[year]['max']) +# tech_group_defaults[tech] = details.get(2020, {}).get('value', 0) +# print(f'Tech: {tech}, ranges: {tech_group_ranges[tech]}, defaults: {tech_group_defaults[tech]}') +# else: +# print('At this point, only uniform distributions are supported. Exiting.') +# exit(1) +# +# # Generate correlated samples for this technology group +# if tech_group_ranges: +# group_shares = correlated_uniform_samples(tech_group_ranges, tech_group_defaults) +# print(group_shares) +# else: +# print('No tech_group_ranges found. Exiting.') +# +# all_product_indices = set(indices_array['col'][np.isin(indices_array['row'], all_tech_indices)]) +# for product_idx in all_product_indices: +# relevant_indices = [find_index(idx, product_idx) for idx in all_tech_indices if +# find_index(idx, product_idx) is not None] +# total_output = np.sum(data_array[relevant_indices]) +# +# for tech, share in group_shares.items(): +# if tech in techs and 'idx' in techs[tech] and techs[tech]['idx'] is not None: +# idx = techs[tech]['idx'] +# new_amounts = np.array([total_output * share for _ in range(use_distributions)]).reshape((1, -1)) +# index = find_index(idx, product_idx) +# if index is not None and product_idx not in unique_product_indices_from_dict: +# modified_indices.append((idx, product_idx)) +# modified_data.append(new_amounts) +# modified_signs.append(sign_array[index]) +# elif index is None and product_idx not in unique_product_indices_from_dict: +# modified_data.append(new_amounts) +# modified_indices.append((idx, product_idx)) +# modified_signs.append(True) +# else: +# for tech, details in techs.items(): +# share = details.get(year, {}).get("value", 0) +# idx = details["idx"] +# if idx is None or share == 0: +# continue +# # Calculate new amount +# new_amount = total_output * share +# index = find_index(idx, product_idx) +# +# # Adjust value or add new exchange +# if ( +# index is not None +# and product_idx not in unique_product_indices_from_dict +# ): # Exclude diagonal and undesired exchanges +# data_array[index] = new_amount +# # Append to modified_indices regardless of whether it's a new addition or an adjustment +# modified_indices.append((idx, product_idx)) +# modified_data.append(np.array([[new_amount]])) +# modified_signs.append(sign_array[index]) +# elif product_idx not in unique_product_indices_from_dict: # Exclude diagonal and undesired exchanges +# modified_data.append(np.array([[new_amount]])) +# modified_indices.append((idx, product_idx)) +# modified_signs.append(True) # Assuming new exchanges are positive +# +# # modified_data_array = np.array(modified_data, dtype=object) +# modified_data_array = np.concatenate(modified_data, axis=0) +# modified_indices_array = np.array(modified_indices, dtype=bwp.INDICES_DTYPE) +# modified_signs_array = np.array(modified_signs, dtype=bool) +# +# return [modified_data_array, modified_indices_array, modified_signs_array] + + +def correlated_uniform_samples(ranges, defaults, iterations=1000): + """ + Adjusts randomly selected shares for parameters to sum to 1 while respecting their specified ranges. + + :param ranges: Dict with parameter names as keys and (min, max) tuples as values. + :param defaults: Dict with default values for each parameter. + :param iterations: Number of iterations to attempt to find a valid distribution. + :return: A dict with the adjusted shares for each parameter. + """ + for _ in range(iterations): + shares = {param: np.random.uniform(low, high) for param, (low, high) in ranges.items()} + total_share = sum(shares.values()) + shares = {param: share / total_share for param, share in shares.items()} + if all(ranges[param][0] <= share <= ranges[param][1] for param, share in shares.items()): + return shares + + print("Failed to find a valid distribution after {} iterations".format(iterations)) + return defaults + def fill_characterization_factors_matrices( biosphere_flows: dict, methods, biosphere_dict, debug=False diff --git a/pathways/pathways.py b/pathways/pathways.py index 93b2235..2ae69fc 100644 --- a/pathways/pathways.py +++ b/pathways/pathways.py @@ -532,29 +532,31 @@ def _calculate_year(args): ], ) lca.lci(factorize=True) - elif use_distributions != 0 and subshares == False: - lca = MonteCarloLCA( - demand={0: 1}, - data_objs=[ - bw_datapackage, - ], - use_distributions=True, - ) - lca.lci() else: + if subshares is False: + + lca = MonteCarloLCA( + demand={0: 1}, + data_objs=[ + bw_datapackage, + ], + use_distributions=True, + ) + lca.lci() + else: - shares_indices = subshares_indices(regions, technosphere_indices, geo) - correlated_arrays = adjust_matrix_based_on_shares(A_arrays, shares_indices, use_distributions, year) + shares_indices = subshares_indices(regions, technosphere_indices, geo) + correlated_arrays = adjust_matrix_based_on_shares(A_arrays, shares_indices, use_distributions, year) - bw_correlated = get_subshares_matrix(correlated_arrays) + bw_correlated = get_subshares_matrix(correlated_arrays) - lca = MonteCarloLCA( - demand={0: 1}, - data_objs=[bw_datapackage, bw_correlated], - use_distributions=True, - use_arrays=True, - ) - lca.lci() + lca = MonteCarloLCA( + demand={0: 1}, + data_objs=[bw_datapackage, bw_correlated], + use_distributions=True, + use_arrays=True, + ) + lca.lci() characterization_matrix = fill_characterization_factors_matrices( biosphere_indices, methods, lca.dicts.biosphere, debug @@ -598,25 +600,6 @@ def _calculate_year(args): return results - -def correlated_uniform_montecarlo(ranges, defaults, iterations): - """ - Adjusts randomly selected shares for parameters to sum to 1 while respecting their specified ranges. - - :param ranges: Dict with parameter names as keys and (min, max) tuples as values. - :param defaults: Dict with default values for each parameter. - :param iterations: Number of iterations to attempt to find a valid distribution. - :return: A dict with the adjusted shares for each parameter. - """ - for _ in range(iterations): - shares = {param: np.random.uniform(low, high) for param, (low, high) in ranges.items()} - total_share = sum(shares.values()) - shares = {param: share / total_share for param, share in shares.items()} - if all(ranges[param][0] <= share <= ranges[param][1] for param, share in shares.items()): - return shares - return defaults - - class Pathways: """The Pathways class reads in a datapackage that contains scenario data, mapping between scenario variables and LCA datasets, and LCA matrices. @@ -894,9 +877,8 @@ def calculate( # Create xarray for storing LCA results if not already present if self.lca_results is None: - _, technosphere_index, biosphere_index = get_lca_matrices( - self.datapackage, models[0], scenarios[0], years[0] - ) + dirpath = get_dirpath(self.datapackage, models[0], scenarios[0], years[0]) + technosphere_index, biosphere_index = get_indices(dirpath) locations = fetch_inventories_locations(technosphere_index) self.lca_results = create_lca_results_array( @@ -980,9 +962,6 @@ def calculate( # remove None values in results results = {k: v for k, v in results.items() if v is not None} - test = subshares_indices(regions, technosphere_index, Geomap(model=model)) - print(test) - self.fill_in_result_array(results) def fill_in_result_array(self, results: dict):