From: Ian Jackson Date: Sun, 16 May 2021 18:52:16 +0000 (+0100) Subject: timereader: New module - does not work properly yet X-Git-Tag: otter-0.6.0~230 X-Git-Url: http://www.chiark.greenend.org.uk/ucgi/~ianmdlvl/git?a=commitdiff_plain;h=0b5b98c50b72fbb435985558022c68591c1cf38e;p=otter.git timereader: New module - does not work properly yet At first I tried doing this by a contraption which would dup2 /dev/null over the socket, but it turns out that (with Linux on my laptop at least) this doesn't interrupt recvfrom in another thread. Signed-off-by: Ian Jackson --- diff --git a/Cargo.lock b/Cargo.lock index e4eacaf2..62dadd54 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2267,6 +2267,7 @@ dependencies = [ "lazy_static", "libc", "log 0.4.14", + "mio 0.7.11", "nix", "num", "num-derive", diff --git a/Cargo.toml b/Cargo.toml index 4a66e13a..5ce82df1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -85,6 +85,7 @@ zip="0.5" enum-map = { version="1" , features=["serde" ] } flexi_logger = { version="0.17" , features=["specfile" ] } index_vec = { version="0.1.1", features=["serde" ] } +mio = { version="0.7", features=["os-ext" ] } serde = { version="1" , features=["derive", "rc"] } strum = { version="0.20" , features=["derive" ] } diff --git a/src/lib.rs b/src/lib.rs index 6fa03187..d81a87b8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -37,6 +37,7 @@ pub mod shapelib; pub mod spec; pub mod sse; pub mod termprogress; +pub mod timedread; pub mod tz; pub mod updates; pub mod ui; diff --git a/src/prelude.rs b/src/prelude.rs index 6b2371a6..c0704a40 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -36,7 +36,7 @@ pub use std::num::{NonZeroUsize, TryFromIntError, Wrapping}; pub use std::os::linux::fs::MetadataExt; // todo why linux for st_mode?? pub use std::os::unix; pub use std::os::unix::ffi::OsStrExt; -pub use std::os::unix::io::IntoRawFd; +pub use std::os::unix::io::{AsRawFd, IntoRawFd, RawFd}; pub use std::os::unix::net::UnixStream; pub use std::os::unix::process::{CommandExt, ExitStatusExt}; pub use std::path::PathBuf; diff --git a/src/timedread.rs b/src/timedread.rs new file mode 100644 index 00000000..4a7d27fd --- /dev/null +++ b/src/timedread.rs @@ -0,0 +1,97 @@ +// Copyright 2020-2021 Ian Jackson and contributors to Otter +// SPDX-License-Identifier: AGPL-3.0-or-later +// There is NO WARRANTY. + +use crate::prelude::*; + +use io::ErrorKind as EK; + +use nix::fcntl::{fcntl, OFlag, FcntlArg}; +use nix::Error as NE; +use nix::errno::Errno; + +use mio::Token; + +pub struct TimedFdReader { + fd: Fd, + poll: mio::Poll, + events: mio::event::Events, + deadline: Option, +} + +pub struct Fd(RawFd); +impl Fd { + pub fn from_raw_fd(fd: RawFd) -> Self { Fd(fd) } + fn extract_raw_fd(&mut self) -> RawFd { mem::replace(&mut self.0, -1) } +} +impl IntoRawFd for Fd { + fn into_raw_fd(mut self) -> RawFd { self.extract_raw_fd() } +} +impl AsRawFd for Fd { + fn as_raw_fd(&self) -> RawFd { self.0 } +} + +#[ext(pub)] +impl nix::Error { + fn as_ioe(self) -> io::Error { + match self { + NE::Sys(e) => return io::Error::from_raw_os_error(e as i32), + NE::UnsupportedOperation => EK::Unsupported, + NE::InvalidPath => EK::InvalidData, + NE::InvalidUtf8 => EK::InvalidData, + }.into() + } +} + +impl TimedFdReader { + /// Takes ownership of the fd + #[throws(io::Error)] + pub fn from_fd(fd: Fd) -> Self { + fcntl(fd.as_raw_fd(), FcntlArg::F_SETFL(OFlag::O_NONBLOCK)) + .map_err(|e| e.as_ioe())?; + + let poll = mio::Poll::new()?; + poll.registry().register( + &mut mio::unix::SourceFd(&fd.as_raw_fd()), + Token(0), + mio::Interest::READABLE, + )?; + let events = mio::event::Events::with_capacity(1); + TimedFdReader { fd, poll, events, deadline: None } + } +} + +impl Read for TimedFdReader { + #[throws(io::Error)] + fn read(&mut self, buf: &mut [u8]) -> usize { + 'again: loop { + for event in &self.events { + if event.token() == Token(0) { + match unistd::read(self.fd.as_raw_fd(), buf) { + Ok(got) => { break 'again got }, + Err(NE::Sys(Errno::EINTR)) => { continue 'again } + Err(NE::Sys(Errno::EAGAIN)) => break, + Err(ne) => throw!(ne.as_ioe()), + } + } + } + + let timeout = if let Some(deadline) = self.deadline { + let now = Instant::now(); + if now >= deadline { throw!(io::ErrorKind::TimedOut) } + Some(deadline - now) + } else { + None + }; + self.poll.poll(&mut self.events, timeout)?; + if self.events.is_empty() { throw!(io::ErrorKind::TimedOut) } + } + } +} + +impl Drop for Fd { + fn drop(&mut self) { + let fd = self.extract_raw_fd(); + if fd >= 2 { let _ = nix::unistd::close(fd); } + } +}