chiark / gitweb /
ownsource: try to honour .gitignore
[hippotat.git] / hippotatlib / ownsource.py
1 # Automatic source code provision (AGPL compliance)
2
3 import os
4 import sys
5 import fnmatch
6 import stat
7 import subprocess
8 import tempfile
9
10 class SourceShipmentPreparer():
11   def __init__(s, destdir):
12     # caller may modify, and should read after calling generate()
13     s.output_name = 'srcbomb.tar.gz'
14     # defaults, caller can modify after creation
15     s.src_filter = s.src_filter_glob
16     s.src_package_globs = ['!/usr/local/*', '/usr*']
17     s.src_filter_globs = ['!/etc/*']
18     s.src_likeparent = s.src_likeparent_git
19     s.src_direxcludes = s.src_direxcludes_git
20     s.report_from_packages = s.report_from_packages_debian
21     s.cwd = os.getcwd()
22     s.find_rune_base = "find -type f -perm -004 \! -path '*/tmp/*'"
23     s.ignores = ['*~', '*.bak', '*.tmp', '#*#', '__pycache__',
24                   '[0-9][0-9][0-9][0-9]-src.cpio']
25     s.rune_shell = ['/bin/bash', '-ec']
26     s.show_pathnames = True
27     s.rune_cpio = r'''
28             set -o pipefail
29             (
30              %s
31              # ^ by default, is find ... -print0
32             ) | (
33              cpio -Hustar -o --quiet -0 -R 1000:1000 || \
34              cpio -Hustar -o --quiet -0
35             )
36     '''
37     s.rune_portmanteau = r'''
38             outfile=$1; shift
39             rm -f "$outfile"
40             GZIP=-1 tar zcf "$outfile" "$@"
41     '''
42     s.manifest_name='0000-MANIFEST.txt'
43     # private
44     s._destdir = destdir
45     s._outcounter = 0
46     s._manifest = []
47     s._dirmap = { }
48     s._package_files = { } # map filename => infol
49
50   def thing_matches_globs(s, thing, globs):
51     for pat in globs:
52       negate = pat.startswith('!')
53       if negate: pat = pat[1:]
54       if fnmatch.fnmatch(thing, pat):
55         return not negate
56     return negate
57
58   def src_filter_glob(s, src): # default s.src_filter
59     return s.thing_matches_globs(src, s.src_filter_globs)
60
61   def src_direxcludes_git(s, d):
62     try:
63       excl = open(os.path.join(d, '.gitignore'))
64     except FileNotFoundError:
65       return []
66     r = []
67     for l in excl:
68       l.strip
69       if l.startswith('#'): next
70       if not len(l): next
71       r += l
72     return r
73
74   def src_likeparent_git(s, src):
75     try:
76       os.stat(os.path.join(src, '.git/.'))
77     except FileNotFoundError:
78       return False
79     else:
80       return True
81
82   def src_parentfinder(s, src, infol): # callers may monkey-patch away
83     for deref in (False,True):
84       xinfo = []
85
86       search = src
87       if deref:
88         search = os.path.realpath(search)
89
90       def ascend():
91         nonlocal search
92         xinfo.append(os.path.basename(search))
93         search = os.path.dirname(search)
94
95       try:
96         stab = os.lstat(search)
97       except FileNotFoundError:
98         return
99       if stat.S_ISREG(stab.st_mode):
100         ascend()
101
102       while not os.path.ismount(search):
103         if s.src_likeparent(search):
104           xinfo.reverse()
105           if len(xinfo): infol.append('want=' + os.path.join(*xinfo))
106           return search
107
108         ascend()
109
110     # no .git found anywhere
111     return src
112
113   def path_prenormaliser(s, d, infol): # callers may monkey-patch away
114     return os.path.join(s.cwd, os.path.abspath(d))
115
116   def srcdir_find_rune(s, d):
117     script = s.find_rune_base
118     ignores = s.ignores + [s.output_name, s.manifest_name]
119     ignores += s.src_direxcludes(d)
120     for excl in ignores:
121       assert("'" not in excl)
122       script += r" \! -name '%s'"     % excl
123       script += r" \! -path '*/%s/*'" % excl
124     script += ' -print0'
125     return script
126
127   def manifest_append(s, name, infol):
128     s._manifest.append({ 'file':name, 'info':' '.join(infol) })
129
130   def manifest_append_absentfile(s, name, infol):
131     s._manifest.append({ 'file_print':name, 'info':' '.join(infol) })
132
133   def new_output_name(s, nametail, infol):
134     s._outcounter += 1
135     name = '%04d-%s' % (s._outcounter, nametail)
136     s.manifest_append(name, infol)
137     return name
138
139   def open_output_fh(s, name, mode):
140     return open(os.path.join(s._destdir, name), mode)
141
142   def src_dir(s, d, infol):
143     try: name = s._dirmap[d]
144     except KeyError: pass
145     else:
146       s.manifest_append(name, infol)
147       return
148
149     if s.show_pathnames: infol.append(d)
150     find_rune = s.srcdir_find_rune(d)
151     total_rune = s.rune_cpio % find_rune
152
153     name = s.new_output_name('src.cpio', infol)
154     s._dirmap[d] = name
155     fh = s.open_output_fh(name, 'wb')
156
157     subprocess.run(s.rune_shell + [total_rune],
158                    cwd=d,
159                    stdin=subprocess.DEVNULL,
160                    stdout=fh,
161                    restore_signals=True,
162                    check=True)
163     fh.close()
164
165   def src_indir(s, d, infol):
166     d = s.path_prenormaliser(d, infol)
167     if not s.src_filter(d): return
168
169     d = s.src_parentfinder(d, infol)
170     if d is None: return
171     s.src_dir(d, infol)
172
173   def report_from_packages_debian(s, files):
174     dpkg_S_in = tempfile.TemporaryFile(mode='w+')
175     for (file, infols) in files.items():
176       assert('\n' not in file)
177       dpkg_S_in.write(file)
178       dpkg_S_in.write('\0')
179     dpkg_S_in.seek(0)
180     cmdl = ['xargs','-0r','dpkg','-S','--']
181     dpkg_S = subprocess.Popen(cmdl,
182                               cwd='/',
183                               stdin=dpkg_S_in,
184                               stdout=subprocess.PIPE,
185                               stderr=sys.stderr,
186                               close_fds=False)
187     dpkg_show_in = tempfile.TemporaryFile(mode='w+')
188     pkginfos = { }
189     for l in dpkg_S.stdout:
190       l = l.strip(b'\n').decode('utf-8')
191       (pkgs, fname) = l.split(': ',1)
192       pks = pkgs.split(', ')
193       for pk in pks:
194         pkginfos.setdefault(pk,{'files':[]})['files'].append(fname)
195         print(pk, file=dpkg_show_in)
196     assert(dpkg_S.wait() == 0)
197     dpkg_show_in.seek(0)
198     cmdl = ['xargs','-r','dpkg-query',
199             r'-f${binary:Package}\t${Package}\t${Architecture}\t${Version}\t${source:Package}\t${source:Version}\n',
200             '--show','--']
201     dpkg_show = subprocess.Popen(cmdl,
202                                  cwd='/',
203                                  stdin=dpkg_show_in,
204                                  stdout=subprocess.PIPE,
205                                  stderr=sys.stderr,
206                                  close_fds=False)
207     for l in dpkg_show.stdout:
208       l = l.strip(b'\n').decode('utf-8')
209       (pk,p,a,v,sp,sv) = l.split('\t')
210       pkginfos[pk]['binary'] = p
211       pkginfos[pk]['arch'] = a
212       pkginfos[pk]['version'] = v
213       pkginfos[pk]['source'] = sp
214       pkginfos[pk]['sourceversion'] = sv
215     assert(dpkg_show.wait() == 0)
216     for pk in sorted(pkginfos.keys()):
217       pi = pkginfos[pk]
218       debfname = '%s_%s_%s.deb' % (pi['binary'], pi['version'], pi['arch'])
219       dscfname = '%s_%s.dsc' % (pi['source'], pi['sourceversion'])
220       s.manifest_append_absentfile(dscfname, [debfname])
221       for fname in pi['files']:
222         infol = files[fname]
223         if s.show_pathnames: infol = infol + ['loaded='+fname]
224         s.manifest_append_absentfile(' \t' + debfname, infol)
225
226   def thing_ought_packaged(s, fname):
227     return s.thing_matches_globs(fname, s.src_package_globs)
228
229   def src_file_packaged(s, fname, infol):
230     s._package_files.setdefault(fname,[]).extend(infol)
231
232   def src_file(s, fname, infol):
233     def fngens():
234       yield (infol, fname)
235       infol_copy = infol.copy()
236       yield (infol_copy, s.path_prenormaliser(fname, infol_copy))
237       yield (infol, os.path.realpath(fname))
238
239     for (tinfol, tfname) in fngens():
240       if s.thing_ought_packaged(tfname):
241         s.src_file_packaged(tfname, tinfol)
242         return
243
244     s.src_indir(fname, infol)
245
246   def src_argv0(s, program, infol):
247     s.src_file(program, infol)
248
249   def src_syspath(s, fname, infol):
250     if s.thing_ought_packaged(fname): return
251     s.src_indir(fname, infol)
252
253   def src_module(s, m, infol):
254     try: fname = m.__file__
255     except AttributeError: return
256     infol.append('module='+m.__name__)
257
258     if s.thing_ought_packaged(fname):
259       s.src_file_packaged(fname, infol)
260     else:
261       s.src_indir(fname, infol)
262
263   def srcs_allitems(s, dirs=sys.path):
264     s.src_argv0(sys.argv[0], ['argv[0]'])
265     for d in sys.path:
266       s.src_syspath(d, ['sys.path'])
267     for m in sys.modules.values():
268       s.src_module(m, ['sys.modules'])
269     s.report_from_packages(s._package_files)
270
271   def mk_portmanteau(s):
272     cmdl = s.rune_shell + [ s.rune_portmanteau, 'x',
273                             s.output_name, s.manifest_name ]
274     mfh = s.open_output_fh(s.manifest_name,'w')
275     for me in s._manifest:
276       try: fname = me['file']
277       except KeyError: fname = me.get('file_print','')
278       else: cmdl.append(fname)
279       print('%s\t%s' % (fname, me['info']), file=mfh)
280     mfh.close()
281     subprocess.run(cmdl,
282                    cwd=s._destdir,
283                    stdin=subprocess.DEVNULL,
284                    stdout=sys.stderr,
285                    restore_signals=True,
286                    check=True)
287
288   def generate(s):
289     s.srcs_allitems()
290     s.mk_portmanteau()