From de473c8b15e0e2b1de2aa7f52f831e6c7cfe1f3a Mon Sep 17 00:00:00 2001 From: Ian Jackson Date: Thu, 13 Aug 2020 16:04:57 +0100 Subject: [PATCH] wip mgmtchannel --- src/cmdlistener.rs | 47 +++++++++++++++------------------------------- src/imports.rs | 1 + src/lib.rs | 1 + src/mgmtchannel.rs | 44 +++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 61 insertions(+), 32 deletions(-) create mode 100644 src/mgmtchannel.rs diff --git a/src/cmdlistener.rs b/src/cmdlistener.rs index 171d7775..dc7dc877 100644 --- a/src/cmdlistener.rs +++ b/src/cmdlistener.rs @@ -23,19 +23,16 @@ pub struct CommandListener { listener : UnixListener, } -type CSWrite = BufWriter; - #[derive(Debug,Error,Clone)] #[error("connection euid lookup failed (at connection initiation): {0}")] pub struct ConnectionEuidDiscoverEerror(String); struct CommandStream<'d> { euid : Result, - read : io::Lines>, - write : CSWrite, desc : &'d str, scope : Option, amu : Option, + chan : MgmtChannel, } type CSE = anyhow::Error; @@ -43,13 +40,18 @@ type CSE = anyhow::Error; impl CommandStream<'_> { #[throws(CSE)] pub fn mainloop(mut self) { - while let Some(l) = self.read.next() { - let l = l.context("read")?; - decode_and_process(&mut self, &l)?; - #[allow(clippy::write_with_newline)] - write!(&mut self.write, "\n")?; - self.write.flush()?; - } + use MgmtChannelReadError::*; + let resp = match self.chan.read()? { + Ok(Some(cmd)) => execute(&mut self, cmd), + Err(IO(ioe)) => { + eprintln!("{}: io error reading: {}", &self.desc, ioe); + return; + } + Err(ParseFailed(s)) => MgmtResponse::Error { + error: MgmtError::ParseFailed(s), + }, + }; + serde_lexpr::to_writer(&mut cs.write, &resp)?; } #[throws(MgmtError)] @@ -71,21 +73,6 @@ use MgmtError::*; type ME = MgmtError; from_instance_lock_error!{MgmtError} -#[throws(CSE)] -fn decode_and_process(cs: &mut CommandStream, s: &str) { - let resp = self::decode_process_inner(cs, s) - .unwrap_or_else(|e| MgmtResponse::Error { - error: MgmtError::ParseFailed(format!("{}", e)) - }); - serde_lexpr::to_writer(&mut cs.write, &resp)?; -} - -#[throws(ME)] -fn decode_process_inner(cs: &mut CommandStream, s: &str)-> MgmtResponse { - let cmd : MgmtCommand = serde_lexpr::from_str(s)?; - execute(cs, cmd)? -} - const USERLIST : &str = "/etc/userlist"; impl CommandStream<'_> { @@ -500,15 +487,11 @@ impl CommandListener { })().unwrap_or_else(|e| format!("", e)); write!(&mut desc, " user={}", user_desc)?; - let read = conn.try_clone().context("dup the command stream")?; - let read = BufReader::new(read); - let read = read.lines(); - let write = conn; - let write = BufWriter::new(write); + let chan = MgmtChannel::new(conn); let cs = CommandStream { scope: None, amu: None, desc: &desc, - read, write, euid, + chan, euid, }; cs.mainloop()?; diff --git a/src/imports.rs b/src/imports.rs index bb48b0ec..1933d6b7 100644 --- a/src/imports.rs +++ b/src/imports.rs @@ -73,6 +73,7 @@ pub use crate::slotmap_slot_idx::*; pub use crate::cmdlistener::*; pub use crate::spec::*; pub use crate::client::*; +pub use crate::mgmtchannel::*; pub use crate::api::{Lens,TransparentLens}; pub use crate::utils::OrdExt; diff --git a/src/lib.rs b/src/lib.rs index 638c57dd..08e77833 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -20,4 +20,5 @@ pub mod cmdlistener; pub mod commands; pub mod utils; pub mod client; +pub mod mgmtchannel; #[path="slotmap-slot-idx.rs"] pub mod slotmap_slot_idx; diff --git a/src/mgmtchannel.rs b/src/mgmtchannel.rs new file mode 100644 index 00000000..7d210f36 --- /dev/null +++ b/src/mgmtchannel.rs @@ -0,0 +1,44 @@ + +use crate::imports::*; + +#[derive(Debug,Error)] +pub enum MgmtChannelReadError { + Parse(String), + IO(#[from] io::Error), +} +display_as_debug!{MgmtChannelError} + +#[derive(Clone,Debug)] +pub struct MgmtChannel { + read : io::Lines>, + write : BufWriter, +} + +impl MgmtChannel { + #[throws(AE)] + fn new(conn: U) -> MgmtChannel { + let read = conn.try_clone().context("dup the command stream")?; + let read = BufReader::new(read); + let read = read.lines(); + let write = conn; + let write = BufWriter::new(write); + MgmtChannel { read, write } + } + + #[throws(MgmthannelReadError)] + fn read(&mut self) -> Option { + let lq = self.read.next().map_err(MgmtChannelReadError::IO)?; + let incoming : T = lq.map( + |l| serde_lexpr::from_str(l) + ).collect().map_err(|e| MgmtChannelReadError::Parse("{}", &e))?; + incoming + } +} + +trait IoTryClone : Sized { + fn try_clone(&self) -> io::Result; +} + +impl IoTryClone for UnixStream { + fn try_clone(&self) -> io::Result { self.try_clone() } +} -- 2.30.2