chiark / gitweb /
timereader: New module - does not work properly yet
authorIan Jackson <ijackson@chiark.greenend.org.uk>
Sun, 16 May 2021 18:52:16 +0000 (19:52 +0100)
committerIan Jackson <ijackson@chiark.greenend.org.uk>
Mon, 17 May 2021 10:06:31 +0000 (11:06 +0100)
At first I tried doing this by a contraption which would dup2
/dev/null over the socket, but it turns out that (with Linux on my
laptop at least) this doesn't interrupt recvfrom in another thread.

Signed-off-by: Ian Jackson <ijackson@chiark.greenend.org.uk>
Cargo.lock
Cargo.toml
src/lib.rs
src/prelude.rs
src/timedread.rs [new file with mode: 0644]

index e4eacaf2f519378527d2bfa5999f19af74a44349..62dadd542b464e1c0ed8d2ca3ca42f74b3c29474 100644 (file)
@@ -2267,6 +2267,7 @@ dependencies = [
  "lazy_static",
  "libc",
  "log 0.4.14",
+ "mio 0.7.11",
  "nix",
  "num",
  "num-derive",
index 4a66e13afabc9e7003b5eda9fd864461475e4d0e..5ce82df12ec66d4c47ccdc1fad755eb24936b3a5 100644 (file)
@@ -85,6 +85,7 @@ zip="0.5"
 enum-map     = { version="1"    , features=["serde"       ] }
 flexi_logger = { version="0.17" , features=["specfile"    ] }
 index_vec    = { version="0.1.1", features=["serde"       ] }
+mio          = { version="0.7",   features=["os-ext"      ] }
 serde        = { version="1"    , features=["derive", "rc"] }
 strum        = { version="0.20" , features=["derive"      ] }
 
index 6fa03187bd59ba08477e169b27151f4ea0c98598..d81a87b8d88e49583ca804fdf80e7d06267c9ec2 100644 (file)
@@ -37,6 +37,7 @@ pub mod shapelib;
 pub mod spec;
 pub mod sse;
 pub mod termprogress;
+pub mod timedread;
 pub mod tz;
 pub mod updates;
 pub mod ui;
index 6b2371a6146351b24b92d2c3b54a4ded94d3ec69..c0704a4010c6075734503f867146ba8dbd6a5586 100644 (file)
@@ -36,7 +36,7 @@ pub use std::num::{NonZeroUsize, TryFromIntError, Wrapping};
 pub use std::os::linux::fs::MetadataExt; // todo why linux for st_mode??
 pub use std::os::unix;
 pub use std::os::unix::ffi::OsStrExt;
-pub use std::os::unix::io::IntoRawFd;
+pub use std::os::unix::io::{AsRawFd, IntoRawFd, RawFd};
 pub use std::os::unix::net::UnixStream;
 pub use std::os::unix::process::{CommandExt, ExitStatusExt};
 pub use std::path::PathBuf;
diff --git a/src/timedread.rs b/src/timedread.rs
new file mode 100644 (file)
index 0000000..4a7d27f
--- /dev/null
@@ -0,0 +1,97 @@
+// Copyright 2020-2021 Ian Jackson and contributors to Otter
+// SPDX-License-Identifier: AGPL-3.0-or-later
+// There is NO WARRANTY.
+
+use crate::prelude::*;
+
+use io::ErrorKind as EK;
+
+use nix::fcntl::{fcntl, OFlag, FcntlArg};
+use nix::Error as NE;
+use nix::errno::Errno;
+
+use mio::Token;
+
+pub struct TimedFdReader {
+  fd: Fd,
+  poll: mio::Poll,
+  events: mio::event::Events,
+  deadline: Option<Instant>,
+}
+
+pub struct Fd(RawFd);
+impl Fd {
+  pub fn from_raw_fd(fd: RawFd) -> Self { Fd(fd) }
+  fn extract_raw_fd(&mut self) -> RawFd { mem::replace(&mut self.0, -1) }
+}
+impl IntoRawFd for Fd {
+  fn into_raw_fd(mut self) -> RawFd { self.extract_raw_fd() }
+}
+impl AsRawFd for Fd {
+  fn as_raw_fd(&self) -> RawFd { self.0 }
+}
+
+#[ext(pub)]
+impl nix::Error {
+  fn as_ioe(self) -> io::Error {
+    match self {
+      NE::Sys(e) => return io::Error::from_raw_os_error(e as i32),
+      NE::UnsupportedOperation => EK::Unsupported,
+      NE::InvalidPath          => EK::InvalidData,
+      NE::InvalidUtf8          => EK::InvalidData,
+    }.into()
+  }
+}
+
+impl TimedFdReader {
+  /// Takes ownership of the fd
+  #[throws(io::Error)]
+  pub fn from_fd(fd: Fd) -> Self {
+    fcntl(fd.as_raw_fd(), FcntlArg::F_SETFL(OFlag::O_NONBLOCK))
+      .map_err(|e| e.as_ioe())?;
+
+    let poll = mio::Poll::new()?;
+    poll.registry().register(
+      &mut mio::unix::SourceFd(&fd.as_raw_fd()),
+      Token(0),
+      mio::Interest::READABLE,
+    )?;
+    let events = mio::event::Events::with_capacity(1);
+    TimedFdReader { fd, poll, events, deadline: None }
+  }
+}
+
+impl Read for TimedFdReader {
+  #[throws(io::Error)]
+  fn read(&mut self, buf: &mut [u8]) -> usize {
+    'again: loop {
+      for event in &self.events {
+        if event.token() == Token(0) {
+          match unistd::read(self.fd.as_raw_fd(), buf) {
+            Ok(got) => { break 'again got },
+            Err(NE::Sys(Errno::EINTR)) => { continue 'again }
+            Err(NE::Sys(Errno::EAGAIN)) => break,
+            Err(ne) => throw!(ne.as_ioe()),
+          }
+        }
+      }
+
+      let timeout = if let Some(deadline) = self.deadline {
+        let now = Instant::now();
+        if now >= deadline { throw!(io::ErrorKind::TimedOut) }
+        Some(deadline - now)
+      } else {
+        None
+      };
+      self.poll.poll(&mut self.events, timeout)?;
+      if self.events.is_empty() { throw!(io::ErrorKind::TimedOut) }
+    }
+  }
+}
+
+impl Drop for Fd {
+  fn drop(&mut self) {
+    let fd = self.extract_raw_fd();
+    if fd >= 2 { let _ = nix::unistd::close(fd); }
+  }
+}