chiark / gitweb /
rough work in progress; may not build
[tripe-android] / admin.scala
1 /* -*-scala-*-
2  *
3  * Managing TrIPE administration connections
4  *
5  * (c) 2018 Straylight/Edgeware
6  */
7
8 /*----- Licensing notice --------------------------------------------------*
9  *
10  * This file is part of the Trivial IP Encryption (TrIPE) Android app.
11  *
12  * TrIPE is free software: you can redistribute it and/or modify it under
13  * the terms of the GNU General Public License as published by the Free
14  * Software Foundation; either version 3 of the License, or (at your
15  * option) any later version.
16  *
17  * TrIPE is distributed in the hope that it will be useful, but WITHOUT
18  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
19  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
20  * for more details.
21  *
22  * You should have received a copy of the GNU General Public License
23  * along with TrIPE.  If not, see <https://www.gnu.org/licenses/>.
24  */
25
26 package uk.org.distorted.tripe; package object admin {
27
28 /*----- Imports -----------------------------------------------------------*/
29
30 import java.io.{BufferedReader, Reader, Writer};
31 import java.util.concurrent.locks.{Condition, ReentrantLock => Lock};
32
33 import scala.collection.mutable.{HashMap, Publisher};
34 import scala.concurrent.Channel;
35 import scala.util.control.Breaks;
36
37 import Implicits._;
38
39 /*----- Classification of server messages ---------------------------------*/
40
41 sealed abstract class Message;
42
43 sealed abstract class JobMessage extends Message;
44 case object JobOK extends JobMessage;
45 final case class JobInfo(info: Seq[String]) extends JobMessage;
46 final case class JobFail(err: Seq[String]) extends JobMessage;
47 case object JobLostConnection extends JobMessage;
48
49 final case class BackgroundJobMessage(tag: String, msg: JobMessage)
50         extends Message;
51 final case class JobDetached(tag: String) extends Message;
52
53 sealed abstract class AsyncMessage extends Message;
54 final case class Trace(msg: String) extends AsyncMessage;
55 final case class Warning(err: Seq[String]) extends AsyncMessage;
56 final case class Notify(note: Seq[String]) extends AsyncMessage;
57 case object ConnectionLost extends AsyncMessage;
58
59 sealed abstract class ServiceMessage extends Message;
60 final case class ServiceCancel(jobid: String) extends ServiceMessage;
61 final case class ServiceClaim(svc: String, version: String)
62         extends ServiceMessage;
63 final case class ServiceJob(jobid: String, svc: String,
64                       cmd: String, args: Seq[String])
65         extends ServiceMessage;
66
67 /*----- Main code ---------------------------------------------------------*/
68
69 class ConnectionClosed extends Exception;
70
71 class ServerFailed(msg: String) extends Exception(msg);
72
73 class CommandFailed(val msg: Seq[String]) extends Exception {
74   override def getMessage(): String =
75     "%s(%s)".format(getClass.getName, quoteTokens(msg));
76 }
77
78 class ConnectionLostException extends Exception;
79
80 class Connection(val in: Reader, val out: Writer)
81         extends Publisher[AsyncMessage]
82 {
83   /* Synchronization.
84    *
85    * This class is complicatedly multithreaded.  The following fields must
86    * only be accessed while the instance is locked.  To prevent deadlocks,
87    * hold the `Connection' lock before locking any individual `Job' objects.
88    */
89
90   var livep: Boolean = true;            // Is this connection still alive?
91   var fgjob: Option[this.Job] = None;   // Foreground job, if there is one.
92   val jobmap = new HashMap[String, this.Job]; // Maps tags to extant jobs.
93   var bgseq = 0;                        // Next background job tag.
94
95   class Job extends Iterator[Seq[String]] {
96     private[Connection] val ch = new Channel[JobMessage];
97     private[this] var nextmsg: Option[JobMessage] = None;
98
99     private[this] def fetchNext()
100       { if (nextmsg == None) nextmsg = Some(ch.read); }
101     override def hasNext: Boolean = {
102       fetchNext();
103       nextmsg match {
104         case Some(JobOK) => false
105         case _ => true
106       }
107     }
108     override def next(): Seq[String] = {
109       fetchNext();
110       nextmsg match {
111         case None => ???
112         case Some(JobOK) => throw new NoSuchElementException
113         case Some(JobFail(msg)) => throw new CommandFailed(msg)
114         case Some(JobLostConnection) => throw new ConnectionLostException
115         case Some(JobInfo(msg)) => nextmsg = None; msg
116       }
117     }
118
119     def keyvals(): Map[String, String] = {
120       val b = Map.newBuilder[String, String];
121       for (line <- this; token <- line) {
122         token.indexOf('=') match {
123           case -1 => throw new ServerFailed("missing `=' in key-value list");
124           case eq =>
125             val k = token.substring(0, eq);
126             val v = token.substring(eq + 1);
127             b += k -> v;
128         }
129       }
130       b.result
131     }
132
133     def traceish(): Seq[(Char, Boolean, String)] = {
134       val b = Seq.newBuilder[(Char, Boolean, String)];
135       for (line <- this) line match {
136         case List(key, desc@_*) =>
137           val live = if (key.length == 1) false
138                      else if (key.length == 2 && key(1) == '+') true
139                      else throw new ServerFailed(
140                        s"incomprehensible traceish key `$key'");
141           b += ((key(0), live, desc.mkString(" ")));
142         case _ => throw new ServerFailed("empty line in traceish output");
143       }
144       b.result
145     }
146
147     def expectEmpty() {
148       if (hasNext) throw new ServerFailed("no output expected");
149     }
150
151     def oneLine(): Seq[String] = {
152       if (hasNext) {
153         val line = next();
154         if (!hasNext) return line;
155       }
156       throw new ServerFailed("exactly one line expected");
157     }
158   }
159
160   def submit(bg: Boolean, toks: String*): this.Job = {
161     var cmd = toks;
162 println(";; wait for lock");
163     synchronized {
164       if (bg) {
165         val tag = bgseq formatted "J%05d"; bgseq += 1;
166         cmd = toks match {
167           case Seq(cmd, tail@_*) => cmd +: "-background" +: tag +: tail;
168         }
169       }
170 println(";; wait for foreground");
171       while (livep && fgjob != None) wait();
172       if (!livep) throw new ConnectionClosed;
173 println(";; write command");
174       try { out.write(quoteTokens(cmd)); out.write('\n'); out.flush(); }
175       catch { case e: Throwable => notify(); throw e; }
176       val j = new Job;
177       fgjob = Some(j);
178       j
179     }
180   }
181
182   def submit(toks: String*): this.Job = submit(false, toks: _*);
183
184   def close() { synchronized { out.close(); } }
185
186   /* These two expect the connection lock to be held. */
187   def foregroundJob: Job =
188     fgjob.getOrElse { throw new ServerFailed("no foreground job"); }
189   def releaseForegroundJob() { fgjob = None; notify(); }
190
191   def parseServerLine(s: String): Message = nextToken(s) match {
192     case None => throw new ServerFailed("empty line from server")
193     case Some(("TRACE", next)) => Trace(s.substring(next))
194     case Some((code, next)) => (code, splitTokens(s, next)) match {
195       case ("OK", Seq()) => JobOK
196       case ("INFO", tail) => JobInfo(tail)
197       case ("FAIL", tail) => JobFail(tail)
198       case ("BGDETACH", Seq(tag)) => JobDetached(tag)
199       case ("BGOK", Seq(tag)) => BackgroundJobMessage(tag, JobOK)
200       case ("BGINFO", Seq(tag, tail@_*)) =>
201         BackgroundJobMessage(tag, JobInfo(tail))
202       case ("BGFAIL", Seq(tag, tail@_*)) =>
203         BackgroundJobMessage(tag, JobFail(tail))
204       case ("WARN", tail) => Warning(tail)
205       case ("NOTE", tail) => Notify(tail)
206       case ("SVCCLAIM", Seq(svc, ver)) => ServiceClaim(svc, ver)
207       case ("SVCJOB", Seq(tag, svc, cmd, args@_*)) =>
208         ServiceJob(tag, svc, cmd, args)
209       case ("SVCCANCEL", Seq(tag)) => ServiceCancel(tag)
210       case (_, tail) => throw new ServerFailed(
211         "incomprehensible line from server: " + quoteTokens(code +: tail))
212     }
213   }
214
215   def processJobMessage(msg: JobMessage)
216                        (getjob: (Boolean) => Job) {
217     synchronized { getjob(msg.isInstanceOf[JobInfo]); }.ch.write(msg);
218   }
219
220   /* Reading lines from the server. */
221   val readthr = thread("admin reader") {
222 println(";; readthr running");
223     val bin = in match {
224       case br: BufferedReader => br;
225       case _ => new BufferedReader(in)
226     }
227     var line: String = null;
228
229     try {
230 println(";; wait for line");
231       while ({line = bin.readLine; line != null}) {
232 println(s";; line: $line");
233         parseServerLine(line) match {
234           case JobDetached(tag) => synchronized {
235             jobmap(tag) = foregroundJob; releaseForegroundJob();
236           }
237           case msg: JobMessage => processJobMessage(msg) { keep =>
238             val j = foregroundJob; if (!keep) releaseForegroundJob(); j
239           }
240           case BackgroundJobMessage(tag, msg) =>
241             processJobMessage(msg) { keep =>
242               val j = jobmap.getOrElse(tag, throw new ServerFailed(
243                 s"no job with tag `${tag}'"));
244               if (!keep) jobmap.remove(tag);
245               j
246             }
247           case msg: AsyncMessage =>
248             publish(msg);
249           case _: ServiceMessage =>
250             ok;
251         }
252       }
253     } catch {
254       case e: Throwable => e.printStackTrace();
255     } finally {
256       synchronized {
257         livep = false;
258         for ((_, j) <- jobmap) j.ch.write(JobLostConnection);
259         fgjob match {
260           case Some(j) =>
261             j.ch.write(JobLostConnection);
262             fgjob = None;
263             notifyAll();
264           case None => ok;
265         }
266       }
267       publish(ConnectionLost);
268       in.close(); out.close();
269     }
270   }
271 }
272
273 /*----- That's all, folks -------------------------------------------------*/
274
275 }