chiark / gitweb /
fix bare except to satisfy newer pycodestyle
[fdroidserver.git] / fdroidserver / common.py
index 807974d6013a9f942231c19f75d4d3522a44ec5c..794504e311798a44a3b033caa73c1df40a80addf 100644 (file)
@@ -34,6 +34,9 @@ import logging
 import hashlib
 import socket
 import base64
+import zipfile
+import tempfile
+import json
 import xml.etree.ElementTree as XMLElementTree
 
 from binascii import hexlify
@@ -509,6 +512,32 @@ def publishednameinfo(filename):
     return result
 
 
+apk_release_filename = re.compile('(?P<appid>[a-zA-Z0-9_\.]+)_(?P<vercode>[0-9]+)\.apk')
+apk_release_filename_with_sigfp = re.compile('(?P<appid>[a-zA-Z0-9_\.]+)_(?P<vercode>[0-9]+)_(?P<sigfp>[0-9a-f]{7})\.apk')
+
+
+def apk_parse_release_filename(apkname):
+    """Parses the name of an APK file according the F-Droids APK naming
+    scheme and returns the tokens.
+
+    WARNING: Returned values don't necessarily represent the APKs actual
+    properties, the are just paresed from the file name.
+
+    :returns: A triplet containing (appid, versionCode, signer), where appid
+        should be the package name, versionCode should be the integer
+        represion of the APKs version and signer should be the first 7 hex
+        digists of the sha256 signing key fingerprint which was used to sign
+        this APK.
+    """
+    m = apk_release_filename_with_sigfp.match(apkname)
+    if m:
+        return m.group('appid'), m.group('vercode'), m.group('sigfp')
+    m = apk_release_filename.match(apkname)
+    if m:
+        return m.group('appid'), m.group('vercode'), None
+    return None, None, None
+
+
 def get_release_filename(app, build):
     if build.output:
         return "%s_%s.%s" % (app.id, build.versionCode, get_file_extension(build.output))
@@ -609,14 +638,16 @@ class vcs:
     def repotype(self):
         return None
 
-    # Take the local repository to a clean version of the given revision, which
-    # is specificed in the VCS's native format. Beforehand, the repository can
-    # be dirty, or even non-existent. If the repository does already exist
-    # locally, it will be updated from the origin, but only once in the
-    # lifetime of the vcs object.
-    # None is acceptable for 'rev' if you know you are cloning a clean copy of
-    # the repo - otherwise it must specify a valid revision.
     def gotorevision(self, rev, refresh=True):
+        """Take the local repository to a clean version of the given
+        revision, which is specificed in the VCS's native
+        format. Beforehand, the repository can be dirty, or even
+        non-existent. If the repository does already exist locally, it
+        will be updated from the origin, but only once in the lifetime
+        of the vcs object.  None is acceptable for 'rev' if you know
+        you are cloning a clean copy of the repo - otherwise it must
+        specify a valid revision.
+        """
 
         if self.clone_failed:
             raise VCSException(_("Downloading the repository already failed once, not trying again."))
@@ -665,9 +696,11 @@ class vcs:
         if exc is not None:
             raise exc
 
-    # Derived classes need to implement this. It's called once basic checking
-    # has been performend.
     def gotorevisionx(self, rev):  # pylint: disable=unused-argument
+        """Derived classes need to implement this.
+
+        It's called once basic checking has been performed.
+        """
         raise VCSException("This VCS type doesn't define gotorevisionx")
 
     # Initialise and update submodules
@@ -684,17 +717,16 @@ class vcs:
                 rtags.append(tag)
         return rtags
 
-    # Get a list of all the known tags, sorted from newest to oldest
     def latesttags(self):
+        """Get a list of all the known tags, sorted from newest to oldest"""
         raise VCSException('latesttags not supported for this vcs type')
 
-    # Get current commit reference (hash, revision, etc)
     def getref(self):
+        """Get current commit reference (hash, revision, etc)"""
         raise VCSException('getref not supported for this vcs type')
 
-    # Returns the srclib (name, path) used in setting up the current
-    # revision, or None.
     def getsrclib(self):
+        """Returns the srclib (name, path) used in setting up the current revision, or None."""
         return self.srclib
 
 
@@ -703,11 +735,14 @@ class vcs_git(vcs):
     def repotype(self):
         return 'git'
 
-    # If the local directory exists, but is somehow not a git repository, git
-    # will traverse up the directory tree until it finds one that is (i.e.
-    # fdroidserver) and then we'll proceed to destroy it! This is called as
-    # a safety check.
     def checkrepo(self):
+        """If the local directory exists, but is somehow not a git repository,
+        git will traverse up the directory tree until it finds one
+        that is (i.e.  fdroidserver) and then we'll proceed to destroy
+        it!  This is called as a safety check.
+
+        """
+
         p = FDroidPopen(['git', 'rev-parse', '--show-toplevel'], cwd=self.local, output=False)
         result = p.output.rstrip()
         if not result.endswith(self.local):
@@ -812,11 +847,13 @@ class vcs_gitsvn(vcs):
     def repotype(self):
         return 'git-svn'
 
-    # If the local directory exists, but is somehow not a git repository, git
-    # will traverse up the directory tree until it finds one that is (i.e.
-    # fdroidserver) and then we'll proceed to destory it! This is called as
-    # a safety check.
     def checkrepo(self):
+        """If the local directory exists, but is somehow not a git repository,
+        git will traverse up the directory tree until it finds one that
+        is (i.e.  fdroidserver) and then we'll proceed to destory it!
+        This is called as a safety check.
+
+        """
         p = FDroidPopen(['git', 'rev-parse', '--show-toplevel'], cwd=self.local, output=False)
         result = p.output.rstrip()
         if not result.endswith(self.local):
@@ -1254,14 +1291,16 @@ def is_valid_package_name(name):
     return re.match("[A-Za-z_][A-Za-z_0-9.]+$", name)
 
 
-# Get the specified source library.
-# Returns the path to it. Normally this is the path to be used when referencing
-# it, which may be a subdirectory of the actual project. If you want the base
-# directory of the project, pass 'basepath=True'.
 def getsrclib(spec, srclib_dir, subdir=None, basepath=False,
               raw=False, prepare=True, preponly=False, refresh=True,
               build=None):
+    """Get the specified source library.
+
+    Returns the path to it. Normally this is the path to be used when
+    referencing it, which may be a subdirectory of the actual project. If
+    you want the base directory of the project, pass 'basepath=True'.
 
+    """
     number = None
     subdir = None
     if raw:
@@ -1563,9 +1602,8 @@ def prepare_source(vcs, app, build, build_dir, srclib_dir, extlib_dir, onserver=
     return (root_dir, srclibpaths)
 
 
-# Extend via globbing the paths from a field and return them as a map from
-# original path to resulting paths
 def getpaths_map(build_dir, globpaths):
+    """Extend via globbing the paths from a field and return them as a map from original path to resulting paths"""
     paths = dict()
     for p in globpaths:
         p = p.strip()
@@ -1577,8 +1615,8 @@ def getpaths_map(build_dir, globpaths):
     return paths
 
 
-# Extend via globbing the paths from a field and return them as a set
 def getpaths(build_dir, globpaths):
+    """Extend via globbing the paths from a field and return them as a set"""
     paths_map = getpaths_map(build_dir, globpaths)
     paths = set()
     for k, v in paths_map.items():
@@ -1599,6 +1637,13 @@ class KnownApks:
     """
 
     def __init__(self):
+        '''Load filename/date info about previously seen APKs
+
+        Since the appid and date strings both will never have spaces,
+        this is parsed as a list from the end to allow the filename to
+        have any combo of spaces.
+        '''
+
         self.path = os.path.join('stats', 'known_apks.txt')
         self.apks = {}
         if os.path.isfile(self.path):
@@ -1608,7 +1653,10 @@ class KnownApks:
                     if len(t) == 2:
                         self.apks[t[0]] = (t[1], None)
                     else:
-                        self.apks[t[0]] = (t[1], datetime.strptime(t[2], '%Y-%m-%d'))
+                        appid = t[-2]
+                        date = datetime.strptime(t[-1], '%Y-%m-%d')
+                        filename = line[0:line.rfind(appid) - 1]
+                        self.apks[filename] = (appid, date)
         self.changed = False
 
     def writeifchanged(self):
@@ -1643,16 +1691,17 @@ class KnownApks:
         _, added = self.apks[apkName]
         return added
 
-    # Look up information - given the 'apkname', returns (app id, date added/None).
-    # Or returns None for an unknown apk.
     def getapp(self, apkname):
+        """Look up information - given the 'apkname', returns (app id, date added/None).
+
+        Or returns None for an unknown apk.
+        """
         if apkname in self.apks:
             return self.apks[apkname]
         return None
 
-    # Get the most recent 'num' apps added to the repo, as a list of package ids
-    # with the most recent first.
     def getlatest(self, num):
+        """Get the most recent 'num' apps added to the repo, as a list of package ids with the most recent first"""
         apps = {}
         for apk, app in self.apks.items():
             appid, added = app
@@ -1994,6 +2043,67 @@ def place_srclib(root_dir, number, libpath):
 apk_sigfile = re.compile(r'META-INF/[0-9A-Za-z]+\.(SF|RSA|DSA|EC)')
 
 
+def signer_fingerprint_short(sig):
+    """Obtain shortened sha256 signing-key fingerprint for pkcs7 signature.
+
+    Extracts the first 7 hexadecimal digits of sha256 signing-key fingerprint
+    for a given pkcs7 signature.
+
+    :param sig: Contents of an APK signing certificate.
+    :returns: shortened signing-key fingerprint.
+    """
+    return signer_fingerprint(sig)[:7]
+
+
+def signer_fingerprint(sig):
+    """Obtain sha256 signing-key fingerprint for pkcs7 signature.
+
+    Extracts hexadecimal sha256 signing-key fingerprint string
+    for a given pkcs7 signature.
+
+    :param: Contents of an APK signature.
+    :returns: shortened signature fingerprint.
+    """
+    cert_encoded = get_certificate(sig)
+    return hashlib.sha256(cert_encoded).hexdigest()
+
+
+def apk_signer_fingerprint(apk_path):
+    """Obtain sha256 signing-key fingerprint for APK.
+
+    Extracts hexadecimal sha256 signing-key fingerprint string
+    for a given APK.
+
+    :param apkpath: path to APK
+    :returns: signature fingerprint
+    """
+
+    with zipfile.ZipFile(apk_path, 'r') as apk:
+        certs = [n for n in apk.namelist() if CERT_PATH_REGEX.match(n)]
+
+        if len(certs) < 1:
+            logging.error("Found no signing certificates on %s" % apk_path)
+            return None
+        if len(certs) > 1:
+            logging.error("Found multiple signing certificates on %s" % apk_path)
+            return None
+
+        cert = apk.read(certs[0])
+        return signer_fingerprint(cert)
+
+
+def apk_signer_fingerprint_short(apk_path):
+    """Obtain shortened sha256 signing-key fingerprint for APK.
+
+    Extracts the first 7 hexadecimal digits of sha256 signing-key fingerprint
+    for a given pkcs7 APK.
+
+    :param apk_path: path to APK
+    :returns: shortened signing-key fingerprint
+    """
+    return apk_signer_fingerprint(apk_path)[:7]
+
+
 def metadata_get_sigdir(appid, vercode=None):
     """Get signature directory for app"""
     if vercode:
@@ -2002,6 +2112,128 @@ def metadata_get_sigdir(appid, vercode=None):
         return os.path.join('metadata', appid, 'signatures')
 
 
+def metadata_find_developer_signature(appid, vercode=None):
+    """Tires to find the developer signature for given appid.
+
+    This picks the first signature file found in metadata an returns its
+    signature.
+
+    :returns: sha256 signing key fingerprint of the developer signing key.
+        None in case no signature can not be found."""
+
+    # fetch list of dirs for all versions of signatures
+    appversigdirs = []
+    if vercode:
+        appversigdirs.append(metadata_get_sigdir(appid, vercode))
+    else:
+        appsigdir = metadata_get_sigdir(appid)
+        if os.path.isdir(appsigdir):
+            numre = re.compile('[0-9]+')
+            for ver in os.listdir(appsigdir):
+                if numre.match(ver):
+                    appversigdir = os.path.join(appsigdir, ver)
+                    appversigdirs.append(appversigdir)
+
+    for sigdir in appversigdirs:
+        sigs = glob.glob(os.path.join(sigdir, '*.DSA')) + \
+            glob.glob(os.path.join(sigdir, '*.EC')) + \
+            glob.glob(os.path.join(sigdir, '*.RSA'))
+        if len(sigs) > 1:
+            raise FDroidException('ambiguous signatures, please make sure there is only one signature in \'{}\'. (The signature has to be the App maintainers signature for version of the APK.)'.format(sigdir))
+        for sig in sigs:
+            with open(sig, 'rb') as f:
+                return signer_fingerprint(f.read())
+    return None
+
+
+def metadata_find_signing_files(appid, vercode):
+    """Gets a list of singed manifests and signatures.
+
+    :param appid: app id string
+    :param vercode: app version code
+    :returns: a list of triplets for each signing key with following paths:
+        (signature_file, singed_file, manifest_file)
+    """
+    ret = []
+    sigdir = metadata_get_sigdir(appid, vercode)
+    sigs = glob.glob(os.path.join(sigdir, '*.DSA')) + \
+        glob.glob(os.path.join(sigdir, '*.EC')) + \
+        glob.glob(os.path.join(sigdir, '*.RSA'))
+    extre = re.compile('(\.DSA|\.EC|\.RSA)$')
+    for sig in sigs:
+        sf = extre.sub('.SF', sig)
+        if os.path.isfile(sf):
+            mf = os.path.join(sigdir, 'MANIFEST.MF')
+            if os.path.isfile(mf):
+                ret.append((sig, sf, mf))
+    return ret
+
+
+def metadata_find_developer_signing_files(appid, vercode):
+    """Get developer signature files for specified app from metadata.
+
+    :returns: A triplet of paths for signing files from metadata:
+        (signature_file, singed_file, manifest_file)
+    """
+    allsigningfiles = metadata_find_signing_files(appid, vercode)
+    if allsigningfiles and len(allsigningfiles) == 1:
+        return allsigningfiles[0]
+    else:
+        return None
+
+
+def apk_strip_signatures(signed_apk, strip_manifest=False):
+    """Removes signatures from APK.
+
+    :param signed_apk: path to apk file.
+    :param strip_manifest: when set to True also the manifest file will
+        be removed from the APK.
+    """
+    with tempfile.TemporaryDirectory() as tmpdir:
+        tmp_apk = os.path.join(tmpdir, 'tmp.apk')
+        os.rename(signed_apk, tmp_apk)
+        with ZipFile(tmp_apk, 'r') as in_apk:
+            with ZipFile(signed_apk, 'w') as out_apk:
+                for f in in_apk.infolist():
+                    if not apk_sigfile.match(f.filename):
+                        if strip_manifest:
+                            if f.filename != 'META-INF/MANIFEST.MF':
+                                buf = in_apk.read(f.filename)
+                                out_apk.writestr(f.filename, buf)
+                        else:
+                            buf = in_apk.read(f.filename)
+                            out_apk.writestr(f.filename, buf)
+
+
+def apk_implant_signatures(apkpath, signaturefile, signedfile, manifest):
+    """Implats a signature from metadata into an APK.
+
+    Note: this changes there supplied APK in place. So copy it if you
+    need the original to be preserved.
+
+    :param apkpath: location of the apk
+    """
+    # get list of available signature files in metadata
+    with tempfile.TemporaryDirectory() as tmpdir:
+        # orig_apk = os.path.join(tmpdir, 'orig.apk')
+        # os.rename(apkpath, orig_apk)
+        apkwithnewsig = os.path.join(tmpdir, 'newsig.apk')
+        with ZipFile(apkpath, 'r') as in_apk:
+            with ZipFile(apkwithnewsig, 'w') as out_apk:
+                for sig_file in [signaturefile, signedfile, manifest]:
+                    out_apk.write(sig_file, arcname='META-INF/' +
+                                  os.path.basename(sig_file))
+                for f in in_apk.infolist():
+                    if not apk_sigfile.match(f.filename):
+                        if f.filename != 'META-INF/MANIFEST.MF':
+                            buf = in_apk.read(f.filename)
+                            out_apk.writestr(f.filename, buf)
+        os.remove(apkpath)
+        p = SdkToolsPopen(['zipalign', '-v', '4', apkwithnewsig, apkpath])
+        if p.returncode != 0:
+            raise BuildException("Failed to align application")
+
+
 def apk_extract_signatures(apkpath, outdir, manifest=True):
     """Extracts a signature files from APK and puts them into target directory.
 
@@ -2041,30 +2273,35 @@ def verify_apks(signed_apk, unsigned_apk, tmp_dir):
               describing what went wrong.
     """
 
-    signed = ZipFile(signed_apk, 'r')
-    meta_inf_files = ['META-INF/MANIFEST.MF']
-    for f in signed.namelist():
-        if apk_sigfile.match(f) \
-           or f in ['META-INF/fdroidserverid', 'META-INF/buildserverid']:
-            meta_inf_files.append(f)
-    if len(meta_inf_files) < 3:
-        return "Signature files missing from {0}".format(signed_apk)
-
-    tmp_apk = os.path.join(tmp_dir, 'sigcp_' + os.path.basename(unsigned_apk))
-    unsigned = ZipFile(unsigned_apk, 'r')
-    # only read the signature from the signed APK, everything else from unsigned
-    with ZipFile(tmp_apk, 'w') as tmp:
-        for filename in meta_inf_files:
-            tmp.writestr(signed.getinfo(filename), signed.read(filename))
-        for info in unsigned.infolist():
-            if info.filename in meta_inf_files:
-                logging.warning('Ignoring ' + info.filename + ' from ' + unsigned_apk)
-                continue
-            if info.filename in tmp.namelist():
-                return "duplicate filename found: " + info.filename
-            tmp.writestr(info, unsigned.read(info.filename))
-    unsigned.close()
-    signed.close()
+    if not os.path.isfile(signed_apk):
+        return 'can not verify: file does not exists: {}'.format(signed_apk)
+
+    if not os.path.isfile(unsigned_apk):
+        return 'can not verify: file does not exists: {}'.format(unsigned_apk)
+
+    with ZipFile(signed_apk, 'r') as signed:
+        meta_inf_files = ['META-INF/MANIFEST.MF']
+        for f in signed.namelist():
+            if apk_sigfile.match(f) \
+               or f in ['META-INF/fdroidserverid', 'META-INF/buildserverid']:
+                meta_inf_files.append(f)
+        if len(meta_inf_files) < 3:
+            return "Signature files missing from {0}".format(signed_apk)
+
+        tmp_apk = os.path.join(tmp_dir, 'sigcp_' + os.path.basename(unsigned_apk))
+        with ZipFile(unsigned_apk, 'r') as unsigned:
+            # only read the signature from the signed APK, everything else from unsigned
+            with ZipFile(tmp_apk, 'w') as tmp:
+                for filename in meta_inf_files:
+                    tmp.writestr(signed.getinfo(filename), signed.read(filename))
+                for info in unsigned.infolist():
+                    if info.filename in meta_inf_files:
+                        logging.warning('Ignoring %s from %s',
+                                        info.filename, unsigned_apk)
+                        continue
+                    if info.filename in tmp.namelist():
+                        return "duplicate filename found: " + info.filename
+                    tmp.writestr(info, unsigned.read(info.filename))
 
     verified = verify_apk_signature(tmp_apk)
 
@@ -2098,6 +2335,8 @@ def verify_apk_signature(apk, min_sdk_version=None):
     Try to use apksigner whenever possible since jarsigner is very
     shitty: unsigned APKs pass as "verified"!  Warning, this does
     not work on JARs with apksigner >= 0.7 (build-tools 26.0.1)
+
+    :returns: boolean whether the APK was verified
     """
     if set_command_in_config('apksigner'):
         args = [config['apksigner'], 'verify']
@@ -2109,7 +2348,7 @@ def verify_apk_signature(apk, min_sdk_version=None):
         try:
             verify_jar_signature(apk)
             return True
-        except:
+        except Exception:
             pass
     return False
 
@@ -2124,6 +2363,7 @@ def verify_old_apk_signature(apk):
     jarsigner passes unsigned APKs as "verified"! So this has to turn
     on -strict then check for result 4.
 
+    :returns: boolean whether the APK was verified
     """
 
     _java_security = os.path.join(os.getcwd(), '.java.security')
@@ -2325,6 +2565,34 @@ def get_certificate(certificate_file):
     return encoder.encode(cert)
 
 
+def load_stats_fdroid_signing_key_fingerprints():
+    """Load list of signing-key fingerprints stored by fdroid publish from file.
+
+    :returns: list of dictionanryies containing the singing-key fingerprints.
+    """
+    jar_file = os.path.join('stats', 'publishsigkeys.jar')
+    if not os.path.isfile(jar_file):
+        return {}
+    cmd = [config['jarsigner'], '-strict', '-verify', jar_file]
+    p = FDroidPopen(cmd, output=False)
+    if p.returncode != 4:
+        raise FDroidException("Signature validation of '{}' failed! "
+                              "Please run publish again to rebuild this file.".format(jar_file))
+
+    jar_sigkey = apk_signer_fingerprint(jar_file)
+    repo_key_sig = config.get('repo_key_sha256')
+    if repo_key_sig:
+        if jar_sigkey != repo_key_sig:
+            raise FDroidException("Signature key fingerprint of file '{}' does not match repo_key_sha256 in config.py (found fingerprint: '{}')".format(jar_file, jar_sigkey))
+    else:
+        logging.warning("repo_key_sha256 not in config.py, setting it to the signature key fingerprint of '{}'".format(jar_file))
+        config['repo_key_sha256'] = jar_sigkey
+        write_to_config(config, 'repo_key_sha256')
+
+    with zipfile.ZipFile(jar_file, 'r') as f:
+        return json.loads(str(f.read('publishsigkeys.json'), 'utf-8'))
+
+
 def write_to_config(thisconfig, key, value=None, config_file=None):
     '''write a key/value to the local config.py