1 // Copyright 2021 Ian Jackson and contributors to Hippotat
2 // SPDX-License-Identifier: GPL-3.0-or-later
3 // There is NO WARRANTY.
5 use hippotat::prelude::*;
10 pub use sweb::{WebRequest, WebResponse};
11 pub use slink::Client;
13 #[derive(StructOpt,Debug)]
19 pub config: config::Opts,
22 pub const METADATA_MAX_LEN: usize = MAX_OVERHEAD;
23 pub const INTERNAL_QUEUE: usize = 15; // xxx: config
27 config: config::InstanceConfigGlobal,
28 all_clients: HashMap<ClientName, Client>,
31 pub type RoutedPacket = Box<[u8]>; // not MIME data
33 #[throws(PacketError)]
34 pub async fn route_packet(global: &Global,
35 conn: &str, link: &(dyn Display + Sync),
36 packet: RoutedPacket, daddr: IpAddr)
38 let c = &global.config;
39 let len = packet.len();
40 let trace = |how: &str, why: &str| {
41 trace!("{} {} {} {} {:?} len={}",
42 conn, link, how, why, daddr, len);
46 if daddr == c.vaddr || ! c.vnetwork.iter().any(|n| n.contains(&daddr)) {
47 (None, "ipif-inbound-xxx")
48 } else if daddr == c.vrelay {
50 } else if let Some(client) = global.all_clients.get(&ClientName(daddr)) {
51 (Some(&client.route), "client")
56 let dest = if let Some(d) = dest { d } else {
57 trace("discard", why); return;
60 match dest.send(packet).await {
61 Ok(()) => trace("forward", why),
62 Err(_) => trace("task-crashed!", why),
68 let opts = Opts::from_args();
74 let (global, ipif) = config::startup(
75 "hippotatd", LinkEnd::Server,
76 &opts.config, &opts.log, |ics|
78 let global_config = config::InstanceConfigGlobal::from(&ics);
80 let ipif = Ipif::start(&global_config.ipif, None)?;
82 let ics = ics.into_iter().map(Arc::new).collect_vec();
83 let (client_handles_send, client_handles_recv) = ics.iter()
85 let (web_send, web_recv) = mpsc::channel(
86 5 // xxx should me max_requests_outstanding but that's
87 // marked client-only so needs rework
89 let (route_send, route_recv) = mpsc::channel(
92 ((web_send, route_send), (web_recv, route_recv))
93 }).unzip::<_,_,Vec<_>,Vec<_>>();
95 let all_clients = izip!(
98 ).map(|(ic, (web_send, route_send))| {
107 let global = Arc::new(Global {
108 config: global_config,
112 for (ic, (web_recv, route_recv)) in izip!(
116 let global_ = global.clone();
117 let ic_ = ic.clone();
118 tasks.push((tokio::spawn(async move {
119 slink::run(global_, ic_, web_recv, route_recv)
120 .await.void_unwrap_err()
121 }), format!("client {}", &ic)));
124 for addr in &global.config.addrs {
125 let global_ = global.clone();
126 let make_service = hyper::service::make_service_fn(
127 move |conn: &hyper::server::conn::AddrStream| {
128 let global_ = global_.clone();
129 let conn = Arc::new(format!("[{}]", conn.remote_addr()));
130 async { Ok::<_, Void>( hyper::service::service_fn(move |req| {
131 sweb::handle(conn.clone(), global_.clone(), req)
136 let addr = SocketAddr::new(*addr, global.config.port);
137 let server = hyper::Server::try_bind(&addr)
139 .http1_preserve_header_case(true)
140 .serve(make_service);
141 info!("listening on {}", &addr);
142 let task = tokio::task::spawn(async move {
144 Ok(()) => anyhow!("shut down?!"),
148 tasks.push((task, format!("http server {}", addr)));
154 let died = future::select_all(
155 tasks.iter_mut().map(|e| &mut e.0)
157 error!("xxx {:?}", &died);
159 ipif.quitting(None).await;