1 // Copyright 2021 Ian Jackson and contributors to Hippotat
2 // SPDX-License-Identifier: GPL-3.0-or-later
3 // There is NO WARRANTY.
5 use hippotat::prelude::*;
11 pub use sweb::{WebRequest, WebResponse};
14 #[derive(StructOpt,Debug)]
20 pub config: config::Opts,
// Size cap for metadata, defined as exactly MAX_OVERHEAD (a constant
// declared outside this excerpt; its value is not visible here).
23 pub const METADATA_MAX_LEN: usize = MAX_OVERHEAD;
// Internal queue size, currently hard-coded to 15.  Where it is consumed is
// not visible in this excerpt.  The "xxx: config" marker presumably means it
// should eventually come from configuration -- TODO confirm.
24 pub const INTERNAL_QUEUE: usize = 15; // xxx: config
28 config: config::InstanceConfigGlobal,
29 local_rx: mpsc::Sender<RoutedPacket>,
30 all_clients: HashMap<ClientName, User>,
33 pub struct RoutedPacket {
34 pub data: RoutedPacketData,
35 pub source: Option<ClientName>, // for eh, tracing, etc.
// Owned, fixed-length byte buffer; this is the type of `RoutedPacket::data`.
38 // Not MIME data: the bytes are valid SLIP (checked).
39 pub type RoutedPacketData = Box<[u8]>;
// Token type that carries no data: its only field is a private `()`, so code
// that cannot see that field cannot build a value with a literal.
// `came_from_outside_hippotatd` returns `Self(())` -- presumably this type's
// constructor, though the enclosing `impl` header is outside this excerpt.
43 #[derive(Clone,Debug)]
44 pub struct MayRoute(());
// Returns a fresh token value (`Self(())`); takes no arguments and cannot fail.
// NOTE(review): the name suggests callers use it to assert that whatever they
// are about to route originated outside hippotatd -- confirm against call sites.
46 pub fn came_from_outside_hippotatd() -> Self { Self(()) }
49 pub use may_route::MayRoute;
51 #[throws(PacketError)]
52 pub async fn route_packet(global: &Global,
53 conn: &str, source: Option<&ClientName>,
54 packet: RoutedPacketData, daddr: IpAddr,
57 let c = &global.config;
58 let len = packet.len();
59 let trace = |how: &str, why: &str| {
60 trace!("{} {} {} {} {:?} len={}",
63 Some(s) => (s as &dyn Display),
66 how, why, daddr, len);
70 if daddr == c.vaddr || ! c.vnetwork.iter().any(|n| n.contains(&daddr)) {
71 (None, "ipif-inbound-xxx")
72 } else if daddr == c.vrelay {
74 } else if let Some(client) = global.all_clients.get(&ClientName(daddr)) {
75 (Some(&client.route), "client")
80 let dest = if let Some(d) = dest { d } else {
81 trace("discard", why); return;
84 let packet = RoutedPacket {
86 source: source.cloned(),
88 match dest.send(packet).await {
89 Ok(()) => trace("forward", why),
90 Err(_) => trace("task-crashed!", why),
96 let opts = Opts::from_args();
102 let global = config::startup(
103 "hippotatd", LinkEnd::Server,
104 &opts.config, &opts.log, |ics|
106 let global_config = config::InstanceConfigGlobal::from(&ics);
108 let ipif = Ipif::start(&global_config.ipif, None)?;
110 let ics = ics.into_iter().map(Arc::new).collect_vec();
111 let (client_handles_send, client_handles_recv) = ics.iter()
113 let (web_send, web_recv) = mpsc::channel(
114 5 // xxx should be max_requests_outstanding but that's
115 // marked client-only so needs rework
117 let (route_send, route_recv) = mpsc::channel(
120 ((web_send, route_send), (web_recv, route_recv))
121 }).unzip::<_,_,Vec<_>,Vec<_>>();
123 let all_clients = izip!(
126 ).map(|(ic, (web_send, route_send))| {
135 let (local_rx_send, local_tx_recv) = mpsc::channel(
136 50 // xxx configurable?
139 let global = Arc::new(Global {
140 config: global_config,
141 local_rx: local_rx_send,
145 for (ic, (web_recv, route_recv)) in izip!(
149 let global_ = global.clone();
150 let ic_ = ic.clone();
151 tasks.push((tokio::spawn(async move {
152 suser::run(global_, ic_, web_recv, route_recv)
153 .await.void_unwrap_err()
154 }), format!("client {}", &ic)));
157 for addr in &global.config.addrs {
158 let global_ = global.clone();
159 let make_service = hyper::service::make_service_fn(
160 move |conn: &hyper::server::conn::AddrStream| {
161 let global_ = global_.clone();
162 let conn = Arc::new(format!("[{}]", conn.remote_addr()));
163 async { Ok::<_, Void>( hyper::service::service_fn(move |req| {
164 sweb::handle(conn.clone(), global_.clone(), req)
169 let addr = SocketAddr::new(*addr, global.config.port);
170 let server = hyper::Server::try_bind(&addr)
172 .http1_preserve_header_case(true)
173 .serve(make_service);
174 info!("listening on {}", &addr);
175 let task = tokio::task::spawn(async move {
177 Ok(()) => anyhow!("shut down?!"),
181 tasks.push((task, format!("http server {}", addr)));
184 let global_ = global.clone();
185 let ipif = tokio::task::spawn(async move {
186 slocal::run(global_, local_tx_recv, ipif).await
189 tasks.push((ipif, format!("ipif")));
194 let died = future::select_all(
195 tasks.iter_mut().map(|e| &mut e.0)
197 error!("xxx {:?}", &died);