Skip to content

Commit

Permalink
Merge pull request #322 from JeffersonLab/nbrei_inspector
Browse files Browse the repository at this point in the history
Feature: JApplicationInspector interactively fires arrows
  • Loading branch information
nathanwbrei authored Jul 19, 2024
2 parents 47abe89 + 7534a3c commit 4f122e0
Show file tree
Hide file tree
Showing 9 changed files with 133 additions and 10 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ docs/latex/*
cmake-build*/
.idea/*
.cache
compile_commands.json

# PODIO generated artifacts
src/examples/PodioExample/datamodel/*
Expand Down
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ set( CMAKE_BUILD_WITH_INSTALL_RPATH FALSE )
set( CMAKE_INSTALL_RPATH "${CMAKE_INSTALL_PREFIX}/lib" )
set( CMAKE_INSTALL_RPATH_USE_LINK_PATH TRUE )

# Generate a compilation database, e.g. for IDE autocompletion
set( CMAKE_EXPORT_COMPILE_COMMANDS TRUE )

# Useful for debugging. Copied from:
# https://stackoverflow.com/questions/9298278/cmake-print-out-all-accessible-variables-in-a-script
Expand Down
16 changes: 15 additions & 1 deletion src/libraries/JANA/CLI/JMain.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ void PrintUsageOptions() {
std::cout << " -l --loadconfigs <file> Load configuration parameters from file" << std::endl;
std::cout << " -d --dumpconfigs <file> Dump configuration parameters to file" << std::endl;
std::cout << " -b --benchmark Run in benchmark mode" << std::endl;
std::cout << " -i --interactive Run in interactive mode" << std::endl;
std::cout << " --inspect-collection <name> Inspect a collection" << std::endl;
std::cout << " --inspect-component <name> Inspect a component" << std::endl;
}
Expand Down Expand Up @@ -136,7 +137,14 @@ int Execute(JApplication* app, UserOptions &options) {
}
}
}

else if (options.flags[Interactive]) {
app->Initialize();
app->Inspect();
// TODO: Resume and Scale won't work because Inspector calls nonblocking Run()
// Another thing we could do is app->RequestInspection(); app->Run(true);
// as long as we rejigger app->Run() to jump straight to Inspect() when m_inspecting is set
// Or we could wait until we factor out Run() into JSupervisor
}
else if (options.flags[Benchmark]) {
// Run JANA in benchmark mode
JBenchmarker benchmarker(app); // Benchmarking params override default params
Expand Down Expand Up @@ -184,6 +192,8 @@ UserOptions ParseCommandLineOptions(int nargs, char *argv[], bool expect_extra)
tokenizer["--dumpconfigs"] = DumpConfigs;
tokenizer["-b"] = Benchmark;
tokenizer["--benchmark"] = Benchmark;
tokenizer["-i"] = Interactive;
tokenizer["--interactive"] = Interactive;
tokenizer["--inspect-collection"] = InspectCollection;
tokenizer["--inspect-component"] = InspectComponent;

Expand Down Expand Up @@ -255,6 +265,10 @@ UserOptions ParseCommandLineOptions(int nargs, char *argv[], bool expect_extra)
}
break;

case Interactive:
options.flags[Interactive] = true;
break;

case Unknown:
if (argv[i][0] == '-' && argv[i][1] == 'P') {

Expand Down
2 changes: 1 addition & 1 deletion src/libraries/JANA/CLI/JMain.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

namespace jana {

enum Flag {Unknown, ShowUsage, ShowVersion, ShowConfigs, LoadConfigs, DumpConfigs, Benchmark, InspectCollection, InspectComponent};
enum Flag {Unknown, ShowUsage, ShowVersion, ShowConfigs, LoadConfigs, DumpConfigs, Benchmark, InspectCollection, InspectComponent, Interactive};

struct UserOptions {
/// Code representation of all user options.
Expand Down
9 changes: 9 additions & 0 deletions src/libraries/JANA/Engine/JArrowProcessingController.cc
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,15 @@ JArrowProcessingController::~JArrowProcessingController() {
delete m_scheduler;
}

JArrowMetrics::Status JArrowProcessingController::execute_arrow(int arrow_index) {
auto arrow = m_scheduler->checkout(arrow_index);
if (arrow == nullptr) return JArrowMetrics::Status::Error;
JArrowMetrics metrics;
arrow->execute(metrics, 0);
m_scheduler->last_assignment(0, arrow, metrics.get_last_status());
return metrics.get_last_status();
}

void JArrowProcessingController::print_report() {
auto metrics = measure_performance();
LOG_INFO(m_logger) << "Running" << *metrics << LOG_END;
Expand Down
2 changes: 2 additions & 0 deletions src/libraries/JANA/Engine/JArrowProcessingController.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ class JArrowProcessingController : public JService {
bool is_timed_out();
bool is_excepted();

JArrowMetrics::Status execute_arrow(int);

std::vector<JException> get_exceptions() const;

std::unique_ptr<const JPerfSummary> measure_performance();
Expand Down
23 changes: 23 additions & 0 deletions src/libraries/JANA/Engine/JScheduler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,29 @@ void JScheduler::checkin_unprotected(JArrow* assignment, JArrowMetrics::Status l
}
}


JArrow* JScheduler::checkout(int arrow_index) {
// Note that this lets us check out Inactive arrows, whereas checkout_unprotected() does not. This because we are called by JApplicationInspector
// whereas checkout_unprotected is called by JWorker. This is because JArrowProcessingController::request_pause shuts off the topology
// instead of shutting off the workers, which in hindsight might have been the wrong choice.

std::lock_guard<std::mutex> lock(m_mutex);

if (arrow_index >= m_topology_state.arrow_states.size()) return nullptr;

ArrowState& candidate = m_topology_state.arrow_states[arrow_index];

if ((candidate.status == ArrowStatus::Active || candidate.status == ArrowStatus::Inactive) && // This excludes Draining arrows
(candidate.arrow->is_parallel() || candidate.thread_count == 0)) { // This excludes non-parallel arrows that are already assigned to a worker

m_topology_state.arrow_states[arrow_index].thread_count += 1;
return candidate.arrow;

}
return nullptr;
}


JArrow* JScheduler::checkout_unprotected() {

// Choose a new arrow. Loop over all arrows, starting at where we last left off, and pick the first arrow that works
Expand Down
4 changes: 4 additions & 0 deletions src/libraries/JANA/Engine/JScheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ class JScheduler {
/// any more. The scheduler is thus free to reassign the arrow to one of the remaining workers.
void last_assignment(uint32_t worker_id, JArrow* assignment, JArrowMetrics::Status result);

/// Lets a Worker, test case, or user request a specific arrow. Returns nullptr if arrow can not be
/// checked up because it's no longer active or because it's already at its max parallelism.
JArrow* checkout(int arrow_index);

/// Logger is public so that somebody else can configure it
JLogger logger;

Expand Down
84 changes: 76 additions & 8 deletions src/libraries/JANA/Utils/JApplicationInspector.cc
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@

#include "JApplicationInspector.h"
#include "JANA/Services/JComponentManager.h"
#include "JANA/Status/JComponentSummary.h"
#include "JANA/Topology/JTopologyBuilder.h"
#include <JANA/JApplication.h>
#include <JANA/Engine/JArrowProcessingController.h>

Expand All @@ -7,7 +11,10 @@ void PrintMenu() {
std::cout << " -----------------------------------------" << std::endl;
std::cout << " Available commands" << std::endl;
std::cout << " -----------------------------------------" << std::endl;
std::cout << " ic InspectComponents" << std::endl;
std::cout << " icm InspectComponents" << std::endl;
std::cout << " icm InspectComponent component_name" << std::endl;
std::cout << " icl InspectCollections" << std::endl;
std::cout << " icl InspectCollection collection_name" << std::endl;
std::cout << " it InspectTopology" << std::endl;
std::cout << " ip InspectPlace arrow_id place_id" << std::endl;
std::cout << " ie InspectEvent arrow_id place_id slot_id" << std::endl;
Expand All @@ -19,6 +26,58 @@ void PrintMenu() {
std::cout << " -----------------------------------------" << std::endl;
}

void InspectTopology(JApplication* app) {
auto topology = app->GetService<JTopologyBuilder>();
std::cout << topology->print_topology() << std::endl;
}

void Fire(JApplication* app, int arrow_id) {
auto engine = app->GetService<JArrowProcessingController>();
auto result = engine->execute_arrow(arrow_id);
std::cout << to_string(result) << std::endl;
}

void InspectComponents(JApplication* app) {
auto& summary = app->GetComponentSummary();
PrintComponentTable(std::cout, summary);
}

void InspectComponent(JApplication* app, std::string component_name) {
const auto& summary = app->GetComponentSummary();
auto lookup = summary.FindComponents(component_name);
if (lookup.empty()) {
std::cout << "Component not found!" << std::endl;
}
else {
std::cout << "----------------------------------------------------------" << std::endl;
for (auto* item : lookup) {
std::cout << *item;
std::cout << "----------------------------------------------------------" << std::endl;
}
}
}

void InspectCollections(JApplication* app) {
const auto& summary = app->GetComponentSummary();
PrintCollectionTable(std::cout, summary);
}

void InspectCollection(JApplication* app, std::string collection_name) {
const auto& summary = app->GetComponentSummary();
auto lookup = summary.FindCollections(collection_name);
if (lookup.empty()) {
std::cout << "Collection not found!" << std::endl;
}
else {
std::cout << "----------------------------------------------------------" << std::endl;
for (auto* item : lookup) {
std::cout << *item;
std::cout << "----------------------------------------------------------" << std::endl;
}
}
}


void InspectApplication(JApplication* app) {
auto engine = app->GetService<JArrowProcessingController>();
engine->request_pause();
Expand All @@ -36,17 +95,26 @@ void InspectApplication(JApplication* app) {
std::stringstream ss(user_input);
std::string token;
ss >> token;
std::vector<int> args;
std::vector<std::string> args;
std::string arg;
try {
while (ss >> arg) {
args.push_back(std::stoi(arg));
args.push_back(arg);
}
if ((token == "InspectComponents" || token == "icm") && args.empty()) {
InspectComponents(app);
}
else if ((token == "InspectComponent" || token == "icm") && (args.size() == 1)) {
InspectComponent(app, args[0]);
}
else if ((token == "InspectCollections" || token == "icl") && args.empty()) {
InspectCollections(app);
}
if (token == "InspectComponents" || token == "ic") {
// InspectComponents();
else if ((token == "InspectCollection" || token == "icl") && (args.size() == 1)) {
InspectCollection(app, args[0]);
}
else if ((token == "InspectTopology" || token == "it") && args.empty()) {
// InspectTopology(0);
InspectTopology(app);
}
else if ((token == "InspectPlace" || token == "ip") && args.size() == 2) {
// InspectPlace(std::stoi(args[0]), std::stoi(args[1]));
Expand All @@ -55,14 +123,14 @@ void InspectApplication(JApplication* app) {
// InspectEvent(std::stoi(args[0])
}
else if ((token == "Fire" || token == "f") && (args.size() == 1)) {
// Fire(args[0]);
Fire(app, std::stoi(args[0]));
}
else if (token == "Resume" || token == "r") {
app->Run(false);
break;
}
else if ((token == "Scale" || token == "s") && (args.size() == 1)) {
app->Scale(args[0]);
app->Scale(std::stoi(args[0]));
break;
}
else if (token == "Quit" || token == "q") {
Expand Down

0 comments on commit 4f122e0

Please sign in to comment.