chiark / gitweb /
Report when http req fails, not after retry delay
[hippotat.git] / src / bin / client.rs
1 // Copyright 2021 Ian Jackson and contributors to Hippotat
2 // SPDX-License-Identifier: AGPL-3.0-or-later
3 // There is NO WARRANTY.
4
5 use hippotat::prelude::*;
6
7 type OutstandingRequest<'r> = Pin<Box<
8     dyn Future<Output=Option<Bytes>> + Send + 'r
9     >>;
10
11 impl<T> HCC for T where
12         T: hyper::client::connect::Connect + Clone + Send + Sync + 'static { }
13 trait HCC: hyper::client::connect::Connect + Clone + Send + Sync + 'static { }
14
15 #[throws(AE)]
16 fn submit_request<'r, 'ic:'r, 'hc:'r, 'rep:'r, C:HCC>(
17   ic: &'ic InstanceConfig,
18   hclient: &'hc Arc<hyper::Client<C>>,
19   reporter: &'rep parking_lot::Mutex<Reporter>,
20   reqs: &mut Vec<OutstandingRequest<'r>>,
21   upbound: Vec<Vec<u8>>
22 ) {
23   let body = hyper::body::Body::wrap_stream(
24     futures::stream::iter(
25       Itertools::intersperse(
26         upbound.into_iter().map(|u| Bytes::from(u)),
27         slip::SLIP_END_SLICE.into()
28       ).map(Ok::<_,Void>)
29     )
30   );
31
32   let req = hyper::Request::post(&ic.url).body(body)
33     .context("construct request")?;
34
35   let resp = hclient.request(req);
36   let fut = Box::pin(async move {
37     let r = async { tokio::time::timeout( ic.http_timeout, async {
38       let resp = resp.await.context("make request")?;
39       if ! resp.status().is_success() {
40         throw!(anyhow!("HTTP error status {}", &resp.status()));
41       }
42       let resp = resp.into_body();
43       // xxx: some size limit to avoid mallocing the universe
44       let resp = hyper::body::to_bytes(resp).await
45         .context("HTTP error fetching response body")?;
46
47       Ok::<_,AE>(resp)
48     }).await? }.await;
49
50     let r = reporter.lock().report(r);
51
52     if r.is_none() {
53       tokio::time::sleep(ic.http_retry).await;
54     }
55     r
56   });
57   reqs.push(fut);
58 }
59
60 #[allow(unused_variables)] // xxx
61 async fn run_client<C:HCC>(
62   ic: InstanceConfig,
63   hclient: Arc<hyper::Client<C>>
64 ) -> Result<Void, AE>
65 {
66   debug!("{}: config: {:?}", &ic, &ic);
67
68   let reporter = parking_lot::Mutex::new(Reporter { });
69
70   let mut ipif = tokio::process::Command::new("sh")
71     .args(&["-c", &ic.ipif])
72     .stdin (process::Stdio::piped())
73     .stdout(process::Stdio::piped())
74     .stderr(process::Stdio::piped())
75     .kill_on_drop(true)
76     .spawn().context("spawn ipif")?;
77   
78   let stderr = ipif.stderr.take().unwrap();
79   let ic_name = ic.to_string();
80   let _ = task::spawn(async move {
81     let mut stderr = tokio::io::BufReader::new(stderr).lines();
82     while let Some(l) = stderr.next_line().await? {
83       error!("{}: ipif stderr: {}", &ic_name, l.trim_end());
84     }
85     Ok::<_,io::Error>(())
86   });
87
88   let tx_stream = ipif.stdout.take().unwrap();
89   let mut tx_stream = tokio::io::BufReader::new(tx_stream).split(SLIP_ESC);
90   let mut tx_defer = None;
91
92   let stream_for_rx = ipif.stdin .take().unwrap();
93
94   let mut reqs: Vec<OutstandingRequest>
95     = Vec::with_capacity(ic.max_requests_outstanding.sat());
96
97   // xxx check that ic settings are all honoured
98
99   async {
100     loop {
101       select! {
102         packet = async {
103           // cancellation safety: if this future is polled, we might
104           // move out of tx_defer, but then we will be immediately
105           // ready and yield it inot packet
106           if let Some(y) = tx_defer.take() {
107             Ok(Some(y))
108           } else {
109             tx_stream.next_segment().await
110           }
111         },
112         if tx_defer.is_none() &&
113            reqs.len() < ic.max_requests_outstanding.sat() =>
114         {
115           let mut upbound_total = 0;
116           let mut upbound: Vec<Vec<u8>> = vec![];
117           let mut to_process: Option<Result<Option<Vec<u8>>,_>>
118             = Some(packet);
119           while let Some(packet) = to_process.take() {
120             let mut packet =
121               packet.context("read from ipif")?
122               .ok_or_else(|| io::Error::from(io::ErrorKind::UnexpectedEof))?;
123             if let Ok(()) = slip::check_checkmtu_mimeify(&mut packet, &ic) {
124               let new_upbound_total = packet.len() + upbound_total + 1;
125               if new_upbound_total > ic.max_batch_up.sat() {
126                 tx_defer = Some(packet);
127                 break;
128               }
129               upbound_total = new_upbound_total;
130               upbound.push(packet);
131               // we rely oin `next_segment` being cancellation-safe,
132               // which isn't documented as true but seems reasonably safe
133               pin!{ let next_segment = tx_stream.next_segment(); }
134               to_process = match poll!(next_segment) {
135                 Poll::Ready(p) => Some(p),
136                 Poll::Pending => None,
137               };
138             }
139           }
140           assert!( to_process.is_none() );
141           dbg!(&reqs.len(), &upbound_total, &upbound.len());
142
143           //: impl futures::Stream<Cow<&[u8]>>
144           submit_request(&ic, &hclient, &reporter, &mut reqs, upbound)?;
145         }
146
147         () = async { },
148         if reqs.len() < ic.target_requests_outstanding.sat() ||
149            (tx_defer.is_some() &&
150             reqs.len() < ic.max_requests_outstanding.sat()) =>
151         {
152           let upbound = tx_defer.take().into_iter().collect_vec();
153           submit_request(&ic, &hclient, &reporter, &mut reqs, upbound)?;
154         }
155
156         (got, goti, _) = async { future::select_all(&mut reqs).await },
157           if ! reqs.is_empty() =>
158         {
159           reqs.swap_remove(goti);
160           if let Some(got) = got {
161             dbg!(&got.remaining()); // xxx
162           }
163         }
164       }
165     }
166   }.await
167 }
168
169 #[tokio::main]
170 async fn main() -> Result<(), AE> {
171   let ics = config::read(LinkEnd::Client)?;
172   if ics.is_empty() { throw!(anyhow!("no associations with server(s)")); }
173
174   env_logger::init();
175
176   let https = HttpsConnector::new();
177   let hclient = hyper::Client::builder().build::<_, hyper::Body>(https);
178   let hclient = Arc::new(hclient);
179
180   info!("starting");
181   let () = future::select_all(
182     ics.into_iter().map(|ic| Box::pin(async {
183       let assocname = ic.to_string();
184       info!("{} starting", &assocname);
185       let hclient = hclient.clone();
186       let join = task::spawn(async {
187         run_client(ic, hclient).await.void_unwrap_err()
188       });
189       match join.await {
190         Ok(e) => {
191           error!("{} failed: {:?}", &assocname, e);
192         },
193         Err(je) => {
194           error!("{} panicked!", &assocname);
195           panic::resume_unwind(je.into_panic());
196         },
197       }
198     }))
199   ).await.0;
200
201   error!("quitting because one of your client connections crashed");
202   process::exit(16);
203 }