import scala.util.control.{Breaks, ControlThrowable};
import java.io.{BufferedReader, Closeable, File, InputStream, Reader};
-import java.net.{URL, URLConnection};
+import java.net.{HttpURLConnection, URL, URLConnection};
import java.nio.{ByteBuffer, CharBuffer};
+import java.nio.channels.{SelectionKey, Selector};
+import java.nio.channels.spi.{AbstractSelector, AbstractSelectableChannel};
import java.nio.charset.Charset;
+import java.util.{Set => JSet};
import java.util.concurrent.locks.{Lock, ReentrantLock};
/*----- Miscellaneous useful things ---------------------------------------*/
val BREAKS = new Breaks;
import BREAKS.{breakable, break};
+/*----- Interruptably doing things ----------------------------------------*/
+
+private class InterruptCatcher[T](body: => T, onWakeup: => Unit)
+ extends AbstractSelector(null) {
+ /* Hook onto the VM's thread interruption machinery.
+ *
+ * The `run' method is the only really interesting one. It will run the
+ * BODY, returning its result; if the thread is interrupted during this
+ * time, ONWAKEUP is invoked for effect. The expectation is that ONWAKEUP
+ * will somehow cause BODY to stop early.
+ *
+ * Credit for this hack goes to Nicholas Wilson: see
+ * <https://github.com/NWilson/javaInterruptHook>.
+ */
+
+ private def nope: Nothing =
+ { throw new UnsupportedOperationException("can't do that"); }
+ protected def implCloseSelector() { }
+ protected def register(chan: AbstractSelectableChannel,
+ ops: Int, att: Any): SelectionKey = nope;
+ def keys(): JSet[SelectionKey] = nope;
+ def selectedKeys(): JSet[SelectionKey] = nope;
+ def select(): Int = nope;
+ def select(millis: Long): Int = nope;
+ def selectNow(): Int = nope;
+
+ def run(): T = try {
+ begin();
+ val ret = body;
+ if (Thread.interrupted()) throw new InterruptedException;
+ ret
+ } finally {
+ end();
+ }
+ def wakeup(): Selector = { onWakeup; this }
+}
+
+class PendingInterruptable[T] private[tripe](body: => T) {
+ /* This class exists to provide the `onInterrupt THUNK' syntax. */
+
+ def onInterrupt(thunk: => Unit): T =
+ new InterruptCatcher(body, thunk).run();
+}
+def interruptably[T](body: => T) = {
+ /* interruptably { BODY } onInterrupt { THUNK }
+ *
+ * Execute BODY and return its result. If the thread receives an
+ * interrupt -- or is already in an interrupted state -- execute THUNK for
+ * effect; it is expected to cause BODY to return expeditiously, and when
+ * the BODY completes, an `InterruptedException' is thrown.
+ */
+
+ new PendingInterruptable(body);
+}
+
/*----- A gadget for fetching URLs ----------------------------------------*/
class URLFetchException(msg: String) extends Exception(msg);
/* Fetch the URL, feeding the data through the callbacks CB. */
withCleaner { clean =>
- var win: Boolean = false;
- clean { cb.done(win); }
+ var win: Boolean = false; clean { cb.done(win); }
- /* Set up the connection, and run a preflight check. */
+ /* Set up the connection. This isn't going to block, I think, and we
+ * need to use it in the interrupt handler.
+ */
val c = url.openConnection();
- cb.preflight(c);
-
- /* Start fetching data. */
- val in = c.getInputStream; clean { in.close(); }
- val explen = c.getContentLength;
- /* Read a buffer at a time, and give it to the callback. Maintain a
- * running total.
+ /* Java's default URL handlers don't respond to interrupts, so we have to
+ * take over this duty.
*/
- var len: Long = 0;
- blockUnit { exit =>
- for ((buf, n) <- blocks(in)) {
- cb.write(buf, n, len);
- len += n;
- if (explen != -1 && len > explen) exit;
+ interruptably {
+ /* Run the caller's preflight check. This must be done here, since it
+ * might well block while it discovers things like the content length.
+ */
+ cb.preflight(c);
+
+ /* Start fetching data. */
+ val in = c.getInputStream; clean { in.close(); }
+ val explen = c.getContentLength;
+
+ /* Read a buffer at a time, and give it to the callback. Maintain a
+ * running total.
+ */
+ var len: Long = 0;
+ blockUnit { exit =>
+ for ((buf, n) <- blocks(in)) {
+ cb.write(buf, n, len);
+ len += n;
+ if (explen != -1 && len > explen) exit;
+ }
}
- }
- /* I can't find it documented anywhere that the existing machinery
- * checks the received stream against the advertised content length.
- * It doesn't hurt to check again, anyway.
- */
- if (explen != -1 && explen != len) {
- throw new URLFetchException(
- s"received $len /= $explen bytes from `$url'");
- }
+ /* I can't find it documented anywhere that the existing machinery
+ * checks the received stream against the advertised content length.
+ * It doesn't hurt to check again, anyway.
+ */
+ if (explen != -1 && explen != len) {
+ throw new URLFetchException(
+ s"received $len /= $explen bytes from `$url'");
+ }
- /* Glorious success is ours. */
- win = true;
+ /* Glorious success is ours. */
+ win = true;
+ } onInterrupt {
+ /* Oh. How do we do this? */
+
+ c match {
+ case c: HttpURLConnection =>
+ /* It's an HTTP connection (what happened to the case here?).
+ * HTTPS connections match too because they're a subclass. Getting
+ * the input stream will block, but there's an easier way.
+ */
+ c.disconnect();
+
+ case _ =>
+ /* It's something else. Let's hope that getting the input stream
+ * doesn't block.
+ */
+ c.getInputStream.close();
+ }
+ }
}
}
/*----- Threading things --------------------------------------------------*/
-def thread[T](name: String, run: Boolean = true, daemon: Boolean = true)
- (f: => T): Thread = {
+def thread(name: String, run: Boolean = true, daemon: Boolean = true)
+ (f: => Unit): Thread = {
/* Make a thread with a given name, and maybe start running it. */
val t = new Thread(new Runnable { def run() { f; } }, name);
t
}
+class ValueThread[T](name: String, group: ThreadGroup = null,
+ stacksz: Long = 0)(body: => T)
+ extends Thread(group, null, name, stacksz) {
+ private[this] var exc: Throwable = _;
+ private[this] var ret: T = _;
+
+ override def run() {
+ try { ret = body; }
+ catch { case e: Throwable => exc = e; }
+ }
+ def get: T =
+ if (isAlive) throw new IllegalArgumentException("still running");
+ else if (exc != null) throw exc;
+ else ret;
+}
+def valueThread[T](name: String, run: Boolean = true)
+ (body: => T): ValueThread[T] = {
+ val t = new ValueThread(name)(body);
+ if (run) t.start();
+ t
+}
+
/*----- Quoting and parsing tokens ----------------------------------------*/
def quoteTokens(v: Seq[String]): String = {
}
}
def blocks(in: InputStream):
- BufferedIterator[(Array[Byte], Int)] = blocks(in, 4096);
+ BufferedIterator[(Array[Byte], Int)] = blocks(in, 65536);
def blocks(in: BufferedReader, blksz: Int):
BufferedIterator[(Array[Char], Int)] =
}
}
def blocks(in: BufferedReader):
- BufferedIterator[(Array[Char], Int)] = blocks(in, 4096);
+ BufferedIterator[(Array[Char], Int)] = blocks(in, 65536);
def blocks(r: Reader, blksz: Int): BufferedIterator[(Array[Char], Int)] =
blocks(bufferedReader(r), blksz);
def blocks(r: Reader): BufferedIterator[(Array[Char], Int)] =
sb.result
}
+def formatTime(t: Int): String =
+ if (t < -1) "???"
+ else {
+ val (s, t1) = (t%60, t/60);
+ val (m, h) = (t1%60, t1/60);
+ if (h > 0) f"$h%d:$m%02d:$s%02d"
+ else f"$m%02d:$s%02d"
+ }
+
/*----- That's all, folks -------------------------------------------------*/
}