chiark / gitweb /
wip rx
authorIan Jackson <ijackson@chiark.greenend.org.uk>
Mon, 2 Aug 2021 01:26:22 +0000 (02:26 +0100)
committerIan Jackson <ijackson@chiark.greenend.org.uk>
Mon, 2 Aug 2021 01:26:22 +0000 (02:26 +0100)
Signed-off-by: Ian Jackson <ijackson@chiark.greenend.org.uk>
src/bin/client.rs
src/prelude.rs
src/slip.rs

index b43fc594f1ba7ef652eb14bbca263ebd2b8d7e41..30936ee6eb60da5932d96665ffa008a2d0098287 100644 (file)
@@ -184,7 +184,13 @@ async fn run_client<C:HCC>(
   let mut reqs: Vec<OutstandingRequest>
     = Vec::with_capacity(ic.max_requests_outstanding.sat());
 
-  let mut inbound: VecDeque<Box<[u8]>> = default();
+  let mut rx_queue: VecDeque<Box<[u8]>> = default();
+  #[derive(Debug)]
+  enum RxState {
+    Frame(Cursor<Box<u8>>),
+    End,
+  }
+  let mut rx_current: Option<RxState> = None;
 
   // xxx check that ic settings are all honoured
 
@@ -199,8 +205,8 @@ async fn run_client<C:HCC>(
             .ok_or_else(|| io::Error::from(io::ErrorKind::UnexpectedEof))?;
           //eprintln!("data={:?}", DumpHex(&data));
 
-          packets = check
-            ::<_,_,true>(ic.mtu, &data, |header| {
+          check
+            ::<_,_,_,true>(ic.mtu, &data, &mut packets, |header| {
               let addr = ip_packet_addr::<false>(header)?;
               if addr != ic.link.client.0 { throw!(PE::Src(addr)) }
               Ok(())
@@ -208,7 +214,7 @@ async fn run_client<C:HCC>(
               PE::Empty => { },
               e@ PE::Src(_) => debug!("{}: tx: discarding: {}", &ic, e),
               e => error!("{}: tx: discarding: {}", &ic, e),
-            }).into();
+            });
         },
 
         _ = async { },
@@ -225,8 +231,10 @@ async fn run_client<C:HCC>(
 
         _ = async { },
         if reqs.len() < ic.target_requests_outstanding.sat() ||
-          (reqs.len() < ic.max_requests_outstanding.sat() &&
-           ! upbound.is_empty()) =>
+           (reqs.len() < ic.max_requests_outstanding.sat() &&
+            ! upbound.is_empty())
+          // xxx backpressure, if too much in rx_queue
+          =>
         {
           submit_request(&c, &mut reqs, mem::take(&mut upbound).into())?;
         },
@@ -235,8 +243,17 @@ async fn run_client<C:HCC>(
           if ! reqs.is_empty() =>
         {
           reqs.swap_remove(goti);
+
           if let Some(got) = got {
-            dbg!(&got.remaining()); // xxx
+            check
+              ::<_,_,_,false>(ic.mtu, &got, &mut rx_queue, |header| {
+                let addr = ip_packet_addr::<true>(header)?;
+                if addr != ic.link.client.0 { throw!(PE::Dst(addr)) }
+                Ok(())
+              }, |e| error!("{}: rx: discarding: {}", &ic, e));
+          
+            dbg!(&rx_queue.len());
+            rx_queue = default(); // xxx
           }
         }
       }
index 4a2b9c3d04faf3e3d15d931f61ac859a47c82c72..3b1bed55e2ed14fe29f2de73c369bbefd681afd6 100644 (file)
@@ -10,7 +10,7 @@ pub use std::cmp::{min, max};
 pub use std::fs;
 pub use std::fmt::{self, Debug, Display, Write as _};
 pub use std::future::Future;
-pub use std::io::{self, ErrorKind, Read as _, Write as _};
+pub use std::io::{self, Cursor, ErrorKind, Read as _, Write as _};
 pub use std::iter;
 pub use std::mem;
 pub use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
index c5b919b55ef8fb7ebd9c563e81432d1dd3498154..16b56b5df765e1a4143333c60c2b2caf745d5647 100644 (file)
@@ -16,16 +16,17 @@ pub enum PacketError {
   #[error("bad, IPv{vsn}, len={len}")]     Bad { len: usize, vsn: u8 },
 }
 
-pub fn check<AC, EH, const TO_MIME: bool>
-  (mtu: u32, data: &[u8], mut addr_chk: AC, mut error_handler: EH)
-   -> Vec<Box<[u8]>>
-where AC: FnMut(&[u8]) -> Result<(), PacketError>,
-      EH: FnMut(PacketError),
+pub fn check<AC, EH, OUT, const TO_MIME: bool>(
+  mtu: u32,
+  data: &[u8],
+  out: &mut OUT,
+  mut addr_chk: AC,
+  mut error_handler: EH
+) where OUT: Extend<Box<[u8]>>,
+        AC: FnMut(&[u8]) -> Result<(), PacketError>,
+        EH: FnMut(PacketError),
 {
 //  eprintln!("before: {:?}", DumpHex(data));
-
-  let mut out = vec![];
-
   for packet in data.split(|&c| c == SLIP_END) {
     match (||{
       if packet.len() == 0 {
@@ -69,10 +70,9 @@ where AC: FnMut(&[u8]) -> Result<(), PacketError>,
       Ok(packet)
     })() {
       Err(e) => error_handler(e),
-      Ok(packet) => out.push(packet),
+      Ok(packet) => out.extend(iter::once(packet)),
     }
   }
-  out
 //  eprintln!(" after: {:?}", DumpHex(data));
 }