From: Ian Jackson Date: Fri, 26 Feb 2021 23:58:40 +0000 (+0000) Subject: apitest: parse updates X-Git-Tag: otter-0.4.0~344 X-Git-Url: https://www.chiark.greenend.org.uk/ucgi/~ianmdlvl/git?a=commitdiff_plain;h=520dd16c139d121fc40d91fb0f260d88a797c304;p=otter.git apitest: parse updates Signed-off-by: Ian Jackson --- diff --git a/apitest.rs b/apitest.rs index a367ddc2..4d2f4257 100644 --- a/apitest.rs +++ b/apitest.rs @@ -40,7 +40,7 @@ pub use std::collections::hash_map::HashMap; pub use std::collections::btree_set::BTreeSet; pub use std::convert::{Infallible, TryInto}; pub use std::fs; -pub use std::io::{self, BufRead, BufReader, ErrorKind, Write}; +pub use std::io::{self, BufRead, BufReader, ErrorKind, Read, Write}; pub use std::iter; pub use std::mem; pub use std::net::TcpStream; @@ -51,6 +51,7 @@ pub use std::os::unix::fs::DirBuilderExt; pub use std::os::linux::fs::MetadataExt; // todo why linux for st_mode?? pub use std::path; pub use std::process::{self, Command, Stdio}; +pub use std::sync::mpsc; pub use std::thread::{self, sleep}; pub use std::time::{self, Duration}; diff --git a/apitest/at-otter.rs b/apitest/at-otter.rs index cd61d337..b412d2a2 100644 --- a/apitest/at-otter.rs +++ b/apitest/at-otter.rs @@ -26,7 +26,7 @@ struct Session { pub gen: Generation, pub cseq: RawClientSequence, pub dom: scraper::Html, - pub updates: UnixStream, + pub updates: mpsc::Receiver, pub client: reqwest::blocking::Client, } @@ -92,6 +92,36 @@ mod scraper_ext { use scraper_ext::{HtmlExt, RequestBuilderExt}; +type Update = serde_json::Value; + +#[throws(AE)] +fn updates_parser(input: R, out: &mut mpsc::Sender) { + let mut accum: HashMap = default(); + for l in BufReader::new(input).lines() { + let l = l?; + if ! l.is_empty() { + let mut l = l.splitn(2, ':'); + let lhs = l.next().unwrap(); + let rhs = l.next().unwrap(); + let rhs = rhs.trim_start(); + let ins = accum.insert(lhs.to_string(), rhs.to_string()) + .is_none().expect("duplicate field"); + continue; + } + let entry = mem::take(&mut accum); + let accum = (); // stops accidental use of accum + if entry.get("event").map(String::as_str) == Some("commsworking") { + eprintln!("commsworking: {}", entry["data"]); + } else if let Some(event) = entry.get("event") { + panic!("unexpected event: {}", event); + } else { + let payload = &entry["data"]; + let payload = serde_json::from_str(payload).unwrap(); + if out.send(payload).is_err() { break } + } + } +} + impl Ctx { #[throws(AE)] fn connect_player(&self, player: &Player) -> Session { @@ -120,19 +150,24 @@ impl Ctx { .subst("@url@/_/updates?ctoken=@ctoken@&gen=@gen@")? ).send()?; - let (mut writer, reader) = UnixStream::pair()?; + let (mut wpipe, rpipe) = UnixStream::pair()?; thread::spawn(move ||{ eprintln!("copy_to'ing"); - sse.copy_to(&mut writer).unwrap(); + sse.copy_to(&mut wpipe).unwrap(); eprintln!("copy_to'd!"); }); + let (mut csend, crecv) = mpsc::channel(); + thread::spawn(move ||{ + updates_parser(rpipe, &mut csend).expect("udpates parser failed") + }); + Session { client, gen, cseq: 42, ctoken: RawToken(ctoken.to_string()), dom: session, - updates: reader, + updates: crecv, } } }