From: Ian Jackson Date: Mon, 21 Jun 2021 01:13:27 +0000 (+0100) Subject: sse: Introduce BufForRead X-Git-Tag: otter-1.0.0~139 X-Git-Url: https://www.chiark.greenend.org.uk/ucgi/~ianmdlvl/git?a=commitdiff_plain;h=ff17d74c65a1e87fa57aa8bc7256eb236663e2b2;p=otter.git sse: Introduce BufForRead This abstracts away the &mut u8 and orig_buf from Updatereader. This will be helpful when UpdateReader becomes AsyncRead rather than Read. Signed-off-by: Ian Jackson --- diff --git a/src/sse.rs b/src/sse.rs index 9f897444..8f33e5a2 100644 --- a/src/sse.rs +++ b/src/sse.rs @@ -69,18 +69,53 @@ impl UpdateReaderWN { } } +#[derive(Debug)] +struct BufForRead<'b> { + csr: io::Cursor<&'b mut [u8]>, +} +impl Write for BufForRead<'_> { + #[throws(io::Error)] + fn write(&mut self, d: &[u8]) -> usize { self.csr.write(d)? } + #[throws(io::Error)] + fn flush(&mut self) { self.csr.flush()? } +} +impl BufForRead<'_> { + fn reset_to_start(&mut self) { self.csr.set_position(0) } + fn generated(&self) -> usize { self.csr.position().try_into().unwrap() } + fn at_start(&self) -> bool { self.generated() == 0 } + fn remaining(&self) -> usize { + self.csr.get_ref().len() - + usize::try_from(self.csr.position()).unwrap() + } + + fn copy_from(&mut self, mut read: R) { + let rbuf = read.fill_buf().unwrap(); + let did = self.csr.write(rbuf).unwrap(); + read.consume(did); + } + + #[throws(io::Error)] + fn just_copy_from(&mut self, read: R) -> usize { + self.copy_from(read); + self.generated() + } +} +trait InfallibleBufRead: BufRead { } +impl InfallibleBufRead for io::Cursor where io::Cursor: BufRead { } +impl InfallibleBufRead for &mut T where T: InfallibleBufRead { } + impl Read for UpdateReader { - fn read(&mut self, orig_buf: &mut [u8]) -> Result { + fn read(&mut self, buf: &mut [u8]) -> Result { + let mut buf = BufForRead{ csr: io::Cursor::new(buf) }; + if buf.remaining() == 0 { return Ok(0) } + if let Some(ref mut ending) = self.ending_send { - return ending.read(orig_buf); + return buf.just_copy_from(ending); } let mut ig = self.gref.lock() .map_err(|e| self.trouble("game corrupted", &e))?; - let orig_wanted = orig_buf.len(); - let mut buf = &mut *orig_buf; - if self.init_confirmation_send.next().is_some() { write!(buf, "event: commsworking\n\ data: init {} {} G{}\n\n", @@ -96,7 +131,8 @@ impl Read for UpdateReader { .into_bytes().into_boxed_slice(); assert_eq!(self.ending_send, None); let ending = self.ending_send.get_or_insert(io::Cursor::new(data)); - return ending.read(orig_buf); + buf.reset_to_start(); + return buf.just_copy_from(ending); }, }; @@ -104,19 +140,20 @@ impl Read for UpdateReader { loop { if let Some(ref mut overflow) = self.overflow { - let got = overflow.read(&mut buf) - .map_err(|e| self.wn.trouble("overflow failed", &e))?; - debug!("read from overflow {} {} len={}", - &self.player, &self.client, got); - if got == 0 { self.overflow = None } - buf = &mut buf[got..]; + buf.copy_from(&mut *overflow); + if usize::try_from(overflow.position()).unwrap() + == overflow.get_ref().len() { + self.overflow = None + } + debug!("read from overflow {} {}", + &self.player, &self.client); } let next = match pu.read_log().get(self.to_send) { Some(next) => next, None => { if self.to_send < pu.read_log().front_index() - && buf.len() == orig_wanted { + && buf.at_start() { write!(buf, "event: updates-expired\ndata: {}\n\n", self.to_send) .map_err(|e| self.wn.trouble("notify updates expired", &e))?; @@ -129,8 +166,8 @@ impl Read for UpdateReader { } }; let next_len = UPDATE_MAX_FRAMING_SIZE + next.json_len(self.player); - if next_len > buf.len() { - if buf.len() != orig_wanted { break } + if next_len > buf.remaining() { + if ! buf.at_start() { break } if self.overflow.is_some() { throw!(self.wn.trouble("overflow mismanaged",())); @@ -156,7 +193,7 @@ impl Read for UpdateReader { let cv = pu.get_cv(); loop { - let generated = orig_wanted - buf.len(); + let generated = buf.generated(); if generated > 0 { self.need_flush = true; return Ok(generated)