aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKonstantin Ryabitsev <konstantin@linuxfoundation.org>2020-03-30 17:40:08 -0400
committerKonstantin Ryabitsev <konstantin@linuxfoundation.org>2020-03-30 17:43:47 -0400
commite123952efd144401a198ab1f8337eb2529e26f95 (patch)
treea63345f0bf0fdc1bc40fbadbcf8379fe993e94ee
parentcd0b996f37a75e212614c23df9020e2022491647 (diff)
downloadb4-e123952efd144401a198ab1f8337eb2529e26f95.tar.gz
Add attestation checks for b4 pr
We now use similar subroutines for checking signatures on FETCH_HEAD as we do for patch attestation, making it a convenient operation during the fetch stage: $ b4 pr 202003292114.2252CAEF7@keescook Looking up https://lore.kernel.org/r/202003292114.2252CAEF7@keescook Grabbing thread from lore.kernel.org Looking at: [GIT PULL] seccomp updates for v5.7-rc1 Fetching https://git.kernel.org/pub/scm/linux/kernel/git/kees/linux.git tags/seccomp-v5.7-rc1 --- [✓] Attestation-by: Kees Cook <keescook@chromium.org> (pgp: 8972F4DFDC6DC026) --- Successfully fetched into FETCH_HEAD Hopefully, I didn't introduce too many bugs into patch attestation, since I had to rewrite the backend a bit to work for both native git operations and patch attestation calls. Signed-off-by: Konstantin Ryabitsev <konstantin@linuxfoundation.org>
-rw-r--r--b4/__init__.py115
-rw-r--r--b4/attest.py2
-rw-r--r--b4/pr.py56
3 files changed, 122 insertions, 51 deletions
diff --git a/b4/__init__.py b/b4/__init__.py
index b4ce2d4..41f8dbc 100644
--- a/b4/__init__.py
+++ b/b4/__init__.py
@@ -437,7 +437,7 @@ class LoreSeries:
attpolicy = 'softfail'
# Make sure it's not too old compared to the message date
# Timezone doesn't matter as we calculate whole days
- tdelta = lmsg.date.replace(tzinfo=None) - attdoc.sigdate
+ tdelta = lmsg.date.replace(tzinfo=None) - attdoc.lsig.sigdate
if tdelta.days > attstaled:
# Uh-oh, attestation is too old!
logger.info(' %s %s', attfail, lmsg.full_subject)
@@ -445,7 +445,7 @@ class LoreSeries:
attstaled, tdelta.days))
else:
logger.info(' %s %s', attpass, lmsg.full_subject)
- attdata[at-1] = attdoc.attestor.get_trailer(lmsg.fromemail)
+ attdata[at-1] = attdoc.lsig.attestor.get_trailer(lmsg.fromemail)
else:
logger.info(' %s', lmsg.full_subject)
@@ -876,7 +876,7 @@ class LoreMessage:
# We return the first hit
return attdoc
# Does this doc have an exact match?
- uid = attdoc.attestor.get_matching_uid(self.fromemail)
+ uid = attdoc.lsig.attestor.get_matching_uid(self.fromemail)
if uid[1] == self.fromemail:
return attdoc
# stick an error in the first available attdoc saying
@@ -1022,15 +1022,63 @@ class LoreAttestor:
return '\n'.join(out)
-class LoreAttestationDocument:
- def __init__(self, source, sigdata):
- self.source = source
+class LoreAttestationSignature:
+ def __init__(self, output, trustmodel):
self.good = False
self.valid = False
self.trusted = False
self.sigdate = None
- self.passing = False
self.attestor = None
+ self.errors = set()
+
+ gs_matches = re.search(r'^\[GNUPG:\] GOODSIG ([0-9A-F]+)\s+.*$', output, re.M)
+ if gs_matches:
+ logger.debug(' GOODSIG')
+ self.good = True
+ keyid = gs_matches.groups()[0]
+ self.attestor = LoreAttestor(keyid)
+ puid = '%s <%s>' % self.attestor.get_primary_uid()
+ vs_matches = re.search(r'^\[GNUPG:\] VALIDSIG ([0-9A-F]+) (\d{4}-\d{2}-\d{2}) (\d+)', output, re.M)
+ if vs_matches:
+ logger.debug(' VALIDSIG')
+ self.valid = True
+ ymd = vs_matches.groups()[1]
+ self.sigdate = datetime.datetime.strptime(ymd, '%Y-%m-%d')
+ # Do we have a TRUST_(FULLY|ULTIMATE)?
+ ts_matches = re.search(r'^\[GNUPG:\] TRUST_(FULLY|ULTIMATE)', output, re.M)
+ if ts_matches:
+ logger.debug(' TRUST_%s', ts_matches.groups()[0])
+ self.trusted = True
+ else:
+ self.errors.add('Insufficient trust (model=%s): %s (%s)'
+ % (trustmodel, keyid, puid))
+ else:
+ self.errors.add('Signature not valid from key: %s (%s)' % (keyid, puid))
+ else:
+ # Are we missing a key?
+ matches = re.search(r'^\[GNUPG:\] NO_PUBKEY ([0-9A-F]+)$', output, re.M)
+ if matches:
+ self.errors.add('Missing public key: %s' % matches.groups()[0])
+
+ def __repr__(self):
+ out = list()
+ out.append(' good: %s' % self.good)
+ out.append(' valid: %s' % self.valid)
+ out.append(' trusted: %s' % self.trusted)
+ if self.attestor is not None:
+ out.append(' attestor: %s' % self.attestor.keyid)
+
+ out.append(' --- validation errors ---')
+ for error in self.errors:
+ out.append(' | %s' % error)
+ return '\n'.join(out)
+
+
+class LoreAttestationDocument:
+ def __init__(self, source, sigdata):
+ self.source = source
+ self.lsig = None
+ self.passing = False
self.hashes = set()
self.errors = set()
@@ -1041,42 +1089,14 @@ class LoreAttestationDocument:
logger.debug('Validating document obtained from %s', self.source)
ecode, output = gpg_run_command(gpgargs, stdin=sigdata.encode('utf-8'))
- if ecode == 0:
- # We're looking for both GOODSIG and VALIDSIG
- gs_matches = re.search(r'^\[GNUPG:\] GOODSIG ([0-9A-F]+)\s+.*$', output, re.M)
- if gs_matches:
- logger.debug(' GOODSIG')
- self.good = True
- keyid = gs_matches.groups()[0]
- self.attestor = LoreAttestor(keyid)
- puid = '%s <%s>' % self.attestor.get_primary_uid()
- vs_matches = re.search(r'^\[GNUPG:\] VALIDSIG ([0-9A-F]+) (\d{4}-\d{2}-\d{2}) (\d+)', output, re.M)
- if vs_matches:
- logger.debug(' VALIDSIG')
- self.valid = True
- ymd = vs_matches.groups()[1]
- self.sigdate = datetime.datetime.strptime(ymd, '%Y-%m-%d')
- # Do we have a TRUST_(FULLY|ULTIMATE)?
- ts_matches = re.search(r'^\[GNUPG:\] TRUST_(FULLY|ULTIMATE)', output, re.M)
- if ts_matches:
- logger.debug(' TRUST_%s', ts_matches.groups()[0])
- self.trusted = True
- else:
- self.errors.add('Insufficient trust (model=%s): %s (%s)'
- % (config['attestation-trust-model'], keyid, puid))
- else:
- self.errors.add('Signature not valid from key: %s (%s)' % (keyid, puid))
- else:
- # Are we missing a key?
- matches = re.search(r'^\[GNUPG:\] NO_PUBKEY ([0-9A-F]+)$', output, re.M)
- if matches:
- self.errors.add('Missing public key: %s' % matches.groups()[0])
- else:
- logger.debug('NOGOOD: Signature on %s failed to verify', self.source)
- return
+ self.lsig = LoreAttestationSignature(output, config['attestation-trust-model'])
+ self.errors.update(self.lsig.errors)
- if self.good and self.valid and self.trusted:
+ if self.lsig.good and self.lsig.valid and self.lsig.trusted:
self.passing = True
+ else:
+ # Not going any further
+ return
if source.find('http') == 0:
# We only cache known-good attestations obtained from remote
@@ -1109,19 +1129,14 @@ class LoreAttestationDocument:
def __repr__(self):
out = list()
out.append(' source: %s' % self.source)
- out.append(' good: %s' % self.good)
- out.append(' valid: %s' % self.valid)
- out.append(' trusted: %s' % self.trusted)
- if self.attestor is not None:
- out.append(' attestor: %s' % self.attestor.keyid)
-
out.append(' --- validation errors ---')
for error in self.errors:
out.append(' | %s' % error)
out.append(' --- hashes ---')
for hg in self.hashes:
out.append(' | %s-%s-%s' % (hg[0][:8], hg[1][:8], hg[2][:8]))
- return '\n'.join(out)
+ ret = '\n'.join(out) + '\n' + str(self.lsig)
+ return ret
@staticmethod
def get_from_cache(attid):
@@ -1249,7 +1264,9 @@ def _run_command(cmdargs, stdin=None, logstderr=False):
output = output.decode('utf-8', errors='replace')
if logstderr and len(error.strip()):
- logger.debug('Stderr: %s', error.decode('utf-8', errors='replace'))
+ errout = error.decode('utf-8', errors='replace')
+ logger.debug('Stderr: %s', errout)
+ output += errout
return sp.returncode, output
diff --git a/b4/attest.py b/b4/attest.py
index 35c062b..5f9a2b4 100644
--- a/b4/attest.py
+++ b/b4/attest.py
@@ -161,7 +161,7 @@ def verify_attestation(cmdargs):
if ecode != 128:
ecode = 0
logger.critical('%s %s', attpass, lmsg.full_subject)
- attrailers.add(attdoc.attestor.get_trailer(lmsg.fromemail))
+ attrailers.add(attdoc.lsig.attestor.get_trailer(lmsg.fromemail))
logger.critical('---')
if ecode > 0:
diff --git a/b4/pr.py b/b4/pr.py
index 55261f1..0ed9277 100644
--- a/b4/pr.py
+++ b/b4/pr.py
@@ -118,6 +118,55 @@ def parse_pr_data(msg):
return lmsg
+def attest_fetch_head(gitdir, lmsg):
+ config = b4.get_main_config()
+ attpolicy = config['attestation-policy']
+ if config['attestation-checkmarks'] == 'fancy':
+ attpass = b4.PASS_FANCY
+ attfail = b4.FAIL_FANCY
+ else:
+ attpass = b4.PASS_SIMPLE
+ attfail = b4.FAIL_SIMPLE
+ # Is FETCH_HEAD a tag or a commit?
+ htype = b4.git_get_command_lines(gitdir, ['cat-file', '-t', 'FETCH_HEAD'])
+ lsig = None
+ passing = False
+ out = ''
+ otype = 'unknown'
+ if len(htype):
+ otype = htype[0]
+ if otype == 'tag':
+ ecode, out = b4.git_run_command(gitdir, ['verify-tag', '--raw', 'FETCH_HEAD'], logstderr=True)
+ elif otype == 'commit':
+ ecode, out = b4.git_run_command(gitdir, ['verify-commit', '--raw', 'FETCH_HEAD'], logstderr=True)
+ lsig = b4.LoreAttestationSignature(out, 'git')
+ if lsig.good and lsig.valid and lsig.trusted:
+ passing = True
+
+ out = out.strip()
+ if not len(out) and attpolicy != 'check':
+ lsig.errors.add('Remote %s is not signed!' % otype)
+
+ if passing:
+ trailer = lsig.attestor.get_trailer(lmsg.fromemail)
+ logger.info(' ---')
+ logger.info(' %s %s', attpass, trailer)
+ return
+
+ if lsig.errors:
+ logger.critical(' ---')
+ if len(out):
+ logger.critical(' Pull request is signed, but verification did not succeed:')
+ else:
+ logger.critical(' Pull request verification did not succeed:')
+ for error in lsig.errors:
+ logger.critical(' %s %s', attfail, error)
+
+ if attpolicy == 'hardfail':
+ import sys
+ sys.exit(128)
+
+
def fetch_remote(gitdir, lmsg, branch=None):
# Do we know anything about this base commit?
if lmsg.pr_base_commit and not git_commit_exists(gitdir, lmsg.pr_base_commit):
@@ -139,6 +188,10 @@ def fetch_remote(gitdir, lmsg, branch=None):
logger.critical(out)
return ecode
+ config = b4.get_main_config()
+ if config['attestation-policy'] != 'off':
+ attest_fetch_head(gitdir, lmsg)
+
logger.info('---')
if branch:
gitargs = ['checkout', '-b', branch, 'FETCH_HEAD']
@@ -244,7 +297,7 @@ def main(cmdargs):
mbx.close()
os.unlink(savefile)
- if lmsg is None:
+ if lmsg is None or lmsg.pr_remote_tip_commit is None:
logger.critical('ERROR: Could not find pull request info in %s', msgid)
sys.exit(1)
@@ -282,6 +335,7 @@ def main(cmdargs):
if len(loglines) and loglines[0].find(lmsg.pr_tip_commit) == 0:
logger.info('Pull request is at the tip of FETCH_HEAD')
if cmdargs.check:
+ attest_fetch_head(gitdir, lmsg)
sys.exit(0)
elif cmdargs.check: