chiark / gitweb /
new max_batch_down protocol, define
[hippotat.git] / src / bin / client.rs
index 9ae11353aa43f3b47d93d1cddd88f33cc404f032..70fe9b96af78e000903f16e3108435121ecddc55 100644 (file)
@@ -5,6 +5,16 @@
 use hippotat::prelude::*;
 use hippotat_macros::into_crlfs;
 
+#[derive(StructOpt,Debug)]
+pub struct Opts {
+  /// Increase debug level
+  #[structopt(long, short="D", parse(from_occurrences))]
+  debug: usize,
+
+  #[structopt(flatten)]
+  config: config::Opts,
+}
+
 type OutstandingRequest<'r> = Pin<Box<
     dyn Future<Output=Option<Bytes>> + Send + 'r
     >>;
@@ -15,6 +25,7 @@ trait HCC: hyper::client::connect::Connect + Clone + Send + Sync + 'static { }
 
 struct ClientContext<'c,C> {
   ic: &'c InstanceConfig,
+  effective_http_timeout: Duration,
   hclient: &'c Arc<hyper::Client<C>>,
   reporter: &'c parking_lot::Mutex<Reporter<'c>>,
 }
@@ -50,11 +61,13 @@ fn submit_request<'r, 'c:'r, C:HCC>(
        {}
        {}
        {}
+       {}
        {}"#),
                        &c.ic.link.client,
                        token,
                        c.ic.target_requests_outstanding,
                        show_timeout,
+                       c.ic.max_batch_down,
   );
 
   let prefix2 = format!(into_crlfs!(
@@ -118,7 +131,7 @@ fn submit_request<'r, 'c:'r, C:HCC>(
 
   let resp = c.hclient.request(req);
   let fut = Box::pin(async move {
-    let r = async { tokio::time::timeout( c.ic.http_timeout, async {
+    let r = async { tokio::time::timeout( c.effective_http_timeout, async {
       let resp = resp.await.context("make request")?;
       let status = resp.status();
       let resp = resp.into_body();
@@ -127,7 +140,6 @@ fn submit_request<'r, 'c:'r, C:HCC>(
         .context("HTTP error fetching response body")?;
 
       if ! status.is_success() {
-        // xxx get body and log it
         throw!(anyhow!("HTTP error status={} body={:?}",
                        &status, String::from_utf8_lossy(&resp)));
       }
@@ -145,9 +157,6 @@ fn submit_request<'r, 'c:'r, C:HCC>(
   reqs.push(fut);
 }
 
-#[allow(unused_variables)] // xxx
-#[allow(unused_mut)] // xxx
-#[allow(dead_code)] // xxx
 async fn run_client<C:HCC>(
   ic: InstanceConfig,
   hclient: Arc<hyper::Client<C>>
@@ -161,6 +170,9 @@ async fn run_client<C:HCC>(
     reporter: &reporter,
     hclient: &hclient,
     ic: &ic,
+    effective_http_timeout: ic.http_timeout.checked_add(ic.http_timeout_grace)
+      .ok_or_else(|| anyhow!("calculate effective http timeout ({:?} + {:?})",
+                             ic.http_timeout, ic.http_timeout_grace))?,
   };
 
   let mut ipif = tokio::process::Command::new("sh")
@@ -250,7 +262,6 @@ async fn run_client<C:HCC>(
           (reqs.len() < ic.target_requests_outstanding.sat() ||
            (reqs.len() < ic.max_requests_outstanding.sat() &&
             ! upbound.is_empty()))
-          // xxx backpressure, if too much in rx_queue
           =>
         {
           submit_request(&c, &mut req_num, &mut reqs,
@@ -280,10 +291,27 @@ async fn run_client<C:HCC>(
 
 #[tokio::main]
 async fn main() -> Result<(), AE> {
-  let ics = config::read(LinkEnd::Client)?;
+  let opts = Opts::from_args();
+
+  let ics = config::read(&opts.config, LinkEnd::Client)?;
   if ics.is_empty() { throw!(anyhow!("no associations with server(s)")); }
 
-  env_logger::init();
+  {
+    let env = env_logger::Env::new()
+      .filter("HIPPOTAT_LOG")
+      .write_style("HIPPOTAT_LOG_STYLE");
+  
+    let mut logb = env_logger::Builder::new();
+    logb.filter(Some("hippotat"),
+                *[ log::LevelFilter::Info,
+                   log::LevelFilter::Debug ]
+                .get(opts.debug)
+                .unwrap_or(
+                  &log::LevelFilter::Trace
+                ));
+    logb.parse_env(env);
+    logb.init();
+  }
 
   let https = HttpsConnector::new();
   let hclient = hyper::Client::builder().build::<_, hyper::Body>(https);