struct ClientContext<'c,C> {
ic: &'c InstanceConfig,
- effective_http_timeout: Duration,
+ effective_http_timeout: Duration, // also min err report interval, xxx doc
hclient: &'c Arc<hyper::Client<C>>,
reporter: &'c parking_lot::Mutex<Reporter<'c>>,
}
async {
loop {
+ let rx_queue_space =
+ if rx_queue.remaining() < ic.max_batch_down.sat() * 3 /* xxx */ {
+ // xxx make this separate option ? docs say server only
+ Ok(())
+ } else {
+ Err(())
+ };
+
select! {
y = rx_stream.write_all_buf(&mut rx_queue),
if ! rx_queue.is_empty() =>
},
_ = async { },
- if reporter.lock().filter(None, {
- if rx_queue.remaining() < ic.max_batch_down.sat() * 3 /* xxx */ {
- // xxx make this separate option ? docs say server only
- Ok(())
- } else {
- Err(anyhow!("rx queue full"))
- }
- }).is_some() &&
+ if rx_queue_space.is_ok() &&
(reqs.len() < ic.target_requests_outstanding.sat() ||
(reqs.len() < ic.max_requests_outstanding.sat() &&
! upbound.is_empty()))
}, |e| error!("{} #{}: rx discarding: {}", &ic, req_num, e));
}
- }
+ },
+
+ _ = tokio::time::sleep(c.effective_http_timeout),
+ if rx_queue_space.is_err() =>
+ {
+ reporter.lock().filter(None, Err::<Void,_>(
+ anyhow!("rx queue full, blocked")
+ ));
+ },
}
}
}.await
let now = Instant::now();
if let Some(rep) = &self.last_report {
if now - rep.when < match rep.ok {
- Ok(()) => Duration::from_secs(3600),
- Err(()) => Duration::from_secs(30),
+ Ok(()) => Duration::from_secs(3600), // xxx config?
+ Err(()) => Duration::from_secs(30), // xxx config?
} {
return
}