From 850ecc49f7e3ec439de2bb054de927964d3bc0ec Mon Sep 17 00:00:00 2001 From: Ian Jackson Date: Tue, 3 Aug 2021 18:43:30 +0100 Subject: [PATCH] messages Signed-off-by: Ian Jackson --- src/bin/client.rs | 19 ++++++++++++++----- src/prelude.rs | 6 ++++-- src/reporter.rs | 4 ++-- 3 files changed, 20 insertions(+), 9 deletions(-) diff --git a/src/bin/client.rs b/src/bin/client.rs index e437f51..656b4b9 100644 --- a/src/bin/client.rs +++ b/src/bin/client.rs @@ -22,6 +22,7 @@ struct ClientContext<'c,C> { #[throws(AE)] fn submit_request<'r, 'c:'r, C:HCC>( c: &'c ClientContext, + req_num: &mut ReqNum, reqs: &mut Vec>, upbound: FramesData, ) { @@ -39,6 +40,8 @@ fn submit_request<'r, 'c:'r, C:HCC>( write!(token, " ").unwrap(); base64::encode_config_buf(&hmac, BASE64_CONFIG, &mut token); + let req_num = { *req_num += 1; *req_num }; + let prefix1 = format!(into_crlfs!( r#"--b Content-Type: text/plain; charset="utf-8" @@ -94,6 +97,9 @@ fn submit_request<'r, 'c:'r, C:HCC>( as_ref, ).map(|b| b.len()).sum(); + trace!("{} #{}: frames={} bytes={}", + &c.ic, req_num, upbound.len(), body_len); + let body = hyper::body::Body::wrap_stream( futures::stream::iter( content!( @@ -129,7 +135,7 @@ fn submit_request<'r, 'c:'r, C:HCC>( Ok::<_,AE>(resp) }).await? }.await; - let r = c.reporter.lock().report(r); + let r = c.reporter.lock().report(req_num, r); if r.is_none() { tokio::time::sleep(c.ic.http_retry).await; @@ -175,6 +181,8 @@ async fn run_client( Ok::<_,io::Error>(()) }); + let mut req_num: ReqNum = 0; + let tx_stream = ipif.stdout.take().unwrap(); let rx_stream = ipif.stdin .take().unwrap(); @@ -213,8 +221,8 @@ async fn run_client( Ok(()) }, |e| match e { PE::Empty => { }, - e@ PE::Src(_) => debug!("{}: tx: discarding: {}", &ic, e), - e => error!("{}: tx: discarding: {}", &ic, e), + e@ PE::Src(_) => debug!("{}: tx discarding: {}", &ic, e), + e => error!("{}: tx discarding: {}", &ic, e), }); }, @@ -237,7 +245,8 @@ async fn run_client( // xxx backpressure, if too much in rx_queue => { - submit_request(&c, &mut reqs, mem::take(&mut upbound).into())?; + submit_request(&c, &mut req_num, &mut reqs, + mem::take(&mut upbound).into())?; }, (got, goti, _) = async { future::select_all(&mut reqs).await }, @@ -252,7 +261,7 @@ async fn run_client( let addr = ip_packet_addr::(header)?; if addr != ic.link.client.0 { throw!(PE::Dst(addr)) } Ok(()) - }, |e| error!("{}: rx: discarding: {}", &ic, e)); + }, |e| error!("{} #{}: rx discarding: {}", &ic, req_num, e)); dbg!(&rx_queue.len()); rx_queue = default(); // xxx diff --git a/src/prelude.rs b/src/prelude.rs index e95aa9c..7d2b4b8 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -33,7 +33,7 @@ pub use hyper_tls::HttpsConnector; pub use ipnet::IpNet; pub use itertools::{iproduct, Itertools}; pub use lazy_regex::{regex_is_match, regex_replace_all}; -pub use log::{debug, info, error}; +pub use log::{trace, debug, info, warn, error}; pub use structopt::StructOpt; pub use thiserror::Error; pub use tokio::io::AsyncBufReadExt; @@ -49,6 +49,8 @@ pub use crate::reporter::*; pub use crate::types::*; pub use crate::slip::*; +pub type ReqNum = u64; + pub use anyhow::Error as AE; pub use ErrorKind as EK; pub use PacketError as PE; @@ -57,7 +59,7 @@ pub const SLIP_END: u8 = 0o300; // c0 pub const SLIP_ESC: u8 = 0o333; // db pub const SLIP_ESC_END: u8 = 0o334; // dc pub const SLIP_ESC_ESC: u8 = 0o335; // dd -pub const SLIP_MIME_ESC: u8 = b'-'; +pub const SLIP_MIME_ESC: u8 = b'-'; // 2d pub use base64::STANDARD as BASE64_CONFIG; diff --git a/src/reporter.rs b/src/reporter.rs index af4a924..55e645e 100644 --- a/src/reporter.rs +++ b/src/reporter.rs @@ -13,7 +13,7 @@ impl<'r> Reporter<'r> { ic } } - pub fn report(&mut self, r: Result) -> Option { + pub fn report(&mut self, req_num: ReqNum, r: Result) -> Option { match r { Ok(t) => { // xxx something something success @@ -21,7 +21,7 @@ impl<'r> Reporter<'r> { }, Err(e) => { // xxx something something error - error!("ERROR {} {:?}", self.ic, e); + warn!("{} #{}: {:?}", self.ic, req_num, e); None }, } -- 2.30.2