Skip to content

Commit

Permalink
Handling exceptions thrown from threads spawned by ThreadPoolExecutor - …
Browse files Browse the repository at this point in the history
…TRON-2202 (#969)

* Handling exceptions thrown from threads spawned by ThreadPoolExecutor

* Addressing reviews

* Addressing more reviews

* Removed Exit code
  • Loading branch information
EmanElsaban authored Jun 4, 2024
1 parent d164542 commit 9707800
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 11 deletions.
12 changes: 12 additions & 0 deletions tests/serialize/runstate/dynamodb_state_store_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,3 +266,15 @@ def test_retry_reading(self, store, small_object, large_object):
store.restore(keys)
except Exception:
assert_equal(mock_failed_read.call_count, 11)

def test_restore_exception_propagation(self, store, small_object):
    """Check that ``store.restore`` propagates exceptions upwards (see DAR-2328).

    A mocked future whose ``result()`` raises is handed back by the patched
    ``concurrent.futures.as_completed``; the exception is expected to reach
    the caller of ``restore`` with its message intact.
    """
    restore_keys = [store.build_key("thing", index) for index in range(3)]

    failing_future = mock.MagicMock()
    failing_future.result.side_effect = Exception("mocked exception")

    # Build both patchers up front; they are only activated (in this order)
    # when the ``with`` statement below is entered.
    future_patch = mock.patch(
        "concurrent.futures.Future",
        return_value=failing_future,
        autospec=True,
    )
    as_completed_patch = mock.patch(
        "concurrent.futures.as_completed",
        return_value=[failing_future],
        autospec=True,
    )

    with future_patch, as_completed_patch:
        with pytest.raises(Exception) as exec_info:
            store.restore(restore_keys)
        # NOTE(review): original indentation was lost in this view; the
        # assertion is assumed to follow the ``pytest.raises`` block so that
        # it actually executes — confirm against the repository.
        assert str(exec_info.value) == "mocked exception"
24 changes: 14 additions & 10 deletions tron/serialize/runstate/dynamodb_state_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,16 +85,20 @@ def _get_items(self, table_keys: list) -> object:
# request otherwise
cand_keys_list = []
for resp in concurrent.futures.as_completed(responses):
items.extend(resp.result()["Responses"][self.name])
# add any potential unprocessed keys to the thread pool
if resp.result()["UnprocessedKeys"].get(self.name) and attempts_to_retrieve_keys < MAX_ATTEMPTS:
cand_keys_list.append(resp.result()["UnprocessedKeys"][self.name]["Keys"])
elif attempts_to_retrieve_keys >= MAX_ATTEMPTS:
failed_keys = resp.result()["UnprocessedKeys"][self.name]["Keys"]
error = Exception(
f"tron_dynamodb_restore_failure: failed to retrieve items with keys \n{failed_keys}\n from dynamodb\n{resp.result()}"
)
raise error
try:
items.extend(resp.result()["Responses"][self.name])
# add any potential unprocessed keys to the thread pool
if resp.result()["UnprocessedKeys"].get(self.name) and attempts_to_retrieve_keys < MAX_ATTEMPTS:
cand_keys_list.append(resp.result()["UnprocessedKeys"][self.name]["Keys"])
elif attempts_to_retrieve_keys >= MAX_ATTEMPTS:
failed_keys = resp.result()["UnprocessedKeys"][self.name]["Keys"]
error = Exception(
f"tron_dynamodb_restore_failure: failed to retrieve items with keys \n{failed_keys}\n from dynamodb\n{resp.result()}"
)
raise error
except Exception as e:
log.exception("Encountered issues retrieving data from DynamoDB")
raise e
attempts_to_retrieve_keys += 1
return items

Expand Down
7 changes: 6 additions & 1 deletion tron/serialize/runstate/statemanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import copy
import itertools
import logging
import sys
import time
from contextlib import contextmanager
from typing import Dict
Expand Down Expand Up @@ -162,7 +163,11 @@ def restore(self, job_names, skip_validation=False):
for job_name, job_state in jobs.items()
}
for result in concurrent.futures.as_completed(results):
jobs[results[result]]["runs"] = result.result()
try:
jobs[results[result]]["runs"] = result.result()
except Exception:
log.exception(f"Unable to restore state for {results[result]} - exiting to avoid corrupting data.")
sys.exit(1)

state = {
runstate.JOB_STATE: jobs,
Expand Down

0 comments on commit 9707800

Please sign in to comment.