Add NPI triggers to GLEAM modelling #536

Open · wants to merge 8 commits into master
2 changes: 1 addition & 1 deletion .github/workflows/checks-data-pipeline.yml
@@ -19,7 +19,7 @@ jobs:
python-version: 3.7
- name: Install dependencies
working-directory: data-pipeline
run: python -m pip install --upgrade black
run: python -m pip install --upgrade black==19.10b0
- name: Check with Black
working-directory: data-pipeline
run: black . --check --exclude get-poetry.py
2 changes: 2 additions & 0 deletions .gitignore
@@ -0,0 +1,2 @@
__pycache__
.venv
12 changes: 3 additions & 9 deletions data-pipeline/epimodel/algorithms/estimate_r.py
@@ -12,9 +12,7 @@


def preprocess_hopkins(
hopkins_file: str,
rds: RegionDataset,
state_to_country: List[str],
hopkins_file: str, rds: RegionDataset, state_to_country: List[str],
) -> pd.DataFrame:
preprocessed = pd.read_csv(
hopkins_file,
@@ -70,9 +68,7 @@ def estimate_r(


def mask_not_enough_data(
r_estimates: pd.DataFrame,
hopkins_df: pd.DataFrame,
min_daily_cases: int,
r_estimates: pd.DataFrame, hopkins_df: pd.DataFrame, min_daily_cases: int,
) -> pd.DataFrame:
hopkins_grouped = hopkins_df.groupby("Code")
return r_estimates.groupby("Code").apply(
@@ -83,9 +79,7 @@ def mask_not_enough_data(


def mask_country_not_enough_data(
country_estimates: pd.DataFrame,
hopkins_df: pd.DataFrame,
min_daily_cases: int,
country_estimates: pd.DataFrame, hopkins_df: pd.DataFrame, min_daily_cases: int,
) -> pd.DataFrame:

hopkins_df = hopkins_df.reset_index().set_index("Date")
37 changes: 15 additions & 22 deletions data-pipeline/epimodel/exports/epidemics_org.py
@@ -1,23 +1,25 @@
import datetime
import getpass
import json
import os
import logging
import os
import shutil
import socket
import subprocess
from pathlib import Path
from typing import Any, Dict, List, Optional, Iterable
from typing import Any, Dict, Iterable, List, Optional

import numpy as np
import pandas as pd
import simplejson
from tqdm import tqdm

import epimodel
import pandas as pd
from epimodel.imports.johns_hopkins import aggregate_countries

from ..gleam import Batch
from ..regions import Region, RegionDataset
from . import types_to_json, get_df_else_none
import epimodel
from . import get_df_else_none, types_to_json

log = logging.getLogger(__name__)

@@ -109,20 +111,20 @@ def write(
export_region.data_url = f"{fname}"
if write_country_exports:
with open(export_directory / fname, "wt") as f:
json.dump(
simplejson.dump(
export_region.data_ext,
f,
default=types_to_json,
allow_nan=False,
ignore_nan=True,
separators=(",", ":"),
indent=indent,
)
with open(export_directory / main_data_filename, "wt") as f:
json.dump(
simplejson.dump(
self.to_json(),
f,
default=types_to_json,
allow_nan=False,
ignore_nan=True,
separators=(",", ":"),
indent=indent,
)
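
For context on the json → simplejson switch above: with allow_nan=False the standard json.dump raises ValueError on NaN values, while simplejson's ignore_nan=True serializes them as null. A minimal standalone sketch (toy data, not part of the diff):

import simplejson

data = {"Infected": [0.1, float("nan"), 0.3]}
# NaN becomes null instead of raising, matching the export's ignore_nan=True usage
print(simplejson.dumps(data, ignore_nan=True, separators=(",", ":")))
# {"Infected":[0.1,null,0.3]}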
@@ -266,9 +268,7 @@ def get_date_index(models: pd.DataFrame) -> Iterable[datetime.datetime]:
return first

def extract_external_data(
self,
models: pd.DataFrame,
simulation_specs: pd.DataFrame,
self, models: pd.DataFrame, simulation_specs: pd.DataFrame,
) -> Dict[str, Any]:
d: Dict[str, Any] = {
"date_index": [
@@ -353,12 +353,8 @@ def assert_valid_json(file, minify=False):
)
if minify:
with open(file, "wt") as f:
json.dump(
data,
f,
default=types_to_json,
allow_nan=False,
separators=(",", ":"),
simplejson.dump(
data, f, default=types_to_json, ignore_nan=True, separators=(",", ":"),
)


@@ -539,10 +535,7 @@ def process_export(
rates, index_col="Code", keep_default_na=False, na_values=[""]
)
timezone_df: pd.DataFrame = pd.read_csv(
timezone,
index_col="Code",
keep_default_na=False,
na_values=[""],
timezone, index_col="Code", keep_default_na=False, na_values=[""],
)

un_age_dist_df: pd.DataFrame = pd.read_csv(un_age_dist, index_col="Code M49").drop(
4 changes: 1 addition & 3 deletions data-pipeline/epimodel/exports/npi_model_export.py
@@ -159,9 +159,7 @@ def process_model_export(
reg: Region = rds[code]

ex.new_region(
reg,
get_df_else_none(npi_model_results_df, code),
extrapolation_date,
reg, get_df_else_none(npi_model_results_df, code), extrapolation_date,
)

return ex
12 changes: 9 additions & 3 deletions data-pipeline/epimodel/gleam/batch.py
@@ -36,7 +36,7 @@ class Batch:
"""

def __init__(self, hdf_file, path, *, _direct=True):
assert not _direct, "Use .new or .load"
assert not _direct, "Use .new or .open"
self.hdf = hdf_file
self.path = path

@@ -70,13 +70,14 @@ def open(cls, path):
return cls(hdf, path, _direct=False)

@classmethod
def new(cls, *, path=None):
def new(cls, *, path=None, overwrite=False):
"""
Create new batch HDF5 file.
"""
path = Path(path)

assert not path.exists()
if not overwrite:
assert not path.exists()
hdf = pd.HDFStore(path, "w")
return cls(hdf, path, _direct=False)
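
A minimal usage sketch of the new overwrite flag (hypothetical path): passing overwrite=True skips the path-existence assertion, so a batch file left over from an earlier run can be recreated in place.

from epimodel.gleam import Batch

# Hypothetical output path; overwrite=True bypasses the assert on path.exists()
batch = Batch.new(path="out/npi-triggers-batch.hdf5", overwrite=True)
batch.hdf.close()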

Expand Down Expand Up @@ -254,6 +255,11 @@ def get_cummulative_active_df(self):

@staticmethod
def generate_sim_stats(cdf: pd.DataFrame, sim_ids: List[str]) -> dict:
if np.sum(np.sum(cdf.isna())) > 0:
log.warning(
f"Creating simulations stats: Unknown daily growths (NaN) are replaced by 0.0"
)
cdf = cdf.fillna(method="ffill")
# get the end date of the simulations
end_date = cdf.index.get_level_values("Date").max()
# get the infected in the end date for the latest date per simulation
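
A standalone sketch (toy frame, not the real batch layout) of the forward-fill applied above when cumulative counts contain NaN values:

import numpy as np
import pandas as pd

cdf = pd.DataFrame({"Infected": [1.0, np.nan, 3.0], "Recovered": [0.5, 0.7, np.nan]})
if cdf.isna().values.any():
    # carry the last known value forward instead of leaving gaps
    cdf = cdf.fillna(method="ffill")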
196 changes: 196 additions & 0 deletions data-pipeline/epimodel/gleam/npi_triggers.py
@@ -0,0 +1,196 @@
import io
import logging

import numpy as np
import pandas as pd

from ..regions import Region, RegionDataset
from .batch import Batch
from .definition import GleamDefinition

logger = logging.getLogger(__name__)


class NPITriggerConfig:
def __init__(self, filename=None, text=None):
assert (filename is None) != (text is None)
if filename is not None:
f = filename
else:
f = io.StringIO(text)
df = pd.read_csv(f, header=0, dtype=str, na_values=[], keep_default_na=False)
df = df.applymap(lambda s: s.strip())
assert df.columns[0] == "RegionCode"
assert "Group" in df.columns
df = df.loc[df.RegionCode != ""]
for c in df.columns:
if c not in ("RegionCode", "Group"):
df[c] = pd.to_numeric(df[c])
# Verify uniqueness
_ = df.set_index(["RegionCode", "Group"], verify_integrity=True)

self.df = df
logger.info(
f"Loaded NPI triggers for {len(self.df)} regions: {', '.join(self.df.RegionCode.values)}"
)

def create_updated_batch(
self,
in_batch: Batch,
out_batch: Batch,
rds: RegionDataset,
truncated_simulations: bool = True,
window=14,
):
for k in in_batch.hdf.keys():
if k not in ("/new_fraction", "/triggered_NPIs", "/simulations"):
out_batch.hdf[k] = in_batch.hdf[k]

try:
triggered = in_batch.hdf.get("triggered_NPIs")
except KeyError:
triggered = pd.DataFrame(
{
"SimulationID": pd.Series([], dtype=str),
"RegionCode": pd.Series([], dtype=str),
"Group": pd.Series([], dtype=str),
"Trace": pd.Series([], dtype=str),
"Day": pd.Series([], dtype=str),
"DayNumber": pd.Series([], dtype=np.int32),
"Level": pd.Series([], dtype=np.int32),
}
)
sims = in_batch.hdf.get("simulations")
new_fraction = in_batch.hdf.get("new_fraction")
new_sims = []
new_triggered = triggered.copy()
new_trigger_count = 0

sim_map = {}
# For each simulation
for sim_no, (sid, simrow) in enumerate(sims.iterrows()):
group = simrow.Group
definition = GleamDefinition.from_xml_string(simrow.DefinitionXML)
definition.set_id(int(pd.Timestamp.utcnow().timestamp() * 1000) + sim_no)
new_sid = definition.get_id_str()
sim_map[sid] = new_sid

logger.info(
f" Triggers for simulation {sid}: group {group!r}, trace {simrow.Trace!r}"
)
for _, triggerrow in self.df[self.df.Group == group].iterrows():
# print(triggerrow, triggered)
r = triggerrow.RegionCode
td = triggered.loc[
(triggered.RegionCode == r) & (triggered.SimulationID == sid)
].sort_values("DayNumber")
if len(td) > 0:
last_level = td.Level.iloc[-1]
last_day = td.DayNumber.iloc[-1]
else:
last_level = 0
last_day = 0

pop = rds[r].Population
up_fraction = (
triggerrow.get(f"L{last_level + 1}-Start-14d", np.nan) / pop
)
down_fraction = triggerrow.get(f"L{last_level}-Stop-14d", np.nan) / pop

window_sums = (
new_fraction.loc[(sid, r)]
.rolling(window, min_periods=0)
.sum()
.Infected
)
assert len(window_sums) == len(new_fraction.loc[(sid, r)].Infected)

def switch_to_level(day_no, old_level, level):
"Perform modifications when level change is detected"
nonlocal new_triggered, new_trigger_count
day = new_fraction.index.levels[2].values[day_no]
beta = triggerrow[f"L{level}-beta"]
logger.info(
f" Triggered in {r}: L{old_level}->L{level} on day {day_no} ({day}) (beta={beta})"
)

# Truncate model data at the trigger day (later days are invalid until recomputed)
if truncated_simulations:
new_fraction.loc[
(sid, r, slice(day, None)), "Infected"
] = np.nan
new_fraction.loc[
(sid, r, slice(day, None)), "Recovered"
] = np.nan

# Register the trigger
# NOTE: Quadratic complexity; switch to batch inserts if this becomes a problem
new_triggered = new_triggered.append(
{
"SimulationID": new_sid,
"Group": group,
"Trace": simrow.Trace,
"RegionCode": r,
"DayNumber": day_no,
"Day": str(day),
"Level": level,
},
ignore_index=True,
)
new_trigger_count += 1

# Add exception to new_sims XML
regs = set([rds[r]])
# Hack: add 2 levels of children to cover all region overrides
for nr in rds[r].children:
regs.add(nr)
for nr2 in nr.children:
regs.add(nr2)

definition.add_exception(list(regs), {"beta": beta}, start=day)

for i in range(last_day + 1, len(window_sums)):
if window_sums[i] > up_fraction:
switch_to_level(i, last_level, last_level + 1)
break
if window_sums[i] < down_fraction:
switch_to_level(i, last_level, last_level - 1)
break

# Make a new simulation, set new ID
definition.format_exceptions()
new_sims.append(
{
"SimulationID": definition.get_id_str(),
"Group": group,
"Trace": simrow.Trace,
"StartDate": simrow.StartDate,
"DefinitionXML": definition.to_xml_string(),
}
)

out_batch.hdf.put(
"triggered_NPIs",
new_triggered,
format="table",
complib="bzip2",
complevel=4,
)
new_sims = pd.DataFrame(new_sims).set_index("SimulationID")
out_batch.hdf.put(
"simulations", new_sims, format="table", complib="bzip2", complevel=4,
)
if truncated_simulations:
new_fraction.rename(index=sim_map, inplace=True)
out_batch.hdf.put(
"new_fraction",
new_fraction,
format="table",
complib="bzip2",
complevel=4,
)
logger.info(
f'All triggers:\n{new_triggered.sort_values(["Group", "Trace", "RegionCode", "DayNumber"])}'
)
logger.info(f">>> Added {new_trigger_count} new triggers")
6 changes: 1 addition & 5 deletions data-pipeline/epimodel/gleam/scenario.py
@@ -422,11 +422,7 @@ def _prepare_exceptions(self, exceptions: pd.DataFrame) -> pd.DataFrame:
.apply(lambda group: dict(zip(group["Parameter"], group["Value"])))
.reset_index()
.rename(
columns={
0: "variables",
"Start date": "start",
"End date": "end",
}
columns={0: "variables", "Start date": "start", "End date": "end",}
)[output_columns]
)
