chiark / gitweb /
sse: Introduce BufForRead
authorIan Jackson <ijackson@chiark.greenend.org.uk>
Mon, 21 Jun 2021 01:13:27 +0000 (02:13 +0100)
committerIan Jackson <ijackson@chiark.greenend.org.uk>
Sat, 19 Mar 2022 16:01:02 +0000 (16:01 +0000)
This abstracts away the &mut u8 and orig_buf from Updatereader.

This will be helpful when UpdateReader becomes AsyncRead rather than
Read.

Signed-off-by: Ian Jackson <ijackson@chiark.greenend.org.uk>
src/sse.rs

index 9f8974446ab71659d172c2bafe39164da6af9481..8f33e5a20a0c797a4b15aae658b425797e1f2dfa 100644 (file)
@@ -69,18 +69,53 @@ impl UpdateReaderWN {
   }
 }
 
+#[derive(Debug)]
+struct BufForRead<'b> {
+  csr: io::Cursor<&'b mut [u8]>,
+}
+impl Write for BufForRead<'_> {
+  #[throws(io::Error)]
+  fn write(&mut self, d: &[u8]) -> usize { self.csr.write(d)? }
+  #[throws(io::Error)]
+  fn flush(&mut self) { self.csr.flush()? }
+}
+impl BufForRead<'_> {
+  fn reset_to_start(&mut self) { self.csr.set_position(0) }
+  fn generated(&self) -> usize { self.csr.position().try_into().unwrap() }
+  fn at_start(&self) -> bool { self.generated() == 0 }
+  fn remaining(&self) -> usize {
+    self.csr.get_ref().len() -
+    usize::try_from(self.csr.position()).unwrap()
+  }
+
+  fn copy_from<R: InfallibleBufRead>(&mut self, mut read: R) {
+    let rbuf = read.fill_buf().unwrap();
+    let did = self.csr.write(rbuf).unwrap();
+    read.consume(did);
+  }
+
+  #[throws(io::Error)]
+  fn just_copy_from<R: InfallibleBufRead>(&mut self, read: R) -> usize {
+    self.copy_from(read);
+    self.generated()
+  }
+}
+trait InfallibleBufRead: BufRead { }
+impl<T> InfallibleBufRead for io::Cursor<T> where io::Cursor<T>: BufRead { }
+impl<T> InfallibleBufRead for &mut T where T: InfallibleBufRead { }
+
 impl Read for UpdateReader {
-  fn read(&mut self, orig_buf: &mut [u8]) -> Result<usize, io::Error> {
+  fn read(&mut self, buf: &mut [u8]) -> Result<usize, io::Error> {
+    let mut buf = BufForRead{ csr: io::Cursor::new(buf) };
+    if buf.remaining() == 0 { return Ok(0) }
+
     if let Some(ref mut ending) = self.ending_send {
-      return ending.read(orig_buf);
+      return buf.just_copy_from(ending);
     }
 
     let mut ig = self.gref.lock()
       .map_err(|e| self.trouble("game corrupted", &e))?;
 
-    let orig_wanted = orig_buf.len();
-    let mut buf = &mut *orig_buf;
-
     if self.init_confirmation_send.next().is_some() {
       write!(buf, "event: commsworking\n\
                    data: init {} {} G{}\n\n",
@@ -96,7 +131,8 @@ impl Read for UpdateReader {
           .into_bytes().into_boxed_slice();
         assert_eq!(self.ending_send, None);
         let ending = self.ending_send.get_or_insert(io::Cursor::new(data));
-        return ending.read(orig_buf);
+        buf.reset_to_start();
+        return buf.just_copy_from(ending);
       },
     };
 
@@ -104,19 +140,20 @@ impl Read for UpdateReader {
 
     loop {
       if let Some(ref mut overflow) = self.overflow {
-        let got = overflow.read(&mut buf)
-          .map_err(|e| self.wn.trouble("overflow failed", &e))?;
-        debug!("read from overflow {} {} len={}",
-               &self.player, &self.client, got);
-        if got == 0 { self.overflow = None }
-        buf = &mut buf[got..];
+        buf.copy_from(&mut *overflow);
+        if usize::try_from(overflow.position()).unwrap()
+        == overflow.get_ref().len() {
+          self.overflow = None
+        }
+        debug!("read from overflow {} {}",
+               &self.player, &self.client);
       }
 
       let next = match pu.read_log().get(self.to_send) {
         Some(next) => next,
         None => {
           if self.to_send < pu.read_log().front_index()
-          && buf.len() == orig_wanted {
+          && buf.at_start() {
             write!(buf, "event: updates-expired\ndata: {}\n\n",
                    self.to_send)
               .map_err(|e| self.wn.trouble("notify updates expired", &e))?;
@@ -129,8 +166,8 @@ impl Read for UpdateReader {
         }
       };
       let next_len = UPDATE_MAX_FRAMING_SIZE + next.json_len(self.player);
-      if next_len > buf.len() {
-        if buf.len() != orig_wanted { break }
+      if next_len > buf.remaining() {
+        if ! buf.at_start() { break }
 
         if self.overflow.is_some() {
           throw!(self.wn.trouble("overflow mismanaged",()));
@@ -156,7 +193,7 @@ impl Read for UpdateReader {
     let cv = pu.get_cv();
 
     loop {
-      let generated = orig_wanted - buf.len();
+      let generated = buf.generated();
       if generated > 0 {
         self.need_flush = true;
         return Ok(generated)