From 7a9a1059fcc599343e401c6ac504288052cf7084 Mon Sep 17 00:00:00 2001 From: Ian Jackson Date: Wed, 30 Sep 2020 22:59:28 +0100 Subject: [PATCH] New update messages for different kinds of update Signed-off-by: Ian Jackson --- src/api.rs | 3 +- src/cmdlistener.rs | 6 ++- src/global.rs | 3 +- src/updates.rs | 102 +++++++++++++++++++++++++++++++++++---------- 4 files changed, 88 insertions(+), 26 deletions(-) diff --git a/src/api.rs b/src/api.rs index 653d7141..41b716e3 100644 --- a/src/api.rs +++ b/src/api.rs @@ -140,7 +140,8 @@ fn api_piece_op(form : Json>) Err(err)?; }, Ok((update, logents)) => { - let mut buf = PrepareUpdatesBuffer::new(g, Some((client, form.cseq)), + let mut buf = PrepareUpdatesBuffer::new(g, + IsResponseToClientOp::Predictable((client, form.cseq)), Some(1 + logents.len())); buf.piece_update(piece, update, &lens); diff --git a/src/cmdlistener.rs b/src/cmdlistener.rs index 95d93170..bd9f391c 100644 --- a/src/cmdlistener.rs +++ b/src/cmdlistener.rs @@ -370,7 +370,8 @@ impl UpdateHandler { }, Online => { let estimate = updates.pcs.len() + updates.log.len(); - let mut buf = PrepareUpdatesBuffer::new(g, None, Some(estimate)); + let irc = IsResponseToClientOp::No; + let mut buf = PrepareUpdatesBuffer::new(g, irc, Some(estimate)); for (upiece, uuop) in updates.pcs { let lens = TransparentLens { }; buf.piece_update(upiece, uuop, &lens); @@ -386,7 +387,8 @@ impl UpdateHandler { use UpdateHandler::*; match self { Bulk(bulk) => { - let mut buf = PrepareUpdatesBuffer::new(g, None, None); + let irc = IsResponseToClientOp::No; + let mut buf = PrepareUpdatesBuffer::new(g, irc, None); for (upiece, uuop) in bulk.pieces { let lens = TransparentLens { }; buf.piece_update(upiece, uuop, &lens); diff --git a/src/global.rs b/src/global.rs index 469b0d49..710efa3e 100644 --- a/src/global.rs +++ b/src/global.rs @@ -448,7 +448,8 @@ impl InstanceGuard<'_> { let lens = TransparentLens { }; let estimate = updated_pieces.len() + 1; - let mut buf = PrepareUpdatesBuffer::new(self, None, Some(estimate)); + let mut buf = PrepareUpdatesBuffer::new( + self, IsResponseToClientOp::No , Some(estimate)); for &piece in &updated_pieces { buf.piece_update(piece, PieceUpdateOp::Modify(()), &lens); } diff --git a/src/updates.rs b/src/updates.rs index a46e8456..77ba0912 100644 --- a/src/updates.rs +++ b/src/updates.rs @@ -42,8 +42,7 @@ pub struct PreparedUpdate { #[derive(Debug)] pub enum PreparedUpdateEntry { Piece { - client : ClientId, - sameclient_cseq : ClientSequence, + by_client: IsResponseToClientOp<(ClientId, ClientSequence)>, piece : VisiblePieceId, op : PieceUpdateOp, }, @@ -90,11 +89,17 @@ enum TransmitUpdateEntry<'u> { piece : VisiblePieceId, cseq : ClientSequence, zg : Option, + svg: Option<&'u Html>, // IsResponseToClientOp::UpdateSvg }, Piece { piece : VisiblePieceId, op : PieceUpdateOp<&'u PreparedPieceState>, }, + RecordedUnpredictable { + piece : VisiblePieceId, + cseq : ClientSequence, + ns: &'u PreparedPieceState, + }, SetTableSize(Pos), Log (&'u LogEntry), Error(&'u ErrorSignaledViaUpdate), @@ -234,25 +239,35 @@ impl PieceUpdateOp { pub struct PrepareUpdatesBuffer<'r> { g : &'r mut Instance, us : Vec, - by_client : ClientId, - cseq : ClientSequence, + by_client : IsResponseToClientOp<(ClientId, ClientSequence)>, gen : Option, } +#[derive(Debug,Copy,Clone)] +pub enum IsResponseToClientOp { + /// In PROTOCOL.md terms, a Server update + No, + /// In PROTOCOL.md terms, a Client update + Predictable(T), + /// In PROTOCOL.md terms, a Client update which also updates + /// the visible piece image (which is just server-controlled). + UpdateSvg(T), + /// In PROTOCOL.md terms, a Client update which results in + /// an immediate Server update. When the client knows this + /// is going to happen it can help the user avoid conflicts. + Unpredictable(T), +} + impl<'r> PrepareUpdatesBuffer<'r> { pub fn new(g: &'r mut Instance, - by_client: Option<(ClientId, ClientSequence)>, + by_client: IsResponseToClientOp<(ClientId, ClientSequence)>, estimate: Option) -> Self { - let by_client = by_client.unwrap_or( - (Default::default(), ClientSequence(0)) - ); let us = estimate.map_or(vec![], Vec::with_capacity); PrepareUpdatesBuffer { gen: None, - by_client: by_client.0, cseq: by_client.1, - us, g, + by_client, us, g, } } @@ -265,7 +280,7 @@ impl<'r> PrepareUpdatesBuffer<'r> { } fn new_for_error(ig: &'r mut Instance) -> Self { - Self::new(ig, None, Some(1)) + Self::new(ig, IsResponseToClientOp::No, Some(1)) } pub fn piece_report_error(ig: &mut Instance, error: PieceOpError, piece: PieceId, @@ -299,6 +314,8 @@ impl<'r> PrepareUpdatesBuffer<'r> { fn piece_update_fallible(&mut self, piece: PieceId, update: PieceUpdateOp<()>, lens: &dyn Lens) -> PreparedUpdateEntry { + type IRC = IsResponseToClientOp<(ClientId, ClientSequence)>; + let gen = self.gen(); let gs = &mut self.g.gs; @@ -309,9 +326,14 @@ impl<'r> PrepareUpdatesBuffer<'r> { (Ok(pc), Some(p)) => { gs.max_z.update_max(pc.zlevel.z); - if self.by_client != pc.lastclient { + if let Some(new_lastclient) = match self.by_client { + IRC::Predictable((tclient,_)) | + IRC::UpdateSvg((tclient,_)) + => if tclient == pc.lastclient { None } else { Some(tclient) } + _ => Some(Default::default()), + } { pc.gen_before_lastclient = pc.gen; - pc.lastclient = self.by_client; + pc.lastclient = new_lastclient; } pc.gen = gen; let pri_for_all = lens.svg_pri(piece,pc,Default::default()); @@ -333,8 +355,7 @@ impl<'r> PrepareUpdatesBuffer<'r> { PreparedUpdateEntry::Piece { piece, - client : self.by_client, - sameclient_cseq : self.cseq, + by_client : self.by_client, op : update, } } @@ -391,7 +412,21 @@ impl<'r> Drop for PrepareUpdatesBuffer<'r> { // ---------- for traansmission ---------- +type IRC = IsResponseToClientOp<(ClientId, ClientSequence)>; + impl PreparedUpdate { + fn is_client(by_client: &IsResponseToClientOp<(ClientId, ClientSequence)>, + ref_client: ClientId) -> Option { + use IsResponseToClientOp::*; + let &(c,cseq) = match by_client { + Predictable (x) => x, + UpdateSvg (x) => x, + Unpredictable(x) => x, + No => return None, + }; + if c == ref_client { Some(cseq) } else { None } + } + pub fn for_transmit(&self, dest : ClientId) -> TransmitUpdate { type ESVU = ErrorSignaledViaUpdate; type PUE = PreparedUpdateEntry; @@ -400,13 +435,36 @@ impl PreparedUpdate { for u in &self.us { trace!("for_transmit to={:?} {:?}", dest, &u); let ue = match u { - &PUE::Piece { piece, client, sameclient_cseq : cseq, ref op } - if client == dest => { - let zg = op.new_z_generation(); - TUE::Recorded { piece, cseq, zg } - }, - &PUE::Piece { piece, ref op, .. } => { - TUE::Piece { piece, op: op.map_ref_new_state() } + &PUE::Piece { piece, by_client, ref op } => { + let ns = ||op.new_state(); + enum FTG<'u> { + Recorded(ClientSequence, Option<&'u PreparedPieceState>), + Exactly(TransmitUpdateEntry<'u>), + Piece, + }; + let ftg = if let Some(cseq) = Self::is_client(&by_client, dest) { + match by_client { + IRC::Predictable(_) => FTG::Recorded(cseq, None), + IRC::UpdateSvg(_) => FTG::Recorded(cseq, ns()), + IRC::Unpredictable(_) => if let Some(ns) = ns() { + FTG::Exactly(TUE::RecordedUnpredictable { piece, cseq, ns }) + } else { + error!("internal error: for_transmit PreparedUpdateEntry::Piece with RecordedUnpredictable but PieceOp no NS"); + FTG::Piece + } + _ => unreachable!(), + } + } else { + FTG::Piece + }; + match ftg { + FTG::Recorded(cseq, ns) => { + let zg = op.new_z_generation(); + TUE::Recorded { piece, cseq, zg, svg: ns.map(|ns| &ns.svg) } + }, + FTG::Piece => TUE::Piece { piece, op: op.map_ref_new_state() }, + FTG::Exactly(x) => x, + } }, PUE::Log(logent) => { TUE::Log(&logent) -- 2.30.2