diff options
Diffstat (limited to 'b4/__init__.py')
-rw-r--r-- | b4/__init__.py | 408 |
1 files changed, 164 insertions, 244 deletions
diff --git a/b4/__init__.py b/b4/__init__.py index dc393a9..9cd1f87 100644 --- a/b4/__init__.py +++ b/b4/__init__.py @@ -19,6 +19,7 @@ import time import shutil import mailbox import pwd +import base64 from pathlib import Path from tempfile import mkstemp, TemporaryDirectory @@ -353,12 +354,6 @@ class LoreMailbox: self.covers[lmsg.revision] = lmsg return - if re.search(r'^Comment: att-fmt-ver:', lmsg.body, re.I | re.M): - logger.debug('Found attestation message') - LoreAttestationDocument.load_from_string(lmsg.msgid, lmsg.body) - # We don't keep it, because it's not useful for us beyond this point - return - if lmsg.has_diff: if lmsg.revision not in self.series: if lmsg.revision_inferred and lmsg.in_reply_to: @@ -490,9 +485,9 @@ class LoreSeries: attstaled = int(config['attestation-staleness-days']) except ValueError: attstaled = 30 - exact_from_match = False - if config['attestation-uid-match'] == 'strict': - exact_from_match = True + # exact_from_match = False + # if config['attestation-uid-match'] == 'strict': + # exact_from_match = True if config['attestation-checkmarks'] == 'fancy': attpass = PASS_FANCY @@ -522,50 +517,35 @@ class LoreSeries: lmsg.followup_trailers.append(('Link', linkmask % lmsg.msgid, None)) if attpolicy != 'off': - lore_lookup = False - if at == 1: - # We only hit lore on the first patch - lore_lookup = True - attdoc = lmsg.get_attestation(lore_lookup=lore_lookup, exact_from_match=exact_from_match) - if attdoc is None: + lmsg.load_hashes() + latt = lmsg.attestation + latt.validate(lmsg.msg) + if latt.passing: + # Make sure it's not too old compared to the message date + # Timezone doesn't matter as we calculate whole days + tdelta = lmsg.date.replace(tzinfo=None) - latt.lsig.sigdate + if tdelta.days > attstaled: + # Uh-oh, attestation is too old! + logger.info(' %s %s', attfail, lmsg.full_subject) + atterrors.append('Attestation for %s/%s is over %sd old: %sd' % (at, lmsg.expected, + attstaled, tdelta.days)) + else: + logger.info(' %s %s', attpass, lmsg.full_subject) + attdata[at - 1] = latt.lsig.attestor.get_trailer(lmsg.fromemail) + else: if attpolicy in ('softfail', 'hardfail'): logger.info(' %s %s', attfail, lmsg.full_subject) - # Which part failed? - fi = fm = fp = True - for attdoc in ATTESTATIONS: - for i, m, p in attdoc.hashes: - if p == lmsg.attestation.p: - fp = False - if m == lmsg.attestation.m: - fm = False - if i == lmsg.attestation.i: - fi = False failed = list() - if fp: + if not latt.pv: failed.append('patch content') - if fm: + if not latt.pm: failed.append('commit message') - if fi: + if not latt.pi: failed.append('patch metadata') atterrors.append('Patch %s/%s failed attestation (%s)' % (at, lmsg.expected, ', '.join(failed))) else: logger.info(' %s', lmsg.full_subject) - else: - if attpolicy == 'check': - # switch to softfail policy now that we have at least one hit - attpolicy = 'softfail' - # Make sure it's not too old compared to the message date - # Timezone doesn't matter as we calculate whole days - tdelta = lmsg.date.replace(tzinfo=None) - attdoc.lsig.sigdate - if tdelta.days > attstaled: - # Uh-oh, attestation is too old! - logger.info(' %s %s', attfail, lmsg.full_subject) - atterrors.append('Attestation for %s/%s is over %sd old: %sd' % (at, lmsg.expected, - attstaled, tdelta.days)) - else: - logger.info(' %s %s', attpass, lmsg.full_subject) - attdata[at-1] = attdoc.lsig.attestor.get_trailer(lmsg.fromemail) else: logger.info(' %s', lmsg.full_subject) @@ -803,6 +783,8 @@ class LoreMessage: self.attestation = None # Patchwork hash self.pwhash = None + # Git patch-id + self.git_patch_id = None # Blob indexes self.blob_indexes = None @@ -1140,42 +1122,53 @@ class LoreMessage: if self.attestation is not None: return logger.debug('Calculating hashes for: %s', self.full_subject) + # Calculate git-patch-id first + cmdargs = ['patch-id', '--stable'] + stdin = self.msg.as_string(policy=emlpolicy).encode() + ecode, out = git_run_command(None, cmdargs, stdin) + if ecode > 0: + # Git doesn't think there's a patch there + return + fline = out.split('\n')[0] + if len(fline) < 40: + # Not sure what that is + return + self.git_patch_id = fline[:40] + msg_out = mkstemp() patch_out = mkstemp() cmdargs = ['mailinfo', '--encoding=UTF-8', msg_out[1], patch_out[1]] - emlout = self.msg.as_string(policy=emlpolicy) - ecode, info = git_run_command(None, cmdargs, emlout.encode('utf-8')) + ecode, info = git_run_command(None, cmdargs, stdin) if ecode > 0: logger.debug('ERROR: Could not get mailinfo') return - ihasher = hashlib.sha256() + i = hashlib.sha256() + m = hashlib.sha256() + p = hashlib.sha256() for line in info.split('\n'): # We don't use the "Date:" field because it is likely to be # mangled between when git-format-patch generates it and # when it is sent out by git-send-email (or other tools). if re.search(r'^(Author|Email|Subject):', line): - ihasher.update((line + '\n').encode('utf-8')) - i = ihasher.hexdigest() + i.update((line + '\n').encode()) - with open(msg_out[1], 'r') as mfh: + with open(msg_out[1], 'rb') as mfh: msg = mfh.read() - mhasher = hashlib.sha256() - mhasher.update(msg.encode('utf-8')) - m = mhasher.hexdigest() + m.update(msg) os.unlink(msg_out[1]) - p = None with open(patch_out[1], 'rb') as pfh: patch = pfh.read().decode(self.charset, errors='replace') if len(patch.strip()): diff = LoreMessage.get_clean_diff(patch) - phasher = hashlib.sha256() - phasher.update(diff.encode('utf-8')) - p = phasher.hexdigest() + p.update(diff.encode()) self.pwhash = LoreMessage.get_patchwork_hash(patch) # Load the indexes, if we have them self.blob_indexes = LoreMessage.get_indexes(diff) + else: + p = None + os.unlink(patch_out[1]) if i and m and p: @@ -1367,30 +1360,6 @@ class LoreMessage: am_msg.set_charset('utf-8') return am_msg - def _load_attestation(self, lore_lookup=True): - self.load_hashes() - if self.attestation: - self.attestation.validate(lore_lookup=lore_lookup) - - def get_attestation(self, lore_lookup=True, exact_from_match=True): - self._load_attestation(lore_lookup=lore_lookup) - if not self.attestation or not self.attestation.passing: - return None - - for attdoc in self.attestation.attdocs: - if not exact_from_match: - # We return the first hit - return attdoc - # Does this doc have an exact match? - uid = attdoc.lsig.attestor.get_matching_uid(self.fromemail) - if uid[1] == self.fromemail: - return attdoc - # stick an error in the first available attdoc saying - # that exact from match failed - self.attestation.attdocs[0].errors.add('Exact UID match failed for %s' % self.fromemail) - - return None - class LoreSubject: def __init__(self, subject): @@ -1488,11 +1457,13 @@ class LoreAttestor: global SUBKEY_DATA if self.keyid not in SUBKEY_DATA: gpgargs = ['--with-colons', '--list-keys', self.keyid] - ecode, keyinfo = gpg_run_command(gpgargs) + ecode, out, err = gpg_run_command(gpgargs) if ecode > 0: logger.critical('ERROR: Unable to get UIDs list matching key %s', self.keyid) return + keyinfo = out.decode() + uids = list() for line in keyinfo.split('\n'): if line[:4] != 'uid:': @@ -1538,6 +1509,7 @@ class LoreAttestationSignature: self.good = False self.valid = False self.trusted = False + self.passing = False self.sigdate = None self.attestor = None self.errors = set() @@ -1560,6 +1532,7 @@ class LoreAttestationSignature: if ts_matches: logger.debug(' TRUST_%s', ts_matches.groups()[0]) self.trusted = True + self.passing = True else: self.errors.add('Insufficient trust (model=%s): %s (%s)' % (trustmodel, keyid, puid)) @@ -1585,176 +1558,121 @@ class LoreAttestationSignature: return '\n'.join(out) -class LoreAttestationDocument: - def __init__(self, source, sigdata): - self.source = source - self.lsig = None - self.passing = False - self.hashes = set() - self.errors = set() - - gpgargs = ['--verify', '--status-fd=1'] - config = get_main_config() - if config['attestation-trust-model'] == 'tofu': - gpgargs += ['--trust-model', 'tofu', '--tofu-default-policy', 'good'] - - logger.debug('Validating document obtained from %s', self.source) - ecode, output = gpg_run_command(gpgargs, stdin=sigdata.encode('utf-8')) - self.lsig = LoreAttestationSignature(output, config['attestation-trust-model']) - self.errors.update(self.lsig.errors) +class LoreAttestation: + def __init__(self, _i, _m, _p): + self.i = _i.hexdigest() + self.m = _m.hexdigest() + self.p = _p.hexdigest() + self.ib = base64.b64encode(_i.digest()).decode() + self.mb = base64.b64encode(_m.digest()).decode() + self.pb = base64.b64encode(_p.digest()).decode() - if self.lsig.good and self.lsig.valid and self.lsig.trusted: - self.passing = True - else: - # Not going any further - return + self.hashes_header_name = 'X-Patch-Hashes' + self.sig_header_name = 'X-Patch-Sig' - if source.find('http') == 0: - # We only cache known-good attestations obtained from remote - save_cache(sigdata, source, suffix='attestation') - - hg = [None, None, None] - for line in sigdata.split('\n'): - # It's a yaml document, but we don't parse it as yaml for safety reasons - line = line.rstrip() - if re.search(r'^([0-9a-f-]{26}:|-----BEGIN.*)$', line): - if None not in hg: - self.hashes.add(tuple(hg)) - hg = [None, None, None] - continue - matches = re.search(r'^\s+([imp]):\s*([0-9a-f]{64})$', line) - if matches: - t, v = matches.groups() - if t == 'i': - hg[0] = v - elif t == 'm': - hg[1] = v - elif t == 'p': - hg[2] = v + self.lsig = None + self.passing = False + self.iv = False + self.mv = False + self.pv = False def __repr__(self): out = list() - out.append(' source: %s' % self.source) - out.append(' --- validation errors ---') - for error in self.errors: - out.append(' | %s' % error) - out.append(' --- hashes ---') - for hg in self.hashes: - out.append(' | %s-%s-%s' % (hg[0][:8], hg[1][:8], hg[2][:8])) - ret = '\n'.join(out) + '\n' + str(self.lsig) - return ret + out.append(' i: %s' % self.i) + out.append(' m: %s' % self.m) + out.append(' p: %s' % self.p) + out.append(' ib: %s' % self.ib) + out.append(' mb: %s' % self.mb) + out.append(' pb: %s' % self.pb) + out.append(' iv: %s' % self.iv) + out.append(' mv: %s' % self.mv) + out.append(' pv: %s' % self.pv) + return '\n'.join(out) @staticmethod - def get_from_cache(attid): - cachedir = get_cache_dir() - attdocs = list() - for entry in os.listdir(cachedir): - if entry.find('.attestation') <= 0: - continue - fullpath = os.path.join(cachedir, entry) - with open(fullpath, 'r') as fh: - content = fh.read() - # Can't be 0, because it has to have pgp ascii wrapper - if content.find(attid) > 0: - attdoc = LoreAttestationDocument(fullpath, content) - attdocs.append(attdoc) - return attdocs + def verify_identity_domain(msg, identity: str, domain: str) -> bool: + # Domain is supposed to be present in identity + if not identity.endswith(domain): + logger.debug('domain (d=%s) is not in identity (i=%s)', domain, identity) + return False + fromeml = email.utils.getaddresses(msg.get_all('from', []))[0][1] + if identity.find('@') < 0: + logger.debug('identity must contain @ (i=%s)', identity) + return False + ilocal, idomain = identity.split('@') + # identity is supposed to be present in from + if not fromeml.endswith(f'@{idomain}'): + logger.debug('identity (i=%s) does not match from (from=%s)', identity, fromeml) + return False + logger.debug('identity and domain match From header') + return True @staticmethod - def get_from_lore(attid): - attdocs = list() - # XXX: Querying this via the Atom feed is a temporary kludge until we have - # proper search API on lore.kernel.org - status = None - cachedata = get_cache(attid, suffix='lookup') - if cachedata: - try: - status = int(cachedata) - except ValueError: - pass - if status is not None and status != 200: - logger.debug('Cache says looking up %s = %s', attid, status) - return attdocs - + def get_gpg_attestation(smsg: bytes, dsig: bytes) -> LoreAttestationSignature: + # We can't pass both the detached sig and the content on stdin, so + # use a temporary file + savefile = mkstemp('in-header-pgp-verify')[1] + with open(savefile, 'wb') as fh: + fh.write(dsig) + + gpgargs = list() config = get_main_config() - queryurl = '%s?%s' % (config['attestation-query-url'], - urllib.parse.urlencode({'q': attid, 'x': 'A', 'o': '-1'})) - logger.debug('Query URL: %s', queryurl) - session = get_requests_session() - resp = session.get(queryurl) - if resp.status_code != 200: - # Record this as a bad hit - save_cache(str(resp.status_code), attid, suffix='lookup') - - matches = re.findall( - r'link\s+href="([^"]+)".*?(-----BEGIN PGP SIGNED MESSAGE-----.*?-----END PGP SIGNATURE-----)', - resp.content.decode('utf-8'), flags=re.DOTALL - ) - - if matches: - for link, sigdata in matches: - attdoc = LoreAttestationDocument(link, sigdata) - attdocs.append(attdoc) - - return attdocs + if config['attestation-trust-model'] == 'tofu': + gpgargs += ['--trust-model', 'tofu', '--tofu-default-policy', 'good'] + gpgargs += ['--verify', '--status-fd=1', savefile, '-'] + ecode, out, err = gpg_run_command(gpgargs, stdin=smsg) + os.unlink(savefile) + return LoreAttestationSignature(out.decode(), config['attestation-trust-model']) @staticmethod - def load_from_file(afile): - global ATTESTATIONS - with open(afile, 'r') as fh: - sigdata = fh.read() - ATTESTATIONS.append(LoreAttestationDocument(afile, sigdata)) + def dkim_canonicalize_header(hname, hval): + hname = hname.lower() + hval = hval.strip() + hval = re.sub(r'\n', '', hval) + hval = re.sub(r'\s+', ' ', hval) + return hname, hval @staticmethod - def load_from_string(source, content): - global ATTESTATIONS - ATTESTATIONS.append(LoreAttestationDocument(source, content)) - - -class LoreAttestation: - def __init__(self, i, m, p): - self.attid = '%s-%s-%s' % (i[:8], m[:8], p[:8]) - self.i = i - self.m = m - self.p = p - self.passing = False - self.attdocs = list() + def get_parts_from_header(hstr: str) -> dict: + hstr = re.sub(r'\s*', '', hstr) + hdata = dict() + for chunk in hstr.split(';'): + parts = chunk.split('=', 1) + if len(parts) < 2: + continue + hdata[parts[0]] = parts[1] + return hdata - def _check_if_passing(self): - global ATTESTATIONS - hg = (self.i, self.m, self.p) - for attdoc in ATTESTATIONS: - if hg in attdoc.hashes and attdoc.passing: + def validate(self, msg): + shdr = msg.get(self.sig_header_name) + if shdr is None: + return None + sdata = LoreAttestation.get_parts_from_header(shdr) + sig = base64.b64decode(sdata['b']) + headers = list() + hhname, hhval = LoreAttestation.dkim_canonicalize_header(self.hashes_header_name, + str(msg.get(self.hashes_header_name))) + headers.append(f'{hhname}:{hhval}') + # Now we add the sig header itself, without b= content + shname, shval = LoreAttestation.dkim_canonicalize_header(self.sig_header_name, shdr) + shval = shval.rsplit('; b=')[0] + '; b=' + headers.append(f'{shname}:{shval}') + payload = ('\r\n'.join(headers)).encode() + self.lsig = LoreAttestation.get_gpg_attestation(payload, sig) + if self.lsig.passing: + hdata = LoreAttestation.get_parts_from_header(hhval) + if hdata['i'] == self.ib: + self.iv = True + if hdata['m'] == self.mb: + self.mv = True + if hdata['p'] == self.pb: + self.pv = True + + if self.iv and self.mv and self.pv: self.passing = True - self.attdocs.append(attdoc) - - def validate(self, lore_lookup=True): - global ATTESTATIONS - self._check_if_passing() - - if not len(self.attdocs): - attdocs = LoreAttestationDocument.get_from_cache(self.attid) - ATTESTATIONS += attdocs - self._check_if_passing() - if not len(self.attdocs) and lore_lookup: - attdocs = LoreAttestationDocument.get_from_lore(self.attid) - ATTESTATIONS += attdocs - self._check_if_passing() - def __repr__(self): - out = list() - out.append(' attid: %s' % self.attid) - out.append(' i: %s' % self.i) - out.append(' m: %s' % self.m) - out.append(' p: %s' % self.p) - out.append(' --- attdocs ---') - for attdoc in self.attdocs: - out.append(str(attdoc)) - return '\n'.join(out) - - -def _run_command(cmdargs, stdin=None, logstderr=False): +def _run_command(cmdargs, stdin=None): logger.debug('Running %s' % ' '.join(cmdargs)) sp = subprocess.Popen(cmdargs, @@ -1764,24 +1682,17 @@ def _run_command(cmdargs, stdin=None, logstderr=False): (output, error) = sp.communicate(input=stdin) - output = output.decode('utf-8', errors='replace') - - if logstderr and len(error.strip()): - errout = error.decode('utf-8', errors='replace') - logger.debug('Stderr: %s', errout) - output += errout + return sp.returncode, output, error - return sp.returncode, output - -def gpg_run_command(args, stdin=None, logstderr=False): +def gpg_run_command(args, stdin=None): config = get_main_config() cmdargs = [config['gpgbin'], '--batch', '--no-auto-key-retrieve', '--no-auto-check-trustdb'] if config['attestation-gnupghome'] is not None: cmdargs += ['--homedir', config['attestation-gnupghome']] cmdargs += args - return _run_command(cmdargs, stdin=stdin, logstderr=logstderr) + return _run_command(cmdargs, stdin=stdin) def git_run_command(gitdir, args, stdin=None, logstderr=False): @@ -1792,7 +1703,16 @@ def git_run_command(gitdir, args, stdin=None, logstderr=False): cmdargs += ['--git-dir', gitdir] cmdargs += args - return _run_command(cmdargs, stdin=stdin, logstderr=logstderr) + ecode, out, err = _run_command(cmdargs, stdin=stdin) + + out = out.decode(errors='replace') + + if logstderr and len(err.strip()): + err = err.decode(errors='replace') + logger.debug('Stderr: %s', err) + out += err + + return ecode, out def git_get_command_lines(gitdir, args): |