Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sanity check jobs #1215

Merged
merged 9 commits into from
Jan 31, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,7 @@ private final class AllocSQL extends PowerSQL {
getRectangles = conn.query(findRectangle);
getRectangleAt = conn.query(findRectangleAt);
countConnectedBoards = conn.query(countConnected);
findSpecificBoard = conn.query(findLocation);
findSpecificBoard = conn.query(FIND_LOCATION);
getConnectedBoardIDs = conn.query(getConnectedBoards);
allocBoard = conn.update(ALLOCATE_BOARDS_BOARD);
allocJob = conn.update(ALLOCATE_BOARDS_JOB);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -416,30 +416,32 @@ private static JobDescription jobDescription(int id, Row job,
}

@Override
public Optional<Job> createJobInGroup(String owner, String groupName,
public Job createJobInGroup(String owner, String groupName,
CreateDescriptor descriptor, String machineName, List<String> tags,
Duration keepaliveInterval, byte[] req) {
Duration keepaliveInterval, byte[] req)
throws IllegalArgumentException {
return execute(conn -> {
int user = getUser(conn, owner).orElseThrow(
() -> new RuntimeException("no such user: " + owner));
int group = selectGroup(conn, owner, groupName);
if (!quotaManager.mayCreateJob(group)) {
// No quota left
return Optional.empty();
throw new IllegalArgumentException(
"quota exceeded in group " + group);
}

var m = selectMachine(conn, machineName, tags);
var m = selectMachine(conn, descriptor, machineName, tags);
if (!m.isPresent()) {
// Cannot find machine!
return Optional.empty();
throw new IllegalArgumentException(
"no machine available which matches allocation "
+ "request");
}
var machine = m.orElseThrow();

var id = insertJob(conn, machine, user, group, keepaliveInterval,
req);
if (!id.isPresent()) {
// Insert failed
return Optional.empty();
throw new RuntimeException("failed to create job");
}
int jobId = id.orElseThrow();

Expand Down Expand Up @@ -512,12 +514,13 @@ public Integer board(CreateBoard b) {
machine.name, owner, numBoards);

allocator.scheduleAllocateNow();
return getJob(jobId, conn).map(ji -> (Job) ji);
return getJob(jobId, conn).map(ji -> (Job) ji).orElseThrow(
() -> new RuntimeException("Error creating job!"));
});
}

@Override
public Optional<Job> createJob(String owner, CreateDescriptor descriptor,
public Job createJob(String owner, CreateDescriptor descriptor,
String machineName, List<String> tags, Duration keepaliveInterval,
byte[] originalRequest) {
return execute(conn -> createJobInGroup(
Expand All @@ -526,7 +529,7 @@ owner, getOnlyGroup(conn, owner), descriptor, machineName,
}

@Override
public Optional<Job> createJobInCollabSession(String owner,
public Job createJobInCollabSession(String owner,
String nmpiCollab, CreateDescriptor descriptor,
String machineName, List<String> tags, Duration keepaliveInterval,
byte[] originalRequest) {
Expand All @@ -537,39 +540,30 @@ public Optional<Job> createJobInCollabSession(String owner,
var job = execute(conn -> createJobInGroup(
owner, nmpiCollab, descriptor, machineName,
tags, keepaliveInterval, originalRequest));
// On failure to get job, just return; shouldn't happen as quota checked
// earlier, but just in case!
if (job.isEmpty()) {
return job;
}

quotaManager.associateNMPISession(job.get().getId(), session.getId(),
quotaManager.associateNMPISession(job.getId(), session.getId(),
quotaUnits);

// Return the job created
return job;
}

@Override
public Optional<Job> createJobForNMPIJob(String owner, int nmpiJobId,
public Job createJobForNMPIJob(String owner, int nmpiJobId,
CreateDescriptor descriptor, String machineName, List<String> tags,
Duration keepaliveInterval, byte[] originalRequest) {
var collab = quotaManager.mayUseNMPIJob(owner, nmpiJobId);
if (collab.isEmpty()) {
return Optional.empty();
throw new IllegalArgumentException("User cannot create session in "
+ "NMPI job" + nmpiJobId);
}
var quotaDetails = collab.get();

var job = execute(conn -> createJobInGroup(
owner, quotaDetails.collabId, descriptor, machineName,
tags, keepaliveInterval, originalRequest));
// On failure to get job, just return; shouldn't happen as quota checked
// earlier, but just in case!
if (job.isEmpty()) {
return job;
}

quotaManager.associateNMPIJob(job.get().getId(), nmpiJobId,
quotaManager.associateNMPIJob(job.getId(), nmpiJobId,
quotaDetails.quotaUnits);

// Return the job created
Expand Down Expand Up @@ -689,25 +683,74 @@ private static Optional<Integer> insertJob(Connection conn, MachineImpl m,
}

private Optional<MachineImpl> selectMachine(Connection conn,
String machineName, List<String> tags) {
CreateDescriptor descriptor, String machineName,
List<String> tags) {
if (nonNull(machineName)) {
return getMachine(machineName, false, conn);
} else if (!tags.isEmpty()) {
var m = getMachine(machineName, false, conn);
if (m.isPresent() && isAllocPossible(conn, descriptor, m.get())) {
return m;
}
return Optional.empty();
}

if (!tags.isEmpty()) {
for (var m : getMachines(conn, false).values()) {
var mi = (MachineImpl) m;
if (mi.tags.containsAll(tags)) {
/*
* Originally, spalloc checked if allocation was possible;
* we just assume that it is because there really isn't ever
* going to be that many different machines on one service.
*/
if (mi.tags.containsAll(tags)
&& isAllocPossible(conn, descriptor, mi)) {
return Optional.of(mi);
}
}
}
return Optional.empty();
}

private boolean isAllocPossible(final Connection conn,
final CreateDescriptor descriptor,
final MachineImpl m) {
return descriptor.visit(new CreateVisitor<Boolean>() {
@Override
public Boolean numBoards(CreateNumBoards nb) {
try (var getNBoards = conn.query(COUNT_FUNCTIONING_BOARDS)) {
var numBoards = getNBoards.call1(integer("c"), m.id)
.orElseThrow();
return numBoards >= nb.numBoards;
}
}

@Override
public Boolean dimensions(CreateDimensions d) {
try (var checkPossible = conn.query(checkRectangle)) {
return checkPossible.call1((r) -> true, d.width, d.height,
m.id, d.maxDead).isPresent();
}
}

@Override
public Boolean dimensionsAt(CreateDimensionsAt da) {
try (var checkPossible = conn.query(checkRectangleAt)) {
int board = locateBoard(conn, m.name, da, true);
return checkPossible.call1((r) -> true, board,
da.width, da.height, m.id, da.maxDead).isPresent();
} catch (IllegalArgumentException e) {
// This means the board doesn't exist on the given machine
return false;
}
}

@Override
public Boolean board(CreateBoard b) {
try (var check = conn.query(CHECK_LOCATION)) {
int board = locateBoard(conn, m.name, b, false);
return check.call1((r) -> true, m.id, board).isPresent();
} catch (IllegalArgumentException e) {
// This means the board doesn't exist on the given machine
return false;
}
}
});
}

@Override
public void purgeDownCache() {
synchronized (this) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,12 +192,13 @@ Optional<MachineDescription> getMachineInfo(@NotNull String machine,
* @param originalRequest
* The serialized original request, which will be stored in the
* database for later retrieval.
* @return Handle to the job, or {@code empty} if the job couldn't be made.
* @return The job created.
* @throws IllegalArgumentException if the job could not be created.
*/
Optional<Job> createJob(@NotNull String owner,
Job createJob(@NotNull String owner,
@Valid CreateDescriptor descriptor, String machineName,
List<String> tags, Duration keepaliveInterval,
byte[] originalRequest);
byte[] originalRequest) throws IllegalArgumentException;

/**
* Create a job for a user in a specific local group.
Expand All @@ -224,12 +225,13 @@ Optional<Job> createJob(@NotNull String owner,
* @param originalRequest
* The serialized original request, which will be stored in the
* database for later retrieval.
* @return Handle to the job, or {@code empty} if the job couldn't be made.
* @return The job created.
* @throws IllegalArgumentException if the job could not be created.
*/
Optional<Job> createJobInGroup(@NotNull String owner, @NotNull String group,
Job createJobInGroup(@NotNull String owner, @NotNull String group,
@Valid CreateDescriptor descriptor, String machineName,
List<String> tags, Duration keepaliveInterval,
byte[] originalRequest);
byte[] originalRequest) throws IllegalArgumentException;

/**
* Create a job for interactive use in an NMPI Collab Session.
Expand All @@ -256,13 +258,14 @@ Optional<Job> createJobInGroup(@NotNull String owner, @NotNull String group,
* @param originalRequest
* The serialized original request, which will be stored in the
* database for later retrieval.
* @return Handle to the job, or {@code empty} if the job couldn't be made.
* @return The job created.
* @throws IllegalArgumentException if the job could not be created.
*/
Optional<Job> createJobInCollabSession(@NotNull String owner,
Job createJobInCollabSession(@NotNull String owner,
@NotNull String nmpiCollab,
@Valid CreateDescriptor descriptor, String machineName,
List<String> tags, Duration keepaliveInterval,
byte[] originalRequest);
byte[] originalRequest) throws IllegalArgumentException;

/**
* Create a job for interactive use in an NMPI Collab Session.
Expand All @@ -289,13 +292,14 @@ Optional<Job> createJobInCollabSession(@NotNull String owner,
* @param originalRequest
* The serialized original request, which will be stored in the
* database for later retrieval.
* @return Handle to the job, or {@code empty} if the job couldn't be made.
* @return The job created.
* @throws IllegalArgumentException if the job could not be created.
*/
@PreAuthorize(IS_NMPI_EXEC)
Optional<Job> createJobForNMPIJob(@NotNull String owner, int nmpiJobId,
Job createJobForNMPIJob(@NotNull String owner, int nmpiJobId,
@Valid CreateDescriptor descriptor, String machineName,
List<String> tags, Duration keepaliveInterval,
byte[] originalRequest);
byte[] originalRequest) throws IllegalArgumentException;

/** Purge the cache of what boards are down. */
void purgeDownCache();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -444,17 +444,16 @@ private Object callOperation(Command cmd) throws Exception {
byte[] serialCmd = getJsonMapper().writeValueAsBytes(cmd);
switch (args.size()) {
case 0:
return createJobNumBoards(1, kwargs, serialCmd).orElse(null);
return createJobNumBoards(1, kwargs, serialCmd);
case 1:
return createJobNumBoards(parseDec(args, 0), kwargs, serialCmd)
.orElse(null);
return createJobNumBoards(parseDec(args, 0), kwargs, serialCmd);
case 2:
return createJobRectangle(parseDec(args, 0), parseDec(args, 1),
kwargs, serialCmd).orElse(null);
kwargs, serialCmd);
case TRIAD_COORD_COUNT:
return createJobSpecificBoard(new TriadCoords(parseDec(args, 0),
parseDec(args, 1), parseDec(args, 2)), kwargs,
serialCmd).orElse(null);
serialCmd);
default:
throw new Oops(
"unsupported number of arguments: " + args.size());
Expand Down Expand Up @@ -563,7 +562,7 @@ private Object callOperation(Command cmd) throws Exception {
* @throws TaskException
* If anything goes wrong.
*/
protected abstract Optional<Integer> createJobNumBoards(
protected abstract int createJobNumBoards(
@Positive int numBoards,
Map<@NotBlank String, @NotNull Object> kwargs, byte[] cmd)
throws TaskException;
Expand All @@ -583,7 +582,7 @@ protected abstract Optional<Integer> createJobNumBoards(
* @throws TaskException
* If anything goes wrong.
*/
protected abstract Optional<Integer> createJobRectangle(
protected abstract int createJobRectangle(
@ValidTriadWidth int width, @ValidTriadHeight int height,
Map<@NotBlank String, @NotNull Object> kwargs, byte[] cmd)
throws TaskException;
Expand All @@ -601,7 +600,7 @@ protected abstract Optional<Integer> createJobRectangle(
* @throws TaskException
* If anything goes wrong.
*/
protected abstract Optional<Integer> createJobSpecificBoard(
protected abstract int createJobSpecificBoard(
@Valid TriadCoords coords,
Map<@NotBlank String, @NotNull Object> kwargs, byte[] cmd)
throws TaskException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Future;

import javax.annotation.PostConstruct;
Expand Down Expand Up @@ -260,22 +259,22 @@ private static List<String> tags(Object src, boolean mayForceDefault) {
}

@Override
protected final Optional<Integer> createJobNumBoards(int numBoards,
protected final int createJobNumBoards(int numBoards,
Map<String, Object> kwargs, byte[] cmd) throws TaskException {
var maxDead = parseDec(kwargs.get("max_dead_boards"));
return createJob(new CreateNumBoards(numBoards, maxDead), kwargs, cmd);
}

@Override
protected final Optional<Integer> createJobRectangle(int width, int height,
protected final int createJobRectangle(int width, int height,
Map<String, Object> kwargs, byte[] cmd) throws TaskException {
var maxDead = parseDec(kwargs.get("max_dead_boards"));
return createJob(new CreateDimensions(width, height, maxDead), kwargs,
cmd);
}

@Override
protected final Optional<Integer> createJobSpecificBoard(TriadCoords coords,
protected final int createJobSpecificBoard(TriadCoords coords,
Map<String, Object> kwargs, byte[] cmd) throws TaskException {
return createJob(triad(coords.x, coords.y, coords.z), kwargs, cmd);
}
Expand All @@ -294,21 +293,18 @@ private static String getOwner(Map<String, Object> kwargs)
return owner;
}

private Optional<Integer> createJob(SpallocAPI.CreateDescriptor create,
private Integer createJob(SpallocAPI.CreateDescriptor create,
Map<String, Object> kwargs, byte[] cmd) throws TaskException {
var owner = getOwner(kwargs);
var keepalive = parseKeepalive((Number) kwargs.get("keepalive"));
var machineName = (String) kwargs.get("machine");
var ts = tags(kwargs.get("tags"), isNull(machineName));
var result = permit.authorize(() -> spalloc.createJobInGroup(
var job = permit.authorize(() -> spalloc.createJobInGroup(
permit.name, groupName, create, machineName, ts, keepalive,
cmd));
result.ifPresent(
j -> log.info(
"made compatibility-mode job {} "
+ "on behalf of claimed user {}",
j.getId(), owner));
return result.map(Job::getId);
log.info("made compatibility-mode job {} on behalf of claimed user {}",
job.getId(), owner);
return job.getId();
}

@Override
Expand Down
Loading
Loading