chiark / gitweb /
client: wip code
[hippotat.git] / src / bin / client.rs
1 // Copyright 2021 Ian Jackson and contributors to Hippotat
2 // SPDX-License-Identifier: AGPL-3.0-or-later
3 // There is NO WARRANTY.
4
5 use hippotat::prelude::*;
6
7 /*
8 struct Client {
9   requests_outstanding: Vec<>,
10   tx_read_stream: something,
11   rx_write_stream: something,
12 }*/
13
14 #[allow(unused_variables)] // xxx
15 async fn run_client<C>(ic: InstanceConfig, hclient: Arc<hyper::Client<C>>)
16                        -> Result<Void, AE>
17 where C: hyper::client::connect::Connect + Clone + Send + Sync + 'static,
18 {
19   debug!("{}: config: {:?}", &ic, &ic);
20
21   let mut reporter = Reporter { };
22
23   let mut ipif = tokio::process::Command::new("sh")
24     .args(&["-c", &ic.ipif])
25     .stdin (process::Stdio::piped())
26     .stdout(process::Stdio::piped())
27     .stderr(process::Stdio::piped())
28     .kill_on_drop(true)
29     .spawn().context("spawn ipif")?;
30   
31   let stderr = ipif.stderr.take().unwrap();
32   let ic_name = ic.to_string();
33   let _ = task::spawn(async move {
34     let mut stderr = tokio::io::BufReader::new(stderr).lines();
35     while let Some(l) = stderr.next_line().await? {
36       error!("{}: ipif stderr: {}", &ic_name, l.trim_end());
37     }
38     Ok::<_,io::Error>(())
39   });
40
41   let tx_stream = ipif.stdout.take().unwrap();
42   let mut tx_stream = tokio::io::BufReader::new(tx_stream).split(SLIP_ESC);
43   let mut tx_defer = None;
44
45   let stream_for_rx = ipif.stdin .take().unwrap();
46
47   let mut reqs: Vec<Pin<Box<
48       dyn Future<Output=Result<Bytes,AE>>
49       >>> = Vec::with_capacity(ic.max_requests_outstanding.sat());
50
51   // xxx check that ic settings are all honoured
52
53   async {
54     loop {
55       select! {
56         packet = tx_stream.next_segment(),
57         if tx_defer.is_none() &&
58            reqs.len() < ic.max_requests_outstanding.sat() =>
59         {
60           let mut upbound_total = 0;
61           let mut upbound: Vec<Vec<u8>> = vec![];
62           let mut to_process: Option<Result<Option<Vec<u8>>,_>>
63             = Some(packet);
64           while let Some(packet) = to_process.take() {
65             let mut packet =
66               packet.context("read from ipif")?
67               .ok_or_else(|| io::Error::from(io::ErrorKind::UnexpectedEof))?;
68             if let Ok(()) = slip::check_checkmtu_mimeify(&mut packet, &ic) {
69               let new_upbound_total = packet.len() + upbound_total + 1;
70               if new_upbound_total > ic.max_batch_up.sat() {
71                 tx_defer = Some(packet);
72                 break;
73               }
74               upbound_total = new_upbound_total;
75               upbound.push(packet);
76               // we rely oin `next_segment` being cancellation-safe,
77               // which isn't documented as true but seems reasonably safe
78               pin!{ let next_segment = tx_stream.next_segment(); }
79               to_process = match poll!(next_segment) {
80                 Poll::Ready(p) => Some(p),
81                 Poll::Pending => None,
82               };
83             }
84           }
85           assert!( to_process.is_none() );
86           dbg!(&reqs.len(), &upbound_total, &upbound.len());
87
88           //: impl futures::Stream<Cow<&[u8]>>
89           let body = hyper::body::Body::wrap_stream(
90             futures::stream::iter(
91               Itertools::intersperse(
92                 upbound.into_iter().map(|u| Bytes::from(u)),
93                 slip::SLIP_END_SLICE.into()
94               ).map(Ok::<_,Void>)
95             )
96           );
97
98           let req = hyper::Request::post(&ic.url).body(body)
99             .context("construct request")?;
100
101           let resp = hclient.request(req);
102           let fut = Box::pin(async {
103             let r = async { tokio::time::timeout( ic.http_timeout, async {
104               let resp = resp.await.context("make request")?;
105               if ! resp.status().is_success() {
106                 throw!(anyhow!("HTTP error status {}", &resp.status()));
107               }
108               let resp = resp.into_body();
109               // xxx: some size limit to avoid mallocing the universe
110               let resp = hyper::body::to_bytes(resp).await
111                 .context("HTTP error fetching response body")?;
112               Ok::<_,AE>(resp)
113             }).await? }.await;
114             if r.is_err() {
115               tokio::time::sleep(ic.http_retry).await;
116             }
117             r
118           });
119           reqs.push(fut);
120         }
121
122         (got, goti, _) = async { future::select_all(&mut reqs).await },
123           if ! reqs.is_empty() =>
124         {
125           reqs.swap_remove(goti);
126           if let Some(got) = reporter.report(got) {
127             dbg!(&got.remaining()); // xxx
128           }
129         }
130       }
131     }
132   }.await
133 }
134
135 #[tokio::main]
136 async fn main() -> Result<(), AE> {
137   let ics = config::read(LinkEnd::Client)?;
138   if ics.is_empty() { throw!(anyhow!("no associations with server(s)")); }
139
140   env_logger::init();
141
142   let https = HttpsConnector::new();
143   let hclient = hyper::Client::builder().build::<_, hyper::Body>(https);
144   let hclient = Arc::new(hclient);
145
146   info!("starting");
147   let () = future::select_all(
148     ics.into_iter().map(|ic| Box::pin(async {
149       let assocname = ic.to_string();
150       info!("{} starting", &assocname);
151       let hclient = hclient.clone();
152       let join = task::spawn(async {
153         run_client(ic, hclient).await.void_unwrap_err()
154       });
155       match join.await {
156         Ok(e) => {
157           error!("{} failed: {:?}", &assocname, e);
158         },
159         Err(je) => {
160           error!("{} panicked!", &assocname);
161           panic::resume_unwind(je.into_panic());
162         },
163       }
164     }))
165   ).await.0;
166
167   error!("quitting because one of your client connections crashed");
168   process::exit(16);
169 }