From: Ian Jackson Date: Tue, 29 Mar 2022 20:45:29 +0000 (+0100) Subject: SSE: Redo buffer handling X-Git-Tag: otter-1.0.0~82 X-Git-Url: https://www.chiark.greenend.org.uk/ucgi/~ianmdlvl/git?a=commitdiff_plain;h=3fd0e47ad19ed981c62747e844e2e1e3be581fdb;p=otter.git SSE: Redo buffer handling Now we just write into a Vec. This is rather simpler. Signed-off-by: Ian Jackson --- diff --git a/TODO b/TODO index 0d068af3..f9a948ef 100644 --- a/TODO +++ b/TODO @@ -1,4 +1,3 @@ -TODOs in daemon/sse.rs::content content-type for download bundles http2 HEAD requests Build on stable diff --git a/daemon/main.rs b/daemon/main.rs index b1113768..fb0f629b 100644 --- a/daemon/main.rs +++ b/daemon/main.rs @@ -16,6 +16,7 @@ pub mod sse; pub use std::pin::Pin; pub use futures::future; +pub use futures::FutureExt as _; pub use crate::api::InstanceAccess; pub use crate::api::{FatalErrorResponse}; diff --git a/daemon/sse.rs b/daemon/sse.rs index 38ebf477..575ed482 100644 --- a/daemon/sse.rs +++ b/daemon/sse.rs @@ -12,7 +12,6 @@ use super::*; // ---------- basic definitions ---------- const UPDATE_READER_SIZE: usize = 1024*32; -const UPDATE_MAX_FRAMING_SIZE: usize = 200; const UPDATE_KEEPALIVE: Duration = Duration::from_secs(14); const UPDATE_EXPIRE: Duration = Duration::from_secs(66); @@ -25,7 +24,6 @@ struct UpdateReaderWN { struct UpdateReader { wn: UpdateReaderWN, overflow: Option>>, - need_flush: bool, gref: InstanceRef, keepalives: Wrapping, ending_send: Option>>, @@ -33,10 +31,6 @@ struct UpdateReader { } deref_to_field!{UpdateReader, UpdateReaderWN, wn} // no DerefMut -#[derive(Error,Debug)] -#[error("WouldBlock error misreported!")] -struct FlushWouldBlockError{} - impl UpdateReaderWN { #[throws(io::Error)] fn write_next(&mut self, mut buf: &mut U, tz: &Timezone, @@ -63,45 +57,45 @@ impl UpdateReaderWN { } } -#[derive(Debug)] -struct BufForRead<'b> { - csr: io::Cursor<&'b mut [u8]>, +type BufForSend = Vec; + +#[derive(Debug, Default)] +struct BufForRead { + buf: Vec, } -impl Write for BufForRead<'_> { +impl Write for BufForRead { #[throws(io::Error)] - fn write(&mut self, d: &[u8]) -> usize { self.csr.write(d)? } + fn write(&mut self, d: &[u8]) -> usize { self.buf.write(d)? } #[throws(io::Error)] - fn flush(&mut self) { self.csr.flush()? } + fn flush(&mut self) { self.buf.flush()? } } -impl BufForRead<'_> { - fn reset_to_start(&mut self) { self.csr.set_position(0) } - fn generated(&self) -> usize { self.csr.position().try_into().unwrap() } - fn at_start(&self) -> bool { self.generated() == 0 } - fn remaining(&self) -> usize { - self.csr.get_ref().len() - - usize::try_from(self.csr.position()).unwrap() - } +impl BufForRead { + fn reset_to_start(&mut self) { self.buf.truncate(0) } + fn at_start(&self) -> bool { self.buf.len() == 0 } + fn len(&self) -> usize { self.buf.len() } fn copy_from(&mut self, mut read: R) { let rbuf = read.fill_buf().unwrap(); - let did = self.csr.write(rbuf).unwrap(); + let did = self.write(rbuf).unwrap(); read.consume(did); } - #[throws(io::Error)] - fn just_copy_from(&mut self, read: R) -> usize { + fn just_copy_from(mut self, read: R) -> BufForSend { self.copy_from(read); - self.generated() + self.finish() } + + fn finish(self) -> BufForSend { assert!(! self.buf.is_empty()); self.buf } + fn finish_eof() -> BufForSend { vec![] } } trait InfallibleBufRead: BufRead { } impl InfallibleBufRead for io::Cursor where io::Cursor: BufRead { } impl InfallibleBufRead for &mut T where T: InfallibleBufRead { } impl UpdateReader { - async fn read(&mut self, buf: &mut [u8]) -> Result { - let mut buf = BufForRead{ csr: io::Cursor::new(buf) }; - if buf.remaining() == 0 { return Ok(0) } + #[throws(io::Error)] + async fn read(&mut self) -> BufForSend { + let mut buf = BufForRead::default(); if let Some(ref mut ending) = self.ending_send { return buf.just_copy_from(ending); @@ -159,43 +153,20 @@ impl UpdateReader { break } }; - let next_len = UPDATE_MAX_FRAMING_SIZE + next.json_len(self.player); - if next_len > buf.remaining() { - if ! buf.at_start() { break } - - if self.overflow.is_some() { - throw!(self.wn.trouble("overflow mismanaged",())); - } - self.overflow = { - let mut overflow = Vec::with_capacity(next_len); - self.wn.write_next(&mut overflow, &iplayer.ipl.tz, &next) - .map_err(|e| self.wn.trouble("overflow.write_next",&e))?; - debug!("overflow {} {}, len={}", - &self.wn.player, &self.wn.client, &overflow.len()); - Some(io::Cursor::new(overflow.into_boxed_slice())) - }; - continue; - } self.wn.write_next(&mut buf, &iplayer.ipl.tz, &next) .map_err(|e| self.wn.trouble("UpdateReader.write_next",&e))?; + if buf.len() >= UPDATE_READER_SIZE { return buf.finish() } + let before = next.when - UPDATE_EXPIRE; pu.expire_upto(before); } let cv = pu.get_cv(); - let generated = buf.generated(); - if generated > 0 { - self.need_flush = true; - return Ok(generated) - } - - if self.need_flush { - self.need_flush = false; - return Err(io::Error::new(io::ErrorKind::WouldBlock, - FlushWouldBlockError{})); + if buf.len() > 0 { + return buf.finish(); } if (||{ @@ -203,7 +174,7 @@ impl UpdateReader { let client = ig.clients.get_mut(self.client)?; client.lastseen = Instant::now(); Some(()) - })() == None { return Ok(0) } + })() == None { return BufForRead::finish_eof() } let was_gen = ig.gs.gen; @@ -219,8 +190,7 @@ impl UpdateReader { data: online {} {} G{}\n\n", self.player, self.client, was_gen)?; self.keepalives += Wrapping(1); - self.need_flush = true; - return Ok(buf.generated()); + return buf.finish(); } } @@ -228,9 +198,8 @@ impl UpdateReader { #[throws(Fatal)] pub fn content(iad: InstanceAccessDetails, gen: Generation) - -> Pin - >>> { + -> Pin>>> +{ let client = iad.ident; let update_reader = { @@ -251,7 +220,6 @@ pub fn content(iad: InstanceAccessDetails, gen: Generation) }; UpdateReader { - need_flush: false, keepalives: Wrapping(0), overflow: None, gref, @@ -263,29 +231,12 @@ pub fn content(iad: InstanceAccessDetails, gen: Generation) } }; - Box::pin(futures::stream::try_unfold(update_reader, - |mut update_reader| async { - // TODO change error type here to not be io::Error - // TODO get rid of io::ErrorKind::WouldBlock kludge - // TODO what is the point now of BufForRead? Combine this with that? - // TODO adaptive buffer length - let mut buffer = vec![ 0u8; UPDATE_READER_SIZE ]; - let mut used = 0; - loop { - if used == buffer.len() { break } - - let got = match update_reader.read(&mut buffer[used..]).await { - Err(e) if e.kind() == io::ErrorKind::WouldBlock => { - if used > 0 { break } else { continue } - }, - x => x, - }?; - - used += got; - } - Ok(if used > 0 { - buffer.truncate(used); - Some((Bytes::from(buffer), update_reader)) + Box::pin(futures::stream::try_unfold( + update_reader, |mut update_reader| async + { + let got = update_reader.read().await?; + Ok(if got.len() > 0 { + Some((Bytes::from(got), update_reader)) } else { None })