struct ApiGrab {
t : String,
p : VisiblePieceId,
- g : Counter,
+ g : Generation,
s : ClientSequence,
}
#[post("/_/api/grab", format="json", data="<form>")]
if u_gen > q_gen { Err(OpError::Conflict)? }
if p.held != None { Err(OpError::PieceHeld)? };
p.held = Some(player);
- gs.gen += 1;
+ gs.gen.increment();
let gen = gs.gen;
if client != p.lastclient {
p.gen_before_lastclient = p.gen_lastclient;
TestCounter { value: usize },
}
-const UPDATE_READER_SIZE : usize = 1024*32;
-const UPDATE_MAX_FRAMING_SIZE : usize = 200;
-const UPDATE_KEEPALIVE : Duration = Duration::from_seconds(14);
-
-#[derive(Debug)]
-struct UpdateReader {
- player : PlayerId,
- client : ClientId,
- to_send : UpdateCounter, // xxx race for setting this initially
- ami : Arc<Mutex<Instance>>,
-}
-
-impl Read for UpdateReader {
- fn read(&mut self, mut buf: &mut [u8]) -> io::Result<usize> {
- let amig = self.ami.lock()?;
- let orig_wanted = buf.len();
-
- let pu = &mut amig.updates.get(self.player)
- .ok_or_else(|| io::Error::new
- (io::ErrorKind::Other, anyhow!("player gonee")))?;
- loop {
- let next = match pu.log.get(self.to_send) {
- Some(next) => next, None => { break }
- };
- let next_len = UPDATE_MAX_FRAMING_SIZE + next.json.len();
- if next_len > buf.len() { break }
-
- if next.client == self.client {
- write!(buf, r#"
-event: recorded
-data: {{ gen: {}, piece: {}, cseq:{} }}
-"#,
- &self.gen, &next.piece, &next.client_seq);
- } else {
- write!(buf, r#"
-id: {}
-data: {}
-"#,
- &self.to_send,
- &next.json);
- }
- }
- loop {
- let generated = orig_wanted - buf.len();
- if generated > 0 { return generated }
-
- amig = self.cv.wait_timeout(amig, UPDATE_KEEPALIVE)?.0;
- write!(buf,r#"
-: keepalive
-"#);
- }
- }
-}
-
- /*
- loop {
- e
- let send_from = (||{
- let l = self.updates.len();
- let last_probe = match updates.last() {
- None => return None,
- Some(&now) if self.last_sent > now.gen => return l+1,
- _ => l,
- };
- let (lo, hi /* half-open */) = loop {
- let depth = l - last_probe;
- depth *= 2;
- if depth > l { break (0, last_probe) }
- let probe = l - depth;
- let here = updates[probe];
- if here.gen < l
-
- if let Some(&now) = {
- if { return None }
- }
- let probe = inst.updates.len() - 1;
- let (lo, hi) = loop {
- if search == 0 { break }
- search -= 1;
- tu = inst.updates[search];
- if
-
- let lo = 0;
-
- };
- loop {
- implement this!
- }
- for (tclient, tcl) in &mut g.clients {
- if tclient == client {
- tcl.transmit_update(&Update {
- gen,
- u : UpdatePayload::ClientSequence(piece, form.s),
- });
- } else {
- tcl.transmit_update(&update);
- }
- }
- */
-/*
-
- thread::sleep(Duration::from_millis(500));
- let message = XUpdate::TestCounter { value : self.next };
- let data = serde_json::to_string(&message)?;
- let data = format!("data: {}\n\n", &data);
- // eprintln!("want to return into &[;{}] {:?}", buf.len(), &data);
- self.next += 1;
- buf[0..data.len()].copy_from_slice(data.as_bytes());
- Ok(buf.len())
- }
-}*/
-
-/*
-#[derive(Deserialize)]
-struct APIForm {
- t : String,
- c : ClientId,
-}
- */
-
-#[get("/_/updates/<ctoken>")]
-#[throws(RE)]
-fn updates(ctoken : InstanceAccess<ClientId>)
+#[get("/_/updates/<ctoken>/<gen>")]
+#[throws(E)]
+fn updates(ctoken : InstanceAccess<ClientId>, gen: Generation)
-> impl response::Responder<'static> {
let iad = ctoken.i;
- let client = iad.ident;
- let _ = {
- let mut ig = iad.g.lock().map_err(|e| anyhow!("lock poison {:?}",&e))?;
- let _g = &mut ig.gs;
- let cl = ig.clients.get(client).ok_or_else(|| anyhow!("no client"))?;
- let _player = cl.player;
- };
- let tc = TestCounterInner { next : 0 };
- let tc = BufReader::new(tc);
- let ch = response::Stream::chunked(tc, 1);
- let ct = ContentType::parse_flexible("text/event-stream; charset=utf-8").
- unwrap();
- response::content::Content(ct,ch)
+ let content = sse::content(iad);
+ let content = response::Stream::chunked(content, 1);
+ const CTYPE : &str = "text/event-stream; charset=utf-8";
+ let ctype = ContentType::parse_flexible(CTYPE).unwrap();
+ response::content::Content(ctype,content)
}
-#[get("/_/<leaf>")]
+#[Get("/_/<leaf>")]
fn resource(leaf : CheckedResourceLeaf) -> io::Result<NamedFile> {
let template_dir = "templates"; // xxx
NamedFile::open(format!("{}/{}", template_dir, leaf.safe))
loading,
session,
resource,
- updates,
+ sse::updates,
api_grab,
api_ungrab,
api_move,
pub struct PieceId;
}
-pub type Counter = u64;
+#[derive(Copy,Clone,Debug,Ord,PartialOrd,Eq,PartialEq)]
+#[derive(Serialize)]
+#[serde(transparent)]
+pub struct Generation (u64);
+
+impl Generation {
+ pub fn increment(&mut self) { self.0 += 1 }
+}
visible_slotmap_key!{ VisiblePieceId('.') }
pub face : FaceId,
pub held : Option<PlayerId>,
pub lastclient : ClientId,
- pub gen_lastclient : Counter,
- pub gen_before_lastclient : Counter,
+ pub gen_lastclient : Generation,
+ pub gen_before_lastclient : Generation,
}
impl PieceRecord {
pub struct GameState {
pub pieces : DenseSlotMap<PieceId,PieceRecord>,
pub players : DenseSlotMap<PlayerId,Player>,
- pub gen : Counter,
+ pub gen : Generation,
}
#[derive(Debug)]
face : 0.into(),
held : None,
lastclient : Default::default(),
- gen_lastclient : 0,
- gen_before_lastclient : 0,
+ gen_lastclient : Generation(0),
+ gen_before_lastclient : Generation(0),
};
pieces.insert(pr);
}
- GameState { pieces, gen : 1, players : Default::default(), }
+ GameState { pieces, gen : Generation(1), players : Default::default(), }
}
}
pub struct PreparedUpdate {
- gen : Counter,
- client : ClientId,
- piece : PieceId,
- client_seq : ClientSequence,
- json : String,
+ pub gen : Generation,
+ pub client : ClientId,
+ pub piece : PieceId,
+ pub client_seq : ClientSequence,
+ pub json : String,
}
pub struct PlayerUpdates {
- pub log : StableIndexVecDeque<PreparedUpdate,UpdateCounter>,
+ pub log : StableIndexVecDeque<PreparedUpdate,sse::UpdateId>,
pub cv : Condvar,
}
pub use std::io;
-pub use std::io::{BufReader,Read};
+pub use std::io::{BufReader,Read,Write};
pub use std::fmt::{self,Display,Debug};
pub use std::thread;
pub use std::time::Duration;
pub use crate::pieces::*;
pub use crate::keydata::*;
pub use crate::updates::*;
-pub use crate::sse::*;
+pub use crate::sse;
pub type E = anyhow::Error;
pub type AE = anyhow::Error;
pub type Coord = isize;
pub type Pos = [Coord; 2];
pub type Colour = String;
+
+#![feature(proc_macro_hygiene, decl_macro)]
+
use crate::imports::*;
#[derive(Copy,Clone,Debug,Eq,PartialEq,Ord,PartialOrd)]
#[derive(Serialize,Deserialize)]
#[serde(transparent)]
-pub struct UpdateCounter (i64);
+pub struct UpdateId (i64);
use vecdeque_stableix::StableIndexOffset;
use std::ops::Neg;
-impl Neg for UpdateCounter {
+const UPDATE_READER_SIZE : usize = 1024*32;
+const UPDATE_MAX_FRAMING_SIZE : usize = 200;
+const UPDATE_KEEPALIVE : Duration = Duration::from_secs(14);
+
+impl Neg for UpdateId {
type Output = Self;
- fn neg(self) -> Self { UpdateCounter(-self.0) }
+ fn neg(self) -> Self { UpdateId(-self.0) }
}
-impl StableIndexOffset for UpdateCounter {
+impl StableIndexOffset for UpdateId {
fn try_increment(&mut self) -> Option<()> { self.0.try_increment() }
fn try_decrement(&mut self) -> Option<()> { self.0.try_decrement() }
fn index_input(&self, input: Self) -> Option<usize> {
self.0.index_input(input.0)
}
fn index_output(&self, inner: usize) -> Option<Self> {
- self.0.index_output(inner).map(|v| UpdateCounter(v))
+ self.0.index_output(inner).map(|v| UpdateId(v))
+ }
+ fn zero() -> Self { UpdateId(0) }
+}
+
+struct UpdateReader {
+ player : PlayerId,
+ client : ClientId,
+ to_send : UpdateId, // xxx race for setting this initially
+ ami : Arc<Mutex<Instance>>,
+}
+
+impl Read for UpdateReader {
+ fn read(&mut self, mut buf: &mut [u8]) -> io::Result<usize> {
+ let em : fn(&'static str) -> io::Error = |s|
+ io::Error::new(io::ErrorKind::Other, anyhow!(s));
+
+ let amig = self.ami.lock().map_err(|_| em("poison"))?;
+ let orig_wanted = buf.len();
+
+ let pu = &mut amig.updates.get(self.player)
+ .ok_or_else(|| em("player gonee"))?;
+ loop {
+ let next = match pu.log.get(self.to_send) {
+ Some(next) => next, None => { break }
+ };
+ let next_len = UPDATE_MAX_FRAMING_SIZE + next.json.len();
+ if next_len > buf.len() { break }
+
+ if next.client == self.client {
+ write!(buf, r#"
+event: recorded
+data: {{ gen: {}, piece: {}, cseq:{} }}
+"#,
+ &next.gen, &next.piece, &next.client_seq);
+ } else {
+ write!(buf, r#"
+id: {}
+data: {}
+"#,
+ &self.to_send,
+ &next.json);
+ }
+ }
+ loop {
+ let generated = orig_wanted - buf.len();
+ if generated > 0 { return generated }
+
+ amig = self.cv.wait_timeout(amig, UPDATE_KEEPALIVE)?.0;
+ write!(buf,r#"
+: keepalive
+"#);
+ }
}
- fn zero() -> Self { UpdateCounter(0) }
+}
+
+ /*
+ loop {
+ e
+ let send_from = (||{
+ let l = self.updates.len();
+ let last_probe = match updates.last() {
+ None => return None,
+ Some(&now) if self.last_sent > now.gen => return l+1,
+ _ => l,
+ };
+ let (lo, hi /* half-open */) = loop {
+ let depth = l - last_probe;
+ depth *= 2;
+ if depth > l { break (0, last_probe) }
+ let probe = l - depth;
+ let here = updates[probe];
+ if here.gen < l
+
+ if let Some(&now) = {
+ if { return None }
+ }
+ let probe = inst.updates.len() - 1;
+ let (lo, hi) = loop {
+ if search == 0 { break }
+ search -= 1;
+ tu = inst.updates[search];
+ if
+
+ let lo = 0;
+
+ };
+ loop {
+ implement this!
+ }
+ for (tclient, tcl) in &mut g.clients {
+ if tclient == client {
+ tcl.transmit_update(&Update {
+ gen,
+ u : UpdatePayload::ClientSequence(piece, form.s),
+ });
+ } else {
+ tcl.transmit_update(&update);
+ }
+ }
+ */
+/*
+
+ thread::sleep(Duration::from_millis(500));
+ let message = XUpdate::TestCounter { value : self.next };
+ let data = serde_json::to_string(&message)?;
+ let data = format!("data: {}\n\n", &data);
+ // eprintln!("want to return into &[;{}] {:?}", buf.len(), &data);
+ self.next += 1;
+ buf[0..data.len()].copy_from_slice(data.as_bytes());
+ Ok(buf.len())
+ }
+}*/
+
+/*
+#[derive(Deserialize)]
+struct APIForm {
+ t : String,
+ c : ClientId,
+}
+ */
+
+#[throws(E)]
+pub fn content(iad : InstanceAccessDetails<ClientId>, gen: Generation)
+ -> impl Read {
+ let client = iad.ident;
+
+ let content = {
+ let mut ig = iad.g.lock().map_err(|e| anyhow!("lock poison {:?}",&e))?;
+ let g = &mut ig.gs;
+ let cl = ig.clients.get(client).ok_or_else(|| anyhow!("no client"))?;
+ let player = cl.player;
+ let ami = iad.g.clone();
+
+ let to_send = UpdateId(42); // xxx
+
+ UpdateReader { player, client, to_send, ami }
+ };
+ BufReader::with_capacity(UPDATE_READER_SIZE, content)
}
#[derive(Debug,Serialize)]
pub struct Update {
- pub gen : Counter,
+ pub gen : Generation,
pub u : UpdatePayload,
}