Commit | Line | Data |
---|---|---|
8eabb4ff MW |
1 | /* -*-scala-*- |
2 | * | |
3 | * Managing TrIPE administration connections | |
4 | * | |
5 | * (c) 2018 Straylight/Edgeware | |
6 | */ | |
7 | ||
8 | /*----- Licensing notice --------------------------------------------------* | |
9 | * | |
10 | * This file is part of the Trivial IP Encryption (TrIPE) Android app. | |
11 | * | |
12 | * TrIPE is free software: you can redistribute it and/or modify it under | |
13 | * the terms of the GNU General Public License as published by the Free | |
14 | * Software Foundation; either version 3 of the License, or (at your | |
15 | * option) any later version. | |
16 | * | |
17 | * TrIPE is distributed in the hope that it will be useful, but WITHOUT | |
18 | * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or | |
19 | * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License | |
20 | * for more details. | |
21 | * | |
22 | * You should have received a copy of the GNU General Public License | |
23 | * along with TrIPE. If not, see <https://www.gnu.org/licenses/>. | |
24 | */ | |
25 | ||
26 | package uk.org.distorted.tripe; package object admin { | |
27 | ||
28 | /*----- Imports -----------------------------------------------------------*/ | |
29 | ||
30 | import java.io.{BufferedReader, Reader, Writer}; | |
31 | import java.util.concurrent.locks.{Condition, ReentrantLock => Lock}; | |
32 | ||
33 | import scala.collection.mutable.{HashMap, Publisher}; | |
34 | import scala.concurrent.Channel; | |
35 | import scala.util.control.Breaks; | |
36 | ||
25c35469 | 37 | import Implicits._; |
8eabb4ff MW |
38 | |
39 | /*----- Classification of server messages ---------------------------------*/ | |
40 | ||
/* Root of the hierarchy: everything `parseServerLine' can produce. */
sealed abstract class Message;

/* Replies which belong to a particular job.  These are the messages
 * delivered to a `Job' through its channel.
 */
sealed abstract class JobMessage extends Message;

/* The job finished successfully (an `OK' or `BGOK' line). */
case object JobOK extends JobMessage;

/* One line of job output (an `INFO' or `BGINFO' line). */
final case class JobInfo(info: Seq[String]) extends JobMessage;

/* The job failed (a `FAIL' or `BGFAIL' line); ERR holds the tokens. */
final case class JobFail(err: Seq[String]) extends JobMessage;

/* Synthesized by the reader thread for every outstanding job when it
 * stops reading from the server.
 */
case object JobLostConnection extends JobMessage;

/* A job message MSG addressed to the background job labelled TAG. */
final case class BackgroundJobMessage(tag: String, msg: JobMessage)
  extends Message;

/* The current foreground job has moved into the background and is now
 * known by TAG (a `BGDETACH' line).
 */
final case class JobDetached(tag: String) extends Message;

/* Messages not tied to any job; these are published to subscribers. */
sealed abstract class AsyncMessage extends Message;

/* A `TRACE' line; MSG is the untokenized text following the keyword. */
final case class Trace(msg: String) extends AsyncMessage;

/* A `WARN' line. */
final case class Warning(err: Seq[String]) extends AsyncMessage;

/* A `NOTE' line. */
final case class Notify(note: Seq[String]) extends AsyncMessage;

/* Published once, when the reader thread shuts down. */
case object ConnectionLost extends AsyncMessage;

/* Messages concerning services (`SVC...' lines). */
sealed abstract class ServiceMessage extends Message;

/* A `SVCCANCEL' line naming the service job JOBID. */
final case class ServiceCancel(jobid: String) extends ServiceMessage;

/* A `SVCCLAIM' line for service SVC at VERSION. */
final case class ServiceClaim(svc: String, version: String)
  extends ServiceMessage;

/* A `SVCJOB' line: job JOBID asks service SVC to run CMD with ARGS. */
final case class ServiceJob(
  jobid: String,
  svc: String,
  cmd: String,
  args: Seq[String]
) extends ServiceMessage;
66 | ||
67 | /*----- Main code ---------------------------------------------------------*/ | |
68 | ||
8eabb4ff MW |
/* Thrown by `Connection.submit' if the connection is no longer live. */
class ConnectionClosed extends Exception;

/* Thrown when the server's output can't be understood, or isn't what the
 * caller was expecting.
 */
class ServerFailed(msg: String) extends Exception(msg);

/* Thrown when iterating over a job's output reaches a failure reply; MSG
 * holds the tokens of that reply.
 */
class CommandFailed(val msg: Seq[String]) extends Exception {
  /* Render as `CLASSNAME(TOKENS)', with the tokens quoted. */
  override def getMessage(): String =
    s"${getClass.getName}(${quoteTokens(msg)})";
}

/* Thrown when iterating over a job's output after the connection to the
 * server has gone away.
 */
class ConnectionLostException extends Exception;
79 | ||
80 | class Connection(val in: Reader, val out: Writer) | |
81 | extends Publisher[AsyncMessage] | |
82 | { | |
83 | /* Synchronization. | |
84 | * | |
85 | * This class is complicatedly multithreaded. The following fields must | |
86 | * only be accessed while the instance is locked. To prevent deadlocks, | |
87 | * hold the `Connection' lock before locking any individual `Job' objects. | |
88 | */ | |
89 | ||
/* True until the reader thread shuts down. */
var livep: Boolean = true;

/* The job currently occupying the foreground, if any. */
var fgjob: Option[Job] = None;

/* Background jobs still outstanding, indexed by their tags. */
val jobmap: HashMap[String, Job] = new HashMap[String, Job];

/* Sequence number used to build the next background job tag. */
var bgseq: Int = 0;
94 | ||
/* A command in progress, presented as an iterator over the lines of
 * output (the `JobInfo' payloads) which the server sends in reply.
 *
 * Iteration ends normally when `JobOK' arrives.  A `JobFail' or
 * `JobLostConnection' message makes `hasNext' report true, and the
 * following `next' throws `CommandFailed' or `ConnectionLostException'
 * respectively, so failures surface as exceptions from the iteration.
 *
 * Both `hasNext' and `next' block until the reader thread has delivered a
 * message for this job.
 */
class Job extends Iterator[Seq[String]] {
  /* Messages for this job, written by the reader thread. */
  private[Connection] val ch = new Channel[JobMessage];

  /* A message taken from `ch' but not yet consumed by `next'. */
  private[this] var nextmsg: Option[JobMessage] = None;

  /* Return the pending message, reading (and caching) one if necessary. */
  private[this] def peek(): JobMessage = nextmsg.getOrElse {
    val m = ch.read; nextmsg = Some(m); m
  }

  /* Anything other than a successful completion counts as `more': errors
   * are reported by the subsequent call to `next'.
   */
  override def hasNext: Boolean = peek() != JobOK;

  /* Return the next line of output.
   *
   * Throws `NoSuchElementException' at the end of the output,
   * `CommandFailed' if the command failed, and `ConnectionLostException'
   * if the connection went away.  Only an output line is consumed: the
   * terminal messages stay pending, so further calls throw again.
   */
  override def next(): Seq[String] = peek() match {
    case JobInfo(msg) => nextmsg = None; msg
    case JobOK => throw new NoSuchElementException
    case JobFail(msg) => throw new CommandFailed(msg)
    case JobLostConnection => throw new ConnectionLostException
  }

  /* Collect output consisting of `KEY=VALUE' tokens into a map.  Each
   * token is split at its first `='; a token without one is reported as
   * `ServerFailed'.
   */
  def keyvals(): Map[String, String] = {
    val b = Map.newBuilder[String, String];
    for (line <- this; token <- line) {
      token.indexOf('=') match {
        case -1 => throw new ServerFailed("missing `=' in key-value list");
        case i => b += (token.substring(0, i) -> token.substring(i + 1));
      }
    }
    b.result()
  }

  /* Collect output whose lines have the form `KEY DESCRIPTION...'.
   *
   * KEY is either a single character, or a character followed by `+'.
   * Each line yields the character, a flag saying whether the `+' was
   * present, and the remaining tokens joined with spaces.
   */
  def traceish(): Seq[(Char, Boolean, String)] = {
    val b = Seq.newBuilder[(Char, Boolean, String)];
    for (line <- this) line match {
      /* Match as a general `Seq': a `List' pattern would silently reject
       * non-empty lines held in any other kind of sequence.
       */
      case Seq(key, desc@_*) =>
        val live = key.length match {
          case 1 => false
          case 2 if key(1) == '+' => true
          case _ => throw new ServerFailed(
            s"incomprehensible traceish key `$key'");
        }
        b += ((key(0), live, desc.mkString(" ")));
      case _ => throw new ServerFailed("empty line in traceish output");
    }
    b.result()
  }

  /* Check that the command produced no output.
   *
   * If something is pending, fetch it with `next' first: that way a failed
   * command or a lost connection is reported as such, rather than being
   * disguised as unexpected output.
   */
  def expectEmpty(): Unit = {
    if (hasNext) {
      next();
      throw new ServerFailed("no output expected");
    }
  }

  /* Return the command's single line of output.
   *
   * Throws `ServerFailed' if there are no lines, or more than one.  As
   * with `expectEmpty', a failure reply is reported by the exception which
   * `next' throws for it.
   */
  def oneLine(): Seq[String] = {
    if (!hasNext) throw new ServerFailed("exactly one line expected");
    val line = next();
    if (hasNext) {
      next();
      throw new ServerFailed("exactly one line expected");
    }
    line
  }
}
159 | ||
/* Submit the command TOKS to the server and return a `Job' for its reply.
 *
 * If BG is set, the command is rewritten to include `-background TAG'
 * after the command name, with a freshly allocated tag.
 *
 * Blocks until no foreground job is outstanding.  Throws
 * `IllegalArgumentException' if BG is set and TOKS is empty,
 * `ConnectionClosed' if the connection is no longer live, and whatever
 * writing to the server throws.
 */
def submit(bg: Boolean, toks: String*): this.Job = {
  /* A background command needs a command name to put the option after. */
  require(!bg || toks.nonEmpty, "background command must not be empty");

  println(";; wait for lock");
  synchronized {
    val cmd: Seq[String] =
      if (!bg) toks
      else {
        val tag = "J%05d".format(bgseq); bgseq += 1;
        toks.head +: "-background" +: tag +: toks.tail
      }

    println(";; wait for foreground");
    while (livep && fgjob != None) wait();
    if (!livep) {
      /* We may have consumed the only wakeup sent by
       * `releaseForegroundJob'; pass it on so that other waiters notice
       * that the connection has died instead of sleeping forever.
       */
      notifyAll();
      throw new ConnectionClosed;
    }

    println(";; write command");
    try { out.write(quoteTokens(cmd)); out.write('\n'); out.flush(); }
    catch {
      /* We're giving up our turn, so let another waiter have a go. */
      case e: Throwable => notify(); throw e;
    }
    val j = new Job;
    fgjob = Some(j);
    j
  }
}

/* Submit TOKS as an ordinary foreground command. */
def submit(toks: String*): this.Job = submit(false, toks: _*);
183 | ||
/* Close the output side of the connection, under the connection lock. */
def close(): Unit = synchronized { out.close() }
185 | ||
/* Both of the following must be called with the connection lock held. */

/* Return the current foreground job, or throw `ServerFailed' if there
 * isn't one.
 */
def foregroundJob: Job = fgjob match {
  case Some(j) => j
  case None => throw new ServerFailed("no foreground job")
}

/* Clear the foreground slot and wake one thread waiting to use it. */
def releaseForegroundJob(): Unit = {
  fgjob = None;
  notify();
}
190 | ||
/* Parse the line S received from the server into a `Message'.
 *
 * The first token selects the kind of message.  Throws `ServerFailed' if
 * the line has no tokens, or if the keyword and its arguments don't fit
 * any known form.
 */
def parseServerLine(s: String): Message = nextToken(s) match {
  case None => throw new ServerFailed("empty line from server")

  /* Trace text isn't tokenized: take the rest of the line as it stands. */
  case Some(("TRACE", pos)) => Trace(s.substring(pos))

  case Some((code, pos)) =>
    val args = splitTokens(s, pos);
    (code, args) match {
      /* Replies to the foreground job. */
      case ("OK", Seq()) => JobOK
      case ("INFO", _) => JobInfo(args)
      case ("FAIL", _) => JobFail(args)

      /* Replies to background jobs: the first argument is the tag. */
      case ("BGDETACH", Seq(tag)) => JobDetached(tag)
      case ("BGOK", Seq(tag)) => BackgroundJobMessage(tag, JobOK)
      case ("BGINFO", Seq(tag, rest@_*)) =>
        BackgroundJobMessage(tag, JobInfo(rest))
      case ("BGFAIL", Seq(tag, rest@_*)) =>
        BackgroundJobMessage(tag, JobFail(rest))

      /* Asynchronous broadcasts. */
      case ("WARN", _) => Warning(args)
      case ("NOTE", _) => Notify(args)

      /* Service messages. */
      case ("SVCCLAIM", Seq(svc, ver)) => ServiceClaim(svc, ver)
      case ("SVCJOB", Seq(tag, svc, cmd, rest@_*)) =>
        ServiceJob(tag, svc, cmd, rest)
      case ("SVCCANCEL", Seq(tag)) => ServiceCancel(tag)

      /* Unknown keyword, or a known one with the wrong arguments. */
      case _ => throw new ServerFailed(
        "incomprehensible line from server: " + quoteTokens(code +: args))
    }
}
214 | ||
8eabb4ff MW |
/* Deliver MSG to the job chosen by GETJOB.
 *
 * GETJOB is called with the connection lock held.  Its argument says
 * whether the job should be kept: true for a `JobInfo' line, since more
 * messages will follow, and false for any other job message.  The message
 * is written to the job's channel after the lock has been released.
 */
def processJobMessage(msg: JobMessage)
                     (getjob: (Boolean) => Job): Unit = {
  val keep = msg match {
    case _: JobInfo => true
    case _ => false
  };
  val job = synchronized { getjob(keep) };
  job.ch.write(msg);
}
8eabb4ff MW |
219 | |
/* The reader thread.
 *
 * Reads lines from the server until end-of-file or an error, parses each
 * one, and dispatches it: job messages go to the relevant job's channel,
 * asynchronous messages are published to subscribers, and service
 * messages are ignored.
 *
 * On the way out, whatever the reason, it marks the connection dead,
 * tells every outstanding job that the connection is lost, wakes all
 * threads waiting in `submit', publishes `ConnectionLost', and closes
 * both streams.
 */
val readthr = thread("admin reader") {
  println(";; readthr running");
  val bin = in match {
    case br: BufferedReader => br;
    case _ => new BufferedReader(in)
  }
  var line: String = null;

  try {
    println(";; wait for line");
    while ({line = bin.readLine; line != null}) {
      println(s";; line: $line");
      parseServerLine(line) match {
        case JobDetached(tag) => synchronized {
          /* The foreground job carries on in the background under TAG,
           * leaving the foreground free for another command.
           */
          jobmap(tag) = foregroundJob; releaseForegroundJob();
        }
        case msg: JobMessage => processJobMessage(msg) { keep =>
          val j = foregroundJob; if (!keep) releaseForegroundJob(); j
        }
        case BackgroundJobMessage(tag, msg) =>
          processJobMessage(msg) { keep =>
            val j = jobmap.getOrElse(tag, throw new ServerFailed(
              s"no job with tag `${tag}'"));
            if (!keep) jobmap.remove(tag);
            j
          }
        case msg: AsyncMessage =>
          publish(msg);
        case _: ServiceMessage =>
          ok;
      }
    }
  } catch {
    /* Best effort: report the problem and fall through to the cleanup. */
    case e: Throwable => e.printStackTrace();
  } finally {
    synchronized {
      livep = false;
      for ((_, j) <- jobmap) j.ch.write(JobLostConnection);
      for (j <- fgjob) j.ch.write(JobLostConnection);
      fgjob = None;

      /* Wake the waiters unconditionally.  A thread can be blocked in
       * `submit' even when there's no foreground job just now: the single
       * wakeup from `releaseForegroundJob' may have gone to a different
       * thread.
       */
      notifyAll();
    }

    /* Make sure both streams are closed even if a subscriber, or closing
     * the input stream, throws.
     */
    try { publish(ConnectionLost); }
    finally {
      try { in.close(); }
      finally { out.close(); }
    }
  }
}
271 | } | |
272 | ||
273 | /*----- That's all, folks -------------------------------------------------*/ | |
274 | ||
7894831e | 275 | } |