Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

new: Continuously pipe stdin during file hashing. #1864

Merged
merged 3 commits into from
Mar 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
- Updated `toolchain.default` in `moon.yml` to support a list of IDs.
- Updated generated JSON schemas at `.moon/cache/schemas` to dynamically include toolchain plugin
configuration.
- Updated file hashing (via `git hash-object`) to continuously pipe stdin to avoid hanging processes.
- Deprecated `hasher.batchSize` in `.moon/workspace.yml`.

#### 🧩 Plugins

Expand All @@ -66,6 +68,11 @@
- Updated `migrate_turborepo_extension` to v0.1.6.
- Will no longer remove Turborepo configs. Pass `--cleanup` to remove them.

#### 🐞 Fixes

- Fixed an issue where file hashing (`git hash-object`) would hang when there are too many files
being hashed.

#### ⚙️ Internal

- Updated proto to [v0.47.2](https://github.com/moonrepo/proto/releases/tag/v0.47.2) (from 0.45.2).
Expand Down
1 change: 1 addition & 0 deletions crates/config/src/workspace/hasher_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ derive_enum!(
#[derive(Clone, Config, Debug, PartialEq)]
pub struct HasherConfig {
/// The number of files to include in each hash operation.
#[deprecated]
#[setting(default = 2500)]
pub batch_size: u16,

Expand Down
3 changes: 1 addition & 2 deletions crates/config/tests/workspace_config_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,6 @@ generator:
fn loads_defaults() {
let config = test_load_config(FILENAME, "hasher: {}", load_config_from_root);

assert_eq!(config.hasher.batch_size, 2500);
assert!(config.hasher.warn_on_missing_inputs);
}

Expand All @@ -558,7 +557,6 @@ hasher:
load_config_from_root,
);

assert_eq!(config.hasher.batch_size, 1000);
assert!(!config.hasher.warn_on_missing_inputs);
}

Expand Down Expand Up @@ -852,6 +850,7 @@ extensions:
use starbase_sandbox::locate_fixture;
use std::str::FromStr;

#[allow(deprecated)]
#[test]
fn loads_pkl() {
let config = test_config(locate_fixture("pkl"), |path| {
Expand Down
9 changes: 9 additions & 0 deletions crates/process/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ pub struct Command {

pub bin: OsString,

/// Continuously write to stdin and read from stdout.
pub continuous_pipe: bool,

pub cwd: Option<OsString>,

pub env: FxHashMap<OsString, Option<OsString>>,
Expand Down Expand Up @@ -47,6 +50,7 @@ impl Command {
Command {
bin: bin.as_ref().to_os_string(),
args: vec![],
continuous_pipe: false,
cwd: None,
env: FxHashMap::default(),
error_on_nonzero: true,
Expand Down Expand Up @@ -214,6 +218,11 @@ impl Command {
self.prefix.as_deref()
}

/// Set the `continuous_pipe` flag and return `self` so calls can be chained.
///
/// When the flag is `true`, `exec_capture_output` delegates to
/// `exec_capture_continuous_output`.
pub fn set_continuous_pipe(&mut self, state: bool) -> &mut Self {
self.continuous_pipe = state;
self
}

pub fn set_print_command(&mut self, state: bool) -> &mut Self {
self.print_command = state;
self
Expand Down
2 changes: 1 addition & 1 deletion crates/process/src/command_line.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ impl CommandLine {
}

if input.len() > 200 && !debug_input {
command.push("(truncated)");
command.push("(truncated input)");
} else {
command.push(&input);
}
Expand Down
156 changes: 142 additions & 14 deletions crates/process/src/exec_command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,28 @@ use crate::output_to_error;
use crate::process_error::ProcessError;
use crate::process_registry::ProcessRegistry;
use crate::shared_child::SharedChild;
use miette::IntoDiagnostic;
use moon_common::color;
use rustc_hash::FxHashMap;
use std::env;
use std::ffi::{OsStr, OsString};
use std::path::PathBuf;
use std::process::{Output, Stdio};
use std::sync::{Arc, RwLock};
use std::time::Instant;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::process::{Child, Command as TokioCommand};
use tokio::task;
use tracing::{debug, enabled};
use tokio::task::{self, JoinHandle};
use tracing::{debug, enabled, trace};

impl Command {
pub async fn exec_capture_output(&mut self) -> miette::Result<Output> {
if self.continuous_pipe {
return self.exec_capture_continuous_output().await;
}

let registry = ProcessRegistry::instance();
let (mut command, line) = self.create_async_command();
let (mut command, line, instant) = self.create_async_command();

let child = if self.should_pass_stdin() {
command
Expand Down Expand Up @@ -57,6 +63,8 @@ impl Command {
error: Box::new(error),
});

self.post_log_command(instant, &shared_child);

registry.remove_running(shared_child).await;

let output = result?;
Expand All @@ -66,9 +74,109 @@ impl Command {
Ok(output)
}

/// Execute the command with stdin, stdout, and stderr all piped, and capture
/// the result as an `Output`.
///
/// The command's input is written to the child's stdin from one spawned task
/// while stdout and stderr are each read line-by-line from their own spawned
/// tasks, so writing and reading happen concurrently with waiting on the child.
///
/// The captured lines are re-joined with `\n`, so the returned `stdout` and
/// `stderr` bytes do not end with the child's trailing newline.
///
/// # Errors
///
/// Returns `ProcessError::Capture` when spawning or waiting on the child
/// fails, `ProcessError::WriteInput` when writing to stdin fails, a diagnostic
/// when one of the spawned tasks cannot be joined, or whatever
/// `handle_nonzero_status` returns for the collected output.
pub async fn exec_capture_continuous_output(&mut self) -> miette::Result<Output> {
let registry = ProcessRegistry::instance();
let (mut command, line, instant) = self.create_async_command();

// All three streams are piped unconditionally in this mode.
command
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped());

let child = command.spawn().map_err(|error| ProcessError::Capture {
bin: self.get_bin_name(),
error: Box::new(error),
})?;

// Take ownership of each pipe so it can be moved into its own task.
let shared_child = registry.add_running(child).await;
let stdin = shared_child.take_stdin().await;
let stdout = shared_child.take_stdout().await;
let stderr = shared_child.take_stderr().await;

self.log_command(&line, &shared_child);

// Move the input out of `self` so the writer task can own it.
let items = self.input.drain(..).collect::<Vec<_>>();
let bin_name = self.get_bin_name();

let stdin_handle: JoinHandle<miette::Result<()>> = task::spawn(async move {
if let Some(mut stdin) = stdin {
// NOTE(review): items are written back-to-back with no separator;
// presumably each item already carries its own delimiter — confirm
// against callers.
for item in items {
stdin
.write_all(item.as_encoded_bytes())
.await
.map_err(|error| ProcessError::WriteInput {
bin: bin_name.clone(),
error: Box::new(error),
})?;
}

// Dropping the handle closes the pipe once all input is written.
drop(stdin);
}

Ok(())
});

let stdout_handle: JoinHandle<miette::Result<Vec<String>>> = task::spawn(async move {
let mut logs = vec![];
let mut lines = BufReader::new(stdout.unwrap()).lines();

// NOTE(review): a read error ends this loop silently, so the captured
// output may be truncated without any error being reported.
while let Ok(Some(line)) = lines.next_line().await {
logs.push(line);
}

Ok(logs)
});

let stderr_handle: JoinHandle<miette::Result<Vec<String>>> = task::spawn(async move {
let mut logs = vec![];
let mut lines = BufReader::new(stderr.unwrap()).lines();

// NOTE(review): same as stdout, read errors stop the loop silently.
while let Ok(Some(line)) = lines.next_line().await {
logs.push(line);
}

Ok(logs)
});

// Attempt to create the child output
let result = shared_child
.wait()
.await
.map_err(|error| ProcessError::Capture {
bin: self.get_bin_name(),
error: Box::new(error),
});

self.post_log_command(instant, &shared_child);

registry.remove_running(shared_child).await;

// A wait failure is only propagated after the child has been removed
// from the registry above.
let status = result?;

// The first `?` surfaces a task join failure, the second the task's own error.
stdin_handle.await.into_diagnostic()??;

let output = Output {
status,
stdout: stdout_handle
.await
.into_diagnostic()??
.join("\n")
.into_bytes(),
stderr: stderr_handle
.await
.into_diagnostic()??
.join("\n")
.into_bytes(),
};

self.handle_nonzero_status(&output, true)?;

Ok(output)
}

pub async fn exec_stream_output(&mut self) -> miette::Result<Output> {
let registry = ProcessRegistry::instance();
let (mut command, line) = self.create_async_command();
let (mut command, line, instant) = self.create_async_command();

let child = if self.should_pass_stdin() {
command.stdin(Stdio::piped());
Expand Down Expand Up @@ -100,6 +208,8 @@ impl Command {
error: Box::new(error),
});

self.post_log_command(instant, &shared_child);

registry.remove_running(shared_child).await;

let status = result?;
Expand All @@ -116,7 +226,7 @@ impl Command {

pub async fn exec_stream_and_capture_output(&mut self) -> miette::Result<Output> {
let registry = ProcessRegistry::instance();
let (mut command, line) = self.create_async_command();
let (mut command, line, instant) = self.create_async_command();

command
.stdin(if self.should_pass_stdin() {
Expand Down Expand Up @@ -221,6 +331,8 @@ impl Command {
error: Box::new(error),
});

self.post_log_command(instant, &shared_child);

registry.remove_running(shared_child).await;

let status = result?;
Expand Down Expand Up @@ -340,7 +452,7 @@ impl Command {
// Ok(output)
// }

fn create_async_command(&self) -> (TokioCommand, CommandLine) {
fn create_async_command(&self) -> (TokioCommand, CommandLine, Instant) {
let command_line = self.create_command_line();

let mut command = TokioCommand::new(&command_line.command[0]);
Expand All @@ -358,7 +470,7 @@ impl Command {
command.current_dir(cwd);
}

(command, command_line)
(command, command_line, Instant::now())
}

fn create_command_line(&self) -> CommandLine {
Expand Down Expand Up @@ -420,29 +532,45 @@ impl Command {
})
.collect();

let debug_input = env::var("MOON_DEBUG_PROCESS_INPUT").is_ok();
let input_size: Option<usize> = if self.input.is_empty() {
None
} else {
Some(self.input.iter().map(|i| i.len()).sum())
};

let mut line = line.to_string();
let line_size = line.len();

if line_size > 1000 && !debug_input {
line.truncate(1000);
line.push_str(&format!(" ... (and {} more bytes)", line_size - 1000));
}

debug!(
pid = child.id(),
shell = self.shell.as_ref().map(|sh| &sh.bin_name),
env = ?env_vars,
cwd = ?working_dir,
input_size,
"Running command {}",
color::shell(line.to_string())
color::shell(line)
);
}

/// Emit a trace-level log with the child's process ID and the time elapsed
/// since `instant`.
fn post_log_command(&self, instant: Instant, child: &SharedChild) {
    let duration = instant.elapsed();

    trace!(pid = child.id(), "Ran command in {:?}", duration);
}

async fn write_input_to_child(
&self,
child: &mut Child,
line: &CommandLine,
) -> miette::Result<()> {
let input = line.input.join(OsStr::new(" "));

let mut stdin = child.stdin.take().unwrap_or_else(|| {
panic!("Unable to write stdin: {}", input.to_string_lossy());
});
let mut stdin = child.stdin.take().expect("Unable to write stdin!");

stdin
.write_all(input.as_encoded_bytes())
.write_all(line.input.join(OsStr::new(" ")).as_encoded_bytes())
.await
.map_err(|error| ProcessError::WriteInput {
bin: self.get_bin_name(),
Expand Down
6 changes: 1 addition & 5 deletions crates/task-hasher/src/task_hasher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,7 @@ impl<'task> TaskHasher<'task> {
.map(|file| file.to_string())
.collect::<Vec<_>>();

hashed_inputs.extend(
self.vcs
.get_file_hashes(&files, true, self.hasher_config.batch_size)
.await?,
);
hashed_inputs.extend(self.vcs.get_file_hashes(&files, true).await?);

self.content.inputs = hashed_inputs;
}
Expand Down
Loading
Loading