# LCA-ana
# Source: GitHub gist transfluxus/a2bef148adbd1dca260b1aaf9dc03a67 (last active March 28, 2023 10:23).
# Note: GitHub flagged this file as containing hidden or bidirectional Unicode text that may be
# interpreted or compiled differently than it appears; review it in an editor that reveals such characters.
import json
from pathlib import Path
from typing import Any, Optional, Union

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from frictionless import extract
class Experiment:
    """Work with one experiment result table and the files derived from it.

    The experiment is identified by a single source file. All derived files
    live in a folder next to it that is named after the file's stem::

        <stem>/
            scenarios.json, indicators.json, processor.json
            split-scenario/<scenario>.csv
            split-scenario-indicator/<scenario>_<indicator>.csv
            results/plots/
            results/simple_trees/<scenario>_<indicator>.json

    The table is expected to provide the columns ``Scenario``, ``Indicator``,
    ``Processor`` (a dot-separated path), ``Dendrogram level`` and ``Value``;
    these are the only columns the methods below read.
    """

    def __init__(self, experiment_file: str) -> None:
        """Remember the source file and create the derived folder layout.

        Args:
            experiment_file: Path of the experiment result table.

        Raises:
            ValueError: If ``experiment_file`` does not exist.
        """
        self.file_path = Path(experiment_file)
        self.experiment_path = self.file_path.parent.joinpath(self.file_path.stem)
        if not self.file_path.exists():
            raise ValueError(f"File {self.file_path} does not exist")
        self.setup_folder()

    def setup_folder(self) -> None:
        """Create the experiment folder and its sub-folders if they are missing."""
        # Order matters: every parent is created before its children because
        # mkdir() is called without ``parents=True``.
        for sub_folder in (
            "",
            "results",
            "results/plots",
            "results/simple_trees",
            "split-scenario",
            "split-scenario-indicator",
        ):
            self.experiment_path.joinpath(sub_folder).mkdir(exist_ok=True)

    def select_file(self, scenario: str, indicator: Optional[str] = None) -> str:
        """Return the POSIX path of the split csv file for a scenario.

        Args:
            scenario: Scenario name.
            indicator: Indicator name. When falsy, the per-scenario file is
                selected instead of the per-scenario-and-indicator file.

        Raises:
            ValueError: If the selected file does not exist.
        """
        if indicator:
            path = self.experiment_path.joinpath(f"split-scenario-indicator/{scenario}_{indicator}.csv")
        else:
            path = self.experiment_path.joinpath(f"split-scenario/{scenario}.csv")
        if not path.exists():
            raise ValueError(f"File {path} does not exist")
        return path.as_posix()

    def get_data(self, scenario: str, indicator: Optional[str] = None) -> list[dict[str, Any]]:
        """Load the rows of the split file selected by ``select_file``.

        ``extract`` is used as a mapping here; the rows stored under its first
        entry are returned.
        """
        data = extract(self.select_file(scenario, indicator))
        return next(iter(data.values()))

    def split_experiment(self) -> None:
        """Split the source table into per-scenario and per-indicator csv files.

        Besides the csv files this writes ``scenarios.json``,
        ``indicators.json`` and ``processor.json`` (each processor path split
        on ".") into the experiment folder.
        """
        rows = extract(self.file_path.as_posix())[self.file_path.stem]
        complete_df = pd.DataFrame(rows)

        for scenario, group in complete_df.groupby("Scenario"):
            # The file name must match what select_file() looks up, i.e.
            # "<scenario>.csv" without any extra prefix.
            group.to_csv(self.experiment_path.joinpath(f"split-scenario/{scenario}.csv").as_posix(),
                         index=False)
            for indicator_name, indicator_group in group.groupby("Indicator"):
                indicator_group.to_csv(self.experiment_path.joinpath(f"split-scenario-indicator/"
                                                                     f"{scenario}_{indicator_name}.csv",
                                                                     ).as_posix(), index=False)

        self._dump_json("scenarios.json", complete_df["Scenario"].unique().tolist())
        self._dump_json("indicators.json", complete_df["Indicator"].unique().tolist())
        processors = complete_df["Processor"].unique().tolist()
        self._dump_json("processor.json", [p.split(".") for p in processors])

    def get_scenario_indicator_data(self,
                                    scenario: str,
                                    indicator: str,
                                    processor_name: Union[str, tuple, list]) -> pd.DataFrame:
        """Return the direct children of a processor with their values.

        Args:
            scenario: Scenario name.
            indicator: Indicator name.
            processor_name: Path of the parent processor, either as a
                dot-separated string or as a sequence of path parts.

        Returns:
            A frame with the columns ``Technology`` (the child's own path
            part), ``Value`` (as float) and ``Relative`` (``Value`` divided by
            the total of the rows at dendrogram level 0).

        Raises:
            ValueError: If the data contains no row at dendrogram level 0.
            KeyError: If the processor path is at least as deep as the
                deepest path in the data, so the needed level column is absent.
        """
        complete_df = pd.DataFrame(self.get_data(scenario, indicator))
        # Split "Processor" on "." into the columns level_0, level_1, ...
        levels = complete_df["Processor"].str.split(".", expand=True).add_prefix("level_")
        complete_df = complete_df.join(levels)

        if isinstance(processor_name, str):
            processor_name = tuple(processor_name.split("."))
        depth = len(processor_name)

        # Children are the rows one level below the parent whose leading path
        # parts equal the parent path.
        is_child = complete_df["Dendrogram level"] == depth
        for i, name in enumerate(processor_name):
            is_child &= complete_df[f"level_{i}"] == name

        top_level = complete_df.loc[complete_df["Dendrogram level"] == 0, "Value"]
        if top_level.empty:
            raise ValueError(f"No row with 'Dendrogram level' 0 for {scenario} / {indicator}")
        # Summing equals the single root value when there is exactly one root
        # row and avoids calling float() on a Series.
        total_value = float(top_level.astype(float).sum())

        child_column = f"level_{depth}"
        df = complete_df.loc[is_child, [child_column, "Value"]].rename(columns={child_column: "Technology"})
        # assign() builds new frames, so no write goes into a filtered view.
        df = df.assign(Value=df["Value"].astype(float))
        df = df.assign(Relative=df["Value"] / total_value)
        return df

    def plot_impacts(self,
                     scenarios: Union[str, list[str]],
                     indicator: str,
                     processor_name: Union[str, tuple, list]) -> dict[str, pd.DataFrame]:
        """Collect and print the child-processor values for several scenarios.

        Plotting itself is not implemented yet (see the TODO below); the
        method currently prints every technology with its value per scenario.

        Args:
            scenarios: One scenario name or a sequence of scenario names.
            indicator: Indicator name.
            processor_name: Parent processor path, see
                ``get_scenario_indicator_data``.

        Returns:
            Mapping of scenario name to the frame returned by
            ``get_scenario_indicator_data``; empty when no scenario is given.
        """
        # Check for str rather than list so that tuples of names work as well.
        scenario_names = [scenarios] if isinstance(scenarios, str) else list(scenarios)
        scenarios_data = {
            name: self.get_scenario_indicator_data(name, indicator, processor_name)
            for name in scenario_names
        }
        if not scenarios_data:
            return scenarios_data

        # The technologies of the first scenario define what is reported.
        first_frame = next(iter(scenarios_data.values()))
        for tech in first_frame["Technology"]:
            print(tech)
            for frame in scenarios_data.values():
                values = frame.loc[frame["Technology"] == tech, "Value"]
                # A technology may be absent from another scenario.
                v = float(values.iloc[0]) if not values.empty else float("nan")
                print(tech, v)

        # TODO: stacked bar charts (absolute "Value" and "Relative" share per
        # technology), saved as results/plots/<scenario>_<indicator>.png and
        # results/plots/<scenario>_<indicator>-REL.png.
        return scenarios_data

    def build_simple_tree(self, scenario: str, indicator: str, save: bool = True) -> dict[str, Any]:
        """Build a nested dict that mirrors the processor hierarchy.

        Every node has the form ``{"name": <full dotted path>, "children":
        {<path part>: node, ...}, "value": <float>}``. Ancestors that have no
        row of their own (yet) are created with ``"value": None`` so that the
        row order does not matter.

        Args:
            scenario: Scenario name.
            indicator: Indicator name.
            save: When true, also write the tree to
                ``results/simple_trees/<scenario>_<indicator>.json``.

        Returns:
            The tree, keyed by the root processors' names.
        """
        tree: dict[str, Any] = {}
        for row in self.get_data(scenario, indicator):
            proc = row["Processor"]
            parts = proc.split(".")
            siblings = tree
            # Walk down to the parent's children, creating missing ancestors.
            for depth, part in enumerate(parts[:-1], start=1):
                ancestor = siblings.setdefault(
                    part, {"name": ".".join(parts[:depth]), "children": {}, "value": None})
                siblings = ancestor["children"]
            node = siblings.setdefault(parts[-1], {"name": proc, "children": {}, "value": None})
            node["value"] = float(row["Value"])

        if save:
            self._dump_json(f"results/simple_trees/{scenario}_{indicator}.json", tree, ensure_ascii=False)
        return tree

    def _dump_json(self, relative_path: str, payload: Any, ensure_ascii: bool = True) -> None:
        """Write ``payload`` as indented JSON to a file inside the experiment folder."""
        # Explicit utf-8 keeps non-ASCII output independent of the platform default.
        with self.experiment_path.joinpath(relative_path).open("w", encoding="utf-8") as f:
            json.dump(payload, f, indent=2, ensure_ascii=ensure_ascii)
# Script part: inspect one scenario/indicator pair of the filtered result table.
exp = Experiment("../data/results_filtered.csv")

# One-off preparation steps, kept for reference:
# exp.split_experiment()
# exp.build_simple_tree("installed_2020", "ionising_radiation_ionising_radiation_potential__IRP_")

# The processor path is passed as a list of path parts.
df = exp.plot_impacts(
    "installed_2020",
    "ionising_radiation_ionising_radiation_potential__IRP_",
    ["electricity_production", "renewables", "others_renew"],
)
# print(df)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment