chiark / gitweb /
Begin setting up the streaming subthread system.
authorSimon Tatham <anakin@pobox.com>
Sat, 30 Dec 2023 11:57:51 +0000 (11:57 +0000)
committerSimon Tatham <anakin@pobox.com>
Sun, 31 Dec 2023 07:11:19 +0000 (07:11 +0000)
This gives Client two new methods. The first starts a subthread
listening to a stream, given a lambda function that you pass into the
subthread to handle the response. The second runs in the context of
the main thread, and the idea is that you give that same response
straight back to the client.

The owner of a Client object is responsible for doing the plumbing in
between, to arrange that a response passed to the lambda in the
subthread is transferred back to the main thread and given to
process_stream_update.

The implementation of that plumbing in Tui sends the data back to the
main thread over a clone of the sync_channel we already had, so that
it's received in the same event loop as terminal input.

So far, nothing is done with the updates.

Also not done: it would be nice to be able to ask a subthread to
terminate, if it's not needed any more. That way if the user tries to
read one of the fast-moving public timelines, we can dismiss its
update thread as soon as they're not looking at that file any more.

src/client.rs
src/main.rs
src/tui.rs

index 85fded56fe6dd6c9a7e5e5cf81dfdeb31cac78c5..d0649a4020221cd0c2ac457940990fd2770a2ad0 100644 (file)
@@ -1,5 +1,6 @@
 use reqwest::Url;
 use std::collections::{HashMap, VecDeque};
+use std::io::Read;
 
 use super::auth::{AuthConfig,AuthError};
 use super::types::*;
@@ -19,6 +20,24 @@ pub enum FeedId {
     User(String, Boosts, Replies),
 }
 
+#[derive(Debug, PartialEq, Eq, Clone)]
+pub enum StreamId {
+    User,
+}
+
+#[derive(Debug, PartialEq, Eq, Clone)]
+pub enum StreamResponse {
+    Line(String),
+    BadUTF8,
+    EOF,
+}
+
+#[derive(Debug)]
+pub struct StreamUpdate {
+    pub id: StreamId,
+    pub response: StreamResponse,
+}
+
 #[derive(Debug)]
 pub struct Feed {
     pub ids: VecDeque<String>, // ids, whether of statuses, accounts or what
@@ -150,7 +169,7 @@ impl Client {
         }
     }
 
-    fn api_request(&self, req: Req) ->
+    fn api_request_cl(&self, client: &reqwest::blocking::Client, req: Req) ->
         Result<(String, reqwest::blocking::RequestBuilder), ClientError>
     {
         if req.method != reqwest::Method::GET && !self.permit_write {
@@ -166,10 +185,16 @@ impl Client {
                urlstr.clone(), e.to_string())),
         }?;
 
-        Ok((urlstr, self.client.request(req.method, url)
+        Ok((urlstr, client.request(req.method, url)
             .bearer_auth(&self.auth.user_token)))
     }
 
+    fn api_request(&self, req: Req) ->
+        Result<(String, reqwest::blocking::RequestBuilder), ClientError>
+    {
+        self.api_request_cl(&self.client, req)
+    }
+
     pub fn cache_account(&mut self, ac: &Account) {
         self.accounts.insert(ac.id.to_string(), ac.clone());
     }
@@ -328,4 +353,67 @@ impl Client {
         self.feeds.get(id).expect(
             "should only ever borrow feeds that have been fetched")
     }
+
+    pub fn start_streaming_thread<Recv: Fn(StreamUpdate) + Send + 'static>(
+        &self, id: &StreamId, receiver: Box<Recv>) -> Result<(), ClientError>
+    {
+        let req = match id {
+            StreamId::User => Req::get("streaming/user"),
+        };
+        
+        let client = reqwest::blocking::Client::new();
+        let (_url, req) = self.api_request_cl(&client, req)?;
+        let mut rsp = req.send()?;
+
+        let id = id.clone();
+
+        let _joinhandle = std::thread::spawn(move || {
+            let mut vec: Vec<u8> = Vec::new();
+
+            const BUFSIZE: usize = 4096;
+            let mut buf: [u8; BUFSIZE] = [0; BUFSIZE];
+            while let Ok(sz) = rsp.read(&mut buf) {
+                let read = &buf[..sz];
+                vec.extend_from_slice(read);
+                vec = 'outer: loop {
+                    for line in vec.split_inclusive(|c| *c == 10) {
+                        if !line.ends_with(&[10]) {
+                            let mut newvec = Vec::new();
+                            newvec.extend_from_slice(line);
+                            break 'outer newvec;
+                        } else if line.starts_with(&[':' as u8]) {
+                            // Ignore lines starting with ':': in the
+                            // Mastodon streaming APIs those are
+                            // heartbeat events whose sole purpose is
+                            // to avoid the connection timing out. We
+                            // don't communicate them back to the main
+                            // thread.
+                        } else {
+                            let rsp = match std::str::from_utf8(&line) {
+                                Err(_) => StreamResponse::BadUTF8,
+                                Ok(d) => StreamResponse::Line(d.to_owned()),
+                            };
+                            receiver(StreamUpdate {
+                                id: id.clone(),
+                                response: rsp
+                            });
+                        }
+                    }
+                    // If we didn't get a partial line inside the loop, then
+                    // we must have an empty buffer here
+                    break Vec::new();
+                };
+            }
+
+            receiver(StreamUpdate {
+                id: id.clone(),
+                response: StreamResponse::EOF
+            });
+        });
+
+        Ok(())
+    }
+
+    pub fn process_stream_update(&mut self, _up: StreamUpdate) {
+    }
 }
index 402a5029b8d2d03a77cfd5e0098167c657106e91..3ac0e40adb7a3bfc0a42696b89591782c9e8c4c5 100644 (file)
@@ -4,51 +4,6 @@ use std::process::ExitCode;
 
 use mastodonochrome::tui::Tui;
 
-/*
-#[allow(unused)]
-fn streaming() -> Result<(), mastodonochrome::OurError> {
-    let auth = AuthConfig::load()?;
-
-    let client = reqwest::blocking::Client::new();
-    let req = client.get(auth.instance_url + "/api/v1/streaming/user")
-        .bearer_auth(auth.user_token);
-
-    let mut rsp = match req.send() {
-        Err(e) => Err(OurError::Fatal(
-            format!("unable to make HTTP request: {}", e))),
-        Ok(d) => Ok(d),
-    }?;
-
-    let mut vec: Vec<u8> = Vec::new();
-
-    const BUFSIZE: usize = 4096;
-    let mut buf: [u8; BUFSIZE] = [0; BUFSIZE];
-    while let Ok(sz) = rsp.read(&mut buf) {
-        let read = &buf[..sz];
-        vec.extend_from_slice(read);
-        vec = 'outer: loop {
-            for line in vec.split_inclusive(|c| *c == 10) {
-                if !line.ends_with(&[10]) {
-                    let mut newvec = Vec::new();
-                    newvec.extend_from_slice(line);
-                    break 'outer newvec;
-                } else {
-                    match std::str::from_utf8(&line) {
-                        Err(e) => { dbg!(e); () },
-                        Ok(d) => { dbg!(d); () },
-                    };
-                }
-            }
-            // If we didn't get a partial line inside the loop, then
-            // we must have an empty buffer here
-            break Vec::new();
-        };
-    }
-
-    Ok(())
-}
-*/
-
 fn main() -> ExitCode {
     match Tui::run() {
         Ok(_) => ExitCode::from(0),
index b36fa5022316c5a47751d0f5e9261e4da5c16a1a..cb550a1e4c7f9f356d1d3225f7e69d7710a8d327 100644 (file)
@@ -14,7 +14,7 @@ use std::io::{Stdout, Write, stdout};
 use unicode_width::UnicodeWidthStr;
 
 use super::activity_stack::*;
-use super::client::{Client, ClientError};
+use super::client::{Client, ClientError, StreamId, StreamUpdate};
 use super::coloured_string::{ColouredString, ColouredStringSlice};
 use super::menu::*;
 use super::file::*;
@@ -117,8 +117,10 @@ fn ratatui_set_string(buf: &mut Buffer, x: usize, y: usize,
     }
 }
 
+#[derive(Debug)]
 enum SubthreadEvent {
     TermEv(Event),
+    StreamEv(StreamUpdate),
 }
 
 pub enum PhysicalAction {
@@ -172,6 +174,13 @@ impl From<AuthError> for TuiError {
         }
     }
 }
+impl From<ClientError> for TuiError {
+    fn from(err: ClientError) -> Self {
+        TuiError {
+            message: err.to_string(),
+        }
+    }
+}
 
 impl std::fmt::Display for TuiError {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) ->
@@ -210,9 +219,23 @@ impl Tui {
             state: TuiLogicalState::new(),
             client: client,
         };
-        let result = tui.main_loop();
 
-        let _ = &tui.subthread_sender; // FIXME: this just suppresses a warning
+        {
+            let sender = tui.subthread_sender.clone();
+            tui.client.start_streaming_thread(
+                &StreamId::User, Box::new(move |update| {
+                    if let Err(_) = sender.send(
+                        SubthreadEvent::StreamEv(update)) {
+                    // It would be nice to do something about this
+                    // error, but what _can_ we do? We can hardly send
+                    // an error notification back to the main thread,
+                    // because that communication channel is just what
+                    // we've had a problem with.
+                }
+            }))?;
+        }
+
+        let result = tui.main_loop();
 
         disable_raw_mode()?;
         stdout().execute(LeaveAlternateScreen)?;
@@ -313,6 +336,10 @@ impl Tui {
                         _ => (),
                     }
                 },
+                Ok(SubthreadEvent::StreamEv(update)) => {
+                    self.client.process_stream_update(update);
+                    // FIXME: perhaps also notify state of a change
+                }
             }
         }
     }