import errno
import fnmatch
import shutil
+import copy
from optparse import OptionParser
signal.signal(signal.SIGINT, signal.SIG_DFL) # undo stupid Python SIGINT thing
for l in m.rstrip('\n').split('\n'):
print >>sys.stderr, 'atd-run: debug:', l
+def mkdir_okexist(pathname, mode=02755):
+ try:
+ os.mkdir(pathname, mode)
+ except OSError, oe:
+ if oe.errno != errno.EEXIST: raise
+
def rmtree(what, pathname):
debug('/ %s rmtree %s' % (what, pathname))
shutil.rmtree(pathname)
def __str__(p):
def ptbp(tbp):
- if p.path[tbp] is None: out = '-'
- elif p.file[tbp] is None: out = '?'+p.path[tbp]
- else: out = '!'+p.path[tbp]
- out += p.dir
- return out
+ if p.path[tbp] is None: return '-'+p.dir
+ elif p.file[tbp] is None: return p.path[tbp]+p.dir+'?'
+ else: return p.path[tbp]+p.dir+'!'
out = p.what
+ if p.spec is not None:
+ if p.spec_tbp: out += '#'
+ else: out += '='
+ out += p.spec
+ out += ':'
out += ptbp(False)
out += '|'
out += ptbp(True)
- if p.spec is not None:
- if p.spec_tbp: out += '>'
- else: out += '<'
- out += p.spec
return out
def _wrong(p, how):
if p.dir and not p.file[tbp]:
if not tbp:
p._debug('mkdir H')
- try: os.mkdir(p.path[tbp])
- except OSError, oe:
- if oe.errno != errno.EEXIST: raise
+ mkdir_okexist(p.path[tbp])
else:
cmdl = ['sh','-ec',
- 'test -d "$1" || mkdir "$1"',
+ 'test -d "$1" || mkdir -p "$1"',
'x', p.path[tbp]]
tf_what = urllib.quote(p.what).replace('/','%2F')
(rc,se) = testbed.execute('mkdir-'+tf_what, cmdl)
testbed.command(cud, (src, dst))
p.file[tbp] = p.path[tbp]
- return p.file[tbp]
-
- def subpath(p, what, leaf, constructor):
- p._debug('subpath %s /%s %s...' % (what, leaf, `constructor`))
- if not p.dir: p._wrong("creating subpath of non-directory")
- return constructor(what, p.spec+p.dir+leaf, p.spec_tbp)
- def sibling(p, what, leaf, constructor):
- p._debug('sibling what=%s leaf=%s %s...' % (what, leaf, `constructor`))
- dir = os.path.dirname(p.spec)
- if dir: dir += '/'
- return constructor(what, dir+leaf, p.spec_tbp)
+ return p.file[tbp] + p.dir
def invalidate(p, tbp=False):
p.file[tbp] = None
bomb("directory `%s' specified for "
"non-directory %s" % (pf[tbp], p.what))
+ def _relative_init(p, what, parent, leaf, onlyon_tbp, setfiles, sibling):
+ AutoFile.__init__(p,what)
+ sh_on = ''; sh_sibl = ''
+ if onlyon_tbp is not None: sh_on = ' (on %s)' % ('HT'[onlyon_tbp])
+ if sibling: sh_sibl=' (sibling)'
+ parent._debug('using as base: %s: %s%s%s' %
+ (str(parent), leaf, sh_on, sh_sibl))
+ if not sibling and not parent.dir:
+ parent._wrong('asked for non-sibling relative path of non-dir')
+ if sibling: trim = os.path.dirname
+ else: trim = lambda x: x
+ for tbp in [False,True]:
+ if parent.path[tbp] is None: continue
+ trimmed = trim(parent.path[tbp])
+ if trimmed: trimmed += '/'
+ p.path[tbp] = trimmed + leaf
+ if setfiles and (onlyon_tbp is None or onlyon_tbp == tbp):
+ p.file[tbp] = p.path[tbp]
+
class InputFile(AutoFile):
def _init(p, what, spec, spec_tbp=False):
AutoFile.__init__(p, what)
p.spec = spec
p.spec_tbp = spec_tbp
- p.path[spec_tbp] = p.file[spec_tbp] = spec
+ p.path[spec_tbp] = spec
+ p.file[p.spec_tbp] = p.path[p.spec_tbp]
def __init__(p, what, spec, spec_tbp=False):
p._init(what,spec,spec_tbp)
p._constructed()
p.dir = '/'
p._constructed()
+class RelativeInputFile(AutoFile):
+ def __init__(p, what, parent, leaf, onlyon_tbp=None, sibling=False):
+ p._relative_init(what, parent, leaf, onlyon_tbp, True, sibling)
+ p._constructed()
+
+class RelativeOutputFile(AutoFile):
+ def __init__(p, what, parent, leaf, sibling=False):
+ p._relative_init(what, parent, leaf, None, False, sibling)
+ p._constructed()
+
class TemporaryFile(AutoFile):
def __init__(p, what):
AutoFile.__init__(p, what)
opts.actions.append(Action(kind, af, arghandling, what))
def finalise_options():
- global opts, tb
+ global opts, tb, tmpdir
+
+ if opts.tmpdir is not None:
+ rmtree('tmpdir(specified)',opts.tmpdir)
+ mkdir_okexist(opts.tmpdir, 0700)
+ tmpdir = opts.tmpdir
+ else:
+ assert(tmpdir is None)
+ tmpdir = tempfile.mkdtemp()
if opts.user is None and 'root-on-testbed' not in testbed.caps:
opts.user = ''
if opts.user is None:
su = 'suggested-normal-user='
ul = [
- e[length(su):]
+ e[len(su):]
for e in testbed.caps
if e.startswith(su)
]
print >>sys.stderr, ("warning: virtualisation"
" system does not offer root on testbed,"
" but --user option specified: failure likely")
- opts.user_wrap = lambda x: 'su %s -c "%s"' % (opts.user, x)
+ opts.user_wrap = lambda x: "su %s -c '%s'" % (opts.user, x)
else:
opts.user_wrap = lambda x: x
tb.scratch = None
tb.modified = False
tb.blamed = []
+ tb._ephemeral = []
+ tb._debug('init')
+ def _debug(tb, m):
+ debug('** '+m)
def start(tb):
+ tb._debug('start')
p = subprocess.PIPE
debug_subprocess('vserver', opts.vserver)
tb.sp = subprocess.Popen(opts.vserver,
stdin=p, stdout=p, stderr=None)
tb.expect('ok')
tb.caps = tb.commandr('capabilities')
+ tb._need_reset_apt = False
def stop(tb):
+ tb._debug('stop')
tb.close()
if tb.sp is None: return
ec = tb.sp.returncode
if ec:
tb.bomb('testbed gave exit status %d after quit' % ec)
def open(tb):
+ tb._debug('open, scratch=%s' % tb.scratch)
if tb.scratch is not None: return
pl = tb.commandr('open')
tb.scratch = InputDir('tb-scratch', pl[0], True)
+ tb.deps_processed = []
+ def mungeing_apt(tb):
+ if not 'revert' in tb.caps:
+ tb._need_reset_apt = True
+ def reset_apt(tb):
+ if not tb._need_reset_apt: return
+ what = 'aptget-update-reset'
+ cmdl = ['apt-get','-qy','update']
+ (rc,se) = tb.execute(what, cmdl)
+ if rc:
+ print >>sys.stderr, se, ("\n" "warning: failed to restore"
+ " testbed apt cache, exit code %d" % rc)
+ tb._need_reset_apt = False
def close(tb):
+ tb._debug('close, scratch=%s' % tb.scratch)
if tb.scratch is None: return
tb.scratch = None
if tb.sp is None: return
tb.command('close')
- def prepare(tb):
- if tb.modified and 'reset' in tb.caps:
- tb.command('reset')
+ def prepare(tb, deps_new):
+ tb._debug('prepare, modified=%s, deps_processed=%s, deps_new=%s' %
+ (tb.modified, tb.deps_processed, deps_new))
+ if 'revert' in tb.caps and (tb.modified or
+ [d for d in tb.deps_processed if d not in deps_new]):
+ tb._debug('reset **')
+ tb.command('revert')
tb.blamed = []
- tb.modified = False
+ for af in tb._ephemeral: af.invalidate(True)
binaries.publish()
+ tb.modified = False
+ tb._install_deps(deps_new)
+ def register_ephemeral(tb, af):
+ tb._ephemeral.append(af)
+ def _install_deps(tb, deps_new):
+ tb._debug(' installing dependencies '+`deps_new`)
+ tb.deps_processed = deps_new
+ if not deps_new: return
+ dstr = ', '.join(deps_new)
+ script = binaries.apt_pkg_gdebi_script(
+ dstr, [[
+ 'from GDebi.DebPackage import DebPackage',
+ 'd = DebPackage(cache)',
+ 'res = d.satisfyDependsStr(arg)',
+ ]])
+ cmdl = ['python','-c',script]
+ what = 'install-deps'
+ debug_subprocess(what, cmdl, script=script)
+ (rc,se) = testbed.execute(what, cmdl)
+ if rc: badpkg('dependency install failed, exit code %d' % rc, se)
def needs_reset(tb):
+ tb._debug('needs_reset, previously=%s' % tb.modified)
tb.modified = True
def blame(tb, m):
+ tb._debug('blame += %s' % m)
tb.blamed.append(m)
def bomb(tb, m):
+ tb._debug('bomb %s' % m)
if tb.sp is not None:
tb.sp.stdout.close()
tb.sp.stdin.close()
rl = tb.commandr(cmd, args, 1)
return rl[0]
def execute(tb, what, cmdargs,
- si='/dev/null', so='/dev/null', se=None, cwd=None):
+ si='/dev/null', so='/dev/null', se=None, cwd=None,
+ dump_fd=None):
if cwd is None: cwd = tb.scratch.write(True)
se_use = se
if se_use is None:
se_af = TemporaryFile('xerr-'+what)
se_use = se_af.write(True)
- rc = tb.commandr1('execute', [None,
- ','.join(map(urllib.quote, cmdargs)),
- si, so, se_use, cwd])
+ cmdl = [None,
+ ','.join(map(urllib.quote, cmdargs)),
+ si, so, se_use, cwd]
+ if dump_fd is not None: cmdl += ['debug=%d-%d' % (dump_fd,2)]
+ rc = tb.commandr1('execute', cmdl)
try: rc = int(rc)
except ValueError: bomb("execute for %s gave invalid response `%s'"
% (what,rc))
class Restriction:
def __init__(r,rname,base): pass
+class Restriction_rw_build_tree(Restriction): pass
class Restriction_rw_tests_tree(Restriction): pass
class Restriction_breaks_testbed(Restriction):
def __init__(r, rname, base):
- if 'reset' not in testbed.caps:
+ if 'revert' not in testbed.caps:
raise Unsupported(f.lno,
- 'Test breaks testbed but testbed cannot reset')
+ 'Test breaks testbed but testbed cannot revert')
class Field_Restrictions(FieldBase):
def parse(f):
class Field_Tests(FieldIgnore): pass
+class Field_Depends(FieldBase):
+ def parse(f):
+ dl = map(lambda x: x.strip(),
+ flatten(map(lambda x: x.split(','), f.vl)))
+ re = regexp.compile('[^-.+:~]')
+ for d in dl:
+ if re.search(d):
+ badpkg("Test Depends field contains dependency"
+ " `%s' with invalid characters" % d)
+ f.base['depends'] = dl
+
class Field_Tests_directory(FieldBase):
def parse(f):
td = atmostone(f)
if td.startswith('/'): raise Unspported(f.lno,
'Tests-Directory may not be absolute')
- base['testsdir'] = td
+ f.base['testsdir'] = td
-def run_tests(stanzas):
- global errorcode
+def run_tests(stanzas, tree):
+ global errorcode, testbed
for stanza in stanzas:
tests = stanza[' tests']
if not tests:
report('*', 'SKIP no tests in this package')
errorcode |= 8
for t in tests:
- testbed.prepare()
- t.run()
+ t.prepare()
+ t.run(tree)
if 'breaks-testbed' in t.restrictions:
testbed.needs_reset()
testbed.needs_reset()
class Test:
- def __init__(t, tname, base, act_what):
+ def __init__(t, tname, base, act):
if '/' in tname: raise Unsupported(base[' lno'],
'test name may not contain / character')
for k in base: setattr(t,k,base[k])
t.tname = tname
- t.what = act_what+'-'+tname
- if len(base['testsdir']): tpath = base['testsdir'] + '/' + tname
- else: tpath = tname
- t.af = opts.tests_tree.subpath('test-'+tname, tpath, InputFile)
+ t.act = act
+ t.what = act.what+'t-'+tname
+ if len(base['testsdir']): t.path = base['testsdir'] + '/' + tname
+ else: t.path = tname
+ t._debug('constructed; path=%s' % t.path)
+ t._debug(' .depends=%s' % t.depends)
+ def _debug(t, m):
+ debug('& %s: %s' % (t.what, m))
def report(t, m):
report(t.what, m)
def reportfail(t, m):
global errorcode
errorcode |= 4
report(t.what, 'FAIL ' + m)
- def run(t):
+ def prepare(t):
+ t._debug('preparing')
+ dn = []
+ for d in t.depends:
+ t._debug(' processing dependency '+d)
+ if not '*' in d:
+ t._debug(' literal dependency '+d)
+ dn.append(d)
+ else:
+ for (pkg,bin) in t.act.binaries:
+ d = d.replace('*',pkg)
+ t._debug(' synthesised dependency '+d)
+ dn.append(d)
+ testbed.prepare(dn)
+ def run(t, tree):
+ t._debug('running')
def stdouterr(oe):
- idstr = oe + '-' + t.what
+ idstr = t.what + '-' + oe
if opts.output_dir is not None and opts.output_dir.tb:
use_dir = opts.output_dir
else:
use_dir = testbed.scratch
- return use_dir.subpath(idstr, idstr, OutputFile)
+ return RelativeOutputFile(idstr, use_dir, idstr)
+
+ t.act.work.write(True)
+ af = RelativeInputFile(t.what, tree, t.path)
so = stdouterr('stdout')
se = stdouterr('stderr')
rc = testbed.execute('test-'+t.what,
- [opts.user_wrap(t.af.read(True))],
- so=so, se=se, cwd=opts.tests_tree.write(True))
+ [opts.user_wrap(af.read(True))],
+ so=so.write(True), se=se.write(True), cwd=tree.read(True))
stab = os.stat(se.read())
if stab.st_size != 0:
control_af = control_override
testbed.blame('arg:'+control_override.spec)
else:
- control_af = tree.subpath(act.what+'-testcontrol',
- 'debian/tests/control', InputFile)
-
+ control_af = RelativeInputFile(act.what+'-testcontrol',
+ tree, 'debian/tests/control')
try:
control = file(control_af.read(), 'r')
except OSError, oe:
tnames = string.join(tnames).split()
base = {
'restrictions': [],
- 'testsdir': 'debian/tests'
+ 'testsdir': 'debian/tests',
+ 'depends' : '*'
}
for fname in stz.keys():
if fname.startswith(' '): continue
f = fclass(stz, fname, base, tnames, vl)
f.parse()
for tname in tnames:
- t = Test(tname, base, act.what)
+ t = Test(tname, base, act)
stz[' tests'].append(t)
except Unsupported, u:
for tname in tnames: u.report(tname)
def cleanup():
try:
rm_ec = 0
- if tmpdir is not None:
+ if opts.tmpdir is None and tmpdir is not None:
rmtree('tmpdir', tmpdir)
if testbed is not None:
+ testbed.reset_apt()
testbed.stop()
if rm_ec: bomb('rm -rf -- %s failed, code %d' % (tmpdir, ec))
except:
def determine_package(act):
cmd = 'dpkg-deb --info --'.split(' ')+[act.af.read(),'control']
- running = Popen(cmd, stdout=PIPE)
+ running = subprocess.Popen(cmd, stdout=subprocess.PIPE)
output = running.communicate()[0]
rc = running.wait()
if rc: badpkg('failed to parse binary package, code %d' % rc)
re = regexp.compile('^\s*Package\s*:\s*([0-9a-z][-+.0-9a-z]*)\s*$')
act.pkg = None
- for l in '\n'.split(output):
- m = re.match(output)
+ for l in output.split('\n'):
+ m = re.match(l)
if not m: continue
if act.pkg: badpkg('two Package: lines in control file')
- act.pkg = m.groups
+ act.pkg = m.groups()[0]
if not act.pkg: badpkg('no good Package: line in control file')
class Binaries:
if opts.gnupghome is None:
opts.gnupghome = tmpdir+'/gnupg'
+ b._debug('initialising')
try:
for x in ['pubring','secring']:
os.stat(opts.gnupghome + '/' + x + '.gpg')
except OSError, oe:
if oe.errno != errno.ENOENT: raise
- if ok: debug('# no key generation needed')
+ if ok: b._debug('no key generation needed')
else: b.genkey()
+ def _debug(b, s):
+ debug('* '+s)
+
def genkey(b):
- try:
- os.mkdir(os.path.dirname(opts.gnupghome), 02755)
- os.mkdir(opts.gnupghome, 0700)
- except OSError, oe:
- if oe.errno != errno.EEXIST: raise
+ b._debug('preparing for key generation')
+
+ mkdir_okexist(os.path.dirname(opts.gnupghome), 02755)
+ mkdir_okexist(opts.gnupghome, 0700)
script = '''
cd "$1"
print >>sys.stderr, tp
bomb('key generation failed, code %d' % rc)
+ def apt_configs(b):
+ return {
+ "Dir::Etc::sourcelist": b.dir.read(True)+'sources.list',
+ }
+
+ def apt_pkg_gdebi_script(b, arg, middle):
+ script = [
+ 'import apt_pkg',
+ 'import urllib',
+ 'arg = urllib.unquote("%s")' % urllib.quote(arg),
+ ]
+ for (k,v) in b.apt_configs().iteritems():
+ v = urllib.quote(v)
+ script.append('apt_pkg.Config.Set("%s",urllib.unquote("%s"))'
+ % (k, v))
+ script += [
+ 'from GDebi.Cache import Cache',
+ 'cache = Cache()',
+ ]
+ for m in middle:
+ script += m + [
+ 'print res',
+ 'print d.missingDeps',
+ 'print d.requiredChanges',
+ 'assert(res)',
+ 'cache.commit()',
+ ''
+ ]
+ return '\n'.join(script)
+ def apt_get(b):
+ ag = ['apt-get','-qy']
+ for kv in b.apt_configs().iteritems():
+ ag += ['-o', '%s=%s' % kv]
+ return ag
+
def reset(b):
+ b._debug('reset')
rmtree('binaries', b.dir.read())
b.dir.invalidate()
b.dir.write()
b.install = []
b.blamed = []
+ b.registered = set()
def register(b, act, pkg, af, forwhat, blamed):
+ b._debug('register what=%s deb_%s=%s pkg=%s af=%s'
+ % (act.what, forwhat, act.ah['deb_'+forwhat], pkg, str(af)))
+
if act.ah['deb_'+forwhat] == 'ignore': return
b.blamed += testbed.blamed
leafname = pkg+'.deb'
- dest = b.dir.subpath('binaries--'+leafname, leafname, OutputFile)
+ dest = RelativeOutputFile('binaries--'+leafname, b.dir, leafname)
try: os.remove(dest.write())
except OSError, oe:
if act.ah['deb_'+forwhat] == 'install':
b.install.append(pkg)
+ b.registered.add(pkg)
+
def publish(b):
+ b._debug('publish')
+
script = '''
cd "$1"
apt-ftparchive packages . >Packages
- gzip -f Packages
+ gzip <Packages >Packages.gz
apt-ftparchive release . >Release
+ rm -f Release.gpg
gpg --homedir="$2" --batch --detach-sign --armour -o Release.gpg Release
gpg --homedir="$2" --batch --export >archive-key.pgp
'''
b.dir.invalidate(True)
apt_source = b.dir.read(True)
+ so = TemporaryFile('vlds')
script = '''
- apt-key add archive-key.pgp
- echo "deb file:///'+apt_source+'/ /" >/etc/apt/sources.list.d/autopkgtest
+ apt-key add archive-key.pgp >&2
+ echo "deb file://'''+apt_source+''' /" >sources.list
+ cat /etc/apt/sources.list >>sources.list
+ if [ "x`ls /var/lib/dpkg/updates`" != x ]; then
+ echo >&2 "/var/lib/dpkg/updates contains some files, aargh"; exit 1
+ fi
+ '''+ ' '.join(b.apt_get()) +''' update >&2
+ cat /var/lib/dpkg/status
'''
+ testbed.mungeing_apt()
debug_subprocess('apt-key', script=script)
(rc,se) = testbed.execute('apt-key',
['sh','-ec',script],
- cwd=b.dir.write(True))
+ so=so.write(True), cwd=b.dir.write(True))
if rc: bomb('apt setup failed with exit code %d' % rc, se)
testbed.blamed += b.blamed
+ b._debug('publish reinstall checking...')
+ pkgs_reinstall = set()
+ pkg = None
+ for l in file(so.read()):
+ if l.startswith('Package: '):
+ pkg = l[9:].rstrip()
+ elif l.startswith('Status: install '):
+ if pkg in b.registered:
+ pkgs_reinstall.add(pkg)
+ b._debug(' publish reinstall needs '+pkg)
+
+ if pkgs_reinstall:
+ for pkg in pkgs_reinstall: testbed.blame(pkg)
+ what = 'apt-get-reinstall'
+ cmdl = (b.apt_get() + ['--reinstall','install'] +
+ [pkg for pkg in pkgs_reinstall])
+ debug_subprocess(what, cmdl)
+ (rc,se) = testbed.execute(what, cmdl)
+ if rc: badpkg("installation of basic binarries failed,"
+ " exit code %d" % rc, se)
+
+ b._debug('publish install...')
for pkg in b.install:
+ what = 'apt-get-install-%s' % pkg
testbed.blame(pkg)
- (rc,se) = testbed.execute('install-%s'+act.what,
- ['apt-get','-qy','install',pkg])
+ cmdl = b.apt_get() + ['install',pkg]
+ debug_subprocess(what, cmdl)
+ (rc,se) = testbed.execute(what, cmdl)
if rc: badpkg("installation of %s failed, exit code %d"
% (pkg, rc), se)
+ b._debug('publish done')
+
#---------- processing of sources (building)
def source_rules_command(act,script,what,which,work,results_lines=0):
- script = "exec 3>&1 >&2\n" + '\n'.join(script)
+ if opts.debug:
+ trace = "%s-%s-trace" % (what,which)
+ script = [ "mkfifo -m600 "+trace,
+ "tee <"+trace+" /dev/stderr >&4 &",
+ "exec >"+trace+" 2>&1" ] + script
+
+ script = [ "exec 3>&1 >&2",
+ "set -x" ] + script
+ script = '\n'.join(script)
so = TemporaryFile('%s-%s-results' % (what,which))
se = TemporaryFile('%s-%s-log' % (what,which))
debug_subprocess('source-rules-command/%s/%s' % (act.what, which),
rc = testbed.execute('%s-%s' % (what,which),
['sh','-xec',script],
so=so.write(True), se=se.write(True),
- cwd= work.write(True))
- results = file(so.read()).read().split("\n")
+ cwd= work.write(True), dump_fd=4)
+ results = file(so.read()).read().rstrip('\n').split("\n")
se = file(se.read()).read()
if rc: badpkg("%s failed with exit code %d" % (which,rc), se)
if results_lines is not None and len(results) != results_lines:
if not m: badpkg(".dsc contains unparseable line"
" in Files: `%s'" % l)
leaf = m.groups(0)[0]
- subfile = dsc.sibling(
- what+'/'+leaf, leaf,
- InputFile)
+ subfile = RelativeInputFile(what+'/'+leaf, dsc, leaf,
+ sibling=True)
subfile.read(True)
dsc.read(True)
-
+
+ script = binaries.apt_pkg_gdebi_script(
+ dsc.read(True), [[
+ 'from GDebi.DscSrcPackage import DscSrcPackage',
+ 'd = DscSrcPackage(cache, arg)',
+ 'res = d.checkDeb()',
+ ],[
+ 'from GDebi.DebPackage import DebPackage',
+ 'd = DebPackage(cache)',
+ 'res = d.satisfyDependsStr("build-essential")',
+ ]])
+ cmdl = ['python','-c',script]
+ whatp = what+'-builddeps'
+ debug_subprocess(whatp, cmdl, script=script)
+ (rc,se) = testbed.execute(what, cmdl)
+ if rc: badpkg('build-depends install failed, exit code %d' % rc, se)
+
work = TemporaryDir(what+'-build')
script = [
'cd '+work.write(True),
- 'apt-get update',
- 'apt-get -y install build-essential',
- 'gdebi '+dsc.read(True) +'||apt-get -y install dpatch bison',
+ ]
+ if opts.user: script += [
+ 'chown '+opts.user+' .',
+ 'dsc=dsc.read(True) '+
+ opts.user_wrap('dpkg-source -x $dsc')
+ ]
+ else: script += [
'dpkg-source -x '+dsc.read(True),
+ ]
+ script += [
'cd */.',
'dpkg-checkbuilddeps',
'pwd >&3',
opts.user_wrap('debian/rules build'),
- ]
+ ]
result_pwd = source_rules_command(act,script,what,'build',work,1)
- if os.path.dirname(result_pwd) != work.read(True):
+ if os.path.dirname(result_pwd)+'/' != work.read(True):
badpkg("results dir `%s' is not in expected parent dir `%s'"
- % (results[0], work.read(True)))
+ % (result_pwd, work.read(True)))
+ act.work = work
act.tests_tree = InputDir(what+'-tests-tree',
- work.read(True)+os.path.basename(results[0]),
- InputDir)
+ work.read(True)+os.path.basename(result_pwd),
+ True)
if act.ah['dsc_tests']:
act.tests_tree.read()
- act.tests_tree.invalidate(True)
+ testbed.register_ephemeral(act.work)
+ testbed.register_ephemeral(act.tests_tree)
- act.blamed = testbed.blamed.copy()
+ act.blamed = copy.copy(testbed.blamed)
+ def debug_b(m): debug('* <dsc:%s> %s' % (act.what, m))
act.binaries = []
- if act.ah['dsc_filter'] != '_':
+ filter = act.ah['dsc_filter']
+ debug_b('filter=%s' % filter)
+ if filter != '_':
script = [
'cd '+work.write(True)+'/*/.',
opts.user_wrap(opts.gainroot+' debian/rules binary'),
result_debs = source_rules_command(act,script,what,
'binary',work,1)
if result_debs == '*': debs = []
- else: debs = debs.split(' ')
+ else: debs = result_debs.split(' ')
+ debug_b('debs='+`debs`)
re = regexp.compile('^([-+.0-9a-z]+)_[^_/]+(?:_[^_/]+)\.deb$')
for deb in debs:
m = re.match(deb)
if not m: badpkg("badly-named binary `%s'" % deb)
- pkg = m.groups(0)
- for pat in act.ah['dsc_filter'].split(','):
- if fnmatch.fnmatchcase(pkg,pat):
- deb_af = work.read()+'/'+deb
- deb_what = pkg+'_'+what+'.deb'
- bin = InputFile(deb_what,deb_af,True)
- bin.preserve_now()
- binaries.register(act,pkg,bin,'builds',
- testbed.blamed)
- act.binaries.subpath((pkg,bin))
- break
+ pkg = m.groups()[0]
+ debug_b(' deb=%s, pkg=%s' % (deb,pkg))
+ for pat in filter.split(','):
+ debug_b(' pat=%s' % pat)
+ if not fnmatch.fnmatchcase(pkg,pat):
+ debug_b(' no match')
+ continue
+ deb_what = pkg+'_'+what+'.deb'
+ bin = RelativeInputFile(deb_what,work,deb,True)
+ debug_b(' deb_what=%s, bin=%s' %
+ (deb_what, str(bin)))
+ binaries.register(act,pkg,bin,
+ 'forbuilds',testbed.blamed)
+ act.binaries.append((pkg,bin))
+ break
+ debug_b('all done.')
#---------- main processing loop and main program
def process_actions():
global binaries
+ def debug_a1(m): debug('@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@ '+m)
+ def debug_a2(m): debug('@@@@@@@@@@@@@@@@@@@@ '+m)
+ def debug_a3(m): debug('@@@@@@@@@@ '+m)
+
+ debug_a1('starting')
testbed.open()
binaries = Binaries()
binaries.reset()
+
+ debug_a1('builds ...')
for act in opts.actions:
- testbed.prepare()
+ debug_a2('%s %s' %
+ (act.kind, act.what))
+
+ testbed.prepare([])
if act.kind == 'deb':
- blame('arg:'+act.af.spec)
+ testbed.blame('arg:'+act.af.spec)
determine_package(act)
- blame('deb:'+act.pkg)
- binaries.register(act,act.pkg,act.af,'builds',
- testbed.blamed)
+ testbed.blame('deb:'+act.pkg)
+ binaries.register(act,act.pkg,act.af,
+ 'forbuilds',testbed.blamed)
if act.kind == 'dsc':
build_source(act)
+ if act.kind == 'tree':
+ act.binaries = []
+
+ debug_a1('builds done.')
binaries.reset()
control_override = None
+
+ debug_a1('tests ...')
for act in opts.actions:
- testbed.prepare()
+ debug_a2('test %s %s' % (act.kind, act.what))
+
+ testbed.needs_reset()
if act.kind == 'control':
control_override = act.af
if act.kind == 'deb':
- binaries.register(act,act.pkg,act.af,'tests',
+ binaries.register(act,act.pkg,act.af,'fortests',
['deb:'+act.pkg])
if act.kind == 'dsc':
for (pkg,bin) in act.binaries:
- binaries.register(act,pkg,bin,'tests',
+ binaries.register(act,pkg,bin,'fortests',
act.blamed)
if act.ah['dsc_tests']:
+ debug_a3('read control ...')
stanzas = read_control(act, act.tests_tree,
control_override)
testbed.blamed += act.blamed
- run_tests(act, stanzas)
+ debug_a3('run_tests ...')
+ run_tests(stanzas, act.tests_tree)
control_override = None
if act.kind == 'tree':
testbed.blame('arg:'+act.af.spec)
- stanzas = read_control(act, act.af,
- control_override)
- run_tests(act, stanzas)
+ stanzas = read_control(act, act.af, control_override)
+ debug_a3('run_tests ...')
+ run_tests(stanzas, act.af)
control_override = None
+ debug_a1('tests done.')
def main():
global testbed
except SystemExit, se:
os._exit(20)
try:
- tmpdir = tempfile.mkdtemp()
testbed = Testbed()
testbed.start()
finalise_options()