From: Ian Jackson Date: Sun, 6 Sep 2020 20:16:40 +0000 (+0100) Subject: handle update reader buf overflow X-Git-Tag: otter-0.2.0~992 X-Git-Url: https://www.chiark.greenend.org.uk/ucgi/~ianmdlvl/git?a=commitdiff_plain;h=5b4901730faf14533bf94e239b3ca1496fe6cdd2;p=otter.git handle update reader buf overflow --- diff --git a/src/sse.rs b/src/sse.rs index 9357ead4..8c0da349 100644 --- a/src/sse.rs +++ b/src/sse.rs @@ -25,6 +25,7 @@ struct UpdateReaderWN { struct UpdateReader { wn: UpdateReaderWN, + overflow: Option>>, need_flush : bool, gref : InstanceRef, keepalives : Wrapping, @@ -57,14 +58,19 @@ impl UpdateReaderWN { self.to_send.try_increment().unwrap(); } + + fn trouble(&self, m: &'static str, info: T) -> io::Error { + error!("update sending error: {}: {} {}: {:?}", + m, &self.player, &self.client, &info); + io::Error::new(io::ErrorKind::Other, anyhow!("internal error")) + } } impl Read for UpdateReader { fn read(&mut self, orig_buf: &mut [u8]) -> Result { - let em : fn(&'static str) -> io::Error = - |s| io::Error::new(io::ErrorKind::Other, anyhow!(s)); - let mut ig = self.gref.lock().map_err(|_| em("poison"))?; + let mut ig = self.gref.lock() + .map_err(|e| self.trouble("game corrupted", &e))?; let orig_wanted = orig_buf.len(); let mut buf = &mut *orig_buf; @@ -75,23 +81,41 @@ impl Read for UpdateReader { } let pu = &mut ig.updates.get(self.player) - .ok_or_else(|| em("player gonee"))?; + .ok_or_else(|| self.trouble("player gonee",()))?; loop { + if let Some(ref mut overflow) = self.overflow { + let got = overflow.read(&mut buf) + .map_err(|e| self.wn.trouble("overflow failed", &e))?; + debug!("read from overflow {} {} len={}", + &self.player, &self.client, got); + if got == 0 { self.overflow = None } + buf = &mut buf[got..]; + } + let next = match pu.read_log().get(self.to_send) { Some(next) => next, None => { break } }; let next_len = UPDATE_MAX_FRAMING_SIZE + next.json_len(); - if next_len > buf.len() { break } + if next_len > buf.len() { + if buf.len() != orig_wanted { break } + + if self.overflow.is_some() { + throw!(self.wn.trouble("overflow mismanaged",())); + } + self.overflow = { + let mut overflow = Vec::with_capacity(next_len); + self.wn.write_next(&mut overflow, &next) + .map_err(|e| self.wn.trouble("overflow.write_next",&e))?; + debug!("overflow {} {}, len={}", + &self.wn.player, &self.wn.client, &overflow.len()); + Some(io::Cursor::new(overflow.into_boxed_slice())) + }; + continue; + } - // xxx handle overflow by allocating self.wn.write_next(&mut buf, &next) - .map_err(|e| { - error!("UpdateReader.write_next: {} {} {:?}", - &self.player, &self.client, e); - e - })?; - + .map_err(|e| self.wn.trouble("UpdateReader.write_next",&e))?; } let cv = pu.get_cv(); @@ -117,7 +141,7 @@ impl Read for UpdateReader { })() == None { return Ok(0) } ig.c = cv.wait_timeout(ig.c, UPDATE_KEEPALIVE) - .map_err(|_| em("poison"))?.0; + .map_err(|e| self.wn.trouble("cv / mutex poison",&e))?.0; write!(buf, "event: commsworking\n\ data: server online {} {} G{} K{}\n\n", @@ -181,6 +205,7 @@ pub fn content(iad : InstanceAccessDetails, gen: Generation) UpdateReader { need_flush : false, keepalives : Wrapping(0), + overflow : None, gref, init_confirmation_send : iter::once(()), wn : UpdateReaderWN {