From 4d05f6c22e5dff378c06f306b5c776b926ffc520 Mon Sep 17 00:00:00 2001 From: Simon Tatham Date: Sat, 30 Dec 2023 11:57:51 +0000 Subject: [PATCH] Begin setting up the streaming subthread system. This gives Client two new methods. The first starts a subthread listening to a stream, given a lambda function that you pass into the subthread to handle the response. The second runs in the context of the main thread, and the idea is that you give that same response straight back to the client. The owner of a Client object is responsible for doing the plumbing in between, to arrange that a response passed to the lambda in the subthread is transferred back to the main thread and given to process_stream_update. The implementation of that plumbing in Tui sends the data back to the main thread over a clone of the sync_channel we already had, so that it's received in the same event loop as terminal input. So far, nothing is done with the updates. Also not done: it would be nice to be able to ask a subthread to terminate, if it's not needed any more. That way if the user tries to read one of the fast-moving public timelines, we can dismiss its update thread as soon as they're not looking at that file any more. --- src/client.rs | 92 +++++++++++++++++++++++++++++++++++++++++++++++++-- src/main.rs | 45 ------------------------- src/tui.rs | 33 ++++++++++++++++-- 3 files changed, 120 insertions(+), 50 deletions(-) diff --git a/src/client.rs b/src/client.rs index 85fded5..d0649a4 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,5 +1,6 @@ use reqwest::Url; use std::collections::{HashMap, VecDeque}; +use std::io::Read; use super::auth::{AuthConfig,AuthError}; use super::types::*; @@ -19,6 +20,24 @@ pub enum FeedId { User(String, Boosts, Replies), } +#[derive(Debug, PartialEq, Eq, Clone)] +pub enum StreamId { + User, +} + +#[derive(Debug, PartialEq, Eq, Clone)] +pub enum StreamResponse { + Line(String), + BadUTF8, + EOF, +} + +#[derive(Debug)] +pub struct StreamUpdate { + pub id: StreamId, + pub response: StreamResponse, +} + #[derive(Debug)] pub struct Feed { pub ids: VecDeque, // ids, whether of statuses, accounts or what @@ -150,7 +169,7 @@ impl Client { } } - fn api_request(&self, req: Req) -> + fn api_request_cl(&self, client: &reqwest::blocking::Client, req: Req) -> Result<(String, reqwest::blocking::RequestBuilder), ClientError> { if req.method != reqwest::Method::GET && !self.permit_write { @@ -166,10 +185,16 @@ impl Client { urlstr.clone(), e.to_string())), }?; - Ok((urlstr, self.client.request(req.method, url) + Ok((urlstr, client.request(req.method, url) .bearer_auth(&self.auth.user_token))) } + fn api_request(&self, req: Req) -> + Result<(String, reqwest::blocking::RequestBuilder), ClientError> + { + self.api_request_cl(&self.client, req) + } + pub fn cache_account(&mut self, ac: &Account) { self.accounts.insert(ac.id.to_string(), ac.clone()); } @@ -328,4 +353,67 @@ impl Client { self.feeds.get(id).expect( "should only ever borrow feeds that have been fetched") } + + pub fn start_streaming_thread( + &self, id: &StreamId, receiver: Box) -> Result<(), ClientError> + { + let req = match id { + StreamId::User => Req::get("streaming/user"), + }; + + let client = reqwest::blocking::Client::new(); + let (_url, req) = self.api_request_cl(&client, req)?; + let mut rsp = req.send()?; + + let id = id.clone(); + + let _joinhandle = std::thread::spawn(move || { + let mut vec: Vec = Vec::new(); + + const BUFSIZE: usize = 4096; + let mut buf: [u8; BUFSIZE] = [0; BUFSIZE]; + while let Ok(sz) = rsp.read(&mut buf) { + let read = &buf[..sz]; + vec.extend_from_slice(read); + vec = 'outer: loop { + for line in vec.split_inclusive(|c| *c == 10) { + if !line.ends_with(&[10]) { + let mut newvec = Vec::new(); + newvec.extend_from_slice(line); + break 'outer newvec; + } else if line.starts_with(&[':' as u8]) { + // Ignore lines starting with ':': in the + // Mastodon streaming APIs those are + // heartbeat events whose sole purpose is + // to avoid the connection timing out. We + // don't communicate them back to the main + // thread. + } else { + let rsp = match std::str::from_utf8(&line) { + Err(_) => StreamResponse::BadUTF8, + Ok(d) => StreamResponse::Line(d.to_owned()), + }; + receiver(StreamUpdate { + id: id.clone(), + response: rsp + }); + } + } + // If we didn't get a partial line inside the loop, then + // we must have an empty buffer here + break Vec::new(); + }; + } + + receiver(StreamUpdate { + id: id.clone(), + response: StreamResponse::EOF + }); + }); + + Ok(()) + } + + pub fn process_stream_update(&mut self, _up: StreamUpdate) { + } } diff --git a/src/main.rs b/src/main.rs index 402a502..3ac0e40 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,51 +4,6 @@ use std::process::ExitCode; use mastodonochrome::tui::Tui; -/* -#[allow(unused)] -fn streaming() -> Result<(), mastodonochrome::OurError> { - let auth = AuthConfig::load()?; - - let client = reqwest::blocking::Client::new(); - let req = client.get(auth.instance_url + "/api/v1/streaming/user") - .bearer_auth(auth.user_token); - - let mut rsp = match req.send() { - Err(e) => Err(OurError::Fatal( - format!("unable to make HTTP request: {}", e))), - Ok(d) => Ok(d), - }?; - - let mut vec: Vec = Vec::new(); - - const BUFSIZE: usize = 4096; - let mut buf: [u8; BUFSIZE] = [0; BUFSIZE]; - while let Ok(sz) = rsp.read(&mut buf) { - let read = &buf[..sz]; - vec.extend_from_slice(read); - vec = 'outer: loop { - for line in vec.split_inclusive(|c| *c == 10) { - if !line.ends_with(&[10]) { - let mut newvec = Vec::new(); - newvec.extend_from_slice(line); - break 'outer newvec; - } else { - match std::str::from_utf8(&line) { - Err(e) => { dbg!(e); () }, - Ok(d) => { dbg!(d); () }, - }; - } - } - // If we didn't get a partial line inside the loop, then - // we must have an empty buffer here - break Vec::new(); - }; - } - - Ok(()) -} -*/ - fn main() -> ExitCode { match Tui::run() { Ok(_) => ExitCode::from(0), diff --git a/src/tui.rs b/src/tui.rs index b36fa50..cb550a1 100644 --- a/src/tui.rs +++ b/src/tui.rs @@ -14,7 +14,7 @@ use std::io::{Stdout, Write, stdout}; use unicode_width::UnicodeWidthStr; use super::activity_stack::*; -use super::client::{Client, ClientError}; +use super::client::{Client, ClientError, StreamId, StreamUpdate}; use super::coloured_string::{ColouredString, ColouredStringSlice}; use super::menu::*; use super::file::*; @@ -117,8 +117,10 @@ fn ratatui_set_string(buf: &mut Buffer, x: usize, y: usize, } } +#[derive(Debug)] enum SubthreadEvent { TermEv(Event), + StreamEv(StreamUpdate), } pub enum PhysicalAction { @@ -172,6 +174,13 @@ impl From for TuiError { } } } +impl From for TuiError { + fn from(err: ClientError) -> Self { + TuiError { + message: err.to_string(), + } + } +} impl std::fmt::Display for TuiError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> @@ -210,9 +219,23 @@ impl Tui { state: TuiLogicalState::new(), client: client, }; - let result = tui.main_loop(); - let _ = &tui.subthread_sender; // FIXME: this just suppresses a warning + { + let sender = tui.subthread_sender.clone(); + tui.client.start_streaming_thread( + &StreamId::User, Box::new(move |update| { + if let Err(_) = sender.send( + SubthreadEvent::StreamEv(update)) { + // It would be nice to do something about this + // error, but what _can_ we do? We can hardly send + // an error notification back to the main thread, + // because that communication channel is just what + // we've had a problem with. + } + }))?; + } + + let result = tui.main_loop(); disable_raw_mode()?; stdout().execute(LeaveAlternateScreen)?; @@ -313,6 +336,10 @@ impl Tui { _ => (), } }, + Ok(SubthreadEvent::StreamEv(update)) => { + self.client.process_stream_update(update); + // FIXME: perhaps also notify state of a change + } } } } -- 2.30.2