From 1cb699d8b1fec6b500f8c34c7b4b98065370956b Mon Sep 17 00:00:00 2001 From: Ian Jackson Date: Mon, 2 Aug 2021 00:50:56 +0100 Subject: [PATCH] completely redo check() and main client loop Signed-off-by: Ian Jackson --- src/bin/client.rs | 77 +++++++++++++--------------------- src/prelude.rs | 2 +- src/slip.rs | 103 +++++++++++++++++++++++++++++----------------- 3 files changed, 95 insertions(+), 87 deletions(-) diff --git a/src/bin/client.rs b/src/bin/client.rs index d6ee158..425b82a 100644 --- a/src/bin/client.rs +++ b/src/bin/client.rs @@ -175,7 +175,8 @@ async fn run_client( let tx_stream = ipif.stdout.take().unwrap(); let mut tx_stream = tokio::io::BufReader::new(tx_stream).split(SLIP_END); - let mut tx_defer = None; + let mut packets: VecDeque> = default(); + let mut upbound = Frames::default(); let stream_for_rx = ipif.stdin .take().unwrap(); @@ -187,61 +188,39 @@ async fn run_client( async { loop { select! { - packet = async { - // cancellation safety: if this future is polled, we might - // move out of tx_defer, but then we will be immediately - // ready and yield it inot packet - if let Some(y) = tx_defer.take() { - Ok(Some(y)) - } else { - tx_stream.next_segment().await - } + data = tx_stream.next_segment(), + if packets.is_empty() => + { + let data = + data.context("read from ipif")? + .ok_or_else(|| io::Error::from(io::ErrorKind::UnexpectedEof))?; + // eprintln!("packet={:x?}", &packet); + + packets = check + ::<_, true>(ic.mtu, &data, |e| match e { + e => error!("PACKET ERROR {}", e), // xxx + }).into(); }, - if tx_defer.is_none() && - reqs.len() < ic.max_requests_outstanding.sat() => + + _ = async { }, + if ! upbound.tried_full() && + ! packets.is_empty() => { - let mut upbound = Frames::default(); - let mut to_process: Option>,_>> - = Some(packet); - while let Some(packet) = to_process.take() { - let mut packet = - packet.context("read from ipif")? - .ok_or_else(|| io::Error::from(io::ErrorKind::UnexpectedEof))?; -// eprintln!("packet={:x?}", &packet); - if let Ok(()) = check_checkmtu_mimeswap - ::(ic.mtu, &mut packet) - { - match upbound.add(ic.max_batch_up, packet) { - Err(packet) => { - tx_defer = Some(packet); - break; - } - Ok(()) => { }, - }; - // we rely oin `next_segment` being cancellation-safe, - // which isn't documented as true but seems reasonably safe - pin!{ let next_segment = tx_stream.next_segment(); } - to_process = match poll!(next_segment) { - Poll::Ready(p) => Some(p), - Poll::Pending => None, - }; + while let Some(packet) = packets.pop_front() { + match upbound.add(ic.max_batch_up, packet.into()/*xxx*/) { + Err(packet) => { packets.push_front(packet.into()/*xxx*/); break; } + Ok(()) => { }, } } - assert!( to_process.is_none() ); - dbg!(&reqs.len(), &upbound); - - //: impl futures::Stream> - submit_request(&c, &mut reqs, upbound.into())?; - } + }, - () = async { }, + _ = async { }, if reqs.len() < ic.target_requests_outstanding.sat() || - (tx_defer.is_some() && - reqs.len() < ic.max_requests_outstanding.sat()) => + (reqs.len() < ic.max_requests_outstanding.sat() && + ! upbound.is_empty()) => { - let upbound = tx_defer.take().into_iter().collect_vec(); - submit_request(&c, &mut reqs, upbound)?; - } + submit_request(&c, &mut reqs, mem::take(&mut upbound).into())?; + }, (got, goti, _) = async { future::select_all(&mut reqs).await }, if ! reqs.is_empty() => diff --git a/src/prelude.rs b/src/prelude.rs index d87d4fb..2b377d9 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -3,7 +3,7 @@ // There is NO WARRANTY. pub use std::array; -pub use std::collections::{BTreeSet, HashMap}; +pub use std::collections::{BTreeSet, HashMap, VecDeque}; pub use std::convert::{TryFrom, TryInto}; pub use std::borrow::Cow; pub use std::cmp::{min, max}; diff --git a/src/slip.rs b/src/slip.rs index 357a890..1713057 100644 --- a/src/slip.rs +++ b/src/slip.rs @@ -6,44 +6,61 @@ use crate::prelude::*; pub static SLIP_END_SLICE: &[u8] = &[SLIP_END]; -#[derive(Error,Debug,Copy,Clone)] +#[derive(Error,Debug,Copy,Clone,Eq,PartialEq)] pub enum PacketError { + #[error("empty packet")] Empty, #[error("MTU exceeded ({len} > {mtu})")] MTU { len: usize, mtu: u32 }, #[error("Invalid SLIP escape sequence")] SLIP, } -#[throws(PacketError)] -pub fn check_checkmtu_mimeswap - (mtu: u32, data: &mut [u8]) +pub fn check + (mtu: u32, data: &[u8], mut error_handler: EH) + -> Vec> +where EH: FnMut(PacketError) { // eprintln!("before: {:?}", DumpHex(data)); - for mut packet in data.split_mut(|&c| c == SLIP_END) { - if packet.len() > mtu.sat() { - throw!(PacketError::MTU { len: packet.len(), mtu }) - } + let mut out = vec![]; - while let Some((i, was_mime)) = packet.iter().enumerate().find_map( - |(i,&c)| match c { - SLIP_MIME_ESC => Some((i,true)), - SLIP_ESC => Some((i,false)), - _ => None, + for packet in data.split(|&c| c == SLIP_END) { + match (||{ + if packet.len() == 0 { + throw!(PacketError::Empty) + } + if packet.len() > mtu.sat() { + throw!(PacketError::MTU { len: packet.len(), mtu }); } - ) { - packet[i] = if was_mime { SLIP_ESC } else { SLIP_MIME_ESC }; - if was_mime != TO_MIME { - match packet.get(i+1) { - Some(&SLIP_ESC_END) | - Some(&SLIP_ESC_ESC) => Ok(()), - _ => throw!(PacketError::SLIP), - }?; - packet = &mut packet[i+2 ..]; - } else { - packet = &mut packet[i+1 ..]; + + let mut packet: Box<[u8]> = packet.to_owned().into(); + let mut walk: &mut [u8] = &mut packet; + + while let Some((i, was_mime)) = walk.iter().enumerate().find_map( + |(i,&c)| match c { + SLIP_MIME_ESC => Some((i,true)), + SLIP_ESC => Some((i,false)), + _ => None, + } + ) { + walk[i] = if was_mime { SLIP_ESC } else { SLIP_MIME_ESC }; + if was_mime != TO_MIME { + match walk.get(i+1) { + Some(&SLIP_ESC_END) | + Some(&SLIP_ESC_ESC) => Ok(()), + _ => Err(PacketError::SLIP), + }?; + walk = &mut walk[i+2 ..]; + } else { + walk = &mut walk[i+1 ..]; + } } + + Ok(packet) + })() { + Err(e) => error_handler(e), + Ok(packet) => out.push(packet), } } - + out // eprintln!(" after: {:?}", DumpHex(data)); } @@ -54,6 +71,7 @@ pub type FramesData = Vec>; pub struct Frames { frames: FramesData, total_len: usize, + tried_full: bool, } impl Debug for Frames { @@ -68,10 +86,13 @@ impl Frames { pub fn add(&mut self, max: u32, frame: Frame) { if frame.len() == 0 { return } let new_total = self.total_len + frame.len() + 1; - if new_total > max.sat() { throw!(frame) } + if new_total > max.sat() { self.tried_full = true; throw!(frame); } self.total_len = new_total; self.frames.push(frame); } + + #[inline] pub fn tried_full(&self) -> bool { self.tried_full } + #[inline] pub fn is_empty(&self) -> bool { self.frames.is_empty() } } impl From for FramesData { @@ -105,22 +126,30 @@ impl Debug for DumpHex<'_> { #[test] fn mime_slip_to_mime() { - fn chk(i: &[u8], exp: Result<&[u8], &str>) { - let mut p = i.to_owned(); - match (exp, check_checkmtu_mimeswap::(10, p.as_mut())) { - (Ok(exp), Ok(())) => assert_eq!( DumpHex(exp), DumpHex(&p) ), - (Err(exp), Err(got)) => assert!( got.to_string().contains(exp) ), - x => panic!("? {:?}", x), - } + use PacketError as PE; + const MTU: u32 = 10; + + fn chk(i: &[u8], exp_p: &[&[u8]], exp_e: &[PacketError]) { + let mut got_e = vec![]; + let got_p = check::<_,true>(MTU, i, |e| got_e.push(e)); + assert_eq!( got_p.iter().map(|b| DumpHex(b)).collect_vec(), + exp_p.iter().map(|b| DumpHex(b)).collect_vec() ); + assert_eq!( got_e, + exp_e ); } chk( &[ SLIP_END, SLIP_ESC, SLIP_ESC_END, b'-', b'X' ], - Ok(&[ SLIP_END, b'-', SLIP_ESC_END, SLIP_ESC, b'X' ]) ); + &[ &[ b'-', SLIP_ESC_END, SLIP_ESC, b'X' ] ], + &[ PE::Empty ]); - chk( &[ SLIP_END, SLIP_ESC, b'y' ], Err("SLIP escape") ); + chk( &[ SLIP_END, SLIP_ESC, b'y' ], &[], + &[ PE::Empty, PE::SLIP ]); chk( &[ SLIP_END, b'-', b'y' ], - Ok(&[ SLIP_END, SLIP_ESC, b'y' ]) ); + &[ &[ SLIP_ESC, b'y' ] ], + &[ PE::Empty ]); - chk( &[b'x'; 20], Err("MTU")); + chk( &[b'x'; 20], + &[ ], + &[ PE::MTU { len: 20, mtu: MTU } ]); } -- 2.30.2