1 // Copyright 2021 Ian Jackson and contributors to Hippotat
2 // SPDX-License-Identifier: GPL-3.0-or-later
3 // There is NO WARRANTY.
5 use hippotat::prelude::*;
11 pub use sweb::{WebRequest, WebResponse, WebResponseBody};
14 #[derive(StructOpt,Debug)]
// Configuration-related command line options; the startup code passes
// this by reference alongside `opts.log`.
20 pub config: config::Opts,
/// Maximum metadata length, in bytes; defined to be equal to `MAX_OVERHEAD`.
23 pub const METADATA_MAX_LEN: usize = MAX_OVERHEAD;
/// Fixed queue-length limit (15).
/// NOTE(review): judging by the name this bounds the route-to-web queue;
/// its use is not visible here -- confirm.  The existing "xxx" marker says
/// it should eventually come from configuration.
25 pub const MAXQUEUE_ROUTE2WEB: usize = 15; // xxx: config
// Global configuration; `route_packet` reads `vaddr`, `vnetwork` and
// `vrelay` from it, and the startup code reads `addrs` and `port`.
29 config: config::InstanceConfigGlobal,
// Sending half of the queue chosen for packets routed "via=local".
30 local_rx: mpsc::Sender<RoutedPacket>,
// Known clients, keyed by `ClientName`; consulted when routing a packet.
31 all_clients: HashMap<ClientName, User>,
/// A packet as handed to a routing destination queue by `route_packet`.
34 pub struct RoutedPacket {
// The packet contents.
35 pub data: RoutedPacketData,
36 // pub source: Option<ClientName>, // for eh, tracing, etc.
39 // not MIME data, valid SLIP (checked)
/// Contents of a routed packet: an owned, boxed byte slice.
40 pub type RoutedPacketData = Box<[u8]>;
43 // we don't decrement the ttl (naughty) but loops cannot arise
44 // because only the server has any routing code, and server
45 // has no internal loops, so worst case is
46 // client if -> client -> server -> client' -> client if'
47 // and the ifs will decrement the ttl.
/// Zero-sized token type.  Its only field is private, so code outside the
/// defining module cannot construct a value with a struct literal and has
/// to go through a constructor function instead.
49 #[derive(Clone,Debug)]
50 pub struct MayRoute(());
/// Constructs a `MayRoute` token.
/// NOTE(review): judging by the name, callers are expected to use this only
/// for packets that arrived from outside hippotatd -- confirm.
52 pub fn came_from_outside_hippotatd() -> Self { Self(()) }
55 pub use may_route::MayRoute;
/// Routes one packet towards `daddr` and emits a trace line describing
/// what was done with it.
///
/// * `global` - shared server state (configuration, local queue, clients).
/// * `transport_conn` - description of where the packet came from; used
///   only in the trace output here.
/// * `source` - originating client, if any; appears as `user=` in the trace.
/// * `packet` - the packet data to forward.
/// * `daddr` - destination address that selects the route.
///
/// A packet with no usable destination is traced as "discard " and dropped.
57 pub async fn route_packet(global: &Global,
58 transport_conn: &str, source: Option<&ClientName>,
59 packet: RoutedPacketData, daddr: IpAddr,
62 let c = &global.config;
// Captured now because `packet` is moved into a `RoutedPacket` below.
63 let len = packet.len();
// Trace helper: `how` is the disposition, `why` the routing decision.
64 let trace = |how: &str, why: &str| {
65 trace!("{} to={:?} came=={} user={} len={} {}",
66 how, daddr, transport_conn,
68 Some(s) => (s as &dyn Display),
// Our own virtual address, or an address that none of the configured
// virtual networks contains, is delivered via the local queue.
75 if daddr == c.vaddr || ! c.vnetwork.iter().any(|n| n.contains(&daddr)) {
76 (Some(&global.local_rx), "via=local")
77 } else if daddr == c.vrelay {
// Otherwise, an address that names a known client goes to that
// client's route queue.
79 } else if let Some(client) = global.all_clients.get(&ClientName(daddr)) {
80 (Some(&client.route), "via=client")
// No destination: trace the discard and stop.
85 let dest = if let Some(d) = dest { d } else {
86 trace("discard ", why); return;
89 let packet = RoutedPacket {
91 // source: source.cloned(),
// A send error is traced as "task-crashed!"; presumably it means the
// receiving task has gone away -- confirm.
93 match dest.send(packet).await {
94 Ok(()) => trace("forward", why),
95 Err(_) => trace("task-crashed!", why),
// Parse the command line options.
101 let opts = Opts::from_args();
108 "hippotatd", LinkEnd::Server,
109 &opts.config, &opts.log, |ics|
// Build the global configuration from `ics`.
111 let global_config = config::InstanceConfigGlobal::from(&ics);
// Start the ipif using the ipif settings from the global configuration.
113 let ipif = Ipif::start(&global_config.ipif, None)?;
// Wrap each entry in an Arc so it can be cloned into the spawned tasks.
115 let ics = ics.into_iter().map(Arc::new).collect_vec();
// Per client: one web channel and one route channel.  The sending halves
// and the receiving halves are collected into two separate Vecs.
116 let (client_handles_send, client_handles_recv) = ics.iter()
118 let (web_send, web_recv) = mpsc::channel(
119 5 // xxx should be max_requests_outstanding but that's
120 // marked client-only so needs rework
122 let (route_send, route_recv) = mpsc::channel(
125 ((web_send, route_send), (web_recv, route_recv))
126 }).unzip::<_,_,Vec<_>,Vec<_>>();
128 let all_clients = izip!(
131 ).map(|(ic, (web_send, route_send))| {
// Queue for locally-routed packets; the receiving half is handed to
// slocal::run below.
140 let (local_rx_send, local_tx_recv) = mpsc::channel(
141 50 // xxx configurable?
144 let global = Arc::new(Global {
145 config: global_config,
146 local_rx: local_rx_send,
// One task per client, running suser::run with that client's receivers.
150 for (ic, (web_recv, route_recv)) in izip!(
154 let global_ = global.clone();
155 let ic_ = ic.clone();
156 tasks.push((tokio::spawn(async move {
157 suser::run(global_, ic_, web_recv, route_recv)
158 .await.void_unwrap_err()
159 }), format!("client {}", &ic)));
// One HTTP server per configured listening address.
162 for addr in &global.config.addrs {
163 let global_ = global.clone();
164 let make_service = hyper::service::make_service_fn(
165 move |conn: &hyper::server::conn::AddrStream| {
166 let global_ = global_.clone();
// Connection label of the form "[remote address]", passed to sweb::handle.
167 let conn = Arc::new(format!("[{}]", conn.remote_addr()));
168 async { Ok::<_, Void>( hyper::service::service_fn(move |req| {
170 sweb::handle(conn.clone(), global_.clone(), req)
173 .map(|r| r.unwrap_or_else(|_|{
174 crash(Err("panicked".into()), "webserver request task")
180 let addr = SocketAddr::new(*addr, global.config.port);
181 let server = hyper::Server::try_bind(&addr)
183 .http1_preserve_header_case(true)
184 .serve(make_service);
185 info!("listening on {}", &addr);
186 let task = tokio::task::spawn(async move {
// The server returning successfully is itself reported as an error.
188 Ok(()) => anyhow!("shut down?!"),
192 tasks.push((task, format!("http server {}", addr)));
// Task running slocal::run with the receiving half of the local queue
// and the ipif started above.
195 let global_ = global.clone();
196 let ipif = tokio::task::spawn(async move {
197 slocal::run(global_, local_tx_recv, ipif).await
200 tasks.push((ipif, format!("ipif")));
// Wait for whichever task finishes first; `died_i` indexes into `tasks`,
// whose second tuple element is the task's description.
205 let (output, died_i, _) = future::select_all(
206 tasks.iter_mut().map(|e| &mut e.0)
209 let task = &tasks[died_i].1;
// Convert the error side of the task's output to its string form.
210 let output = output.map_err(|je| je.to_string());
214 pub fn crash(what_happened: Result<AE, String>, task: &str) -> ! {
215 match what_happened {
216 Err(je) => error!("task crashed! {}: {}", task, &je),
217 Ok(e) => error!("task failed! {}: {}", task, &e ),