From e123952efd144401a198ab1f8337eb2529e26f95 Mon Sep 17 00:00:00 2001 From: Konstantin Ryabitsev Date: Mon, 30 Mar 2020 17:40:08 -0400 Subject: Add attestation checks for b4 pr MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit We now use similar subroutines for checking signatures on FETCH_HEAD as we do for patch attestation, making it a convenient operation during the fetch stage: $ b4 pr 202003292114.2252CAEF7@keescook Looking up https://lore.kernel.org/r/202003292114.2252CAEF7@keescook Grabbing thread from lore.kernel.org Looking at: [GIT PULL] seccomp updates for v5.7-rc1 Fetching https://git.kernel.org/pub/scm/linux/kernel/git/kees/linux.git tags/seccomp-v5.7-rc1 --- [✓] Attestation-by: Kees Cook (pgp: 8972F4DFDC6DC026) --- Successfully fetched into FETCH_HEAD Hopefully, I didn't introduce too many bugs into patch attestation, since I had to rewrite the backend a bit to work for both native git operations and patch attestation calls. Signed-off-by: Konstantin Ryabitsev --- b4/__init__.py | 115 +++++++++++++++++++++++++++++++++------------------------ b4/attest.py | 2 +- b4/pr.py | 56 +++++++++++++++++++++++++++- 3 files changed, 122 insertions(+), 51 deletions(-) diff --git a/b4/__init__.py b/b4/__init__.py index b4ce2d4..41f8dbc 100644 --- a/b4/__init__.py +++ b/b4/__init__.py @@ -437,7 +437,7 @@ class LoreSeries: attpolicy = 'softfail' # Make sure it's not too old compared to the message date # Timezone doesn't matter as we calculate whole days - tdelta = lmsg.date.replace(tzinfo=None) - attdoc.sigdate + tdelta = lmsg.date.replace(tzinfo=None) - attdoc.lsig.sigdate if tdelta.days > attstaled: # Uh-oh, attestation is too old! logger.info(' %s %s', attfail, lmsg.full_subject) @@ -445,7 +445,7 @@ class LoreSeries: attstaled, tdelta.days)) else: logger.info(' %s %s', attpass, lmsg.full_subject) - attdata[at-1] = attdoc.attestor.get_trailer(lmsg.fromemail) + attdata[at-1] = attdoc.lsig.attestor.get_trailer(lmsg.fromemail) else: logger.info(' %s', lmsg.full_subject) @@ -876,7 +876,7 @@ class LoreMessage: # We return the first hit return attdoc # Does this doc have an exact match? - uid = attdoc.attestor.get_matching_uid(self.fromemail) + uid = attdoc.lsig.attestor.get_matching_uid(self.fromemail) if uid[1] == self.fromemail: return attdoc # stick an error in the first available attdoc saying @@ -1022,15 +1022,63 @@ class LoreAttestor: return '\n'.join(out) -class LoreAttestationDocument: - def __init__(self, source, sigdata): - self.source = source +class LoreAttestationSignature: + def __init__(self, output, trustmodel): self.good = False self.valid = False self.trusted = False self.sigdate = None - self.passing = False self.attestor = None + self.errors = set() + + gs_matches = re.search(r'^\[GNUPG:\] GOODSIG ([0-9A-F]+)\s+.*$', output, re.M) + if gs_matches: + logger.debug(' GOODSIG') + self.good = True + keyid = gs_matches.groups()[0] + self.attestor = LoreAttestor(keyid) + puid = '%s <%s>' % self.attestor.get_primary_uid() + vs_matches = re.search(r'^\[GNUPG:\] VALIDSIG ([0-9A-F]+) (\d{4}-\d{2}-\d{2}) (\d+)', output, re.M) + if vs_matches: + logger.debug(' VALIDSIG') + self.valid = True + ymd = vs_matches.groups()[1] + self.sigdate = datetime.datetime.strptime(ymd, '%Y-%m-%d') + # Do we have a TRUST_(FULLY|ULTIMATE)? + ts_matches = re.search(r'^\[GNUPG:\] TRUST_(FULLY|ULTIMATE)', output, re.M) + if ts_matches: + logger.debug(' TRUST_%s', ts_matches.groups()[0]) + self.trusted = True + else: + self.errors.add('Insufficient trust (model=%s): %s (%s)' + % (trustmodel, keyid, puid)) + else: + self.errors.add('Signature not valid from key: %s (%s)' % (keyid, puid)) + else: + # Are we missing a key? + matches = re.search(r'^\[GNUPG:\] NO_PUBKEY ([0-9A-F]+)$', output, re.M) + if matches: + self.errors.add('Missing public key: %s' % matches.groups()[0]) + + def __repr__(self): + out = list() + out.append(' good: %s' % self.good) + out.append(' valid: %s' % self.valid) + out.append(' trusted: %s' % self.trusted) + if self.attestor is not None: + out.append(' attestor: %s' % self.attestor.keyid) + + out.append(' --- validation errors ---') + for error in self.errors: + out.append(' | %s' % error) + return '\n'.join(out) + + +class LoreAttestationDocument: + def __init__(self, source, sigdata): + self.source = source + self.lsig = None + self.passing = False self.hashes = set() self.errors = set() @@ -1041,42 +1089,14 @@ class LoreAttestationDocument: logger.debug('Validating document obtained from %s', self.source) ecode, output = gpg_run_command(gpgargs, stdin=sigdata.encode('utf-8')) - if ecode == 0: - # We're looking for both GOODSIG and VALIDSIG - gs_matches = re.search(r'^\[GNUPG:\] GOODSIG ([0-9A-F]+)\s+.*$', output, re.M) - if gs_matches: - logger.debug(' GOODSIG') - self.good = True - keyid = gs_matches.groups()[0] - self.attestor = LoreAttestor(keyid) - puid = '%s <%s>' % self.attestor.get_primary_uid() - vs_matches = re.search(r'^\[GNUPG:\] VALIDSIG ([0-9A-F]+) (\d{4}-\d{2}-\d{2}) (\d+)', output, re.M) - if vs_matches: - logger.debug(' VALIDSIG') - self.valid = True - ymd = vs_matches.groups()[1] - self.sigdate = datetime.datetime.strptime(ymd, '%Y-%m-%d') - # Do we have a TRUST_(FULLY|ULTIMATE)? - ts_matches = re.search(r'^\[GNUPG:\] TRUST_(FULLY|ULTIMATE)', output, re.M) - if ts_matches: - logger.debug(' TRUST_%s', ts_matches.groups()[0]) - self.trusted = True - else: - self.errors.add('Insufficient trust (model=%s): %s (%s)' - % (config['attestation-trust-model'], keyid, puid)) - else: - self.errors.add('Signature not valid from key: %s (%s)' % (keyid, puid)) - else: - # Are we missing a key? - matches = re.search(r'^\[GNUPG:\] NO_PUBKEY ([0-9A-F]+)$', output, re.M) - if matches: - self.errors.add('Missing public key: %s' % matches.groups()[0]) - else: - logger.debug('NOGOOD: Signature on %s failed to verify', self.source) - return + self.lsig = LoreAttestationSignature(output, config['attestation-trust-model']) + self.errors.update(self.lsig.errors) - if self.good and self.valid and self.trusted: + if self.lsig.good and self.lsig.valid and self.lsig.trusted: self.passing = True + else: + # Not going any further + return if source.find('http') == 0: # We only cache known-good attestations obtained from remote @@ -1109,19 +1129,14 @@ class LoreAttestationDocument: def __repr__(self): out = list() out.append(' source: %s' % self.source) - out.append(' good: %s' % self.good) - out.append(' valid: %s' % self.valid) - out.append(' trusted: %s' % self.trusted) - if self.attestor is not None: - out.append(' attestor: %s' % self.attestor.keyid) - out.append(' --- validation errors ---') for error in self.errors: out.append(' | %s' % error) out.append(' --- hashes ---') for hg in self.hashes: out.append(' | %s-%s-%s' % (hg[0][:8], hg[1][:8], hg[2][:8])) - return '\n'.join(out) + ret = '\n'.join(out) + '\n' + str(self.lsig) + return ret @staticmethod def get_from_cache(attid): @@ -1249,7 +1264,9 @@ def _run_command(cmdargs, stdin=None, logstderr=False): output = output.decode('utf-8', errors='replace') if logstderr and len(error.strip()): - logger.debug('Stderr: %s', error.decode('utf-8', errors='replace')) + errout = error.decode('utf-8', errors='replace') + logger.debug('Stderr: %s', errout) + output += errout return sp.returncode, output diff --git a/b4/attest.py b/b4/attest.py index 35c062b..5f9a2b4 100644 --- a/b4/attest.py +++ b/b4/attest.py @@ -161,7 +161,7 @@ def verify_attestation(cmdargs): if ecode != 128: ecode = 0 logger.critical('%s %s', attpass, lmsg.full_subject) - attrailers.add(attdoc.attestor.get_trailer(lmsg.fromemail)) + attrailers.add(attdoc.lsig.attestor.get_trailer(lmsg.fromemail)) logger.critical('---') if ecode > 0: diff --git a/b4/pr.py b/b4/pr.py index 55261f1..0ed9277 100644 --- a/b4/pr.py +++ b/b4/pr.py @@ -118,6 +118,55 @@ def parse_pr_data(msg): return lmsg +def attest_fetch_head(gitdir, lmsg): + config = b4.get_main_config() + attpolicy = config['attestation-policy'] + if config['attestation-checkmarks'] == 'fancy': + attpass = b4.PASS_FANCY + attfail = b4.FAIL_FANCY + else: + attpass = b4.PASS_SIMPLE + attfail = b4.FAIL_SIMPLE + # Is FETCH_HEAD a tag or a commit? + htype = b4.git_get_command_lines(gitdir, ['cat-file', '-t', 'FETCH_HEAD']) + lsig = None + passing = False + out = '' + otype = 'unknown' + if len(htype): + otype = htype[0] + if otype == 'tag': + ecode, out = b4.git_run_command(gitdir, ['verify-tag', '--raw', 'FETCH_HEAD'], logstderr=True) + elif otype == 'commit': + ecode, out = b4.git_run_command(gitdir, ['verify-commit', '--raw', 'FETCH_HEAD'], logstderr=True) + lsig = b4.LoreAttestationSignature(out, 'git') + if lsig.good and lsig.valid and lsig.trusted: + passing = True + + out = out.strip() + if not len(out) and attpolicy != 'check': + lsig.errors.add('Remote %s is not signed!' % otype) + + if passing: + trailer = lsig.attestor.get_trailer(lmsg.fromemail) + logger.info(' ---') + logger.info(' %s %s', attpass, trailer) + return + + if lsig.errors: + logger.critical(' ---') + if len(out): + logger.critical(' Pull request is signed, but verification did not succeed:') + else: + logger.critical(' Pull request verification did not succeed:') + for error in lsig.errors: + logger.critical(' %s %s', attfail, error) + + if attpolicy == 'hardfail': + import sys + sys.exit(128) + + def fetch_remote(gitdir, lmsg, branch=None): # Do we know anything about this base commit? if lmsg.pr_base_commit and not git_commit_exists(gitdir, lmsg.pr_base_commit): @@ -139,6 +188,10 @@ def fetch_remote(gitdir, lmsg, branch=None): logger.critical(out) return ecode + config = b4.get_main_config() + if config['attestation-policy'] != 'off': + attest_fetch_head(gitdir, lmsg) + logger.info('---') if branch: gitargs = ['checkout', '-b', branch, 'FETCH_HEAD'] @@ -244,7 +297,7 @@ def main(cmdargs): mbx.close() os.unlink(savefile) - if lmsg is None: + if lmsg is None or lmsg.pr_remote_tip_commit is None: logger.critical('ERROR: Could not find pull request info in %s', msgid) sys.exit(1) @@ -282,6 +335,7 @@ def main(cmdargs): if len(loglines) and loglines[0].find(lmsg.pr_tip_commit) == 0: logger.info('Pull request is at the tip of FETCH_HEAD') if cmdargs.check: + attest_fetch_head(gitdir, lmsg) sys.exit(0) elif cmdargs.check: -- cgit v1.2.3