chiark / gitweb /
server: route wip, do sending
[hippotat.git] / src / bin / server.rs
index 297740a1552ff729de9b71c3f6a5c902f54502d6..df34045d7b61d44f5cc0a0f33f5fb212aff48974 100644 (file)
@@ -14,19 +14,23 @@ pub struct Opts {
 }
 
 const METADATA_MAX_LEN: usize = MAX_OVERHEAD;
+const INTERNAL_QUEUE: usize = 15; // xxx: config
 
 #[derive(Debug)]
-struct Global {
+pub struct Global {
   config: config::InstanceConfigGlobal,
   all_clients: HashMap<ClientName, Client>,
 }
 
 #[derive(Debug)]
-struct Client {
+pub struct Client {
   ic: Arc<InstanceConfig>,
-  web: tokio::sync::mpsc::Sender<WebRequest>,
+  web: mpsc::Sender<WebRequest>,
+  route: mpsc::Sender<RoutedPacket>,
 }
 
+pub type RoutedPacket = Box<[u8]>; // not MIME data
+
 /// Sent from hyper worker pool task to client task
 #[allow(dead_code)] // xxx
 #[derive(Debug)]
@@ -39,7 +43,7 @@ struct WebRequest {
   length_hint: usize,
   body: hyper::body::Body,
   boundary_finder: multipart::BoundaryFinder,
-  reply_to: tokio::sync::oneshot::Sender<WebResponse>,
+  reply_to: oneshot::Sender<WebResponse>,
   warnings: Warnings,
   conn: Arc<String>,
 }
@@ -55,12 +59,36 @@ struct WebResponse {
 type WebResponseData = Vec<u8>;
 
 #[throws(PacketError)]
-pub fn route_packet(conn: &str, link: &dyn Display,
-                    packet: Box<[u8]>, daddr: IpAddr)
+pub async fn route_packet(global: &Global,
+                          conn: &str, link: &(dyn Display + Sync),
+                          packet: RoutedPacket, daddr: IpAddr)
 {
-  // xxx
-  trace!("{} {} discarding packet daddr={:?} len={}",
-         conn, link, daddr, packet.len());
+  let c = &global.config;
+  let len = packet.len();
+  let trace = |how: &str, why: &str| {
+    trace!("{} {} {} {} {:?} len={}",
+           conn, link, how, why, daddr, len);
+  };
+
+  let (dest, why) =
+    if daddr == c.vaddr || ! c.vnetwork.iter().any(|n| n.contains(&daddr)) {
+      (None, "ipif-inbound-xxx")
+    } else if daddr == c.vrelay {
+      (None, "vrelay")
+    } else if let Some(client) = global.all_clients.get(&ClientName(daddr)) {
+      (Some(&client.route), "client")
+    } else {
+      (None, "no-client")
+    };
+
+  let dest = if let Some(d) = dest { d } else {
+    trace("discard", why); return;
+  };
+
+  match dest.send(packet).await {
+    Ok(()) => trace("forward", why),
+    Err(_) => trace("task-crashed!", why),
+  }
 }
 
 async fn handle(
@@ -208,7 +236,7 @@ async fn handle(
     //eprintln!("boundary={:?} start={} name={:?} client={}",
     // boundary, start, &comp.name, &client.ic);
 
-    let (reply_to, reply_recv) = tokio::sync::oneshot::channel();
+    let (reply_to, reply_recv) = oneshot::channel();
     trace!("{} {} request, Content-Length={}",
            &conn, &client_name, length_hint);
     let wreq = WebRequest {
@@ -259,12 +287,14 @@ async fn handle(
 
 #[allow(unused_variables)] // xxx
 #[allow(unused_mut)] // xxx
-async fn run_client(ic: Arc<InstanceConfig>,
-                    mut web: mpsc::Receiver<WebRequest>)
+async fn run_client(global: Arc<Global>,
+                    ic: Arc<InstanceConfig>,
+                    mut web: mpsc::Receiver<WebRequest>,
+                    mut routed: mpsc::Receiver<RoutedPacket>)
                     -> Result<Void, AE>
 {
   struct Outstanding {
-    reply_to: tokio::sync::oneshot::Sender<WebResponse>,
+    reply_to: oneshot::Sender<WebResponse>,
     oi: OutstandingInner,
   }
   #[derive(Debug)]
@@ -275,7 +305,7 @@ async fn run_client(ic: Arc<InstanceConfig>,
   let  downbound: VecDeque<(/*xxx*/)> = default();
 
   let try_send_response = |
-    reply_to: tokio::sync::oneshot::Sender<WebResponse>,
+    reply_to: oneshot::Sender<WebResponse>,
     response: WebResponse
   | {
     reply_to.send(response)
@@ -384,10 +414,10 @@ async fn run_client(ic: Arc<InstanceConfig>,
               let daddr = ip_packet_addr::<true>(header)?;
               Ok(daddr)
             }, |(daddr,packet)| route_packet(
-              &conn, &ic.link.client, daddr,packet
+              &global, &conn, &ic.link.client, daddr,packet
             ),
               |e| Ok::<_,SlipFramesError<_>>({ warnings.add(&e)?; })
-            )?;
+            ).await?;
           }
 
           let oi = OutstandingInner {
@@ -427,34 +457,46 @@ async fn main() {
 
     let ics = ics.into_iter().map(Arc::new).collect_vec();
     let (client_handles_send, client_handles_recv) = ics.iter()
-      .map(|_ic| mpsc::channel(
-        5 // xxx should me max_requests_outstanding but that's
-        // marked client-only so needs rework
-      )).unzip::<_,_,Vec<_>,Vec<_>>();
+      .map(|_ic| {
+        let (web_send, web_recv) = mpsc::channel(
+          5 // xxx should me max_requests_outstanding but that's
+          // marked client-only so needs rework
+        );
+        let (route_send, route_recv) = mpsc::channel(
+          INTERNAL_QUEUE
+        );
+        ((web_send, route_send), (web_recv, route_recv))
+      }).unzip::<_,_,Vec<_>,Vec<_>>();
 
     let all_clients = izip!(
-      ics.into_iter(),
+      &ics,
       client_handles_send,
-      client_handles_recv,
-    ).map(|(ic, web_send, web_recv)| {
-      let r = (ic.link.client,
-           Client {
-             ic: ic.clone(),
-             web: web_send,
-           });
-
-      let ic_ = ic.clone();
-      tasks.push((tokio::spawn(async move {
-        run_client(ic_, web_recv).await.void_unwrap_err()
-      }), format!("client {}", &ic)));
-
-      r
+    ).map(|(ic, (web_send, route_send))| {
+      (ic.link.client,
+       Client {
+         ic: ic.clone(),
+         web: web_send,
+         route: route_send,
+       })
     }).collect();
+
     let global = Arc::new(Global {
       config: global_config,
       all_clients,
     });
 
+    for (ic, (web_recv, route_recv)) in izip!(
+      ics,
+      client_handles_recv,
+    ) {
+      let global_ = global.clone();
+      let ic_ = ic.clone();
+      tasks.push((tokio::spawn(async move {
+        run_client(global_, ic_, web_recv, route_recv)
+          .await.void_unwrap_err()
+      }), format!("client {}", &ic)));
+    }
+
     for addr in &global.config.addrs {
       let global_ = global.clone();
       let make_service = hyper::service::make_service_fn(