chiark / gitweb /
server: route wip, do sending
[hippotat.git] / src / bin / server.rs
index a45bec6d92e4351bd75f3484332925527e1302d1..df34045d7b61d44f5cc0a0f33f5fb212aff48974 100644 (file)
@@ -14,6 +14,7 @@ pub struct Opts {
 }
 
 const METADATA_MAX_LEN: usize = MAX_OVERHEAD;
+const INTERNAL_QUEUE: usize = 15; // xxx: config
 
 #[derive(Debug)]
 pub struct Global {
@@ -25,6 +26,7 @@ pub struct Global {
 pub struct Client {
   ic: Arc<InstanceConfig>,
   web: mpsc::Sender<WebRequest>,
+  route: mpsc::Sender<RoutedPacket>,
 }
 
 pub type RoutedPacket = Box<[u8]>; // not MIME data
@@ -57,22 +59,35 @@ struct WebResponse {
 type WebResponseData = Vec<u8>;
 
 #[throws(PacketError)]
-pub fn route_packet(global: &Global,
-                    conn: &str, link: &dyn Display,
-                    packet: RoutedPacket, daddr: IpAddr)
+pub async fn route_packet(global: &Global,
+                          conn: &str, link: &(dyn Display + Sync),
+                          packet: RoutedPacket, daddr: IpAddr)
 {
   let c = &global.config;
-  let trace = |how| trace!("{} {} route {} daddr={:?} len={}",
-                           conn, link, how, daddr, packet.len());
-
-  if daddr == c.vaddr || ! c.vnetwork.iter().any(|n| n.contains(&daddr)) {
-    trace("ipif inbound xxx discarding");
-  } else if daddr == c.vrelay {
-    trace("discard (relay)");
-  } else if let Some(_client) = global.all_clients.get(&ClientName(daddr)) {
-    trace("ipif route xxx discarding");
-  } else {
-    trace("discard (no client)");
+  let len = packet.len();
+  let trace = |how: &str, why: &str| {
+    trace!("{} {} {} {} {:?} len={}",
+           conn, link, how, why, daddr, len);
+  };
+
+  let (dest, why) =
+    if daddr == c.vaddr || ! c.vnetwork.iter().any(|n| n.contains(&daddr)) {
+      (None, "ipif-inbound-xxx")
+    } else if daddr == c.vrelay {
+      (None, "vrelay")
+    } else if let Some(client) = global.all_clients.get(&ClientName(daddr)) {
+      (Some(&client.route), "client")
+    } else {
+      (None, "no-client")
+    };
+
+  let dest = if let Some(d) = dest { d } else {
+    trace("discard", why); return;
+  };
+
+  match dest.send(packet).await {
+    Ok(()) => trace("forward", why),
+    Err(_) => trace("task-crashed!", why),
   }
 }
 
@@ -274,7 +289,8 @@ async fn handle(
 #[allow(unused_mut)] // xxx
 async fn run_client(global: Arc<Global>,
                     ic: Arc<InstanceConfig>,
-                    mut web: mpsc::Receiver<WebRequest>)
+                    mut web: mpsc::Receiver<WebRequest>,
+                    mut routed: mpsc::Receiver<RoutedPacket>)
                     -> Result<Void, AE>
 {
   struct Outstanding {
@@ -401,7 +417,7 @@ async fn run_client(global: Arc<Global>,
               &global, &conn, &ic.link.client, daddr,packet
             ),
               |e| Ok::<_,SlipFramesError<_>>({ warnings.add(&e)?; })
-            )?;
+            ).await?;
           }
 
           let oi = OutstandingInner {
@@ -441,19 +457,26 @@ async fn main() {
 
     let ics = ics.into_iter().map(Arc::new).collect_vec();
     let (client_handles_send, client_handles_recv) = ics.iter()
-      .map(|_ic| mpsc::channel(
-        5 // xxx should me max_requests_outstanding but that's
-        // marked client-only so needs rework
-      )).unzip::<_,_,Vec<_>,Vec<_>>();
+      .map(|_ic| {
+        let (web_send, web_recv) = mpsc::channel(
+          5 // xxx should me max_requests_outstanding but that's
+          // marked client-only so needs rework
+        );
+        let (route_send, route_recv) = mpsc::channel(
+          INTERNAL_QUEUE
+        );
+        ((web_send, route_send), (web_recv, route_recv))
+      }).unzip::<_,_,Vec<_>,Vec<_>>();
 
     let all_clients = izip!(
       &ics,
       client_handles_send,
-    ).map(|(ic, web_send)| {
+    ).map(|(ic, (web_send, route_send))| {
       (ic.link.client,
        Client {
          ic: ic.clone(),
          web: web_send,
+         route: route_send,
        })
     }).collect();
 
@@ -462,14 +485,15 @@ async fn main() {
       all_clients,
     });
 
-    for (ic, web_recv) in izip!(
+    for (ic, (web_recv, route_recv)) in izip!(
       ics,
       client_handles_recv,
     ) {
       let global_ = global.clone();
       let ic_ = ic.clone();
       tasks.push((tokio::spawn(async move {
-        run_client(global_, ic_, web_recv).await.void_unwrap_err()
+        run_client(global_, ic_, web_recv, route_recv)
+          .await.void_unwrap_err()
       }), format!("client {}", &ic)));
     }