From 850c2808f2cf2c48140cec9b34b08ee52c32fded Mon Sep 17 00:00:00 2001 From: Zane Starr Date: Thu, 14 Mar 2024 19:53:47 -0700 Subject: [PATCH] fix: this refactors execution to properly use the process pool Prior to this change the process pool was created and destroyed for every configuration, this would cause the cpu/memory to thrash and improperly allocate tasks to available cpu, resulting in sometimes 25% utilization of available cpu resources The change corrects this, as well as tackles memory problems, by writing temporary results to disk and then reading them back at the end of the simulation. This is non-configurable in this commit, and can also result in loading too much memory, as it does not include the ability to progressively or lazy load data into the final dataframe to complete the simulation. This commit is a WIP fixes #351 --- cadCAD/engine/execution.py | 107 ++++++++++++++++++++----------------- 1 file changed, 59 insertions(+), 48 deletions(-) diff --git a/cadCAD/engine/execution.py b/cadCAD/engine/execution.py index 97a5fa87..0dbac770 100644 --- a/cadCAD/engine/execution.py +++ b/cadCAD/engine/execution.py @@ -1,8 +1,11 @@ +import os from typing import Callable, Dict, List, Any, Tuple, Sequence -from pathos.multiprocessing import ProcessPool # type: ignore +from pathos.multiprocessing import ProcessPool  # type: ignore from collections import Counter from cadCAD.types import * from cadCAD.utils import flatten +import tempfile +import pickle VarDictType = Dict[str, List[object]] StatesListsType = List[dict[str, object]] @@ -25,15 +28,14 @@ def single_proc_exec( configured_n: Sequence[N_Runs], additional_objs=None ) -> List: - - + if not isinstance(var_dict_list, Sequence): var_dict_list = list([var_dict_list]) raw_params = ( simulation_execs, states_lists, configs_structs, env_processes_list, Ts, SimIDs, Ns, SubsetIDs, SubsetWindows, var_dict_list) - + results: List = [] print(f'Execution Mode: single_threaded') for raw_param in zip(*raw_params): @@ 
-44,6 +46,46 @@ def single_proc_exec( results.append(flatten(result)) return flatten(results) + +def process_executor(params): + if len_configs_structs > 1: + with ProcessPool(processes=len_configs_structs) as pp: + results = pp.map( + lambda t: t[0](t[1], t[2], t[3], t[4], t[5], + t[6], t[7], t[8], t[9], configured_n), params + ) + else: + t = params[0] + results = t[0](t[1], t[2], t[3], t[4], t[5], t[6], + t[7], t[8], t[9], configured_n) + return results + + +def process_executor(params): + simulation_exec, var_dict, states_list, config, env_processes, T, sim_id, N, subset_id, subset_window, configured_n = params + + result = [simulation_exec( + var_dict, states_list, config, env_processes, T, sim_id, N, subset_id, subset_window, configured_n + )] + temp_file = tempfile.NamedTemporaryFile(delete=False) + with open(temp_file.name, 'wb') as f: # Note 'wb' for binary writing mode + pickle.dump(result, f) + return temp_file.name + + +def file_handler(filenames: List[str]) -> List: + combined_results = [] + for file_name in filenames: + with open(file_name, 'rb') as f: # Note 'rb' for binary reading mode + result = pickle.load(f) + combined_results.append(result) + result = None + f.close() + os.remove(file_name) # Clean up temporary file + del result # Delete the result from memory after processing + return combined_results + + def parallelize_simulations( simulation_execs: List[ExecutorFunction], var_dict_list: List[Parameters], @@ -61,50 +103,19 @@ def parallelize_simulations( ): print(f'Execution Mode: parallelized') - params = list( - zip( - simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, - Ts, SimIDs, Ns, SubsetIDs, SubsetWindows - ) - ) - len_configs_structs = len(configs_structs) + params = [ + (sim_exec, var_dict, states_list, config, env_processes, + T, sim_id, N, subset_id, subset_window, configured_n) + for sim_exec, var_dict, states_list, config, env_processes, T, sim_id, N, subset_id, subset_window in + 
zip(simulation_execs, var_dict_list, states_lists, configs_structs, + env_processes_list, Ts, SimIDs, Ns, SubsetIDs, SubsetWindows) + ] - unique_runs = Counter(SimIDs) - sim_count = max(unique_runs.values()) - highest_divisor = int(len_configs_structs / sim_count) + with ProcessPool(maxtasksperchild=1) as pool: + temp_files = pool.map(process_executor, params) - new_configs_structs, new_params = [], [] - for count in range(len(params)): - if count == 0: - new_params.append( - params[count: highest_divisor] - ) - new_configs_structs.append( - configs_structs[count: highest_divisor] - ) - elif count > 0: - new_params.append( - params[count * highest_divisor: (count + 1) * highest_divisor] - ) - new_configs_structs.append( - configs_structs[count * highest_divisor: (count + 1) * highest_divisor] - ) - - def process_executor(params): - if len_configs_structs > 1: - with ProcessPool(processes=len_configs_structs) as pp: - results = pp.map( - lambda t: t[0](t[1], t[2], t[3], t[4], t[5], t[6], t[7], t[8], t[9], configured_n), params - ) - else: - t = params[0] - results = t[0](t[1], t[2], t[3], t[4], t[5], t[6], t[7], t[8], t[9], configured_n) - return results - - results = flatten(list(map(lambda params: process_executor(params), new_params))) - - return results + return flatten(file_handler(temp_files)) def local_simulations( @@ -121,15 +132,15 @@ def local_simulations( SubsetWindows: List[SubsetWindow], configured_n: List[N_Runs], additional_objs=None - ): +): config_amt = len(configs_structs) - if config_amt == 1: # and configured_n != 1 + if config_amt == 1: # and configured_n != 1 return single_proc_exec( simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, SimIDs, Ns, ExpIDs, SubsetIDs, SubsetWindows, configured_n, additional_objs ) - elif config_amt > 1: # and configured_n != 1 + elif config_amt > 1: # and configured_n != 1 return parallelize_simulations( simulation_execs, var_dict_list, states_lists, configs_structs, 
env_processes_list, Ts, SimIDs, Ns, ExpIDs, SubsetIDs, SubsetWindows, configured_n, additional_objs