chiark / gitweb /
keys.scala, etc.: Make merging public keys have a progress bar.
[tripe-android] / admin.scala
CommitLineData
8eabb4ff
MW
1/* -*-scala-*-
2 *
3 * Managing TrIPE administration connections
4 *
5 * (c) 2018 Straylight/Edgeware
6 */
7
8/*----- Licensing notice --------------------------------------------------*
9 *
10 * This file is part of the Trivial IP Encryption (TrIPE) Android app.
11 *
12 * TrIPE is free software: you can redistribute it and/or modify it under
13 * the terms of the GNU General Public License as published by the Free
14 * Software Foundation; either version 3 of the License, or (at your
15 * option) any later version.
16 *
17 * TrIPE is distributed in the hope that it will be useful, but WITHOUT
18 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
19 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
20 * for more details.
21 *
22 * You should have received a copy of the GNU General Public License
23 * along with TrIPE. If not, see <https://www.gnu.org/licenses/>.
24 */
25
26package uk.org.distorted.tripe; package object admin {
27
28/*----- Imports -----------------------------------------------------------*/
29
3bb2303d 30import java.io.{BufferedReader, InputStreamReader, OutputStreamWriter};
8eabb4ff
MW
31import java.util.concurrent.locks.{Condition, ReentrantLock => Lock};
32
0157de02 33import scala.collection.mutable.HashMap;
8eabb4ff
MW
34import scala.concurrent.Channel;
35import scala.util.control.Breaks;
36
25c35469 37import Implicits._;
3bb2303d 38import sys.{serverInput, serverOutput};
8eabb4ff
MW
39
40/*----- Classification of server messages ---------------------------------*/
41
/* Every line accepted by `Connection.parseServerLine' is turned into
 * exactly one `Message'.  The hierarchy is sealed, so matches over it can
 * be checked for exhaustiveness.
 */
sealed abstract class Message;

/* Replies belonging to a job: `OK', `INFO' and `FAIL' lines are parsed
 * into the first three.  `JobLostConnection' is never parsed from a line:
 * the reader thread sends it to every outstanding job when it finishes.
 */
sealed abstract class JobMessage extends Message;
case object JobOK extends JobMessage;
case object JobLostConnection extends JobMessage;
final case class JobInfo(info: Seq[String]) extends JobMessage;
final case class JobFail(err: Seq[String]) extends JobMessage;

/* Traffic concerning background jobs.  `BGOK', `BGINFO' and `BGFAIL'
 * lines carry a reply for the job named by `tag'; a `BGDETACH' line
 * announces the tag of a job.
 */
final case class JobDetached(tag: String) extends Message;
final case class BackgroundJobMessage(tag: String, msg: JobMessage)
  extends Message;

/* Messages not tied to any job, handed to the connection's hook: `TRACE'
 * (the rest of the line, unsplit), `WARN' and `NOTE' lines, and
 * `ConnectionLost', which the reader thread reports as its final act.
 */
sealed abstract class AsyncMessage extends Message;
case object ConnectionLost extends AsyncMessage;
final case class Trace(msg: String) extends AsyncMessage;
final case class Warning(err: Seq[String]) extends AsyncMessage;
final case class Notify(note: Seq[String]) extends AsyncMessage;

/* Service-related lines: `SVCCLAIM', `SVCJOB' and `SVCCANCEL'.  The
 * reader thread parses these but takes no action on them.
 */
sealed abstract class ServiceMessage extends Message;
final case class ServiceClaim(svc: String, version: String)
  extends ServiceMessage;
final case class ServiceJob(jobid: String, svc: String,
                            cmd: String, args: Seq[String])
  extends ServiceMessage;
final case class ServiceCancel(jobid: String) extends ServiceMessage;
67
68/*----- Main code ---------------------------------------------------------*/
69
8eabb4ff
MW
/* Raised by `Connection.submit' when the connection is no longer alive. */
class ConnectionClosed extends Exception;

/* Raised when the server's output can't be understood, or isn't what the
 * caller was prepared to receive; `msg' describes the problem.
 */
class ServerFailed(msg: String) extends Exception(msg);

/* Raised from a job when the server answered the command with `FAIL'.
 * The failure tokens are available as `msg'; the exception message is the
 * class name followed by the quoted tokens in parentheses.
 */
class CommandFailed(val msg: Seq[String]) extends Exception {
  override def getMessage(): String = {
    val what = getClass.getName;
    val why = quoteTokens(msg);
    s"$what($why)"
  }
}

/* Raised from a job which was told that the connection has been lost. */
class ConnectionLostException extends Exception;
80
0157de02 81object Connection extends Hook[AsyncMessage]
8eabb4ff
MW
82{
83 /* Synchronization.
84 *
85 * This class is complicatedly multithreaded. The following fields must
86 * only be accessed while the instance is locked. To prevent deadlocks,
87 * hold the `Connection' lock before locking any individual `Job' objects.
88 */
89
3bb2303d
MW
/* Is this connection still alive?  Cleared by the reader thread as it
 * finishes. */
private var livep: Boolean = true;

/* Foreground job, if there is one. */
private var fgjob: Option[this.Job] = None;

/* Maps tags to extant jobs. */
private val jobmap = HashMap.empty[String, this.Job];

/* Next background job tag. */
private var bgseq = 0;

/* Character-level wrappers around `serverInput' and `serverOutput'. */
private val in = new BufferedReader(new InputStreamReader(serverInput));
private val out = new OutputStreamWriter(serverOutput);

/* NOTE(review): presumably required by `Hook' -- confirm. */
type Pub = Connection.type;
fd8dac14 99
8eabb4ff
MW
/* A command submitted to the server, viewed as an iterator over the token
 * lists of its `INFO' replies.
 *
 * Replies for the job are delivered through the channel `ch'.  Iteration
 * ends normally when `JobOK' arrives.  A `JobFail' or `JobLostConnection'
 * reply makes `hasNext' answer true, so that the following `next' can
 * raise `CommandFailed' or `ConnectionLostException' respectively.
 */
class Job extends Iterator[Seq[String]] {
  private[Connection] val ch = new Channel[JobMessage];

  /* The message read from `ch' but not yet consumed, if any. */
  private[this] var nextmsg: Option[JobMessage] = None;

  /* Make sure that `nextmsg' holds a message, blocking on the channel if
   * nothing has been buffered yet.
   */
  private[this] def fetchNext(): Unit =
    { if (!nextmsg) nextmsg = Some(ch.read); }

  /* Complain that the job produced output which the caller didn't want.
   * Call this only after `hasNext' has answered true.  If the pending
   * message is actually a failure report then raise that instead, so that
   * a failed command isn't misreported as a confused server.
   */
  private[this] def unexpected(what: String): Nothing = nextmsg match {
    case Some(JobFail(msg)) => throw new CommandFailed(msg);
    case Some(JobLostConnection) => throw new ConnectionLostException;
    case _ => throw new ServerFailed(what);
  }

  /* Answer false only once the job has completed successfully. */
  override def hasNext: Boolean = {
    fetchNext();
    nextmsg match {
      case Some(JobOK) => false
      case _ => true
    }
  }

  /* Return the tokens of the next `INFO' line, or raise the exception
   * appropriate to whatever ended the job.  Terminal messages are left
   * buffered, so repeated calls keep reporting the same outcome.
   */
  override def next(): Seq[String] = {
    fetchNext();
    nextmsg match {
      case None => unreachable;
      case Some(JobOK) => throw new NoSuchElementException;
      case Some(JobFail(msg)) => throw new CommandFailed(msg);
      case Some(JobLostConnection) => throw new ConnectionLostException;
      case Some(JobInfo(msg)) => nextmsg = None; msg
    }
  }

  /* Read all remaining output as a map: every token of every line must
   * have the form `KEY=VALUE', split at the first `='.  Raises
   * `ServerFailed' if a token has no `='.
   */
  def keyvals(): Map[String, String] = {
    val b = Map.newBuilder[String, String];
    for (line <- this; token <- line) {
      token.indexOf('=') match {
        case -1 => throw new ServerFailed("missing `=' in key-value list");
        case eq =>
          val k = token.substring(0, eq);
          val v = token.substring(eq + 1);
          b += k -> v;
      }
    }
    b.result
  }

  /* Read all remaining output as a list of (key, live, description)
   * triples.  The first token of each line is a single key character,
   * optionally followed by `+' (which sets the `live' flag); the other
   * tokens are joined with spaces to form the description.  Raises
   * `ServerFailed' for an empty line or a malformed key.
   */
  def traceish(): Seq[(Char, Boolean, String)] = {
    val b = Seq.newBuilder[(Char, Boolean, String)];
    for (line <- this) line match {
      /* Match as a general `Seq': the lines needn't be `List's. */
      case Seq(key, desc@_*) =>
        val live = if (key.length == 1) false
                   else if (key.length == 2 && key(1) == '+') true
                   else throw new ServerFailed(
                     s"incomprehensible traceish key `$key'");
        b += ((key(0), live, desc.mkString(" ")));
      case _ => throw new ServerFailed("empty line in traceish output");
    }
    b.result
  }

  /* Wait for the job to finish, insisting that it produces no output.
   * Raises `ServerFailed' if there is any; a failed command or a lost
   * connection is reported as such.
   */
  def expectEmpty(): Unit = {
    if (hasNext) unexpected("no output expected");
  }

  /* Wait for the job to finish, insisting that it produces exactly one
   * line of output, and return that line's tokens.  Raises `ServerFailed'
   * if there are no lines or more than one; a failed command or a lost
   * connection is reported as such.
   */
  def oneLine(): Seq[String] = {
    if (!hasNext) throw new ServerFailed("exactly one line expected");
    val line = next();
    if (hasNext) unexpected("exactly one line expected");
    line
  }
}
164
/* Send a command to the server and return the job which will collect its
 * replies.
 *
 * `toks' holds the command name followed by its arguments.  If `bg' is
 * set, then `-background TAG' is inserted after the command name, using a
 * freshly allocated tag of the form `Jnnnnn'.  The call blocks until
 * there is no foreground job; the new job then becomes the foreground job
 * (a background job stays there until the reader thread sees its
 * `BGDETACH' line).
 *
 * Raises `ConnectionClosed' if the connection is dead, and
 * `IllegalArgumentException' if asked to run an empty command in the
 * background.  A failure to write the command is propagated.
 */
def submit(bg: Boolean, toks: String*): this.Job = {
  println(";; wait for lock");
  synchronized {
    val cmd: Seq[String] =
      if (!bg) toks
      else toks match {
        case Seq(name, args@_*) =>
          val tag = "J%05d".format(bgseq); bgseq += 1;
          name +: "-background" +: tag +: args
        case _ =>
          /* There's nowhere to put the `-background' option. */
          throw new IllegalArgumentException("empty background command");
      };
    println(";; wait for foreground");
    while (livep && fgjob != None) wait();
    if (!livep) {
      /* We may have swallowed the wakeup meant for somebody else; make
       * sure that every other waiter notices the dead connection too.
       */
      notifyAll();
      throw new ConnectionClosed;
    }
    println(";; write command");
    try { out.write(quoteTokens(cmd)); out.write('\n'); out.flush(); }
    catch {
      /* The foreground slot is still free: pass the baton on. */
      case e: Throwable => notify(); throw e;
    }
    val j = new Job;
    fgjob = Some(j);
    j
  }
}

/* Submit a command to run in the foreground. */
def submit(toks: String*): this.Job = submit(false, toks: _*);
188
8eabb4ff
MW
/* Both of the following must be called with the connection lock held. */

/* Return the current foreground job; raise `ServerFailed' if there isn't
 * one. */
def foregroundJob: Job = fgjob match {
  case Some(j) => j
  case None => throw new ServerFailed("no foreground job");
}

/* Clear the foreground slot and wake one thread waiting on the lock. */
def releaseForegroundJob(): Unit = {
  fgjob = None;
  notify();
}
193
/* Parse a line `s' received from the server into a `Message'.
 *
 * The first token selects the kind of message.  A `TRACE' line keeps the
 * rest of the line as a single unsplit string; for anything else the rest
 * is split into tokens.  Raises `ServerFailed' if the line is empty, or
 * if the keyword and token count match none of the known forms.
 */
def parseServerLine(s: String): Message = {

  /* Build the message for keyword `code' with the line's other tokens. */
  def classify(code: String, args: Seq[String]): Message =
    (code, args) match {
      case ("OK", Seq()) => JobOK
      case ("INFO", tail) => JobInfo(tail)
      case ("FAIL", tail) => JobFail(tail)
      case ("BGDETACH", Seq(tag)) => JobDetached(tag)
      case ("BGOK", Seq(tag)) => BackgroundJobMessage(tag, JobOK)
      case ("BGINFO", Seq(tag, tail@_*)) =>
        BackgroundJobMessage(tag, JobInfo(tail))
      case ("BGFAIL", Seq(tag, tail@_*)) =>
        BackgroundJobMessage(tag, JobFail(tail))
      case ("WARN", tail) => Warning(tail)
      case ("NOTE", tail) => Notify(tail)
      case ("SVCCLAIM", Seq(svc, ver)) => ServiceClaim(svc, ver)
      case ("SVCJOB", Seq(tag, svc, cmd, rest@_*)) =>
        ServiceJob(tag, svc, cmd, rest)
      case ("SVCCANCEL", Seq(tag)) => ServiceCancel(tag)
      case (_, tail) => throw new ServerFailed(
        "incomprehensible line from server: " + quoteTokens(code +: tail))
    }

  nextToken(s) match {
    case Some(("TRACE", next)) => Trace(s.substring(next))
    case Some((code, next)) => classify(code, splitTokens(s, next))
    case None => throw new ServerFailed("empty line from server")
  }
}
217
8eabb4ff
MW
/* Deliver the reply `msg' to the job it belongs to.
 *
 * The job is found by calling `getjob' with the connection lock held.
 * Its argument is true exactly when `msg' is a `JobInfo'; the reader
 * thread's callbacks use it to decide whether to keep the job registered.
 * The message is written to the job's channel after the lock has been
 * released.
 */
def processJobMessage(msg: JobMessage)
                     (getjob: (Boolean) => Job): Unit = {
  val keep = msg match {
    case _: JobInfo => true
    case _ => false
  };
  val job = synchronized { getjob(keep) };
  job.ch.write(msg);
}
8eabb4ff
MW
222
/* Reading lines from the server.
 *
 * The reader thread parses each line and dispatches the result.
 * Foreground replies go to the foreground job, which is released unless
 * the reply is a `JobInfo'.  Background replies go to the job registered
 * under their tag, which is likewise forgotten unless the reply is a
 * `JobInfo'.  `BGDETACH' registers the foreground job under its tag and
 * frees the foreground slot.  Asynchronous messages are passed to the
 * hook, and service messages are ignored.
 *
 * When the input ends, or anything at all goes wrong (the stack trace is
 * printed), the connection is marked dead, every outstanding job is sent
 * `JobLostConnection', all threads waiting on the lock are woken, and the
 * hook is called with `ConnectionLost'.
 */
val readthr = thread("admin reader") {
  println(";; readthr running");
  var line: String = null;

  try {
    println(";; wait for line");
    while ({line = in.readLine; line != null}) {
      println(s";; line: $line");
      parseServerLine(line) match {
        case JobDetached(tag) => synchronized {
          jobmap(tag) = foregroundJob; releaseForegroundJob();
        }
        case msg: JobMessage => processJobMessage(msg) { keep =>
          val j = foregroundJob; if (!keep) releaseForegroundJob(); j
        }
        case BackgroundJobMessage(tag, msg) =>
          processJobMessage(msg) { keep =>
            val j = jobmap.getOrElse(tag, throw new ServerFailed(
              s"no job with tag `${tag}'"));
            if (!keep) jobmap.remove(tag);
            j
          }
        case msg: AsyncMessage =>
          callHook(msg);
        case _: ServiceMessage =>
          ok;
      }
    }
  } catch {
    case e: Throwable => e.printStackTrace();
  } finally {
    synchronized {
      livep = false;
      for ((_, j) <- jobmap) j.ch.write(JobLostConnection);
      for (j <- fgjob) j.ch.write(JobLostConnection);
      fgjob = None;

      /* Wake the waiters unconditionally.  A submitter can still be
       * waiting even though there is no foreground job just now, and it
       * would never find out that the connection has died.
       */
      notifyAll();
    }
    callHook(ConnectionLost);
  }
}
273}
274
275/*----- That's all, folks -------------------------------------------------*/
276
7894831e 277}