#!/usr/bin/python2.4
#
# adt-run is part of autopkgtest
# autopkgtest is a tool for testing Debian binary packages
#
# autopkgtest is Copyright (C) 2006 Canonical Ltd.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
#
# See the file CREDITS for a full list of credits information (often
# installed as /usr/share/doc/autopkgtest/CREDITS).

import signal
import optparse
import tempfile
import sys
import subprocess
import traceback
import urllib
import string
import re as regexp
import os
import errno
import fnmatch
import shutil

from optparse import OptionParser

# quote/unquote live directly on urllib in Python 2 and in urllib.parse in
# Python 3; bind them once so the rest of this section does not care.
try:
    from urllib import quote as _urlquote, unquote as _urlunquote
except ImportError:
    from urllib.parse import quote as _urlquote, unquote as _urlunquote

signal.signal(signal.SIGINT, signal.SIG_DFL) # undo stupid Python SIGINT thing

#---------- global variables

tmpdir = None           # pathstring on host
testbed = None          # Testbed
errorcode = 0           # exit status that we are going to use
binaries = None         # Binaries (.debs we have registered)
opts = None             # option values; filled in by parse_args()

#---------- errors we define

class Quit(Exception):
    """Request to terminate with exit status `ec` after printing `m`."""

    def __init__(q, ec, m):
        Exception.__init__(q, m)
        q.ec = ec
        q.m = m


def bomb(m, se=''):
    """Print `se` on stderr and raise Quit(20) for an unexpected error."""
    sys.stderr.write('%s\n' % (se,))
    raise Quit(20, "unexpected error: %s" % m)


def badpkg(m, se=''):
    """Print `se` and the testbed's blame list, then raise Quit(12)."""
    sys.stderr.write('%s\n' % (se,))
    print('blame:  ' + ' '.join(testbed.blamed))
    raise Quit(12, "erroneous package: %s" % m)


def report(tname, result):
    """Print one result line: test name padded to 20 columns, then result."""
    print('%-20s %s' % (tname, result))


class Unsupported(Exception):
    """A test (or control stanza) that cannot be run and must be skipped.

    `lno` is the control-file line the problem relates to; a negative value
    means there is no line number to quote.
    """

    def __init__(u, lno, m):
        if lno >= 0:
            u.m = '%s (control line %d)' % (m, lno)
        else:
            u.m = m
        Exception.__init__(u, u.m)

    def report(u, tname):
        """Report `tname` as skipped and set bit 2 of the exit status."""
        global errorcode
        errorcode |= 2      # was `!= 2`, a comparison with no effect
        report(tname, 'SKIP %s' % u.m)


def debug(m):
    """Print `m` on stderr if --debug was given."""
    global opts
    if opts is None or not opts.debug:
        return
    sys.stderr.write('adt-run: debug: %s\n' % (m,))


def flatten(l):
    """Concatenate a list of lists into one new list."""
    out = []
    for sub in l:
        out.extend(sub)
    return out

#---------- fancy automatic file-copying class

class AutoFile:
    """A file or directory that may live on the host, the testbed, or both.

    The two-element lists are indexed by `tbp` ("testbed path?"):
    index False is the host, index True is the testbed.
    """
    # p.what
    # p.path[tb]        None or path    not None => path known
    # p.file[tb]        None or path    not None => file exists
    # p.spec            string or not set
    # p.spec_tbp        True or False, or not set
    # p.dir             '' or '/'

    def __init__(p, what):
        p.what = what
        p.path = [None, None]
        p.file = [None, None]
        p.dir = ''

    def __str__(p):
        def ptbp(tbp):
            # '-' unknown path, '?' path known but no file, '!' file exists
            if p.path[tbp] is None:
                state = '-'
            elif p.file[tbp] is None:
                state = '?' + p.path[tbp]
            else:
                state = '!' + p.path[tbp]
            return state + p.dir

        out = p.what + ptbp(False) + '|' + ptbp(True)
        if hasattr(p, 'spec'):
            if p.spec_tbp:
                out += '<'
            else:
                out += '>'
            out += p.spec
        return out

    def _wrong(p, how):
        """Abort (via bomb) because of an internal misuse of this object."""
        bomb("internal error: %s (%s)" % (how, str(p)))

    def _init_spec(p, what, spec, spec_tbp, exists, isdir):
        """Shared constructor body for the Input*/Output* subclasses.

        `dir` is set before `_check` runs so that a trailing slash on a
        directory spec is normalised instead of being rejected.
        """
        AutoFile.__init__(p, what)
        if isdir:
            p.dir = '/'
        p.spec = spec
        p.spec_tbp = spec_tbp
        p.path[spec_tbp] = spec
        if exists:
            p.file[spec_tbp] = spec
        p._check()

    def _ensure_path(p, tbp):
        """Pick a path on side `tbp` if none is known yet."""
        if p.path[tbp] is None:
            if not tbp:
                p.path[tbp] = tmpdir + '/' + p.what
            else:
                p.path[tbp] = testbed.scratch.path[True] + '/' + p.what

    def write(p, tbp=False):
        """Return the path to write to on side `tbp`, creating directories.

        The object is marked as existing on that side afterwards.
        """
        p._ensure_path(tbp)

        if p.dir and not p.file[tbp]:
            if not tbp:
                try:
                    os.mkdir(p.path[tbp])
                except EnvironmentError:
                    # os.mkdir raises OSError, which `except IOError`
                    # did not catch on Python 2
                    if sys.exc_info()[1].errno != errno.EEXIST:
                        raise
            else:
                cmdl = ['sh', '-ec',
                        'test -d "$1" || mkdir "$1"',
                        'x', p.path[tbp]]
                tf_what = _urlquote(p.what).replace('/', ' ')
                # cwd is given explicitly: execute()'s default cwd is the
                # scratch directory's write(True), which would recurse
                # into this very branch.
                (rc, se) = testbed.execute('mkdir-' + tf_what, cmdl,
                                           cwd='/')
                if rc:
                    bomb('failed to create directory %s' % p.path[tbp], se)

        p.file[tbp] = p.path[tbp]
        return p.path[tbp]

    def read(p, tbp=False):
        """Return a path on side `tbp` where the contents can be read.

        If the object only exists on the other side it is copied across
        with the testbed's copyup/copydown command first.
        """
        if p.file[tbp] is not None:
            return p.file[tbp]
        if p.file[not tbp] is None:
            p._wrong("requesting read but nonexistent")
        p._ensure_path(tbp)

        cud = ['copyup', 'copydown'][tbp]
        src = p.file[not tbp] + p.dir
        dst = p.path[tbp] + p.dir
        testbed.command(cud, (src, dst))
        p.file[tbp] = p.path[tbp]
        return p.file[tbp]

    def subpath(p, what, leaf, constructor):
        """Build `constructor(what, <this dir>/leaf, side)`."""
        if not p.dir:
            p._wrong("creating subpath of non-directory")
        if hasattr(p, 'spec'):
            parent = p.spec
            tbp = p.spec_tbp
        else:
            # Temporaries carry no spec; anchor the child on the host-side
            # path (created on demand).
            parent = p.write(False)
            tbp = False
        if not parent.endswith('/'):
            parent += '/'
        return constructor(what, parent + leaf, tbp)

    def sibling(p, what, leaf, constructor):
        """Build `constructor` for `leaf` in the same directory as p.spec."""
        parent = os.path.dirname(p.spec)
        if parent:
            parent += '/'
        return constructor(what, parent + leaf, p.spec_tbp)

    def invalidate(p, tbp=False):
        """Forget that the object exists on side `tbp`."""
        p.file[tbp] = None

    def _check(p):
        """Validate and normalise the known paths (trailing slashes)."""
        for tbp in [False, True]:
            for pf in [p.path, p.file]:
                if pf[tbp] is None:
                    continue
                if not pf[tbp]:
                    bomb('empty path specified for ' + p.what)
                if p.dir and pf[tbp].endswith('/'):
                    pf[tbp] = pf[tbp].rstrip('/')
                    if not pf[tbp]:
                        pf[tbp] = '/'
                if not p.dir and pf[tbp].endswith('/'):
                    bomb("directory `%s' specified for "
                         "non-directory %s" % (pf[tbp], p.what))


class InputFile(AutoFile):
    """A file that already exists at `spec` on the side given by spec_tbp."""

    def __init__(p, what, spec, spec_tbp=False):
        p._init_spec(what, spec, spec_tbp, True, False)


class InputDir(AutoFile):
    """A directory that already exists at `spec`."""

    def __init__(p, what, spec, spec_tbp=False):
        p._init_spec(what, spec, spec_tbp, True, True)


class OutputFile(AutoFile):
    """A file to be created at `spec`; it is not assumed to exist yet."""

    def __init__(p, what, spec, spec_tbp=False):
        p._init_spec(what, spec, spec_tbp, False, False)


class OutputDir(AutoFile):
    """A directory to be created at `spec`."""

    def __init__(p, what, spec, spec_tbp=False):
        p._init_spec(what, spec, spec_tbp, False, True)


class TemporaryFile(AutoFile):
    """A file whose paths are chosen automatically from `what`."""

    def __init__(p, what):
        AutoFile.__init__(p, what)


class TemporaryDir(AutoFile):
    """A directory whose paths are chosen automatically from `what`."""

    def __init__(p, what):
        AutoFile.__init__(p, what)
        p.dir = '/'

#---------- parsing and representation of the arguments

class Action:
    """One thing to do: a tree, .dsc, .deb or control-file override."""

    def __init__(a, kind, af, arghandling, what):
        # extra attributes get added during processing
        a.kind = kind
        a.af = af
        a.ah = arghandling
        a.what = what


def parse_args():
    """Parse sys.argv into the global `opts`.

    Sets opts.vserver (everything after `---`) and opts.actions (a list of
    Action objects, each carrying the argument-handling settings that were
    in force where it appeared on the command line).
    """
    global opts

    usage = "%prog <options> --- <virt-server>..."
    parser = OptionParser(usage=usage)
    pa = parser.add_option

    arghandling = {
        'dsc_tests': True,
        'dsc_filter': '*',
        'deb_forbuilds': 'auto',
        'deb_fortests': 'auto',
        'tb': False,
        'override_control': None,
    }
    initial_arghandling = arghandling.copy()
    # One-element list so the nested callbacks can update the count
    # (a plain int cannot be rebound from an inner function in Python 2).
    n_actions = [0]

    #----------
    # actions (ie, test sets to run, sources to build, binaries to use):

    def cb_action(op, optstr, value, parser, long, kindpath, is_act):
        parser.largs.append((value, kindpath))
        n_actions[0] += is_act

    def pa_action(long, metavar, kindpath, help, is_act=True):
        pa('--' + long, action='callback', callback=cb_action,
           nargs=1, type='string', metavar=metavar,
           callback_args=(long, kindpath, is_act), help=help)

    pa_action('build-tree', 'TREE', '@/',
              help='run tests from build tree TREE')

    pa_action('source', 'DSC', '@.dsc',
              help='build DSC and use its tests and/or'
                   ' generated binary packages')

    pa_action('binary', 'DEB', '@.deb',
              help='use binary package DEB according'
                   ' to most recent --binaries-* settings')

    pa_action('override-control', 'CONTROL', ('control',), is_act=0,
              help='run tests from control file CONTROL instead'
                   ' (applies to next test suite only)')

    #----------
    # argument handling settings (what ways to use action
    #  arguments, and pathname processing):

    def cb_setah(option, opt_str, value, parser, toset, setval):
        if type(setval) == list:
            if not value in setval:
                parser.error('value for %s option (%s) is not '
                             'one of the permitted values (%s)' %
                             (opt_str, value, ' '.join(setval)))
        elif setval is not None:
            value = setval
        for v in toset:
            arghandling[v] = value
        # Snapshot the settings in sequence with the actions.
        parser.largs.append(arghandling.copy())

    def pa_setah(long, affected, effect, **kwargs):
        # An option with a metavar takes a (string) argument;
        # the others are plain flags.
        if kwargs.get('metavar') is not None:
            kwargs['type'] = 'string'
        pa(long, action='callback', callback=cb_setah,
           callback_args=(affected, effect), **kwargs)

    #---- paths: host or testbed:
    #
    pa_setah('--paths-testbed', ['tb'], True,
             help='subsequent path specifications refer to the testbed')
    pa_setah('--paths-host', ['tb'], False,
             help='subsequent path specifications refer to the host')

    #---- source processing settings:

    pa_setah('--sources-tests', ['dsc_tests'], True,
             help='run tests from builds of subsequent sources')
    pa_setah('--sources-no-tests', ['dsc_tests'], False,
             help='do not run tests from builds of subsequent sources')

    pa_setah('--built-binaries-filter', ['dsc_filter'], None,
             metavar='PATTERN-LIST',
             help='from subsequent sources, use binaries matching'
                  ' PATTERN-LIST (comma-separated glob patterns)'
                  ' according to most recent --binaries-* settings')
    pa_setah('--no-built-binaries', ['dsc_filter'], '_',
             help='from subsequent sources, do not use any binaries')

    #---- binary package processing settings:

    def pa_setahbins(long, toset, how):
        pa_setah(long, toset, ['ignore', 'auto', 'install'],
                 metavar='IGNORE|AUTO|INSTALL',
                 help=how + ' ignore binaries, install them as needed'
                            ' for dependencies, or unconditionally install'
                            ' them, respectively')

    pa_setahbins('--binaries', ['deb_forbuilds', 'deb_fortests'], '')
    pa_setahbins('--binaries-forbuilds', ['deb_forbuilds'], 'for builds, ')
    pa_setahbins('--binaries-fortests', ['deb_fortests'], 'for tests, ')

    #----------
    # general options:

    def cb_vserv(op, optstr, value, parser):
        # Everything after `---' belongs to the virtualisation server.
        parser.values.vserver = list(parser.rargs)
        del parser.rargs[:]

    def cb_path(op, optstr, value, parser, constructor, long, dir):
        name = long.replace('-', '_')
        af = constructor(long, value, arghandling['tb'])
        setattr(parser.values, name, af)

    def pa_path(long, constructor, help, dir=False):
        pa('--' + long, action='callback', callback=cb_path,
           callback_args=(constructor, long, dir),
           nargs=1, type='string',
           help=help, metavar='PATH')

    pa_path('output-dir', OutputDir, dir=True,
            help='write stderr/out files in PATH')

    pa('--tmp-dir', type='string', dest='tmpdir',
       help='write temporary files to TMPDIR, emptying it'
            ' beforehand and leaving it behind at the end')

    pa('--user', type='string', dest='user',
       help='run tests as USER (needs root on testbed)')
    pa('--gain-root', type='string', dest='gainroot',
       help='prefix debian/rules binary with GAINROOT')
    pa('-d', '--debug', action='store_true', dest='debug')
    pa('--gnupg-home', type='string', dest='gnupghome',
       default='~/.autopkgtest/gpg',
       help="use GNUPGHOME rather than ~/.autopkgtest/gpg (for"
            " signing private apt archive);"
            " `fresh' means generate new key each time.")

    #----------
    # actual meat:

    # optparse will not accept `---' as an option string, so create a
    # dummy option and then overwrite its attributes by hand.
    class SpecialOption(optparse.Option):
        pass

    vs_op = SpecialOption('--VSERVER-DUMMY')
    vs_op.action = 'callback'
    vs_op.type = None
    vs_op.default = None
    vs_op.nargs = 0
    vs_op.callback = cb_vserv
    vs_op.callback_args = ()
    vs_op.callback_kwargs = {}
    vs_op.help = 'introduces virtualisation server and args'
    vs_op._short_opts = []
    vs_op._long_opts = ['---']

    pa(vs_op)

    (opts, args) = parser.parse_args()
    if not hasattr(opts, 'vserver'):
        parser.error('you must specify --- <virt-server>...')

    # `args' interleaves, in command-line order: dicts (settings
    # snapshots from cb_setah), tuples (from cb_action) and plain strings
    # (bare positional arguments, classified by their own name).
    arghandling = initial_arghandling
    opts.actions = []
    ix = 0
    for act in args:
        if type(act) == dict:
            arghandling = act
            continue
        elif type(act) == tuple:
            pass
        elif isinstance(act, str):
            act = (act, act)
            n_actions[0] += 1
        else:
            bomb("unknown action in list `%s' having"
                 " type `%s'" % (act, type(act)))
        (pathstr, kindpath) = act

        constructor = InputFile
        if type(kindpath) is tuple:
            kind = kindpath[0]
        elif kindpath.endswith('.deb'):
            kind = 'deb'
        elif kindpath.endswith('.dsc'):
            kind = 'dsc'
        elif kindpath.endswith('/'):
            kind = 'tree'
            constructor = InputDir
        else:
            parser.error("do not know how to handle filename `%s';"
                         " specify --source --binary or --build-tree"
                         % pathstr)

        what = '%s%s' % (kind, ix)
        ix += 1

        af = constructor(what + '-' + kind, pathstr, arghandling['tb'])
        opts.actions.append(Action(kind, af, arghandling, what))

    if not n_actions[0]:
        parser.error('nothing to do specified')


def finalise_options():
    """Fill in option defaults that depend on the testbed's capabilities.

    Must be called after the testbed has started (it reads testbed.caps).
    Sets opts.user, opts.user_wrap, opts.gainroot and opts.gnupghome.
    """
    global opts, testbed

    caps = testbed.caps

    if opts.user is None and 'root-on-testbed' not in caps:
        opts.user = ''

    if opts.user is None:
        su = 'suggested-normal-user='
        ul = [e[len(su):] for e in caps if e.startswith(su)]
        if len(ul) > 1:
            sys.stderr.write("warning: virtualisation"
                             " system offers several suggested-normal-user"
                             " values: " + ('/'.join(ul)) +
                             ", using " + ul[0] + "\n")
        if ul:
            opts.user = ul[0]
        else:
            opts.user = ''

    if opts.user:
        if 'root-on-testbed' not in caps:
            sys.stderr.write("warning: virtualisation"
                             " system does not offer root on testbed,"
                             " but --user option specified:"
                             " failure likely\n")
        opts.user_wrap = lambda x: 'su %s -c "%s"' % (opts.user, x)
    else:
        opts.user_wrap = lambda x: x

    if opts.gainroot is None:
        opts.gainroot = ''
        if opts.user or 'root-on-testbed' not in caps:
            opts.gainroot = 'fakeroot'

    if opts.gnupghome.startswith('~/'):
        try:
            home = os.environ['HOME']
        except KeyError:
            # the option parser is out of scope here, so fail via bomb()
            bomb("HOME environment variable"
                 " not set, needed for --gnupghome=`%s'"
                 % opts.gnupghome)
        opts.gnupghome = home + opts.gnupghome[1:]
    elif opts.gnupghome == 'fresh':
        opts.gnupghome = None

#---------- testbed management - the Testbed class

class Testbed:
    """Connection to the virtualisation server subprocess.

    Commands are sent as single lines on the server's stdin, with
    arguments URL-quoted; each reply is a single line on its stdout
    starting with a keyword (`ok').
    """

    def __init__(tb):
        tb.sp = None            # the server subprocess
        tb.lastsend = None      # last line sent, for error messages
        tb.scratch = None       # OutputDir for the testbed scratch area
        tb.modified = False     # testbed needs resetting before reuse
        tb.blamed = []

    def start(tb):
        """Spawn the server (opts.vserver) and fetch its capabilities."""
        p = subprocess.PIPE
        # universal_newlines gives text-mode pipes on Python 3
        tb.sp = subprocess.Popen(opts.vserver,
                                 stdin=p, stdout=p, stderr=None,
                                 universal_newlines=True)
        tb.expect('ok')
        tb.caps = tb.commandr('capabilities')

    def stop(tb):
        """Close the testbed and ask the server to quit."""
        tb.close()
        if tb.sp is None:
            return
        ec = tb.sp.returncode
        if ec is None:
            tb.sp.stdout.close()
            tb.send('quit')
            tb.sp.stdin.close()
            ec = tb.sp.wait()
        if ec:
            tb.bomb('testbed gave exit status %d after quit' % ec)

    def open(tb):
        """Send `open' (once) and record the scratch directory it names."""
        if tb.scratch is not None:
            return
        pl = tb.commandr('open')
        tb.scratch = OutputDir('tb-scratch', pl[0], True)

    def close(tb):
        """Send `close' if the testbed is open."""
        if tb.scratch is None:
            return
        tb.scratch = None
        if tb.sp is None:
            return
        tb.command('close')

    def prepare(tb):
        """Get the testbed ready for the next action or test."""
        # Nothing else opens the testbed, and execute()/AutoFile need
        # tb.scratch; open() is a no-op when it is already open.
        tb.open()
        if tb.modified and 'reset' in tb.caps:
            tb.command('reset')
            tb.blamed = []
        tb.modified = False
        binaries.publish()

    def needs_reset(tb):
        """Note that the testbed has been modified."""
        tb.modified = True

    def blame(tb, m):
        """Add `m` to the list printed by badpkg()."""
        tb.blamed.append(m)

    def bomb(tb, m):
        """Shut the server down and raise Quit(16)."""
        if tb.sp is not None:
            tb.sp.stdout.close()
            tb.sp.stdin.close()
            ec = tb.sp.wait()
            if ec:
                sys.stderr.write('adt-run: testbed failing,'
                                 ' exit status %d\n' % ec)
        tb.sp = None
        raise Quit(16, 'testbed failed: %s' % m)

    def send(tb, string):
        """Send one line to the server."""
        try:
            debug('>> ' + string)
            tb.sp.stdin.write(string + '\n')
            tb.sp.stdin.flush()
            tb.lastsend = string
        except (EnvironmentError, ValueError):
            (etype, value, dummy) = sys.exc_info()
            tb.bomb('cannot send to testbed: %s' %
                    ''.join(traceback.format_exception_only(etype,
                                                            value)).strip())

    def expect(tb, keyword, nresults=-1):
        """Read one reply line; return the words after `keyword`.

        Bombs if the line does not start with `keyword`, or (when
        nresults >= 0) does not carry exactly nresults further words.
        """
        l = tb.sp.stdout.readline()
        if not l:
            tb.bomb('unexpected eof from the testbed')
        if not l.endswith('\n'):
            tb.bomb('unterminated line from the testbed')
        l = l.rstrip('\n')
        debug('<< ' + l)
        ll = l.split()
        if not ll:
            tb.bomb('unexpected whitespace-only line from the testbed')
        if ll[0] != keyword:
            if tb.lastsend is None:
                tb.bomb("got banner `%s', expected `%s...'" %
                        (l, keyword))
            else:
                tb.bomb("sent `%s', got `%s', expected `%s...'" %
                        (tb.lastsend, l, keyword))
        ll = ll[1:]
        if nresults >= 0 and len(ll) != nresults:
            tb.bomb("sent `%s', got `%s' (%d result parameters),"
                    " expected %d result parameters" %
                    (tb.lastsend, l, len(ll), nresults))
        return ll

    def commandr(tb, cmd, nresults=-1, args=()):
        """Send `cmd` plus URL-quoted `args`; return the unquoted results.

        `cmd` is a string or a list of already-formatted words.
        nresults < 0 accepts any number of results.
        """
        if isinstance(cmd, str):
            cmd = [cmd]
        al = cmd + [_urlquote(a) for a in args]
        tb.send(' '.join(al))
        ll = tb.expect('ok', nresults)
        return [_urlunquote(w) for w in ll]

    def command(tb, cmd, args=()):
        """Send a command that returns no results."""
        tb.commandr(cmd, 0, args)

    def commandr1(tb, cmd, args=()):
        """Send a command that returns exactly one result; return it."""
        rl = tb.commandr(cmd, 1, args)
        return rl[0]

    def execute(tb, what, cmdargs,
                si='/dev/null', so='/dev/null', se=None, cwd=None):
        """Run `cmdargs` on the testbed.

        si, so, se and cwd are testbed paths; AutoFile objects are also
        accepted and resolved on the testbed side.  If `se` is None,
        stderr is captured in a temporary file and (rc, stderr_text) is
        returned; otherwise just rc.
        """
        def tbpath(arg, reading=False):
            if isinstance(arg, AutoFile):
                if reading:
                    return arg.read(True)
                return arg.write(True)
            return arg

        if cwd is None:
            cwd = tb.scratch.write(True)
        own_se = None
        if se is None:
            own_se = TemporaryFile('xerr-' + what)
            se_use = own_se.write(True)
        else:
            se_use = tbpath(se)
        rc = tb.commandr1(['execute',
                           ','.join([_urlquote(a) for a in cmdargs])],
                          (tbpath(si, True), tbpath(so), se_use,
                           tbpath(cwd)))
        try:
            rc = int(rc)
        except ValueError:
            bomb("execute for %s gave invalid response `%s'"
                 % (what, rc))
        if own_se is None:
            return rc
        f = open(own_se.read())
        try:
            se_text = f.read()
        finally:
            f.close()
        return (rc, se_text)

#---------- representation of test control files: Field*, Test, etc.
class FieldBase:
    """One metadata field of a tests/control stanza.

    fname   field name as normalised by read_control
    stz     the stanza dictionary the field came from
    base    dictionary of settings being accumulated for the stanza's tests
    tnames  names of the tests in the stanza
    vl      non-empty list of (line number, text) pairs for the field
    """

    def __init__(f, fname, stz, base, tnames, vl):
        assert(vl)
        f.fname = fname
        f.stz = stz
        f.base = base
        f.tnames = tnames
        f.vl = vl

    def words(f):
        """Return every whitespace-separated word as (lno, word)."""
        def distribute(vle):
            (lno, v) = vle
            return [(lno, w) for w in v.split()]
        return flatten([distribute(vle) for vle in f.vl])

    def atmostone(f):
        """Return the single value; raise Unsupported if there are more."""
        if len(f.vl) == 1:
            (f.lno, f.v) = f.vl[0]
        else:
            raise Unsupported(f.vl[1][0],
                              'only one %s field allowed' % f.fname)
        return f.v


class FieldIgnore(FieldBase):
    """A field that is recognised but needs no processing here."""

    def parse(f):
        pass


class Restriction:
    """Base class for the words of a Restrictions field."""

    def __init__(r, rname, base):
        r.rname = rname


class Restriction_rw_tests_tree(Restriction):
    pass


class Restriction_breaks_testbed(Restriction):
    """The test leaves the testbed unusable, so it must be resettable."""

    def __init__(r, rname, base):
        # This check used to sit directly in the class body, where it ran
        # (and failed) at import time instead of per test.
        Restriction.__init__(r, rname, base)
        if 'reset' not in testbed.caps:
            raise Unsupported(
                -1, 'Test breaks testbed but testbed cannot reset')


class Field_Restrictions(FieldBase):
    def parse(f):
        """Instantiate a Restriction_<name> class for each word."""
        for wle in f.words():
            (lno, rname) = wle
            rname = rname.replace('-', '_')
            try:
                rclass = globals()['Restriction_' + rname]
            except KeyError:
                raise Unsupported(lno, 'unknown restriction %s' % rname)
            r = rclass(rname, f.base)
            f.base['restrictions'].append(r)


class Field_Tests(FieldIgnore):
    pass


class Field_Tests_directory(FieldBase):
    def parse(f):
        """Record a relative tests directory in base['testsdir']."""
        td = f.atmostone().strip()
        if td.startswith('/'):
            raise Unsupported(f.lno,
                              'Tests-Directory may not be absolute')
        f.base['testsdir'] = td


def run_tests(act, stanzas=None):
    """Run every test in `stanzas` (as returned by read_control).

    Called as run_tests(act, stanzas) elsewhere in this file; the
    one-argument form run_tests(stanzas) is also accepted.
    """
    global errorcode
    if stanzas is None:
        stanzas = act
    for stanza in stanzas:
        tests = stanza[' tests']
        if not tests:
            report('*', 'SKIP no tests in this package')
            errorcode |= 8
        for t in tests:
            testbed.prepare()
            t.run()
            # restrictions holds Restriction objects, not names
            if [r for r in t.restrictions
                    if isinstance(r, Restriction_breaks_testbed)]:
                testbed.needs_reset()
        testbed.needs_reset()


class Test:
    """One runnable test script from a control stanza.

    `tree` is the AutoFile directory containing the tests; when omitted,
    opts.tests_tree is used.
    """

    def __init__(t, tname, base, act_what, tree=None):
        if '/' in tname:
            raise Unsupported(base.get(' lno', -1),
                              'test name may not contain / character')
        for k in base:
            if k.startswith(' '):
                continue        # bookkeeping keys are not attributes
            setattr(t, k, base[k])
        t.tname = tname
        t.what = act_what + '-' + tname
        if len(base['testsdir']):
            tpath = base['testsdir'] + '/' + tname
        else:
            tpath = tname
        if tree is None:
            tree = opts.tests_tree
        t.tests_tree = tree
        t.af = tree.subpath('test-' + tname, tpath, InputFile)

    def report(t, m):
        report(t.what, m)

    def reportfail(t, m):
        """Report a failure and set bit 4 of the exit status."""
        global errorcode
        errorcode |= 4
        report(t.what, 'FAIL ' + m)

    def run(t):
        """Execute the test on the testbed and report PASS or FAIL.

        A test fails if it writes anything to stderr or exits non-zero.
        """
        output_dir = getattr(opts, 'output_dir', None)

        def stdouterr(oe):
            idstr = oe + '-' + t.what
            if output_dir is not None and output_dir.spec_tbp:
                use_dir = output_dir
            else:
                use_dir = testbed.scratch
            return use_dir.subpath(idstr, idstr, OutputFile)

        so = stdouterr('stdout')
        se = stdouterr('stderr')
        # The tree is read(True) so that a host-side tree is copied to
        # the testbed before it is used as the working directory.
        rc = testbed.execute('test-' + t.what,
                             [opts.user_wrap(t.af.read(True))],
                             so=so.write(True), se=se.write(True),
                             cwd=t.tests_tree.read(True))

        se_host = se.read()
        stab = os.stat(se_host)
        if stab.st_size != 0:
            f = open(se_host)
            try:
                l = f.readline()
            finally:
                f.close()
            l = l.rstrip('\n \t\r')
            if len(l) > 40:
                l = l[:40] + '...'
            t.reportfail('status: %d, stderr: %s' % (rc, l))
        elif rc != 0:
            t.reportfail('non-zero exit status %d' % rc)
        else:
            t.report('PASS')


def read_control(act, tree, control_override):
    """Parse a tests control file into a list of stanza dictionaries.

    The file is `control_override` if given, else debian/tests/control
    inside `tree`; a missing file yields [].  Each stanza maps field
    names to lists of (line number, text) and has the special keys
    ' lno' (first line), ' errs' and ' tests' (list of Test objects).
    Stanzas whose tests cannot be supported are reported as skipped.
    """
    stanzas = []

    if control_override is not None:
        control_af = control_override
        testbed.blame('arg:' + control_override.spec)
    else:
        control_af = tree.subpath(act.what + '-testcontrol',
                                  'debian/tests/control', InputFile)

    try:
        control = open(control_af.read(), 'r')
    except IOError:
        if sys.exc_info()[1].errno != errno.ENOENT:
            raise
        return []

    lno = 0

    def badctrl(m):
        badpkg('tests/control line %d: %s' % (lno, m))

    def end_stanza(stz):
        if stz is None:
            return
        stz[' errs'] = 0
        stanzas.append(stz)

    stz = None          # stz[field_name][index] = (lno, value)
                        # special field names:
                        # stz[' lno'] = number
                        # stz[' tests'] = list of Test objects
    hcurrent = None     # value list of the field being continued

    comment_re = regexp.compile(r'\s*#')
    nonblank_re = regexp.compile(r'\S')
    # upper case is allowed inside the name (e.g. Tests-Directory);
    # capwords below normalises the case
    init_re = regexp.compile(r'([A-Z][-0-9A-Za-z]*)\s*:\s*(.*)$')
    contin_re = regexp.compile(r'\s')

    try:
        while 1:
            l = control.readline()
            if not l:
                break
            lno += 1
            if not l.endswith('\n'):
                badctrl('unterminated line')
            if comment_re.match(l):
                continue
            if not nonblank_re.search(l):
                # blank line: the stanza is complete; reset parser state
                end_stanza(stz)
                stz = None
                hcurrent = None
                continue
            initmat = init_re.match(l)
            if initmat:
                (fname, l) = initmat.groups()
                fname = string.capwords(fname)
                if stz is None:
                    stz = {' lno': lno, ' tests': []}
                if fname not in stz:
                    stz[fname] = []
                hcurrent = stz[fname]
            elif contin_re.match(l):
                if hcurrent is None:
                    badctrl('unexpected continuation')
            else:
                badctrl('syntax error')
            hcurrent.append((lno, l))
        end_stanza(stz)
    finally:
        control.close()

    for stz in stanzas:
        try:
            try:
                tnames = stz['Tests']
            except KeyError:
                tnames = ['*']
                raise Unsupported(stz[' lno'], 'no Tests field')
            tnames = ' '.join([lt[1] for lt in tnames]).split()
            base = {
                'restrictions': [],
                'testsdir': 'debian/tests',
                ' lno': stz[' lno'],
            }
            for fname in list(stz.keys()):
                if fname.startswith(' '):
                    continue
                vl = stz[fname]
                try:
                    fclass = globals()['Field_' + fname.replace('-', '_')]
                except KeyError:
                    raise Unsupported(vl[0][0],
                                      'unknown metadata field %s' % fname)
                f = fclass(fname, stz, base, tnames, vl)
                f.parse()
            for tname in tnames:
                t = Test(tname, base, act.what, tree)
                stz[' tests'].append(t)
        except Unsupported:
            u = sys.exc_info()[1]
            for tname in tnames:
                u.report(tname)
            continue

    return stanzas


def print_exception(ei, msgprefix=''):
    """Print the exception described by `ei` (a sys.exc_info() triple).

    Returns the exit status to use: the Quit's own code, or 20 for any
    other exception (for which a traceback is printed).
    """
    if msgprefix:
        sys.stderr.write('%s\n' % msgprefix)
    (et, q, tb) = ei
    if et is Quit:
        sys.stderr.write('adt-run: %s\n' % q.m)
        return q.ec
    else:
        sys.stderr.write("adt-run: unexpected, exceptional, error:\n")
        # print the exception we were handed, not "the current one"
        traceback.print_exception(et, q, tb)
        return 20


def cleanup():
    """Remove the temporary directory and stop the testbed.

    Any failure while cleaning up is reported and ends the process with
    status 20.
    """
    try:
        if tmpdir is not None:
            shutil.rmtree(tmpdir)
        if testbed is not None:
            testbed.stop()
    except:
        # Deliberately catches everything: this is the last-resort
        # boundary and the process exits immediately afterwards.
        print_exception(sys.exc_info(),
                        '\nadt-run: error cleaning up:\n')
        os._exit(20)

#---------- registration, installation etc.
# of .deb's: Binaries

def badpkg_se(m, se):
    """Call badpkg(m) with the contents of the stderr log `se` (AutoFile)."""
    try:
        f = open(se.read())
        try:
            se_text = f.read()
        finally:
            f.close()
    except EnvironmentError:
        se_text = ''
    badpkg(m, se_text)


def determine_package(act):
    """Set act.pkg to the package name found in the .deb act.af.

    Runs `dpkg-deb --info` on the host copy of the file and requires
    exactly one valid Package: line in its control file.
    """
    cmd = 'dpkg-deb --info --'.split(' ') + [act.af.read(), 'control']
    running = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                               universal_newlines=True)
    output = running.communicate()[0]
    rc = running.wait()
    if rc:
        badpkg('failed to parse binary package, code %d' % rc)
    pkg_re = regexp.compile(
        r'^\s*Package\s*:\s*([0-9a-z][-+.0-9a-z]*)\s*$')
    act.pkg = None
    for l in output.split('\n'):
        m = pkg_re.match(l)
        if not m:
            continue
        if act.pkg:
            badpkg('two Package: lines in control file')
        act.pkg = m.group(1)
    if not act.pkg:
        badpkg('no good Package: line in control file')


class Binaries:
    """The private apt archive of .debs offered to the testbed.

    Packages are collected in a temporary directory on the host, signed
    with the key in opts.gnupghome, and published to the testbed by
    publish().
    """

    def __init__(b):
        b.dir = TemporaryDir('binaries')
        b.install = []
        b.blamed = []

        if opts.gnupghome is None:
            opts.gnupghome = tmpdir + '/gnupg'

        # Only generate a key when the keyrings are not already there.
        try:
            for x in ['pubring', 'secring']:
                os.stat(opts.gnupghome + '/' + x + '.gpg')
            return
        except EnvironmentError:
            if sys.exc_info()[1].errno != errno.ENOENT:
                raise

        # Octal literals are spelt differently in Python 2.4 and 3,
        # hence int(..., 8).
        parent = os.path.dirname(opts.gnupghome)
        for (path, mode) in [(parent, int('0755', 8)),
                             (opts.gnupghome, int('0700', 8))]:
            if not path:
                continue
            try:
                os.mkdir(path, mode)
            except EnvironmentError:
                if sys.exc_info()[1].errno != errno.EEXIST:
                    raise

        script = '''
  cd "$1"
  exec >key-gen-log 2>&1
  cat <<"END" >key-gen-params
Key-Type: DSA
Key-Length: 1024
Key-Usage: sign
Name-Real: autopkgtest per-run key
Name-Comment: do not trust this key
Name-Email: autopkgtest@example.com
END
  set -x
  gpg --homedir="$1" --batch --gen-key key-gen-params
'''
        cmdl = ['sh', '-ec', script, 'x', opts.gnupghome]
        rc = subprocess.call(cmdl)
        if rc:
            try:
                f = open(opts.gnupghome + '/key-gen-log')
                try:
                    tp = f.read()
                finally:
                    f.close()
            except IOError:
                tp = sys.exc_info()[1]
            sys.stderr.write('%s\n' % (tp,))
            bomb('key generation failed, code %d' % rc)

    def reset(b):
        """Empty the archive directory and forget registered packages."""
        # write() both fixes the host path and makes sure the directory
        # exists, so this also works on a freshly created object.
        shutil.rmtree(b.dir.write())
        b.dir.invalidate()
        b.dir.invalidate(True)
        b.dir.write()
        b.install = []
        b.blamed = []

    def register(b, act, pkg, af, forwhat, blamed):
        """Add the .deb `af` to the archive as <pkg>.deb.

        `forwhat` is 'builds' or 'tests' and selects which --binaries-*
        setting of the action applies: 'ignore' skips the package and
        'install' additionally queues it for installation by publish().
        """
        how = act.ah['deb_for' + forwhat]
        if how == 'ignore':
            return

        b.blamed += blamed

        leafname = pkg + '.deb'
        dest = b.dir.subpath('binaries--' + leafname, leafname, OutputFile)

        try:
            os.remove(dest.write())
        except EnvironmentError:
            if sys.exc_info()[1].errno != errno.ENOENT:
                raise

        try:
            os.link(af.read(), dest.write())
        except EnvironmentError:
            if sys.exc_info()[1].errno != errno.EXDEV:
                raise
            # different filesystem: fall back to copying
            shutil.copy(af.read(), dest.write())

        if how == 'install':
            b.install.append(pkg)

    def publish(b):
        """Index and sign the archive, copy it to the testbed, set up apt
        there, and install the packages queued by register()."""
        script = '''
  cd "$1"
  apt-ftparchive packages . >Packages
  gzip -f Packages
  apt-ftparchive release . >Release
  gpg --homedir="$2" --batch --detach-sign --armour -o Release.gpg Release
  gpg --homedir="$2" --batch --export >archive-key.pgp
'''
        cmdl = ['sh', '-ec', script, 'x', b.dir.write(), opts.gnupghome]
        rc = subprocess.call(cmdl)
        if rc:
            bomb('apt-ftparchive or signature failed, code %d' % rc)

        # Force a fresh copy of the archive onto the testbed.
        b.dir.invalidate(True)
        apt_source = b.dir.read(True)

        script = ('\n'
                  '  apt-key add archive-key.pgp\n'
                  '  echo "deb file://' + apt_source + '/ /"'
                  ' >/etc/apt/sources.list.d/autopkgtest\n')
        # archive-key.pgp is named relative to the archive directory
        (rc, se) = testbed.execute('aptkey', ['sh', '-ec', script],
                                   cwd=apt_source)
        if rc:
            bomb('apt setup failed with exit code %d' % rc, se)

        testbed.blamed += b.blamed

        for pkg in b.install:
            testbed.blame(pkg)
            (rc, se) = testbed.execute('install-' + pkg,
                                       ['apt-get', '-qy', 'install', pkg])
            if rc:
                badpkg("installation of %s failed, exit code %d"
                       % (pkg, rc), se)

#---------- processing of sources (building)

def source_rules_command(act, script, what, which, work, results_lines=0):
    """Run the shell `script` (list of lines) on the testbed in `work`.

    Whatever the script writes to fd 3 is its result; its stdout and
    stderr both go to a log.  `what` and `which` only label the temporary
    files and messages.  Returns the result lines, or the single line
    itself when results_lines == 1; results_lines=None disables the
    line-count check.
    """
    script = "exec 3>&1 >&2\n" + '\n'.join(script)
    so = TemporaryFile('%s-%s-results' % (what, which))
    se = TemporaryFile('%s-%s-log' % (what, which))
    rc = testbed.execute('%s-%s' % (what, which),
                         ['sh', '-xec', script],
                         so=so.write(True), se=se.write(True),
                         cwd=work.write(True))
    f = open(so.read())
    try:
        results = f.read().rstrip('\n')
    finally:
        f.close()
    # An empty file is zero lines, and a trailing newline does not start
    # another line.
    if results:
        results = results.split("\n")
    else:
        results = []
    if rc:
        badpkg_se("%s failed with exit code %d" % (which, rc), se)
    if results_lines is not None and len(results) != results_lines:
        badpkg_se("got %d lines of results from %s where %d expected"
                  % (len(results), which, results_lines), se)
    if results_lines == 1:
        return results[0]
    return results


def build_source(act):
    """Build the source package act.af (a .dsc) on the testbed.

    Copies the .dsc and the files it lists to the testbed, unpacks and
    builds there, sets act.tests_tree to the unpacked tree and, unless
    the filter is '_', builds the binary packages and registers those
    matching act.ah['dsc_filter'] (also collected in act.binaries).
    """
    act.blame = 'arg:' + act.af.spec
    testbed.blame(act.blame)
    testbed.needs_reset()

    what = act.what
    dsc = act.af
    basename = dsc.spec
    dsc_what = what + '/' + basename

    in_files = False
    fre = regexp.compile(r'^\s+[0-9a-f]+\s+\d+\s+([^/.][^/]*)$')
    dsc_file = open(dsc.read())
    try:
        for l in dsc_file:
            l = l.rstrip('\n')
            if l.startswith('Files:'):
                in_files = True
                continue
            elif l.startswith('#'):
                continue
            elif not l.startswith(' '):
                in_files = False
                if l.startswith('Source:'):
                    act.blame = 'dsc:' + l[7:].strip()
                    testbed.blame(act.blame)
                continue
            elif not in_files:
                continue

            # An indented line inside Files: names one component file.
            m = fre.match(l)
            if not m:
                badpkg(".dsc contains unparseable line"
                       " in Files: `%s'" % l)
            leaf = m.group(1)
            subfile = dsc.sibling(dsc_what + '/' + leaf, leaf, InputFile)
            subfile.read(True)
    finally:
        dsc_file.close()
    dsc.read(True)

    work = TemporaryDir(what + '-build')

    script = [
        'cd ' + work.write(True),
        'gdebi ' + dsc.read(True),
        'dpkg-source -x ' + dsc.read(True),
        'cd */.',
        'pwd >&3',
        opts.user_wrap('debian/rules build'),
    ]
    result_pwd = source_rules_command(act, script, what, 'build', work, 1)

    if os.path.dirname(result_pwd) != work.read(True):
        badpkg("results dir `%s' is not in expected parent dir `%s'"
               % (result_pwd, work.read(True)))

    act.tests_tree = InputDir(dsc_what + 'tests-tree', result_pwd, True)
    if act.ah['dsc_tests']:
        act.tests_tree.read()
        act.tests_tree.invalidate(True)

    act.blamed = list(testbed.blamed)

    act.binaries = []
    if act.ah['dsc_filter'] != '_':
        script = [
            'cd ' + work.write(True) + '/*/.',
            opts.user_wrap(opts.gainroot + ' debian/rules binary'),
            'cd ..',
            'echo *.deb >&3',
        ]
        result_debs = source_rules_command(act, script, what,
                                           'debian/rules binary', work, 1)
        # An unmatched glob is echoed back literally.
        if result_debs == '*.deb':
            debs = []
        else:
            debs = result_debs.split(' ')
        deb_re = regexp.compile(
            r'^([-+.0-9a-z]+)_[^_/]+(?:_[^_/]+)\.deb$')
        for deb in debs:
            m = deb_re.match(deb)
            if not m:
                badpkg("badly-named binary `%s'" % deb)
            pkg = m.group(1)
            for pat in act.ah['dsc_filter'].split(','):
                if fnmatch.fnmatchcase(pkg, pat):
                    deb_af = work.read(True) + '/' + deb
                    deb_what = pkg + '_' + what + '.deb'
                    debfile = InputFile(deb_what, deb_af, True)
                    # Copy to the host now, while the file still exists
                    # on the testbed.
                    debfile.read()
                    binaries.register(act, pkg, debfile, 'builds',
                                      testbed.blamed)
                    act.binaries.append((pkg, debfile))
                    break

#---------- main processing loop and main program

def process_actions():
    """Carry out opts.actions in two passes: builds first, then tests."""
    global binaries

    binaries = Binaries()

    # Pass 1: register the given .debs for builds, and build sources.
    binaries.reset()
    for act in opts.actions:
        testbed.prepare()
        if act.kind == 'deb':
            testbed.blame('arg:' + act.af.spec)
            determine_package(act)
            testbed.blame('deb:' + act.pkg)
            binaries.register(act, act.pkg, act.af, 'builds',
                              testbed.blamed)
        if act.kind == 'dsc':
            build_source(act)

    # Pass 2: register binaries for tests, and run the test suites.
    binaries.reset()
    control_override = None
    for act in opts.actions:
        testbed.prepare()
        if act.kind == 'control':
            control_override = act.af
        if act.kind == 'deb':
            binaries.register(act, act.pkg, act.af, 'tests',
                              ['deb:' + act.pkg])
        if act.kind == 'dsc':
            for (pkg, debfile) in act.binaries:
                binaries.register(act, pkg, debfile, 'tests',
                                  act.blamed)
            if act.ah['dsc_tests']:
                stanzas = read_control(act, act.tests_tree,
                                       control_override)
                testbed.blamed += act.blamed
                run_tests(act, stanzas)
            # an override applies to the next test suite only
            control_override = None
        if act.kind == 'tree':
            testbed.blame('arg:' + act.af.spec)
            stanzas = read_control(act, act.af,
                                   control_override)
            run_tests(act, stanzas)
            control_override = None


def main():
    """Program entry point; never returns (always ends via os._exit)."""
    global testbed
    global tmpdir
    try:
        parse_args()
    except SystemExit:
        # optparse exits through SystemExit.  os._exit does not flush
        # stdio, so flush first or the help/usage text may be lost.
        code = sys.exc_info()[1].code
        sys.stdout.flush()
        sys.stderr.flush()
        if not code:
            os._exit(0)         # --help and friends are not failures
        os._exit(20)
    # NOTE(review): --tmp-dir (opts.tmpdir) is parsed but not used here;
    # a fresh mkdtemp() directory is always used and removed by cleanup().
    try:
        tmpdir = tempfile.mkdtemp()
        testbed = Testbed()
        testbed.start()
        finalise_options()
        process_actions()
    except:
        # Top-level boundary: report whatever happened, clean up, exit.
        ec = print_exception(sys.exc_info(), '')
        cleanup()
        os._exit(ec)
    cleanup()
    os._exit(errorcode)


if __name__ == '__main__':
    main()