Skip to content

Commit

Permalink
set up for writing into a single file
Browse files Browse the repository at this point in the history
  • Loading branch information
Tehforsch committed Sep 21, 2023
1 parent 1e480a7 commit bb3b475
Show file tree
Hide file tree
Showing 10 changed files with 324 additions and 122 deletions.
4 changes: 3 additions & 1 deletion src/io/input/file_distribution.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use bevy_ecs::system::Resource;

use crate::communication::Rank;

#[derive(Debug)]
Expand All @@ -22,7 +24,7 @@ impl Region {
}
}

#[derive(Debug)]
#[derive(Debug, Resource)]
pub struct RankAssignment {
pub regions: Vec<Region>,
}
Expand Down
2 changes: 1 addition & 1 deletion src/io/input/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
pub mod attribute;
mod file_distribution;
pub mod file_distribution;
#[cfg(test)]
mod tests;

Expand Down
31 changes: 20 additions & 11 deletions src/io/output/attribute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ use bevy_ecs::schedule::SystemDescriptor;
use hdf5::H5Type;

use super::plugin::IntoOutputSystem;
use super::OutputFile;
use super::timer::Timer;
use super::FileWithRegion;
use super::OutputFiles;
use crate::named::Named;

pub trait ToAttribute: Named + Resource {
Expand All @@ -27,19 +29,26 @@ impl<T: Named> Named for Attribute<T> {
}

impl<T: ToAttribute> IntoOutputSystem for Attribute<T> {
fn system() -> SystemDescriptor {
write_attribute::<T>.into_descriptor()
fn create_system() -> SystemDescriptor {
write_attribute::<T>
.into_descriptor()
.with_run_criteria(Timer::run_criterion)
}

fn write_system() -> SystemDescriptor {
(|| {}).into_descriptor()
}
}

fn write_attribute<T: ToAttribute>(res: Res<T>, file: ResMut<OutputFile>) {
let f = file.f.as_ref().unwrap();
let attr = f
.new_attr::<T::Output>()
.shape(())
.create(T::name())
.unwrap();
attr.write_scalar(&res.to_value()).unwrap();
fn write_attribute<T: ToAttribute>(res: Res<T>, file: ResMut<OutputFiles>) {
for FileWithRegion { file, .. } in file.0.as_ref().unwrap().iter() {
let attr = file
.new_attr::<T::Output>()
.shape(())
.create(T::name())
.unwrap();
attr.write_scalar(&res.to_value()).unwrap();
}
}

// The poor man's procedural macro
Expand Down
213 changes: 194 additions & 19 deletions src/io/output/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,60 @@ pub mod timer;

use std::fs;
use std::path::Path;
use std::path::PathBuf;

use bevy_ecs::component::Component;
use bevy_ecs::prelude::Res;
use bevy_ecs::prelude::ResMut;
use bevy_ecs::prelude::Resource;
use bevy_ecs::system::NonSend;
use hdf5::Dataset;
use hdf5::File;
use log::info;
use mpi::traits::CommunicatorCollectives;

pub use self::attribute::Attribute;
pub use self::attribute::ToAttribute;
use self::parameters::OutputParameters;
pub use self::plugin::OutputPlugin;
use self::timer::Timer;
use crate::communication::WorldRank;
use super::input::file_distribution::Region;
use super::to_dataset::ToDataset;
use super::DatasetDescriptor;
use super::OutputDatasetDescriptor;
use crate::communication::MPI_UNIVERSE;
use crate::io::input::file_distribution::RankAssignment;
use crate::parameter_plugin::ParameterFileContents;
use crate::prelude::Particles;
use crate::prelude::WorldRank;
use crate::prelude::WorldSize;
use crate::units::Dimension;

pub const SCALE_FACTOR_IDENTIFIER: &str = "scale_factor_si";
pub const LENGTH_IDENTIFIER: &str = "scaling_length";
pub const TIME_IDENTIFIER: &str = "scaling_time";
pub const MASS_IDENTIFIER: &str = "scaling_mass";
pub const TEMPERATURE_IDENTIFIER: &str = "scaling_temperature";
pub const H_SCALING_IDENTIFIER: &str = "scaling_h";
pub const A_SCALING_IDENTIFIER: &str = "scaling_a";

// Output order:
// Output proceeds as follows
// 1. Main rank creates files
// 2. Main rank creates datasets with correct shape
// 3. Main rank creates attributes to the newly created datasets
// 4. Main rank closes files
// 5. All ranks open files
// 6. All ranks write data
// 7. All ranks close files

#[derive(Default, Resource)]
pub(super) struct OutputFile {
pub f: Option<File>,
pub struct OutputFiles(pub Option<Vec<FileWithRegion>>);

#[derive(Debug)]
pub struct FileWithRegion {
file: File,
region: Region,
}

fn write_used_parameters_system(
Expand Down Expand Up @@ -74,31 +109,171 @@ fn make_snapshot_dir(snapshot_dir: &Path) {
.unwrap_or_else(|_| panic!("Failed to create snapshot dir: {snapshot_dir:?}"));
}

fn open_file_system(
mut file: ResMut<OutputFile>,
rank: Res<WorldRank>,
world_size: Res<WorldSize>,
fn get_output_files(
parameters: Res<OutputParameters>,
output_timer: Res<Timer>,
) {
assert!(file.f.is_none());
let rank_padding = ((**world_size as f64).log10().floor() as usize) + 1;
assignment: Res<RankAssignment>,
get_file: impl Fn(PathBuf) -> hdf5::Result<File>,
) -> Vec<FileWithRegion> {
let file_index_padding = ((parameters.num_output_files as f64).log10().floor() as usize) + 1;
let snapshot_name = format!(
"{:0snap_padding$}",
output_timer.snapshot_num(),
snap_padding = parameters.snapshot_padding
);
let snapshot_dir = parameters.snapshot_dir().join(&snapshot_name);
make_snapshot_dir(&snapshot_dir);
let filename = &format!(
"{:0rank_padding$}.hdf5",
rank.0,
rank_padding = rank_padding
);
info!("Writing snapshot: {}", &snapshot_name);
file.f = Some(File::create(snapshot_dir.join(filename)).expect("Failed to open output file"));
assignment
.regions
.iter()
.map(|region| {
let filename = &format!(
"{:0file_index_padding$}.hdf5",
region.file_index,
file_index_padding = file_index_padding
);
let file = get_file(snapshot_dir.join(filename)).expect("Failed to open output file");
FileWithRegion {
file,
region: region.clone(),
}
})
.collect()
}

fn create_file_system(
mut file: ResMut<OutputFiles>,
parameters: Res<OutputParameters>,
output_timer: Res<Timer>,
assignment: Res<RankAssignment>,
) {
info!("Writing snapshot: {}", &output_timer.snapshot_num());
assert!(file.0.is_none());
file.0 = Some(get_output_files(
parameters,
output_timer,
assignment,
File::create,
));
}

fn open_file_system(
mut file: ResMut<OutputFiles>,
parameters: Res<OutputParameters>,
output_timer: Res<Timer>,
assignment: Res<RankAssignment>,
) {
assert!(file.0.is_none());
file.0 = Some(get_output_files(
parameters,
output_timer,
assignment,
File::open_rw,
))
}

fn close_file_system(mut file: ResMut<OutputFiles>) {
file.0 = None;
}

pub fn create_dataset_system<T: Component + ToDataset>(
file: ResMut<OutputFiles>,
descriptor: NonSend<OutputDatasetDescriptor<T>>,
) {
let files = file.0.as_ref().unwrap();
create_dataset_in_files::<T>(files, &descriptor);
}

pub fn create_dataset_in_files<T: ToDataset>(
files: &[FileWithRegion],
descriptor: &DatasetDescriptor,
) {
for FileWithRegion { file, region } in files.iter() {
let dataset = file
.new_dataset::<T>()
.shape(&[region.dataset_size])
.create(descriptor.dataset_name())
.expect("Failed to create dataset");
add_dimension_attrs::<T>(&dataset);
}
}

pub fn write_dataset_system<T: Component + ToDataset>(
query: Particles<&T>,
file: ResMut<OutputFiles>,
descriptor: NonSend<OutputDatasetDescriptor<T>>,
) {
let files = file.0.as_ref().unwrap();
let data: Vec<T> = query.iter().cloned().collect();
write_dataset_to_files(data, files, &descriptor);
}

pub fn write_dataset_to_files<T: ToDataset>(
data: Vec<T>,
files: &[FileWithRegion],
descriptor: &DatasetDescriptor,
) {
for FileWithRegion { file, region } in files.iter() {
let dataset = file
.dataset(&descriptor.dataset_name())
.expect("Failed to open dataset");
dataset
.write_slice(&data, region.start..region.end)
.expect("Failed to write slice to dataset");
}
}

pub fn add_dimension_attrs<T: ToDataset>(dataset: &Dataset) {
let attr = dataset
.new_attr::<f64>()
.shape(())
.create(SCALE_FACTOR_IDENTIFIER)
.unwrap();
let dimension = T::dimension();
let scale_factor = dimension.base_conversion_factor();
attr.write_scalar(&scale_factor).unwrap();
// Unpack this slightly awkwardly here to make sure that we
// remember to extend it once more units are added to the
// Dimension struct
let Dimension {
length,
time,
mass,
temperature,
h,
a,
} = dimension;
write_dimension(dataset, LENGTH_IDENTIFIER, length);
write_dimension(dataset, TIME_IDENTIFIER, time);
write_dimension(dataset, MASS_IDENTIFIER, mass);
write_dimension(dataset, TEMPERATURE_IDENTIFIER, temperature);
write_dimension(dataset, H_SCALING_IDENTIFIER, h);
write_dimension(dataset, A_SCALING_IDENTIFIER, a);
}

fn close_file_system(mut file: ResMut<OutputFile>) {
file.f = None;
fn write_dimension(dataset: &Dataset, identifier: &str, dimension: i32) {
let attr = dataset
.new_attr::<i32>()
.shape(())
.create(identifier)
.unwrap();
attr.write_scalar(&dimension).unwrap();
}

pub fn init_wait_for_other_ranks_system(world_size: Res<WorldSize>, rank: Res<WorldRank>) {
let world = MPI_UNIVERSE.world();
for i in 0..**world_size {
if i < **rank as usize {
world.barrier();
}
}
}

pub fn finish_wait_for_other_ranks_system(world_size: Res<WorldSize>, rank: Res<WorldRank>) {
let world = MPI_UNIVERSE.world();
for i in 0..**world_size {
if i >= **rank as usize {
world.barrier();
}
}
}
Loading

0 comments on commit bb3b475

Please sign in to comment.