Skip to content

Commit

Permalink
Merge pull request flux-framework#2726 from grondo/sched-simple-unlim…
Browse files Browse the repository at this point in the history
…ited

add 'unlimited' alloc mode to simple-sched
  • Loading branch information
mergify[bot] authored Feb 12, 2020
2 parents cc52ae8 + 7b7c537 commit 47e59f3
Show file tree
Hide file tree
Showing 4 changed files with 197 additions and 48 deletions.
177 changes: 141 additions & 36 deletions src/modules/sched-simple/sched.c
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,32 @@
#include <flux/schedutil.h>

#include "src/common/libutil/errno_safe.h"
#include "src/common/libjob/job.h"
#include "libjj.h"
#include "rlist.h"

/* One pending alloc request as tracked by the scheduler queue.
 */
struct jobreq {
void *handle; /* value returned by zlistx_insert(); passed to zlistx_delete() */
const flux_msg_t *msg; /* alloc request message (reference from flux_msg_incref()) */
uint32_t uid; /* decoded from msg by schedutil_alloc_request_decode() */
int priority; /* decoded from msg; first sort key in jobreq_cmp() */
double t_submit; /* decoded from msg; second sort key in jobreq_cmp() */
flux_jobid_t id; /* job id decoded from msg; used for lookup and logging */
struct jj_counts jj; /* counts filled in from jobspec by libjj_get_counts() */
int errnum; /* if nonzero, alloc_cb() denies the request using jj.error */
};

/* Scheduler module state, passed as 'arg' to the message and
 * watcher callbacks.
 */
struct simple_sched {
flux_t *h; /* flux handle, assigned in mod_main() */
char *mode; /* allocation mode */
bool single; /* true (default): "single" alloc mode; false: "unlimited" */
struct rlist *rlist; /* list of resources */
struct jobreq *job; /* currently processed job */
zlistx_t *queue; /* job queue (comparator: jobreq_cmp) */
schedutil_t *util_ctx; /* schedutil context used for alloc/free responses */

flux_watcher_t *prep; /* prepare watcher, callback prep_cb() */
flux_watcher_t *check; /* check watcher, callback check_cb() */
flux_watcher_t *idle; /* idle watcher, created with no callback */
};

static void jobreq_destroy (struct jobreq *job)
Expand All @@ -42,18 +53,50 @@ static void jobreq_destroy (struct jobreq *job)
}
}

/* Destructor callback registered on the job queue with
 * zlistx_set_destructor().
 *
 * Destroys the jobreq stored at *x with jobreq_destroy(), then clears
 * *x so the caller is not left holding a dangling pointer, following
 * the CZMQ destructor convention.  A NULL 'x' is ignored.
 */
static void jobreq_destructor (void **x)
{
    if (x) {
        jobreq_destroy (*x);
        *x = NULL;
    }
}

#define NUMCMP(a,b) ((a)==(b)?0:((a)<(b)?-1:1))

/* Queue comparator for job requests
 * (taken from modules/job-manager/job.c).
 *
 * The request with the larger priority sorts first.  When priorities
 * are equal, the request with the smaller t_submit sorts first.
 * Returns <0, 0, or >0 in the usual comparator sense.
 */
static int jobreq_cmp (const void *x, const void *y)
{
    const struct jobreq *a = x;
    const struct jobreq *b = y;

    /* Operands swapped so that higher priority compares as "less" */
    int order = NUMCMP (b->priority, a->priority);
    if (order != 0)
        return order;
    return NUMCMP (a->t_submit, b->t_submit);
}

/* Look up a queued job request by job id.
 *
 * Walks ss->queue from the front using zlistx_first()/zlistx_next()
 * and returns the first entry whose id equals 'id', or NULL when no
 * queued request matches.
 */
static struct jobreq *
jobreq_find (struct simple_sched *ss, flux_jobid_t id)
{
    struct jobreq *cur;

    for (cur = zlistx_first (ss->queue);
         cur != NULL;
         cur = zlistx_next (ss->queue)) {
        if (cur->id == id)
            return cur;
    }
    return NULL;
}

static struct jobreq *
jobreq_create (const flux_msg_t *msg, const char *jobspec)
{
struct jobreq *job = calloc (1, sizeof (*job));
int pri;
uint32_t uid;
double t_submit;

if (job == NULL)
return NULL;
if (schedutil_alloc_request_decode (msg, &job->id,
&pri, &uid, &t_submit) < 0)
if (schedutil_alloc_request_decode (msg,
&job->id,
&job->priority,
&job->uid,
&job->t_submit) < 0)
goto err;
job->msg = flux_msg_incref (msg);
if (libjj_get_counts (jobspec, &job->jj) < 0)
Expand All @@ -66,11 +109,16 @@ jobreq_create (const flux_msg_t *msg, const char *jobspec)

static void simple_sched_destroy (flux_t *h, struct simple_sched *ss)
{
schedutil_destroy (ss->util_ctx);
if (ss->job) {
flux_respond_error (h, ss->job->msg, ENOSYS, "simple sched exiting");
jobreq_destroy (ss->job);
struct jobreq *job = zlistx_first (ss->queue);
while (job) {
flux_respond_error (h, job->msg, ENOSYS, "simple sched exiting");
job = zlistx_next (ss->queue);
}
zlistx_destroy (&ss->queue);
flux_watcher_destroy (ss->prep);
flux_watcher_destroy (ss->check);
flux_watcher_destroy (ss->idle);
schedutil_destroy (ss->util_ctx);
rlist_destroy (ss->rlist);
free (ss->mode);
free (ss);
Expand All @@ -81,6 +129,9 @@ static struct simple_sched * simple_sched_create (void)
struct simple_sched *ss = calloc (1, sizeof (*ss));
if (ss == NULL)
return NULL;

/* Single alloc request mode is default */
ss->single = true;
return ss;
}

Expand All @@ -95,25 +146,27 @@ static char *Rstring_create (struct rlist *l)
return (s);
}

static void try_alloc (flux_t *h, struct simple_sched *ss)
static int try_alloc (flux_t *h, struct simple_sched *ss)
{
int rc = -1;
char *s = NULL;
struct rlist *alloc = NULL;
struct jj_counts *jj = NULL;
char *R = NULL;
if (!ss->job)
return;
jj = &ss->job->jj;
struct jobreq *job = zlistx_first (ss->queue);
if (!job)
return -1;
jj = &job->jj;
alloc = rlist_alloc (ss->rlist, ss->mode,
jj->nnodes, jj->nslots, jj->slot_size);
if (!alloc) {
const char *note = "unable to allocate provided jobspec";
if (errno == ENOSPC)
return;
return rc;
if (errno == EOVERFLOW)
note = "unsatisfiable request";
if (schedutil_alloc_respond_denied (ss->util_ctx,
ss->job->msg,
job->msg,
note) < 0)
flux_log_error (h, "schedutil_alloc_respond_denied");
goto out;
Expand All @@ -122,17 +175,46 @@ static void try_alloc (flux_t *h, struct simple_sched *ss)
if (!(R = Rstring_create (alloc)))
flux_log_error (h, "Rstring_create");

if (R && schedutil_alloc_respond_R (ss->util_ctx, ss->job->msg, R, s) < 0)
if (R && schedutil_alloc_respond_R (ss->util_ctx, job->msg, R, s) < 0)
flux_log_error (h, "schedutil_alloc_respond_R");

flux_log (h, LOG_DEBUG, "alloc: %ju: %s", (uintmax_t) ss->job->id, s);
flux_log (h, LOG_DEBUG, "alloc: %ju: %s", (uintmax_t) job->id, s);
rc = 0;

out:
jobreq_destroy (ss->job);
ss->job = NULL;
zlistx_delete (ss->queue, job->handle);
rlist_destroy (alloc);
free (R);
free (s);
return rc;
}

/* Prepare watcher callback.
 *
 * When the job queue is non-empty, start the check watcher (which
 * attempts an allocation in check_cb) together with the idle watcher.
 * NOTE(review): the idle watcher presumably keeps the reactor from
 * blocking before check_cb runs - confirm against reactor docs.
 * With an empty queue this callback does nothing.
 */
static void prep_cb (flux_reactor_t *r, flux_watcher_t *w,
                     int revents, void *arg)
{
    struct simple_sched *ss = arg;

    if (zlistx_size (ss->queue) == 0)
        return;

    flux_watcher_start (ss->check);
    flux_watcher_start (ss->idle);
}

/* Check watcher callback.
 *
 * Stops the idle watcher, then tries to satisfy the alloc request at
 * the head of the queue.  If that attempt fails with errno == ENOSPC,
 * the prep and check watchers are stopped as well, i.e. scheduling
 * blocks until something restarts the prep watcher.  In every other
 * case the watchers are left running so the next reactor loop retries.
 */
static void check_cb (flux_reactor_t *r, flux_watcher_t *w,
                      int revents, void *arg)
{
    struct simple_sched *ss = arg;
    int rc;

    flux_watcher_stop (ss->idle);

    rc = try_alloc (ss->h, ss);

    /* errno is only consulted when try_alloc reported failure */
    if (rc >= 0 || errno != ENOSPC)
        return;

    flux_watcher_stop (ss->prep);
    flux_watcher_stop (ss->check);
}

static int try_free (flux_t *h, struct simple_sched *ss, const char *R)
Expand Down Expand Up @@ -167,44 +249,47 @@ void free_cb (flux_t *h, const flux_msg_t *msg, const char *R, void *arg)
flux_log_error (h, "free_cb: schedutil_free_respond");

/* See if we can fulfill alloc for a pending job */
try_alloc (h, ss);
flux_watcher_start (ss->prep);
}

static void alloc_cb (flux_t *h, const flux_msg_t *msg,
const char *jobspec, void *arg)
{
struct simple_sched *ss = arg;
struct jobreq *job;

if (ss->job) {
if (ss->single && zlistx_size (ss->queue) > 0) {
flux_log (h, LOG_ERR, "alloc received before previous one handled");
errno = EINVAL;
goto err;
}
if (!(ss->job = jobreq_create (msg, jobspec))) {
if (!(job = jobreq_create (msg, jobspec))) {
flux_log_error (h, "alloc: jobreq_create");
goto err;
}
if (ss->job->errnum != 0) {
if (job->errnum != 0) {
if (schedutil_alloc_respond_denied (ss->util_ctx,
msg,
ss->job->jj.error) < 0)
job->jj.error) < 0)
flux_log_error (h, "alloc_respond_denied");
jobreq_destroy (ss->job);
ss->job = NULL;
jobreq_destroy (job);
return;
}
flux_log (h, LOG_DEBUG, "req: %ju: spec={%d,%d,%d}",
(uintmax_t) ss->job->id, ss->job->jj.nnodes,
ss->job->jj.nslots, ss->job->jj.slot_size);
try_alloc (h, ss);
(uintmax_t) job->id, job->jj.nnodes,
job->jj.nslots, job->jj.slot_size);
job->handle = zlistx_insert (ss->queue,
job,
job->priority > FLUX_JOB_PRIORITY_DEFAULT);
flux_watcher_start (ss->prep);
return;
err:
if (flux_respond_error (h, msg, errno, NULL) < 0)
flux_log_error (h, "alloc: flux_respond_error");
}

/* Job manager wants to cancel a pending allocation request.
* If ss->job (our queue of 1) matches, respond to the alloc request
* If a matching job found in queue, respond to the alloc request
* and "dequeue" it.
*/
static void cancel_cb (flux_t *h,
Expand All @@ -214,14 +299,14 @@ static void cancel_cb (flux_t *h,
void *arg)
{
struct simple_sched *ss = arg;
struct jobreq *job = jobreq_find (ss, id);

if (ss->job && ss->job->id == id) {
if (schedutil_alloc_respond_cancel (ss->util_ctx, ss->job->msg) < 0) {
if (job) {
if (schedutil_alloc_respond_cancel (ss->util_ctx, job->msg) < 0) {
flux_log_error (h, "alloc_respond_cancel");
return;
}
jobreq_destroy (ss->job);
ss->job = NULL;
zlistx_delete (ss->queue, job->handle);
}
}

Expand Down Expand Up @@ -302,7 +387,9 @@ static int simple_sched_init (flux_t *h, struct simple_sched *ss)
flux_log_error (h, "schedutil_hello");
goto out;
}
if (schedutil_ready (ss->util_ctx, "single", NULL) < 0) {
if (schedutil_ready (ss->util_ctx,
ss->single ? "single": "unlimited",
NULL) < 0) {
flux_log_error (h, "schedutil_ready");
goto out;
}
Expand Down Expand Up @@ -335,6 +422,9 @@ static int process_args (flux_t *h, struct simple_sched *ss,
free (ss->mode);
ss->mode = get_alloc_mode (h, argv[i]+5);
}
else if (strcmp ("unlimited", argv[i]) == 0) {
ss->single = false;
}
else {
flux_log_error (h, "Unknown module option: '%s'", argv[i]);
return -1;
Expand Down Expand Up @@ -368,6 +458,21 @@ int mod_main (flux_t *h, int argc, char **argv)
flux_log_error (h, "schedutil_create");
goto done;
}
ss->h = h;
ss->prep = flux_prepare_watcher_create (r, prep_cb, ss);
ss->check = flux_check_watcher_create (r, check_cb, ss);
ss->idle = flux_idle_watcher_create (r, NULL, NULL);
if (!ss->prep || !ss->check || !ss->idle) {
errno = ENOMEM;
goto done;
}
flux_watcher_start (ss->prep);

if (!(ss->queue = zlistx_new ()))
goto done;
zlistx_set_comparator (ss->queue, jobreq_cmp);
zlistx_set_destructor (ss->queue, jobreq_destructor);

if (simple_sched_init (h, ss) < 0)
goto done;
if (flux_msg_handler_addvec (h, htab, ss, &handlers) < 0) {
Expand Down
Loading

0 comments on commit 47e59f3

Please sign in to comment.