chiark / gitweb /
0b5dde3e934929e6651173d93153297650d51f35
[hippotat.git] / server / server.rs
1 // Copyright 2021 Ian Jackson and contributors to Hippotat
2 // SPDX-License-Identifier: GPL-3.0-or-later
3 // There is NO WARRANTY.
4
5 use hippotat::prelude::*;
6
7 mod suser;
8 mod slocal;
9 mod sweb;
10
11 pub use sweb::{WebRequest, WebResponse, WebResponseBody};
12 pub use suser::User;
13
14 #[derive(StructOpt,Debug)]
15 pub struct Opts {
16   #[structopt(flatten)]
17   pub log: LogOpts,
18
19   #[structopt(flatten)]
20   pub config: config::Opts,
21 }
22
23 pub const METADATA_MAX_LEN: usize = MAX_OVERHEAD;
24
25 pub const MAXQUEUE_ROUTE2WEB: usize = 15; // xxx: config
26
27 #[derive(Debug)]
28 pub struct Global {
29   config: config::InstanceConfigGlobal,
30   local_rx: mpsc::Sender<RoutedPacket>,
31   all_clients: HashMap<ClientName, User>,
32 }
33
34 pub struct RoutedPacket {
35   pub data: RoutedPacketData,
36 //  pub source: Option<ClientName>, // for eh, tracing, etc.
37 }
38
39 // not MIME data, valid SLIP (checked)
40 pub type RoutedPacketData = Box<[u8]>;
41
42 // loop prevention
43 // we don't decrement the ttl (naughty) but loops cannot arise
44 // because only the server has any routing code, and server
45 // has no internal loops, so worst case is
46 //  client if -> client -> server -> client' -> client if'
47 // and the ifs will decrement the ttl.
48 mod may_route {
49   #[derive(Clone,Debug)]
50   pub struct MayRoute(());
51   impl MayRoute {
52     pub fn came_from_outside_hippotatd() -> Self { Self(()) }
53   }
54 }
55 pub use may_route::MayRoute;
56
57 pub async fn route_packet(global: &Global,
58                           transport_conn: &str, source: Option<&ClientName>,
59                           packet: RoutedPacketData, daddr: IpAddr,
60                           _may_route: MayRoute)
61 {
62   let c = &global.config;
63   let len = packet.len();
64   let trace = |how: &str, why: &str| {
65     trace!("{} to={:?} came=={} user={} len={} {}",
66            how, daddr, transport_conn,
67            match source {
68              Some(s) => (s as &dyn Display),
69              None => &"local",
70            },
71            len, why);
72   };
73
74   let (dest, why) =
75     if daddr == c.vaddr || ! c.vnetwork.iter().any(|n| n.contains(&daddr)) {
76       (Some(&global.local_rx), "via=local")
77     } else if daddr == c.vrelay {
78       (None, " vrelay")
79     } else if let Some(client) = global.all_clients.get(&ClientName(daddr)) {
80       (Some(&client.route), "via=client")
81     } else {
82       (None, "no-client")
83     };
84
85   let dest = if let Some(d) = dest { d } else {
86     trace("discard ", why); return;
87   };
88
89   let packet = RoutedPacket {
90     data: packet,
91 //    source: source.cloned(),
92   };
93   match dest.send(packet).await {
94     Ok(()) => trace("forward", why),
95     Err(_) => trace("task-crashed!", why),
96   }
97 }
98
99 #[tokio::main]
100 async fn main() {
101   let opts = Opts::from_args();
102   let mut tasks: Vec<(
103     JoinHandle<AE>,
104     String,
105   )> = vec![];
106
107   config::startup(
108     "hippotatd", LinkEnd::Server,
109     &opts.config, &opts.log, |ics|
110   {
111     let global_config = config::InstanceConfigGlobal::from(&ics);
112
113     let ipif = Ipif::start(&global_config.ipif, None)?;
114
115     let ics = ics.into_iter().map(Arc::new).collect_vec();
116     let (client_handles_send, client_handles_recv) = ics.iter()
117       .map(|_ic| {
118         let (web_send, web_recv) = mpsc::channel(
119           5 // xxx should me max_requests_outstanding but that's
120           // marked client-only so needs rework
121         );
122         let (route_send, route_recv) = mpsc::channel(
123           MAXQUEUE_ROUTE2WEB
124         );
125         ((web_send, route_send), (web_recv, route_recv))
126       }).unzip::<_,_,Vec<_>,Vec<_>>();
127
128     let all_clients = izip!(
129       &ics,
130       client_handles_send,
131     ).map(|(ic, (web_send, route_send))| {
132       (ic.link.client,
133        User {
134          ic: ic.clone(),
135          web: web_send,
136          route: route_send,
137        })
138     }).collect();
139
140     let (local_rx_send, local_tx_recv) = mpsc::channel(
141       50 // xxx configurable?
142     );
143
144     let global = Arc::new(Global {
145       config: global_config,
146       local_rx: local_rx_send,
147       all_clients,
148     });
149
150     for (ic, (web_recv, route_recv)) in izip!(
151       ics,
152       client_handles_recv,
153     ) {
154       let global_ = global.clone();
155       let ic_ = ic.clone();
156       tasks.push((tokio::spawn(async move {
157         suser::run(global_, ic_, web_recv, route_recv)
158           .await.void_unwrap_err()
159       }), format!("client {}", &ic)));
160     }
161
162     for addr in &global.config.addrs {
163       let global_ = global.clone();
164       let make_service = hyper::service::make_service_fn(
165         move |conn: &hyper::server::conn::AddrStream| {
166           let global_ = global_.clone();
167           let conn = Arc::new(format!("[{}]", conn.remote_addr()));
168           async { Ok::<_, Void>( hyper::service::service_fn(move |req| {
169             AssertUnwindSafe(
170               sweb::handle(conn.clone(), global_.clone(), req)
171             )
172               .catch_unwind()
173               .map(|r| r.unwrap_or_else(|_|{
174                 crash(Err("panicked".into()), "webserver request task")
175               }))
176           }) ) }
177         }
178       );
179
180       let addr = SocketAddr::new(*addr, global.config.port);
181       let server = hyper::Server::try_bind(&addr)
182         .context("bind")?
183         .http1_preserve_header_case(true)
184         .serve(make_service);
185       info!("listening on {}", &addr);
186       let task = tokio::task::spawn(async move {
187         match server.await {
188           Ok(()) => anyhow!("shut down?!"),
189           Err(e) => e.into(),
190         }
191       });
192       tasks.push((task, format!("http server {}", addr)));
193     }
194
195     let global_ = global.clone();
196     let ipif = tokio::task::spawn(async move {
197       slocal::run(global_, local_tx_recv, ipif).await
198         .void_unwrap_err()
199     });
200     tasks.push((ipif, format!("ipif")));
201
202     Ok(())
203   });
204
205   let (output, died_i, _) = future::select_all(
206     tasks.iter_mut().map(|e| &mut e.0)
207   ).await;
208
209   let task = &tasks[died_i].1;
210   let output = output.map_err(|je| je.to_string());
211   crash(output, task);
212 }
213
214 pub fn crash(what_happened: Result<AE, String>, task: &str) -> ! {
215   match what_happened {
216     Err(je) => error!("task crashed! {}: {}", task, &je),
217     Ok(e)   => error!("task failed! {}: {}",   task, &e ),
218   }
219   process::exit(12);
220 }