chiark / gitweb /
break out Ipif::next_frame
[hippotat.git] / src / ipif.rs
1 // Copyright 2021 Ian Jackson and contributors to Hippotat
2 // SPDX-License-Identifier: GPL-3.0-or-later
3 // There is NO WARRANTY.
4
5 use crate::prelude::*;
6
7 type Tx = t_io::Split<t_io::BufReader<t_proc::ChildStdout>>;
8
9 pub struct Ipif {
10   pub tx: Tx,
11   pub rx: t_proc::ChildStdin,
12   stderr_task: JoinHandle<io::Result<()>>,
13   child: t_proc::Child,
14 }
15
16 impl Ipif {
17   #[throws(AE)]
18   pub fn start(cmd: &str, ic_name: Option<String>) -> Self {
19     let mut child = tokio::process::Command::new("sh")
20       .args(&["-c", cmd])
21       .stdin (process::Stdio::piped())
22       .stdout(process::Stdio::piped())
23       .stderr(process::Stdio::piped())
24       .kill_on_drop(true)
25       .spawn().context("spawn ipif")?;
26
27     let stderr = child.stderr.take().unwrap();
28
29     let stderr_task = task::spawn(async move {
30       let mut stderr = t_io::BufReader::new(stderr).lines();
31       while let Some(l) = stderr.next_line().await? {
32         error!("{}ipif stderr: {}",
33                OptionPrefixColon(ic_name.as_ref()),
34                l.trim_end());
35       }
36       Ok::<_,io::Error>(())
37     });
38  
39     let tx = child.stdout.take().unwrap();
40     let rx = child.stdin .take().unwrap();
41     let tx = t_io::BufReader::new(tx).split(SLIP_END);
42
43     Ipif {
44       tx,
45       rx,
46       stderr_task,
47       child,
48     }
49   }
50
51   pub async fn quitting(mut self, ic: Option<&InstanceConfig>) {
52     let icd = OptionPrefixColon(ic);
53     drop(self.rx);
54
55     match self.child.wait().await {
56       Err(e) => error!("{}also, failed to await ipif child: {}", icd, e),
57       Ok(st) => {
58         let stderr_timeout = Duration::from_millis(1000);
59         match tokio::time::timeout(stderr_timeout, self.stderr_task).await {
60           Err::<_,tokio::time::error::Elapsed>(_)
61             => warn!("{}ipif stderr task continues!", icd),
62           Ok(Err(e)) => error!("{}ipif stderr task crashed: {}", icd, e),
63           Ok(Ok(Err(e))) => error!("{}ipif stderr read failed: {}", icd, e),
64           Ok(Ok(Ok(()))) => { },
65         }
66         if ! st.success() {
67           error!("{}ipif process failed: {}", icd, st);
68         }
69       }
70     }
71
72     drop(self.tx);
73   }
74
75   #[throws(AE)]
76   pub async fn next_frame(tx: &mut Tx) -> Vec<u8> {
77     let data = tx.next_segment().await;
78     (||{
79       data?.ok_or_else(|| io::Error::from(io::ErrorKind::UnexpectedEof))
80     })().context("read from ipif")?
81   }
82 }