Allow initialising shaders in parallel (#455)
* Initialise shaders in parallel

* Add control and visibility over initialisation

* Clarify comment

* Fix wasm compilation

* Fix inverted name

* Allow configuring the number of threads directly

* Fix dodgy maths
DJMcNab authored Feb 22, 2024
1 parent 67af4a1 commit 6fa114c
Showing 7 changed files with 190 additions and 26 deletions.
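For orientation, a minimal sketch (editor's addition, not code from this commit) of how a caller opts into the new option; it assumes a `wgpu::Device` obtained from the caller's own setup, with the other fields mirroring the examples changed below.

```rust
use std::num::NonZeroUsize;

use vello::{AaSupport, Renderer, RendererOptions};

// Hypothetical helper: `device` comes from the caller's existing wgpu setup.
fn build_renderer(device: &wgpu::Device) -> Renderer {
    Renderer::new(
        device,
        RendererOptions {
            surface_format: None,
            use_cpu: false,
            // `Some(1)` forces serial shader initialisation; `None` lets Vello
            // pick a thread count with its heuristic (no effect on wasm).
            num_init_threads: NonZeroUsize::new(1),
            antialiasing_support: AaSupport::area_only(),
        },
    )
    .expect("failed to create renderer")
}
```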
3 changes: 2 additions & 1 deletion crates/tests/src/lib.rs
@@ -1,4 +1,4 @@
use std::{env, fs::File, path::Path, sync::Arc};
use std::{env, fs::File, num::NonZeroUsize, path::Path, sync::Arc};

use anyhow::{anyhow, bail, Result};
use vello::{
@@ -62,6 +62,7 @@ pub async fn render(scene: Scene, params: &TestParams) -> Result<Image> {
RendererOptions {
surface_format: None,
use_cpu: params.use_cpu,
num_init_threads: NonZeroUsize::new(1),
antialiasing_support: vello::AaSupport::area_only(),
},
)
2 changes: 2 additions & 0 deletions examples/headless/src/main.rs
@@ -1,5 +1,6 @@
use std::{
fs::File,
num::NonZeroUsize,
path::{Path, PathBuf},
};

@@ -90,6 +91,7 @@ async fn render(mut scenes: SceneSet, index: usize, args: &Args) -> Result<()> {
RendererOptions {
surface_format: None,
use_cpu: args.use_cpu,
num_init_threads: NonZeroUsize::new(1),
antialiasing_support: vello::AaSupport::area_only(),
},
)
3 changes: 2 additions & 1 deletion examples/scenes/src/svg.rs
@@ -76,7 +76,8 @@ fn example_scene_of(file: PathBuf) -> ExampleScene {
.unwrap_or_else(|| "unknown".to_string());
ExampleScene {
function: Box::new(svg_function_of(name.clone(), move || {
std::fs::read_to_string(file).expect("failed to read svg file")
std::fs::read_to_string(&file)
.unwrap_or_else(|e| panic!("failed to read svg file {file:?}: {e}"))
})),
config: crate::SceneConfig {
animated: false,
4 changes: 4 additions & 0 deletions examples/with_bevy/src/main.rs
@@ -1,3 +1,5 @@
use std::num::NonZeroUsize;

use bevy::render::{Render, RenderSet};
use bevy::utils::synccell::SyncCell;
use vello::kurbo::{Affine, Point, Rect, Stroke};
@@ -29,6 +31,8 @@ impl FromWorld for VelloRenderer {
device.wgpu_device(),
RendererOptions {
surface_format: None,
// TODO: We should ideally use the Bevy threadpool here
num_init_threads: NonZeroUsize::new(1),
antialiasing_support: vello::AaSupport::area_only(),
use_cpu: false,
},
17 changes: 17 additions & 0 deletions examples/with_winit/src/lib.rs
@@ -15,6 +15,7 @@
// Also licensed under MIT license, at your choice.

use instant::{Duration, Instant};
use std::num::NonZeroUsize;
use std::{collections::HashSet, sync::Arc};

use anyhow::Result;
@@ -51,6 +52,21 @@ struct Args {
#[arg(long)]
/// Whether to use CPU shaders
use_cpu: bool,
/// How many threads to use for initialising the shaders; use `1` to force
/// initialising them serially (rather than spawning threads).
/// This has no effect on wasm, and defaults to 1 on macOS for performance reasons
///
/// Use `0` for an automatic choice
#[arg(long, default_value_t=default_threads())]
num_init_threads: usize,
}

fn default_threads() -> usize {
#![allow(unreachable_code)]
#[cfg(target_os = "macos")]
{
return 1;
}
0
}

struct RenderState<'s> {
@@ -538,6 +554,7 @@ fn run(
surface_format: Some(render_state.surface.format),
use_cpu,
antialiasing_support: vello::AaSupport::all(),
num_init_threads: NonZeroUsize::new(args.num_init_threads)
},
)
.expect("Could create renderer")
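For reference (an editor's sketch, not code from this commit): the numeric flag above maps onto the renderer option via `NonZeroUsize::new`, so `0` becomes `None` and selects the automatic heuristic, while `1` (the macOS default from `default_threads`) forces serial initialisation.

```rust
use std::num::NonZeroUsize;

// Hypothetical restatement of the mapping used when `args.num_init_threads`
// is passed to `RendererOptions::num_init_threads`.
fn to_renderer_option(flag: usize) -> Option<NonZeroUsize> {
    NonZeroUsize::new(flag)
}

fn main() {
    assert_eq!(to_renderer_option(0), None); // automatic choice
    assert_eq!(to_renderer_option(1), NonZeroUsize::new(1)); // forced serial (macOS default)
    assert_eq!(to_renderer_option(8), NonZeroUsize::new(8)); // exactly eight threads
}
```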
22 changes: 22 additions & 0 deletions src/lib.rs
@@ -25,6 +25,8 @@ mod shaders;
#[cfg(feature = "wgpu")]
mod wgpu_engine;

use std::{num::NonZeroUsize, time::Instant};

/// Styling and composition primitives.
pub use peniko;
/// 2D geometry, with a focus on curves.
@@ -140,14 +142,33 @@ pub struct RendererOptions {
/// Represents the enabled set of AA configurations. This will be used to determine which
/// pipeline permutations should be compiled at startup.
pub antialiasing_support: AaSupport,

/// How many threads to use for initialisation of shaders.
///
/// Use `Some(1)` to use a single thread. This is recommended when on macOS
/// (see https://github.com/bevyengine/bevy/pull/10812#discussion_r1496138004)
///
/// Set to `None` to use a heuristic which will use many but not all threads
///
/// Has no effect on WebAssembly
pub num_init_threads: Option<NonZeroUsize>,
}

#[cfg(feature = "wgpu")]
impl Renderer {
/// Creates a new renderer for the specified device.
pub fn new(device: &Device, options: RendererOptions) -> Result<Self> {
let mut engine = WgpuEngine::new(options.use_cpu);
// If we are running in parallel (i.e. the number of threads is not 1)
if options.num_init_threads != NonZeroUsize::new(1) {
#[cfg(not(target_arch = "wasm32"))]
engine.use_parallel_initialisation();
}
let start = Instant::now();
let shaders = shaders::full_shaders(device, &mut engine, &options)?;
#[cfg(not(target_arch = "wasm32"))]
engine.build_shaders_if_needed(device, options.num_init_threads);
eprintln!("Building shaders took {:?}", start.elapsed());
let blit = options
.surface_format
.map(|surface_format| BlitPipeline::new(device, surface_format));
@@ -272,6 +293,7 @@ impl Renderer {
pub async fn reload_shaders(&mut self, device: &Device) -> Result<()> {
device.push_error_scope(wgpu::ErrorFilter::Validation);
let mut engine = WgpuEngine::new(self.options.use_cpu);
// We choose not to initialise these shaders in parallel, to ensure the error scope works correctly
let shaders = shaders::full_shaders(device, &mut engine, &self.options)?;
let error = device.pop_error_scope().await;
if let Some(error) = error {
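A small sketch (the helper name is ours, not the crate's) restating the check in `Renderer::new`: only `Some(1)` keeps initialisation serial, while `None` and any larger count take the parallel path; on WebAssembly the parallel path is compiled out regardless.

```rust
use std::num::NonZeroUsize;

// Hypothetical helper mirroring `options.num_init_threads != NonZeroUsize::new(1)`.
fn wants_parallel_init(num_init_threads: Option<NonZeroUsize>) -> bool {
    num_init_threads != NonZeroUsize::new(1)
}

fn main() {
    assert!(!wants_parallel_init(NonZeroUsize::new(1))); // forced serial
    assert!(wants_parallel_init(None)); // heuristic thread count
    assert!(wants_parallel_init(NonZeroUsize::new(4))); // exactly four worker threads
}
```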
165 changes: 141 additions & 24 deletions src/wgpu_engine.rs
@@ -19,12 +19,22 @@ use crate::{
BufProxy, Command, Id, ImageProxy, Recording, ResourceProxy, ShaderId,
};

#[cfg(not(target_arch = "wasm32"))]
struct UninitialisedShader {
wgsl: Cow<'static, str>,
label: &'static str,
entries: Vec<wgpu::BindGroupLayoutEntry>,
shader_id: ShaderId,
}

#[derive(Default)]
pub struct WgpuEngine {
shaders: Vec<Shader>,
pool: ResourcePool,
bind_map: BindMap,
downloads: HashMap<Id, Buffer>,
#[cfg(not(target_arch = "wasm32"))]
shaders_to_initialise: Option<Vec<UninitialisedShader>>,
pub(crate) use_cpu: bool,
}

@@ -62,7 +72,7 @@ impl Shader {
} else if let Some(wgpu) = self.wgpu.as_ref() {
ShaderKind::Wgpu(wgpu)
} else {
panic!("no available shader")
panic!("no available shader for {}", self.label)
}
}
}
@@ -130,6 +140,88 @@ impl WgpuEngine {
}
}

/// Enable creating any remaining shaders in parallel
#[cfg(not(target_arch = "wasm32"))]
pub fn use_parallel_initialisation(&mut self) {
if self.shaders_to_initialise.is_some() {
return;
}
self.shaders_to_initialise = Some(Vec::new());
}

#[cfg(not(target_arch = "wasm32"))]
/// Initialise (in parallel) any shaders which are yet to be created
pub fn build_shaders_if_needed(
&mut self,
device: &Device,
num_threads: Option<std::num::NonZeroUsize>,
) {
use std::num::NonZeroUsize;

if let Some(mut new_shaders) = self.shaders_to_initialise.take() {
let num_threads = num_threads
.map(NonZeroUsize::get)
.unwrap_or_else(|| {
// Fallback onto a heuristic. This tries not to use all threads.
// We keep the main thread blocked and not doing much whilst this is running,
// so we broadly leave two cores unused at the point of maximum parallelism
// (This choice is arbitrary, and could be tuned, although a 'proper' threadpool
// should probably be used instead)
std::thread::available_parallelism().map_or(2, |it| it.get().max(4) - 2)
})
.min(new_shaders.len());
eprintln!("Initialising in parallel using {num_threads} threads");
let remainder = new_shaders.split_off(num_threads);
let (tx, rx) = std::sync::mpsc::channel::<(ShaderId, WgpuShader)>();

// We expect each initialisation to take much longer than acquiring a lock, so we just use a mutex for our work queue
let work_queue = std::sync::Mutex::new(remainder.into_iter());
let work_queue = &work_queue;
std::thread::scope(|scope| {
let tx = tx;
new_shaders
.into_iter()
.map(|it| {
let tx = tx.clone();
std::thread::Builder::new()
.name("Vello shader initialisation worker thread".into())
.spawn_scoped(scope, move || {
let shader = Self::create_compute_pipeline(
device, it.label, it.wgsl, it.entries,
);
// We know the rx can only be closed if all the tx references are dropped
tx.send((it.shader_id, shader)).unwrap();
while let Ok(mut guard) = work_queue.lock() {
if let Some(value) = guard.next() {
drop(guard);
let shader = Self::create_compute_pipeline(
device,
value.label,
value.wgsl,
value.entries,
);
tx.send((value.shader_id, shader)).unwrap();
} else {
break;
}
}
// Another thread panicked or we finished.
// If another thread panicked, we ignore that here and finish our processing
drop(tx);
})
.expect("failed to spawn thread");
})
.for_each(drop);
// Drop the initial sender, so that senders remain if and only if a worker thread is still running
drop(tx);

while let Ok((id, value)) = rx.recv() {
self.shaders[id.0].wgpu = Some(value);
}
});
}
}

/// Add a shader.
///
/// This function is somewhat limited, it doesn't apply a label, only allows one bind group,
@@ -173,10 +265,6 @@ impl WgpuEngine {
}
}

let shader_module = device.create_shader_module(wgpu::ShaderModuleDescriptor {
label: Some(label),
source: wgpu::ShaderSource::Wgsl(wgsl),
});
let entries = layout
.iter()
.enumerate()
@@ -225,27 +313,24 @@
}
})
.collect::<Vec<_>>();
let bind_group_layout = device.create_bind_group_layout(&wgpu::BindGroupLayoutDescriptor {
label: None,
entries: &entries,
});
let compute_pipeline_layout =
device.create_pipeline_layout(&wgpu::PipelineLayoutDescriptor {
label: None,
bind_group_layouts: &[&bind_group_layout],
push_constant_ranges: &[],
#[cfg(not(target_arch = "wasm32"))]
if let Some(uninit) = self.shaders_to_initialise.as_mut() {
let id = add(Shader {
label,
wgpu: None,
cpu: None,
})?;
uninit.push(UninitialisedShader {
wgsl,
label,
entries,
shader_id: id,
});
let pipeline = device.create_compute_pipeline(&wgpu::ComputePipelineDescriptor {
label: Some(label),
layout: Some(&compute_pipeline_layout),
module: &shader_module,
entry_point: "main",
});
return Ok(id);
}
let wgpu = Self::create_compute_pipeline(device, label, wgsl, entries);
add(Shader {
wgpu: Some(WgpuShader {
pipeline,
bind_group_layout,
}),
wgpu: Some(wgpu),
cpu: None,
label,
})
@@ -532,6 +617,38 @@ impl WgpuEngine {
pub fn free_download(&mut self, buf: BufProxy) {
self.downloads.remove(&buf.id);
}

fn create_compute_pipeline(
device: &Device,
label: &str,
wgsl: Cow<'_, str>,
entries: Vec<wgpu::BindGroupLayoutEntry>,
) -> WgpuShader {
let shader_module = device.create_shader_module(wgpu::ShaderModuleDescriptor {
label: Some(label),
source: wgpu::ShaderSource::Wgsl(wgsl),
});
let bind_group_layout = device.create_bind_group_layout(&wgpu::BindGroupLayoutDescriptor {
label: None,
entries: &entries,
});
let compute_pipeline_layout =
device.create_pipeline_layout(&wgpu::PipelineLayoutDescriptor {
label: None,
bind_group_layouts: &[&bind_group_layout],
push_constant_ranges: &[],
});
let pipeline = device.create_compute_pipeline(&wgpu::ComputePipelineDescriptor {
label: Some(label),
layout: Some(&compute_pipeline_layout),
module: &shader_module,
entry_point: "main",
});
WgpuShader {
pipeline,
bind_group_layout,
}
}
}

impl BindMap {
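The fallback heuristic in `build_shaders_if_needed` above aims to leave roughly two cores free and never spawns more threads than there are shaders left to build. A worked sketch of that arithmetic (the helper and the sample numbers are illustrative; the real code reads `std::thread::available_parallelism()`):

```rust
use std::num::NonZeroUsize;

// Hypothetical restatement of the heuristic: `available.max(4) - 2`, capped at
// the number of shaders still waiting to be initialised.
fn heuristic_threads(available: Option<NonZeroUsize>, remaining_shaders: usize) -> usize {
    available
        .map_or(2, |it| it.get().max(4) - 2)
        .min(remaining_shaders)
}

fn main() {
    assert_eq!(heuristic_threads(NonZeroUsize::new(8), 40), 6); // 8.max(4) - 2 = 6
    assert_eq!(heuristic_threads(NonZeroUsize::new(2), 40), 2); // 2.max(4) - 2 = 2, never below 2
    assert_eq!(heuristic_threads(None, 40), 2); // unknown parallelism falls back to 2
    assert_eq!(heuristic_threads(NonZeroUsize::new(16), 3), 3); // capped by remaining shader count
}
```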
