let tx_stream = ipif.stdout.take().unwrap();
let mut tx_stream = tokio::io::BufReader::new(tx_stream).split(SLIP_ESC);
+ let mut tx_defer = None;
+
let stream_for_rx = ipif.stdin .take().unwrap();
+
let mut reqs = Vec::with_capacity(ic.max_requests_outstanding.sat());
async {
loop {
select! {
packet = tx_stream.next_segment(),
- if reqs.len() < ic.max_requests_outstanding.sat() => {
- let packet = packet.context("read from ipif")?;
+ if tx_defer.is_none() &&
+ reqs.len() < ic.max_requests_outstanding.sat() =>
+ {
+ let mut upbound_total = 0;
+ let mut upbound = vec![];
+ let mut to_process: Option<Result<Option<Vec<u8>>,_>>
+ = Some(packet);
+ while let Some(packet) = to_process.take() {
+ let mut packet =
+ packet.context("read from ipif")?
+ .ok_or_else(|| io::Error::from(io::ErrorKind::UnexpectedEof))?;
+ if let Ok(()) = slip::check_checkmtu_mimeify(&mut packet, &ic) {
+ let new_upbound_total = packet.len() + upbound_total + 1;
+ if new_upbound_total > ic.max_batch_up.sat() {
+ tx_defer = Some(packet);
+ break;
+ }
+ upbound_total = new_upbound_total;
+ upbound.push(packet);
+ // we rely on `next_segment` being cancellation-safe,
+ // which isn't documented as true but seems reasonably safe
+ pin!{ let next_segment = tx_stream.next_segment(); }
+ to_process = match poll!(next_segment) {
+ Poll::Ready(p) => Some(p),
+ Poll::Pending => None,
+ };
+ }
+ }
+ assert!( to_process.is_none() );
+ dbg!(&reqs.len(), &upbound_total, &upbound.len());
+/*
+ Body
+ made out of Stream
+ made out of futures::stream::iter
+
+
+ let datalen =
+
+ let o = 0;
+ let i =
+*/
reqs.push(());
// xxx make new request
}
pub use std::process;
pub use std::str::FromStr;
pub use std::sync::Arc;
+pub use std::task::Poll;
pub use anyhow::{anyhow, Context};
pub use extend::ext;
pub use fehler::{throw, throws};
-pub use futures::future;
+pub use futures::{poll, future};
pub use hyper::Uri;
pub use hyper_tls::HttpsConnector;
pub use ipnet::IpNet;
pub use log::{debug, info, error};
pub use structopt::StructOpt;
pub use tokio::io::AsyncBufReadExt;
+pub use tokio::pin;
pub use tokio::select;
pub use tokio::task;
pub use tokio::time::Duration;
pub use crate::config::{self, InstanceConfig, u32Ext as _};
pub use crate::utils::*;
pub use crate::types::*;
+pub use crate::slip;
pub use anyhow::Error as AE;
pub use ErrorKind as EK;