struct UpdateReader {
wn: UpdateReaderWN,
+ overflow: Option<io::Cursor<Box<[u8]>>>,
need_flush : bool,
gref : InstanceRef,
keepalives : Wrapping<u32>,
self.to_send.try_increment().unwrap();
}
+
+ fn trouble<T:Debug>(&self, m: &'static str, info: T) -> io::Error {
+ error!("update sending error: {}: {} {}: {:?}",
+ m, &self.player, &self.client, &info);
+ io::Error::new(io::ErrorKind::Other, anyhow!("internal error"))
+ }
}
impl Read for UpdateReader {
fn read(&mut self, orig_buf: &mut [u8]) -> Result<usize,io::Error> {
- let em : fn(&'static str) -> io::Error =
- |s| io::Error::new(io::ErrorKind::Other, anyhow!(s));
- let mut ig = self.gref.lock().map_err(|_| em("poison"))?;
+ let mut ig = self.gref.lock()
+ .map_err(|e| self.trouble("game corrupted", &e))?;
let orig_wanted = orig_buf.len();
let mut buf = &mut *orig_buf;
}
let pu = &mut ig.updates.get(self.player)
- .ok_or_else(|| em("player gonee"))?;
+ .ok_or_else(|| self.trouble("player gonee",()))?;
loop {
+ if let Some(ref mut overflow) = self.overflow {
+ let got = overflow.read(&mut buf)
+ .map_err(|e| self.wn.trouble("overflow failed", &e))?;
+ debug!("read from overflow {} {} len={}",
+ &self.player, &self.client, got);
+ if got == 0 { self.overflow = None }
+ buf = &mut buf[got..];
+ }
+
let next = match pu.read_log().get(self.to_send) {
Some(next) => next, None => { break }
};
let next_len = UPDATE_MAX_FRAMING_SIZE + next.json_len();
- if next_len > buf.len() { break }
+ if next_len > buf.len() {
+ if buf.len() != orig_wanted { break }
+
+ if self.overflow.is_some() {
+ throw!(self.wn.trouble("overflow mismanaged",()));
+ }
+ self.overflow = {
+ let mut overflow = Vec::with_capacity(next_len);
+ self.wn.write_next(&mut overflow, &next)
+ .map_err(|e| self.wn.trouble("overflow.write_next",&e))?;
+ debug!("overflow {} {}, len={}",
+ &self.wn.player, &self.wn.client, &overflow.len());
+ Some(io::Cursor::new(overflow.into_boxed_slice()))
+ };
+ continue;
+ }
- // xxx handle overflow by allocating
self.wn.write_next(&mut buf, &next)
- .map_err(|e| {
- error!("UpdateReader.write_next: {} {} {:?}",
- &self.player, &self.client, e);
- e
- })?;
-
+ .map_err(|e| self.wn.trouble("UpdateReader.write_next",&e))?;
}
let cv = pu.get_cv();
})() == None { return Ok(0) }
ig.c = cv.wait_timeout(ig.c, UPDATE_KEEPALIVE)
- .map_err(|_| em("poison"))?.0;
+ .map_err(|e| self.wn.trouble("cv / mutex poison",&e))?.0;
write!(buf, "event: commsworking\n\
data: server online {} {} G{} K{}\n\n",
UpdateReader {
need_flush : false,
keepalives : Wrapping(0),
+ overflow : None,
gref,
init_confirmation_send : iter::once(()),
wn : UpdateReaderWN {