From: Ian Jackson Date: Mon, 26 Jul 2021 23:59:55 +0000 (+0100) Subject: client: wip code X-Git-Tag: hippotat/1.0.0~436 X-Git-Url: https://www.chiark.greenend.org.uk/ucgi/~ian/git?a=commitdiff_plain;h=1acf0e05b7b3b280b5e256f95890812ad960faaf;p=hippotat.git client: wip code Signed-off-by: Ian Jackson --- diff --git a/src/bin/client.rs b/src/bin/client.rs index a9c0fad..912354c 100644 --- a/src/bin/client.rs +++ b/src/bin/client.rs @@ -37,15 +37,57 @@ async fn run_client(ic: InstanceConfig, hclient: Arc>) let tx_stream = ipif.stdout.take().unwrap(); let mut tx_stream = tokio::io::BufReader::new(tx_stream).split(SLIP_ESC); + let mut tx_defer = None; + let stream_for_rx = ipif.stdin .take().unwrap(); + let mut reqs = Vec::with_capacity(ic.max_requests_outstanding.sat()); async { loop { select! { packet = tx_stream.next_segment(), - if reqs.len() < ic.max_requests_outstanding.sat() => { - let packet = packet.context("read from ipif")?; + if tx_defer.is_none() && + reqs.len() < ic.max_requests_outstanding.sat() => + { + let mut upbound_total = 0; + let mut upbound = vec![]; + let mut to_process: Option>,_>> + = Some(packet); + while let Some(packet) = to_process.take() { + let mut packet = + packet.context("read from ipif")? + .ok_or_else(|| io::Error::from(io::ErrorKind::UnexpectedEof))?; + if let Ok(()) = slip::check_checkmtu_mimeify(&mut packet, &ic) { + let new_upbound_total = packet.len() + upbound_total + 1; + if new_upbound_total > ic.max_batch_up.sat() { + tx_defer = Some(packet); + break; + } + upbound_total = new_upbound_total; + upbound.push(packet); + // we rely on `next_segment` being cancellation-safe, + // which isn't documented as true but seems reasonably safe + pin!{ let next_segment = tx_stream.next_segment(); } + to_process = match poll!(next_segment) { + Poll::Ready(p) => Some(p), + Poll::Pending => None, + }; + } + } + assert!( to_process.is_none() ); + dbg!(&reqs.len(), &upbound_total, &upbound.len()); +/* + Body + made out of Stream + made out of futures::stream::iter + + + let datalen = + + let o = 0; + let i = +*/ reqs.push(()); // xxx make new request } diff --git a/src/lib.rs b/src/lib.rs index 613ae59..1bdd108 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -14,5 +14,6 @@ pub mod prelude; pub mod config; +pub mod slip; pub mod types; pub mod utils; diff --git a/src/prelude.rs b/src/prelude.rs index 521d82e..d87b2b0 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -16,11 +16,12 @@ pub use std::panic; pub use std::process; pub use std::str::FromStr; pub use std::sync::Arc; +pub use std::task::Poll; pub use anyhow::{anyhow, Context}; pub use extend::ext; pub use fehler::{throw, throws}; -pub use futures::future; +pub use futures::{poll, future}; pub use hyper::Uri; pub use hyper_tls::HttpsConnector; pub use ipnet::IpNet; @@ -29,6 +30,7 @@ pub use lazy_regex::{regex_is_match, regex_replace_all}; pub use log::{debug, info, error}; pub use structopt::StructOpt; pub use tokio::io::AsyncBufReadExt; +pub use tokio::pin; pub use tokio::select; pub use tokio::task; pub use tokio::time::Duration; @@ -37,6 +39,7 @@ pub use void::{self, Void, ResultVoidExt, ResultVoidErrExt}; pub use crate::config::{self, InstanceConfig, u32Ext as _}; pub use crate::utils::*; pub use crate::types::*; +pub use crate::slip; pub use anyhow::Error as AE; pub use ErrorKind as EK; diff --git a/src/slip.rs b/src/slip.rs new file mode 100644 index 0000000..0de0947 --- /dev/null +++ b/src/slip.rs @@ -0,0 +1,10 @@ +// Copyright 2021 Ian Jackson and contributors to Hippotat +// SPDX-License-Identifier: AGPL-3.0-or-later +// There is NO WARRANTY. + +use crate::prelude::*; + +#[throws(AE)] +pub fn check_checkmtu_mimeify(_data: &mut [u8], _ic: &InstanceConfig) { + // xxx +}