chiark / gitweb /
ipif: wip break out
authorIan Jackson <ijackson@chiark.greenend.org.uk>
Sat, 7 Aug 2021 23:52:44 +0000 (00:52 +0100)
committerIan Jackson <ijackson@chiark.greenend.org.uk>
Sat, 7 Aug 2021 23:52:44 +0000 (00:52 +0100)
Signed-off-by: Ian Jackson <ijackson@chiark.greenend.org.uk>
src/bin/client.rs
src/ipif.rs [new file with mode: 0644]
src/lib.rs
src/prelude.rs

index ad8da8128af46df9c319c44a3dae1154e9043821..241504d1f0d2e3f1d9dd34e6a69406ab85106676 100644 (file)
@@ -179,30 +179,10 @@ async fn run_client<C:HCC>(
     ic: &ic,
   };
 
-  let mut ipif = tokio::process::Command::new("sh")
-    .args(&["-c", &ic.ipif])
-    .stdin (process::Stdio::piped())
-    .stdout(process::Stdio::piped())
-    .stderr(process::Stdio::piped())
-    .kill_on_drop(true)
-    .spawn().context("spawn ipif")?;
-  
-  let stderr = ipif.stderr.take().unwrap();
-  let ic_name = ic.to_string();
-  let stderr_task = task::spawn(async move {
-    let mut stderr = tokio::io::BufReader::new(stderr).lines();
-    while let Some(l) = stderr.next_line().await? {
-      error!("{}: ipif stderr: {}", &ic_name, l.trim_end());
-    }
-    Ok::<_,io::Error>(())
-  });
+  let mut ipif = Ipif::start(&ic.ipif, ic.to_string())?;
 
   let mut req_num: ReqNum = 0;
 
-  let tx_stream = ipif.stdout.take().unwrap();
-  let mut rx_stream = ipif.stdin .take().unwrap();
-
-  let mut tx_stream = tokio::io::BufReader::new(tx_stream).split(SLIP_END);
   let mut tx_queue: VecDeque<TxQueued> = default();
   let mut upbound = Frames::default();
 
@@ -223,7 +203,7 @@ async fn run_client<C:HCC>(
       select! {
         biased;
 
-        y = rx_stream.write_all_buf(&mut rx_queue),
+        y = ipif.rx_stream.write_all_buf(&mut rx_queue),
         if ! rx_queue.is_empty() =>
         {
           let () = y.context("write rx data to ipif")?;
@@ -238,7 +218,7 @@ async fn run_client<C:HCC>(
           let _ = tx_queue.pop_front();
         },
 
-        data = tx_stream.next_segment(),
+        data = ipif.tx_stream.next_segment(),
         if tx_queue.is_empty() =>
         {
           let data = (||{
@@ -312,13 +292,13 @@ async fn run_client<C:HCC>(
     }
   }.await;
 
-  drop(tx_stream);
+  drop(ipif.tx_stream);
 
-  match ipif.wait().await {
+  match ipif.child.wait().await {
     Err(e) => error!("{}: also, failed to await ipif child: {}", &ic, e),
     Ok(st) => {
       let stderr_timeout = Duration::from_millis(1000);
-      match tokio::time::timeout(stderr_timeout, stderr_task).await {
+      match tokio::time::timeout(stderr_timeout, ipif.stderr_task).await {
         Err::<_,tokio::time::error::Elapsed>(_)
           => warn!("{}: ipif stderr task continues!", &ic),
         Ok(Err(e)) => error!("{}: ipif stderr task crashed: {}", &ic, e),
diff --git a/src/ipif.rs b/src/ipif.rs
new file mode 100644 (file)
index 0000000..5455ef2
--- /dev/null
@@ -0,0 +1,46 @@
+// Copyright 2021 Ian Jackson and contributors to Hippotat
+// SPDX-License-Identifier: GPL-3.0-or-later
+// There is NO WARRANTY.
+
+use crate::prelude::*;
+
+pub struct Ipif {
+  pub tx_stream: t_io::Split<t_io::BufReader<t_proc::ChildStdout>>,
+  pub rx_stream: t_proc::ChildStdin,
+  pub stderr_task: tokio::task::JoinHandle<io::Result<()>>,
+  pub child: t_proc::Child,
+}
+
+impl Ipif {
+  #[throws(AE)]
+  pub fn start(cmd: &str, ic_name: String) -> Self {
+    let mut child = tokio::process::Command::new("sh")
+      .args(&["-c", cmd])
+      .stdin (process::Stdio::piped())
+      .stdout(process::Stdio::piped())
+      .stderr(process::Stdio::piped())
+      .kill_on_drop(true)
+      .spawn().context("spawn ipif")?;
+
+    let stderr = child.stderr.take().unwrap();
+
+    let stderr_task = task::spawn(async move {
+      let mut stderr = t_io::BufReader::new(stderr).lines();
+      while let Some(l) = stderr.next_line().await? {
+        error!("{}: ipif stderr: {}", ic_name, l.trim_end());
+      }
+      Ok::<_,io::Error>(())
+    });
+    let tx_stream = child.stdout.take().unwrap();
+    let rx_stream = child.stdin .take().unwrap();
+    let tx_stream = t_io::BufReader::new(tx_stream).split(SLIP_END);
+
+    Ipif {
+      tx_stream,
+      rx_stream,
+      stderr_task,
+      child,
+    }
+  }
+}
index 298c1b9689084ec17c97552c7630b09c1fce059d..b5323b7904accef01a88ad3fd9579a68b0574c49 100644 (file)
@@ -7,6 +7,7 @@
 pub mod prelude;
 
 pub mod config;
+pub mod ipif;
 pub mod slip;
 pub mod reporter;
 pub mod queue;
index 1acbafd14236dc89badfba691adb1e5e31348f9a..c0516d28e88672a0efd2eb40f453dc555710551f 100644 (file)
@@ -51,6 +51,7 @@ pub use eyre::Error as AE;
 
 pub use crate::config::{self, InstanceConfig, u32Ext as _};
 pub use crate::ini;
+pub use crate::ipif::Ipif;
 pub use crate::utils::*;
 pub use crate::queue::*;
 pub use crate::reporter::*;
@@ -61,6 +62,8 @@ pub type ReqNum = u64;
 
 pub use ErrorKind as EK;
 pub use PacketError as PE;
+pub use tokio::io as t_io;
+pub use tokio::process as t_proc;
 
 pub const SLIP_END:     u8 = 0o300; // c0
 pub const SLIP_ESC:     u8 = 0o333; // db