aboutsummaryrefslogtreecommitdiff
path: root/b4/__init__.py
diff options
context:
space:
mode:
Diffstat (limited to 'b4/__init__.py')
-rw-r--r--b4/__init__.py115
1 files changed, 66 insertions, 49 deletions
diff --git a/b4/__init__.py b/b4/__init__.py
index b4ce2d4..41f8dbc 100644
--- a/b4/__init__.py
+++ b/b4/__init__.py
@@ -437,7 +437,7 @@ class LoreSeries:
attpolicy = 'softfail'
# Make sure it's not too old compared to the message date
# Timezone doesn't matter as we calculate whole days
- tdelta = lmsg.date.replace(tzinfo=None) - attdoc.sigdate
+ tdelta = lmsg.date.replace(tzinfo=None) - attdoc.lsig.sigdate
if tdelta.days > attstaled:
# Uh-oh, attestation is too old!
logger.info(' %s %s', attfail, lmsg.full_subject)
@@ -445,7 +445,7 @@ class LoreSeries:
attstaled, tdelta.days))
else:
logger.info(' %s %s', attpass, lmsg.full_subject)
- attdata[at-1] = attdoc.attestor.get_trailer(lmsg.fromemail)
+ attdata[at-1] = attdoc.lsig.attestor.get_trailer(lmsg.fromemail)
else:
logger.info(' %s', lmsg.full_subject)
@@ -876,7 +876,7 @@ class LoreMessage:
# We return the first hit
return attdoc
# Does this doc have an exact match?
- uid = attdoc.attestor.get_matching_uid(self.fromemail)
+ uid = attdoc.lsig.attestor.get_matching_uid(self.fromemail)
if uid[1] == self.fromemail:
return attdoc
# stick an error in the first available attdoc saying
@@ -1022,15 +1022,63 @@ class LoreAttestor:
return '\n'.join(out)
-class LoreAttestationDocument:
- def __init__(self, source, sigdata):
- self.source = source
+class LoreAttestationSignature:
+ def __init__(self, output, trustmodel):
self.good = False
self.valid = False
self.trusted = False
self.sigdate = None
- self.passing = False
self.attestor = None
+ self.errors = set()
+
+ gs_matches = re.search(r'^\[GNUPG:\] GOODSIG ([0-9A-F]+)\s+.*$', output, re.M)
+ if gs_matches:
+ logger.debug(' GOODSIG')
+ self.good = True
+ keyid = gs_matches.groups()[0]
+ self.attestor = LoreAttestor(keyid)
+ puid = '%s <%s>' % self.attestor.get_primary_uid()
+ vs_matches = re.search(r'^\[GNUPG:\] VALIDSIG ([0-9A-F]+) (\d{4}-\d{2}-\d{2}) (\d+)', output, re.M)
+ if vs_matches:
+ logger.debug(' VALIDSIG')
+ self.valid = True
+ ymd = vs_matches.groups()[1]
+ self.sigdate = datetime.datetime.strptime(ymd, '%Y-%m-%d')
+ # Do we have a TRUST_(FULLY|ULTIMATE)?
+ ts_matches = re.search(r'^\[GNUPG:\] TRUST_(FULLY|ULTIMATE)', output, re.M)
+ if ts_matches:
+ logger.debug(' TRUST_%s', ts_matches.groups()[0])
+ self.trusted = True
+ else:
+ self.errors.add('Insufficient trust (model=%s): %s (%s)'
+ % (trustmodel, keyid, puid))
+ else:
+ self.errors.add('Signature not valid from key: %s (%s)' % (keyid, puid))
+ else:
+ # Are we missing a key?
+ matches = re.search(r'^\[GNUPG:\] NO_PUBKEY ([0-9A-F]+)$', output, re.M)
+ if matches:
+ self.errors.add('Missing public key: %s' % matches.groups()[0])
+
+ def __repr__(self):
+ out = list()
+ out.append(' good: %s' % self.good)
+ out.append(' valid: %s' % self.valid)
+ out.append(' trusted: %s' % self.trusted)
+ if self.attestor is not None:
+ out.append(' attestor: %s' % self.attestor.keyid)
+
+ out.append(' --- validation errors ---')
+ for error in self.errors:
+ out.append(' | %s' % error)
+ return '\n'.join(out)
+
+
+class LoreAttestationDocument:
+ def __init__(self, source, sigdata):
+ self.source = source
+ self.lsig = None
+ self.passing = False
self.hashes = set()
self.errors = set()
@@ -1041,42 +1089,14 @@ class LoreAttestationDocument:
logger.debug('Validating document obtained from %s', self.source)
ecode, output = gpg_run_command(gpgargs, stdin=sigdata.encode('utf-8'))
- if ecode == 0:
- # We're looking for both GOODSIG and VALIDSIG
- gs_matches = re.search(r'^\[GNUPG:\] GOODSIG ([0-9A-F]+)\s+.*$', output, re.M)
- if gs_matches:
- logger.debug(' GOODSIG')
- self.good = True
- keyid = gs_matches.groups()[0]
- self.attestor = LoreAttestor(keyid)
- puid = '%s <%s>' % self.attestor.get_primary_uid()
- vs_matches = re.search(r'^\[GNUPG:\] VALIDSIG ([0-9A-F]+) (\d{4}-\d{2}-\d{2}) (\d+)', output, re.M)
- if vs_matches:
- logger.debug(' VALIDSIG')
- self.valid = True
- ymd = vs_matches.groups()[1]
- self.sigdate = datetime.datetime.strptime(ymd, '%Y-%m-%d')
- # Do we have a TRUST_(FULLY|ULTIMATE)?
- ts_matches = re.search(r'^\[GNUPG:\] TRUST_(FULLY|ULTIMATE)', output, re.M)
- if ts_matches:
- logger.debug(' TRUST_%s', ts_matches.groups()[0])
- self.trusted = True
- else:
- self.errors.add('Insufficient trust (model=%s): %s (%s)'
- % (config['attestation-trust-model'], keyid, puid))
- else:
- self.errors.add('Signature not valid from key: %s (%s)' % (keyid, puid))
- else:
- # Are we missing a key?
- matches = re.search(r'^\[GNUPG:\] NO_PUBKEY ([0-9A-F]+)$', output, re.M)
- if matches:
- self.errors.add('Missing public key: %s' % matches.groups()[0])
- else:
- logger.debug('NOGOOD: Signature on %s failed to verify', self.source)
- return
+ self.lsig = LoreAttestationSignature(output, config['attestation-trust-model'])
+ self.errors.update(self.lsig.errors)
- if self.good and self.valid and self.trusted:
+ if self.lsig.good and self.lsig.valid and self.lsig.trusted:
self.passing = True
+ else:
+ # Not going any further
+ return
if source.find('http') == 0:
# We only cache known-good attestations obtained from remote
@@ -1109,19 +1129,14 @@ class LoreAttestationDocument:
def __repr__(self):
out = list()
out.append(' source: %s' % self.source)
- out.append(' good: %s' % self.good)
- out.append(' valid: %s' % self.valid)
- out.append(' trusted: %s' % self.trusted)
- if self.attestor is not None:
- out.append(' attestor: %s' % self.attestor.keyid)
-
out.append(' --- validation errors ---')
for error in self.errors:
out.append(' | %s' % error)
out.append(' --- hashes ---')
for hg in self.hashes:
out.append(' | %s-%s-%s' % (hg[0][:8], hg[1][:8], hg[2][:8]))
- return '\n'.join(out)
+ ret = '\n'.join(out) + '\n' + str(self.lsig)
+ return ret
@staticmethod
def get_from_cache(attid):
@@ -1249,7 +1264,9 @@ def _run_command(cmdargs, stdin=None, logstderr=False):
output = output.decode('utf-8', errors='replace')
if logstderr and len(error.strip()):
- logger.debug('Stderr: %s', error.decode('utf-8', errors='replace'))
+ errout = error.decode('utf-8', errors='replace')
+ logger.debug('Stderr: %s', errout)
+ output += errout
return sp.returncode, output