chiark / gitweb /
wip server ipif
[hippotat.git] / server / server.rs
1 // Copyright 2021 Ian Jackson and contributors to Hippotat
2 // SPDX-License-Identifier: GPL-3.0-or-later
3 // There is NO WARRANTY.
4
5 use hippotat::prelude::*;
6
7 mod suser;
8 mod slocal;
9 mod sweb;
10
11 pub use sweb::{WebRequest, WebResponse};
12 pub use suser::User;
13
14 #[derive(StructOpt,Debug)]
15 pub struct Opts {
16   #[structopt(flatten)]
17   pub log: LogOpts,
18
19   #[structopt(flatten)]
20   pub config: config::Opts,
21 }
22
23 pub const METADATA_MAX_LEN: usize = MAX_OVERHEAD;
24 pub const INTERNAL_QUEUE: usize = 15; // xxx: config
25
26 #[derive(Debug)]
27 pub struct Global {
28   config: config::InstanceConfigGlobal,
29   local_rx: mpsc::Sender<RoutedPacket>,
30   all_clients: HashMap<ClientName, User>,
31 }
32
33 pub type RoutedPacket = Box<[u8]>; // not MIME data
34
35 #[throws(PacketError)]
36 pub async fn route_packet(global: &Global,
37                           conn: &str, link: &(dyn Display + Sync),
38                           packet: RoutedPacket, daddr: IpAddr)
39 {
40   let c = &global.config;
41   let len = packet.len();
42   let trace = |how: &str, why: &str| {
43     trace!("{} {} {} {} {:?} len={}",
44            conn, link, how, why, daddr, len);
45   };
46
47   let (dest, why) =
48     if daddr == c.vaddr || ! c.vnetwork.iter().any(|n| n.contains(&daddr)) {
49       (None, "ipif-inbound-xxx")
50     } else if daddr == c.vrelay {
51       (None, "vrelay")
52     } else if let Some(client) = global.all_clients.get(&ClientName(daddr)) {
53       (Some(&client.route), "client")
54     } else {
55       (None, "no-client")
56     };
57
58   let dest = if let Some(d) = dest { d } else {
59     trace("discard", why); return;
60   };
61
62   match dest.send(packet).await {
63     Ok(()) => trace("forward", why),
64     Err(_) => trace("task-crashed!", why),
65   }
66 }
67
68 #[tokio::main]
69 async fn main() {
70   let opts = Opts::from_args();
71   let mut tasks: Vec<(
72     JoinHandle<AE>,
73     String,
74   )> = vec![];
75
76   let global = config::startup(
77     "hippotatd", LinkEnd::Server,
78     &opts.config, &opts.log, |ics|
79   {
80     let global_config = config::InstanceConfigGlobal::from(&ics);
81
82     let ipif = Ipif::start(&global_config.ipif, None)?;
83
84     let ics = ics.into_iter().map(Arc::new).collect_vec();
85     let (client_handles_send, client_handles_recv) = ics.iter()
86       .map(|_ic| {
87         let (web_send, web_recv) = mpsc::channel(
88           5 // xxx should me max_requests_outstanding but that's
89           // marked client-only so needs rework
90         );
91         let (route_send, route_recv) = mpsc::channel(
92           INTERNAL_QUEUE
93         );
94         ((web_send, route_send), (web_recv, route_recv))
95       }).unzip::<_,_,Vec<_>,Vec<_>>();
96
97     let all_clients = izip!(
98       &ics,
99       client_handles_send,
100     ).map(|(ic, (web_send, route_send))| {
101       (ic.link.client,
102        User {
103          ic: ic.clone(),
104          web: web_send,
105          route: route_send,
106        })
107     }).collect();
108
109     let (local_rx_send, local_tx_recv) = mpsc::channel(
110       50 // xxx configurable?
111     );
112
113     let global = Arc::new(Global {
114       config: global_config,
115       local_rx: local_rx_send,
116       all_clients,
117     });
118
119     for (ic, (web_recv, route_recv)) in izip!(
120       ics,
121       client_handles_recv,
122     ) {
123       let global_ = global.clone();
124       let ic_ = ic.clone();
125       tasks.push((tokio::spawn(async move {
126         suser::run(global_, ic_, web_recv, route_recv)
127           .await.void_unwrap_err()
128       }), format!("client {}", &ic)));
129     }
130
131     for addr in &global.config.addrs {
132       let global_ = global.clone();
133       let make_service = hyper::service::make_service_fn(
134         move |conn: &hyper::server::conn::AddrStream| {
135           let global_ = global_.clone();
136           let conn = Arc::new(format!("[{}]", conn.remote_addr()));
137           async { Ok::<_, Void>( hyper::service::service_fn(move |req| {
138             sweb::handle(conn.clone(), global_.clone(), req)
139           }) ) }
140         }
141       );
142
143       let addr = SocketAddr::new(*addr, global.config.port);
144       let server = hyper::Server::try_bind(&addr)
145         .context("bind")?
146         .http1_preserve_header_case(true)
147         .serve(make_service);
148       info!("listening on {}", &addr);
149       let task = tokio::task::spawn(async move {
150         match server.await {
151           Ok(()) => anyhow!("shut down?!"),
152           Err(e) => e.into(),
153         }
154       });
155       tasks.push((task, format!("http server {}", addr)));
156     }
157
158     let global_ = global.clone();
159     let ipif = tokio::task::spawn(async move {
160       slocal::run(global_, local_tx_recv, ipif).await
161         .void_unwrap_err()
162     });
163     tasks.push((ipif, format!("ipif")));
164
165     Ok(global)
166   });
167
168   let died = future::select_all(
169     tasks.iter_mut().map(|e| &mut e.0)
170   ).await;
171   error!("xxx {:?}", &died);
172
173   dbg!(global);
174 }