From: Ian Jackson Date: Fri, 3 Jul 2020 23:54:53 +0000 (+0100) Subject: wip X-Git-Tag: otter-0.2.0~1490 X-Git-Url: https://www.chiark.greenend.org.uk/ucgi/~ianmdlvl/git?a=commitdiff_plain;h=e893a1f44066524285e1554002a1b5db9d9e5ad1;p=otter.git wip --- diff --git a/src/bin/server.rs b/src/bin/server.rs index 958120e9..bd277408 100644 --- a/src/bin/server.rs +++ b/src/bin/server.rs @@ -131,7 +131,7 @@ enum OpError { struct ApiGrab { t : String, p : VisiblePieceId, - g : Counter, + g : Generation, s : ClientSequence, } #[post("/_/api/grab", format="json", data="
")] @@ -155,7 +155,7 @@ fn api_grab(form : Json) -> impl response::Responder<'static> { if u_gen > q_gen { Err(OpError::Conflict)? } if p.held != None { Err(OpError::PieceHeld)? }; p.held = Some(player); - gs.gen += 1; + gs.gen.increment(); let gen = gs.gen; if client != p.lastclient { p.gen_before_lastclient = p.gen_lastclient; @@ -208,147 +208,19 @@ enum XUpdate { TestCounter { value: usize }, } -const UPDATE_READER_SIZE : usize = 1024*32; -const UPDATE_MAX_FRAMING_SIZE : usize = 200; -const UPDATE_KEEPALIVE : Duration = Duration::from_seconds(14); - -#[derive(Debug)] -struct UpdateReader { - player : PlayerId, - client : ClientId, - to_send : UpdateCounter, // xxx race for setting this initially - ami : Arc>, -} - -impl Read for UpdateReader { - fn read(&mut self, mut buf: &mut [u8]) -> io::Result { - let amig = self.ami.lock()?; - let orig_wanted = buf.len(); - - let pu = &mut amig.updates.get(self.player) - .ok_or_else(|| io::Error::new - (io::ErrorKind::Other, anyhow!("player gonee")))?; - loop { - let next = match pu.log.get(self.to_send) { - Some(next) => next, None => { break } - }; - let next_len = UPDATE_MAX_FRAMING_SIZE + next.json.len(); - if next_len > buf.len() { break } - - if next.client == self.client { - write!(buf, r#" -event: recorded -data: {{ gen: {}, piece: {}, cseq:{} }} -"#, - &self.gen, &next.piece, &next.client_seq); - } else { - write!(buf, r#" -id: {} -data: {} -"#, - &self.to_send, - &next.json); - } - } - loop { - let generated = orig_wanted - buf.len(); - if generated > 0 { return generated } - - amig = self.cv.wait_timeout(amig, UPDATE_KEEPALIVE)?.0; - write!(buf,r#" -: keepalive -"#); - } - } -} - - /* - loop { - e - let send_from = (||{ - let l = self.updates.len(); - let last_probe = match updates.last() { - None => return None, - Some(&now) if self.last_sent > now.gen => return l+1, - _ => l, - }; - let (lo, hi /* half-open */) = loop { - let depth = l - last_probe; - depth *= 2; - if depth > l { break (0, last_probe) } - let probe = l - depth; - let here = updates[probe]; - if here.gen < l - - if let Some(&now) = { - if { return None } - } - let probe = inst.updates.len() - 1; - let (lo, hi) = loop { - if search == 0 { break } - search -= 1; - tu = inst.updates[search]; - if - - let lo = 0; - - }; - loop { - implement this! - } - for (tclient, tcl) in &mut g.clients { - if tclient == client { - tcl.transmit_update(&Update { - gen, - u : UpdatePayload::ClientSequence(piece, form.s), - }); - } else { - tcl.transmit_update(&update); - } - } - */ -/* - - thread::sleep(Duration::from_millis(500)); - let message = XUpdate::TestCounter { value : self.next }; - let data = serde_json::to_string(&message)?; - let data = format!("data: {}\n\n", &data); - // eprintln!("want to return into &[;{}] {:?}", buf.len(), &data); - self.next += 1; - buf[0..data.len()].copy_from_slice(data.as_bytes()); - Ok(buf.len()) - } -}*/ - -/* -#[derive(Deserialize)] -struct APIForm { - t : String, - c : ClientId, -} - */ - -#[get("/_/updates/")] -#[throws(RE)] -fn updates(ctoken : InstanceAccess) +#[get("/_/updates//")] +#[throws(E)] +fn updates(ctoken : InstanceAccess, gen: Generation) -> impl response::Responder<'static> { let iad = ctoken.i; - let client = iad.ident; - let _ = { - let mut ig = iad.g.lock().map_err(|e| anyhow!("lock poison {:?}",&e))?; - let _g = &mut ig.gs; - let cl = ig.clients.get(client).ok_or_else(|| anyhow!("no client"))?; - let _player = cl.player; - }; - let tc = TestCounterInner { next : 0 }; - let tc = BufReader::new(tc); - let ch = response::Stream::chunked(tc, 1); - let ct = ContentType::parse_flexible("text/event-stream; charset=utf-8"). - unwrap(); - response::content::Content(ct,ch) + let content = sse::content(iad); + let content = response::Stream::chunked(content, 1); + const CTYPE : &str = "text/event-stream; charset=utf-8"; + let ctype = ContentType::parse_flexible(CTYPE).unwrap(); + response::content::Content(ctype,content) } -#[get("/_/")] +#[Get("/_/")] fn resource(leaf : CheckedResourceLeaf) -> io::Result { let template_dir = "templates"; // xxx NamedFile::open(format!("{}/{}", template_dir, leaf.safe)) @@ -370,7 +242,7 @@ fn main() { loading, session, resource, - updates, + sse::updates, api_grab, api_ungrab, api_move, diff --git a/src/gamestate.rs b/src/gamestate.rs index 48471c35..8d6da5ac 100644 --- a/src/gamestate.rs +++ b/src/gamestate.rs @@ -5,7 +5,14 @@ slotmap::new_key_type!{ pub struct PieceId; } -pub type Counter = u64; +#[derive(Copy,Clone,Debug,Ord,PartialOrd,Eq,PartialEq)] +#[derive(Serialize)] +#[serde(transparent)] +pub struct Generation (u64); + +impl Generation { + pub fn increment(&mut self) { self.0 += 1 } +} visible_slotmap_key!{ VisiblePieceId('.') } @@ -49,8 +56,8 @@ pub struct PieceRecord { pub face : FaceId, pub held : Option, pub lastclient : ClientId, - pub gen_lastclient : Counter, - pub gen_before_lastclient : Counter, + pub gen_lastclient : Generation, + pub gen_before_lastclient : Generation, } impl PieceRecord { @@ -67,7 +74,7 @@ impl PieceRecord { pub struct GameState { pub pieces : DenseSlotMap, pub players : DenseSlotMap, - pub gen : Counter, + pub gen : Generation, } #[derive(Debug)] @@ -83,10 +90,10 @@ pub fn xxx_gamestate_init() -> GameState { face : 0.into(), held : None, lastclient : Default::default(), - gen_lastclient : 0, - gen_before_lastclient : 0, + gen_lastclient : Generation(0), + gen_before_lastclient : Generation(0), }; pieces.insert(pr); } - GameState { pieces, gen : 1, players : Default::default(), } + GameState { pieces, gen : Generation(1), players : Default::default(), } } diff --git a/src/global.rs b/src/global.rs index 3ecdf43f..dd384a6c 100644 --- a/src/global.rs +++ b/src/global.rs @@ -24,15 +24,15 @@ impl Client { } pub struct PreparedUpdate { - gen : Counter, - client : ClientId, - piece : PieceId, - client_seq : ClientSequence, - json : String, + pub gen : Generation, + pub client : ClientId, + pub piece : PieceId, + pub client_seq : ClientSequence, + pub json : String, } pub struct PlayerUpdates { - pub log : StableIndexVecDeque, + pub log : StableIndexVecDeque, pub cv : Condvar, } diff --git a/src/imports.rs b/src/imports.rs index a949f8f1..625df08f 100644 --- a/src/imports.rs +++ b/src/imports.rs @@ -1,6 +1,6 @@ pub use std::io; -pub use std::io::{BufReader,Read}; +pub use std::io::{BufReader,Read,Write}; pub use std::fmt::{self,Display,Debug}; pub use std::thread; pub use std::time::Duration; @@ -44,7 +44,7 @@ pub use crate::gamestate::*; pub use crate::pieces::*; pub use crate::keydata::*; pub use crate::updates::*; -pub use crate::sse::*; +pub use crate::sse; pub type E = anyhow::Error; pub type AE = anyhow::Error; @@ -53,3 +53,4 @@ pub type SvgData = Vec; pub type Coord = isize; pub type Pos = [Coord; 2]; pub type Colour = String; + diff --git a/src/sse.rs b/src/sse.rs index c090c3e4..2133453b 100644 --- a/src/sse.rs +++ b/src/sse.rs @@ -1,27 +1,169 @@ +#![feature(proc_macro_hygiene, decl_macro)] + use crate::imports::*; #[derive(Copy,Clone,Debug,Eq,PartialEq,Ord,PartialOrd)] #[derive(Serialize,Deserialize)] #[serde(transparent)] -pub struct UpdateCounter (i64); +pub struct UpdateId (i64); use vecdeque_stableix::StableIndexOffset; use std::ops::Neg; -impl Neg for UpdateCounter { +const UPDATE_READER_SIZE : usize = 1024*32; +const UPDATE_MAX_FRAMING_SIZE : usize = 200; +const UPDATE_KEEPALIVE : Duration = Duration::from_secs(14); + +impl Neg for UpdateId { type Output = Self; - fn neg(self) -> Self { UpdateCounter(-self.0) } + fn neg(self) -> Self { UpdateId(-self.0) } } -impl StableIndexOffset for UpdateCounter { +impl StableIndexOffset for UpdateId { fn try_increment(&mut self) -> Option<()> { self.0.try_increment() } fn try_decrement(&mut self) -> Option<()> { self.0.try_decrement() } fn index_input(&self, input: Self) -> Option { self.0.index_input(input.0) } fn index_output(&self, inner: usize) -> Option { - self.0.index_output(inner).map(|v| UpdateCounter(v)) + self.0.index_output(inner).map(|v| UpdateId(v)) + } + fn zero() -> Self { UpdateId(0) } +} + +struct UpdateReader { + player : PlayerId, + client : ClientId, + to_send : UpdateId, // xxx race for setting this initially + ami : Arc>, +} + +impl Read for UpdateReader { + fn read(&mut self, mut buf: &mut [u8]) -> io::Result { + let em : fn(&'static str) -> io::Error = |s| + io::Error::new(io::ErrorKind::Other, anyhow!(s)); + + let amig = self.ami.lock().map_err(|_| em("poison"))?; + let orig_wanted = buf.len(); + + let pu = &mut amig.updates.get(self.player) + .ok_or_else(|| em("player gonee"))?; + loop { + let next = match pu.log.get(self.to_send) { + Some(next) => next, None => { break } + }; + let next_len = UPDATE_MAX_FRAMING_SIZE + next.json.len(); + if next_len > buf.len() { break } + + if next.client == self.client { + write!(buf, r#" +event: recorded +data: {{ gen: {}, piece: {}, cseq:{} }} +"#, + &next.gen, &next.piece, &next.client_seq); + } else { + write!(buf, r#" +id: {} +data: {} +"#, + &self.to_send, + &next.json); + } + } + loop { + let generated = orig_wanted - buf.len(); + if generated > 0 { return generated } + + amig = self.cv.wait_timeout(amig, UPDATE_KEEPALIVE)?.0; + write!(buf,r#" +: keepalive +"#); + } } - fn zero() -> Self { UpdateCounter(0) } +} + + /* + loop { + e + let send_from = (||{ + let l = self.updates.len(); + let last_probe = match updates.last() { + None => return None, + Some(&now) if self.last_sent > now.gen => return l+1, + _ => l, + }; + let (lo, hi /* half-open */) = loop { + let depth = l - last_probe; + depth *= 2; + if depth > l { break (0, last_probe) } + let probe = l - depth; + let here = updates[probe]; + if here.gen < l + + if let Some(&now) = { + if { return None } + } + let probe = inst.updates.len() - 1; + let (lo, hi) = loop { + if search == 0 { break } + search -= 1; + tu = inst.updates[search]; + if + + let lo = 0; + + }; + loop { + implement this! + } + for (tclient, tcl) in &mut g.clients { + if tclient == client { + tcl.transmit_update(&Update { + gen, + u : UpdatePayload::ClientSequence(piece, form.s), + }); + } else { + tcl.transmit_update(&update); + } + } + */ +/* + + thread::sleep(Duration::from_millis(500)); + let message = XUpdate::TestCounter { value : self.next }; + let data = serde_json::to_string(&message)?; + let data = format!("data: {}\n\n", &data); + // eprintln!("want to return into &[;{}] {:?}", buf.len(), &data); + self.next += 1; + buf[0..data.len()].copy_from_slice(data.as_bytes()); + Ok(buf.len()) + } +}*/ + +/* +#[derive(Deserialize)] +struct APIForm { + t : String, + c : ClientId, +} + */ + +#[throws(E)] +pub fn content(iad : InstanceAccessDetails, gen: Generation) + -> impl Read { + let client = iad.ident; + + let content = { + let mut ig = iad.g.lock().map_err(|e| anyhow!("lock poison {:?}",&e))?; + let g = &mut ig.gs; + let cl = ig.clients.get(client).ok_or_else(|| anyhow!("no client"))?; + let player = cl.player; + let ami = iad.g.clone(); + + let to_send = UpdateId(42); // xxx + + UpdateReader { player, client, to_send, ami } + }; + BufReader::with_capacity(UPDATE_READER_SIZE, content) } diff --git a/src/updates.rs b/src/updates.rs index 6566d3df..a0ce16a1 100644 --- a/src/updates.rs +++ b/src/updates.rs @@ -7,7 +7,7 @@ pub struct ClientSequence(u64); #[derive(Debug,Serialize)] pub struct Update { - pub gen : Counter, + pub gen : Generation, pub u : UpdatePayload, }