const UPDATE_READER_SIZE : usize = 1024*32;
const UPDATE_MAX_FRAMING_SIZE : usize = 200;
const UPDATE_KEEPALIVE : Duration = Duration::from_secs(14);
+const UPDATE_EXPIRE : Duration = Duration::from_secs(66);
struct UpdateReaderWN {
player : PlayerId,
id: {}\n\n",
self.to_send)?;
- debug!("sending to {:?} {:?}: {:?}",
- &self.player, &self.client, &tu);
+ debug!("sending to {:?} {:?}: #{} {:?}",
+ &self.player, &self.client, self.to_send, &tu);
self.to_send.try_increment().unwrap();
}
self.player, self.client, self.to_send)?;
}
- let pu = &mut ig.updates.get(self.player)
+ let pu = &mut ig.updates.get_mut(self.player)
.ok_or_else(|| self.trouble("player gonee",()))?;
loop {
}
let next = match pu.read_log().get(self.to_send) {
- Some(next) => next, None => { break }
+ Some(next) => next,
+ None => {
+ if self.to_send < pu.read_log().front_index()
+ && buf.len() == orig_wanted {
+ write!(buf, "event: updates_expired\ndata: {}\n\n",
+ self.to_send)
+ .map_err(|e| self.wn.trouble("notify updates expired", &e))?;
+ debug!("updates expired for {} {}, telling client (#{})",
+ &self.wn.player, &self.wn.client, self.to_send);
+ self.wn.to_send = UpdateId::max_value();
+ // ^ just stops us spewing, hopefully client will notice
+ }
+ break
+ }
};
let next_len = UPDATE_MAX_FRAMING_SIZE + next.json_len();
if next_len > buf.len() {
self.wn.write_next(&mut buf, &next)
.map_err(|e| self.wn.trouble("UpdateReader.write_next",&e))?;
+
+ let before = next.when - UPDATE_EXPIRE;
+ pu.expire_upto(before);
}
let cv = pu.get_cv();
}
}
+impl Bounded for UpdateId {
+ fn max_value() -> Self { UpdateId(Bounded::max_value()) }
+ fn min_value() -> Self { UpdateId(Bounded::min_value()) }
+}
+
impl StableIndexOffset for UpdateId {
fn try_increment(&mut self) -> Option<()> { self.0.try_increment() }
fn try_decrement(&mut self) -> Option<()> { self.0.try_decrement() }
#[derive(Debug)]
pub struct PreparedUpdate {
pub gen : Generation,
+ pub when: Instant,
pub us : Vec<PreparedUpdateEntry>,
}
// forget to cv.notify
pub fn read_log(&self) -> &PlayerUpdatesLog { &self.log }
pub fn get_cv(&self) -> Arc<Condvar> { self.cv.clone() }
+
+ pub fn expire_upto(&mut self, before: Instant) {
+ loop {
+ if self.log.len() < 2 { break }
+
+ let front = {
+ if let Some(front) = self.log.front() { front }
+ else { break }
+ };
+ if front.when >= before { break }
+
+ trace!("update expiring #{}", self.log.front_index());
+ self.log.pop_front();
+ }
+ }
}
impl PreparedUpdate {
impl<'r> Drop for PrepareUpdatesBuffer<'r> {
fn drop(&mut self) {
let update = PreparedUpdate {
+ when: Instant::now(),
gen: self.gen,
us: mem::take(&mut self.us),
};