chiark / gitweb /
server: wip, task plumbing
authorIan Jackson <ijackson@chiark.greenend.org.uk>
Sun, 8 Aug 2021 18:00:14 +0000 (19:00 +0100)
committerIan Jackson <ijackson@chiark.greenend.org.uk>
Sun, 8 Aug 2021 18:50:02 +0000 (19:50 +0100)
Signed-off-by: Ian Jackson <ijackson@chiark.greenend.org.uk>
src/bin/server.rs
src/ipif.rs
src/prelude.rs

index f56db02d0c2271b8c16c8714d99b66a8a7a7dd46..442698731b099803ae65d40bddbf8e79b4d35c67 100644 (file)
@@ -15,6 +15,17 @@ pub struct Opts {
 
 const METADATA_MAX_LEN: usize = MAX_OVERHEAD;
 
+type AllClients = HashMap<ClientName, ClientHandles>;
+
+struct ClientHandles {
+  ic: Arc<InstanceConfig>,
+  web: tokio::sync::mpsc::Sender<WebRequest>,
+}
+
+struct WebRequest {
+  reply: tokio::sync::oneshot::Sender<()>,
+}
+
 async fn handle(
 //    context: (),
 //    addr: SocketAddr,
@@ -102,18 +113,47 @@ async fn handle(
   Ok(hyper::Response::new(hyper::Body::from("Hello World")))
 }
 
+async fn run_client(_ic: Arc<InstanceConfig>, _web: mpsc::Receiver<WebRequest>)
+                    -> Result<Void, AE>
+{
+  Err(anyhow!("xxx"))
+}
 
 #[tokio::main]
 async fn main() {
   let opts = Opts::from_args();
-  let mut hservers = vec![];
-  let (ics, global,ipif) = config::startup(
+  let mut tasks: Vec<(
+    JoinHandle<AE>,
+    String,
+  )> = vec![];
+
+  let (global, ipif) = config::startup(
     "hippotatd", LinkEnd::Server,
     &opts.config, &opts.log, |ics|
   {
     let global = config::InstanceConfigGlobal::from(&ics);
     let ipif = Ipif::start(&global.ipif, None)?;
 
+    let _all_clients: AllClients = ics.into_iter().map(|ic| {
+      let ic = Arc::new(ic);
+
+      let (web_send, web_recv) = mpsc::channel(
+        5 // xxx should me max_requests_outstanding but that's
+          // marked client-only so needs rework
+      );
+
+      let ic_ = ic.clone();
+      tasks.push((tokio::spawn(async move {
+        run_client(ic_, web_recv).await.void_unwrap_err()
+      }), format!("client {}", &ic)));
+
+      (ic.link.client,
+       ClientHandles {
+         ic,
+         web: web_send,
+       })
+    }).collect();
+
     let make_service = hyper::service::make_service_fn(|_conn| async {
       Ok::<_, Infallible>(hyper::service::service_fn(handle))
     });
@@ -125,17 +165,24 @@ async fn main() {
         .http1_preserve_header_case(true)
         .serve(make_service);
       info!("listening on {}", &addr);
-      let task = tokio::task::spawn(server);
-      hservers.push(task);
+      let task = tokio::task::spawn(async move {
+        match server.await {
+          Ok(()) => anyhow!("shut down?!"),
+          Err(e) => e.into(),
+        }
+      });
+      tasks.push((task, format!("http server {}", addr)));
     }
-
-    Ok((ics, global, ipif))
+    
+    Ok((global, ipif))
   });
 
-  let x = future::select_all(&mut hservers).await;
-  error!("xxx hserver {:?}", &x);
+  let died = future::select_all(
+    tasks.iter_mut().map(|e| &mut e.0)
+  ).await;
+  error!("xxx {:?}", &died);
 
   ipif.quitting(None).await;
 
-  dbg!(ics, global);
+  dbg!(global);
 }
index 5364e3757a412553321302445000b896946f8c67..b98249b3a75332a38ffb446343117b628b8ee2a8 100644 (file)
@@ -7,7 +7,7 @@ use crate::prelude::*;
 pub struct Ipif {
   pub tx: t_io::Split<t_io::BufReader<t_proc::ChildStdout>>,
   pub rx: t_proc::ChildStdin,
-  stderr_task: tokio::task::JoinHandle<io::Result<()>>,
+  stderr_task: JoinHandle<io::Result<()>>,
   child: t_proc::Child,
 }
 
index f44acbdf85d3773558f2eacbfa5b67ad9c653a24..22a9c27550b81b9623bc261a34980059f46998fa 100644 (file)
@@ -43,7 +43,8 @@ pub use thiserror::Error;
 pub use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
 pub use tokio::pin;
 pub use tokio::select;
-pub use tokio::task;
+pub use tokio::sync::{mpsc, oneshot};
+pub use tokio::task::{self, JoinHandle};
 pub use tokio::time::{Duration, Instant};
 pub use void::{self, Void, ResultVoidExt, ResultVoidErrExt};