chiark / gitweb /
completely redo check() and main client loop
authorIan Jackson <ijackson@chiark.greenend.org.uk>
Sun, 1 Aug 2021 23:50:56 +0000 (00:50 +0100)
committerIan Jackson <ijackson@chiark.greenend.org.uk>
Sun, 1 Aug 2021 23:50:56 +0000 (00:50 +0100)
Signed-off-by: Ian Jackson <ijackson@chiark.greenend.org.uk>
src/bin/client.rs
src/prelude.rs
src/slip.rs

index d6ee158a967a4a27d776d1630a725ed0313d17c9..425b82a1f831d6664590a8d12e8653909db86033 100644 (file)
@@ -175,7 +175,8 @@ async fn run_client<C:HCC>(
 
   let tx_stream = ipif.stdout.take().unwrap();
   let mut tx_stream = tokio::io::BufReader::new(tx_stream).split(SLIP_END);
-  let mut tx_defer = None;
+  let mut packets: VecDeque<Box<[u8]>> = default();
+  let mut upbound = Frames::default();
 
   let stream_for_rx = ipif.stdin .take().unwrap();
 
@@ -187,61 +188,39 @@ async fn run_client<C:HCC>(
   async {
     loop {
       select! {
-        packet = async {
-          // cancellation safety: if this future is polled, we might
-          // move out of tx_defer, but then we will be immediately
-          // ready and yield it inot packet
-          if let Some(y) = tx_defer.take() {
-            Ok(Some(y))
-          } else {
-            tx_stream.next_segment().await
-          }
+        data = tx_stream.next_segment(),
+        if packets.is_empty() =>
+        {
+          let data =
+            data.context("read from ipif")?
+            .ok_or_else(|| io::Error::from(io::ErrorKind::UnexpectedEof))?;
+          //            eprintln!("packet={:x?}", &packet);
+
+          packets = check
+            ::<_, true>(ic.mtu, &data, |e| match e {
+              e => error!("PACKET ERROR {}", e), // xxx
+            }).into();
         },
-        if tx_defer.is_none() &&
-           reqs.len() < ic.max_requests_outstanding.sat() =>
+
+        _ = async { },
+        if ! upbound.tried_full() &&
+           ! packets.is_empty() =>
         {
-          let mut upbound = Frames::default();
-          let mut to_process: Option<Result<Option<Vec<u8>>,_>>
-            = Some(packet);
-          while let Some(packet) = to_process.take() {
-            let mut packet =
-              packet.context("read from ipif")?
-              .ok_or_else(|| io::Error::from(io::ErrorKind::UnexpectedEof))?;
-//            eprintln!("packet={:x?}", &packet);
-            if let Ok(()) = check_checkmtu_mimeswap
-              ::<true>(ic.mtu, &mut packet)
-            {
-              match upbound.add(ic.max_batch_up, packet) {
-                Err(packet) => {
-                  tx_defer = Some(packet);
-                  break;
-                }
-                Ok(()) => { },
-              };
-              // we rely oin `next_segment` being cancellation-safe,
-              // which isn't documented as true but seems reasonably safe
-              pin!{ let next_segment = tx_stream.next_segment(); }
-              to_process = match poll!(next_segment) {
-                Poll::Ready(p) => Some(p),
-                Poll::Pending => None,
-              };
+          while let Some(packet) = packets.pop_front() {
+            match upbound.add(ic.max_batch_up, packet.into()/*xxx*/) {
+              Err(packet) => { packets.push_front(packet.into()/*xxx*/); break; }
+              Ok(()) => { },
             }
           }
-          assert!( to_process.is_none() );
-          dbg!(&reqs.len(), &upbound);
-
-          //: impl futures::Stream<Cow<&[u8]>>
-          submit_request(&c, &mut reqs, upbound.into())?;
-        }
+        },
 
-        () = async { },
+        _ = async { },
         if reqs.len() < ic.target_requests_outstanding.sat() ||
-           (tx_defer.is_some() &&
-            reqs.len() < ic.max_requests_outstanding.sat()) =>
+          (reqs.len() < ic.max_requests_outstanding.sat() &&
+           ! upbound.is_empty()) =>
         {
-          let upbound = tx_defer.take().into_iter().collect_vec();
-          submit_request(&c, &mut reqs, upbound)?;
-        }
+          submit_request(&c, &mut reqs, mem::take(&mut upbound).into())?;
+        },
 
         (got, goti, _) = async { future::select_all(&mut reqs).await },
           if ! reqs.is_empty() =>
index d87d4fb5808f422ddcbbabd93adf7b7af171e1aa..2b377d9131d84589fdabc642c5c88a5f6c5763c6 100644 (file)
@@ -3,7 +3,7 @@
 // There is NO WARRANTY.
 
 pub use std::array;
-pub use std::collections::{BTreeSet, HashMap};
+pub use std::collections::{BTreeSet, HashMap, VecDeque};
 pub use std::convert::{TryFrom, TryInto};
 pub use std::borrow::Cow;
 pub use std::cmp::{min, max};
index 357a8908a7e9572403146091301587cdae7403fb..1713057cb156dbaa8d050662859e32d8ae3e7553 100644 (file)
@@ -6,44 +6,61 @@ use crate::prelude::*;
 
 pub static SLIP_END_SLICE: &[u8] = &[SLIP_END];
 
-#[derive(Error,Debug,Copy,Clone)]
+#[derive(Error,Debug,Copy,Clone,Eq,PartialEq)]
 pub enum PacketError {
+  #[error("empty packet")]                 Empty,
   #[error("MTU exceeded ({len} > {mtu})")] MTU { len: usize, mtu: u32 },
   #[error("Invalid SLIP escape sequence")] SLIP,
 }
 
-#[throws(PacketError)]
-pub fn check_checkmtu_mimeswap<const TO_MIME: bool>
-  (mtu: u32, data: &mut [u8])
+pub fn check<EH, const TO_MIME: bool>
+  (mtu: u32, data: &[u8], mut error_handler: EH)
+   -> Vec<Box<[u8]>>
+where EH: FnMut(PacketError)
 {
 //  eprintln!("before: {:?}", DumpHex(data));
 
-  for mut packet in data.split_mut(|&c| c == SLIP_END) {
-    if packet.len() > mtu.sat() {
-      throw!(PacketError::MTU { len: packet.len(), mtu })
-    }
+  let mut out = vec![];
 
-    while let Some((i, was_mime)) = packet.iter().enumerate().find_map(
-      |(i,&c)| match c {
-        SLIP_MIME_ESC => Some((i,true)),
-        SLIP_ESC      => Some((i,false)),
-        _ => None,
+  for packet in data.split(|&c| c == SLIP_END) {
+    match (||{
+      if packet.len() == 0 {
+        throw!(PacketError::Empty)
+      }
+      if packet.len() > mtu.sat() {
+        throw!(PacketError::MTU { len: packet.len(), mtu });
       }
-    ) {
-      packet[i] = if was_mime { SLIP_ESC } else { SLIP_MIME_ESC };
-      if was_mime != TO_MIME {
-        match packet.get(i+1) {
-          Some(&SLIP_ESC_END) |
-          Some(&SLIP_ESC_ESC) => Ok(()),
-          _ => throw!(PacketError::SLIP),
-        }?;
-        packet = &mut packet[i+2 ..];
-      } else {
-        packet = &mut packet[i+1 ..];
+
+      let mut packet: Box<[u8]> = packet.to_owned().into();
+      let mut walk: &mut [u8] = &mut packet;
+
+      while let Some((i, was_mime)) = walk.iter().enumerate().find_map(
+        |(i,&c)| match c {
+          SLIP_MIME_ESC => Some((i,true)),
+          SLIP_ESC      => Some((i,false)),
+          _ => None,
+        }
+      ) {
+        walk[i] = if was_mime { SLIP_ESC } else { SLIP_MIME_ESC };
+        if was_mime != TO_MIME {
+          match walk.get(i+1) {
+            Some(&SLIP_ESC_END) |
+            Some(&SLIP_ESC_ESC) => Ok(()),
+            _ => Err(PacketError::SLIP),
+          }?;
+          walk = &mut walk[i+2 ..];
+        } else {
+          walk = &mut walk[i+1 ..];
+        }
       }
+
+      Ok(packet)
+    })() {
+      Err(e) => error_handler(e),
+      Ok(packet) => out.push(packet),
     }
   }
-
+  out
 //  eprintln!(" after: {:?}", DumpHex(data));
 }
 
@@ -54,6 +71,7 @@ pub type FramesData = Vec<Vec<u8>>;
 pub struct Frames {
   frames: FramesData,
   total_len: usize,
+  tried_full: bool,
 }
 
 impl Debug for Frames {
@@ -68,10 +86,13 @@ impl Frames {
   pub fn add(&mut self, max: u32, frame: Frame) {
     if frame.len() == 0 { return }
     let new_total = self.total_len + frame.len() + 1;
-    if new_total > max.sat() { throw!(frame) }
+    if new_total > max.sat() { self.tried_full = true; throw!(frame); }
     self.total_len = new_total;
     self.frames.push(frame);
   }
+
+  #[inline] pub fn tried_full(&self) -> bool { self.tried_full }
+  #[inline] pub fn is_empty(&self) -> bool { self.frames.is_empty() }
 }
 
 impl From<Frames> for FramesData {
@@ -105,22 +126,30 @@ impl Debug for DumpHex<'_> {
 
 #[test]
 fn mime_slip_to_mime() {
-  fn chk(i: &[u8], exp: Result<&[u8], &str>) {
-    let mut p = i.to_owned();
-    match (exp, check_checkmtu_mimeswap::<true>(10, p.as_mut())) {
-      (Ok(exp), Ok(())) => assert_eq!( DumpHex(exp), DumpHex(&p) ),
-      (Err(exp), Err(got)) => assert!( got.to_string().contains(exp) ),
-      x => panic!("? {:?}", x),
-    }
+  use PacketError as PE;
+  const MTU: u32 = 10;
+
+  fn chk(i: &[u8], exp_p: &[&[u8]], exp_e: &[PacketError]) {
+    let mut got_e = vec![];
+    let got_p = check::<_,true>(MTU, i, |e| got_e.push(e));
+    assert_eq!( got_p.iter().map(|b| DumpHex(b)).collect_vec(),
+                exp_p.iter().map(|b| DumpHex(b)).collect_vec() );
+    assert_eq!( got_e,
+                exp_e );
   }
 
   chk( &[ SLIP_END, SLIP_ESC, SLIP_ESC_END, b'-',     b'X' ],
-    Ok(&[ SLIP_END, b'-',     SLIP_ESC_END, SLIP_ESC, b'X' ]) );
+    &[           &[ b'-',     SLIP_ESC_END, SLIP_ESC, b'X' ] ],
+    &[ PE::Empty ]);
 
-  chk( &[ SLIP_END, SLIP_ESC, b'y' ], Err("SLIP escape") );
+  chk( &[ SLIP_END, SLIP_ESC, b'y' ], &[],
+    &[ PE::Empty,   PE::SLIP ]);
 
   chk( &[ SLIP_END, b'-',     b'y' ],
-    Ok(&[ SLIP_END, SLIP_ESC, b'y' ]) );
+    &[           &[ SLIP_ESC, b'y' ] ],
+    &[ PE::Empty ]);
 
-  chk( &[b'x'; 20], Err("MTU"));
+  chk( &[b'x'; 20],
+    &[             ],
+    &[ PE::MTU { len: 20, mtu: MTU } ]);
 }