reporter: &'c parking_lot::Mutex<Reporter<'c>>,
}
+#[derive(Debug)]
+struct TxQueued {
+ expires: Instant,
+ data: Box<[u8]>,
+}
+
#[throws(AE)]
fn submit_request<'r, 'c:'r, C:HCC>(
c: &'c ClientContext<C>,
let mut rx_stream = ipif.stdin .take().unwrap();
let mut tx_stream = tokio::io::BufReader::new(tx_stream).split(SLIP_END);
- let mut tx_queue: VecDeque<Box<[u8]>> = default();
+ let mut tx_queue: VecDeque<TxQueued> = default();
let mut upbound = Frames::default();
let mut reqs: Vec<OutstandingRequest>
if addr != ic.link.client.0 { throw!(PE::Src(addr)) }
Ok(())
}) {
- Ok(packet) => tx_queue.push_back(packet),
+ Ok(data) => tx_queue.push_back(TxQueued {
+ data,
+ expires: Instant::now() + ic.max_queue_time
+ }),
Err(PE::Empty) => { },
Err(e@ PE::Src(_)) => debug!("{}: tx discarding: {}", &ic, e),
Err(e) => error!("{}: tx discarding: {}", &ic, e),
if ! upbound.tried_full() &&
! tx_queue.is_empty() =>
{
- while let Some(packet) = tx_queue.pop_front() {
- match upbound.add(ic.max_batch_up, packet.into()/*todo:504*/) {
- Err(packet) => { tx_queue.push_front(packet.into()/*todo:504*/); break; }
+ while let Some(TxQueued { data, expires }) = tx_queue.pop_front() {
+ match upbound.add(ic.max_batch_up, data.into()/*todo:504*/) {
+ Err(data) => { tx_queue.push_front(TxQueued { data: data.into(), expires }); break; }
Ok(()) => { },
}
}
// Capped settings:
#[limited] pub max_batch_down: u32,
- #[limited] pub max_queue_time: Duration,
+ #[limited] pub max_queue_time: Duration, // xxx client unlimited
#[limited] pub http_timeout: Duration,
#[limited] pub target_requests_outstanding: u32,