async {
loop {
- let rx_fullness =
- if rx_queue.remaining() < ic.max_batch_down.sat() * 3 /* xxx */ {
- // xxx make this separate option ? docs say server only
- Ok(())
- } else {
- Err(anyhow!("rx queue full"))
- };
- let rx_fullness_ok = reporter.lock().report(None, rx_fullness);
-
select! {
data = tx_stream.next_segment(),
if packets.is_empty() =>
},
_ = async { },
- if rx_fullness_ok.is_some() &&
- (reqs.len() < ic.target_requests_outstanding.sat() ||
- (reqs.len() < ic.max_requests_outstanding.sat() &&
- ! upbound.is_empty()))
+ if reporter.lock().report(None, {
+ if rx_queue.remaining() < ic.max_batch_down.sat() * 3 /* xxx */ {
+ // xxx make this separate option ? docs say server only
+ Ok(())
+ } else {
+ Err(anyhow!("rx queue full"))
+ }
+ }).is_some() &&
+ (reqs.len() < ic.target_requests_outstanding.sat() ||
+ (reqs.len() < ic.max_requests_outstanding.sat() &&
+ ! upbound.is_empty()))
// xxx backpressure, if too much in rx_queue
=>
{