chiark / gitweb /
Apply OpenSSL exception to Hippotat files
[hippotat.git] / server / server.rs
1 // Copyright 2021-2022 Ian Jackson and contributors to Hippotat
2 // SPDX-License-Identifier: GPL-3.0-or-later WITH LicenseRef-Hippotat-OpenSSL-Exception
3 // There is NO WARRANTY.
4
5 use hippotat::prelude::*;
6
7 mod daemon;
8 mod suser;
9 mod slocal;
10 mod sweb;
11
12 pub use daemon::Daemoniser;
13 pub use sweb::{WebRequest, WebResponse, WebResponseBody};
14 pub use suser::User;
15
16 #[derive(StructOpt,Debug)]
17 pub struct Opts {
18   #[structopt(flatten)]
19   pub log: LogOpts,
20
21   #[structopt(flatten)]
22   pub config: config::Opts,
23
24   /// Daemonise
25   #[structopt(long)]
26   daemon: bool,
27
28   /// Write our pid to this file
29   #[structopt(long)]
30   pidfile: Option<String>,
31
32   /// Print a config item, do not actually run
33   #[structopt(long)]
34   print_config: Option<String>,
35 }
36
37 pub const METADATA_MAX_LEN: usize = MAX_OVERHEAD;
38
39
40 // ----- Backpressure discussion -----
41
42 // These two kinds of channels are sent blockingly, so this means the
43 // task which calls route_packet can get this far ahead, before a
44 // context switch to the receiving task is forced.
45 pub const MAXQUEUE_ROUTE2USER: usize = 15;
46 pub const MAXQUEUE_ROUTE2LOCAL: usize = 50;
47
48 // This channel is sent with try_send, ie non-blocking.  If the user
49 // task becomes overloaded, requests will start to be rejected.
50 pub const MAXQUEUE_WEBREQ2USER: usize = 5;
51
52 // The user task prioritises 1. returning requests or discarding data,
53 // 2. handling data routed to it.  Ie it prefers to drain queues.
54 //
55 // The slocal task prioritises handling routed data and writing it
56 // (synchronously) to the local kernel.  So if the local kernel starts
57 // blocking, all tasks may end up blocked waiting for things to drain.
58
59
60 #[derive(Debug)]
61 pub struct Global {
62   config: config::InstanceConfigGlobal,
63   local_rx: mpsc::Sender<RoutedPacket>,
64   all_clients: HashMap<ClientName, User>,
65 }
66
67 pub struct RoutedPacket {
68   pub data: RoutedPacketData,
69 //  pub source: Option<ClientName>, // for eh, tracing, etc.
70 }
71
72 // not MIME data, valid SLIP (checked)
73 pub type RoutedPacketData = Box<[u8]>;
74
75 // loop prevention
76 // we don't decrement the ttl (naughty) but loops cannot arise
77 // because only the server has any routing code, and server
78 // has no internal loops, so worst case is
79 //  client if -> client -> server -> client' -> client if'
80 // and the ifs will decrement the ttl.
81 mod may_route {
82   #[derive(Clone,Debug)]
83   pub struct MayRoute(());
84   impl MayRoute {
85     pub fn came_from_outside_hippotatd() -> Self { Self(()) }
86   }
87 }
88 pub use may_route::MayRoute;
89
90 pub async fn route_packet(global: &Global,
91                           transport_conn: &str, source: Option<&ClientName>,
92                           packet: RoutedPacketData, daddr: IpAddr,
93                           _may_route: MayRoute)
94 {
95   let c = &global.config;
96   let len = packet.len();
97   let trace = |how: &str, why: &str| {
98     trace!("{} to={:?} came=={} user={} len={} {}",
99            how, daddr, transport_conn,
100            match source {
101              Some(s) => s as &dyn Display,
102              None => &"local",
103            },
104            len, why);
105   };
106
107   let (dest, why) =
108     if daddr == c.vaddr || ! c.vnetwork.iter().any(|n| n.contains(&daddr)) {
109       (Some(&global.local_rx), "via=local")
110     } else if daddr == c.vrelay {
111       (None, " vrelay")
112     } else if let Some(client) = global.all_clients.get(&ClientName(daddr)) {
113       (Some(&client.route), "via=client")
114     } else {
115       (None, "no-client")
116     };
117
118   let dest = if let Some(d) = dest { d } else {
119     trace("discard ", why); return;
120   };
121
122   let packet = RoutedPacket {
123     data: packet,
124 //    source: source.cloned(),
125   };
126   match dest.send(packet).await {
127     Ok(()) => trace("forward", why),
128     Err(_) => trace("task-crashed!", why),
129   }
130 }
131
132 fn main() {
133   let opts = Opts::from_args();
134
135   let daemon = if opts.daemon && opts.print_config.is_none() {
136     Some(Daemoniser::phase1())
137   } else {
138     None
139   };
140
141   async_main(opts, daemon);
142 }
143
144 #[tokio::main]
145 async fn async_main(opts: Opts, daemon: Option<Daemoniser>) {
146   let mut tasks: Vec<(
147     JoinHandle<AE>,
148     String,
149   )> = vec![];
150
151   config::startup(
152     "hippotatd", LinkEnd::Server,
153     &opts.config, &opts.log, |ics|
154   {
155     let global_config = config::InstanceConfigGlobal::from(&ics);
156
157     if let Some(key) = opts.print_config.as_ref() {
158       if let Some(inspectable) = global_config.inspect_key(key) {
159         println!("{}", DisplayInspectable(inspectable));
160         process::exit(0);
161       } else {
162         throw!(anyhow!("unknown config key {:?}", key));
163       }
164     }
165
166     if let Some(pidfile_path) = opts.pidfile.as_ref() {
167       (||{
168         let mut pidfile = fs::File::create(pidfile_path).context("create")?;
169         writeln!(pidfile, "{}", process::id()).context("write")?;
170         pidfile.flush().context("write (flush)")?;
171         Ok::<_,AE>(())
172       })().with_context(|| format!("pidfile {:?}", pidfile_path))?;
173     }
174
175     let ipif = Ipif::start(&global_config.ipif, None)?;
176
177     let ics = ics.into_iter().map(Arc::new).collect_vec();
178     let (client_handles_send, client_handles_recv) = ics.iter()
179       .map(|_ic| {
180         let (web_send, web_recv) = mpsc::channel(
181           MAXQUEUE_WEBREQ2USER
182         );
183         let (route_send, route_recv) = mpsc::channel(
184           MAXQUEUE_ROUTE2USER
185         );
186         ((web_send, route_send), (web_recv, route_recv))
187       }).unzip::<_,_,Vec<_>,Vec<_>>();
188
189     let all_clients = izip!(
190       &ics,
191       client_handles_send,
192     ).map(|(ic, (web_send, route_send))| {
193       (ic.link.client,
194        User {
195          ic: ic.clone(),
196          web: web_send,
197          route: route_send,
198        })
199     }).collect();
200
201     let (local_rx_send, local_tx_recv) = mpsc::channel(
202       MAXQUEUE_ROUTE2LOCAL
203     );
204
205     let global = Arc::new(Global {
206       config: global_config,
207       local_rx: local_rx_send,
208       all_clients,
209     });
210
211     for (ic, (web_recv, route_recv)) in izip!(
212       ics,
213       client_handles_recv,
214     ) {
215       let global_ = global.clone();
216       let ic_ = ic.clone();
217       tasks.push((tokio::spawn(async move {
218         suser::run(global_, ic_, web_recv, route_recv)
219           .await.void_unwrap_err()
220       }), format!("client {}", &ic)));
221     }
222
223     for addr in &global.config.addrs {
224       let global_ = global.clone();
225       let make_service = hyper::service::make_service_fn(
226         move |conn: &hyper::server::conn::AddrStream| {
227           let global_ = global_.clone();
228           let conn = Arc::new(format!("[{}]", conn.remote_addr()));
229           async { Ok::<_, Void>( hyper::service::service_fn(move |req| {
230             AssertUnwindSafe(
231               sweb::handle(conn.clone(), global_.clone(), req)
232             )
233               .catch_unwind()
234               .map(|r| r.unwrap_or_else(|_|{
235                 crash(Err("panicked".into()), "webserver request task")
236               }))
237           }) ) }
238         }
239       );
240
241       let addr = SocketAddr::new(*addr, global.config.port);
242       let server = hyper::Server::try_bind(&addr)
243         .context("bind")?
244         .http1_preserve_header_case(true)
245         .serve(make_service);
246       info!("listening on {}", &addr);
247       let task = tokio::task::spawn(async move {
248         match server.await {
249           Ok(()) => anyhow!("shut down?!"),
250           Err(e) => e.into(),
251         }
252       });
253       tasks.push((task, format!("http server {}", addr)));
254     }
255
256     let global_ = global.clone();
257     let ipif = tokio::task::spawn(async move {
258       slocal::run(global_, local_tx_recv, ipif).await
259         .void_unwrap_err()
260     });
261     tasks.push((ipif, format!("ipif")));
262
263     Ok(())
264   });
265
266   if let Some(daemon) = daemon {
267     daemon.complete();
268   }
269
270   let (output, died_i, _) = future::select_all(
271     tasks.iter_mut().map(|e| &mut e.0)
272   ).await;
273
274   let task = &tasks[died_i].1;
275   let output = output.map_err(|je| je.to_string());
276   crash(output, task);
277 }
278
279 pub fn crash(what_happened: Result<AE, String>, task: &str) -> ! {
280   match what_happened {
281     Err(je) => error!("task crashed! {}: {}", task, &je),
282     Ok(e)   => error!("task failed! {}: {}",   task, &e ),
283   }
284   process::exit(12);
285 }