summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--b4/__init__.py76
-rw-r--r--b4/attest.py4
-rw-r--r--b4/pr.py14
3 files changed, 57 insertions, 37 deletions
diff --git a/b4/__init__.py b/b4/__init__.py
index ab5797d..ac0e85c 100644
--- a/b4/__init__.py
+++ b/b4/__init__.py
@@ -1753,35 +1753,11 @@ class LoreAttestationSignaturePGP(LoreAttestationSignature):
os.unlink(savefile)
output = out.decode()
- gs_matches = re.search(r'^\[GNUPG:] GOODSIG ([0-9A-F]+)\s+.*$', output, re.M)
- if gs_matches:
- logger.debug(' GOODSIG')
- self.good = True
- keyid = gs_matches.groups()[0]
- self.attestor = LoreAttestorPGP(keyid)
- puid = '%s <%s>' % self.attestor.get_primary_uid()
- vs_matches = re.search(r'^\[GNUPG:] VALIDSIG ([0-9A-F]+) (\d{4}-\d{2}-\d{2}) (\d+)', output, re.M)
- if vs_matches:
- logger.debug(' VALIDSIG')
- self.valid = True
- ymd = vs_matches.groups()[1]
- self.sigdate = datetime.datetime.strptime(ymd, '%Y-%m-%d').replace(tzinfo=datetime.timezone.utc)
- # Do we have a TRUST_(FULLY|ULTIMATE)?
- ts_matches = re.search(r'^\[GNUPG:] TRUST_(FULLY|ULTIMATE)', output, re.M)
- if ts_matches:
- logger.debug(' TRUST_%s', ts_matches.groups()[0])
- self.trusted = True
- self.passing = True
- else:
- self.errors.add('Insufficient trust (model=%s): %s (%s)'
- % (trustmodel, keyid, puid))
- else:
- self.errors.add('Signature not valid from key: %s (%s)' % (keyid, puid))
- else:
- # Are we missing a key?
- matches = re.search(r'^\[GNUPG:] NO_PUBKEY ([0-9A-F]+)$', output, re.M)
- if matches:
- self.errors.add('Missing public key: %s' % matches.groups()[0])
+ self.good, self.valid, self.trusted, self.attestor, self.sigdate, self.errors = \
+ validate_gpg_signature(output, trustmodel)
+
+ if self.good and self.valid and self.trusted:
+ self.passing = True
# A couple of final verifications
self.verify_time_drift()
@@ -2319,6 +2295,48 @@ def get_parts_from_header(hstr: str) -> dict:
return hdata
+def validate_gpg_signature(output, trustmodel):
+ good = False
+ valid = False
+ trusted = False
+ attestor = None
+ sigdate = None
+ errors = set()
+ gs_matches = re.search(r'^\[GNUPG:] GOODSIG ([0-9A-F]+)\s+.*$', output, re.M)
+ if gs_matches:
+ logger.debug(' GOODSIG')
+ good = True
+ keyid = gs_matches.groups()[0]
+ attestor = LoreAttestorPGP(keyid)
+ puid = '%s <%s>' % attestor.get_primary_uid()
+ vs_matches = re.search(r'^\[GNUPG:] VALIDSIG ([0-9A-F]+) (\d{4}-\d{2}-\d{2}) (\d+)', output, re.M)
+ if vs_matches:
+ logger.debug(' VALIDSIG')
+ valid = True
+ ymd = vs_matches.groups()[1]
+ sigdate = datetime.datetime.strptime(ymd, '%Y-%m-%d').replace(tzinfo=datetime.timezone.utc)
+ # Do we have a TRUST_(FULLY|ULTIMATE)?
+ ts_matches = re.search(r'^\[GNUPG:] TRUST_(FULLY|ULTIMATE)', output, re.M)
+ if ts_matches:
+ logger.debug(' TRUST_%s', ts_matches.groups()[0])
+ trusted = True
+ else:
+ errors.add('Insufficient trust (model=%s): %s (%s)' % (trustmodel, keyid, puid))
+ else:
+ errors.add('Signature not valid from key: %s (%s)' % (attestor.keyid, puid))
+ else:
+ # Are we missing a key?
+ matches = re.search(r'^\[GNUPG:] NO_PUBKEY ([0-9A-F]+)$', output, re.M)
+ if matches:
+ errors.add('Missing public key: %s' % matches.groups()[0])
+ # Is the key expired?
+ matches = re.search(r'^\[GNUPG:] EXPKEYSIG (.*)$', output, re.M)
+ if matches:
+ errors.add('Expired key: %s' % matches.groups()[0])
+
+ return good, valid, trusted, attestor, sigdate, errors
+
+
def dkim_get_txt(name: bytes, timeout: int = 5):
global _DKIM_DNS_CACHE
if name not in _DKIM_DNS_CACHE:
diff --git a/b4/attest.py b/b4/attest.py
index 4391b19..a4012d7 100644
--- a/b4/attest.py
+++ b/b4/attest.py
@@ -39,7 +39,7 @@ def in_header_attest(lmsg: b4.LoreMessage, mode: str = 'pgp', replace: bool = Fa
f'm={lmsg.attestation.mb}',
f'p={lmsg.attestation.pb}',
]
- hhname, hhval = b4.LoreAttestation.dkim_canonicalize_header(lmsg.attestation.hashes_header_name, '; '.join(hparts))
+ hhname, hhval = b4.dkim_canonicalize_header(lmsg.attestation.hashes_header_name, '; '.join(hparts))
headers.append(f'{hhname}:{hhval}')
logger.debug('Signing with mode=%s', mode)
@@ -59,7 +59,7 @@ def in_header_attest(lmsg: b4.LoreMessage, mode: str = 'pgp', replace: bool = Fa
'b=',
]
- shname, shval = b4.LoreAttestation.dkim_canonicalize_header(lmsg.attestation.sig_header_name, '; '.join(hparts))
+ shname, shval = b4.dkim_canonicalize_header(lmsg.attestation.sig_header_name, '; '.join(hparts))
headers.append(f'{shname}:{shval}')
payload = '\r\n'.join(headers).encode()
ecode, out, err = b4.gpg_run_command(gpgargs, payload)
diff --git a/b4/pr.py b/b4/pr.py
index 40d3127..b7ed9e1 100644
--- a/b4/pr.py
+++ b/b4/pr.py
@@ -127,27 +127,29 @@ def attest_fetch_head(gitdir, lmsg):
ecode, out = b4.git_run_command(gitdir, ['verify-tag', '--raw', 'FETCH_HEAD'], logstderr=True)
elif otype == 'commit':
ecode, out = b4.git_run_command(gitdir, ['verify-commit', '--raw', 'FETCH_HEAD'], logstderr=True)
- lsig = b4.LoreAttestationSignature(out, 'git')
- if lsig.good and lsig.valid and lsig.trusted:
+
+ good, valid, trusted, attestor, sigdate, errors = b4.validate_gpg_signature(out, 'pgp')
+
+ if good and valid and trusted:
passing = True
out = out.strip()
if not len(out) and attpolicy != 'check':
- lsig.errors.add('Remote %s is not signed!' % otype)
+ errors.add('Remote %s is not signed!' % otype)
if passing:
- trailer = lsig.attestor.get_trailer(lmsg.fromemail)
+ trailer = attestor.get_trailer(lmsg.fromemail)
logger.info(' ---')
logger.info(' %s %s', attpass, trailer)
return
- if lsig.errors:
+ if errors:
logger.critical(' ---')
if len(out):
logger.critical(' Pull request is signed, but verification did not succeed:')
else:
logger.critical(' Pull request verification did not succeed:')
- for error in lsig.errors:
+ for error in errors:
logger.critical(' %s %s', attfail, error)
if attpolicy == 'hardfail':