diff --git a/activity_browser/layouts/tabs/LCA_results_tabs.py b/activity_browser/layouts/tabs/LCA_results_tabs.py index 3df4cae4..321ac0b9 100644 --- a/activity_browser/layouts/tabs/LCA_results_tabs.py +++ b/activity_browser/layouts/tabs/LCA_results_tabs.py @@ -5,10 +5,14 @@ """ from collections import namedtuple -from typing import List, Optional, Union +from copy import deepcopy +from typing import List, Tuple, Optional, Union from logging import getLogger +import numpy as np import pandas as pd +import warnings + from PySide2 import QtCore, QtGui from PySide2.QtWidgets import (QApplication, QButtonGroup, QCheckBox, QComboBox, QFileDialog, QGridLayout, QGroupBox, @@ -16,7 +20,11 @@ QPushButton, QRadioButton, QScrollArea, QTableView, QTabWidget, QToolBar, QVBoxLayout, QWidget) +from activity_browser.bwutils import AB_metadata + +from activity_browser.bwutils.metadata import MetaDataStore from stats_arrays.errors import InvalidParamsError +import brightway2 as bw from activity_browser import signals from activity_browser.mod.bw2data import calculation_setups @@ -31,6 +39,7 @@ from ...ui.tables import ContributionTable, InventoryTable, LCAResultsTable from ...ui.web import SankeyNavigatorWidget from ...ui.widgets import CutoffMenu, SwitchComboBox +from ...bwutils.superstructure.graph_traversal_with_scenario import GraphTraversalWithScenario from .base import BaseRightTab log = getLogger(__name__) @@ -62,7 +71,7 @@ def get_unit(method: tuple, relative: bool = False) -> str: # Special namedtuple for the LCAResults TabWidget. 
class ProductContributionsTab(ContributionTab):
    """Class for the 'Product Contributions' sub-tab.

    This tab allows for analysis of first-level product contributions.

    The direct impact (from biosphere exchanges from the FU)
    and cumulative impacts from all exchange inputs to the FU (first level) are calculated.

    e.g. the direct emissions from steel production and the cumulative impact for all electricity input
    into that activity. This works on the basis of input products and their total (cumulative) impact, scaled to
    how much of that product is needed in the FU.

    Example questions that can be answered by this tab:
    What is the contribution of electricity (product) to reference flow XXX?
    Which input product contributes the most to impact category YYY?
    What products contribute most to reference flow ZZZ?

    Shows:
        Compare options button to change between 'Reference Flows' and 'Impact Categories'
        'Impact Category'/'Reference Flow' chooser with aggregation method
        Plot/Table on/off and Relative/Absolute options for data
        Plot/Table
        Export options
    """

    def __init__(self, cs_name, parent=None):
        super().__init__(parent)

        # We cache the individual calculation results, as they are re-used in multiple views
        # e.g. FU1 x method1 x scenario1 may be seen in both 'Reference Flows' and
        # 'Impact Categories', just with different axes.
        # We also cache totals — not for calculation speed, but to be able to easily
        # convert absolute results to relative results.
        self.cache = {"totals": {}}
        self.caching = True  # set to False to disable caching for debug

        self.layout.addLayout(get_header_layout("Product Contributions"))
        self.layout.addWidget(self.cutoff_menu)
        self.layout.addWidget(horizontal_line())
        combobox = self.build_combobox(has_method=True, has_func=True)
        self.layout.addLayout(combobox)
        self.layout.addWidget(horizontal_line())
        self.layout.addWidget(self.build_main_space())
        self.layout.addLayout(self.build_export(True, True))

        # get relevant data from the calculation setup
        self.cs = cs_name
        func_units = bw.calculation_setups[self.cs]["inv"]
        # extract a list of keys from the functional units
        self.func_keys = [list(fu.keys())[0] for fu in func_units]
        self.func_units = [
            {bw.get_activity(k): v for k, v in fu.items()}
            for fu in func_units
        ]
        self.methods = bw.calculation_setups[self.cs]["ia"]

        self.contribution_fn = "Product contributions"
        self.switches.configure(self.has_func, self.has_method)
        self.connect_signals()
        self.toggle_comparisons(self.switches.indexes.func)

    def update_dataframe(self, *args, **kwargs):
        """Retrieve the product contributions and return them as a dataframe.

        Depending on the active comparison ('Reference Flows', 'Impact Categories'
        or 'Scenarios'), one contribution analysis is run (or read from cache) per
        compared item; the results are merged into a single dataframe.
        """

        # TODO
        #  1 refactor so this updates the df, not does calculations/cache reads etc
        #  2 figure out how this already works with relative???
        #  3 make this work with aggregator for data
        #  4 make this work with cutoff menu (limit, limit_type (and normalize??)
        #  5 update documentation

        def get_data():
            # closure over the demand/method/scenario variables of the enclosing scope
            return self.calculate_contributions(
                demand, demand_key, demand_index,
                method=method, method_index=method_index,
                scenario_lca=self.has_scenarios, scenario_index=scenario_index,
            )

        # get the right data
        if self.has_scenarios:
            # get the scenario index; if it is -1 (none selected), then use index 0
            scenario_index = max(self.combobox_menu.scenario.currentIndex(), 0)
        else:
            scenario_index = None
        method_index = self.combobox_menu.method.currentIndex()
        method = self.methods[method_index]
        demand_index = self.combobox_menu.func.currentIndex()
        demand = self.func_units[demand_index]
        demand_key = self.func_keys[demand_index]

        all_data = []
        compare = self.switches.currentText()
        if compare == "Reference Flows":
            # run the analysis for every reference flow
            for demand_index, demand in enumerate(self.func_units):
                demand_key = self.func_keys[demand_index]
                cache_key = (demand_index, method_index, scenario_index)
                # membership test (not a truthy get) so falsy cached payloads still hit
                if self.caching and cache_key in self.cache:
                    all_data.append([demand_key, self.cache[cache_key]])
                    continue

                data = get_data()
                all_data.append([demand_key, data])
                if self.caching:
                    self.cache[cache_key] = data
        elif compare == "Impact Categories":
            # run the analysis for every method
            for method_index, method in enumerate(self.methods):
                cache_key = (demand_index, method_index, scenario_index)
                if self.caching and cache_key in self.cache:
                    all_data.append([method, self.cache[cache_key]])
                    continue

                data = get_data()
                all_data.append([method, data])
                if self.caching:
                    self.cache[cache_key] = data
        elif compare == "Scenarios":
            # run the analysis for every scenario, restoring the selection afterwards
            orig_idx = self.combobox_menu.scenario.currentIndex()
            for scenario_index in range(self.combobox_menu.scenario.count()):
                scenario = self.combobox_menu.scenario.itemText(scenario_index)
                cache_key = (demand_index, method_index, scenario_index)
                if self.caching and cache_key in self.cache:
                    all_data.append([scenario, self.cache[cache_key]])
                    continue

                data = get_data()
                all_data.append([scenario, data])
                if self.caching:
                    self.cache[cache_key] = data
            self.combobox_menu.scenario.setCurrentIndex(orig_idx)

        df = self.data_to_df(all_data, compare)
        return df

    def calculate_contributions(self, demand, demand_key, demand_index,
                                method, method_index: int = None,
                                scenario_lca: bool = False, scenario_index: int = None) -> Optional[dict]:
        """Calculate first-level product contributions for one FU x method (x scenario).

        Returns a dict mapping 'Total' and each contributing input key to its score;
        the FU key itself maps to the direct (remainder) impact.
        """

        def get_default_demands() -> dict:
            """Get the inputs to calculate contributions from the activity."""
            # get exchange keys leading to this activity
            technosphere = bw.get_activity(demand_key).technosphere()

            keys = [exch.input.key for exch in technosphere if
                    exch.input.key != exch.output.key]
            # find scale from production amount and demand amount
            scale = demand[demand_key] / [p for p in bw.get_activity(demand_key).production()][0].amount

            amounts = [exch.amount * scale for exch in technosphere if
                       exch.input.key != exch.output.key]
            demands = {keys[i]: amounts[i] for i, _ in enumerate(keys)}
            return demands

        def get_scenario_demands() -> dict:
            """Get the inputs to calculate contributions from the scenario matrix."""
            # get exchange keys leading to this activity
            technosphere = bw.get_activity(demand_key).technosphere()
            demand_idx = _lca.product_dict[demand_key]

            keys = [exch.input.key for exch in technosphere if
                    exch.input.key != exch.output.key]
            # find scale from production amount and demand amount
            # (technosphere production entries are negated, hence the * -1)
            scale = demand[demand_key] / _lca.technosphere_matrix[_lca.activity_dict[demand_key], demand_idx] * -1

            amounts = []
            for exch in technosphere:
                exch_idx = _lca.activity_dict[exch.input.key]
                if exch.input.key != exch.output.key:
                    amounts.append(_lca.technosphere_matrix[exch_idx, demand_idx] * scale)

            # write all non-zero exchanges to demand dict
            demands = {keys[i]: amounts[i] for i, _ in enumerate(keys) if amounts[i] != 0}
            return demands

        # reuse LCA object from original calculation to skip 1 LCA
        if scenario_lca:
            # get score from the already calculated result
            score = self.parent.mlca.lca_scores[demand_index, method_index, scenario_index]

            # get lca object from mlca class
            self.parent.mlca.current = scenario_index
            self.parent.mlca.update_matrices()
            _lca = self.parent.mlca.lca
            _lca.redo_lci(demand)
        else:
            # get score from the already calculated result
            score = self.parent.mlca.lca_scores[demand_index, method_index]

            # get lca object to calculate new results
            _lca = self.parent.mlca.lca

        # set the correct method
        _lca.switch_method(method)
        _lca.lcia_calculation()

        if score == 0:
            # no need to calculate contributions to '0' score
            # technically it could be that positive and negative score of same amount negate to 0,
            # but highly unlikely.
            return {"Total": 0, demand_key: 0}

        data = {"Total": score}
        remainder = score  # contribution of demand_key (direct impact)

        if not scenario_lca:
            new_demands = get_default_demands()
        else:
            new_demands = get_scenario_demands()

        # iterate over all activities demand_key is connected to
        for key, amt in new_demands.items():
            # recalculate for this demand
            _lca.redo_lci({key: amt})
            _lca.redo_lcia()

            input_score = _lca.score
            if input_score != 0:
                # only store non-zero results
                data[key] = input_score
                remainder -= input_score  # subtract this from remainder

        data[demand_key] = remainder
        return data

    def data_to_df(self, all_data: List[Tuple[object, dict]], compare: str) -> pd.DataFrame:
        """Convert the provided data into a dataframe.

        `all_data` is a list of (compared item, contribution dict) pairs; `compare`
        selects how the item is formatted into a column name. Values are made
        relative to the column total when `self.relative` is set.
        """
        unique_keys = []
        # get all the unique keys:
        d = {"index": ["Total"], "reference product": [""], "name": [""],
             "location": [""], "unit": [""], "database": [""]}
        meta_cols = set(d.keys())
        for i, (item, data) in enumerate(all_data):
            unique_keys += list(data.keys())
            # already add the total with right column formatting depending on compares
            if compare == "Reference Flows":
                col_name = self.metadata_to_index(self.key_to_metadata(item))
            elif compare == "Impact Categories":
                col_name = self.metadata_to_index(list(item))
            elif compare == "Scenarios":
                col_name = item

            self.cache["totals"][col_name] = data["Total"]
            if self.relative:
                d[col_name] = [1]
            else:
                d[col_name] = [data["Total"]]

            all_data[i] = item, data, col_name
        unique_keys = set(unique_keys)

        # convert to dict format to feed into dataframe
        for key in unique_keys:
            if key == "Total":
                continue
            # get metadata
            metadata = self.key_to_metadata(key)
            d["index"].append(self.metadata_to_index(metadata))
            d["reference product"].append(metadata[0])
            d["name"].append(metadata[1])
            d["location"].append(metadata[2])
            d["unit"].append(metadata[3])
            d["database"].append(metadata[4])
            # check for each dataset if we have values, otherwise add np.nan
            for item, data, col_name in all_data:
                # sentinel-None check: a stored value of exactly 0.0 is real data,
                # not a missing entry
                if (val := data.get(key)) is not None:
                    if self.relative:
                        value = val / self.cache["totals"][col_name]
                    else:
                        value = val
                else:
                    value = np.nan
                d[col_name].append(value)
        df = pd.DataFrame(d)
        check_cols = list(set(df.columns) - meta_cols)
        df = df.dropna(subset=check_cols, how="all")
        # TODO sort like https://github.com/LCA-ActivityBrowser/activity-browser/issues/887
        # temporary sorting solution, just sort on the last column.
        # NOTE: sort_values returns a new frame; the result must be re-assigned.
        df = df.sort_values(by=col_name, ascending=False)
        return df

    def key_to_metadata(self, key: tuple) -> list:
        """Convert the key information to list with metadata.

        format:
        [reference product, activity name, location, unit, database]
        """
        return list(AB_metadata.get_metadata([key], ["reference product", "name", "location", "unit"]).iloc[0]) + [key[0]]

    def metadata_to_index(self, data: list) -> str:
        """Convert list to formatted index.

        format:
        reference product | activity name | location | unit | database
        """
        return " | ".join(data)

    def build_combobox(
        self, has_method: bool = True, has_func: bool = False
    ) -> QHBoxLayout:
        """Add activity-aggregation choices before delegating combobox construction."""
        self.combobox_menu.agg.addItems(
            self.parent.contributions.DEFAULT_ACT_AGGREGATES
        )
        return super().build_combobox(has_method, has_func)