chiark / gitweb /
change type of route_packet etc.
[hippotat.git] / server / server.rs
1 // Copyright 2021 Ian Jackson and contributors to Hippotat
2 // SPDX-License-Identifier: GPL-3.0-or-later
3 // There is NO WARRANTY.
4
5 use hippotat::prelude::*;
6
7 mod suser;
8 mod slocal;
9 mod sweb;
10
11 pub use sweb::{WebRequest, WebResponse};
12 pub use suser::User;
13
14 #[derive(StructOpt,Debug)]
15 pub struct Opts {
16   #[structopt(flatten)]
17   pub log: LogOpts,
18
19   #[structopt(flatten)]
20   pub config: config::Opts,
21 }
22
23 pub const METADATA_MAX_LEN: usize = MAX_OVERHEAD;
24 pub const INTERNAL_QUEUE: usize = 15; // xxx: config
25
26 #[derive(Debug)]
27 pub struct Global {
28   config: config::InstanceConfigGlobal,
29   local_rx: mpsc::Sender<RoutedPacket>,
30   all_clients: HashMap<ClientName, User>,
31 }
32
33 pub struct RoutedPacket {
34   pub data: RoutedPacketData,
35   pub source: Option<ClientName>, // for eh, tracing, etc.
36 }
37
38 // not MIME data, valid SLIP (checked)
39 pub type RoutedPacketData = Box<[u8]>;
40
41 #[throws(PacketError)]
42 pub async fn route_packet(global: &Global,
43                           conn: &str, source: Option<&ClientName>,
44                           packet: RoutedPacketData, daddr: IpAddr)
45 {
46   let c = &global.config;
47   let len = packet.len();
48   let trace = |how: &str, why: &str| {
49     trace!("{} {} {} {} {:?} len={}",
50            conn,
51            match source {
52              Some(s) => (s as &dyn Display),
53              None => &"local",
54            },
55            how, why, daddr, len);
56   };
57
58   let (dest, why) =
59     if daddr == c.vaddr || ! c.vnetwork.iter().any(|n| n.contains(&daddr)) {
60       (None, "ipif-inbound-xxx")
61     } else if daddr == c.vrelay {
62       (None, "vrelay")
63     } else if let Some(client) = global.all_clients.get(&ClientName(daddr)) {
64       (Some(&client.route), "client")
65     } else {
66       (None, "no-client")
67     };
68
69   let dest = if let Some(d) = dest { d } else {
70     trace("discard", why); return;
71   };
72
73   let packet = RoutedPacket {
74     data: packet,
75     source: source.cloned(),
76   };
77   match dest.send(packet).await {
78     Ok(()) => trace("forward", why),
79     Err(_) => trace("task-crashed!", why),
80   }
81 }
82
83 #[tokio::main]
84 async fn main() {
85   let opts = Opts::from_args();
86   let mut tasks: Vec<(
87     JoinHandle<AE>,
88     String,
89   )> = vec![];
90
91   let global = config::startup(
92     "hippotatd", LinkEnd::Server,
93     &opts.config, &opts.log, |ics|
94   {
95     let global_config = config::InstanceConfigGlobal::from(&ics);
96
97     let ipif = Ipif::start(&global_config.ipif, None)?;
98
99     let ics = ics.into_iter().map(Arc::new).collect_vec();
100     let (client_handles_send, client_handles_recv) = ics.iter()
101       .map(|_ic| {
102         let (web_send, web_recv) = mpsc::channel(
103           5 // xxx should me max_requests_outstanding but that's
104           // marked client-only so needs rework
105         );
106         let (route_send, route_recv) = mpsc::channel(
107           INTERNAL_QUEUE
108         );
109         ((web_send, route_send), (web_recv, route_recv))
110       }).unzip::<_,_,Vec<_>,Vec<_>>();
111
112     let all_clients = izip!(
113       &ics,
114       client_handles_send,
115     ).map(|(ic, (web_send, route_send))| {
116       (ic.link.client,
117        User {
118          ic: ic.clone(),
119          web: web_send,
120          route: route_send,
121        })
122     }).collect();
123
124     let (local_rx_send, local_tx_recv) = mpsc::channel(
125       50 // xxx configurable?
126     );
127
128     let global = Arc::new(Global {
129       config: global_config,
130       local_rx: local_rx_send,
131       all_clients,
132     });
133
134     for (ic, (web_recv, route_recv)) in izip!(
135       ics,
136       client_handles_recv,
137     ) {
138       let global_ = global.clone();
139       let ic_ = ic.clone();
140       tasks.push((tokio::spawn(async move {
141         suser::run(global_, ic_, web_recv, route_recv)
142           .await.void_unwrap_err()
143       }), format!("client {}", &ic)));
144     }
145
146     for addr in &global.config.addrs {
147       let global_ = global.clone();
148       let make_service = hyper::service::make_service_fn(
149         move |conn: &hyper::server::conn::AddrStream| {
150           let global_ = global_.clone();
151           let conn = Arc::new(format!("[{}]", conn.remote_addr()));
152           async { Ok::<_, Void>( hyper::service::service_fn(move |req| {
153             sweb::handle(conn.clone(), global_.clone(), req)
154           }) ) }
155         }
156       );
157
158       let addr = SocketAddr::new(*addr, global.config.port);
159       let server = hyper::Server::try_bind(&addr)
160         .context("bind")?
161         .http1_preserve_header_case(true)
162         .serve(make_service);
163       info!("listening on {}", &addr);
164       let task = tokio::task::spawn(async move {
165         match server.await {
166           Ok(()) => anyhow!("shut down?!"),
167           Err(e) => e.into(),
168         }
169       });
170       tasks.push((task, format!("http server {}", addr)));
171     }
172
173     let global_ = global.clone();
174     let ipif = tokio::task::spawn(async move {
175       slocal::run(global_, local_tx_recv, ipif).await
176         .void_unwrap_err()
177     });
178     tasks.push((ipif, format!("ipif")));
179
180     Ok(global)
181   });
182
183   let died = future::select_all(
184     tasks.iter_mut().map(|e| &mut e.0)
185   ).await;
186   error!("xxx {:?}", &died);
187
188   dbg!(global);
189 }