From 90e63b983ef72c2a016da198055a0a429def9a95 Mon Sep 17 00:00:00 2001 From: Ian Jackson Date: Mon, 17 May 2021 11:41:03 +0100 Subject: [PATCH] progress: Use threads to do concurrent progress reports. Signed-off-by: Ian Jackson --- Cargo.lock | 1 + Cargo.toml | 1 + src/mgmtchannel.rs | 39 +++++++++++++++++++++++++++++---------- 3 files changed, 31 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 62dadd54..28d041f2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2248,6 +2248,7 @@ dependencies = [ "chrono", "chrono-tz", "console", + "crossbeam-utils", "delegate", "derive-into-owned", "digest 0.9.0", diff --git a/Cargo.toml b/Cargo.toml index d0a3f753..ac3febcc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,6 +38,7 @@ cast_trait_object="0.1" chrono="0.4" chrono-tz="0.5" console="0.14" +crossbeam-utils="0.8" delegate="0.5" derive-into-owned="0.1" digest="0.9" diff --git a/src/mgmtchannel.rs b/src/mgmtchannel.rs index 8d8ad02a..f205bd64 100644 --- a/src/mgmtchannel.rs +++ b/src/mgmtchannel.rs @@ -47,7 +47,7 @@ impl Debug for MgmtChannel{ } impl MgmtChannel { - pub const PROGRESS: ProgressUpdateMode = PUM::Simplex; + pub const PROGRESS: ProgressUpdateMode = PUM::Duplex; #[throws(AE)] pub fn connect(socket_path: &str) -> MgmtChannel { @@ -81,15 +81,34 @@ impl MgmtChannel { let mut wbulk = self.write .write_withbulk().context("start sending command")? .respond(&cmd).context("send command")?; - io::copy(up,&mut wbulk).context("copy bulk upload")?; - wbulk.finish().context("finish sending command and data")?; - let (mut resp, mut rbulk) = - self.read.read_withbulk() - .context("read response")?; - while let MR::Progress(pi) = resp { - resp = (&mut rbulk).read_rmp()?; - progress.report(&pi); - } + let read = &mut self.read; + + let (resp, mut rbulk) = crossbeam_utils::thread::scope(|scope| { + let thr = scope.spawn(move |_| { + io::copy(up, &mut wbulk).context("copy")?; + wbulk.finish().context("finish")?; + Ok::<_,AE>(()) + }); + + let (mut resp, mut rbulk) = + read.read_withbulk() + .context("read response")?; + + while let MR::Progress(pi) = resp { + resp = (&mut rbulk).read_rmp()?; + progress.report(&pi); + } + + let r = thr.join().expect("bulk data upload thread paniced"); + if let Err(e) = r { + progress.clear(); + warn!("bulk data upload failed: {}", e); + } + Ok::<_,AE>((resp, rbulk)) + }) + .expect("bulk data upload thread panicked, not reaped") + ?; + progress.clear(); match &resp { Progress(_) => panic!(), -- 2.30.2