chiark / gitweb /
53286e7d1b96654fb732f7639d60852aa38cad9d
[hippotat.git] / server / server.rs
1 // Copyright 2021 Ian Jackson and contributors to Hippotat
2 // SPDX-License-Identifier: GPL-3.0-or-later
3 // There is NO WARRANTY.
4
5 use hippotat::prelude::*;
6
7 mod suser;
8 mod slocal;
9 mod sweb;
10
11 pub use sweb::{WebRequest, WebResponse, WebResponseBody};
12 pub use suser::User;
13
14 #[derive(StructOpt,Debug)]
15 pub struct Opts {
16   #[structopt(flatten)]
17   pub log: LogOpts,
18
19   #[structopt(flatten)]
20   pub config: config::Opts,
21 }
22
23 pub const METADATA_MAX_LEN: usize = MAX_OVERHEAD;
24
25
26 // ----- Backpressure discussion -----
27
28 // These two kinds of channels are sent blockingly, so this means the
29 // task which calls route_packet can get this far ahead, before a
30 // context switch to the receiving task is forced.
31 pub const MAXQUEUE_ROUTE2USER: usize = 15;
32 pub const MAXQUEUE_ROUTE2LOCAL: usize = 50;
33
34 // This channel is sent with try_send, ie non-blocking.  If the user
35 // task becomes overloaded, requests will start to be rejected.
36 pub const MAXQUEUE_WEBREQ2USER: usize = 5;
37
38 // The user task prioritises 1. returning requests or discarding data,
39 // 2. handling data routed to it.  Ie it prefers to drain queues.
40 //
41 // The slocal task prioritises handling routed data and writing it
42 // (synchronously) to the local kernel.  So if the local kernel starts
43 // blocking, all tasks may end up blocked waiting for things to drain.
44
45
46 #[derive(Debug)]
47 pub struct Global {
48   config: config::InstanceConfigGlobal,
49   local_rx: mpsc::Sender<RoutedPacket>,
50   all_clients: HashMap<ClientName, User>,
51 }
52
53 pub struct RoutedPacket {
54   pub data: RoutedPacketData,
55 //  pub source: Option<ClientName>, // for eh, tracing, etc.
56 }
57
58 // not MIME data, valid SLIP (checked)
59 pub type RoutedPacketData = Box<[u8]>;
60
61 // loop prevention
62 // we don't decrement the ttl (naughty) but loops cannot arise
63 // because only the server has any routing code, and server
64 // has no internal loops, so worst case is
65 //  client if -> client -> server -> client' -> client if'
66 // and the ifs will decrement the ttl.
67 mod may_route {
68   #[derive(Clone,Debug)]
69   pub struct MayRoute(());
70   impl MayRoute {
71     pub fn came_from_outside_hippotatd() -> Self { Self(()) }
72   }
73 }
74 pub use may_route::MayRoute;
75
76 pub async fn route_packet(global: &Global,
77                           transport_conn: &str, source: Option<&ClientName>,
78                           packet: RoutedPacketData, daddr: IpAddr,
79                           _may_route: MayRoute)
80 {
81   let c = &global.config;
82   let len = packet.len();
83   let trace = |how: &str, why: &str| {
84     trace!("{} to={:?} came=={} user={} len={} {}",
85            how, daddr, transport_conn,
86            match source {
87              Some(s) => s as &dyn Display,
88              None => &"local",
89            },
90            len, why);
91   };
92
93   let (dest, why) =
94     if daddr == c.vaddr || ! c.vnetwork.iter().any(|n| n.contains(&daddr)) {
95       (Some(&global.local_rx), "via=local")
96     } else if daddr == c.vrelay {
97       (None, " vrelay")
98     } else if let Some(client) = global.all_clients.get(&ClientName(daddr)) {
99       (Some(&client.route), "via=client")
100     } else {
101       (None, "no-client")
102     };
103
104   let dest = if let Some(d) = dest { d } else {
105     trace("discard ", why); return;
106   };
107
108   let packet = RoutedPacket {
109     data: packet,
110 //    source: source.cloned(),
111   };
112   match dest.send(packet).await {
113     Ok(()) => trace("forward", why),
114     Err(_) => trace("task-crashed!", why),
115   }
116 }
117
118 #[tokio::main]
119 async fn main() {
120   let opts = Opts::from_args();
121   let mut tasks: Vec<(
122     JoinHandle<AE>,
123     String,
124   )> = vec![];
125
126   config::startup(
127     "hippotatd", LinkEnd::Server,
128     &opts.config, &opts.log, |ics|
129   {
130     let global_config = config::InstanceConfigGlobal::from(&ics);
131
132     let ipif = Ipif::start(&global_config.ipif, None)?;
133
134     let ics = ics.into_iter().map(Arc::new).collect_vec();
135     let (client_handles_send, client_handles_recv) = ics.iter()
136       .map(|_ic| {
137         let (web_send, web_recv) = mpsc::channel(
138           MAXQUEUE_WEBREQ2USER
139         );
140         let (route_send, route_recv) = mpsc::channel(
141           MAXQUEUE_ROUTE2USER
142         );
143         ((web_send, route_send), (web_recv, route_recv))
144       }).unzip::<_,_,Vec<_>,Vec<_>>();
145
146     let all_clients = izip!(
147       &ics,
148       client_handles_send,
149     ).map(|(ic, (web_send, route_send))| {
150       (ic.link.client,
151        User {
152          ic: ic.clone(),
153          web: web_send,
154          route: route_send,
155        })
156     }).collect();
157
158     let (local_rx_send, local_tx_recv) = mpsc::channel(
159       MAXQUEUE_ROUTE2LOCAL
160     );
161
162     let global = Arc::new(Global {
163       config: global_config,
164       local_rx: local_rx_send,
165       all_clients,
166     });
167
168     for (ic, (web_recv, route_recv)) in izip!(
169       ics,
170       client_handles_recv,
171     ) {
172       let global_ = global.clone();
173       let ic_ = ic.clone();
174       tasks.push((tokio::spawn(async move {
175         suser::run(global_, ic_, web_recv, route_recv)
176           .await.void_unwrap_err()
177       }), format!("client {}", &ic)));
178     }
179
180     for addr in &global.config.addrs {
181       let global_ = global.clone();
182       let make_service = hyper::service::make_service_fn(
183         move |conn: &hyper::server::conn::AddrStream| {
184           let global_ = global_.clone();
185           let conn = Arc::new(format!("[{}]", conn.remote_addr()));
186           async { Ok::<_, Void>( hyper::service::service_fn(move |req| {
187             AssertUnwindSafe(
188               sweb::handle(conn.clone(), global_.clone(), req)
189             )
190               .catch_unwind()
191               .map(|r| r.unwrap_or_else(|_|{
192                 crash(Err("panicked".into()), "webserver request task")
193               }))
194           }) ) }
195         }
196       );
197
198       let addr = SocketAddr::new(*addr, global.config.port);
199       let server = hyper::Server::try_bind(&addr)
200         .context("bind")?
201         .http1_preserve_header_case(true)
202         .serve(make_service);
203       info!("listening on {}", &addr);
204       let task = tokio::task::spawn(async move {
205         match server.await {
206           Ok(()) => anyhow!("shut down?!"),
207           Err(e) => e.into(),
208         }
209       });
210       tasks.push((task, format!("http server {}", addr)));
211     }
212
213     let global_ = global.clone();
214     let ipif = tokio::task::spawn(async move {
215       slocal::run(global_, local_tx_recv, ipif).await
216         .void_unwrap_err()
217     });
218     tasks.push((ipif, format!("ipif")));
219
220     Ok(())
221   });
222
223   let (output, died_i, _) = future::select_all(
224     tasks.iter_mut().map(|e| &mut e.0)
225   ).await;
226
227   let task = &tasks[died_i].1;
228   let output = output.map_err(|je| je.to_string());
229   crash(output, task);
230 }
231
232 pub fn crash(what_happened: Result<AE, String>, task: &str) -> ! {
233   match what_happened {
234     Err(je) => error!("task crashed! {}: {}", task, &je),
235     Ok(e)   => error!("task failed! {}: {}",   task, &e ),
236   }
237   process::exit(12);
238 }