chiark / gitweb /
SSE: Redo buffer handling
authorIan Jackson <ijackson@chiark.greenend.org.uk>
Tue, 29 Mar 2022 20:45:29 +0000 (21:45 +0100)
committerIan Jackson <ijackson@chiark.greenend.org.uk>
Tue, 29 Mar 2022 23:54:47 +0000 (00:54 +0100)
Now we just write into a Vec.  This is rather simpler.

Signed-off-by: Ian Jackson <ijackson@chiark.greenend.org.uk>
TODO
daemon/main.rs
daemon/sse.rs

diff --git a/TODO b/TODO
index 0d068af35f9fa99fec8224c5d942d40f35ed4721..f9a948ef064b6a6a359d523aa8f28f4f2379fbae 100644 (file)
--- a/TODO
+++ b/TODO
@@ -1,4 +1,3 @@
-TODOs in daemon/sse.rs::content
 content-type for download bundles
 http2 HEAD requests
 Build on stable
index b1113768021b9d10a5e2ba7a95b2298bab782af1..fb0f629b6c09eba8abd493a629931429b7ba7e18 100644 (file)
@@ -16,6 +16,7 @@ pub mod sse;
 
 pub use std::pin::Pin;
 pub use futures::future;
+pub use futures::FutureExt as _;
 
 pub use crate::api::InstanceAccess;
 pub use crate::api::{FatalErrorResponse};
index 38ebf477bd4a09f67857214182852a64e3bef1e7..575ed4825e1927b420db6af13297d308e3dc35c0 100644 (file)
@@ -12,7 +12,6 @@ use super::*;
 // ---------- basic definitions ----------
 
 const UPDATE_READER_SIZE: usize = 1024*32;
-const UPDATE_MAX_FRAMING_SIZE: usize = 200;
 const UPDATE_KEEPALIVE: Duration = Duration::from_secs(14);
 const UPDATE_EXPIRE: Duration = Duration::from_secs(66);
 
@@ -25,7 +24,6 @@ struct UpdateReaderWN {
 struct UpdateReader {
   wn: UpdateReaderWN,
   overflow: Option<io::Cursor<Box<[u8]>>>,
-  need_flush: bool,
   gref: InstanceRef,
   keepalives: Wrapping<u32>,
   ending_send: Option<io::Cursor<Box<[u8]>>>,
@@ -33,10 +31,6 @@ struct UpdateReader {
 }
 deref_to_field!{UpdateReader, UpdateReaderWN, wn} // no DerefMut
 
-#[derive(Error,Debug)]
-#[error("WouldBlock error misreported!")]
-struct FlushWouldBlockError{}
-
 impl UpdateReaderWN {
   #[throws(io::Error)]
   fn write_next<U>(&mut self, mut buf: &mut U, tz: &Timezone,
@@ -63,45 +57,45 @@ impl UpdateReaderWN {
   }
 }
 
-#[derive(Debug)]
-struct BufForRead<'b> {
-  csr: io::Cursor<&'b mut [u8]>,
+type BufForSend = Vec<u8>;
+
+#[derive(Debug, Default)]
+struct BufForRead {
+  buf: Vec<u8>,
 }
-impl Write for BufForRead<'_> {
+impl Write for BufForRead {
   #[throws(io::Error)]
-  fn write(&mut self, d: &[u8]) -> usize { self.csr.write(d)? }
+  fn write(&mut self, d: &[u8]) -> usize { self.buf.write(d)? }
   #[throws(io::Error)]
-  fn flush(&mut self) { self.csr.flush()? }
+  fn flush(&mut self) { self.buf.flush()? }
 }
-impl BufForRead<'_> {
-  fn reset_to_start(&mut self) { self.csr.set_position(0) }
-  fn generated(&self) -> usize { self.csr.position().try_into().unwrap() }
-  fn at_start(&self) -> bool { self.generated() == 0 }
-  fn remaining(&self) -> usize {
-    self.csr.get_ref().len() -
-    usize::try_from(self.csr.position()).unwrap()
-  }
+impl BufForRead {
+  fn reset_to_start(&mut self) { self.buf.truncate(0) }
+  fn at_start(&self) -> bool { self.buf.len() == 0 }
+  fn len(&self) -> usize { self.buf.len() }
 
   fn copy_from<R: InfallibleBufRead>(&mut self, mut read: R) {
     let rbuf = read.fill_buf().unwrap();
-    let did = self.csr.write(rbuf).unwrap();
+    let did = self.write(rbuf).unwrap();
     read.consume(did);
   }
 
-  #[throws(io::Error)]
-  fn just_copy_from<R: InfallibleBufRead>(&mut self, read: R) -> usize {
+  fn just_copy_from<R: InfallibleBufRead>(mut self, read: R) -> BufForSend {
     self.copy_from(read);
-    self.generated()
+    self.finish()
   }
+
+  fn finish(self) -> BufForSend { assert!(! self.buf.is_empty()); self.buf }
+  fn finish_eof() -> BufForSend { vec![] }
 }
 trait InfallibleBufRead: BufRead { }
 impl<T> InfallibleBufRead for io::Cursor<T> where io::Cursor<T>: BufRead { }
 impl<T> InfallibleBufRead for &mut T where T: InfallibleBufRead { }
 
 impl UpdateReader {
-  async fn read(&mut self, buf: &mut [u8]) -> Result<usize, io::Error> {
-    let mut buf = BufForRead{ csr: io::Cursor::new(buf) };
-    if buf.remaining() == 0 { return Ok(0) }
+  #[throws(io::Error)]
+  async fn read(&mut self) -> BufForSend {
+    let mut buf = BufForRead::default();
 
     if let Some(ref mut ending) = self.ending_send {
       return buf.just_copy_from(ending);
@@ -159,43 +153,20 @@ impl UpdateReader {
           break
         }
       };
-      let next_len = UPDATE_MAX_FRAMING_SIZE + next.json_len(self.player);
-      if next_len > buf.remaining() {
-        if ! buf.at_start() { break }
-
-        if self.overflow.is_some() {
-          throw!(self.wn.trouble("overflow mismanaged",()));
-        }
-        self.overflow = {
-          let mut overflow = Vec::with_capacity(next_len);
-          self.wn.write_next(&mut overflow, &iplayer.ipl.tz, &next)
-            .map_err(|e| self.wn.trouble("overflow.write_next",&e))?;
-          debug!("overflow {} {}, len={}",
-                 &self.wn.player, &self.wn.client, &overflow.len());
-          Some(io::Cursor::new(overflow.into_boxed_slice()))
-        };
-        continue;
-      }
 
       self.wn.write_next(&mut buf, &iplayer.ipl.tz, &next)
         .map_err(|e| self.wn.trouble("UpdateReader.write_next",&e))?;
 
+      if buf.len() >= UPDATE_READER_SIZE { return buf.finish() }
+
       let before = next.when - UPDATE_EXPIRE;
       pu.expire_upto(before);
     }
 
     let cv = pu.get_cv();
 
-    let generated = buf.generated();
-    if generated > 0 {
-      self.need_flush = true;
-      return Ok(generated)
-    }
-
-    if self.need_flush {
-      self.need_flush = false;
-      return Err(io::Error::new(io::ErrorKind::WouldBlock,
-                                FlushWouldBlockError{}));
+    if buf.len() > 0 {
+      return buf.finish();
     }
 
     if (||{
@@ -203,7 +174,7 @@ impl UpdateReader {
       let client = ig.clients.get_mut(self.client)?;
       client.lastseen = Instant::now();
       Some(())
-    })() == None { return Ok(0) }
+    })() == None { return BufForRead::finish_eof() }
 
     let was_gen = ig.gs.gen;
 
@@ -219,8 +190,7 @@ impl UpdateReader {
                  data: online {} {} G{}\n\n",
            self.player, self.client, was_gen)?;
     self.keepalives += Wrapping(1);
-    self.need_flush = true;
-    return Ok(buf.generated());
+    return buf.finish();
   }
 }
 
@@ -228,9 +198,8 @@ impl UpdateReader {
 
 #[throws(Fatal)]
 pub fn content(iad: InstanceAccessDetails<ClientId>, gen: Generation)
-               -> Pin<Box<dyn futures::Stream<
-                   Item=Result<Bytes, io::Error>
-               >>> {
+               -> Pin<Box<dyn futures::Stream<Item=Result<Bytes, io::Error>>>>
+{
   let client = iad.ident;
 
   let update_reader = {
@@ -251,7 +220,6 @@ pub fn content(iad: InstanceAccessDetails<ClientId>, gen: Generation)
       };
 
     UpdateReader {
-      need_flush: false,
       keepalives: Wrapping(0),
       overflow: None,
       gref,
@@ -263,29 +231,12 @@ pub fn content(iad: InstanceAccessDetails<ClientId>, gen: Generation)
     }
   };
 
-  Box::pin(futures::stream::try_unfold(update_reader,
-                                       |mut update_reader| async {
-    // TODO change error type here to not be io::Error
-    // TODO get rid of io::ErrorKind::WouldBlock kludge
-    // TODO what is the point now of BufForRead?  Combine this with that?
-    // TODO adaptive buffer length
-    let mut buffer = vec![ 0u8; UPDATE_READER_SIZE ];
-    let mut used = 0;
-    loop {
-      if used == buffer.len() { break }
-
-      let got = match update_reader.read(&mut buffer[used..]).await {
-        Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
-          if used > 0 { break } else { continue }
-        },
-        x => x,
-      }?;
-
-      used += got;
-    }
-    Ok(if used > 0 {
-      buffer.truncate(used);
-      Some((Bytes::from(buffer), update_reader))
+  Box::pin(futures::stream::try_unfold(
+    update_reader, |mut update_reader| async
+  {
+    let got = update_reader.read().await?;
+    Ok(if got.len() > 0 {
+      Some((Bytes::from(got), update_reader))
     } else {
       None
     })