diff --git a/message_ix_models/project/ssp/transport.py b/message_ix_models/project/ssp/transport.py index 8e20d1669e..639b1f0ff7 100644 --- a/message_ix_models/project/ssp/transport.py +++ b/message_ix_models/project/ssp/transport.py @@ -12,6 +12,7 @@ from genno import Key from message_ix_models import Context +from message_ix_models.model.structure import get_codelist from message_ix_models.tools.iamc import iamc_like_data_for_query, to_quantity from message_ix_models.util import minimum_version from message_ix_models.util.genno import Keys @@ -33,9 +34,17 @@ EXPR_EMI = re.compile( r"^Emissions\|(?P[^\|]+)\|Energy\|Demand\|(?P(Bunkers|Transportation).*)$" ) + #: Expression used to select and extract :math:`(c)` dimension coordinates from variable #: codes in :func:`v_to_fe_coords`. -EXPR_FE = re.compile(r"^Final Energy\|Transportation\|(?PLiquids\|Oil)$") +EXPR_FE = re.compile( + r"""^Final.Energy\| + (?PBunkers(\|International.Aviation)?|(Transportation(|.\(w/.bunkers\)))) + \|? + (?P|Liquids\|Oil) + $""", + flags=re.VERBOSE, +) #: Keywords for :func:`.iamc_like_data_for_query` / :func:`.to_quantity`. IAMC_KW = dict(non_iso_3166="keep", query="Model != ''", unique="MODEL SCENARIO") @@ -50,11 +59,15 @@ #: - :py:`.emi`: computed aviation emissions. #: - :py:`.emi_in`: input data for aviation and other transport emissions, to be #: adjusted or overwritten. +#: - :py:`.fe`: computed final energy data. +#: - :py:`.fe_in`: input data for transport final energy, to be adjusted or overwritten. K = Keys( bcast=f"broadcast:t:{L}", input=f"input:n-y-VARIABLE-UNIT:{L}", emi=f"emission:e-n-t-y-UNIT:{L}", - emi_in=f"emission:e-n-t-y-UNIT:{L}+input", + emi_in=f"emission:e-n-t-y-UNIT:{L}+in", + fe_in=f"fe:c-n-t-y:{L}+in", + fe_out=f"fe:c-n-t-y:{L}+out", ) @@ -93,8 +106,10 @@ def aviation_emi_share(ref: "TQuantity") -> "TQuantity": ) -def broadcast_t(version: Literal[1, 2], include_international: bool) -> "AnyQuantity": - """Quantity to re-add the |t| dimension. +def broadcast_t_emi( + version: Literal[1, 2], include_international: bool +) -> "AnyQuantity": + """Quantity to re-add the |t| dimension for emission data. Parameters ---------- @@ -144,6 +159,22 @@ def broadcast_t(version: Literal[1, 2], include_international: bool) -> "AnyQuan return genno.Quantity(value[idx], coords={"t": t[idx]}) +def broadcast_t_fe() -> "AnyQuantity": + """Quantity to re-add the |t| dimension for final energy data.""" + return genno.Quantity( + pd.DataFrame( + [ + ["lightoil", "Bunkers", "", +1.0], + ["lightoil", "Bunkers|International Aviation", "", +1.0], + ["lightoil", "Bunkers", "Liquids|Oil", +1.0], + ["lightoil", "Transportation", "", -1.0], + ["lightoil", "Transportation", "Liquids|Oil", -1.0], + ], + columns=["c", "t", "c_new", "value"], + ).set_index(["c", "t", "c_new"])["value"] + ) + + def e_UNIT(cl_emission: "sdmx.model.common.Codelist") -> "AnyQuantity": """Return a quantity for broadcasting. @@ -178,7 +209,11 @@ def e_UNIT(cl_emission: "sdmx.model.common.Codelist") -> "AnyQuantity": def finalize( - q_all: "TQuantity", q_update: "TQuantity", model_name: str, scenario_name: str + q_all: "TQuantity", + q_emi_update: "TQuantity", + q_fe_update: "TQuantity", + model_name: str, + scenario_name: str, ) -> pd.DataFrame: """Finalize output. @@ -206,12 +241,12 @@ def _expand(qty): # Convert `q_all` to pd.Series s_all = q_all.pipe(_expand).to_series() - # - Convert `q_update` to pd.Series + # - Convert `q_emi_update` to pd.Series # - Reassemble "Variable" codes. # - Drop dimensions (e, t). # - Align index with s_all. - s_update = ( - q_update.pipe(_expand) + s_emi_update = ( + q_emi_update.pipe(_expand) .to_frame() .reset_index() .assign( @@ -221,13 +256,37 @@ def _expand(qty): .set_index(s_all.index.names)[0] .rename("value") ) - log.info(f"{len(s_update)} obs to update") - - # Update `s_all`. This yields an 'outer join' of the original and s_update indices. - s_all.update(s_update) + log.info(f'{len(s_emi_update)} obs to update for Variable="Emission|…"') + # Likewise for q_fe_update + dim = {"UNIT": [f"{q_fe_update.units:~}".replace("EJ / a", "EJ/yr")]} + s_fe_update = ( + q_fe_update.expand_dims(dim=dim) + .pipe(_expand) + .to_frame() + .reset_index() + .assign( + Variable=lambda df: ("Final Energy|" + df["t"] + "|" + df["c"]).str.replace( + r"\|$", "", regex=True + ) + ) + .drop(["c", "t"], axis=1) + .set_index(s_all.index.names)[0] + .rename("value") + ) + log.info(f'{len(s_fe_update)} obs to update for Variable="Final Energy|…"') + + # - Concatenate s_all, s_emi_update, and s_fe_update as columns of a data frame. + # The result has the superset of the indices of the arguments. + # - Fill along axes. Values from s_*_update end up in the last column. + # - Select the last column. + # - Reshape to wide format. + # - Rename index levels and restore to columns. return ( - s_all.unstack("y") + pd.concat([s_all, s_emi_update, s_fe_update], axis=1) + .ffill(axis=1) + .iloc[:, -1] + .unstack("y") .reorder_levels(["Model", "Scenario", "Region", "Variable", "Unit"]) .reset_index() ) @@ -291,13 +350,13 @@ def get_computer( url = workflow.scenario_url(context, label_full) # Optionally apply a regex substitution URL_SUB = { - "LED-SSP1": ("$", "#102"), # Point to a specific version - "LED-SSP2": ("$", "#108"), - "SSP1": ("$", "#652"), - "SSP2": ("$", "#695"), - "SSP3": ("$", "#569"), - "SSP4": ("$", "#525"), - "SSP5": ("$", "#522"), # Other scenario name + "LED-SSP1": ("$", "#162"), # Point to a specific version + "LED-SSP2": ("$", "#171"), + "SSP1": ("$", "#771"), + "SSP2": ("$", "#869"), + "SSP3": ("$", "#686"), + "SSP4": ("$", "#639"), + "SSP5": ("$", "#649"), # "SSP5": ("(SSP_2024.5) baseline$", r"\1 baseline#525"), # Other scenario name } if pattern_repl := URL_SUB.get(sc.id): @@ -308,32 +367,65 @@ def get_computer( log.info(f"method 'C' will use data from {url}") # Common structure and utility quantities used by method_[ABC] - c.add(K.bcast, broadcast_t, version=2, include_international=method == "A") + c.add(K.bcast, broadcast_t_emi, version=2, include_international=method == "A") # Placeholder for data-loading task. This is filled in later by process_df() or # process_file(). c.add(K.input, None) # Select and transform data matching EXPR_EMI - # Filter on "VARIABLE", expand the (e, t) dimensions from "VARIABLE" + # Filter on "VARIABLE", extract the (e, t) dimensions c.add(K.emi_in[0], "select_expand", K.input, dim_cb={"VARIABLE": v_to_emi_coords}) + # Assign units c.add(K.emi_in, "assign_units", K.emi_in[0], units="Mt/year") + # Select and transform data matching EXPR_FE + # Filter on "VARIABLE", extract the (c, t) dimensions + dim_cb = {"VARIABLE": v_to_fe_coords} + c.add(K.fe_in[0] * "UNITS", "select_expand", K.input, dim_cb=dim_cb) + # Convert "UNIT" dim labels to Quantity.units + c.add(K.fe_in[1], "unique_units_from_dim", K.fe_in[0] * "UNITS", dim="UNIT") + # Change labels; see get_label() + c.add(K.fe_in, "relabel", K.fe_in[1], labels=get_labels()) + # Call a function to prepare the remaining calculations up to K.emi method_func = {METHOD.A: method_A, METHOD.B: method_B, METHOD.C: method_C}[method] method_func(c) # Adjust the original data by adding the (maybe negative) prepared values at K.emi c.add(K.emi["adj"], "add", K.emi_in, K.emi) + c.add(K.fe_out["adj"], "add", K.fe_in[1], K.fe_out) # Add a key "target" to: # - Collapse to IAMC "VARIABLE" dimension name. # - Recombine with other/unaltered original data. - c.add("target", finalize, K.input, K.emi["adj"], "model name", "scenario name") + c.add( + "target", + finalize, + K.input, + K.emi["adj"], + K.fe_out["adj"], + "model name", + "scenario name", + ) return c +@cache +def get_labels(): + """Return mapper for relabelling input data: + + - c[ommodity]: 'Liquids|Oil' (IAMC 'variable' component) → 'lightoil'. + - n[ode]: "AFR" → "R12_AFR" etc. "World" is not changed. + """ + cl = get_codelist("node/R12") + labels = dict(c={"Liquids|Oil": "lightoil", "": "_T"}, n={}) + for n in filter(lambda n: len(n.child) and n.id != "World", cl): + labels["n"][n.id.partition("_")[2]] = n.id + return labels + + def get_scenario_code(model_name: str, scenario_name: str) -> "sdmx.model.common.Code": """Return a specific code from ``CL_TRANSPORT_SCENARIO``. @@ -382,6 +474,9 @@ def method_A(c: "Computer") -> None: # Rail and Domestic Shipping" c.add(K.emi, "mul", K.emi[0] / "t", k_share, K.bcast) + # No change to final energy data + c.add(K.fe_out, genno.Quantity(0.0, units="EJ / a")) + def method_B(c: "Computer") -> None: """Prepare calculations up to :data:`K.emi` using :data:`METHOD.B`. @@ -460,10 +555,10 @@ def method_BC_common(c: "Computer", k_fe_share: "Key") -> None: A key with dimensions either :math:`(c, n)` or :math:`(c, n, y)` giving the share of aviation in total transport final energy. """ - from message_ix_models.model.structure import get_codelist + from message_ix_models.model.transport.key import exo - # Check dimensions of k_emi_share + # Check dimensions of k_fe_share exp = {frozenset("cn"), frozenset("cny")} if set(k_fe_share.dims) not in exp: # pragma: no cover raise ValueError(f"Dimensions of k_cn={k_fe_share.dims} are not in {exp}") @@ -472,31 +567,17 @@ def method_BC_common(c: "Computer", k_fe_share: "Key") -> None: k = Keys( ei=exo.emi_intensity, # Dimensions (c, e, t) emi0=Key("emission", ("ceny"), L), - fe_in=Key("fe", ("c", "n", "y", "UNIT"), "input"), - fe=Key("fe", tuple("cny"), L), + fe=Key("fe", tuple("cny"), f"{L}+BC"), units=Key(f"units:e-UNIT:{L}"), ) - ### Prepare data from the input data file: total transport consumption of light oil - - # Filter on "VARIABLE", extract (e) dimension - c.add(k.fe_in[0], "select_expand", K.input, dim_cb={"VARIABLE": v_to_fe_coords}) - - # Convert "UNIT" dim labels to Quantity.units - c.add(k.fe_in[1] / "UNIT", "unique_units_from_dim", k.fe_in[0], dim="UNIT") - - # Relabel: - # - c[ommodity]: 'Liquids|Oil' (IAMC 'variable' component) → 'lightoil' - # - n[ode]: "AFR" → "R12_AFR" etc. "World" is not changed. - cl = get_codelist("node/R12") - labels = dict(c={"Liquids|Oil": "lightoil"}, n={}) - for n in filter(lambda n: len(n.child) and n.id != "World", cl): - labels["n"][n.id.partition("_")[2]] = n.id - c.add(k.fe_in[2] / "UNIT", "relabel", k.fe_in[1] / "UNIT", labels=labels) + # Select only total transport consumption of lightoil from K.fe_in + indexers = {"t": "Transportation (w/ bunkers)"} + c.add(k.fe[0], "select", K.fe_in, indexers=indexers, drop=True) ### Compute estimate of emissions # Product of aviation share and FE of total transport → FE of aviation - c.add(k.fe, "mul", k.fe_in[2] / "UNIT", k_fe_share) + c.add(k.fe, "mul", k.fe[0], k_fe_share) # Convert exogenous emission intensity data to Mt / EJ c.add(k.ei["units"], "convert_units", k.ei, units="Mt / EJ") @@ -517,9 +598,16 @@ def method_BC_common(c: "Computer", k_fe_share: "Key") -> None: c.add(K.emi[2], "mul", k.emi0[1], k.units, K.bcast) # Restore labels: "R12_AFR" → "AFR" etc. "World" is not changed. - labels = dict(n={v: k for k, v in labels["n"].items()}) + labels = dict(n={v: k for k, v in get_labels()["n"].items()}) c.add(K.emi, "relabel", K.emi[2], labels=labels) + # Re-add the "t" dimension with +ve and -ve sign for certain labels + c.add(K.fe_out[0], "mul", k.fe, broadcast_t_fe()) + c.add(K.fe_out[1], "drop_vars", K.fe_out[0] * "c_new", names="c") + c.add(K.fe_out[2], "rename_dims", K.fe_out[1], name_dict={"c_new": "c"}) + # Restore labels: "R12_AFR" → "AFR" etc. "World" is not changed. + c.add(K.fe_out, "relabel", K.fe_out[2], labels=labels) + def method_C(c: "Computer") -> None: """Prepare calculations up to :data:`K.emi` using :data:`METHOD.C`. @@ -556,8 +644,8 @@ def method_C(c: "Computer") -> None: labels = {"nl": {"R12_GLB": "World"}} c.add(k.base[1], "relabel", k.base[0], labels=labels, sums=True) - # Select the numerator - c.add(k.share0["num"], "select", k.base[1], indexers=dict(t=["AIR"]), drop=True) + # Select the numerator; drop the 't' dimension + c.add(k.share0["num"], "select", k.base[1], indexers=dict(t="AIR"), drop=True) # Ratio of AIR to the total c.add(k.share0, "div", k.share0["num"], k.base[1] / "t") diff --git a/message_ix_models/tests/project/ssp/test_transport.py b/message_ix_models/tests/project/ssp/test_transport.py index bb5712d2fd..dc8906634e 100644 --- a/message_ix_models/tests/project/ssp/test_transport.py +++ b/message_ix_models/tests/project/ssp/test_transport.py @@ -1,3 +1,4 @@ +import logging from collections.abc import Callable, Hashable from functools import cache from typing import TYPE_CHECKING @@ -12,6 +13,7 @@ get_scenario_code, process_df, process_file, + v_to_fe_coords, ) from message_ix_models.testing import MARK from message_ix_models.tools.iea import web @@ -20,6 +22,8 @@ if TYPE_CHECKING: import pathlib +log = logging.getLogger(__name__) + METHOD_PARAM = ( METHOD.A, METHOD.B, @@ -87,8 +91,26 @@ def _to_long(df): assert expected_variables(IN_, method) <= set(df_in["Variable"].unique()) region = set(df_in["Region"].unique()) + # Identify the directory from which IEA EWEB data is read + iea_eweb_dir = web.dir_fallback( + web.FILES[("IEA", "2024")][0], where=web.IEA_EWEB._where() + ) + # True if the fuzzed test data are being used + iea_eweb_test_data = iea_eweb_dir.match("message_ix_models/data/test/iea/web") + log.info(f"{iea_eweb_test_data = }") + + # Number of added values; number of modified values + N_new, N_exp = { + (METHOD.A, False): (0, 10280), + (METHOD.A, True): (0, 10280), + (METHOD.B, False): (13 * 20, 10120), + (METHOD.B, True): (10 * 20, 7720), + (METHOD.C, False): (13 * 20, 7000), + (METHOD.C, True): (13 * 20, 7000), + }[(method, iea_eweb_test_data)] + # Data have the same length - assert len(df_in) == len(df_out) + assert len(df_in) + N_new == len(df_out) # Output has the same set of region codes as input assert region == set(df_out["Region"].unique()) @@ -103,32 +125,15 @@ def _to_long(df): .query("abs(value_out - value_in) > 1e-16") ) - # Identify the directory from which IEA EWEB data is read - iea_eweb_dir = web.dir_fallback( - web.FILES[("IEA", "2024")][0], where=web.IEA_EWEB._where() - ) - # True if the fuzzed test data are being used - iea_eweb_test_data = iea_eweb_dir.match("message_ix_models/data/test/iea/web") - # All regions and "World" have modified values N_reg = {METHOD.A: 13, METHOD.B: 9, METHOD.C: 13}[method] assert N_reg <= len(df["Region"].unique()) - # Number of modified values - N_exp = { - (METHOD.A, False): 10280, - (METHOD.A, True): 10280, - (METHOD.B, False): 10120, - (METHOD.B, True): 7720, - (METHOD.C, False): 7000, - (METHOD.C, True): 7000, - }[(method, iea_eweb_test_data)] - - if N_exp != len(df): - # df.to_csv("debug-diff.csv") # DEBUG Dump to file - # print(df.to_string(max_rows=50)) # DEBUG Show in test output - msg = f"Unexpected number of modified values: {N_exp} != {len(df)}" - assert N_exp == len(df) + # if N_exp != len(df): # DEBUG + # df.to_csv("debug-diff.csv") # Dump to file + # print(df.to_string(max_rows=50)) # Show in test output + # msg = f"Unexpected number of modified values: {N_exp} != {len(df)}" + # assert N_exp == len(df), msg # All of the expected 'variable' codes have been modified assert expected_variables(OUT, method) == set(df["Variable"].unique()) @@ -148,6 +153,8 @@ def expected_variables(flag: int, method: METHOD) -> set[str]: edt = "Energy|Demand|Transportation" result = set() + + # Emissions for e in SPECIES: # Expected data flows in which these variable codes appear exp = IN_ if (e in SPECIES_WITHOUT_EF and method != METHOD.A) else I_O @@ -159,6 +166,17 @@ def expected_variables(flag: int, method: METHOD) -> set[str]: f"Emissions|{e}|{edt}|Road Rail and Domestic Shipping", } + # Final Energy + if method != METHOD.A: + result |= { + "Final Energy|Bunkers", + "Final Energy|Bunkers|Liquids|Oil", + "Final Energy|Transportation", + "Final Energy|Transportation|Liquids|Oil", + } + if flag & OUT: + result.add("Final Energy|Bunkers|International Aviation") + return result @@ -279,3 +297,31 @@ def test_process_file(tmp_path, test_context, input_csv_path, method) -> None: # Output satisfies expectations check(df_in, df_out, method) + + +@pytest.mark.parametrize( + "value, exp", + ( + ("Final Energy|Bunkers", {"c": "", "t": "Bunkers"}), + ( + "Final Energy|Bunkers|International Aviation", + {"c": "", "t": "Bunkers|International Aviation"}, + ), + ("Final Energy|Bunkers|Liquids|Oil", {"c": "Liquids|Oil", "t": "Bunkers"}), + ( + "Final Energy|Transportation (w/ bunkers)", + {"c": "", "t": "Transportation (w/ bunkers)"}, + ), + ( + "Final Energy|Transportation (w/ bunkers)|Liquids|Oil", + {"c": "Liquids|Oil", "t": "Transportation (w/ bunkers)"}, + ), + ("Final Energy|Transportation", {"c": "", "t": "Transportation"}), + ( + "Final Energy|Transportation|Liquids|Oil", + {"c": "Liquids|Oil", "t": "Transportation"}, + ), + ), +) +def test_v_to_fe_coords(value: str, exp) -> None: + assert exp == v_to_fe_coords(value)