}
const METADATA_MAX_LEN: usize = MAX_OVERHEAD;
+const INTERNAL_QUEUE: usize = 15; // xxx: config
#[derive(Debug)]
-struct Global {
+pub struct Global {
config: config::InstanceConfigGlobal,
all_clients: HashMap<ClientName, Client>,
}
#[derive(Debug)]
-struct Client {
+pub struct Client {
ic: Arc<InstanceConfig>,
- web: tokio::sync::mpsc::Sender<WebRequest>,
+ web: mpsc::Sender<WebRequest>,
+ route: mpsc::Sender<RoutedPacket>,
}
+pub type RoutedPacket = Box<[u8]>; // not MIME data
+
/// Sent from hyper worker pool task to client task
#[allow(dead_code)] // xxx
#[derive(Debug)]
length_hint: usize,
body: hyper::body::Body,
boundary_finder: multipart::BoundaryFinder,
- reply_to: tokio::sync::oneshot::Sender<WebResponse>,
+ reply_to: oneshot::Sender<WebResponse>,
warnings: Warnings,
conn: Arc<String>,
}
type WebResponseData = Vec<u8>;
#[throws(PacketError)]
-pub fn route_packet(conn: &str, link: &dyn Display,
- packet: Box<[u8]>, daddr: IpAddr)
+pub async fn route_packet(global: &Global,
+ conn: &str, link: &(dyn Display + Sync),
+ packet: RoutedPacket, daddr: IpAddr)
{
- // xxx
- trace!("{} {} discarding packet daddr={:?} len={}",
- conn, link, daddr, packet.len());
+ let c = &global.config;
+ let len = packet.len();
+ let trace = |how: &str, why: &str| {
+ trace!("{} {} {} {} {:?} len={}",
+ conn, link, how, why, daddr, len);
+ };
+
+ let (dest, why) =
+ if daddr == c.vaddr || ! c.vnetwork.iter().any(|n| n.contains(&daddr)) {
+ (None, "ipif-inbound-xxx")
+ } else if daddr == c.vrelay {
+ (None, "vrelay")
+ } else if let Some(client) = global.all_clients.get(&ClientName(daddr)) {
+ (Some(&client.route), "client")
+ } else {
+ (None, "no-client")
+ };
+
+ let dest = if let Some(d) = dest { d } else {
+ trace("discard", why); return;
+ };
+
+ match dest.send(packet).await {
+ Ok(()) => trace("forward", why),
+ Err(_) => trace("task-crashed!", why),
+ }
}
async fn handle(
//eprintln!("boundary={:?} start={} name={:?} client={}",
// boundary, start, &comp.name, &client.ic);
- let (reply_to, reply_recv) = tokio::sync::oneshot::channel();
+ let (reply_to, reply_recv) = oneshot::channel();
trace!("{} {} request, Content-Length={}",
&conn, &client_name, length_hint);
let wreq = WebRequest {
#[allow(unused_variables)] // xxx
#[allow(unused_mut)] // xxx
-async fn run_client(ic: Arc<InstanceConfig>,
- mut web: mpsc::Receiver<WebRequest>)
+async fn run_client(global: Arc<Global>,
+ ic: Arc<InstanceConfig>,
+ mut web: mpsc::Receiver<WebRequest>,
+ mut routed: mpsc::Receiver<RoutedPacket>)
-> Result<Void, AE>
{
struct Outstanding {
- reply_to: tokio::sync::oneshot::Sender<WebResponse>,
+ reply_to: oneshot::Sender<WebResponse>,
oi: OutstandingInner,
}
#[derive(Debug)]
let downbound: VecDeque<(/*xxx*/)> = default();
let try_send_response = |
- reply_to: tokio::sync::oneshot::Sender<WebResponse>,
+ reply_to: oneshot::Sender<WebResponse>,
response: WebResponse
| {
reply_to.send(response)
let daddr = ip_packet_addr::<true>(header)?;
Ok(daddr)
}, |(daddr,packet)| route_packet(
- &conn, &ic.link.client, daddr,packet
+ &global, &conn, &ic.link.client, daddr,packet
),
|e| Ok::<_,SlipFramesError<_>>({ warnings.add(&e)?; })
- )?;
+ ).await?;
}
let oi = OutstandingInner {
let ics = ics.into_iter().map(Arc::new).collect_vec();
let (client_handles_send, client_handles_recv) = ics.iter()
- .map(|_ic| mpsc::channel(
- 5 // xxx should me max_requests_outstanding but that's
- // marked client-only so needs rework
- )).unzip::<_,_,Vec<_>,Vec<_>>();
+ .map(|_ic| {
+ let (web_send, web_recv) = mpsc::channel(
+ 5 // xxx should me max_requests_outstanding but that's
+ // marked client-only so needs rework
+ );
+ let (route_send, route_recv) = mpsc::channel(
+ INTERNAL_QUEUE
+ );
+ ((web_send, route_send), (web_recv, route_recv))
+ }).unzip::<_,_,Vec<_>,Vec<_>>();
let all_clients = izip!(
- ics.into_iter(),
+ &ics,
client_handles_send,
- client_handles_recv,
- ).map(|(ic, web_send, web_recv)| {
- let r = (ic.link.client,
- Client {
- ic: ic.clone(),
- web: web_send,
- });
-
- let ic_ = ic.clone();
- tasks.push((tokio::spawn(async move {
- run_client(ic_, web_recv).await.void_unwrap_err()
- }), format!("client {}", &ic)));
-
- r
+ ).map(|(ic, (web_send, route_send))| {
+ (ic.link.client,
+ Client {
+ ic: ic.clone(),
+ web: web_send,
+ route: route_send,
+ })
}).collect();
+
let global = Arc::new(Global {
config: global_config,
all_clients,
});
+ for (ic, (web_recv, route_recv)) in izip!(
+ ics,
+ client_handles_recv,
+ ) {
+ let global_ = global.clone();
+ let ic_ = ic.clone();
+ tasks.push((tokio::spawn(async move {
+ run_client(global_, ic_, web_recv, route_recv)
+ .await.void_unwrap_err()
+ }), format!("client {}", &ic)));
+ }
+
for addr in &global.config.addrs {
let global_ = global.clone();
let make_service = hyper::service::make_service_fn(