From: Ian Jackson Date: Sun, 6 Sep 2020 21:15:42 +0000 (+0100) Subject: updates X-Git-Tag: otter-0.2.0~991 X-Git-Url: https://www.chiark.greenend.org.uk/ucgi/~ianmdlvl/git?a=commitdiff_plain;h=febd11dda714e59bf9ee66c672e7d532fc694982;p=otter.git updates --- diff --git a/src/global.rs b/src/global.rs index 71ad4f7d..31bacd45 100644 --- a/src/global.rs +++ b/src/global.rs @@ -434,6 +434,7 @@ impl InstanceGuard<'_> { if let Some(mut updates) = self.updates.remove(oldplayer) { updates. push(PreparedUpdate { gen: self.c.g.gs.gen, + when: Instant::now(), us : vec![ PreparedUpdateEntry::Error( None, ErrorSignaledViaUpdate::PlayerRemoved diff --git a/src/imports.rs b/src/imports.rs index 004d5db8..0e671ac9 100644 --- a/src/imports.rs +++ b/src/imports.rs @@ -70,6 +70,8 @@ pub use arrayvec::ArrayVec; pub use log::{trace,debug,info,warn,error}; pub use log::log; +pub use num_traits::Bounded; + pub use flexi_logger::{LogSpecification}; pub use crate::global::*; diff --git a/src/sse.rs b/src/sse.rs index 8c0da349..0f20c011 100644 --- a/src/sse.rs +++ b/src/sse.rs @@ -16,6 +16,7 @@ pub struct UpdateId (i64); const UPDATE_READER_SIZE : usize = 1024*32; const UPDATE_MAX_FRAMING_SIZE : usize = 200; const UPDATE_KEEPALIVE : Duration = Duration::from_secs(14); +const UPDATE_EXPIRE : Duration = Duration::from_secs(66); struct UpdateReaderWN { player : PlayerId, @@ -53,8 +54,8 @@ impl UpdateReaderWN { id: {}\n\n", self.to_send)?; - debug!("sending to {:?} {:?}: {:?}", - &self.player, &self.client, &tu); + debug!("sending to {:?} {:?}: #{} {:?}", + &self.player, &self.client, self.to_send, &tu); self.to_send.try_increment().unwrap(); } @@ -80,7 +81,7 @@ impl Read for UpdateReader { self.player, self.client, self.to_send)?; } - let pu = &mut ig.updates.get(self.player) + let pu = &mut ig.updates.get_mut(self.player) .ok_or_else(|| self.trouble("player gonee",()))?; loop { @@ -94,7 +95,20 @@ impl Read for UpdateReader { } let next = match pu.read_log().get(self.to_send) { - Some(next) => next, None => { break } + Some(next) => next, + None => { + if self.to_send < pu.read_log().front_index() + && buf.len() == orig_wanted { + write!(buf, "event: updates_expired\ndata: {}\n\n", + self.to_send) + .map_err(|e| self.wn.trouble("notify updates expired", &e))?; + debug!("updates expired for {} {}, telling client (#{})", + &self.wn.player, &self.wn.client, self.to_send); + self.wn.to_send = UpdateId::max_value(); + // ^ just stops us spewing, hopefully client will notice + } + break + } }; let next_len = UPDATE_MAX_FRAMING_SIZE + next.json_len(); if next_len > buf.len() { @@ -116,6 +130,9 @@ impl Read for UpdateReader { self.wn.write_next(&mut buf, &next) .map_err(|e| self.wn.trouble("UpdateReader.write_next",&e))?; + + let before = next.when - UPDATE_EXPIRE; + pu.expire_upto(before); } let cv = pu.get_cv(); @@ -166,6 +183,11 @@ impl Display for UpdateId { } } +impl Bounded for UpdateId { + fn max_value() -> Self { UpdateId(Bounded::max_value()) } + fn min_value() -> Self { UpdateId(Bounded::min_value()) } +} + impl StableIndexOffset for UpdateId { fn try_increment(&mut self) -> Option<()> { self.0.try_increment() } fn try_decrement(&mut self) -> Option<()> { self.0.try_decrement() } diff --git a/src/updates.rs b/src/updates.rs index 323783fb..d8f50fff 100644 --- a/src/updates.rs +++ b/src/updates.rs @@ -32,6 +32,7 @@ pub struct PlayerUpdates { #[derive(Debug)] pub struct PreparedUpdate { pub gen : Generation, + pub when: Instant, pub us : Vec, } @@ -112,6 +113,21 @@ impl PlayerUpdates { // forget to cv.notify pub fn read_log(&self) -> &PlayerUpdatesLog { &self.log } pub fn get_cv(&self) -> Arc { self.cv.clone() } + + pub fn expire_upto(&mut self, before: Instant) { + loop { + if self.log.len() < 2 { break } + + let front = { + if let Some(front) = self.log.front() { front } + else { break } + }; + if front.when >= before { break } + + trace!("update expiring #{}", self.log.front_index()); + self.log.pop_front(); + } + } } impl PreparedUpdate { @@ -323,6 +339,7 @@ impl<'r> PrepareUpdatesBuffer<'r> { impl<'r> Drop for PrepareUpdatesBuffer<'r> { fn drop(&mut self) { let update = PreparedUpdate { + when: Instant::now(), gen: self.gen, us: mem::take(&mut self.us), }; diff --git a/templates/script.ts b/templates/script.ts index 453d90cc..2a2ac90d 100644 --- a/templates/script.ts +++ b/templates/script.ts @@ -662,6 +662,10 @@ function startup() { console.log('GOTDATA', event); status_node.innerHTML = (event as any).data; }); + es.addEventListener('updates_expired', function(event) { + console.log('UPDATES-EXPIRED', event); + string_report_error('connection to server interrupted too long'); + }); es.onerror = function(e) { console.log('FOO',e,es); json_report_error({