fn submit_request<'r, 'c:'r, C:HCC>(
c: &'c ClientContext<C>,
reqs: &mut Vec<OutstandingRequest<'r>>,
- upbound: Vec<Vec<u8>>
+ upbound: FramesData,
) {
let body = hyper::body::Body::wrap_stream(
futures::stream::iter(
Itertools::intersperse(
upbound.into_iter().map(|u| Bytes::from(u)),
- slip::SLIP_END_SLICE.into()
+ SLIP_END_SLICE.into()
).map(Ok::<_,Void>)
)
);
if tx_defer.is_none() &&
reqs.len() < ic.max_requests_outstanding.sat() =>
{
- let mut upbound_total = 0;
- let mut upbound: Vec<Vec<u8>> = vec![];
+ let mut upbound = Frames::default();
let mut to_process: Option<Result<Option<Vec<u8>>,_>>
= Some(packet);
while let Some(packet) = to_process.take() {
let mut packet =
packet.context("read from ipif")?
.ok_or_else(|| io::Error::from(io::ErrorKind::UnexpectedEof))?;
- if let Ok(()) = slip::check_checkmtu_mimeswap
+ if let Ok(()) = check_checkmtu_mimeswap
::<true>(&ic, &mut packet)
{
- let new_upbound_total = packet.len() + upbound_total + 1;
- if new_upbound_total > ic.max_batch_up.sat() {
- tx_defer = Some(packet);
- break;
- }
- upbound_total = new_upbound_total;
- upbound.push(packet);
+ match upbound.add(ic.max_batch_up, packet) {
+ Err(packet) => {
+ tx_defer = Some(packet);
+ break;
+ }
+ Ok(()) => { },
+ };
// we rely oin `next_segment` being cancellation-safe,
// which isn't documented as true but seems reasonably safe
pin!{ let next_segment = tx_stream.next_segment(); }
}
}
assert!( to_process.is_none() );
- dbg!(&reqs.len(), &upbound_total, &upbound.len());
+ dbg!(&reqs.len(), &upbound);
//: impl futures::Stream<Cow<&[u8]>>
- submit_request(&c, &mut reqs, upbound)?;
+ submit_request(&c, &mut reqs, upbound.into())?;
}
() = async { },
}
}
}
+
+pub type Frame = Vec<u8>;
+pub type FramesData = Vec<Vec<u8>>;
+
+#[derive(Default)]
+pub struct Frames {
+ frames: FramesData,
+ total_len: usize,
+}
+
+impl Debug for Frames {
+ #[throws(fmt::Error)]
+ fn fmt(&self, f: &mut fmt::Formatter) {
+ write!(f, "Frames{{n={},len={}}}", &self.frames.len(), &self.total_len)?;
+ }
+}
+
+impl Frames {
+ #[throws(Frame)]
+ pub fn add(&mut self, max: u32, frame: Frame) {
+ let new_total = self.total_len + frame.len() + 1;
+ if new_total > max.sat() { throw!(frame) }
+ self.total_len = new_total;
+ self.frames.push(frame);
+ }
+}
+
+impl From<Frames> for FramesData {
+ fn from(frames: Frames) -> FramesData { frames.frames }
+}