chiark / gitweb /
record expiry time of queue items
authorIan Jackson <ijackson@chiark.greenend.org.uk>
Wed, 4 Aug 2021 22:10:27 +0000 (23:10 +0100)
committerIan Jackson <ijackson@chiark.greenend.org.uk>
Wed, 4 Aug 2021 22:10:27 +0000 (23:10 +0100)
Signed-off-by: Ian Jackson <ijackson@chiark.greenend.org.uk>
src/bin/client.rs
src/config.rs

index e2d061d2dc850556fdbe89480371f34f57877ac6..a1cdce4eae485c0abcd619aaa9c85cbe0ccb9418 100644 (file)
@@ -31,6 +31,12 @@ struct ClientContext<'c,C> {
   reporter: &'c parking_lot::Mutex<Reporter<'c>>,
 }
 
+#[derive(Debug)]
+struct TxQueued {
+  expires: Instant,
+  data: Box<[u8]>,
+}
+
 #[throws(AE)]
 fn submit_request<'r, 'c:'r, C:HCC>(
   c: &'c ClientContext<C>,
@@ -198,7 +204,7 @@ async fn run_client<C:HCC>(
   let mut rx_stream = ipif.stdin .take().unwrap();
 
   let mut tx_stream = tokio::io::BufReader::new(tx_stream).split(SLIP_END);
-  let mut tx_queue: VecDeque<Box<[u8]>> = default();
+  let mut tx_queue: VecDeque<TxQueued> = default();
   let mut upbound = Frames::default();
 
   let mut reqs: Vec<OutstandingRequest>
@@ -237,7 +243,10 @@ async fn run_client<C:HCC>(
             if addr != ic.link.client.0 { throw!(PE::Src(addr)) }
             Ok(())
           }) {
-            Ok(packet) => tx_queue.push_back(packet),
+            Ok(data) => tx_queue.push_back(TxQueued {
+              data,
+              expires: Instant::now() + ic.max_queue_time
+            }),
             Err(PE::Empty) => { },
             Err(e@ PE::Src(_)) => debug!("{}: tx discarding: {}", &ic, e),
             Err(e) => error!("{}: tx discarding: {}", &ic, e),
@@ -248,9 +257,9 @@ async fn run_client<C:HCC>(
         if ! upbound.tried_full() &&
            ! tx_queue.is_empty() =>
         {
-          while let Some(packet) = tx_queue.pop_front() {
-            match upbound.add(ic.max_batch_up, packet.into()/*todo:504*/) {
-              Err(packet) => { tx_queue.push_front(packet.into()/*todo:504*/); break; }
+          while let Some(TxQueued { data, expires }) = tx_queue.pop_front() {
+            match upbound.add(ic.max_batch_up, data.into()/*todo:504*/) {
+              Err(data) => { tx_queue.push_front(TxQueued { data: data.into(), expires }); break; }
               Ok(()) => { },
             }
           }
index a36ec584f987864a27bb2803ca553dce9d574dd8..b1e27aa7a467d82e776f25621b55086ca6b566f0 100644 (file)
@@ -16,7 +16,7 @@ pub struct InstanceConfig {
 
   // Capped settings:
   #[limited]    pub max_batch_down:               u32,
-  #[limited]    pub max_queue_time:               Duration,
+  #[limited]    pub max_queue_time:               Duration, // xxx client unlimited
   #[limited]    pub http_timeout:                 Duration,
   #[limited]    pub target_requests_outstanding:  u32,