chiark / gitweb /
wip mimeswap
[hippotat.git] / src / bin / client.rs
1 // Copyright 2021 Ian Jackson and contributors to Hippotat
2 // SPDX-License-Identifier: AGPL-3.0-or-later
3 // There is NO WARRANTY.
4
5 use hippotat::prelude::*;
6
7 type OutstandingRequest<'r> = Pin<Box<
8     dyn Future<Output=Option<Bytes>> + Send + 'r
9     >>;
10
11 impl<T> HCC for T where
12         T: hyper::client::connect::Connect + Clone + Send + Sync + 'static { }
13 trait HCC: hyper::client::connect::Connect + Clone + Send + Sync + 'static { }
14
15 struct ClientContext<'c,C> {
16   ic: &'c InstanceConfig,
17   hclient: &'c Arc<hyper::Client<C>>,
18   reporter: &'c parking_lot::Mutex<Reporter<'c>>,
19 }
20
21 #[throws(AE)]
22 fn submit_request<'r, 'c:'r, C:HCC>(
23   c: &'c ClientContext<C>,
24   reqs: &mut Vec<OutstandingRequest<'r>>,
25   upbound: Vec<Vec<u8>>
26 ) {
27   let body = hyper::body::Body::wrap_stream(
28     futures::stream::iter(
29       Itertools::intersperse(
30         upbound.into_iter().map(|u| Bytes::from(u)),
31         slip::SLIP_END_SLICE.into()
32       ).map(Ok::<_,Void>)
33     )
34   );
35
36   let req = hyper::Request::post(&c.ic.url).body(body)
37     .context("construct request")?;
38
39   let resp = c.hclient.request(req);
40   let fut = Box::pin(async move {
41     let r = async { tokio::time::timeout( c.ic.http_timeout, async {
42       let resp = resp.await.context("make request")?;
43       if ! resp.status().is_success() {
44         throw!(anyhow!("HTTP error status {}", &resp.status()));
45       }
46       let resp = resp.into_body();
47       // xxx: some size limit to avoid mallocing the universe
48       let resp = hyper::body::to_bytes(resp).await
49         .context("HTTP error fetching response body")?;
50
51       Ok::<_,AE>(resp)
52     }).await? }.await;
53
54     let r = c.reporter.lock().report(r);
55
56     if r.is_none() {
57       tokio::time::sleep(c.ic.http_retry).await;
58     }
59     r
60   });
61   reqs.push(fut);
62 }
63
64 #[allow(unused_variables)] // xxx
65 async fn run_client<C:HCC>(
66   ic: InstanceConfig,
67   hclient: Arc<hyper::Client<C>>
68 ) -> Result<Void, AE>
69 {
70   debug!("{}: config: {:?}", &ic, &ic);
71
72   let reporter = parking_lot::Mutex::new(Reporter::new(&ic));
73
74   let c = ClientContext {
75     reporter: &reporter,
76     hclient: &hclient,
77     ic: &ic,
78   };
79
80   let mut ipif = tokio::process::Command::new("sh")
81     .args(&["-c", &ic.ipif])
82     .stdin (process::Stdio::piped())
83     .stdout(process::Stdio::piped())
84     .stderr(process::Stdio::piped())
85     .kill_on_drop(true)
86     .spawn().context("spawn ipif")?;
87   
88   let stderr = ipif.stderr.take().unwrap();
89   let ic_name = ic.to_string();
90   let _ = task::spawn(async move {
91     let mut stderr = tokio::io::BufReader::new(stderr).lines();
92     while let Some(l) = stderr.next_line().await? {
93       error!("{}: ipif stderr: {}", &ic_name, l.trim_end());
94     }
95     Ok::<_,io::Error>(())
96   });
97
98   let tx_stream = ipif.stdout.take().unwrap();
99   let mut tx_stream = tokio::io::BufReader::new(tx_stream).split(SLIP_ESC);
100   let mut tx_defer = None;
101
102   let stream_for_rx = ipif.stdin .take().unwrap();
103
104   let mut reqs: Vec<OutstandingRequest>
105     = Vec::with_capacity(ic.max_requests_outstanding.sat());
106
107   // xxx check that ic settings are all honoured
108
109   async {
110     loop {
111       select! {
112         packet = async {
113           // cancellation safety: if this future is polled, we might
114           // move out of tx_defer, but then we will be immediately
115           // ready and yield it inot packet
116           if let Some(y) = tx_defer.take() {
117             Ok(Some(y))
118           } else {
119             tx_stream.next_segment().await
120           }
121         },
122         if tx_defer.is_none() &&
123            reqs.len() < ic.max_requests_outstanding.sat() =>
124         {
125           let mut upbound_total = 0;
126           let mut upbound: Vec<Vec<u8>> = vec![];
127           let mut to_process: Option<Result<Option<Vec<u8>>,_>>
128             = Some(packet);
129           while let Some(packet) = to_process.take() {
130             let mut packet =
131               packet.context("read from ipif")?
132               .ok_or_else(|| io::Error::from(io::ErrorKind::UnexpectedEof))?;
133             if let Ok(()) = slip::check_checkmtu_mimeswap(&ic, SLIP_ESC, &mut packet) {
134               let new_upbound_total = packet.len() + upbound_total + 1;
135               if new_upbound_total > ic.max_batch_up.sat() {
136                 tx_defer = Some(packet);
137                 break;
138               }
139               upbound_total = new_upbound_total;
140               upbound.push(packet);
141               // we rely oin `next_segment` being cancellation-safe,
142               // which isn't documented as true but seems reasonably safe
143               pin!{ let next_segment = tx_stream.next_segment(); }
144               to_process = match poll!(next_segment) {
145                 Poll::Ready(p) => Some(p),
146                 Poll::Pending => None,
147               };
148             }
149           }
150           assert!( to_process.is_none() );
151           dbg!(&reqs.len(), &upbound_total, &upbound.len());
152
153           //: impl futures::Stream<Cow<&[u8]>>
154           submit_request(&c, &mut reqs, upbound)?;
155         }
156
157         () = async { },
158         if reqs.len() < ic.target_requests_outstanding.sat() ||
159            (tx_defer.is_some() &&
160             reqs.len() < ic.max_requests_outstanding.sat()) =>
161         {
162           let upbound = tx_defer.take().into_iter().collect_vec();
163           submit_request(&c, &mut reqs, upbound)?;
164         }
165
166         (got, goti, _) = async { future::select_all(&mut reqs).await },
167           if ! reqs.is_empty() =>
168         {
169           reqs.swap_remove(goti);
170           if let Some(got) = got {
171             dbg!(&got.remaining()); // xxx
172           }
173         }
174       }
175     }
176   }.await
177 }
178
179 #[tokio::main]
180 async fn main() -> Result<(), AE> {
181   let ics = config::read(LinkEnd::Client)?;
182   if ics.is_empty() { throw!(anyhow!("no associations with server(s)")); }
183
184   env_logger::init();
185
186   let https = HttpsConnector::new();
187   let hclient = hyper::Client::builder().build::<_, hyper::Body>(https);
188   let hclient = Arc::new(hclient);
189
190   info!("starting");
191   let () = future::select_all(
192     ics.into_iter().map(|ic| Box::pin(async {
193       let assocname = ic.to_string();
194       info!("{} starting", &assocname);
195       let hclient = hclient.clone();
196       let join = task::spawn(async {
197         run_client(ic, hclient).await.void_unwrap_err()
198       });
199       match join.await {
200         Ok(e) => {
201           error!("{} failed: {:?}", &assocname, e);
202         },
203         Err(je) => {
204           error!("{} panicked!", &assocname);
205           panic::resume_unwind(je.into_panic());
206         },
207       }
208     }))
209   ).await.0;
210
211   error!("quitting because one of your client connections crashed");
212   process::exit(16);
213 }