Skip to content

Commit

Permalink
Fix single_proc + add support for single_proc for multi-runs & sweeps (
Browse files Browse the repository at this point in the history
…#315)

* fix tests + rm simulations/ folder

* add types.py

* single run / multi mc is ok

* fix for single run / single param

* add support for single proc runs

---------

Co-authored-by: Emanuel Lima <[email protected]>
  • Loading branch information
danlessa and emanuellima1 committed Dec 14, 2023
1 parent 0a291e8 commit 1b3241e
Show file tree
Hide file tree
Showing 6 changed files with 377 additions and 91 deletions.
52 changes: 33 additions & 19 deletions cadCAD/configuration/utils/__init__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import pandas as pd
import numpy as np
import pandas as pd # type: ignore
from datetime import datetime, timedelta
from collections import Counter
from copy import deepcopy
from functools import reduce
from funcy import curry
from funcy import curry # type: ignore
from cadCAD.types import *
from typing import Union, Dict, List

from cadCAD.configuration.utils.depreciationHandler import sanitize_partial_state_updates
from cadCAD.utils import dict_filter, contains_type, flatten_tabulated_dict, tabulate_dict
Expand Down Expand Up @@ -161,27 +162,40 @@ def env_update(state_dict, sweep_dict, target_value):
curry(trigger)(end_substep)(trigger_field)(trigger_vals)(funct_list)


def config_sim(d):
def process_variables(d):
return flatten_tabulated_dict(tabulate_dict(d))
def config_sim(config_dict: ConfigurationDict):

if "N" in d:
if d["N"] <= 0:
if "N" in config_dict:
if config_dict["N"] <= 0:
raise ValueError("'N' must be > 0")
else:
pass
else:
raise KeyError("The 'sim_configs' dictionary must contain the key 'N'")

if "T" not in d:
raise KeyError("The 'sim_configs' dictionary must contain the key 'T'")
raise KeyError("The 'sim_configs' dictionary must contain the key 'N' (# of Monte Carlo Runs)")

if "M" in d:
M_lengths = len(list(set({key: len(value) for key, value in d["M"].items()}.values())))
if M_lengths > 2:
raise Exception('`M` values require up to a maximum of 2 distinct lengths')
return [{"N": d["N"], "T": d["T"], "M": M} for M in process_variables(d["M"])]
if "T" not in config_dict:
raise KeyError("The 'sim_configs' dictionary must contain the key 'T' (Timestep Iterator)")
else:
d["M"] = [{}]
return d
if "M" in config_dict:
params = config_dict['M']

param_values_length = {key: len(value) if type(value) == list else 0
for key, value in params.items()}
param_values_length_set = set(param_values_length.values())
distinct_param_value_lengths = len(param_values_length_set)

if distinct_param_value_lengths > 2:
raise Exception('When sweeping, `M` list lengths should either be 1 and/or equal. More than two distinct lengths are not allowed')
elif (distinct_param_value_lengths == 1) and (0 in param_values_length_set):
return config_dict
elif (1 in param_values_length_set):
return [{**config_dict, "M": M}
for M in flatten_tabulated_dict(tabulate_dict(params))]
else:
raise Exception('When sweeping, `M` list lengths should either be 1 and/or equal. ')

else:
config_dict["M"] = [{}]
return config_dict


def psub_list(psu_block, psu_steps):
Expand Down
77 changes: 51 additions & 26 deletions cadCAD/engine/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from cadCAD.configuration.utils import TensorFieldReport, configs_as_objs, configs_as_dicts
from cadCAD.engine.simulation import Executor as SimExecutor
from cadCAD.engine.execution import single_proc_exec, parallelize_simulations, local_simulations
from cadCAD.types import *

VarDictType = Dict[str, List[Any]]
StatesListsType = List[Dict[str, Any]]
Expand All @@ -24,6 +25,17 @@ class ExecutionMode:
multi_proc = 'multi_proc'


def auto_mode_switcher(config_amt: int):
try:
if config_amt == 1:
return ExecutionMode.single_mode, single_proc_exec
elif (config_amt > 1):
return ExecutionMode.multi_mode, parallelize_simulations
except AttributeError:
if config_amt < 1:
raise ValueError('N must be >= 1!')


class ExecutionContext:
def __init__(self, context=ExecutionMode.local_mode, method=None, additional_objs=None) -> None:
self.name = context
Expand All @@ -39,15 +51,15 @@ def distroduce_proc(
ExpIDs,
SubsetIDs,
SubsetWindows,
configured_n, # exec_method,
configured_n, # exec_method,
sc, additional_objs=additional_objs
):
return method(
simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, SimIDs, RunIDs,
ExpIDs,
SubsetIDs,
SubsetWindows,
configured_n, # exec_method,
configured_n, # exec_method,
sc, additional_objs
)

Expand All @@ -56,8 +68,8 @@ def distroduce_proc(

class Executor:
def __init__(self,
exec_context: ExecutionContext, configs: List[Configuration], sc=None, empty_return=False
) -> None:
exec_context: ExecutionContext, configs: List[Configuration], sc=None, empty_return=False
) -> None:
self.sc = sc
self.SimExecutor = SimExecutor
self.exec_method = exec_context.method
Expand All @@ -70,7 +82,8 @@ def execute(self) -> Tuple[Any, Any, Dict[str, Any]]:
return [], [], []

config_proc = Processor()
create_tensor_field = TensorFieldReport(config_proc).create_tensor_field
create_tensor_field = TensorFieldReport(
config_proc).create_tensor_field

sessions = []
var_dict_list, states_lists = [], []
Expand Down Expand Up @@ -105,18 +118,30 @@ def execute(self) -> Tuple[Any, Any, Dict[str, Any]]:
var_dict_list.append(x.sim_config['M'])
states_lists.append([x.initial_state])
eps.append(list(x.exogenous_states.values()))
configs_structs.append(config_proc.generate_config(x.initial_state, x.partial_state_update_blocks, eps[config_idx]))
configs_structs.append(config_proc.generate_config(
x.initial_state, x.partial_state_update_blocks, eps[config_idx]))
env_processes_list.append(x.env_processes)
partial_state_updates.append(x.partial_state_update_blocks)
sim_executors.append(SimExecutor(x.policy_ops).simulation)

config_idx += 1

def get_final_dist_results(simulations, psus, eps, sessions):
tensor_fields = [create_tensor_field(psu, ep) for psu, ep in list(zip(psus, eps))]
remote_threshold = 100
config_amt = len(self.configs)

def get_final_dist_results(simulations: List[StateHistory],
psus: List[StateUpdateBlocks],
eps,
sessions: List[SessionDict]):
tensor_fields = [create_tensor_field(
psu, ep) for psu, ep in list(zip(psus, eps))]
return simulations, tensor_fields, sessions

def get_final_results(simulations, psus, eps, sessions, remote_threshold):
def get_final_results(simulations: List[StateHistory],
psus: List[StateUpdateBlocks],
eps,
sessions: List[SessionDict],
remote_threshold: int):
flat_timesteps, tensor_fields = [], []
for sim_result, psu, ep in list(zip(simulations, psus, eps)):
flat_timesteps.append(flatten(sim_result))
Expand All @@ -128,40 +153,40 @@ def get_final_results(simulations, psus, eps, sessions, remote_threshold):
elif config_amt > 1:
return flat_simulations, tensor_fields, sessions

remote_threshold = 100
config_amt = len(self.configs)

def auto_mode_switcher(config_amt):
try:
if config_amt == 1:
return ExecutionMode.single_mode, single_proc_exec
elif (config_amt > 1):
return ExecutionMode.multi_mode, parallelize_simulations
except AttributeError:
if config_amt < 1:
raise ValueError('N must be >= 1!')

final_result = None
original_N = len(configs_as_dicts(self.configs))
if self.exec_context != ExecutionMode.distributed:
# Consider Legacy Support
if self.exec_context != ExecutionMode.local_mode:
self.exec_context, self.exec_method = auto_mode_switcher(config_amt)
if self.exec_context == ExecutionMode.local_mode:
self.exec_context, self.exec_method = auto_mode_switcher(
config_amt)
elif self.exec_context == ExecutionMode.single_mode or self.exec_context == ExecutionMode.single_proc:
self.exec_context, self.exec_method = ExecutionMode.single_mode, single_proc_exec
elif self.exec_context == ExecutionMode.multi_mode or self.exec_context == ExecutionMode.multi_proc:
if config_amt == 1:
raise ValueError("Multi mode must have at least 2 configs")
else:
self.exec_context, self.exec_method = ExecutionMode.multi_mode, parallelize_simulations
else:
raise ValueError("Invalid execution mode specified")


print("Execution Method: " + self.exec_method.__name__)
simulations_results = self.exec_method(
sim_executors, var_dict_list, states_lists, configs_structs, env_processes_list, Ts, SimIDs, RunIDs,
ExpIDs, SubsetIDs, SubsetWindows, original_N
)

final_result = get_final_results(simulations_results, partial_state_updates, eps, sessions, remote_threshold)
final_result = get_final_results(
simulations_results, partial_state_updates, eps, sessions, remote_threshold)
elif self.exec_context == ExecutionMode.distributed:
print("Execution Method: " + self.exec_method.__name__)
simulations_results = self.exec_method(
sim_executors, var_dict_list, states_lists, configs_structs, env_processes_list, Ts,
SimIDs, RunIDs, ExpIDs, SubsetIDs, SubsetWindows, original_N, self.sc
)
final_result = get_final_dist_results(simulations_results, partial_state_updates, eps, sessions)
final_result = get_final_dist_results(
simulations_results, partial_state_updates, eps, sessions)

t2 = time()
print(f"Total execution time: {t2 - t1 :.2f}s")
Expand Down
91 changes: 48 additions & 43 deletions cadCAD/engine/execution.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from typing import Callable, Dict, List, Any, Tuple
from pathos.multiprocessing import ProcessPool as PPool
from pathos.multiprocessing import ProcessPool as PPool # type: ignore
from collections import Counter

from cadCAD.types import *
from cadCAD.utils import flatten

VarDictType = Dict[str, List[Any]]
Expand All @@ -11,46 +11,54 @@


def single_proc_exec(
simulation_execs: List[Callable],
var_dict_list: List[VarDictType],
states_lists: List[StatesListsType],
configs_structs: List[ConfigsType],
env_processes_list: List[EnvProcessesType],
Ts: List[range],
SimIDs,
Ns: List[int],
simulation_execs: List[ExecutorFunction],
var_dict_list: List[Parameters],
states_lists: List[StateHistory],
configs_structs: List[StateUpdateBlocks],
env_processes_list: List[EnvProcesses],
Ts: List[TimeSeq],
SimIDs: List[SimulationID],
Ns: List[Run],
ExpIDs: List[int],
SubsetIDs,
SubsetWindows,
configured_n
SubsetIDs: List[SubsetID],
SubsetWindows: List[SubsetWindow],
configured_n: List[N_Runs]
):

# HACK for making it run with N_Runs=1
if type(var_dict_list) == list:
var_dict_list = var_dict_list[0]

print(f'Execution Mode: single_threaded')
params = [
raw_params: List[List] = [
simulation_execs, states_lists, configs_structs, env_processes_list,
Ts, SimIDs, Ns, SubsetIDs, SubsetWindows
]
simulation_exec, states_list, config, env_processes, T, sim_id, N, subset_id, subset_window = list(
map(lambda x: x.pop(), params)
map(lambda x: x.pop(), raw_params)
)
result = simulation_exec(
var_dict_list, states_list, config, env_processes, T, sim_id, N, subset_id, subset_window, configured_n
)
return flatten(result)





def parallelize_simulations(
simulation_execs: List[Callable],
var_dict_list: List[VarDictType],
states_lists: List[StatesListsType],
configs_structs: List[ConfigsType],
env_processes_list: List[EnvProcessesType],
Ts: List[range],
SimIDs,
Ns: List[int],
simulation_execs: List[ExecutorFunction],
var_dict_list: List[Parameters],
states_lists: List[StateHistory],
configs_structs: List[StateUpdateBlocks],
env_processes_list: List[EnvProcesses],
Ts: List[TimeSeq],
SimIDs: List[SimulationID],
Ns: List[Run],
ExpIDs: List[int],
SubsetIDs,
SubsetWindows,
configured_n
SubsetIDs: List[SubsetID],
SubsetWindows: List[SubsetWindow],
configured_n: List[N_Runs]
):

print(f'Execution Mode: parallelized')
Expand Down Expand Up @@ -104,32 +112,29 @@ def process_executor(params):


def local_simulations(
simulation_execs: List[Callable],
var_dict_list: List[VarDictType],
states_lists: List[StatesListsType],
configs_structs: List[ConfigsType],
env_processes_list: List[EnvProcessesType],
Ts: List[range],
SimIDs,
Ns: List[int],
ExpIDs: List[int],
SubsetIDs,
SubsetWindows,
configured_n
simulation_execs: List[ExecutorFunction],
var_dict_list: List[Parameters],
states_lists: List[StateHistory],
configs_structs: List[StateUpdateBlocks],
env_processes_list: List[EnvProcesses],
Ts: List[TimeSeq],
SimIDs: List[SimulationID],
Ns: List[Run],
ExpIDs: List[int],
SubsetIDs: List[SubsetID],
SubsetWindows: List[SubsetWindow],
configured_n: List[N_Runs]
):
config_amt = len(configs_structs)

_params = None
if config_amt == 1: # and configured_n != 1
_params = var_dict_list[0]
return single_proc_exec(
simulation_execs, _params, states_lists, configs_structs, env_processes_list,
simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list,
Ts, SimIDs, Ns, ExpIDs, SubsetIDs, SubsetWindows, configured_n
)
elif config_amt > 1: # and configured_n != 1
_params = var_dict_list
return parallelize_simulations(
simulation_execs, _params, states_lists, configs_structs, env_processes_list,
simulation_execs, var_dict_list, states_lists, configs_structs, env_processes_list,
Ts, SimIDs, Ns, ExpIDs, SubsetIDs, SubsetWindows, configured_n
)
# elif config_amt > 1 and configured_n == 1:
Loading

0 comments on commit 1b3241e

Please sign in to comment.