Commit ad82620

Merge remote-tracking branch 'origin/subshares_romain' into subshares_romain
alvarojhahn committed May 2, 2024
2 parents ba720fd + 87809c7 commit ad82620
Showing 1 changed file with 68 additions and 44 deletions.
112 changes: 68 additions & 44 deletions pathways/stats.py
@@ -1,9 +1,10 @@
-import pandas as pd
+import os
+import re
+from pathlib import Path
+
+import pandas as pd
 import statsmodels.api as sm
-import re
 from openpyxl import load_workbook
-import os
 
 
 def log_subshares_to_excel(model: str, scenario: str, year: int, shares: dict):
@@ -27,11 +28,13 @@ def log_subshares_to_excel(model: str, scenario: str, year: int, shares: dict):
 
     num_iterations = len(shares[first_tech][year][next(iter(shares[first_tech][year]))])
     for i in range(num_iterations):
-        iteration_data = {'Iteration': i + 1, 'Year': year}
+        iteration_data = {"Iteration": i + 1, "Year": year}
         for tech, years_data in shares.items():
             if year in years_data:
                 for subtype, values in years_data[year].items():
-                    iteration_data[f"{tech}_{subtype}"] = values[i] if i < len(values) else None
+                    iteration_data[f"{tech}_{subtype}"] = (
+                        values[i] if i < len(values) else None
+                    )
         data.append(iteration_data)
 
     new_df = pd.DataFrame(data)
@@ -40,12 +43,21 @@ def log_subshares_to_excel(model: str, scenario: str, year: int, shares: dict):
     if os.path.exists(filename):
         df_existing = pd.read_excel(filename)
         # Merge new data into existing data, selectively updating share columns
-        combined_df = df_existing.set_index(['Iteration', 'Year']).combine_first(
-            new_df.set_index(['Iteration', 'Year'])).reset_index()
+        combined_df = (
+            df_existing.set_index(["Iteration", "Year"])
+            .combine_first(new_df.set_index(["Iteration", "Year"]))
+            .reset_index()
+        )
         # Optionally, ensure the columns are in a meaningful order
-        new_columns = [col for col in new_df.columns if col not in ['Iteration', 'Year']]
-        existing_columns = [col for col in df_existing.columns if col not in new_df.columns]
-        combined_df = combined_df[['Iteration', 'Year'] + new_columns + existing_columns]
+        new_columns = [
+            col for col in new_df.columns if col not in ["Iteration", "Year"]
+        ]
+        existing_columns = [
+            col for col in df_existing.columns if col not in new_df.columns
+        ]
+        combined_df = combined_df[
+            ["Iteration", "Year"] + new_columns + existing_columns
+        ]
 
         combined_df.to_excel(filename, index=False)
     else:
@@ -63,7 +75,7 @@ def log_intensities_to_excel(model: str, scenario: str, year: int, new_data: dic
     :param year: The year for which the data is logged.
     :param new_data: Dictionary where keys are the new column names and values are lists of data for each column.
     """
-    filename = f'stats_report_{model}_{scenario}_{year}.xlsx'
+    filename = f"stats_report_{model}_{scenario}_{year}.xlsx"
 
     if not new_data:
         print("Warning: No new data provided to log.")
@@ -76,39 +88,46 @@
 
             df_new = pd.DataFrame(index=range(max_length), columns=new_data.keys())
             for key, values in new_data.items():
-                df_new[key][:len(values)] = values
+                df_new[key][: len(values)] = values
 
-            df_new['Iteration'] = range(1, max_length + 1)
-            df_new['Year'] = [year] * max_length
+            df_new["Iteration"] = range(1, max_length + 1)
+            df_new["Year"] = [year] * max_length
 
-            combined_df = pd.merge(df_existing, df_new, on=['Iteration', 'Year'], how='outer', suffixes=('', '_new'))
+            combined_df = pd.merge(
+                df_existing,
+                df_new,
+                on=["Iteration", "Year"],
+                how="outer",
+                suffixes=("", "_new"),
+            )
 
             for col in df_new.columns:
-                if col + '_new' in combined_df:
-                    combined_df[col].update(combined_df.pop(col + '_new'))
+                if col + "_new" in combined_df:
+                    combined_df[col].update(combined_df.pop(col + "_new"))
 
             # Remove any '_new' columns if they still exist after updates
-            combined_df = combined_df.loc[:, ~combined_df.columns.str.endswith('_new')]
+            combined_df = combined_df.loc[:, ~combined_df.columns.str.endswith("_new")]
 
             df = combined_df
         else:
            max_length = max(len(v) for v in new_data.values())
            df = pd.DataFrame(new_data, index=range(max_length))
-           df['Iteration'] = range(1, max_length + 1)
-           df['Year'] = [year] * max_length
+           df["Iteration"] = range(1, max_length + 1)
+           df["Year"] = [year] * max_length
 
        df.to_excel(filename, index=False)
    except Exception as e:
        print(f"Failed to update the Excel file: {e}")
 
 
 def log_results_to_excel(
-        model: str,
-        scenario: str,
-        year: int,
-        total_impacts_by_method: dict,
-        methods: list,
-        filepath=None):
+    model: str,
+    scenario: str,
+    year: int,
+    total_impacts_by_method: dict,
+    methods: list,
+    filepath=None,
+):
     """
     Log the characterized inventory results for each LCIA method into separate columns in an Excel file.
@@ -132,14 +151,16 @@ def log_results_to_excel(
     for method, impacts in total_impacts_by_method.items():
         df[method] = pd.Series(impacts)
 
-    base_cols = ['Iteration', 'Year'] if 'Iteration' in df.columns else []
+    base_cols = ["Iteration", "Year"] if "Iteration" in df.columns else []
     other_cols = [col for col in df.columns if col not in base_cols + methods]
     df = df[base_cols + methods + other_cols]
 
     df.to_excel(filepath, index=False)
 
 
-def create_mapping_sheet(filepaths: list, model: str, scenario: str, year: int, parameter_keys: list):
+def create_mapping_sheet(
+    filepaths: list, model: str, scenario: str, year: int, parameter_keys: list
+):
     """
     Create a mapping sheet for the activities with uncertainties.
     :param filepaths: List of paths to data files.
@@ -154,11 +175,11 @@ def filter_filepaths(suffix: str, contains: list):
             Path(fp)
             for fp in filepaths
             if all(kw in fp for kw in contains)
-               and Path(fp).suffix == suffix
-               and Path(fp).exists()
+            and Path(fp).suffix == suffix
+            and Path(fp).exists()
         ]
 
-    unique_indices = {int(idx) for key in parameter_keys for idx in key.split('_to_')}
+    unique_indices = {int(idx) for key in parameter_keys for idx in key.split("_to_")}
 
     fps = filter_filepaths(".csv", [model, scenario, str(year)])
     if len(fps) < 1:
@@ -176,13 +197,17 @@ def filter_filepaths(suffix: str, contains: list):
     technosphere_inds = pd.read_csv(technosphere_indices_path, sep=";", header=None)
     technosphere_inds.columns = ["Activity", "Product", "Unit", "Location", "Index"]
 
-    mapping_df = technosphere_inds[technosphere_inds['Index'].isin(unique_indices)]
-    mapping_df = mapping_df[["Activity", "Product", "Location", "Unit", "Index"]]  # Restrict columns if necessary
+    mapping_df = technosphere_inds[technosphere_inds["Index"].isin(unique_indices)]
+    mapping_df = mapping_df[
+        ["Activity", "Product", "Location", "Unit", "Index"]
+    ]  # Restrict columns if necessary
 
     excel_path = f"stats_report_{model}_{scenario}_{year}.xlsx"
 
     try:
-        with pd.ExcelWriter(excel_path, mode='a', engine='openpyxl', if_sheet_exists='replace') as writer:
+        with pd.ExcelWriter(
+            excel_path, mode="a", engine="openpyxl", if_sheet_exists="replace"
+        ) as writer:
             mapping_df.to_excel(writer, index=False, sheet_name="Mapping")
     except Exception as e:
         print(f"Error writing mapping sheet to {excel_path}: {str(e)}")
@@ -196,7 +221,7 @@ def escape_formula(text: str):
     :param text: The string to be adjusted.
     :return: The adjusted string.
     """
-    return "'" + text if text.startswith(('=', '-', '+')) else text
+    return "'" + text if text.startswith(("=", "-", "+")) else text
 
 
 def run_stats_analysis(model: str, scenario: str, year: int, methods: list):
@@ -212,24 +237,26 @@ def run_stats_analysis(model: str, scenario: str, year: int, methods: list):
     :param methods: Methods corresponding to dataset columns.
     """
 
-    filename = f'stats_report_{model}_{scenario}_{year}.xlsx'
+    filename = f"stats_report_{model}_{scenario}_{year}.xlsx"
 
     try:
         book = load_workbook(filename)
     except FileNotFoundError:
-        book = pd.ExcelWriter(filename, engine='openpyxl')  # Create a new workbook if not found
+        book = pd.ExcelWriter(
+            filename, engine="openpyxl"
+        )  # Create a new workbook if not found
         book.close()
         book = load_workbook(filename)
 
-    data = pd.read_excel(filename, sheet_name='Sheet1')
+    data = pd.read_excel(filename, sheet_name="Sheet1")
 
     for idx, method in enumerate(methods):
         if method not in data.columns:
             print(f"Data for {method} not found in the file.")
             continue
 
         Y = data[method]
-        X = data.drop(columns=['Iteration', 'Year'] + methods)
+        X = data.drop(columns=["Iteration", "Year"] + methods)
         X = sm.add_constant(X)
 
         model_results = sm.OLS(Y, X).fit()
@@ -246,15 +273,12 @@ def run_stats_analysis(model: str, scenario: str, year: int, methods: list):
             book.remove(std)
         ws = book.create_sheet(sheet_name)
 
-        summary_lines = summary.split('\n')
+        summary_lines = summary.split("\n")
 
         for line in summary_lines:
             line = escape_formula(line)
-            columns = re.split(r'\s{2,}', line)
+            columns = re.split(r"\s{2,}", line)
             ws.append(columns)
 
     book.save(filename)
     print("Analysis complete and results saved.")
-
-
-
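Two merge idioms in this patch are easy to confuse, so here is each in isolation. In log_subshares_to_excel, a re-run is reconciled against the existing report with combine_first, which prefers what is already on disk and only fills gaps from the new frame. A minimal, runnable sketch of that pattern (the share column names are invented for illustration):

import pandas as pd

# Two logging passes produced the same (Iteration, Year) rows but
# different share columns; the column names here are hypothetical.
df_existing = pd.DataFrame(
    {"Iteration": [1, 2], "Year": [2030, 2030], "wind_onshore": [0.60, 0.55]}
)
new_df = pd.DataFrame(
    {"Iteration": [1, 2], "Year": [2030, 2030], "wind_offshore": [0.40, 0.45]}
)

# combine_first keeps non-null values from the caller (the existing report)
# and fills the rest from the new frame, so prior columns are never clobbered.
combined_df = (
    df_existing.set_index(["Iteration", "Year"])
    .combine_first(new_df.set_index(["Iteration", "Year"]))
    .reset_index()
)
print(combined_df)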
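log_intensities_to_excel resolves collisions the other way around: an outer merge tags clashing columns with a "_new" suffix, and Series.update then lets freshly logged values overwrite stale ones. A sketch of that pattern, again with hypothetical column names:

import pandas as pd

df_existing = pd.DataFrame(
    {"Iteration": [1, 2], "Year": [2030, 2030], "intensity_a": [0.9, None]}
)
df_new = pd.DataFrame(
    {"Iteration": [1, 2], "Year": [2030, 2030], "intensity_a": [1.0, 1.1]}
)

# The outer merge keeps every (Iteration, Year) row; the clashing
# "intensity_a" column comes back twice, the new copy as "intensity_a_new".
combined_df = pd.merge(
    df_existing, df_new, on=["Iteration", "Year"], how="outer", suffixes=("", "_new")
)

# Unlike combine_first above, the new values win here: update() overwrites
# the original column wherever the popped "_new" column is non-null.
for col in df_new.columns:
    if col + "_new" in combined_df:
        combined_df[col].update(combined_df.pop(col + "_new"))
print(combined_df)  # intensity_a is now [1.0, 1.1]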
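escape_formula exists because run_stats_analysis writes raw statsmodels summary lines into worksheet cells, and that output is full of lines starting with "-" (separator rules, negative coefficients) that Excel would otherwise try to parse as formulas. A quick demonstration using the same logic as the function in this diff:

def escape_formula(text: str):
    # Prefix a single quote so Excel stores the cell as literal text.
    return "'" + text if text.startswith(("=", "-", "+")) else text

print(escape_formula("=SUM(A1:A3)"))  # "'=SUM(A1:A3)"
print(escape_formula("-0.2345"))      # "'-0.2345"
print(escape_formula("R-squared:"))   # unchanged; only =, -, + prefixes are escaped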
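Finally, the regression at the heart of run_stats_analysis: each LCIA method's score column is regressed on every sampled input column, i.e. everything except Iteration, Year, and the method columns themselves. A self-contained sketch with synthetic data; all column names and coefficients are invented:

import numpy as np
import pandas as pd
import statsmodels.api as sm

rng = np.random.default_rng(42)
n = 200

# Stand-in for one sheet of the stats report: sampled inputs plus one score.
data = pd.DataFrame(
    {
        "Iteration": range(1, n + 1),
        "Year": [2030] * n,
        "share_wind": rng.uniform(0.2, 0.8, n),
        "share_pv": rng.uniform(0.1, 0.5, n),
    }
)
data["GWP100"] = (
    5 - 3 * data["share_wind"] - 1.5 * data["share_pv"] + rng.normal(0, 0.1, n)
)

methods = ["GWP100"]
Y = data["GWP100"]
X = sm.add_constant(data.drop(columns=["Iteration", "Year"] + methods))
model_results = sm.OLS(Y, X).fit()

# run_stats_analysis splits this summary text on newlines and runs of two or
# more spaces, then appends each row to a per-method worksheet.
print(model_results.summary())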