chiark / gitweb /
panic handling in webserver task too
[hippotat.git] / server / server.rs
1 // Copyright 2021 Ian Jackson and contributors to Hippotat
2 // SPDX-License-Identifier: GPL-3.0-or-later
3 // There is NO WARRANTY.
4
5 use hippotat::prelude::*;
6
7 mod suser;
8 mod slocal;
9 mod sweb;
10
11 pub use sweb::{WebRequest, WebResponse, WebResponseBody};
12 pub use suser::User;
13
14 #[derive(StructOpt,Debug)]
15 pub struct Opts {
16   #[structopt(flatten)]
17   pub log: LogOpts,
18
19   #[structopt(flatten)]
20   pub config: config::Opts,
21 }
22
23 pub const METADATA_MAX_LEN: usize = MAX_OVERHEAD;
24 pub const INTERNAL_QUEUE: usize = 15; // xxx: config
25
26 #[derive(Debug)]
27 pub struct Global {
28   config: config::InstanceConfigGlobal,
29   local_rx: mpsc::Sender<RoutedPacket>,
30   all_clients: HashMap<ClientName, User>,
31 }
32
33 pub struct RoutedPacket {
34   pub data: RoutedPacketData,
35 //  pub source: Option<ClientName>, // for eh, tracing, etc.
36 }
37
38 // not MIME data, valid SLIP (checked)
39 pub type RoutedPacketData = Box<[u8]>;
40
41 // loop prevention
42 // we don't decrement the ttl (naughty) but loops cannot arise
43 // because only the server has any routing code, and server
44 // has no internal loops, so worst case is
45 //  client if -> client -> server -> client' -> client if'
46 // and the ifs will decrement the ttl.
47 mod may_route {
48   #[derive(Clone,Debug)]
49   pub struct MayRoute(());
50   impl MayRoute {
51     pub fn came_from_outside_hippotatd() -> Self { Self(()) }
52   }
53 }
54 pub use may_route::MayRoute;
55
56 pub async fn route_packet(global: &Global,
57                           transport_conn: &str, source: Option<&ClientName>,
58                           packet: RoutedPacketData, daddr: IpAddr,
59                           _may_route: MayRoute)
60 {
61   let c = &global.config;
62   let len = packet.len();
63   let trace = |how: &str, why: &str| {
64     trace!("{} to={:?} came=={} user={} len={} {}",
65            how, daddr, transport_conn,
66            match source {
67              Some(s) => (s as &dyn Display),
68              None => &"local",
69            },
70            len, why);
71   };
72
73   let (dest, why) =
74     if daddr == c.vaddr || ! c.vnetwork.iter().any(|n| n.contains(&daddr)) {
75       (Some(&global.local_rx), "via=local")
76     } else if daddr == c.vrelay {
77       (None, " vrelay")
78     } else if let Some(client) = global.all_clients.get(&ClientName(daddr)) {
79       (Some(&client.route), "via=client")
80     } else {
81       (None, "no-client")
82     };
83
84   let dest = if let Some(d) = dest { d } else {
85     trace("discard ", why); return;
86   };
87
88   let packet = RoutedPacket {
89     data: packet,
90 //    source: source.cloned(),
91   };
92   match dest.send(packet).await {
93     Ok(()) => trace("forward", why),
94     Err(_) => trace("task-crashed!", why),
95   }
96 }
97
98 #[tokio::main]
99 async fn main() {
100   let opts = Opts::from_args();
101   let mut tasks: Vec<(
102     JoinHandle<AE>,
103     String,
104   )> = vec![];
105
106   config::startup(
107     "hippotatd", LinkEnd::Server,
108     &opts.config, &opts.log, |ics|
109   {
110     let global_config = config::InstanceConfigGlobal::from(&ics);
111
112     let ipif = Ipif::start(&global_config.ipif, None)?;
113
114     let ics = ics.into_iter().map(Arc::new).collect_vec();
115     let (client_handles_send, client_handles_recv) = ics.iter()
116       .map(|_ic| {
117         let (web_send, web_recv) = mpsc::channel(
118           5 // xxx should me max_requests_outstanding but that's
119           // marked client-only so needs rework
120         );
121         let (route_send, route_recv) = mpsc::channel(
122           INTERNAL_QUEUE
123         );
124         ((web_send, route_send), (web_recv, route_recv))
125       }).unzip::<_,_,Vec<_>,Vec<_>>();
126
127     let all_clients = izip!(
128       &ics,
129       client_handles_send,
130     ).map(|(ic, (web_send, route_send))| {
131       (ic.link.client,
132        User {
133          ic: ic.clone(),
134          web: web_send,
135          route: route_send,
136        })
137     }).collect();
138
139     let (local_rx_send, local_tx_recv) = mpsc::channel(
140       50 // xxx configurable?
141     );
142
143     let global = Arc::new(Global {
144       config: global_config,
145       local_rx: local_rx_send,
146       all_clients,
147     });
148
149     for (ic, (web_recv, route_recv)) in izip!(
150       ics,
151       client_handles_recv,
152     ) {
153       let global_ = global.clone();
154       let ic_ = ic.clone();
155       tasks.push((tokio::spawn(async move {
156         suser::run(global_, ic_, web_recv, route_recv)
157           .await.void_unwrap_err()
158       }), format!("client {}", &ic)));
159     }
160
161     for addr in &global.config.addrs {
162       let global_ = global.clone();
163       let make_service = hyper::service::make_service_fn(
164         move |conn: &hyper::server::conn::AddrStream| {
165           let global_ = global_.clone();
166           let conn = Arc::new(format!("[{}]", conn.remote_addr()));
167           async { Ok::<_, Void>( hyper::service::service_fn(move |req| {
168             AssertUnwindSafe(
169               sweb::handle(conn.clone(), global_.clone(), req)
170             )
171               .catch_unwind()
172               .map(|r| r.unwrap_or_else(|_|{
173                 crash(Err("panicked".into()), "webserver request task")
174               }))
175           }) ) }
176         }
177       );
178
179       let addr = SocketAddr::new(*addr, global.config.port);
180       let server = hyper::Server::try_bind(&addr)
181         .context("bind")?
182         .http1_preserve_header_case(true)
183         .serve(make_service);
184       info!("listening on {}", &addr);
185       let task = tokio::task::spawn(async move {
186         match server.await {
187           Ok(()) => anyhow!("shut down?!"),
188           Err(e) => e.into(),
189         }
190       });
191       tasks.push((task, format!("http server {}", addr)));
192     }
193
194     let global_ = global.clone();
195     let ipif = tokio::task::spawn(async move {
196       slocal::run(global_, local_tx_recv, ipif).await
197         .void_unwrap_err()
198     });
199     tasks.push((ipif, format!("ipif")));
200
201     Ok(())
202   });
203
204   let (output, died_i, _) = future::select_all(
205     tasks.iter_mut().map(|e| &mut e.0)
206   ).await;
207
208   let task = &tasks[died_i].1;
209   let output = output.map_err(|je| je.to_string());
210   crash(output, task);
211 }
212
213 pub fn crash(what_happened: Result<AE, String>, task: &str) -> ! {
214   match what_happened {
215     Err(je) => error!("task crashed! {}: {}", task, &je),
216     Ok(e)   => error!("task failed! {}: {}",   task, &e ),
217   }
218   process::exit(12);
219 }