pub use std::collections::btree_set::BTreeSet;
pub use std::convert::{Infallible, TryInto};
pub use std::fs;
-pub use std::io::{self, BufRead, BufReader, ErrorKind, Write};
+pub use std::io::{self, BufRead, BufReader, ErrorKind, Read, Write};
pub use std::iter;
pub use std::mem;
pub use std::net::TcpStream;
pub use std::os::linux::fs::MetadataExt; // todo why linux for st_mode??
pub use std::path;
pub use std::process::{self, Command, Stdio};
+pub use std::sync::mpsc;
pub use std::thread::{self, sleep};
pub use std::time::{self, Duration};
pub gen: Generation,
pub cseq: RawClientSequence,
pub dom: scraper::Html,
- pub updates: UnixStream,
+ pub updates: mpsc::Receiver<Update>,
pub client: reqwest::blocking::Client,
}
use scraper_ext::{HtmlExt, RequestBuilderExt};
+type Update = serde_json::Value;
+
+#[throws(AE)]
+fn updates_parser<R:Read>(input: R, out: &mut mpsc::Sender<Update>) {
+ let mut accum: HashMap<String, String> = default();
+ for l in BufReader::new(input).lines() {
+ let l = l?;
+ if ! l.is_empty() {
+ let mut l = l.splitn(2, ':');
+ let lhs = l.next().unwrap();
+ let rhs = l.next().unwrap();
+ let rhs = rhs.trim_start();
+ let ins = accum.insert(lhs.to_string(), rhs.to_string())
+ .is_none().expect("duplicate field");
+ continue;
+ }
+ let entry = mem::take(&mut accum);
+ let accum = (); // stops accidental use of accum
+ if entry.get("event").map(String::as_str) == Some("commsworking") {
+ eprintln!("commsworking: {}", entry["data"]);
+ } else if let Some(event) = entry.get("event") {
+ panic!("unexpected event: {}", event);
+ } else {
+ let payload = &entry["data"];
+ let payload = serde_json::from_str(payload).unwrap();
+ if out.send(payload).is_err() { break }
+ }
+ }
+}
+
impl Ctx {
#[throws(AE)]
fn connect_player(&self, player: &Player) -> Session {
.subst("@url@/_/updates?ctoken=@ctoken@&gen=@gen@")?
).send()?;
- let (mut writer, reader) = UnixStream::pair()?;
+ let (mut wpipe, rpipe) = UnixStream::pair()?;
thread::spawn(move ||{
eprintln!("copy_to'ing");
- sse.copy_to(&mut writer).unwrap();
+ sse.copy_to(&mut wpipe).unwrap();
eprintln!("copy_to'd!");
});
+ let (mut csend, crecv) = mpsc::channel();
+ thread::spawn(move ||{
+ updates_parser(rpipe, &mut csend).expect("udpates parser failed")
+ });
+
Session {
client, gen,
cseq: 42,
ctoken: RawToken(ctoken.to_string()),
dom: session,
- updates: reader,
+ updates: crecv,
}
}
}