ic: &ic,
};
- let mut ipif = tokio::process::Command::new("sh")
- .args(&["-c", &ic.ipif])
- .stdin (process::Stdio::piped())
- .stdout(process::Stdio::piped())
- .stderr(process::Stdio::piped())
- .kill_on_drop(true)
- .spawn().context("spawn ipif")?;
-
- let stderr = ipif.stderr.take().unwrap();
- let ic_name = ic.to_string();
- let stderr_task = task::spawn(async move {
- let mut stderr = tokio::io::BufReader::new(stderr).lines();
- while let Some(l) = stderr.next_line().await? {
- error!("{}: ipif stderr: {}", &ic_name, l.trim_end());
- }
- Ok::<_,io::Error>(())
- });
+ let mut ipif = Ipif::start(&ic.ipif, ic.to_string())?;
let mut req_num: ReqNum = 0;
- let tx_stream = ipif.stdout.take().unwrap();
- let mut rx_stream = ipif.stdin .take().unwrap();
-
- let mut tx_stream = tokio::io::BufReader::new(tx_stream).split(SLIP_END);
let mut tx_queue: VecDeque<TxQueued> = default();
let mut upbound = Frames::default();
select! {
biased;
- y = rx_stream.write_all_buf(&mut rx_queue),
+ y = ipif.rx_stream.write_all_buf(&mut rx_queue),
if ! rx_queue.is_empty() =>
{
let () = y.context("write rx data to ipif")?;
let _ = tx_queue.pop_front();
},
- data = tx_stream.next_segment(),
+ data = ipif.tx_stream.next_segment(),
if tx_queue.is_empty() =>
{
let data = (||{
}
}.await;
- drop(tx_stream);
+ drop(ipif.tx_stream);
- match ipif.wait().await {
+ match ipif.child.wait().await {
Err(e) => error!("{}: also, failed to await ipif child: {}", &ic, e),
Ok(st) => {
let stderr_timeout = Duration::from_millis(1000);
- match tokio::time::timeout(stderr_timeout, stderr_task).await {
+ match tokio::time::timeout(stderr_timeout, ipif.stderr_task).await {
Err::<_,tokio::time::error::Elapsed>(_)
=> warn!("{}: ipif stderr task continues!", &ic),
Ok(Err(e)) => error!("{}: ipif stderr task crashed: {}", &ic, e),
--- /dev/null
+// Copyright 2021 Ian Jackson and contributors to Hippotat
+// SPDX-License-Identifier: GPL-3.0-or-later
+// There is NO WARRANTY.
+
+use crate::prelude::*;
+
+pub struct Ipif {
+ pub tx_stream: t_io::Split<t_io::BufReader<t_proc::ChildStdout>>,
+ pub rx_stream: t_proc::ChildStdin,
+ pub stderr_task: tokio::task::JoinHandle<io::Result<()>>,
+ pub child: t_proc::Child,
+}
+
+impl Ipif {
+ #[throws(AE)]
+ pub fn start(cmd: &str, ic_name: String) -> Self {
+ let mut child = tokio::process::Command::new("sh")
+ .args(&["-c", cmd])
+ .stdin (process::Stdio::piped())
+ .stdout(process::Stdio::piped())
+ .stderr(process::Stdio::piped())
+ .kill_on_drop(true)
+ .spawn().context("spawn ipif")?;
+
+ let stderr = child.stderr.take().unwrap();
+
+ let stderr_task = task::spawn(async move {
+ let mut stderr = t_io::BufReader::new(stderr).lines();
+ while let Some(l) = stderr.next_line().await? {
+ error!("{}: ipif stderr: {}", ic_name, l.trim_end());
+ }
+ Ok::<_,io::Error>(())
+ });
+
+ let tx_stream = child.stdout.take().unwrap();
+ let rx_stream = child.stdin .take().unwrap();
+ let tx_stream = t_io::BufReader::new(tx_stream).split(SLIP_END);
+
+ Ipif {
+ tx_stream,
+ rx_stream,
+ stderr_task,
+ child,
+ }
+ }
+}
pub use crate::config::{self, InstanceConfig, u32Ext as _};
pub use crate::ini;
+pub use crate::ipif::Ipif;
pub use crate::utils::*;
pub use crate::queue::*;
pub use crate::reporter::*;
pub use ErrorKind as EK;
pub use PacketError as PE;
+pub use tokio::io as t_io;
+pub use tokio::process as t_proc;
pub const SLIP_END: u8 = 0o300; // c0
pub const SLIP_ESC: u8 = 0o333; // db