const CHUNK_MAX: u16 = 65534;
const CHUNK_ERR: u16 = 65535;
+const CHUNK_DEF: u16 = 8192;
type BO = BigEndian;
fr: Result<&'r mut FrameReader<R>, Option<SenderError>>,
}
+#[derive(Debug)]
+pub struct FrameWriter<W:Write> {
+ inner: Fuse<W>,
+ in_frame: Option<()>,
+}
+
+#[derive(Debug)]
+struct WriteFrameRaw<'w,W:Write> {
+ fw: &'w mut FrameWriter<W>,
+}
+#[derive(Debug)]
+pub struct WriteFrame<'w,W:Write> {
+ buf: BufWriter<WriteFrameRaw<'w,W>>,
+}
+
#[derive(Debug,Copy,Clone,Error)]
#[error("error occurred at peer, during construction of frame data")]
-struct SenderError;
+pub struct SenderError;
#[derive(Clone,Error,Debug)]
pub struct Broken {
}
impl<R:BufRead> FrameReader<R> {
- fn frame<'r>(&'r mut self) -> ReadFrame<'r,R> { ReadFrame {
- fr: Ok(self),
- } }
+ #[throws(io::Error)]
+ pub fn new_frame<'r>(&'r mut self) -> ReadFrame<'r,R> {
+ if self.in_frame.is_some() {
+ let mut buf = vec![0u8; CHUNK_DEF.into()];
+ while self.in_frame.is_some() {
+ let _: Result<_, SenderError> = self.do_read(&mut buf)?;
+ }
+ }
+ self.in_frame = Some(0);
+ ReadFrame { fr: Ok(self) }
+ }
+
+ fn do_read(&mut self, buf: &mut [u8]) ->
+ Result<Result<usize, SenderError>, io::Error>
+ {
+ assert_ne!(buf.len(), 0);
+ let remaining = self.in_frame.as_mut().unwrap();
+ if *remaining == 0 {
+ *remaining = match self.inner.read_u16::<BO>()? {
+ 0 => return Ok(Ok(0)),
+ CHUNK_ERR => return Ok(Err(SenderError)),
+ x => x as usize,
+ }
+ }
+
+ let n = min(buf.len(), *remaining);
+ let r = self.inner.read(&mut buf[0..n])?;
+ assert!(r <= n);
+ *remaining -= n;
+ Ok(Ok(r))
+ }
#[throws(MgmtChannelReadError)]
pub fn read_rmp<T:DeserializeOwned>(&mut self) -> T {
- let mut frame = self.frame();
+ let mut frame = self.new_frame()?;
rmp_serde::decode::from_read(&mut frame)
.map_err(|e| MgmtChannelReadError::Parse(format!("{}", &e)))?
}
#[throws(io::Error)]
fn read(&mut self, buf: &mut [u8]) -> usize {
if buf.len() == 0 { return 0 }
- loop { match self.fr {
+ match self.fr {
+ Ok(ref mut fr) => fr,
Err(None) => return 0,
- Err(Some(e)) => throw!(e),
- Ok(ref mut fr) => {
- if fr.in_frame.is_none() || fr.in_frame == Some(0) {
- match match fr.inner.read_u16::<BO>()? {
- 0 => Err(None),
- CHUNK_ERR => Err(Some(SenderError)),
- x => Ok(x as usize),
- } {
- Err(done) => {
- fr.in_frame = None;
- self.fr = Err(done);
- continue;
- },
- Ok(in_chunk) => {
- fr.in_frame = Some(in_chunk);
- }
- };
- }
- let remaining = fr.in_frame.as_mut().unwrap();
-
- let n = min(buf.len(), *remaining);
- let r = fr.inner.read(&mut buf[0..n])?;
- assert!(r <= n);
- *remaining -= n;
- break r;
- }
- } }
+ Err(Some(e@ SenderError)) => throw!(e),
+ }
+ .do_read(buf)?
+ .map_err(|e: SenderError| { self.fr = Err(Some(e)); e })
+ ?
+ }
+}
+
+impl<W:Write> FrameWriter<W> {
+ #[throws(io::Error)]
+ pub fn new_frame<'w>(&'w mut self) -> WriteFrame<'w,W> {
+ self.tidy(Err(SenderError))?;
+ self.in_frame = Some(());
+ let raw = WriteFrameRaw { fw: self };
+ let buf = BufWriter::with_capacity(CHUNK_DEF.into(), raw);
+ WriteFrame { buf }
+ }
+
+ #[throws(io::Error)]
+ fn tidy(&mut self, how: Result<(), SenderError>) {
+ if let Some(_) = self.in_frame {
+ self.inner.write_u16::<BO>(match how {
+ Ok(()) => 0,
+ Err(SenderError) => CHUNK_ERR,
+ })?;
+ self.in_frame = None;
+ }
+ }
+}
+
+impl<'w,W:Write> WriteFrame<'w,W> {
+ #[throws(io::Error)]
+ pub fn finish_with(self, how: Result<(), SenderError>) {
+ self.buf
+ .into_inner()
+ .map_err(|e| e.into_error())?
+ .fw
+ .tidy(how)?
+ }
+
+ #[throws(io::Error)]
+ pub fn finish(self) { self.finish_with(Ok(()))? }
+}
+impl<'w,W:Write> WriteFrameRaw<'w,W> {
+}
+impl<'w,W:Write> Drop for WriteFrameRaw<'w,W> {
+ fn drop(&mut self) {
+ self.fw.tidy(Err(SenderError))
+ .unwrap_or_else(|_: io::Error| () /* Fuse will replicate this */);
+ }
+}
+impl<'r, R:Write> Write for WriteFrameRaw<'r, R> {
+ #[throws(io::Error)]
+ fn write(&mut self, buf: &[u8]) -> usize {
+ let now = min(buf.len(), CHUNK_MAX.into());
+ self.fw.inner.write_u16::<BO>(now.try_into().unwrap())?;
+ self.fw.inner.write(&buf[0..now])?;
+ now
+ }
+
+ #[throws(io::Error)]
+ fn flush(&mut self) {
+ self.fw.inner.flush()?
}
}