LocalityNew
- A batch consists of a sequence of files F1 ... Fn and a set of jobs J1 ... Jm operating on these files.
- Each job uses a contiguous set of files.
- A given file may be used by many jobs.
- The density of jobs in the file sequence may be variable.
- Several batches may be in progress concurrently, for the same or different applications.
- To complete batches quickly.
- To minimize the amount of data transfer to hosts.
A policy that simply dispatches the jobs in order sends essentially every file to every host, so it fails to achieve the second goal (minimizing data transfer to hosts).
The ideal policy would start each host at a different point in the job space, separated according to their speeds. This would potentially send each file only to a single host. However, it's impractical for various reasons: replication, unreliability of hosts, and unpredictability of their speed.
Instead, we use a policy in which the set of hosts is divided into teams, and each team works on a different area of the job space. Teams should have these properties:
- The fastest host in a team should be no faster than the total of the other hosts. Otherwise, it could get unboundedly far ahead of the others.
- Subject to the above, teams should be as small as possible. A good size might be 10 or 20.
- The hosts in a team should belong to different users (for validation purposes).
Because of host churn, team membership is dynamic; e.g. a team may be empty for a period.
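The team properties above can be checked mechanically. The following is a minimal sketch, not the project's code: the `Host` record, the use of `expavg_credit` as a stand-in for host speed, and both function names are assumptions made for illustration.

```python
from dataclasses import dataclass


@dataclass
class Host:
    host_id: int
    user_id: int
    expavg_credit: float  # assumption: used here as a proxy for host speed


def team_is_balanced(team):
    """True if the fastest host is no faster than all other hosts combined."""
    if len(team) < 2:
        # An empty or single-host team cannot satisfy the property.
        return False
    speeds = [h.expavg_credit for h in team]
    fastest = max(speeds)
    return fastest <= sum(speeds) - fastest


def team_has_distinct_users(team):
    """True if no two hosts in the team belong to the same user."""
    users = [h.user_id for h in team]
    return len(users) == len(set(users))
```

Because membership is dynamic, a check like this would have to be repeated as hosts come and go; a team that fails it temporarily (for example, an empty team) is not necessarily an error.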
A cursor consists of
- a team of hosts
- a range of jobs
- status information (see below)
Note: we discussed having a separate notion of "job range", allowing cursors to move from one job range to another, and allowing job ranges to be subdivided. I think this is needlessly complex.
New tables:
batch
    // this table already exists; we may need to add fields to it
batch_host    // batch/host association table
    host_id integer
    batch_id integer
    cursor_id integer
locality_cursor
    batch_id integer
    first_job_num integer
    last_job_num integer
        // range of jobs to be done
    expavg_credit double
        // sum of expavg_credit of hosts in the team
    remaining_credit double
        // estimated credit of unfinished jobs
        // zero means all jobs finished
    first_unfinished_job_num integer
        // all jobs before this have been completed
    first_ungenerated_job_num integer
        // we've generated workunit records for all jobs before this
workunit (new fields)
    cursor_id integer
    job_num integer
To create a batch:
- create batch record
- based on # of hosts, # of jobs, and job density, create locality_cursor records
Define
est_time_left(cursor) = cursor.remaining_credit/cursor.expavg_credit
This is an estimate of the time needed to complete the cursor's jobs, given its current team.
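As a sketch, the estimate can be written as a small function. The `Cursor` class below is a hypothetical in-memory stand-in for a locality_cursor record, and the handling of the two edge cases (no remaining work, and an empty team whose expavg_credit is zero) is an assumption, since the definition does not cover them.

```python
import math
from dataclasses import dataclass


@dataclass
class Cursor:
    remaining_credit: float  # estimated credit of unfinished jobs
    expavg_credit: float     # sum of expavg_credit of hosts in the team


def est_time_left(cursor):
    """Estimated time to finish the cursor's jobs with its current team."""
    if cursor.remaining_credit <= 0:
        return 0.0        # all jobs finished
    if cursor.expavg_credit <= 0:
        return math.inf   # assumption: an empty team never finishes
    return cursor.remaining_credit / cursor.expavg_credit
```

Treating an empty team as infinitely slow means such a cursor is always the first candidate to receive a host.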
For each batch:
If this is a new host (i.e. no batch_host record) then
- assign host to the cursor for this batch with the greatest est_time_left().
- create batch_host record
- add host's expavg_credit to cursor's expavg_credit
Otherwise, consider moving this host to a different cursor. Let C = host's cursor, and let D = the cursor for which est_time_left() is greatest. If est_time_left(C) < 0.5*est_time_left(D), then move this host to D. (This policy may need to be refined a bit to keep hosts from moving between cursors too often.)
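The assignment rule for one batch might look like the sketch below. It assumes that a new host goes to the cursor with the greatest est_time_left(), and that moving a host also subtracts its expavg_credit from the old cursor; the text only specifies the addition for new hosts. The `Cursor` class, the `batch_host` dict (host_id to cursor_id), and the `move_ratio` parameter are hypothetical names.

```python
import math
from dataclasses import dataclass


@dataclass
class Cursor:
    cursor_id: int
    remaining_credit: float
    expavg_credit: float


def est_time_left(c):
    if c.remaining_credit <= 0:
        return 0.0
    if c.expavg_credit <= 0:
        return math.inf  # assumption: empty team never finishes
    return c.remaining_credit / c.expavg_credit


def assign_host(cursors, batch_host, host_id, host_credit, move_ratio=0.5):
    """Assign or reassign a host within one batch; return its cursor_id."""
    by_id = {c.cursor_id: c for c in cursors}
    slowest = max(cursors, key=est_time_left)  # cursor D in the text
    current_id = batch_host.get(host_id)
    if current_id is None:
        # New host: join the cursor that needs the most help.
        slowest.expavg_credit += host_credit
        batch_host[host_id] = slowest.cursor_id
        return slowest.cursor_id
    current = by_id[current_id]  # cursor C in the text
    if est_time_left(current) < move_ratio * est_time_left(slowest):
        # Assumption: the host's credit leaves the old team when it moves.
        current.expavg_credit -= host_credit
        slowest.expavg_credit += host_credit
        batch_host[host_id] = slowest.cursor_id
    return batch_host[host_id]
```

The strict inequality means a host stays put when its cursor is exactly half as far from completion as the slowest one. A smaller `move_ratio` would reduce churn further.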
For each batch:
Enumerate unsent results for this host's cursor in order of increasing job num (i.e. finish old jobs before starting new ones). Send as many as the host can handle.
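A minimal in-memory sketch of this dispatch step follows. In practice it would be a database query; the `Result` record, the `sent` flag, and the `max_jobs` limit (standing in for "as many as the host can handle") are assumptions for illustration, and no synchronization is shown.

```python
from dataclasses import dataclass


@dataclass
class Result:
    result_id: int
    cursor_id: int
    job_num: int
    sent: bool = False


def pick_results(results, cursor_id, max_jobs):
    """Choose up to max_jobs unsent results for a cursor, oldest job first."""
    unsent = [r for r in results if r.cursor_id == cursor_id and not r.sent]
    unsent.sort(key=lambda r: r.job_num)  # finish old jobs before new ones
    chosen = unsent[:max_jobs]
    for r in chosen:
        r.sent = True
    return chosen
```

Two scheduler processes running this concurrently could select the same results, which is why some form of locking is required around the select-and-mark step.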
Some type of synchronization is needed; maybe
- use a DB transaction (how much would this lock?)
- explicitly lock the locality_cursor record (is this possible?)
- use a semaphore per cursor
If the host has a sticky file that's not used by an unfinished job in its cursor, tell client to delete that file.
Note: names of sticky files should encode the batch and file number.
Loop over batches and cursors. Try to maintain a cushion of N unsent jobs per cursor. Start generating jobs at cursor.first_ungenerated_job_num.
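The cushion rule for a single cursor can be sketched as follows. The `Cursor` class and the function name are hypothetical, `n_unsent` is assumed to be supplied by the caller (for example, from a count query), and `last_job_num` is assumed to be inclusive.

```python
from dataclasses import dataclass


@dataclass
class Cursor:
    first_job_num: int
    last_job_num: int                # assumption: inclusive
    first_ungenerated_job_num: int


def jobs_to_generate(cursor, n_unsent, cushion):
    """Return the job numbers to generate so that about `cushion` are unsent."""
    shortfall = max(0, cushion - n_unsent)
    start = cursor.first_ungenerated_job_num
    # Never generate past the end of the cursor's job range.
    end = min(start + shortfall, cursor.last_job_num + 1)
    new_jobs = list(range(start, end))
    cursor.first_ungenerated_job_num = max(start, end)
    return new_jobs
```

The caller would create a workunit record for each returned job number, setting its cursor_id and job_num fields.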
When a workunit is completed (successfully or not):
    while cursor.first_unfinished_job_num is finished
        cursor.first_unfinished_job_num++
    update cursor.remaining_credit
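One way to flesh out this step is sketched below. The `finished` set and the `job_credit` mapping (job number to estimated credit) are hypothetical; the text does not say how finished jobs are tracked or how remaining_credit is estimated, so summing per-job estimates is an assumption.

```python
from dataclasses import dataclass


@dataclass
class Cursor:
    first_job_num: int
    last_job_num: int                # assumption: inclusive
    first_unfinished_job_num: int
    remaining_credit: float


def on_workunit_completed(cursor, finished, job_credit, job_num):
    """Record a completed job, advance the cursor, refresh remaining credit."""
    finished.add(job_num)
    # Skip over the leading run of finished jobs.
    while (cursor.first_unfinished_job_num <= cursor.last_job_num
           and cursor.first_unfinished_job_num in finished):
        cursor.first_unfinished_job_num += 1
    # Remaining credit: the unfinished jobs still in the cursor's range.
    cursor.remaining_credit = sum(
        job_credit[j]
        for j in range(cursor.first_unfinished_job_num, cursor.last_job_num + 1)
        if j not in finished
    )
```

Jobs can finish out of order, so first_unfinished_job_num advances only when the oldest outstanding job completes, and may then jump over several already finished jobs at once.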
This runs periodically (every hour or day) and recomputes locality_cursor.expavg_credit. It doesn't have to be a separate program; it could be added to an existing daemon, like the feeder.
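The recomputation amounts to summing host credit per cursor. The sketch below works on in-memory data: `batch_host_rows` is a hypothetical list of (host_id, batch_id, cursor_id) tuples mirroring the batch_host table, and `host_credit` maps host_id to that host's current expavg_credit.

```python
def recompute_cursor_credit(batch_host_rows, host_credit):
    """Return a dict of cursor_id -> sum of expavg_credit of its team."""
    totals = {}
    for host_id, _batch_id, cursor_id in batch_host_rows:
        # Hosts missing from host_credit are assumed to contribute nothing.
        credit = host_credit.get(host_id, 0.0)
        totals[cursor_id] = totals.get(cursor_id, 0.0) + credit
    return totals
```

Cursors with no hosts do not appear in the result, so the caller would need to write zero for them. Recomputing from scratch also corrects any drift from the incremental updates made when hosts join or move.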