1 // Copyright 2021-2022 Ian Jackson and contributors to Hippotat
2 // SPDX-License-Identifier: GPL-3.0-or-later WITH LicenseRef-Hippotat-OpenSSL-Exception
3 // There is NO WARRANTY.
5 use hippotat::prelude::*;
12 pub use daemon::Daemoniser;
13 pub use sweb::{WebRequest, WebResponse, WebResponseBody};
16 #[derive(StructOpt,Debug)]
22 pub config: config::Opts,
28 /// Write our pid to this file
30 pidfile: Option<String>,
32 /// Print a config item, do not actually run
34 print_config: Option<String>,
37 pub const METADATA_MAX_LEN: usize = MAX_OVERHEAD;
40 // ----- Backpressure discussion -----
42 // These two kinds of channels are sent blockingly, so this means the
43 // task which calls route_packet can get this far ahead, before a
44 // context switch to the receiving task is forced.
45 pub const MAXQUEUE_ROUTE2USER: usize = 15;
46 pub const MAXQUEUE_ROUTE2LOCAL: usize = 50;
48 // This channel is sent with try_send, ie non-blocking. If the user
49 // task becomes overloaded, requests will start to be rejected.
50 pub const MAXQUEUE_WEBREQ2USER: usize = 5;
52 // The user task prioritises 1. returning requests or discarding data,
53 // 2. handling data routed to it. Ie it prefers to drain queues.
55 // The slocal task prioritises handling routed data and writing it
56 // (synchronously) to the local kernel. So if the local kernel starts
57 // blocking, all tasks may end up blocked waiting for things to drain.
62 config: config::InstanceConfigGlobal,
63 local_rx: mpsc::Sender<RoutedPacket>,
64 all_clients: HashMap<ClientName, User>,
67 pub struct RoutedPacket {
68 pub data: RoutedPacketData,
69 // pub source: Option<ClientName>, // for eh, tracing, etc.
72 // not MIME data, valid SLIP (checked)
73 pub type RoutedPacketData = Box<[u8]>;
76 // we don't decrement the ttl (naughty) but loops cannot arise
77 // because only the server has any routing code, and server
78 // has no internal loops, so worst case is
79 // client if -> client -> server -> client' -> client if'
80 // and the ifs will decrement the ttl.
82 #[derive(Clone,Debug)]
83 pub struct MayRoute(());
85 pub fn came_from_outside_hippotatd() -> Self { Self(()) }
88 pub use may_route::MayRoute;
90 pub async fn route_packet(global: &Global,
91 transport_conn: &str, source: Option<&ClientName>,
92 packet: RoutedPacketData, daddr: IpAddr,
95 let c = &global.config;
96 let len = packet.len();
97 let trace = |how: &str, why: &str| {
98 trace!("{} to={:?} came=={} user={} len={} {}",
99 how, daddr, transport_conn,
101 Some(s) => s as &dyn Display,
108 if daddr == c.vaddr || ! c.vnetwork.iter().any(|n| n.contains(&daddr)) {
109 (Some(&global.local_rx), "via=local")
110 } else if daddr == c.vrelay {
112 } else if let Some(client) = global.all_clients.get(&ClientName(daddr)) {
113 (Some(&client.route), "via=client")
118 let dest = if let Some(d) = dest { d } else {
119 trace("discard ", why); return;
122 let packet = RoutedPacket {
124 // source: source.cloned(),
126 match dest.send(packet).await {
127 Ok(()) => trace("forward", why),
128 Err(_) => trace("task-crashed!", why),
133 let opts = Opts::from_args();
135 let daemon = if opts.daemon && opts.print_config.is_none() {
136 Some(Daemoniser::phase1())
141 async_main(opts, daemon);
145 async fn async_main(opts: Opts, daemon: Option<Daemoniser>) {
152 "hippotatd", LinkEnd::Server,
153 &opts.config, &opts.log, |ics|
155 let global_config = config::InstanceConfigGlobal::from(&ics);
157 if let Some(key) = opts.print_config.as_ref() {
158 if let Some(inspectable) = global_config.inspect_key(key) {
159 println!("{}", DisplayInspectable(inspectable));
162 throw!(anyhow!("unknown config key {:?}", key));
166 if let Some(pidfile_path) = opts.pidfile.as_ref() {
168 let mut pidfile = fs::File::create(pidfile_path).context("create")?;
169 writeln!(pidfile, "{}", process::id()).context("write")?;
170 pidfile.flush().context("write (flush)")?;
172 })().with_context(|| format!("pidfile {:?}", pidfile_path))?;
175 let ipif = Ipif::start(&global_config.ipif, None)?;
177 let ics = ics.into_iter().map(Arc::new).collect_vec();
178 let (client_handles_send, client_handles_recv) = ics.iter()
180 let (web_send, web_recv) = mpsc::channel(
183 let (route_send, route_recv) = mpsc::channel(
186 ((web_send, route_send), (web_recv, route_recv))
187 }).unzip::<_,_,Vec<_>,Vec<_>>();
189 let all_clients = izip!(
192 ).map(|(ic, (web_send, route_send))| {
201 let (local_rx_send, local_tx_recv) = mpsc::channel(
205 let global = Arc::new(Global {
206 config: global_config,
207 local_rx: local_rx_send,
211 for (ic, (web_recv, route_recv)) in izip!(
215 let global_ = global.clone();
216 let ic_ = ic.clone();
217 tasks.push((tokio::spawn(async move {
218 suser::run(global_, ic_, web_recv, route_recv)
219 .await.void_unwrap_err()
220 }), format!("client {}", &ic)));
223 for addr in &global.config.addrs {
224 let global_ = global.clone();
225 let make_service = hyper::service::make_service_fn(
226 move |conn: &hyper::server::conn::AddrStream| {
227 let global_ = global_.clone();
228 let conn = Arc::new(format!("[{}]", conn.remote_addr()));
229 async { Ok::<_, Void>( hyper::service::service_fn(move |req| {
231 sweb::handle(conn.clone(), global_.clone(), req)
234 .map(|r| r.unwrap_or_else(|_|{
235 crash(Err("panicked".into()), "webserver request task")
241 let addr = SocketAddr::new(*addr, global.config.port);
242 let server = hyper::Server::try_bind(&addr)
244 .http1_preserve_header_case(true)
245 .serve(make_service);
246 info!("listening on {}", &addr);
247 let task = tokio::task::spawn(async move {
249 Ok(()) => anyhow!("shut down?!"),
253 tasks.push((task, format!("http server {}", addr)));
256 let global_ = global.clone();
257 let ipif = tokio::task::spawn(async move {
258 slocal::run(global_, local_tx_recv, ipif).await
261 tasks.push((ipif, format!("ipif")));
266 if let Some(daemon) = daemon {
270 let (output, died_i, _) = future::select_all(
271 tasks.iter_mut().map(|e| &mut e.0)
274 let task = &tasks[died_i].1;
275 let output = output.map_err(|je| je.to_string());
279 pub fn crash(what_happened: Result<AE, String>, task: &str) -> ! {
280 match what_happened {
281 Err(je) => error!("task crashed! {}: {}", task, &je),
282 Ok(e) => error!("task failed! {}: {}", task, &e ),