chiark / gitweb /
print config value
[hippotat.git] / server / server.rs
1 // Copyright 2021-2022 Ian Jackson and contributors to Hippotat
2 // SPDX-License-Identifier: GPL-3.0-or-later
3 // There is NO WARRANTY.
4
5 use hippotat::prelude::*;
6
7 mod daemon;
8 mod suser;
9 mod slocal;
10 mod sweb;
11
12 pub use daemon::Daemoniser;
13 pub use sweb::{WebRequest, WebResponse, WebResponseBody};
14 pub use suser::User;
15
16 #[derive(StructOpt,Debug)]
17 pub struct Opts {
18   #[structopt(flatten)]
19   pub log: LogOpts,
20
21   #[structopt(flatten)]
22   pub config: config::Opts,
23
24   /// Daemonise
25   #[structopt(long)]
26   daemon: bool,
27
28   /// Print a config item, do not actually run
29   #[structopt(long)]
30   print_config: Option<String>,
31 }
32
33 pub const METADATA_MAX_LEN: usize = MAX_OVERHEAD;
34
35
36 // ----- Backpressure discussion -----
37
38 // These two kinds of channels are sent blockingly, so this means the
39 // task which calls route_packet can get this far ahead, before a
40 // context switch to the receiving task is forced.
41 pub const MAXQUEUE_ROUTE2USER: usize = 15;
42 pub const MAXQUEUE_ROUTE2LOCAL: usize = 50;
43
44 // This channel is sent with try_send, ie non-blocking.  If the user
45 // task becomes overloaded, requests will start to be rejected.
46 pub const MAXQUEUE_WEBREQ2USER: usize = 5;
47
48 // The user task prioritises 1. returning requests or discarding data,
49 // 2. handling data routed to it.  Ie it prefers to drain queues.
50 //
51 // The slocal task prioritises handling routed data and writing it
52 // (synchronously) to the local kernel.  So if the local kernel starts
53 // blocking, all tasks may end up blocked waiting for things to drain.
54
55
56 #[derive(Debug)]
57 pub struct Global {
58   config: config::InstanceConfigGlobal,
59   local_rx: mpsc::Sender<RoutedPacket>,
60   all_clients: HashMap<ClientName, User>,
61 }
62
63 pub struct RoutedPacket {
64   pub data: RoutedPacketData,
65 //  pub source: Option<ClientName>, // for eh, tracing, etc.
66 }
67
68 // not MIME data, valid SLIP (checked)
69 pub type RoutedPacketData = Box<[u8]>;
70
71 // loop prevention
72 // we don't decrement the ttl (naughty) but loops cannot arise
73 // because only the server has any routing code, and server
74 // has no internal loops, so worst case is
75 //  client if -> client -> server -> client' -> client if'
76 // and the ifs will decrement the ttl.
77 mod may_route {
78   #[derive(Clone,Debug)]
79   pub struct MayRoute(());
80   impl MayRoute {
81     pub fn came_from_outside_hippotatd() -> Self { Self(()) }
82   }
83 }
84 pub use may_route::MayRoute;
85
86 pub async fn route_packet(global: &Global,
87                           transport_conn: &str, source: Option<&ClientName>,
88                           packet: RoutedPacketData, daddr: IpAddr,
89                           _may_route: MayRoute)
90 {
91   let c = &global.config;
92   let len = packet.len();
93   let trace = |how: &str, why: &str| {
94     trace!("{} to={:?} came=={} user={} len={} {}",
95            how, daddr, transport_conn,
96            match source {
97              Some(s) => s as &dyn Display,
98              None => &"local",
99            },
100            len, why);
101   };
102
103   let (dest, why) =
104     if daddr == c.vaddr || ! c.vnetwork.iter().any(|n| n.contains(&daddr)) {
105       (Some(&global.local_rx), "via=local")
106     } else if daddr == c.vrelay {
107       (None, " vrelay")
108     } else if let Some(client) = global.all_clients.get(&ClientName(daddr)) {
109       (Some(&client.route), "via=client")
110     } else {
111       (None, "no-client")
112     };
113
114   let dest = if let Some(d) = dest { d } else {
115     trace("discard ", why); return;
116   };
117
118   let packet = RoutedPacket {
119     data: packet,
120 //    source: source.cloned(),
121   };
122   match dest.send(packet).await {
123     Ok(()) => trace("forward", why),
124     Err(_) => trace("task-crashed!", why),
125   }
126 }
127
128 fn main() {
129   let opts = Opts::from_args();
130   let daemon = if opts.daemon && opts.print_config.is_none() {
131     Some(Daemoniser::phase1())
132   } else {
133     None
134   };
135
136   async_main(opts, daemon);
137 }
138
139 #[tokio::main]
140 async fn async_main(opts: Opts, daemon: Option<Daemoniser>) {
141   let mut tasks: Vec<(
142     JoinHandle<AE>,
143     String,
144   )> = vec![];
145
146   config::startup(
147     "hippotatd", LinkEnd::Server,
148     &opts.config, &opts.log, |ics|
149   {
150     let global_config = config::InstanceConfigGlobal::from(&ics);
151
152     if let Some(key) = opts.print_config.as_ref() {
153       if let Some(inspectable) = global_config.inspect_key(key) {
154         println!("{}", DisplayInspectable(inspectable));
155         process::exit(0);
156       } else {
157         throw!(anyhow!("unknown config key {:?}", key));
158       }
159     }
160
161     let ipif = Ipif::start(&global_config.ipif, None)?;
162
163     let ics = ics.into_iter().map(Arc::new).collect_vec();
164     let (client_handles_send, client_handles_recv) = ics.iter()
165       .map(|_ic| {
166         let (web_send, web_recv) = mpsc::channel(
167           MAXQUEUE_WEBREQ2USER
168         );
169         let (route_send, route_recv) = mpsc::channel(
170           MAXQUEUE_ROUTE2USER
171         );
172         ((web_send, route_send), (web_recv, route_recv))
173       }).unzip::<_,_,Vec<_>,Vec<_>>();
174
175     let all_clients = izip!(
176       &ics,
177       client_handles_send,
178     ).map(|(ic, (web_send, route_send))| {
179       (ic.link.client,
180        User {
181          ic: ic.clone(),
182          web: web_send,
183          route: route_send,
184        })
185     }).collect();
186
187     let (local_rx_send, local_tx_recv) = mpsc::channel(
188       MAXQUEUE_ROUTE2LOCAL
189     );
190
191     let global = Arc::new(Global {
192       config: global_config,
193       local_rx: local_rx_send,
194       all_clients,
195     });
196
197     for (ic, (web_recv, route_recv)) in izip!(
198       ics,
199       client_handles_recv,
200     ) {
201       let global_ = global.clone();
202       let ic_ = ic.clone();
203       tasks.push((tokio::spawn(async move {
204         suser::run(global_, ic_, web_recv, route_recv)
205           .await.void_unwrap_err()
206       }), format!("client {}", &ic)));
207     }
208
209     for addr in &global.config.addrs {
210       let global_ = global.clone();
211       let make_service = hyper::service::make_service_fn(
212         move |conn: &hyper::server::conn::AddrStream| {
213           let global_ = global_.clone();
214           let conn = Arc::new(format!("[{}]", conn.remote_addr()));
215           async { Ok::<_, Void>( hyper::service::service_fn(move |req| {
216             AssertUnwindSafe(
217               sweb::handle(conn.clone(), global_.clone(), req)
218             )
219               .catch_unwind()
220               .map(|r| r.unwrap_or_else(|_|{
221                 crash(Err("panicked".into()), "webserver request task")
222               }))
223           }) ) }
224         }
225       );
226
227       let addr = SocketAddr::new(*addr, global.config.port);
228       let server = hyper::Server::try_bind(&addr)
229         .context("bind")?
230         .http1_preserve_header_case(true)
231         .serve(make_service);
232       info!("listening on {}", &addr);
233       let task = tokio::task::spawn(async move {
234         match server.await {
235           Ok(()) => anyhow!("shut down?!"),
236           Err(e) => e.into(),
237         }
238       });
239       tasks.push((task, format!("http server {}", addr)));
240     }
241
242     let global_ = global.clone();
243     let ipif = tokio::task::spawn(async move {
244       slocal::run(global_, local_tx_recv, ipif).await
245         .void_unwrap_err()
246     });
247     tasks.push((ipif, format!("ipif")));
248
249     Ok(())
250   });
251
252   if let Some(daemon) = daemon {
253     daemon.complete();
254   }
255
256   let (output, died_i, _) = future::select_all(
257     tasks.iter_mut().map(|e| &mut e.0)
258   ).await;
259
260   let task = &tasks[died_i].1;
261   let output = output.map_err(|je| je.to_string());
262   crash(output, task);
263 }
264
265 pub fn crash(what_happened: Result<AE, String>, task: &str) -> ! {
266   match what_happened {
267     Err(je) => error!("task crashed! {}: {}", task, &je),
268     Ok(e)   => error!("task failed! {}: {}",   task, &e ),
269   }
270   process::exit(12);
271 }