/*----- Imports -----------------------------------------------------------*/
-import java.io.{BufferedReader, Reader, Writer};
+import java.io.{BufferedReader, InputStreamReader, OutputStreamWriter};
import java.util.concurrent.locks.{Condition, ReentrantLock => Lock};
import scala.collection.mutable.{HashMap, Publisher};
import scala.concurrent.Channel;
import scala.util.control.Breaks;
-import Magic._;
+import Implicits._;
+import sys.{serverInput, serverOutput};
/*----- Classification of server messages ---------------------------------*/
sealed abstract class JobMessage extends Message;
case object JobOK extends JobMessage;
-case class JobInfo(info: Seq[String]) extends JobMessage;
-case class JobFail(err: Seq[String]) extends JobMessage;
+final case class JobInfo(info: Seq[String]) extends JobMessage;
+final case class JobFail(err: Seq[String]) extends JobMessage;
case object JobLostConnection extends JobMessage;
-case class BackgroundJobMessage(tag: String, msg: JobMessage)
+final case class BackgroundJobMessage(tag: String, msg: JobMessage)
extends Message;
-case class JobDetached(tag: String) extends Message;
+final case class JobDetached(tag: String) extends Message;
sealed abstract class AsyncMessage extends Message;
-case class Trace(msg: String) extends AsyncMessage;
-case class Warning(err: Seq[String]) extends AsyncMessage;
-case class Notify(note: Seq[String]) extends AsyncMessage;
+final case class Trace(msg: String) extends AsyncMessage;
+final case class Warning(err: Seq[String]) extends AsyncMessage;
+final case class Notify(note: Seq[String]) extends AsyncMessage;
case object ConnectionLost extends AsyncMessage;
sealed abstract class ServiceMessage extends Message;
-case class ServiceCancel(jobid: String) extends ServiceMessage;
-case class ServiceClaim(svc: String, version: String)
+final case class ServiceCancel(jobid: String) extends ServiceMessage;
+final case class ServiceClaim(svc: String, version: String)
extends ServiceMessage;
-case class ServiceJob(jobid: String, svc: String,
+final case class ServiceJob(jobid: String, svc: String,
cmd: String, args: Seq[String])
extends ServiceMessage;
/*----- Main code ---------------------------------------------------------*/
-object Connection {
-}
-
class ConnectionClosed extends Exception;
class ServerFailed(msg: String) extends Exception(msg);
class ConnectionLostException extends Exception;
-class Connection(val in: Reader, val out: Writer)
- extends Publisher[AsyncMessage]
+object Connection extends Publisher[AsyncMessage]
{
/* Synchronization.
*
* hold the `Connection' lock before locking any individual `Job' objects.
*/
- var livep: Boolean = true; // Is this connection still alive?
- var fgjob: Option[this.Job] = None; // Foreground job, if there is one.
- val jobmap = new HashMap[String, this.Job]; // Maps tags to extant jobs.
- var bgseq = 0; // Next background job tag.
+ private var livep: Boolean = true; // Is this connection still alive?
+ private var fgjob: Option[this.Job] = None; // Foreground job, if there is one.
+ private val jobmap = new HashMap[String, this.Job]; // Maps tags to extant jobs.
+ private var bgseq = 0; // Next background job tag.
+
+ private val in = new BufferedReader(new InputStreamReader(serverInput));
+ private val out = new OutputStreamWriter(serverOutput);
+
+ type Pub = Connection.type;
class Job extends Iterator[Seq[String]] {
private[Connection] val ch = new Channel[JobMessage];
def submit(toks: String*): this.Job = submit(false, toks: _*);
- def close() { synchronized { out.close(); } }
-
/* These two expect the connection lock to be held. */
def foregroundJob: Job =
fgjob.getOrElse { throw new ServerFailed("no foreground job"); }
case msg: AsyncMessage =>
publish(msg);
case _: ServiceMessage =>
- ();
+ ok;
}
}
} catch {
j.ch.write(JobLostConnection);
fgjob = None;
notifyAll();
- case None => ();
+ case None => ok;
}
}
publish(ConnectionLost);
- in.close(); out.close();
}
}
}