#!/usr/bin/python2.4
# usage:
#   adt-run [options]... --- virt-server [virt-server-arg...]
#
# invoke in toplevel of package (not necessarily built)
# with package installed
#
# exit status:
#  0 all tests passed
#  4 at least one test failed
#  8 no tests in this package
# 12 erroneous package
# 16 testbed failure
# 20 other unexpected failures including bad usage
#
# NOTE(review): this file deliberately sticks to syntax that is valid on
# the Python 2.4 named in the shebang (no `with`, no `except ... as`, no
# conditional expressions) while avoiding Python-2-only constructs
# (print statement, `except X, e`, `has_key`, `reduce`, `file`).

import signal
import optparse
import tempfile
import sys
import subprocess
import traceback
import urllib
import string
import re as regexp

from optparse import OptionParser

try:
    from urllib import quote as _url_quote, unquote as _url_unquote
except ImportError:
    # Python 3 moved these into urllib.parse.
    from urllib.parse import quote as _url_quote, unquote as _url_unquote

# Module-wide state, filled in by parse_args(), main() and read_control().
opts = None      # optparse values object
tmpdir = None    # host-side scratch directory created by main()
testbed = None   # the Testbed instance driving the virtualisation server
tests = None     # list of Test objects built by read_control()

# Control-file syntax.
_FIELD_RE = regexp.compile(r'([A-Z][-0-9A-Za-z]*)\s*\:\s*(.*)$')
_COMMENT_RE = regexp.compile(r'\s*\#')
_NONBLANK_RE = regexp.compile(r'\S')
_LEADING_SPACE_RE = regexp.compile(r'\s')


class Quit(Exception):
    """Abort the run with exit status `ec` and user-visible message `m`."""

    def __init__(self, ec, m):
        Exception.__init__(self, m)
        self.ec = ec
        self.m = m


def bomb(m):
    """Raise Quit with status 20 (unexpected failure)."""
    raise Quit(20, "unexpected error: %s" % m)


def badpkg(m):
    """Raise Quit with status 12 (erroneous package)."""
    raise Quit(12, "erroneous package: %s" % m)


class Unsupported(Exception):
    """A control stanza uses something we cannot handle; its tests are skipped.

    `lno` is the control-file line to blame, or a negative number when no
    line applies.
    """

    def __init__(self, lno, m):
        if lno >= 0:
            self.m = '%s (control line %d)' % (m, lno)
        else:
            self.m = m
        Exception.__init__(self, self.m)

    def report(self, tname):
        """Print a SKIP line for test `tname` on stdout."""
        print('%-20s SKIP %s' % (tname, self.m))


def debug(m):
    """Write `m` to stderr when --debug was given; otherwise do nothing."""
    if opts is None or not opts.debug:
        return
    sys.stderr.write('adt-run: debug: %s\n' % m)


def flatten(l):
    """Concatenate a sequence of lists into one new list."""
    flat = []
    for sub in l:
        flat.extend(sub)
    return flat


class Path:
    """A file or directory that lives either on the host or in the testbed.

    tb    -- true if the path is inside the testbed (must then be absolute)
    path  -- the path string, without any trailing slash for directories
    what  -- short label, used in messages and for the host-side copy name
    dir   -- true if the path names a directory
    """

    def __init__(self, tb, path, what, dir=False):
        self.tb = tb
        self.p = path
        self.what = what
        self.dir = dir
        if self.tb:
            if self.p[:1] != '/':
                bomb("path %s specified as being in testbed but"
                     " not absolute: `%s'" % (what, self.p))
            self.local = None   # host-side copy, fetched lazily by onhost()
        else:
            self.local = self.p
        if self.dir:
            self.dirsfx = '/'
        else:
            self.dirsfx = ''

    def path(self):
        """Return the path string, with a trailing '/' for directories."""
        return self.p + self.dirsfx

    def append(self, suffix, what, dir=False):
        """Return a new Path for `suffix` appended to this path()."""
        return Path(self.tb, self.path() + suffix, what=what, dir=dir)

    def __str__(self):
        if self.tb:
            pfx = '/VIRT'
        elif self.p[:1] == '/':
            pfx = '/HOST'
        else:
            pfx = './'
        return pfx + self.p

    def onhost(self):
        """Return a host-side path for this object.

        Host paths are returned as they are.  Testbed paths are copied up
        into the global tmpdir on first use (via the testbed's `copyup`
        command) and the copy's location is remembered.
        """
        if not self.tb:
            return self.p
        if self.local is not None:
            return self.local
        testbed.open()
        self.local = tmpdir + '/tb.' + self.what
        testbed.command('copyup', (self.path(), self.local + self.dirsfx))
        return self.local


def parse_args():
    """Parse sys.argv into the global `opts`.

    Everything after the literal `---` argument is stored as the list
    opts.vserver (the virtualisation server command line).  Missing
    --build-tree / --control options are defaulted afterwards.
    optparse reports usage errors by raising SystemExit.
    """
    global opts

    # The separator really is three dashes; a bare `--` would just end
    # option parsing and leave opts.vserver unset.
    usage = "%prog [options]... --- <virt-server> [<virt-server-arg>...]"
    parser = OptionParser(usage=usage)
    pa = parser.add_option

    def cb_vserv(op, optstr, value, parser):
        # Swallow every remaining argument as the server command line.
        parser.values.vserver = list(parser.rargs)
        del parser.rargs[:]

    def cb_path(op, optstr, value, parser, optname, tb, dir):
        # Both --X and --X-tb store into the same attribute, as a Path.
        name = optname.replace('-', '_')
        setattr(parser.values, name, Path(tb, value, optname, dir))

    def pa_path(optname, dir, help):
        def papa_tb(flag, ca, pahelp):
            pa(flag, action='callback', callback=cb_path,
               nargs=1, type='string', callback_args=ca,
               help=(help % pahelp), metavar='PATH')
        papa_tb('--' + optname, (optname, False, dir), 'host')
        papa_tb('--' + optname + '-tb', (optname, True, dir), 'testbed')

    pa_path('build-tree', True, 'use build tree from PATH on %s')
    pa_path('control', False, 'read control file PATH on %s')

    pa('-d', '--debug', action='store_true', dest='debug')
    pa('--user', type='string',
       help='run tests as USER (needs root on testbed)')

    # optparse refuses to construct an option literally named `---`, so
    # build a dummy option and then overwrite its fields by hand.
    class SpecialOption(optparse.Option):
        pass
    vs_op = SpecialOption('--VSERVER-DUMMY')
    vs_op.action = 'callback'
    vs_op.type = None
    vs_op.default = None
    vs_op.nargs = 0
    vs_op.callback = cb_vserv
    vs_op.callback_args = ()
    vs_op.callback_kwargs = {}
    vs_op.help = 'introduces virtualisation server and args'
    vs_op._short_opts = []
    vs_op._long_opts = ['---']
    pa(vs_op)

    (opts, args) = parser.parse_args()
    if not hasattr(opts, 'vserver'):
        parser.error('you must specify --- <virt-server>...')

    if opts.build_tree is None:
        opts.build_tree = Path(False, '.', 'build-tree', dir=True)
    if opts.control is None:
        opts.control = opts.build_tree.append(
            'debian/tests/control', 'control')


class Testbed:
    """Line-oriented conversation with the virtualisation server process.

    Commands are sent as one line of URL-quoted words; each reply is one
    line starting with a keyword (`ok`) followed by URL-quoted results.
    """

    def __init__(self):
        self.sp = None        # the subprocess.Popen object, once started
        self.lastsend = None  # last line sent, for error messages
        self.scratch = None   # Path of the testbed scratch area while open

    def start(self):
        """Spawn opts.vserver and wait for its `ok` banner."""
        p = subprocess.PIPE
        # universal_newlines makes the pipes carry text rather than bytes
        # on Python 3; the protocol is newline-terminated text lines.
        self.sp = subprocess.Popen(opts.vserver,
                                   stdin=p, stdout=p, stderr=None,
                                   universal_newlines=True)
        self.expect('ok')

    def stop(self):
        """Close any open scratch area, send `quit`, and reap the process.

        Raises Quit(16) via bomb() if the server exits non-zero.
        """
        self.close()
        if self.sp is None:
            return
        ec = self.sp.returncode
        if ec is None:
            self.sp.stdout.close()
            self.send('quit')
            self.sp.stdin.close()
            ec = self.sp.wait()
        if ec:
            self.bomb('testbed gave exit status %d after quit' % ec)

    def open(self):
        """Ask the server to `open` a scratch area, unless already open."""
        if self.scratch is not None:
            return
        p = self.commandr1('open')
        self.scratch = Path(True, p, 'tb-scratch', dir=True)

    def close(self):
        """Send `close` if a scratch area is open."""
        if self.scratch is None:
            return
        self.scratch = None
        self.command('close')

    def bomb(self, m):
        """Tear the server connection down and raise Quit(16)."""
        if self.sp is not None:
            self.sp.stdout.close()
            self.sp.stdin.close()
            ec = self.sp.wait()
            if ec:
                sys.stderr.write('adt-run: testbed failing,'
                                 ' exit status %d\n' % ec)
        self.sp = None
        # The server is gone, so there is no scratch area left to close;
        # without this, the stop() done during cleanup would try to send
        # `close` to a dead testbed and fail a second time.
        self.scratch = None
        raise Quit(16, 'testbed failed: %s' % m)

    def send(self, string):
        """Send one command line to the server (a newline is appended)."""
        try:
            debug('>> ' + string)
            self.sp.stdin.write(string + '\n')
            self.sp.stdin.flush()
            self.lastsend = string
        except Exception:
            (et, ev) = sys.exc_info()[:2]
            why = ''.join(traceback.format_exception_only(et, ev)).strip()
            self.bomb('cannot send to testbed: %s' % why)

    def expect(self, keyword, nresults=-1):
        """Read one reply line and return its words after `keyword`.

        The words are returned still URL-quoted.  When nresults >= 0 the
        reply must carry exactly that many result words.  Any protocol
        violation ends in bomb(), i.e. Quit(16).
        """
        l = self.sp.stdout.readline()
        if not l:
            self.bomb('unexpected eof from the testbed')
        if not l.endswith('\n'):
            self.bomb('unterminated line from the testbed')
        l = l.rstrip('\n')
        debug('<< ' + l)
        ll = l.split()
        if not ll:
            self.bomb('unexpected whitespace-only line from the testbed')
        if ll[0] != keyword:
            if self.lastsend is None:
                self.bomb("got banner `%s', expected `%s...'" %
                          (l, keyword))
            else:
                self.bomb("sent `%s', got `%s', expected `%s...'" %
                          (self.lastsend, l, keyword))
        ll = ll[1:]
        if nresults >= 0 and len(ll) != nresults:
            self.bomb("sent `%s', got `%s' (%d result parameters),"
                      " expected %d result parameters" %
                      (self.lastsend, l, len(ll), nresults))
        return ll

    def commandr(self, cmd, nresults, args=()):
        """Send `cmd` with URL-quoted `args`; return the unquoted results.

        The `ok` reply must carry exactly `nresults` words.
        """
        al = [cmd] + [_url_quote(a) for a in args]
        self.send(' '.join(al))
        ll = self.expect('ok', nresults)
        return [_url_unquote(w) for w in ll]

    def command(self, cmd, args=()):
        """Run a command that returns no results."""
        self.commandr(cmd, 0, args)

    def commandr1(self, cmd, args=()):
        """Run a command that returns exactly one result; return it."""
        rl = self.commandr(cmd, 1, args)
        return rl[0]


class FieldBase:
    """One field of a control stanza.

    fname  -- normalised field name
    stz    -- the stanza dict the field came from
    base   -- dict of attributes being accumulated for the stanza's tests
    tnames -- names of the tests the stanza declares
    vl     -- list of (line number, text) pairs making up the field value
    """

    def __init__(self, fname, stz, base, tnames, vl):
        self.fname = fname
        self.stz = stz
        self.base = base
        self.tnames = tnames
        self.vl = vl

    def words(self):
        """Return every whitespace-separated word as (line number, word)."""
        return flatten([[(lno, w) for w in v.split()]
                        for (lno, v) in self.vl])

    def atmostone(self, default):
        """Return the field's single value, or `default` if it has none.

        Also records the value in self.v and its line in self.lno (-1 when
        the default was used).  Raises Unsupported if the field has more
        than one value line.
        """
        if not self.vl:
            self.v = default
            self.lno = -1
        elif len(self.vl) == 1:
            (self.lno, self.v) = self.vl[0]
        else:
            raise Unsupported(self.vl[1][0],
                              'only one %s field allowed' % self.fname)
        return self.v


class FieldIgnore(FieldBase):
    """A field that is recognised but needs no processing."""

    def parse(self):
        pass


class Restriction:
    def __init__(self, rname, base):
        pass


class Restriction_rw_build_tree(Restriction):
    pass


class Field_Restrictions(FieldBase):
    """Restrictions: each word must name a Restriction_<name> class."""

    def parse(self):
        for (lno, rname) in self.words():
            rname = rname.replace('-', '_')
            try:
                rclass = globals()['Restriction_' + rname]
            except KeyError:
                raise Unsupported(lno, 'unknown restriction %s' % rname)
            self.base['restrictions'].append(rclass(rname, self.base))


class Field_Tests(FieldIgnore):
    # The test names are extracted by read_control() itself.
    pass


class Field_Tests_directory(FieldBase):
    """Tests-Directory: overrides the default tests directory."""

    def parse(self):
        self.base['testsdir'] = self.atmostone(self.base['testsdir'])


class Test:
    """One test; every key of `base` becomes an attribute."""

    def __init__(self, tname, base):
        self.tname = tname
        for k in base:
            setattr(self, k, base[k])


def _finish_stanza(stanzas, stz):
    """Append a completed stanza (if any) to `stanzas`."""
    if stz is None:
        return
    stz[' errs'] = 0
    stanzas.append(stz)


def _parse_control(control):
    """Split an open control file into a list of stanza dicts.

    Each stanza maps a normalised field name to a list of
    (line number, text) pairs; keys starting with a space are bookkeeping
    (' lno' is the stanza's first line).  Syntax errors raise Quit(12).
    """
    def badctrl(lno, m):
        badpkg('tests/control line %d: %s' % (lno, m))

    stanzas = []
    stz = None        # stanza being accumulated
    hcurrent = None   # value list of the field being continued
    lno = 0
    while 1:
        l = control.readline()
        if not l:
            break
        lno += 1
        if not l.endswith('\n'):
            badctrl(lno, 'unterminated line')
        if _COMMENT_RE.match(l):
            continue
        # search, not match: a continuation line starts with whitespace
        # and must not be mistaken for a blank line.
        if not _NONBLANK_RE.search(l):
            _finish_stanza(stanzas, stz)
            stz = None
            hcurrent = None
            continue
        initmat = _FIELD_RE.match(l)
        if initmat:
            (fname, l) = initmat.groups()
            fname = string.capwords(fname)
            if stz is None:
                stz = {' lno': lno}
            if fname not in stz:
                stz[fname] = []
            hcurrent = stz[fname]
        elif _LEADING_SPACE_RE.match(l):
            if not hcurrent:
                badctrl(lno, 'unexpected continuation')
        else:
            badctrl(lno, 'syntax error')
        hcurrent.append((lno, l))
    _finish_stanza(stanzas, stz)
    return stanzas


def _tests_from_stanzas(stanzas):
    """Turn parsed stanzas into Test objects.

    Stanzas that raise Unsupported get a SKIP line per test and contribute
    nothing to the result.
    """
    found = []
    for stz in stanzas:
        try:
            try:
                tnames = stz['Tests']
            except KeyError:
                tnames = ['*']
                raise Unsupported(stz[' lno'], 'no Tests field')
            tnames = ' '.join([v for (lno, v) in tnames]).split()
            base = {
                'restrictions': [],
                'testsdir': 'debian/tests',
            }
            for fname in stz.keys():
                if fname.startswith(' '):
                    continue
                vl = stz[fname]
                try:
                    fclass = globals()['Field_' + fname.replace('-', '_')]
                except KeyError:
                    raise Unsupported(vl[0][0],
                                      'unknown metadata field %s' % fname)
                f = fclass(fname, stz, base, tnames, vl)
                f.parse()
        except Unsupported:
            u = sys.exc_info()[1]
            for tname in tnames:
                u.report(tname)
            continue
        for tname in tnames:
            found.append(Test(tname, base))
    return found


def read_control():
    """Read opts.control and set the global `tests` list from it."""
    global tests
    control = open(opts.control.onhost(), 'r')
    try:
        stanzas = _parse_control(control)
    finally:
        control.close()
    tests = _tests_from_stanzas(stanzas)
    testbed.close()


def print_exception(ei, msgprefix=''):
    """Report the exception `ei` (a sys.exc_info() triple) on stderr.

    Returns the exit status to use: the Quit's own status, or 20 for
    anything else (which is printed with a traceback).
    """
    if msgprefix:
        sys.stderr.write(msgprefix + '\n')
    (et, q, tb) = ei
    if et is Quit:
        sys.stderr.write('adt-run: %s\n' % q.m)
        return q.ec
    else:
        sys.stderr.write("adt-run: unexpected, exceptional, error:\n")
        traceback.print_exception(et, q, tb)
        return 20


def cleanup():
    """Remove tmpdir and stop the testbed; exit 20 if either step fails."""
    try:
        rm_ec = 0
        if tmpdir is not None:
            rm_ec = subprocess.call(['rm', '-rf', '--', tmpdir])
        if testbed is not None:
            testbed.stop()
        if rm_ec:
            bomb('rm -rf -- %s failed, code %d' % (tmpdir, rm_ec))
    except Exception:
        print_exception(sys.exc_info(),
                        '\nadt-run: error cleaning up:\n')
        sys.exit(20)


def main():
    """Parse arguments, start the testbed, read the control file, clean up."""
    global testbed
    global tmpdir
    # Restore default SIGINT handling so ^C kills us instead of raising
    # KeyboardInterrupt.
    signal.signal(signal.SIGINT, signal.SIG_DFL)
    try:
        parse_args()
        tmpdir = tempfile.mkdtemp()
        testbed = Testbed()
        testbed.start()
        read_control()
    except SystemExit:
        # optparse leaves via SystemExit for --help and for usage errors
        # and has already printed its own message; map any failure onto
        # the documented "bad usage" status rather than dumping a traceback.
        code = sys.exc_info()[1].code
        cleanup()
        if code:
            sys.exit(20)
        sys.exit(0)
    except Exception:
        ec = print_exception(sys.exc_info(), '')
        cleanup()
        sys.exit(ec)
    cleanup()


if __name__ == '__main__':
    main()