#!/usr/bin/python2.4 # # adt-run is part of autopkgtest # autopkgtest is a tool for testing Debian binary packages # # autopkgtest is Copyright (C) 2006 Canonical Ltd. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. # # See the file CREDITS for a full list of credits information (often # installed as /usr/share/doc/autopkgtest/CREDITS). import signal import optparse import tempfile import sys import subprocess import traceback import urllib import string import re as regexp import os import errno import fnmatch from optparse import OptionParser tmpdir = None testbed = None errorcode = 0 signal.signal(signal.SIGINT, signal.SIG_DFL) # undo stupid Python SIGINT thing class Quit: def __init__(q,ec,m): q.ec = ec; q.m = m def bomb(m): raise Quit(20, "unexpected error: %s" % m) def badpkg(m): raise Quit(12, "erroneous package: %s" % m) def report(tname, result): print '%-20s %s' % (tname, result) class Unsupported: def __init__(u, lno, m): if lno >= 0: u.m = '%s (control line %d)' % (m, lno) else: u.m = m def report(u, tname): global errorcode errorcode != 2 report(tname, 'SKIP %s' % u.m) def debug(m): global opts if not opts.debug: return print >>sys.stderr, 'atd-run: debug:', m def flatten(l): return reduce((lambda a,b: a + b), l, []) class Path: # p.path[tb] None or path not None => path known # p.file[tb] None or path not None => file exists # p.what # p.spec_tb # p.tb_scratch def ensure_path(p, tb=False): if tb and not p.spec_tb: if not testbed.scratch: error "called ensure_path for `%s' when testbed closed" % what if not p.tb_scratch or p.tb_scratch is not testbed.scratch: if p.path[tb] is not None: return if tb: p.path[tb] = p.tb_tmpdir else: p.path[tb] = tmpdir p.path[tb] += '/'+p.what def ensure_file(p, tb=False): if p.file[tb] is not None: return p.ensure_path(tb) testbed.open() def write(p, tb=False): p.ensure_path(tb) return p.path[tb] def read(p, tb=False): p.ensure_file(tb) class InputPath: class OutputPath: class OutputPath: def __init__(p, path, spec_tb, what, dir=False): if p.tb: if p.p[:1] != '/': bomb("path %s specified as being in testbed but" " not absolute: `%s'" % (what, p.p)) p.path[spec_tb] = p.file[spec_tb] = path p.what = what if spec_tb: p. tb_path = path p.tb_onpath = path p.tb_onhost = None 4 def __init__(p, tb, path, what, dir=False, tbscratch=None, xfmap=None lpath=None): p.tb = tb p.p = path p.what = what p.dir = dir p.tbscratch = tbscratch p.lpath = None if p.tb: if p.p[:1] != '/': bomb("path %s specified as being in testbed but" " not absolute: `%s'" % (what, p.p)) p.local = None p.down = p.p else: p.local = p.p p.down = None if p.dir: p.dirsfx = '/' else: p.dirsfx = '' def path(p): return p.p + p.dirsfx def append(p, suffix, what, dir=False): return Path(p.tb, p.path() + suffix, what=what, dir=dir, tbscratch=p.tbscratch) def __str__(p): if p.tb: pfx = '/VIRT' elif p.p[:1] == '/': pfx = '/HOST' else: pfx = './' return pfx + p.p def xfmapcopy(p, cud, dstdir): if p.xfmap is None: return srcdir = os.path.dirname(p.path()+'/') dstdir = p.xfmapdstdir+'/' for f in p.xfmap(file(p.local)): if '/' in f: bomb("control file %s mentions other filename" "containing slash" % p.what) testbed.command(cud, (srcdir+f, dstdir+f)) def onhost(p, lpath = None): if lpath is not None: if p.lpath is not None: assert(p.lpath == lpath) p.lpath = lpath if p.local is not None: if p.lpath is not None: assert(p.local == p.lpath) return p.local testbed.open() if p.xfmap is None: p.local = p.lpath if p.local is None: p.local = tmpdir + '/tb-' + p.what else: assert(p.lpath is None) assert(not p.dir) p.xfmapdstdir = tmpdir + '/tbd-' + p.what os.mkdir(p.xfmapdstdir) p.local = p.xfmapdstdir + '/' + os.path.basename(p.down) testbed.command('copyup', (p.path(), p.local + p.dirsfx)) p.xfmapcopy('copyup') return p.local def maybe_onhost(p): if p.lpath is None: return None return p.onhost() def ontb(p): testbed.open() if p.tbscratch is not None: if p.tbscratch != testbed.scratch: p.down = None if p.down is not None: return p.down if p.tb: bomb("testbed scratch path " + str(p) + " survived testbed") if p.xfmap is None: p.down = testbed.scratch.p + '/host-' + p.what else: assert(not p.dir) p.xfmapdstdir = testbed.scratch.p + '/hostd-' + p.what testbed.command('mkdir '+p.xfmapdstdir) p.down = p.xfmapdstdir + '/' + os.path.basename(p.local) p.tbscratch = testbed.scratch testbed.command('copydown', (p.path(), p.down + p.dirsfx)) p.xfmapcopy('copydown') return p.down def Action: def __init__(a, kind, path, arghandling, ix): a.kind = kind a.path = path a.ah = arghandling a.what = '%s%s' % (kind,ix); ix++ def parse_args(): global opts usage = "%prog -- ..." parser = OptionParser(usage=usage) pa = parser.add_option pe = parser.add_option arghandling = { 'dsc_tests': True, 'dsc_filter': '*', 'deb_forbuilds': 'auto', 'deb_fortests': 'auto', 'tb': False, 'override_control': None } initial_arghandling = arghandling.copy() n_actions = 0 #---------- # actions (ie, test sets to run, sources to build, binaries to use): def cb_action(op,optstr,value,parser, long,kindpath,is_act): parser.largs.append((value,kindpath)) n_actions += is_act def pa_action(long, metavar, kindpath, help, is_act=True): pa('','--'+long, action='callback', callback=cb_action, nargs=1, type='string', callback_args=(long,kindpath,is_act), help=help) pa_action('build-tree', 'TREE', '@/', help='run tests from build tree TREE') pa_action('source', 'DSC', '@.dsc', help='build DSC and use its tests and/or' ' generated binary packages') pa_action('binary', 'DEB', '@.deb', help='use binary package DEB according' ' to most recent --binaries-* settings') pa_action('override-control', 'CONTROL', ('control',), is_act=0, help='run tests from control file CONTROL instead, ' (applies to next test suite only)') #---------- # argument handling settings (what ways to use action # arguments, and pathname processing): def cb_setah(option, opt_str, value, parser, toset,setval): if type(setval) == list: if not value in setval: parser.error('value for %s option (%s) is not ' 'one of the permitted values (%s)' % (value, opt_str, setval.join(' '))) elif setval is not None: value = setval for v in toset: arghandling[v] = value parser.largs.append(arghandling.copy()) def pa_setah(long, affected,effect, **kwargs): type = metavar; if type: type = 'string' pa('',long, action='callback', callback=cb_setah, callback_args=(affected,effect), **kwargs) ' according to most recent --binaries-* settings') #---- paths: host or testbed: # pa_setah('--paths-testbed', ['tb'],True, help='subsequent path specifications refer to the testbed') pa_setah('--paths-host', ['tb'],False, help='subsequent path specifications refer to the host') #---- source processing settings: pa_setah('--sources-tests', ['dsc_tests'],True, help='run tests from builds of subsequent sources') pa_setah('--sources-no-tests', ['dsc_tests'],False, help='do not run tests from builds of subsequent sources') pa_setah('--built-binaries-filter', ['dsc_filter'],None, type=string, metavar='PATTERN-LIST', help='from subsequent sources, use binaries matching' ' PATTERN-LIST (comma-separated glob patterns)' ' according to most recent --binaries-* settings') pa_setah('--no-built-binaries', ['dsc_filter'], '_', help='from subsequent sources, do not use any binaries') #---- binary package processing settings: def pa_setahbins(long,toset,how): pa_setah(long, toset,['ignore','auto','install'], type=string, metavar='IGNORE|AUTO|INSTALL', default='auto', help=how+' ignore binaries, install them as needed' ' for dependencies, or unconditionally install' ' them, respectively') pa_setahbins('--binaries', ['deb_forbuilds','deb_fortests'], '') pa_setahbins('--binaries-forbuilds', ['deb_forbuilds'], 'for builds, ') pa_setahbins('--binaries-fortests', ['deb_fortests'], 'for tests, ') #---------- # general options: def cb_vserv(op,optstr,value,parser): parser.values.vserver = list(parser.rargs) del parser.rargs[:] def cb_path(op,optstr,value,parser, long,dir,xfmap): name = long.replace('-','_') path = Path(arghandling['tb'], value, long, dir, xfmap=xfmap) setattr(parser.values, name, path) def pa_path(long, help, dir=False, xfmap=None): pa('','--'+long, action='callback', callback=cb_path, nargs=1, type='string', callback_args=(long,dir,xfmap), help=, metavar='PATH') pa_path('output-dir', 'write stderr/out files in PATH', dir=True) pa('','--user', type='string', dest='user', help='run tests as USER (needs root on testbed)') pa('','--fakeroot', type='string', dest='fakeroot', help='prefix debian/rules build with FAKEROOT') pa('-d', '--debug', action='store_true', dest='debug'); #---------- # actual meat: class SpecialOption(optparse.Option): pass vs_op = SpecialOption('','--VSERVER-DUMMY') vs_op.action = 'callback' vs_op.type = None vs_op.default = None vs_op.nargs = 0 vs_op.callback = cb_vserv vs_op.callback_args = ( ) vs_op.callback_kwargs = { } vs_op.help = 'introduces virtualisation server and args' vs_op._short_opts = [] vs_op._long_opts = ['---'] pa(vs_op) (opts,args) = parser.parse_args() if not hasattr(opts,'vserver'): parser.error('you must specifiy --- ...') if not n_actions: parser.error('nothing to do specified') arghandling = initial_arghandling opts.actions = [] ix = 0 for act in args: if type(act) == dict: arghandling = act continue elif type(act) == tuple: pass elif type(act) == string: act = (act,act) else: error "unknown action in list `%s' having" "type `%s' % (act, type(act)) (path, kindpath) = act if type(kindpath) is tuple: kind = kindpath[0] elif kindpath.endswith('/'): kind = 'tree' elif kindpath.endswith('.deb'): kind = 'deb' elif kindpath.endswith('.dsc'): kind = 'dsc' else: parser.error("do not know how to handle filename \`%s';" " specify --source --binary or --build-tree") opts.actions.append(Action(kind, path, arghandling, ix)) ix++ def finalise_options(): global opts, testbed if opts.user is None and 'root-on-testbed' not in caps: opts.user = '' if opts.user is None: su = 'suggested-normal-user=' ul = [ e[length(su):] for e in caps if e.startswith(su) ] if len(ul) > 1: print >>sys.stderr, "warning: virtualisation" " system offers several suggested-normal-user" " values: "+('/'.join(ul))+", using "+ul[0] if ul: opts.user = ul[0] else: opts.user = '' if opts.user: if 'root-on-testbed' not in caps: print >>sys.stderr, "warning: virtualisation" " system does not offer root on testbed," " but --user option specified: failure likely" opts.user_wrap = lambda x: 'su %s -c "%s"' % (opts.user, x) else: opts.user_wrap = lambda x: x if opts.fakeroot is None: opts.fakeroot = '' if opts.user or 'root-on-testbed' not in testbed.caps: opts.fakeroot = 'fakeroot' logpath_counters = {} def logpath(idstr): # if idstr ends with `-' then a counter is appended if idstr.endswith('-'): if not logpath_counters.has_key(idstr): logpath_counters[idstr] = 1 else: logpath_counters[idstr] += 1 idstr.append(`logpath_counters[idstr]`) idstr = 'log-' + idstr if opts.output_dir is None: return testbed.scratch.append(idstr, idstr) elif opts.output_dir.tb: return opts.output_dir.append(idstr, idstr) else: return Path(True, testbed.scratch.p, idstr, lpath=opts.output_dir.p+'/'+idstr) class Testbed: def __init__(tb): tb.sp = None tb.lastsend = None tb.scratch = None def start(tb): p = subprocess.PIPE tb.sp = subprocess.Popen(opts.vserver, stdin=p, stdout=p, stderr=None) tb.expect('ok') tb.caps = tb.command('capabilities') def stop(tb): tb.close() if tb.sp is None: return ec = tb.sp.returncode if ec is None: tb.sp.stdout.close() tb.send('quit') tb.sp.stdin.close() ec = tb.sp.wait() if ec: tb.bomb('testbed gave exit status %d after quit' % ec) def open(tb): if tb.scratch is not None: return p = tb.commandr1('open') tb.scratch = Path(True, p, 'tb-scratch', dir=True) tb.scratch.tbscratch = tb.scratch def close(tb): if tb.scratch is None: return tb.scratch = None if tb.sp is None: return tb.command('close') def bomb(tb, m): if tb.sp is not None: tb.sp.stdout.close() tb.sp.stdin.close() ec = tb.sp.wait() if ec: print >>sys.stderr, ('adt-run: testbed failing,' ' exit status %d' % ec) tb.sp = None raise Quit(16, 'testbed failed: %s' % m) def send(tb, string): tb.sp.stdin try: debug('>> '+string) print >>tb.sp.stdin, string tb.sp.stdin.flush() tb.lastsend = string except: (type, value, dummy) = sys.exc_info() tb.bomb('cannot send to testbed: %s' % traceback. format_exception_only(type, value)) def expect(tb, keyword, nresults=-1): l = tb.sp.stdout.readline() if not l: tb.bomb('unexpected eof from the testbed') if not l.endswith('\n'): tb.bomb('unterminated line from the testbed') l = l.rstrip('\n') debug('<< '+l) ll = l.split() if not ll: tb.bomb('unexpected whitespace-only line from the testbed') if ll[0] != keyword: if tb.lastsend is None: tb.bomb("got banner `%s', expected `%s...'" % (l, keyword)) else: tb.bomb("sent `%s', got `%s', expected `%s...'" % (tb.lastsend, l, keyword)) ll = ll[1:] if nresults >= 0 and len(ll) != nresults: tb.bomb("sent `%s', got `%s' (%d result parameters)," " expected %d result parameters" % (string, l, len(ll), nresults)) return ll def commandr(tb, cmd, nresults, args=()): if type(cmd) is str: cmd = [cmd] al = cmd + map(urllib.quote, args) tb.send(string.join(al)) ll = tb.expect('ok') rl = map(urllib.unquote, ll) return rl def command(tb, cmd, args=()): tb.commandr(cmd, 0, args) def commandr1(tb, cmd, args=()): rl = tb.commandr(cmd, 1, args) return rl[0] class FieldBase: def __init__(f, fname, stz, base, tnames, vl): assert(vl) f.stz = stz f.base = base f.tnames = tnames f.vl = vl def words(f): def distribute(vle): (lno, v) = vle r = v.split() r = map((lambda w: (lno, w)), r) return r return flatten(map(distribute, f.vl)) def atmostone(f): if len(vl) == 1: (f.lno, f.v) = vl[0] else: raise Unsupported(f.vl[1][0], 'only one %s field allowed' % fn) return f.v def acquire_built_source(): global opts if opts.build_source: assert(opts.tests_tree is None) bss = build_some_source('t', opts.build_source) opts.tests_tree = bss[0] class FieldIgnore(FieldBase): def parse(f): pass class Restriction: def __init__(r,rname,base): pass class Restriction_rw_tests_tree(Restriction): pass class Field_Restrictions(FieldBase): def parse(f): for wle in f.words(): (lno, rname) = wle rname = rname.replace('-','_') try: rclass = globals()['Restriction_'+rname] except KeyError: raise Unsupported(lno, 'unknown restriction %s' % rname) r = rclass(rname, f.base) f.base['restrictions'].append(r) class Field_Tests(FieldIgnore): pass class Field_Tests_directory(FieldBase): def parse(f): td = atmostone(f) if td.startswith('/'): raise Unspported(f.lno, 'Tests-Directory may not be absolute') base['testsdir'] = td def run_tests(): for t in tests: t.run() if not tests: global errorcode report('*', 'SKIP no tests in this package') errorcode |= 8 class Test: def __init__(t, tname, base): if '/' in tname: raise Unsupported(base[' lno'], 'test name may not contain / character') for k in base: setattr(t,k,base[k]) t.tname = tname if len(base['testsdir']): tpath = base['testsdir'] + '/' + tname else: tpath = tname t.p = opts.tests_tree.append(tpath, 'test-'+tname) def report(t, m): report(t.tname, m) def reportfail(t, m): global errorcode errorcode |= 4 report(t.tname, 'FAIL ' + m) def run(t): testbed.open() def stdouterr(oe): idstr = oe + '-' + t.tname if opts.output_dir is not None and opts.output_dir.tb: return opts.output_dir.append(idstr) else: return testbed.scratch.append(idstr, idstr) def stdouterrh(p, oe): idstr = oe + '-' + t.tname if opts.output_dir is None or opts.output_dir.tb: return p.onhost() else: return p.onhost(opts.output_dir.onhost() + '/' + idstr) so = stdouterr('stdout') se = stdouterr('stderr') rc = testbed.commandr1('execute',(t.p.ontb(), '/dev/null', so.ontb(), se.ontb(), opts.tests_tree.ontb())) soh = stdouterrh(so, 'stdout') seh = stdouterrh(se, 'stderr') rc = int(rc) stab = os.stat(seh) if stab.st_size != 0: l = file(seh).readline() l = l.rstrip('\n \t\r') if len(l) > 40: l = l[:40] + '...' t.reportfail('stderr: %s' % l) elif rc != 0: t.reportfail('non-zero exit status %d' % rc) else: t.report('PASS') def read_control(): global tests try: control = file(opts.control.onhost(), 'r') except IOError, oe: if oe[0] != errno.ENOENT: raise tests = [] return lno = 0 def badctrl(m): testbed.badpkg('tests/control line %d: %s' % (lno, m)) stz = None # stz[field_name][index] = (lno, value) stanzas = [ ] stz = None def end_stanza(stz): if stz is None: return stz[' errs'] = 0 stanzas.append(stz) stz = None hcurrent = None initre = regexp.compile('([A-Z][-0-9a-z]*)\s*\:\s*(.*)$') while 1: l = control.readline() if not l: break lno += 1 if not l.endswith('\n'): badctrl('unterminated line') if regexp.compile('\s*\#').match(l): continue if not regexp.compile('\S').match(l): end_stanza(stz); continue initmat = initre.match(l) if initmat: (fname, l) = initmat.groups() fname = string.capwords(fname) if stz is None: stz = { ' lno': lno } if not stz.has_key(fname): stz[fname] = [ ] hcurrent = stz[fname] elif regexp.compile('\s').match(l): if not hcurrent: badctrl('unexpected continuation') else: badctrl('syntax error') hcurrent.append((lno, l)) end_stanza(stz) def testbadctrl(stz, lno, m): report_badctrl(lno, m) stz[' errs'] += 1 for stz in stanzas: try: try: tnames = stz['Tests'] except KeyError: tnames = ['*'] raise Unsupported(stz[' lno'], 'no Tests field') tnames = map((lambda lt: lt[1]), tnames) tnames = string.join(tnames).split() base = { 'restrictions': [], 'testsdir': 'debian/tests' } for fname in stz.keys(): if fname.startswith(' '): continue vl = stz[fname] try: fclass = globals()['Field_'+ fname.replace('-','_')] except KeyError: raise Unsupported(vl[0][0], 'unknown metadata field %s' % fname) f = fclass(stz, fname, base, tnames, vl) f.parse() tests = [] for tname in tnames: t = Test(tname, base) tests.append(t) except Unsupported, u: for tname in tnames: u.report(tname) continue def print_exception(ei, msgprefix=''): if msgprefix: print >>sys.stderr, msgprefix (et, q, tb) = ei if et is Quit: print >>sys.stderr, 'adt-run:', q.m return q.ec else: print >>sys.stderr, "adt-run: unexpected, exceptional, error:" traceback.print_exc() return 20 def cleanup(): try: rm_ec = 0 if tmpdir is not None: rm_ec = subprocess.call(['rm','-rf','--',tmpdir]) if testbed is not None: testbed.stop() if rm_ec: bomb('rm -rf -- %s failed, code %d' % (tmpdir, ec)) except: print_exception(sys.exc_info(), '\nadt-run: error cleaning up:\n') os._exit(20) def source_rules_command(act,script,which,work,results_lines=0): script = "exec 3>&1 >&2\n" + '\n'.join(script) so = TemporaryPath('%s-%s-results' % (what,which)) se = TemporaryPath('%s-%s-log' & (what,which)) rc = testbed.commandr1(['execute', ','.join(map(urllib.quote, ['sh','-xec',script]))], '/dev/null', so.write(True), se.write(True), work.write(True)) results = file(so.read()).read().split("\n") if rc: act.bomb("%s failed with exit code %d" % (which,rc), se) if results_lines is not None and len(results) != results_lines: act.bomb("got %d lines of results from %s where %d expected" % (len(results), which, results_lines), se) if results_lines==1: return results[0] return results def build_source(act,ah): prepare_testbed_for_action() what = act.ah['what'] dsc_what = what+'/'+os.path.basename(act.path) dsc = InputPath(dsc_what, act.path, arghandling['tb']) if not dsc.spec_tb: dsc_file = open(dsc.read()) in_files = False re = regexp.compile('^\s+[0-9a-f]+\s+\d+\s+([^/.][^/]*)$') for l in dsc_file(): if l.startswith('Files:'): in_files = True elif l.startswith('#'): pass elif not l.startswith(' '): in_files = False elif not in_files: pass m = re.match(l) if not m: act.bomb(".dsc contains unparseable line" " in Files: `%s'" % (`dsc`,l)) subfile = dsc.enclosingdir().append('/'+m.groups(0)) subfile.ensure_file(True) dsc.ensure_file(True) work = AccumulationPath(what+'/build', dir=True) script = [ 'cd '+work.write(True), 'gdebi '+dsc.read(True), 'dpkg-source -x '+dsc.read(True), 'cd */.', 'pwd >&3', opts.user_wrap('debian/rules build'), ] result_pwd = source_rules_command(act,script,what,'build',work,1) if os.path.dirname(result_pwd) != work.read(True): act.bomb("results dir `%s' is not in expected parent dir `%s'" % (results[0], work.read(True)), se) ah.tests_tree = work.append('/'+os.path.basename(results[0])) if ah['dsc_tests']: ah.tests_tree.preserve_now() ah.binaries = [] if ah['dsc_filter'] != '_': script = [ 'cd '+work.write(True)+'/*/.', opts.user_wrap(opts.fakeroot+' debian/rules binary'), 'cd ..', 'echo *.deb >&3', ] result_debs = source_rules_command(act,script,what, 'debian/rules binary',work,1) if result_debs == '*': debs = [] else: debs = debs.split(' ') re = regexp.compile('^([-+.0-9a-z]+)_[^_/]+(?:_[^_/]+)\.deb$') for deb in debs: m = re.match(deb) if not m: act.bomb("badly-named binary `%s'" % deb, se) package = m.groups(deb) for pat in ah['dsc_filter'].split(','): if fnmatch.fnmatchcase(package,pat): deb_path = work.read()+'/'+deb deb_what = package+'_'+what+'.deb' bin = InputPath(deb_what,deb_path,True) bin.preserve_now() record_binary(bin,'builds') ah.binaries.append(bin) break def record_binary( def process_actions(): global binaries binaries = {} ix = 0 for act opts.actions: if act.kind == 'deb': record_binary(act,'builds') if act.kind == 'dsc': build_source(act) # build_source records tree location in ah binaries = {} control_override = None for (kind,path,ah) in opts.actions: if kind == 'control': control_override = act if kind == 'deb': record_binary(act,'tests') if kind == 'dsc': for bin in act.binaries: record_binary(bin,'tests') if not act.ah['dsc_tests']: continue run_tests(act,control_override) control_override = None if kind == 'tree': run_tests(act,control_override) control_override = None def main(): global testbed global tmpdir try: parse_args() except SystemExit, se: os._exit(20) try: tmpdir = tempfile.mkdtemp() testbed = Testbed() testbed.start() testbed.open() finalise_options() process_actions() except: ec = print_exception(sys.exc_info(), '') cleanup() os._exit(ec) cleanup() os._exit(errorcode) main()