import os
import errno
import fnmatch
+import shutil
from optparse import OptionParser
+signal.signal(signal.SIGINT, signal.SIG_DFL) # undo stupid Python SIGINT thing
-tmpdir = None
-testbed = None
-errorcode = 0
+#---------- global variables
-signal.signal(signal.SIGINT, signal.SIG_DFL) # undo stupid Python SIGINT thing
+tmpdir = None # pathstring on host
+testbed = None # Testbed
+errorcode = 0 # exit status that we are going to use
+binaries = None # Binaries (.debs we have registered)
+
+#---------- errors we define
class Quit:
def __init__(q,ec,m): q.ec = ec; q.m = m
def bomb(m): raise Quit(20, "unexpected error: %s" % m)
-def badpkg(m): raise Quit(12, "erroneous package: %s" % m)
+def badpkg(m):
+ print 'blame: ', ' '.join(testbed.blamed)
+ raise Quit(12, "erroneous package: %s" % m)
+
def report(tname, result): print '%-20s %s' % (tname, result)
class Unsupported:
def flatten(l):
return reduce((lambda a,b: a + b), l, [])
+#---------- fancy automatic file-copying class
+
class Path:
# p.path[tb] None or path not None => path known
# p.file[tb] None or path not None => file exists
# p.what
+ # p.spec
# p.spec_tb
# p.tb_scratch
if p.local is not None:
if p.lpath is not None: assert(p.local == p.lpath)
return p.local
- testbed.open()
if p.xfmap is None:
p.local = p.lpath
return p.onhost()
def ontb(p):
- testbed.open()
if p.tbscratch is not None:
if p.tbscratch != testbed.scratch:
p.xfmapcopy('copydown')
return p.down
-def Action:
+#---------- parsing and representation of the arguments
+
+class Action:
def __init__(a, kind, path, arghandling, ix):
+ # extra attributes get added during processing
a.kind = kind
- a.path = path
+ a.path = path # just a string
a.ah = arghandling
- a.what = '%s%s' % (kind,ix); ix++
+ a.what = '%s%s' % (kind,ix)
def parse_args():
global opts
pa('','--fakeroot', type='string', dest='fakeroot',
help='prefix debian/rules build with FAKEROOT')
pa('-d', '--debug', action='store_true', dest='debug');
+ pa('','--gnupg-home', type='string', dest='gnupghome',
+ default='~/.autopkgtest/gpg',
+ help='use GNUPGHOME rather than ~/.autopkgtest (for
+ " signing private apt archive);"
+ " `fresh' means generate new key each time.")
#----------
# actual meat:
else:
error "unknown action in list `%s' having"
"type `%s' % (act, type(act))
- (path, kindpath) = act
+ (pathname, kindpath) = act
if type(kindpath) is tuple: kind = kindpath[0]
elif kindpath.endswith('/'): kind = 'tree'
else: parser.error("do not know how to handle filename \`%s';"
" specify --source --binary or --build-tree")
+ path = InputPath(pathname, arghandling['tb'])
+
opts.actions.append(Action(kind, path, arghandling, ix))
ix++
'root-on-testbed' not in testbed.caps:
opts.fakeroot = 'fakeroot'
-logpath_counters = {}
+ if opts.gnupghome.startswith('~/'):
+ try: home = os.environ['HOME']
+ except KeyError:
+ parser.error("HOME environment variable"
+ " not set, needed for --gnupghome=`%s"
+ % opts.gnupghome)
+ opts.gnupghome = home + opts.gnupghome[1:]
+ elif opts.gnupghome == 'fresh':
+ opts.gnupghome = None
-def logpath(idstr):
- # if idstr ends with `-' then a counter is appended
- if idstr.endswith('-'):
- if not logpath_counters.has_key(idstr):
- logpath_counters[idstr] = 1
- else:
- logpath_counters[idstr] += 1
- idstr.append(`logpath_counters[idstr]`)
- idstr = 'log-' + idstr
- if opts.output_dir is None:
- return testbed.scratch.append(idstr, idstr)
- elif opts.output_dir.tb:
- return opts.output_dir.append(idstr, idstr)
- else:
- return Path(True, testbed.scratch.p, idstr,
- lpath=opts.output_dir.p+'/'+idstr)
+#---------- testbed management - the Testbed class
class Testbed:
def __init__(tb):
tb.sp = None
tb.lastsend = None
tb.scratch = None
+ tb.modified = False
+ tb.blamed = []
def start(tb):
p = subprocess.PIPE
tb.sp = subprocess.Popen(opts.vserver,
tb.scratch = None
if tb.sp is None: return
tb.command('close')
+ def prepare(tb):
+ if tb.modified and 'reset' in caps:
+ tb.command('reset')
+ tb.blamed = []
+ tb.modified = False
+ binaries.publish(act)
+ def needs_reset(tb):
+ tb.modified = True
+ def blame(tb, m):
+ tb.blamed.append(m)
def bomb(tb, m):
if tb.sp is not None:
tb.sp.stdout.close()
rl = tb.commandr(cmd, 1, args)
return rl[0]
+#---------- representation of test control files: Field*, Test, etc.
+
class FieldBase:
def __init__(f, fname, stz, base, tnames, vl):
assert(vl)
'only one %s field allowed' % fn)
return f.v
-
-def acquire_built_source():
- global opts
-
- if opts.build_source:
- assert(opts.tests_tree is None)
- bss = build_some_source('t', opts.build_source)
- opts.tests_tree = bss[0]
-
class FieldIgnore(FieldBase):
def parse(f): pass
def __init__(r,rname,base): pass
class Restriction_rw_tests_tree(Restriction): pass
+class Restriction_breaks_testbed(Restriction):
+ if 'reset' not in caps:
+ raise Unsupported(f.lno,
+ 'Test breaks testbed but testbed cannot reset')
class Field_Restrictions(FieldBase):
def parse(f):
'Tests-Directory may not be absolute')
base['testsdir'] = td
-def run_tests():
- for t in tests:
- t.run()
- if not tests:
- global errorcode
- report('*', 'SKIP no tests in this package')
- errorcode |= 8
+def run_tests(stanzas):
+ global errorcode
+ for stanza in stanzas:
+ tests = stanza[' tests']
+ if not tests:
+ report('*', 'SKIP no tests in this package')
+ errorcode |= 8
+ for t in tests:
+ testbed.prepare()
+ t.run()
+ if 'breaks-testbed' in t.restrictions:
+ testbed.needs_reset()
+ testbed.needs_reset()
class Test:
def __init__(t, tname, base):
errorcode |= 4
report(t.tname, 'FAIL ' + m)
def run(t):
- testbed.open()
def stdouterr(oe):
idstr = oe + '-' + t.tname
if opts.output_dir is not None and opts.output_dir.tb:
else:
t.report('PASS')
-def read_control():
- global tests
+def read_control(act, tree, control_override):
+ stanzas = [ ]
+
+ if control_override is not None:
+ control_path = control_override
+ testbed.blame('arg:'+control_path)
+ else:
+ control_path = tree.append('/debian/tests/control')
+ testbed.blame('arg:'+tree.spec)
+
try:
- control = file(opts.control.onhost(), 'r')
+ control = file(control_path.onhost(), 'r')
except IOError, oe:
if oe[0] != errno.ENOENT: raise
- tests = []
- return
- lno = 0
- def badctrl(m): testbed.badpkg('tests/control line %d: %s' % (lno, m))
- stz = None # stz[field_name][index] = (lno, value)
-
- stanzas = [ ]
- stz = None
+ return []
+ lno = 0
+ def badctrl(m): act.bomb('tests/control line %d: %s' % (lno, m))
+ stz = None # stz[field_name][index] = (lno, value)
+ # special field names:
+ # stz[' lno'] = number
+ # stz[' tests'] = list of Test objects
def end_stanza(stz):
if stz is None: return
stz[' errs'] = 0
(fname, l) = initmat.groups()
fname = string.capwords(fname)
if stz is None:
- stz = { ' lno': lno }
+ stz = { ' lno': lno, ' tests': [] }
if not stz.has_key(fname): stz[fname] = [ ]
hcurrent = stz[fname]
elif regexp.compile('\s').match(l):
'unknown metadata field %s' % fname)
f = fclass(stz, fname, base, tnames, vl)
f.parse()
- tests = []
for tname in tnames:
t = Test(tname, base)
- tests.append(t)
+ stz[' tests'].append(t)
except Unsupported, u:
for tname in tnames: u.report(tname)
continue
+ return stanzas
+
def print_exception(ei, msgprefix=''):
if msgprefix: print >>sys.stderr, msgprefix
(et, q, tb) = ei
try:
rm_ec = 0
if tmpdir is not None:
- rm_ec = subprocess.call(['rm','-rf','--',tmpdir])
+ shutil.rmtree(tmpdir)
if testbed is not None:
testbed.stop()
if rm_ec: bomb('rm -rf -- %s failed, code %d' % (tmpdir, ec))
'\nadt-run: error cleaning up:\n')
os._exit(20)
+#---------- registration, installation etc. of .deb's: Binaries
+
+def determine_package(act):
+ cmd = 'dpkg-deb --info --'.split(' ')+[act.path.read(),'control']
+ running = Popen(cmd, stdout=PIPE)
+ output = running.communicate()[0]
+ rc = running.wait()
+ if rc: badpkg('failed to parse binary package, code %d' % rc)
+ re = regexp.compile('^\s*Package\s*:\s*([0-9a-z][-+.0-9a-z]*)\s*$')
+ act.pkg = None
+ for l in '\n'.split(output):
+ m = re.match(output)
+ if not m: continue
+ if act.pkg: badpkg('two Package: lines in control file')
+ act.pkg = m.groups
+ if not act.pkg: badpkg('no good Package: line in control file')
+
+class Binaries:
+ def __init__(b):
+ b.dir = tmpdir+'/binaries'
+
+ if opts.gnupghome is None:
+ opts.gnupghome = tmpdir+'/gnupg'
+
+ try:
+ for x in ['pubring','secring']:
+ os.stat(opts.gnupghome + '/' + x + '.gpg')
+ except IOError, oe:
+ if oe.errno != errno.ENOENT: raise
+
+ try: os.mkdir(opts.gnupghome, 0700)
+ except IOError, oe: if oe.errno != errno.EEXIST: raise
+ script = '
+ cd "$1"
+ exec >key-gen-log 2>&1
+ cat <<"END" >key-gen-params
+Key-Type: DSA
+Key-Length: 1024
+Key-Usage: sign
+Name-Real: autopkgtest per-run key
+Name-Comment: do not trust this key
+Name-Email: autopkgtest@example.com
+END
+ set -x
+ gpg --homedir="$1" --batch --gen-key key-gen-params
+ '
+ cmdl = ['sh','-ec',script,'x',opts.gnupghome]
+ rc = subprocess.call(cmdl)
+ if rc:
+ try:
+ f = open(opts.gnupghome+'/key-gen-log')
+ tp = file.read()
+ except IOError, e: tp = e
+ print >>sys.stderr, tp
+ bomb('key generation failed, code %d' % rc)
+
+ def reset(b):
+ shutil.rmtree(b.dir)
+ os.mkdir(b.dir)
+ b.tbpath = testbed.scratch.append('/binaries')
+ b.install = []
+ b.blamed = []
+
+ def register(b, act, pkg, path, forwhat, blamed):
+ if act.ah['deb_'+forwhat] == 'ignore': return
+
+ b.blamed += testbed.blamed
+
+ here = path.read()
+ leafname = pkg+'.deb'
+ dest = b.dir+'/'+leafname
+
+ try: os.remove(dest)
+ except IOError, oe:
+ if oe.errno != errno.ENOENT: raise e
+
+ try: os.link(here, dest)
+ except IOError, oe:
+ if oe.errno != errno.EXDEV: raise e
+ shutil.copy(here, dest)
+
+ if act.ah['deb_'+forwhat] == 'install':
+ b.install.append(pkg)
+
+ def publish(b):
+ script = '
+ cd "$1"
+ apt-ftparchive packages . >Packages
+ gzip -f Packages
+ apt-ftparchive release . >Release
+ gpg --homedir="$2" --batch --detach-sign --armour -o Release.gpg Release
+ gpg --homedir="$2" --batch --export >archive-key.pgp
+ '
+ cmdl = ['sh','-ec',script,'x',b.dir,opts.gnupghome]
+ rc = subprocess.call(cmd)
+ if rc: bomb('apt-ftparchive or signature failed, code %d' % rc)
+
+ tbp = b.tbpath.write(True)
+ testbed.command('copydown', (b.dir+'/', tbp+'/'))
+
+ se = TemporaryPath('%s-aptkey-stderr' % act.what)
+ script = '
+ apt-key add archive-key.pgp
+ echo "deb file:///'+tbp+'/ /" >/etc/apt/sources.list.d/autopkgtest
+ '
+ rc = testbed.commandr1(['execute',
+ ','.join(map(urllib.quote, ['sh','-ec','script']))],
+ '/dev/null', '/dev/null', se.write(True), tbp)
+ if rc: bomb('apt setup failed with exit code %d' % rc, se)
+
+ testbed.blamed += b.blamed
+
+ for pkg in b.install:
+ testbed.blame(pkg)
+ se = TemporaryPath('%s-install-%s-stderr' % (act.what,pkg))
+ rc = testbed.commandr1('execute','apt-get,-qy,install,'+pkg,
+ '/dev/null','/dev/null',se.ontb(),
+ tb.scratch.read(True))
+ if rc:
+ badpkg("installation of %s failed, exit code %d"
+ % (pkg, rc), se)
+
+#---------- processing of sources (building)
+
def source_rules_command(act,script,which,work,results_lines=0):
script = "exec 3>&1 >&2\n" + '\n'.join(script)
so = TemporaryPath('%s-%s-results' % (what,which))
'/dev/null', so.write(True), se.write(True), work.write(True))
results = file(so.read()).read().split("\n")
if rc:
- act.bomb("%s failed with exit code %d" % (which,rc), se)
+ badpkg_se("%s failed with exit code %d" % (which,rc), se)
if results_lines is not None and len(results) != results_lines:
- act.bomb("got %d lines of results from %s where %d expected"
+ badpkg_se("got %d lines of results from %s where %d expected"
% (len(results), which, results_lines), se)
if results_lines==1: return results[0]
return results
-def build_source(act,ah):
- prepare_testbed_for_action()
+def build_source(act):
+ act.blame = 'arg:'+act.path.spec()
+ testbed.blame(act.blame)
+ testbed.needs_reset()
what = act.ah['what']
- dsc_what = what+'/'+os.path.basename(act.path)
- dsc = InputPath(dsc_what, act.path, arghandling['tb'])
-
- if not dsc.spec_tb:
- dsc_file = open(dsc.read())
- in_files = False
- re = regexp.compile('^\s+[0-9a-f]+\s+\d+\s+([^/.][^/]*)$')
- for l in dsc_file():
- if l.startswith('Files:'): in_files = True
- elif l.startswith('#'): pass
- elif not l.startswith(' '): in_files = False
- elif not in_files: pass
-
- m = re.match(l)
- if not m: act.bomb(".dsc contains unparseable line"
- " in Files: `%s'" % (`dsc`,l))
-
- subfile = dsc.enclosingdir().append('/'+m.groups(0))
- subfile.ensure_file(True)
- dsc.ensure_file(True)
+ dsc = act.path
+ basename = dsc.spec; if basename is None: basename = 'source.dsc'
+ dsc_what = what+'/'+basename
+
+ dsc_file = open(dsc.read())
+ in_files = False
+ fre = regexp.compile('^\s+[0-9a-f]+\s+\d+\s+([^/.][^/]*)$')
+ for l in dsc_file():
+ if l.startswith('Files:'): in_files = True
+ elif l.startswith('#'): pass
+ elif not l.startswith(' '):
+ in_files = False
+ if l.startswith('Source:'):
+ act.blame = 'dsc:'+l[7:].strip()
+ testbed.blame(act.blame)
+ elif not in_files: pass
+ if not dsc.spec_tb: continue
+ m = re.match(l)
+ if not m: badpkg(".dsc contains unparseable line"
+ " in Files: `%s'" % (`dsc`,l))
+ subfile = dsc.enclosingdir().append('/'+m.groups(0))
+ subfile.read(True)
+ dsc.read(True)
work = AccumulationPath(what+'/build', dir=True)
result_pwd = source_rules_command(act,script,what,'build',work,1)
if os.path.dirname(result_pwd) != work.read(True):
- act.bomb("results dir `%s' is not in expected parent dir `%s'"
+ badpkg_se("results dir `%s' is not in expected parent dir `%s'"
% (results[0], work.read(True)), se)
- ah.tests_tree = work.append('/'+os.path.basename(results[0]))
- if ah['dsc_tests']:
- ah.tests_tree.preserve_now()
+ act.tests_tree = work.append('/'+os.path.basename(results[0]))
+ if act.ah['dsc_tests']:
+ act.tests_tree.preserve_now()
+
+ act.blamed = testbed.blamed.copy()
- ah.binaries = []
- if ah['dsc_filter'] != '_':
+ act.binaries = []
+ if act.ah['dsc_filter'] != '_':
script = [
'cd '+work.write(True)+'/*/.',
opts.user_wrap(opts.fakeroot+' debian/rules binary'),
re = regexp.compile('^([-+.0-9a-z]+)_[^_/]+(?:_[^_/]+)\.deb$')
for deb in debs:
m = re.match(deb)
- if not m: act.bomb("badly-named binary `%s'" % deb, se)
- package = m.groups(deb)
- for pat in ah['dsc_filter'].split(','):
- if fnmatch.fnmatchcase(package,pat):
+ if not m: badpkg("badly-named binary `%s'" % deb)
+ pkg = m.groups(0)
+ for pat in act.ah['dsc_filter'].split(','):
+ if fnmatch.fnmatchcase(pkg,pat):
deb_path = work.read()+'/'+deb
- deb_what = package+'_'+what+'.deb'
+ deb_what = pkg+'_'+what+'.deb'
bin = InputPath(deb_what,deb_path,True)
bin.preserve_now()
- record_binary(bin,'builds')
- ah.binaries.append(bin)
+ binaries.register(act,pkg,bin,'builds',
+ testbed.blamed)
+ act.binaries.append((pkg,bin))
break
-def record_binary(
+#---------- main processing loop and main program
def process_actions():
global binaries
+ binaries = Binaries()
- binaries = {}
- ix = 0
- for act opts.actions:
+ b.reset()
+ for act in opts.actions:
+ testbed.prepare()
if act.kind == 'deb':
- record_binary(act,'builds')
+ blame('arg:'+path.spec)
+ determine_package(act)
+ blame('deb:'+act.pkg)
+ binaries.register(act,act.pkg,act.path,'builds',
+ testbed.blamed)
if act.kind == 'dsc':
build_source(act)
- # build_source records tree location in ah
- binaries = {}
+ b.reset()
control_override = None
- for (kind,path,ah) in opts.actions:
- if kind == 'control':
- control_override = act
- if kind == 'deb':
- record_binary(act,'tests')
- if kind == 'dsc':
- for bin in act.binaries: record_binary(bin,'tests')
+ for act in opts.actions:
+ testbed.prepare()
+ if act.kind == 'control':
+ control_override = act.path
+ if act.kind == 'deb':
+ binaries.register(act,act.pkg,act.path,'tests',
+ ['deb:'+act.pkg])
+ if act.kind == 'dsc':
+ for (pkg,bin) in act.binaries:
+ binaries.register(act,pkg,bin,'tests',
+ act.blamed)
if not act.ah['dsc_tests']: continue
- run_tests(act,control_override)
+ stanzas = read_control(act, act.tests_tree,
+ control_override)
+ testbed.blamed += act.blamed
+ run_tests(act, stanzas)
control_override = None
- if kind == 'tree':
- run_tests(act,control_override)
+ if act.kind == 'tree':
+ testbed.blame('arg:'+act.path.spec)
+ stanzas = read_control(act, act.path,
+ control_override)
+ run_tests(act, stanzas)
control_override = None
def main():
tmpdir = tempfile.mkdtemp()
testbed = Testbed()
testbed.start()
- testbed.open()
finalise_options()
process_actions()
except: