chiark / gitweb /
reorg source
[hippotat.git] / server / server.rs
1 // Copyright 2021 Ian Jackson and contributors to Hippotat
2 // SPDX-License-Identifier: GPL-3.0-or-later
3 // There is NO WARRANTY.
4
5 use hippotat::prelude::*;
6
7 mod suser;
8 mod slocal;
9 mod sweb;
10
11 pub use sweb::{WebRequest, WebResponse};
12 pub use suser::Client;
13
14 #[derive(StructOpt,Debug)]
15 pub struct Opts {
16   #[structopt(flatten)]
17   pub log: LogOpts,
18
19   #[structopt(flatten)]
20   pub config: config::Opts,
21 }
22
23 pub const METADATA_MAX_LEN: usize = MAX_OVERHEAD;
24 pub const INTERNAL_QUEUE: usize = 15; // xxx: config
25
26 #[derive(Debug)]
27 pub struct Global {
28   config: config::InstanceConfigGlobal,
29   all_clients: HashMap<ClientName, Client>,
30 }
31
32 pub type RoutedPacket = Box<[u8]>; // not MIME data
33
34 #[throws(PacketError)]
35 pub async fn route_packet(global: &Global,
36                           conn: &str, link: &(dyn Display + Sync),
37                           packet: RoutedPacket, daddr: IpAddr)
38 {
39   let c = &global.config;
40   let len = packet.len();
41   let trace = |how: &str, why: &str| {
42     trace!("{} {} {} {} {:?} len={}",
43            conn, link, how, why, daddr, len);
44   };
45
46   let (dest, why) =
47     if daddr == c.vaddr || ! c.vnetwork.iter().any(|n| n.contains(&daddr)) {
48       (None, "ipif-inbound-xxx")
49     } else if daddr == c.vrelay {
50       (None, "vrelay")
51     } else if let Some(client) = global.all_clients.get(&ClientName(daddr)) {
52       (Some(&client.route), "client")
53     } else {
54       (None, "no-client")
55     };
56
57   let dest = if let Some(d) = dest { d } else {
58     trace("discard", why); return;
59   };
60
61   match dest.send(packet).await {
62     Ok(()) => trace("forward", why),
63     Err(_) => trace("task-crashed!", why),
64   }
65 }
66
67 #[tokio::main]
68 async fn main() {
69   let opts = Opts::from_args();
70   let mut tasks: Vec<(
71     JoinHandle<AE>,
72     String,
73   )> = vec![];
74
75   let (global, ipif) = config::startup(
76     "hippotatd", LinkEnd::Server,
77     &opts.config, &opts.log, |ics|
78   {
79     let global_config = config::InstanceConfigGlobal::from(&ics);
80
81     let ipif = Ipif::start(&global_config.ipif, None)?;
82
83     let ics = ics.into_iter().map(Arc::new).collect_vec();
84     let (client_handles_send, client_handles_recv) = ics.iter()
85       .map(|_ic| {
86         let (web_send, web_recv) = mpsc::channel(
87           5 // xxx should me max_requests_outstanding but that's
88           // marked client-only so needs rework
89         );
90         let (route_send, route_recv) = mpsc::channel(
91           INTERNAL_QUEUE
92         );
93         ((web_send, route_send), (web_recv, route_recv))
94       }).unzip::<_,_,Vec<_>,Vec<_>>();
95
96     let all_clients = izip!(
97       &ics,
98       client_handles_send,
99     ).map(|(ic, (web_send, route_send))| {
100       (ic.link.client,
101        Client {
102          ic: ic.clone(),
103          web: web_send,
104          route: route_send,
105        })
106     }).collect();
107
108     let global = Arc::new(Global {
109       config: global_config,
110       all_clients,
111     });
112
113     for (ic, (web_recv, route_recv)) in izip!(
114       ics,
115       client_handles_recv,
116     ) {
117       let global_ = global.clone();
118       let ic_ = ic.clone();
119       tasks.push((tokio::spawn(async move {
120         suser::run(global_, ic_, web_recv, route_recv)
121           .await.void_unwrap_err()
122       }), format!("client {}", &ic)));
123     }
124
125     for addr in &global.config.addrs {
126       let global_ = global.clone();
127       let make_service = hyper::service::make_service_fn(
128         move |conn: &hyper::server::conn::AddrStream| {
129           let global_ = global_.clone();
130           let conn = Arc::new(format!("[{}]", conn.remote_addr()));
131           async { Ok::<_, Void>( hyper::service::service_fn(move |req| {
132             sweb::handle(conn.clone(), global_.clone(), req)
133           }) ) }
134         }
135       );
136
137       let addr = SocketAddr::new(*addr, global.config.port);
138       let server = hyper::Server::try_bind(&addr)
139         .context("bind")?
140         .http1_preserve_header_case(true)
141         .serve(make_service);
142       info!("listening on {}", &addr);
143       let task = tokio::task::spawn(async move {
144         match server.await {
145           Ok(()) => anyhow!("shut down?!"),
146           Err(e) => e.into(),
147         }
148       });
149       tasks.push((task, format!("http server {}", addr)));
150     }
151     
152     Ok((global, ipif))
153   });
154
155   let died = future::select_all(
156     tasks.iter_mut().map(|e| &mut e.0)
157   ).await;
158   error!("xxx {:?}", &died);
159
160   ipif.quitting(None).await;
161
162   dbg!(global);
163 }