}
impl MgmtChannel {
- pub const PROGRESS: ProgressUpdateMode = PUM::Simplex;
+ pub const PROGRESS: ProgressUpdateMode = PUM::Duplex;
#[throws(AE)]
pub fn connect(socket_path: &str) -> MgmtChannel {
let mut wbulk = self.write
.write_withbulk().context("start sending command")?
.respond(&cmd).context("send command")?;
- io::copy(up,&mut wbulk).context("copy bulk upload")?;
- wbulk.finish().context("finish sending command and data")?;
- let (mut resp, mut rbulk) =
- self.read.read_withbulk()
- .context("read response")?;
- while let MR::Progress(pi) = resp {
- resp = (&mut rbulk).read_rmp()?;
- progress.report(&pi);
- }
+ let read = &mut self.read;
+
+ let (resp, mut rbulk) = crossbeam_utils::thread::scope(|scope| {
+ let thr = scope.spawn(move |_| {
+ io::copy(up, &mut wbulk).context("copy")?;
+ wbulk.finish().context("finish")?;
+ Ok::<_,AE>(())
+ });
+
+ let (mut resp, mut rbulk) =
+ read.read_withbulk()
+ .context("read response")?;
+
+ while let MR::Progress(pi) = resp {
+ resp = (&mut rbulk).read_rmp()?;
+ progress.report(&pi);
+ }
+
+ let r = thr.join().expect("bulk data upload thread paniced");
+ if let Err(e) = r {
+ progress.clear();
+ warn!("bulk data upload failed: {}", e);
+ }
+ Ok::<_,AE>((resp, rbulk))
+ })
+ .expect("bulk data upload thread panicked, not reaped")
+ ?;
+
progress.clear();
match &resp {
Progress(_) => panic!(),