// ---------- basic definitions ----------
const UPDATE_READER_SIZE: usize = 1024*32;
-const UPDATE_MAX_FRAMING_SIZE: usize = 200;
const UPDATE_KEEPALIVE: Duration = Duration::from_secs(14);
const UPDATE_EXPIRE: Duration = Duration::from_secs(66);
struct UpdateReader {
wn: UpdateReaderWN,
overflow: Option<io::Cursor<Box<[u8]>>>,
- need_flush: bool,
gref: InstanceRef,
keepalives: Wrapping<u32>,
ending_send: Option<io::Cursor<Box<[u8]>>>,
}
deref_to_field!{UpdateReader, UpdateReaderWN, wn} // no DerefMut
-#[derive(Error,Debug)]
-#[error("WouldBlock error misreported!")]
-struct FlushWouldBlockError{}
-
impl UpdateReaderWN {
#[throws(io::Error)]
fn write_next<U>(&mut self, mut buf: &mut U, tz: &Timezone,
}
}
-#[derive(Debug)]
-struct BufForRead<'b> {
- csr: io::Cursor<&'b mut [u8]>,
+type BufForSend = Vec<u8>;
+
+#[derive(Debug, Default)]
+struct BufForRead {
+ buf: Vec<u8>,
}
-impl Write for BufForRead<'_> {
+impl Write for BufForRead {
#[throws(io::Error)]
- fn write(&mut self, d: &[u8]) -> usize { self.csr.write(d)? }
+ fn write(&mut self, d: &[u8]) -> usize { self.buf.write(d)? }
#[throws(io::Error)]
- fn flush(&mut self) { self.csr.flush()? }
+ fn flush(&mut self) { self.buf.flush()? }
}
-impl BufForRead<'_> {
- fn reset_to_start(&mut self) { self.csr.set_position(0) }
- fn generated(&self) -> usize { self.csr.position().try_into().unwrap() }
- fn at_start(&self) -> bool { self.generated() == 0 }
- fn remaining(&self) -> usize {
- self.csr.get_ref().len() -
- usize::try_from(self.csr.position()).unwrap()
- }
+impl BufForRead {
+ fn reset_to_start(&mut self) { self.buf.truncate(0) }
+ fn at_start(&self) -> bool { self.buf.len() == 0 }
+ fn len(&self) -> usize { self.buf.len() }
fn copy_from<R: InfallibleBufRead>(&mut self, mut read: R) {
let rbuf = read.fill_buf().unwrap();
- let did = self.csr.write(rbuf).unwrap();
+ let did = self.write(rbuf).unwrap();
read.consume(did);
}
- #[throws(io::Error)]
- fn just_copy_from<R: InfallibleBufRead>(&mut self, read: R) -> usize {
+ fn just_copy_from<R: InfallibleBufRead>(mut self, read: R) -> BufForSend {
self.copy_from(read);
- self.generated()
+ self.finish()
}
+
+ fn finish(self) -> BufForSend { assert!(! self.buf.is_empty()); self.buf }
+ fn finish_eof() -> BufForSend { vec![] }
}
trait InfallibleBufRead: BufRead { }
impl<T> InfallibleBufRead for io::Cursor<T> where io::Cursor<T>: BufRead { }
impl<T> InfallibleBufRead for &mut T where T: InfallibleBufRead { }
impl UpdateReader {
- async fn read(&mut self, buf: &mut [u8]) -> Result<usize, io::Error> {
- let mut buf = BufForRead{ csr: io::Cursor::new(buf) };
- if buf.remaining() == 0 { return Ok(0) }
+ #[throws(io::Error)]
+ async fn read(&mut self) -> BufForSend {
+ let mut buf = BufForRead::default();
if let Some(ref mut ending) = self.ending_send {
return buf.just_copy_from(ending);
break
}
};
- let next_len = UPDATE_MAX_FRAMING_SIZE + next.json_len(self.player);
- if next_len > buf.remaining() {
- if ! buf.at_start() { break }
-
- if self.overflow.is_some() {
- throw!(self.wn.trouble("overflow mismanaged",()));
- }
- self.overflow = {
- let mut overflow = Vec::with_capacity(next_len);
- self.wn.write_next(&mut overflow, &iplayer.ipl.tz, &next)
- .map_err(|e| self.wn.trouble("overflow.write_next",&e))?;
- debug!("overflow {} {}, len={}",
- &self.wn.player, &self.wn.client, &overflow.len());
- Some(io::Cursor::new(overflow.into_boxed_slice()))
- };
- continue;
- }
self.wn.write_next(&mut buf, &iplayer.ipl.tz, &next)
.map_err(|e| self.wn.trouble("UpdateReader.write_next",&e))?;
+ if buf.len() >= UPDATE_READER_SIZE { return buf.finish() }
+
let before = next.when - UPDATE_EXPIRE;
pu.expire_upto(before);
}
let cv = pu.get_cv();
- let generated = buf.generated();
- if generated > 0 {
- self.need_flush = true;
- return Ok(generated)
- }
-
- if self.need_flush {
- self.need_flush = false;
- return Err(io::Error::new(io::ErrorKind::WouldBlock,
- FlushWouldBlockError{}));
+ if buf.len() > 0 {
+ return buf.finish();
}
if (||{
let client = ig.clients.get_mut(self.client)?;
client.lastseen = Instant::now();
Some(())
- })() == None { return Ok(0) }
+ })() == None { return BufForRead::finish_eof() }
let was_gen = ig.gs.gen;
data: online {} {} G{}\n\n",
self.player, self.client, was_gen)?;
self.keepalives += Wrapping(1);
- self.need_flush = true;
- return Ok(buf.generated());
+ return buf.finish();
}
}
#[throws(Fatal)]
pub fn content(iad: InstanceAccessDetails<ClientId>, gen: Generation)
- -> Pin<Box<dyn futures::Stream<
- Item=Result<Bytes, io::Error>
- >>> {
+ -> Pin<Box<dyn futures::Stream<Item=Result<Bytes, io::Error>>>>
+{
let client = iad.ident;
let update_reader = {
};
UpdateReader {
- need_flush: false,
keepalives: Wrapping(0),
overflow: None,
gref,
}
};
- Box::pin(futures::stream::try_unfold(update_reader,
- |mut update_reader| async {
- // TODO change error type here to not be io::Error
- // TODO get rid of io::ErrorKind::WouldBlock kludge
- // TODO what is the point now of BufForRead? Combine this with that?
- // TODO adaptive buffer length
- let mut buffer = vec![ 0u8; UPDATE_READER_SIZE ];
- let mut used = 0;
- loop {
- if used == buffer.len() { break }
-
- let got = match update_reader.read(&mut buffer[used..]).await {
- Err(e) if e.kind() == io::ErrorKind::WouldBlock => {
- if used > 0 { break } else { continue }
- },
- x => x,
- }?;
-
- used += got;
- }
- Ok(if used > 0 {
- buffer.truncate(used);
- Some((Bytes::from(buffer), update_reader))
+ Box::pin(futures::stream::try_unfold(
+ update_reader, |mut update_reader| async
+ {
+ let got = update_reader.read().await?;
+ Ok(if got.len() > 0 {
+ Some((Bytes::from(got), update_reader))
} else {
None
})