Skip to content

Commit

Permalink
Split simulations into chunks (#1938)
Browse files Browse the repository at this point in the history
* Add progress

* Add progress

* Progress

* Add progress in adding parallelisation!

* Add final fixes

* Add two workers

* Add error handling

* Fix bug

* Add error handling strength

* Fix bug

* Turn down memory to 32gb

* Fix bug

* Fix bugs

* Download microdata first
  • Loading branch information
nikhilwoodruff authored Nov 5, 2024
1 parent 4546ddb commit 4301f2d
Show file tree
Hide file tree
Showing 11 changed files with 193 additions and 18 deletions.
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ debug:
test:
MAX_HOUSEHOLDS=1000 pytest tests

microdata:
python policyengine_api/download_microdata.py

debug-test:
MAX_HOUSEHOLDS=1000 FLASK_DEBUG=1 pytest -vv --durations=0 tests

Expand Down
4 changes: 4 additions & 0 deletions changelog_entry.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
- bump: minor
changes:
added:
- Chunking and baseline/reform parallelisation.
2 changes: 1 addition & 1 deletion gcp/policyengine_api/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ ADD . /app
# Make start.sh executable
RUN chmod +x /app/start.sh

RUN cd /app && make install && make test
RUN cd /app && make install && make microdata && make test

# Use full path to start.sh
CMD ["/bin/sh", "/app/start.sh"]
6 changes: 3 additions & 3 deletions gcp/policyengine_api/app.yaml
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
runtime: custom
env: flex
resources:
cpu: 24
memory_gb: 128
disk_size_gb: 128
cpu: 16
memory_gb: 32
disk_size_gb: 64
automatic_scaling:
min_num_instances: 1
max_num_instances: 1
Expand Down
4 changes: 4 additions & 0 deletions gcp/policyengine_api/start.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,8 @@ gunicorn -b :$PORT policyengine_api.api --timeout 300 --workers 5 &
# Start the redis server
redis-server &
# Start the worker
python3 policyengine_api/worker.py &
python3 policyengine_api/worker.py &
python3 policyengine_api/worker.py &
python3 policyengine_api/worker.py &
python3 policyengine_api/worker.py
2 changes: 1 addition & 1 deletion policyengine_api/data/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,6 @@ def initialize(self):
if os.environ.get("FLASK_DEBUG") == "1":
database = PolicyEngineDatabase(local=True, initialize=False)
else:
database = PolicyEngineDatabase(local=False, initialize=False)
database = PolicyEngineDatabase(local=True, initialize=False)

local_database = PolicyEngineDatabase(local=True, initialize=False)
15 changes: 15 additions & 0 deletions policyengine_api/download_microdata.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from policyengine_us_data import EnhancedCPS_2024, CPS_2024
from policyengine_uk_data import EnhancedFRS_2022_23

DATASETS = [EnhancedCPS_2024, CPS_2024, EnhancedFRS_2022_23]


def download_microdata():
for dataset in DATASETS:
dataset = dataset()
if not dataset.exists:
dataset.download()


if __name__ == "__main__":
download_microdata()
51 changes: 51 additions & 0 deletions policyengine_api/endpoints/economy/chunks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import time
from tqdm import tqdm
import numpy as np


def calc_chunks(variables=None, count_chunks=5, logger=None, sim=None):
for i in range(len(variables)):
if isinstance(variables[i], str):
variables[i] = (variables[i], sim.default_calculation_period)
variables = [
(variable, time_period)
for variable, time_period in variables
if variable in sim.tax_benefit_system.variables
]
if count_chunks > 1:
households = sim.calculate("household_id", 2024).values
chunk_size = len(households) // count_chunks + 1
input_df = sim.to_input_dataframe()

variable_data = {
variable: np.array([]) for variable, time_period in variables
}

for i in tqdm(range(count_chunks)):
if logger is not None:
pct_complete = i / count_chunks
logger(pct_complete)
households_in_chunk = households[
i * chunk_size : (i + 1) * chunk_size
]
chunk_df = input_df[
input_df["household_id__2024"].isin(households_in_chunk)
]

subset_sim = type(sim)(dataset=chunk_df, reform=sim.reform)
subset_sim.default_calculation_period = (
sim.default_calculation_period
)

for variable, time_period in variables:
chunk_values = subset_sim.calculate(
variable, time_period
).values
variable_data[variable] = np.concatenate(
[variable_data[variable], chunk_values]
)

for variable, time_period in variables:
sim.set_input(variable, time_period, variable_data[variable])

return sim
41 changes: 33 additions & 8 deletions policyengine_api/endpoints/economy/reform_impact.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@
from policyengine_api.utils import hash_object
from datetime import datetime
import traceback
from rq import Queue
from rq.job import Job
from redis import Redis
import time

queue = Queue(connection=Redis())


def ensure_economy_computed(
Expand Down Expand Up @@ -166,24 +172,43 @@ def set_reform_impact_data_routine(
),
)
comment = lambda x: set_comment_on_job(x, *identifiers)
comment("Computing baseline")
baseline_economy = compute_economy(
country_id,
policy_id,

baseline_economy = queue.enqueue(
compute_economy,
country_id=country_id,
policy_id=baseline_policy_id,
region=region,
time_period=time_period,
options=options,
policy_json=baseline_policy,
)
comment("Computing reform")
reform_economy = compute_economy(
country_id,
policy_id,
reform_economy = queue.enqueue(
compute_economy,
country_id=country_id,
policy_id=policy_id,
region=region,
time_period=time_period,
options=options,
policy_json=reform_policy,
)
while baseline_economy.get_status() in ("queued", "started"):
time.sleep(1)
while reform_economy.get_status() in ("queued", "started"):
time.sleep(1)
if reform_economy.get_status() != "finished":
reform_economy = {
"status": "error",
"message": "Error computing reform economy.",
}
else:
reform_economy = reform_economy.result
if baseline_economy.get_status() != "finished":
baseline_economy = {
"status": "error",
"message": "Error computing baseline economy.",
}
else:
baseline_economy = baseline_economy.result
if baseline_economy["status"] != "ok" or reform_economy["status"] != "ok":
local_database.query(
"UPDATE reform_impact SET status = ?, message = ?, end_time = ?, reform_impact_json = ? WHERE country_id = ? AND reform_policy_id = ? AND baseline_policy_id = ? AND region = ? AND time_period = ? AND options_hash = ?",
Expand Down
74 changes: 70 additions & 4 deletions policyengine_api/endpoints/economy/single_economy.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,46 @@
from policyengine_uk import Microsimulation
import time
import os
from policyengine_api.endpoints.economy.chunks import calc_chunks


def compute_general_economy(
simulation: Microsimulation, country_id: str = None
simulation: Microsimulation,
country_id: str = None,
simulation_type: str = None,
comment=None,
) -> dict:
variables = [
"labor_supply_behavioral_response",
"employment_income_behavioral_response",
"household_tax",
"household_benefits",
"household_state_income_tax",
"weekly_hours_worked_behavioural_response_income_elasticity",
"weekly_hours_worked_behavioural_response_substitution_elasticity",
"household_net_income",
"household_market_income",
"in_poverty",
"in_deep_poverty",
"poverty_gap",
"deep_poverty_gap",
"income_tax",
"national_insurance",
"vat",
"council_tax",
"fuel_duty",
"tax_credits",
"universal_credit",
"child_benefit",
"state_pension",
"pension_credit",
"ni_employer",
]
# chunk_logger = lambda pct_complete: comment(f"Simulation {simulation_type}: {pct_complete:.0%}")
calc_chunks(
sim=simulation, variables=variables, count_chunks=4, logger=None
)

total_tax = simulation.calculate("household_tax").sum()
total_spending = simulation.calculate("household_benefits").sum()

Expand Down Expand Up @@ -226,7 +261,7 @@ def compute_cliff_impact(
}


def compute_economy(
def get_microsimulation(
country_id: str,
policy_id: str,
region: str,
Expand Down Expand Up @@ -311,10 +346,41 @@ def compute_economy(
"person_weight"
).get_known_periods():
simulation.delete_arrays("person_weight", time_period)
print(f"Initialised simulation in {time.time() - start} seconds")

return simulation


def compute_economy(
country_id: str,
policy_id: str,
region: str,
time_period: str,
options: dict,
policy_json: dict,
simulation_type: str = None,
comment=None,
):
simulation = get_microsimulation(
country_id,
policy_id,
region,
time_period,
options,
policy_json,
)
if options.get("target") == "cliff":
return compute_cliff_impact(simulation)
print(f"Initialised simulation in {time.time() - start} seconds")
economy = compute_general_economy(simulation, country_id=country_id)
start = time.time()
try:
economy = compute_general_economy(
simulation,
country_id=country_id,
simulation_type=simulation_type,
comment=comment,
)
except Exception as e:
print(f"Error in economy computation: {e}")
return {"status": "error", "message": str(e)}
print(f"Computed economy in {time.time() - start} seconds")
return {"status": "ok", "result": economy}
9 changes: 8 additions & 1 deletion tests/python/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
import redis
import pytest
from policyengine_api.api import app
from policyengine_api.download_microdata import download_microdata

download_microdata()


@contextmanager
Expand All @@ -31,6 +34,10 @@ def client():
with running(["redis-server"], 3):
redis_client = redis.Redis()
redis_client.ping()
with running([sys.executable, "policyengine_api/worker.py"], 3):
with running(
[sys.executable, "policyengine_api/worker.py"], 3
), running([sys.executable, "policyengine_api/worker.py"], 3), running(
[sys.executable, "policyengine_api/worker.py"], 3
):
with app.test_client() as test_client:
yield test_client

0 comments on commit 4301f2d

Please sign in to comment.