#!/usr/bin/python2.4 # # adt-run is part of autopkgtest # autopkgtest is a tool for testing Debian binary packages # # autopkgtest is Copyright (C) 2006 Canonical Ltd. # # This program is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. # # See the file CREDITS for a full list of credits information (often # installed as /usr/share/doc/autopkgtest/CREDITS). import signal import optparse import tempfile import sys import subprocess import traceback import urllib import string import re as regexp import os import errno import fnmatch import shutil from optparse import OptionParser signal.signal(signal.SIGINT, signal.SIG_DFL) # undo stupid Python SIGINT thing #---------- global variables tmpdir = None # pathstring on host testbed = None # Testbed errorcode = 0 # exit status that we are going to use binaries = None # Binaries (.debs we have registered) #---------- errors we define class Quit: def __init__(q,ec,m): q.ec = ec; q.m = m def bomb(m): raise Quit(20, "unexpected error: %s" % m) def badpkg(m): print 'blame: ', ' '.join(testbed.blamed) raise Quit(12, "erroneous package: %s" % m) def report(tname, result): print '%-20s %s' % (tname, result) class Unsupported: def __init__(u, lno, m): if lno >= 0: u.m = '%s (control line %d)' % (m, lno) else: u.m = m def report(u, tname): global errorcode errorcode != 2 report(tname, 'SKIP %s' % u.m) def debug(m): global opts if not opts.debug: return print >>sys.stderr, 'atd-run: debug:', m def flatten(l): return reduce((lambda a,b: a + b), l, []) #---------- fancy automatic file-copying class class Path: # p.path[tb] None or path not None => path known # p.file[tb] None or path not None => file exists # p.what # p.spec # p.spec_tb # p.tb_scratch def ensure_path(p, tb=False): if tb and not p.spec_tb: if not testbed.scratch: error "called ensure_path for `%s' when testbed closed" % what if not p.tb_scratch or p.tb_scratch is not testbed.scratch: if p.path[tb] is not None: return if tb: p.path[tb] = p.tb_tmpdir else: p.path[tb] = tmpdir p.path[tb] += '/'+p.what def ensure_file(p, tb=False): if p.file[tb] is not None: return p.ensure_path(tb) testbed.open() def write(p, tb=False): p.ensure_path(tb) return p.path[tb] def read(p, tb=False): p.ensure_file(tb) class InputPath: class OutputPath: class OutputPath: def __init__(p, path, spec_tb, what, dir=False): if p.tb: if p.p[:1] != '/': bomb("path %s specified as being in testbed but" " not absolute: `%s'" % (what, p.p)) p.path[spec_tb] = p.file[spec_tb] = path p.what = what if spec_tb: p. tb_path = path p.tb_onpath = path p.tb_onhost = None 4 def __init__(p, tb, path, what, dir=False, tbscratch=None, xfmap=None lpath=None): p.tb = tb p.p = path p.what = what p.dir = dir p.tbscratch = tbscratch p.lpath = None if p.tb: if p.p[:1] != '/': bomb("path %s specified as being in testbed but" " not absolute: `%s'" % (what, p.p)) p.local = None p.down = p.p else: p.local = p.p p.down = None if p.dir: p.dirsfx = '/' else: p.dirsfx = '' def path(p): return p.p + p.dirsfx def append(p, suffix, what, dir=False): return Path(p.tb, p.path() + suffix, what=what, dir=dir, tbscratch=p.tbscratch) def __str__(p): if p.tb: pfx = '/VIRT' elif p.p[:1] == '/': pfx = '/HOST' else: pfx = './' return pfx + p.p def xfmapcopy(p, cud, dstdir): if p.xfmap is None: return srcdir = os.path.dirname(p.path()+'/') dstdir = p.xfmapdstdir+'/' for f in p.xfmap(file(p.local)): if '/' in f: bomb("control file %s mentions other filename" "containing slash" % p.what) testbed.command(cud, (srcdir+f, dstdir+f)) def onhost(p, lpath = None): if lpath is not None: if p.lpath is not None: assert(p.lpath == lpath) p.lpath = lpath if p.local is not None: if p.lpath is not None: assert(p.local == p.lpath) return p.local if p.xfmap is None: p.local = p.lpath if p.local is None: p.local = tmpdir + '/tb-' + p.what else: assert(p.lpath is None) assert(not p.dir) p.xfmapdstdir = tmpdir + '/tbd-' + p.what os.mkdir(p.xfmapdstdir) p.local = p.xfmapdstdir + '/' + os.path.basename(p.down) testbed.command('copyup', (p.path(), p.local + p.dirsfx)) p.xfmapcopy('copyup') return p.local def maybe_onhost(p): if p.lpath is None: return None return p.onhost() def ontb(p): if p.tbscratch is not None: if p.tbscratch != testbed.scratch: p.down = None if p.down is not None: return p.down if p.tb: bomb("testbed scratch path " + str(p) + " survived testbed") if p.xfmap is None: p.down = testbed.scratch.p + '/host-' + p.what else: assert(not p.dir) p.xfmapdstdir = testbed.scratch.p + '/hostd-' + p.what testbed.command('mkdir '+p.xfmapdstdir) p.down = p.xfmapdstdir + '/' + os.path.basename(p.local) p.tbscratch = testbed.scratch testbed.command('copydown', (p.path(), p.down + p.dirsfx)) p.xfmapcopy('copydown') return p.down #---------- parsing and representation of the arguments class Action: def __init__(a, kind, path, arghandling, ix): # extra attributes get added during processing a.kind = kind a.path = path # just a string a.ah = arghandling a.what = '%s%s' % (kind,ix) def parse_args(): global opts usage = "%prog -- ..." parser = OptionParser(usage=usage) pa = parser.add_option pe = parser.add_option arghandling = { 'dsc_tests': True, 'dsc_filter': '*', 'deb_forbuilds': 'auto', 'deb_fortests': 'auto', 'tb': False, 'override_control': None } initial_arghandling = arghandling.copy() n_actions = 0 #---------- # actions (ie, test sets to run, sources to build, binaries to use): def cb_action(op,optstr,value,parser, long,kindpath,is_act): parser.largs.append((value,kindpath)) n_actions += is_act def pa_action(long, metavar, kindpath, help, is_act=True): pa('','--'+long, action='callback', callback=cb_action, nargs=1, type='string', callback_args=(long,kindpath,is_act), help=help) pa_action('build-tree', 'TREE', '@/', help='run tests from build tree TREE') pa_action('source', 'DSC', '@.dsc', help='build DSC and use its tests and/or' ' generated binary packages') pa_action('binary', 'DEB', '@.deb', help='use binary package DEB according' ' to most recent --binaries-* settings') pa_action('override-control', 'CONTROL', ('control',), is_act=0, help='run tests from control file CONTROL instead, ' (applies to next test suite only)') #---------- # argument handling settings (what ways to use action # arguments, and pathname processing): def cb_setah(option, opt_str, value, parser, toset,setval): if type(setval) == list: if not value in setval: parser.error('value for %s option (%s) is not ' 'one of the permitted values (%s)' % (value, opt_str, setval.join(' '))) elif setval is not None: value = setval for v in toset: arghandling[v] = value parser.largs.append(arghandling.copy()) def pa_setah(long, affected,effect, **kwargs): type = metavar; if type: type = 'string' pa('',long, action='callback', callback=cb_setah, callback_args=(affected,effect), **kwargs) ' according to most recent --binaries-* settings') #---- paths: host or testbed: # pa_setah('--paths-testbed', ['tb'],True, help='subsequent path specifications refer to the testbed') pa_setah('--paths-host', ['tb'],False, help='subsequent path specifications refer to the host') #---- source processing settings: pa_setah('--sources-tests', ['dsc_tests'],True, help='run tests from builds of subsequent sources') pa_setah('--sources-no-tests', ['dsc_tests'],False, help='do not run tests from builds of subsequent sources') pa_setah('--built-binaries-filter', ['dsc_filter'],None, type=string, metavar='PATTERN-LIST', help='from subsequent sources, use binaries matching' ' PATTERN-LIST (comma-separated glob patterns)' ' according to most recent --binaries-* settings') pa_setah('--no-built-binaries', ['dsc_filter'], '_', help='from subsequent sources, do not use any binaries') #---- binary package processing settings: def pa_setahbins(long,toset,how): pa_setah(long, toset,['ignore','auto','install'], type=string, metavar='IGNORE|AUTO|INSTALL', default='auto', help=how+' ignore binaries, install them as needed' ' for dependencies, or unconditionally install' ' them, respectively') pa_setahbins('--binaries', ['deb_forbuilds','deb_fortests'], '') pa_setahbins('--binaries-forbuilds', ['deb_forbuilds'], 'for builds, ') pa_setahbins('--binaries-fortests', ['deb_fortests'], 'for tests, ') #---------- # general options: def cb_vserv(op,optstr,value,parser): parser.values.vserver = list(parser.rargs) del parser.rargs[:] def cb_path(op,optstr,value,parser, long,dir,xfmap): name = long.replace('-','_') path = Path(arghandling['tb'], value, long, dir, xfmap=xfmap) setattr(parser.values, name, path) def pa_path(long, help, dir=False, xfmap=None): pa('','--'+long, action='callback', callback=cb_path, nargs=1, type='string', callback_args=(long,dir,xfmap), help=, metavar='PATH') pa_path('output-dir', 'write stderr/out files in PATH', dir=True) pa('','--user', type='string', dest='user', help='run tests as USER (needs root on testbed)') pa('','--fakeroot', type='string', dest='fakeroot', help='prefix debian/rules build with FAKEROOT') pa('-d', '--debug', action='store_true', dest='debug'); pa('','--gnupg-home', type='string', dest='gnupghome', default='~/.autopkgtest/gpg', help='use GNUPGHOME rather than ~/.autopkgtest (for " signing private apt archive);" " `fresh' means generate new key each time.") #---------- # actual meat: class SpecialOption(optparse.Option): pass vs_op = SpecialOption('','--VSERVER-DUMMY') vs_op.action = 'callback' vs_op.type = None vs_op.default = None vs_op.nargs = 0 vs_op.callback = cb_vserv vs_op.callback_args = ( ) vs_op.callback_kwargs = { } vs_op.help = 'introduces virtualisation server and args' vs_op._short_opts = [] vs_op._long_opts = ['---'] pa(vs_op) (opts,args) = parser.parse_args() if not hasattr(opts,'vserver'): parser.error('you must specifiy --- ...') if not n_actions: parser.error('nothing to do specified') arghandling = initial_arghandling opts.actions = [] ix = 0 for act in args: if type(act) == dict: arghandling = act continue elif type(act) == tuple: pass elif type(act) == string: act = (act,act) else: error "unknown action in list `%s' having" "type `%s' % (act, type(act)) (pathname, kindpath) = act if type(kindpath) is tuple: kind = kindpath[0] elif kindpath.endswith('/'): kind = 'tree' elif kindpath.endswith('.deb'): kind = 'deb' elif kindpath.endswith('.dsc'): kind = 'dsc' else: parser.error("do not know how to handle filename \`%s';" " specify --source --binary or --build-tree") path = InputPath(pathname, arghandling['tb']) opts.actions.append(Action(kind, path, arghandling, ix)) ix++ def finalise_options(): global opts, testbed if opts.user is None and 'root-on-testbed' not in caps: opts.user = '' if opts.user is None: su = 'suggested-normal-user=' ul = [ e[length(su):] for e in caps if e.startswith(su) ] if len(ul) > 1: print >>sys.stderr, "warning: virtualisation" " system offers several suggested-normal-user" " values: "+('/'.join(ul))+", using "+ul[0] if ul: opts.user = ul[0] else: opts.user = '' if opts.user: if 'root-on-testbed' not in caps: print >>sys.stderr, "warning: virtualisation" " system does not offer root on testbed," " but --user option specified: failure likely" opts.user_wrap = lambda x: 'su %s -c "%s"' % (opts.user, x) else: opts.user_wrap = lambda x: x if opts.fakeroot is None: opts.fakeroot = '' if opts.user or 'root-on-testbed' not in testbed.caps: opts.fakeroot = 'fakeroot' if opts.gnupghome.startswith('~/'): try: home = os.environ['HOME'] except KeyError: parser.error("HOME environment variable" " not set, needed for --gnupghome=`%s" % opts.gnupghome) opts.gnupghome = home + opts.gnupghome[1:] elif opts.gnupghome == 'fresh': opts.gnupghome = None #---------- testbed management - the Testbed class class Testbed: def __init__(tb): tb.sp = None tb.lastsend = None tb.scratch = None tb.modified = False tb.blamed = [] def start(tb): p = subprocess.PIPE tb.sp = subprocess.Popen(opts.vserver, stdin=p, stdout=p, stderr=None) tb.expect('ok') tb.caps = tb.command('capabilities') def stop(tb): tb.close() if tb.sp is None: return ec = tb.sp.returncode if ec is None: tb.sp.stdout.close() tb.send('quit') tb.sp.stdin.close() ec = tb.sp.wait() if ec: tb.bomb('testbed gave exit status %d after quit' % ec) def open(tb): if tb.scratch is not None: return p = tb.commandr1('open') tb.scratch = Path(True, p, 'tb-scratch', dir=True) tb.scratch.tbscratch = tb.scratch def close(tb): if tb.scratch is None: return tb.scratch = None if tb.sp is None: return tb.command('close') def prepare(tb): if tb.modified and 'reset' in caps: tb.command('reset') tb.blamed = [] tb.modified = False binaries.publish(act) def needs_reset(tb): tb.modified = True def blame(tb, m): tb.blamed.append(m) def bomb(tb, m): if tb.sp is not None: tb.sp.stdout.close() tb.sp.stdin.close() ec = tb.sp.wait() if ec: print >>sys.stderr, ('adt-run: testbed failing,' ' exit status %d' % ec) tb.sp = None raise Quit(16, 'testbed failed: %s' % m) def send(tb, string): tb.sp.stdin try: debug('>> '+string) print >>tb.sp.stdin, string tb.sp.stdin.flush() tb.lastsend = string except: (type, value, dummy) = sys.exc_info() tb.bomb('cannot send to testbed: %s' % traceback. format_exception_only(type, value)) def expect(tb, keyword, nresults=-1): l = tb.sp.stdout.readline() if not l: tb.bomb('unexpected eof from the testbed') if not l.endswith('\n'): tb.bomb('unterminated line from the testbed') l = l.rstrip('\n') debug('<< '+l) ll = l.split() if not ll: tb.bomb('unexpected whitespace-only line from the testbed') if ll[0] != keyword: if tb.lastsend is None: tb.bomb("got banner `%s', expected `%s...'" % (l, keyword)) else: tb.bomb("sent `%s', got `%s', expected `%s...'" % (tb.lastsend, l, keyword)) ll = ll[1:] if nresults >= 0 and len(ll) != nresults: tb.bomb("sent `%s', got `%s' (%d result parameters)," " expected %d result parameters" % (string, l, len(ll), nresults)) return ll def commandr(tb, cmd, nresults, args=()): if type(cmd) is str: cmd = [cmd] al = cmd + map(urllib.quote, args) tb.send(string.join(al)) ll = tb.expect('ok') rl = map(urllib.unquote, ll) return rl def command(tb, cmd, args=()): tb.commandr(cmd, 0, args) def commandr1(tb, cmd, args=()): rl = tb.commandr(cmd, 1, args) return rl[0] #---------- representation of test control files: Field*, Test, etc. class FieldBase: def __init__(f, fname, stz, base, tnames, vl): assert(vl) f.stz = stz f.base = base f.tnames = tnames f.vl = vl def words(f): def distribute(vle): (lno, v) = vle r = v.split() r = map((lambda w: (lno, w)), r) return r return flatten(map(distribute, f.vl)) def atmostone(f): if len(vl) == 1: (f.lno, f.v) = vl[0] else: raise Unsupported(f.vl[1][0], 'only one %s field allowed' % fn) return f.v class FieldIgnore(FieldBase): def parse(f): pass class Restriction: def __init__(r,rname,base): pass class Restriction_rw_tests_tree(Restriction): pass class Restriction_breaks_testbed(Restriction): if 'reset' not in caps: raise Unsupported(f.lno, 'Test breaks testbed but testbed cannot reset') class Field_Restrictions(FieldBase): def parse(f): for wle in f.words(): (lno, rname) = wle rname = rname.replace('-','_') try: rclass = globals()['Restriction_'+rname] except KeyError: raise Unsupported(lno, 'unknown restriction %s' % rname) r = rclass(rname, f.base) f.base['restrictions'].append(r) class Field_Tests(FieldIgnore): pass class Field_Tests_directory(FieldBase): def parse(f): td = atmostone(f) if td.startswith('/'): raise Unspported(f.lno, 'Tests-Directory may not be absolute') base['testsdir'] = td def run_tests(stanzas): global errorcode for stanza in stanzas: tests = stanza[' tests'] if not tests: report('*', 'SKIP no tests in this package') errorcode |= 8 for t in tests: testbed.prepare() t.run() if 'breaks-testbed' in t.restrictions: testbed.needs_reset() testbed.needs_reset() class Test: def __init__(t, tname, base): if '/' in tname: raise Unsupported(base[' lno'], 'test name may not contain / character') for k in base: setattr(t,k,base[k]) t.tname = tname if len(base['testsdir']): tpath = base['testsdir'] + '/' + tname else: tpath = tname t.p = opts.tests_tree.append(tpath, 'test-'+tname) def report(t, m): report(t.tname, m) def reportfail(t, m): global errorcode errorcode |= 4 report(t.tname, 'FAIL ' + m) def run(t): def stdouterr(oe): idstr = oe + '-' + t.tname if opts.output_dir is not None and opts.output_dir.tb: return opts.output_dir.append(idstr) else: return testbed.scratch.append(idstr, idstr) def stdouterrh(p, oe): idstr = oe + '-' + t.tname if opts.output_dir is None or opts.output_dir.tb: return p.onhost() else: return p.onhost(opts.output_dir.onhost() + '/' + idstr) so = stdouterr('stdout') se = stdouterr('stderr') rc = testbed.commandr1('execute',(t.p.ontb(), '/dev/null', so.ontb(), se.ontb(), opts.tests_tree.ontb())) soh = stdouterrh(so, 'stdout') seh = stdouterrh(se, 'stderr') rc = int(rc) stab = os.stat(seh) if stab.st_size != 0: l = file(seh).readline() l = l.rstrip('\n \t\r') if len(l) > 40: l = l[:40] + '...' t.reportfail('stderr: %s' % l) elif rc != 0: t.reportfail('non-zero exit status %d' % rc) else: t.report('PASS') def read_control(act, tree, control_override): stanzas = [ ] if control_override is not None: control_path = control_override testbed.blame('arg:'+control_path) else: control_path = tree.append('/debian/tests/control') testbed.blame('arg:'+tree.spec) try: control = file(control_path.onhost(), 'r') except IOError, oe: if oe[0] != errno.ENOENT: raise return [] lno = 0 def badctrl(m): act.bomb('tests/control line %d: %s' % (lno, m)) stz = None # stz[field_name][index] = (lno, value) # special field names: # stz[' lno'] = number # stz[' tests'] = list of Test objects def end_stanza(stz): if stz is None: return stz[' errs'] = 0 stanzas.append(stz) stz = None hcurrent = None initre = regexp.compile('([A-Z][-0-9a-z]*)\s*\:\s*(.*)$') while 1: l = control.readline() if not l: break lno += 1 if not l.endswith('\n'): badctrl('unterminated line') if regexp.compile('\s*\#').match(l): continue if not regexp.compile('\S').match(l): end_stanza(stz); continue initmat = initre.match(l) if initmat: (fname, l) = initmat.groups() fname = string.capwords(fname) if stz is None: stz = { ' lno': lno, ' tests': [] } if not stz.has_key(fname): stz[fname] = [ ] hcurrent = stz[fname] elif regexp.compile('\s').match(l): if not hcurrent: badctrl('unexpected continuation') else: badctrl('syntax error') hcurrent.append((lno, l)) end_stanza(stz) def testbadctrl(stz, lno, m): report_badctrl(lno, m) stz[' errs'] += 1 for stz in stanzas: try: try: tnames = stz['Tests'] except KeyError: tnames = ['*'] raise Unsupported(stz[' lno'], 'no Tests field') tnames = map((lambda lt: lt[1]), tnames) tnames = string.join(tnames).split() base = { 'restrictions': [], 'testsdir': 'debian/tests' } for fname in stz.keys(): if fname.startswith(' '): continue vl = stz[fname] try: fclass = globals()['Field_'+ fname.replace('-','_')] except KeyError: raise Unsupported(vl[0][0], 'unknown metadata field %s' % fname) f = fclass(stz, fname, base, tnames, vl) f.parse() for tname in tnames: t = Test(tname, base) stz[' tests'].append(t) except Unsupported, u: for tname in tnames: u.report(tname) continue return stanzas def print_exception(ei, msgprefix=''): if msgprefix: print >>sys.stderr, msgprefix (et, q, tb) = ei if et is Quit: print >>sys.stderr, 'adt-run:', q.m return q.ec else: print >>sys.stderr, "adt-run: unexpected, exceptional, error:" traceback.print_exc() return 20 def cleanup(): try: rm_ec = 0 if tmpdir is not None: shutil.rmtree(tmpdir) if testbed is not None: testbed.stop() if rm_ec: bomb('rm -rf -- %s failed, code %d' % (tmpdir, ec)) except: print_exception(sys.exc_info(), '\nadt-run: error cleaning up:\n') os._exit(20) #---------- registration, installation etc. of .deb's: Binaries def determine_package(act): cmd = 'dpkg-deb --info --'.split(' ')+[act.path.read(),'control'] running = Popen(cmd, stdout=PIPE) output = running.communicate()[0] rc = running.wait() if rc: badpkg('failed to parse binary package, code %d' % rc) re = regexp.compile('^\s*Package\s*:\s*([0-9a-z][-+.0-9a-z]*)\s*$') act.pkg = None for l in '\n'.split(output): m = re.match(output) if not m: continue if act.pkg: badpkg('two Package: lines in control file') act.pkg = m.groups if not act.pkg: badpkg('no good Package: line in control file') class Binaries: def __init__(b): b.dir = tmpdir+'/binaries' if opts.gnupghome is None: opts.gnupghome = tmpdir+'/gnupg' try: for x in ['pubring','secring']: os.stat(opts.gnupghome + '/' + x + '.gpg') except IOError, oe: if oe.errno != errno.ENOENT: raise try: os.mkdir(opts.gnupghome, 0700) except IOError, oe: if oe.errno != errno.EEXIST: raise script = ' cd "$1" exec >key-gen-log 2>&1 cat <<"END" >key-gen-params Key-Type: DSA Key-Length: 1024 Key-Usage: sign Name-Real: autopkgtest per-run key Name-Comment: do not trust this key Name-Email: autopkgtest@example.com END set -x gpg --homedir="$1" --batch --gen-key key-gen-params ' cmdl = ['sh','-ec',script,'x',opts.gnupghome] rc = subprocess.call(cmdl) if rc: try: f = open(opts.gnupghome+'/key-gen-log') tp = file.read() except IOError, e: tp = e print >>sys.stderr, tp bomb('key generation failed, code %d' % rc) def reset(b): shutil.rmtree(b.dir) os.mkdir(b.dir) b.tbpath = testbed.scratch.append('/binaries') b.install = [] b.blamed = [] def register(b, act, pkg, path, forwhat, blamed): if act.ah['deb_'+forwhat] == 'ignore': return b.blamed += testbed.blamed here = path.read() leafname = pkg+'.deb' dest = b.dir+'/'+leafname try: os.remove(dest) except IOError, oe: if oe.errno != errno.ENOENT: raise e try: os.link(here, dest) except IOError, oe: if oe.errno != errno.EXDEV: raise e shutil.copy(here, dest) if act.ah['deb_'+forwhat] == 'install': b.install.append(pkg) def publish(b): script = ' cd "$1" apt-ftparchive packages . >Packages gzip -f Packages apt-ftparchive release . >Release gpg --homedir="$2" --batch --detach-sign --armour -o Release.gpg Release gpg --homedir="$2" --batch --export >archive-key.pgp ' cmdl = ['sh','-ec',script,'x',b.dir,opts.gnupghome] rc = subprocess.call(cmd) if rc: bomb('apt-ftparchive or signature failed, code %d' % rc) tbp = b.tbpath.write(True) testbed.command('copydown', (b.dir+'/', tbp+'/')) se = TemporaryPath('%s-aptkey-stderr' % act.what) script = ' apt-key add archive-key.pgp echo "deb file:///'+tbp+'/ /" >/etc/apt/sources.list.d/autopkgtest ' rc = testbed.commandr1(['execute', ','.join(map(urllib.quote, ['sh','-ec','script']))], '/dev/null', '/dev/null', se.write(True), tbp) if rc: bomb('apt setup failed with exit code %d' % rc, se) testbed.blamed += b.blamed for pkg in b.install: testbed.blame(pkg) se = TemporaryPath('%s-install-%s-stderr' % (act.what,pkg)) rc = testbed.commandr1('execute','apt-get,-qy,install,'+pkg, '/dev/null','/dev/null',se.ontb(), tb.scratch.read(True)) if rc: badpkg("installation of %s failed, exit code %d" % (pkg, rc), se) #---------- processing of sources (building) def source_rules_command(act,script,which,work,results_lines=0): script = "exec 3>&1 >&2\n" + '\n'.join(script) so = TemporaryPath('%s-%s-results' % (what,which)) se = TemporaryPath('%s-%s-log' & (what,which)) rc = testbed.commandr1(['execute', ','.join(map(urllib.quote, ['sh','-xec',script]))], '/dev/null', so.write(True), se.write(True), work.write(True)) results = file(so.read()).read().split("\n") if rc: badpkg_se("%s failed with exit code %d" % (which,rc), se) if results_lines is not None and len(results) != results_lines: badpkg_se("got %d lines of results from %s where %d expected" % (len(results), which, results_lines), se) if results_lines==1: return results[0] return results def build_source(act): act.blame = 'arg:'+act.path.spec() testbed.blame(act.blame) testbed.needs_reset() what = act.ah['what'] dsc = act.path basename = dsc.spec; if basename is None: basename = 'source.dsc' dsc_what = what+'/'+basename dsc_file = open(dsc.read()) in_files = False fre = regexp.compile('^\s+[0-9a-f]+\s+\d+\s+([^/.][^/]*)$') for l in dsc_file(): if l.startswith('Files:'): in_files = True elif l.startswith('#'): pass elif not l.startswith(' '): in_files = False if l.startswith('Source:'): act.blame = 'dsc:'+l[7:].strip() testbed.blame(act.blame) elif not in_files: pass if not dsc.spec_tb: continue m = re.match(l) if not m: badpkg(".dsc contains unparseable line" " in Files: `%s'" % (`dsc`,l)) subfile = dsc.enclosingdir().append('/'+m.groups(0)) subfile.read(True) dsc.read(True) work = AccumulationPath(what+'/build', dir=True) script = [ 'cd '+work.write(True), 'gdebi '+dsc.read(True), 'dpkg-source -x '+dsc.read(True), 'cd */.', 'pwd >&3', opts.user_wrap('debian/rules build'), ] result_pwd = source_rules_command(act,script,what,'build',work,1) if os.path.dirname(result_pwd) != work.read(True): badpkg_se("results dir `%s' is not in expected parent dir `%s'" % (results[0], work.read(True)), se) act.tests_tree = work.append('/'+os.path.basename(results[0])) if act.ah['dsc_tests']: act.tests_tree.preserve_now() act.blamed = testbed.blamed.copy() act.binaries = [] if act.ah['dsc_filter'] != '_': script = [ 'cd '+work.write(True)+'/*/.', opts.user_wrap(opts.fakeroot+' debian/rules binary'), 'cd ..', 'echo *.deb >&3', ] result_debs = source_rules_command(act,script,what, 'debian/rules binary',work,1) if result_debs == '*': debs = [] else: debs = debs.split(' ') re = regexp.compile('^([-+.0-9a-z]+)_[^_/]+(?:_[^_/]+)\.deb$') for deb in debs: m = re.match(deb) if not m: badpkg("badly-named binary `%s'" % deb) pkg = m.groups(0) for pat in act.ah['dsc_filter'].split(','): if fnmatch.fnmatchcase(pkg,pat): deb_path = work.read()+'/'+deb deb_what = pkg+'_'+what+'.deb' bin = InputPath(deb_what,deb_path,True) bin.preserve_now() binaries.register(act,pkg,bin,'builds', testbed.blamed) act.binaries.append((pkg,bin)) break #---------- main processing loop and main program def process_actions(): global binaries binaries = Binaries() b.reset() for act in opts.actions: testbed.prepare() if act.kind == 'deb': blame('arg:'+path.spec) determine_package(act) blame('deb:'+act.pkg) binaries.register(act,act.pkg,act.path,'builds', testbed.blamed) if act.kind == 'dsc': build_source(act) b.reset() control_override = None for act in opts.actions: testbed.prepare() if act.kind == 'control': control_override = act.path if act.kind == 'deb': binaries.register(act,act.pkg,act.path,'tests', ['deb:'+act.pkg]) if act.kind == 'dsc': for (pkg,bin) in act.binaries: binaries.register(act,pkg,bin,'tests', act.blamed) if not act.ah['dsc_tests']: continue stanzas = read_control(act, act.tests_tree, control_override) testbed.blamed += act.blamed run_tests(act, stanzas) control_override = None if act.kind == 'tree': testbed.blame('arg:'+act.path.spec) stanzas = read_control(act, act.path, control_override) run_tests(act, stanzas) control_override = None def main(): global testbed global tmpdir try: parse_args() except SystemExit, se: os._exit(20) try: tmpdir = tempfile.mkdtemp() testbed = Testbed() testbed.start() finalise_options() process_actions() except: ec = print_exception(sys.exc_info(), '') cleanup() os._exit(ec) cleanup() os._exit(errorcode) main()