From: Mark Wooding Date: Wed, 11 Sep 2019 16:52:48 +0000 (+0100) Subject: @@@ bin/chroot-maint: Program for maintaining chroots. X-Git-Url: http://www.chiark.greenend.org.uk/ucgi/~mdw/git/distorted-chroot/commitdiff_plain/7c2be0c62a152afdcfa565ef9fce5b1352ef62db @@@ bin/chroot-maint: Program for maintaining chroots. --- diff --git a/Makefile b/Makefile index 88f8443..1b4c567 100644 --- a/Makefile +++ b/Makefile @@ -351,6 +351,7 @@ $(PYMODULES): $(STATE)/lib/python/%.so: $$(call c-object,$$($$*_SOURCES)) ###-------------------------------------------------------------------------- ### Scripts. +SCRIPTS += chroot-maint SCRIPTS += mkbuildchroot SCRIPTS += mkchrootconf SCRIPTS += install-cross-tools update-cross-tools @@ -691,4 +692,7 @@ clean:: realclean:: clean rm -rf $(LOCAL) +maint: $(STATE)/bin/chroot-maint $(STATE)/lib/python/jobclient.so + +PYTHONPATH=$(STATE)/lib/python $(STATE)/bin/chroot-maint $(ARGS) + ###----- That's all, folks -------------------------------------------------- diff --git a/bin/chroot-maint b/bin/chroot-maint new file mode 100755 index 0000000..96451ba --- /dev/null +++ b/bin/chroot-maint @@ -0,0 +1,912 @@ +#! /usr/bin/python +### +### Create, upgrade, and maintain (native and cross-) chroots +### +### (c) 2018 Mark Wooding +### + +###----- Licensing notice --------------------------------------------------- +### +### This file is part of the distorted.org.uk chroot maintenance tools. +### +### distorted-chroot is free software: you can redistribute it and/or +### modify it under the terms of the GNU General Public License as +### published by the Free Software Foundation; either version 2 of the +### License, or (at your option) any later version. +### +### distorted-chroot is distributed in the hope that it will be useful, +### but WITHOUT ANY WARRANTY; without even the implied warranty of +### MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +### General Public License for more details. +### +### You should have received a copy of the GNU General Public License +### along with distorted-chroot. If not, write to the Free Software +### Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, +### USA. + +import errno as E +import fcntl as FC +import optparse as OP +import os as OS +import random as R +import re as RX +import signal as SIG +import select as SEL +import sys as SYS +import time as T + +import jobclient as JC + +QUIS = OS.path.basename(SYS.argv[0]) +TODAY = T.strftime("%Y-%m-%d") + +###-------------------------------------------------------------------------- +### Random utilities. + +class ExpectedError (Exception): + """A fatal error which shouldn't print a backtrace.""" + pass + +def spew(msg): + """Print MSG to stderr as a debug trace.""" + if OPT.debug: OS.write(2, ";; %s\n" % msg) + +RC = 0 +def moan(msg): + """Print MSG to stderr as a warning.""" + if not OPT.silent: OS.write(2, "%s: %s\n" % (QUIS, msg)) +def error(msg): + """Print MSG to stderr, and remember to exit nonzero.""" + global RC + moan(msg) + RC = 2 + +class Tag (object): + """Unique objects with no internal structure.""" + def __init__(me, label): me._label = label + def __str__(me): return '#<%s %s>' % (me.__class__.__name__, me._label) + def __repr__(me): return '#<%s %s>' % (me.__class__.__name__, me._label) + +###-------------------------------------------------------------------------- +### Running parallel jobs. 
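
### How this section fits together, as a hedged sketch (the `MkchrootJob'
### class and its arguments below are hypothetical, invented purely for
### illustration; the job classes defined towards the end of this file
### follow the same pattern):
###
###     class MkchrootJob (BaseJob):
###       def __init__(me, dist, arch, deps = [], *args, **kw):
###         super(MkchrootJob, me).__init__(*args, **kw)
###         me._dist, me._arch = dist, arch
###         for j in deps: me.await(j)
###       def _mkname(me): return "mkchroot.%s-%s" % (me._dist, me._arch)
###       def run(me): pass         # do the actual work, in a child process
###
###     SCHED = JobScheduler(rfd, wfd, njobs)
###     MkchrootJob.ensure("buster", "amd64")
###     SCHED.run()
###
### Jobs are interned through `ensure', declare their prerequisites with
### `await', and are forked off by the scheduler once a job-slot token is
### available.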

## Return codes from `check'
SLEEP = Tag('SLEEP')
READY = Tag('READY')
FAILED = Tag('FAILED')
DONE = Tag('DONE')

class BaseJob (object):
  """
  Base class for jobs.

  Subclasses must implement `run' and `_mkname', and probably ought to
  extend `check'.
  """

  ## A magic token to prevent sneaky uninterned jobs.
  _MAGIC = Tag('MAGIC')

  ## A map from job names to objects.
  _MAP = {}

  ## Number of tail lines of the log to print on failure.
  LOGLINES = 20

  def __init__(me, _token, *args, **kw):
    """
    Initialize a job.

    Jobs are interned!  Don't construct instances (of subclasses) directly:
    use the `ensure' class method.
    """
    assert _token is me._MAGIC
    super(BaseJob, me).__init__(*args, **kw)

    ## Dependencies on other jobs.
    me._deps = set()
    me._waiting = set()

    ## Attributes maintained by the JobScheduler.
    me.done = False
    me.win = None
    me._token = None
    me._known = False
    me._logkid = -1
    me._logfile = None

  @classmethod
  def ensure(cls, *args, **kw):
    """
    Return the unique job with the given parameters.

    If a matching job already exists, then return it.  Otherwise, create the
    new job, register it in the table, and notify the scheduler about it.
    """
    me = cls(_token = cls._MAGIC, *args, **kw)
    try:
      job = cls._MAP[me.name]
    except KeyError:
      cls._MAP[me.name] = me
      SCHED.add(me)
      return me
    else:
      return job

  ## Naming.
  @property
  def name(me):
    """Return the job's name, as calculated by `_mkname'."""
    try: name = me._name
    except AttributeError: name = me._name = me._mkname()
    return name

  ## Subclass responsibilities.
  def _mkname(me):
    """
    Return the job's name.

    By default, this is an unhelpful string which is distinct for every job.
    Subclasses should normally override this method to return a name as an
    injective function of the job parameters.
    """
    return "%s.%x" % (me.__class__.__name__, id(me))

  def check(me):
    """
    Return whether the job is ready to run.

    Returns a pair STATE, REASON.  The REASON is a human-readable string
    explaining what's going on, or `None' if it's not worth explaining.  The
    STATE is one of the following.

      * `READY' -- the job can be run at any time.

      * `FAILED' -- the job can't be started.  Usually, this means that some
        prerequisite job failed, there was some error in the job's
        parameters, or the environment is unsuitable for the job to run.

      * `DONE' -- the job has nothing to do.  Usually, this means that the
        thing the job acts on is already up-to-date.  It's bad form to do
        even minor work in `check'.

      * `SLEEP' -- the job can't be run right now.  It has arranged to be
        retried if conditions change.  (Spurious wakeups are permitted and
        must be handled correctly.)

    The default behaviour checks the set of dependencies, as built by the
    `await' method, and returns `SLEEP' or `FAILED' as appropriate, or
    `READY' if all the prerequisite jobs have completed successfully.
    """
    for job in me._deps:
      if not job.done:
        job._waiting.add(me)
        return SLEEP, "waiting for job `%s'" % job.name
      elif not job.win and not OPT.ignerr:
        return FAILED, "dependent on failed job `%s'" % job.name
    return READY, None

  ## Subclass utilities.
  def await(me, job):
    """Make sure that JOB completes before allowing this job to start."""
    me._deps.add(job)

  def _logtail(me):
    """
    Dump the last `LOGLINES' lines of the logfile.

    This is called if the job fails and was being run quietly, to provide
    the user with some context for the failure.
+ """ + + ## Gather blocks from the end of the log until we have enough lines. + with open(me._logfile, 'r') as f: + nlines = 0 + bufs = [] + bufsz = 4096 + f.seek(0, 2); off = f.tell() + spew("start: off = %d" % off) + while nlines <= me.LOGLINES and off > 0: + off = max(0, off - bufsz) + f.seek(off, 0) + spew("try at off = %d" % off) + buf = f.read(bufsz) + nlines += buf.count('\n') + spew("now lines = %d" % nlines) + bufs.append(buf) + buf = ''.join(reversed(bufs)) + + ## We probably overshot. Skip the extra lines from the start. + i = 0 + while nlines > me.LOGLINES: i = buf.index('\n', i) + 1; nlines -= 1 + + ## If we ended up trimming the log, print an ellipsis. + if off > 0 or i > 0: print "%-24s * [...]" % me.name + + ## Print the log tail. + lines = buf[i:].split('\n') + if lines and lines[-1] == '': lines.pop() + for line in lines: print "%-24s %s" % (me.name, line) + +class BaseJobToken (object): + """ + A job token is the authorization for a job to be run. + + Subclasses must implement `recycle' to allow some other job to use the + token. + """ + pass + +class TrivialJobToken (BaseJobToken): + """ + A trivial reusable token, for when issuing jobs in parallel without limit. + + There only needs to be one of these. + """ + def recycle(me): pass +TRIVIAL_TOKEN = TrivialJobToken() + +class JobServerToken (BaseJobToken): + """A job token storing a byte from the jobserver pipe.""" + def __init__(me, char, pipefd, *arg, **kw): + super(JobServerToken, me).__init__(*arg, **kw) + me._char = char + me._fd = pipefd + def recycle(me): + OS.write(me._fd, me._char) + +class PrivateJobToken (BaseJobToken): + """ + The private job token belonging to a scheduler. + + When running under a GNU Make jobserver, there is a token for each byte in + the pipe, and an additional one which represents the slot we're actually + running in. This class represents that additional token. + """ + def __init__(me, sched, *arg, **kw): + super(JobServerToken, me).__init__(*arg, **kw) + me._sched = sched + def recycle(me): + assert me._sched._privtoken is None + me._sched._privtoken = me + +class JobStreamLogger (object): + """Log an output stream from a job, to stdout, and into a logfile.""" + def __init__(me, fd, tag, marker, logfd = -1): + """ + Initialize the JobStreamLogger. + + * FD is the descriptor to read. It will be made nonblocking. + + * LOGFD is the logfile descriptor to write, or -1 if we're not + logging. + + * The TAG and MARKER are used to format the log messages. + + If `OPT.quiet' is false, then lines read from FD are written to standard + output as + + TAG X LINE + + where X is the MARKER character used to distinguish this output stream, + because they're all interleaved in the logfile. The TAG (and following + space) is not written to the logfile, since the file's name is sufficient + to explain what's going on. + + See `preselect' and `postselect' for the protocol. + """ + + ## Store the parameters. + me._fd = fd + me._logfd = logfd + me._tag = tag + me._marker = marker + + ## Make the descriptor nonblocking. + FC.fcntl(fd, FC.F_SETFL, FC.fcntl(fd, FC.F_GETFL) | OS.O_NONBLOCK) + + ## Clear the line buffer. + me._buf = "" + + def preselect(me, rfds): + """ + Augment the list RFDS with our input descriptor, if it's still active. + + If the list is empty then the logging process will quit. + """ + if me._fd >= 0: rfds.append(me._fd) + + def postselect(me, fd): + """Read and process input if FD is our descriptor.""" + + ## Check the descriptor. 
+ if fd != me._fd: return + + ## If we encounter end-of-file, we'll close. Stop then, or if we run out + ## of stuff. + while me._fd >= 0: + + ## Try to read some data. If there's nothing to read then we're done. + try: + buf = OS.read(me._fd, 4096) + except OSError, err: + if err.errno == E.EAGAIN or err.errno == E.WOULDBLOCK: break + else: raise + + ## Work out what to do. + if buf == "": + ## We've encountered end-of-file. Close the descriptor. If there's + ## a final unterminated line in the buffer, then we'll have to drain + ## it. + + OS.close(me._fd); me._fd = -1 + if me._buf == "": lines = [] + else: lines = [me._buf]; me._buf = "" + + else: + ## Some data arrived. Append it to our existing buffer, and gather + ## any complete lines, leaving the final stub behind for next time. + + lines = (me._buf + buf).split('\n') + lines, me._buf = lines[:-1], lines[-1] + + ## Format and print the lines we found. + for line in lines: + if not OPT.quiet: + OS.write(1, '%-24s %s %s\n' % (me._tag, me._marker, line)) + if me._logfd != -1: + OS.write(me._logfd, '%s %s\n' % (me._marker, line)) + +class JobScheduler (object): + """ + The main machinery for running and ordering jobs. + + This handles all of the details of job scheduling. + """ + + def __init__(me, rfd = -1, wfd = -1, npar = 1): + """ + Initialize a scheduler. + + * RFD and WFD are the read and write ends of the jobserver pipe, as + determined from the `MAKEFLAGS' environment variable, or -1. + + * NPAR is the maximum number of jobs to run in parallel, or `True' if + there is no maximum (i.e., we're in `forkbomb' mode). + """ + + ## Set the parallelism state. The `_rfd' and `_wfd' are the read and + ## write ends of the jobserver pipe, or -1 if there is no jobserver. + ## `_par' is true if we're meant to run jobs in parallel. The case _par + ## and _rfd = -1 means unconstrained parallelism. + ## + ## The jobserver pipe contains a byte for each shared job slot. A + ## scheduler reads a byte from the pipe for each job it wants to run + ## (nearly -- see `_privtoken' below), and puts the byte back when the + ## job finishes. The GNU Make jobserver protocol specification insists + ## that we preserve the value of the byte in the pipe (though doesn't + ## currently make any use of this flexibility), so we record it in a + ## `JobToken' object's `_char' attribute. + me._par = rfd != -1 or npar is True or npar != 1 + spew("par is %r" % me._par) + if rfd == -1 and npar > 1: + rfd, wfd = OS.pipe() + OS.write(wfd, (npar - 1)*'+') + me._rfd = rfd; me._wfd = wfd + + ## The scheduler state. A job starts in the `_check' list. Each + ## iteration of the scheduler loop will inspect the jobs here and see + ## whether it's ready to run: if not, it gets put in the `_sleep' list, + ## where it will languish until something moves it back; if it is ready, + ## it gets moved to the `_ready' list to wait for a token from the + ## jobserver. At that point the job can be started, and it moves to the + ## `_kidmap', which associates a process-id with each running job. + ## Finally, jobs which have completed are simply forgotten. The `_njobs' + ## counter keeps track of how many jobs are outstanding, so that we can + ## stop when there are none left. + me._check = set() + me._sleep = set() + me._ready = set() + me._kidmap = {} + me._logkidmap = {} + me._njobs = 0 + + ## As well as the jobserver pipe, we implicitly have one extra job slot, + ## which is the one we took when we were started by our parent. 
The + ## right to do processing in this slot is represnted by the `private + ## token' here, distinguished from tokens from the jobserver pipe by + ## having `None' as its `_char' value. + me._privtoken = PrivateJobToken(me) + + def add(me, job): + """Notice a new job and arrange for it to (try to) run.""" + if job._known: return + spew("adding new job `%s'" % job.name) + job._known = True + me._check.add(job) + me._njobs += 1 + + def _killall(me): + """Zap all jobs which aren't yet running.""" + for jobset in [me._sleep, me._check, me._ready]: + while jobset: + job = jobset.pop() + job.done = True + job.win = False + me._njobs -= 1 + + def _retire(me, job, win, outcome): + """ + Declare that a job has stopped, and deal with the consequences. + + JOB is the completed job, which should not be on any of the job queues. + WIN is true if the job succeeded, and false otherwise. OUTCOME is a + human-readable string explaining how the job came to its end, or `None' + if no message should be reported. + """ + + global RC + + ## Return the job's token to the pool. + job._token.recycle() + job._token = None + me._njobs -= 1 + + ## Update and maybe report the job's status. + job.done = True + job.win = win + if outcome is not None and not OPT.silent: + print "%-24s %c (%s)" % (job.name, job.win and '|' or '*', outcome) + + ## If the job failed, and we care, arrange to exit nonzero. + if not win and not OPT.ignerr: RC = 2 + + ## If the job failed, and we're supposed to give up after the first + ## error, then zap all of the waiting jobs. + if not job.win and not OPT.keepon and not OPT.ignerr: me._killall() + + ## If this job has dependents then wake them up and see whether they're + ## ready to run. + for j in job._waiting: + try: me._sleep.remove(j) + except KeyError: pass + else: + spew("waking dependent job `%s'" % j.name) + me._check.add(j) + + def _reap(me, kid, st): + """ + Deal with the child with process-id KID having exited with status ST. + """ + + ## Maybe this is a logging child. Note that the logging child has + ## finished. + try: job = me._logkidmap[kid] + except KeyError: pass + else: + del me._logkidmap[kid] + job._logkid = DONE + if job.done and not job.win and OPT.quiet: job._logtail() + return + + ## Find the job associated with this process-id. + try: + job = me._kidmap[kid] + except KeyError: + spew("unknown child %d exits with status 0x%04x" % (kid, st)) + return + + ## Remove the job from the list. + del me._kidmap[kid] + + ## Update and (maybe) report the job status. + win = False + if OS.WIFSIGNALED(st): outcome = 'killed by signal %d' % OS.WTERMSIG(st) + elif OS.WIFEXITED(st): + rc = OS.WEXITSTATUS(st) + if rc: outcome = 'failed: rc = %d' % rc + else: + win = True + outcome = None + else: outcome = 'died with incomprehensible status 0x%04x' % st + + ## Maybe print the job log tail. + if not win and OPT.quiet and job._logkid is DONE: job._logtail() + + ## Retire the job. + me._retire(job, win, outcome) + + def _reapkids(me): + """Reap all finished child processes.""" + while True: + try: kid, st = OS.waitpid(-1, OS.WNOHANG) + except OSError, err: + if err.errno == E.ECHILD: break + else: raise + if kid == 0: break + me._reap(kid, st) + + def run_job(me, job): + """Start running the JOB.""" + + ## Make pipes to collect the job's output and error reports. + r_out, w_out = OS.pipe() + r_err, w_err = OS.pipe() + + ## Find a log file to write. Avoid races over the log names; but this + ## means that the log descriptor needs to be handled somewhat carefully. 
+ logseq = 1 + while True: + logfile = "log/%s-%s#%d" % (job.name, TODAY, logseq) + try: + logfd = OS.open(logfile, OS.O_WRONLY | OS.O_CREAT | OS.O_EXCL, 0666) + except OSError, err: + if err.errno == E.EEXIST: logseq += 1; continue + else: raise + else: + break + job._logfile = logfile + + ## Make sure there's no pending output, or we might get two copies. (I + ## don't know how to flush all output streams in Python, but this is good + ## enough for our purposes.) + SYS.stdout.flush() + + ## Set up the logging child first. If we can't, take down the whole job. + try: job._logkid = OS.fork() + except OSError, err: OS.close(logfd); return None, err + if not job._logkid: + ## The main logging loop. + + ## Close the jobserver descriptors, and the write ends of the pipes. + if me._rfd != -1: OS.close(me._rfd) + if me._wfd != -1: OS.close(me._wfd) + OS.close(w_out); OS.close(w_err) + + ## Make JobStreamLogger objects for the job's stdout and stderr. + jobinputs = [JobStreamLogger(fd, job.name, ch, logfd) + for ch, fd in [('|', r_out), ('*', r_err)]] + + while True: + ## Wait for input arriving on the remaining pipes. Once a pipe + ## closes, the JobStreamLogger for that pipe will stop adding it to + ## `rfds', so we stop if `rfds' is an empty list. + rfds = [] + for ji in jobinputs: ji.preselect(rfds) + if not rfds: break + + ## Collect input from the active descriptors. + rfds, _, _ = SEL.select(rfds, [], []) + for fd in rfds: + for ji in jobinputs: ji.postselect(fd) + + ## We're done. (Closing the descriptors here would be like polishing + ## the floors before the building is demolished.) + OS._exit(0) + + ## Back in the main process: record the logging child. At this point we + ## no longer need the logfile descriptor. + me._logkidmap[job._logkid] = job + OS.close(logfd) + + ## Start the main job process. + try: kid = OS.fork() + except OSError, err: return None, err + if not kid: + ## The main job. + + ## Close the jobserver descriptors, and the read ends of the pipes, and + ## move the write ends to the right places. (This will go wrong if we + ## were started without enough descriptors. Fingers crossed.) + if me._rfd != -1: OS.close(me._rfd) + if me._wfd != -1: OS.close(me._wfd) + OS.dup2(w_out, 1); OS.dup2(w_err, 2) + OS.close(r_out); OS.close(w_out) + OS.close(r_err); OS.close(w_err) + spew("running job `%s' as pid %d" % (job.name, OS.getpid())) + + ## Run the job, catching nonlocal flow. + try: + job.run() + except Exception, err: + moan("fatal Python exception: %s" % err) + OS._exit(2) + except BaseException, err: + moan("caught unexpected exception: %r" % err) + OS._exit(112) + else: + spew("job `%s' ran to completion" % job.name) + OS._exit(0) + + ## Back in the main process: close both the pipes and return the child + ## process. + OS.close(r_out); OS.close(w_out) + OS.close(r_err); OS.close(w_err) + return kid, None + + def run(me): + """Run the scheduler.""" + + spew("JobScheduler starts") + + while True: + ## The main scheduler loop. We go through three main phases: + ## + ## * Inspect the jobs in the `check' list to see whether they can + ## run. After this, the `check' list will be empty. + ## + ## * If there are running jobs, check to see whether any of them have + ## stopped, and deal with the results. Also, if there are jobs + ## ready to start and a job token has become available, then + ## retrieve the token. (Doing these at the same time is the tricky + ## part.) + ## + ## * If there is a job ready to run, and we retrieved a token, then + ## start running the job. 
+ + ## Check the pending jobs to see if they can make progress: run each + ## job's `check' method and move it to the appropriate queue. (It's OK + ## if `check' methods add more jobs to the list, as long as things + ## settle down eventually.) + while True: + try: job = me._check.pop() + except KeyError: break + state, reason = job.check() + tail = reason is not None and ": %s" % reason or "" + if state == READY: + spew("job `%s' ready to run%s" % (job.name, tail)) + me._ready.add(job) + elif state is FAILED: + spew("job `%s' refused to run%s" % (job.name, tail)) + me._retire(job, False, "refused to run%s" % tail) + elif state is DONE: + spew("job `%s' has nothing to do%s" % (job.name, tail)) + me._retire(job, True, reason) + elif state is SLEEP: + spew("job `%s' can't run yet%s" % (job.name, tail)) + me._sleep.add(job) + else: + raise ValueError("unexpected job check from `%s': %r, %r" % + (job.name, state, reason)) + + ## If there are no jobs left, then we're done. + if not me._njobs: + spew("all jobs completed") + break + + ## Make sure we can make progress. There are no jobs on the check list + ## any more, because we just cleared it. We assume that jobs which are + ## ready to run will eventually receive a token. So we only end up in + ## trouble if there are jobs asleep, but none running or ready to run. + ##spew("#jobs = %d" % me._njobs) + ##spew("sleeping: %s" % ", ".join([j.name for j in me._sleep])) + ##spew("ready: %s" % ", ".join([j.name for j in me._ready])) + ##spew("running: %s" % ", ".join([j.name for j in me._kidmap.itervalues()])) + assert not me._sleep or me._kidmap or me._ready + + ## Wait for something to happen. + if not me._ready or (not me._par and me._privtoken is None): + ## If we have no jobs ready to run, then we must wait for an existing + ## child to exit. Hopefully, a sleeping job will be able to make + ## progress after this. + ## + ## Alternatively, if we're not supposed to be running jobs in + ## parallel and we don't have the private token, then we have no + ## choice but to wait for the running job to complete. + ## + ## There's no check here for `ECHILD'. We really shouldn't be here + ## if there are no children to wait for. (The check list must be + ## empty because we just drained it. If the ready list is empty, + ## then all of the jobs must be running or sleeping; but the + ## assertion above means that either there are no jobs at all, in + ## which case we should have stopped, or at least one is running, in + ## which case it's safe to wait for it. The other case is that we're + ## running jobs sequentially, and one is currently running, so + ## there's nothing for it but to wait for it -- and hope that it will + ## wake up one of the sleeping jobs. The remaining possibility is + ## that we've miscounted somewhere, which will cause a crash.) + if not me._ready: + spew("no new jobs ready: waiting for outstanding jobs to complete") + else: + spew("job running without parallelism: waiting for it to finish") + kid, st = OS.waitpid(-1, 0) + me._reap(kid, st) + me._reapkids() + continue + + ## We have jobs ready to run, so try to acquire a token. + if me._rfd == -1 and me._par: + ## We're running with unlimited parallelism, so we don't need a token + ## to run a job. + spew("running new job without token") + token = None + elif me._privtoken: + ## Our private token is available, so we can use that to start + ## a new job. 
        spew("private token available: assigning to new job")
        token = me._privtoken
        me._privtoken = None
      else:
        ## We have to read from the jobserver pipe.  Unfortunately, we're
        ## not allowed to set the pipe nonblocking, because make is also
        ## using it and will get into a serious mess.  And we must deal
        ## with `SIGCHLD' arriving at any moment.  We use the same approach
        ## as GNU Make.  We start by making a copy of the jobserver
        ## descriptor: it's this descriptor we actually try to read from.
        ## We set a signal handler to close this descriptor if a child
        ## exits.  And we try one last time to reap any children which have
        ## exited just before we try reading the jobserver pipe.  This way
        ## we're covered:
        ##
        ##   * If a child exits during the main loop, before we establish
        ##     the descriptor copy, then we'll notice when we try reaping
        ##     children.
        ##
        ##   * If a child exits between the last-chance reap and the read,
        ##     the signal handler will close the descriptor and the `read'
        ##     call will fail with `EBADF'.
        ##
        ##   * If a child exits while we're inside the `read' system call,
        ##     then the syscall will fail with `EINTR'.
        ##
        ## The only problem is that we can't do this from Python, because
        ## Python signal handlers are delayed.  This is what the
        ## `jobclient' module is for.
        ##
        ## The `jobclient' function is called as
        ##
        ##         jobclient(FD)
        ##
        ## It returns a tuple of three values: TOKEN, PID, STATUS.  If
        ## TOKEN is not `None', then reading the pipe succeeded; if TOKEN
        ## is empty, then the pipe returned EOF, so we should abort;
        ## otherwise, TOKEN is a singleton string holding the token
        ## character.  If PID is not `None', then PID is the process id of
        ## a child which exited, and STATUS is its exit status.
        spew("waiting for token from jobserver")
        tokch, kid, st = JC.jobclient(me._rfd)

        if kid is not None:
          me._reap(kid, st)
          me._reapkids()
        if tokch is None:
          spew("no token; trying again")
          continue
        elif tokch == '':
          error("jobserver pipe closed; giving up")
          me._killall()
          continue
        spew("received token from jobserver")
        token = JobServerToken(tokch, me._wfd)

      ## We have a token, so we should start up the job.
      job = me._ready.pop()
      job._token = token
      spew("start new job `%s'" % job.name)
      kid, err = me.run_job(job)
      if kid is None:
        me._retire(job, False, "failed to fork: %s" % err)
        continue
      else:
        me._kidmap[kid] = job

    ## We ran out of work to do.
    spew("JobScheduler done")

###--------------------------------------------------------------------------
### Process the configuration and options.
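
### A hedged note on the expected input, inferred only from `R_CONFIG' and
### `read_config' below (the variable names in this example are made up):
### `state/config.sh' is a sequence of shell-style assignments with
### single-quoted values, one per line, e.g.
###
###     DEBVER='buster'
###     ARCHS='amd64 i386'
###     MESSAGE='don'\''t panic'
###
### Blank lines and `#' comments are skipped; each remaining line must
### match `R_CONFIG', and `read_config' turns each '\'' sequence back into
### a plain single quote, so the last example yields the value: don't panic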
+ +CONFIG = {} +R_CONFIG = RX.compile(r"^([a-zA-Z0-9_]+)='(.*)'$") +def read_config(): + raw = r""" + """; raw = open('state/config.sh').read(); _ignore = """ @@@config@@@ + """ + for line in raw.split('\n'): + line = line.strip() + if not line or line.startswith('#'): continue + m = R_CONFIG.match(line) + if not m: raise ExpectedError("bad config line `%s'" % line) + CONFIG[m.group(1)] = m.group(2).replace("'\\''", "'") + +OPTIONS = OP.OptionParser(usage = 'chroot-maint [-ikns] [-jN] CMD [ARGS...]') +for short, long, props in [ + ("-d", "--debug", { + 'dest': 'debug', 'default': False, 'action': 'store_true', + 'help': "print lots of debugging drivel" }), + ("-i", "--ignore-errors", { + 'dest': 'ignerr', 'default': False, 'action': 'store_true', + 'help': "ignore all errors encountered while processing" }), + ("-j", "--jobs", { + 'dest': 'njobs', 'metavar': 'N', 'default': 1, 'type': 'int', + 'help': 'run up to N jobs in parallel' }), + ("-J", "--forkbomb", { + 'dest': 'njobs', 'action': 'store_true', + 'help': 'run as many jobs in parallel as possible' }), + ("-k", "--keep-going", { + 'dest': 'keepon', 'default': False, 'action': 'store_true', + 'help': "keep going even if independent jobs fail" }), + ("-n", "--dry-run", { + 'dest': 'dryrun', 'default': False, 'action': 'store_true', + 'help': "don't actually do anything" }), + ("-q", "--quiet", { + 'dest': 'quiet', 'default': False, 'action': 'store_true', + 'help': "don't print the output from successful jobs" }), + ("-s", "--silent", { + 'dest': 'silent', 'default': False, 'action': 'store_true', + 'help': "don't print progress messages" })]: + OPTIONS.add_option(short, long, **props) + +###-------------------------------------------------------------------------- +### Main program. + +class SleepJob (BaseJob): + def __init__(me, nsec, label, deps = [], *arg, **kw): + super(SleepJob, me).__init__(*arg, **kw) + me._label = label + me._nsec = nsec + for j in deps: me.await(j) + def _mkname(me): return 'sleep.%s-%g' % (me._label, me._nsec) + def check(me): + state, reason = super(SleepJob, me).check() + if state is not READY: return state, reason + if me._nsec == 0: return DONE, None + elif OPT.dryrun: return DONE, "sleep for %gs" % me._nsec + return READY, None + def run(me): + OS.write(1, "Sleeping for %gs...\n" % me._nsec) + T.sleep(me._nsec) + OS.write(1, "Done.") + +class BadJob (BaseJob): + def __init__(me, deps, *arg, **kw): + super(BadJob, me).__init__(*arg, **kw) + for j in deps: me.await(j) + def _mkname(me): return 'bad' + def run(me): + for i in xrange(50): + print "blah blah %d blah" % i + print >>SYS.stderr, "blah ouch" + SYS.stdout.flush() + OS._exit(69) + +R_JOBSERV = RX.compile(r'^--jobserver-(?:fds|auth)=(\d+),(\d+)$') + +def coin(p = 0.5): return R.random() < p + +try: + OPT, args = OPTIONS.parse_args() + rfd, wfd = -1, -1 + njobs = OPT.njobs + try: mkflags = OS.environ['MAKEFLAGS'] + except KeyError: pass + else: + ff = mkflags.split() + for f in ff: + m = R_JOBSERV.match(f) + if m: rfd, wfd = int(m.group(1)), int(m.group(2)) + elif f == '-j': njobs = None + elif not f.startswith('-'): + for ch in f: + if ch == 'i': OPT.ignerr = True + elif ch == 'k': OPT.keepon = True + elif ch == 'n': OPT.dryrun = True + elif ch == 's': OPT.silent = True + if OPT.njobs < 1: + raise ExpectedError("running no more than %d jobs is silly" % OPT.njobs) + read_config() + SCHED = JobScheduler(rfd, wfd, njobs) + bad = BadJob.ensure([]) + l0 = [SleepJob.ensure(0.2, "j0.%d" % i) + for i in xrange(10)] + l1 = [SleepJob.ensure(0.5, "j1.%d" % i, 
[j for j in l0 if coin()]) + for i in xrange(10)] + l2 = [SleepJob.ensure(1.0, "j2.%d" % i, [j for j in l1 if coin()] + [bad]) + for i in xrange(10)] + SCHED.run() +except ExpectedError, err: + error(err.message) +finally: + run_cleanups() +SYS.exit(RC) + +###----- That's all, folks --------------------------------------------------