async {
loop {
select! {
- packet = tx_stream.next_segment(),
+ packet = async {
+ // cancellation safety: if this future is polled, we might
+ // move out of tx_defer, but then we will be immediately
+ // ready and yield it inot packet
+ if let Some(y) = tx_defer.take() {
+ Ok(Some(y))
+ } else {
+ tx_stream.next_segment().await
+ }
+ },
if tx_defer.is_none() &&
reqs.len() < ic.max_requests_outstanding.sat() =>
{
submit_request(&ic, &hclient, &mut reqs, upbound)?;
}
+ () = async { },
+ if reqs.len() < ic.target_requests_outstanding.sat() ||
+ (tx_defer.is_some() &&
+ reqs.len() < ic.max_requests_outstanding.sat()) =>
+ {
+ let upbound = tx_defer.take().into_iter().collect_vec();
+ submit_request(&ic, &hclient, &mut reqs, upbound)?;
+ }
+
(got, goti, _) = async { future::select_all(&mut reqs).await },
if ! reqs.is_empty() =>
{