diff options
Diffstat (limited to 'b4/__init__.py')
-rw-r--r-- | b4/__init__.py | 115 |
1 files changed, 66 insertions, 49 deletions
diff --git a/b4/__init__.py b/b4/__init__.py index b4ce2d4..41f8dbc 100644 --- a/b4/__init__.py +++ b/b4/__init__.py @@ -437,7 +437,7 @@ class LoreSeries: attpolicy = 'softfail' # Make sure it's not too old compared to the message date # Timezone doesn't matter as we calculate whole days - tdelta = lmsg.date.replace(tzinfo=None) - attdoc.sigdate + tdelta = lmsg.date.replace(tzinfo=None) - attdoc.lsig.sigdate if tdelta.days > attstaled: # Uh-oh, attestation is too old! logger.info(' %s %s', attfail, lmsg.full_subject) @@ -445,7 +445,7 @@ class LoreSeries: attstaled, tdelta.days)) else: logger.info(' %s %s', attpass, lmsg.full_subject) - attdata[at-1] = attdoc.attestor.get_trailer(lmsg.fromemail) + attdata[at-1] = attdoc.lsig.attestor.get_trailer(lmsg.fromemail) else: logger.info(' %s', lmsg.full_subject) @@ -876,7 +876,7 @@ class LoreMessage: # We return the first hit return attdoc # Does this doc have an exact match? - uid = attdoc.attestor.get_matching_uid(self.fromemail) + uid = attdoc.lsig.attestor.get_matching_uid(self.fromemail) if uid[1] == self.fromemail: return attdoc # stick an error in the first available attdoc saying @@ -1022,15 +1022,63 @@ class LoreAttestor: return '\n'.join(out) -class LoreAttestationDocument: - def __init__(self, source, sigdata): - self.source = source +class LoreAttestationSignature: + def __init__(self, output, trustmodel): self.good = False self.valid = False self.trusted = False self.sigdate = None - self.passing = False self.attestor = None + self.errors = set() + + gs_matches = re.search(r'^\[GNUPG:\] GOODSIG ([0-9A-F]+)\s+.*$', output, re.M) + if gs_matches: + logger.debug(' GOODSIG') + self.good = True + keyid = gs_matches.groups()[0] + self.attestor = LoreAttestor(keyid) + puid = '%s <%s>' % self.attestor.get_primary_uid() + vs_matches = re.search(r'^\[GNUPG:\] VALIDSIG ([0-9A-F]+) (\d{4}-\d{2}-\d{2}) (\d+)', output, re.M) + if vs_matches: + logger.debug(' VALIDSIG') + self.valid = True + ymd = vs_matches.groups()[1] + self.sigdate = datetime.datetime.strptime(ymd, '%Y-%m-%d') + # Do we have a TRUST_(FULLY|ULTIMATE)? + ts_matches = re.search(r'^\[GNUPG:\] TRUST_(FULLY|ULTIMATE)', output, re.M) + if ts_matches: + logger.debug(' TRUST_%s', ts_matches.groups()[0]) + self.trusted = True + else: + self.errors.add('Insufficient trust (model=%s): %s (%s)' + % (trustmodel, keyid, puid)) + else: + self.errors.add('Signature not valid from key: %s (%s)' % (keyid, puid)) + else: + # Are we missing a key? + matches = re.search(r'^\[GNUPG:\] NO_PUBKEY ([0-9A-F]+)$', output, re.M) + if matches: + self.errors.add('Missing public key: %s' % matches.groups()[0]) + + def __repr__(self): + out = list() + out.append(' good: %s' % self.good) + out.append(' valid: %s' % self.valid) + out.append(' trusted: %s' % self.trusted) + if self.attestor is not None: + out.append(' attestor: %s' % self.attestor.keyid) + + out.append(' --- validation errors ---') + for error in self.errors: + out.append(' | %s' % error) + return '\n'.join(out) + + +class LoreAttestationDocument: + def __init__(self, source, sigdata): + self.source = source + self.lsig = None + self.passing = False self.hashes = set() self.errors = set() @@ -1041,42 +1089,14 @@ class LoreAttestationDocument: logger.debug('Validating document obtained from %s', self.source) ecode, output = gpg_run_command(gpgargs, stdin=sigdata.encode('utf-8')) - if ecode == 0: - # We're looking for both GOODSIG and VALIDSIG - gs_matches = re.search(r'^\[GNUPG:\] GOODSIG ([0-9A-F]+)\s+.*$', output, re.M) - if gs_matches: - logger.debug(' GOODSIG') - self.good = True - keyid = gs_matches.groups()[0] - self.attestor = LoreAttestor(keyid) - puid = '%s <%s>' % self.attestor.get_primary_uid() - vs_matches = re.search(r'^\[GNUPG:\] VALIDSIG ([0-9A-F]+) (\d{4}-\d{2}-\d{2}) (\d+)', output, re.M) - if vs_matches: - logger.debug(' VALIDSIG') - self.valid = True - ymd = vs_matches.groups()[1] - self.sigdate = datetime.datetime.strptime(ymd, '%Y-%m-%d') - # Do we have a TRUST_(FULLY|ULTIMATE)? - ts_matches = re.search(r'^\[GNUPG:\] TRUST_(FULLY|ULTIMATE)', output, re.M) - if ts_matches: - logger.debug(' TRUST_%s', ts_matches.groups()[0]) - self.trusted = True - else: - self.errors.add('Insufficient trust (model=%s): %s (%s)' - % (config['attestation-trust-model'], keyid, puid)) - else: - self.errors.add('Signature not valid from key: %s (%s)' % (keyid, puid)) - else: - # Are we missing a key? - matches = re.search(r'^\[GNUPG:\] NO_PUBKEY ([0-9A-F]+)$', output, re.M) - if matches: - self.errors.add('Missing public key: %s' % matches.groups()[0]) - else: - logger.debug('NOGOOD: Signature on %s failed to verify', self.source) - return + self.lsig = LoreAttestationSignature(output, config['attestation-trust-model']) + self.errors.update(self.lsig.errors) - if self.good and self.valid and self.trusted: + if self.lsig.good and self.lsig.valid and self.lsig.trusted: self.passing = True + else: + # Not going any further + return if source.find('http') == 0: # We only cache known-good attestations obtained from remote @@ -1109,19 +1129,14 @@ class LoreAttestationDocument: def __repr__(self): out = list() out.append(' source: %s' % self.source) - out.append(' good: %s' % self.good) - out.append(' valid: %s' % self.valid) - out.append(' trusted: %s' % self.trusted) - if self.attestor is not None: - out.append(' attestor: %s' % self.attestor.keyid) - out.append(' --- validation errors ---') for error in self.errors: out.append(' | %s' % error) out.append(' --- hashes ---') for hg in self.hashes: out.append(' | %s-%s-%s' % (hg[0][:8], hg[1][:8], hg[2][:8])) - return '\n'.join(out) + ret = '\n'.join(out) + '\n' + str(self.lsig) + return ret @staticmethod def get_from_cache(attid): @@ -1249,7 +1264,9 @@ def _run_command(cmdargs, stdin=None, logstderr=False): output = output.decode('utf-8', errors='replace') if logstderr and len(error.strip()): - logger.debug('Stderr: %s', error.decode('utf-8', errors='replace')) + errout = error.decode('utf-8', errors='replace') + logger.debug('Stderr: %s', errout) + output += errout return sp.returncode, output |