--- /dev/null
+#! /usr/bin/python
+###
+### Create, upgrade, and maintain (native and cross-) chroots
+###
+### (c) 2018 Mark Wooding
+###
+
+###----- Licensing notice ---------------------------------------------------
+###
+### This file is part of the distorted.org.uk chroot maintenance tools.
+###
+### distorted-chroot is free software: you can redistribute it and/or
+### modify it under the terms of the GNU General Public License as
+### published by the Free Software Foundation; either version 2 of the
+### License, or (at your option) any later version.
+###
+### distorted-chroot is distributed in the hope that it will be useful,
+### but WITHOUT ANY WARRANTY; without even the implied warranty of
+### MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+### General Public License for more details.
+###
+### You should have received a copy of the GNU General Public License
+### along with distorted-chroot. If not, write to the Free Software
+### Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307,
+### USA.
+
+import errno as E
+import fcntl as FC
+import optparse as OP
+import os as OS
+import random as R
+import re as RX
+import signal as SIG
+import select as SEL
+import sys as SYS
+import time as T
+
+import jobclient as JC
+
+QUIS = OS.path.basename(SYS.argv[0])    # program name, for error messages
+TODAY = T.strftime("%Y-%m-%d")          # date stamp, used in logfile names
+
+###--------------------------------------------------------------------------
+### Random utilities.
+
+class ExpectedError (Exception):
+ """A fatal error which shouldn't print a backtrace."""
+ pass
+
+def spew(msg):
+ """Print MSG to stderr as a debug trace."""
+ if OPT.debug: OS.write(2, ";; %s\n" % msg)
+
+RC = 0
+def moan(msg):
+ """Print MSG to stderr as a warning."""
+ if not OPT.silent: OS.write(2, "%s: %s\n" % (QUIS, msg))
+def error(msg):
+ """Print MSG to stderr, and remember to exit nonzero."""
+ global RC
+ moan(msg)
+ RC = 2
+
+class Tag (object):
+ """Unique objects with no internal structure."""
+ def __init__(me, label): me._label = label
+ def __str__(me): return '#<%s %s>' % (me.__class__.__name__, me._label)
+ def __repr__(me): return '#<%s %s>' % (me.__class__.__name__, me._label)
+
+###--------------------------------------------------------------------------
+### Running parallel jobs.
+
+## Return codes from `check' (see `BaseJob.check' for the full protocol).
+SLEEP = Tag('SLEEP')                    # job must wait for something
+READY = Tag('READY')                    # job may start immediately
+FAILED = Tag('FAILED')                  # job cannot run at all
+DONE = Tag('DONE')                      # job has nothing left to do
+
+class BaseJob (object):
+ """
+ Base class for jobs.
+
+ Subclasses must implement `run' and `_mkname', and probably ought to extend
+ `check'.
+ """
+
+ ## A magic token to prevent sneaky uninterned jobs.
+ _MAGIC = Tag('MAGIC')
+
+ ## A map from job names to objects.
+ _MAP = {}
+
+ ## Number of tail lines of the log to print on failure.
+ LOGLINES = 20
+
+ def __init__(me, _token, *args, **kw):
+ """
+ Initialize a job.
+
+ Jobs are interned! Don't construct instances (of subclasses) directly:
+ use the `ensure' class method.
+ """
+ assert token is me._MAGIC
+ super(BaseJob, me).__init__(*args, **kw)
+
+ ## Dependencies on other jobs.
+ me._deps = set()
+ me._waiting = set()
+
+ ## Attributes maintained by the JobServer
+ me.done = False
+ me.win = None
+ me._token = None
+ me._known = False
+ me._logkid = -1
+ me._logfile = None
+
+ @classmethod
+ def ensure(cls, *args, **kw):
+ """
+ Return the unique job with the given parameters.
+
+ If a matching job already exists, then return it. Otherwise, create the
+ new job, register it in the table, and notify the scheduler about it.
+ """
+ me = cls(_token = cls._MAGIC, *args, **kw)
+ try:
+ job = cls._MAP[me.name]
+ except KeyError:
+ cls._MAP[me.name] = me
+ SCHED.add(me)
+ return me
+ else:
+ return job
+
+ ## Naming.
+ @property
+ def name(me):
+ """Return the job's name, as calculated by `_mkname'."""
+ try: name = me._name
+ except AttributeError: name = me._name = me._mkname()
+ return name
+
+ ## Subclass responsibilities.
+ def _mkname(me):
+ """
+ Return the job's name.
+
+ By default, this is an unhelpful string which is distinct for every job.
+ Subclasses should normally override this method to return a name as an
+ injective function of the job parameters.
+ """
+ return "%s.%x" % (me.__class__.__name__, id(me))
+
+ def check(me):
+ """
+ Return whether the job is ready to run.
+
+ Returns a pair STATE, REASON. The REASON is a human-readable string
+ explaining what's going on, or `None' if it's not worth explaining. The
+ STATE is one of the following.
+
+ * `READY' -- the job can be run at any time.
+
+ * `FAILED' -- the job can't be started. Usually, this means that some
+ prerequisite job failed, there was some error in the job's
+ parameters, or the environment is unsuitable for the job to run.
+
+ * `DONE' -- the job has nothing to do. Usually, this means that the
+ thing the job acts on is already up-to-date. It's bad form to do
+ even minor work in `check'.
+
+ * `SLEEP' -- the job can't be run right now. It has arranged to be
+ retried if conditions change. (Spurious wakeups are permitted and
+ must be handled correctly.)
+
+ The default behaviour checks the set of dependencies, as built by the
+ `await' method, and returns `SLEEP' or `FAILED' as appropriate, or
+ `READY' if all the prerequisite jobs have completed successfully.
+ """
+ for job in me._deps:
+ if not job.done:
+ job._waiting.add(me)
+ return SLEEP, "waiting for job `%s'" % job.name
+ elif not job.win and not OPT.ignerr:
+ return FAILED, "dependent on failed job `%s'" % job.name
+ return READY, None
+
+ ## Subclass utilities.
+ def await(me, job):
+ """Make sure that JOB completes before allowing this job to start."""
+ me._deps.add(job)
+
+ def _logtail(me):
+ """
+ Dump the last `LOGLINES' lines of the logfile.
+
+ This is called if the job fails and was being run quietly, to provide the
+ user with some context for the failure.
+ """
+
+ ## Gather blocks from the end of the log until we have enough lines.
+ with open(me._logfile, 'r') as f:
+ nlines = 0
+ bufs = []
+ bufsz = 4096
+ f.seek(0, 2); off = f.tell()
+ spew("start: off = %d" % off)
+ while nlines <= me.LOGLINES and off > 0:
+ off = max(0, off - bufsz)
+ f.seek(off, 0)
+ spew("try at off = %d" % off)
+ buf = f.read(bufsz)
+ nlines += buf.count('\n')
+ spew("now lines = %d" % nlines)
+ bufs.append(buf)
+ buf = ''.join(reversed(bufs))
+
+ ## We probably overshot. Skip the extra lines from the start.
+ i = 0
+ while nlines > me.LOGLINES: i = buf.index('\n', i) + 1; nlines -= 1
+
+ ## If we ended up trimming the log, print an ellipsis.
+ if off > 0 or i > 0: print "%-24s * [...]" % me.name
+
+ ## Print the log tail.
+ lines = buf[i:].split('\n')
+ if lines and lines[-1] == '': lines.pop()
+ for line in lines: print "%-24s %s" % (me.name, line)
+
+class BaseJobToken (object):
+ """
+ A job token is the authorization for a job to be run.
+
+ Subclasses must implement `recycle' to allow some other job to use the
+ token.
+ """
+ pass
+
+class TrivialJobToken (BaseJobToken):
+ """
+ A trivial reusable token, for when issuing jobs in parallel without limit.
+
+ There only needs to be one of these.
+ """
+ def recycle(me): pass
+TRIVIAL_TOKEN = TrivialJobToken()
+
+class JobServerToken (BaseJobToken):
+ """A job token storing a byte from the jobserver pipe."""
+ def __init__(me, char, pipefd, *arg, **kw):
+ super(JobServerToken, me).__init__(*arg, **kw)
+ me._char = char
+ me._fd = pipefd
+ def recycle(me):
+ OS.write(me._fd, me._char)
+
+class PrivateJobToken (BaseJobToken):
+ """
+ The private job token belonging to a scheduler.
+
+ When running under a GNU Make jobserver, there is a token for each byte in
+ the pipe, and an additional one which represents the slot we're actually
+ running in. This class represents that additional token.
+ """
+ def __init__(me, sched, *arg, **kw):
+ super(JobServerToken, me).__init__(*arg, **kw)
+ me._sched = sched
+ def recycle(me):
+ assert me._sched._privtoken is None
+ me._sched._privtoken = me
+
+class JobStreamLogger (object):
+ """Log an output stream from a job, to stdout, and into a logfile."""
+ def __init__(me, fd, tag, marker, logfd = -1):
+ """
+ Initialize the JobStreamLogger.
+
+ * FD is the descriptor to read. It will be made nonblocking.
+
+ * LOGFD is the logfile descriptor to write, or -1 if we're not
+ logging.
+
+ * The TAG and MARKER are used to format the log messages.
+
+ If `OPT.quiet' is false, then lines read from FD are written to standard
+ output as
+
+ TAG X LINE
+
+ where X is the MARKER character used to distinguish this output stream,
+ because they're all interleaved in the logfile. The TAG (and following
+ space) is not written to the logfile, since the file's name is sufficient
+ to explain what's going on.
+
+ See `preselect' and `postselect' for the protocol.
+ """
+
+ ## Store the parameters.
+ me._fd = fd
+ me._logfd = logfd
+ me._tag = tag
+ me._marker = marker
+
+ ## Make the descriptor nonblocking.
+ FC.fcntl(fd, FC.F_SETFL, FC.fcntl(fd, FC.F_GETFL) | OS.O_NONBLOCK)
+
+ ## Clear the line buffer.
+ me._buf = ""
+
+ def preselect(me, rfds):
+ """
+ Augment the list RFDS with our input descriptor, if it's still active.
+
+ If the list is empty then the logging process will quit.
+ """
+ if me._fd >= 0: rfds.append(me._fd)
+
+ def postselect(me, fd):
+ """Read and process input if FD is our descriptor."""
+
+ ## Check the descriptor.
+ if fd != me._fd: return
+
+ ## If we encounter end-of-file, we'll close. Stop then, or if we run out
+ ## of stuff.
+ while me._fd >= 0:
+
+ ## Try to read some data. If there's nothing to read then we're done.
+ try:
+ buf = OS.read(me._fd, 4096)
+ except OSError, err:
+ if err.errno == E.EAGAIN or err.errno == E.WOULDBLOCK: break
+ else: raise
+
+ ## Work out what to do.
+ if buf == "":
+ ## We've encountered end-of-file. Close the descriptor. If there's
+ ## a final unterminated line in the buffer, then we'll have to drain
+ ## it.
+
+ OS.close(me._fd); me._fd = -1
+ if me._buf == "": lines = []
+ else: lines = [me._buf]; me._buf = ""
+
+ else:
+ ## Some data arrived. Append it to our existing buffer, and gather
+ ## any complete lines, leaving the final stub behind for next time.
+
+ lines = (me._buf + buf).split('\n')
+ lines, me._buf = lines[:-1], lines[-1]
+
+ ## Format and print the lines we found.
+ for line in lines:
+ if not OPT.quiet:
+ OS.write(1, '%-24s %s %s\n' % (me._tag, me._marker, line))
+ if me._logfd != -1:
+ OS.write(me._logfd, '%s %s\n' % (me._marker, line))
+
+class JobScheduler (object):
+ """
+ The main machinery for running and ordering jobs.
+
+ This handles all of the details of job scheduling.
+ """
+
+ def __init__(me, rfd = -1, wfd = -1, npar = 1):
+ """
+ Initialize a scheduler.
+
+ * RFD and WFD are the read and write ends of the jobserver pipe, as
+ determined from the `MAKEFLAGS' environment variable, or -1.
+
+ * NPAR is the maximum number of jobs to run in parallel, or `True' if
+ there is no maximum (i.e., we're in `forkbomb' mode).
+ """
+
+ ## Set the parallelism state. The `_rfd' and `_wfd' are the read and
+ ## write ends of the jobserver pipe, or -1 if there is no jobserver.
+ ## `_par' is true if we're meant to run jobs in parallel. The case _par
+ ## and _rfd = -1 means unconstrained parallelism.
+ ##
+ ## The jobserver pipe contains a byte for each shared job slot. A
+ ## scheduler reads a byte from the pipe for each job it wants to run
+ ## (nearly -- see `_privtoken' below), and puts the byte back when the
+ ## job finishes. The GNU Make jobserver protocol specification insists
+ ## that we preserve the value of the byte in the pipe (though doesn't
+ ## currently make any use of this flexibility), so we record it in a
+ ## `JobToken' object's `_char' attribute.
+ me._par = rfd != -1 or npar is True or npar != 1
+ spew("par is %r" % me._par)
+ if rfd == -1 and npar > 1:
+ rfd, wfd = OS.pipe()
+ OS.write(wfd, (npar - 1)*'+')
+ me._rfd = rfd; me._wfd = wfd
+
+ ## The scheduler state. A job starts in the `_check' list. Each
+ ## iteration of the scheduler loop will inspect the jobs here and see
+ ## whether it's ready to run: if not, it gets put in the `_sleep' list,
+ ## where it will languish until something moves it back; if it is ready,
+ ## it gets moved to the `_ready' list to wait for a token from the
+ ## jobserver. At that point the job can be started, and it moves to the
+ ## `_kidmap', which associates a process-id with each running job.
+ ## Finally, jobs which have completed are simply forgotten. The `_njobs'
+ ## counter keeps track of how many jobs are outstanding, so that we can
+ ## stop when there are none left.
+ me._check = set()
+ me._sleep = set()
+ me._ready = set()
+ me._kidmap = {}
+ me._logkidmap = {}
+ me._njobs = 0
+
+ ## As well as the jobserver pipe, we implicitly have one extra job slot,
+ ## which is the one we took when we were started by our parent. The
+ ## right to do processing in this slot is represnted by the `private
+ ## token' here, distinguished from tokens from the jobserver pipe by
+ ## having `None' as its `_char' value.
+ me._privtoken = PrivateJobToken(me)
+
+ def add(me, job):
+ """Notice a new job and arrange for it to (try to) run."""
+ if job._known: return
+ spew("adding new job `%s'" % job.name)
+ job._known = True
+ me._check.add(job)
+ me._njobs += 1
+
+ def _killall(me):
+ """Zap all jobs which aren't yet running."""
+ for jobset in [me._sleep, me._check, me._ready]:
+ while jobset:
+ job = jobset.pop()
+ job.done = True
+ job.win = False
+ me._njobs -= 1
+
+ def _retire(me, job, win, outcome):
+ """
+ Declare that a job has stopped, and deal with the consequences.
+
+ JOB is the completed job, which should not be on any of the job queues.
+ WIN is true if the job succeeded, and false otherwise. OUTCOME is a
+ human-readable string explaining how the job came to its end, or `None'
+ if no message should be reported.
+ """
+
+ global RC
+
+ ## Return the job's token to the pool.
+ job._token.recycle()
+ job._token = None
+ me._njobs -= 1
+
+ ## Update and maybe report the job's status.
+ job.done = True
+ job.win = win
+ if outcome is not None and not OPT.silent:
+ print "%-24s %c (%s)" % (job.name, job.win and '|' or '*', outcome)
+
+ ## If the job failed, and we care, arrange to exit nonzero.
+ if not win and not OPT.ignerr: RC = 2
+
+ ## If the job failed, and we're supposed to give up after the first
+ ## error, then zap all of the waiting jobs.
+ if not job.win and not OPT.keepon and not OPT.ignerr: me._killall()
+
+ ## If this job has dependents then wake them up and see whether they're
+ ## ready to run.
+ for j in job._waiting:
+ try: me._sleep.remove(j)
+ except KeyError: pass
+ else:
+ spew("waking dependent job `%s'" % j.name)
+ me._check.add(j)
+
+ def _reap(me, kid, st):
+ """
+ Deal with the child with process-id KID having exited with status ST.
+ """
+
+ ## Maybe this is a logging child. Note that the logging child has
+ ## finished.
+ try: job = me._logkidmap[kid]
+ except KeyError: pass
+ else:
+ del me._logkidmap[kid]
+ job._logkid = DONE
+ if job.done and not job.win and OPT.quiet: job._logtail()
+ return
+
+ ## Find the job associated with this process-id.
+ try:
+ job = me._kidmap[kid]
+ except KeyError:
+ spew("unknown child %d exits with status 0x%04x" % (kid, st))
+ return
+
+ ## Remove the job from the list.
+ del me._kidmap[kid]
+
+ ## Update and (maybe) report the job status.
+ win = False
+ if OS.WIFSIGNALED(st): outcome = 'killed by signal %d' % OS.WTERMSIG(st)
+ elif OS.WIFEXITED(st):
+ rc = OS.WEXITSTATUS(st)
+ if rc: outcome = 'failed: rc = %d' % rc
+ else:
+ win = True
+ outcome = None
+ else: outcome = 'died with incomprehensible status 0x%04x' % st
+
+ ## Maybe print the job log tail.
+ if not win and OPT.quiet and job._logkid is DONE: job._logtail()
+
+ ## Retire the job.
+ me._retire(job, win, outcome)
+
+ def _reapkids(me):
+ """Reap all finished child processes."""
+ while True:
+ try: kid, st = OS.waitpid(-1, OS.WNOHANG)
+ except OSError, err:
+ if err.errno == E.ECHILD: break
+ else: raise
+ if kid == 0: break
+ me._reap(kid, st)
+
+ def run_job(me, job):
+ """Start running the JOB."""
+
+ ## Make pipes to collect the job's output and error reports.
+ r_out, w_out = OS.pipe()
+ r_err, w_err = OS.pipe()
+
+ ## Find a log file to write. Avoid races over the log names; but this
+ ## means that the log descriptor needs to be handled somewhat carefully.
+ logseq = 1
+ while True:
+ logfile = "log/%s-%s#%d" % (job.name, TODAY, logseq)
+ try:
+ logfd = OS.open(logfile, OS.O_WRONLY | OS.O_CREAT | OS.O_EXCL, 0666)
+ except OSError, err:
+ if err.errno == E.EEXIST: logseq += 1; continue
+ else: raise
+ else:
+ break
+ job._logfile = logfile
+
+ ## Make sure there's no pending output, or we might get two copies. (I
+ ## don't know how to flush all output streams in Python, but this is good
+ ## enough for our purposes.)
+ SYS.stdout.flush()
+
+ ## Set up the logging child first. If we can't, take down the whole job.
+ try: job._logkid = OS.fork()
+ except OSError, err: OS.close(logfd); return None, err
+ if not job._logkid:
+ ## The main logging loop.
+
+ ## Close the jobserver descriptors, and the write ends of the pipes.
+ if me._rfd != -1: OS.close(me._rfd)
+ if me._wfd != -1: OS.close(me._wfd)
+ OS.close(w_out); OS.close(w_err)
+
+ ## Make JobStreamLogger objects for the job's stdout and stderr.
+ jobinputs = [JobStreamLogger(fd, job.name, ch, logfd)
+ for ch, fd in [('|', r_out), ('*', r_err)]]
+
+ while True:
+ ## Wait for input arriving on the remaining pipes. Once a pipe
+ ## closes, the JobStreamLogger for that pipe will stop adding it to
+ ## `rfds', so we stop if `rfds' is an empty list.
+ rfds = []
+ for ji in jobinputs: ji.preselect(rfds)
+ if not rfds: break
+
+ ## Collect input from the active descriptors.
+ rfds, _, _ = SEL.select(rfds, [], [])
+ for fd in rfds:
+ for ji in jobinputs: ji.postselect(fd)
+
+ ## We're done. (Closing the descriptors here would be like polishing
+ ## the floors before the building is demolished.)
+ OS._exit(0)
+
+ ## Back in the main process: record the logging child. At this point we
+ ## no longer need the logfile descriptor.
+ me._logkidmap[job._logkid] = job
+ OS.close(logfd)
+
+ ## Start the main job process.
+ try: kid = OS.fork()
+ except OSError, err: return None, err
+ if not kid:
+ ## The main job.
+
+ ## Close the jobserver descriptors, and the read ends of the pipes, and
+ ## move the write ends to the right places. (This will go wrong if we
+ ## were started without enough descriptors. Fingers crossed.)
+ if me._rfd != -1: OS.close(me._rfd)
+ if me._wfd != -1: OS.close(me._wfd)
+ OS.dup2(w_out, 1); OS.dup2(w_err, 2)
+ OS.close(r_out); OS.close(w_out)
+ OS.close(r_err); OS.close(w_err)
+ spew("running job `%s' as pid %d" % (job.name, OS.getpid()))
+
+ ## Run the job, catching nonlocal flow.
+ try:
+ job.run()
+ except Exception, err:
+ moan("fatal Python exception: %s" % err)
+ OS._exit(2)
+ except BaseException, err:
+ moan("caught unexpected exception: %r" % err)
+ OS._exit(112)
+ else:
+ spew("job `%s' ran to completion" % job.name)
+ OS._exit(0)
+
+ ## Back in the main process: close both the pipes and return the child
+ ## process.
+ OS.close(r_out); OS.close(w_out)
+ OS.close(r_err); OS.close(w_err)
+ return kid, None
+
+ def run(me):
+ """Run the scheduler."""
+
+ spew("JobScheduler starts")
+
+ while True:
+ ## The main scheduler loop. We go through three main phases:
+ ##
+ ## * Inspect the jobs in the `check' list to see whether they can
+ ## run. After this, the `check' list will be empty.
+ ##
+ ## * If there are running jobs, check to see whether any of them have
+ ## stopped, and deal with the results. Also, if there are jobs
+ ## ready to start and a job token has become available, then
+ ## retrieve the token. (Doing these at the same time is the tricky
+ ## part.)
+ ##
+ ## * If there is a job ready to run, and we retrieved a token, then
+ ## start running the job.
+
+ ## Check the pending jobs to see if they can make progress: run each
+ ## job's `check' method and move it to the appropriate queue. (It's OK
+ ## if `check' methods add more jobs to the list, as long as things
+ ## settle down eventually.)
+ while True:
+ try: job = me._check.pop()
+ except KeyError: break
+ state, reason = job.check()
+ tail = reason is not None and ": %s" % reason or ""
+ if state == READY:
+ spew("job `%s' ready to run%s" % (job.name, tail))
+ me._ready.add(job)
+ elif state is FAILED:
+ spew("job `%s' refused to run%s" % (job.name, tail))
+ me._retire(job, False, "refused to run%s" % tail)
+ elif state is DONE:
+ spew("job `%s' has nothing to do%s" % (job.name, tail))
+ me._retire(job, True, reason)
+ elif state is SLEEP:
+ spew("job `%s' can't run yet%s" % (job.name, tail))
+ me._sleep.add(job)
+ else:
+ raise ValueError("unexpected job check from `%s': %r, %r" %
+ (job.name, state, reason))
+
+ ## If there are no jobs left, then we're done.
+ if not me._njobs:
+ spew("all jobs completed")
+ break
+
+ ## Make sure we can make progress. There are no jobs on the check list
+ ## any more, because we just cleared it. We assume that jobs which are
+ ## ready to run will eventually receive a token. So we only end up in
+ ## trouble if there are jobs asleep, but none running or ready to run.
+ ##spew("#jobs = %d" % me._njobs)
+ ##spew("sleeping: %s" % ", ".join([j.name for j in me._sleep]))
+ ##spew("ready: %s" % ", ".join([j.name for j in me._ready]))
+ ##spew("running: %s" % ", ".join([j.name for j in me._kidmap.itervalues()]))
+ assert not me._sleep or me._kidmap or me._ready
+
+ ## Wait for something to happen.
+ if not me._ready or (not me._par and me._privtoken is None):
+ ## If we have no jobs ready to run, then we must wait for an existing
+ ## child to exit. Hopefully, a sleeping job will be able to make
+ ## progress after this.
+ ##
+ ## Alternatively, if we're not supposed to be running jobs in
+ ## parallel and we don't have the private token, then we have no
+ ## choice but to wait for the running job to complete.
+ ##
+ ## There's no check here for `ECHILD'. We really shouldn't be here
+ ## if there are no children to wait for. (The check list must be
+ ## empty because we just drained it. If the ready list is empty,
+ ## then all of the jobs must be running or sleeping; but the
+ ## assertion above means that either there are no jobs at all, in
+ ## which case we should have stopped, or at least one is running, in
+ ## which case it's safe to wait for it. The other case is that we're
+ ## running jobs sequentially, and one is currently running, so
+ ## there's nothing for it but to wait for it -- and hope that it will
+ ## wake up one of the sleeping jobs. The remaining possibility is
+ ## that we've miscounted somewhere, which will cause a crash.)
+ if not me._ready:
+ spew("no new jobs ready: waiting for outstanding jobs to complete")
+ else:
+ spew("job running without parallelism: waiting for it to finish")
+ kid, st = OS.waitpid(-1, 0)
+ me._reap(kid, st)
+ me._reapkids()
+ continue
+
+ ## We have jobs ready to run, so try to acquire a token.
+ if me._rfd == -1 and me._par:
+ ## We're running with unlimited parallelism, so we don't need a token
+ ## to run a job.
+ spew("running new job without token")
+ token = None
+ elif me._privtoken:
+ ## Our private token is available, so we can use that to start
+ ## a new job.
+ spew("private token available: assigning to new job")
+ token = me._privtoken
+ me._privtoken = None
+ else:
+ ## We have to read from the jobserver pipe. Unfortunately, we're not
+ ## allowed to set the pipe nonblocking, because make is also using it
+ ## and will get into a serious mess. And we must deal with `SIGCHLD'
+ ## arriving at any moment. We use the same approach as GNU Make. We
+ ## start by making a copy of the jobserver descriptor: it's this
+ ## descriptor we actually try to read from. We set a signal handler
+ ## to close this descriptor if a child exits. And we try one last
+ ## time to reap any children which have exited just before we try
+ ## reading the jobserver pipe. This way we're covered:
+ ##
+ ## * If a child exits during the main loop, before we establish the
+ ## descriptor copy then we'll notice when we try reaping
+ ## children.
+ ##
+ ## * If a child exits between the last-chance reap and the read,
+ ## the signal handler will close the descriptor and the `read'
+ ## call will fail with `EBADF'.
+ ##
+ ## * If a child exits while we're inside the `read' system call,
+ ## then the syscall will fail with `EINTR'.
+ ##
+ ## The only problem is that we can't do this from Python, because
+ ## Python signal handlers are delayed. This is what the `jobclient'
+ ## module is for.
+ ##
+ ## The `jobclient' function is called as
+ ##
+ ## jobclient(FD)
+ ##
+ ## It returns a tuple of three values: TOKEN, PID, STATUS. If TOKEN
+ ## is not `None', then reading the pipe succeeded; if TOKEN is empty,
+ ## then the pipe returned EOF, so we should abort; otherwise, TOKEN
+ ## is a singleton string holding the token character. If PID is not
+ ## `None', then PID is the process id of a child which exited, and
+ ## STATUS is its exit status.
+ spew("waiting for token from jobserver")
+ tokch, kid, st = JC.jobclient(me._rfd)
+
+ if kid is not None:
+ me._reap(kid, st)
+ me._reapkids()
+ if tokch is None:
+ spew("no token; trying again")
+ continue
+ elif token == '':
+ error("jobserver pipe closed; giving up")
+ me._killall()
+ continue
+ spew("received token from jobserver")
+ token = JobToken(tokch)
+
+ ## We have a token, so we should start up the job.
+ job = me._ready.pop()
+ job._token = token
+ spew("start new job `%s'" % job.name)
+ kid, err = me.run_job(job)
+ if kid is None:
+ me._retire(job, False, "failed to fork: %s" % err)
+ continue
+ else:
+ me._kidmap[kid] = job
+
+ ## We ran out of work to do.
+ spew("JobScheduler done")
+
+###--------------------------------------------------------------------------
+### Process the configuration and options.
+
+## Parsed configuration variables, as a map from name to value.
+CONFIG = {}
+## A shell-style variable assignment: NAME='VALUE'.
+R_CONFIG = RX.compile(r"^([a-zA-Z0-9_]+)='(.*)'$")
+def read_config():
+  """
+  Populate the `CONFIG' map from `state/config.sh'.
+
+  The odd dance with `raw' below looks like a hook for a build step which
+  splices the configuration text into the `@@@config@@@' placeholder
+  literal; as written, `raw' is simply re-read from the file at runtime.
+  (NOTE(review): presumably substituted at install time -- confirm.)
+  """
+  raw = r"""
+  """; raw = open('state/config.sh').read(); _ignore = """ @@@config@@@
+  """
+  for line in raw.split('\n'):
+    line = line.strip()
+    ## Skip blank lines and comments.
+    if not line or line.startswith('#'): continue
+    m = R_CONFIG.match(line)
+    if not m: raise ExpectedError("bad config line `%s'" % line)
+    ## Undo the shell quoting convention: `'\'' inside a single-quoted
+    ## string stands for a literal single quote.
+    CONFIG[m.group(1)] = m.group(2).replace("'\\''", "'")
+
+OPTIONS = OP.OptionParser(usage = 'chroot-maint [-ikns] [-jN] CMD [ARGS...]')
+for short, long, props in [
+ ("-d", "--debug", {
+ 'dest': 'debug', 'default': False, 'action': 'store_true',
+ 'help': "print lots of debugging drivel" }),
+ ("-i", "--ignore-errors", {
+ 'dest': 'ignerr', 'default': False, 'action': 'store_true',
+ 'help': "ignore all errors encountered while processing" }),
+ ("-j", "--jobs", {
+ 'dest': 'njobs', 'metavar': 'N', 'default': 1, 'type': 'int',
+ 'help': 'run up to N jobs in parallel' }),
+ ("-J", "--forkbomb", {
+ 'dest': 'njobs', 'action': 'store_true',
+ 'help': 'run as many jobs in parallel as possible' }),
+ ("-k", "--keep-going", {
+ 'dest': 'keepon', 'default': False, 'action': 'store_true',
+ 'help': "keep going even if independent jobs fail" }),
+ ("-n", "--dry-run", {
+ 'dest': 'dryrun', 'default': False, 'action': 'store_true',
+ 'help': "don't actually do anything" }),
+ ("-q", "--quiet", {
+ 'dest': 'quiet', 'default': False, 'action': 'store_true',
+ 'help': "don't print the output from successful jobs" }),
+ ("-s", "--silent", {
+ 'dest': 'silent', 'default': False, 'action': 'store_true',
+ 'help': "don't print progress messages" })]:
+ OPTIONS.add_option(short, long, **props)
+
+###--------------------------------------------------------------------------
+### Main program.
+
+class SleepJob (BaseJob):
+ def __init__(me, nsec, label, deps = [], *arg, **kw):
+ super(SleepJob, me).__init__(*arg, **kw)
+ me._label = label
+ me._nsec = nsec
+ for j in deps: me.await(j)
+ def _mkname(me): return 'sleep.%s-%g' % (me._label, me._nsec)
+ def check(me):
+ state, reason = super(SleepJob, me).check()
+ if state is not READY: return state, reason
+ if me._nsec == 0: return DONE, None
+ elif OPT.dryrun: return DONE, "sleep for %gs" % me._nsec
+ return READY, None
+ def run(me):
+ OS.write(1, "Sleeping for %gs...\n" % me._nsec)
+ T.sleep(me._nsec)
+ OS.write(1, "Done.")
+
+class BadJob (BaseJob):
+ def __init__(me, deps, *arg, **kw):
+ super(BadJob, me).__init__(*arg, **kw)
+ for j in deps: me.await(j)
+ def _mkname(me): return 'bad'
+ def run(me):
+ for i in xrange(50):
+ print "blah blah %d blah" % i
+ print >>SYS.stderr, "blah ouch"
+ SYS.stdout.flush()
+ OS._exit(69)
+
+R_JOBSERV = RX.compile(r'^--jobserver-(?:fds|auth)=(\d+),(\d+)$')
+
+def coin(p = 0.5): return R.random() < p
+
+try:
+  ## Parse the command line.
+  OPT, args = OPTIONS.parse_args()
+  rfd, wfd = -1, -1
+  njobs = OPT.njobs
+  ## Inherit jobserver descriptors and flag settings from a parent `make',
+  ## if there is one.
+  try: mkflags = OS.environ['MAKEFLAGS']
+  except KeyError: pass
+  else:
+    ff = mkflags.split()
+    for f in ff:
+      m = R_JOBSERV.match(f)
+      if m: rfd, wfd = int(m.group(1)), int(m.group(2))
+      elif f == '-j': njobs = None
+      elif not f.startswith('-'):
+        ## A bare word in `MAKEFLAGS' is a bundle of single-letter flags.
+        for ch in f:
+          if ch == 'i': OPT.ignerr = True
+          elif ch == 'k': OPT.keepon = True
+          elif ch == 'n': OPT.dryrun = True
+          elif ch == 's': OPT.silent = True
+  if OPT.njobs < 1:
+    raise ExpectedError("running no more than %d jobs is silly" % OPT.njobs)
+  read_config()
+  SCHED = JobScheduler(rfd, wfd, njobs)
+  ## Build a little test job graph: three layers of sleeps with random
+  ## cross-layer dependencies, plus a job which always fails.
+  bad = BadJob.ensure([])
+  l0 = [SleepJob.ensure(0.2, "j0.%d" % i)
+        for i in xrange(10)]
+  l1 = [SleepJob.ensure(0.5, "j1.%d" % i, [j for j in l0 if coin()])
+        for i in xrange(10)]
+  l2 = [SleepJob.ensure(1.0, "j2.%d" % i, [j for j in l1 if coin()] + [bad])
+        for i in xrange(10)]
+  SCHED.run()
+except ExpectedError, err:
+  error(err.message)
+finally:
+  ## NOTE(review): `run_cleanups' is not defined anywhere in this file, so
+  ## as it stands this raises `NameError' on the way out; presumably it's
+  ## meant to arrive in a later change -- confirm before merging.
+  run_cleanups()
+SYS.exit(RC)
+
+###----- That's all, folks --------------------------------------------------