From: Ian Jackson Date: Sat, 4 Jul 2020 00:12:01 +0000 (+0100) Subject: wip X-Git-Tag: otter-0.2.0~1489 X-Git-Url: https://www.chiark.greenend.org.uk/ucgi/~ianmdlvl/git?a=commitdiff_plain;h=7c8a650924a802ee5a085d74a2776dc088d64443;p=otter.git wip --- diff --git a/src/global.rs b/src/global.rs index dd384a6c..c81c47e5 100644 --- a/src/global.rs +++ b/src/global.rs @@ -33,7 +33,7 @@ pub struct PreparedUpdate { pub struct PlayerUpdates { pub log : StableIndexVecDeque, - pub cv : Condvar, + pub cv : Arc, } pub struct Instance { diff --git a/src/sse.rs b/src/sse.rs index 2133453b..789262ab 100644 --- a/src/sse.rs +++ b/src/sse.rs @@ -20,6 +20,12 @@ impl Neg for UpdateId { fn neg(self) -> Self { UpdateId(-self.0) } } +impl Display for UpdateId { + fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { + Display::fmt(&self.0,f) + } +} + impl StableIndexOffset for UpdateId { fn try_increment(&mut self) -> Option<()> { self.0.try_increment() } fn try_decrement(&mut self) -> Option<()> { self.0.try_decrement() } @@ -39,16 +45,26 @@ struct UpdateReader { ami : Arc>, } +#[derive(Serialize)] +struct RecordedConfirmation { + gen : Generation, + piece : PieceId, + cseq : ClientSequence, +} + impl Read for UpdateReader { - fn read(&mut self, mut buf: &mut [u8]) -> io::Result { - let em : fn(&'static str) -> io::Error = |s| - io::Error::new(io::ErrorKind::Other, anyhow!(s)); + fn read(&mut self, mut buf: &mut [u8]) -> Result { + let em : fn(&'static str) -> io::Error = + |s| io::Error::new(io::ErrorKind::Other, anyhow!(s)); - let amig = self.ami.lock().map_err(|_| em("poison"))?; + let mut amig = self.ami.lock().map_err(|_| em("poison"))?; let orig_wanted = buf.len(); let pu = &mut amig.updates.get(self.player) .ok_or_else(|| em("player gonee"))?; + + let cv = pu.cv.clone(); + loop { let next = match pu.log.get(self.to_send) { Some(next) => next, None => { break } @@ -59,23 +75,29 @@ impl Read for UpdateReader { if next.client == self.client { write!(buf, r#" event: recorded -data: {{ gen: {}, piece: {}, cseq:{} }} -"#, - &next.gen, &next.piece, &next.client_seq); +data: "#)?; + serde_json::to_writer(&mut buf, &RecordedConfirmation { + gen : next.gen, + piece : next.piece, + cseq : next.client_seq, + })?; + write!(buf, r#" +"#)?; } else { write!(buf, r#" id: {} data: {} "#, &self.to_send, - &next.json); + &next.json)?; } } loop { let generated = orig_wanted - buf.len(); - if generated > 0 { return generated } + if generated > 0 { return Ok(generated) } - amig = self.cv.wait_timeout(amig, UPDATE_KEEPALIVE)?.0; + amig = cv.wait_timeout(amig, UPDATE_KEEPALIVE) + .map_err(|_| em("poison"))?.0; write!(buf,r#" : keepalive "#); @@ -156,11 +178,12 @@ pub fn content(iad : InstanceAccessDetails, gen: Generation) let content = { let mut ig = iad.g.lock().map_err(|e| anyhow!("lock poison {:?}",&e))?; - let g = &mut ig.gs; + let _g = &mut ig.gs; let cl = ig.clients.get(client).ok_or_else(|| anyhow!("no client"))?; let player = cl.player; let ami = iad.g.clone(); + let _ = gen; let to_send = UpdateId(42); // xxx UpdateReader { player, client, to_send, ami }