chiark / gitweb /
client: wip code
authorIan Jackson <ijackson@chiark.greenend.org.uk>
Mon, 26 Jul 2021 23:59:55 +0000 (00:59 +0100)
committerIan Jackson <ijackson@chiark.greenend.org.uk>
Mon, 26 Jul 2021 23:59:55 +0000 (00:59 +0100)
Signed-off-by: Ian Jackson <ijackson@chiark.greenend.org.uk>
src/bin/client.rs
src/lib.rs
src/prelude.rs
src/slip.rs [new file with mode: 0644]

index a9c0fad45a0380c50fd73bf86d9efbe7805ca224..912354ca9bcb08aa9a6b6fb6c4912400b142d702 100644 (file)
@@ -37,15 +37,57 @@ async fn run_client<C>(ic: InstanceConfig, hclient: Arc<hyper::Client<C>>)
 
   let tx_stream = ipif.stdout.take().unwrap();
   let mut tx_stream = tokio::io::BufReader::new(tx_stream).split(SLIP_ESC);
+  let mut tx_defer = None;
+
   let stream_for_rx = ipif.stdin .take().unwrap();
+
   let mut reqs = Vec::with_capacity(ic.max_requests_outstanding.sat());
 
   async {
     loop {
       select! {
         packet = tx_stream.next_segment(),
-        if reqs.len() < ic.max_requests_outstanding.sat() => {
-          let packet = packet.context("read from ipif")?;
+        if tx_defer.is_none() &&
+           reqs.len() < ic.max_requests_outstanding.sat() =>
+        {
+          let mut upbound_total = 0;
+          let mut upbound = vec![];
+          let mut to_process: Option<Result<Option<Vec<u8>>,_>>
+            = Some(packet);
+          while let Some(packet) = to_process.take() {
+            let mut packet =
+              packet.context("read from ipif")?
+              .ok_or_else(|| io::Error::from(io::ErrorKind::UnexpectedEof))?;
+            if let Ok(()) = slip::check_checkmtu_mimeify(&mut packet, &ic) {
+              let new_upbound_total = packet.len() + upbound_total + 1;
+              if new_upbound_total > ic.max_batch_up.sat() {
+                tx_defer = Some(packet);
+                break;
+              }
+              upbound_total = new_upbound_total;
+              upbound.push(packet);
+              // we rely on `next_segment` being cancellation-safe,
+              // which isn't documented as true but seems reasonably safe
+              pin!{ let next_segment = tx_stream.next_segment(); }
+              to_process = match poll!(next_segment) {
+                Poll::Ready(p) => Some(p),
+                Poll::Pending => None,
+              };
+            }
+          }
+          assert!( to_process.is_none() );
+          dbg!(&reqs.len(), &upbound_total, &upbound.len());
+/*
+          Body
+            made out of Stream
+            made out of futures::stream::iter
+            
+          
+          let datalen = 
+          
+          let o = 0;
+          let i = 
+*/
           reqs.push(());
           // xxx make new request
         }
index 613ae59417fa4f0591e7248dec4250d0d8e9018d..1bdd108e231ccd60b2f3a26cb23382f3fd892dd2 100644 (file)
@@ -14,5 +14,6 @@
 pub mod prelude;
 
 pub mod config;
+pub mod slip;
 pub mod types;
 pub mod utils;
index 521d82e549a64b47d7cbd95fa3c388e30d380156..d87b2b051257aa300aaee00b83b5ea9143fd4258 100644 (file)
@@ -16,11 +16,12 @@ pub use std::panic;
 pub use std::process;
 pub use std::str::FromStr;
 pub use std::sync::Arc;
+pub use std::task::Poll;
 
 pub use anyhow::{anyhow, Context};
 pub use extend::ext;
 pub use fehler::{throw, throws};
-pub use futures::future;
+pub use futures::{poll, future};
 pub use hyper::Uri;
 pub use hyper_tls::HttpsConnector;
 pub use ipnet::IpNet;
@@ -29,6 +30,7 @@ pub use lazy_regex::{regex_is_match, regex_replace_all};
 pub use log::{debug, info, error};
 pub use structopt::StructOpt;
 pub use tokio::io::AsyncBufReadExt;
+pub use tokio::pin;
 pub use tokio::select;
 pub use tokio::task;
 pub use tokio::time::Duration;
@@ -37,6 +39,7 @@ pub use void::{self, Void, ResultVoidExt, ResultVoidErrExt};
 pub use crate::config::{self, InstanceConfig, u32Ext as _};
 pub use crate::utils::*;
 pub use crate::types::*;
+pub use crate::slip;
 
 pub use anyhow::Error as AE;
 pub use ErrorKind as EK;
diff --git a/src/slip.rs b/src/slip.rs
new file mode 100644 (file)
index 0000000..0de0947
--- /dev/null
@@ -0,0 +1,10 @@
+// Copyright 2021 Ian Jackson and contributors to Hippotat
+// SPDX-License-Identifier: AGPL-3.0-or-later
+// There is NO WARRANTY.
+
+use crate::prelude::*;
+
+#[throws(AE)]
+pub fn check_checkmtu_mimeify(_data: &mut [u8], _ic: &InstanceConfig) {
+  // xxx
+}