From: Ian Jackson Date: Thu, 13 Aug 2020 16:05:39 +0000 (+0100) Subject: wip mgmtchannel X-Git-Tag: otter-0.2.0~1136 X-Git-Url: https://www.chiark.greenend.org.uk/ucgi/~ianmdlvl/git?a=commitdiff_plain;h=797ce9a5b6768bc3e9e0b7d77d4b872afa48df3d;p=otter.git wip mgmtchannel --- diff --git a/src/cmdlistener.rs b/src/cmdlistener.rs index 8300ff59..e846faea 100644 --- a/src/cmdlistener.rs +++ b/src/cmdlistener.rs @@ -40,18 +40,16 @@ type CSE = anyhow::Error; impl CommandStream<'_> { #[throws(CSE)] pub fn mainloop(mut self) { - use MgmtChannelReadError::*; - let resp = match self.chan.read() { - Ok(Some(cmd)) => match execute(&mut self, cmd), - Err(IO(ioe)) => { - eprintln!("{}: io error reading: {}", &self.desc, ioe); - return; - } - Err(Parse(s)) => MgmtResponse::Error { - error: MgmtError::ParseFailed(s), - }, - }; - self.chan.write(&resp)?; + loop { + use MgmtChannelReadError::*; + let resp = match self.chan.read() { + Ok(cmd) => execute(&mut self, cmd)?, + Err(EOF) => break, + Err(IO(e)) => Err(e).context("read command stream")?, + Err(Parse(s)) => MgmtResponse::Error { error : ParseFailed(s) }, + }; + self.chan.write(&resp).context("swrite command stream")?; + } } #[throws(MgmtError)] @@ -60,11 +58,13 @@ impl CommandStream<'_> { } } +/* impl From for MgmtError { fn from(je: serde_lexpr::Error) -> ME { ParseFailed(format!("{}", &je)) } } +*/ use MgmtCommand::*; use MgmtResponse::*; @@ -487,7 +487,7 @@ impl CommandListener { })().unwrap_or_else(|e| format!("", e)); write!(&mut desc, " user={}", user_desc)?; - let chan = MgmtChannel::new(conn); + let chan = MgmtChannel::new(conn)?; let cs = CommandStream { scope: None, amu: None, desc: &desc, diff --git a/src/mgmtchannel.rs b/src/mgmtchannel.rs index 91f22d3b..b3f1b488 100644 --- a/src/mgmtchannel.rs +++ b/src/mgmtchannel.rs @@ -3,12 +3,13 @@ use crate::imports::*; #[derive(Debug,Error)] pub enum MgmtChannelReadError { + EOF, Parse(String), IO(#[from] io::Error), } display_as_debug!{MgmtChannelReadError} -#[derive(Clone,Debug)] +#[derive(Debug)] pub struct MgmtChannel { read : io::Lines>, write : BufWriter, @@ -16,7 +17,7 @@ pub struct MgmtChannel { impl MgmtChannel { #[throws(AE)] - fn new(conn: U) -> MgmtChannel { + pub fn new(conn: U) -> MgmtChannel { let read = conn.try_clone().context("dup the command stream")?; let read = BufReader::new(read); let read = read.lines(); @@ -26,12 +27,12 @@ impl MgmtChannel { } #[throws(MgmtChannelReadError)] - pub fn read(&mut self) -> Option { - let lq = self.read.next().map_err(MgmtChannelReadError::IO)?; - let incoming : T = lq.map( - |l| serde_lexpr::from_str(l) - ).collect().map_err(|e| MgmtChannelReadError::Parse("{}", &e))?; - incoming + pub fn read(&mut self) -> T { + use MgmtChannelReadError::*; + let l = self.read.next().ok_or(EOF)??; + let v = serde_lexpr::from_str(&l) + .map_err(|e| Parse(format!("{}", &e)))?; + v } #[throws(io::Error)]