From: Ian Jackson Date: Sun, 27 Mar 2022 23:50:10 +0000 (+0100) Subject: actix: Replace Rocket X-Git-Tag: otter-1.0.0~117 X-Git-Url: https://www.chiark.greenend.org.uk/ucgi/~ianmdlvl/git?a=commitdiff_plain;h=78e45b188945eb82300a57f39a7ffffa0d325087;p=otter.git actix: Replace Rocket Signed-off-by: Ian Jackson --- diff --git a/TODO b/TODO index 5f618d1c..193518f8 100644 --- a/TODO +++ b/TODO @@ -1 +1,10 @@ remove test programs +request logging +template render failure logging +template don't do */* thing, and strip .tera +TODOs in diff +content-type for download bundles +http2 HEAD requests +fix listen +fix that /_/updates vs /_/ involves ordering +one tera version diff --git a/daemon/api.rs b/daemon/api.rs index 171bd4c6..1337b7a7 100644 --- a/daemon/api.rs +++ b/daemon/api.rs @@ -7,20 +7,20 @@ use crate::imports::*; pub use super::*; #[derive(Clone,Debug)] -pub struct InstanceAccess<'i, Id> { - pub raw_token: &'i RawTokenVal, +pub struct InstanceAccess { + pub raw_token: RawToken, pub i: InstanceAccessDetails, } -impl<'r, Id> FromFormValue<'r> for InstanceAccess<'r, Id> +impl FromStr for InstanceAccess where Id: AccessId, Fatal: From { - type Error = FER; + type Err = FER; #[throws(FER)] - fn from_form_value(param: &'r RawStr) -> Self { - let token = RawTokenVal::from_str(param.as_str()); - let i = InstanceAccessDetails::from_token(token)?; - InstanceAccess { raw_token: token, i } + fn from_str(s: &str) -> Self { + let raw_token = RawToken(s.to_owned()); + let i = InstanceAccessDetails::from_token(raw_token.borrow())?; + InstanceAccess { raw_token, i } } } @@ -76,33 +76,29 @@ mod op { #[error("{0}")] pub struct FatalErrorResponse(#[from] Fatal); -impl From<&FatalErrorResponse> for rocket::http::Status { - fn from(oe: &FatalErrorResponse) -> rocket::http::Status { +impl ResponseError for FatalErrorResponse { + fn status_code(&self) -> StatusCode { use Fatal::*; - match oe.0 { - ServerFailure(_) => Status::InternalServerError, + match self.0 { + ServerFailure(_) + => StatusCode::INTERNAL_SERVER_ERROR, NoClient | NoPlayer(_) | GameBeingDestroyed(_) - => Status::NotFound, + => StatusCode::NOT_FOUND, BadJSON(_) | BadLoose - => Status::BadRequest, + => StatusCode::BAD_REQUEST, } } -} -impl<'r> Responder<'r> for FatalErrorResponse { - #[throws(Status)] - fn respond_to(self, req: &Request) -> Response<'r> { - let msg = format!("Online-layer error\n{:?}\n{}\n", self, self); - let status = (&self).into(); - let mut resp = Responder::respond_to(msg,req).unwrap(); - resp.set_status(status); - resp + fn error_response(&self) -> HttpResponse { + self.status_code().respond_text( + &format_args!("Online-layer error\n{:?}\n{}\n", self, self) + ) } } #[throws(Fatal)] fn api_piece_op(form: Json>) - -> impl response::Responder<'static> { + -> impl Responder { // thread::sleep(Duration::from_millis(2000)); let iad = lookup_token(form.ctoken.borrow())?; let client = iad.ident; @@ -239,10 +235,10 @@ macro_rules! api_route_core { #[derive(Debug,Serialize,Deserialize)] $formdef - #[post($path, format="json", data="
")] + #[post($path)] #[throws(FER)] - fn $fn(form: Json>) - -> impl response::Responder<'static> { + async fn $fn(form: Json>) + -> impl Responder { api_piece_op(form)? } @@ -636,8 +632,8 @@ api_route!{ } } -pub fn mount(rocket_instance: Rocket) -> Rocket { - rocket_instance.mount("/", routes![ +pub fn routes() -> impl HttpServiceFactory { + services![ api_grab, api_ungrab, api_setz, @@ -646,5 +642,5 @@ pub fn mount(rocket_instance: Rocket) -> Rocket { api_wrest, api_pin, api_uo, - ]) + ] } diff --git a/daemon/main.rs b/daemon/main.rs index 83c1766c..9079923b 100644 --- a/daemon/main.rs +++ b/daemon/main.rs @@ -2,6 +2,7 @@ // SPDX-License-Identifier: AGPL-3.0-or-later // There is NO WARRANTY. +#![feature(lint_reasons)] #![feature(proc_macro_hygiene, decl_macro)] use otter::imports::thiserror; @@ -13,20 +14,7 @@ pub mod cmdlistener; pub mod session; pub mod sse; -pub use rocket::http::Status; -pub use rocket::http::{ContentType, RawStr}; -pub use rocket::request::Request; -pub use rocket::request::{FromFormValue, FromParam, FromRequest, LenientForm}; -pub use rocket::response; -pub use rocket::response::NamedFile; -pub use rocket::response::{Responder, Response}; -pub use rocket::{get, post, routes}; -pub use rocket::{Rocket, State}; -pub use rocket_contrib::helmet::*; -pub use rocket_contrib::json::Json; -pub use rocket_contrib::templates::tera::{self, Value}; -pub use rocket_contrib::templates::Engines; -pub use rocket_contrib::templates::Template; +pub use std::pin::Pin; pub use crate::api::InstanceAccess; pub use crate::api::{FatalErrorResponse}; @@ -34,49 +22,108 @@ pub use crate::cmdlistener::*; pub type FER = FatalErrorResponse; -use rocket::fairing; -use rocket::response::Content; -use rocket_contrib::serve::StaticFiles; +use actix_web::{route, post, HttpServer, Responder}; +//App, +use actix_web::{HttpResponse, HttpResponseBuilder, ResponseError}; +use actix_web::{HttpRequest, FromRequest}; +use actix_web::services; +use actix_web::dev::HttpServiceFactory; +use actix_web::web::{self, Bytes, Data, Json, Path, Query}; +use actix_web::body::BoxBody; +use actix_web::http::header; +use actix_web::http::{Method, StatusCode}; +use actix_web::middleware; +use actix_files::NamedFile; +use actix_cors::Cors; use otter::prelude::*; +use otter::imports::tera_standalone as tera; +use tera::Tera; + +const CT_JAVASCRIPT: mime::Mime = mime::APPLICATION_JAVASCRIPT_UTF_8; +const CT_TEXT: mime::Mime = mime::TEXT_PLAIN_UTF_8; +const CT_HTML: mime::Mime = mime::TEXT_HTML_UTF_8; +const CT_ZIP: &'static str = "application/zip"; +const CT_WASM: &'static str = "application/wasm"; + +trait IntoMime: Debug { + fn into_mime(&self) -> mime::Mime; +} +impl IntoMime for &str { + fn into_mime(&self) -> mime::Mime { self.parse().expect(self) } +} +impl IntoMime for mime::Mime { + fn into_mime(&self) -> mime::Mime { self.clone() } +} +type ConstContentType = &'static dyn IntoMime; #[derive(Serialize,Debug)] struct FrontPageRenderContext { debug_js_inject: Arc, } +pub type Template = HttpResponse; + +pub struct Templates { + tera: tera::Tera, +} + +impl Templates { + #[throws(StartupError)] + pub fn new(template_dir: &str) -> Self { + let tera = Tera::new(&format!("{}/*.tera", template_dir)) + .context("initialise templates")?; + Templates { tera } + } + + #[throws(InternalError)] + pub fn render(&self, template: &str, c: C) -> Template { + #[throws(InternalError)] + fn inner(tmpls: &Templates, template: &str, c: tera::Result) + -> Template { + let s = tmpls.tera.render(&format!("{}.tera", template), &c?)?; + HttpResponseBuilder::new(StatusCode::OK) + .content_type(CT_HTML) + .body(s) + } + + let c = tera::Context::from_serialize(c); + inner(self, template, c)? + } +} + #[derive(Copy,Clone,Debug)] enum ResourceLocation { Main, Wasm(&'static str), } type RL = ResourceLocation; -const RESOURCES: &[(&'static str, ResourceLocation, ContentType)] = &[ - ("script.js", RL::Main, ContentType::JavaScript), - ("LICENCE", RL::Main, ContentType::Plain), - ("libre", RL::Main, ContentType::HTML), - ("shapelib.html", RL::Main, ContentType::HTML), - ("AGPLv3", RL::Main, ContentType::Plain), - ("CC-BY-SA-3.0", RL::Main, ContentType::Plain), - ("CC-BY-SA-4.0", RL::Main, ContentType::Plain), - ("wasm.wasm", RL::Wasm("otter_wasm_bg.wasm"), ContentType::WASM), - ("wasm.js", RL::Wasm("otter_wasm.js"), ContentType::JavaScript), +const RESOURCES: &[(&'static str, ResourceLocation, ConstContentType)] = &[ + ("script.js", RL::Main, &CT_JAVASCRIPT), + ("LICENCE", RL::Main, &CT_TEXT), + ("libre", RL::Main, &CT_HTML), + ("shapelib.html", RL::Main, &CT_HTML), + ("AGPLv3", RL::Main, &CT_TEXT), + ("CC-BY-SA-3.0", RL::Main, &CT_TEXT), + ("CC-BY-SA-4.0", RL::Main, &CT_TEXT), + ("wasm.wasm", RL::Wasm("otter_wasm_bg.wasm"), &CT_WASM), + ("wasm.js", RL::Wasm("otter_wasm.js"), &CT_JAVASCRIPT), ]; #[derive(Debug)] struct CheckedResourceLeaf { safe_leaf: &'static str, locn: ResourceLocation, - ctype: ContentType, + ctype: ConstContentType, } #[derive(Error,Debug)] #[error("not a valid resource path")] struct UnknownResource{} -impl<'r> FromParam<'r> for CheckedResourceLeaf { - type Error = UnknownResource; - fn from_param(param: &'r RawStr) -> Result { +impl FromStr for CheckedResourceLeaf { + type Err = UnknownResource; + fn from_str(s: &str) -> Result { for &(safe_leaf, locn, ref ctype) in RESOURCES { - if safe_leaf == param.as_str() { + if safe_leaf == s { return Ok(CheckedResourceLeaf { safe_leaf, locn, ctype: ctype.clone(), @@ -87,7 +134,10 @@ impl<'r> FromParam<'r> for CheckedResourceLeaf { } } -type PlayerQueryString<'r> = WholeQueryString>; +type PlayerQueryString = WholeQueryString< + InstanceAccess, + FER, + >; #[derive(Serialize,Debug)] struct LoadingRenderContext<'r> { @@ -100,20 +150,24 @@ struct LoadingRenderContext<'r> { movehist_len_i: usize, movehist_len_max: usize, } -#[get("/")] +#[route("/", method="GET", method="HEAD")] #[throws(FER)] -fn loading_p(ia: PlayerQueryString) -> Template { - loading(None, ia)? +async fn loading_p(ia: PlayerQueryString, + templates: Data) -> Template { + loading(None, ia, templates)? } -#[get("/")] +#[route("/{layout}", method="GET", method="HEAD")] #[throws(FER)] -fn loading_l(layout: Parse, ia: PlayerQueryString) +async fn loading_l(layout: Path>, + ia: PlayerQueryString, + templates: Data) -> Template { - loading(Some((layout.0).0), ia)? + loading(Some((layout.0).0), ia, templates)? } #[throws(Fatal)] -fn loading(layout: Option, ia: PlayerQueryString) +fn loading(layout: Option, ia: PlayerQueryString, + templates: Data) -> Template { if let Some(ia) = ia.0 { @@ -123,60 +177,81 @@ fn loading(layout: Option, ia: PlayerQueryString) let c = LoadingRenderContext { nick: gpl.nick.clone(), game: g.name.to_string(), - ptoken: &ia.raw_token, + ptoken: ia.raw_token.borrow(), debug_js_inject: config().debug_js_inject.clone(), movehist_lens: JsonString(movehist::LENS), movehist_len_i: movehist::LEN_DEF_I, movehist_len_max: movehist::LEN_MAX, layout, }; - Template::render("loading", &c) + templates.render("loading", &c)? } else { let c = FrontPageRenderContext { debug_js_inject: config().debug_js_inject.clone(), }; - Template::render("front", &c) + templates.render("front", &c)? } } -struct WholeQueryString(pub Option); +struct WholeQueryString(pub Option, PhantomData); +impl From> for WholeQueryString { + fn from(v: Option) -> Self { Self(v, default()) } +} -impl<'a,'r,T> FromRequest<'a,'r> for WholeQueryString - where T: 'a + FromFormValue<'a>, - T::Error: Debug, - for <'x> &'x T::Error: Into, +impl FromRequest for WholeQueryString +where T: FromStr, + E: ResponseError + 'static, + T::Err: Into +// T::Error: Debug, { - type Error = >::Error; - fn from_request(r: &'a rocket::Request<'r>) - -> rocket::Outcome - { - eprintln!("REQUEST uri={:?}", &r.uri()); - match r.uri().query().map(|s| { - let s = RawStr::from_str(s); - FromFormValue::from_form_value(s) - }).transpose() { - Ok(v) => rocket::Outcome::Success(WholeQueryString(v)), - Err(e) => rocket::Outcome::Failure(((&e).into(), e)), - } + type Future = futures::future::Ready, E>>; + type Error = E; + fn from_request(req: &HttpRequest, _: &mut actix_web::dev::Payload) + -> Self::Future { + futures::future::ready( + req.uri().query() + .map(|s| s.parse()) + .transpose() + .map_err(Into::into) + .map(WholeQueryString::from) + ) } } #[derive(Debug)] pub struct Parse(pub T); -impl<'r, T> FromParam<'r> for Parse - where T: FromStr, - ::Err: Debug, -// where : Into +impl<'de,T> Deserialize<'de> for Parse +where T: FromStr, + T::Err: std::error::Error, { - type Error = ::Err; - #[throws(Self::Error)] - fn from_param(param: &'r RawStr) -> Parse { - Parse(param.as_str().parse()?) + #[throws(D::Error)] + fn deserialize>(d: D) -> Self { + struct ParseVisitor; + impl<'vde> serde::de::Visitor<'vde> for ParseVisitor { + type Value = Cow<'vde, str>; + #[throws(E)] + fn visit_str(self, v: &str) -> Self::Value { + v.to_owned().into() + } + #[throws(E)] + fn visit_borrowed_str(self, v: &'vde str) -> Self::Value { + v.into() + } + #[throws(fmt::Error)] + fn expecting(&self, f: &mut Formatter<'_>) { + write!(f, "string, from URL path")?; + } + } + + let s = d.deserialize_str(ParseVisitor)?; + let v = s.parse().map_err(|e: T::Err| D::Error::custom(e.to_string()))?; + Parse(v) } } -pub struct BundleToken(pub AssetUrlToken); +//pub struct BundleToken(pub AssetUrlToken); +/* impl<'r> FromFormValue<'r> for BundleToken { type Error = BundleDownloadError; #[throws(BundleDownloadError)] @@ -184,32 +259,45 @@ impl<'r> FromFormValue<'r> for BundleToken { BundleToken(param.as_str().parse()?) } } +*/ + +fn updates_cors() -> Cors { + + Cors::default() + .allowed_methods([Method::GET]) +} + +#[derive(Debug, Deserialize)] +struct UpdatesParams { + ctoken: Parse>, + gen: u64, +} -#[get("/_/updates?&")] +#[route("/_/updates", method="GET", wrap="updates_cors()")] #[throws(FER)] -fn updates<'r>(ctoken: InstanceAccess, gen: u64, - cors: rocket_cors::Guard<'r>) - -> impl response::Responder<'r> { +async fn updates_route(query: Query) -> impl Responder { + let UpdatesParams { ctoken, gen } = query.into_inner(); let gen = Generation(gen); - let iad = ctoken.i; + let iad = ctoken.0.i; debug!("starting update stream {:?}", &iad); - let client = iad.ident; let content = sse::content(iad, gen)?; - let content = DebugReader(content, client); - let content = response::Stream::chunked(content, 4096); - const CTYPE: &str = "text/event-stream; charset=utf-8"; - let ctype = ContentType::parse_flexible(CTYPE).unwrap(); - cors.responder(response::content::Content(ctype, content)) + HttpResponse::build(StatusCode::OK) + .content_type("text/event-stream; charset=utf-8") + .streaming(content) } -#[get("/_/")] +#[route("/_/{leaf}", method="GET", method="HEAD")] #[throws(io::Error)] -fn resource<'r>(leaf: CheckedResourceLeaf) -> impl Responder<'r> { +async fn resource(leaf: Path>) -> impl Responder { + let leaf = leaf.into_inner().0; let path = match leaf.locn { RL::Main => format!("{}/{}", config().template_dir, leaf.safe_leaf), RL::Wasm(s) => format!("{}/{}", config().wasm_dir, s), }; - Content(leaf.ctype, NamedFile::open(path)?) + NamedFile::open(path)? + .disable_content_disposition() + .prefer_utf8(true) + .set_content_type(leaf.ctype.into_mime()) } #[derive(Error,Debug)] @@ -221,32 +309,25 @@ pub enum BundleDownloadError { display_as_debug!{BundleDownloadError} use BundleDownloadError as BDE; -impl<'r> Responder<'r> for BundleDownloadError { - fn respond_to(self, _: &rocket::Request) - -> Result, rocket::http::Status> { - Err((&self).into()) - } -} - -impl From<&BundleDownloadError> for rocket::http::Status { - fn from(e: &BundleDownloadError) -> rocket::http::Status { - use rocket::http::Status as S; - match e { - BDE::NotFound => S::NotFound, - BDE::BadAssetUrlToken(_) => S::Forbidden, - BDE::IE(_) => S::InternalServerError, +impl ResponseError for BundleDownloadError { + fn status_code(&self) -> StatusCode { + match self { + BDE::NotFound => StatusCode::NOT_FOUND, + BDE::BadAssetUrlToken(_) => StatusCode::FORBIDDEN, + BDE::IE(_) => StatusCode::INTERNAL_SERVER_ERROR, } } } -#[get("/_/bundle//")] +#[route("/_/bundle/{instance}/{id}", method="GET", method="HEAD")] #[throws(BundleDownloadError)] -fn bundle<'r>(instance: Parse, - id: Parse, - token: WholeQueryString) - -> impl Responder<'r> -{ - if_let!{ Some(BundleToken(token)) = token.0; else throw!(BadAssetUrlToken) }; +async fn bundle_route(path: Path<( + Parse, + Parse, +)>, token: WholeQueryString +) -> impl Responder { + let (instance, id) = path.into_inner(); + if_let!{ Some(token) = token.0; else throw!(BadAssetUrlToken) }; let instance = &instance.0; let id = id.0; let gref = Instance::lookup_by_name_unauth(instance) @@ -257,18 +338,21 @@ fn bundle<'r>(instance: Parse, ig.asset_url_key.check("bundle", &(instance, id), &token)? }.map(|(_,id)| id); let path = id.path(&ig, auth); - let f = match rocket::response::NamedFile::open(&path) { + let f = match NamedFile::open(&path) { Err(e) if e.kind() == ErrorKind::NotFound => throw!(BDE::NotFound), Err(e) => throw!(IE::from(AE::from(e).context(path).context("bundle"))), Ok(y) => y, }; drop(ig); let ctype = match id.kind { - bundles::Kind::Zip => ContentType::ZIP, + bundles::Kind::Zip => CT_ZIP, }; - Content(ctype, f) + f + .disable_content_disposition() + .set_content_type(ctype.into_mime()) } +/* #[derive(Debug,Copy,Clone)] struct ContentTypeFixup; impl fairing::Fairing for ContentTypeFixup { @@ -296,26 +380,35 @@ impl fairing::Fairing for ContentTypeFixup { } } } +*/ -#[derive(Debug,Copy,Clone)] -struct ReportStartup; -impl fairing::Fairing for ReportStartup { - fn info(&self) -> fairing::Info { - fairing::Info { - name: "ReportStartup", - kind: fairing::Kind::Launch, - } +fn on_launch() { + println!("{}", DAEMON_STARTUP_REPORT); + std::io::stdout().flush().unwrap_or_else( + |e| warn!("failed to report started: {:?}", &e) + ); +} + +#[ext] +impl StatusCode { + fn respond_text(self, t: &dyn Display) -> HttpResponse { + HttpResponse::build(self) + .content_type(CT_TEXT) + .body(t.to_string()) } - fn on_launch(&self, _rocket: &Rocket) { - println!("{}", DAEMON_STARTUP_REPORT); - std::io::stdout().flush().unwrap_or_else( - |e| warn!("failed to report started: {:?}", &e) - ); +} + +async fn not_found_handler(method: Method) -> impl Responder { + match method { + Method::GET | Method::HEAD | Method::POST => + StatusCode::NOT_FOUND.respond_text(&"404 Not found."), + _ => + StatusCode::METHOD_NOT_ALLOWED.respond_text(&"Unsupported HTTP method"), } } -#[throws(StartupError)] -fn main() { +#[actix_web::main] // not compatible with fehler +async fn main() -> Result<(),StartupError> { use structopt::StructOpt; #[derive(StructOpt)] struct Opts { @@ -330,8 +423,6 @@ fn main() { ServerConfig::read(opts.config_filename.as_ref().map(String::as_str), PathResolveMethod::Chdir)?; - std::env::set_var("ROCKET_CLI_COLORS", "off"); - let c = config(); flexi_logger::Logger::with(log_config().clone()).start()?; @@ -349,73 +440,82 @@ fn main() { shapelib::load_global_libs(&config().shapelibs)?; c.lock_save_area()?; + let templates = Templates::new(&c.template_dir)?; load_accounts()?; load_games(&mut AccountsGuard::lock(), &mut games_lock())?; let cl = CommandListener::new()?; cl.spawn()?; - let helmet = SpaceHelmet::default() - .enable(NoSniff::Enable) - .enable(Frame::Deny) - .enable(Referrer::NoReferrer); - - let mut cbuilder = rocket::config::Config::build( - if c.debug { - rocket::config::Environment::Development - } else { - info!("requesting Production"); - rocket::config::Environment::Production - } - ); +// let updates = updates.wrap( + + let c = Arc::new(c); + let templates = Data::new(templates); + + let http = HttpServer::new({ + let c = c.clone(); + move || { + + let json_config = actix_web::web::JsonConfig::default() + .content_type(|ctype| ctype == mime::APPLICATION_JSON) + .content_type_required(true); + + let src_service = actix_files::Files::new("/_/src", &c.bundled_sources) + .show_files_listing() + .redirect_to_slash_directory() + .index_file("index.html") + .disable_content_disposition(); + + let app = actix_web::App::new() + .service(services![ + loading_l, + loading_p, + bundle_route, + updates_route, + resource, + session::routes(), + api::routes(), + ]) + .wrap(middleware::DefaultHeaders::new() + .add((header::X_CONTENT_TYPE_OPTIONS, "nosniff")) + .add((header::X_FRAME_OPTIONS, "DENY")) + .add((header::REFERRER_POLICY, "no-referrer")) + ) + .app_data(json_config) + .app_data(templates.clone()) + .service(src_service) + .default_service(web::to(not_found_handler)); + + app + + }}); + + let mut http = http + .disable_signals(); + + let (addrs, def_port): (&[&dyn IpAddress], _) = if c.debug { + (&[&Ipv6Addr::LOCALHOST, &Ipv4Addr::LOCALHOST ], 8000) + } else { + (&[&Ipv6Addr::UNSPECIFIED, &Ipv4Addr::UNSPECIFIED], 80) + }; + let port = c.http_port.unwrap_or(def_port); - if c.debug { - cbuilder = cbuilder.address("127.0.0.1"); - } - cbuilder = cbuilder.workers(c.rocket_workers); - if let Some(port) = c.http_port { - cbuilder = cbuilder.port(port); + for addr in addrs { + let addr = addr.with_port(port); + http = http.bind(addr) + .with_context(|| format!("bind {:?}", addr))?; } - cbuilder.extras.insert("template_dir".to_owned(), - c.template_dir.clone().into()); thread::spawn(game_flush_task); - let cors_state = { - use rocket_cors::*; - let opts = CorsOptions::default() - .allowed_origins(AllowedOrigins::all()) - .allowed_methods(iter::once(rocket::http::Method::Get.into()).collect()); - opts.validate().expect("cors options"); - opts.to_cors().expect("cors") - }; - - let rconfig = cbuilder.finalize()?; - - let mut r = rocket::custom(rconfig) - .attach(ContentTypeFixup) - .attach(helmet) - .attach(Template::fairing()) - .manage(cors_state) - .mount("/", routes![ - loading_l, - loading_p, - bundle, - resource, - updates, - ]) - .mount("/_/src", StaticFiles::from(&c.bundled_sources)) - ; + thread::spawn(client_periodic_expiry); + thread::spawn(logs_periodic_expiry); if opts.report_startup { - r = r.attach(ReportStartup); + on_launch(); } - let r = crate::session::mount(r); - let r = crate::api::mount(r); - - thread::spawn(client_periodic_expiry); - thread::spawn(logs_periodic_expiry); + http.run().await.context("after startup")?; - r.launch(); + Ok(()) } diff --git a/daemon/session.rs b/daemon/session.rs index 28317429..2cf74441 100644 --- a/daemon/session.rs +++ b/daemon/session.rs @@ -66,15 +66,16 @@ struct DataLoad { } #[derive(Deserialize)] -struct SessionForm { +pub struct SessionForm { ptoken: RawToken, } -#[post("/_/session/", format="json", data="")] +#[post("/_/session/{layout}")] #[throws(FER)] -fn session(form: Json, - layout: Parse) +pub async fn session(form: Json, + templates: Data, + layout: Path>) -> Template { - session_inner(form, layout.0)? + session_inner(form, templates, layout.into_inner().0)? } #[ext] @@ -89,6 +90,7 @@ impl SvgAttrs { } fn session_inner(form: Json, + templates: Data, layout: PresentationLayout) -> Result { // make session in this game, log a message to other players @@ -255,11 +257,11 @@ fn session_inner(form: Json, &player, client, &c.nick, &c.ctoken, iad.gref.lock().ok().as_ref().map(|ig| &**ig)); - Ok(Template::render(layout.template(),&c)) + Ok(templates.render(layout.template(),&c)?) } -pub fn mount(rocket_instance: Rocket) -> Rocket { - rocket_instance.mount("/", routes![ +pub fn routes() -> impl HttpServiceFactory { + services![ session, - ]) + ] } diff --git a/daemon/sse.rs b/daemon/sse.rs index d58537d2..38ebf477 100644 --- a/daemon/sse.rs +++ b/daemon/sse.rs @@ -7,6 +7,8 @@ use otter::prelude::*; +use super::*; + // ---------- basic definitions ---------- const UPDATE_READER_SIZE: usize = 1024*32; @@ -96,8 +98,8 @@ trait InfallibleBufRead: BufRead { } impl InfallibleBufRead for io::Cursor where io::Cursor: BufRead { } impl InfallibleBufRead for &mut T where T: InfallibleBufRead { } -impl Read for UpdateReader { - fn read(&mut self, buf: &mut [u8]) -> Result { +impl UpdateReader { + async fn read(&mut self, buf: &mut [u8]) -> Result { let mut buf = BufForRead{ csr: io::Cursor::new(buf) }; if buf.remaining() == 0 { return Ok(0) } @@ -203,11 +205,19 @@ impl Read for UpdateReader { Some(()) })() == None { return Ok(0) } - cv.wait_for(&mut ig.c, UPDATE_KEEPALIVE); + let was_gen = ig.gs.gen; + + match tokio::time::timeout( + UPDATE_KEEPALIVE, + cv.wait_no_relock(ig.c) + ).await { + Err(_elapsed) => { }, + Ok(baton) => baton.dispose(), + }; write!(buf, "event: commsworking\n\ data: online {} {} G{}\n\n", - self.player, self.client, ig.gs.gen)?; + self.player, self.client, was_gen)?; self.keepalives += Wrapping(1); self.need_flush = true; return Ok(buf.generated()); @@ -218,10 +228,12 @@ impl Read for UpdateReader { #[throws(Fatal)] pub fn content(iad: InstanceAccessDetails, gen: Generation) - -> impl Read { + -> Pin + >>> { let client = iad.ident; - let content = { + let update_reader = { let mut g = iad.gref.lock()?; let _g = &mut g.gs; let cl = g.clients.byid(client)?; @@ -250,7 +262,32 @@ pub fn content(iad: InstanceAccessDetails, gen: Generation) }, } }; - let content = BufReader::with_capacity(UPDATE_READER_SIZE, content); - //DebugReader(content) - content + + Box::pin(futures::stream::try_unfold(update_reader, + |mut update_reader| async { + // TODO change error type here to not be io::Error + // TODO get rid of io::ErrorKind::WouldBlock kludge + // TODO what is the point now of BufForRead? Combine this with that? + // TODO adaptive buffer length + let mut buffer = vec![ 0u8; UPDATE_READER_SIZE ]; + let mut used = 0; + loop { + if used == buffer.len() { break } + + let got = match update_reader.read(&mut buffer[used..]).await { + Err(e) if e.kind() == io::ErrorKind::WouldBlock => { + if used > 0 { break } else { continue } + }, + x => x, + }?; + + used += got; + } + Ok(if used > 0 { + buffer.truncate(used); + Some((Bytes::from(buffer), update_reader)) + } else { + None + }) + })) as _ } diff --git a/src/prelude.rs b/src/prelude.rs index c2895c62..a9d717ba 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -40,7 +40,7 @@ pub use std::os::unix::fs::{MetadataExt, OpenOptionsExt}; pub use std::os::unix::io::{AsRawFd, IntoRawFd, RawFd}; pub use std::os::unix::net::UnixStream; pub use std::os::unix::process::{CommandExt, ExitStatusExt}; -pub use std::net::{IpAddr, SocketAddr, ToSocketAddrs}; +pub use std::net::{IpAddr, SocketAddr, ToSocketAddrs, Ipv6Addr, Ipv4Addr}; pub use std::path::PathBuf; pub use std::process::{exit, Child, Command, Stdio}; pub use std::str; @@ -52,6 +52,7 @@ pub use std::thread::{self, sleep}; pub use std::time::{self, Duration, Instant}; pub use anyhow::{anyhow, ensure, Context}; +pub use async_condvar_fair::{Condvar, BatonExt as _}; pub use arrayvec::ArrayVec; pub use boolinator::Boolinator as _; pub use byteorder::{BigEndian, LittleEndian, ReadBytesExt, WriteBytesExt}; @@ -79,7 +80,7 @@ pub use nix::time::clock_gettime; pub use num_derive::{ToPrimitive, FromPrimitive}; pub use num_traits::{Bounded, FromPrimitive, ToPrimitive}; pub use ordered_float::OrderedFloat; -pub use parking_lot::{Condvar, Mutex, MutexGuard}; +pub use parking_lot::{Mutex, MutexGuard}; pub use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard}; pub use percent_encoding::percent_decode_str; pub use percent_encoding::utf8_percent_encode; diff --git a/src/utils.rs b/src/utils.rs index faf4cf49..3dbfae0c 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -738,7 +738,7 @@ impl Drop for CookedStdout { fn drop(&mut self) { self.must_flush() } } -pub trait IpAddress { +pub trait IpAddress: Debug { fn with_port(&self, port: u16) -> SocketAddr; } diff --git a/templates/landscape.tera b/templates/landscape.tera index eeaad5a4..84006b03 100644 --- a/templates/landscape.tera +++ b/templates/landscape.tera @@ -1,4 +1,4 @@ -{% import "macros" as m %}{# -*- HTML -*- -#} +{% import "macros.tera" as m %}{# -*- HTML -*- -#} diff --git a/templates/session.tera b/templates/session.tera index 22c414f0..25728e32 100644 --- a/templates/session.tera +++ b/templates/session.tera @@ -1,4 +1,4 @@ -{% import "macros" as m %}{# -*- HTML -*- -#} +{% import "macros.tera" as m %}{# -*- HTML -*- -#}