From a77d99068bd2d9a698cffe8d100f8369c7b83add Mon Sep 17 00:00:00 2001 From: Ian Jackson Date: Wed, 4 Aug 2021 23:10:27 +0100 Subject: [PATCH] record expiry time of queue items Signed-off-by: Ian Jackson --- src/bin/client.rs | 19 ++++++++++++++----- src/config.rs | 2 +- 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/src/bin/client.rs b/src/bin/client.rs index e2d061d..a1cdce4 100644 --- a/src/bin/client.rs +++ b/src/bin/client.rs @@ -31,6 +31,12 @@ struct ClientContext<'c,C> { reporter: &'c parking_lot::Mutex>, } +#[derive(Debug)] +struct TxQueued { + expires: Instant, + data: Box<[u8]>, +} + #[throws(AE)] fn submit_request<'r, 'c:'r, C:HCC>( c: &'c ClientContext, @@ -198,7 +204,7 @@ async fn run_client( let mut rx_stream = ipif.stdin .take().unwrap(); let mut tx_stream = tokio::io::BufReader::new(tx_stream).split(SLIP_END); - let mut tx_queue: VecDeque> = default(); + let mut tx_queue: VecDeque = default(); let mut upbound = Frames::default(); let mut reqs: Vec @@ -237,7 +243,10 @@ async fn run_client( if addr != ic.link.client.0 { throw!(PE::Src(addr)) } Ok(()) }) { - Ok(packet) => tx_queue.push_back(packet), + Ok(data) => tx_queue.push_back(TxQueued { + data, + expires: Instant::now() + ic.max_queue_time + }), Err(PE::Empty) => { }, Err(e@ PE::Src(_)) => debug!("{}: tx discarding: {}", &ic, e), Err(e) => error!("{}: tx discarding: {}", &ic, e), @@ -248,9 +257,9 @@ async fn run_client( if ! upbound.tried_full() && ! tx_queue.is_empty() => { - while let Some(packet) = tx_queue.pop_front() { - match upbound.add(ic.max_batch_up, packet.into()/*todo:504*/) { - Err(packet) => { tx_queue.push_front(packet.into()/*todo:504*/); break; } + while let Some(TxQueued { data, expires }) = tx_queue.pop_front() { + match upbound.add(ic.max_batch_up, data.into()/*todo:504*/) { + Err(data) => { tx_queue.push_front(TxQueued { data: data.into(), expires }); break; } Ok(()) => { }, } } diff --git a/src/config.rs b/src/config.rs index a36ec58..b1e27aa 100644 --- a/src/config.rs +++ b/src/config.rs @@ -16,7 +16,7 @@ pub struct InstanceConfig { // Capped settings: #[limited] pub max_batch_down: u32, - #[limited] pub max_queue_time: Duration, + #[limited] pub max_queue_time: Duration, // xxx client unlimited #[limited] pub http_timeout: Duration, #[limited] pub target_requests_outstanding: u32, -- 2.30.2