Skip to content

Commit

Permalink
executing analysis command the correct way
Browse files Browse the repository at this point in the history
  • Loading branch information
timglabisch committed Mar 30, 2022
1 parent ed425b1 commit 8041127
Showing 1 changed file with 40 additions and 30 deletions.
70 changes: 40 additions & 30 deletions mehsh_check/src/analysis/analysis_command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ use tokio::process::Command;
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use tokio::task::JoinHandle;

type CliOutput = Vec<u8>;

pub struct ExecuteAnalysisCommandHandler {
notify_send: UnboundedSender<()>,
}
Expand All @@ -25,7 +23,7 @@ struct CommandExecutionContext {

#[derive(Debug)]
enum ExecuteMsg {
CliOutput(CliOutput),
CliOutput(Vec<u8>),
Finish(()),
}

Expand Down Expand Up @@ -203,42 +201,54 @@ async fn execute_analysis_command(
.stderr(Stdio::piped())
.kill_on_drop(true)
.spawn()
.context("could not start child")?;
.expect("could not start child");

let mut stdout_buffer = [0; 4096];
let mut stdout = BufReader::new(
let stdout = BufReader::new(
child
.stdout
.take()
.context("could not take stdout from child")?,
.expect("could not take stdout from child"),
);
let mut stderr_buffer = [0; 4096];
let mut stderr = BufReader::new(
let stderr = BufReader::new(
child
.stderr
.take()
.context("could not take stderr from child")?,
.expect("could not take stderr from child"),
);

loop {
::tokio::select! {
stdout_read_res = stdout.read(&mut stdout_buffer) => {
let out : &[u8] = &stdout_buffer[..stdout_read_res.context("could not read stdout_read_res")?];
if out.len() == 0 {
continue;
}
sender.send(ExecuteMsg::CliOutput(out.to_vec()))?;
},
stderr_read_res = stderr.read(&mut stderr_buffer) => {
let out : &[u8] = &stderr_buffer[..stderr_read_res.context("could not read stdout_read_res")?];
if out.len() == 0 {
continue;
}
sender.send(ExecuteMsg::CliOutput(out.to_vec()))?;
},
res = child.wait() => {
return res.map_err(|e| anyhow!(e));
},
let jh_sender = sender.clone();
let mut jh_read = stdout;
let jh_stdout : JoinHandle<Result<(), ::anyhow::Error>> = ::tokio::spawn(async move {
let mut buffer = [0; 4096];
loop {
match jh_read.read(&mut buffer).await? {
0 => return Ok(()),
size => {
let out : &[u8] = &buffer[..size];
jh_sender.send(ExecuteMsg::CliOutput(out.to_vec())).context("could not send")?;
},
}
}
}
});

let jh_sender = sender.clone();
let mut jh_read = stderr;
let jh_stderr : JoinHandle<Result<(), ::anyhow::Error>> = ::tokio::spawn(async move {
let mut buffer = [0; 4096];
loop {
match jh_read.read(&mut buffer).await? {
0 => return Ok(()),
size => {
let out : &[u8] = &buffer[..size];
jh_sender.send(ExecuteMsg::CliOutput(out.to_vec())).context("could not send")?;
},
}
}
});

let (jh1, jh2) = ::tokio::join!(jh_stdout, jh_stderr);
jh1??;
jh2??;

return child.wait().await.map_err(|e| anyhow!(e));
}

0 comments on commit 8041127

Please sign in to comment.