Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add logging to worker #1948

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ dist/*
**/*.h5
**/*.csv.gz
.env
logs/*
*.log

# Ignore generated credentials from google-github-actions/auth
gha-creds-*.json
Expand Down
6 changes: 6 additions & 0 deletions changelog_entry.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
- bump: minor
changes:
added:
- Logging module called WorkerLogger to log microsim runs that use worker
- Basic informational logging for microsim runs
- Warning-level logging for NaN values present in calculated microsim variables
6 changes: 0 additions & 6 deletions policyengine_api/country.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,12 +304,6 @@ def build_entities(self) -> dict:
data[entity.key] = entity_data
return data

# 1. Remove the call to `get_all_variables` (Done)
# 2. Remove the code to check if the calculated variable is within the traced variables array (Done)
# 3. Remove the commented code block that writes to the local_database inside the for loop (Done)
# 4. Delete the code at the end of the function that writes to a file (Done)
# 5. Add code at the end of the function to write to a database (Done)

def calculate(
self,
household: dict,
Expand Down
2 changes: 1 addition & 1 deletion policyengine_api/endpoints/economy/economy.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def get_economic_impact(
)
for r in result
]
job_id = f"reform_impact_{country_id}_{policy_id}_{baseline_policy_id}_{region}_{time_period}_{options_hash}_{api_version}"
job_id = f"impact_{country_id}_{policy_id}_{baseline_policy_id}_{region}_{time_period}"

if len(result) == 0:
RECENT_JOBS[job_id] = dict(
Expand Down
21 changes: 21 additions & 0 deletions policyengine_api/endpoints/economy/reform_impact.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
from policyengine_api.utils import hash_object
from datetime import datetime
import traceback
from policyengine_api.utils.worker_logs import WorkerLogger
from rq import get_current_job


def ensure_economy_computed(
Expand Down Expand Up @@ -55,9 +57,15 @@ def set_reform_impact_data(
baseline_policy: dict,
reform_policy: dict,
):
current_job = get_current_job()
logger = WorkerLogger(job_id=current_job.id)
logger.log("Setting reform impact data")

options_hash = json.dumps(options, sort_keys=True)
baseline_policy_id = int(baseline_policy_id)
policy_id = int(policy_id)
logger.log(f"Baseline policy ID: {baseline_policy_id}")
logger.log(f"Reform policy ID: {policy_id}")
try:
set_reform_impact_data_routine(
baseline_policy_id,
Expand All @@ -71,6 +79,13 @@ def set_reform_impact_data(
)
except Exception as e:
# Save the status as error and the message as the error message
logger.log(
f"Exception raised in set_reform_impact_data: {e}", level="error"
)
logger.log(f"Traceback: {traceback.format_exc()}", level="error")
logger.log(
f"Setting reform #{policy_id} impact data to error", level="error"
)
local_database.query(
"UPDATE reform_impact SET status = ?, message = ?, end_time = ? WHERE country_id = ? AND reform_policy_id = ? AND baseline_policy_id = ? AND region = ? AND time_period = ? AND options_hash = ?",
(
Expand Down Expand Up @@ -110,6 +125,9 @@ def set_reform_impact_data_routine(
time_period (str): The time period, e.g. 2024.
options (dict): Any additional options.
"""
current_job = get_current_job()
logger = WorkerLogger(job_id=current_job.id)
logger.log("Running set_reform_impact_data_routine")
options_hash = json.dumps(options, sort_keys=True)
baseline_policy_id = int(baseline_policy_id)
policy_id = int(policy_id)
Expand Down Expand Up @@ -167,6 +185,7 @@ def set_reform_impact_data_routine(
)
comment = lambda x: set_comment_on_job(x, *identifiers)
comment("Computing baseline")
logger.log("Computing baseline")
baseline_economy = compute_economy(
country_id,
policy_id,
Expand All @@ -176,6 +195,7 @@ def set_reform_impact_data_routine(
policy_json=baseline_policy,
)
comment("Computing reform")
logger.log("Computing reform")
reform_economy = compute_economy(
country_id,
policy_id,
Expand Down Expand Up @@ -213,6 +233,7 @@ def set_reform_impact_data_routine(
baseline_economy = baseline_economy["result"]
reform_economy = reform_economy["result"]
comment("Comparing baseline and reform")
logger.log("Comparing baseline and reform")
impact = compare_economic_outputs(
baseline_economy, reform_economy, country_id=country_id
)
Expand Down
Loading