From: Ian Jackson Date: Mon, 10 May 2021 17:52:14 +0000 (+0100) Subject: mgmtchannel: Prepare for progress updates X-Git-Tag: otter-0.6.0~354 X-Git-Url: https://www.chiark.greenend.org.uk/ucgi/~ianmdlvl/git?a=commitdiff_plain;h=513d1c2f56000ab4f311ef63b2ae2779f9e3f382;p=otter.git mgmtchannel: Prepare for progress updates Signed-off-by: Ian Jackson --- diff --git a/daemon/cmdlistener.rs b/daemon/cmdlistener.rs index e207be3d..9efcf7b7 100644 --- a/daemon/cmdlistener.rs +++ b/daemon/cmdlistener.rs @@ -81,6 +81,8 @@ fn execute_and_respond(cs: &mut CommandStreamData, cmd: MgmtCommand, where R: Read, W: Write { let mut bulk_download: Option> = None; + let for_response = for_response + .write_withbulk().context("start to respond")?; let mut cmd_s = log_enabled!(log::Level::Info) .as_some_from(|| format!("{:?}", &cmd)) @@ -334,7 +336,7 @@ fn execute_and_respond(cs: &mut CommandStreamData, cmd: MgmtCommand, } }; - let mut wf = for_response.write_withbulk(&resp).context("respond")?; + let mut wf = for_response.respond(&resp).context("respond")?; if let Some(mut bulk_download) = bulk_download { io::copy(&mut bulk_download, &mut wf).context("download")?; } diff --git a/src/mgmtchannel.rs b/src/mgmtchannel.rs index 016b83b5..c5668cc1 100644 --- a/src/mgmtchannel.rs +++ b/src/mgmtchannel.rs @@ -76,7 +76,9 @@ impl MgmtChannel { P: FnMut(ProgressInfo) -> Result<(),AE>, { use MgmtResponse::*; - let mut wbulk = self.write.write_withbulk(&cmd).context("send command")?; + let mut wbulk = self.write + .write_withbulk().context("start sending command")? + .respond(&cmd).context("send command")?; io::copy(up,&mut wbulk).context("copy bulk upload")?; wbulk.finish().context("finish sending command and data")?; let (mut resp, mut rbulk) = diff --git a/src/packetframe.rs b/src/packetframe.rs index 40a74134..1b9f0b06 100644 --- a/src/packetframe.rs +++ b/src/packetframe.rs @@ -356,21 +356,16 @@ impl FrameWriter { } #[throws(MgmtChannelWriteError)] - pub fn write_withbulk<'c,T>(&'c mut self, val: &T) - -> WriteFrame - where T: Serialize + Debug + pub fn write_withbulk<'c>(&'c mut self) -> ResponseWriter { - let mut f = self.new_frame()?; - rmp_serde::encode::write_named(&mut f, val)?; - trace!("writing {:?}", val); - f + ResponseWriter { f: self.new_frame()? } } #[throws(MgmtChannelWriteError)] pub fn write(&mut self, val: &T) where T: Serialize + Debug { - let f = self.write_withbulk(val)?; + let f = self.write_withbulk()?.respond(val)?; f.finish()?; } } @@ -415,6 +410,19 @@ impl<'w,W:Write> Write for WriteFrame<'w,W> { fn flush(&mut self) { self.buf.flush()? } } +pub struct ResponseWriter<'c,W:Write> { f: WriteFrame<'c,W> } + +impl<'c,W:Write> ResponseWriter<'c,W> { + #[throws(MgmtChannelWriteError)] + pub fn respond<'t,T>(mut self, val: &'t T) -> WriteFrame<'c, impl Write + 'c> + where T: Serialize + Debug + { + rmp_serde::encode::write_named(&mut self.f, val)?; + trace!("writing {:?}", val); + self.f + } +} + // ==================== tests ==================== #[test]