From b34f4d44151a4e36138e122cf64c8d31619b3d13 Mon Sep 17 00:00:00 2001 From: Rod S Date: Fri, 10 Nov 2023 12:17:23 -0800 Subject: [PATCH] Don't pointlessly block on glyph order. Add a few things to timing output to make it more helpful. --- fontc/src/change_detector.rs | 2 +- fontc/src/error.rs | 4 +- fontc/src/lib.rs | 2 +- fontc/src/main.rs | 4 +- fontc/src/timing.rs | 29 ++++-- fontc/src/workload.rs | 195 ++++++++++++++++++++++------------- 6 files changed, 153 insertions(+), 83 deletions(-) diff --git a/fontc/src/change_detector.rs b/fontc/src/change_detector.rs index 3ae8b127e..ec4433445 100644 --- a/fontc/src/change_detector.rs +++ b/fontc/src/change_detector.rs @@ -59,7 +59,7 @@ impl ChangeDetector { prev_inputs: Input, timer: &mut JobTimer, ) -> Result { - let time = create_timer(AnyWorkId::InternalTiming("new change detector")) + let time = create_timer(AnyWorkId::InternalTiming("new change detector"), 0) .queued() .run(); diff --git a/fontc/src/error.rs b/fontc/src/error.rs index 2776301d2..50f12bc1f 100644 --- a/fontc/src/error.rs +++ b/fontc/src/error.rs @@ -21,6 +21,6 @@ pub enum Error { TasksFailed(Vec<(AnyWorkId, String)>), #[error("Invalid regex")] BadRegex(#[from] regex::Error), - #[error("Unable to proceed")] - UnableToProceed, + #[error("Unable to proceed; {0} jobs stuck pending")] + UnableToProceed(usize), } diff --git a/fontc/src/lib.rs b/fontc/src/lib.rs index 076773cf0..e6605c99e 100644 --- a/fontc/src/lib.rs +++ b/fontc/src/lib.rs @@ -293,7 +293,7 @@ pub fn create_workload( change_detector: &mut ChangeDetector, timer: JobTimer, ) -> Result { - let time = create_timer(AnyWorkId::InternalTiming("Create workload")) + let time = create_timer(AnyWorkId::InternalTiming("Create workload"), 0) .queued() .run(); let mut workload = change_detector.create_workload(timer)?; diff --git a/fontc/src/main.rs b/fontc/src/main.rs index 30362527d..200fec987 100644 --- a/fontc/src/main.rs +++ b/fontc/src/main.rs @@ -23,7 +23,7 @@ fn main() { fn run() -> Result<(), Error> { let mut timer = JobTimer::new(Instant::now()); - let time = create_timer(AnyWorkId::InternalTiming("Init logger")) + let time = create_timer(AnyWorkId::InternalTiming("Init logger"), 0) .queued() .run(); env_logger::builder() @@ -43,7 +43,7 @@ fn run() -> Result<(), Error> { .init(); timer.add(time.complete()); - let time = create_timer(AnyWorkId::InternalTiming("Init config")) + let time = create_timer(AnyWorkId::InternalTiming("Init config"), 0) .queued() .run(); let args = Args::parse(); diff --git a/fontc/src/timing.rs b/fontc/src/timing.rs index beff22c09..75f16b608 100644 --- a/fontc/src/timing.rs +++ b/fontc/src/timing.rs @@ -8,7 +8,7 @@ use std::{collections::HashMap, io, thread::ThreadId, time::Instant}; /// Tracks time for jobs that run on many threads. /// -/// Meant for use with a threadpool. For example, build up [JobTime] for each +/// Meant for use with a threadpool. For example, build timing for each /// unit of work submitted to something like rayon and accumulate them here. /// /// Currently not threadsafe, meant to be used by a central orchestrator because @@ -76,6 +76,7 @@ impl JobTimer { for timing in timings { let job_start = (timing.run - self.t0).as_secs_f64(); let job_end = (timing.complete - self.t0).as_secs_f64(); + let job_queued = (timing.queued - self.t0).as_secs_f64(); let begin_pct = 100.0 * job_start / end_time.as_secs_f64(); let exec_pct = 100.0 * (timing.complete - timing.run).as_secs_f64() / end_time.as_secs_f64(); @@ -96,10 +97,13 @@ impl JobTimer { } writeln!( out, - "{:.2}s ({:.2}%) {:?}", - job_end - job_start, + "{:.0}ms ({:.2}%) {:?}\nqueued at {:.0}ms\nrun at {:.0}ms\nWave {}", + 1000.0 * (job_end - job_start), exec_pct, - timing.id + timing.id, + 1000.0 * job_queued, + 1000.0 * job_start, + timing.nth_wave, ) .unwrap(); writeln!(out, " ").unwrap(); @@ -189,9 +193,10 @@ fn color(id: &AnyWorkId) -> &'static str { /// /// Meant to be called when a job is runnable, that is it's ready to be /// submitted to an execution system such as a threadpool. -pub fn create_timer(id: AnyWorkId) -> JobTimeRunnable { +pub fn create_timer(id: AnyWorkId, nth_wave: usize) -> JobTimeRunnable { JobTimeRunnable { id, + nth_wave, runnable: Instant::now(), } } @@ -201,6 +206,7 @@ pub fn create_timer(id: AnyWorkId) -> JobTimeRunnable { /// It may have queued from t0 to launchable but now it's go-time! pub struct JobTimeRunnable { id: AnyWorkId, + nth_wave: usize, runnable: Instant, } @@ -209,6 +215,7 @@ impl JobTimeRunnable { pub fn queued(self) -> JobTimeQueued { JobTimeQueued { id: self.id, + nth_wave: self.nth_wave, runnable: self.runnable, queued: Instant::now(), } @@ -217,6 +224,7 @@ impl JobTimeRunnable { pub struct JobTimeQueued { id: AnyWorkId, + nth_wave: usize, runnable: Instant, queued: Instant, } @@ -229,6 +237,7 @@ impl JobTimeQueued { pub fn run(self) -> JobTimeRunning { JobTimeRunning { id: self.id, + nth_wave: self.nth_wave, thread: std::thread::current().id(), runnable: self.runnable, queued: self.queued, @@ -239,6 +248,7 @@ impl JobTimeQueued { pub struct JobTimeRunning { id: AnyWorkId, + nth_wave: usize, thread: ThreadId, runnable: Instant, queued: Instant, @@ -252,9 +262,10 @@ impl JobTimeRunning { pub fn complete(self) -> JobTime { JobTime { id: self.id, + nth_wave: self.nth_wave, thread_id: self.thread, _runnable: self.runnable, - _queued: self.queued, + queued: self.queued, run: self.run, complete: Instant::now(), } @@ -265,9 +276,10 @@ impl JobTimeRunning { #[derive(Debug)] pub struct JobTime { id: AnyWorkId, + nth_wave: usize, thread_id: ThreadId, _runnable: Instant, - _queued: Instant, + queued: Instant, run: Instant, complete: Instant, } @@ -278,9 +290,10 @@ impl JobTime { let now = Instant::now(); JobTime { id, + nth_wave: 0, thread_id: std::thread::current().id(), _runnable: now, - _queued: now, + queued: now, run: now, complete: now, } diff --git a/fontc/src/workload.rs b/fontc/src/workload.rs index 65163598c..fd7d453b9 100644 --- a/fontc/src/workload.rs +++ b/fontc/src/workload.rs @@ -5,7 +5,7 @@ use std::{ panic::AssertUnwindSafe, sync::{ atomic::{AtomicBool, Ordering}, - Arc, + Arc, Mutex, }, }; @@ -16,10 +16,10 @@ use fontir::{ orchestration::{Context as FeContext, WorkId as FeWorkIdentifier}, source::Input, }; -use log::{debug, trace}; +use log::{debug, trace, warn}; use crate::{ - timing::{create_timer, JobTime, JobTimer}, + timing::{create_timer, JobTime, JobTimeQueued, JobTimer}, work::{AnyAccess, AnyContext, AnyWork, AnyWorkError}, ChangeDetector, Error, }; @@ -134,7 +134,16 @@ impl<'a> Workload<'a> { } } - fn update_be_glyph_dependencies(&mut self, fe_root: &FeContext, glyph_name: GlyphName) { + /// When BE glyph jobs are initially created they don't know enough to set fine grained dependencies + /// so they depend on *all* IR glyphs. Once IR for a glyph completes we can refine that: + /// + /// * If the glyph doesn't emit to binary we don't need to do the BE work at all + /// * If the glyph has no components the BE for it doesn't use glyph order and needn't block on it + /// * If the glyph does have components we need to block on glyph order because that might alter them + /// * For example, flatten + /// + /// By minimizing dependencies we allow jobs to start earlier and execute with greater concurrency. + fn update_be_glyph_work(&mut self, fe_root: &FeContext, glyph_name: GlyphName) { let glyph = fe_root .glyphs .get(&FeWorkIdentifier::Glyph(glyph_name.clone())); @@ -144,17 +153,29 @@ impl<'a> Workload<'a> { .get_mut(&be_id) .expect("{be_id} should exist"); - let mut deps = HashSet::from([ - AnyWorkId::Fe(FeWorkIdentifier::StaticMetadata), - FeWorkIdentifier::GlyphOrder.into(), - ]); + if !glyph.emit_to_binary { + trace!("Skipping execution of {be_id:?}; it does not emit to binary"); + self.jobs_pending.remove(&be_id); + self.mark_also_completed(&be_id); + self.success.insert(be_id); + return; + } + let mut deps = HashSet::from([AnyWorkId::Fe(FeWorkIdentifier::StaticMetadata)]); + + let mut has_components = false; for inst in glyph.sources().values() { for component in inst.components.iter() { + has_components = true; deps.insert(FeWorkIdentifier::Glyph(component.base.clone()).into()); } } + // We don't *have* to wait on glyph order, but if we don't it delays the critical path + if has_components { + deps.insert(FeWorkIdentifier::GlyphOrder.into()); + } + trace!( "Updating {be_id:?} deps from {:?} to {deps:?}", be_job.read_access @@ -181,16 +202,12 @@ impl<'a> Workload<'a> { super::add_glyph_be_job(self, glyph_name.clone()); // Glyph order is done so all IR must be done. Copy dependencies from the IR for the same name. - self.update_be_glyph_dependencies(fe_root, glyph_name.clone()); + self.update_be_glyph_work(fe_root, glyph_name.clone()); } } - // When BE glyph jobs are initially created they don't know enough to set fine grained dependencies - // so they depend on *all* IR glyphs. Once IR for a glyph completes we know enough to refine that - // to just the glyphs our glyphs uses as components. This allows BE glyph jobs to run concurrently with - // unrelated IR jobs. if let AnyWorkId::Fe(FeWorkIdentifier::Glyph(glyph_name)) = success { - self.update_be_glyph_dependencies(fe_root, glyph_name); + self.update_be_glyph_work(fe_root, glyph_name); } } @@ -227,39 +244,23 @@ impl<'a> Workload<'a> { } } - pub fn launchable(&mut self) -> Vec { - let timing = create_timer(AnyWorkId::InternalTiming("Launchable")) + /// Populate launchable with jobs ready to run from highest to lowest priority + pub fn update_launchable(&mut self, launchable: &mut Vec) { + let timing = create_timer(AnyWorkId::InternalTiming("Launchable"), 0) .queued() .run(); - let mut has_kern = false; - let mut launchable: Vec<_> = self + launchable.clear(); + for id in self .jobs_pending .iter() - .filter_map(|(id, job)| { - if !job.running && self.can_run(job) { - if matches!(id, AnyWorkId::Fe(FeWorkIdentifier::Kerning)) { - has_kern = true; - } - Some(id.clone()) - } else { - None - } - }) - .collect(); - trace!("Launchable: {launchable:?}"); - - // https://github.com/googlefonts/fontc/issues/456: try to avoid kern as long pole - if has_kern { - let kern_idx = launchable - .iter() - .position(|id| matches!(id, AnyWorkId::Fe(FeWorkIdentifier::Kerning))) - .unwrap(); - launchable.swap(0, kern_idx); + .filter_map(|(id, job)| (!job.running && self.can_run(job)).then_some(id)) + { + launchable.push(id.clone()); } + trace!("Launchable: {:?}", launchable); self.timer.add(timing.complete()); - launchable } pub fn exec(mut self, fe_root: &FeContext, be_root: &BeContext) -> Result { @@ -270,53 +271,108 @@ impl<'a> Workload<'a> { // a flag we set if we panic let abort_queued_jobs = Arc::new(AtomicBool::new(false)); + let run_queue = Arc::new(Mutex::new( + Vec::<(AnyWork, JobTimeQueued, AnyContext)>::with_capacity(512), + )); + // Do NOT assign custom thread names because it makes flamegraph root each thread individually rayon::in_place_scope(|scope| { // Whenever a task completes see if it was the last incomplete dependency of other task(s) // and spawn them if it was // TODO timeout and die it if takes too long to make forward progress or we're spinning w/o progress + + // To avoid allocation every poll for work + let mut launchable = Vec::with_capacity(512.min(self.job_count)); + let mut nth_wave = 0; + while self.success.len() < self.job_count { // Spawn anything that is currently executable (has no unfulfilled dependencies) - let launchable = self.launchable(); + self.update_launchable(&mut launchable); if launchable.is_empty() && !self.jobs_pending.values().any(|j| j.running) { - return Err(Error::UnableToProceed); + if log::log_enabled!(log::Level::Warn) { + warn!("{}/{} jobs have succeeded, nothing is running, and nothing is launchable", self.success.len(), self.job_count); + for pending in self.jobs_pending.keys() { + warn!(" blocked: {pending:?}"); + } + } + return Err(Error::UnableToProceed(self.jobs_pending.len())); } - // Launch anything that needs launching - for id in launchable { - let timing = create_timer(id.clone()); + // Get launchables ready to run + if !launchable.is_empty() { + nth_wave += 1; + } - let job = self.jobs_pending.get_mut(&id).unwrap(); - log::trace!("Start {:?}", id); - let send = send.clone(); - job.running = true; - let work = job - .work - .take() - .expect("{id:?} ready to run but has no work?!"); - if !job.run { - if let Err(e) = send.send((id.clone(), Ok(()), JobTime::nop(id.clone()))) { - log::error!("Unable to write nop {id:?} to completion channel: {e}"); - //FIXME: if we can't send messages it means the receiver has dropped, - //which means we should... return? abort? + { + let mut run_queue = run_queue.lock().unwrap(); + for id in launchable.iter() { + let timing = create_timer(id.clone(), nth_wave); + + let job = self.jobs_pending.get_mut(id).unwrap(); + log::trace!("Start {:?}", id); + job.running = true; + let work = job + .work + .take() + .expect("{id:?} ready to run but has no work?!"); + if !job.run { + if let Err(e) = + send.send((id.clone(), Ok(()), JobTime::nop(id.clone()))) + { + log::error!( + "Unable to write nop {id:?} to completion channel: {e}" + ); + //FIXME: if we can't send messages it means the receiver has dropped, + //which means we should... return? abort? + } + continue; } - continue; + let work_context = AnyContext::for_work( + fe_root, + be_root, + id, + job.read_access.clone(), + job.write_access.clone(), + ); + + let timing = timing.queued(); + run_queue.push((work, timing, work_context)); } - let work_context = AnyContext::for_work( - fe_root, - be_root, - &id, - job.read_access.clone(), - job.write_access.clone(), - ); + // Try to prioritize the critical path based on --emit-timing observation + // , + run_queue.sort_by_cached_key(|(work, ..)| { + // Higher priority sorts last, which means run first due to pop + // We basically want things that block the glyph order => kern => fea sequence to go asap + match work.id() { + AnyWorkId::Be(BeWorkIdentifier::Features) => 99, + AnyWorkId::Fe(FeWorkIdentifier::Features) => 99, + AnyWorkId::Fe(FeWorkIdentifier::Kerning) => 99, + AnyWorkId::Fe(FeWorkIdentifier::GlyphOrder) => 99, + AnyWorkId::Fe(FeWorkIdentifier::PreliminaryGlyphOrder) => 99, + AnyWorkId::Fe(FeWorkIdentifier::StaticMetadata) => 99, + AnyWorkId::Fe(FeWorkIdentifier::GlobalMetrics) => 99, + AnyWorkId::Fe(FeWorkIdentifier::Glyph(..)) => 1, + _ => 0, + } + }); + } + + // Spawn for every job that's executable. Each spawn will pull one item from the run queue. + for _ in 0..launchable.len() { + let send = send.clone(); + let run_queue = run_queue.clone(); let abort = abort_queued_jobs.clone(); - let timing = timing.queued(); scope.spawn(move |_| { + let runnable = { run_queue.lock().unwrap().pop() }; + let Some((work, timing, work_context)) = runnable else { + panic!("Spawned more jobs than items available to run"); + }; + let id = work.id(); let timing = timing.run(); if abort.load(Ordering::Relaxed) { - log::trace!("Aborting {:?}", work.id()); + log::trace!("Aborting {:?}", id); return; } // # Unwind Safety @@ -457,8 +513,9 @@ impl<'a> Workload<'a> { #[cfg(test)] pub fn run_for_test(&mut self, fe_root: &FeContext, be_root: &BeContext) -> HashSet { let pre_success = self.success.clone(); + let mut launchable = Vec::new(); while !self.jobs_pending.is_empty() { - let launchable = self.launchable(); + self.update_launchable(&mut launchable); if launchable.is_empty() { log::error!("Completed:"); let mut success: Vec<_> = self.success.iter().collect(); @@ -484,7 +541,7 @@ impl<'a> Workload<'a> { } let id = &launchable[0]; - let timing = create_timer(id.clone()); + let timing = create_timer(id.clone(), 0); let job = self.jobs_pending.remove(id).unwrap(); if job.run { let timing = timing.queued();