#![allow(unused_imports)]
use anyhow::{anyhow,Context};
use thiserror::Error;
use tokio::select;
use tokio::sync::watch;
use tokio::process::{Command,Child,ChildStdout};
use tokio::io::AsyncReadExt;
use tokio::time::{Duration,delay_for};
use tokio::net::TcpStream;
use futures_util::FutureExt;
use futures_util::future::{self,try_select};
use futures_util::future::Either::*;
use libc::SIGTERM;
use std::os::unix::process::ExitStatusExt;
use std::fmt::{self,Display,Formatter};
use std::process::Stdio;
use apigpio::{Connection,GpioChange};
use apigpio::Level::*;
use picollar::{tasktrack,sequence};
use picollar::ksmode::*;
use picollar::ksmode::Mode::*;
use picollar::ledpins::*;
use picollar::debounce::*;
use picollar::local::{ShutdownPins,SHUTDOWN_PINS};
use picollar::config::{self,Config};
use picollar::RecvError;
type E = anyhow::Error;
#[derive(Debug,Copy,Clone)]
enum Led {
Red,
AppFailed,
InMode(OverallMode),
}
use Led::*;
const PING_INTERVAL_SECS : u32 = 3;
const PING_GRACE : Duration = Duration::from_millis(500);
const DEFAULT_LOCAL_SSH : &str = "127.0.0.1:22";
const SSH_GRACE : Duration = Duration::from_millis(500);
const SSH_CHECK_EVERY : Duration = Duration::from_millis(200);
const SHUTDOWN_REQUEST_DELAY : Duration = Duration::from_millis(3000);
type LedInstructions = watch::Receiver<Led>;
async fn watch_led_ping(cfg : Config,
mut instr : LedInstructions,
output : watch::Sender<bool>) -> Result<(),E> {
let mut pinger : Option<(Child, ChildStdout, bool)> = None;
let mut how = *instr.borrow();
'read_how : loop {
match how {
Red | AppFailed | InMode(Shutdown) => {
output.broadcast(true)?;
},
InMode(Run(Local)) => {
output.broadcast(true)?;
if let Some((mut child,_,_)) = pinger {
println!("manager: ping: killing");
child.kill()?;
child.await?;
}
pinger = None;
},
InMode(Run(Net)) | InMode(Run(Dev)) => {
if pinger.is_none() {
let target : String = cfg.need("ping_address")?;
println!("manager: ping: launching");
let mut cmd = cfg.command_general
("ping_command",
&["ping","-n"],
&["-i", &format!("{}",PING_INTERVAL_SECS), &target]
)?;
cmd
.stdout(Stdio::piped());
pinger = Some({
let mut child = cmd.spawn()?;
let stdout = child.stdout.take().unwrap();
(child, stdout, false)
});
output.broadcast(false)?;
}
let ping_await = Duration::from_secs(PING_INTERVAL_SECS.into())
+ PING_GRACE;
let mut sent = None;
let mut send = |ok|{
if sent != Some(ok) {
println!("manager: ping: report {}", ok);
output.broadcast(ok)?
}
sent = Some(ok);
<Result<(),E>>::Ok(())
};
loop {
let (ref mut child,
ref mut stdout,
ref mut had_newline) = pinger.as_mut().unwrap();
let pinger_ok = select!{
pinger_ok = async { <Result<bool,E>>::Ok(loop {
let mut buf = [0u8; 80];
let n = stdout.read(&mut buf).await?;
if n == 0 {
println!("manager: ping: eof!");
break false;
}
if *had_newline { break true }
if buf.contains(&('\n' as u8)) { *had_newline = true }
})} => {
let ok = pinger_ok?;
send(ok)?;
ok
},
_ = delay_for(ping_await) => {
send(false)?;
true
},
status = child => {
println!("manager: ping failed: {:?}", status);
send(false)?;
false
},
new_how = instr.recv() => {
how = new_how.ok_or(RecvError{})?;
continue 'read_how;
},
};
if !pinger_ok {
pinger = None;
break;
}
}
}
}
how = instr.recv().await.ok_or(RecvError{})?;
}
}
async fn watch_led_ssh(cfg : Config,
mut instr : LedInstructions,
output : watch::Sender<bool>) -> Result<(),E> {
let mut how = *instr.borrow();
loop {
match how {
Red | AppFailed
| InMode(Run(Local))
| InMode(Run(Net))
| InMode(Shutdown)
=> {
output.broadcast(true)?;
}
InMode(Run(Dev)) => {
output.broadcast(false)?;
let target = cfg.lookup("local_ssh", DEFAULT_LOCAL_SSH.to_owned())?;
println!("manager: ssh: checking {}", &target);
loop {
let ok = select!{
r = TcpStream::connect(&target) => { r.is_ok() },
_ = delay_for(SSH_GRACE) => { false },
};
if ok { break }
delay_for(SSH_CHECK_EVERY).await;
}
println!("manager: ssh: ok");
output.broadcast(true)?;
}
};
how = instr.recv().await.ok_or(RecvError{})?;
}
}
async fn led_spawn(cfg : Config, pi : Connection,
tt : &mut tasktrack::Tracker)
-> Result<watch::Sender<Led>,E> {
let (instr_sender, mut instr_receiver) = watch::channel(Led::Red);
let (ledping_sender, mut ledping_receiver) = watch::channel(false);
tt.spawn("led pinger", watch_led_ping(cfg.clone(), instr_receiver.clone(),
ledping_sender));
let (sshcheck_sender, mut sshcheck_receiver) = watch::channel(false);
tt.spawn("ssh checkerr", watch_led_ssh(cfg.clone(), instr_receiver.clone(),
sshcheck_sender));
tt.spawn("led implementor", async move {
loop {
let mode = *instr_receiver.borrow();
let oks = (*ledping_receiver.borrow(),
*sshcheck_receiver.borrow());
let rgb = match mode {
Red => [H,L,L],
AppFailed => [H,L,L],
InMode(Run(Dev)) => [L,L,H],
InMode(Run(Local)) => [L,H,L],
InMode(Run(Net)) => [H,H,H],
InMode(Shutdown) => [H,H,L],
};
let flash = match (mode, oks) {
(Red, (_ ,_ )) => false,
(AppFailed, (_ ,_ )) => true ,
(InMode(Shutdown ), (_ ,_ )) => false,
(InMode(Run(Dev )), (true ,true )) => false,
(InMode(Run(Local)), (_ ,_ )) => false,
(InMode(Run(Net )), (true ,_ )) => false,
(InMode(Run(_ )), (_ ,_ )) => true ,
};
(select!{
r = instr_receiver.recv() => { r.ok_or(RecvError{})?; Ok(()) }
r = ledping_receiver.recv() => { r.ok_or(RecvError{})?; Ok(()) }
r = sshcheck_receiver.recv() => { r.ok_or(RecvError{})?; Ok(()) }
r = async {
if flash {
flashled(&pi, rgb, FLASH_NORMAL).await?;
} else {
setled(&pi, rgb).await?;
let _nothing : () = future::pending().await;
}
<Result<(),E>>::Ok(())
} => r,
})?;
}
});
Ok(instr_sender)
}
pub async fn xmit_stop(pi : &Connection) -> Result<(),E> {
pi.wave_tx_stop().await?;
pi.gpio_write(sequence::GPIO, L).await?;
pi.wave_clear().await?;
Ok(())
}
pub struct XmitGuard {
pi : Option<Connection>,
}
impl XmitGuard {
pub fn new(pi : Connection) -> XmitGuard { XmitGuard { pi : Some(pi) } }
async fn stop(&mut self) -> Result<(),E> {
if let Some(pi) = self.pi.take() { xmit_stop(&pi).await? }
Ok(())
}
}
impl Drop for XmitGuard {
fn drop(&mut self) {
if self.pi.is_some() { panic!("forgot to stop XmitGuard") }
}
}
#[derive(Debug,Copy,Clone,Eq,PartialEq)]
pub enum OverallMode { Run(Mode), Shutdown }
use OverallMode::*;
impl OverallMode {
pub fn runlevel(self) -> &'static str { match self {
Run(Dev ) => "2",
Run(Local) => "4",
Run(Net ) => "3",
Shutdown => "0",
} }
pub fn wants_app_mode(self) -> Option<Mode> { match self {
Run(ksm) => if ksm.ksmode_wants_app() { Some(ksm) } else { None },
Shutdown => None,
} }
pub fn wants_app(self) -> bool { self.wants_app_mode().is_some() }
}
struct State {
cfg : Config,
pi : Connection,
ksmode_output : watch::Receiver<Mode>,
mode : OverallMode,
shutdownreq : ShutdownTracker,
ledchannel : watch::Sender<Led>,
}
#[derive(Copy,Clone,Debug,Eq,PartialEq)]
pub struct ShutdownRequest(pub bool);
pub type ShutdownTracker = watch::Receiver<ShutdownRequest>;
impl Debounceable for ShutdownRequest {
type Spec = ShutdownPins;
fn pins(spec : &Self::Spec) -> &[u32] { spec }
fn interpret(l : &[GpioChange]) -> Option<Self> {
Some(ShutdownRequest( l.iter().all(|p| p.level == Some(L) ) ))
}
fn delay() -> Duration { SHUTDOWN_REQUEST_DELAY }
fn description() -> String { "ShutdownRequest".to_owned() }
fn equivalent(&self, other : &Self) -> bool { *self == *other }
}
impl State {
async fn new_mode(&mut self) -> Result<(),E> {
let old_mode = self.mode;
if old_mode == Shutdown {
self.led(InMode(old_mode))?;
let _nothing : () = future::pending().await;
}
let ref mut ksmode = &mut self.ksmode_output;
let ref mut shutdownreq = &mut self.shutdownreq;
let new_mode = select!{
r = ksmode.recv() => {
Run(r.ok_or(RecvError{})?)
},
r = async {
if old_mode == Run(Dev) {
delay_for(SHUTDOWN_REQUEST_DELAY).await;
loop {
let rr = shutdownreq.recv().await.ok_or(RecvError{})?;
let want_shutdown = rr.0;
if want_shutdown {
println!("manager: shutdown request!");
break <Result<(),E>>::Ok(())
}
}
} else {
future::pending().await
}
} => {
r?;
Shutdown
},
};
self.mode = new_mode;
self.telinit().await?;
Ok(())
}
async fn run_telinit(&mut self, runlevel : &str) -> Result<(),E> {
let mut cmd = self.cfg.command_general
("telinit", &["telinit"], &[runlevel])?;
let ok = cmd
.spawn()?
.await?;
if !ok.success() { Err(anyhow!("telinit failed {:?}",ok))? }
Ok(())
}
async fn telinit(&mut self) -> Result<(),E> {
self.led(Led::Red)?;
let runlevel = self.mode.runlevel();
println!("manager: entering mode {:?} (switching to runlevel {})...",
self.mode, runlevel);
let l = match self.mode { Shutdown => InMode(self.mode), _ => Red };
self.led(l)?;
self.run_telinit(runlevel).await.context("run telinit")?;
Ok(())
}
fn led(&mut self, l : Led) -> Result<(),E> {
self.ledchannel.broadcast(l)?;
Ok(())
}
async fn run_app(&mut self, ksm : Mode) -> Result<(),E> {
'restarting_app : loop {
println!("manager: starting application...");
let mut xmit = XmitGuard::new(self.pi.clone());
let mut cmd = self.cfg.command_simple
("application", "application", &[format!("{:?}",ksm)])?;
let app = cmd.spawn();
let app = match app {
Err(e) => {
xmit.stop().await?;
eprintln!("failed to execute application: {:?}", e);
self.led(Led::AppFailed)?;
break;
},
Ok(a) => a,
};
let status = match future::select(
Box::pin(async { 'appish_mode : loop {
self.led(Led::InMode(self.mode))?;
self.new_mode().await?;
if !self.mode.wants_app() {
println!("manager: want {:?}, killing application", self.mode);
self.led(Led::Red)?;
xmit_stop(&self.pi).await?;
break <Result<(),E>>::Ok(())
}
println!("manager: want {:?}, keeping application", self.mode);
continue 'appish_mode;
} }),
app
).await {
Left((ok, mut app)) => {
app.kill().unwrap_or_else(|e|{
eprintln!("manager: warning: app kill failed: {:?}", e);
});
app.await?;
xmit.stop().await?;
ok?;
return Ok(());
},
Right((status, _)) => {
status?
},
};
self.led(Led::Red)?;
xmit.stop().await?;
if status.signal() == Some(SIGTERM) {
println!("manager: application got SIGTERM");
continue 'restarting_app;
} else if status.success() {
self.led(Led::Red)?;
println!("manager: application exited, expecting new mode");
break 'restarting_app;
} else {
println!("manager: application failed {:?}", status);
self.led(Led::AppFailed)?;
break 'restarting_app;
}
}
self.new_mode().await?;
return Ok(());
}
async fn operate(&mut self) -> Result<(),E> {
self.telinit().await?;
loop {
if let Some(ksm) = self.mode.wants_app_mode() {
self.run_app(ksm).await?;
} else {
self.led(Led::InMode(self.mode))?;
self.new_mode().await?;
}
}
}
}
async fn real_main() -> Result<(),E> {
let cfg = config::read()?;
let pi = Connection::new().await?;
let mut tt = tasktrack::Tracker::new();
let mut ksmode_output = keyswitch_tracker(pi.clone(), &mut tt, None).await?;
let shutdownreq = new_debouncer(pi.clone(), &mut tt,
&SHUTDOWN_PINS, Some(ShutdownRequest(false))).await?;
let ledchannel = led_spawn(cfg.clone(), pi.clone(), &mut tt).await?;
let mode = ksmode_output.recv().await.expect("initial KS mode");
let mut state = State {
cfg, pi, ksmode_output, ledchannel, shutdownreq,
mode : Run(mode)
};
select!{
r = state.operate() => r?,
r = tt.failfast() => r?,
};
Ok(())
}
picollar::wrap_main!("manager");