Skip to content

Commit c1b6918

Browse files
committed
Modify "Final Energy|…" data in .ssp.transport
- New function broadcast_t_fe(). - Update test expectations.
1 parent c610a33 commit c1b6918

File tree

2 files changed

+157
-60
lines changed

2 files changed

+157
-60
lines changed

message_ix_models/project/ssp/transport.py

Lines changed: 117 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from genno import Key
1313

1414
from message_ix_models import Context
15+
from message_ix_models.model.structure import get_codelist
1516
from message_ix_models.tools.iamc import iamc_like_data_for_query, to_quantity
1617
from message_ix_models.util import minimum_version
1718
from message_ix_models.util.genno import Keys
@@ -58,11 +59,15 @@
5859
#: - :py:`.emi`: computed aviation emissions.
5960
#: - :py:`.emi_in`: input data for aviation and other transport emissions, to be
6061
#: adjusted or overwritten.
62+
#: - :py:`.fe`: computed final energy data.
63+
#: - :py:`.fe_in`: input data for transport final energy, to be adjusted or overwritten.
6164
K = Keys(
6265
bcast=f"broadcast:t:{L}",
6366
input=f"input:n-y-VARIABLE-UNIT:{L}",
6467
emi=f"emission:e-n-t-y-UNIT:{L}",
65-
emi_in=f"emission:e-n-t-y-UNIT:{L}+input",
68+
emi_in=f"emission:e-n-t-y-UNIT:{L}+in",
69+
fe_in=f"fe:c-n-t-y:{L}+in",
70+
fe_out=f"fe:c-n-t-y:{L}+out",
6671
)
6772

6873

@@ -101,8 +106,10 @@ def aviation_emi_share(ref: "TQuantity") -> "TQuantity":
101106
)
102107

103108

104-
def broadcast_t(version: Literal[1, 2], include_international: bool) -> "AnyQuantity":
105-
"""Quantity to re-add the |t| dimension.
109+
def broadcast_t_emi(
110+
version: Literal[1, 2], include_international: bool
111+
) -> "AnyQuantity":
112+
"""Quantity to re-add the |t| dimension for emission data.
106113
107114
Parameters
108115
----------
@@ -152,6 +159,22 @@ def broadcast_t(version: Literal[1, 2], include_international: bool) -> "AnyQuan
152159
return genno.Quantity(value[idx], coords={"t": t[idx]})
153160

154161

162+
def broadcast_t_fe() -> "AnyQuantity":
163+
"""Quantity to re-add the |t| dimension for final energy data."""
164+
return genno.Quantity(
165+
pd.DataFrame(
166+
[
167+
["lightoil", "Bunkers", "", +1.0],
168+
["lightoil", "Bunkers|International Aviation", "", +1.0],
169+
["lightoil", "Bunkers", "Liquids|Oil", +1.0],
170+
["lightoil", "Transportation", "", -1.0],
171+
["lightoil", "Transportation", "Liquids|Oil", -1.0],
172+
],
173+
columns=["c", "t", "c_new", "value"],
174+
).set_index(["c", "t", "c_new"])["value"]
175+
)
176+
177+
155178
def e_UNIT(cl_emission: "sdmx.model.common.Codelist") -> "AnyQuantity":
156179
"""Return a quantity for broadcasting.
157180
@@ -186,7 +209,11 @@ def e_UNIT(cl_emission: "sdmx.model.common.Codelist") -> "AnyQuantity":
186209

187210

188211
def finalize(
189-
q_all: "TQuantity", q_update: "TQuantity", model_name: str, scenario_name: str
212+
q_all: "TQuantity",
213+
q_emi_update: "TQuantity",
214+
q_fe_update: "TQuantity",
215+
model_name: str,
216+
scenario_name: str,
190217
) -> pd.DataFrame:
191218
"""Finalize output.
192219
@@ -214,12 +241,12 @@ def _expand(qty):
214241
# Convert `q_all` to pd.Series
215242
s_all = q_all.pipe(_expand).to_series()
216243

217-
# - Convert `q_update` to pd.Series
244+
# - Convert `q_emi_update` to pd.Series
218245
# - Reassemble "Variable" codes.
219246
# - Drop dimensions (e, t).
220247
# - Align index with s_all.
221-
s_update = (
222-
q_update.pipe(_expand)
248+
s_emi_update = (
249+
q_emi_update.pipe(_expand)
223250
.to_frame()
224251
.reset_index()
225252
.assign(
@@ -229,13 +256,37 @@ def _expand(qty):
229256
.set_index(s_all.index.names)[0]
230257
.rename("value")
231258
)
232-
log.info(f"{len(s_update)} obs to update")
233-
234-
# Update `s_all`. This yields an 'outer join' of the original and s_update indices.
235-
s_all.update(s_update)
259+
log.info(f'{len(s_emi_update)} obs to update for Variable="Emission|…"')
236260

261+
# Likewise for q_fe_update
262+
dim = {"UNIT": [f"{q_fe_update.units:~}".replace("EJ / a", "EJ/yr")]}
263+
s_fe_update = (
264+
q_fe_update.expand_dims(dim=dim)
265+
.pipe(_expand)
266+
.to_frame()
267+
.reset_index()
268+
.assign(
269+
Variable=lambda df: ("Final Energy|" + df["t"] + "|" + df["c"]).str.replace(
270+
r"\|$", "", regex=True
271+
)
272+
)
273+
.drop(["c", "t"], axis=1)
274+
.set_index(s_all.index.names)[0]
275+
.rename("value")
276+
)
277+
log.info(f'{len(s_fe_update)} obs to update for Variable="Final Energy|…"')
278+
279+
# - Concatenate s_all, s_emi_update, and s_fe_update as columns of a data frame.
280+
# The result has the superset of the indices of the arguments.
281+
# - Fill along axes. Values from s_*_update end up in the last column.
282+
# - Select the last column.
283+
# - Reshape to wide format.
284+
# - Rename index levels and restore to columns.
237285
return (
238-
s_all.unstack("y")
286+
pd.concat([s_all, s_emi_update, s_fe_update], axis=1)
287+
.ffill(axis=1)
288+
.iloc[:, -1]
289+
.unstack("y")
239290
.reorder_levels(["Model", "Scenario", "Region", "Variable", "Unit"])
240291
.reset_index()
241292
)
@@ -316,32 +367,65 @@ def get_computer(
316367
log.info(f"method 'C' will use data from {url}")
317368

318369
# Common structure and utility quantities used by method_[ABC]
319-
c.add(K.bcast, broadcast_t, version=2, include_international=method == "A")
370+
c.add(K.bcast, broadcast_t_emi, version=2, include_international=method == "A")
320371

321372
# Placeholder for data-loading task. This is filled in later by process_df() or
322373
# process_file().
323374
c.add(K.input, None)
324375

325376
# Select and transform data matching EXPR_EMI
326-
# Filter on "VARIABLE", expand the (e, t) dimensions from "VARIABLE"
377+
# Filter on "VARIABLE", extract the (e, t) dimensions
327378
c.add(K.emi_in[0], "select_expand", K.input, dim_cb={"VARIABLE": v_to_emi_coords})
379+
# Assign units
328380
c.add(K.emi_in, "assign_units", K.emi_in[0], units="Mt/year")
329381

382+
# Select and transform data matching EXPR_FE
383+
# Filter on "VARIABLE", extract the (c, t) dimensions
384+
dim_cb = {"VARIABLE": v_to_fe_coords}
385+
c.add(K.fe_in[0] * "UNITS", "select_expand", K.input, dim_cb=dim_cb)
386+
# Convert "UNIT" dim labels to Quantity.units
387+
c.add(K.fe_in[1], "unique_units_from_dim", K.fe_in[0] * "UNITS", dim="UNIT")
388+
# Change labels; see get_label()
389+
c.add(K.fe_in, "relabel", K.fe_in[1], labels=get_labels())
390+
330391
# Call a function to prepare the remaining calculations up to K.emi
331392
method_func = {METHOD.A: method_A, METHOD.B: method_B, METHOD.C: method_C}[method]
332393
method_func(c)
333394

334395
# Adjust the original data by adding the (maybe negative) prepared values at K.emi
335396
c.add(K.emi["adj"], "add", K.emi_in, K.emi)
397+
c.add(K.fe_out["adj"], "add", K.fe_in[1], K.fe_out)
336398

337399
# Add a key "target" to:
338400
# - Collapse to IAMC "VARIABLE" dimension name.
339401
# - Recombine with other/unaltered original data.
340-
c.add("target", finalize, K.input, K.emi["adj"], "model name", "scenario name")
402+
c.add(
403+
"target",
404+
finalize,
405+
K.input,
406+
K.emi["adj"],
407+
K.fe_out["adj"],
408+
"model name",
409+
"scenario name",
410+
)
341411

342412
return c
343413

344414

415+
@cache
416+
def get_labels():
417+
"""Return mapper for relabelling input data:
418+
419+
- c[ommodity]: 'Liquids|Oil' (IAMC 'variable' component) → 'lightoil'.
420+
- n[ode]: "AFR" → "R12_AFR" etc. "World" is not changed.
421+
"""
422+
cl = get_codelist("node/R12")
423+
labels = dict(c={"Liquids|Oil": "lightoil", "": "_T"}, n={})
424+
for n in filter(lambda n: len(n.child) and n.id != "World", cl):
425+
labels["n"][n.id.partition("_")[2]] = n.id
426+
return labels
427+
428+
345429
def get_scenario_code(model_name: str, scenario_name: str) -> "sdmx.model.common.Code":
346430
"""Return a specific code from ``CL_TRANSPORT_SCENARIO``.
347431
@@ -390,6 +474,9 @@ def method_A(c: "Computer") -> None:
390474
# Rail and Domestic Shipping"
391475
c.add(K.emi, "mul", K.emi[0] / "t", k_share, K.bcast)
392476

477+
# No change to final energy data
478+
c.add(K.fe_out, genno.Quantity(0.0, units="EJ / a"))
479+
393480

394481
def method_B(c: "Computer") -> None:
395482
"""Prepare calculations up to :data:`K.emi` using :data:`METHOD.B`.
@@ -468,10 +555,10 @@ def method_BC_common(c: "Computer", k_fe_share: "Key") -> None:
468555
A key with dimensions either :math:`(c, n)` or :math:`(c, n, y)` giving the
469556
share of aviation in total transport final energy.
470557
"""
471-
from message_ix_models.model.structure import get_codelist
558+
472559
from message_ix_models.model.transport.key import exo
473560

474-
# Check dimensions of k_emi_share
561+
# Check dimensions of k_fe_share
475562
exp = {frozenset("cn"), frozenset("cny")}
476563
if set(k_fe_share.dims) not in exp: # pragma: no cover
477564
raise ValueError(f"Dimensions of k_cn={k_fe_share.dims} are not in {exp}")
@@ -480,31 +567,17 @@ def method_BC_common(c: "Computer", k_fe_share: "Key") -> None:
480567
k = Keys(
481568
ei=exo.emi_intensity, # Dimensions (c, e, t)
482569
emi0=Key("emission", ("ceny"), L),
483-
fe_in=Key("fe", ("c", "n", "y", "UNIT"), "input"),
484-
fe=Key("fe", tuple("cny"), L),
570+
fe=Key("fe", tuple("cny"), f"{L}+BC"),
485571
units=Key(f"units:e-UNIT:{L}"),
486572
)
487573

488-
### Prepare data from the input data file: total transport consumption of light oil
489-
490-
# Filter on "VARIABLE", extract (e) dimension
491-
c.add(k.fe_in[0], "select_expand", K.input, dim_cb={"VARIABLE": v_to_fe_coords})
492-
493-
# Convert "UNIT" dim labels to Quantity.units
494-
c.add(k.fe_in[1] / "UNIT", "unique_units_from_dim", k.fe_in[0], dim="UNIT")
495-
496-
# Relabel:
497-
# - c[ommodity]: 'Liquids|Oil' (IAMC 'variable' component) → 'lightoil'
498-
# - n[ode]: "AFR" → "R12_AFR" etc. "World" is not changed.
499-
cl = get_codelist("node/R12")
500-
labels = dict(c={"Liquids|Oil": "lightoil"}, n={})
501-
for n in filter(lambda n: len(n.child) and n.id != "World", cl):
502-
labels["n"][n.id.partition("_")[2]] = n.id
503-
c.add(k.fe_in[2] / "UNIT", "relabel", k.fe_in[1] / "UNIT", labels=labels)
574+
# Select only total transport consumption of lightoil from K.fe_in
575+
indexers = {"t": "Transportation (w/ bunkers)"}
576+
c.add(k.fe[0], "select", K.fe_in, indexers=indexers, drop=True)
504577

505578
### Compute estimate of emissions
506579
# Product of aviation share and FE of total transport → FE of aviation
507-
c.add(k.fe, "mul", k.fe_in[2] / "UNIT", k_fe_share)
580+
c.add(k.fe, "mul", k.fe[0], k_fe_share)
508581

509582
# Convert exogenous emission intensity data to Mt / EJ
510583
c.add(k.ei["units"], "convert_units", k.ei, units="Mt / EJ")
@@ -525,9 +598,16 @@ def method_BC_common(c: "Computer", k_fe_share: "Key") -> None:
525598
c.add(K.emi[2], "mul", k.emi0[1], k.units, K.bcast)
526599

527600
# Restore labels: "R12_AFR" → "AFR" etc. "World" is not changed.
528-
labels = dict(n={v: k for k, v in labels["n"].items()})
601+
labels = dict(n={v: k for k, v in get_labels()["n"].items()})
529602
c.add(K.emi, "relabel", K.emi[2], labels=labels)
530603

604+
# Re-add the "t" dimension with +ve and -ve sign for certain labels
605+
c.add(K.fe_out[0], "mul", k.fe, broadcast_t_fe())
606+
c.add(K.fe_out[1], "drop_vars", K.fe_out[0] * "c_new", names="c")
607+
c.add(K.fe_out[2], "rename_dims", K.fe_out[1], name_dict={"c_new": "c"})
608+
# Restore labels: "R12_AFR" → "AFR" etc. "World" is not changed.
609+
c.add(K.fe_out, "relabel", K.fe_out[2], labels=labels)
610+
531611

532612
def method_C(c: "Computer") -> None:
533613
"""Prepare calculations up to :data:`K.emi` using :data:`METHOD.C`.

message_ix_models/tests/project/ssp/test_transport.py

Lines changed: 40 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import logging
12
from collections.abc import Callable, Hashable
23
from functools import cache
34
from typing import TYPE_CHECKING
@@ -21,6 +22,8 @@
2122
if TYPE_CHECKING:
2223
import pathlib
2324

25+
log = logging.getLogger(__name__)
26+
2427
METHOD_PARAM = (
2528
METHOD.A,
2629
METHOD.B,
@@ -88,8 +91,26 @@ def _to_long(df):
8891
assert expected_variables(IN_, method) <= set(df_in["Variable"].unique())
8992
region = set(df_in["Region"].unique())
9093

94+
# Identify the directory from which IEA EWEB data is read
95+
iea_eweb_dir = web.dir_fallback(
96+
web.FILES[("IEA", "2024")][0], where=web.IEA_EWEB._where()
97+
)
98+
# True if the fuzzed test data are being used
99+
iea_eweb_test_data = iea_eweb_dir.match("message_ix_models/data/test/iea/web")
100+
log.info(f"{iea_eweb_test_data = }")
101+
102+
# Number of added values; number of modified values
103+
N_new, N_exp = {
104+
(METHOD.A, False): (0, 10280),
105+
(METHOD.A, True): (0, 10280),
106+
(METHOD.B, False): (13 * 20, 10120),
107+
(METHOD.B, True): (10 * 20, 7720),
108+
(METHOD.C, False): (13 * 20, 7000),
109+
(METHOD.C, True): (13 * 20, 7000),
110+
}[(method, iea_eweb_test_data)]
111+
91112
# Data have the same length
92-
assert len(df_in) == len(df_out)
113+
assert len(df_in) + N_new == len(df_out)
93114

94115
# Output has the same set of region codes as input
95116
assert region == set(df_out["Region"].unique())
@@ -104,32 +125,15 @@ def _to_long(df):
104125
.query("abs(value_out - value_in) > 1e-16")
105126
)
106127

107-
# Identify the directory from which IEA EWEB data is read
108-
iea_eweb_dir = web.dir_fallback(
109-
web.FILES[("IEA", "2024")][0], where=web.IEA_EWEB._where()
110-
)
111-
# True if the fuzzed test data are being used
112-
iea_eweb_test_data = iea_eweb_dir.match("message_ix_models/data/test/iea/web")
113-
114128
# All regions and "World" have modified values
115129
N_reg = {METHOD.A: 13, METHOD.B: 9, METHOD.C: 13}[method]
116130
assert N_reg <= len(df["Region"].unique())
117131

118-
# Number of modified values
119-
N_exp = {
120-
(METHOD.A, False): 10280,
121-
(METHOD.A, True): 10280,
122-
(METHOD.B, False): 10120,
123-
(METHOD.B, True): 7720,
124-
(METHOD.C, False): 7000,
125-
(METHOD.C, True): 7000,
126-
}[(method, iea_eweb_test_data)]
127-
128-
if N_exp != len(df):
129-
# df.to_csv("debug-diff.csv") # DEBUG Dump to file
130-
# print(df.to_string(max_rows=50)) # DEBUG Show in test output
131-
msg = f"Unexpected number of modified values: {N_exp} != {len(df)}"
132-
assert N_exp == len(df)
132+
# if N_exp != len(df): # DEBUG
133+
# df.to_csv("debug-diff.csv") # Dump to file
134+
# print(df.to_string(max_rows=50)) # Show in test output
135+
# msg = f"Unexpected number of modified values: {N_exp} != {len(df)}"
136+
# assert N_exp == len(df), msg
133137

134138
# All of the expected 'variable' codes have been modified
135139
assert expected_variables(OUT, method) == set(df["Variable"].unique())
@@ -149,6 +153,8 @@ def expected_variables(flag: int, method: METHOD) -> set[str]:
149153
edt = "Energy|Demand|Transportation"
150154

151155
result = set()
156+
157+
# Emissions
152158
for e in SPECIES:
153159
# Expected data flows in which these variable codes appear
154160
exp = IN_ if (e in SPECIES_WITHOUT_EF and method != METHOD.A) else I_O
@@ -160,6 +166,17 @@ def expected_variables(flag: int, method: METHOD) -> set[str]:
160166
f"Emissions|{e}|{edt}|Road Rail and Domestic Shipping",
161167
}
162168

169+
# Final Energy
170+
if method != METHOD.A:
171+
result |= {
172+
"Final Energy|Bunkers",
173+
"Final Energy|Bunkers|Liquids|Oil",
174+
"Final Energy|Transportation",
175+
"Final Energy|Transportation|Liquids|Oil",
176+
}
177+
if flag & OUT:
178+
result.add("Final Energy|Bunkers|International Aviation")
179+
163180
return result
164181

165182

0 commit comments

Comments
 (0)