From: Ian Jackson Date: Mon, 2 Aug 2021 01:26:22 +0000 (+0100) Subject: wip rx X-Git-Tag: hippotat/1.0.0~381 X-Git-Url: https://www.chiark.greenend.org.uk/ucgi/~ian/git?a=commitdiff_plain;h=d0b3cf4e6cdf35bc8d3246d9d57e1404fa1e5752;p=hippotat.git wip rx Signed-off-by: Ian Jackson --- diff --git a/src/bin/client.rs b/src/bin/client.rs index b43fc59..30936ee 100644 --- a/src/bin/client.rs +++ b/src/bin/client.rs @@ -184,7 +184,13 @@ async fn run_client( let mut reqs: Vec = Vec::with_capacity(ic.max_requests_outstanding.sat()); - let mut inbound: VecDeque> = default(); + let mut rx_queue: VecDeque> = default(); + #[derive(Debug)] + enum RxState { + Frame(Cursor>), + End, + } + let mut rx_current: Option = None; // xxx check that ic settings are all honoured @@ -199,8 +205,8 @@ async fn run_client( .ok_or_else(|| io::Error::from(io::ErrorKind::UnexpectedEof))?; //eprintln!("data={:?}", DumpHex(&data)); - packets = check - ::<_,_,true>(ic.mtu, &data, |header| { + check + ::<_,_,_,true>(ic.mtu, &data, &mut packets, |header| { let addr = ip_packet_addr::(header)?; if addr != ic.link.client.0 { throw!(PE::Src(addr)) } Ok(()) @@ -208,7 +214,7 @@ async fn run_client( PE::Empty => { }, e@ PE::Src(_) => debug!("{}: tx: discarding: {}", &ic, e), e => error!("{}: tx: discarding: {}", &ic, e), - }).into(); + }); }, _ = async { }, @@ -225,8 +231,10 @@ async fn run_client( _ = async { }, if reqs.len() < ic.target_requests_outstanding.sat() || - (reqs.len() < ic.max_requests_outstanding.sat() && - ! upbound.is_empty()) => + (reqs.len() < ic.max_requests_outstanding.sat() && + ! upbound.is_empty()) + // xxx backpressure, if too much in rx_queue + => { submit_request(&c, &mut reqs, mem::take(&mut upbound).into())?; }, @@ -235,8 +243,17 @@ async fn run_client( if ! reqs.is_empty() => { reqs.swap_remove(goti); + if let Some(got) = got { - dbg!(&got.remaining()); // xxx + check + ::<_,_,_,false>(ic.mtu, &got, &mut rx_queue, |header| { + let addr = ip_packet_addr::(header)?; + if addr != ic.link.client.0 { throw!(PE::Dst(addr)) } + Ok(()) + }, |e| error!("{}: rx: discarding: {}", &ic, e)); + + dbg!(&rx_queue.len()); + rx_queue = default(); // xxx } } } diff --git a/src/prelude.rs b/src/prelude.rs index 4a2b9c3..3b1bed5 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -10,7 +10,7 @@ pub use std::cmp::{min, max}; pub use std::fs; pub use std::fmt::{self, Debug, Display, Write as _}; pub use std::future::Future; -pub use std::io::{self, ErrorKind, Read as _, Write as _}; +pub use std::io::{self, Cursor, ErrorKind, Read as _, Write as _}; pub use std::iter; pub use std::mem; pub use std::net::{IpAddr, Ipv4Addr, Ipv6Addr}; diff --git a/src/slip.rs b/src/slip.rs index c5b919b..16b56b5 100644 --- a/src/slip.rs +++ b/src/slip.rs @@ -16,16 +16,17 @@ pub enum PacketError { #[error("bad, IPv{vsn}, len={len}")] Bad { len: usize, vsn: u8 }, } -pub fn check - (mtu: u32, data: &[u8], mut addr_chk: AC, mut error_handler: EH) - -> Vec> -where AC: FnMut(&[u8]) -> Result<(), PacketError>, - EH: FnMut(PacketError), +pub fn check( + mtu: u32, + data: &[u8], + out: &mut OUT, + mut addr_chk: AC, + mut error_handler: EH +) where OUT: Extend>, + AC: FnMut(&[u8]) -> Result<(), PacketError>, + EH: FnMut(PacketError), { // eprintln!("before: {:?}", DumpHex(data)); - - let mut out = vec![]; - for packet in data.split(|&c| c == SLIP_END) { match (||{ if packet.len() == 0 { @@ -69,10 +70,9 @@ where AC: FnMut(&[u8]) -> Result<(), PacketError>, Ok(packet) })() { Err(e) => error_handler(e), - Ok(packet) => out.push(packet), + Ok(packet) => out.extend(iter::once(packet)), } } - out // eprintln!(" after: {:?}", DumpHex(data)); }