From ef3e6ea50b1740dcda53ee8dc065611896b3a7db Mon Sep 17 00:00:00 2001 From: Konstantin Ryabitsev Date: Fri, 20 Nov 2020 16:04:44 -0500 Subject: Fix signature verification for b4 pr We moved pgp sig verification code around, so fix it for the invocation in b4 pr. Signed-off-by: Konstantin Ryabitsev --- b4/__init__.py | 76 ++++++++++++++++++++++++++++++++++++---------------------- b4/attest.py | 4 ++-- b4/pr.py | 14 ++++++----- 3 files changed, 57 insertions(+), 37 deletions(-) diff --git a/b4/__init__.py b/b4/__init__.py index ab5797d..ac0e85c 100644 --- a/b4/__init__.py +++ b/b4/__init__.py @@ -1753,35 +1753,11 @@ class LoreAttestationSignaturePGP(LoreAttestationSignature): os.unlink(savefile) output = out.decode() - gs_matches = re.search(r'^\[GNUPG:] GOODSIG ([0-9A-F]+)\s+.*$', output, re.M) - if gs_matches: - logger.debug(' GOODSIG') - self.good = True - keyid = gs_matches.groups()[0] - self.attestor = LoreAttestorPGP(keyid) - puid = '%s <%s>' % self.attestor.get_primary_uid() - vs_matches = re.search(r'^\[GNUPG:] VALIDSIG ([0-9A-F]+) (\d{4}-\d{2}-\d{2}) (\d+)', output, re.M) - if vs_matches: - logger.debug(' VALIDSIG') - self.valid = True - ymd = vs_matches.groups()[1] - self.sigdate = datetime.datetime.strptime(ymd, '%Y-%m-%d').replace(tzinfo=datetime.timezone.utc) - # Do we have a TRUST_(FULLY|ULTIMATE)? - ts_matches = re.search(r'^\[GNUPG:] TRUST_(FULLY|ULTIMATE)', output, re.M) - if ts_matches: - logger.debug(' TRUST_%s', ts_matches.groups()[0]) - self.trusted = True - self.passing = True - else: - self.errors.add('Insufficient trust (model=%s): %s (%s)' - % (trustmodel, keyid, puid)) - else: - self.errors.add('Signature not valid from key: %s (%s)' % (keyid, puid)) - else: - # Are we missing a key? - matches = re.search(r'^\[GNUPG:] NO_PUBKEY ([0-9A-F]+)$', output, re.M) - if matches: - self.errors.add('Missing public key: %s' % matches.groups()[0]) + self.good, self.valid, self.trusted, self.attestor, self.sigdate, self.errors = \ + validate_gpg_signature(output, trustmodel) + + if self.good and self.valid and self.trusted: + self.passing = True # A couple of final verifications self.verify_time_drift() @@ -2319,6 +2295,48 @@ def get_parts_from_header(hstr: str) -> dict: return hdata +def validate_gpg_signature(output, trustmodel): + good = False + valid = False + trusted = False + attestor = None + sigdate = None + errors = set() + gs_matches = re.search(r'^\[GNUPG:] GOODSIG ([0-9A-F]+)\s+.*$', output, re.M) + if gs_matches: + logger.debug(' GOODSIG') + good = True + keyid = gs_matches.groups()[0] + attestor = LoreAttestorPGP(keyid) + puid = '%s <%s>' % attestor.get_primary_uid() + vs_matches = re.search(r'^\[GNUPG:] VALIDSIG ([0-9A-F]+) (\d{4}-\d{2}-\d{2}) (\d+)', output, re.M) + if vs_matches: + logger.debug(' VALIDSIG') + valid = True + ymd = vs_matches.groups()[1] + sigdate = datetime.datetime.strptime(ymd, '%Y-%m-%d').replace(tzinfo=datetime.timezone.utc) + # Do we have a TRUST_(FULLY|ULTIMATE)? + ts_matches = re.search(r'^\[GNUPG:] TRUST_(FULLY|ULTIMATE)', output, re.M) + if ts_matches: + logger.debug(' TRUST_%s', ts_matches.groups()[0]) + trusted = True + else: + errors.add('Insufficient trust (model=%s): %s (%s)' % (trustmodel, keyid, puid)) + else: + errors.add('Signature not valid from key: %s (%s)' % (attestor.keyid, puid)) + else: + # Are we missing a key? + matches = re.search(r'^\[GNUPG:] NO_PUBKEY ([0-9A-F]+)$', output, re.M) + if matches: + errors.add('Missing public key: %s' % matches.groups()[0]) + # Is the key expired? + matches = re.search(r'^\[GNUPG:] EXPKEYSIG (.*)$', output, re.M) + if matches: + errors.add('Expired key: %s' % matches.groups()[0]) + + return good, valid, trusted, attestor, sigdate, errors + + def dkim_get_txt(name: bytes, timeout: int = 5): global _DKIM_DNS_CACHE if name not in _DKIM_DNS_CACHE: diff --git a/b4/attest.py b/b4/attest.py index 4391b19..a4012d7 100644 --- a/b4/attest.py +++ b/b4/attest.py @@ -39,7 +39,7 @@ def in_header_attest(lmsg: b4.LoreMessage, mode: str = 'pgp', replace: bool = Fa f'm={lmsg.attestation.mb}', f'p={lmsg.attestation.pb}', ] - hhname, hhval = b4.LoreAttestation.dkim_canonicalize_header(lmsg.attestation.hashes_header_name, '; '.join(hparts)) + hhname, hhval = b4.dkim_canonicalize_header(lmsg.attestation.hashes_header_name, '; '.join(hparts)) headers.append(f'{hhname}:{hhval}') logger.debug('Signing with mode=%s', mode) @@ -59,7 +59,7 @@ def in_header_attest(lmsg: b4.LoreMessage, mode: str = 'pgp', replace: bool = Fa 'b=', ] - shname, shval = b4.LoreAttestation.dkim_canonicalize_header(lmsg.attestation.sig_header_name, '; '.join(hparts)) + shname, shval = b4.dkim_canonicalize_header(lmsg.attestation.sig_header_name, '; '.join(hparts)) headers.append(f'{shname}:{shval}') payload = '\r\n'.join(headers).encode() ecode, out, err = b4.gpg_run_command(gpgargs, payload) diff --git a/b4/pr.py b/b4/pr.py index 40d3127..b7ed9e1 100644 --- a/b4/pr.py +++ b/b4/pr.py @@ -127,27 +127,29 @@ def attest_fetch_head(gitdir, lmsg): ecode, out = b4.git_run_command(gitdir, ['verify-tag', '--raw', 'FETCH_HEAD'], logstderr=True) elif otype == 'commit': ecode, out = b4.git_run_command(gitdir, ['verify-commit', '--raw', 'FETCH_HEAD'], logstderr=True) - lsig = b4.LoreAttestationSignature(out, 'git') - if lsig.good and lsig.valid and lsig.trusted: + + good, valid, trusted, attestor, sigdate, errors = b4.validate_gpg_signature(out, 'pgp') + + if good and valid and trusted: passing = True out = out.strip() if not len(out) and attpolicy != 'check': - lsig.errors.add('Remote %s is not signed!' % otype) + errors.add('Remote %s is not signed!' % otype) if passing: - trailer = lsig.attestor.get_trailer(lmsg.fromemail) + trailer = attestor.get_trailer(lmsg.fromemail) logger.info(' ---') logger.info(' %s %s', attpass, trailer) return - if lsig.errors: + if errors: logger.critical(' ---') if len(out): logger.critical(' Pull request is signed, but verification did not succeed:') else: logger.critical(' Pull request verification did not succeed:') - for error in lsig.errors: + for error in errors: logger.critical(' %s %s', attfail, error) if attpolicy == 'hardfail': -- cgit v1.2.3