From: Ian Jackson Date: Sat, 7 Aug 2021 23:52:44 +0000 (+0100) Subject: ipif: wip break out X-Git-Tag: hippotat/1.0.0~249 X-Git-Url: https://www.chiark.greenend.org.uk/ucgi/~ian/git?a=commitdiff_plain;h=c8adc66f0ba69226270494ad3ee09a98ea42c35e;p=hippotat.git ipif: wip break out Signed-off-by: Ian Jackson --- diff --git a/src/bin/client.rs b/src/bin/client.rs index ad8da81..241504d 100644 --- a/src/bin/client.rs +++ b/src/bin/client.rs @@ -179,30 +179,10 @@ async fn run_client( ic: &ic, }; - let mut ipif = tokio::process::Command::new("sh") - .args(&["-c", &ic.ipif]) - .stdin (process::Stdio::piped()) - .stdout(process::Stdio::piped()) - .stderr(process::Stdio::piped()) - .kill_on_drop(true) - .spawn().context("spawn ipif")?; - - let stderr = ipif.stderr.take().unwrap(); - let ic_name = ic.to_string(); - let stderr_task = task::spawn(async move { - let mut stderr = tokio::io::BufReader::new(stderr).lines(); - while let Some(l) = stderr.next_line().await? { - error!("{}: ipif stderr: {}", &ic_name, l.trim_end()); - } - Ok::<_,io::Error>(()) - }); + let mut ipif = Ipif::start(&ic.ipif, ic.to_string())?; let mut req_num: ReqNum = 0; - let tx_stream = ipif.stdout.take().unwrap(); - let mut rx_stream = ipif.stdin .take().unwrap(); - - let mut tx_stream = tokio::io::BufReader::new(tx_stream).split(SLIP_END); let mut tx_queue: VecDeque = default(); let mut upbound = Frames::default(); @@ -223,7 +203,7 @@ async fn run_client( select! { biased; - y = rx_stream.write_all_buf(&mut rx_queue), + y = ipif.rx_stream.write_all_buf(&mut rx_queue), if ! rx_queue.is_empty() => { let () = y.context("write rx data to ipif")?; @@ -238,7 +218,7 @@ async fn run_client( let _ = tx_queue.pop_front(); }, - data = tx_stream.next_segment(), + data = ipif.tx_stream.next_segment(), if tx_queue.is_empty() => { let data = (||{ @@ -312,13 +292,13 @@ async fn run_client( } }.await; - drop(tx_stream); + drop(ipif.tx_stream); - match ipif.wait().await { + match ipif.child.wait().await { Err(e) => error!("{}: also, failed to await ipif child: {}", &ic, e), Ok(st) => { let stderr_timeout = Duration::from_millis(1000); - match tokio::time::timeout(stderr_timeout, stderr_task).await { + match tokio::time::timeout(stderr_timeout, ipif.stderr_task).await { Err::<_,tokio::time::error::Elapsed>(_) => warn!("{}: ipif stderr task continues!", &ic), Ok(Err(e)) => error!("{}: ipif stderr task crashed: {}", &ic, e), diff --git a/src/ipif.rs b/src/ipif.rs new file mode 100644 index 0000000..5455ef2 --- /dev/null +++ b/src/ipif.rs @@ -0,0 +1,46 @@ +// Copyright 2021 Ian Jackson and contributors to Hippotat +// SPDX-License-Identifier: GPL-3.0-or-later +// There is NO WARRANTY. + +use crate::prelude::*; + +pub struct Ipif { + pub tx_stream: t_io::Split>, + pub rx_stream: t_proc::ChildStdin, + pub stderr_task: tokio::task::JoinHandle>, + pub child: t_proc::Child, +} + +impl Ipif { + #[throws(AE)] + pub fn start(cmd: &str, ic_name: String) -> Self { + let mut child = tokio::process::Command::new("sh") + .args(&["-c", cmd]) + .stdin (process::Stdio::piped()) + .stdout(process::Stdio::piped()) + .stderr(process::Stdio::piped()) + .kill_on_drop(true) + .spawn().context("spawn ipif")?; + + let stderr = child.stderr.take().unwrap(); + + let stderr_task = task::spawn(async move { + let mut stderr = t_io::BufReader::new(stderr).lines(); + while let Some(l) = stderr.next_line().await? { + error!("{}: ipif stderr: {}", ic_name, l.trim_end()); + } + Ok::<_,io::Error>(()) + }); + + let tx_stream = child.stdout.take().unwrap(); + let rx_stream = child.stdin .take().unwrap(); + let tx_stream = t_io::BufReader::new(tx_stream).split(SLIP_END); + + Ipif { + tx_stream, + rx_stream, + stderr_task, + child, + } + } +} diff --git a/src/lib.rs b/src/lib.rs index 298c1b9..b5323b7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -7,6 +7,7 @@ pub mod prelude; pub mod config; +pub mod ipif; pub mod slip; pub mod reporter; pub mod queue; diff --git a/src/prelude.rs b/src/prelude.rs index 1acbafd..c0516d2 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -51,6 +51,7 @@ pub use eyre::Error as AE; pub use crate::config::{self, InstanceConfig, u32Ext as _}; pub use crate::ini; +pub use crate::ipif::Ipif; pub use crate::utils::*; pub use crate::queue::*; pub use crate::reporter::*; @@ -61,6 +62,8 @@ pub type ReqNum = u64; pub use ErrorKind as EK; pub use PacketError as PE; +pub use tokio::io as t_io; +pub use tokio::process as t_proc; pub const SLIP_END: u8 = 0o300; // c0 pub const SLIP_ESC: u8 = 0o333; // db