chiark / gitweb /
reorg source
[hippotat.git] / server / server.rs
1 // Copyright 2021 Ian Jackson and contributors to Hippotat
2 // SPDX-License-Identifier: GPL-3.0-or-later
3 // There is NO WARRANTY.
4
5 use hippotat::prelude::*;
6
7 mod sclient;
8 mod sweb;
9
10 pub use sweb::{WebRequest, WebResponse};
11 pub use sclient::Client;
12
13 #[derive(StructOpt,Debug)]
14 pub struct Opts {
15   #[structopt(flatten)]
16   pub log: LogOpts,
17
18   #[structopt(flatten)]
19   pub config: config::Opts,
20 }
21
22 pub const METADATA_MAX_LEN: usize = MAX_OVERHEAD;
23 pub const INTERNAL_QUEUE: usize = 15; // xxx: config
24
25 #[derive(Debug)]
26 pub struct Global {
27   config: config::InstanceConfigGlobal,
28   all_clients: HashMap<ClientName, Client>,
29 }
30
31 pub type RoutedPacket = Box<[u8]>; // not MIME data
32
33 #[throws(PacketError)]
34 pub async fn route_packet(global: &Global,
35                           conn: &str, link: &(dyn Display + Sync),
36                           packet: RoutedPacket, daddr: IpAddr)
37 {
38   let c = &global.config;
39   let len = packet.len();
40   let trace = |how: &str, why: &str| {
41     trace!("{} {} {} {} {:?} len={}",
42            conn, link, how, why, daddr, len);
43   };
44
45   let (dest, why) =
46     if daddr == c.vaddr || ! c.vnetwork.iter().any(|n| n.contains(&daddr)) {
47       (None, "ipif-inbound-xxx")
48     } else if daddr == c.vrelay {
49       (None, "vrelay")
50     } else if let Some(client) = global.all_clients.get(&ClientName(daddr)) {
51       (Some(&client.route), "client")
52     } else {
53       (None, "no-client")
54     };
55
56   let dest = if let Some(d) = dest { d } else {
57     trace("discard", why); return;
58   };
59
60   match dest.send(packet).await {
61     Ok(()) => trace("forward", why),
62     Err(_) => trace("task-crashed!", why),
63   }
64 }
65
66 #[tokio::main]
67 async fn main() {
68   let opts = Opts::from_args();
69   let mut tasks: Vec<(
70     JoinHandle<AE>,
71     String,
72   )> = vec![];
73
74   let (global, ipif) = config::startup(
75     "hippotatd", LinkEnd::Server,
76     &opts.config, &opts.log, |ics|
77   {
78     let global_config = config::InstanceConfigGlobal::from(&ics);
79
80     let ipif = Ipif::start(&global_config.ipif, None)?;
81
82     let ics = ics.into_iter().map(Arc::new).collect_vec();
83     let (client_handles_send, client_handles_recv) = ics.iter()
84       .map(|_ic| {
85         let (web_send, web_recv) = mpsc::channel(
86           5 // xxx should me max_requests_outstanding but that's
87           // marked client-only so needs rework
88         );
89         let (route_send, route_recv) = mpsc::channel(
90           INTERNAL_QUEUE
91         );
92         ((web_send, route_send), (web_recv, route_recv))
93       }).unzip::<_,_,Vec<_>,Vec<_>>();
94
95     let all_clients = izip!(
96       &ics,
97       client_handles_send,
98     ).map(|(ic, (web_send, route_send))| {
99       (ic.link.client,
100        Client {
101          ic: ic.clone(),
102          web: web_send,
103          route: route_send,
104        })
105     }).collect();
106
107     let global = Arc::new(Global {
108       config: global_config,
109       all_clients,
110     });
111
112     for (ic, (web_recv, route_recv)) in izip!(
113       ics,
114       client_handles_recv,
115     ) {
116       let global_ = global.clone();
117       let ic_ = ic.clone();
118       tasks.push((tokio::spawn(async move {
119         sclient::run(global_, ic_, web_recv, route_recv)
120           .await.void_unwrap_err()
121       }), format!("client {}", &ic)));
122     }
123
124     for addr in &global.config.addrs {
125       let global_ = global.clone();
126       let make_service = hyper::service::make_service_fn(
127         move |conn: &hyper::server::conn::AddrStream| {
128           let global_ = global_.clone();
129           let conn = Arc::new(format!("[{}]", conn.remote_addr()));
130           async { Ok::<_, Void>( hyper::service::service_fn(move |req| {
131             sweb::handle(conn.clone(), global_.clone(), req)
132           }) ) }
133         }
134       );
135
136       let addr = SocketAddr::new(*addr, global.config.port);
137       let server = hyper::Server::try_bind(&addr)
138         .context("bind")?
139         .http1_preserve_header_case(true)
140         .serve(make_service);
141       info!("listening on {}", &addr);
142       let task = tokio::task::spawn(async move {
143         match server.await {
144           Ok(()) => anyhow!("shut down?!"),
145           Err(e) => e.into(),
146         }
147       });
148       tasks.push((task, format!("http server {}", addr)));
149     }
150     
151     Ok((global, ipif))
152   });
153
154   let died = future::select_all(
155     tasks.iter_mut().map(|e| &mut e.0)
156   ).await;
157   error!("xxx {:?}", &died);
158
159   ipif.quitting(None).await;
160
161   dbg!(global);
162 }