chiark / gitweb /
wip
authorIan Jackson <ijackson@chiark.greenend.org.uk>
Fri, 3 Jul 2020 23:54:53 +0000 (00:54 +0100)
committerIan Jackson <ijackson@chiark.greenend.org.uk>
Fri, 3 Jul 2020 23:54:53 +0000 (00:54 +0100)
src/bin/server.rs
src/gamestate.rs
src/global.rs
src/imports.rs
src/sse.rs
src/updates.rs

index 958120e926607607686aeb6640e4030a8a4cc957..bd277408c854825f87158ba251acef1f4b1c9348 100644 (file)
@@ -131,7 +131,7 @@ enum OpError {
 struct ApiGrab {
   t : String,
   p : VisiblePieceId,
-  g : Counter,
+  g : Generation,
   s : ClientSequence,
 }
 #[post("/_/api/grab", format="json", data="<form>")]
@@ -155,7 +155,7 @@ fn api_grab(form : Json<ApiGrab>) -> impl response::Responder<'static> {
     if u_gen > q_gen { Err(OpError::Conflict)? }
     if p.held != None { Err(OpError::PieceHeld)? };
     p.held = Some(player);
-    gs.gen += 1;
+    gs.gen.increment();
     let gen = gs.gen;
     if client != p.lastclient {
       p.gen_before_lastclient = p.gen_lastclient;
@@ -208,147 +208,19 @@ enum XUpdate {
   TestCounter { value: usize },
 }
 
-const UPDATE_READER_SIZE : usize = 1024*32;
-const UPDATE_MAX_FRAMING_SIZE : usize = 200;
-const UPDATE_KEEPALIVE : Duration = Duration::from_seconds(14);
-
-#[derive(Debug)]
-struct UpdateReader {
-  player : PlayerId,
-  client : ClientId,
-  to_send : UpdateCounter, // xxx race for setting this initially
-  ami : Arc<Mutex<Instance>>,
-}
-
-impl Read for UpdateReader {
-  fn read(&mut self, mut buf: &mut [u8]) -> io::Result<usize> {
-    let amig = self.ami.lock()?;
-    let orig_wanted = buf.len();
-
-    let pu = &mut amig.updates.get(self.player)
-      .ok_or_else(|| io::Error::new
-                  (io::ErrorKind::Other, anyhow!("player gonee")))?;
-    loop {
-      let next = match pu.log.get(self.to_send) {
-        Some(next) => next,  None => { break }
-      };
-      let next_len = UPDATE_MAX_FRAMING_SIZE + next.json.len();
-      if next_len > buf.len() { break }
-
-      if next.client == self.client {
-        write!(buf, r#"
-event: recorded
-data: {{ gen: {}, piece: {}, cseq:{} }}
-"#,
-               &self.gen, &next.piece, &next.client_seq);
-      } else {
-        write!(buf, r#"
-id: {}
-data: {}
-"#,
-               &self.to_send,
-               &next.json);
-      }
-    }
-    loop {
-      let generated = orig_wanted - buf.len();
-      if generated > 0 { return generated }
-
-      amig = self.cv.wait_timeout(amig, UPDATE_KEEPALIVE)?.0;
-      write!(buf,r#"
-: keepalive
-"#);
-    }
-  }
-}
-
-    /*
-    loop {
-                    e
-      let send_from = (||{
-        let l = self.updates.len();
-        let last_probe = match updates.last() {
-          None => return None,
-          Some(&now) if self.last_sent > now.gen => return l+1,
-          _ => l,
-        };
-        let (lo, hi /* half-open */) = loop {
-          let depth = l - last_probe;
-          depth *= 2;
-          if depth > l { break (0, last_probe) }
-          let probe = l - depth;
-          let here = updates[probe];
-          if here.gen < l 
-
-        if let Some(&now) =  {
-          if  { return None }
-        }
-        let probe = inst.updates.len() - 1;
-        let (lo, hi) = loop {
-          if search == 0 { break }
-          search -= 1;
-          tu = inst.updates[search];
-          if 
-
-        let lo = 0;
-        
-      };
-    loop {
-         implement this! 
-    }
-    for (tclient, tcl) in &mut g.clients {
-      if tclient == client {
-        tcl.transmit_update(&Update {
-          gen,
-          u : UpdatePayload::ClientSequence(piece, form.s),
-        });
-      } else {
-        tcl.transmit_update(&update);
-      }          
-    }
-     */
-/*
-
-    thread::sleep(Duration::from_millis(500));
-    let message = XUpdate::TestCounter { value : self.next };
-    let data = serde_json::to_string(&message)?;
-    let data = format!("data: {}\n\n", &data);
-    // eprintln!("want to return into &[;{}] {:?}", buf.len(), &data);
-    self.next += 1;
-    buf[0..data.len()].copy_from_slice(data.as_bytes());
-    Ok(buf.len())
-  }
-}*/
-
-/*
-#[derive(Deserialize)]
-struct APIForm {
-  t : String,
-  c : ClientId,
-}
- */
-
-#[get("/_/updates/<ctoken>")]
-#[throws(RE)]
-fn updates(ctoken : InstanceAccess<ClientId>)
+#[get("/_/updates/<ctoken>/<gen>")]
+#[throws(E)]
+fn updates(ctoken : InstanceAccess<ClientId>, gen: Generation)
            -> impl response::Responder<'static> {
   let iad = ctoken.i;
-  let client = iad.ident;
-  let _ = {
-    let mut ig = iad.g.lock().map_err(|e| anyhow!("lock poison {:?}",&e))?;
-    let _g = &mut ig.gs;
-    let cl = ig.clients.get(client).ok_or_else(|| anyhow!("no client"))?;
-    let _player = cl.player;
-  };
-  let tc = TestCounterInner { next : 0 };
-  let tc = BufReader::new(tc);
-  let ch = response::Stream::chunked(tc, 1);
-  let ct = ContentType::parse_flexible("text/event-stream; charset=utf-8").
-    unwrap();
-  response::content::Content(ct,ch)
+  let content = sse::content(iad);
+  let content = response::Stream::chunked(content, 1);
+  const CTYPE : &str = "text/event-stream; charset=utf-8";
+  let ctype = ContentType::parse_flexible(CTYPE).unwrap();
+  response::content::Content(ctype,content)
 }  
 
-#[get("/_/<leaf>")]
+#[Get("/_/<leaf>")]
 fn resource(leaf : CheckedResourceLeaf) -> io::Result<NamedFile> {
   let template_dir = "templates"; // xxx
   NamedFile::open(format!("{}/{}", template_dir, leaf.safe))
@@ -370,7 +242,7 @@ fn main() {
       loading,
       session,
       resource,
-      updates,
+      sse::updates,
       api_grab,
       api_ungrab,
       api_move,
index 48471c35a6cbe9db7b1c4fa146bc5b824fcfb4d9..8d6da5ac5cbb3022d459411abdd6598afa7f689d 100644 (file)
@@ -5,7 +5,14 @@ slotmap::new_key_type!{
   pub struct PieceId;
 }
 
-pub type Counter = u64;
+#[derive(Copy,Clone,Debug,Ord,PartialOrd,Eq,PartialEq)]
+#[derive(Serialize)]
+#[serde(transparent)]
+pub struct Generation (u64);
+
+impl Generation {
+  pub fn increment(&mut self) { self.0 += 1 }
+}
 
 visible_slotmap_key!{ VisiblePieceId('.') }
 
@@ -49,8 +56,8 @@ pub struct PieceRecord {
   pub face : FaceId,
   pub held : Option<PlayerId>,
   pub lastclient : ClientId,
-  pub gen_lastclient : Counter,
-  pub gen_before_lastclient : Counter,
+  pub gen_lastclient : Generation,
+  pub gen_before_lastclient : Generation,
 }
 
 impl PieceRecord {
@@ -67,7 +74,7 @@ impl PieceRecord {
 pub struct GameState {
   pub pieces : DenseSlotMap<PieceId,PieceRecord>,
   pub players : DenseSlotMap<PlayerId,Player>,
-  pub gen : Counter,
+  pub gen : Generation,
 }
 
 #[derive(Debug)]
@@ -83,10 +90,10 @@ pub fn xxx_gamestate_init() -> GameState {
       face : 0.into(),
       held : None,
       lastclient : Default::default(),
-      gen_lastclient : 0,
-      gen_before_lastclient : 0,
+      gen_lastclient : Generation(0),
+      gen_before_lastclient : Generation(0),
     };
     pieces.insert(pr);
   }
-  GameState { pieces, gen : 1, players : Default::default(),  }
+  GameState { pieces, gen : Generation(1), players : Default::default(),  }
 }
index 3ecdf43f41c3202555eab30157152001e9868ae1..dd384a6c5a8e59251c1f2fc58c0b42010734fed5 100644 (file)
@@ -24,15 +24,15 @@ impl Client {
 }
 
 pub struct PreparedUpdate {
-  gen : Counter,
-  client : ClientId,
-  piece : PieceId,
-  client_seq : ClientSequence,
-  json : String,
+  pub gen : Generation,
+  pub client : ClientId,
+  pub piece : PieceId,
+  pub client_seq : ClientSequence,
+  pub json : String,
 }
 
 pub struct PlayerUpdates {
-  pub log : StableIndexVecDeque<PreparedUpdate,UpdateCounter>,
+  pub log : StableIndexVecDeque<PreparedUpdate,sse::UpdateId>,
   pub cv : Condvar,
 }
 
index a949f8f196238c110af1549651c67a36793cc5d9..625df08f388fcffd0370239d5c54a7c413e79a9e 100644 (file)
@@ -1,6 +1,6 @@
 
 pub use std::io;
-pub use std::io::{BufReader,Read};
+pub use std::io::{BufReader,Read,Write};
 pub use std::fmt::{self,Display,Debug};
 pub use std::thread;
 pub use std::time::Duration;
@@ -44,7 +44,7 @@ pub use crate::gamestate::*;
 pub use crate::pieces::*;
 pub use crate::keydata::*;
 pub use crate::updates::*;
-pub use crate::sse::*;
+pub use crate::sse;
 
 pub type E = anyhow::Error;
 pub type AE = anyhow::Error;
@@ -53,3 +53,4 @@ pub type SvgData = Vec<u8>;
 pub type Coord = isize;
 pub type Pos = [Coord; 2];
 pub type Colour = String;
+
index c090c3e4eeb2c8e3298560af3371b7f56bddb19f..2133453b67a86e635b0d9d3974e8fac89975c5ed 100644 (file)
 
+#![feature(proc_macro_hygiene, decl_macro)]
+
 use crate::imports::*;
 
 #[derive(Copy,Clone,Debug,Eq,PartialEq,Ord,PartialOrd)]
 #[derive(Serialize,Deserialize)]
 #[serde(transparent)]
-pub struct UpdateCounter (i64);
+pub struct UpdateId (i64);
 
 use vecdeque_stableix::StableIndexOffset;
 use std::ops::Neg;
 
-impl Neg for UpdateCounter {
+const UPDATE_READER_SIZE : usize = 1024*32;
+const UPDATE_MAX_FRAMING_SIZE : usize = 200;
+const UPDATE_KEEPALIVE : Duration = Duration::from_secs(14);
+
+impl Neg for UpdateId {
   type Output = Self;
-  fn neg(self) -> Self { UpdateCounter(-self.0) }
+  fn neg(self) -> Self { UpdateId(-self.0) }
 }
 
-impl StableIndexOffset for UpdateCounter {
+impl StableIndexOffset for UpdateId {
   fn try_increment(&mut self) -> Option<()> { self.0.try_increment() }
   fn try_decrement(&mut self) -> Option<()> { self.0.try_decrement() }
   fn index_input(&self, input: Self) -> Option<usize> {
     self.0.index_input(input.0)
   }
   fn index_output(&self, inner: usize) -> Option<Self> {
-    self.0.index_output(inner).map(|v| UpdateCounter(v))
+    self.0.index_output(inner).map(|v| UpdateId(v))
+  }
+  fn zero() -> Self { UpdateId(0) }
+}
+
+struct UpdateReader {
+  player : PlayerId,
+  client : ClientId,
+  to_send : UpdateId, // xxx race for setting this initially
+  ami : Arc<Mutex<Instance>>,
+}
+
+impl Read for UpdateReader {
+  fn read(&mut self, mut buf: &mut [u8]) -> io::Result<usize> {
+    let em : fn(&'static str) -> io::Error = |s|
+    io::Error::new(io::ErrorKind::Other, anyhow!(s));
+
+    let amig = self.ami.lock().map_err(|_| em("poison"))?;
+    let orig_wanted = buf.len();
+
+    let pu = &mut amig.updates.get(self.player)
+      .ok_or_else(|| em("player gonee"))?;
+    loop {
+      let next = match pu.log.get(self.to_send) {
+        Some(next) => next,  None => { break }
+      };
+      let next_len = UPDATE_MAX_FRAMING_SIZE + next.json.len();
+      if next_len > buf.len() { break }
+
+      if next.client == self.client {
+        write!(buf, r#"
+event: recorded
+data: {{ gen: {}, piece: {}, cseq:{} }}
+"#,
+               &next.gen, &next.piece, &next.client_seq);
+      } else {
+        write!(buf, r#"
+id: {}
+data: {}
+"#,
+               &self.to_send,
+               &next.json);
+      }
+    }
+    loop {
+      let generated = orig_wanted - buf.len();
+      if generated > 0 { return generated }
+
+      amig = self.cv.wait_timeout(amig, UPDATE_KEEPALIVE)?.0;
+      write!(buf,r#"
+: keepalive
+"#);
+    }
   }
-  fn zero() -> Self { UpdateCounter(0) }
+}
+
+    /*
+    loop {
+                    e
+      let send_from = (||{
+        let l = self.updates.len();
+        let last_probe = match updates.last() {
+          None => return None,
+          Some(&now) if self.last_sent > now.gen => return l+1,
+          _ => l,
+        };
+        let (lo, hi /* half-open */) = loop {
+          let depth = l - last_probe;
+          depth *= 2;
+          if depth > l { break (0, last_probe) }
+          let probe = l - depth;
+          let here = updates[probe];
+          if here.gen < l 
+
+        if let Some(&now) =  {
+          if  { return None }
+        }
+        let probe = inst.updates.len() - 1;
+        let (lo, hi) = loop {
+          if search == 0 { break }
+          search -= 1;
+          tu = inst.updates[search];
+          if 
+
+        let lo = 0;
+        
+      };
+    loop {
+         implement this! 
+    }
+    for (tclient, tcl) in &mut g.clients {
+      if tclient == client {
+        tcl.transmit_update(&Update {
+          gen,
+          u : UpdatePayload::ClientSequence(piece, form.s),
+        });
+      } else {
+        tcl.transmit_update(&update);
+      }          
+    }
+     */
+/*
+
+    thread::sleep(Duration::from_millis(500));
+    let message = XUpdate::TestCounter { value : self.next };
+    let data = serde_json::to_string(&message)?;
+    let data = format!("data: {}\n\n", &data);
+    // eprintln!("want to return into &[;{}] {:?}", buf.len(), &data);
+    self.next += 1;
+    buf[0..data.len()].copy_from_slice(data.as_bytes());
+    Ok(buf.len())
+  }
+}*/
+
+/*
+#[derive(Deserialize)]
+struct APIForm {
+  t : String,
+  c : ClientId,
+}
+ */
+
+#[throws(E)]
+pub fn content(iad : InstanceAccessDetails<ClientId>, gen: Generation)
+  -> impl Read {
+  let client = iad.ident;
+
+  let content = {
+    let mut ig = iad.g.lock().map_err(|e| anyhow!("lock poison {:?}",&e))?;
+    let g = &mut ig.gs;
+    let cl = ig.clients.get(client).ok_or_else(|| anyhow!("no client"))?;
+    let player = cl.player;
+    let ami = iad.g.clone();
+
+    let to_send = UpdateId(42); // xxx
+    
+    UpdateReader { player, client, to_send, ami }
+  };
+  BufReader::with_capacity(UPDATE_READER_SIZE, content)
 }
index 6566d3df348db74079939e0f44084b60e8a4a0a4..a0ce16a18948da36a549221b5addeca8e2478dd4 100644 (file)
@@ -7,7 +7,7 @@ pub struct ClientSequence(u64);
 
 #[derive(Debug,Serialize)]
 pub struct Update {
-  pub gen : Counter,
+  pub gen : Generation,
   pub u : UpdatePayload,
 }