diff options
author | Konstantin Ryabitsev <konstantin@linuxfoundation.org> | 2020-11-20 12:10:27 -0500 |
---|---|---|
committer | Konstantin Ryabitsev <konstantin@linuxfoundation.org> | 2020-11-20 12:10:27 -0500 |
commit | e8ac2e9b40a758464b6a8e360dd840935479b953 (patch) | |
tree | 9eb36f1f6d73735addd095dfbfb8890c1d7134f7 | |
parent | 791fb2c96794ea71a7e79b0c347b123f18defcce (diff) | |
download | b4-e8ac2e9b40a758464b6a8e360dd840935479b953.tar.gz |
Add initial support for DKIM attestation
Now that vger is doing a much better job preserving DKIM signatures, it
makes sense to teach b4 to check those. It's still failing for most
mailman lists, but those are fewer than vger sources.
Signed-off-by: Konstantin Ryabitsev <konstantin@linuxfoundation.org>
-rw-r--r-- | b4/__init__.py | 468 | ||||
-rw-r--r-- | requirements.txt | 8 |
2 files changed, 352 insertions, 124 deletions
diff --git a/b4/__init__.py b/b4/__init__.py index a0fa086..c552a5f 100644 --- a/b4/__init__.py +++ b/b4/__init__.py @@ -29,8 +29,16 @@ from email import charset charset.add_charset('utf-8', None) emlpolicy = email.policy.EmailPolicy(utf8=True, cte_type='8bit', max_line_length=None) +try: + import dns.resolver + import dkim + + can_dkim_verify = True + _resolver = dns.resolver.get_default_resolver() +except ModuleNotFoundError: + can_dkim_verify = False + __VERSION__ = '0.6.0-dev' -ATTESTATION_FORMAT_VER = '0.1' logger = logging.getLogger('b4') @@ -38,9 +46,14 @@ HUNK_RE = re.compile(r'^@@ -\d+(?:,(\d+))? \+\d+(?:,(\d+))? @@') FILENAME_RE = re.compile(r'^(---|\+\+\+) (\S+)') PASS_SIMPLE = '[P]' +WEAK_SIMPLE = '[D]' FAIL_SIMPLE = '[F]' -PASS_FANCY = '[\033[32m\u2713\033[0m]' -FAIL_FANCY = '[\033[31m\u2717\033[0m]' +PASS_FANCY = '\033[32m\u2714\033[0m' +WEAK_FANCY = '\033[33m\u2713\033[0m' +FAIL_FANCY = '\033[31m\u2717\033[0m' + +HDR_PATCH_HASHES = 'X-Patch-Hashes' +HDR_PATCH_SIG = 'X-Patch-Sig' # You can use bash-style globbing here WANTHDRS = [ @@ -480,22 +493,17 @@ class LoreSeries: logger.critical('WARNING: Unable to add your Signed-off-by: git returned no user.name or user.email') addmysob = False - attdata = [None] * self.expected + attdata = [(None, None)] * self.expected attpolicy = config['attestation-policy'] - try: - attstaled = int(config['attestation-staleness-days']) - except ValueError: - attstaled = 30 - # exact_from_match = False - # if config['attestation-uid-match'] == 'strict': - # exact_from_match = True if config['attestation-checkmarks'] == 'fancy': attpass = PASS_FANCY attfail = FAIL_FANCY + attweak = WEAK_FANCY else: attpass = PASS_SIMPLE attfail = FAIL_SIMPLE + attweak = WEAK_SIMPLE at = 1 atterrors = list() @@ -508,6 +516,7 @@ class LoreSeries: if lmsg is None: logger.critical('CRITICAL: [%s/%s] is missing, cannot cherrypick', at, self.expected) raise KeyError('Cherrypick not in series') + if lmsg is not None: if self.has_cover and covertrailers and self.patches[0].followup_trailers: lmsg.followup_trailers += self.patches[0].followup_trailers @@ -521,17 +530,12 @@ class LoreSeries: lmsg.load_hashes() latt = lmsg.attestation if latt and latt.validate(lmsg.msg): - # Make sure it's not too old compared to the message date - # Timezone doesn't matter as we calculate whole days - tdelta = lmsg.date.replace(tzinfo=None) - latt.lsig.sigdate - if tdelta.days > attstaled: - # Uh-oh, attestation is too old! - logger.info(' %s %s', attfail, lmsg.full_subject) - atterrors.append('Attestation for %s/%s is over %sd old: %sd' % (at, lmsg.expected, - attstaled, tdelta.days)) + if latt.lsig.attestor and latt.lsig.attestor.mode == 'domain': + logger.info(' %s %s', attweak, lmsg.full_subject) + attdata[at-1] = (latt.lsig.attestor.get_trailer(lmsg.fromemail), attweak) # noqa else: logger.info(' %s %s', attpass, lmsg.full_subject) - attdata[at - 1] = latt.lsig.attestor.get_trailer(lmsg.fromemail) + attdata[at-1] = (latt.lsig.attestor.get_trailer(lmsg.fromemail), attpass) # noqa else: if attpolicy in ('softfail', 'hardfail'): logger.info(' %s %s', attfail, lmsg.full_subject) @@ -561,12 +565,16 @@ class LoreSeries: if attpolicy == 'off': return mbx - failed = None in attdata + + failed = (None, None) in attdata if not failed: logger.info(' ---') - for trailer in set(attdata): - logger.info(' %s %s', attpass, trailer) + for trailer, attmode in set(attdata): + logger.info(' %s %s', attmode, trailer) return mbx + elif not can_dkim_verify: + logger.info(' ---') + logger.info(' NOTE: install dkimpy for DKIM signature attestation.') errors = set(atterrors) for attdoc in ATTESTATIONS: @@ -1452,6 +1460,27 @@ class LoreAttestor: self.keyid = keyid self.uids = list() + def __repr__(self): + out = list() + out.append(' keyid: %s' % self.keyid) + for uid in self.uids: + out.append(' uid: %s <%s>' % uid) + return '\n'.join(out) + + +class LoreAttestorDKIM(LoreAttestor): + def __init__(self, keyid): + self.mode = 'domain' + super().__init__(keyid) + + def get_trailer(self, fromaddr): # noqa + return 'Attestation-by: DKIM/%s (From: %s)' % (self.keyid, fromaddr) + + +class LoreAttestorPGP(LoreAttestor): + def __init__(self, keyid): + super().__init__(keyid) + self.mode = 'person' self.load_subkey_uids() def load_subkey_uids(self): @@ -1497,16 +1526,11 @@ class LoreAttestor: return 'Attestation-by: %s <%s> (pgp: %s)' % (uid[0], uid[1], self.keyid) - def __repr__(self): - out = list() - out.append(' keyid: %s' % self.keyid) - for uid in self.uids: - out.append(' uid: %s <%s>' % uid) - return '\n'.join(out) - class LoreAttestationSignature: - def __init__(self, output, trustmodel): + def __init__(self, msg): + self.msg = msg + self.mode = None self.good = False self.valid = False self.trusted = False @@ -1515,19 +1539,232 @@ class LoreAttestationSignature: self.attestor = None self.errors = set() + config = get_main_config() + try: + driftd = int(config['attestation-staleness-days']) + except ValueError: + driftd = 30 + + self.maxdrift = datetime.timedelta(days=driftd) + + def verify_time_drift(self) -> None: + msgdt = email.utils.parsedate_to_datetime(str(self.msg['Date'])) + sdrift = self.sigdate - msgdt + if sdrift > self.maxdrift: + self.passing = False + self.errors.add('Time drift between Date and t too great (%s)' % sdrift) + return + logger.debug('PASS : time drift between Date and t (%s)', sdrift) + + def verify_identity_domain(self, identity: str, domain: str): + # Domain is supposed to be present in identity + if not identity.endswith(domain): + logger.debug('domain (d=%s) is not in identity (i=%s)', domain, identity) + self.passing = False + return + fromeml = email.utils.getaddresses(self.msg.get_all('from', []))[0][1] + if identity.find('@') < 0: + logger.debug('identity must contain @ (i=%s)', identity) + self.passing = False + return + ilocal, idomain = identity.split('@') + # identity is supposed to be present in from + if not fromeml.endswith(f'@{idomain}'): + self.errors.add('identity (i=%s) does not match from (from=%s)' % (identity, fromeml)) + self.passing = False + return + logger.debug('identity and domain match From header') + + # @staticmethod + # def get_dkim_key(domain: str, selector: str, timeout: int = 5) -> Tuple[str, str]: + # global DNSCACHE + # if (domain, selector) in DNSCACHE: + # return DNSCACHE[(domain, selector)] + # + # name = f'{selector}._domainkey.{domain}.' + # logger.debug('DNS-lookup: %s', name) + # keydata = None + # try: + # a = dns.resolver.resolve(name, dns.rdatatype.TXT, raise_on_no_answer=False, lifetime=timeout) # noqa + # # Find v=DKIM1 + # for r in a.response.answer: + # if r.rdtype == dns.rdatatype.TXT: + # for item in r.items: + # # Concatenate all strings + # txtdata = b''.join(item.strings) + # if txtdata.find(b'v=DKIM1') >= 0: + # keydata = txtdata.decode() + # break + # if keydata: + # break + # except dns.resolver.NXDOMAIN: # noqa + # raise LookupError('Domain %s does not exist', name) + # + # if not keydata: + # raise LookupError('Domain %s does not contain a DKIM record', name) + # + # parts = get_parts_from_header(keydata) + # if 'p' not in parts: + # raise LookupError('Domain %s does not contain a DKIM key', name) + # if 'k' not in parts: + # raise LookupError('Domain %s does not indicate key time', name) + # + # DNSCACHE[(domain, selector)] = (parts['k'], parts['p']) + # logger.debug('k=%s, p=%s', parts['k'], parts['p']) + # return parts['k'], parts['p'] + + def __repr__(self): + out = list() + out.append(' mode: %s' % self.mode) + out.append(' good: %s' % self.good) + out.append(' valid: %s' % self.valid) + out.append(' trusted: %s' % self.trusted) + if self.attestor is not None: + out.append(' attestor: %s' % self.attestor.keyid) + + out.append(' --- validation errors ---') + for error in self.errors: + out.append(' | %s' % error) + return '\n'.join(out) + + +class LoreAttestationSignatureDKIM(LoreAttestationSignature): + def __init__(self, msg): + super().__init__(msg) + self.mode = 'dkim' + # Doesn't quite work right, so just use dkimpy's native + # self.native_verify() + # return + + if not dkim.verify(self.msg.as_bytes(), dnsfunc=dkim_get_txt): + return + self.good = True + + # Grab toplevel signature that we just verified + dks = self.msg.get('dkim-signature') + ddata = get_parts_from_header(dks) + self.attestor = LoreAttestorDKIM(ddata['d']) + self.valid = True + self.trusted = True + self.passing = True + + if ddata.get('t'): + self.sigdate = datetime.datetime.utcfromtimestamp(int(ddata['t'])).replace(tzinfo=datetime.timezone.utc) + else: + self.sigdate = email.utils.parsedate_to_datetime(str(self.msg['Date'])) + + # def native_verify(self): + # dks = self.msg.get('dkim-signature') + # ddata = get_parts_from_header(dks) + # try: + # kt, kp = LoreAttestationSignature.get_dkim_key(ddata['d'], ddata['s']) + # if kt not in ('rsa',): # 'ed25519'): + # logger.debug('DKIM key type %s not supported', kt) + # return + # pk = base64.b64decode(kp) + # sig = base64.b64decode(ddata['b']) + # except (LookupError, binascii.Error) as ex: + # logger.debug('Unable to look up DKIM key: %s', ex) + # return + # + # headers = list() + # + # for header in ddata['h'].split(':'): + # # For the POC, we assume 'relaxed/' + # hval = self.msg.get(header) + # if hval is None: + # # Missing headers are omitted by the DKIM RFC + # continue + # if ddata['c'].startswith('relaxed/'): + # hname, hval = dkim_canonicalize_header(header, str(self.msg.get(header))) + # else: + # hname = header + # hval = str(self.msg.get(header)) + # headers.append(f'{hname}:{hval}') + # + # # Now we add the dkim-signature header itself, without b= content + # if ddata['c'].startswith('relaxed/'): + # dname, dval = dkim_canonicalize_header('dkim-signature', dks) + # else: + # dname = 'DKIM-Signature' + # dval = dks + # + # dval = dval.rsplit('; b=')[0] + '; b=' + # headers.append(f'{dname}:{dval}') + # payload = ('\r\n'.join(headers)).encode() + # key = RSA.import_key(pk) + # hashed = SHA256.new(payload) + # try: + # # noinspection PyTypeChecker + # pkcs1_15.new(key).verify(hashed, sig) + # except (ValueError, TypeError): + # logger.debug('DKIM signature did not verify') + # self.errors.add('The DKIM signature did NOT verify!') + # return + # + # self.good = True + # if not ddata.get('i'): + # ddata['i'] = '@' + ddata['d'] + # + # logger.debug('PASS : DKIM signature for d=%s, s=%s', ddata['d'], ddata['s']) + # + # self.attestor = LoreAttestorDKIM(ddata['d']) + # self.valid = True + # self.trusted = True + # self.passing = True + # + # self.verify_identity_domain(ddata['i'], ddata['d']) + # if ddata.get('t'): + # self.sigdate = datetime.datetime.utcfromtimestamp(int(ddata['t'])).replace(tzinfo=datetime.timezone.utc) + # self.verify_time_drift() + # else: + # self.sigdate = email.utils.parsedate_to_datetime(str(self.msg['Date'])) + + +class LoreAttestationSignaturePGP(LoreAttestationSignature): + def __init__(self, msg): + super().__init__(msg) + self.mode = 'pgp' + + shdr = msg.get(HDR_PATCH_SIG) + sdata = get_parts_from_header(shdr) + hhdr = msg.get(HDR_PATCH_HASHES) + sig = base64.b64decode(sdata['b']) + headers = list() + hhname, hhval = dkim_canonicalize_header(HDR_PATCH_HASHES, str(hhdr)) + headers.append(f'{hhname}:{hhval}') + # Now we add the sig header itself, without b= content + shname, shval = dkim_canonicalize_header(HDR_PATCH_SIG, shdr) + shval = shval.rsplit('; b=')[0] + '; b=' + headers.append(f'{shname}:{shval}') + payload = ('\r\n'.join(headers)).encode() + savefile = mkstemp('in-header-pgp-verify')[1] + with open(savefile, 'wb') as fh: + fh.write(sig) + + gpgargs = list() + config = get_main_config() + trustmodel = config.get('attestation-trust-model', 'tofu') + if trustmodel == 'tofu': + gpgargs += ['--trust-model', 'tofu', '--tofu-default-policy', 'good'] + gpgargs += ['--verify', '--status-fd=1', savefile, '-'] + ecode, out, err = gpg_run_command(gpgargs, stdin=payload) + os.unlink(savefile) + output = out.decode() + gs_matches = re.search(r'^\[GNUPG:] GOODSIG ([0-9A-F]+)\s+.*$', output, re.M) if gs_matches: logger.debug(' GOODSIG') self.good = True keyid = gs_matches.groups()[0] - self.attestor = LoreAttestor(keyid) + self.attestor = LoreAttestorPGP(keyid) puid = '%s <%s>' % self.attestor.get_primary_uid() vs_matches = re.search(r'^\[GNUPG:] VALIDSIG ([0-9A-F]+) (\d{4}-\d{2}-\d{2}) (\d+)', output, re.M) if vs_matches: logger.debug(' VALIDSIG') self.valid = True ymd = vs_matches.groups()[1] - self.sigdate = datetime.datetime.strptime(ymd, '%Y-%m-%d') + self.sigdate = datetime.datetime.strptime(ymd, '%Y-%m-%d').replace(tzinfo=datetime.timezone.utc) # Do we have a TRUST_(FULLY|ULTIMATE)? ts_matches = re.search(r'^\[GNUPG:] TRUST_(FULLY|ULTIMATE)', output, re.M) if ts_matches: @@ -1545,18 +1782,9 @@ class LoreAttestationSignature: if matches: self.errors.add('Missing public key: %s' % matches.groups()[0]) - def __repr__(self): - out = list() - out.append(' good: %s' % self.good) - out.append(' valid: %s' % self.valid) - out.append(' trusted: %s' % self.trusted) - if self.attestor is not None: - out.append(' attestor: %s' % self.attestor.keyid) - - out.append(' --- validation errors ---') - for error in self.errors: - out.append(' | %s' % error) - return '\n'.join(out) + # A couple of final verifications + self.verify_time_drift() + # XXX: Need to verify identity domain class LoreAttestation: @@ -1568,9 +1796,6 @@ class LoreAttestation: self.mb = base64.b64encode(_m.digest()).decode() self.pb = base64.b64encode(_p.digest()).decode() - self.hashes_header_name = 'X-Patch-Hashes' - self.sig_header_name = 'X-Patch-Sig' - self.lsig = None self.passing = False self.iv = False @@ -1592,90 +1817,50 @@ class LoreAttestation: out.append(' iv: %s' % self.iv) out.append(' mv: %s' % self.mv) out.append(' pv: %s' % self.pv) + out.append(' pass: %s' % self.passing) return '\n'.join(out) - @staticmethod - def verify_identity_domain(msg, identity: str, domain: str) -> bool: - # Domain is supposed to be present in identity - if not identity.endswith(domain): - logger.debug('domain (d=%s) is not in identity (i=%s)', domain, identity) - return False - fromeml = email.utils.getaddresses(msg.get_all('from', []))[0][1] - if identity.find('@') < 0: - logger.debug('identity must contain @ (i=%s)', identity) - return False - ilocal, idomain = identity.split('@') - # identity is supposed to be present in from - if not fromeml.endswith(f'@{idomain}'): - logger.debug('identity (i=%s) does not match from (from=%s)', identity, fromeml) - return False - logger.debug('identity and domain match From header') - return True - - @staticmethod - def get_gpg_attestation(smsg: bytes, dsig: bytes) -> LoreAttestationSignature: - # We can't pass both the detached sig and the content on stdin, so - # use a temporary file - savefile = mkstemp('in-header-pgp-verify')[1] - with open(savefile, 'wb') as fh: - fh.write(dsig) - - gpgargs = list() - config = get_main_config() - if config['attestation-trust-model'] == 'tofu': - gpgargs += ['--trust-model', 'tofu', '--tofu-default-policy', 'good'] - gpgargs += ['--verify', '--status-fd=1', savefile, '-'] - ecode, out, err = gpg_run_command(gpgargs, stdin=smsg) - os.unlink(savefile) - return LoreAttestationSignature(out.decode(), config['attestation-trust-model']) - - @staticmethod - def dkim_canonicalize_header(hname, hval): - hname = hname.lower() - hval = hval.strip() - hval = re.sub(r'\n', '', hval) - hval = re.sub(r'\s+', ' ', hval) - return hname, hval - - @staticmethod - def get_parts_from_header(hstr: str) -> dict: - hstr = re.sub(r'\s*', '', hstr) - hdata = dict() - for chunk in hstr.split(';'): - parts = chunk.split('=', 1) - if len(parts) < 2: - continue - hdata[parts[0]] = parts[1] - return hdata - def validate(self, msg): - shdr = msg.get(self.sig_header_name) + # Check if we have a X-Patch-Sig header. At this time, we only support two modes: + # - GPG mode, which we check for fist + # - Plain DKIM mode, which we check as fall-back + # More modes may be coming in the future, depending on feedback. + shdr = msg.get(HDR_PATCH_SIG) + hhdr = msg.get(HDR_PATCH_HASHES) + if hhdr is None: + # Do we have a dkim signature header? + if msg.get('DKIM-Signature'): + if can_dkim_verify: + self.lsig = LoreAttestationSignatureDKIM(msg) + if self.lsig.passing: + self.passing = True + self.iv = True + self.mv = True + self.pv = True + return self.passing + return None + if shdr is None: return None - sdata = LoreAttestation.get_parts_from_header(shdr) - sig = base64.b64decode(sdata['b']) - headers = list() - hhname, hhval = LoreAttestation.dkim_canonicalize_header(self.hashes_header_name, - str(msg.get(self.hashes_header_name))) - headers.append(f'{hhname}:{hhval}') - # Now we add the sig header itself, without b= content - shname, shval = LoreAttestation.dkim_canonicalize_header(self.sig_header_name, shdr) - shval = shval.rsplit('; b=')[0] + '; b=' - headers.append(f'{shname}:{shval}') - payload = ('\r\n'.join(headers)).encode() - self.lsig = LoreAttestation.get_gpg_attestation(payload, sig) - if self.lsig.passing: - hdata = LoreAttestation.get_parts_from_header(hhval) - if hdata['i'] == self.ib: - self.iv = True - if hdata['m'] == self.mb: - self.mv = True - if hdata['p'] == self.pb: - self.pv = True + + sdata = get_parts_from_header(shdr) + if sdata.get('m') == 'pgp': + self.lsig = LoreAttestationSignaturePGP(msg) + if self.lsig.passing: + hdata = get_parts_from_header(hhdr) + if hdata['i'] == self.ib: + self.iv = True + if hdata['m'] == self.mb: + self.mv = True + if hdata['p'] == self.pb: + self.pv = True if self.iv and self.mv and self.pv: self.passing = True + if self.lsig is None: + return None + return self.passing @@ -2112,3 +2297,40 @@ def parse_int_range(intrange, upper=None): yield from range(int(nr[0]), upper+1) else: logger.critical('Unknown range value specified: %s', n) + + +def dkim_canonicalize_header(hname, hval): + hname = hname.lower() + hval = hval.strip() + hval = re.sub(r'\n', '', hval) + hval = re.sub(r'\s+', ' ', hval) + return hname, hval + + +def get_parts_from_header(hstr: str) -> dict: + hstr = re.sub(r'\s*', '', hstr) + hdata = dict() + for chunk in hstr.split(';'): + parts = chunk.split('=', 1) + if len(parts) < 2: + continue + hdata[parts[0]] = parts[1] + return hdata + + +def dkim_get_txt(name: bytes, timeout: int = 5): + lookup = name.decode() + logger.debug('DNS-lookup: %s', lookup) + try: + a = _resolver.resolve(lookup, dns.rdatatype.TXT, raise_on_no_answer=False, lifetime=timeout, search=True) + # Find v=DKIM1 + for r in a.response.answer: + if r.rdtype == dns.rdatatype.TXT: + for item in r.items: + # Concatenate all strings + txtdata = b''.join(item.strings) + if txtdata.find(b'v=DKIM1') >= 0: + return txtdata + except dns.resolver.NXDOMAIN: + pass + return None diff --git a/requirements.txt b/requirements.txt index f229360..11d400d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,7 @@ -requests +requests ~= 2.24.0 +# These are optional, needed for attestation features +dnspython~=2.0.0 +dkimpy~=1.0.5 +# These may be required in the future for other patch attestation features +#pycryptodomex~=3.9.9 +#PyNaCl~=1.4.0 |