From: Ian Jackson Date: Sun, 8 Aug 2021 18:00:14 +0000 (+0100) Subject: server: wip, task plumbing X-Git-Tag: hippotat/1.0.0~203 X-Git-Url: https://www.chiark.greenend.org.uk/ucgi/~ian/git?a=commitdiff_plain;h=6374dafe747df2042e4d6c626241db6669845b4f;p=hippotat.git server: wip, task plumbing Signed-off-by: Ian Jackson --- diff --git a/src/bin/server.rs b/src/bin/server.rs index f56db02..4426987 100644 --- a/src/bin/server.rs +++ b/src/bin/server.rs @@ -15,6 +15,17 @@ pub struct Opts { const METADATA_MAX_LEN: usize = MAX_OVERHEAD; +type AllClients = HashMap; + +struct ClientHandles { + ic: Arc, + web: tokio::sync::mpsc::Sender, +} + +struct WebRequest { + reply: tokio::sync::oneshot::Sender<()>, +} + async fn handle( // context: (), // addr: SocketAddr, @@ -102,18 +113,47 @@ async fn handle( Ok(hyper::Response::new(hyper::Body::from("Hello World"))) } +async fn run_client(_ic: Arc, _web: mpsc::Receiver) + -> Result +{ + Err(anyhow!("xxx")) +} #[tokio::main] async fn main() { let opts = Opts::from_args(); - let mut hservers = vec![]; - let (ics, global,ipif) = config::startup( + let mut tasks: Vec<( + JoinHandle, + String, + )> = vec![]; + + let (global, ipif) = config::startup( "hippotatd", LinkEnd::Server, &opts.config, &opts.log, |ics| { let global = config::InstanceConfigGlobal::from(&ics); let ipif = Ipif::start(&global.ipif, None)?; + let _all_clients: AllClients = ics.into_iter().map(|ic| { + let ic = Arc::new(ic); + + let (web_send, web_recv) = mpsc::channel( + 5 // xxx should me max_requests_outstanding but that's + // marked client-only so needs rework + ); + + let ic_ = ic.clone(); + tasks.push((tokio::spawn(async move { + run_client(ic_, web_recv).await.void_unwrap_err() + }), format!("client {}", &ic))); + + (ic.link.client, + ClientHandles { + ic, + web: web_send, + }) + }).collect(); + let make_service = hyper::service::make_service_fn(|_conn| async { Ok::<_, Infallible>(hyper::service::service_fn(handle)) }); @@ -125,17 +165,24 @@ async fn main() { .http1_preserve_header_case(true) .serve(make_service); info!("listening on {}", &addr); - let task = tokio::task::spawn(server); - hservers.push(task); + let task = tokio::task::spawn(async move { + match server.await { + Ok(()) => anyhow!("shut down?!"), + Err(e) => e.into(), + } + }); + tasks.push((task, format!("http server {}", addr))); } - - Ok((ics, global, ipif)) + + Ok((global, ipif)) }); - let x = future::select_all(&mut hservers).await; - error!("xxx hserver {:?}", &x); + let died = future::select_all( + tasks.iter_mut().map(|e| &mut e.0) + ).await; + error!("xxx {:?}", &died); ipif.quitting(None).await; - dbg!(ics, global); + dbg!(global); } diff --git a/src/ipif.rs b/src/ipif.rs index 5364e37..b98249b 100644 --- a/src/ipif.rs +++ b/src/ipif.rs @@ -7,7 +7,7 @@ use crate::prelude::*; pub struct Ipif { pub tx: t_io::Split>, pub rx: t_proc::ChildStdin, - stderr_task: tokio::task::JoinHandle>, + stderr_task: JoinHandle>, child: t_proc::Child, } diff --git a/src/prelude.rs b/src/prelude.rs index f44acbd..22a9c27 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -43,7 +43,8 @@ pub use thiserror::Error; pub use tokio::io::{AsyncBufReadExt, AsyncWriteExt}; pub use tokio::pin; pub use tokio::select; -pub use tokio::task; +pub use tokio::sync::{mpsc, oneshot}; +pub use tokio::task::{self, JoinHandle}; pub use tokio::time::{Duration, Instant}; pub use void::{self, Void, ResultVoidExt, ResultVoidErrExt};