chiark / gitweb /
daemon: wip
[hippotat.git] / server / server.rs
1 // Copyright 2021-2022 Ian Jackson and contributors to Hippotat
2 // SPDX-License-Identifier: GPL-3.0-or-later
3 // There is NO WARRANTY.
4
5 use hippotat::prelude::*;
6
7 mod daemon;
8 mod suser;
9 mod slocal;
10 mod sweb;
11
12 pub use daemon::Daemoniser;
13 pub use sweb::{WebRequest, WebResponse, WebResponseBody};
14 pub use suser::User;
15
16 #[derive(StructOpt,Debug)]
17 pub struct Opts {
18   #[structopt(flatten)]
19   pub log: LogOpts,
20
21   #[structopt(flatten)]
22   pub config: config::Opts,
23
24   /// Daemonise
25   #[structopt(long)]
26   daemon: bool,
27 }
28
29 pub const METADATA_MAX_LEN: usize = MAX_OVERHEAD;
30
31
32 // ----- Backpressure discussion -----
33
34 // These two kinds of channels are sent blockingly, so this means the
35 // task which calls route_packet can get this far ahead, before a
36 // context switch to the receiving task is forced.
37 pub const MAXQUEUE_ROUTE2USER: usize = 15;
38 pub const MAXQUEUE_ROUTE2LOCAL: usize = 50;
39
40 // This channel is sent with try_send, ie non-blocking.  If the user
41 // task becomes overloaded, requests will start to be rejected.
42 pub const MAXQUEUE_WEBREQ2USER: usize = 5;
43
44 // The user task prioritises 1. returning requests or discarding data,
45 // 2. handling data routed to it.  Ie it prefers to drain queues.
46 //
47 // The slocal task prioritises handling routed data and writing it
48 // (synchronously) to the local kernel.  So if the local kernel starts
49 // blocking, all tasks may end up blocked waiting for things to drain.
50
51
52 #[derive(Debug)]
53 pub struct Global {
54   config: config::InstanceConfigGlobal,
55   local_rx: mpsc::Sender<RoutedPacket>,
56   all_clients: HashMap<ClientName, User>,
57 }
58
59 pub struct RoutedPacket {
60   pub data: RoutedPacketData,
61 //  pub source: Option<ClientName>, // for eh, tracing, etc.
62 }
63
64 // not MIME data, valid SLIP (checked)
65 pub type RoutedPacketData = Box<[u8]>;
66
67 // loop prevention
68 // we don't decrement the ttl (naughty) but loops cannot arise
69 // because only the server has any routing code, and server
70 // has no internal loops, so worst case is
71 //  client if -> client -> server -> client' -> client if'
72 // and the ifs will decrement the ttl.
73 mod may_route {
74   #[derive(Clone,Debug)]
75   pub struct MayRoute(());
76   impl MayRoute {
77     pub fn came_from_outside_hippotatd() -> Self { Self(()) }
78   }
79 }
80 pub use may_route::MayRoute;
81
82 pub async fn route_packet(global: &Global,
83                           transport_conn: &str, source: Option<&ClientName>,
84                           packet: RoutedPacketData, daddr: IpAddr,
85                           _may_route: MayRoute)
86 {
87   let c = &global.config;
88   let len = packet.len();
89   let trace = |how: &str, why: &str| {
90     trace!("{} to={:?} came=={} user={} len={} {}",
91            how, daddr, transport_conn,
92            match source {
93              Some(s) => s as &dyn Display,
94              None => &"local",
95            },
96            len, why);
97   };
98
99   let (dest, why) =
100     if daddr == c.vaddr || ! c.vnetwork.iter().any(|n| n.contains(&daddr)) {
101       (Some(&global.local_rx), "via=local")
102     } else if daddr == c.vrelay {
103       (None, " vrelay")
104     } else if let Some(client) = global.all_clients.get(&ClientName(daddr)) {
105       (Some(&client.route), "via=client")
106     } else {
107       (None, "no-client")
108     };
109
110   let dest = if let Some(d) = dest { d } else {
111     trace("discard ", why); return;
112   };
113
114   let packet = RoutedPacket {
115     data: packet,
116 //    source: source.cloned(),
117   };
118   match dest.send(packet).await {
119     Ok(()) => trace("forward", why),
120     Err(_) => trace("task-crashed!", why),
121   }
122 }
123
124 fn main() {
125   let opts = Opts::from_args();
126   let daemon = if opts.daemon {
127     Some(Daemoniser::phase1())
128   } else {
129     None
130   };
131
132   async_main(opts, daemon);
133 }
134
135 #[tokio::main]
136 async fn async_main(opts: Opts, daemon: Option<Daemoniser>) {
137   let mut tasks: Vec<(
138     JoinHandle<AE>,
139     String,
140   )> = vec![];
141
142   config::startup(
143     "hippotatd", LinkEnd::Server,
144     &opts.config, &opts.log, |ics|
145   {
146     let global_config = config::InstanceConfigGlobal::from(&ics);
147
148     let ipif = Ipif::start(&global_config.ipif, None)?;
149
150     let ics = ics.into_iter().map(Arc::new).collect_vec();
151     let (client_handles_send, client_handles_recv) = ics.iter()
152       .map(|_ic| {
153         let (web_send, web_recv) = mpsc::channel(
154           MAXQUEUE_WEBREQ2USER
155         );
156         let (route_send, route_recv) = mpsc::channel(
157           MAXQUEUE_ROUTE2USER
158         );
159         ((web_send, route_send), (web_recv, route_recv))
160       }).unzip::<_,_,Vec<_>,Vec<_>>();
161
162     let all_clients = izip!(
163       &ics,
164       client_handles_send,
165     ).map(|(ic, (web_send, route_send))| {
166       (ic.link.client,
167        User {
168          ic: ic.clone(),
169          web: web_send,
170          route: route_send,
171        })
172     }).collect();
173
174     let (local_rx_send, local_tx_recv) = mpsc::channel(
175       MAXQUEUE_ROUTE2LOCAL
176     );
177
178     let global = Arc::new(Global {
179       config: global_config,
180       local_rx: local_rx_send,
181       all_clients,
182     });
183
184     for (ic, (web_recv, route_recv)) in izip!(
185       ics,
186       client_handles_recv,
187     ) {
188       let global_ = global.clone();
189       let ic_ = ic.clone();
190       tasks.push((tokio::spawn(async move {
191         suser::run(global_, ic_, web_recv, route_recv)
192           .await.void_unwrap_err()
193       }), format!("client {}", &ic)));
194     }
195
196     for addr in &global.config.addrs {
197       let global_ = global.clone();
198       let make_service = hyper::service::make_service_fn(
199         move |conn: &hyper::server::conn::AddrStream| {
200           let global_ = global_.clone();
201           let conn = Arc::new(format!("[{}]", conn.remote_addr()));
202           async { Ok::<_, Void>( hyper::service::service_fn(move |req| {
203             AssertUnwindSafe(
204               sweb::handle(conn.clone(), global_.clone(), req)
205             )
206               .catch_unwind()
207               .map(|r| r.unwrap_or_else(|_|{
208                 crash(Err("panicked".into()), "webserver request task")
209               }))
210           }) ) }
211         }
212       );
213
214       let addr = SocketAddr::new(*addr, global.config.port);
215       let server = hyper::Server::try_bind(&addr)
216         .context("bind")?
217         .http1_preserve_header_case(true)
218         .serve(make_service);
219       info!("listening on {}", &addr);
220       let task = tokio::task::spawn(async move {
221         match server.await {
222           Ok(()) => anyhow!("shut down?!"),
223           Err(e) => e.into(),
224         }
225       });
226       tasks.push((task, format!("http server {}", addr)));
227     }
228
229     let global_ = global.clone();
230     let ipif = tokio::task::spawn(async move {
231       slocal::run(global_, local_tx_recv, ipif).await
232         .void_unwrap_err()
233     });
234     tasks.push((ipif, format!("ipif")));
235
236     Ok(())
237   });
238
239   if let Some(daemon) = daemon {
240     daemon.complete();
241   }
242
243   let (output, died_i, _) = future::select_all(
244     tasks.iter_mut().map(|e| &mut e.0)
245   ).await;
246
247   let task = &tasks[died_i].1;
248   let output = output.map_err(|je| je.to_string());
249   crash(output, task);
250 }
251
252 pub fn crash(what_happened: Result<AE, String>, task: &str) -> ! {
253   match what_happened {
254     Err(je) => error!("task crashed! {}: {}", task, &je),
255     Ok(e)   => error!("task failed! {}: {}",   task, &e ),
256   }
257   process::exit(12);
258 }