Skip to content

Commit

Permalink
Merge pull request #1216 from SpiNNakerManchester/fix_tombstone
Browse files Browse the repository at this point in the history
Fix tombstone
  • Loading branch information
rowleya authored Feb 4, 2025
2 parents 1bcfd50 + 6130668 commit 5612108
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 600 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -771,6 +771,14 @@ private Stream<HistoricalJob> jobs() {
return jobs.stream().filter(Objects::nonNull);
}

/**
 * Get the IDs of the copied jobs that have an NMPI job record attached.
 * <p>
 * These are the jobs' own IDs (the {@code job_id} column), <em>not</em> the
 * NMPI-side job IDs. The stream is fed to {@code DELETE_NMPI_JOB}, which
 * selects the rows of {@code job_nmpi_job} to remove by {@code job_id};
 * handing it {@code nmpi_job_id} values would match the wrong rows (or
 * none at all).
 *
 * @return The {@code job_id} of each copied job whose {@code nmpiJobId} is
 *         not {@code null}.
 */
private Stream<Integer> nmpiJobs() {
    return jobs().filter(j -> j.nmpiJobId != null).map(j -> j.jobId);
}

/**
 * Get the IDs of the copied jobs that have an NMPI session record attached.
 * <p>
 * These are the jobs' own IDs (the {@code job_id} column), <em>not</em> the
 * NMPI session IDs. The stream is fed to {@code DELETE_NMPI_SESSION}, which
 * selects the rows of {@code job_nmpi_session} to remove by {@code job_id};
 * handing it session IDs would match the wrong rows (or none at all).
 *
 * @return The {@code job_id} of each copied job whose {@code nmpiSessionId}
 *         is not {@code null}.
 */
private Stream<Integer> nmpiSessions() {
    return jobs().filter(j -> j.nmpiSessionId != null).map(j -> j.jobId);
}

/**
* @return The number of job records to copy over to the historical
* database.
Expand Down Expand Up @@ -850,6 +858,10 @@ private class HistoricalJob {

String groupName;

Integer nmpiJobId;

Integer nmpiSessionId;

HistoricalJob(Row row) {
jobId = row.getInt("job_id");
machineId = row.getInt("machine_id");
Expand All @@ -870,6 +882,8 @@ private class HistoricalJob {
userName = row.getString("user_name");
groupId = row.getInt("group_id");
groupName = row.getString("group_name");
nmpiJobId = row.getInteger("nmpi_job_id");
nmpiSessionId = row.getInteger("nmpi_session_id");
}

Object[] args() {
Expand All @@ -878,7 +892,7 @@ Object[] args() {
width, height, depth, allocatedRoot, keepaliveInterval,
keepaliveHost, deathReason, deathTimestamp, originalRequest,
allocationTimestamp, allocationSize, machineName, userName,
groupId, groupName
groupId, groupName, nmpiJobId, nmpiSessionId
};
}
}
Expand All @@ -903,6 +917,8 @@ private Copied tombstone(Connection conn, Connection histConn) {
var readAllocs = conn.query(READ_HISTORICAL_ALLOCS);
var deleteJobs = conn.update(DELETE_JOB_RECORD);
var deleteAllocs = conn.update(DELETE_ALLOC_RECORD);
var deleteNMPIJob = conn.update(DELETE_NMPI_JOB);
var deleteNMPISession = conn.update(DELETE_NMPI_SESSION);
var writeJobs = histConn.update(WRITE_HISTORICAL_JOBS);
var writeAllocs = histConn.update(WRITE_HISTORICAL_ALLOCS)) {
var grace = historyProps.getGracePeriod();
Expand All @@ -914,6 +930,8 @@ private Copied tombstone(Connection conn, Connection histConn) {
copied.jobs().forEach((j) -> writeJobs.call(j.args()));
});
conn.transaction(() -> {
copied.nmpiJobs().forEach(deleteNMPIJob::call);
copied.nmpiSessions().forEach(deleteNMPISession::call);
copied.allocs().forEach((a) -> deleteAllocs.call(a.allocId));
copied.jobs().forEach((j) -> deleteJobs.call(j.jobId));
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1843,6 +1843,21 @@ public abstract class SQLQueries {
protected static final String DELETE_ALLOC_RECORD =
"DELETE FROM old_board_allocations WHERE alloc_id = :alloc_id";

/**
 * Actually delete the NMPI job record associated with a job. Only called by
 * the data tombstone-r.
 * <p>
 * Rows of {@code job_nmpi_job} are selected by their {@code job_id} column
 * (the column {@link #READ_HISTORICAL_JOBS} joins on), so the parameter must
 * be the ID of the job itself and not the {@code nmpi_job_id} stored in the
 * row.
 */
@Parameter("job_id")
protected static final String DELETE_NMPI_JOB =
"DELETE FROM job_nmpi_job WHERE job_id = :job_id";

/**
 * Actually delete the NMPI session record associated with a job.
 * Only called by the data tombstone-r.
 * <p>
 * Rows of {@code job_nmpi_session} are selected by their {@code job_id}
 * column (the column {@link #READ_HISTORICAL_JOBS} joins on), so the
 * parameter must be the ID of the job itself and not the {@code session_id}
 * stored in the row.
 */
@Parameter("job_id")
protected static final String DELETE_NMPI_SESSION =
"DELETE FROM job_nmpi_session WHERE job_id = :job_id";

/**
* Read the blacklisted chips for a board.
*
Expand Down Expand Up @@ -2273,17 +2288,23 @@ public abstract class SQLQueries {
@ResultColumn("user_name")
@ResultColumn("group_id")
@ResultColumn("group_name")
@ResultColumn("nmpi_job_id")
@ResultColumn("nmpi_session_id")
protected static final String READ_HISTORICAL_JOBS =
"SELECT job_id, machine_id, owner, create_timestamp, "
+ "jobs.width as width, jobs.height as height, jobs.depth as depth,"
+ "allocated_root, keepalive_interval, keepalive_host, "
+ "death_reason, death_timestamp, original_request, "
+ "allocation_timestamp, allocation_size, "
+ "machine_name, user_name, group_id, group_name "
+ "machine_name, user_name, group_id, group_name, "
+ "job_nmpi_job.nmpi_job_id as nmpi_job_id, "
+ "job_nmpi_session.session_id as nmpi_session_id "
+ "FROM jobs "
+ "JOIN user_groups USING (group_id) "
+ "JOIN machines USING (machine_id) "
+ "JOIN user_info ON jobs.owner = user_info.user_id "
+ "LEFT JOIN job_nmpi_job USING (job_id) "
+ "LEFT JOIN job_nmpi_session USING (job_id) "
+ "WHERE death_timestamp + :grace_period < UNIX_TIMESTAMP()";

/**
Expand Down Expand Up @@ -2320,20 +2341,24 @@ public abstract class SQLQueries {
@Parameter("owner_name")
@Parameter("group_id")
@Parameter("group_name")
@Parameter("nmpi_job_id")
@Parameter("nmpi_session_id")
protected static final String WRITE_HISTORICAL_JOBS =
"INSERT IGNORE INTO jobs( "
+ "job_id, machine_id, owner, create_timestamp, "
+ "width, height, depth, root_id, "
+ "keepalive_interval, keepalive_host, "
+ "death_reason, death_timestamp, "
+ "original_request, allocation_timestamp, allocation_size, "
+ "machine_name, owner_name, group_id, group_name) "
+ "machine_name, owner_name, group_id, group_name, "
+ "nmpi_job_id, nmpi_session_id) "
+ "VALUES(:job_id, :machine_id, :owner, :create_timestamp, "
+ ":width, :height, :depth, :root_id, "
+ ":keepalive_interval, :keepalive_host, "
+ ":death_reason, :death_timestamp, "
+ ":original_request, :allocation_timestamp, :allocation_size, "
+ ":machine_name, :owner_name, :group_id, :group_name)";
+ ":machine_name, :owner_name, :group_id, :group_name, "
+ ":nmpi_job_id, :nmpi_session_id)";

/**
* Set the NMPI session for a Job.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ CREATE TABLE IF NOT EXISTS jobs(
death_timestamp - allocation_timestamp) VIRTUAL,
resources_used INTEGER GENERATED ALWAYS AS ( -- generated column
lifetime_duration * allocation_size) VIRTUAL,
nmpi_job_id INTEGER,
nmpi_session_id INTEGER,
UNIQUE INDEX (job_id ASC)
);

Expand Down
48 changes: 0 additions & 48 deletions SpiNNaker-allocserv/src/main/resources/spalloc-tombstone.sql

This file was deleted.

Loading

0 comments on commit 5612108

Please sign in to comment.