let tx_stream = ipif.stdout.take().unwrap();
let mut tx_stream = tokio::io::BufReader::new(tx_stream).split(SLIP_END);
- let mut tx_defer = None;
+ let mut packets: VecDeque<Box<[u8]>> = default();
+ let mut upbound = Frames::default();
let stream_for_rx = ipif.stdin .take().unwrap();
async {
loop {
select! {
- packet = async {
- // cancellation safety: if this future is polled, we might
- // move out of tx_defer, but then we will be immediately
- // ready and yield it inot packet
- if let Some(y) = tx_defer.take() {
- Ok(Some(y))
- } else {
- tx_stream.next_segment().await
- }
+ data = tx_stream.next_segment(),
+ if packets.is_empty() =>
+ {
+ let data =
+ data.context("read from ipif")?
+ .ok_or_else(|| io::Error::from(io::ErrorKind::UnexpectedEof))?;
+ // eprintln!("packet={:x?}", &packet);
+
+ packets = check
+ ::<_, true>(ic.mtu, &data, |e| match e {
+ e => error!("PACKET ERROR {}", e), // xxx
+ }).into();
},
- if tx_defer.is_none() &&
- reqs.len() < ic.max_requests_outstanding.sat() =>
+
+ _ = async { },
+ if ! upbound.tried_full() &&
+ ! packets.is_empty() =>
{
- let mut upbound = Frames::default();
- let mut to_process: Option<Result<Option<Vec<u8>>,_>>
- = Some(packet);
- while let Some(packet) = to_process.take() {
- let mut packet =
- packet.context("read from ipif")?
- .ok_or_else(|| io::Error::from(io::ErrorKind::UnexpectedEof))?;
-// eprintln!("packet={:x?}", &packet);
- if let Ok(()) = check_checkmtu_mimeswap
- ::<true>(ic.mtu, &mut packet)
- {
- match upbound.add(ic.max_batch_up, packet) {
- Err(packet) => {
- tx_defer = Some(packet);
- break;
- }
- Ok(()) => { },
- };
- // we rely oin `next_segment` being cancellation-safe,
- // which isn't documented as true but seems reasonably safe
- pin!{ let next_segment = tx_stream.next_segment(); }
- to_process = match poll!(next_segment) {
- Poll::Ready(p) => Some(p),
- Poll::Pending => None,
- };
+ while let Some(packet) = packets.pop_front() {
+ match upbound.add(ic.max_batch_up, packet.into()/*xxx*/) {
+ Err(packet) => { packets.push_front(packet.into()/*xxx*/); break; }
+ Ok(()) => { },
}
}
- assert!( to_process.is_none() );
- dbg!(&reqs.len(), &upbound);
-
- //: impl futures::Stream<Cow<&[u8]>>
- submit_request(&c, &mut reqs, upbound.into())?;
- }
+ },
- () = async { },
+ _ = async { },
if reqs.len() < ic.target_requests_outstanding.sat() ||
- (tx_defer.is_some() &&
- reqs.len() < ic.max_requests_outstanding.sat()) =>
+ (reqs.len() < ic.max_requests_outstanding.sat() &&
+ ! upbound.is_empty()) =>
{
- let upbound = tx_defer.take().into_iter().collect_vec();
- submit_request(&c, &mut reqs, upbound)?;
- }
+ submit_request(&c, &mut reqs, mem::take(&mut upbound).into())?;
+ },
(got, goti, _) = async { future::select_all(&mut reqs).await },
if ! reqs.is_empty() =>
pub static SLIP_END_SLICE: &[u8] = &[SLIP_END];
-#[derive(Error,Debug,Copy,Clone)]
+#[derive(Error,Debug,Copy,Clone,Eq,PartialEq)]
pub enum PacketError {
+ #[error("empty packet")] Empty,
#[error("MTU exceeded ({len} > {mtu})")] MTU { len: usize, mtu: u32 },
#[error("Invalid SLIP escape sequence")] SLIP,
}
-#[throws(PacketError)]
-pub fn check_checkmtu_mimeswap<const TO_MIME: bool>
- (mtu: u32, data: &mut [u8])
+pub fn check<EH, const TO_MIME: bool>
+ (mtu: u32, data: &[u8], mut error_handler: EH)
+ -> Vec<Box<[u8]>>
+where EH: FnMut(PacketError)
{
// eprintln!("before: {:?}", DumpHex(data));
- for mut packet in data.split_mut(|&c| c == SLIP_END) {
- if packet.len() > mtu.sat() {
- throw!(PacketError::MTU { len: packet.len(), mtu })
- }
+ let mut out = vec![];
- while let Some((i, was_mime)) = packet.iter().enumerate().find_map(
- |(i,&c)| match c {
- SLIP_MIME_ESC => Some((i,true)),
- SLIP_ESC => Some((i,false)),
- _ => None,
+ for packet in data.split(|&c| c == SLIP_END) {
+ match (||{
+ if packet.len() == 0 {
+ throw!(PacketError::Empty)
+ }
+ if packet.len() > mtu.sat() {
+ throw!(PacketError::MTU { len: packet.len(), mtu });
}
- ) {
- packet[i] = if was_mime { SLIP_ESC } else { SLIP_MIME_ESC };
- if was_mime != TO_MIME {
- match packet.get(i+1) {
- Some(&SLIP_ESC_END) |
- Some(&SLIP_ESC_ESC) => Ok(()),
- _ => throw!(PacketError::SLIP),
- }?;
- packet = &mut packet[i+2 ..];
- } else {
- packet = &mut packet[i+1 ..];
+
+ let mut packet: Box<[u8]> = packet.to_owned().into();
+ let mut walk: &mut [u8] = &mut packet;
+
+ while let Some((i, was_mime)) = walk.iter().enumerate().find_map(
+ |(i,&c)| match c {
+ SLIP_MIME_ESC => Some((i,true)),
+ SLIP_ESC => Some((i,false)),
+ _ => None,
+ }
+ ) {
+ walk[i] = if was_mime { SLIP_ESC } else { SLIP_MIME_ESC };
+ if was_mime != TO_MIME {
+ match walk.get(i+1) {
+ Some(&SLIP_ESC_END) |
+ Some(&SLIP_ESC_ESC) => Ok(()),
+ _ => Err(PacketError::SLIP),
+ }?;
+ walk = &mut walk[i+2 ..];
+ } else {
+ walk = &mut walk[i+1 ..];
+ }
}
+
+ Ok(packet)
+ })() {
+ Err(e) => error_handler(e),
+ Ok(packet) => out.push(packet),
}
}
-
+ out
// eprintln!(" after: {:?}", DumpHex(data));
}
pub struct Frames {
frames: FramesData,
total_len: usize,
+ tried_full: bool,
}
impl Debug for Frames {
pub fn add(&mut self, max: u32, frame: Frame) {
if frame.len() == 0 { return }
let new_total = self.total_len + frame.len() + 1;
- if new_total > max.sat() { throw!(frame) }
+ if new_total > max.sat() { self.tried_full = true; throw!(frame); }
self.total_len = new_total;
self.frames.push(frame);
}
+
+ #[inline] pub fn tried_full(&self) -> bool { self.tried_full }
+ #[inline] pub fn is_empty(&self) -> bool { self.frames.is_empty() }
}
impl From<Frames> for FramesData {
#[test]
fn mime_slip_to_mime() {
- fn chk(i: &[u8], exp: Result<&[u8], &str>) {
- let mut p = i.to_owned();
- match (exp, check_checkmtu_mimeswap::<true>(10, p.as_mut())) {
- (Ok(exp), Ok(())) => assert_eq!( DumpHex(exp), DumpHex(&p) ),
- (Err(exp), Err(got)) => assert!( got.to_string().contains(exp) ),
- x => panic!("? {:?}", x),
- }
+ use PacketError as PE;
+ const MTU: u32 = 10;
+
+ fn chk(i: &[u8], exp_p: &[&[u8]], exp_e: &[PacketError]) {
+ let mut got_e = vec![];
+ let got_p = check::<_,true>(MTU, i, |e| got_e.push(e));
+ assert_eq!( got_p.iter().map(|b| DumpHex(b)).collect_vec(),
+ exp_p.iter().map(|b| DumpHex(b)).collect_vec() );
+ assert_eq!( got_e,
+ exp_e );
}
chk( &[ SLIP_END, SLIP_ESC, SLIP_ESC_END, b'-', b'X' ],
- Ok(&[ SLIP_END, b'-', SLIP_ESC_END, SLIP_ESC, b'X' ]) );
+ &[ &[ b'-', SLIP_ESC_END, SLIP_ESC, b'X' ] ],
+ &[ PE::Empty ]);
- chk( &[ SLIP_END, SLIP_ESC, b'y' ], Err("SLIP escape") );
+ chk( &[ SLIP_END, SLIP_ESC, b'y' ], &[],
+ &[ PE::Empty, PE::SLIP ]);
chk( &[ SLIP_END, b'-', b'y' ],
- Ok(&[ SLIP_END, SLIP_ESC, b'y' ]) );
+ &[ &[ SLIP_ESC, b'y' ] ],
+ &[ PE::Empty ]);
- chk( &[b'x'; 20], Err("MTU"));
+ chk( &[b'x'; 20],
+ &[ ],
+ &[ PE::MTU { len: 20, mtu: MTU } ]);
}