use reqwest::Url;
use std::collections::{HashMap, VecDeque};
+use std::io::Read;
use super::auth::{AuthConfig,AuthError};
use super::types::*;
User(String, Boosts, Replies),
}
+#[derive(Debug, PartialEq, Eq, Clone)]
+pub enum StreamId {
+ User,
+}
+
+#[derive(Debug, PartialEq, Eq, Clone)]
+pub enum StreamResponse {
+ Line(String),
+ BadUTF8,
+ EOF,
+}
+
+#[derive(Debug)]
+pub struct StreamUpdate {
+ pub id: StreamId,
+ pub response: StreamResponse,
+}
+
#[derive(Debug)]
pub struct Feed {
pub ids: VecDeque<String>, // ids, whether of statuses, accounts or what
}
}
- fn api_request(&self, req: Req) ->
+ fn api_request_cl(&self, client: &reqwest::blocking::Client, req: Req) ->
Result<(String, reqwest::blocking::RequestBuilder), ClientError>
{
if req.method != reqwest::Method::GET && !self.permit_write {
urlstr.clone(), e.to_string())),
}?;
- Ok((urlstr, self.client.request(req.method, url)
+ Ok((urlstr, client.request(req.method, url)
.bearer_auth(&self.auth.user_token)))
}
+ fn api_request(&self, req: Req) ->
+ Result<(String, reqwest::blocking::RequestBuilder), ClientError>
+ {
+ self.api_request_cl(&self.client, req)
+ }
+
pub fn cache_account(&mut self, ac: &Account) {
self.accounts.insert(ac.id.to_string(), ac.clone());
}
self.feeds.get(id).expect(
"should only ever borrow feeds that have been fetched")
}
+
+ pub fn start_streaming_thread<Recv: Fn(StreamUpdate) + Send + 'static>(
+ &self, id: &StreamId, receiver: Box<Recv>) -> Result<(), ClientError>
+ {
+ let req = match id {
+ StreamId::User => Req::get("streaming/user"),
+ };
+
+ let client = reqwest::blocking::Client::new();
+ let (_url, req) = self.api_request_cl(&client, req)?;
+ let mut rsp = req.send()?;
+
+ let id = id.clone();
+
+ let _joinhandle = std::thread::spawn(move || {
+ let mut vec: Vec<u8> = Vec::new();
+
+ const BUFSIZE: usize = 4096;
+ let mut buf: [u8; BUFSIZE] = [0; BUFSIZE];
+ while let Ok(sz) = rsp.read(&mut buf) {
+ let read = &buf[..sz];
+ vec.extend_from_slice(read);
+ vec = 'outer: loop {
+ for line in vec.split_inclusive(|c| *c == 10) {
+ if !line.ends_with(&[10]) {
+ let mut newvec = Vec::new();
+ newvec.extend_from_slice(line);
+ break 'outer newvec;
+ } else if line.starts_with(&[':' as u8]) {
+ // Ignore lines starting with ':': in the
+ // Mastodon streaming APIs those are
+ // heartbeat events whose sole purpose is
+ // to avoid the connection timing out. We
+ // don't communicate them back to the main
+ // thread.
+ } else {
+ let rsp = match std::str::from_utf8(&line) {
+ Err(_) => StreamResponse::BadUTF8,
+ Ok(d) => StreamResponse::Line(d.to_owned()),
+ };
+ receiver(StreamUpdate {
+ id: id.clone(),
+ response: rsp
+ });
+ }
+ }
+ // If we didn't get a partial line inside the loop, then
+ // we must have an empty buffer here
+ break Vec::new();
+ };
+ }
+
+ receiver(StreamUpdate {
+ id: id.clone(),
+ response: StreamResponse::EOF
+ });
+ });
+
+ Ok(())
+ }
+
+ pub fn process_stream_update(&mut self, _up: StreamUpdate) {
+ }
}
use mastodonochrome::tui::Tui;
-/*
-#[allow(unused)]
-fn streaming() -> Result<(), mastodonochrome::OurError> {
- let auth = AuthConfig::load()?;
-
- let client = reqwest::blocking::Client::new();
- let req = client.get(auth.instance_url + "/api/v1/streaming/user")
- .bearer_auth(auth.user_token);
-
- let mut rsp = match req.send() {
- Err(e) => Err(OurError::Fatal(
- format!("unable to make HTTP request: {}", e))),
- Ok(d) => Ok(d),
- }?;
-
- let mut vec: Vec<u8> = Vec::new();
-
- const BUFSIZE: usize = 4096;
- let mut buf: [u8; BUFSIZE] = [0; BUFSIZE];
- while let Ok(sz) = rsp.read(&mut buf) {
- let read = &buf[..sz];
- vec.extend_from_slice(read);
- vec = 'outer: loop {
- for line in vec.split_inclusive(|c| *c == 10) {
- if !line.ends_with(&[10]) {
- let mut newvec = Vec::new();
- newvec.extend_from_slice(line);
- break 'outer newvec;
- } else {
- match std::str::from_utf8(&line) {
- Err(e) => { dbg!(e); () },
- Ok(d) => { dbg!(d); () },
- };
- }
- }
- // If we didn't get a partial line inside the loop, then
- // we must have an empty buffer here
- break Vec::new();
- };
- }
-
- Ok(())
-}
-*/
-
fn main() -> ExitCode {
match Tui::run() {
Ok(_) => ExitCode::from(0),
use unicode_width::UnicodeWidthStr;
use super::activity_stack::*;
-use super::client::{Client, ClientError};
+use super::client::{Client, ClientError, StreamId, StreamUpdate};
use super::coloured_string::{ColouredString, ColouredStringSlice};
use super::menu::*;
use super::file::*;
}
}
+#[derive(Debug)]
enum SubthreadEvent {
TermEv(Event),
+ StreamEv(StreamUpdate),
}
pub enum PhysicalAction {
}
}
}
+impl From<ClientError> for TuiError {
+ fn from(err: ClientError) -> Self {
+ TuiError {
+ message: err.to_string(),
+ }
+ }
+}
impl std::fmt::Display for TuiError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) ->
state: TuiLogicalState::new(),
client: client,
};
- let result = tui.main_loop();
- let _ = &tui.subthread_sender; // FIXME: this just suppresses a warning
+ {
+ let sender = tui.subthread_sender.clone();
+ tui.client.start_streaming_thread(
+ &StreamId::User, Box::new(move |update| {
+ if let Err(_) = sender.send(
+ SubthreadEvent::StreamEv(update)) {
+ // It would be nice to do something about this
+ // error, but what _can_ we do? We can hardly send
+ // an error notification back to the main thread,
+ // because that communication channel is just what
+ // we've had a problem with.
+ }
+ }))?;
+ }
+
+ let result = tui.main_loop();
disable_raw_mode()?;
stdout().execute(LeaveAlternateScreen)?;
_ => (),
}
},
+ Ok(SubthreadEvent::StreamEv(update)) => {
+ self.client.process_stream_update(update);
+ // FIXME: perhaps also notify state of a change
+ }
}
}
}