Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

First-Tier contribution analysis tab #1046

Draft
wants to merge 15 commits into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
330 changes: 324 additions & 6 deletions activity_browser/layouts/tabs/LCA_results_tabs.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,26 @@
"""

from collections import namedtuple
from typing import List, Optional, Union
from copy import deepcopy
from typing import List, Tuple, Optional, Union
from logging import getLogger

import numpy as np
import pandas as pd
import warnings

from PySide2 import QtCore, QtGui
from PySide2.QtWidgets import (QApplication, QButtonGroup, QCheckBox,
QComboBox, QFileDialog, QGridLayout, QGroupBox,
QHBoxLayout, QLabel, QLineEdit, QMessageBox,
QPushButton, QRadioButton, QScrollArea,
QTableView, QTabWidget, QToolBar, QVBoxLayout,
QWidget)
from activity_browser.bwutils import AB_metadata

from activity_browser.bwutils.metadata import MetaDataStore
from stats_arrays.errors import InvalidParamsError
import brightway2 as bw

from activity_browser import signals
from activity_browser.mod.bw2data import calculation_setups
Expand All @@ -31,6 +39,7 @@
from ...ui.tables import ContributionTable, InventoryTable, LCAResultsTable
from ...ui.web import SankeyNavigatorWidget
from ...ui.widgets import CutoffMenu, SwitchComboBox
from ...bwutils.superstructure.graph_traversal_with_scenario import GraphTraversalWithScenario
from .base import BaseRightTab

log = getLogger(__name__)
Expand Down Expand Up @@ -62,7 +71,7 @@ def get_unit(method: tuple, relative: bool = False) -> str:

# Special namedtuple for the LCAResults TabWidget.
# Special namedtuples describing the fixed sub-tab layout of the LCA results
# TabWidget and its auxiliary widget groups.
Tabs = namedtuple(
    "tabs", ("inventory", "results", "ef", "process", "product", "sankey", "mc", "gsa")
)
# Pair of toggles for relative vs. absolute result display.
Relativity = namedtuple("relativity", ("relative", "absolute"))
# Export widget group: a label plus copy/CSV/Excel actions.
ExportTable = namedtuple("export_table", ("label", "copy", "csv", "excel"))
Expand Down Expand Up @@ -121,6 +130,7 @@ def __init__(self, data: dict, parent=None):
results=LCAResultsTab(self),
ef=ElementaryFlowContributionTab(self),
process=ProcessContributionsTab(self),
product=ProductContributionsTab(self.cs_name, self),
sankey=SankeyNavigatorWidget(self.cs_name, parent=self),
mc=MonteCarloTab(
self
Expand All @@ -132,6 +142,7 @@ def __init__(self, data: dict, parent=None):
results="LCA Results",
ef="EF Contributions",
process="Process Contributions",
product="Product Contributions",
sankey="Sankey",
mc="Monte Carlo",
gsa="Sensitivity Analysis",
Expand Down Expand Up @@ -909,6 +920,7 @@ def set_filename(self, optional_fields: dict = None):
optional.get("functional_unit"),
self.unit,
)

filename = "_".join((str(x) for x in fields if x is not None))
self.plot.plot_name, self.table.table_name = filename, filename

Expand Down Expand Up @@ -981,12 +993,9 @@ def set_combobox_changes(self):
# gather the combobox values
method = self.parent.method_dict[self.combobox_menu.method.currentText()]
functional_unit = self.combobox_menu.func.currentText()
scenario = self.combobox_menu.scenario.currentIndex()
scenario = max(self.combobox_menu.scenario.currentIndex(), 0) # set scenario 0 if not initiated yet
aggregator = self.combobox_menu.agg.currentText()

# catch uninitiated scenario combobox
if scenario < 0:
scenario = 0
# set aggregator to None if unwanted
if aggregator == "none":
aggregator = None
Expand Down Expand Up @@ -1159,6 +1168,315 @@ def update_dataframe(self, *args, **kwargs):
)


class ProductContributionsTab(ContributionTab):
    """Class for the 'Product Contributions' sub-tab.

    This tab allows for analysis of first-level product contributions.
    The direct impact (from biosphere exchanges from the FU)
    and cumulative impacts from all exchange inputs to the FU (first level) are calculated.

    e.g. the direct emissions from steel production and the cumulative impact for all electricity input
    into that activity. This works on the basis of input products and their total (cumulative) impact, scaled to
    how much of that product is needed in the FU.

    Example questions that can be answered by this tab:
        What is the contribution of electricity (product) to reference flow XXX?
        Which input product contributes the most to impact category YYY?
        What products contribute most to reference flow ZZZ?

    Shows:
        Compare options button to change between 'Reference Flows' and 'Impact Categories'
        'Impact Category'/'Reference Flow' chooser with aggregation method
        Plot/Table on/off and Relative/Absolute options for data
        Plot/Table
        Export options
    """

    def __init__(self, cs_name, parent=None):
        super().__init__(parent)

        # We cache the individual calculation results, as they are re-used in multiple views
        # e.g. FU1 x method1 x scenario1
        # may be seen in both 'Reference Flows' and 'Impact Categories', just with different axes.
        # The "totals" entry maps column name -> total score; it is kept not for calculation
        # speed, but to be able to easily convert results to relative values later.
        self.cache = {"totals": {}}
        self.caching = True  # set to False to disable caching for debug

        self.layout.addLayout(get_header_layout("Product Contributions"))
        self.layout.addWidget(self.cutoff_menu)
        self.layout.addWidget(horizontal_line())
        combobox = self.build_combobox(has_method=True, has_func=True)
        self.layout.addLayout(combobox)
        self.layout.addWidget(horizontal_line())
        self.layout.addWidget(self.build_main_space())
        self.layout.addLayout(self.build_export(True, True))

        # get relevant data from calculation setup
        self.cs = cs_name
        func_units = bw.calculation_setups[self.cs]["inv"]
        self.func_keys = [list(fu.keys())[0] for fu in func_units]  # extract a list of keys from the functional units
        self.func_units = [
            {bw.get_activity(k): v for k, v in fu.items()}
            for fu in func_units
        ]
        self.methods = bw.calculation_setups[self.cs]["ia"]

        self.contribution_fn = "Product contributions"
        self.switches.configure(self.has_func, self.has_method)
        self.connect_signals()
        self.toggle_comparisons(self.switches.indexes.func)

    def update_dataframe(self, *args, **kwargs):
        """Retrieve the product contributions and return them as a dataframe.

        Depending on the current compare mode, results are gathered per
        reference flow, per impact category, or per scenario; cached results
        are reused when available.
        """

        # TODO
        # 1 refactor so this updates the df, not does calculations/cache reads etc
        # 2 figure out how this already works with relative???
        # 3 make this work with aggregator for data
        # 4 make this work with cutoff menu (limit, limit_type (and normalize??)
        # 5 update documentation

        def get_data():
            # Closure: reads the current demand/method/scenario selection from
            # the enclosing scope at call time.
            return self.calculate_contributions(demand, demand_key, demand_index,
                                                method=method, method_index=method_index,
                                                scenario_lca=self.has_scenarios, scenario_index=scenario_index,
                                                )

        # get the right data
        if self.has_scenarios:
            # get the scenario index, if it is -1 (none selected), then use index 0
            scenario_index = max(self.combobox_menu.scenario.currentIndex(), 0)
        else:
            scenario_index = None
        method_index = self.combobox_menu.method.currentIndex()
        method = self.methods[method_index]
        demand_index = self.combobox_menu.func.currentIndex()
        demand = self.func_units[demand_index]
        demand_key = self.func_keys[demand_index]

        all_data = []
        compare = self.switches.currentText()
        if compare == "Reference Flows":
            # run the analysis for every reference flow
            for demand_index, demand in enumerate(self.func_units):
                demand_key = self.func_keys[demand_index]
                cache_key = (demand_index, method_index, scenario_index)
                if self.caching and self.cache.get(cache_key, False):
                    # this data is cached
                    all_data.append([demand_key, self.cache[cache_key]])
                    continue

                data = get_data()
                all_data.append([demand_key, data])
                if self.caching:
                    self.cache[cache_key] = data
        elif compare == "Impact Categories":
            # run the analysis for every method
            for method_index, method in enumerate(self.methods):
                cache_key = (demand_index, method_index, scenario_index)
                if self.caching and self.cache.get(cache_key, False):
                    # this data is cached
                    all_data.append([method, self.cache[cache_key]])
                    continue

                data = get_data()
                all_data.append([method, data])
                if self.caching:
                    self.cache[cache_key] = data
        elif compare == "Scenarios":
            # run the analysis for every scenario
            orig_idx = self.combobox_menu.scenario.currentIndex()
            for scenario_index in range(self.combobox_menu.scenario.count()):
                scenario = self.combobox_menu.scenario.itemText(scenario_index)
                cache_key = (demand_index, method_index, scenario_index)
                if self.caching and self.cache.get(cache_key, False):
                    # this data is cached
                    all_data.append([scenario, self.cache[cache_key]])
                    continue

                data = get_data()
                all_data.append([scenario, data])
                if self.caching:
                    self.cache[cache_key] = data
            self.combobox_menu.scenario.setCurrentIndex(orig_idx)

        df = self.data_to_df(all_data, compare)
        return df

    def calculate_contributions(self, demand, demand_key, demand_index,
                                method, method_index: int = None,
                                scenario_lca: bool = False, scenario_index: int = None) -> Optional[dict]:
        """Calculate first-level product contributions for one FU/method(/scenario).

        Returns a dict mapping activity keys to their (scaled, cumulative)
        scores, plus a "Total" entry with the overall score and a `demand_key`
        entry holding the remainder (the direct contribution of the FU itself).
        """

        def get_default_demands() -> dict:
            """Get the inputs to calculate contributions from the activity"""
            # get exchange keys leading to this activity
            technosphere = bw.get_activity(demand_key).technosphere()

            keys = [exch.input.key for exch in technosphere if
                    exch.input.key != exch.output.key]
            # find scale from production amount and demand amount
            scale = demand[demand_key] / [p for p in bw.get_activity(demand_key).production()][0].amount

            amounts = [exch.amount * scale for exch in technosphere if
                       exch.input.key != exch.output.key]
            demands = {keys[i]: amounts[i] for i, _ in enumerate(keys)}
            return demands

        def get_scenario_demands() -> dict:
            """Get the inputs to calculate contributions from the scenario matrix"""
            # get exchange keys leading to this activity
            technosphere = bw.get_activity(demand_key).technosphere()
            demand_idx = _lca.product_dict[demand_key]

            keys = [exch.input.key for exch in technosphere if
                    exch.input.key != exch.output.key]
            # find scale from production amount and demand amount
            # (technosphere diagonal entries are negative, hence the * -1)
            scale = demand[demand_key] / _lca.technosphere_matrix[_lca.activity_dict[demand_key], demand_idx] * -1

            amounts = []

            for exch in technosphere:
                exch_idx = _lca.activity_dict[exch.input.key]
                if exch.input.key != exch.output.key:
                    amounts.append(_lca.technosphere_matrix[exch_idx, demand_idx] * scale)

            # write all non-zero exchanges to demand dict
            demands = {keys[i]: amounts[i] for i, _ in enumerate(keys) if amounts[i] != 0}
            return demands

        # reuse LCA object from original calculation to skip 1 LCA
        if scenario_lca:
            # get score from the already calculated result
            score = self.parent.mlca.lca_scores[demand_index, method_index, scenario_index]

            # get lca object from mlca class
            self.parent.mlca.current = scenario_index
            self.parent.mlca.update_matrices()
            _lca = self.parent.mlca.lca
            _lca.redo_lci(demand)

            # _lca.lci(factorize=True)
        else:
            # get score from the already calculated result
            score = self.parent.mlca.lca_scores[demand_index, method_index]

            # get lca object to calculate new results
            _lca = self.parent.mlca.lca

        # set the correct method
        _lca.switch_method(method)
        _lca.lcia_calculation()

        if score == 0:
            # no need to calculate contributions to '0' score
            # technically it could be that positive and negative score of same amount negate to 0, but highly unlikely.
            return {"Total": 0, demand_key: 0}

        data = {"Total": score}
        remainder = score  # contribution of demand_key

        if not scenario_lca:
            new_demands = get_default_demands()
        else:
            new_demands = get_scenario_demands()

        # iterate over all activities demand_key is connected to
        for key, amt in new_demands.items():

            # recalculate for this demand
            _lca.redo_lci({key: amt})
            _lca.redo_lcia()

            score = _lca.score
            if score != 0:
                # only store non-zero results
                data[key] = score
            remainder -= score  # subtract this from remainder

        data[demand_key] = remainder
        return data

    def data_to_df(self, all_data: List[Tuple[object, dict]], compare: str) -> pd.DataFrame:
        """Convert the provided data into a dataframe.

        `all_data` is a list of (item, contribution-dict) pairs produced by
        `calculate_contributions`; `compare` determines how column names are
        derived from each item.
        """
        unique_keys = []
        # get all the unique keys:
        d = {"index": ["Total"], "reference product": [""], "name": [""],
             "location": [""], "unit": [""], "database": [""]}
        meta_cols = set(d.keys())
        for i, (item, data) in enumerate(all_data):
            unique_keys += list(data.keys())
            # already add the total with right column formatting depending on compares
            if compare == "Reference Flows":
                col_name = self.metadata_to_index(self.key_to_metadata(item))
            elif compare == "Impact Categories":
                col_name = self.metadata_to_index(list(item))
            elif compare == "Scenarios":
                col_name = item

            self.cache["totals"][col_name] = data["Total"]
            if self.relative:
                d[col_name] = [1]
            else:
                d[col_name] = [data["Total"]]

            all_data[i] = item, data, col_name
        unique_keys = set(unique_keys)

        # convert to dict format to feed into dataframe
        for key in unique_keys:
            if key == "Total":
                continue
            # get metadata
            metadata = self.key_to_metadata(key)
            d["index"].append(self.metadata_to_index(metadata))
            d["reference product"].append(metadata[0])
            d["name"].append(metadata[1])
            d["location"].append(metadata[2])
            d["unit"].append(metadata[3])
            d["database"].append(metadata[4])
            # check for each dataset if we have values, otherwise add np.nan
            for item, data, col_name in all_data:
                if val := data.get(key, False):
                    if self.relative:
                        value = val / self.cache["totals"][col_name]
                    else:
                        value = val
                else:
                    value = np.nan
                d[col_name].append(value)
        df = pd.DataFrame(d)
        check_cols = list(set(df.columns) - meta_cols)
        df = df.dropna(subset=check_cols, how="all")
        # TODO sort like https://github.com/LCA-ActivityBrowser/activity-browser/issues/887
        # Temporary sorting solution: sort on the last data column. NOTE: sort_values
        # returns a new frame (it is not in-place), so the result must be assigned.
        # Guard against empty `all_data`, in which case no column name was derived.
        if all_data:
            df = df.sort_values(by=col_name, ascending=False)
        return df

    def key_to_metadata(self, key: tuple) -> list:
        """Convert the key information to list with metadata.

        format:
        [reference product, activity name, location, unit, database]
        """
        return list(AB_metadata.get_metadata([key], ["reference product", "name", "location", "unit"]).iloc[0]) + [key[0]]

    def metadata_to_index(self, data: list) -> str:
        """Convert list to formatted index.

        format:
        reference product | activity name | location | unit | database
        """
        return " | ".join(data)

    def build_combobox(
        self, has_method: bool = True, has_func: bool = False
    ) -> QHBoxLayout:
        """Build the combobox row, pre-populating the aggregation choices."""
        self.combobox_menu.agg.addItems(
            self.parent.contributions.DEFAULT_ACT_AGGREGATES
        )
        return super().build_combobox(has_method, has_func)


class CorrelationsTab(NewAnalysisTab):
def __init__(self, parent):
super().__init__(parent)
Expand Down
Loading