chiark / gitweb /
wip mgmtchannel
authorIan Jackson <ijackson@chiark.greenend.org.uk>
Thu, 13 Aug 2020 16:05:39 +0000 (17:05 +0100)
committerIan Jackson <ijackson@chiark.greenend.org.uk>
Thu, 13 Aug 2020 16:05:39 +0000 (17:05 +0100)
src/cmdlistener.rs
src/mgmtchannel.rs

index 8300ff59d900b14a2d57271fc6a87593bef2b56e..e846faea1a518900e6fb393a3c02be84f068e105 100644 (file)
@@ -40,18 +40,16 @@ type CSE = anyhow::Error;
 impl CommandStream<'_> {
   #[throws(CSE)]
   pub fn mainloop(mut self) {
-    use MgmtChannelReadError::*;
-    let resp = match self.chan.read() {
-      Ok(Some(cmd)) => match execute(&mut self, cmd),
-      Err(IO(ioe)) => {
-        eprintln!("{}: io error reading: {}", &self.desc, ioe);
-        return;
-      }
-      Err(Parse(s)) => MgmtResponse::Error {
-        error: MgmtError::ParseFailed(s),
-      },
-    };
-    self.chan.write(&resp)?;
+    loop {
+      use MgmtChannelReadError::*;
+      let resp = match self.chan.read() {
+        Ok(cmd) => execute(&mut self, cmd)?,
+        Err(EOF) => break,
+        Err(IO(e)) => Err(e).context("read command stream")?,
+        Err(Parse(s)) => MgmtResponse::Error { error : ParseFailed(s) },
+      };
+      self.chan.write(&resp).context("swrite command stream")?;
+    }
   }
 
   #[throws(MgmtError)]
@@ -60,11 +58,13 @@ impl CommandStream<'_> {
   }
 }
 
+/*
 impl From<serde_lexpr::Error> for MgmtError {
   fn from(je: serde_lexpr::Error) -> ME {
     ParseFailed(format!("{}", &je))
   }
 }
+*/
 
 use MgmtCommand::*;
 use MgmtResponse::*;
@@ -487,7 +487,7 @@ impl CommandListener {
         })().unwrap_or_else(|e| format!("<error: {}>", e));
         write!(&mut desc, " user={}", user_desc)?;
 
-        let chan = MgmtChannel::new(conn);
+        let chan = MgmtChannel::new(conn)?;
 
         let cs = CommandStream {
           scope: None, amu: None, desc: &desc,
index 91f22d3bec5b9dc0892f6691c036f840861abae5..b3f1b4888ae20b05b6034ed383fb5a92066dbfdf 100644 (file)
@@ -3,12 +3,13 @@ use crate::imports::*;
 
 #[derive(Debug,Error)]
 pub enum MgmtChannelReadError {
+  EOF,
   Parse(String),
   IO(#[from] io::Error),
 }
 display_as_debug!{MgmtChannelReadError}
 
-#[derive(Clone,Debug)]
+#[derive(Debug)]
 pub struct MgmtChannel<U : Read + Write> {
   read : io::Lines<BufReader<U>>,
   write : BufWriter<U>,
@@ -16,7 +17,7 @@ pub struct MgmtChannel<U : Read + Write> {
 
 impl<U: IoTryClone + Read + Write> MgmtChannel<U> {
   #[throws(AE)]
-  fn new(conn: U) -> MgmtChannel<U> {
+  pub fn new(conn: U) -> MgmtChannel<U> {
     let read = conn.try_clone().context("dup the command stream")?;
     let read = BufReader::new(read);
     let read = read.lines();
@@ -26,12 +27,12 @@ impl<U: IoTryClone + Read + Write> MgmtChannel<U> {
   }
 
   #[throws(MgmtChannelReadError)]
-  pub fn read<T>(&mut self) -> Option<T> {
-    let lq = self.read.next().map_err(MgmtChannelReadError::IO)?;
-    let incoming : T = lq.map(
-      |l| serde_lexpr::from_str(l)
-    ).collect().map_err(|e| MgmtChannelReadError::Parse("{}", &e))?;
-    incoming
+  pub fn read<T:DeserializeOwned>(&mut self) -> T {
+    use MgmtChannelReadError::*;
+    let l = self.read.next().ok_or(EOF)??;
+    let v = serde_lexpr::from_str(&l)
+      .map_err(|e| Parse(format!("{}", &e)))?;
+    v
   }
 
   #[throws(io::Error)]