Skip to content

Commit

Permalink
Merge pull request idaholab#16133 from WilkAndy/explore_threading_pro…
Browse files Browse the repository at this point in the history
…blem_16100

Geochemistry: use static to fix threading problem
  • Loading branch information
loganharbour authored Nov 25, 2020
2 parents 97bdd72 + 7c35b3c commit 38dfb7f
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class GeochemistrySpatialReactor : public GeochemistryReactorBase
GeochemistrySpatialReactor(const InputParameters & parameters);
virtual void initialize() override;

/// the main-thread information is used to set the other-thread information in finalize()
virtual void finalize() override;
virtual void threadJoin(const UserObject & uo) override;

Expand All @@ -48,7 +49,7 @@ class GeochemistrySpatialReactor : public GeochemistryReactorBase
const unsigned _num_kin;
/// ModelGeochemicalDatabase at each node.
std::vector<ModelGeochemicalDatabase> _mgd_at_node;
/// ModelGeochemicalDatabase at each node
/// GeochemicalSystem at each node
std::vector<GeochemicalSystem> _egs_at_node;
/// GeochemicalSystem into which the nodal GeochemicalSystem is copied to enable recovery during adaptive timestepping
GeochemicalSystem _egs_copy;
Expand All @@ -70,7 +71,7 @@ class GeochemistrySpatialReactor : public GeochemistryReactorBase
const std::vector<Real> _remove_fixed_activity_time;
/// Number of elements in the vector _remove_fixed_activity_name;
const unsigned _num_removed_fixed;
/// Whether the activity or activity constraint has been removed
/// Whether the activity or activity constraint has been removed at each node
std::vector<std::vector<bool>> _removed_fixed_activity;
/// Names of the species with controlled activity or fugacity
const std::vector<std::string> _controlled_activity_species_names;
Expand Down Expand Up @@ -98,6 +99,8 @@ class GeochemistrySpatialReactor : public GeochemistryReactorBase
const Real _dt_dec;
/// value to multiply dt my in the case of a successful solve
const Real _dt_inc;
/// number of threads used to execute this UserObject
unsigned _nthreads;

/// Build the _my_node_number map
void buildMyNodeNumber();
Expand Down
66 changes: 50 additions & 16 deletions modules/geochemistry/src/userobjects/GeochemistrySpatialReactor.C
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ GeochemistrySpatialReactor::GeochemistrySpatialReactor(const InputParameters & p
_initial_temperature(getParam<Real>("initial_temperature")),
_temperature(coupledValue("temperature")),
_num_kin(_mgd.kin_species_name.size()),
// NOTE: initialize _mgd_at_node before the swaps are performed
_mgd_at_node(_num_my_nodes, _mgd),
_egs_at_node(),
// NOTE: the following implements the swaps in _mgd
Expand Down Expand Up @@ -161,7 +162,8 @@ GeochemistrySpatialReactor::GeochemistrySpatialReactor(const InputParameters & p
_adaptive_timestepping(getParam<bool>("adaptive_timestepping")),
_dt_min(_adaptive_timestepping ? getParam<Real>("dt_min") : std::numeric_limits<Real>::max()),
_dt_dec(getParam<Real>("dt_dec")),
_dt_inc(getParam<Real>("dt_inc"))
_dt_inc(getParam<Real>("dt_inc")),
_nthreads(1)
{
// build _egs_at_node
for (unsigned i = 0; i < _num_my_nodes; ++i)
Expand Down Expand Up @@ -281,6 +283,7 @@ GeochemistrySpatialReactor::initialize()
{
GeochemistryReactorBase::initialize();
_execute_done.assign(_num_my_nodes, false);
_nthreads = 1;
}

void
Expand All @@ -292,6 +295,8 @@ GeochemistrySpatialReactor::execute()
const unsigned my_node_number = _my_node_number.at(_current_node->id());

const unsigned aux_comp_number = 0; // component number to use for AuxVariables
const ModelGeochemicalDatabase & mgd_ref =
_egs_at_node[my_node_number].getModelGeochemicalDatabase();

// close system
if (!_closed_system && _t >= _close_system_at_time)
Expand All @@ -302,9 +307,9 @@ GeochemistrySpatialReactor::execute()
{
if (!_removed_fixed_activity[my_node_number][i] && _t >= _remove_fixed_activity_time[i])
{
if (_mgd_at_node[my_node_number].basis_species_index.count(_remove_fixed_activity_name[i]))
if (mgd_ref.basis_species_index.count(_remove_fixed_activity_name[i]))
_egs_at_node[my_node_number].changeConstraintToBulk(
_mgd_at_node[my_node_number].basis_species_index.at(_remove_fixed_activity_name[i]));
mgd_ref.basis_species_index.at(_remove_fixed_activity_name[i]));
_removed_fixed_activity[my_node_number][i] = true;
}
}
Expand All @@ -314,11 +319,10 @@ GeochemistrySpatialReactor::execute()
{
const std::vector<GeochemicalSystem::ConstraintMeaningEnum> & cm =
_egs_at_node[my_node_number].getConstraintMeaning();
if (_mgd_at_node[my_node_number].basis_species_index.count(
_controlled_activity_species_names[ca]))
if (mgd_ref.basis_species_index.count(_controlled_activity_species_names[ca]))
{
const unsigned basis_ind = _mgd_at_node[my_node_number].basis_species_index.at(
_controlled_activity_species_names[ca]);
const unsigned basis_ind =
mgd_ref.basis_species_index.at(_controlled_activity_species_names[ca]);
if (cm[basis_ind] == GeochemicalSystem::ConstraintMeaningEnum::ACTIVITY ||
cm[basis_ind] == GeochemicalSystem::ConstraintMeaningEnum::FUGACITY)
_egs_at_node[my_node_number].setConstraintValue(
Expand Down Expand Up @@ -348,23 +352,20 @@ GeochemistrySpatialReactor::execute()
for (unsigned i = 0; i < _num_source_species; ++i)
{
const Real this_rate = (*_source_species_rates[i])[aux_comp_number];
if (_mgd_at_node[my_node_number].basis_species_index.count(_source_species_names[i]))
if (mgd_ref.basis_species_index.count(_source_species_names[i]))
{
const unsigned basis_ind =
_mgd_at_node[my_node_number].basis_species_index.at(_source_species_names[i]);
const unsigned basis_ind = mgd_ref.basis_species_index.at(_source_species_names[i]);
_mole_rates(basis_ind) += this_rate;
}
else if (_mgd_at_node[my_node_number].eqm_species_index.count(_source_species_names[i]))
else if (mgd_ref.eqm_species_index.count(_source_species_names[i]))
{
const unsigned eqm_j =
_mgd_at_node[my_node_number].eqm_species_index.at(_source_species_names[i]);
const unsigned eqm_j = mgd_ref.eqm_species_index.at(_source_species_names[i]);
for (unsigned basis_ind = 0; basis_ind < _num_basis; ++basis_ind)
_mole_rates(basis_ind) +=
_mgd_at_node[my_node_number].eqm_stoichiometry(eqm_j, basis_ind) * this_rate;
_mole_rates(basis_ind) += mgd_ref.eqm_stoichiometry(eqm_j, basis_ind) * this_rate;
}
else
{
const unsigned kin_ind = _mgd.kin_species_index.at(_source_species_names[i]);
const unsigned kin_ind = mgd_ref.kin_species_index.at(_source_species_names[i]);
_mole_rates(_num_basis + kin_ind) += this_rate;
}
}
Expand Down Expand Up @@ -421,10 +422,24 @@ GeochemistrySpatialReactor::execute()
void
GeochemistrySpatialReactor::threadJoin(const UserObject & uo)
{
_nthreads += 1;
const GeochemistrySpatialReactor & gsr = static_cast<const GeochemistrySpatialReactor &>(uo);
for (unsigned i = 0; i < _num_my_nodes; ++i)
{
if (!_execute_done[i] && gsr._execute_done[i])
{
_solver_output[i].str("");
_solver_output[i] << gsr._solver_output[i].str();
_tot_iter[i] = gsr._tot_iter[i];
_abs_residual[i] = gsr._abs_residual[i];
_mole_additions[i] = gsr._mole_additions[i];
_egs_at_node[i] = gsr._egs_at_node[i];
_removed_fixed_activity[i] = gsr._removed_fixed_activity[i];
// _mgd_at_node does not need to be threadJoined, because _egs_at_node[i] =
// gsr._egs_at_node[i] uses the copy-assignment operator to copy the data in
// _egs_at_node[i]._mgd
}
}
}

void
Expand All @@ -434,6 +449,25 @@ GeochemistrySpatialReactor::finalize()
// if relevant, record that system is closed
if (!_closed_system && _t >= _close_system_at_time)
_closed_system = true;
// ensure that the non-main threads have the main-thread's copy of _egs_at_node (and hence
// _mgd_at_node) and _removed_fixed_activity, since the main-thread's copy has correctly gathered
// all the information during threadJoin
for (unsigned thrd = 1; thrd < _nthreads; ++thrd)
{
std::vector<GeochemistrySpatialReactor *> objects;
_fe_problem.theWarehouse()
.query()
.condition<AttribSystem>("UserObject")
.condition<AttribThread>(thrd)
.condition<AttribName>(name())
.queryInto(objects);
mooseAssert(objects.size() == 1,
"GeochemistrySpatialReactor::finalize() failed to obtain a single thread copy of "
"the GeochemistrySpatialReactor");
objects[0]->_removed_fixed_activity = _removed_fixed_activity;
objects[0]->_egs_at_node = _egs_at_node;
objects[0]->_closed_system = _closed_system;
}
}

void
Expand Down
2 changes: 0 additions & 2 deletions modules/geochemistry/test/tests/spatial_reactor/tests
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@
requirement = 'Spatially-dependent reaction systems may be multi-threaded'
issues = '#15693'
design = 'GeochemistrySpatialReactor.md'
skip = '#16100'
[../]
[./spatial_4_mpi]
type = CSVDiff
Expand All @@ -135,7 +134,6 @@
min_threads = 2
min_parallel = 2
prereq = spatial_4_mpi
skip = '#16100'
requirement = 'Spatially-dependent reaction systems may use MPI and threads'
issues = '#15693'
design = 'GeochemistrySpatialReactor.md'
Expand Down

0 comments on commit 38dfb7f

Please sign in to comment.