chiark / gitweb /
progress: Use threads to do concurrent progress reports.
authorIan Jackson <ijackson@chiark.greenend.org.uk>
Mon, 17 May 2021 10:41:03 +0000 (11:41 +0100)
committerIan Jackson <ijackson@chiark.greenend.org.uk>
Mon, 17 May 2021 14:00:29 +0000 (15:00 +0100)
Signed-off-by: Ian Jackson <ijackson@chiark.greenend.org.uk>
Cargo.lock
Cargo.toml
src/mgmtchannel.rs

index 62dadd542b464e1c0ed8d2ca3ca42f74b3c29474..28d041f24977936831f800369f3e0cf8a8d81809 100644 (file)
@@ -2248,6 +2248,7 @@ dependencies = [
  "chrono",
  "chrono-tz",
  "console",
+ "crossbeam-utils",
  "delegate",
  "derive-into-owned",
  "digest 0.9.0",
index d0a3f75330cbb32063a64a7f66b5802ffbbfac8a..ac3febcc0edd3a05ba1515a03e764c8b613175db 100644 (file)
@@ -38,6 +38,7 @@ cast_trait_object="0.1"
 chrono="0.4"
 chrono-tz="0.5"
 console="0.14"
+crossbeam-utils="0.8"
 delegate="0.5"
 derive-into-owned="0.1"
 digest="0.9"
index 8d8ad02a24334d44aa3729bfcefe1096a025ccf0..f205bd64bf3965a8f0c3a1b00cf44cb35ccdcd7c 100644 (file)
@@ -47,7 +47,7 @@ impl Debug for MgmtChannel{
 }
 
 impl MgmtChannel {
-  pub const PROGRESS: ProgressUpdateMode = PUM::Simplex;
+  pub const PROGRESS: ProgressUpdateMode = PUM::Duplex;
 
   #[throws(AE)]
   pub fn connect(socket_path: &str) -> MgmtChannel {
@@ -81,15 +81,34 @@ impl MgmtChannel {
     let mut wbulk = self.write
       .write_withbulk().context("start sending command")?
       .respond(&cmd).context("send command")?;
-    io::copy(up,&mut wbulk).context("copy bulk upload")?;
-    wbulk.finish().context("finish sending command and data")?;
-    let (mut resp, mut rbulk) =
-      self.read.read_withbulk()
-      .context("read response")?;
-    while let MR::Progress(pi) = resp {
-      resp = (&mut rbulk).read_rmp()?;
-      progress.report(&pi);
-    }
+    let read = &mut self.read;
+
+    let (resp, mut rbulk) = crossbeam_utils::thread::scope(|scope| {
+      let thr = scope.spawn(move |_| {
+        io::copy(up, &mut wbulk).context("copy")?;
+        wbulk.finish().context("finish")?;
+        Ok::<_,AE>(())
+      });
+
+      let (mut resp, mut rbulk) =
+        read.read_withbulk()
+        .context("read response")?;
+
+      while let MR::Progress(pi) = resp {
+        resp = (&mut rbulk).read_rmp()?;
+        progress.report(&pi);
+      }
+
+      let r = thr.join().expect("bulk data upload thread paniced");
+      if let Err(e) = r {
+        progress.clear();
+        warn!("bulk data upload failed: {}", e);
+      }
+      Ok::<_,AE>((resp, rbulk))
+    })
+      .expect("bulk data upload thread panicked, not reaped")
+      ?;
+
     progress.clear();
     match &resp {
       Progress(_) => panic!(),