From: Ian Jackson Date: Thu, 15 Apr 2021 01:01:49 +0000 (+0100) Subject: packetframe: Restructuring etc. X-Git-Tag: otter-0.6.0~582 X-Git-Url: https://www.chiark.greenend.org.uk/ucgi/~ianmdlvl/git?a=commitdiff_plain;h=a4b8ae11573daa987dab2a9b10ff96de8cce80ab;p=otter.git packetframe: Restructuring etc. Signed-off-by: Ian Jackson --- diff --git a/src/lib.rs b/src/lib.rs index f4cd9207..215c069c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,6 +3,7 @@ // There is NO WARRANTY. #![feature(min_type_alias_impl_trait)] +#![feature(io_into_inner_error_parts)] #![allow(clippy::redundant_closure_call)] pub mod imports; diff --git a/src/packetframe.rs b/src/packetframe.rs index 46007997..9876f04d 100644 --- a/src/packetframe.rs +++ b/src/packetframe.rs @@ -17,6 +17,7 @@ use crate::prelude::*; const CHUNK_MAX: u16 = 65534; const CHUNK_ERR: u16 = 65535; +const CHUNK_DEF: u16 = 8192; type BO = BigEndian; @@ -34,9 +35,24 @@ pub struct ReadFrame<'r,R:BufRead> { fr: Result<&'r mut FrameReader, Option>, } +#[derive(Debug)] +pub struct FrameWriter { + inner: Fuse, + in_frame: Option<()>, +} + +#[derive(Debug)] +struct WriteFrameRaw<'w,W:Write> { + fw: &'w mut FrameWriter, +} +#[derive(Debug)] +pub struct WriteFrame<'w,W:Write> { + buf: BufWriter>, +} + #[derive(Debug,Copy,Clone,Error)] #[error("error occurred at peer, during construction of frame data")] -struct SenderError; +pub struct SenderError; #[derive(Clone,Error,Debug)] pub struct Broken { @@ -101,13 +117,41 @@ impl Display for Broken { } impl FrameReader { - fn frame<'r>(&'r mut self) -> ReadFrame<'r,R> { ReadFrame { - fr: Ok(self), - } } + #[throws(io::Error)] + pub fn new_frame<'r>(&'r mut self) -> ReadFrame<'r,R> { + if self.in_frame.is_some() { + let mut buf = vec![0u8; CHUNK_DEF.into()]; + while self.in_frame.is_some() { + let _: Result<_, SenderError> = self.do_read(&mut buf)?; + } + } + self.in_frame = Some(0); + ReadFrame { fr: Ok(self) } + } + + fn do_read(&mut self, buf: &mut [u8]) -> + Result, io::Error> + { + assert_ne!(buf.len(), 0); + let remaining = self.in_frame.as_mut().unwrap(); + if *remaining == 0 { + *remaining = match self.inner.read_u16::()? { + 0 => return Ok(Ok(0)), + CHUNK_ERR => return Ok(Err(SenderError)), + x => x as usize, + } + } + + let n = min(buf.len(), *remaining); + let r = self.inner.read(&mut buf[0..n])?; + assert!(r <= n); + *remaining -= n; + Ok(Ok(r)) + } #[throws(MgmtChannelReadError)] pub fn read_rmp(&mut self) -> T { - let mut frame = self.frame(); + let mut frame = self.new_frame()?; rmp_serde::decode::from_read(&mut frame) .map_err(|e| MgmtChannelReadError::Parse(format!("{}", &e)))? } @@ -117,34 +161,71 @@ impl<'r, R:BufRead> Read for ReadFrame<'r, R> { #[throws(io::Error)] fn read(&mut self, buf: &mut [u8]) -> usize { if buf.len() == 0 { return 0 } - loop { match self.fr { + match self.fr { + Ok(ref mut fr) => fr, Err(None) => return 0, - Err(Some(e)) => throw!(e), - Ok(ref mut fr) => { - if fr.in_frame.is_none() || fr.in_frame == Some(0) { - match match fr.inner.read_u16::()? { - 0 => Err(None), - CHUNK_ERR => Err(Some(SenderError)), - x => Ok(x as usize), - } { - Err(done) => { - fr.in_frame = None; - self.fr = Err(done); - continue; - }, - Ok(in_chunk) => { - fr.in_frame = Some(in_chunk); - } - }; - } - let remaining = fr.in_frame.as_mut().unwrap(); - - let n = min(buf.len(), *remaining); - let r = fr.inner.read(&mut buf[0..n])?; - assert!(r <= n); - *remaining -= n; - break r; - } - } } + Err(Some(e@ SenderError)) => throw!(e), + } + .do_read(buf)? + .map_err(|e: SenderError| { self.fr = Err(Some(e)); e }) + ? + } +} + +impl FrameWriter { + #[throws(io::Error)] + pub fn new_frame<'w>(&'w mut self) -> WriteFrame<'w,W> { + self.tidy(Err(SenderError))?; + self.in_frame = Some(()); + let raw = WriteFrameRaw { fw: self }; + let buf = BufWriter::with_capacity(CHUNK_DEF.into(), raw); + WriteFrame { buf } + } + + #[throws(io::Error)] + fn tidy(&mut self, how: Result<(), SenderError>) { + if let Some(_) = self.in_frame { + self.inner.write_u16::(match how { + Ok(()) => 0, + Err(SenderError) => CHUNK_ERR, + })?; + self.in_frame = None; + } + } +} + +impl<'w,W:Write> WriteFrame<'w,W> { + #[throws(io::Error)] + pub fn finish_with(self, how: Result<(), SenderError>) { + self.buf + .into_inner() + .map_err(|e| e.into_error())? + .fw + .tidy(how)? + } + + #[throws(io::Error)] + pub fn finish(self) { self.finish_with(Ok(()))? } +} +impl<'w,W:Write> WriteFrameRaw<'w,W> { +} +impl<'w,W:Write> Drop for WriteFrameRaw<'w,W> { + fn drop(&mut self) { + self.fw.tidy(Err(SenderError)) + .unwrap_or_else(|_: io::Error| () /* Fuse will replicate this */); + } +} +impl<'r, R:Write> Write for WriteFrameRaw<'r, R> { + #[throws(io::Error)] + fn write(&mut self, buf: &[u8]) -> usize { + let now = min(buf.len(), CHUNK_MAX.into()); + self.fw.inner.write_u16::(now.try_into().unwrap())?; + self.fw.inner.write(&buf[0..now])?; + now + } + + #[throws(io::Error)] + fn flush(&mut self) { + self.fw.inner.flush()? } }