Ok::<_,AE>(resp)
}).await? }.await;
- let r = c.reporter.lock().report(req_num, r);
+ let r = c.reporter.lock().report(Some(req_num), r);
if r.is_none() {
tokio::time::sleep(c.ic.http_retry).await;
async {
loop {
+ let rx_fullness =
+ if rx_queue.remaining() < ic.max_batch_down.sat() * 3 /* xxx */ {
+ // xxx make this separate option ? docs say server only
+ Ok(())
+ } else {
+ Err(anyhow!("rx queue full"))
+ };
+ let rx_fullness_ok = reporter.lock().report(None, rx_fullness);
+
select! {
data = tx_stream.next_segment(),
if packets.is_empty() =>
},
_ = async { },
- if reqs.len() < ic.target_requests_outstanding.sat() ||
- (reqs.len() < ic.max_requests_outstanding.sat() &&
- ! upbound.is_empty())
+ if rx_fullness_ok.is_some() &&
+ (reqs.len() < ic.target_requests_outstanding.sat() ||
+ (reqs.len() < ic.max_requests_outstanding.sat() &&
+ ! upbound.is_empty()))
// xxx backpressure, if too much in rx_queue
=>
{
}, |e| error!("{} #{}: rx discarding: {}", &ic, req_num, e));
dbg!(&rx_queue);
- rx_queue = default(); // xxx
+ // xxx do writing with
+ // tokio::io::AsyncWriteExt::write_all_buf
}
}
}
ic
} }
- pub fn report<T>(&mut self, req_num: ReqNum, r: Result<T,AE>) -> Option<T> {
+ pub fn report<T>(&mut self, req_num: Option<ReqNum>, r: Result<T,AE>)
+ -> Option<T> {
match r {
Ok(t) => {
// xxx something something success
},
Err(e) => {
// xxx something something error
- warn!("{} #{}: {:?}", self.ic, req_num, e);
+ if let Some(req_num) = req_num {
+ warn!("{} #{}: {:?}", self.ic, req_num, e);
+ } else {
+ warn!("{}: {:?}", self.ic, e);
+ }
None
},
}