Skip to content

Commit

Permalink
Merge pull request #8 from triarius/optional-tee
Browse files Browse the repository at this point in the history
Make the tee threads optional
  • Loading branch information
triarius authored Sep 13, 2024
2 parents 949edcb + 9a6a65c commit 8db361c
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 75 deletions.
157 changes: 84 additions & 73 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,19 @@ use std::{
thread,
};

fn io_streams<W: Write + Send + 'static>(
writer: W,
log_path: Option<&Path>,
) -> Result<(Stdio, Vec<Box<dyn Write + Send>>)> {
match log_path {
Some(path) => {
let file = File::create(path)?;
Ok((Stdio::piped(), vec![Box::new(writer), Box::new(file)]))
}
None => Ok((Stdio::inherit(), vec![Box::new(writer)])),
}
}

/// Run a process and tee its stdin, stdout, and stderr to the given files.
///
/// # Errors
Expand All @@ -26,85 +39,83 @@ pub fn run(
stdout_log_path: Option<&Path>,
stderr_log_path: Option<&Path>,
) -> Result<i32> {
let in_io = stdout_log_path.map_or(Stdio::inherit(), |_| Stdio::piped());
let (out_io, mut out_writers) = io_streams(stdout(), stdout_log_path)?;
let (err_io, mut err_writers) = io_streams(stderr(), stderr_log_path)?;

let mut child = Command::new(cmd)
.args(args)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.stdin(in_io)
.stdout(out_io)
.stderr(err_io)
.spawn()?;

let child_out = child
.stdout
.take()
.ok_or_else(|| eyre!("child stdout is not piped"))?;

let child_err = child
.stderr
.take()
.ok_or_else(|| eyre!("child stderr is not piped"))?;

let child_in = child
.stdin
.take()
.ok_or_else(|| eyre!("child stdin is not piped"))?;

let mut stdin_outputs = outputs(child_in, stdin_log_path)?;
let mut stdout_outputs = outputs(stdout(), stdout_log_path)?;
let mut stderr_outputs = outputs(stderr(), stderr_log_path)?;

// Create a channel to send data from stdin to the cancellable_tee thread.
let (t_in, r_in) = bounded(1);

// Read from stdin and send on a channel. We will call select! on this
// channel in the cancellable_tee thread.
// DO NOT join on this thread, it will cause reading stdin to block.
thread::spawn(move || {
let mut buffer = [0u8; 1024];
loop {
let n = stdin().read(&mut buffer).unwrap();
if n == 0 {
break;
}
t_in.send((buffer, n)).unwrap();
}
});

thread::scope(|s| -> Result<i32> {
let (t_cancel, r_cancel) = bounded(1);

// If cancellable_tee were not used here, reading from stdin will block the thread even
// after the child process has exited. This will prevent the process from exiting.
// So we have to cancel the tee thread when the child process exits and before
// joining on the tee thread.
s.spawn(move || cancellable_tee(&r_cancel, &r_in, &mut stdin_outputs[..]));
s.spawn(|| tee(child_out, &mut stdout_outputs[..]));
s.spawn(|| tee(child_err, &mut stderr_outputs[..]));

let code = child
.wait()?
.code()
.ok_or_else(|| eyre!("process terminated by signal"))?;

// Cancel the tee threads. This may fail if the stdin was closed before the child process.
t_cancel.try_send(()).flat_map_err(|e| match e {
TrySendError::Full(()) => Err(e),
TrySendError::Disconnected(()) => Ok(()), // If the stdin was close, this is expected
})?;

Ok(code)
})
}
match child.stdin.take() {
Some(child_in) => {
let in_file = File::create(stdin_log_path.unwrap())?;
let mut stdin_outputs: Vec<Box<dyn Write + Send>> =
vec![Box::new(child_in), Box::new(in_file)];

// Create a channel to send data from stdin to the cancellable_tee thread.
let (t_in, r_in) = bounded(1);

// Read from stdin and send on a channel. We will call select! on this
// channel in the cancellable_tee thread.
// DO NOT join on this thread, it will cause reading stdin to block.
thread::spawn(move || {
let mut buffer = [0u8; 1024];
loop {
let n = stdin().read(&mut buffer).unwrap();
if n == 0 {
break;
}
t_in.send((buffer, n)).unwrap();
}
});

fn outputs<W: Write + Send + 'static>(
writer: W,
filename: Option<&Path>,
) -> Result<Vec<Box<(dyn Write + Send)>>> {
match filename {
Some(filename) => {
let file = File::create(filename)?;
Ok(vec![Box::new(writer), Box::new(file)])
thread::scope(|s| -> Result<i32> {
let (t_cancel, r_cancel) = bounded(1);

// If cancellable_tee were not used here, reading from stdin will block the thread even
// after the child process has exited. This will prevent the process from exiting.
// So we have to cancel the tee thread when the child process exits and before
// joining on the tee thread.
s.spawn(move || cancellable_tee(&r_cancel, &r_in, &mut stdin_outputs[..]));

if let Some(child_out) = child.stdout.take() {
s.spawn(move || tee(child_out, &mut out_writers[..]));
}
if let Some(child_err) = child.stderr.take() {
s.spawn(move || tee(child_err, &mut err_writers[..]));
}

let code = child
.wait()?
.code()
.ok_or_else(|| eyre!("process terminated by signal"))?;

// Cancel the tee threads. This may fail if the stdin was closed before the child process.
t_cancel.try_send(()).flat_map_err(|e| match e {
TrySendError::Full(()) => Err(e),
TrySendError::Disconnected(()) => Ok(()), // If the stdin was closed, this is expected
})?;

Ok(code)
})
}
None => Ok(vec![Box::new(writer)]),
None => thread::scope(|s| -> Result<i32> {
if let Some(child_out) = child.stdout.take() {
s.spawn(move || tee(child_out, &mut out_writers[..]));
}
if let Some(child_err) = child.stderr.take() {
s.spawn(move || tee(child_err, &mut err_writers[..]));
}

child
.wait()?
.code()
.ok_or_else(|| eyre!("process terminated by signal"))
}),
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use clap::Parser;
use eyre::Result;
use std::path::PathBuf;
use std::{path::PathBuf, process};

/// A command runner that optionally logs the I/O streams to files.
#[derive(Debug, Parser, PartialEq, Eq)]
Expand Down Expand Up @@ -35,7 +35,7 @@ fn main() -> Result<()> {
args.err_file.as_deref(),
)?;

std::process::exit(code);
process::exit(code);
}

#[cfg(test)]
Expand Down

0 comments on commit 8db361c

Please sign in to comment.