chiark / gitweb /
wip
authorIan Jackson <ijackson@chiark.greenend.org.uk>
Sat, 4 Jul 2020 00:12:01 +0000 (01:12 +0100)
committerIan Jackson <ijackson@chiark.greenend.org.uk>
Sat, 4 Jul 2020 00:12:01 +0000 (01:12 +0100)
src/global.rs
src/sse.rs

index dd384a6c5a8e59251c1f2fc58c0b42010734fed5..c81c47e51482b8cfbae7a46943ea206fc3186445 100644 (file)
@@ -33,7 +33,7 @@ pub struct PreparedUpdate {
 
 pub struct PlayerUpdates {
   pub log : StableIndexVecDeque<PreparedUpdate,sse::UpdateId>,
-  pub cv : Condvar,
+  pub cv : Arc<Condvar>,
 }
 
 pub struct Instance {
index 2133453b67a86e635b0d9d3974e8fac89975c5ed..789262abefb500b22c19741176cc237087ff1c5b 100644 (file)
@@ -20,6 +20,12 @@ impl Neg for UpdateId {
   fn neg(self) -> Self { UpdateId(-self.0) }
 }
 
+impl Display for UpdateId {
+  fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
+    Display::fmt(&self.0,f)
+  }
+}
+
 impl StableIndexOffset for UpdateId {
   fn try_increment(&mut self) -> Option<()> { self.0.try_increment() }
   fn try_decrement(&mut self) -> Option<()> { self.0.try_decrement() }
@@ -39,16 +45,26 @@ struct UpdateReader {
   ami : Arc<Mutex<Instance>>,
 }
 
+#[derive(Serialize)]
+struct RecordedConfirmation {
+  gen : Generation,
+  piece : PieceId,
+  cseq : ClientSequence,
+}
+
 impl Read for UpdateReader {
-  fn read(&mut self, mut buf: &mut [u8]) -> io::Result<usize> {
-    let em : fn(&'static str) -> io::Error = |s|
-    io::Error::new(io::ErrorKind::Other, anyhow!(s));
+  fn read(&mut self, mut buf: &mut [u8]) -> Result<usize,io::Error> {
+    let em : fn(&'static str) -> io::Error =
+      |s| io::Error::new(io::ErrorKind::Other, anyhow!(s));
 
-    let amig = self.ami.lock().map_err(|_| em("poison"))?;
+    let mut amig = self.ami.lock().map_err(|_| em("poison"))?;
     let orig_wanted = buf.len();
 
     let pu = &mut amig.updates.get(self.player)
       .ok_or_else(|| em("player gonee"))?;
+
+    let cv = pu.cv.clone();
+
     loop {
       let next = match pu.log.get(self.to_send) {
         Some(next) => next,  None => { break }
@@ -59,23 +75,29 @@ impl Read for UpdateReader {
       if next.client == self.client {
         write!(buf, r#"
 event: recorded
-data: {{ gen: {}, piece: {}, cseq:{} }}
-"#,
-               &next.gen, &next.piece, &next.client_seq);
+data: "#)?;
+        serde_json::to_writer(&mut buf, &RecordedConfirmation {
+          gen : next.gen,
+          piece : next.piece,
+          cseq : next.client_seq,
+        })?;
+        write!(buf, r#"
+"#)?;
       } else {
         write!(buf, r#"
 id: {}
 data: {}
 "#,
                &self.to_send,
-               &next.json);
+               &next.json)?;
       }
     }
     loop {
       let generated = orig_wanted - buf.len();
-      if generated > 0 { return generated }
+      if generated > 0 { return Ok(generated) }
 
-      amig = self.cv.wait_timeout(amig, UPDATE_KEEPALIVE)?.0;
+      amig = cv.wait_timeout(amig, UPDATE_KEEPALIVE)
+        .map_err(|_| em("poison"))?.0;
       write!(buf,r#"
 : keepalive
 "#);
@@ -156,11 +178,12 @@ pub fn content(iad : InstanceAccessDetails<ClientId>, gen: Generation)
 
   let content = {
     let mut ig = iad.g.lock().map_err(|e| anyhow!("lock poison {:?}",&e))?;
-    let g = &mut ig.gs;
+    let _g = &mut ig.gs;
     let cl = ig.clients.get(client).ok_or_else(|| anyhow!("no client"))?;
     let player = cl.player;
     let ami = iad.g.clone();
 
+    let _ = gen;
     let to_send = UpdateId(42); // xxx
     
     UpdateReader { player, client, to_send, ami }