chiark / gitweb /
packetframe: Restructuring etc.
authorIan Jackson <ijackson@chiark.greenend.org.uk>
Thu, 15 Apr 2021 01:01:49 +0000 (02:01 +0100)
committerIan Jackson <ijackson@chiark.greenend.org.uk>
Fri, 23 Apr 2021 18:32:07 +0000 (19:32 +0100)
Signed-off-by: Ian Jackson <ijackson@chiark.greenend.org.uk>
src/lib.rs
src/packetframe.rs

index f4cd9207aef5ece6fefd22804078a93330c85b2f..215c069c37364bbb56b8e171ce3113e3b7760347 100644 (file)
@@ -3,6 +3,7 @@
 // There is NO WARRANTY.
 
 #![feature(min_type_alias_impl_trait)]
+#![feature(io_into_inner_error_parts)]
 #![allow(clippy::redundant_closure_call)]
 
 pub mod imports;
index 46007997c1c7e7b0145d5adfe211dee6d98aefdb..9876f04d90c34eb6eb380f3289671c06996821a1 100644 (file)
@@ -17,6 +17,7 @@ use crate::prelude::*;
 
 const CHUNK_MAX: u16 = 65534;
 const CHUNK_ERR: u16 = 65535;
+const CHUNK_DEF: u16 = 8192;
 
 type BO = BigEndian;
 
@@ -34,9 +35,24 @@ pub struct ReadFrame<'r,R:BufRead> {
   fr: Result<&'r mut FrameReader<R>, Option<SenderError>>,
 }
 
+#[derive(Debug)]
+pub struct FrameWriter<W:Write> {
+  inner: Fuse<W>,
+  in_frame: Option<()>,
+}
+
+#[derive(Debug)]
+struct WriteFrameRaw<'w,W:Write> {
+  fw: &'w mut FrameWriter<W>,
+}
+#[derive(Debug)]
+pub struct WriteFrame<'w,W:Write> {
+  buf: BufWriter<WriteFrameRaw<'w,W>>,
+}
+
 #[derive(Debug,Copy,Clone,Error)]
 #[error("error occurred at peer, during construction of frame data")]
-struct SenderError;
+pub struct SenderError;
 
 #[derive(Clone,Error,Debug)]
 pub struct Broken {
@@ -101,13 +117,41 @@ impl Display for Broken {
 }
 
 impl<R:BufRead> FrameReader<R> {
-  fn frame<'r>(&'r mut self) -> ReadFrame<'r,R> { ReadFrame {
-    fr: Ok(self),
-  } }
+  #[throws(io::Error)]
+  pub fn new_frame<'r>(&'r mut self) -> ReadFrame<'r,R> {
+    if self.in_frame.is_some() {
+      let mut buf = vec![0u8; CHUNK_DEF.into()];
+      while self.in_frame.is_some() {
+        let _: Result<_, SenderError> = self.do_read(&mut buf)?;
+      }
+    }
+    self.in_frame = Some(0);
+    ReadFrame { fr: Ok(self) }
+  }
+
+  fn do_read(&mut self, buf: &mut [u8]) ->
+    Result<Result<usize, SenderError>, io::Error>
+  {
+    assert_ne!(buf.len(), 0);
+    let remaining = self.in_frame.as_mut().unwrap();
+    if *remaining == 0 {
+      *remaining = match self.inner.read_u16::<BO>()? {
+        0         => return Ok(Ok(0)),
+        CHUNK_ERR => return Ok(Err(SenderError)),
+        x         => x as usize,
+      }
+    }
+
+    let n = min(buf.len(), *remaining);
+    let r = self.inner.read(&mut buf[0..n])?;
+    assert!(r <= n);
+    *remaining -= n;
+    Ok(Ok(r))
+  }
 
   #[throws(MgmtChannelReadError)]
   pub fn read_rmp<T:DeserializeOwned>(&mut self) -> T {
-    let mut frame = self.frame();
+    let mut frame = self.new_frame()?;
     rmp_serde::decode::from_read(&mut frame)
       .map_err(|e| MgmtChannelReadError::Parse(format!("{}", &e)))?
   }
@@ -117,34 +161,71 @@ impl<'r, R:BufRead> Read for ReadFrame<'r, R> {
   #[throws(io::Error)]
   fn read(&mut self, buf: &mut [u8]) -> usize {
     if buf.len() == 0 { return 0 }
-    loop { match self.fr {
+    match self.fr {
+      Ok(ref mut fr) => fr,
       Err(None) => return 0,
-      Err(Some(e)) => throw!(e),
-      Ok(ref mut fr) => {
-        if fr.in_frame.is_none() || fr.in_frame == Some(0) {
-          match match fr.inner.read_u16::<BO>()? {
-            0         => Err(None),
-            CHUNK_ERR => Err(Some(SenderError)),
-            x         => Ok(x as usize),
-          } {
-            Err(done) => {
-              fr.in_frame = None;
-              self.fr = Err(done);
-              continue;
-            },
-            Ok(in_chunk) => {
-              fr.in_frame = Some(in_chunk);
-            }
-          };
-        }
-        let remaining = fr.in_frame.as_mut().unwrap();
-
-        let n = min(buf.len(), *remaining);
-        let r = fr.inner.read(&mut buf[0..n])?;
-        assert!(r <= n);
-        *remaining -= n;
-        break r;
-      }
-    } }
+      Err(Some(e@ SenderError)) => throw!(e),
+    }
+      .do_read(buf)?
+      .map_err(|e: SenderError| { self.fr = Err(Some(e)); e })
+      ?
+  }
+}
+
+impl<W:Write> FrameWriter<W> {
+  #[throws(io::Error)]
+  pub fn new_frame<'w>(&'w mut self) -> WriteFrame<'w,W> {
+    self.tidy(Err(SenderError))?;
+    self.in_frame = Some(());
+    let raw = WriteFrameRaw { fw: self };
+    let buf = BufWriter::with_capacity(CHUNK_DEF.into(), raw);
+    WriteFrame { buf }
+  }
+
+  #[throws(io::Error)]
+  fn tidy(&mut self, how: Result<(), SenderError>) {
+    if let Some(_) = self.in_frame {
+      self.inner.write_u16::<BO>(match how {
+        Ok(()) => 0,
+        Err(SenderError) => CHUNK_ERR,
+      })?;
+      self.in_frame = None;
+    }
+  }
+}
+
+impl<'w,W:Write> WriteFrame<'w,W> {
+  #[throws(io::Error)]
+  pub fn finish_with(self, how: Result<(), SenderError>) {
+    self.buf
+      .into_inner()
+      .map_err(|e| e.into_error())?
+      .fw
+      .tidy(how)?
+  }
+
+  #[throws(io::Error)]
+  pub fn finish(self) { self.finish_with(Ok(()))? }
+}
+impl<'w,W:Write> WriteFrameRaw<'w,W> {
+}
+impl<'w,W:Write> Drop for WriteFrameRaw<'w,W> {
+  fn drop(&mut self) {
+    self.fw.tidy(Err(SenderError))
+      .unwrap_or_else(|_: io::Error| () /* Fuse will replicate this */);
+  }
+}
+impl<'r, R:Write> Write for WriteFrameRaw<'r, R> {
+  #[throws(io::Error)]
+  fn write(&mut self, buf: &[u8]) -> usize {
+    let now = min(buf.len(), CHUNK_MAX.into());
+    self.fw.inner.write_u16::<BO>(now.try_into().unwrap())?;
+    self.fw.inner.write(&buf[0..now])?;
+    now
+  }
+
+  #[throws(io::Error)]
+  fn flush(&mut self) {
+    self.fw.inner.flush()?
   }
 }