From: Ian Jackson Date: Sat, 11 Jul 2020 23:28:31 +0000 (+0100) Subject: reorg TrasnmitUpdate X-Git-Tag: otter-0.2.0~1376 X-Git-Url: https://www.chiark.greenend.org.uk/ucgi/~ianmdlvl/git?a=commitdiff_plain;h=2149402c4631349031333c3f61126f66f6d23661;p=otter.git reorg TrasnmitUpdate --- diff --git a/src/sse.rs b/src/sse.rs index 4b57ba57..bc96fda7 100644 --- a/src/sse.rs +++ b/src/sse.rs @@ -25,20 +25,6 @@ struct UpdateReader { ami : Arc>, } -#[derive(Debug,Serialize)] -enum TransmitUpdate<'u> { - Recorded { - piece : VisiblePieceId, - cseq : ClientSequence, - zg : Option, - }, - Piece { - piece : VisiblePieceId, - op : &'u PieceUpdateOp, - }, - Log (&'u LogEntry), -} - #[derive(Error,Debug)] #[error("WouldBlock error misreported!")] struct FlushWouldBlockError{} @@ -70,27 +56,10 @@ impl Read for UpdateReader { let next_len = UPDATE_MAX_FRAMING_SIZE + next.json_len(); if next_len > buf.len() { break } - write!(buf, "data: [")?; - for u in &next.us { - let tu = match u { - &PreparedUpdateEntry::Piece - { piece, client, sameclient_cseq : cseq, ref op } - if client== self.client => { - let zg = op.new_z_generation(); - TransmitUpdate::Recorded { piece, cseq, zg } - }, - &PreparedUpdateEntry::Piece { piece, ref op, .. } => { - TransmitUpdate::Piece { piece, op } - }, - PreparedUpdateEntry::Log(logent) => { - TransmitUpdate::Log(&*logent) - }, - }; - serde_json::to_writer(&mut buf, &tu)?; - write!(buf,",")?; - } - serde_json::to_writer(&mut buf, &next.gen)?; - write!(buf, "]\n\ + let tu = next.for_transmit(self.client); + write!(buf, "data: ")?; + serde_json::to_writer(&mut buf, &tu)?; + write!(buf, "\n\ id: {}\n\n", &self.to_send)?; self.to_send.try_increment().unwrap(); diff --git a/src/updates.rs b/src/updates.rs index efe5b886..6baabec5 100644 --- a/src/updates.rs +++ b/src/updates.rs @@ -55,6 +55,28 @@ pub enum PieceUpdateOp { SetZLevel(ZLevel), } +// ---------- for traansmission ---------- + +#[derive(Debug,Serialize)] +pub struct TransmitUpdate<'u> ( + Generation, + Vec>, +); + +#[derive(Debug,Serialize)] +enum TransmitUpdateEntry<'u> { + Recorded { + piece : VisiblePieceId, + cseq : ClientSequence, + zg : Option, + }, + Piece { + piece : VisiblePieceId, + op : &'u PieceUpdateOp, + }, + Log (&'u LogEntry), +} + // ========== implementation ========== // ---------- prepared updates, queued in memory ---------- @@ -87,7 +109,7 @@ impl PreparedUpdateEntry { } } -// ---------- piece updates ---------- +// ---------- PieceUpdatesOp ---------- impl PieceUpdateOp { pub fn new_state(&self) -> Option<&NS> { @@ -122,3 +144,29 @@ impl PieceUpdateOp { } } } + +// ---------- for traansmission ---------- + +impl PreparedUpdate { + pub fn for_transmit(&self, dest : ClientId) -> TransmitUpdate { + let mut ents = vec![]; + for u in &self.us { + let ue = match u { + &PreparedUpdateEntry::Piece + { piece, client, sameclient_cseq : cseq, ref op } + if client == dest => { + let zg = op.new_z_generation(); + TransmitUpdateEntry::Recorded { piece, cseq, zg } + }, + &PreparedUpdateEntry::Piece { piece, ref op, .. } => { + TransmitUpdateEntry::Piece { piece, op } + }, + PreparedUpdateEntry::Log(logent) => { + TransmitUpdateEntry::Log(&*logent) + }, + }; + ents.push(ue); + }; + TransmitUpdate(self.gen, ents) + } +} diff --git a/templates/script.ts b/templates/script.ts index 48b23578..031d6826 100644 --- a/templates/script.ts +++ b/templates/script.ts @@ -463,9 +463,8 @@ function startup() { var es = new EventSource("/_/updates/"+ctoken+'/'+gen); es.onmessage = function(event) { console.log('GOTEVE', event) - var j = JSON.parse(event.data); - var tgen = j.pop(); - for (var m of j) { + var [tgen, ms] = JSON.parse(event.data); + for (var m of ms) { var k = Object.keys(m)[0]; messages[k](m[k]); }