Skip to content

Commit

Permalink
Add first iteration of product contributions
Browse files Browse the repository at this point in the history
  • Loading branch information
marc-vdm committed Sep 22, 2023
1 parent a843fef commit 6359dd7
Showing 1 changed file with 260 additions and 4 deletions.
264 changes: 260 additions & 4 deletions activity_browser/layouts/tabs/LCA_results_tabs.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@

from collections import namedtuple
import traceback
from typing import List, Optional, Union
from typing import List, Tuple, Optional, Union

import numpy as np
import pandas as pd
import warnings

from PySide2.QtWidgets import (
QWidget, QTabWidget, QVBoxLayout, QHBoxLayout, QScrollArea, QRadioButton,
Expand All @@ -18,6 +21,7 @@
)
from PySide2 import QtGui, QtCore
from stats_arrays.errors import InvalidParamsError
import brightway2 as bw

from ...bwutils import (
Contributions, MonteCarloLCA, MLCA,
Expand All @@ -34,6 +38,7 @@
from ...ui.tables import ContributionTable, InventoryTable, LCAResultsTable
from ...ui.widgets import CutoffMenu, SwitchComboBox
from ...ui.web import SankeyNavigatorWidget
from ...bwutils.superstructure.graph_traversal_with_scenario import GraphTraversalWithScenario
from .base import BaseRightTab

import logging
Expand Down Expand Up @@ -69,7 +74,7 @@ def get_unit(method: tuple, relative: bool = False) -> str:

# Special namedtuple for the LCAResults TabWidget.
Tabs = namedtuple(
"tabs", ("inventory", "results", "ef", "process", "sankey", "mc", "gsa")
"tabs", ("inventory", "results", "ef", "process", "product", "sankey", "mc", "gsa")
)
Relativity = namedtuple("relativity", ("relative", "absolute"))
ExportTable = namedtuple("export_table", ("label", "copy", "csv", "excel"))
Expand Down Expand Up @@ -107,7 +112,6 @@ def __init__(self, data: dict, parent=None):

self.setMovable(True)
self.setVisible(False)
self.visible = False

QApplication.setOverrideCursor(QtCore.Qt.WaitCursor)
self.mlca, self.contributions, self.mc = calculations.do_LCA_calculations(data)
Expand All @@ -120,6 +124,7 @@ def __init__(self, data: dict, parent=None):
results=LCAResultsTab(self),
ef=ElementaryFlowContributionTab(self),
process=ProcessContributionsTab(self),
product=ProductContributionsTab(self.cs_name, self),
sankey=SankeyNavigatorWidget(self.cs_name, parent=self),
mc=MonteCarloTab(self), # mc=None if self.mc is None else MonteCarloTab(self),
gsa=GSATab(self),
Expand All @@ -129,6 +134,7 @@ def __init__(self, data: dict, parent=None):
results="LCA Results",
ef="EF Contributions",
process="Process Contributions",
product="Product Contributions",
sankey="Sankey",
mc="Monte Carlo",
gsa="Sensitivity Analysis",
Expand Down Expand Up @@ -521,7 +527,6 @@ def elementary_flows_contributing_to_IA_methods(self, contributary: bool = True,

return data.loc[data['code'].isin(new_flows)]


def update_table(self):
"""Update the table."""
inventory = "biosphere" if self.radio_button_biosphere.isChecked() else "technosphere"
Expand Down Expand Up @@ -1034,6 +1039,257 @@ def update_dataframe(self, *args, **kwargs):
)


class ProductContributionsTab(ContributionTab):
    """Class for the 'Product Contributions' sub-tab.

    This tab allows for analysis of first-level product contributions.
    The direct impact (from biosphere exchanges from the FU)
    and cumulative impacts from all exchange inputs to the FU (first level) are calculated.
    e.g. the direct emissions from steel production and the cumulative impact for all electricity input
    into that activity. This works on the basis of input products and their total (cumulative) impact, scaled to
    how much of that product is needed in the FU.

    Example questions that can be answered by this tab:
        What is the contribution of electricity (product) to reference flow XXX?
        Which input product contributes the most to impact category YYY?
        What products contribute most to reference flow ZZZ?

    Shows:
        Compare options button to change between 'Reference Flows' and 'Impact Categories'
        'Impact Category'/'Reference Flow' chooser with aggregation method
        Plot/Table on/off and Relative/Absolute options for data
        Plot/Table
        Export options
    """

    def __init__(self, cs_name, parent=None):
        super().__init__(parent)

        # We cache individual calculation results keyed on
        # (demand_key, method_index, scenario_index), as they can take some time to
        # generate and are re-used in multiple views: e.g. FU1 x method1 x scenario1
        # may be seen in both 'Reference Flows' and 'Impact Categories', just with
        # different axes. 'totals' is cached separately — not for speed, but to be
        # able to easily convert absolute values to relative results.
        self.cache = {'totals': {}}
        self.caching = True  # set to False to disable caching for debug

        self.layout.addLayout(get_header_layout('Product Contributions'))
        combobox = self.build_combobox(has_method=True, has_func=True)
        self.layout.addLayout(combobox)
        self.layout.addWidget(horizontal_line())
        self.layout.addWidget(self.build_main_space())
        self.layout.addLayout(self.build_export(True, True))

        # get relevant data from the calculation setup
        self.cs = cs_name
        func_units = bw.calculation_setups[self.cs]['inv']
        # extract a list of keys from the functional units
        self.func_keys = [list(fu.keys())[0] for fu in func_units]
        self.func_units = [
            {bw.get_activity(k): v for k, v in fu.items()}
            for fu in func_units
        ]
        self.methods = bw.calculation_setups[self.cs]['ia']

        self.contribution_fn = 'Product contributions'
        self.switches.configure(self.has_func, self.has_method)
        self.connect_signals()
        self.toggle_comparisons(self.switches.indexes.func)

        # aggregation does not apply to product contributions; hide the widgets
        self.combobox_menu.agg_label.setVisible(False)
        self.combobox_menu.agg.setVisible(False)

    def _contribution_data(self, cache_key: tuple, demand, demand_key,
                           method, method_index: int, scenario_index: Optional[int]) -> dict:
        """Return contribution data for one FU x method x scenario combination.

        Serves from the cache when enabled; otherwise calculates, reformats and
        stores the result. Falls back to an all-zero result when no data could
        be calculated.
        """
        if self.caching and cache_key in self.cache:
            return self.cache[cache_key]
        data = self.calculate_contributions(
            demand, demand_key,
            method=method, method_index=method_index,
            scenario_lca=self.has_scenarios, scenario_index=scenario_index,
        )
        data = self.reformat_data(data) if data else {'Total': 0}
        self.cache[cache_key] = data
        return data

    def update_dataframe(self, *args, **kwargs):
        """Retrieve the product contributions and return them as a dataframe."""
        # read the currently selected scenario/method/reference flow
        if self.has_scenarios:
            scenario_index = self.combobox_menu.scenario.currentIndex()
        else:
            scenario_index = None
        method_index = self.combobox_menu.method.currentIndex()
        method = self.methods[method_index]
        demand_index = self.combobox_menu.func.currentIndex()
        demand = self.func_units[demand_index]
        demand_key = self.func_keys[demand_index]

        all_data = []
        compare = self.switches.currentText()
        if compare == 'Reference Flows':
            # run the analysis for every reference flow
            for demand_index, demand in enumerate(self.func_units):
                demand_key = self.func_keys[demand_index]
                cache_key = (demand_key, method_index, scenario_index)
                data = self._contribution_data(cache_key, demand, demand_key,
                                               method, method_index, scenario_index)
                all_data.append([demand_key, data])
        elif compare == 'Impact Categories':
            # run the analysis for every method
            for method_index, method in enumerate(self.methods):
                cache_key = (demand_key, method_index, scenario_index)
                data = self._contribution_data(cache_key, demand, demand_key,
                                               method, method_index, scenario_index)
                all_data.append([method, data])
        elif compare == 'Scenarios':
            # run the analysis for every scenario; the scenario combobox is stepped
            # through and restored afterwards, as the LCA update reads its index
            orig_idx = self.combobox_menu.scenario.currentIndex()
            for scenario_index in range(self.combobox_menu.scenario.count()):
                self.combobox_menu.scenario.setCurrentIndex(scenario_index)
                scenario = self.combobox_menu.scenario.currentText()
                cache_key = (demand_key, method_index, scenario_index)
                data = self._contribution_data(cache_key, demand, demand_key,
                                               method, method_index, scenario_index)
                all_data.append([scenario, data])
            self.combobox_menu.scenario.setCurrentIndex(orig_idx)
        df = self.data_to_df(all_data, compare)
        # TODO manage empty dataframe so overall calculation doesn't fail with plot generation
        return df

    def calculate_contributions(self, demand, demand_key,
                                method, method_index: int = None,
                                scenario_lca: bool = False, scenario_index: int = None) -> Optional[dict]:
        """Calculate LCA, do graph traversal on the first level of activities.

        Returns the raw graph-traversal result dict, or None when the
        calculation failed (e.g. a method with no characterized flows).
        """
        technosphere = bw.get_activity(demand_key).technosphere()

        # get the amount of exchanges that are not to self; this bounds the
        # traversal to exactly the first level of inputs
        max_calc = len([exch for exch in technosphere if
                        exch.input.key != exch.output.key])

        with warnings.catch_warnings():
            # Ignore the calculation count warning, as we don't care about it in this situation
            warnings.filterwarnings("ignore", message="Stopping traversal due to calculation count.")
            try:
                if scenario_lca:
                    self.parent.mlca.update_lca_calculation_for_sankey(scenario_index, demand, method_index)
                    data = GraphTraversalWithScenario(self.parent.mlca).calculate(demand, method, cutoff=0,
                                                                                  max_calc=max_calc)
                else:
                    data = bw.GraphTraversal().calculate(demand, method, cutoff=0, max_calc=max_calc)
            except (ValueError, ZeroDivisionError) as e:
                log.info('{}, no results calculated for {}'.format(e, method))
                return
        return data

    def reformat_data(self, data: dict) -> dict:
        """Reformat the graph-traversal result into a {key: impact} dict.

        The returned dict maps each first-level input activity key to its
        cumulative impact, plus the demand activity itself (direct impact,
        computed as the difference with the total) and a 'Total' entry.
        """
        lca = data["lca"]
        demand_key = list(lca.demand.keys())[0].key
        total = lca.score
        reverse_activity_dict = {v: k for k, v in lca.activity_dict.items()}

        # get the impact data per key; -1 marks the virtual root node of the
        # traversal graph, so edges touching it are skipped
        contributions = {
            bw.get_activity(reverse_activity_dict[edge['from']]).key: edge['impact'] for edge in data["edges"]
            if all(i != -1 for i in (edge["from"], edge["to"]))
        }
        # for the demand process, calculate its direct impact by the difference
        diff = total - sum(contributions.values())
        contributions[demand_key] = diff
        contributions['Total'] = total
        return contributions

    def data_to_df(self, all_data: List[Tuple[object, dict]], compare: str) -> pd.DataFrame:
        """Convert the provided data into a dataframe.

        `all_data` is a list of [axis item, {key: impact}] pairs; `compare`
        determines how the axis item is formatted into a column name.
        """
        d = {'index': ['Total'], 'reference product': [''], 'name': [''],
             'location': [''], 'unit': [''], 'database': ['']}
        meta_cols = set(d.keys())
        if not all_data:
            # guard: with no data there are no value columns to build or sort on
            return pd.DataFrame(d)

        unique_keys = []
        for i, (item, data) in enumerate(all_data):
            unique_keys += list(data.keys())
            # already add the total with the right column formatting depending on compare
            if compare == 'Reference Flows':
                col_name = self.metadata_to_index(self.key_to_metadata(item))
            elif compare == 'Impact Categories':
                col_name = self.metadata_to_index(list(item))
            elif compare == 'Scenarios':
                col_name = item

            self.cache['totals'][col_name] = data['Total']
            # relative results are normalized to the total, so the total itself is 1
            d[col_name] = [1] if self.relative else [data['Total']]

            all_data[i] = item, data, col_name
        unique_keys = set(unique_keys)

        # convert to dict format to feed into the dataframe
        for key in unique_keys:
            if key == 'Total':
                continue
            # get metadata
            metadata = self.key_to_metadata(key)
            d['index'].append(self.metadata_to_index(metadata))
            d['reference product'].append(metadata[0])
            d['name'].append(metadata[1])
            d['location'].append(metadata[2])
            d['unit'].append(metadata[3])
            d['database'].append(metadata[4])
            # for each dataset add its value, or np.nan when this key has no entry.
            # NOTE: checked with `is None` instead of truthiness so a legitimate
            # 0.0 contribution is kept as 0 rather than becoming NaN.
            for item, data, col_name in all_data:
                val = data.get(key)
                if val is None:
                    d[col_name].append(np.nan)
                elif self.relative:
                    d[col_name].append(val / self.cache['totals'][col_name])
                else:
                    d[col_name].append(val)
        df = pd.DataFrame(d)
        check_cols = list(set(df.columns) - meta_cols)
        df = df.dropna(subset=check_cols, how='all')
        # temporary sorting solution, just sort on the last column.
        # sort_values returns a new frame, so the result must be re-assigned.
        df = df.sort_values(by=col_name, ascending=False)
        return df

    def key_to_metadata(self, key: tuple) -> list:
        """Convert the key information to a list with metadata.

        format:
        [reference product, activity name, location, unit, database]
        """
        act = bw.get_activity(key)
        # NOTE(review): act.get(...) may return None for missing fields, which
        # metadata_to_index cannot join — presumably the fields are always set
        # for real activities; verify against the databases in use.
        return [act.get('reference product'), act.get('name'), act.get('location'), act.get('unit'), key[0]]

    def metadata_to_index(self, data: list) -> str:
        """Convert a metadata list to a formatted index string.

        format:
        reference product | activity name | location | unit | database
        """
        return ' | '.join(data)


class CorrelationsTab(NewAnalysisTab):
def __init__(self, parent):
super().__init__(parent)
Expand Down

0 comments on commit 6359dd7

Please sign in to comment.