-
Notifications
You must be signed in to change notification settings - Fork 12
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[DO NOT MERGE] Parallel execution #259
base: master
Are you sure you want to change the base?
Changes from 8 commits
472ea51
14164f6
97a8116
20fa0ba
54cac5f
5e51beb
1a1c7e9
45fbc57
2555e2e
417baa8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -89,6 +89,8 @@ def __init__(self): | |
# list of pre/post ID arrays that are not needed in device memory | ||
self.delete_synaptic_pre = {} | ||
self.delete_synaptic_post = {} | ||
# dictionary to store parallalelization information | ||
self.stream_info = {} | ||
# The following nested dictionary collects all codeobjects that use random | ||
# number generation (RNG). | ||
self.codeobjects_with_rng = { | ||
|
@@ -359,6 +361,7 @@ def code_object(self, owner, name, abstract_code, variables, template_name, | |
template_kwds["sm_multiplier"] = prefs["devices.cuda_standalone.SM_multiplier"] | ||
template_kwds["syn_launch_bounds"] = prefs["devices.cuda_standalone.syn_launch_bounds"] | ||
template_kwds["calc_occupancy"] = prefs["devices.cuda_standalone.calc_occupancy"] | ||
template_kwds["stream_info"] = self.stream_info | ||
if template_name in ["threshold", "spikegenerator"]: | ||
template_kwds["extra_threshold_kernel"] = prefs["devices.cuda_standalone.extra_threshold_kernel"] | ||
codeobj = super(CUDAStandaloneDevice, self).code_object(owner, name, abstract_code, variables, | ||
|
@@ -374,7 +377,7 @@ def check_openmp_compatible(self, nb_threads): | |
if nb_threads > 0: | ||
raise NotImplementedError("Using OpenMP in a CUDA standalone project is not supported") | ||
|
||
def generate_objects_source(self, writer, arange_arrays, synapses, static_array_specs, networks): | ||
def generate_objects_source(self, writer, arange_arrays, synapses, static_array_specs, networks,stream_info): | ||
sm_multiplier = prefs.devices.cuda_standalone.SM_multiplier | ||
num_parallel_blocks = prefs.devices.cuda_standalone.parallel_blocks | ||
curand_generator_type = prefs.devices.cuda_standalone.random_number_generator_type | ||
|
@@ -393,6 +396,9 @@ def generate_objects_source(self, writer, arange_arrays, synapses, static_array_ | |
for syn in synapses: | ||
if syn.multisynaptic_index is not None: | ||
multisyn_vars.append(syn.variables[syn.multisynaptic_index]) | ||
# get number of unique streams | ||
|
||
num_stream = max(Counter(stream_info).values()) | ||
arr_tmp = self.code_object_class().templater.objects( | ||
None, None, | ||
array_specs=self.arrays, | ||
|
@@ -415,7 +421,9 @@ def generate_objects_source(self, writer, arange_arrays, synapses, static_array_ | |
eventspace_arrays=self.eventspace_arrays, | ||
spikegenerator_eventspaces=self.spikegenerator_eventspaces, | ||
multisynaptic_idx_vars=multisyn_vars, | ||
profiled_codeobjects=self.profiled_codeobjects) | ||
profiled_codeobjects=self.profiled_codeobjects, | ||
parallelize=True, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should become a preference later on. Just putting it here as TODO, not to forget. |
||
stream_size=num_stream) | ||
# Reinsert deleted entries, in case we use self.arrays later? maybe unnecassary... | ||
self.arrays.update(self.eventspace_arrays) | ||
writer.write('objects.*', arr_tmp) | ||
|
@@ -445,7 +453,8 @@ def generate_main_source(self, writer): | |
# For codeobjects run every tick, this happens in the init() of | ||
# the random number buffer called at first clock cycle of the network | ||
main_lines.append('random_number_buffer.ensure_enough_curand_states();') | ||
main_lines.append(f'_run_{codeobj.name}();') | ||
# add stream - default | ||
main_lines.append(f'_run_{codeobj.name}(0);') | ||
elif func == 'after_run_code_object': | ||
codeobj, = args | ||
main_lines.append(f'_after_run_{codeobj.name}();') | ||
|
@@ -986,10 +995,14 @@ def generate_network_source(self, writer): | |
maximum_run_time = self._maximum_run_time | ||
if maximum_run_time is not None: | ||
maximum_run_time = float(maximum_run_time) | ||
num_stream = max(Counter(self.stream_info).values()) | ||
network_tmp = self.code_object_class().templater.network(None, None, | ||
maximum_run_time=maximum_run_time, | ||
eventspace_arrays=self.eventspace_arrays, | ||
spikegenerator_eventspaces=self.spikegenerator_eventspaces) | ||
spikegenerator_eventspaces=self.spikegenerator_eventspaces, | ||
parallelize = True, | ||
stream_info = self.stream_info, | ||
num_stream= num_stream) | ||
writer.write('network.*', network_tmp) | ||
|
||
def generate_synapses_classes_source(self, writer): | ||
|
@@ -1310,7 +1323,7 @@ def build(self, directory='output', | |
|
||
self.generate_objects_source(self.writer, self.arange_arrays, | ||
net_synapses, self.static_array_specs, | ||
self.networks) | ||
self.networks, self.stream_info) | ||
self.generate_network_source(self.writer) | ||
self.generate_synapses_classes_source(self.writer) | ||
self.generate_run_source(self.writer) | ||
|
@@ -1382,6 +1395,26 @@ def network_run(self, net, duration, report=None, report_period=10*second, | |
self.clocks.update(net._clocks) | ||
net.t_ = float(t_end) | ||
|
||
|
||
# Create dictionary for parallelisation with stream | ||
streams_organization = defaultdict(list) | ||
for obj in net.sorted_objects: | ||
streams_organization[(obj.when, obj.order)].append(obj) | ||
|
||
# associate each code object with a particular stream | ||
streams_details = defaultdict(list) | ||
count = 0 | ||
for key in streams_organization: | ||
for object in streams_organization[key]: | ||
streams_details[object.name] = count | ||
count +=1 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As discussed, lets make the default 0. Or do we even need a default? Can't we just pass |
||
|
||
self.stream_info = streams_details | ||
stream_num_max = max(self.stream_info.values()) | ||
self.stream_info['default'] = stream_num_max+1 | ||
|
||
|
||
|
||
# TODO: remove this horrible hack | ||
for clock in self.clocks: | ||
if clock.name=='clock': | ||
|
@@ -1516,11 +1549,21 @@ def network_run(self, net, duration, report=None, report_period=10*second, | |
|
||
# create all random numbers needed for the next clock cycle | ||
for clock in net._clocks: | ||
run_lines.append(f'{net.name}.add(&{clock.name}, _run_random_number_buffer);') | ||
run_lines.append(f'{net.name}.add(&{clock.name}, _run_random_number_buffer, {self.stream_info["default"]});') | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The random number buffer is a special case. It is not generated from For context: The random number buffer has a fixed size of memory on the GPU (which can be controlled via preference). It generates random number from the host, knowing how many random numbers the kernels will require. The kernels then use this data for multiple time steps (where the Each random number generation call should generate enough random numbers to occupy the entire GPU. So no need for concurrent kernel execution here at all. |
||
|
||
all_clocks = set() | ||
# TODO add for every code object -> add where in the list are there. | ||
# TODO create new dic (code object, position in list) | ||
for clock, codeobj in code_objects: | ||
run_lines.append(f'{net.name}.add(&{clock.name}, _run_{codeobj.name});') | ||
# add this position as additional number here | ||
# check if codeobj.name has _codeobject in it | ||
name = codeobj.name | ||
if "_codeobject" in codeobj.name: | ||
name = codeobj.name[:-11] | ||
if name in self.stream_info.keys(): | ||
run_lines.append(f'{net.name}.add(&{clock.name}, _run_{codeobj.name}, {self.stream_info[name]});') | ||
else: | ||
run_lines.append(f'{net.name}.add(&{clock.name}, _run_{codeobj.name}, {self.stream_info["default"]});') | ||
all_clocks.add(clock) | ||
|
||
# Under some rare circumstances (e.g. a NeuronGroup only defining a | ||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -155,7 +155,7 @@ _run_kernel_{{codeobj_name}}( | |||||
{% endblock kernel %} | ||||||
|
||||||
|
||||||
void _run_{{codeobj_name}}() | ||||||
void _run_{{codeobj_name}}(cudaStream_t stream) | ||||||
{ | ||||||
using namespace brian; | ||||||
|
||||||
|
@@ -292,7 +292,7 @@ void _run_{{codeobj_name}}() | |||||
{% endblock %} | ||||||
|
||||||
{% block kernel_call %} | ||||||
_run_kernel_{{codeobj_name}}<<<num_blocks, num_threads>>>( | ||||||
_run_kernel_{{codeobj_name}}<<<num_blocks, num_threads,0,stream>>>( | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please stick to the code formatting in the files
Suggested change
|
||||||
_N, | ||||||
num_threads, | ||||||
///// HOST_PARAMETERS ///// | ||||||
|
@@ -326,7 +326,7 @@ void _run_{{codeobj_name}}() | |||||
#ifndef _INCLUDED_{{codeobj_name}} | ||||||
#define _INCLUDED_{{codeobj_name}} | ||||||
|
||||||
void _run_{{codeobj_name}}(); | ||||||
void _run_{{codeobj_name}}(cudaStream_t); | ||||||
|
||||||
{% block extra_functions_h %} | ||||||
{% endblock %} | ||||||
|
@@ -362,7 +362,7 @@ void _after_run_{{codeobj_name}}() | |||||
} | ||||||
{% endmacro %} | ||||||
|
||||||
|
||||||
// {{codeobj_name}} | ||||||
{% macro after_run_h_file() %} | ||||||
#ifndef _INCLUDED_{{codeobj_name}}_after | ||||||
#define _INCLUDED_{{codeobj_name}}_affer | ||||||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -14,23 +14,33 @@ | |||||
|
||||||
double Network::_last_run_time = 0.0; | ||||||
double Network::_last_run_completed_fraction = 0.0; | ||||||
{% if parallelize %} | ||||||
cudaStream_t custom_stream[{{num_stream}}]; | ||||||
{% endif %} | ||||||
|
||||||
Network::Network() | ||||||
{ | ||||||
t = 0.0; | ||||||
{% if parallelize %} | ||||||
for(int i=0;i<{{num_stream}};i++){ | ||||||
CUDA_SAFE_CALL(cudaStreamCreate(&(custom_stream[i]))); | ||||||
} | ||||||
{% endif %} | ||||||
} | ||||||
|
||||||
void Network::clear() | ||||||
{ | ||||||
objects.clear(); | ||||||
} | ||||||
|
||||||
void Network::add(Clock *clock, codeobj_func func) | ||||||
// TODO have to makr change in objects - make it a tuple | ||||||
// make decision which bject has which stream | ||||||
void Network::add(Clock *clock, codeobj_func func, int group_num) | ||||||
{ | ||||||
#if defined(_MSC_VER) && (_MSC_VER>=1700) | ||||||
objects.push_back(std::make_pair(std::move(clock), std::move(func))); | ||||||
objects.push_back(std::make_tuple(std::move(clock), std::move(func), std::move(group_num))); | ||||||
#else | ||||||
objects.push_back(std::make_pair(clock, func)); | ||||||
objects.push_back(std::make_tuple(clock, func, group_num)); | ||||||
#endif | ||||||
} | ||||||
|
||||||
|
@@ -56,7 +66,7 @@ void Network::run(const double duration, void (*report_func)(const double, const | |||||
Clock* clock = next_clocks(); | ||||||
double elapsed_realtime; | ||||||
bool did_break_early = false; | ||||||
|
||||||
//TODO here | ||||||
while(clock && clock->running()) | ||||||
{ | ||||||
t = clock->t[0]; | ||||||
|
@@ -73,17 +83,41 @@ void Network::run(const double duration, void (*report_func)(const double, const | |||||
next_report_time += report_period; | ||||||
} | ||||||
} | ||||||
Clock *obj_clock = objects[i].first; | ||||||
// TODO tuple of clock and function | ||||||
//Clock *obj_clock = objects[i].first; | ||||||
Clock *obj_clock = std::get<0>(objects[i]); | ||||||
int group_int = std::get<2>(objects[i]); | ||||||
// Only execute the object if it uses the right clock for this step | ||||||
if (curclocks.find(obj_clock) != curclocks.end()) | ||||||
{ | ||||||
codeobj_func func = objects[i].second; | ||||||
// function -> whixh is in templates like common_group.cu | ||||||
// sort the code object - waiting mechanism between groups | ||||||
// cudaEvent or cudaSynchronise | ||||||
//codeobj_func func = objects[i].second; | ||||||
codeobj_func func = std::get<1>(objects[i]); | ||||||
int func_group_int = std::get<2>(objects[i]); | ||||||
if (func) // code objects can be NULL in cases where we store just the clock | ||||||
{ | ||||||
func(); | ||||||
func_groups[func_group_int].push_back(func); | ||||||
//func_groups.push_back(std::make_pair(func_group_int,func)); | ||||||
//func(); | ||||||
// [[func1,func2,func3],[func4...]] | ||||||
} | ||||||
} | ||||||
} | ||||||
|
||||||
// get maximum in objects.cu array | ||||||
|
||||||
// go through each list of func group - 2 loops | ||||||
for(int i=0; i<func_groups.size(); i++){ | ||||||
for(int j=0; j<func_groups.size(); j++){ | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The second loop is wrong:
Suggested change
|
||||||
codeobj_func func = func_groups[i][j]; | ||||||
func(custom_stream[j]); | ||||||
} | ||||||
// reset the func group for that sub stream | ||||||
func_groups.resize(0); | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. After each function group, you need to synchronize host and device. Check the documentation if |
||||||
} | ||||||
|
||||||
for(std::set<Clock*>::iterator i=curclocks.begin(); i!=curclocks.end(); i++) | ||||||
(*i)->tick(); | ||||||
clock = next_clocks(); | ||||||
|
@@ -129,7 +163,8 @@ void Network::compute_clocks() | |||||
clocks.clear(); | ||||||
for(int i=0; i<objects.size(); i++) | ||||||
{ | ||||||
Clock *clock = objects[i].first; | ||||||
Clock *clock = std::get<0>(objects[i]); | ||||||
// Clock *clock = std::get<0>()objects[i].first; | ||||||
clocks.insert(clock); | ||||||
} | ||||||
} | ||||||
|
@@ -174,22 +209,30 @@ Clock* Network::next_clocks() | |||||
#include <ctime> | ||||||
#include "brianlib/clocks.h" | ||||||
|
||||||
typedef void (*codeobj_func)(); | ||||||
typedef void (*codeobj_func)(cudaStream_t); | ||||||
|
||||||
class Network | ||||||
{ | ||||||
std::set<Clock*> clocks, curclocks; | ||||||
void compute_clocks(); | ||||||
Clock* next_clocks(); | ||||||
public: | ||||||
std::vector< std::pair< Clock*, codeobj_func > > objects; | ||||||
// TODO vectory of tuples having clock , codeobj_func and stread integer | ||||||
std::vector< std::tuple< Clock*, codeobj_func, int > > objects; | ||||||
//std::vector< std::pair< Clock*, codeobj_func > > objects; | ||||||
std::vector<std::vector<codeobj_func >> func_groups = std::vector<std::vector<codeobj_func >>({{num_stream}}); | ||||||
//std::vector<std::pair< int, codeobj_func >> func_groups; | ||||||
double t; | ||||||
static double _last_run_time; | ||||||
static double _last_run_completed_fraction; | ||||||
int num_streams; | ||||||
{% if parallelize %} | ||||||
cudaStream_t custom_stream[{{num_stream}}]; | ||||||
{% endif %} | ||||||
|
||||||
Network(); | ||||||
void clear(); | ||||||
void add(Clock *clock, codeobj_func func); | ||||||
void add(Clock *clock, codeobj_func func, int num_streams); | ||||||
void run(const double duration, void (*report_func)(const double, const double, const double, const double), const double report_period); | ||||||
}; | ||||||
|
||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.