#!/usr/bin/python2.4 # # adt-run is part of autodebtest # autodebtest is a tool for testing Debian binary packages # # autodebtest is Copyright (C) 2006 Canonical Ltd. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. # # See the file CREDITS for a full list of credits information (often # installed as /usr/share/doc/autodebtest/CREDITS). import signal import optparse import tempfile import sys import subprocess import traceback import urllib import string import re as regexp import os import errno from optparse import OptionParser tmpdir = None testbed = None errorcode = 0 signal.signal(signal.SIGINT, signal.SIG_DFL) # undo stupid Python SIGINT thing class Quit: def __init__(q,ec,m): q.ec = ec; q.m = m def bomb(m): raise Quit(20, "unexpected error: %s" % m) def badpkg(m): raise Quit(12, "erroneous package: %s" % m) def report(tname, result): print '%-20s %s' % (tname, result) class Unsupported: def __init__(u, lno, m): if lno >= 0: u.m = '%s (control line %d)' % (m, lno) else: u.m = m def report(u, tname): global errorcode errorcode != 2 report(tname, 'SKIP %s' % u.m) def debug(m): global opts if not opts.debug: return print >>sys.stderr, 'atd-run: debug:', m def flatten(l): return reduce((lambda a,b: a + b), l, []) class Path: def __init__(p, tb, path, what, dir=False, tbscratch=None): p.tb = tb p.p = path p.what = what p.dir = dir p.tbscratch = tbscratch if p.tb: if p.p[:1] != '/': bomb("path %s specified as being in testbed but" " not absolute: `%s'" % (what, p.p)) p.local = None p.down = p.p else: p.local = p.p p.down = None if p.dir: p.dirsfx = '/' else: p.dirsfx = '' def path(p): return p.p + p.dirsfx def append(p, suffix, what, dir=False): return Path(p.tb, p.path() + suffix, what=what, dir=dir, tbscratch=p.tbscratch) def __str__(p): if p.tb: pfx = '/VIRT' elif p.p[:1] == '/': pfx = '/HOST' else: pfx = './' return pfx + p.p def onhost(p, lpath = None): if p.local is not None: if lpath is not None: assert(p.local == lpath) return p.local testbed.open() p.local = lpath if p.local is None: p.local = tmpdir + '/tb-' + p.what testbed.command('copyup', (p.path(), p.local + p.dirsfx)) return p.local def ontb(p): testbed.open() if p.tbscratch is not None: if p.tbscratch != testbed.scratch: p.down = None if p.down is not None: return p.down if p.tb: bomb("testbed scratch path " + str(p) + " survived testbed") p.down = testbed.scratch.p + '/host-' + p.what p.tbscratch = testbed.scratch testbed.command('copydown', (p.path(), p.down + p.dirsfx)) return p.down def parse_args(): global opts usage = "%prog -- ..." parser = OptionParser(usage=usage) pa = parser.add_option pe = parser.add_option def cb_vserv(op,optstr,value,parser): parser.values.vserver = list(parser.rargs) del parser.rargs[:] def cb_path(op,optstr,value,parser, long,tb,dir): name = long.replace('-','_') setattr(parser.values, name, Path(tb, value, long, dir)) def pa_path(long, dir, help): def papa_tb(long, ca, pahelp): pa('', long, action='callback', callback=cb_path, nargs=1, type='string', callback_args=ca, help=(help % pahelp), metavar='PATH') papa_tb('--'+long, (long, False, dir), 'host') papa_tb('--'+long+'-tb',(long, True, dir), 'testbed') pa_path('build-tree', True, 'use build tree from PATH on %s') pa_path('control', False, 'read control file PATH on %s') pa_path('output-dir', True, 'write stderr/out files in PATH on %s') pa('-d', '--debug', action='store_true', dest='debug'); # pa('','--user', type='string', # help='run tests as USER (needs root on testbed)') # nyi class SpecialOption(optparse.Option): pass vs_op = SpecialOption('','--VSERVER-DUMMY') vs_op.action = 'callback' vs_op.type = None vs_op.default = None vs_op.nargs = 0 vs_op.callback = cb_vserv vs_op.callback_args = ( ) vs_op.callback_kwargs = { } vs_op.help = 'introduces virtualisation server and args' vs_op._short_opts = [] #vs_op._long_opts = ['--DUMMY'] vs_op._long_opts = ['---'] pa(vs_op) (opts,args) = parser.parse_args() if not hasattr(opts,'vserver'): parser.error('you must specifiy --- ...') if opts.build_tree is None: opts.build_tree = Path(False, '.', 'build-tree', dir=True) if opts.control is None: opts.control = opts.build_tree.append( 'debian/tests/control', 'control') class Testbed: def __init__(tb): tb.sp = None tb.lastsend = None tb.scratch = None def start(tb): p = subprocess.PIPE tb.sp = subprocess.Popen(opts.vserver, stdin=p, stdout=p, stderr=None) tb.expect('ok') def stop(tb): tb.close() if tb.sp is None: return ec = tb.sp.returncode if ec is None: tb.sp.stdout.close() tb.send('quit') tb.sp.stdin.close() ec = tb.sp.wait() if ec: tb.bomb('testbed gave exit status %d after quit' % ec) def open(tb): if tb.scratch is not None: return p = tb.commandr1('open') tb.scratch = Path(True, p, 'tb-scratch', dir=True) tb.scratch.tbscratch = tb.scratch def close(tb): if tb.scratch is None: return tb.scratch = None if tb.sp is None: return tb.command('close') def bomb(tb, m): if tb.sp is not None: tb.sp.stdout.close() tb.sp.stdin.close() ec = tb.sp.wait() if ec: print >>sys.stderr, ('adt-run: testbed failing,' ' exit status %d' % ec) tb.sp = None raise Quit(16, 'testbed failed: %s' % m) def send(tb, string): tb.sp.stdin try: debug('>> '+string) print >>tb.sp.stdin, string tb.sp.stdin.flush() tb.lastsend = string except: (type, value, dummy) = sys.exc_info() tb.bomb('cannot send to testbed: %s' % traceback. format_exception_only(type, value)) def expect(tb, keyword, nresults=-1): l = tb.sp.stdout.readline() if not l: tb.bomb('unexpected eof from the testbed') if not l.endswith('\n'): tb.bomb('unterminated line from the testbed') l = l.rstrip('\n') debug('<< '+l) ll = l.split() if not ll: tb.bomb('unexpected whitespace-only line from the testbed') if ll[0] != keyword: if tb.lastsend is None: tb.bomb("got banner `%s', expected `%s...'" % (l, keyword)) else: tb.bomb("sent `%s', got `%s', expected `%s...'" % (tb.lastsend, l, keyword)) ll = ll[1:] if nresults >= 0 and len(ll) != nresults: tb.bomb("sent `%s', got `%s' (%d result parameters)," " expected %d result parameters" % (string, l, len(ll), nresults)) return ll def commandr(tb, cmd, nresults, args=()): al = [cmd] + map(urllib.quote, args) tb.send(string.join(al)) ll = tb.expect('ok') rl = map(urllib.unquote, ll) return rl def command(tb, cmd, args=()): tb.commandr(cmd, 0, args) def commandr1(tb, cmd, args=()): rl = tb.commandr(cmd, 1, args) return rl[0] class FieldBase: def __init__(f, fname, stz, base, tnames, vl): assert(vl) f.stz = stz f.base = base f.tnames = tnames f.vl = vl def words(f): def distribute(vle): (lno, v) = vle r = v.split() r = map((lambda w: (lno, w)), r) return r return flatten(map(distribute, f.vl)) def atmostone(f): if len(vl) == 1: (f.lno, f.v) = vl[0] else: raise Unsupported(f.vl[1][0], 'only one %s field allowed' % fn) return f.v class FieldIgnore(FieldBase): def parse(f): pass class Restriction: def __init__(r,rname,base): pass class Restriction_rw_build_tree(Restriction): pass class Field_Restrictions(FieldBase): def parse(f): for wle in f.words(): (lno, rname) = wle rname = rname.replace('-','_') try: rclass = globals()['Restriction_'+rname] except KeyError: raise Unsupported(lno, 'unknown restriction %s' % rname) r = rclass(rname, f.base) f.base['restrictions'].append(r) class Field_Tests(FieldIgnore): pass class Field_Tests_directory(FieldBase): def parse(f): td = atmostone(f) if td.startswith('/'): raise Unspported(f.lno, 'Tests-Directory may not be absolute') base['testsdir'] = td def run_tests(): testbed.close() for t in tests: t.run() if not tests: global errorcode report('*', 'SKIP no tests in this package') errorcode |= 8 class Test: def __init__(t, tname, base): if '/' in tname: raise Unsupported(base[' lno'], 'test name may not contain / character') for k in base: setattr(t,k,base[k]) t.tname = tname if len(base['testsdir']): tpath = base['testsdir'] + '/' + tname else: tpath = tname t.p = opts.build_tree.append(tpath, 'test-'+tname) def report(t, m): report(t.tname, m) def reportfail(t, m): global errorcode errorcode |= 4 report(t.tname, 'FAIL ' + m) def run(t): testbed.open() def stdouterr(oe): idstr = oe + '-' + t.tname if opts.output_dir is not None and opts.output_dir.tb: return opts.output_dir.append(idstr) else: return testbed.scratch.append(idstr, idstr) def stdouterrh(p, oe): idstr = oe + '-' + t.tname if opts.output_dir is None or opts.output_dir.tb: return p.onhost() else: return p.onhost(opts.output_dir.onhost() + '/' + idstr) so = stdouterr('stdout') se = stdouterr('stderr') rc = testbed.commandr1('execute',(t.p.ontb(), '/dev/null', so.ontb(), se.ontb(), opts.build_tree.ontb())) soh = stdouterrh(so, 'stdout') seh = stdouterrh(se, 'stderr') testbed.close() rc = int(rc) stab = os.stat(seh) if stab.st_size != 0: l = file(seh).readline() l = l.rstrip('\n \t\r') if len(l) > 40: l = l[:40] + '...' t.reportfail('stderr: %s' % l) elif rc != 0: t.reportfail('non-zero exit status %d' % rc) else: t.report('PASS') def read_control(): global tests try: control = file(opts.control.onhost(), 'r') except IOError, oe: if oe[0] != errno.ENOENT: raise tests = [] return lno = 0 def badctrl(m): testbed.badpkg('tests/control line %d: %s' % (lno, m)) stz = None # stz[field_name][index] = (lno, value) stanzas = [ ] stz = None def end_stanza(stz): if stz is None: return stz[' errs'] = 0 stanzas.append(stz) stz = None hcurrent = None initre = regexp.compile('([A-Z][-0-9a-z]*)\s*\:\s*(.*)$') while 1: l = control.readline() if not l: break lno += 1 if not l.endswith('\n'): badctrl('unterminated line') if regexp.compile('\s*\#').match(l): continue if not regexp.compile('\S').match(l): end_stanza(stz); continue initmat = initre.match(l) if initmat: (fname, l) = initmat.groups() fname = string.capwords(fname) if stz is None: stz = { ' lno': lno } if not stz.has_key(fname): stz[fname] = [ ] hcurrent = stz[fname] elif regexp.compile('\s').match(l): if not hcurrent: badctrl('unexpected continuation') else: badctrl('syntax error') hcurrent.append((lno, l)) end_stanza(stz) def testbadctrl(stz, lno, m): report_badctrl(lno, m) stz[' errs'] += 1 for stz in stanzas: try: try: tnames = stz['Tests'] except KeyError: tnames = ['*'] raise Unsupported(stz[' lno'], 'no Tests field') tnames = map((lambda lt: lt[1]), tnames) tnames = string.join(tnames).split() base = { 'restrictions': [], 'testsdir': 'debian/tests' } for fname in stz.keys(): if fname.startswith(' '): continue vl = stz[fname] try: fclass = globals()['Field_'+ fname.replace('-','_')] except KeyError: raise Unsupported(vl[0][0], 'unknown metadata field %s' % fname) f = fclass(stz, fname, base, tnames, vl) f.parse() tests = [] for tname in tnames: t = Test(tname, base) tests.append(t) except Unsupported, u: for tname in tnames: u.report(tname) continue def print_exception(ei, msgprefix=''): if msgprefix: print >>sys.stderr, msgprefix (et, q, tb) = ei if et is Quit: print >>sys.stderr, 'adt-run:', q.m return q.ec else: print >>sys.stderr, "adt-run: unexpected, exceptional, error:" traceback.print_exc() return 20 def cleanup(): try: rm_ec = 0 if tmpdir is not None: rm_ec = subprocess.call(['rm','-rf','--',tmpdir]) if testbed is not None: testbed.stop() if rm_ec: bomb('rm -rf -- %s failed, code %d' % (tmpdir, ec)) except: print_exception(sys.exc_info(), '\nadt-run: error cleaning up:\n') os._exit(20) def main(): global testbed global tmpdir try: parse_args() except SystemExit, se: os._exit(20) try: tmpdir = tempfile.mkdtemp() testbed = Testbed() testbed.start() testbed.open() testbed.close() read_control() run_tests() except: ec = print_exception(sys.exc_info(), '') cleanup() os._exit(ec) cleanup() os._exit(errorcode) main()