Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor local_driver and handling of generic options #6061

Merged
merged 5 commits into from
Sep 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion src/clib/lib/include/ert/job_queue/local_driver.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,8 @@ void local_driver_free__(void *__driver);
job_status_type local_driver_get_job_status(void *__driver, void *__job);
void local_driver_free_job(void *__job);
void local_driver_init_option_list(stringlist_type *option_list);

bool local_driver_set_option(void *__driver, const char *option_key,
const void *value_);
const void *local_driver_get_option(const void *__driver,
const char *option_key);
#endif
4 changes: 0 additions & 4 deletions src/clib/lib/include/ert/job_queue/queue_driver.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,6 @@ void queue_driver_free_job(queue_driver_type *driver, void *job_data);
void queue_driver_kill_job(queue_driver_type *driver, void *job_data);
job_status_type queue_driver_get_status(queue_driver_type *driver,
void *job_data);

extern "C" PY_USED const char *
queue_driver_get_name(const queue_driver_type *driver);

extern "C" bool queue_driver_set_option(queue_driver_type *driver,
const char *option_key,
const void *value);
Expand Down
84 changes: 43 additions & 41 deletions src/clib/lib/job_queue/local_driver.cpp
Original file line number Diff line number Diff line change
@@ -1,45 +1,37 @@
#include <mutex>
#include <optional>
#include <thread>

#include <signal.h>
#include <stdlib.h>
#include <sys/wait.h>

#include <ert/util/util.hpp>
#include <thread>

#include <ert/job_queue/local_driver.hpp>
#include <ert/job_queue/queue_driver.hpp>
#include <ert/job_queue/spawn.hpp>
#include <ert/util/util.hpp>

typedef struct local_job_struct local_job_type;

struct local_job_struct {
bool active;
job_status_type status;
std::optional<std::thread> run_thread;
pid_t child_process;
bool active = false;
job_status_type status = JOB_QUEUE_WAITING;
std::optional<std::thread> run_thread = std::nullopt;
pid_t child_process = 0;
};

/**
   State held by one local driver instance. The only member is the mutex
   that local_driver_submit_job() locks while it marks a job as running and
   launches its worker thread.
*/
struct local_driver_struct {
    std::mutex submit_lock{};
};

static local_job_type *local_job_alloc() {
local_job_type *job = new local_job_type;
job->active = false;
job->status = JOB_QUEUE_WAITING;
return job;
}
static local_job_type *local_job_alloc() { return new local_job_type; }

job_status_type local_driver_get_job_status(void *__driver, void *__job) {
if (__job == NULL)
/* The job has not been registered at all ... */
return JOB_QUEUE_NOT_ACTIVE;
else {
job_status_type local_driver_get_job_status(void * /**__driver*/, void *__job) {
if (__job != nullptr) {
local_job_type *job = reinterpret_cast<local_job_type *>(__job);
return job->status;
}

return JOB_QUEUE_NOT_ACTIVE; // The job has not been registered at all
}

void local_driver_free_job(void *__job) {
Expand All @@ -48,7 +40,7 @@ void local_driver_free_job(void *__job) {
free(job);
}

void local_driver_kill_job(void *__driver, void *__job) {
void local_driver_kill_job(void * /**__driver*/, void *__job) {
local_job_type *job = reinterpret_cast<local_job_type *>(__job);
if (job->child_process > 0)
kill(job->child_process, SIGTERM);
Expand All @@ -63,37 +55,33 @@ void submit_job_thread(const char *executable, int argc, char **argv,
local_job_type *job) {
int wait_status;
job->child_process =
spawn(executable, argc, (const char **)argv, NULL, NULL);
spawn(executable, argc, (const char **)argv, nullptr, nullptr);
util_free_stringlist(argv, argc);
waitpid(job->child_process, &wait_status, 0);

job->active = false;
job->status = JOB_QUEUE_EXIT;
if (WIFEXITED(wait_status))
if (WEXITSTATUS(wait_status) == 0)
job->status = JOB_QUEUE_DONE;
if (WIFEXITED(wait_status) != 0 && (WEXITSTATUS(wait_status) == 0))
job->status = JOB_QUEUE_DONE;
}

void *local_driver_submit_job(void *__driver, const char *submit_cmd,
int num_cpu, /* Ignored */
const char *run_path, const char *job_name,
int argc, const char **argv) {
int /** num_cpu */, const char * /**run_path*/,
const char * /**job_name*/, int argc,
const char **argv) {
local_driver_type *driver = reinterpret_cast<local_driver_type *>(__driver);
{
local_job_type *job = local_job_alloc();
local_job_type *job = local_job_alloc();
auto argv_copy = util_alloc_stringlist_copy(argv, argc);

auto argv_copy = util_alloc_stringlist_copy(argv, argc);
std::lock_guard guard{driver->submit_lock};
job->active = true;
job->status = JOB_QUEUE_RUNNING;

std::lock_guard guard{driver->submit_lock};
job->active = true;
job->status = JOB_QUEUE_RUNNING;
job->run_thread = std::thread{
[=] { submit_job_thread(submit_cmd, argc, argv_copy, job); }};
job->run_thread->detach();

job->run_thread = std::thread{
[=] { submit_job_thread(submit_cmd, argc, argv_copy, job); }};
job->run_thread->detach();

return job;
}
return job;
}

/**
   Release a local driver object with `delete`; this pairs with the `new`
   performed in local_driver_alloc().
*/
void local_driver_free(local_driver_type *driver) {
    delete driver;
}
Expand All @@ -105,6 +93,20 @@ void local_driver_free__(void *__driver) {

void *local_driver_alloc() { return new local_driver_type; }

void local_driver_init_option_list(stringlist_type *option_list) {
//No options specific for local driver; do nothing
void local_driver_init_option_list(stringlist_type * /**option_list*/) {}

/**
   set_option hook for the local driver. The local driver has no options,
   so every call reports the error through util_abort(); no argument is
   inspected.

   @return false -- reached only if util_abort() returns control.
*/
bool local_driver_set_option([[maybe_unused]] void *__driver,
                             [[maybe_unused]] const char *option_key,
                             [[maybe_unused]] const void *value_) {
    util_abort("%s: Local driver does not support run time setting of options\n",
               __func__);
    return false;
}

/**
   get_option hook for the local driver. The local driver has no options,
   so every call reports the error through util_abort(); no argument is
   inspected.

   @return nullptr -- reached only if util_abort() returns control.
*/
const void *local_driver_get_option([[maybe_unused]] const void *__driver,
                                    [[maybe_unused]] const char *option_key) {
    util_abort("%s: Local driver does not support run time reading of options\n",
               __func__);
    return nullptr;
}
47 changes: 5 additions & 42 deletions src/clib/lib/job_queue/queue_driver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,31 +48,11 @@ struct queue_driver_struct {

/** Driver specific data - passed as first argument to the driver functions above. */
void *data;

/* Generic data - common to all driver types. */
/** String name of driver. */
char *name;
};

const char *queue_driver_get_name(const queue_driver_type *driver) {
return driver->name;
}

/**
Set option - can also be used to perform actions - not only setting
of parameters. There is no limit :-)
*/
bool queue_driver_set_option(queue_driver_type *driver, const char *option_key,
const void *value) {
if (driver->set_option != NULL)
/* The actual low level set functions can not fail! */
return driver->set_option(driver->data, option_key, value);
else {
util_abort(
"%s: driver:%s does not support run time setting of options\n",
__func__, driver->name);
return false;
}
return driver->set_option(driver->data, option_key, value);
}

/**
Expand All @@ -93,7 +73,6 @@ static queue_driver_type *queue_driver_alloc_empty() {
driver->free_driver = NULL;
driver->get_option = NULL;
driver->set_option = NULL;
driver->name = NULL;
driver->data = NULL;
driver->init_options = NULL;

Expand All @@ -116,7 +95,6 @@ queue_driver_type *queue_driver_alloc(job_driver_type type) {
driver->free_driver = lsf_driver_free__;
driver->set_option = lsf_driver_set_option;
driver->get_option = lsf_driver_get_option;
driver->name = util_alloc_string_copy("LSF");
driver->init_options = lsf_driver_init_option_list;
driver->data = lsf_driver_alloc();
break;
Expand All @@ -126,7 +104,8 @@ queue_driver_type *queue_driver_alloc(job_driver_type type) {
driver->kill_job = local_driver_kill_job;
driver->free_job = local_driver_free_job;
driver->free_driver = local_driver_free__;
driver->name = util_alloc_string_copy("local");
driver->set_option = local_driver_set_option;
driver->get_option = local_driver_get_option;
driver->init_options = local_driver_init_option_list;
driver->data = local_driver_alloc();
break;
Expand All @@ -138,12 +117,10 @@ queue_driver_type *queue_driver_alloc(job_driver_type type) {
driver->free_driver = torque_driver_free__;
driver->set_option = torque_driver_set_option;
driver->get_option = torque_driver_get_option;
driver->name = util_alloc_string_copy("TORQUE");
driver->init_options = torque_driver_init_option_list;
driver->data = torque_driver_alloc();
break;
case SLURM_DRIVER:
driver->name = util_alloc_string_copy("SLURM");
driver->set_option = slurm_driver_set_option;
driver->get_option = slurm_driver_get_option;
driver->init_options = slurm_driver_init_option_list;
Expand All @@ -163,25 +140,12 @@ queue_driver_type *queue_driver_alloc(job_driver_type type) {

const void *queue_driver_get_option(queue_driver_type *driver,
const char *option_key) {
if (driver->get_option != NULL)
/* The actual low level set functions can not fail! */
return driver->get_option(driver->data, option_key);
else {
util_abort(
"%s: driver:%s does not support run time reading of options\n",
__func__, driver->name);
return NULL;
}
return driver->get_option(driver->data, option_key);
}

void queue_driver_init_option_list(queue_driver_type *driver,
stringlist_type *option_list) {
if (driver->init_options)
driver->init_options(option_list);
else
util_abort(
"%s: driver:%s does not support run time reading of options\n",
__func__, driver->name);
driver->init_options(option_list);
}

/* These are the functions used by the job_queue layer. */
Expand Down Expand Up @@ -214,6 +178,5 @@ void queue_driver_free_driver(queue_driver_type *driver) {

void queue_driver_free(queue_driver_type *driver) {
queue_driver_free_driver(driver);
free(driver->name);
delete driver;
}
98 changes: 42 additions & 56 deletions src/clib/old_tests/job_queue/test_job_queue_driver.cpp
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
#include <stdlib.h>

#include <ert/util/test_util.hpp>
#include <ert/util/util.hpp>
#include <stdlib.h>
#include <vector>

#include <ert/job_queue/job_queue.hpp>
#include <ert/job_queue/local_driver.hpp>
#include <ert/job_queue/lsf_driver.hpp>

#include <ert/job_queue/queue_driver.hpp>
#include <ert/job_queue/slurm_driver.hpp>
#include <ert/job_queue/torque_driver.hpp>

Expand Down Expand Up @@ -43,65 +44,45 @@ void set_option_valid_on_specific_driver_returns_true() {
queue_driver_free(driver_torque);
}

void get_driver_option_lists() {
//Torque driver option list
{
queue_driver_type *driver_torque = queue_driver_alloc(TORQUE_DRIVER);
test_assert_string_equal(queue_driver_get_name(driver_torque),
"TORQUE");
stringlist_type *option_list = stringlist_alloc_new();
queue_driver_init_option_list(driver_torque, option_list);

for (const auto &i : TORQUE_DRIVER_OPTIONS) {
test_assert_true(stringlist_contains(option_list, i.c_str()));
}
stringlist_free(option_list);
queue_driver_free(driver_torque);
}

//Local driver option list (only general queue_driver options)
{
queue_driver_type *driver_local = queue_driver_alloc(LOCAL_DRIVER);
test_assert_string_equal(queue_driver_get_name(driver_local), "local");
stringlist_type *option_list = stringlist_alloc_new();
queue_driver_init_option_list(driver_local, option_list);
void get_driver_option_lists(job_driver_type driver_type,
std::vector<std::string> driver_options) {
queue_driver_type *driver_ = queue_driver_alloc(driver_type);
stringlist_type *option_list = stringlist_alloc_new();
queue_driver_init_option_list(driver_, option_list);

stringlist_free(option_list);
queue_driver_free(driver_local);
for (const auto &i : driver_options) {
test_assert_true(stringlist_contains(option_list, i.c_str()));
}

//Lsf driver option list
{
queue_driver_type *driver_lsf = queue_driver_alloc(LSF_DRIVER);
test_assert_string_equal(queue_driver_get_name(driver_lsf), "LSF");
stringlist_type *option_list = stringlist_alloc_new();
queue_driver_init_option_list(driver_lsf, option_list);

for (const auto &i : LSF_DRIVER_OPTIONS) {
test_assert_true(stringlist_contains(option_list, i.c_str()));
}

stringlist_free(option_list);
queue_driver_free(driver_lsf);
}

//SLurm driver option list
{
queue_driver_type *driver_slurm = queue_driver_alloc(SLURM_DRIVER);
test_assert_string_equal(queue_driver_get_name(driver_slurm), "SLURM");
stringlist_type *option_list = stringlist_alloc_new();
queue_driver_init_option_list(driver_slurm, option_list);

for (const auto &i : SLURM_DRIVER_OPTIONS) {
test_assert_true(stringlist_contains(option_list, i.c_str()));
}
stringlist_free(option_list);
queue_driver_free(driver_);
}

stringlist_free(option_list);
queue_driver_free(driver_slurm);
}
void test_local_driver_no_get_set_options() {
queue_driver_type *driver_local = queue_driver_alloc(LOCAL_DRIVER);
stringlist_type *option_list = stringlist_alloc_new();
queue_driver_init_option_list(driver_local, option_list);
test_assert_util_abort(
"local_driver_get_option",
[](void *arg) {
auto local_driver = static_cast<queue_driver_type *>(arg);
queue_driver_get_option(local_driver, "NA");
},
driver_local);

test_assert_util_abort(
"local_driver_set_option",
[](void *arg) {
auto local_driver = static_cast<queue_driver_type *>(arg);
queue_driver_set_option(local_driver, "NA", "NA");
},
driver_local);
stringlist_free(option_list);
queue_driver_free(driver_local);
}

int main(int argc, char **argv) {
util_install_signals();
job_queue_set_driver_(LSF_DRIVER);
job_queue_set_driver_(LOCAL_DRIVER);
job_queue_set_driver_(TORQUE_DRIVER);
Expand All @@ -111,7 +92,12 @@ int main(int argc, char **argv) {
set_option_invalid_value_returns_false();

set_option_valid_on_specific_driver_returns_true();
get_driver_option_lists();

get_driver_option_lists(TORQUE_DRIVER, TORQUE_DRIVER_OPTIONS);
get_driver_option_lists(SLURM_DRIVER, SLURM_DRIVER_OPTIONS);
get_driver_option_lists(LSF_DRIVER, LSF_DRIVER_OPTIONS);

test_local_driver_no_get_set_options();

exit(0);
}
Loading
Loading