chiark / gitweb /
read_limited_bytes: take a `capacity` argument
[hippotat.git] / src / bin / client.rs
1 // Copyright 2021 Ian Jackson and contributors to Hippotat
2 // SPDX-License-Identifier: GPL-3.0-or-later
3 // There is NO WARRANTY.
4
5 use hippotat::prelude::*;
6 use hippotat_macros::into_crlfs;
7
8 #[derive(StructOpt,Debug)]
9 pub struct Opts {
10   #[structopt(flatten)]
11   log: LogOpts,
12
13   #[structopt(flatten)]
14   config: config::Opts,
15 }
16
17 type OutstandingRequest<'r> = Pin<Box<
18     dyn Future<Output=Option<Box<[u8]>>> + Send + 'r
19     >>;
20
21 impl<T> HCC for T where
22         T: hyper::client::connect::Connect + Clone + Send + Sync + 'static { }
23 trait HCC: hyper::client::connect::Connect + Clone + Send + Sync + 'static { }
24
25 struct ClientContext<'c,C> {
26   ic: &'c InstanceConfig,
27   hclient: &'c Arc<hyper::Client<C>>,
28   reporter: &'c parking_lot::Mutex<Reporter<'c>>,
29 }
30
31 #[derive(Debug)]
32 struct TxQueued {
33   expires: Instant,
34   data: Box<[u8]>,
35 }
36
37 #[throws(AE)]
38 fn submit_request<'r, 'c:'r, C:HCC>(
39   c: &'c ClientContext<C>,
40   req_num: &mut ReqNum,
41   reqs: &mut Vec<OutstandingRequest<'r>>,
42   upbound: FramesData,
43 ) {
44   let show_timeout = c.ic.http_timeout
45     .saturating_add(Duration::from_nanos(999_999_999))
46     .as_secs();
47
48   let time_t = time_t_now();
49   let time_t = format!("{:x}", time_t);
50   let hmac = token_hmac(c.ic.secret.0.as_bytes(), time_t.as_bytes());
51   //dbg!(DumpHex(&hmac));
52   let mut token = time_t;
53   write!(token, " ").unwrap();
54   base64::encode_config_buf(&hmac, BASE64_CONFIG, &mut token);
55
56   let req_num = { *req_num += 1; *req_num };
57
58   let prefix1 = format!(into_crlfs!(
59     r#"--b
60        Content-Type: text/plain; charset="utf-8"
61        Content-Disposition: form-data; name="m"
62
63        {}
64        {}
65        {}
66        {}
67        {}
68        {}"#),
69                        &c.ic.link.client,
70                        token,
71                        c.ic.target_requests_outstanding,
72                        show_timeout,
73                        c.ic.max_batch_down,
74                        c.ic.max_batch_up,
75   );
76
77   let prefix2 = format!(into_crlfs!(
78     r#"
79        --b
80        Content-Type: application/octet-stream
81        Content-Disposition: form-data; name="d"
82
83        "#),
84   );
85   let suffix = format!(into_crlfs!(
86     r#"
87        --b--
88        "#),
89   );
90
91   macro_rules! content { {
92     $out:ty,
93     $iter:ident,
94     $into:ident,
95   } => {
96     itertools::chain![
97       array::IntoIter::new([
98         prefix1.$into(),
99         prefix2.$into(),
100       ]).take(
101         if upbound.is_empty() { 1 } else { 2 }
102       ),
103       Itertools::intersperse(
104         upbound.$iter().map(|u| { let out: $out = u.$into(); out }),
105         SLIP_END_SLICE.$into()
106       ),
107       [ suffix.$into() ],
108     ]
109   }}
110
111   let body_len: usize = content!(
112     &[u8],
113     iter,
114     as_ref,
115   ).map(|b| b.len()).sum();
116
117   trace!("{} #{}: req; tx body_len={} frames={}",
118          &c.ic, req_num, body_len, upbound.len());
119
120   let body = hyper::body::Body::wrap_stream(
121     futures::stream::iter(
122       content!(
123         Bytes,
124         into_iter,
125         into,
126       ).map(Ok::<Bytes,Void>)
127     )
128   );
129
130   let req = hyper::Request::post(&c.ic.url)
131     .header("Content-Type", r#"multipart/form-data; boundary="b""#)
132     .header("Content-Length", body_len)
133     .body(body)
134     .context("construct request")?;
135
136   let resp = c.hclient.request(req);
137   let fut = Box::pin(async move {
138     let r = async { tokio::time::timeout( c.ic.effective_http_timeout, async {
139       let resp = resp.await.context("make request")?;
140       let status = resp.status();
141       let mut resp = resp.into_body();
142       let max_body = c.ic.max_batch_down.sat() + MAX_OVERHEAD;
143       let resp = read_limited_bytes(
144         max_body, default(), default(), &mut resp
145       ).await
146         .discard_data().context("fetching response body")?;
147
148       if ! status.is_success() {
149         throw!(anyhow!("HTTP error status={} body={:?}",
150                        &status, String::from_utf8_lossy(&resp)));
151       }
152
153       Ok::<_,AE>(resp)
154     }).await? }.await;
155
156     let r = c.reporter.lock().filter(Some(req_num), r);
157
158     if let Some(r) = &r {
159       trace!("{} #{}: rok; rx bytes={}", &c.ic, req_num, r.len());
160     } else {
161       tokio::time::sleep(c.ic.http_retry).await;
162     }
163     r
164   });
165   reqs.push(fut);
166 }
167
168 async fn run_client<C:HCC>(
169   ic: InstanceConfig,
170   hclient: Arc<hyper::Client<C>>
171 ) -> Result<Void, AE>
172 {
173   debug!("{}: config: {:?}", &ic, &ic);
174
175   let reporter = parking_lot::Mutex::new(Reporter::new(&ic));
176
177   let c = ClientContext {
178     reporter: &reporter,
179     hclient: &hclient,
180     ic: &ic,
181   };
182
183   let mut ipif = Ipif::start(&ic.ipif, Some(ic.to_string()))?;
184
185   let mut req_num: ReqNum = 0;
186
187   let mut tx_queue: VecDeque<TxQueued> = default();
188   let mut upbound = Frames::default();
189
190   let mut reqs: Vec<OutstandingRequest>
191     = Vec::with_capacity(ic.max_requests_outstanding.sat());
192
193   let mut rx_queue: FrameQueue = default();
194
195   let trouble = async {
196     loop {
197       let rx_queue_space = 
198         if rx_queue.remaining() < ic.max_batch_down.sat() {
199           Ok(())
200         } else {
201           Err(())
202         };
203       
204       select! {
205         biased;
206
207         y = ipif.rx.write_all_buf(&mut rx_queue),
208         if ! rx_queue.is_empty() =>
209         {
210           let () = y.context("write rx data to ipif")?;
211         },
212
213         () = async {
214           let expires = tx_queue.front().unwrap().expires;
215           tokio::time::sleep_until(expires).await
216         },
217         if ! tx_queue.is_empty() =>
218         {
219           let _ = tx_queue.pop_front();
220         },
221
222         data = ipif.tx.next_segment(),
223         if tx_queue.is_empty() =>
224         {
225           let data = (||{
226             data?.ok_or_else(|| io::Error::from(io::ErrorKind::UnexpectedEof))
227           })().context("read from ipif")?;
228           //eprintln!("data={:?}", DumpHex(&data));
229
230           match check1(Slip2Mime, ic.mtu, &data, |header| {
231             let addr = ip_packet_addr::<false>(header)?;
232             if addr != ic.link.client.0 { throw!(PE::Src(addr)) }
233             Ok(())
234           }) {
235             Ok(data) => tx_queue.push_back(TxQueued {
236               data,
237               expires: Instant::now() + ic.max_queue_time
238             }),
239             Err(PE::Empty) => { },
240             Err(e@ PE::Src(_)) => debug!("{}: tx discarding: {}", &ic, e),
241             Err(e) => error!("{}: tx discarding: {}", &ic, e),
242           };
243         },
244
245         _ = async { },
246         if ! upbound.tried_full() &&
247            ! tx_queue.is_empty() =>
248         {
249           while let Some(TxQueued { data, expires }) = tx_queue.pop_front() {
250             match upbound.add(ic.max_batch_up, data.into()/*todo:504*/) {
251               Err(data) => {
252                 tx_queue.push_front(TxQueued { data: data.into(), expires });
253                 break;
254               }
255               Ok(()) => { },
256             }
257           }
258         },
259
260         _ = async { },
261         if rx_queue_space.is_ok() &&
262           (reqs.len() < ic.target_requests_outstanding.sat() ||
263            (reqs.len() < ic.max_requests_outstanding.sat() &&
264             ! upbound.is_empty()))
265           =>
266         {
267           submit_request(&c, &mut req_num, &mut reqs,
268                          mem::take(&mut upbound).into())?;
269         },
270
271         (got, goti, _) = async { future::select_all(&mut reqs).await },
272           if ! reqs.is_empty() =>
273         {
274           reqs.swap_remove(goti);
275
276           if let Some(got) = got {
277             
278             //eprintln!("got={:?}", DumpHex(&got));
279             match checkn(SlipNoConv,ic.mtu, &got, &mut rx_queue, |header| {
280               let addr = ip_packet_addr::<true>(header)?;
281               if addr != ic.link.client.0 { throw!(PE::Dst(addr)) }
282               Ok(())
283             }, |e| error!("{} #{}: rx discarding: {}", &ic, req_num, e)) {
284               Ok(()) => reporter.lock().success(),
285               Err(ErrorOnlyBad) => {
286                 reqs.push(Box::pin(async {
287                   tokio::time::sleep(ic.http_retry).await;
288                   None
289                 }));
290               },
291             }
292           }
293         },
294
295         _ = tokio::time::sleep(c.ic.effective_http_timeout),
296         if rx_queue_space.is_err() =>
297         {
298           reporter.lock().filter(None, Err::<Void,_>(
299             anyhow!("rx queue full, blocked")
300           ));
301         },
302       }
303     }
304   }.await;
305
306   ipif.quitting(Some(&ic)).await;
307   trouble
308 }
309
310 #[tokio::main]
311 async fn main() {
312   let opts = Opts::from_args();
313   let (ics,) = config::startup("hippotat", LinkEnd::Client,
314                                &opts.config, &opts.log, |ics|Ok((ics,)));
315
316   let https = HttpsConnector::new();
317   let hclient = hyper::Client::builder()
318     .http1_preserve_header_case(true)
319     .build::<_, hyper::Body>(https);
320   let hclient = Arc::new(hclient);
321
322   info!("starting");
323   let () = future::select_all(
324     ics.into_iter().map(|ic| Box::pin(async {
325       let assocname = ic.to_string();
326       info!("{} starting", &assocname);
327       let hclient = hclient.clone();
328       let join = task::spawn(async {
329         run_client(ic, hclient).await.void_unwrap_err()
330       });
331       match join.await {
332         Ok(e) => {
333           error!("{} failed: {}", &assocname, e);
334         },
335         Err(je) => {
336           error!("{} panicked!", &assocname);
337           panic::resume_unwind(je.into_panic());
338         },
339       }
340     }))
341   ).await.0;
342
343   error!("quitting because one of your client connections crashed");
344   process::exit(16);
345 }