#!/usr/bin/python2.4 # usage: # adt-run ... --- [...] # # invoke in toplevel of package (not necessarily built) # with package installed # exit status: # 0 all tests passed # 4 at least one test failed # 8 no tests in this package # 12 erroneous package # 16 testbed failure # 20 other unexpected failures including bad usage import signal import optparse import tempfile import sys import subprocess import traceback import urllib import string import re as regexp from optparse import OptionParser tmpdir = None testbed = None signal.signal(signal.SIGINT, signal.SIG_DFL) # undo stupid Python SIGINT thing class Quit: def __init__(q,ec,m): q.ec = ec; q.m = m def bomb(m): raise Quit(20, "unexpected error: %s" % m) def badpkg(m): raise Quit(12, "erroneous package: %s" % m) def debug(m): global opts if not opts.debug: return print >>sys.stderr, 'atd-run: debug:', m class Path: def __init__(p, tb, path, what, dir=False): p.tb = tb p.p = path p.what = what p.dir = dir if p.tb: if p.p[:1] != '/': bomb("path %s specified as being in testbed but" " not absolute: `%s'" % (what, p.p)) p.local = None else: p.local = p.p if p.dir: p.dirsfx = '/' else: p.dirsfx = '' def path(p): return p.p + p.dirsfx def append(p, suffix, what, dir=False): return Path(p.tb, p.path() + suffix, what=what, dir=dir) def __str__(p): if p.tb: pfx = '/VIRT' elif p.p[:1] == '/': pfx = '/HOST' else: pfx = './' return pfx + p.p def onhost(p): if not p.tb: return p.p if p.local is not None: return p.local testbed.open() p.local = tmpdir + '/tb.' + p.what testbed.command('copyup', (p.path(), p.local + p.dirsfx)) return p.local def parse_args(): global opts usage = "%prog -- ..." parser = OptionParser(usage=usage) pa = parser.add_option pe = parser.add_option def cb_vserv(op,optstr,value,parser): parser.values.vserver = list(parser.rargs) del parser.rargs[:] def cb_path(op,optstr,value,parser, long,tb,dir): name = long.replace('-','_') parser.values.__dict__[name] = Path(tb, value, long, dir) def pa_path(long, dir, help): def papa_tb(long, ca, pahelp): pa('', long, action='callback', callback=cb_path, nargs=1, type='string', callback_args=ca, help=(help % pahelp), metavar='PATH') papa_tb('--'+long, (long, False, dir), 'host') papa_tb('--'+long+'-tb',(long, True, dir), 'testbed') pa_path('build-tree', True, 'use build tree from PATH on %s') pa_path('control', False, 'read control file PATH on %s') pa('-d', '--debug', action='store_true', dest='debug'); pa('','--user', type='string', help='run tests as USER (needs root on testbed)') class SpecialOption(optparse.Option): pass vs_op = SpecialOption('','--VSERVER-DUMMY') vs_op.action = 'callback' vs_op.type = None vs_op.default = None vs_op.nargs = 0 vs_op.callback = cb_vserv vs_op.callback_args = ( ) vs_op.callback_kwargs = { } vs_op.help = 'introduces virtualisation server and args' vs_op._short_opts = [] #vs_op._long_opts = ['--DUMMY'] vs_op._long_opts = ['---'] pa(vs_op) (opts,args) = parser.parse_args() if not hasattr(opts,'vserver'): parser.error('you must specifiy --- ...') if opts.build_tree is None: opts.build_tree = Path(False, '.', 'build-tree', dir=True) if opts.control is None: opts.control = opts.build_tree.append( 'debian/tests/control', 'control') class Testbed: def __init__(tb): tb.sp = None tb.lastsend = None tb.scratch = None def start(tb): p = subprocess.PIPE tb.sp = subprocess.Popen(opts.vserver, stdin=p, stdout=p, stderr=None) tb.expect('ok') def stop(tb): tb.close() if tb.sp is None: return ec = tb.sp.returncode if ec is None: tb.sp.stdout.close() tb.send('quit') tb.sp.stdin.close() ec = tb.sp.wait() if ec: tb.bomb('testbed gave exit status %d after quit' % ec) def open(tb): if tb.scratch is not None: return p = tb.commandr1('open') tb.scratch = Path(True, p, 'tb-scratch', dir=True) def close(tb): if tb.scratch is None: return tb.scratch = None tb.command('close') def bomb(tb, m): if tb.sp is not None: tb.sp.stdout.close() tb.sp.stdin.close() ec = tb.sp.wait() if ec: print >>sys.stderr, ('adt-run: testbed failing,' ' exit status %d' % ec) tb.sp = None raise Quit(16, 'testbed failed: %s' % m) def send(tb, string): try: debug('>> '+string) print >>tb.sp.stdin, string tb.sp.stdin.flush() tb.lastsend = string except: tb.bomb('cannot send to testbed: %s' % formatexception_only(sys.last_type, sys.last_value)) def expect(tb, keyword, nresults=-1): l = tb.sp.stdout.readline() if not l: tb.bomb('unexpected eof from the testbed') if not l.endswith('\n'): tb.bomb('unterminated line from the testbed') l = l.rstrip('\n') debug('<< '+l) ll = l.split() if not ll: tb.bomb('unexpected whitespace-only line from the testbed') if ll[0] != keyword: if tb.lastsend is None: tb.bomb("got banner `%s', expected `%s...'" % (l, keyword)) else: tb.bomb("sent `%s', got `%s', expected `%s...'" % (tb.lastsend, l, keyword)) ll = ll[1:] if nresults >= 0 and len(ll) != nresults: tb.bomb("sent `%s', got `%s' (%d result parameters)," " expected %d result parameters" % (string, l, len(ll), nresults)) return ll def commandr(tb, cmd, nresults, args=()): al = [cmd] + map(urllib.quote, args) tb.send(string.join(al)) ll = tb.expect('ok') rl = map(urllib.unquote, ll) return rl def command(tb, cmd, args=()): tb.commandr(cmd, 0, args) def commandr1(tb, cmd, args=()): rl = tb.commandr(cmd, 1, args) return rl[0] def read_control(): global tests control = file(opts.control.onhost(), 'r') lno = 0 def badctrl(m): testbed.badpkg('tests/control line %d: %s' % (lno, m)) hmap = None # hmap[header_name][index] = (lno, value) stanzas = [ ] def end_stanza(): if hmap is None: continue stanzas.append(hmap) hmap = None hcurrent = None initre = regexp.compile('([A-Z][-0-9a-z]*)\s*\:\s*(.*)$') while 1: l = control.readline() if not l: break lno++ if not l.endswith('\n'): badctrl('unterminated line') if regexp.compile('\s*\#').match(l): continue if not regexp.compile('\S').match(l): end_stanza(); continue initmat = initre.match(l) if initmat: (hname, l) = initmat.groups() hname = capwords(hname) if hmap is None: hmap = { ' lno' => lno } if not haskey(hmap, hname): hmap[hname] = [ ] hcurrent = hmap[hname] elif regexp.compile('\s').match(l): if not hcurrent: badctrl('unexpected continuation') else: badctrl('syntax error') hcurrent.append((lno, l)) end_stanza() def mergesplit(v): return string.join(v).split() for stz in stanzas: try: tests = stz['Tests'] except KeyError: report_unsupported_test('*', 'no Tests field (near control file line %d)' % stz[lno]) continue tests = mergesplit(tests) base = { } restrictions = mergesplit(stz.get('Restrictions',[])) for rname in restrictions: try: rr = globals()['Restriction_'+rname] except KeyError: for t in tests: report_unsupported_test(t, 'unsupported restriction %s' % rname) continue if rstr in ['needs-root base['restrictions'] = restrictions base.testsdir = oneonly(Tests-directory: try: hcurrent hmap[hname].append(l) if : pass elif tb.badpkg('unterminated line in control') testbed.close() def print_exception(ei, msgprefix=''): if msgprefix: print >>sys.stderr, msgprefix (et, q, tb) = ei if et is Quit: print >>sys.stderr, 'adt-run:', q.m return q.ec else: print >>sys.stderr, "adt-run: unexpected, exceptional, error:" traceback.print_exc() return 20 def cleanup(): try: rm_ec = 0 if tmpdir is not None: rm_ec = subprocess.call(['rm','-rf','--',tmpdir]) if testbed is not None: testbed.stop() if rm_ec: bomb('rm -rf -- %s failed, code %d' % (tmpdir, ec)) except: print_exception(sys.exc_info(), '\nadt-run: error cleaning up:\n') sys.exit(20) def main(): global testbed global tmpdir try: parse_args() tmpdir = tempfile.mkdtemp() testbed = Testbed() testbed.start() read_control() except: ec = print_exception(sys.exc_info(), '') cleanup() sys.exit(ec) cleanup() main()