chiark / gitweb /
apitest: parse updates
authorIan Jackson <ijackson@chiark.greenend.org.uk>
Fri, 26 Feb 2021 23:58:40 +0000 (23:58 +0000)
committerIan Jackson <ijackson@chiark.greenend.org.uk>
Fri, 26 Feb 2021 23:58:40 +0000 (23:58 +0000)
Signed-off-by: Ian Jackson <ijackson@chiark.greenend.org.uk>
apitest.rs
apitest/at-otter.rs

index a367ddc29e62a1bc96471f7ddc3f0ee94d8ebe48..4d2f4257e01d1a48400f4ff4ace6ee2d03a67c7f 100644 (file)
@@ -40,7 +40,7 @@ pub use std::collections::hash_map::HashMap;
 pub use std::collections::btree_set::BTreeSet;
 pub use std::convert::{Infallible, TryInto};
 pub use std::fs;
-pub use std::io::{self, BufRead, BufReader, ErrorKind, Write};
+pub use std::io::{self, BufRead, BufReader, ErrorKind, Read, Write};
 pub use std::iter;
 pub use std::mem;
 pub use std::net::TcpStream;
@@ -51,6 +51,7 @@ pub use std::os::unix::fs::DirBuilderExt;
 pub use std::os::linux::fs::MetadataExt; // todo why linux for st_mode??
 pub use std::path;
 pub use std::process::{self, Command, Stdio};
+pub use std::sync::mpsc;
 pub use std::thread::{self, sleep};
 pub use std::time::{self, Duration};
 
index cd61d337fb9eed5fb858637289d81c3461111f8e..b412d2a209655010e030ec0353a04f383029638b 100644 (file)
@@ -26,7 +26,7 @@ struct Session {
   pub gen: Generation,
   pub cseq: RawClientSequence,
   pub dom: scraper::Html,
-  pub updates: UnixStream,
+  pub updates: mpsc::Receiver<Update>,
   pub client: reqwest::blocking::Client,
 }
 
@@ -92,6 +92,36 @@ mod scraper_ext {
 
 use scraper_ext::{HtmlExt, RequestBuilderExt};
 
+type Update = serde_json::Value;
+
+#[throws(AE)]
+fn updates_parser<R:Read>(input: R, out: &mut mpsc::Sender<Update>) {
+  let mut accum: HashMap<String, String> = default();
+  for l in BufReader::new(input).lines() {
+    let l = l?;
+    if ! l.is_empty() {
+      let mut l = l.splitn(2, ':');
+      let lhs = l.next().unwrap();
+      let rhs = l.next().unwrap();
+      let rhs = rhs.trim_start();
+      let ins = accum.insert(lhs.to_string(), rhs.to_string())
+        .is_none().expect("duplicate field");
+      continue;
+    }
+    let entry = mem::take(&mut accum);
+    let accum = (); // stops accidental use of accum
+    if entry.get("event").map(String::as_str) == Some("commsworking") {
+      eprintln!("commsworking: {}", entry["data"]);
+    } else if let Some(event) = entry.get("event") {
+      panic!("unexpected event: {}", event);
+    } else {
+      let payload = &entry["data"];
+      let payload = serde_json::from_str(payload).unwrap();
+      if out.send(payload).is_err() { break }
+    }
+  }
+}
+
 impl Ctx {
   #[throws(AE)]
   fn connect_player(&self, player: &Player) -> Session {
@@ -120,19 +150,24 @@ impl Ctx {
         .subst("@url@/_/updates?ctoken=@ctoken@&gen=@gen@")?
     ).send()?;
 
-    let (mut writer, reader) = UnixStream::pair()?;
+    let (mut wpipe, rpipe) = UnixStream::pair()?;
     thread::spawn(move ||{
       eprintln!("copy_to'ing");
-      sse.copy_to(&mut writer).unwrap();
+      sse.copy_to(&mut wpipe).unwrap();
       eprintln!("copy_to'd!"); 
     });
 
+    let (mut csend, crecv) = mpsc::channel();
+    thread::spawn(move ||{
+      updates_parser(rpipe, &mut csend).expect("udpates parser failed")
+    });
+
     Session {
       client, gen,
       cseq: 42,
       ctoken: RawToken(ctoken.to_string()),
       dom: session,
-      updates: reader,
+      updates: crecv,
     }
   }
 }