chiark / gitweb /
ownsource: tie in (part 1) and cleanup stuff
[hippotat.git] / hippotatlib / ownsource.py
1 # Automatic source code provision (AGPL compliance)
2
3 import os
4 import sys
5 import fnmatch
6 import stat
7 import subprocess
8 import tempfile
9
10 class SourceShipmentPreparer():
11   def __init__(s, destdir):
12     # caller may modify, and should read after calling generate()
13     s.output_name = 'srcbomb.tar.gz'
14     # s.output_path   alternatively caller may read this
15     # defaults, caller can modify after creation
16     s.logger = lambda m: print('SourceShipmentPreparer',m)
17     s.src_filter = s.src_filter_glob
18     s.src_package_globs = ['!/usr/local/*', '/usr*']
19     s.src_filter_globs = ['!/etc/*']
20     s.src_likeparent = s.src_likeparent_git
21     s.src_direxcludes = s.src_direxcludes_git
22     s.report_from_packages = s.report_from_packages_debian
23     s.cwd = os.getcwd()
24     s.find_rune_base = "find -type f -perm -004 \! -path '*/tmp/*'"
25     s.ignores = ['*~', '*.bak', '*.tmp', '#*#', '__pycache__',
26                   '[0-9][0-9][0-9][0-9]-src.cpio']
27     s.rune_shell = ['/bin/bash', '-ec']
28     s.show_pathnames = True
29     s.rune_cpio = r'''
30             set -o pipefail
31             (
32              %s
33              # ^ by default, is find ... -print0
34             ) | (
35              cpio -Hustar -o --quiet -0 -R 1000:1000 || \
36              cpio -Hustar -o --quiet -0
37             )
38     '''
39     s.rune_portmanteau = r'''
40             outfile=$1; shift
41             rm -f "$outfile"
42             GZIP=-1 tar zcf "$outfile" "$@"
43     '''
44     s.manifest_name='0000-MANIFEST.txt'
45     # private
46     s._destdir = destdir
47     s._outcounter = 0
48     s._manifest = []
49     s._dirmap = { }
50     s._package_files = { } # map filename => infol
51
52   def thing_matches_globs(s, thing, globs):
53     for pat in globs:
54       negate = pat.startswith('!')
55       if negate: pat = pat[1:]
56       if fnmatch.fnmatch(thing, pat):
57         return not negate
58     return negate
59
60   def src_filter_glob(s, src): # default s.src_filter
61     return s.thing_matches_globs(src, s.src_filter_globs)
62
63   def src_direxcludes_git(s, d):
64     try:
65       excl = open(os.path.join(d, '.gitignore'))
66     except FileNotFoundError:
67       return []
68     r = []
69     for l in excl:
70       l.strip
71       if l.startswith('#'): next
72       if not len(l): next
73       r += l
74     return r
75
76   def src_likeparent_git(s, src):
77     try:
78       os.stat(os.path.join(src, '.git/.'))
79     except FileNotFoundError:
80       return False
81     else:
82       return True
83
84   def src_parentfinder(s, src, infol): # callers may monkey-patch away
85     for deref in (False,True):
86       xinfo = []
87
88       search = src
89       if deref:
90         search = os.path.realpath(search)
91
92       def ascend():
93         nonlocal search
94         xinfo.append(os.path.basename(search))
95         search = os.path.dirname(search)
96
97       try:
98         stab = os.lstat(search)
99       except FileNotFoundError:
100         return
101       if stat.S_ISREG(stab.st_mode):
102         ascend()
103
104       while not os.path.ismount(search):
105         if s.src_likeparent(search):
106           xinfo.reverse()
107           if len(xinfo): infol.append('want=' + os.path.join(*xinfo))
108           return search
109
110         ascend()
111
112     # no .git found anywhere
113     return src
114
115   def path_prenormaliser(s, d, infol): # callers may monkey-patch away
116     return os.path.join(s.cwd, os.path.abspath(d))
117
118   def srcdir_find_rune(s, d):
119     script = s.find_rune_base
120     ignores = s.ignores + [s.output_name, s.manifest_name]
121     ignores += s.src_direxcludes(d)
122     for excl in ignores:
123       assert("'" not in excl)
124       script += r" \! -name '%s'"     % excl
125       script += r" \! -path '*/%s/*'" % excl
126     script += ' -print0'
127     return script
128
129   def manifest_append(s, name, infol):
130     s._manifest.append({ 'file':name, 'info':' '.join(infol) })
131
132   def manifest_append_absentfile(s, name, infol):
133     s._manifest.append({ 'file_print':name, 'info':' '.join(infol) })
134
135   def new_output_name(s, nametail, infol):
136     s._outcounter += 1
137     name = '%04d-%s' % (s._outcounter, nametail)
138     s.manifest_append(name, infol)
139     return name
140
141   def open_output_fh(s, name, mode):
142     return open(os.path.join(s._destdir, name), mode)
143
144   def src_dir(s, d, infol):
145     try: name = s._dirmap[d]
146     except KeyError: pass
147     else:
148       s.manifest_append(name, infol)
149       return
150
151     if s.show_pathnames: infol.append(d)
152     find_rune = s.srcdir_find_rune(d)
153     total_rune = s.rune_cpio % find_rune
154
155     name = s.new_output_name('src.cpio', infol)
156     s._dirmap[d] = name
157     fh = s.open_output_fh(name, 'wb')
158
159     s.logger('packing up into %s: %s (because %s)' %
160              (name, d, ' '.join(infol)))
161
162     subprocess.run(s.rune_shell + [total_rune],
163                    cwd=d,
164                    stdin=subprocess.DEVNULL,
165                    stdout=fh,
166                    restore_signals=True,
167                    check=True)
168     fh.close()
169
170   def src_indir(s, d, infol):
171     d = s.path_prenormaliser(d, infol)
172     if not s.src_filter(d): return
173
174     d = s.src_parentfinder(d, infol)
175     if d is None: return
176     s.src_dir(d, infol)
177
178   def report_from_packages_debian(s, files):
179     dpkg_S_in = tempfile.TemporaryFile(mode='w+')
180     for (file, infols) in files.items():
181       assert('\n' not in file)
182       dpkg_S_in.write(file)
183       dpkg_S_in.write('\0')
184     dpkg_S_in.seek(0)
185     cmdl = ['xargs','-0r','dpkg','-S','--']
186     dpkg_S = subprocess.Popen(cmdl,
187                               cwd='/',
188                               stdin=dpkg_S_in,
189                               stdout=subprocess.PIPE,
190                               stderr=sys.stderr,
191                               close_fds=False)
192     dpkg_show_in = tempfile.TemporaryFile(mode='w+')
193     pkginfos = { }
194     for l in dpkg_S.stdout:
195       l = l.strip(b'\n').decode('utf-8')
196       (pkgs, fname) = l.split(': ',1)
197       pks = pkgs.split(', ')
198       for pk in pks:
199         pkginfos.setdefault(pk,{'files':[]})['files'].append(fname)
200         print(pk, file=dpkg_show_in)
201     assert(dpkg_S.wait() == 0)
202     dpkg_show_in.seek(0)
203     cmdl = ['xargs','-r','dpkg-query',
204             r'-f${binary:Package}\t${Package}\t${Architecture}\t${Version}\t${source:Package}\t${source:Version}\n',
205             '--show','--']
206     dpkg_show = subprocess.Popen(cmdl,
207                                  cwd='/',
208                                  stdin=dpkg_show_in,
209                                  stdout=subprocess.PIPE,
210                                  stderr=sys.stderr,
211                                  close_fds=False)
212     for l in dpkg_show.stdout:
213       l = l.strip(b'\n').decode('utf-8')
214       (pk,p,a,v,sp,sv) = l.split('\t')
215       pkginfos[pk]['binary'] = p
216       pkginfos[pk]['arch'] = a
217       pkginfos[pk]['version'] = v
218       pkginfos[pk]['source'] = sp
219       pkginfos[pk]['sourceversion'] = sv
220     assert(dpkg_show.wait() == 0)
221     for pk in sorted(pkginfos.keys()):
222       pi = pkginfos[pk]
223       debfname = '%s_%s_%s.deb' % (pi['binary'], pi['version'], pi['arch'])
224       dscfname = '%s_%s.dsc' % (pi['source'], pi['sourceversion'])
225       s.manifest_append_absentfile(dscfname, [debfname])
226       s.logger('mentioning %s and %s because %s' %
227                (dscfname, debfname, pi['files'][0]))
228       for fname in pi['files']:
229         infol = files[fname]
230         if s.show_pathnames: infol = infol + ['loaded='+fname]
231         s.manifest_append_absentfile(' \t' + debfname, infol)
232
233   def thing_ought_packaged(s, fname):
234     return s.thing_matches_globs(fname, s.src_package_globs)
235
236   def src_file_packaged(s, fname, infol):
237     s._package_files.setdefault(fname,[]).extend(infol)
238
239   def src_file(s, fname, infol):
240     def fngens():
241       yield (infol, fname)
242       infol_copy = infol.copy()
243       yield (infol_copy, s.path_prenormaliser(fname, infol_copy))
244       yield (infol, os.path.realpath(fname))
245
246     for (tinfol, tfname) in fngens():
247       if s.thing_ought_packaged(tfname):
248         s.src_file_packaged(tfname, tinfol)
249         return
250
251     s.src_indir(fname, infol)
252
253   def src_argv0(s, program, infol):
254     s.src_file(program, infol)
255
256   def src_syspath(s, fname, infol):
257     if s.thing_ought_packaged(fname): return
258     s.src_indir(fname, infol)
259
260   def src_module(s, m, infol):
261     try: fname = m.__file__
262     except AttributeError: return
263     infol.append('module='+m.__name__)
264
265     if s.thing_ought_packaged(fname):
266       s.src_file_packaged(fname, infol)
267     else:
268       s.src_indir(fname, infol)
269
270   def srcs_allitems(s, dirs=sys.path):
271     s.logger('allitems')
272     s.src_argv0(sys.argv[0], ['argv[0]'])
273     for d in sys.path:
274       s.src_syspath(d, ['sys.path'])
275     for m in sys.modules.values():
276       s.src_module(m, ['sys.modules'])
277     s.report_from_packages(s._package_files)
278     s.logger('allitems done')
279
280   def mk_portmanteau(s):
281     s.logger('making portmanteau')
282     cmdl = s.rune_shell + [ s.rune_portmanteau, 'x',
283                             s.output_name, s.manifest_name ]
284     mfh = s.open_output_fh(s.manifest_name,'w')
285     for me in s._manifest:
286       try: fname = me['file']
287       except KeyError: fname = me.get('file_print','')
288       else: cmdl.append(fname)
289       print('%s\t%s' % (fname, me['info']), file=mfh)
290     mfh.close()
291     subprocess.run(cmdl,
292                    cwd=s._destdir,
293                    stdin=subprocess.DEVNULL,
294                    stdout=sys.stderr,
295                    restore_signals=True,
296                    check=True)
297     s.output_path = os.path.join(s._destdir, s.output_name)
298     s.logger('portmanteau ready in %s' % s.output_path)
299
300   def generate(s):
301     s.srcs_allitems()
302     s.mk_portmanteau()