Skip to content

Commit

Permalink
Merge pull request #503 from JulienDoerner/Interrupt
Browse files Browse the repository at this point in the history
Handling of interruptions
  • Loading branch information
JulienDoerner authored Feb 26, 2025
2 parents 61a6257 + ea66e0e commit a5a51d3
Show file tree
Hide file tree
Showing 11 changed files with 873 additions and 10 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/test_examples.yml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ jobs:
[ "$file" = "MHD_modelsipynb.py" ] ||
[ "$file" = "density_grid_samplingipynb.py" ] ||
[ "$file" = "lensing_crv4ipynb.py" ] ||
[ "$file" = "interrupt_candidateVectoripynb.py" ] ||
[ "$file" = "interrupt_sourceipynb.py" ] ||
[ "$file" = "lensing_mapsv4ipynb.py" ]; then
echo "skip file $file"
else
Expand Down
1 change: 1 addition & 0 deletions doc/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ Contents
pages/acceleration.rst
pages/extending_crpropa.rst
pages/example_notebooks/propagation_comparison/Propagation_Comparison_CK_BP.ipynb
pages/interrupting-simulations.rst
pages/AdditionalResources.rst


Expand Down

Large diffs are not rendered by default.

Large diffs are not rendered by default.

22 changes: 22 additions & 0 deletions doc/pages/interrupting-simulations.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
Interrupting simulations on runtime
------------------------------------------------

CRPropa simulations can be interrupted on runtime with the `SIGTERM` or `SIGINT` signals.
If the user defines an output for the interruption (called `InterruptAction`) all candidates which are currently in the simulation will be passed to this output.
In the error stream the user will see a message denoting the number of candidates which have not been started yet.
If the simulation was run with a `candidateVector` as source, the indices of the candidates which have not been started yet will be printed or written to the file.
For a simulation with a source interface, a restart with the missing number of candidates will be sufficient to continue the simulation.

.. toctree::
:caption: Using a candidateVector as source
:maxdepth: 1

example_notebooks/interrupting_simulations/interrupt_candidateVector.ipynb

.. toctree::
:caption: Using a source interface
:maxdepth: 1

example_notebooks/interrupting_simulations/interrupt_source.ipynb


7 changes: 7 additions & 0 deletions include/crpropa/ModuleList.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "crpropa/Candidate.h"
#include "crpropa/Module.h"
#include "crpropa/Source.h"
#include "crpropa/module/Output.h"

#include <list>
#include <sstream>
Expand Down Expand Up @@ -47,9 +48,15 @@ class ModuleList: public Module {
iterator end();
const_iterator end() const;

void setInterruptAction(Output* action);
void dumpCandidate(Candidate* cand) const;

private:
module_list_t modules;
bool showProgress;
Output* interruptAction;
bool haveInterruptAction = false;
std::vector<int> notFinished; // list with not finished numbers of candidates
};

/**
Expand Down
22 changes: 18 additions & 4 deletions include/crpropa/module/Output.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,18 @@ namespace crpropa {
They can be easily customised by enabling/disabling specific columns.
*/
class Output: public Module {
protected:
double lengthScale, energyScale;
std::bitset<64> fields;

public:
struct Property
{
std::string name;
std::string comment;
Variant defaultValue;
};

protected:
double lengthScale, energyScale;
std::bitset<64> fields;

std::vector<Property> properties;

bool oneDimensional;
Expand Down Expand Up @@ -163,6 +165,18 @@ class Output: public Module {
size_t size() const;

void process(Candidate *) const;

/**
* write the indices of not started candidates into the output file.
* Used for interrupting the simulation
* @param indices list of not started indices
*/
virtual void dumpIndexList(std::vector<int> indices) {
std::cout << "indices:\t";
for (int i = 0; i < indices.size(); i++)
std::cout << indices[i] << ", ";
std::cout << "\n";
};
};

/** @}*/
Expand Down
2 changes: 2 additions & 0 deletions include/crpropa/module/TextOutput.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ class TextOutput: public Output {
*/
static void load(const std::string &filename, ParticleCollector *collector);
std::string getDescription() const;

void dumpIndexList(std::vector<int> indicies);
};
/** @}*/

Expand Down
7 changes: 5 additions & 2 deletions python/2_headers.i
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,11 @@
%feature("director") crpropa::AbstractCondition;
%include "crpropa/Module.h"

%template(OutputRefPtr) crpropa::ref_ptr<Output>;
%feature("director") crpropa::Output;
%ignore crpropa::Output::dumpIndexList(std::vector<int>);
%include "crpropa/module/Output.h"

%implicitconv crpropa::ref_ptr<crpropa::MagneticField>;
%template(MagneticFieldRefPtr) crpropa::ref_ptr<crpropa::MagneticField>;
%feature("director") crpropa::MagneticField;
Expand Down Expand Up @@ -394,8 +399,6 @@
}
}


%include "crpropa/module/Output.h"
%include "crpropa/module/DiffusionSDE.h"
%include "crpropa/module/TextOutput.h"
%include "crpropa/module/HDF5Output.h"
Expand Down
55 changes: 51 additions & 4 deletions src/ModuleList.cpp.in
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#include <algorithm>
#include <csignal>
#include <bits/stdc++.h>
#ifndef sighandler_t
typedef void (*sighandler_t)(int);
#endif
Expand Down Expand Up @@ -87,6 +88,10 @@ void ModuleList::run(Candidate* candidate, bool recursive, bool secondariesFirst
run(candidate->secondaries[i], recursive, secondariesFirst);
}
}

// dump candidae and secondaries if interrupted.
if (candidate->isActive() && (g_cancel_signal_flag != 0))
dumpCandidate(candidate);
}

void ModuleList::run(ref_ptr<Candidate> candidate, bool recursive, bool secondariesFirst) {
Expand Down Expand Up @@ -114,8 +119,11 @@ void ModuleList::run(const candidate_vector_t *candidates, bool recursive, bool

#pragma omp parallel for schedule(OMP_SCHEDULE)
for (size_t i = 0; i < count; i++) {
if (g_cancel_signal_flag != 0)
if (g_cancel_signal_flag != 0) {
#pragma omp critical(interrupt_write)
notFinished.push_back(i);
continue;
}

try {
run(candidates->operator[](i), recursive);
Expand All @@ -132,8 +140,18 @@ void ModuleList::run(const candidate_vector_t *candidates, bool recursive, bool
::signal(SIGINT, old_sigint_handler);
::signal(SIGTERM, old_sigterm_handler);
// Propagate signal to old handler.
if (g_cancel_signal_flag > 0)
if (g_cancel_signal_flag > 0) {
raise(g_cancel_signal_flag);
std::cerr << "############################################################################\n";
std::cerr << "# Interrupted CRPropa simulation \n";
std::cerr << "# A total of " << notFinished.size() << " candidates have not been started.\n";
std::cerr << "# the indices of the vector haven been written to to output file. \n";
std::cerr << "############################################################################\n";

// dump list to output file
std::sort(notFinished.begin(), notFinished.end());
interruptAction->dumpIndexList(notFinished);
}
}

void ModuleList::run(SourceInterface *source, size_t count, bool recursive, bool secondariesFirst) {
Expand All @@ -156,8 +174,11 @@ void ModuleList::run(SourceInterface *source, size_t count, bool recursive, bool

#pragma omp parallel for schedule(OMP_SCHEDULE)
for (size_t i = 0; i < count; i++) {
if (g_cancel_signal_flag !=0)
if (g_cancel_signal_flag !=0) {
#pragma omp critical(interrupt_write)
notFinished.push_back(i);
continue;
}

ref_ptr<Candidate> candidate;

Expand Down Expand Up @@ -189,8 +210,13 @@ void ModuleList::run(SourceInterface *source, size_t count, bool recursive, bool
::signal(SIGINT, old_signal_handler);
::signal(SIGTERM, old_sigterm_handler);
// Propagate signal to old handler.
if (g_cancel_signal_flag > 0)
if (g_cancel_signal_flag > 0) {
raise(g_cancel_signal_flag);
std::cerr << "############################################################################\n";
std::cerr << "# Interrupted CRPropa simulation \n";
std::cerr << "# Number of not started candidates from source: " << notFinished.size() << "\n";
std::cerr << "############################################################################\n";
}
}

ModuleList::iterator ModuleList::begin() {
Expand Down Expand Up @@ -222,6 +248,27 @@ void ModuleList::showModules() const {
std::cout << getDescription();
}

void ModuleList::setInterruptAction(Output* action) {
interruptAction = action;
haveInterruptAction = true;
}

void ModuleList::dumpCandidate(Candidate *cand) const {
if (!haveInterruptAction)
return;

if (cand->isActive())
interruptAction->process(cand);
else
KISS_LOG_WARNING << "ModuleList::dumpCandidate is called with a non active candidate. This should not happen for the interrupt action. Please check candidate with serial number "
<< cand->getSerialNumber() << std::endl;

for (int i = 0; i < cand->secondaries.size(); i++) {
if (cand->secondaries[i])
dumpCandidate(cand->secondaries[i]);
}
}

ModuleListRunner::ModuleListRunner(ModuleList *mlist) : mlist(mlist) {
}

Expand Down
14 changes: 14 additions & 0 deletions src/module/TextOutput.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

#include "kiss/string.h"

#include <sstream>
#include <cstdio>
#include <stdexcept>
#include <iostream>
Expand Down Expand Up @@ -378,4 +379,17 @@ void TextOutput::gzip() {
#endif
}

void TextOutput::dumpIndexList(std::vector<int> indices) {
#pragma omp critical(FileOutput)
{
std::stringstream ss;
ss << "#" << "\t";
for (int i = 0; i < indices.size(); i++)
ss << indices[i] << "\t";

const std::string cstr = ss.str();
out-> write(cstr.c_str(), cstr.length());
}
}

} // namespace crpropa

0 comments on commit a5a51d3

Please sign in to comment.