Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add multi-threading support #94

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 20 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@ anyhow = "1.0.32"
atty = "0.2.14"
clap = { version = "3.0.7", features = ["derive"] }
colored = "2.0"
dyn-clone = "1.0.5"
ignore = "0.4"
Inflector = "0.11"
os_str_bytes = "6.0.1"
patricia_tree = "0.3.1"
regex = "1.5.1"


Expand Down
20 changes: 13 additions & 7 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ struct Options {
parse(from_os_str),
help = "The source path. Defaults to the working directory"
)]
path: Option<PathBuf>,
paths: Vec<PathBuf>,

#[clap(
long = "--no-regex",
Expand Down Expand Up @@ -176,7 +176,7 @@ pub fn run() -> Result<()> {
ignored,
ignored_file_types,
no_regex,
path,
mut paths,
pattern,
replacement,
selected_file_types,
Expand Down Expand Up @@ -217,11 +217,13 @@ pub fn run() -> Result<()> {
ignored_file_types,
};

let path = path.unwrap_or_else(|| Path::new(".").to_path_buf());
if path == PathBuf::from("-") {
if paths.is_empty() {
paths.push(Path::new(".").to_path_buf());
}
if paths.len() == 1 && paths.first().unwrap() == &PathBuf::from("-") {
run_on_stdin(query)
} else {
run_on_directory(console, path, settings, query)
run_on_directory(console, paths, settings, query)
}
}

Expand All @@ -241,12 +243,16 @@ fn run_on_stdin(query: Query) -> Result<()> {

fn run_on_directory(
console: Console,
path: PathBuf,
paths: Vec<PathBuf>,
settings: Settings,
query: Query,
) -> Result<()> {
let dry_run = settings.dry_run;
let mut directory_patcher = DirectoryPatcher::new(&console, &path, &settings);
let mut directory_patcher = DirectoryPatcher::new(
&console,
Box::new(paths.iter().map(|p| -> &Path { &*p })),
&settings,
);
directory_patcher.run(&query)?;
let stats = directory_patcher.stats();
if stats.total_replacements() == 0 {
Expand Down
150 changes: 131 additions & 19 deletions src/directory_patcher.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,21 @@
use anyhow::{Context, Result};
use anyhow::{bail, Context, Result};
use dyn_clone::DynClone;
use ignore::WalkState;
use std::fmt::Debug;
use std::fs;
use std::path::Path;
use std::sync::{Arc, Mutex};

use crate::console::Console;
use crate::file_patcher::FilePatcher;
use crate::query::Query;
use crate::settings::Settings;
use crate::stats::Stats;

use self::path_deduplicator::PathDeduplicator;

mod path_deduplicator;

#[derive(Debug)]
/// Used to run a replacement query on every text file present in the given paths
/// ```rust
Expand All @@ -29,22 +38,32 @@ use crate::stats::Stats;
// Note: keep the dry_run: true in the doc test above or the integration test
// will fail ...
pub struct DirectoryPatcher<'a> {
path: &'a Path,
paths: Box<dyn PathsIter<'a> + 'a>,
settings: &'a Settings,
console: &'a Console,
stats: Stats,
}

/// Marker trait for the collection of root paths handed to a `DirectoryPatcher`.
///
/// Anything that is an iterator over `&'a Path` and is also `Debug`,
/// `DynClone` and `Send` qualifies; the trait adds no methods of its own. It
/// exists so that such an iterator can be stored as a
/// `Box<dyn PathsIter<'a> + 'a>` trait object.
pub trait PathsIter<'a>: Debug + DynClone + Iterator<Item = &'a Path> + Send {}

// Lets a boxed `dyn PathsIter` be cloned (see the `dyn_clone` crate docs), so
// the stored paths can be iterated without consuming the original iterator.
dyn_clone::clone_trait_object!(<'a> PathsIter<'a>);

/// Blanket implementation: every type meeting the bounds is a `PathsIter`.
impl<'a, T> PathsIter<'a> for T where
    T: Debug + DynClone + Iterator<Item = &'a Path> + Send + 'a
{
}

impl<'a> DirectoryPatcher<'a> {
pub fn new(
console: &'a Console,
path: &'a Path,
paths: Box<dyn PathsIter<'a> + 'a>,
settings: &'a Settings,
) -> DirectoryPatcher<'a> {
let stats = Stats::default();
DirectoryPatcher {
console,
path,
paths,
settings,
stats,
}
Expand All @@ -53,41 +72,74 @@ impl<'a> DirectoryPatcher<'a> {
/// Run the given query on the selected files in self.paths
pub fn run(&mut self, query: &Query) -> Result<()> {
let walker = self.build_walker()?;
for entry in walker {
let entry = entry.with_context(|| "Could not read directory entry")?;
if let Some(file_type) = entry.file_type() {
if file_type.is_file() {
self.patch_file(entry.path(), query)?;
let mut error_happened = Arc::new(Mutex::new(false));
walker.run(|| {
let error_happened = error_happened.clone();
let console = self.console;
let stats = &self.stats;
let settings = &self.settings;
Box::new(move |entry| -> WalkState {
let res = (|| -> Result<()> {
let entry = entry.with_context(|| "Could not read directory entry")?;
if let Some(file_type) = entry.file_type() {
if file_type.is_file() {
Self::patch_file(console, stats, settings, entry.path(), query)?;
}
}
Ok(())
})();

match res {
Ok(()) => WalkState::Continue,
Err(e) => {
*error_happened.lock().unwrap() = true;
console.print_error(&format!("{:?}", e));
WalkState::Quit
}
}
}
})
});
let error_happened = *Arc::get_mut(&mut error_happened)
.expect("no references to error flag expected after dir walking")
.get_mut()
.unwrap();
if error_happened {
bail!("one or more directory walking operations failed, see above for more details")
} else {
Ok(())
}
Ok(())
}

pub fn stats(self) -> Stats {
self.stats
}

pub(crate) fn patch_file(&mut self, entry: &Path, query: &Query) -> Result<()> {
let file_patcher = FilePatcher::new(self.console, entry, query)?;
pub(crate) fn patch_file(
console: &Console,
stats: &Stats,
settings: &Settings,
entry: &Path,
query: &Query,
) -> Result<()> {
let file_patcher = FilePatcher::new(console, entry, query)?;
let file_patcher = match file_patcher {
None => return Ok(()),
Some(f) => f,
};
let num_replacements = file_patcher.num_replacements();
if num_replacements != 0 {
self.console.print_message("\n");
console.print_message("\n");
}
let num_lines = file_patcher.num_lines();
self.stats.update(num_lines, num_replacements);
if self.settings.dry_run {
stats.update(num_lines, num_replacements);
if settings.dry_run {
return Ok(());
}
file_patcher.run()?;
Ok(())
}

fn build_walker(&self) -> Result<ignore::Walk> {
fn build_walker(&self) -> Result<ignore::WalkParallel> {
let mut types_builder = ignore::types::TypesBuilder::new();
types_builder.add_defaults();
let mut count: u32 = 0;
Expand Down Expand Up @@ -116,7 +168,19 @@ impl<'a> DirectoryPatcher<'a> {
}
}
let types_matcher = types_builder.build()?;
let mut walk_builder = ignore::WalkBuilder::new(&self.path);

let mut paths = self.paths.clone();

let mut walk_builder = ignore::WalkBuilder::new(
paths
.next()
.expect("internal error: expected at least one path"),
);

for path in paths {
walk_builder.add(path);
}

walk_builder.types(types_matcher);
// Note: the walk_builder configures the "ignore" settings of the Walker,
// hence the negations
Expand All @@ -126,6 +190,54 @@ impl<'a> DirectoryPatcher<'a> {
if self.settings.hidden {
walk_builder.hidden(false);
}
Ok(walk_builder.build())

let path_deduplicator = Arc::new(Mutex::new(PathDeduplicator::new()));
walk_builder.filter_entry(move |dir_entry| {
fs::canonicalize(dir_entry.path()).map_or(false, |abs_path_buf| {
let was_not_seen_before =
path_deduplicator.lock().unwrap().insert_path(&abs_path_buf);
was_not_seen_before
})
});

// NOTE(erichdongubler): the parallel API of the `ignore` crate lets us do fancy things like
// skipping duplicate canonicalized entries, which we absolutely need when multiple paths are
// specified for walking. However, we only use a single thread here for now because there
// are a few issues to tackle before enabling multiple threads:
//
// - Blocker: Console printing is only synchronized for individual write operations --
// which is not good enough when we have entire blocks of output that need to stay
// together.
// - Minor: Design thinking should be given to the following issues:
// - Errors for any reason in old logic halted the entire search-and-replace operation.
// However, we can no [longer guarantee that there won't be a few straggler
// operations][stragglers] before the `ignore` walker quits its iteration flow when we
// encounter an error.
//
// [stragglers]: https://docs.rs/ignore/latest/ignore/enum.WalkState.html#variant.Quit
//
// This is probably fine to just note as expected behavior. However, we also have
// an opportunity for user-defined behavior for this!
//
// Idea: expose a `--on-replace-error=(report-and-continue|ignore|stop)` flag.
// - How to expose the number of threads used to a user?
// - Idea: Stick to 1 by default for now, to keep behavior until a new breaking version.
// - Idea: allow specifying non-zero number, or "max", ex. `--num-threads=(<1..n>|max)`,
// `-j` for short like with many other *nix tools.
// - How to print results -- the first alternatives that come to mind are:
// - Approach 1: Print replacement reports per file one-at-a-time, queueing them once
// the entire file is processed.
// - Approach 2: Print replacement reports per file one-at-a-time, queueing individual
// line replacements as they are processed.
// - Pro: Slightly more responsive UI?
// - Con: Once a file is picked for reporting, even finished reports can't print
// until the picked one is done. This could have some pathological UX compared to
// approach 1.
// - Approach 3: Print replacements reports after all of them have been received.
// - Almost certainly not what we want -- the most memory and waiting to show the
// user something.
walk_builder.threads(1);

Ok(walk_builder.build_parallel())
}
}
22 changes: 22 additions & 0 deletions src/directory_patcher/path_deduplicator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
use std::path::Path;

use os_str_bytes::RawOsStr;
use patricia_tree::PatriciaSet;

/// Remembers every path it has been given so that repeated paths can be
/// detected.
///
/// Paths are compared by the raw bytes of the `Path` exactly as passed in;
/// this type performs no normalization or canonicalization itself.
#[derive(Debug, Default)]
pub struct PathDeduplicator {
// Raw bytes of every path inserted so far.
set: PatriciaSet,
}

impl PathDeduplicator {
/// Creates an empty deduplicator (equivalent to `Self::default()`).
pub fn new() -> Self {
Self::default()
}

/// Records `path` and reports whether it is new to this instance.
///
/// Returns the result of `PatriciaSet::insert`: `true` if `path` had NOT
/// been inserted before (it was newly added), and `false` if an identical
/// path was already present.
pub fn insert_path(&mut self, path: &Path) -> bool {
let Self { set } = self;
// Convert the OS string to its raw byte form so it can be used as a set key.
let raw = RawOsStr::new(path.as_os_str());
set.insert(raw.as_raw_bytes())
}
}
Loading