chiark / gitweb /
break out submit_request
[hippotat.git] / src / bin / client.rs
1 // Copyright 2021 Ian Jackson and contributors to Hippotat
2 // SPDX-License-Identifier: AGPL-3.0-or-later
3 // There is NO WARRANTY.
4
5 use hippotat::prelude::*;
6
7 /*
8 struct Client {
9   requests_outstanding: Vec<>,
10   tx_read_stream: something,
11   rx_write_stream: something,
12 }*/
13
14
15 type OutstandingRequest<'r> = Pin<Box<
16     dyn Future<Output=Result<Bytes,AE>> + Send + 'r
17     >>;
18
19 #[throws(AE)]
20 fn submit_request<'r, 'ic:'r, 'hc:'r, C>(ic: &'ic InstanceConfig,
21                       hclient: &'hc Arc<hyper::Client<C>>,
22                       reqs: &mut Vec<OutstandingRequest<'r>>,
23                       upbound: Vec<Vec<u8>>,
24 )
25 where C: hyper::client::connect::Connect + Clone + Send + Sync + 'static,
26  {
27           let body = hyper::body::Body::wrap_stream(
28             futures::stream::iter(
29               Itertools::intersperse(
30                 upbound.into_iter().map(|u| Bytes::from(u)),
31                 slip::SLIP_END_SLICE.into()
32               ).map(Ok::<_,Void>)
33             )
34           );
35
36           let req = hyper::Request::post(&ic.url).body(body)
37             .context("construct request")?;
38
39           let resp = hclient.request(req);
40           let fut = Box::pin(async move {
41             let r = async { tokio::time::timeout( ic.http_timeout, async {
42               let resp = resp.await.context("make request")?;
43               if ! resp.status().is_success() {
44                 throw!(anyhow!("HTTP error status {}", &resp.status()));
45               }
46               let resp = resp.into_body();
47               // xxx: some size limit to avoid mallocing the universe
48               let resp = hyper::body::to_bytes(resp).await
49                 .context("HTTP error fetching response body")?;
50               Ok::<_,AE>(resp)
51             }).await? }.await;
52             if r.is_err() {
53               tokio::time::sleep(ic.http_retry).await;
54             }
55             r
56           });
57           reqs.push(fut);
58 }
59
60 #[allow(unused_variables)] // xxx
61 async fn run_client<C>(ic: InstanceConfig, hclient: Arc<hyper::Client<C>>)
62                        -> Result<Void, AE>
63 where C: hyper::client::connect::Connect + Clone + Send + Sync + 'static,
64 {
65   debug!("{}: config: {:?}", &ic, &ic);
66
67   let mut reporter = Reporter { };
68
69   let mut ipif = tokio::process::Command::new("sh")
70     .args(&["-c", &ic.ipif])
71     .stdin (process::Stdio::piped())
72     .stdout(process::Stdio::piped())
73     .stderr(process::Stdio::piped())
74     .kill_on_drop(true)
75     .spawn().context("spawn ipif")?;
76   
77   let stderr = ipif.stderr.take().unwrap();
78   let ic_name = ic.to_string();
79   let _ = task::spawn(async move {
80     let mut stderr = tokio::io::BufReader::new(stderr).lines();
81     while let Some(l) = stderr.next_line().await? {
82       error!("{}: ipif stderr: {}", &ic_name, l.trim_end());
83     }
84     Ok::<_,io::Error>(())
85   });
86
87   let tx_stream = ipif.stdout.take().unwrap();
88   let mut tx_stream = tokio::io::BufReader::new(tx_stream).split(SLIP_ESC);
89   let mut tx_defer = None;
90
91   let stream_for_rx = ipif.stdin .take().unwrap();
92
93   let mut reqs: Vec<OutstandingRequest>
94     = Vec::with_capacity(ic.max_requests_outstanding.sat());
95
96   // xxx check that ic settings are all honoured
97
98   async {
99     loop {
100       select! {
101         packet = tx_stream.next_segment(),
102         if tx_defer.is_none() &&
103            reqs.len() < ic.max_requests_outstanding.sat() =>
104         {
105           let mut upbound_total = 0;
106           let mut upbound: Vec<Vec<u8>> = vec![];
107           let mut to_process: Option<Result<Option<Vec<u8>>,_>>
108             = Some(packet);
109           while let Some(packet) = to_process.take() {
110             let mut packet =
111               packet.context("read from ipif")?
112               .ok_or_else(|| io::Error::from(io::ErrorKind::UnexpectedEof))?;
113             if let Ok(()) = slip::check_checkmtu_mimeify(&mut packet, &ic) {
114               let new_upbound_total = packet.len() + upbound_total + 1;
115               if new_upbound_total > ic.max_batch_up.sat() {
116                 tx_defer = Some(packet);
117                 break;
118               }
119               upbound_total = new_upbound_total;
120               upbound.push(packet);
121               // we rely oin `next_segment` being cancellation-safe,
122               // which isn't documented as true but seems reasonably safe
123               pin!{ let next_segment = tx_stream.next_segment(); }
124               to_process = match poll!(next_segment) {
125                 Poll::Ready(p) => Some(p),
126                 Poll::Pending => None,
127               };
128             }
129           }
130           assert!( to_process.is_none() );
131           dbg!(&reqs.len(), &upbound_total, &upbound.len());
132
133           //: impl futures::Stream<Cow<&[u8]>>
134           submit_request(&ic, &hclient, &mut reqs, upbound)?;
135         }
136
137         (got, goti, _) = async { future::select_all(&mut reqs).await },
138           if ! reqs.is_empty() =>
139         {
140           reqs.swap_remove(goti);
141           if let Some(got) = reporter.report(got) {
142             dbg!(&got.remaining()); // xxx
143           }
144         }
145       }
146     }
147   }.await
148 }
149
150 #[tokio::main]
151 async fn main() -> Result<(), AE> {
152   let ics = config::read(LinkEnd::Client)?;
153   if ics.is_empty() { throw!(anyhow!("no associations with server(s)")); }
154
155   env_logger::init();
156
157   let https = HttpsConnector::new();
158   let hclient = hyper::Client::builder().build::<_, hyper::Body>(https);
159   let hclient = Arc::new(hclient);
160
161   info!("starting");
162   let () = future::select_all(
163     ics.into_iter().map(|ic| Box::pin(async {
164       let assocname = ic.to_string();
165       info!("{} starting", &assocname);
166       let hclient = hclient.clone();
167       let join = task::spawn(async {
168         run_client(ic, hclient).await.void_unwrap_err()
169       });
170       match join.await {
171         Ok(e) => {
172           error!("{} failed: {:?}", &assocname, e);
173         },
174         Err(je) => {
175           error!("{} panicked!", &assocname);
176           panic::resume_unwind(je.into_panic());
177         },
178       }
179     }))
180   ).await.0;
181
182   error!("quitting because one of your client connections crashed");
183   process::exit(16);
184 }