Skip to content

Rewrite "Final Energy|*" IAMC variables #355

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
182 changes: 135 additions & 47 deletions message_ix_models/project/ssp/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from genno import Key

from message_ix_models import Context
from message_ix_models.model.structure import get_codelist
from message_ix_models.tools.iamc import iamc_like_data_for_query, to_quantity
from message_ix_models.util import minimum_version
from message_ix_models.util.genno import Keys
Expand All @@ -33,9 +34,17 @@
EXPR_EMI = re.compile(
r"^Emissions\|(?P<e>[^\|]+)\|Energy\|Demand\|(?P<t>(Bunkers|Transportation).*)$"
)

#: Expression used to select and extract :math:`(c)` dimension coordinates from variable
#: codes in :func:`v_to_fe_coords`.
EXPR_FE = re.compile(r"^Final Energy\|Transportation\|(?P<c>Liquids\|Oil)$")
EXPR_FE = re.compile(
r"""^Final.Energy\|
(?P<t>Bunkers(\|International.Aviation)?|(Transportation(|.\(w/.bunkers\))))
\|?
(?P<c>|Liquids\|Oil)
$""",
flags=re.VERBOSE,
)

#: Keywords for :func:`.iamc_like_data_for_query` / :func:`.to_quantity`.
IAMC_KW = dict(non_iso_3166="keep", query="Model != ''", unique="MODEL SCENARIO")
Expand All @@ -50,11 +59,15 @@
#: - :py:`.emi`: computed aviation emissions.
#: - :py:`.emi_in`: input data for aviation and other transport emissions, to be
#: adjusted or overwritten.
#: - :py:`.fe`: computed final energy data.
#: - :py:`.fe_in`: input data for transport final energy, to be adjusted or overwritten.
K = Keys(
bcast=f"broadcast:t:{L}",
input=f"input:n-y-VARIABLE-UNIT:{L}",
emi=f"emission:e-n-t-y-UNIT:{L}",
emi_in=f"emission:e-n-t-y-UNIT:{L}+input",
emi_in=f"emission:e-n-t-y-UNIT:{L}+in",
fe_in=f"fe:c-n-t-y:{L}+in",
fe_out=f"fe:c-n-t-y:{L}+out",
)


Expand Down Expand Up @@ -93,8 +106,10 @@
)


def broadcast_t(version: Literal[1, 2], include_international: bool) -> "AnyQuantity":
"""Quantity to re-add the |t| dimension.
def broadcast_t_emi(
version: Literal[1, 2], include_international: bool
) -> "AnyQuantity":
"""Quantity to re-add the |t| dimension for emission data.

Parameters
----------
Expand Down Expand Up @@ -144,6 +159,22 @@
return genno.Quantity(value[idx], coords={"t": t[idx]})


def broadcast_t_fe() -> "AnyQuantity":
"""Quantity to re-add the |t| dimension for final energy data."""
return genno.Quantity(
pd.DataFrame(
[
["lightoil", "Bunkers", "", +1.0],
["lightoil", "Bunkers|International Aviation", "", +1.0],
["lightoil", "Bunkers", "Liquids|Oil", +1.0],
["lightoil", "Transportation", "", -1.0],
["lightoil", "Transportation", "Liquids|Oil", -1.0],
],
columns=["c", "t", "c_new", "value"],
).set_index(["c", "t", "c_new"])["value"]
)


def e_UNIT(cl_emission: "sdmx.model.common.Codelist") -> "AnyQuantity":
"""Return a quantity for broadcasting.

Expand Down Expand Up @@ -178,7 +209,11 @@


def finalize(
q_all: "TQuantity", q_update: "TQuantity", model_name: str, scenario_name: str
q_all: "TQuantity",
q_emi_update: "TQuantity",
q_fe_update: "TQuantity",
model_name: str,
scenario_name: str,
) -> pd.DataFrame:
"""Finalize output.

Expand Down Expand Up @@ -206,12 +241,12 @@
# Convert `q_all` to pd.Series
s_all = q_all.pipe(_expand).to_series()

# - Convert `q_update` to pd.Series
# - Convert `q_emi_update` to pd.Series
# - Reassemble "Variable" codes.
# - Drop dimensions (e, t).
# - Align index with s_all.
s_update = (
q_update.pipe(_expand)
s_emi_update = (
q_emi_update.pipe(_expand)
.to_frame()
.reset_index()
.assign(
Expand All @@ -221,13 +256,37 @@
.set_index(s_all.index.names)[0]
.rename("value")
)
log.info(f"{len(s_update)} obs to update")

# Update `s_all`. This yields an 'outer join' of the original and s_update indices.
s_all.update(s_update)
log.info(f'{len(s_emi_update)} obs to update for Variable="Emission|…"')

# Likewise for q_fe_update
dim = {"UNIT": [f"{q_fe_update.units:~}".replace("EJ / a", "EJ/yr")]}
s_fe_update = (
q_fe_update.expand_dims(dim=dim)
.pipe(_expand)
.to_frame()
.reset_index()
.assign(
Variable=lambda df: ("Final Energy|" + df["t"] + "|" + df["c"]).str.replace(
r"\|$", "", regex=True
)
)
.drop(["c", "t"], axis=1)
.set_index(s_all.index.names)[0]
.rename("value")
)
log.info(f'{len(s_fe_update)} obs to update for Variable="Final Energy|…"')

# - Concatenate s_all, s_emi_update, and s_fe_update as columns of a data frame.
# The result has the superset of the indices of the arguments.
# - Fill along axes. Values from s_*_update end up in the last column.
# - Select the last column.
# - Reshape to wide format.
# - Rename index levels and restore to columns.
return (
s_all.unstack("y")
pd.concat([s_all, s_emi_update, s_fe_update], axis=1)
.ffill(axis=1)
.iloc[:, -1]
.unstack("y")
.reorder_levels(["Model", "Scenario", "Region", "Variable", "Unit"])
.reset_index()
)
Expand Down Expand Up @@ -291,13 +350,13 @@
url = workflow.scenario_url(context, label_full)
# Optionally apply a regex substitution
URL_SUB = {
"LED-SSP1": ("$", "#102"), # Point to a specific version
"LED-SSP2": ("$", "#108"),
"SSP1": ("$", "#652"),
"SSP2": ("$", "#695"),
"SSP3": ("$", "#569"),
"SSP4": ("$", "#525"),
"SSP5": ("$", "#522"), # Other scenario name
"LED-SSP1": ("$", "#162"), # Point to a specific version
"LED-SSP2": ("$", "#171"),
"SSP1": ("$", "#771"),
"SSP2": ("$", "#869"),
"SSP3": ("$", "#686"),
"SSP4": ("$", "#639"),
"SSP5": ("$", "#649"),
# "SSP5": ("(SSP_2024.5) baseline$", r"\1 baseline#525"), # Other scenario name
}
if pattern_repl := URL_SUB.get(sc.id):
Expand All @@ -308,32 +367,65 @@
log.info(f"method 'C' will use data from {url}")

# Common structure and utility quantities used by method_[ABC]
c.add(K.bcast, broadcast_t, version=2, include_international=method == "A")
c.add(K.bcast, broadcast_t_emi, version=2, include_international=method == "A")

# Placeholder for data-loading task. This is filled in later by process_df() or
# process_file().
c.add(K.input, None)

# Select and transform data matching EXPR_EMI
# Filter on "VARIABLE", expand the (e, t) dimensions from "VARIABLE"
# Filter on "VARIABLE", extract the (e, t) dimensions
c.add(K.emi_in[0], "select_expand", K.input, dim_cb={"VARIABLE": v_to_emi_coords})
# Assign units
c.add(K.emi_in, "assign_units", K.emi_in[0], units="Mt/year")

# Select and transform data matching EXPR_FE
# Filter on "VARIABLE", extract the (c, t) dimensions
dim_cb = {"VARIABLE": v_to_fe_coords}
c.add(K.fe_in[0] * "UNITS", "select_expand", K.input, dim_cb=dim_cb)
# Convert "UNIT" dim labels to Quantity.units
c.add(K.fe_in[1], "unique_units_from_dim", K.fe_in[0] * "UNITS", dim="UNIT")
# Change labels; see get_label()
c.add(K.fe_in, "relabel", K.fe_in[1], labels=get_labels())

# Call a function to prepare the remaining calculations up to K.emi
method_func = {METHOD.A: method_A, METHOD.B: method_B, METHOD.C: method_C}[method]
method_func(c)

# Adjust the original data by adding the (maybe negative) prepared values at K.emi
c.add(K.emi["adj"], "add", K.emi_in, K.emi)
c.add(K.fe_out["adj"], "add", K.fe_in[1], K.fe_out)

# Add a key "target" to:
# - Collapse to IAMC "VARIABLE" dimension name.
# - Recombine with other/unaltered original data.
c.add("target", finalize, K.input, K.emi["adj"], "model name", "scenario name")
c.add(
"target",
finalize,
K.input,
K.emi["adj"],
K.fe_out["adj"],
"model name",
"scenario name",
)

return c


@cache
def get_labels():
"""Return mapper for relabelling input data:

- c[ommodity]: 'Liquids|Oil' (IAMC 'variable' component) → 'lightoil'.
- n[ode]: "AFR" → "R12_AFR" etc. "World" is not changed.
"""
cl = get_codelist("node/R12")
labels = dict(c={"Liquids|Oil": "lightoil", "": "_T"}, n={})
for n in filter(lambda n: len(n.child) and n.id != "World", cl):
labels["n"][n.id.partition("_")[2]] = n.id
return labels


def get_scenario_code(model_name: str, scenario_name: str) -> "sdmx.model.common.Code":
"""Return a specific code from ``CL_TRANSPORT_SCENARIO``.

Expand Down Expand Up @@ -382,6 +474,9 @@
# Rail and Domestic Shipping"
c.add(K.emi, "mul", K.emi[0] / "t", k_share, K.bcast)

# No change to final energy data
c.add(K.fe_out, genno.Quantity(0.0, units="EJ / a"))


def method_B(c: "Computer") -> None:
"""Prepare calculations up to :data:`K.emi` using :data:`METHOD.B`.
Expand Down Expand Up @@ -460,10 +555,10 @@
A key with dimensions either :math:`(c, n)` or :math:`(c, n, y)` giving the
share of aviation in total transport final energy.
"""
from message_ix_models.model.structure import get_codelist

from message_ix_models.model.transport.key import exo

# Check dimensions of k_emi_share
# Check dimensions of k_fe_share
exp = {frozenset("cn"), frozenset("cny")}
if set(k_fe_share.dims) not in exp: # pragma: no cover
raise ValueError(f"Dimensions of k_cn={k_fe_share.dims} are not in {exp}")
Expand All @@ -472,31 +567,17 @@
k = Keys(
ei=exo.emi_intensity, # Dimensions (c, e, t)
emi0=Key("emission", ("ceny"), L),
fe_in=Key("fe", ("c", "n", "y", "UNIT"), "input"),
fe=Key("fe", tuple("cny"), L),
fe=Key("fe", tuple("cny"), f"{L}+BC"),
units=Key(f"units:e-UNIT:{L}"),
)

### Prepare data from the input data file: total transport consumption of light oil

# Filter on "VARIABLE", extract (e) dimension
c.add(k.fe_in[0], "select_expand", K.input, dim_cb={"VARIABLE": v_to_fe_coords})

# Convert "UNIT" dim labels to Quantity.units
c.add(k.fe_in[1] / "UNIT", "unique_units_from_dim", k.fe_in[0], dim="UNIT")

# Relabel:
# - c[ommodity]: 'Liquids|Oil' (IAMC 'variable' component) → 'lightoil'
# - n[ode]: "AFR" → "R12_AFR" etc. "World" is not changed.
cl = get_codelist("node/R12")
labels = dict(c={"Liquids|Oil": "lightoil"}, n={})
for n in filter(lambda n: len(n.child) and n.id != "World", cl):
labels["n"][n.id.partition("_")[2]] = n.id
c.add(k.fe_in[2] / "UNIT", "relabel", k.fe_in[1] / "UNIT", labels=labels)
# Select only total transport consumption of lightoil from K.fe_in
indexers = {"t": "Transportation (w/ bunkers)"}
c.add(k.fe[0], "select", K.fe_in, indexers=indexers, drop=True)

### Compute estimate of emissions
# Product of aviation share and FE of total transport → FE of aviation
c.add(k.fe, "mul", k.fe_in[2] / "UNIT", k_fe_share)
c.add(k.fe, "mul", k.fe[0], k_fe_share)

# Convert exogenous emission intensity data to Mt / EJ
c.add(k.ei["units"], "convert_units", k.ei, units="Mt / EJ")
Expand All @@ -517,9 +598,16 @@
c.add(K.emi[2], "mul", k.emi0[1], k.units, K.bcast)

# Restore labels: "R12_AFR" → "AFR" etc. "World" is not changed.
labels = dict(n={v: k for k, v in labels["n"].items()})
labels = dict(n={v: k for k, v in get_labels()["n"].items()})
c.add(K.emi, "relabel", K.emi[2], labels=labels)

# Re-add the "t" dimension with +ve and -ve sign for certain labels
c.add(K.fe_out[0], "mul", k.fe, broadcast_t_fe())
c.add(K.fe_out[1], "drop_vars", K.fe_out[0] * "c_new", names="c")
c.add(K.fe_out[2], "rename_dims", K.fe_out[1], name_dict={"c_new": "c"})
# Restore labels: "R12_AFR" → "AFR" etc. "World" is not changed.
c.add(K.fe_out, "relabel", K.fe_out[2], labels=labels)


def method_C(c: "Computer") -> None:
"""Prepare calculations up to :data:`K.emi` using :data:`METHOD.C`.
Expand Down Expand Up @@ -556,8 +644,8 @@
labels = {"nl": {"R12_GLB": "World"}}
c.add(k.base[1], "relabel", k.base[0], labels=labels, sums=True)

# Select the numerator
c.add(k.share0["num"], "select", k.base[1], indexers=dict(t=["AIR"]), drop=True)
# Select the numerator; drop the 't' dimension
c.add(k.share0["num"], "select", k.base[1], indexers=dict(t="AIR"), drop=True)

Check warning on line 648 in message_ix_models/project/ssp/transport.py

View check run for this annotation

Codecov / codecov/patch

message_ix_models/project/ssp/transport.py#L648

Added line #L648 was not covered by tests
# Ratio of AIR to the total
c.add(k.share0, "div", k.share0["num"], k.base[1] / "t")

Expand Down
Loading
Loading