aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--b4/__init__.py468
-rw-r--r--requirements.txt8
2 files changed, 352 insertions, 124 deletions
diff --git a/b4/__init__.py b/b4/__init__.py
index a0fa086..c552a5f 100644
--- a/b4/__init__.py
+++ b/b4/__init__.py
@@ -29,8 +29,16 @@ from email import charset
charset.add_charset('utf-8', None)
emlpolicy = email.policy.EmailPolicy(utf8=True, cte_type='8bit', max_line_length=None)
+try:
+ import dns.resolver
+ import dkim
+
+ can_dkim_verify = True
+ _resolver = dns.resolver.get_default_resolver()
+except ModuleNotFoundError:
+ can_dkim_verify = False
+
__VERSION__ = '0.6.0-dev'
-ATTESTATION_FORMAT_VER = '0.1'
logger = logging.getLogger('b4')
@@ -38,9 +46,14 @@ HUNK_RE = re.compile(r'^@@ -\d+(?:,(\d+))? \+\d+(?:,(\d+))? @@')
FILENAME_RE = re.compile(r'^(---|\+\+\+) (\S+)')
PASS_SIMPLE = '[P]'
+WEAK_SIMPLE = '[D]'
FAIL_SIMPLE = '[F]'
-PASS_FANCY = '[\033[32m\u2713\033[0m]'
-FAIL_FANCY = '[\033[31m\u2717\033[0m]'
+PASS_FANCY = '\033[32m\u2714\033[0m'
+WEAK_FANCY = '\033[33m\u2713\033[0m'
+FAIL_FANCY = '\033[31m\u2717\033[0m'
+
+HDR_PATCH_HASHES = 'X-Patch-Hashes'
+HDR_PATCH_SIG = 'X-Patch-Sig'
# You can use bash-style globbing here
WANTHDRS = [
@@ -480,22 +493,17 @@ class LoreSeries:
logger.critical('WARNING: Unable to add your Signed-off-by: git returned no user.name or user.email')
addmysob = False
- attdata = [None] * self.expected
+ attdata = [(None, None)] * self.expected
attpolicy = config['attestation-policy']
- try:
- attstaled = int(config['attestation-staleness-days'])
- except ValueError:
- attstaled = 30
- # exact_from_match = False
- # if config['attestation-uid-match'] == 'strict':
- # exact_from_match = True
if config['attestation-checkmarks'] == 'fancy':
attpass = PASS_FANCY
attfail = FAIL_FANCY
+ attweak = WEAK_FANCY
else:
attpass = PASS_SIMPLE
attfail = FAIL_SIMPLE
+ attweak = WEAK_SIMPLE
at = 1
atterrors = list()
@@ -508,6 +516,7 @@ class LoreSeries:
if lmsg is None:
logger.critical('CRITICAL: [%s/%s] is missing, cannot cherrypick', at, self.expected)
raise KeyError('Cherrypick not in series')
+
if lmsg is not None:
if self.has_cover and covertrailers and self.patches[0].followup_trailers:
lmsg.followup_trailers += self.patches[0].followup_trailers
@@ -521,17 +530,12 @@ class LoreSeries:
lmsg.load_hashes()
latt = lmsg.attestation
if latt and latt.validate(lmsg.msg):
- # Make sure it's not too old compared to the message date
- # Timezone doesn't matter as we calculate whole days
- tdelta = lmsg.date.replace(tzinfo=None) - latt.lsig.sigdate
- if tdelta.days > attstaled:
- # Uh-oh, attestation is too old!
- logger.info(' %s %s', attfail, lmsg.full_subject)
- atterrors.append('Attestation for %s/%s is over %sd old: %sd' % (at, lmsg.expected,
- attstaled, tdelta.days))
+ if latt.lsig.attestor and latt.lsig.attestor.mode == 'domain':
+ logger.info(' %s %s', attweak, lmsg.full_subject)
+ attdata[at-1] = (latt.lsig.attestor.get_trailer(lmsg.fromemail), attweak) # noqa
else:
logger.info(' %s %s', attpass, lmsg.full_subject)
- attdata[at - 1] = latt.lsig.attestor.get_trailer(lmsg.fromemail)
+ attdata[at-1] = (latt.lsig.attestor.get_trailer(lmsg.fromemail), attpass) # noqa
else:
if attpolicy in ('softfail', 'hardfail'):
logger.info(' %s %s', attfail, lmsg.full_subject)
@@ -561,12 +565,16 @@ class LoreSeries:
if attpolicy == 'off':
return mbx
- failed = None in attdata
+
+ failed = (None, None) in attdata
if not failed:
logger.info(' ---')
- for trailer in set(attdata):
- logger.info(' %s %s', attpass, trailer)
+ for trailer, attmode in set(attdata):
+ logger.info(' %s %s', attmode, trailer)
return mbx
+ elif not can_dkim_verify:
+ logger.info(' ---')
+ logger.info(' NOTE: install dkimpy for DKIM signature attestation.')
errors = set(atterrors)
for attdoc in ATTESTATIONS:
@@ -1452,6 +1460,27 @@ class LoreAttestor:
self.keyid = keyid
self.uids = list()
+ def __repr__(self):
+ out = list()
+ out.append(' keyid: %s' % self.keyid)
+ for uid in self.uids:
+ out.append(' uid: %s <%s>' % uid)
+ return '\n'.join(out)
+
+
+class LoreAttestorDKIM(LoreAttestor):
+ def __init__(self, keyid):
+ self.mode = 'domain'
+ super().__init__(keyid)
+
+ def get_trailer(self, fromaddr): # noqa
+ return 'Attestation-by: DKIM/%s (From: %s)' % (self.keyid, fromaddr)
+
+
+class LoreAttestorPGP(LoreAttestor):
+ def __init__(self, keyid):
+ super().__init__(keyid)
+ self.mode = 'person'
self.load_subkey_uids()
def load_subkey_uids(self):
@@ -1497,16 +1526,11 @@ class LoreAttestor:
return 'Attestation-by: %s <%s> (pgp: %s)' % (uid[0], uid[1], self.keyid)
- def __repr__(self):
- out = list()
- out.append(' keyid: %s' % self.keyid)
- for uid in self.uids:
- out.append(' uid: %s <%s>' % uid)
- return '\n'.join(out)
-
class LoreAttestationSignature:
- def __init__(self, output, trustmodel):
+ def __init__(self, msg):
+ self.msg = msg
+ self.mode = None
self.good = False
self.valid = False
self.trusted = False
@@ -1515,19 +1539,232 @@ class LoreAttestationSignature:
self.attestor = None
self.errors = set()
+ config = get_main_config()
+ try:
+ driftd = int(config['attestation-staleness-days'])
+ except ValueError:
+ driftd = 30
+
+ self.maxdrift = datetime.timedelta(days=driftd)
+
+ def verify_time_drift(self) -> None:
+ msgdt = email.utils.parsedate_to_datetime(str(self.msg['Date']))
+ sdrift = self.sigdate - msgdt
+ if sdrift > self.maxdrift:
+ self.passing = False
+ self.errors.add('Time drift between Date and t too great (%s)' % sdrift)
+ return
+ logger.debug('PASS : time drift between Date and t (%s)', sdrift)
+
+ def verify_identity_domain(self, identity: str, domain: str):
+ # Domain is supposed to be present in identity
+ if not identity.endswith(domain):
+ logger.debug('domain (d=%s) is not in identity (i=%s)', domain, identity)
+ self.passing = False
+ return
+ fromeml = email.utils.getaddresses(self.msg.get_all('from', []))[0][1]
+ if identity.find('@') < 0:
+ logger.debug('identity must contain @ (i=%s)', identity)
+ self.passing = False
+ return
+ ilocal, idomain = identity.split('@')
+ # identity is supposed to be present in from
+ if not fromeml.endswith(f'@{idomain}'):
+ self.errors.add('identity (i=%s) does not match from (from=%s)' % (identity, fromeml))
+ self.passing = False
+ return
+ logger.debug('identity and domain match From header')
+
+ # @staticmethod
+ # def get_dkim_key(domain: str, selector: str, timeout: int = 5) -> Tuple[str, str]:
+ # global DNSCACHE
+ # if (domain, selector) in DNSCACHE:
+ # return DNSCACHE[(domain, selector)]
+ #
+ # name = f'{selector}._domainkey.{domain}.'
+ # logger.debug('DNS-lookup: %s', name)
+ # keydata = None
+ # try:
+ # a = dns.resolver.resolve(name, dns.rdatatype.TXT, raise_on_no_answer=False, lifetime=timeout) # noqa
+ # # Find v=DKIM1
+ # for r in a.response.answer:
+ # if r.rdtype == dns.rdatatype.TXT:
+ # for item in r.items:
+ # # Concatenate all strings
+ # txtdata = b''.join(item.strings)
+ # if txtdata.find(b'v=DKIM1') >= 0:
+ # keydata = txtdata.decode()
+ # break
+ # if keydata:
+ # break
+ # except dns.resolver.NXDOMAIN: # noqa
+ # raise LookupError('Domain %s does not exist', name)
+ #
+ # if not keydata:
+ # raise LookupError('Domain %s does not contain a DKIM record', name)
+ #
+ # parts = get_parts_from_header(keydata)
+ # if 'p' not in parts:
+ # raise LookupError('Domain %s does not contain a DKIM key', name)
+ # if 'k' not in parts:
+ # raise LookupError('Domain %s does not indicate key time', name)
+ #
+ # DNSCACHE[(domain, selector)] = (parts['k'], parts['p'])
+ # logger.debug('k=%s, p=%s', parts['k'], parts['p'])
+ # return parts['k'], parts['p']
+
+ def __repr__(self):
+ out = list()
+ out.append(' mode: %s' % self.mode)
+ out.append(' good: %s' % self.good)
+ out.append(' valid: %s' % self.valid)
+ out.append(' trusted: %s' % self.trusted)
+ if self.attestor is not None:
+ out.append(' attestor: %s' % self.attestor.keyid)
+
+ out.append(' --- validation errors ---')
+ for error in self.errors:
+ out.append(' | %s' % error)
+ return '\n'.join(out)
+
+
+class LoreAttestationSignatureDKIM(LoreAttestationSignature):
+ def __init__(self, msg):
+ super().__init__(msg)
+ self.mode = 'dkim'
+ # Doesn't quite work right, so just use dkimpy's native
+ # self.native_verify()
+ # return
+
+ if not dkim.verify(self.msg.as_bytes(), dnsfunc=dkim_get_txt):
+ return
+ self.good = True
+
+ # Grab toplevel signature that we just verified
+ dks = self.msg.get('dkim-signature')
+ ddata = get_parts_from_header(dks)
+ self.attestor = LoreAttestorDKIM(ddata['d'])
+ self.valid = True
+ self.trusted = True
+ self.passing = True
+
+ if ddata.get('t'):
+ self.sigdate = datetime.datetime.utcfromtimestamp(int(ddata['t'])).replace(tzinfo=datetime.timezone.utc)
+ else:
+ self.sigdate = email.utils.parsedate_to_datetime(str(self.msg['Date']))
+
+ # def native_verify(self):
+ # dks = self.msg.get('dkim-signature')
+ # ddata = get_parts_from_header(dks)
+ # try:
+ # kt, kp = LoreAttestationSignature.get_dkim_key(ddata['d'], ddata['s'])
+ # if kt not in ('rsa',): # 'ed25519'):
+ # logger.debug('DKIM key type %s not supported', kt)
+ # return
+ # pk = base64.b64decode(kp)
+ # sig = base64.b64decode(ddata['b'])
+ # except (LookupError, binascii.Error) as ex:
+ # logger.debug('Unable to look up DKIM key: %s', ex)
+ # return
+ #
+ # headers = list()
+ #
+ # for header in ddata['h'].split(':'):
+ # # For the POC, we assume 'relaxed/'
+ # hval = self.msg.get(header)
+ # if hval is None:
+ # # Missing headers are omitted by the DKIM RFC
+ # continue
+ # if ddata['c'].startswith('relaxed/'):
+ # hname, hval = dkim_canonicalize_header(header, str(self.msg.get(header)))
+ # else:
+ # hname = header
+ # hval = str(self.msg.get(header))
+ # headers.append(f'{hname}:{hval}')
+ #
+ # # Now we add the dkim-signature header itself, without b= content
+ # if ddata['c'].startswith('relaxed/'):
+ # dname, dval = dkim_canonicalize_header('dkim-signature', dks)
+ # else:
+ # dname = 'DKIM-Signature'
+ # dval = dks
+ #
+ # dval = dval.rsplit('; b=')[0] + '; b='
+ # headers.append(f'{dname}:{dval}')
+ # payload = ('\r\n'.join(headers)).encode()
+ # key = RSA.import_key(pk)
+ # hashed = SHA256.new(payload)
+ # try:
+ # # noinspection PyTypeChecker
+ # pkcs1_15.new(key).verify(hashed, sig)
+ # except (ValueError, TypeError):
+ # logger.debug('DKIM signature did not verify')
+ # self.errors.add('The DKIM signature did NOT verify!')
+ # return
+ #
+ # self.good = True
+ # if not ddata.get('i'):
+ # ddata['i'] = '@' + ddata['d']
+ #
+ # logger.debug('PASS : DKIM signature for d=%s, s=%s', ddata['d'], ddata['s'])
+ #
+ # self.attestor = LoreAttestorDKIM(ddata['d'])
+ # self.valid = True
+ # self.trusted = True
+ # self.passing = True
+ #
+ # self.verify_identity_domain(ddata['i'], ddata['d'])
+ # if ddata.get('t'):
+ # self.sigdate = datetime.datetime.utcfromtimestamp(int(ddata['t'])).replace(tzinfo=datetime.timezone.utc)
+ # self.verify_time_drift()
+ # else:
+ # self.sigdate = email.utils.parsedate_to_datetime(str(self.msg['Date']))
+
+
+class LoreAttestationSignaturePGP(LoreAttestationSignature):
+ def __init__(self, msg):
+ super().__init__(msg)
+ self.mode = 'pgp'
+
+ shdr = msg.get(HDR_PATCH_SIG)
+ sdata = get_parts_from_header(shdr)
+ hhdr = msg.get(HDR_PATCH_HASHES)
+ sig = base64.b64decode(sdata['b'])
+ headers = list()
+ hhname, hhval = dkim_canonicalize_header(HDR_PATCH_HASHES, str(hhdr))
+ headers.append(f'{hhname}:{hhval}')
+ # Now we add the sig header itself, without b= content
+ shname, shval = dkim_canonicalize_header(HDR_PATCH_SIG, shdr)
+ shval = shval.rsplit('; b=')[0] + '; b='
+ headers.append(f'{shname}:{shval}')
+ payload = ('\r\n'.join(headers)).encode()
+ savefile = mkstemp('in-header-pgp-verify')[1]
+ with open(savefile, 'wb') as fh:
+ fh.write(sig)
+
+ gpgargs = list()
+ config = get_main_config()
+ trustmodel = config.get('attestation-trust-model', 'tofu')
+ if trustmodel == 'tofu':
+ gpgargs += ['--trust-model', 'tofu', '--tofu-default-policy', 'good']
+ gpgargs += ['--verify', '--status-fd=1', savefile, '-']
+ ecode, out, err = gpg_run_command(gpgargs, stdin=payload)
+ os.unlink(savefile)
+ output = out.decode()
+
gs_matches = re.search(r'^\[GNUPG:] GOODSIG ([0-9A-F]+)\s+.*$', output, re.M)
if gs_matches:
logger.debug(' GOODSIG')
self.good = True
keyid = gs_matches.groups()[0]
- self.attestor = LoreAttestor(keyid)
+ self.attestor = LoreAttestorPGP(keyid)
puid = '%s <%s>' % self.attestor.get_primary_uid()
vs_matches = re.search(r'^\[GNUPG:] VALIDSIG ([0-9A-F]+) (\d{4}-\d{2}-\d{2}) (\d+)', output, re.M)
if vs_matches:
logger.debug(' VALIDSIG')
self.valid = True
ymd = vs_matches.groups()[1]
- self.sigdate = datetime.datetime.strptime(ymd, '%Y-%m-%d')
+ self.sigdate = datetime.datetime.strptime(ymd, '%Y-%m-%d').replace(tzinfo=datetime.timezone.utc)
# Do we have a TRUST_(FULLY|ULTIMATE)?
ts_matches = re.search(r'^\[GNUPG:] TRUST_(FULLY|ULTIMATE)', output, re.M)
if ts_matches:
@@ -1545,18 +1782,9 @@ class LoreAttestationSignature:
if matches:
self.errors.add('Missing public key: %s' % matches.groups()[0])
- def __repr__(self):
- out = list()
- out.append(' good: %s' % self.good)
- out.append(' valid: %s' % self.valid)
- out.append(' trusted: %s' % self.trusted)
- if self.attestor is not None:
- out.append(' attestor: %s' % self.attestor.keyid)
-
- out.append(' --- validation errors ---')
- for error in self.errors:
- out.append(' | %s' % error)
- return '\n'.join(out)
+ # A couple of final verifications
+ self.verify_time_drift()
+ # XXX: Need to verify identity domain
class LoreAttestation:
@@ -1568,9 +1796,6 @@ class LoreAttestation:
self.mb = base64.b64encode(_m.digest()).decode()
self.pb = base64.b64encode(_p.digest()).decode()
- self.hashes_header_name = 'X-Patch-Hashes'
- self.sig_header_name = 'X-Patch-Sig'
-
self.lsig = None
self.passing = False
self.iv = False
@@ -1592,90 +1817,50 @@ class LoreAttestation:
out.append(' iv: %s' % self.iv)
out.append(' mv: %s' % self.mv)
out.append(' pv: %s' % self.pv)
+ out.append(' pass: %s' % self.passing)
return '\n'.join(out)
- @staticmethod
- def verify_identity_domain(msg, identity: str, domain: str) -> bool:
- # Domain is supposed to be present in identity
- if not identity.endswith(domain):
- logger.debug('domain (d=%s) is not in identity (i=%s)', domain, identity)
- return False
- fromeml = email.utils.getaddresses(msg.get_all('from', []))[0][1]
- if identity.find('@') < 0:
- logger.debug('identity must contain @ (i=%s)', identity)
- return False
- ilocal, idomain = identity.split('@')
- # identity is supposed to be present in from
- if not fromeml.endswith(f'@{idomain}'):
- logger.debug('identity (i=%s) does not match from (from=%s)', identity, fromeml)
- return False
- logger.debug('identity and domain match From header')
- return True
-
- @staticmethod
- def get_gpg_attestation(smsg: bytes, dsig: bytes) -> LoreAttestationSignature:
- # We can't pass both the detached sig and the content on stdin, so
- # use a temporary file
- savefile = mkstemp('in-header-pgp-verify')[1]
- with open(savefile, 'wb') as fh:
- fh.write(dsig)
-
- gpgargs = list()
- config = get_main_config()
- if config['attestation-trust-model'] == 'tofu':
- gpgargs += ['--trust-model', 'tofu', '--tofu-default-policy', 'good']
- gpgargs += ['--verify', '--status-fd=1', savefile, '-']
- ecode, out, err = gpg_run_command(gpgargs, stdin=smsg)
- os.unlink(savefile)
- return LoreAttestationSignature(out.decode(), config['attestation-trust-model'])
-
- @staticmethod
- def dkim_canonicalize_header(hname, hval):
- hname = hname.lower()
- hval = hval.strip()
- hval = re.sub(r'\n', '', hval)
- hval = re.sub(r'\s+', ' ', hval)
- return hname, hval
-
- @staticmethod
- def get_parts_from_header(hstr: str) -> dict:
- hstr = re.sub(r'\s*', '', hstr)
- hdata = dict()
- for chunk in hstr.split(';'):
- parts = chunk.split('=', 1)
- if len(parts) < 2:
- continue
- hdata[parts[0]] = parts[1]
- return hdata
-
def validate(self, msg):
- shdr = msg.get(self.sig_header_name)
+ # Check if we have a X-Patch-Sig header. At this time, we only support two modes:
+ # - GPG mode, which we check for fist
+ # - Plain DKIM mode, which we check as fall-back
+ # More modes may be coming in the future, depending on feedback.
+ shdr = msg.get(HDR_PATCH_SIG)
+ hhdr = msg.get(HDR_PATCH_HASHES)
+ if hhdr is None:
+ # Do we have a dkim signature header?
+ if msg.get('DKIM-Signature'):
+ if can_dkim_verify:
+ self.lsig = LoreAttestationSignatureDKIM(msg)
+ if self.lsig.passing:
+ self.passing = True
+ self.iv = True
+ self.mv = True
+ self.pv = True
+ return self.passing
+ return None
+
if shdr is None:
return None
- sdata = LoreAttestation.get_parts_from_header(shdr)
- sig = base64.b64decode(sdata['b'])
- headers = list()
- hhname, hhval = LoreAttestation.dkim_canonicalize_header(self.hashes_header_name,
- str(msg.get(self.hashes_header_name)))
- headers.append(f'{hhname}:{hhval}')
- # Now we add the sig header itself, without b= content
- shname, shval = LoreAttestation.dkim_canonicalize_header(self.sig_header_name, shdr)
- shval = shval.rsplit('; b=')[0] + '; b='
- headers.append(f'{shname}:{shval}')
- payload = ('\r\n'.join(headers)).encode()
- self.lsig = LoreAttestation.get_gpg_attestation(payload, sig)
- if self.lsig.passing:
- hdata = LoreAttestation.get_parts_from_header(hhval)
- if hdata['i'] == self.ib:
- self.iv = True
- if hdata['m'] == self.mb:
- self.mv = True
- if hdata['p'] == self.pb:
- self.pv = True
+
+ sdata = get_parts_from_header(shdr)
+ if sdata.get('m') == 'pgp':
+ self.lsig = LoreAttestationSignaturePGP(msg)
+ if self.lsig.passing:
+ hdata = get_parts_from_header(hhdr)
+ if hdata['i'] == self.ib:
+ self.iv = True
+ if hdata['m'] == self.mb:
+ self.mv = True
+ if hdata['p'] == self.pb:
+ self.pv = True
if self.iv and self.mv and self.pv:
self.passing = True
+ if self.lsig is None:
+ return None
+
return self.passing
@@ -2112,3 +2297,40 @@ def parse_int_range(intrange, upper=None):
yield from range(int(nr[0]), upper+1)
else:
logger.critical('Unknown range value specified: %s', n)
+
+
+def dkim_canonicalize_header(hname, hval):
+ hname = hname.lower()
+ hval = hval.strip()
+ hval = re.sub(r'\n', '', hval)
+ hval = re.sub(r'\s+', ' ', hval)
+ return hname, hval
+
+
+def get_parts_from_header(hstr: str) -> dict:
+ hstr = re.sub(r'\s*', '', hstr)
+ hdata = dict()
+ for chunk in hstr.split(';'):
+ parts = chunk.split('=', 1)
+ if len(parts) < 2:
+ continue
+ hdata[parts[0]] = parts[1]
+ return hdata
+
+
+def dkim_get_txt(name: bytes, timeout: int = 5):
+ lookup = name.decode()
+ logger.debug('DNS-lookup: %s', lookup)
+ try:
+ a = _resolver.resolve(lookup, dns.rdatatype.TXT, raise_on_no_answer=False, lifetime=timeout, search=True)
+ # Find v=DKIM1
+ for r in a.response.answer:
+ if r.rdtype == dns.rdatatype.TXT:
+ for item in r.items:
+ # Concatenate all strings
+ txtdata = b''.join(item.strings)
+ if txtdata.find(b'v=DKIM1') >= 0:
+ return txtdata
+ except dns.resolver.NXDOMAIN:
+ pass
+ return None
diff --git a/requirements.txt b/requirements.txt
index f229360..11d400d 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1 +1,7 @@
-requests
+requests ~= 2.24.0
+# These are optional, needed for attestation features
+dnspython~=2.0.0
+dkimpy~=1.0.5
+# These may be required in the future for other patch attestation features
+#pycryptodomex~=3.9.9
+#PyNaCl~=1.4.0