From: Ian Jackson Date: Sat, 31 Jul 2021 13:18:05 +0000 (+0100) Subject: break out Frames X-Git-Tag: hippotat/1.0.0~415 X-Git-Url: https://www.chiark.greenend.org.uk/ucgi/~ian/git?a=commitdiff_plain;h=e34c88b8a1fb4cad34df1cac443f710af93ad480;p=hippotat.git break out Frames Signed-off-by: Ian Jackson --- diff --git a/src/bin/client.rs b/src/bin/client.rs index 5f2362c..a138ca4 100644 --- a/src/bin/client.rs +++ b/src/bin/client.rs @@ -22,13 +22,13 @@ struct ClientContext<'c,C> { fn submit_request<'r, 'c:'r, C:HCC>( c: &'c ClientContext, reqs: &mut Vec>, - upbound: Vec> + upbound: FramesData, ) { let body = hyper::body::Body::wrap_stream( futures::stream::iter( Itertools::intersperse( upbound.into_iter().map(|u| Bytes::from(u)), - slip::SLIP_END_SLICE.into() + SLIP_END_SLICE.into() ).map(Ok::<_,Void>) ) ); @@ -122,24 +122,23 @@ async fn run_client( if tx_defer.is_none() && reqs.len() < ic.max_requests_outstanding.sat() => { - let mut upbound_total = 0; - let mut upbound: Vec> = vec![]; + let mut upbound = Frames::default(); let mut to_process: Option>,_>> = Some(packet); while let Some(packet) = to_process.take() { let mut packet = packet.context("read from ipif")? .ok_or_else(|| io::Error::from(io::ErrorKind::UnexpectedEof))?; - if let Ok(()) = slip::check_checkmtu_mimeswap + if let Ok(()) = check_checkmtu_mimeswap ::(&ic, &mut packet) { - let new_upbound_total = packet.len() + upbound_total + 1; - if new_upbound_total > ic.max_batch_up.sat() { - tx_defer = Some(packet); - break; - } - upbound_total = new_upbound_total; - upbound.push(packet); + match upbound.add(ic.max_batch_up, packet) { + Err(packet) => { + tx_defer = Some(packet); + break; + } + Ok(()) => { }, + }; // we rely oin `next_segment` being cancellation-safe, // which isn't documented as true but seems reasonably safe pin!{ let next_segment = tx_stream.next_segment(); } @@ -150,10 +149,10 @@ async fn run_client( } } assert!( to_process.is_none() ); - dbg!(&reqs.len(), &upbound_total, &upbound.len()); + dbg!(&reqs.len(), &upbound); //: impl futures::Stream> - submit_request(&c, &mut reqs, upbound)?; + submit_request(&c, &mut reqs, upbound.into())?; } () = async { }, diff --git a/src/prelude.rs b/src/prelude.rs index 48febde..b23be67 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -44,7 +44,7 @@ pub use crate::config::{self, InstanceConfig, u32Ext as _}; pub use crate::utils::*; pub use crate::reporter::*; pub use crate::types::*; -pub use crate::slip; +pub use crate::slip::*; pub use anyhow::Error as AE; pub use ErrorKind as EK; diff --git a/src/slip.rs b/src/slip.rs index b7be70b..364901d 100644 --- a/src/slip.rs +++ b/src/slip.rs @@ -28,3 +28,33 @@ pub fn check_checkmtu_mimeswap } } } + +pub type Frame = Vec; +pub type FramesData = Vec>; + +#[derive(Default)] +pub struct Frames { + frames: FramesData, + total_len: usize, +} + +impl Debug for Frames { + #[throws(fmt::Error)] + fn fmt(&self, f: &mut fmt::Formatter) { + write!(f, "Frames{{n={},len={}}}", &self.frames.len(), &self.total_len)?; + } +} + +impl Frames { + #[throws(Frame)] + pub fn add(&mut self, max: u32, frame: Frame) { + let new_total = self.total_len + frame.len() + 1; + if new_total > max.sat() { throw!(frame) } + self.total_len = new_total; + self.frames.push(frame); + } +} + +impl From for FramesData { + fn from(frames: Frames) -> FramesData { frames.frames } +}