chiark / gitweb /
Overhaul the build machinery.
[tripe-android] / admin.scala
CommitLineData
8eabb4ff
MW
1/* -*-scala-*-
2 *
3 * Managing TrIPE administration connections
4 *
5 * (c) 2018 Straylight/Edgeware
6 */
7
8/*----- Licensing notice --------------------------------------------------*
9 *
10 * This file is part of the Trivial IP Encryption (TrIPE) Android app.
11 *
12 * TrIPE is free software: you can redistribute it and/or modify it under
13 * the terms of the GNU General Public License as published by the Free
14 * Software Foundation; either version 3 of the License, or (at your
15 * option) any later version.
16 *
17 * TrIPE is distributed in the hope that it will be useful, but WITHOUT
18 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
19 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
20 * for more details.
21 *
22 * You should have received a copy of the GNU General Public License
23 * along with TrIPE. If not, see <https://www.gnu.org/licenses/>.
24 */
25
26package uk.org.distorted.tripe; package object admin {
27
28/*----- Imports -----------------------------------------------------------*/
29
30import java.io.{BufferedReader, Reader, Writer};
31import java.util.concurrent.locks.{Condition, ReentrantLock => Lock};
32
33import scala.collection.mutable.{HashMap, Publisher};
34import scala.concurrent.Channel;
35import scala.util.control.Breaks;
36
25c35469 37import Implicits._;
8eabb4ff
MW
38
39/*----- Classification of server messages ---------------------------------*/
40
/* Root of the classification: every parsed line from the server is a
 * `Message' of one of the kinds below.
 */
sealed abstract class Message;

/* Replies delivered to a job's channel.  `JobOK', `JobInfo' and `JobFail'
 * are produced from the server's `OK', `INFO' and `FAIL' lines;
 * `JobLostConnection' is sent to outstanding jobs by the reader thread
 * when it shuts down.
 */
sealed abstract class JobMessage extends Message;
case object JobOK extends JobMessage;
final case class JobInfo(info: Seq[String]) extends JobMessage;
final case class JobFail(err: Seq[String]) extends JobMessage;
case object JobLostConnection extends JobMessage;

/* A job reply addressed to the background job labelled `tag' (the
 * server's `BGOK', `BGINFO' and `BGFAIL' lines).
 */
final case class BackgroundJobMessage(tag: String, msg: JobMessage)
  extends Message;

/* The server's `BGDETACH' line: the current foreground job continues in
 * the background under `tag'.
 */
final case class JobDetached(tag: String) extends Message;

/* Messages which aren't replies to any job; these are published to the
 * connection's subscribers.  `Trace', `Warning' and `Notify' correspond to
 * `TRACE', `WARN' and `NOTE' lines; `ConnectionLost' is published when the
 * reader thread finishes.
 */
sealed abstract class AsyncMessage extends Message;
final case class Trace(msg: String) extends AsyncMessage;
final case class Warning(err: Seq[String]) extends AsyncMessage;
final case class Notify(note: Seq[String]) extends AsyncMessage;
case object ConnectionLost extends AsyncMessage;

/* Service-provider traffic: the `SVCCANCEL', `SVCCLAIM' and `SVCJOB'
 * lines.
 */
sealed abstract class ServiceMessage extends Message;
final case class ServiceCancel(jobid: String) extends ServiceMessage;
final case class ServiceClaim(svc: String, version: String)
  extends ServiceMessage;
final case class ServiceJob(
  jobid: String, svc: String, cmd: String, args: Seq[String]
) extends ServiceMessage;
66
67/*----- Main code ---------------------------------------------------------*/
68
8eabb4ff
MW
/* Raised by `Connection.submit' when the connection is no longer alive. */
class ConnectionClosed extends Exception;

/* Raised when the server's output can't be understood or doesn't have the
 * shape the caller expected; `msg' describes the problem.
 */
class ServerFailed(msg: String) extends Exception(msg);

/* Raised when the server reports that a command failed; `msg' holds the
 * tokens of the failure report.
 */
class CommandFailed(val msg: Seq[String]) extends Exception {
  /* Render as `CLASSNAME(TOKENS)', with the tokens quoted. */
  override def getMessage(): String =
    s"${getClass.getName}(${quoteTokens(msg)})";
}

/* Raised while reading a job's output if the connection was lost before
 * the job finished.
 */
class ConnectionLostException extends Exception;
79
/* A TrIPE administration connection.
 *
 * Commands are written to `out'.  A reader thread (`readthr'), started when
 * the object is constructed, parses the lines arriving on `in': job replies
 * are handed to the `Job' waiting for them, and asynchronous messages are
 * published to subscribers.  When the reader finishes -- end of input or an
 * error -- every outstanding job is sent `JobLostConnection',
 * `ConnectionLost' is published, and both streams are closed.
 */
class Connection(val in: Reader, val out: Writer)
  extends Publisher[AsyncMessage]
{
  /* Synchronization.
   *
   * This class is complicatedly multithreaded.  The following fields must
   * only be accessed while the instance is locked.  To prevent deadlocks,
   * hold the `Connection' lock before locking any individual `Job' objects.
   */

  var livep: Boolean = true;            // Is this connection still alive?
  var fgjob: Option[this.Job] = None;   // Foreground job, if there is one.
  val jobmap = new HashMap[String, this.Job]; // Maps tags to extant jobs.
  var bgseq = 0;                        // Next background job tag.

  /* The output of one command, as an iterator over the token lists of its
   * `INFO' lines.
   *
   * Messages arrive on `ch' from the reader thread; reading blocks until
   * one is available.  `JobOK' ends the iteration normally.  A failure or
   * a lost connection is reported by `next' throwing, so `hasNext'
   * answers true for those in order that the caller gets to see the error.
   */
  class Job extends Iterator[Seq[String]] {
    private[Connection] val ch = new Channel[JobMessage];
    private[this] var nextmsg: Option[JobMessage] = None;

    /* Return the pending message, reading one from the channel first if
     * there isn't one.  The message stays pending until `next' consumes it.
     */
    private[this] def peek(): JobMessage = nextmsg match {
      case Some(m) => m
      case None => val m = ch.read; nextmsg = Some(m); m
    }

    override def hasNext: Boolean = peek() != JobOK;

    /* Return the next line of output.
     *
     * Throws `NoSuchElementException' at the end of the output,
     * `CommandFailed' if the server reported failure, and
     * `ConnectionLostException' if the connection went away.  Only `INFO'
     * messages are consumed: a terminal message stays pending, so calling
     * again reports the same condition rather than blocking.
     */
    override def next(): Seq[String] = peek() match {
      case JobInfo(msg) => nextmsg = None; msg
      case JobOK => throw new NoSuchElementException
      case JobFail(msg) => throw new CommandFailed(msg)
      case JobLostConnection => throw new ConnectionLostException
    }

    /* Collect all remaining output as a map, splitting every token at its
     * first `='.  Throws `ServerFailed' if a token has no `='.
     */
    def keyvals(): Map[String, String] = {
      val b = Map.newBuilder[String, String];
      for (line <- this; token <- line) {
        val eq = token.indexOf('=');
        if (eq < 0) throw new ServerFailed("missing `=' in key-value list");
        b += (token.substring(0, eq) -> token.substring(eq + 1));
      }
      b.result
    }

    /* Collect all remaining output as `(key, live, description)' triples.
     *
     * The first token of each line is a key: a single character,
     * optionally followed by `+'; `live' is true when the `+' is present.
     * The remaining tokens are joined with spaces to form the description.
     * Throws `ServerFailed' for an empty line or a malformed key.
     */
    def traceish(): Seq[(Char, Boolean, String)] = {
      val b = Seq.newBuilder[(Char, Boolean, String)];
      for (line <- this) line match {
        /* Match against `Seq', not `List': the line is only known to be a
         * `Seq[String]', and any other implementation would otherwise be
         * misreported as an empty line.
         */
        case Seq(key, desc@_*) =>
          val live = if (key.length == 1) false
                     else if (key.length == 2 && key(1) == '+') true
                     else throw new ServerFailed(
                       s"incomprehensible traceish key `$key'");
          b += ((key(0), live, desc.mkString(" ")));
        case _ => throw new ServerFailed("empty line in traceish output");
      }
      b.result
    }

    /* Check that the command produced no output lines. */
    def expectEmpty(): Unit = {
      if (hasNext) throw new ServerFailed("no output expected");
    }

    /* Return the command's only line of output; throws `ServerFailed' if
     * there are no lines, or more than one.
     */
    def oneLine(): Seq[String] = {
      if (!hasNext) throw new ServerFailed("exactly one line expected");
      val line = next();
      if (hasNext) throw new ServerFailed("exactly one line expected");
      line
    }
  }

  /* Send the command `toks' to the server and return the `Job' which will
   * receive its replies.
   *
   * If `bg' is set, `-background TAG' is inserted after the command name,
   * with a freshly allocated tag.  The call blocks until there is no
   * foreground job.  Throws `ConnectionClosed' if the connection is dead,
   * and `IllegalArgumentException' if a background command has no tokens
   * at all (there would be nowhere to put the tag).
   */
  def submit(bg: Boolean, toks: String*): this.Job = synchronized {
    val cmd: Seq[String] =
      if (!bg) toks
      else toks match {
        case Seq(verb, args@_*) =>
          val tag = "J%05d".format(bgseq); bgseq += 1;
          verb +: "-background" +: tag +: args
        case _ =>
          throw new IllegalArgumentException(
            "background command must have at least a command name");
      }

    while (livep && fgjob != None) wait();
    if (!livep) throw new ConnectionClosed;

    /* If the write fails we won't become the foreground job after all, so
     * wake another waiter: we may have used up the notification which
     * would otherwise have let it proceed.
     */
    try { out.write(quoteTokens(cmd)); out.write('\n'); out.flush(); }
    catch { case e: Throwable => notify(); throw e; }

    val j = new Job;
    fgjob = Some(j);
    j
  }

  /* Submit `toks' as a foreground command. */
  def submit(toks: String*): this.Job = submit(false, toks: _*);

  /* Close the output stream. */
  def close(): Unit = synchronized { out.close(); }

  /* These two expect the connection lock to be held. */
  def foregroundJob: Job =
    fgjob.getOrElse { throw new ServerFailed("no foreground job"); }
  def releaseForegroundJob(): Unit = { fgjob = None; notify(); }

  /* Classify the line `s' received from the server.
   *
   * The first token selects the message kind.  For `TRACE', the rest of
   * the line is kept as it is; for everything else it is split into
   * tokens.  Throws `ServerFailed' if the line is empty, or if the keyword
   * or its arguments aren't recognized.
   */
  def parseServerLine(s: String): Message = nextToken(s) match {
    case None => throw new ServerFailed("empty line from server")
    case Some(("TRACE", next)) => Trace(s.substring(next))
    case Some((code, next)) => (code, splitTokens(s, next)) match {
      case ("OK", Seq()) => JobOK
      case ("INFO", tail) => JobInfo(tail)
      case ("FAIL", tail) => JobFail(tail)
      case ("BGDETACH", Seq(tag)) => JobDetached(tag)
      case ("BGOK", Seq(tag)) => BackgroundJobMessage(tag, JobOK)
      case ("BGINFO", Seq(tag, tail@_*)) =>
        BackgroundJobMessage(tag, JobInfo(tail))
      case ("BGFAIL", Seq(tag, tail@_*)) =>
        BackgroundJobMessage(tag, JobFail(tail))
      case ("WARN", tail) => Warning(tail)
      case ("NOTE", tail) => Notify(tail)
      case ("SVCCLAIM", Seq(svc, ver)) => ServiceClaim(svc, ver)
      case ("SVCJOB", Seq(tag, svc, cmd, args@_*)) =>
        ServiceJob(tag, svc, cmd, args)
      case ("SVCCANCEL", Seq(tag)) => ServiceCancel(tag)
      case (_, tail) => throw new ServerFailed(
        "incomprehensible line from server: " + quoteTokens(code +: tail))
    }
  }

  /* Deliver `msg' to the job chosen by `getjob'.
   *
   * `getjob' is called with the connection locked.  Its argument says
   * whether the job should be kept: true for `JobInfo', which means more
   * output is to come, and false for anything else.  The message itself is
   * written to the job's channel after the lock has been released.
   */
  def processJobMessage(msg: JobMessage)
                       (getjob: (Boolean) => Job): Unit = {
    val keep = msg match {
      case _: JobInfo => true
      case _ => false
    }
    val job = synchronized { getjob(keep) };
    job.ch.write(msg);
  }

  /* Reading lines from the server. */
  val readthr = thread("admin reader") {
    val bin = in match {
      case br: BufferedReader => br;
      case _ => new BufferedReader(in)
    }
    var line: String = null;

    try {
      while ({line = bin.readLine; line != null}) {
        parseServerLine(line) match {
          case JobDetached(tag) => synchronized {
            /* The foreground job carries on under `tag', which frees the
             * foreground slot for the next command.
             */
            jobmap(tag) = foregroundJob; releaseForegroundJob();
          }
          case msg: JobMessage => processJobMessage(msg) { keep =>
            val j = foregroundJob; if (!keep) releaseForegroundJob(); j
          }
          case BackgroundJobMessage(tag, msg) =>
            processJobMessage(msg) { keep =>
              val j = jobmap.getOrElse(tag, throw new ServerFailed(
                s"no job with tag `${tag}'"));
              if (!keep) jobmap.remove(tag);
              j
            }
          case msg: AsyncMessage =>
            publish(msg);
          case _: ServiceMessage =>
            ok;
        }
      }
    } catch {
      case e: Throwable => e.printStackTrace();
    } finally {
      synchronized {
        livep = false;
        for ((_, j) <- jobmap) j.ch.write(JobLostConnection);
        for (j <- fgjob) j.ch.write(JobLostConnection);
        fgjob = None;

        /* Wake every thread blocked in `submit', whether or not there was
         * a foreground job just now.  The last job may have been released
         * a moment ago with a single `notify', in which case the other
         * waiters are still asleep and nothing else will ever wake them.
         */
        notifyAll();
      }

      /* Close both streams even if a subscriber, or closing `in', throws. */
      try { publish(ConnectionLost); }
      finally {
        try { in.close(); }
        finally { out.close(); }
      }
    }
  }
}
272
273/*----- That's all, folks -------------------------------------------------*/
274
7894831e 275}