listener : UnixListener,
}
-type CSWrite = BufWriter<UnixStream>;
-
#[derive(Debug,Error,Clone)]
#[error("connection euid lookup failed (at connection initiation): {0}")]
pub struct ConnectionEuidDiscoverEerror(String);
struct CommandStream<'d> {
euid : Result<u32, ConnectionEuidDiscoverEerror>,
- read : io::Lines<BufReader<UnixStream>>,
- write : CSWrite,
desc : &'d str,
scope : Option<ManagementScope>,
amu : Option<InstanceRef>,
+ chan : MgmtChannel<UnixStream>,
}
type CSE = anyhow::Error;
impl CommandStream<'_> {
#[throws(CSE)]
pub fn mainloop(mut self) {
- while let Some(l) = self.read.next() {
- let l = l.context("read")?;
- decode_and_process(&mut self, &l)?;
- #[allow(clippy::write_with_newline)]
- write!(&mut self.write, "\n")?;
- self.write.flush()?;
- }
+ use MgmtChannelReadError::*;
+ let resp = match self.chan.read()? {
+ Ok(Some(cmd)) => execute(&mut self, cmd),
+ Err(IO(ioe)) => {
+ eprintln!("{}: io error reading: {}", &self.desc, ioe);
+ return;
+ }
+ Err(ParseFailed(s)) => MgmtResponse::Error {
+ error: MgmtError::ParseFailed(s),
+ },
+ };
+ serde_lexpr::to_writer(&mut cs.write, &resp)?;
}
#[throws(MgmtError)]
type ME = MgmtError;
from_instance_lock_error!{MgmtError}
-#[throws(CSE)]
-fn decode_and_process(cs: &mut CommandStream, s: &str) {
- let resp = self::decode_process_inner(cs, s)
- .unwrap_or_else(|e| MgmtResponse::Error {
- error: MgmtError::ParseFailed(format!("{}", e))
- });
- serde_lexpr::to_writer(&mut cs.write, &resp)?;
-}
-
-#[throws(ME)]
-fn decode_process_inner(cs: &mut CommandStream, s: &str)-> MgmtResponse {
- let cmd : MgmtCommand = serde_lexpr::from_str(s)?;
- execute(cs, cmd)?
-}
-
const USERLIST : &str = "/etc/userlist";
impl CommandStream<'_> {
})().unwrap_or_else(|e| format!("<error: {}>", e));
write!(&mut desc, " user={}", user_desc)?;
- let read = conn.try_clone().context("dup the command stream")?;
- let read = BufReader::new(read);
- let read = read.lines();
- let write = conn;
- let write = BufWriter::new(write);
+ let chan = MgmtChannel::new(conn);
let cs = CommandStream {
scope: None, amu: None, desc: &desc,
- read, write, euid,
+ chan, euid,
};
cs.mainloop()?;
--- /dev/null
+
+use crate::imports::*;
+
+#[derive(Debug,Error)]
+pub enum MgmtChannelReadError {
+ Parse(String),
+ IO(#[from] io::Error),
+}
+display_as_debug!{MgmtChannelError}
+
+#[derive(Clone,Debug)]
+pub struct MgmtChannel<U : Read + Write> {
+ read : io::Lines<BufReader<U>>,
+ write : BufWriter<U>,
+}
+
+impl<U: IoTryClone + Read + Write> MgmtChannel<U> {
+ #[throws(AE)]
+ fn new(conn: U) -> MgmtChannel<U> {
+ let read = conn.try_clone().context("dup the command stream")?;
+ let read = BufReader::new(read);
+ let read = read.lines();
+ let write = conn;
+ let write = BufWriter::new(write);
+ MgmtChannel { read, write }
+ }
+
+ #[throws(MgmthannelReadError)]
+ fn read<T>(&mut self) -> Option<T> {
+ let lq = self.read.next().map_err(MgmtChannelReadError::IO)?;
+ let incoming : T = lq.map(
+ |l| serde_lexpr::from_str(l)
+ ).collect().map_err(|e| MgmtChannelReadError::Parse("{}", &e))?;
+ incoming
+ }
+}
+
+trait IoTryClone : Sized {
+ fn try_clone(&self) -> io::Result<Self>;
+}
+
+impl IoTryClone for UnixStream {
+ fn try_clone(&self) -> io::Result<UnixStream> { self.try_clone() }
+}