### Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307,
### USA.
+## still to do:
+## local software
+## log cleaning
+## tidy up
+
+import contextlib as CTX
import errno as E
import fcntl as FC
+import fnmatch as FM
+import glob as GLOB
+import itertools as I
import optparse as OP
import os as OS
import random as R
import re as RX
import signal as SIG
import select as SEL
+import stat as ST
+from cStringIO import StringIO
import sys as SYS
import time as T
+import traceback as TB
import jobclient as JC
def __str__(me): return '#<%s %s>' % (me.__class__.__name__, me._label)
def __repr__(me): return '#<%s %s>' % (me.__class__.__name__, me._label)
+class Struct (object):
+ def __init__(me, **kw): me.__dict__.update(kw)
+
+class Cleanup (object):
+ """
+ A context manager for stacking other context managers.
+
+ By itself, it does nothing. Attach other context managers with `enter' or
+ loose cleanup functions with `add'. On exit, contexts are left and
+ cleanups performed in reverse order.
+ """
+ def __init__(me):
+ me._cleanups = []
+ def __enter__(me):
+ return me
+ def __exit__(me, exty, exval, extb):
+ trap = False
+ for c in reversed(me._cleanups):
+ if c(exty, exval, extb): trap = True
+ return trap
+ def enter(me, ctx):
+ v = ctx.__enter__()
+ me._cleanups.append(ctx.__exit__)
+ return v
+ def add(me, func):
+ me._cleanups.append(lambda exty, exval, extb: func())
+
+def zulu(t = None):
+ """Return the time T (default now) as a string."""
+ return T.strftime("%Y-%m-%dT%H:%M:%SZ", T.gmtime(t))
+
+R_ZULU = RX.compile(r"^(\d+)-(\d+)-(\d+)T(\d+):(\d+):(\d+)Z$")
+def unzulu(z):
+ """Convert the time string Z back to a Unix time."""
+ m = R_ZULU.match(z)
+ if not m: raise ValueError("bad time spec `%s'" % z)
+ yr, mo, dy, hr, mi, se = map(int, m.groups())
+ return T.mktime((yr, mo, dy, hr, mi, se, 0, 0, 0))
+
+###--------------------------------------------------------------------------
+### Simple select(2) utilities.
+
+class BaseSelector (object):
+ """
+ A base class for hooking into `select_loop'.
+
+ See `select_loop' for details of the protocol.
+ """
+ def preselect(me, rfds, wfds): pass
+ def postselect_read(me, fd): pass
+ def postselect_write(me, fd): pass
+
+class WriteLinesSelector (BaseSelector):
+ """Write whole lines to an output file descriptor."""
+
+ def __init__(me, fd, nextfn = None, *args, **kw):
+ """
+ Initialize the WriteLinesSelector to write to the file descriptor FD.
+
+ The FD is marked non-blocking.
+
+ The lines are produced by the NEXTFN, which is called without arguments.
+ It can affect the output in three ways:
+
+ * It can return a string (or almost any other kind of object, which
+ will be converted into a string by `str'), which will be written to
+ the descriptor followed by a newline. Lines are written in the order
+ in which they are produced.
+
+ * It can return `None', which indicates that there are no more items to
+ be written for the moment. The function will be called again from
+ time to time, to see if it has changed its mind. This is the right
+ thing to do in order to stall output temporarily.
+
+ * It can raise `StopIteration', which indicates that there will never
+ be any more items. The file descriptor will be closed.
+
+ Subclasses can override this behaviour by defining a method `_next' and
+ passing `None' as the NEXTFN.
+ """
+ super(WriteLinesSelector, me).__init__(*args, **kw)
+ set_nonblocking(fd)
+ me._fd = fd
+ if nextfn is not None: me._next = nextfn
+
+ ## Selector state.
+ ##
+ ## * `_buf' contains a number of output items, already formatted, and
+ ## ready for output in a single batch. It might be empty.
+ ##
+ ## * `_pos' is the current output position in `_buf'.
+ ##
+ ## * `_more' is set unless the `_next' function has raised
+ ## `StopIteration': it indicates that we should close the descriptor
+  ##     once all of the remaining data in the buffer has been sent.
+ me._buf = ""
+ me._pos = 0
+ me._more = True
+
+ def _refill(me):
+ """Refill `_buf' by calling `_next'."""
+ sio = StringIO(); n = 0
+ while n < 4096:
+ try: item = me._next()
+ except StopIteration: me._more = False; break
+ if item is None: break
+ item = str(item)
+ sio.write(item); sio.write("\n"); n += len(item) + 1
+ me._buf = sio.getvalue(); me._pos = 0
+
+ def preselect(me, rfds, wfds):
+ if me._fd == -1: return
+ if me._buf == "" and me._more: me._refill()
+ if me._buf != "" or not me._more: wfds.append(me._fd)
+
+ def postselect_write(me, fd):
+ if fd != me._fd: return
+ while True:
+ if me._pos >= len(me._buf):
+ if me._more: me._refill()
+        if not me._more and me._pos >= len(me._buf): OS.close(me._fd); me._fd = -1; break
+ if not me._buf: break
+ try: n = OS.write(me._fd, me._buf[me._pos:])
+ except OSError, err:
+        if err.errno == E.EAGAIN or err.errno == E.EWOULDBLOCK: break
+ elif err.errno == E.EPIPE: OS.close(me._fd); me._fd = -1; break
+ else: raise
+ me._pos += n
+
+class ReadLinesSelector (BaseSelector):
+ """Report whole lines from an input file descriptor as they arrive."""
+
+ def __init__(me, fd, linefn = None, *args, **kw):
+ """
+ Initialize the ReadLinesSelector to read from the file descriptor FD.
+
+ The FD is marked non-blocking.
+
+ For each whole line, and the final partial line (if any), the selector
+ calls LINEFN with the line as an argument (without the terminating
+ newline, if any).
+
+ Subclasses can override this behaviour by defining a method `_line' and
+ passing `None' as the LINEFN.
+ """
+ super(ReadLinesSelector, me).__init__(*args, **kw)
+ set_nonblocking(fd)
+ me._fd = fd
+ me._buf = ""
+ if linefn is not None: me._line = linefn
+
+ def preselect(me, rfds, wfds):
+ if me._fd != -1: rfds.append(me._fd)
+
+ def postselect_read(me, fd):
+ if fd != me._fd: return
+ while True:
+ try: buf = OS.read(me._fd, 4096)
+ except OSError, err:
+        if err.errno == E.EAGAIN or err.errno == E.EWOULDBLOCK: break
+ else: raise
+ if buf == "":
+ OS.close(me._fd); me._fd = -1
+ if me._buf: me._line(me._buf)
+ break
+ buf = me._buf + buf
+ i = 0
+ while True:
+ try: j = buf.index("\n", i)
+ except ValueError: break
+ me._line(buf[i:j])
+ i = j + 1
+ me._buf = buf[i:]
+
+def select_loop(selectors):
+ """
+ Multiplex I/O between the various SELECTORS.
+
+ A `selector' SEL is an object which implements the selector protocol, which
+ consists of three methods.
+
+ * SEL.preselect(RFDS, WFDS) -- add any file descriptors which the
+ selector is interested in reading from to the list RFDS, and add file
+ descriptors it's interested in writing to to the list WFDS.
+
+ * SEL.postselect_read(FD) -- informs the selector that FD is ready for
+ reading.
+
+ * SEL.postselect_write(FD) -- informs the selector that FD is ready for
+ writing.
+
+ The `select_loop' function loops as follows.
+
+ * It calls the `preselect' method on each SELECTOR to determine what I/O
+ events it thinks are interesting.
+
+ * It waits for some interesting event to happen.
+
+ * It calls the `postselect_read' and/or `postselect_write' methods on all
+ of the selectors for each file descriptor which is ready.
+
+ The loop ends when no selector is interested in any events. This is simple
+ but rather inefficient.
+ """
+ while True:
+ rfds, wfds = [], []
+ for sel in selectors: sel.preselect(rfds, wfds)
+ if not rfds and not wfds: break
+ rfds, wfds, _ = SEL.select(rfds, wfds, [])
+ for fd in rfds:
+ for sel in selectors: sel.postselect_read(fd)
+ for fd in wfds:
+ for sel in selectors: sel.postselect_write(fd)
+
+###--------------------------------------------------------------------------
+### Running subprocesses.
+
+def wait_outcome(st):
+ """
+ Given a ST from `waitpid' (or similar), return a human-readable outcome.
+ """
+ if OS.WIFSIGNALED(st): return "killed by signal %d" % OS.WTERMSIG(st)
+ elif OS.WIFEXITED(st):
+ rc = OS.WEXITSTATUS(st)
+ if rc: return "failed: rc = %d" % rc
+ else: return "completed successfully"
+ else: return "died with incomprehensible status 0x%04x" % st
+
+class SubprocessFailure (Exception):
+ """An exception indicating that a subprocess failed."""
+ def __init__(me, what, st):
+ me.st = st
+ me.what = what
+ if OS.WIFEXITED(st): me.rc, me.sig = OS.WEXITSTATUS(st), None
+ elif OS.WIFSIGNALED(st): me.rc, me.sig = None, OS.WTERMSIG(st)
+ else: me.rc, me.sig = None, None
+ def __str__(me):
+ return "subprocess `%s' %s" % (me.what, wait_outcome(me.st))
+
+INHERIT = Tag('INHERIT')
+PIPE = Tag('PIPE')
+DISCARD = Tag('DISCARD')
+@CTX.contextmanager
+def subprocess(command,
+ stdin = INHERIT, stdout = INHERIT, stderr = INHERIT,
+ jobserver = DISCARD):
+ """
+ Hairy context manager for running subprocesses.
+
+ The COMMAND is a list of arguments; COMMAND[0] names the program to be
+ invoked. (There's currently no way to run a program with an unusual
+ `argv[0]'.)
+
+ The keyword arguments `stdin', `stdout', and `stderr' explain what to do
+ with the standard file descriptors.
+
+ * `INHERIT' means that they should be left alone: the child will use a
+ copy of the parent's descriptor. This is the default.
+
+ * `DISCARD' means that the descriptor should be re-opened onto
+ `/dev/null' (for reading or writing as appropriate).
+
+ * `PIPE' means that the descriptor should be re-opened as (the read or
+ write end, as appropriate, of) a pipe, and the other end returned to
+ the context body.
+
+  Similarly, the JOBSERVER may be `INHERIT' to pass the jobserver descriptors
+ and environment variable down to the child, or `DISCARD' to close it. The
+ default is `DISCARD'.
+
+ The context is returned three values, which are file descriptors for other
+ pipe ends for stdin, stdout, and stderr respectively, or -1 if there is no
+ pipe.
+
+ The context owns the pipe descriptors, and is expected to close them
+ itself. (Timing of closure is significant, particularly for `stdin'.)
+ """
+
+ ## Set up.
+ r_in, w_in = -1, -1
+ r_out, w_out = -1, -1
+ r_err, w_err = -1, -1
+ spew("running subprocess `%s'" % " ".join(command))
+
+ ## Clean up as necessary...
+ try:
+
+ ## Set up stdin.
+ if stdin is PIPE: r_in, w_in = OS.pipe()
+ elif stdin is DISCARD: r_in = OS.open("/dev/null", OS.O_RDONLY)
+ elif stdin is not INHERIT:
+ raise ValueError("bad `stdin' value `%r'" % stdin)
+
+ ## Set up stdout.
+ if stdout is PIPE: r_out, w_out = OS.pipe()
+ elif stdout is DISCARD: w_out = OS.open("/dev/null", OS.O_WRONLY)
+ elif stdout is not INHERIT:
+      raise ValueError("bad `stdout' value `%r'" % stdout)
+
+ ## Set up stderr.
+ if stderr is PIPE: r_err, w_err = OS.pipe()
+ elif stderr is DISCARD: w_err = OS.open("/dev/null", OS.O_WRONLY)
+ elif stderr is not INHERIT:
+ raise ValueError("bad `stderr' value `%r'" % stderr)
+
+ ## Start up the child.
+ kid = OS.fork()
+
+ if kid == 0:
+ ## Child process.
+
+ ## Fix up stdin.
+ if r_in != -1: OS.dup2(r_in, 0); OS.close(r_in)
+ if w_in != -1: OS.close(w_in)
+
+ ## Fix up stdout.
+ if w_out != -1: OS.dup2(w_out, 1); OS.close(w_out)
+ if r_out != -1: OS.close(r_out)
+
+ ## Fix up stderr.
+ if w_err != -1: OS.dup2(w_err, 2); OS.close(w_err)
+ if r_err != -1: OS.close(r_err)
+
+ ## Fix up the jobserver.
+ if jobserver is DISCARD: SCHED.close_jobserver()
+
+ ## Run the program.
+ try: OS.execvp(command[0], command)
+ except OSError, err:
+        moan("failed to run `%s': %s" % (command[0], err.strerror))
+ OS._exit(127)
+
+ ## Close the other ends of the pipes.
+ if r_in != -1: OS.close(r_in); r_in = -1
+ if w_out != -1: OS.close(w_out); w_out = -1
+ if w_err != -1: OS.close(w_err); w_err = -1
+
+ ## Return control to the context body. Remember not to close its pipes.
+ yield w_in, r_out, r_err
+ w_in = r_out = r_err = -1
+
+ ## Collect the child process's exit status.
+ _, st = OS.waitpid(kid, 0)
+ spew("subprocess `%s' %s" % (" ".join(command), wait_outcome(st)))
+ if st: raise SubprocessFailure(" ".join(command), st)
+
+ ## Tidy up.
+ finally:
+
+ ## Close any left-over file descriptors.
+ for fd in [r_in, w_in, r_out, w_out, r_err, w_err]:
+ if fd != -1: OS.close(fd)
+
+def set_nonblocking(fd):
+ """Mark the descriptor FD as non-blocking."""
+ FC.fcntl(fd, FC.F_SETFL, FC.fcntl(fd, FC.F_GETFL) | OS.O_NONBLOCK)
+
+class DribbleOut (BaseSelector):
+ """A simple selector to feed a string to a descriptor, in pieces."""
+ def __init__(me, fd, string, *args, **kw):
+ super(DribbleOut, me).__init__(*args, **kw)
+ me._fd = fd
+ me._string = string
+ me._i = 0
+ set_nonblocking(me._fd)
+ me.result = None
+ def preselect(me, rfds, wfds):
+ if me._fd != -1: wfds.append(me._fd)
+ def postselect_write(me, fd):
+ if fd != me._fd: return
+ try: n = OS.write(me._fd, me._string)
+ except OSError, err:
+ if err.errno == E.EAGAIN or err.errno == E.EWOULDBLOCK: return
+ elif err.errno == E.EPIPE: OS.close(me._fd); me._fd = -1; return
+ else: raise
+ if n == len(me._string): OS.close(me._fd); me._fd = -1
+ else: me._string = me._string[n:]
+
+class DribbleIn (BaseSelector):
+ """A simple selector to collect all the input as a big string."""
+ def __init__(me, fd, *args, **kw):
+ super(DribbleIn, me).__init__(*args, **kw)
+ me._fd = fd
+ me._buf = StringIO()
+ set_nonblocking(me._fd)
+ def preselect(me, rfds, wfds):
+ if me._fd != -1: rfds.append(me._fd)
+ def postselect_read(me, fd):
+ if fd != me._fd: return
+ while True:
+ try: buf = OS.read(me._fd, 4096)
+ except OSError, err:
+ if err.errno == E.EAGAIN or err.errno == E.EWOULDBLOCK: break
+ else: raise
+ if buf == "": OS.close(me._fd); me._fd = -1; break
+ else: me._buf.write(buf)
+ @property
+ def result(me): return me._buf.getvalue()
+
+RETURN = Tag('RETURN')
+def run_program(command,
+ stdin = INHERIT, stdout = INHERIT, stderr = INHERIT,
+ *args, **kwargs):
+ """
+ A simplifying wrapper around `subprocess'.
+
+ The COMMAND is a list of arguments; COMMAND[0] names the program to be
+ invoked, as for `subprocess'.
+
+ The keyword arguments `stdin', `stdout', and `stderr' explain what to do
+ with the standard file descriptors.
+
+ * `INHERIT' means that they should be left alone: the child will use a
+ copy of the parent's descriptor.
+
+ * `DISCARD' means that the descriptor should be re-opened onto
+ `/dev/null' (for reading or writing as appropriate).
+
+ * `RETURN', for an output descriptor, means that all of the output
+ produced on that descriptor should be collected and returned as a
+ string.
+
+ * A string, for stdin, means that the string should be provided on the
+ child's standard input.
+
+ (The value `PIPE' is not permitted here.)
+
+ Other arguments are passed on to `subprocess'.
+
+ If no descriptors are marked `RETURN', then the function returns `None'; if
+ exactly one descriptor is so marked, then the function returns that
+ descriptor's output as a string; otherwise, it returns a tuple of strings
+ for each such descriptor, in the usual order.
+ """
+ kw = dict(); kw.update(kwargs)
+ selfn = []
+
+ if isinstance(stdin, basestring):
+ kw['stdin'] = PIPE; selfn.append(lambda fds: DribbleOut(fds[0], stdin))
+ elif stdin is INHERIT or stdin is DISCARD:
+ kw['stdin'] = stdin
+ else:
+ raise ValueError("bad `stdin' value `%r'" % stdin)
+
+ if stdout is RETURN:
+ kw['stdout'] = PIPE; selfn.append(lambda fds: DribbleIn(fds[1]))
+ elif stdout is INHERIT or stdout is DISCARD:
+ kw['stdout'] = stdout
+ else:
+ raise ValueError("bad `stdout' value `%r'" % stdout)
+
+ if stderr is RETURN:
+ kw['stderr'] = PIPE; selfn.append(lambda fds: DribbleIn(fds[2]))
+ elif stderr is INHERIT or stderr is DISCARD:
+ kw['stderr'] = stderr
+ else:
+ raise ValueError("bad `stderr' value `%r'" % stderr)
+
+ with subprocess(command, *args, **kw) as fds:
+ sel = [fn(fds) for fn in selfn]
+ select_loop(sel)
+ rr = []
+ for s in sel:
+ r = s.result
+ if r is not None: rr.append(r)
+ if len(rr) == 0: return None
+ if len(rr) == 1: return rr[0]
+ else: return tuple(rr)
+
+###--------------------------------------------------------------------------
+### Other system-ish utilities.
+
+@CTX.contextmanager
+def safewrite(path):
+ """
+ Context manager for writing to a file.
+
+ A new file, named `PATH.new', is opened for writing, and the file object
+ provided to the context body. If the body completes normally, the file is
+ closed and renamed to PATH. If the body raises an exception, the file is
+ still closed, but not renamed into place.
+ """
+ new = path + ".new"
+ with open(new, "w") as f: yield f
+ OS.rename(new, path)
+
+@CTX.contextmanager
+def safewrite_root(path, mode = None, uid = None, gid = None):
+ """
+ Context manager for writing to a file with root privileges.
+
+ This is as for `safewrite', but the file is opened and written as root.
+ """
+ new = path + ".new"
+ with subprocess(C.ROOTLY + ["tee", new],
+ stdin = PIPE, stdout = DISCARD) as (fd_in, _, _):
+ pipe = OS.fdopen(fd_in, 'w')
+ try: yield pipe
+ finally: pipe.close()
+ if mode is not None: run_program(C.ROOTLY + ["chmod", mode, new])
+ if uid is not None:
+ run_program(C.ROOTLY + ["chown",
+ uid + (gid is not None and ":" + gid or ""),
+ new])
+ elif gid is not None:
+ run_program(C.ROOTLY + ["chgrp", gid, new])
+ run_program(C.ROOTLY + ["mv", new, path])
+
+def mountpoint_p(dir):
+ """Return true if DIR is a mountpoint."""
+
+ ## A mountpoint can be distinguished because it is a directory whose device
+ ## number differs from its parent.
+ try: st1 = OS.stat(dir)
+ except OSError, err:
+ if err.errno == E.ENOENT: return False
+ else: raise
+ if not ST.S_ISDIR(st1.st_mode): return False
+ st0 = OS.stat(OS.path.join(dir, ".."))
+ return st0.st_dev != st1.st_dev
+
+def mkdir_p(dir, mode = 0777):
+ """
+ Make a directory DIR, and any parents, as necessary.
+
+ Unlike `OS.makedirs', this doesn't fail if DIR already exists.
+ """
+ d = ""
+ for p in dir.split("/"):
+ d = OS.path.join(d, p)
+ try: OS.mkdir(d, mode)
+ except OSError, err:
+ if err.errno == E.EEXIST: pass
+ else: raise
+
+def umount(fs):
+ """
+ Unmount the filesystem FS.
+
+ The FS may be the block device holding the filesystem, or (more usually)
+ the mount point.
+ """
+
+ ## Sometimes random things can prevent unmounting. Be persistent.
+ for i in xrange(5):
+ try: run_program(C.ROOTLY + ["umount", fs], stderr = DISCARD)
+ except SubprocessFailure, err:
+ if err.rc == 32: pass
+ else: raise
+ else: return
+ T.sleep(0.2)
+ run_program(C.ROOTLY + ["umount", fs], stderr = DISCARD)
+
+@CTX.contextmanager
+def lockfile(lock, exclp = True, waitp = True):
+ """
+ Acquire an exclusive lock on a named file LOCK while executing the body.
+
+ If WAITP is true, wait until the lock is available; if false, then fail
+ immediately if the lock can't be acquired.
+ """
+ fd = -1
+ flag = 0
+ if exclp: flag |= FC.LOCK_EX
+ else: flag |= FC.LOCK_SH
+ if not waitp: flag |= FC.LOCK_NB
+ spew("acquiring %s lock on `%s'" %
+ (exclp and "exclusive" or "shared", lock))
+ try:
+ while True:
+
+ ## Open the file and take note of which file it is.
+ fd = OS.open(lock, OS.O_RDWR | OS.O_CREAT, 0666)
+ st0 = OS.fstat(fd)
+
+ ## Acquire the lock, waiting if necessary.
+ FC.lockf(fd, flag)
+
+ ## Check that the lock file is still the same one. It's permissible
+ ## for the lock holder to release the lock by unlinking or renaming the
+ ## lock file, in which case there might be a different lockfile there
+ ## now which we need to acquire instead.
+ ##
+ ## It's tempting to `optimize' this code by opening a new file
+ ## descriptor here so as to elide the additional call to fstat(2)
+ ## above. But this doesn't work: if we successfully acquire the lock,
+ ## we then have two file descriptors open on the lock file, so we have
+ ## to close one -- but, under the daft fcntl(2) rules, even closing
+ ## `nfd' will release the lock immediately.
+      try:
+        st1 = OS.stat(lock)
+      except OSError, err:
+        if err.errno == E.ENOENT: st1 = None
+        else: raise
+      if st1 is not None and st0.st_dev == st1.st_dev and st0.st_ino == st1.st_ino: break
+ OS.close(fd)
+
+ ## We have the lock, so away we go.
+ spew("lock `%s' acquired" % lock)
+ yield None
+ spew("lock `%s' released" % lock)
+
+ finally:
+ if fd != -1: OS.close(fd)
+
+def block_device_p(dev):
+ """Return true if DEV names a block device."""
+ try: st = OS.stat(dev)
+ except OSError, err:
+ if err.errno == E.ENOENT: return False
+ else: raise
+ else: return ST.S_ISBLK(st.st_mode)
+
###--------------------------------------------------------------------------
### Running parallel jobs.
Jobs are interned! Don't construct instances (of subclasses) directly:
use the `ensure' class method.
"""
- assert token is me._MAGIC
+ assert _token is me._MAGIC
super(BaseJob, me).__init__(*args, **kw)
## Dependencies on other jobs.
me._deps = set()
me._waiting = set()
- ## Attributes maintained by the JobServer
+ ## Attributes maintained by the JobServer.
me.done = False
+ me.started = False
me.win = None
me._token = None
me._known = False
+ me._st = None
me._logkid = -1
me._logfile = None
f.seek(off, 0)
spew("try at off = %d" % off)
buf = f.read(bufsz)
- nlines += buf.count('\n')
+ nlines += buf.count("\n")
spew("now lines = %d" % nlines)
bufs.append(buf)
buf = ''.join(reversed(bufs))
## We probably overshot. Skip the extra lines from the start.
i = 0
- while nlines > me.LOGLINES: i = buf.index('\n', i) + 1; nlines -= 1
+ while nlines > me.LOGLINES: i = buf.index("\n", i) + 1; nlines -= 1
## If we ended up trimming the log, print an ellipsis.
- if off > 0 or i > 0: print "%-24s * [...]" % me.name
+ if off > 0 or i > 0: print "%-*s * [...]" % (TAGWD, me.name)
## Print the log tail.
- lines = buf[i:].split('\n')
+ lines = buf[i:].split("\n")
if lines and lines[-1] == '': lines.pop()
- for line in lines: print "%-24s %s" % (me.name, line)
+ for line in lines: print "%-*s %s" % (TAGWD, me.name, line)
class BaseJobToken (object):
"""
There only needs to be one of these.
"""
- def recycle(me): pass
+ def recycle(me):
+ spew("no token needed; nothing to recycle")
TRIVIAL_TOKEN = TrivialJobToken()
class JobServerToken (BaseJobToken):
"""A job token storing a byte from the jobserver pipe."""
- def __init__(me, char, pipefd, *arg, **kw):
- super(JobServerToken, me).__init__(*arg, **kw)
+ def __init__(me, char, pipefd, *args, **kw):
+ super(JobServerToken, me).__init__(*args, **kw)
me._char = char
me._fd = pipefd
def recycle(me):
+ spew("returning token to jobserver pipe")
OS.write(me._fd, me._char)
class PrivateJobToken (BaseJobToken):
the pipe, and an additional one which represents the slot we're actually
running in. This class represents that additional token.
"""
- def __init__(me, sched, *arg, **kw):
- super(JobServerToken, me).__init__(*arg, **kw)
+ def __init__(me, sched, *args, **kw):
+ super(PrivateJobToken, me).__init__(*args, **kw)
me._sched = sched
def recycle(me):
assert me._sched._privtoken is None
+ spew("recycling private token")
me._sched._privtoken = me
-class JobStreamLogger (object):
- """Log an output stream from a job, to stdout, and into a logfile."""
- def __init__(me, fd, tag, marker, logfd = -1):
- """
- Initialize the JobStreamLogger.
-
- * FD is the descriptor to read. It will be made nonblocking.
-
- * LOGFD is the logfile descriptor to write, or -1 if we're not
- logging.
-
- * The TAG and MARKER are used to format the log messages.
-
- If `OPT.quiet' is false, then lines read from FD are written to standard
- output as
-
- TAG X LINE
-
- where X is the MARKER character used to distinguish this output stream,
- because they're all interleaved in the logfile. The TAG (and following
- space) is not written to the logfile, since the file's name is sufficient
- to explain what's going on.
-
- See `preselect' and `postselect' for the protocol.
- """
-
- ## Store the parameters.
- me._fd = fd
- me._logfd = logfd
- me._tag = tag
- me._marker = marker
-
- ## Make the descriptor nonblocking.
- FC.fcntl(fd, FC.F_SETFL, FC.fcntl(fd, FC.F_GETFL) | OS.O_NONBLOCK)
-
- ## Clear the line buffer.
- me._buf = ""
-
- def preselect(me, rfds):
- """
- Augment the list RFDS with our input descriptor, if it's still active.
-
- If the list is empty then the logging process will quit.
- """
- if me._fd >= 0: rfds.append(me._fd)
-
- def postselect(me, fd):
- """Read and process input if FD is our descriptor."""
-
- ## Check the descriptor.
- if fd != me._fd: return
-
- ## If we encounter end-of-file, we'll close. Stop then, or if we run out
- ## of stuff.
- while me._fd >= 0:
-
- ## Try to read some data. If there's nothing to read then we're done.
- try:
- buf = OS.read(me._fd, 4096)
- except OSError, err:
- if err.errno == E.EAGAIN or err.errno == E.WOULDBLOCK: break
- else: raise
-
- ## Work out what to do.
- if buf == "":
- ## We've encountered end-of-file. Close the descriptor. If there's
- ## a final unterminated line in the buffer, then we'll have to drain
- ## it.
-
- OS.close(me._fd); me._fd = -1
- if me._buf == "": lines = []
- else: lines = [me._buf]; me._buf = ""
-
- else:
- ## Some data arrived. Append it to our existing buffer, and gather
- ## any complete lines, leaving the final stub behind for next time.
-
- lines = (me._buf + buf).split('\n')
- lines, me._buf = lines[:-1], lines[-1]
-
- ## Format and print the lines we found.
- for line in lines:
- if not OPT.quiet:
- OS.write(1, '%-24s %s %s\n' % (me._tag, me._marker, line))
- if me._logfd != -1:
- OS.write(me._logfd, '%s %s\n' % (me._marker, line))
+TAGWD = 30
class JobScheduler (object):
"""
if rfd == -1 and npar > 1:
rfd, wfd = OS.pipe()
OS.write(wfd, (npar - 1)*'+')
+ OS.environ["MAKEFLAGS"] = \
+ (" -j --jobserver-auth=%(rfd)d,%(wfd)d " +
+ "--jobserver-fds=%(rfd)d,%(wfd)d") % dict(rfd = rfd, wfd = wfd)
me._rfd = rfd; me._wfd = wfd
## The scheduler state. A job starts in the `_check' list. Each
me._check.add(job)
me._njobs += 1
+ def close_jobserver(me):
+ """
+ Close the jobserver file descriptors.
+
+ This should be called within child processes to prevent them from messing
+ with the jobserver.
+ """
+ if me._rfd != -1: OS.close(me._rfd); me._rfd = -1
+ if me._wfd != -1: OS.close(me._wfd); me._wfd = -1
+ try: del OS.environ["MAKEFLAGS"]
+ except KeyError: pass
+
def _killall(me):
"""Zap all jobs which aren't yet running."""
for jobset in [me._sleep, me._check, me._ready]:
global RC
## Return the job's token to the pool.
- job._token.recycle()
+ if job._token is not None: job._token.recycle()
job._token = None
me._njobs -= 1
job.done = True
job.win = win
if outcome is not None and not OPT.silent:
- print "%-24s %c (%s)" % (job.name, job.win and '|' or '*', outcome)
+ if OPT.quiet and not job.win and job._logfile: job._logtail()
+ print "%-*s %c (%s)" % \
+ (TAGWD, job.name, job.win and '|' or '*', outcome)
## If the job failed, and we care, arrange to exit nonzero.
if not win and not OPT.ignerr: RC = 2
Deal with the child with process-id KID having exited with status ST.
"""
- ## Maybe this is a logging child. Note that the logging child has
- ## finished.
- try: job = me._logkidmap[kid]
- except KeyError: pass
- else:
- del me._logkidmap[kid]
- job._logkid = DONE
- if job.done and not job.win and OPT.quiet: job._logtail()
- return
-
- ## Find the job associated with this process-id.
- try:
- job = me._kidmap[kid]
+ ## Figure out what kind of child this is. Note that it has finished.
+ try: job = me._kidmap[kid]
except KeyError:
- spew("unknown child %d exits with status 0x%04x" % (kid, st))
+ try: job = me._logkidmap[kid]
+ except KeyError:
+ spew("unknown child %d exits with status 0x%04x" % (kid, st))
+ return
+ else:
+ ## It's a logging child.
+ del me._logkidmap[kid]
+ job._logkid = DONE
+ spew("logging process for job `%s' exits with status 0x%04x" %
+ (job.name, st))
+ else:
+ job._st = st
+ del me._kidmap[kid]
+ spew("main process for job `%s' exits with status 0x%04x" %
+ (job.name, st))
+
+ ## If either of the job's associated processes is still running then we
+ ## should stop now and give the other one a chance.
+ if job._st is None or job._logkid is not DONE:
+ spew("deferring retirement for job `%s'" % job.name)
return
-
- ## Remove the job from the list.
- del me._kidmap[kid]
+ spew("completing deferred retirement for job `%s'" % job.name)
## Update and (maybe) report the job status.
- win = False
- if OS.WIFSIGNALED(st): outcome = 'killed by signal %d' % OS.WTERMSIG(st)
- elif OS.WIFEXITED(st):
- rc = OS.WEXITSTATUS(st)
- if rc: outcome = 'failed: rc = %d' % rc
- else:
- win = True
- outcome = None
- else: outcome = 'died with incomprehensible status 0x%04x' % st
-
- ## Maybe print the job log tail.
- if not win and OPT.quiet and job._logkid is DONE: job._logtail()
+ if job._st == 0: win = True; outcome = None
+ else: win = False; outcome = wait_outcome(job._st)
## Retire the job.
me._retire(job, win, outcome)
def run_job(me, job):
"""Start running the JOB."""
+ if OPT.dryrun: return None, None
+
## Make pipes to collect the job's output and error reports.
r_out, w_out = OS.pipe()
r_err, w_err = OS.pipe()
## The main logging loop.
## Close the jobserver descriptors, and the write ends of the pipes.
- if me._rfd != -1: OS.close(me._rfd)
- if me._wfd != -1: OS.close(me._wfd)
+ me.close_jobserver()
OS.close(w_out); OS.close(w_err)
- ## Make JobStreamLogger objects for the job's stdout and stderr.
- jobinputs = [JobStreamLogger(fd, job.name, ch, logfd)
- for ch, fd in [('|', r_out), ('*', r_err)]]
-
- while True:
- ## Wait for input arriving on the remaining pipes. Once a pipe
- ## closes, the JobStreamLogger for that pipe will stop adding it to
- ## `rfds', so we stop if `rfds' is an empty list.
- rfds = []
- for ji in jobinputs: ji.preselect(rfds)
- if not rfds: break
-
- ## Collect input from the active descriptors.
- rfds, _, _ = SEL.select(rfds, [], [])
- for fd in rfds:
- for ji in jobinputs: ji.postselect(fd)
+ ## Capture the job's stdout and stderr and wait for everything to
+ ## happen.
+ def log_lines(fd, marker):
+ def fn(line):
+ if not OPT.quiet:
+ OS.write(1, "%-*s %s %s\n" % (TAGWD, job.name, marker, line))
+ OS.write(logfd, "%s %s\n" % (marker, line))
+ return ReadLinesSelector(fd, fn)
+ select_loop([log_lines(r_out, "|"), log_lines(r_err, "*")])
## We're done. (Closing the descriptors here would be like polishing
## the floors before the building is demolished.)
if not kid:
## The main job.
- ## Close the jobserver descriptors, and the read ends of the pipes, and
- ## move the write ends to the right places. (This will go wrong if we
- ## were started without enough descriptors. Fingers crossed.)
- if me._rfd != -1: OS.close(me._rfd)
- if me._wfd != -1: OS.close(me._wfd)
+ ## Close the read ends of the pipes, and move the write ends to the
+ ## right places. (This will go wrong if we were started without enough
+ ## descriptors. Fingers crossed.)
OS.dup2(w_out, 1); OS.dup2(w_err, 2)
OS.close(r_out); OS.close(w_out)
OS.close(r_err); OS.close(w_err)
try:
job.run()
except Exception, err:
- moan("fatal Python exception: %s" % err)
+ TB.print_exc(SYS.stderr)
OS._exit(2)
except BaseException, err:
moan("caught unexpected exception: %r" % err)
## process.
OS.close(r_out); OS.close(w_out)
OS.close(r_err); OS.close(w_err)
+ job.started = True
return kid, None
def run(me):
##spew("sleeping: %s" % ", ".join([j.name for j in me._sleep]))
##spew("ready: %s" % ", ".join([j.name for j in me._ready]))
##spew("running: %s" % ", ".join([j.name for j in me._kidmap.itervalues()]))
- assert not me._sleep or me._kidmap or me._ready
+ assert not me._sleep or me._kidmap or me._logkidmap or me._ready
## Wait for something to happen.
if not me._ready or (not me._par and me._privtoken is None):
## We're running with unlimited parallelism, so we don't need a token
## to run a job.
spew("running new job without token")
- token = None
+ token = TRIVIAL_TOKEN
elif me._privtoken:
## Our private token is available, so we can use that to start
## a new job.
me._killall()
continue
spew("received token from jobserver")
- token = JobToken(tokch)
+ token = JobServerToken(tokch, me._wfd)
## We have a token, so we should start up the job.
job = me._ready.pop()
job._token = token
spew("start new job `%s'" % job.name)
kid, err = me.run_job(job)
- if kid is None:
+ if err is not None:
me._retire(job, False, "failed to fork: %s" % err)
continue
- else:
- me._kidmap[kid] = job
+ if kid is None: me._retire(job, True, "dry run")
+ else: me._kidmap[kid] = job
## We ran out of work to do.
spew("JobScheduler done")
+###--------------------------------------------------------------------------
+### Metadata files.
+
+class MetadataClass (type):
+  ## Metaclass for metadata classes: precompute `_VARSET', a set of the
+  ## names in the class's `VARS' list, so that `__init__' can check keyword
+  ## arguments quickly.
+  def __new__(me, name, supers, dict):
+    try: vars = dict['VARS']
+    except KeyError: pass
+    else: dict['_VARSET'] = set(vars)
+    return super(MetadataClass, me).__new__(me, name, supers, dict)
+
+class BaseMetadata (object):
+  """
+  Base class for simple `key = value' metadata files.
+
+  Subclasses must define `VARS', a list of the permitted variable names;
+  the metaclass derives `_VARSET' from it for fast membership tests.
+  """
+  __metaclass__ = MetadataClass
+
+  def __init__(me, **kw):
+    ## Set attributes from the keywords, insisting that each one is listed
+    ## in `VARS'.
+    for k, v in kw.iteritems():
+      if k not in me._VARSET:
+        raise ValueError("unexpected key `%s' in `%s' metadata" %
+                         (k, me.__class__.__name__))
+      setattr(me, k, v)
+
+  @classmethod
+  def read(cls, file):
+    ## Parse FILE, skipping blank lines and `#' comments, and return a new
+    ## instance built from its `K = V' assignments.
+    map = {}
+    with open(file) as f:
+      for line in f:
+        line = line.strip()
+        if line == "" or line.startswith("#"): continue
+        k, v = line.split("=", 1)
+        map[k.strip()] = v.strip()
+    return cls(**map)
+
+  def _write(me, file):
+    ## Write the variables which are actually set to the open FILE.
+    file.write("### -*-conf-*-\n")
+    for k in me.VARS:
+      try: v = getattr(me, k)
+      except AttributeError: pass
+      else: file.write("%s = %s\n" % (k, v))
+
+  def write(me, path):
+    ## Write the metadata safely to the file PATH.
+    with safewrite(path) as f: me._write(f)
+
+  def __repr__(me):
+    return "#<%s: %s>" % (me.__class__.__name__,
+                          ", ".join("%s=%r" % (k, getattr(me, k, None))
+                                    for k in me.VARS))
+
+###--------------------------------------------------------------------------
+### Chroot metadata.
+
+def lockfile_path(file):
+  ## Return the path to the lockfile named FILE, creating the lock
+  ## directory if necessary.
+  lockdir = OS.path.join(C.STATE, "lock"); mkdir_p(lockdir)
+  return OS.path.join(lockdir, file)
+
+def chroot_src_lockfile(dist, arch):
+  ## Return the lockfile path guarding the DIST/ARCH source chroot.
+  return lockfile_path("source.%s-%s" % (dist, arch))
+
+def crosstools_lockfile(dist, arch):
+  ## Return the lockfile path guarding the DIST/ARCH cross-tools tree.
+  return lockfile_path("cross-tools.%s-%s" % (dist, arch))
+
+def chroot_src_lv(dist, arch):
+  ## Return the logical-volume name for the DIST/ARCH source chroot.
+  return "%s%s-%s" % (C.LVPREFIX, dist, arch)
+
+def chroot_src_blkdev(dist, arch):
+  ## Return the block-device path for the DIST/ARCH source chroot.
+  return OS.path.join("/dev", C.VG, chroot_src_lv(dist, arch))
+
+def chroot_src_mntpt(dist, arch):
+  ## Return the mount point for the DIST/ARCH source chroot, creating the
+  ## directory if necessary.
+  mnt = OS.path.join(C.STATE, "mnt", "%s-%s" % (dist, arch))
+  mkdir_p(mnt)
+  return mnt
+
+class NoSuchChroot (Exception):
+  """Exception raised when a chroot's block device doesn't exist."""
+  def __init__(me, dist, arch):
+    me.dist = dist
+    me.arch = arch
+  def __str__(me):
+    return "chroot for `%s' on `%s' not found" % (me.dist, me.arch)
+
+@CTX.contextmanager
+def mount_chroot_src(dist, arch):
+  ## Context manager: mount the DIST/ARCH source chroot, yield its mount
+  ## point, and unmount on exit.  Raises `NoSuchChroot' if there's no
+  ## block device for the chroot.
+  dev = chroot_src_blkdev(dist, arch)
+  if not block_device_p(dev): raise NoSuchChroot(dist, arch)
+  mnt = chroot_src_mntpt(dist, arch)
+  try:
+    run_program(C.ROOTLY + ["mount", dev, mnt])
+    yield mnt
+  finally:
+    ## NOTE(review): if the `mount' itself fails we still attempt `umount'
+    ## here -- confirm that `umount' tolerates an unmounted target.
+    umount(mnt)
+
+@CTX.contextmanager
+def chroot_session(dist, arch, sourcep = False):
+  ## Context manager: begin an schroot session for the DIST/ARCH chroot
+  ## (the source chroot, rather than a snapshot, if SOURCEP), yielding a
+  ## pair (SESSION, ROOT) of the session identifier and the session's
+  ## filesystem root; end the session on exit.
+  chroot = chroot_src_lv(dist, arch)
+  if sourcep: chroot = "source:" + chroot
+  session = run_program(["schroot", "-uroot", "-b", "-c", chroot],
+                        stdout = RETURN).rstrip("\n")
+  try:
+    root = OS.path.join("/schroot", session, "fs")
+    yield session, root
+  finally:
+    run_program(["schroot", "-e", "-c", session])
+
+def run_root(command, **kw):
+  ## Run COMMAND with root privileges (via the configured `ROOTLY' prefix).
+  return run_program(C.ROOTLY + command, **kw)
+
+def run_schroot_session(session, command, **kw):
+  ## Run COMMAND as root within the existing schroot SESSION.
+  return run_program(["schroot", "-uroot", "-r",
+                      "-c", session, "--"] + command, **kw)
+
+def run_schroot_source(dist, arch, command, **kw):
+  ## Run COMMAND as root within the DIST/ARCH source chroot.
+  return run_program(["schroot", "-uroot",
+                      "-c", "source:%s" % chroot_src_lv(dist, arch),
+                      "--"] + command, **kw)
+
+class ChrootMetadata (BaseMetadata):
+  ## Metadata for a build chroot: its distribution, architecture, and the
+  ## time it was last updated.
+  VARS = ['dist', 'arch', 'update']
+
+  @classmethod
+  def read(cls, dist, arch):
+    ## Read the `META' file from the DIST/ARCH chroot.  If the chroot, or
+    ## its metadata file, doesn't exist, return a blank record with
+    ## `update' unset.
+    try:
+      with lockfile(chroot_src_lockfile(dist, arch)):
+        with mount_chroot_src(dist, arch) as mnt:
+          return super(ChrootMetadata, cls).read(OS.path.join(mnt, "META"))
+    except IOError, err:
+      if err.errno == E.ENOENT: pass
+      else: raise
+    except NoSuchChroot: pass
+    return cls(dist = dist, arch = arch, update = None)
+
+  def write(me):
+    ## Write the metadata back to the chroot's `META' file.
+    with mount_chroot_src(me.dist, me.arch) as mnt:
+      with safewrite_root(OS.path.join(mnt, "META")) as f:
+        me._write(f)
+
+class CrossToolsMetadata (BaseMetadata):
+  ## Metadata for a cross-tools tree: its distribution, architecture, and
+  ## the time it was last updated.
+  VARS = ['dist', 'arch', 'update']
+
+  @classmethod
+  def read(cls, dist, arch):
+    ## Read the `META' file from the DIST/ARCH cross-tools tree, or return
+    ## a blank record with `update' unset if there isn't one.
+    try:
+      with lockfile(crosstools_lockfile(dist, arch)):
+        return super(CrossToolsMetadata, cls) \
+          .read(OS.path.join(C.LOCAL, "cross",
+                             "%s-%s" % (dist, arch), "META"))
+    except IOError, err:
+      if err.errno == E.ENOENT: pass
+      else: raise
+    return cls(dist = dist, arch = arch, update = None)
+
+  def write(me, dir = None):
+    ## Write the metadata to the `META' file in DIR (default the official
+    ## cross-tools directory for this dist/arch).
+    if dir is None:
+      dir = OS.path.join(C.LOCAL, "cross", "%s-%s" % (me.dist, me.arch))
+    with safewrite_root(OS.path.join(dir, "META")) as f:
+      me._write(f)
+
+def switch_prefix(string, map):
+ for old, new in map:
+ if string.startswith(old): return new + string[len(old):]
+ raise ValueError("expected `%s' to start with one of %s" %
+ ", ".join(["`%s'" % old for old, new in map]))
+
+def host_to_chroot(path):
+  ## Convert a host path under `C.LOCAL' to the corresponding path within
+  ## a chroot, under `/usr/local.schroot/'.
+  return switch_prefix(path, [(C.LOCAL + "/", "/usr/local.schroot/")])
+
+def chroot_to_host(path):
+  ## Convert a chroot path under `/usr/local.schroot/' back to the
+  ## corresponding host path under `C.LOCAL'.
+  return switch_prefix(path, [("/usr/local.schroot/", C.LOCAL + "/")])
+
+def split_dist_arch(spec):
+  ## Split a `DIST-ARCH' specification at the first dash, returning the
+  ## pair (DIST, ARCH).  Raises `ValueError' if there's no dash.
+  dash = spec.index("-")
+  return spec[:dash], spec[dash + 1:]
+
+def progress(msg):
+  ## Report a progress message MSG on standard output.
+  OS.write(1, ";; %s\n" % msg)
+
+###--------------------------------------------------------------------------
+### Extracting the cross tools.
+
+CREATE = Tag("CREATE")                  # freshness: build only if absent
+FORCE = Tag("FORCE")                    # freshness: rebuild unconditionally
+
+def elf_binary_p(arch, path):
+  ## Answer whether PATH is an ELF binary for the donor architecture ARCH
+  ## (only `i386' and `amd64' are understood), by inspecting the first 20
+  ## bytes of the ELF header.
+  if not OS.path.isfile(path): return False
+  with open(path, 'rb') as f: magic = f.read(20)
+  if magic[0:4] != "\x7fELF": return False    # ELF magic number
+  if magic[8:16] != 8*"\0": return False      # ABI version and padding zero
+  if arch == "i386":
+    ## ELFCLASS32, little-endian, ELF version 1; e_machine = EM_386.
+    if magic[4:7] != "\x01\x01\x01": return False
+    if magic[18:20] != "\x03\x00": return False
+  elif arch == "amd64":
+    ## ELFCLASS64, little-endian, ELF version 1; e_machine = EM_X86_64.
+    if magic[4:7] != "\x02\x01\x01": return False
+    if magic[18:20] != "\x3e\x00": return False
+  else:
+    raise ValueError("unsupported donor architecture `%s'" % arch)
+  return True
+
+def update_cross_tools(meta):
+  ## Build (or rebuild) the cross-tools tree described by META: raid the
+  ## native DIST/ARCH donor chroot for tools, shared libraries and Qemu
+  ## emulators, and atomically replace any previous tree.
+  with Cleanup() as clean:
+
+    dist, arch = meta.dist, meta.arch
+
+    ## Find the donor's multiarch triplet, and the GNU triplets for the
+    ## foreign architectures.
+    ## NOTE(review): `crossarchs' appears to be unused below -- confirm
+    ## before removing.
+    mymulti = run_program(["dpkg-architecture", "-a" + arch,
+                           "-qDEB_HOST_MULTIARCH"],
+                          stdout = RETURN).rstrip("\n")
+    crossarchs = [run_program(["dpkg-architecture", "-A" + a,
+                               "-qDEB_TARGET_GNU_TYPE"],
+                              stdout = RETURN).rstrip("\n")
+                  for a in C.FOREIGN_ARCHS]
+
+    crossdir = OS.path.join(C.LOCAL, "cross", "%s-%s" % (dist, arch))
+    crossold = crossdir + ".old"; crossnew = crossdir + ".new"
+    usrbin = OS.path.join(crossnew, "usr/bin")
+
+    clean.enter(lockfile(crosstools_lockfile(dist, arch)))
+    run_root(["rm", "-rf", crossnew])
+    run_root(["mkdir", crossnew])
+
+    ## Open a session to the donor chroot.
+    progress("establish snapshot")
+    session, root = clean.enter(chroot_session(dist, arch))
+
+    ## Make sure the donor tree is up-to-date, and install the extra
+    ## packages we need.
+    progress("install tools packages")
+    run_schroot_session(session, ["eatmydata", "sh", "-e", "-c", """
+      apt-get update
+      apt-get -y upgrade
+      apt-get -y install "$@"
+    """, "."] + C.CROSS_PACKAGES)
+
+    def chase(path):
+      ## Copy PATH -- and everything it refers to through symbolic links --
+      ## from the donor tree into the new cross-tools tree.
+      dest = ""
+
+      ## Work through the remaining components of the PATH.
+      while path != "":
+        try: sl = path.index("/")
+        except ValueError: step = path; path = ""
+        else: step, path = path[:sl], path[sl + 1:]
+
+        ## Split off and analyse the first component.
+        if step == "" or step == ".":
+          ## A redundant `/' or `./'.  Skip it.
+          pass
+        elif step == "..":
+          ## A `../'.  Strip off the trailing component of DEST.
+          dest = dest[:dest.rindex("/")]
+        else:
+          ## Something else.  Transfer the component name to DEST.
+          dest += "/" + step
+
+        ## If DEST refers to something in the cross-tools tree then we're
+        ## good.
+        crossdest = crossnew + dest
+        try: st = OS.lstat(crossdest)
+        except OSError, err:
+          if err.errno == E.ENOENT:
+            ## No.  We need to copy something from the donor tree so that
+            ## the name works.
+            st = OS.lstat(root + dest)
+            if ST.S_ISDIR(st.st_mode):
+              run_root(["mkdir", crossdest])
+            else:
+              progress("copy `%s'" % dest)
+              run_root(["rsync", "-aHR", "%s/.%s" % (root, dest), crossnew])
+          else:
+            raise
+
+        ## If DEST refers to a symbolic link, then prepend the link target
+        ## to PATH so that we can be sure the link will work.
+        if ST.S_ISLNK(st.st_mode):
+          link = OS.readlink(crossdest)
+          if link.startswith("/"): dest = ""; link = link[1:]
+          else:
+            try: dest = dest[:dest.rindex("/")]
+            except ValueError: dest = ""
+          if path == "": path = link
+          else: path = "%s/%s" % (path, link)
+
+    ## Work through the shopping list, copying the things it names into the
+    ## cross-tools tree.
+    scan = []
+    for pat in C.CROSS_PATHS:
+      pat = pat.replace("MULTI", mymulti)
+      any = False
+      for rootpath in GLOB.iglob(root + pat):
+        any = True
+        path = rootpath[len(root):]
+        progress("copy `%s'" % path)
+        run_root(["rsync", "-aHR", "%s/.%s" % (root, path), crossnew])
+      if not any:
+        raise RuntimeError("no matches for cross-tool pattern `%s'" % pat)
+
+    ## Scan the new tree: chase down symbolic links, copying extra stuff
+    ## that we'll need; and examine ELF binaries to make sure we get the
+    ## necessary shared libraries.
+    def visit(_, dir, files):
+      for f in files:
+        path = OS.path.join(dir, f)
+        inside = switch_prefix(path, [(crossnew + "/", "/")])
+        if OS.path.islink(path): chase(inside)
+        elif elf_binary_p(arch, path): scan.append(inside)
+    OS.path.walk(crossnew, visit, None)
+
+    ## Work through the ELF binaries in `scan', determining which shared
+    ## libraries they'll need.
+    with subprocess(["schroot", "-r", "-c", session, "--",
+                     "sh", "-e", "-c", """
+      while read path; do
+        ldd "$path" | while read a b c d; do
+          case $a:$b:$c:$d in
+            not:a:dynamic:executable) ;;
+            statically:linked::) ;;
+            /*) echo "+$a" ;;
+            *:=\\>:/*) echo "+$c" ;;
+            linux-*) ;;
+            *) echo >&2 "failed to find shared library \\`$a'"; exit 2 ;;
+          esac
+        done
+        echo -
+      done
+    """], stdin = PIPE, stdout = PIPE) as (fd_in, fd_out, _):
+      ## Track the number of outstanding requests in `v.n' so we know when
+      ## the helper has answered everything.
+      v = Struct(n = 0)
+      def line_in():
+        ## Feed the next path to scan, or shut down the stream when the
+        ## list is empty and no replies are pending.
+        try: path = scan.pop()
+        except IndexError:
+          if v.n: return None
+          else: raise StopIteration
+        else: v.n += 1; return path
+      def line_out(line):
+        ## Handle a reply: `-' ends one request; `+LIB' names a library to
+        ## fetch into the tree (and maybe scan in turn).
+        if line == "-": v.n -= 1; return
+        assert line.startswith("+")
+        lib = line[1:]
+        path = crossnew + lib
+        try: OS.lstat(path)
+        except OSError, err:
+          if err.errno == E.ENOENT: pass
+          else: raise
+        else: return
+        chase(lib)
+        if elf_binary_p(arch, path):
+          scan.append(switch_prefix(path, [(crossnew + "/", "/")]))
+      select_loop([WriteLinesSelector(fd_in, line_in),
+                   ReadLinesSelector(fd_out, line_out)])
+
+    ## Set up the cross-compiler and emulator.  This is rather hairy.
+    progress("establish TOOLCHAIN and QEMU")
+    run_root(["mkdir", OS.path.join(crossnew, "TOOLCHAIN")])
+    qemudir = OS.path.join(crossnew, "QEMU")
+    run_root(["mkdir", qemudir])
+    for gnu in C.FOREIGN_GNUARCHS:
+      run_root(["mkdir", OS.path.join(crossnew, "TOOLCHAIN", gnu)])
+    for f in OS.listdir(usrbin):
+      for gnu in C.FOREIGN_GNUARCHS:
+        gnuprefix = gnu + "-"
+        if f.startswith(gnuprefix):
+          ## A cross tool: move it into the per-triplet toolchain
+          ## directory, leaving an unprefixed alias behind.
+          tooldir = OS.path.join(crossnew, "TOOLCHAIN", gnu)
+          run_root(["mv", OS.path.join(usrbin, f), tooldir])
+          run_root(["ln", "-s", f,
+                    OS.path.join(tooldir, f[len(gnuprefix):])])
+          break
+      else:
+        ## Not a cross tool: maybe it's a static Qemu emulator.
+        if f.startswith("qemu-") and f.endswith("-static"):
+          run_root(["mv", OS.path.join(usrbin, f), qemudir])
+    toollib = OS.path.join(crossnew, "TOOLCHAIN", "lib")
+    run_root(["mkdir", toollib])
+    run_root(["ln", "-s", "../../usr/lib/gcc-cross", toollib])
+
+    ## We're done.  Replace the old cross-tools with our new one.
+    meta.update = zulu()
+    meta.write(crossnew)
+    if OS.path.exists(crossdir): run_root(["mv", crossdir, crossold])
+    run_root(["mv", crossnew, crossdir])
+    run_root(["rm", "-rf", crossold])
+
+class CrossToolsJob (BaseJob):
+  """Job to create or update the cross-tools tree for a native chroot."""
+
+  @classmethod
+  def specs(cls): return C.NATIVE_CHROOTS
+
+  def __init__(me, spec, fresh = CREATE, *args, **kw):
+    super(CrossToolsJob, me).__init__(*args, **kw)
+    me._dist, me._arch = split_dist_arch(spec)
+    me._fresh = fresh
+    me._meta = CrossToolsMetadata.read(me._dist, me._arch)
+    ## We need the donor chroot to exist before we can raid it for tools.
+    me._chroot = ChrootJob.ensure("%s-%s" % (me._dist, me._arch), fresh)
+    me.await(me._chroot)
+
+  def _mkname(me): return "cross-tools.%s-%s" % (me._dist, me._arch)
+
+  def check(me):
+    status, reason = super(CrossToolsJob, me).check()
+    if status is not READY: return status, reason
+    ## If the donor chroot was itself rebuilt then our tools are stale.
+    if me._chroot.started: return READY, "prerequisites run"
+    return check_fresh(me._fresh, me._meta.update)
+
+  def run(me):
+    update_cross_tools(me._meta)
+
+###--------------------------------------------------------------------------
+### Installing the cross tools.
+
+R_DIVERT = RX.compile(r"^diversion of (.*) to .* by install-cross-tools$")
+
+def _install_cross_tools(meta):
+ with Cleanup() as clean:
+
+ dist, arch = meta.dist, meta.arch
+
+ mymulti = run_program(["dpkg-architecture", "-a", C.TOOLSARCH,
+ "-qDEB_HOST_MULTIARCH"],
+ stdout = RETURN).rstrip("\n")
+ gnuarch = run_program(["dpkg-architecture", "-A", arch,
+ "-qDEB_TARGET_GNU_TYPE"],
+ stdout = RETURN).rstrip("\n")
+
+ crossdir = OS.path.join(C.LOCAL, "cross", "%s-%s" % (dist, C.TOOLSARCH))
+
+ qarch, qhost = C.QEMUARCH[arch], C.QEMUHOST[arch]
+ qemudir = OS.path.join(C.LOCAL, "cross", "%s-%s" % (dist, qhost), "QEMU")
+
+ ## Acquire lockfiles in a canonical order to prevent deadlocks.
+ donors = [C.TOOLSARCH]
+ if qarch != C.TOOLSARCH: donors.append(qarch)
+ donors.sort()
+ for a in donors:
+ clean.enter(lockfile(crosstools_lockfile(dist, a), exclp = False))
+
+ ## Open a session.
+ session, root = clean.enter(chroot_session(dist, arch, sourcep = True))
+
+ ## Search the cross-tools tree for tools, to decide what to do with each
+ ## file. Make lists:
+ ##
+ ## * `want_div' is simply a set of all files in the chroot which need
+ ## dpkg diversions to prevent foreign versions of the tools from
+ ## clobbering our native versions.
+ ##
+ ## * `want_link' is a dictionary mapping paths which need symbolic
+ ## links into the cross-tools trees to their link destinations.
+ want_div = set()
+ want_link = dict()
+ cross_prefix = crossdir + "/"
+ qemu_prefix = qemudir + "/"
+ toolchain_prefix = OS.path.join(crossdir, "TOOLCHAIN", gnuarch) + "/"
+ def examine(path):
+ dest = switch_prefix(path, [(qemu_prefix, "/usr/bin/"),
+ (toolchain_prefix, "/usr/bin/"),
+ (cross_prefix, "/")])
+ if OS.path.islink(path): src = OS.readlink(path)
+ else: src = host_to_chroot(path)
+ want_link[dest] = src
+ if not OS.path.isdir(path): want_div.add(dest)
+ examine(OS.path.join(qemudir, "qemu-%s-static" % qarch))
+ examine(OS.path.join(crossdir, "lib", mymulti))
+ examine(OS.path.join(crossdir, "usr/lib", mymulti))
+ examine(OS.path.join(crossdir, "usr/lib/gcc-cross"))
+ def visit(_, dir, files):
+ ff = []
+ for f in files:
+ if f == "META" or f == "QEMU" or f == "TOOLCHAIN" or \
+ (dir.endswith("/lib") and (f == mymulti or f == "gcc-cross")):
+ continue
+ ff.append(f)
+ path = OS.path.join(dir, f)
+ if not OS.path.isdir(path): examine(path)
+ files[:] = ff
+ OS.path.walk(crossdir, visit, None)
+ OS.path.walk(OS.path.join(crossdir, "TOOLCHAIN", gnuarch),
+ visit, None)
+
+ ## Build the set `have_div' of paths which already have diversions.
+ have_div = set()
+ with subprocess(["schroot", "-uroot", "-r", "-c", session, "--",
+ "dpkg-divert", "--list"],
+ stdout = PIPE) as (_, fd_out, _):
+ try:
+ f = OS.fdopen(fd_out)
+ for line in f:
+ m = R_DIVERT.match(line.rstrip("\n"))
+ if m: have_div.add(m.group(1))
+ finally:
+ f.close()
+
+ ## Build a dictionary `have_link' of symbolic links into the cross-tools
+ ## trees.
+ have_link = dict()
+ with subprocess(["schroot", "-uroot", "-r", "-c", session, "--",
+ "sh", "-e", "-c", """
+ find / -xdev -lname "/usr/local.schroot/cross/*" -printf "%p %l\n"
+ """], stdout = PIPE) as (_, fd_out, _):
+ try:
+ f = OS.fdopen(fd_out)
+ for line in f:
+ dest, src = line.split()
+ have_link[dest] = src
+ finally:
+ f.close()
+
+ ## Add diversions for the paths which need one, but don't have one.
+ ## There's a hack here because the `--no-rename' option was required in
+ ## the same version in which it was introduced, so there's no single
+ ## incantation that will work across the boundary.
+ with subprocess(["schroot", "-uroot", "-r", "-c", session, "--",
+ "sh", "-e", "-c", """
+ a="%(arch)s"
+
+ if dpkg-divert >/dev/null 2>&1 --no-rename --help
+ then no_rename=--no-rename
+ else no_rename=
+ fi
+
+ while read path; do
+ dpkg-divert --package "install-cross-tools" $no_rename \
+ --divert "$path.$a" --add "$path"
+ done
+ """ % dict(arch = arch)], stdin = PIPE) as (fd_in, _, _):
+ try:
+ f = OS.fdopen(fd_in, 'w')
+ for path in want_div:
+ if path not in have_div: f.write(path + "\n")
+ finally:
+ f.close()
+
+ ## Go through each diverted tool, and, if it hasn't been moved aside,
+ ## then /link/ it across now. If we rename it, then the chroot will stop
+ ## working -- which is why we didn't allow `dpkg-divert' to do the
+ ## rename. We can tell a tool that hasn't been moved, because it's a
+ ## symlink into one of the cross trees.
+ chroot_cross_prefix = host_to_chroot(crossdir) + "/"
+ chroot_qemu_prefix = host_to_chroot(qemudir) + "/"
+ for path in want_div:
+ real = root + path; div = real + "." + arch
+ if OS.path.exists(div): continue
+ if not OS.path.exists(real): continue
+ if OS.path.islink(real):
+ realdest = OS.readlink(real)
+ if realdest.startswith(chroot_cross_prefix) or \
+ realdest.startswith(chroot_qemu_prefix):
+ continue
+ toolsdest = OS.readlink(crossdir + path)
+ if realdest == toolsdest: continue
+ progress("preserve existing foreign file `%s'" % path)
+ run_root(["ln", real, div])
+
+ ## Update all of the symbolic links which are currently wrong: add links
+ ## which are missing, delete ones which are obsolete, and update ones
+ ## which have the wrong target.
+ for path, src in want_link.iteritems():
+ real = root + path
+ try: old_src = have_link[path]
+ except KeyError: pass
+ else:
+ if src == old_src: continue
+ new = real + ".new"
+ progress("link `%s' -> `%s'" % (path, src))
+ dir = OS.path.dirname(real)
+ if not OS.path.isdir(dir): run_root(["mkdir", "-p", dir])
+ if OS.path.exists(new): run_root(["rm", "-f", new])
+ run_root(["ln", "-s", src, new])
+ run_root(["mv", new, real])
+ for path in have_link.iterkeys():
+ if path in want_link: continue
+ progress("remove obsolete link `%s' -> `%s'" % path)
+ real = root + path
+ run_root(["rm", "-f", real])
+
+ ## Remove diversions from paths which don't need them any more. Here
+ ## it's safe to rename, because either the tool isn't there, in which
+ ## case it obviously wasn't important, or it is, and `dpkg-divert' will
+ ## atomically replace our link with the foreign version.
+ with subprocess(["schroot", "-uroot", "-r", "-c", session, "--",
+ "sh", "-e", "-c", """
+ a="%(arch)s"
+
+ while read path; do
+ dpkg-divert --package "install-cross-tools" --rename \
+ --divert "$path.$a" --remove "$path"
+ done
+ """ % dict(arch = arch)], stdin = PIPE) as (fd_in, _, _):
+ try:
+ f = OS.fdopen(fd_in, 'w')
+ for path in have_div:
+ if path not in want_div: f.write(path + "\n")
+ finally:
+ f.close()
+
+def install_cross_tools(meta):
+  ## Install cross-tools into the chroot described by META, taking the
+  ## chroot's source lock first.
+  with lockfile(chroot_src_lockfile(meta.dist, meta.arch)):
+    _install_cross_tools(meta)
+
+###--------------------------------------------------------------------------
+### Constructing a chroot.
+
+def make_chroot(meta):
+  ## Build the DIST/ARCH chroot described by META from scratch: make a
+  ## fresh logical volume and filesystem, bootstrap and configure the base
+  ## system, and finally stamp META with the completion time.
+  with Cleanup() as clean:
+
+    dist, arch = meta.dist, meta.arch
+    clean.enter(lockfile(chroot_src_lockfile(dist, arch)))
+
+    mnt = chroot_src_mntpt(dist, arch)
+    dev = chroot_src_blkdev(dist, arch)
+    lv = chroot_src_lv(dist, arch)
+    newlv = lv + ".new"
+
+    ## Clean up any leftover debris.
+    if mountpoint_p(mnt): umount(mnt)
+    if block_device_p(dev):
+      run_root(["lvremove", "-f", "%s/%s" % (C.VG, lv)])
+
+    ## Create the logical volume and filesystem.  It's important that the
+    ## logical volume not have its official name until after it contains a
+    ## mountable filesystem.
+    progress("create filesystem")
+    run_root(["lvcreate", "--yes", C.LVSZ, "-n", newlv, C.VG])
+    run_root(["mkfs", "-j", "-L%s-%s" % (dist, arch),
+              OS.path.join("/dev", C.VG, newlv)])
+    run_root(["lvrename", C.VG, newlv, lv])
+
+    ## Start installing the chroot.
+    with mount_chroot_src(dist, arch) as mnt:
+
+      ## Set the basic structure.
+      run_root(["mkdir", "-m755", OS.path.join(mnt, "fs")])
+      run_root(["chmod", "750", mnt])
+
+      ## Install the base system.
+      progress("install base system")
+      run_root(["eatmydata", "debootstrap"] +
+               (arch in C.FOREIGN_ARCHS and ["--foreign"] or []) +
+               ["--arch=" + arch, "--variant=minbase",
+                "--include=" + ",".join(C.BASE_PACKAGES),
+                dist, OS.path.join(mnt, "fs"), C.DEBMIRROR])
+
+      ## If this is a cross-installation, then install the necessary `qemu'
+      ## and complete the installation.
+      if arch in C.FOREIGN_ARCHS:
+        qemu = OS.path.join("cross", "%s-%s" % (dist, C.QEMUHOST[arch]),
+                            "QEMU", "qemu-%s-static" % C.QEMUARCH[arch])
+        run_root(["install", OS.path.join(C.LOCAL, qemu),
+                  OS.path.join(mnt, "fs/usr/bin")])
+        run_root(["chroot", OS.path.join(mnt, "fs"),
+                  "/debootstrap/debootstrap", "--second-stage"])
+        run_root(["ln", "-sf",
+                  OS.path.join("/usr/local.schroot", qemu),
+                  OS.path.join(mnt, "fs/usr/bin")])
+
+      ## Set up `/usr/local'.
+      progress("install `/usr/local' symlink")
+      run_root(["rm", "-rf", OS.path.join(mnt, "fs/usr/local")])
+      run_root(["ln", "-s", "local.schroot",
+                OS.path.join(mnt, "fs/usr/local")])
+
+      ## Install the `apt' configuration.
+      progress("configure package manager")
+      run_root(["rm", "-f", OS.path.join(mnt, "fs/etc/apt/sources.list")])
+      for c in C.APTCONF:
+        run_root(["ln", "-s",
+                  OS.path.join("/usr/local.schroot/etc/apt/apt.conf.d", c),
+                  OS.path.join(mnt, "fs/etc/apt/apt.conf.d")])
+      run_root(["ln", "-s",
+                "/usr/local.schroot/etc/apt/sources.%s" % dist,
+                OS.path.join(mnt, "fs/etc/apt/sources.list")])
+
+      with safewrite_root\
+          (OS.path.join(mnt, "fs/etc/apt/apt.conf.d/20arch")) as f:
+        f.write("""\
+### -*-conf-*-
+
+APT {
+  Architecture "%s";
+};
+""" % arch)
+
+      ## Set up the locale and time zone from the host system.
+      progress("configure locales and timezone")
+      run_root(["cp", "/etc/locale.gen", "/etc/timezone",
+                OS.path.join(mnt, "fs/etc")])
+      with open("/etc/timezone") as f: tz = f.readline().strip()
+      run_root(["ln", "-sf",
+                OS.path.join("/usr/share/timezone", tz),
+                OS.path.join(mnt, "fs/etc/localtime")])
+      run_root(["cp", "/etc/default/locale",
+                OS.path.join(mnt, "fs/etc/default")])
+
+      ## Fix `/etc/mtab'.
+      progress("set `/etc/mtab'")
+      run_root(["ln", "-sf", "/proc/mounts",
+                OS.path.join(mnt, "fs/etc/mtab")])
+
+      ## Prevent daemons from starting within the chroot.
+      progress("inhibit daemon startup")
+      with safewrite_root(OS.path.join(mnt, "fs/usr/sbin/policy-rc.d"),
+                          mode = "755") as f:
+        f.write("""\
+#! /bin/sh
+echo >&2 "policy-rc.d: Services disabled by policy."
+exit 101
+""")
+
+      ## Hack the dynamic linker to prefer libraries in `/usr' over
+      ## `/usr/local'.  This prevents `dpkg-shlibdeps' from becoming
+      ## confused.
+      progress("configure dynamic linker")
+      with safewrite_root\
+          (OS.path.join(mnt, "fs/etc/ld.so.conf.d/libc.conf")) as f:
+        f.write("# libc default configuration")
+      with safewrite_root\
+          (OS.path.join(mnt, "fs/etc/ld.so.conf.d/zzz-local.conf")) as f:
+        f.write("""\
+### -*-conf-*-
+### Local hack to make /usr/local/ late.
+/usr/local/lib
+""")
+
+    ## If this is a foreign architecture then we need to set it up.
+    if arch in C.FOREIGN_ARCHS:
+
+      ## Keep the chroot's native Qemu out of our way: otherwise we'll stop
+      ## being able to run programs in the chroot.  There's a hack here
+      ## because the `--no-rename' option was required in the same version
+      ## in which it was introduced, so there's no single incantation that
+      ## will work across the boundary.
+      progress("divert emulator")
+      run_schroot_source(dist, arch, ["eatmydata", "sh", "-e", "-c", """
+        if dpkg-divert >/dev/null 2>&1 --no-rename --help
+        then no_rename=--no-rename
+        else no_rename=
+        fi
+
+        dpkg-divert --package install-cross-tools $no_rename \
+          --divert /usr/bin/%(qemu)s.%(arch)s --add /usr/bin/%(qemu)s
+      """ % dict(arch = arch, qemu = C.QEMUARCH[arch])])
+
+      ## Install faster native tools.
+      _install_cross_tools(meta)
+
+    ## Finishing touches.
+    progress("finishing touches")
+    run_schroot_source(dist, arch, ["eatmydata", "sh", "-e", "-c", """
+      apt-get update
+      apt-get -y upgrade
+      apt-get -y install "$@"
+      ldconfig
+      apt-get -y autoremove
+      apt-get clean
+    """, "."] + C.EXTRA_PACKAGES)
+
+    ## Mark the chroot as done.
+    meta.update = zulu()
+    meta.write()
+
+def update_chroot(meta):
+  ## Update the existing chroot described by META: upgrade its packages,
+  ## and refresh the cross-tools if it's a foreign chroot.
+  ## NOTE(review): `meta.update' is not restamped here, so `check_fresh'
+  ## will continue to see the old timestamp -- confirm whether this is
+  ## intentional.
+  with Cleanup() as clean:
+    dist, arch = meta.dist, meta.arch
+    clean.enter(lockfile(chroot_src_lockfile(dist, arch)))
+    run_schroot_source(dist, arch, ["eatmydata", "sh", "-e", "-c", """
+      apt-get update
+      apt-get -y dist-upgrade
+      apt-get -y autoremove
+      apt-get -y clean
+    """])
+    if arch in C.FOREIGN_ARCHS: _install_cross_tools(meta)
+
+def check_fresh(fresh, update):
+ if update is None: return READY, "must create"
+ elif fresh is FORCE: return READY, "update forced"
+ elif fresh is CREATE: return DONE, "already created"
+ elif NOW - unzulu(update) < update: return READY, "too stale: updating"
+ else: return DONE, "already sufficiently up-to-date"
+
+class ChrootJob (BaseJob):
+  """Job to create or update a build chroot."""
+
+  @classmethod
+  def specs(cls): return C.ALL_CHROOTS
+
+  def __init__(me, spec, fresh = CREATE, *args, **kw):
+    super(ChrootJob, me).__init__(*args, **kw)
+    me._dist, me._arch = split_dist_arch(spec)
+    me._fresh = fresh
+    me._meta = ChrootMetadata.read(me._dist, me._arch)
+    me._tools_chroot = me._qemu_chroot = None
+
+    ## A foreign chroot needs the native cross-tools and emulator trees
+    ## before it can be built.
+    if me._arch in C.FOREIGN_ARCHS:
+      me._tools_chroot = CrossToolsJob.ensure\
+        ("%s-%s" % (me._dist, C.TOOLSARCH), me._fresh)
+      me._qemu_chroot = CrossToolsJob.ensure\
+        ("%s-%s" % (me._dist, C.QEMUHOST[me._arch]), me._fresh)
+      me.await(me._tools_chroot)
+      me.await(me._qemu_chroot)
+
+  def _mkname(me): return "chroot.%s-%s" % (me._dist, me._arch)
+
+  def check(me):
+    status, reason = super(ChrootJob, me).check()
+    if status is not READY: return status, reason
+    ## If the cross-tools were rebuilt then we need refreshing too.
+    if (me._tools_chroot is not None and me._tools_chroot.started) or \
+       (me._qemu_chroot is not None and me._qemu_chroot.started):
+      return READY, "prerequisites run"
+    return check_fresh(me._fresh, me._meta.update)
+
+  def run(me):
+    ## Update the chroot if it already exists; otherwise build it afresh.
+    if me._meta.update is not None: update_chroot(me._meta)
+    else: make_chroot(me._meta)
+
###--------------------------------------------------------------------------
### Process the configuration and options.
-CONFIG = {}
R_CONFIG = RX.compile(r"^([a-zA-Z0-9_]+)='(.*)'$")
-def read_config():
- raw = r"""
- """; raw = open('state/config.sh').read(); _ignore = """ @@@config@@@
- """
- for line in raw.split('\n'):
- line = line.strip()
- if not line or line.startswith('#'): continue
- m = R_CONFIG.match(line)
- if not m: raise ExpectedError("bad config line `%s'" % line)
- CONFIG[m.group(1)] = m.group(2).replace("'\\''", "'")
+
+class Config (object):
+  """
+  Access to the shell-syntax configuration, exposed as attributes.
+
+  Values are converted according to the `_CONVERT' table; variables whose
+  names match a `*_SUFFIX' pattern in `_CONV_MAP' are gathered into
+  dictionaries, e.g. `FOO_DEPS' becomes an entry of `PKGDEPS'.
+  """
+
+  ## Value converters: plain strings, whitespace-separated lists, and sets.
+  def _conv_str(s): return s
+  def _conv_list(s): return s.split()
+  def _conv_set(s): return set(s.split())
+
+  ## Conversions for specific variable names.
+  _CONVERT = {
+    "ROOTLY": _conv_list,
+    "DISTS": _conv_set,
+    "MYARCH": _conv_set,
+    "NATIVE_ARCHS": _conv_set,
+    "FOREIGN_ARCHS": _conv_set,
+    "FOREIGN_GNUARCHS": _conv_list,
+    "ALL_ARCHS": _conv_set,
+    "NATIVE_CHROOTS": _conv_set,
+    "FOREIGN_CHROOTS": _conv_set,
+    "ALL_CHROOTS": _conv_set,
+    "BASE_PACKAGES": _conv_list,
+    "EXTRA_PACKAGES": _conv_list,
+    "CROSS_PACKAGES": _conv_list,
+    "CROSS_PATHS": _conv_list,
+    "APTCONF": _conv_list,
+    "LOCALPKGS": _conv_list,
+    "SCHROOT_COPYFILES": _conv_list,
+    "SCHROOT_NSSDATABASES": _conv_list
+  }
+
+  ## Conversions for suffix patterns: maps the pattern to the name of the
+  ## collecting dictionary and the value converter.
+  _CONV_MAP = {
+    "*_APTCONFSRC": ("APTCONFSRC", _conv_str),
+    "*_DEPS": ("PKGDEPS", _conv_list),
+    "*_QEMUHOST": ("QEMUHOST", _conv_str),
+    "*_QEMUARCH": ("QEMUARCH", _conv_str),
+    "*_ALIASES": ("DISTALIAS", _conv_str)
+  }
+
+  _conv_str = staticmethod(_conv_str)
+  _conv_list = staticmethod(_conv_list)
+  _conv_set = staticmethod(_conv_set)
+
+  def __init__(me):
+    ## The configuration text may be substituted into the script in place
+    ## of `@@@config@@@'; otherwise read it from `state/config.sh'.
+    raw = r"""
+    """; raw = open('state/config.sh').read(); _ignore = """ @@@config@@@
+    """
+    me._conf = {}
+    for line in raw.split("\n"):
+      line = line.strip()
+      if not line or line.startswith('#'): continue
+      m = R_CONFIG.match(line)
+      if not m: raise ExpectedError("bad config line `%s'" % line)
+      ## Undo the shell quoting applied to the value.
+      k, v = m.group(1), m.group(2).replace("'\\''", "'")
+      d = me._conf
+      try: conv = me._CONVERT[k]
+      except KeyError:
+        ## No exact match: try each `_'-separated suffix of the name
+        ## against the patterns in `_CONV_MAP'.
+        i = 0
+        while True:
+          try: i = k.index("_", i + 1)
+          except ValueError: conv = me._conv_str; break
+          try: map, conv = me._CONV_MAP["*" + k[i:]]
+          except KeyError: pass
+          else:
+            d = me._conf.setdefault(map, dict())
+            k = k[:i]
+            if k.startswith("_"): k = k[1:]
+            break
+      d[k] = conv(v)
+
+  def __getattr__(me, attr):
+    ## Expose configuration entries as attributes.
+    try: return me._conf[attr]
+    except KeyError, err: raise AttributeError(err.args[0])
OPTIONS = OP.OptionParser(usage = 'chroot-maint [-ikns] [-jN] CMD [ARGS...]')
for short, long, props in [
### Main program.
class SleepJob (BaseJob):
- def __init__(me, nsec, label, deps = [], *arg, **kw):
- super(SleepJob, me).__init__(*arg, **kw)
+ def __init__(me, nsec, label, deps = [], *args, **kw):
+ super(SleepJob, me).__init__(*args, **kw)
me._label = label
me._nsec = nsec
for j in deps: me.await(j)
OS.write(1, "Done.")
class BadJob (BaseJob):
- def __init__(me, deps, *arg, **kw):
- super(BadJob, me).__init__(*arg, **kw)
+ def __init__(me, deps, *args, **kw):
+ super(BadJob, me).__init__(*args, **kw)
for j in deps: me.await(j)
def _mkname(me): return 'bad'
def run(me):
SYS.stdout.flush()
OS._exit(69)
+class TestJob (BaseJob):
+  """Trivial job for testing the machinery: dump environment and fds."""
+  @classmethod
+  def specs(cls): return ["-"]
+  def __init__(me, spec, *args, **kw):
+    super(TestJob, me).__init__(*args, **kw)
+  def _mkname(me): return "test"
+  def run(me):
+    run_program(["sh", "-e", "-c", "env | sort"])
+    run_program(["ls", "-l", "/proc/self/fd"])
+
R_JOBSERV = RX.compile(r'^--jobserver-(?:fds|auth)=(\d+),(\d+)$')
def coin(p = 0.5): return R.random() < p
+JOBMAP = { "chroot": ChrootJob,
+ "cross-tools": CrossToolsJob,
+ "test": TestJob }
+
try:
OPT, args = OPTIONS.parse_args()
rfd, wfd = -1, -1
elif ch == 's': OPT.silent = True
if OPT.njobs < 1:
raise ExpectedError("running no more than %d jobs is silly" % OPT.njobs)
- read_config()
+
+ C = Config()
SCHED = JobScheduler(rfd, wfd, njobs)
- bad = BadJob.ensure([])
- l0 = [SleepJob.ensure(0.2, "j0.%d" % i)
- for i in xrange(10)]
- l1 = [SleepJob.ensure(0.5, "j1.%d" % i, [j for j in l0 if coin()])
- for i in xrange(10)]
- l2 = [SleepJob.ensure(1.0, "j2.%d" % i, [j for j in l1 if coin()] + [bad])
- for i in xrange(10)]
+ OS.environ["http_proxy"] = C.PROXY
+
+ if not args: OPTIONS.print_usage(SYS.stderr); SYS.exit(2)
+ try: jobcls = JOBMAP[args[0]]
+ except KeyError: raise ExpectedError("unknown job type `%s'" % args[0])
+
+ all_specs = jobcls.specs()
+ if len(args) == 1:
+ specs = all_specs
+ else:
+ specs = []
+ for pat in args[1:]:
+ any = False
+ for s in all_specs:
+ if FM.fnmatch(s, pat): specs.append(s); any = True
+ if not any: raise ExpectedError("no match for `%s'" % pat)
+ for s in specs:
+ jobcls.ensure(s)
+
SCHED.run()
+
except ExpectedError, err:
- error(err.message)
-finally:
- run_cleanups()
+ error(err)
SYS.exit(RC)
###----- That's all, folks --------------------------------------------------