chiark / gitweb /
wip mgmtchannel
authorIan Jackson <ijackson@chiark.greenend.org.uk>
Thu, 13 Aug 2020 15:04:57 +0000 (16:04 +0100)
committerIan Jackson <ijackson@chiark.greenend.org.uk>
Thu, 13 Aug 2020 15:04:57 +0000 (16:04 +0100)
src/cmdlistener.rs
src/imports.rs
src/lib.rs
src/mgmtchannel.rs [new file with mode: 0644]

index 171d77752b145f0285eb77440dc0cadeb9efc023..dc7dc877b7d1fa42db4fec72bfce0d1b574966ca 100644 (file)
@@ -23,19 +23,16 @@ pub struct CommandListener {
   listener : UnixListener,
 }
 
-type CSWrite = BufWriter<UnixStream>;
-
 #[derive(Debug,Error,Clone)]
 #[error("connection euid lookup failed (at connection initiation): {0}")]
 pub struct ConnectionEuidDiscoverEerror(String);
 
 struct CommandStream<'d> {
   euid : Result<u32, ConnectionEuidDiscoverEerror>,
-  read : io::Lines<BufReader<UnixStream>>,
-  write : CSWrite,
   desc : &'d str,
   scope : Option<ManagementScope>,
   amu : Option<InstanceRef>,
+  chan : MgmtChannel<UnixStream>,
 }
 
 type CSE = anyhow::Error;
@@ -43,13 +40,18 @@ type CSE = anyhow::Error;
 impl CommandStream<'_> {
   #[throws(CSE)]
   pub fn mainloop(mut self) {
-    while let Some(l) = self.read.next() {
-      let l = l.context("read")?;
-      decode_and_process(&mut self, &l)?;
-      #[allow(clippy::write_with_newline)]
-      write!(&mut self.write, "\n")?;
-      self.write.flush()?;
-    }
+    use MgmtChannelReadError::*;
+    let resp = match self.chan.read()? {
+      Ok(Some(cmd)) => execute(&mut self, cmd),
+      Err(IO(ioe)) => {
+        eprintln!("{}: io error reading: {}", &self.desc, ioe);
+        return;
+      }
+      Err(ParseFailed(s)) => MgmtResponse::Error {
+        error: MgmtError::ParseFailed(s),
+      },
+    };
+    serde_lexpr::to_writer(&mut cs.write, &resp)?;
   }
 
   #[throws(MgmtError)]
@@ -71,21 +73,6 @@ use MgmtError::*;
 type ME = MgmtError;
 from_instance_lock_error!{MgmtError}
 
-#[throws(CSE)]
-fn decode_and_process(cs: &mut CommandStream, s: &str) {
-  let resp = self::decode_process_inner(cs, s)
-    .unwrap_or_else(|e| MgmtResponse::Error {
-      error: MgmtError::ParseFailed(format!("{}", e))
-    });
-  serde_lexpr::to_writer(&mut cs.write, &resp)?;
-}
-
-#[throws(ME)]
-fn decode_process_inner(cs: &mut CommandStream, s: &str)-> MgmtResponse {
-  let cmd : MgmtCommand = serde_lexpr::from_str(s)?;
-  execute(cs, cmd)?
-}
-
 const USERLIST : &str = "/etc/userlist";
 
 impl CommandStream<'_> {
@@ -500,15 +487,11 @@ impl CommandListener {
         })().unwrap_or_else(|e| format!("<error: {}>", e));
         write!(&mut desc, " user={}", user_desc)?;
 
-        let read = conn.try_clone().context("dup the command stream")?;
-        let read = BufReader::new(read);
-        let read = read.lines();
-        let write = conn;
-        let write = BufWriter::new(write);
+        let chan = MgmtChannel::new(conn);
 
         let cs = CommandStream {
           scope: None, amu: None, desc: &desc,
-          read, write, euid,
+          chan, euid,
         };
         cs.mainloop()?;
         
index bb48b0ec73cfe1c8108e5cf403c310d8b7122f8f..1933d6b76581e8c1da4949e8551c78cdca04dbbb 100644 (file)
@@ -73,6 +73,7 @@ pub use crate::slotmap_slot_idx::*;
 pub use crate::cmdlistener::*;
 pub use crate::spec::*;
 pub use crate::client::*;
+pub use crate::mgmtchannel::*;
 pub use crate::api::{Lens,TransparentLens};
 pub use crate::utils::OrdExt;
 
index 638c57dd30b0c7e51f8a67e33f4d345afd89f59e..08e778334e21b0544cb38ebdb90b2113b7944d81 100644 (file)
@@ -20,4 +20,5 @@ pub mod cmdlistener;
 pub mod commands;
 pub mod utils;
 pub mod client;
+pub mod mgmtchannel;
 #[path="slotmap-slot-idx.rs"] pub mod slotmap_slot_idx;
diff --git a/src/mgmtchannel.rs b/src/mgmtchannel.rs
new file mode 100644 (file)
index 0000000..7d210f3
--- /dev/null
@@ -0,0 +1,44 @@
+
+use crate::imports::*;
+
+#[derive(Debug,Error)]
+pub enum MgmtChannelReadError {
+  Parse(String),
+  IO(#[from] io::Error),
+}
+display_as_debug!{MgmtChannelError}
+
+#[derive(Clone,Debug)]
+pub struct MgmtChannel<U : Read + Write> {
+  read : io::Lines<BufReader<U>>,
+  write : BufWriter<U>,
+}
+
+impl<U: IoTryClone + Read + Write> MgmtChannel<U> {
+  #[throws(AE)]
+  fn new(conn: U) -> MgmtChannel<U> {
+    let read = conn.try_clone().context("dup the command stream")?;
+    let read = BufReader::new(read);
+    let read = read.lines();
+    let write = conn;
+    let write = BufWriter::new(write);
+    MgmtChannel { read, write }
+  }
+
+  #[throws(MgmthannelReadError)]
+  fn read<T>(&mut self) -> Option<T> {
+    let lq = self.read.next().map_err(MgmtChannelReadError::IO)?;
+    let incoming : T = lq.map(
+      |l| serde_lexpr::from_str(l)
+    ).collect().map_err(|e| MgmtChannelReadError::Parse("{}", &e))?;
+    incoming
+  }
+}
+
+trait IoTryClone : Sized {
+  fn try_clone(&self) -> io::Result<Self>;
+}
+
+impl IoTryClone for UnixStream {
+  fn try_clone(&self) -> io::Result<UnixStream> { self.try_clone() }
+}