chiark / gitweb /
updates
authorIan Jackson <ijackson@chiark.greenend.org.uk>
Sun, 6 Sep 2020 21:15:42 +0000 (22:15 +0100)
committerIan Jackson <ijackson@chiark.greenend.org.uk>
Sun, 6 Sep 2020 21:15:42 +0000 (22:15 +0100)
src/global.rs
src/imports.rs
src/sse.rs
src/updates.rs
templates/script.ts

index 71ad4f7d4a9c161cd6edf8567031a8205b03129d..31bacd456a0eedf6e53130398fd5af3c1a704921 100644 (file)
@@ -434,6 +434,7 @@ impl InstanceGuard<'_> {
       if let Some(mut updates) = self.updates.remove(oldplayer) {
         updates. push(PreparedUpdate {
           gen: self.c.g.gs.gen,
+          when: Instant::now(),
           us : vec![ PreparedUpdateEntry::Error(
             None,
             ErrorSignaledViaUpdate::PlayerRemoved
index 004d5db8b99f23c86c78578eb49fd8b642544fd9..0e671ac9f73cfe3899ae3b4459ffbb34bc974554 100644 (file)
@@ -70,6 +70,8 @@ pub use arrayvec::ArrayVec;
 pub use log::{trace,debug,info,warn,error};
 pub use log::log;
 
+pub use num_traits::Bounded;
+
 pub use flexi_logger::{LogSpecification};
 
 pub use crate::global::*;
index 8c0da349119afafe4e5913cc042afedb4f01d0d4..0f20c011501a972fc7d968bcf7eb0f4e9c58b14f 100644 (file)
@@ -16,6 +16,7 @@ pub struct UpdateId (i64);
 const UPDATE_READER_SIZE : usize = 1024*32;
 const UPDATE_MAX_FRAMING_SIZE : usize = 200;
 const UPDATE_KEEPALIVE : Duration = Duration::from_secs(14);
+const UPDATE_EXPIRE : Duration = Duration::from_secs(66);
 
 struct UpdateReaderWN {
   player : PlayerId,
@@ -53,8 +54,8 @@ impl UpdateReaderWN {
                  id: {}\n\n",
            self.to_send)?;
 
-    debug!("sending to {:?} {:?}: {:?}",
-           &self.player, &self.client, &tu);
+    debug!("sending to {:?} {:?}: #{} {:?}",
+           &self.player, &self.client, self.to_send, &tu);
 
     self.to_send.try_increment().unwrap();
   }
@@ -80,7 +81,7 @@ impl Read for UpdateReader {
              self.player, self.client, self.to_send)?;
     }
 
-    let pu = &mut ig.updates.get(self.player)
+    let pu = &mut ig.updates.get_mut(self.player)
       .ok_or_else(|| self.trouble("player gonee",()))?;
 
     loop {
@@ -94,7 +95,20 @@ impl Read for UpdateReader {
       }
 
       let next = match pu.read_log().get(self.to_send) {
-        Some(next) => next,  None => { break }
+        Some(next) => next,
+        None => {
+          if self.to_send < pu.read_log().front_index()
+          && buf.len() == orig_wanted {
+            write!(buf, "event: updates_expired\ndata: {}\n\n",
+                   self.to_send)
+              .map_err(|e| self.wn.trouble("notify updates expired", &e))?;
+            debug!("updates expired for {} {}, telling client (#{})",
+                   &self.wn.player, &self.wn.client, self.to_send);
+            self.wn.to_send = UpdateId::max_value();
+            // ^ just stops us spewing, hopefully client will notice
+          }
+          break
+        }
       };
       let next_len = UPDATE_MAX_FRAMING_SIZE + next.json_len();
       if next_len > buf.len() {
@@ -116,6 +130,9 @@ impl Read for UpdateReader {
 
       self.wn.write_next(&mut buf, &next)
         .map_err(|e| self.wn.trouble("UpdateReader.write_next",&e))?;
+
+      let before = next.when - UPDATE_EXPIRE;
+      pu.expire_upto(before);
     }
 
     let cv = pu.get_cv();
@@ -166,6 +183,11 @@ impl Display for UpdateId {
   }
 }
 
+impl Bounded for UpdateId {
+  fn max_value() -> Self { UpdateId(Bounded::max_value()) }
+  fn min_value() -> Self { UpdateId(Bounded::min_value()) }
+}
+
 impl StableIndexOffset for UpdateId {
   fn try_increment(&mut self) -> Option<()> { self.0.try_increment() }
   fn try_decrement(&mut self) -> Option<()> { self.0.try_decrement() }
index 323783fb2730998b740e5b33ead88b17daac98f5..d8f50fff5f2cfa5b616cec1fc052702686d3eb63 100644 (file)
@@ -32,6 +32,7 @@ pub struct PlayerUpdates {
 #[derive(Debug)]
 pub struct PreparedUpdate {
   pub gen : Generation,
+  pub when: Instant,
   pub us : Vec<PreparedUpdateEntry>,
 }
 
@@ -112,6 +113,21 @@ impl PlayerUpdates {
   // forget to cv.notify
   pub fn read_log(&self) -> &PlayerUpdatesLog { &self.log }
   pub fn get_cv(&self) -> Arc<Condvar> { self.cv.clone() }
+
+  pub fn expire_upto(&mut self, before: Instant) {
+    loop {
+      if self.log.len() < 2 { break }
+
+      let front = {
+        if let Some(front) = self.log.front() { front }
+        else { break }
+      };
+      if front.when >= before { break }
+
+      trace!("update expiring #{}", self.log.front_index());
+      self.log.pop_front();
+    }
+  }
 }
 
 impl PreparedUpdate {
@@ -323,6 +339,7 @@ impl<'r> PrepareUpdatesBuffer<'r> {
 impl<'r> Drop for PrepareUpdatesBuffer<'r> {
   fn drop(&mut self) {
     let update = PreparedUpdate {
+      when: Instant::now(),
       gen: self.gen,
       us: mem::take(&mut self.us),
     };
index 453d90cc50d99dd98945e0b799e7df9d473ef71c..2a2ac90de579e3cd41d6cc64c00f9fc147c8b1d1 100644 (file)
@@ -662,6 +662,10 @@ function startup() {
     console.log('GOTDATA', event);
     status_node.innerHTML = (event as any).data;
   });
+  es.addEventListener('updates_expired', function(event) {
+    console.log('UPDATES-EXPIRED', event);
+    string_report_error('connection to server interrupted too long');
+  });
   es.onerror = function(e) {
     console.log('FOO',e,es);
     json_report_error({