chiark / gitweb /
handle update reader buf overflow
authorIan Jackson <ijackson@chiark.greenend.org.uk>
Sun, 6 Sep 2020 20:16:40 +0000 (21:16 +0100)
committerIan Jackson <ijackson@chiark.greenend.org.uk>
Sun, 6 Sep 2020 20:16:40 +0000 (21:16 +0100)
src/sse.rs

index 9357ead4c34b2650cc571676853ca94fddd062e1..8c0da349119afafe4e5913cc042afedb4f01d0d4 100644 (file)
@@ -25,6 +25,7 @@ struct UpdateReaderWN {
 
 struct UpdateReader {
   wn: UpdateReaderWN,
+  overflow: Option<io::Cursor<Box<[u8]>>>,
   need_flush : bool,
   gref : InstanceRef,
   keepalives : Wrapping<u32>,
@@ -57,14 +58,19 @@ impl UpdateReaderWN {
 
     self.to_send.try_increment().unwrap();
   }
+
+  fn trouble<T:Debug>(&self, m: &'static str, info: T) -> io::Error {
+    error!("update sending error: {}: {} {}: {:?}",
+           m, &self.player, &self.client, &info);
+    io::Error::new(io::ErrorKind::Other, anyhow!("internal error"))
+  }
 }
 
 impl Read for UpdateReader {
   fn read(&mut self, orig_buf: &mut [u8]) -> Result<usize,io::Error> {
-    let em : fn(&'static str) -> io::Error =
-      |s| io::Error::new(io::ErrorKind::Other, anyhow!(s));
 
-    let mut ig = self.gref.lock().map_err(|_| em("poison"))?;
+    let mut ig = self.gref.lock()
+      .map_err(|e| self.trouble("game corrupted", &e))?;
     let orig_wanted = orig_buf.len();
     let mut buf = &mut *orig_buf;
 
@@ -75,23 +81,41 @@ impl Read for UpdateReader {
     }
 
     let pu = &mut ig.updates.get(self.player)
-      .ok_or_else(|| em("player gonee"))?;
+      .ok_or_else(|| self.trouble("player gonee",()))?;
 
     loop {
+      if let Some(ref mut overflow) = self.overflow {
+        let got = overflow.read(&mut buf)
+          .map_err(|e| self.wn.trouble("overflow failed", &e))?;
+        debug!("read from overflow {} {} len={}",
+               &self.player, &self.client, got);
+        if got == 0 { self.overflow = None }
+        buf = &mut buf[got..];
+      }
+
       let next = match pu.read_log().get(self.to_send) {
         Some(next) => next,  None => { break }
       };
       let next_len = UPDATE_MAX_FRAMING_SIZE + next.json_len();
-      if next_len > buf.len() { break }
+      if next_len > buf.len() {
+        if buf.len() != orig_wanted { break }
+
+        if self.overflow.is_some() {
+          throw!(self.wn.trouble("overflow mismanaged",()));
+        }
+        self.overflow = {
+          let mut overflow = Vec::with_capacity(next_len);
+          self.wn.write_next(&mut overflow, &next)
+            .map_err(|e| self.wn.trouble("overflow.write_next",&e))?;
+          debug!("overflow {} {}, len={}",
+                 &self.wn.player, &self.wn.client, &overflow.len());
+          Some(io::Cursor::new(overflow.into_boxed_slice()))
+        };
+        continue;
+      }
 
-      // xxx handle overflow by allocating
       self.wn.write_next(&mut buf, &next)
-        .map_err(|e| {
-          error!("UpdateReader.write_next: {} {} {:?}",
-                 &self.player, &self.client, e);
-          e
-        })?;
-
+        .map_err(|e| self.wn.trouble("UpdateReader.write_next",&e))?;
     }
 
     let cv = pu.get_cv();
@@ -117,7 +141,7 @@ impl Read for UpdateReader {
       })() == None { return Ok(0) }
 
       ig.c = cv.wait_timeout(ig.c, UPDATE_KEEPALIVE)
-        .map_err(|_| em("poison"))?.0;
+        .map_err(|e| self.wn.trouble("cv / mutex poison",&e))?.0;
 
       write!(buf, "event: commsworking\n\
                    data: server online {} {} G{} K{}\n\n",
@@ -181,6 +205,7 @@ pub fn content(iad : InstanceAccessDetails<ClientId>, gen: Generation)
     UpdateReader {
       need_flush : false,
       keepalives : Wrapping(0),
+      overflow : None,
       gref,
       init_confirmation_send : iter::once(()),
       wn : UpdateReaderWN {