From: Ian Jackson Date: Sun, 5 Jul 2020 11:50:38 +0000 (+0100) Subject: clean up sse X-Git-Tag: otter-0.2.0~1453 X-Git-Url: https://www.chiark.greenend.org.uk/ucgi/~ianmdlvl/git?a=commitdiff_plain;h=ef9329cc446d2c675291531932e6eef86b32edb7;p=otter.git clean up sse --- diff --git a/src/imports.rs b/src/imports.rs index 263002f5..0d63a6ee 100644 --- a/src/imports.rs +++ b/src/imports.rs @@ -13,6 +13,7 @@ pub use std::str::FromStr; pub use std::iter; pub use std::iter::repeat_with; pub use std::collections::VecDeque; +pub use std::num::Wrapping; pub use thiserror::Error; pub use anyhow::{Context,anyhow}; diff --git a/src/sse.rs b/src/sse.rs index d258fe0f..f99fc601 100644 --- a/src/sse.rs +++ b/src/sse.rs @@ -41,6 +41,7 @@ struct UpdateReader { client : ClientId, need_flush : bool, init_confirmation_send : iter::Once<()>, + keepalives : Wrapping, to_send : UpdateId, ami : Arc>, } @@ -66,11 +67,9 @@ impl Read for UpdateReader { let mut buf = orig_buf.as_mut(); if self.init_confirmation_send.next().is_some() { - write!(buf, r#" -data: server online - -"#)?; -/*event: commsworking*/ + write!(buf, "event: commsworking\n\ + data: server online {} {} G{}\n\n", + self.player, self.client, self.to_send)?; } let pu = &mut amig.updates.get(self.player) @@ -86,21 +85,16 @@ data: server online if next_len > buf.len() { break } if next.client == self.client { - write!(buf, r#" -event: recorded -data: "#)?; + write!(buf, "event: recorded\n\ + data: foo\n\n")?; serde_json::to_writer(&mut *buf, &RecordedConfirmation { gen : next.gen, piece : next.piece, cseq : next.client_seq, })?; - write!(buf, r#" -"#)?; } else { - write!(buf, r#" -id: {} -data: {} -"#, + write!(buf, "id: {}\n\ + data: {}\n\n", &self.to_send, &next.json)?; } @@ -109,7 +103,7 @@ data: {} loop { let generated = orig_wanted - buf.len(); if generated > 0 { - eprintln!("SENDING {} to {:?} {:?}:\n{}\n", + eprintln!("SENDING {} to {:?} {:?}: {:?}", generated, &self.player, &self.client, str::from_utf8(&orig_buf[0..generated]).unwrap()); self.need_flush = true; @@ -124,9 +118,13 @@ data: {} amig = cv.wait_timeout(amig, UPDATE_KEEPALIVE) .map_err(|_| em("poison"))?.0; - write!(buf,r#" -: keepalive -"#)?; + + write!(buf, "event: commsworking\n\ + data: server online {} {} G{} K{}\n\n", + self.player, self.client, self.to_send, self.keepalives)?; + self.keepalives += Wrapping(1); +/* + write!(buf,": keepalive\n\n")?; */ } } } @@ -198,7 +196,7 @@ struct APIForm { */ #[derive(Debug)] -struct DebugReader(T); +pub struct DebugReader(pub T); impl Read for DebugReader { fn read(&mut self, buf: &mut [u8]) -> Result { @@ -236,9 +234,11 @@ eprintln!("updates content iad={:?} player={:?} cl={:?} updates={:?}", UpdateReader { player, client, to_send, ami, need_flush : false, + keepalives : Wrapping(0), init_confirmation_send : iter::once(()), } }; let content = BufReader::with_capacity(UPDATE_READER_SIZE, content); - DebugReader(content) + //DebugReader(content) + content } diff --git a/templates/script.js b/templates/script.js index 588760f3..5d773212 100644 --- a/templates/script.js +++ b/templates/script.js @@ -197,12 +197,13 @@ function startup() { var k = Object.keys(j)[0]; messages[k](j[k]); } - es.oncommsworking = function(event) { - status_node.innerHTML = data.value; - } - es.onrecorded = function(event) { + es.addEventListener('commsworking', function(event) { + console.log('GOTDATA'); + status_node.innerHTML = event.data; + }); + es.addEventListener('recorded', function(event) { xxx_recorded(); - } + }); es.onerror = function(e) { console.log('FOO',e,es); json_report_error({