X-Git-Url: http://www.chiark.greenend.org.uk/ucgi/~mdw/git/tripe-android/blobdiff_plain/68df6e8f5b575d7b737733339339b3b05ecc72a3..04a5abaece151705e9bd7026653f79938a7a2fbc:/util.scala diff --git a/util.scala b/util.scala index 75b3677..ea776e7 100644 --- a/util.scala +++ b/util.scala @@ -31,9 +31,12 @@ import scala.concurrent.duration.{Deadline, Duration}; import scala.util.control.{Breaks, ControlThrowable}; import java.io.{BufferedReader, Closeable, File, InputStream, Reader}; -import java.net.{URL, URLConnection}; +import java.net.{HttpURLConnection, URL, URLConnection}; import java.nio.{ByteBuffer, CharBuffer}; +import java.nio.channels.{SelectionKey, Selector}; +import java.nio.channels.spi.{AbstractSelector, AbstractSelectableChannel}; import java.nio.charset.Charset; +import java.util.{Set => JSet}; import java.util.concurrent.locks.{Lock, ReentrantLock}; /*----- Miscellaneous useful things ---------------------------------------*/ @@ -194,6 +197,61 @@ def loopUnit(body: (=> Nothing) => Unit): Unit = { val BREAKS = new Breaks; import BREAKS.{breakable, break}; +/*----- Interruptably doing things ----------------------------------------*/ + +private class InterruptCatcher[T](body: => T, onWakeup: => Unit) + extends AbstractSelector(null) { + /* Hook onto the VM's thread interruption machinery. + * + * The `run' method is the only really interesting one. It will run the + * BODY, returning its result; if the thread is interrupted during this + * time, ONWAKEUP is invoked for effect. The expectation is that ONWAKEUP + * will somehow cause BODY to stop early. + * + * Credit for this hack goes to Nicholas Wilson: see + * . + */ + + private def nope: Nothing = + { throw new UnsupportedOperationException("can't do that"); } + protected def implCloseSelector() { } + protected def register(chan: AbstractSelectableChannel, + ops: Int, att: Any): SelectionKey = nope; + def keys(): JSet[SelectionKey] = nope; + def selectedKeys(): JSet[SelectionKey] = nope; + def select(): Int = nope; + def select(millis: Long): Int = nope; + def selectNow(): Int = nope; + + def run(): T = try { + begin(); + val ret = body; + if (Thread.interrupted()) throw new InterruptedException; + ret + } finally { + end(); + } + def wakeup(): Selector = { onWakeup; this } +} + +class PendingInterruptable[T] private[tripe](body: => T) { + /* This class exists to provide the `onInterrupt THUNK' syntax. */ + + def onInterrupt(thunk: => Unit): T = + new InterruptCatcher(body, thunk).run(); +} +def interruptably[T](body: => T) = { + /* interruptably { BODY } onInterrupt { THUNK } + * + * Execute BODY and return its result. If the thread receives an + * interrupt -- or is already in an interrupted state -- execute THUNK for + * effect; it is expected to cause BODY to return expeditiously, and when + * the BODY completes, an `InterruptedException' is thrown. + */ + + new PendingInterruptable(body); +} + /*----- A gadget for fetching URLs ----------------------------------------*/ class URLFetchException(msg: String) extends Exception(msg); @@ -208,47 +266,74 @@ def fetchURL(url: URL, cb: URLFetchCallbacks) { /* Fetch the URL, feeding the data through the callbacks CB. */ withCleaner { clean => - var win: Boolean = false; - clean { cb.done(win); } + var win: Boolean = false; clean { cb.done(win); } - /* Set up the connection, and run a preflight check. */ + /* Set up the connection. This isn't going to block, I think, and we + * need to use it in the interrupt handler. + */ val c = url.openConnection(); - cb.preflight(c); - - /* Start fetching data. */ - val in = c.getInputStream; clean { in.close(); } - val explen = c.getContentLength; - /* Read a buffer at a time, and give it to the callback. Maintain a - * running total. + /* Java's default URL handlers don't respond to interrupts, so we have to + * take over this duty. */ - var len: Long = 0; - blockUnit { exit => - for ((buf, n) <- blocks(in)) { - cb.write(buf, n, len); - len += n; - if (explen != -1 && len > explen) exit; + interruptably { + /* Run the caller's preflight check. This must be done here, since it + * might well block while it discovers things like the content length. + */ + cb.preflight(c); + + /* Start fetching data. */ + val in = c.getInputStream; clean { in.close(); } + val explen = c.getContentLength; + + /* Read a buffer at a time, and give it to the callback. Maintain a + * running total. + */ + var len: Long = 0; + blockUnit { exit => + for ((buf, n) <- blocks(in)) { + cb.write(buf, n, len); + len += n; + if (explen != -1 && len > explen) exit; + } } - } - /* I can't find it documented anywhere that the existing machinery - * checks the received stream against the advertised content length. - * It doesn't hurt to check again, anyway. - */ - if (explen != -1 && explen != len) { - throw new URLFetchException( - s"received $len /= $explen bytes from `$url'"); - } + /* I can't find it documented anywhere that the existing machinery + * checks the received stream against the advertised content length. + * It doesn't hurt to check again, anyway. + */ + if (explen != -1 && explen != len) { + throw new URLFetchException( + s"received $len /= $explen bytes from `$url'"); + } - /* Glorious success is ours. */ - win = true; + /* Glorious success is ours. */ + win = true; + } onInterrupt { + /* Oh. How do we do this? */ + + c match { + case c: HttpURLConnection => + /* It's an HTTP connection (what happened to the case here?). + * HTTPS connections match too because they're a subclass. Getting + * the input stream will block, but there's an easier way. + */ + c.disconnect(); + + case _ => + /* It's something else. Let's hope that getting the input stream + * doesn't block. + */ + c.getInputStream.close(); + } + } } } /*----- Threading things --------------------------------------------------*/ -def thread[T](name: String, run: Boolean = true, daemon: Boolean = true) - (f: => T): Thread = { +def thread(name: String, run: Boolean = true, daemon: Boolean = true) + (f: => Unit): Thread = { /* Make a thread with a given name, and maybe start running it. */ val t = new Thread(new Runnable { def run() { f; } }, name); @@ -257,6 +342,28 @@ def thread[T](name: String, run: Boolean = true, daemon: Boolean = true) t } +class ValueThread[T](name: String, group: ThreadGroup = null, + stacksz: Long = 0)(body: => T) + extends Thread(group, null, name, stacksz) { + private[this] var exc: Throwable = _; + private[this] var ret: T = _; + + override def run() { + try { ret = body; } + catch { case e: Throwable => exc = e; } + } + def get: T = + if (isAlive) throw new IllegalArgumentException("still running"); + else if (exc != null) throw exc; + else ret; +} +def valueThread[T](name: String, run: Boolean = true) + (body: => T): ValueThread[T] = { + val t = new ValueThread(name)(body); + if (run) t.start(); + t +} + /*----- Quoting and parsing tokens ----------------------------------------*/ def quoteTokens(v: Seq[String]): String = { @@ -416,7 +523,7 @@ def blocks(in: InputStream, blksz: Int): } } def blocks(in: InputStream): - BufferedIterator[(Array[Byte], Int)] = blocks(in, 4096); + BufferedIterator[(Array[Byte], Int)] = blocks(in, 65536); def blocks(in: BufferedReader, blksz: Int): BufferedIterator[(Array[Char], Int)] = @@ -430,7 +537,7 @@ def blocks(in: BufferedReader, blksz: Int): } } def blocks(in: BufferedReader): - BufferedIterator[(Array[Char], Int)] = blocks(in, 4096); + BufferedIterator[(Array[Char], Int)] = blocks(in, 65536); def blocks(r: Reader, blksz: Int): BufferedIterator[(Array[Char], Int)] = blocks(bufferedReader(r), blksz); def blocks(r: Reader): BufferedIterator[(Array[Char], Int)] = @@ -454,6 +561,15 @@ def oxford(conj: String, things: Seq[String]): String = things match { sb.result } +def formatTime(t: Int): String = + if (t < -1) "???" + else { + val (s, t1) = (t%60, t/60); + val (m, h) = (t1%60, t1/60); + if (h > 0) f"$h%d:$m%02d:$s%02d" + else f"$m%02d:$s%02d" + } + /*----- That's all, folks -------------------------------------------------*/ }