aboutsummaryrefslogtreecommitdiff
path: root/b4/__init__.py
diff options
context:
space:
mode:
authorKonstantin Ryabitsev <konstantin@linuxfoundation.org>2020-10-02 21:03:25 -0400
committerKonstantin Ryabitsev <konstantin@linuxfoundation.org>2020-10-02 21:03:25 -0400
commit0d759ee9113b371a678c4acafffafa79570e9511 (patch)
tree5f2b074ea2c81208802c1db1efef29c0a1c438b1 /b4/__init__.py
parentca6d35e7728c17b505e6be62ec3b6687aa5bf26b (diff)
downloadb4-0d759ee9113b371a678c4acafffafa79570e9511.tar.gz
Reimplement attestation for in-header hashes
Rewrite attestation to implement in-header hashing and signing. For now, just implementing mode=pgp, but other modes are coming next. Signed-off-by: Konstantin Ryabitsev <konstantin@linuxfoundation.org>
Diffstat (limited to 'b4/__init__.py')
-rw-r--r--b4/__init__.py408
1 files changed, 164 insertions, 244 deletions
diff --git a/b4/__init__.py b/b4/__init__.py
index dc393a9..9cd1f87 100644
--- a/b4/__init__.py
+++ b/b4/__init__.py
@@ -19,6 +19,7 @@ import time
import shutil
import mailbox
import pwd
+import base64
from pathlib import Path
from tempfile import mkstemp, TemporaryDirectory
@@ -353,12 +354,6 @@ class LoreMailbox:
self.covers[lmsg.revision] = lmsg
return
- if re.search(r'^Comment: att-fmt-ver:', lmsg.body, re.I | re.M):
- logger.debug('Found attestation message')
- LoreAttestationDocument.load_from_string(lmsg.msgid, lmsg.body)
- # We don't keep it, because it's not useful for us beyond this point
- return
-
if lmsg.has_diff:
if lmsg.revision not in self.series:
if lmsg.revision_inferred and lmsg.in_reply_to:
@@ -490,9 +485,9 @@ class LoreSeries:
attstaled = int(config['attestation-staleness-days'])
except ValueError:
attstaled = 30
- exact_from_match = False
- if config['attestation-uid-match'] == 'strict':
- exact_from_match = True
+ # exact_from_match = False
+ # if config['attestation-uid-match'] == 'strict':
+ # exact_from_match = True
if config['attestation-checkmarks'] == 'fancy':
attpass = PASS_FANCY
@@ -522,50 +517,35 @@ class LoreSeries:
lmsg.followup_trailers.append(('Link', linkmask % lmsg.msgid, None))
if attpolicy != 'off':
- lore_lookup = False
- if at == 1:
- # We only hit lore on the first patch
- lore_lookup = True
- attdoc = lmsg.get_attestation(lore_lookup=lore_lookup, exact_from_match=exact_from_match)
- if attdoc is None:
+ lmsg.load_hashes()
+ latt = lmsg.attestation
+ latt.validate(lmsg.msg)
+ if latt.passing:
+ # Make sure it's not too old compared to the message date
+ # Timezone doesn't matter as we calculate whole days
+ tdelta = lmsg.date.replace(tzinfo=None) - latt.lsig.sigdate
+ if tdelta.days > attstaled:
+ # Uh-oh, attestation is too old!
+ logger.info(' %s %s', attfail, lmsg.full_subject)
+ atterrors.append('Attestation for %s/%s is over %sd old: %sd' % (at, lmsg.expected,
+ attstaled, tdelta.days))
+ else:
+ logger.info(' %s %s', attpass, lmsg.full_subject)
+ attdata[at - 1] = latt.lsig.attestor.get_trailer(lmsg.fromemail)
+ else:
if attpolicy in ('softfail', 'hardfail'):
logger.info(' %s %s', attfail, lmsg.full_subject)
- # Which part failed?
- fi = fm = fp = True
- for attdoc in ATTESTATIONS:
- for i, m, p in attdoc.hashes:
- if p == lmsg.attestation.p:
- fp = False
- if m == lmsg.attestation.m:
- fm = False
- if i == lmsg.attestation.i:
- fi = False
failed = list()
- if fp:
+ if not latt.pv:
failed.append('patch content')
- if fm:
+ if not latt.pm:
failed.append('commit message')
- if fi:
+ if not latt.pi:
failed.append('patch metadata')
atterrors.append('Patch %s/%s failed attestation (%s)' % (at, lmsg.expected,
', '.join(failed)))
else:
logger.info(' %s', lmsg.full_subject)
- else:
- if attpolicy == 'check':
- # switch to softfail policy now that we have at least one hit
- attpolicy = 'softfail'
- # Make sure it's not too old compared to the message date
- # Timezone doesn't matter as we calculate whole days
- tdelta = lmsg.date.replace(tzinfo=None) - attdoc.lsig.sigdate
- if tdelta.days > attstaled:
- # Uh-oh, attestation is too old!
- logger.info(' %s %s', attfail, lmsg.full_subject)
- atterrors.append('Attestation for %s/%s is over %sd old: %sd' % (at, lmsg.expected,
- attstaled, tdelta.days))
- else:
- logger.info(' %s %s', attpass, lmsg.full_subject)
- attdata[at-1] = attdoc.lsig.attestor.get_trailer(lmsg.fromemail)
else:
logger.info(' %s', lmsg.full_subject)
@@ -803,6 +783,8 @@ class LoreMessage:
self.attestation = None
# Patchwork hash
self.pwhash = None
+ # Git patch-id
+ self.git_patch_id = None
# Blob indexes
self.blob_indexes = None
@@ -1140,42 +1122,53 @@ class LoreMessage:
if self.attestation is not None:
return
logger.debug('Calculating hashes for: %s', self.full_subject)
+ # Calculate git-patch-id first
+ cmdargs = ['patch-id', '--stable']
+ stdin = self.msg.as_string(policy=emlpolicy).encode()
+ ecode, out = git_run_command(None, cmdargs, stdin)
+ if ecode > 0:
+ # Git doesn't think there's a patch there
+ return
+ fline = out.split('\n')[0]
+ if len(fline) < 40:
+ # Not sure what that is
+ return
+ self.git_patch_id = fline[:40]
+
msg_out = mkstemp()
patch_out = mkstemp()
cmdargs = ['mailinfo', '--encoding=UTF-8', msg_out[1], patch_out[1]]
- emlout = self.msg.as_string(policy=emlpolicy)
- ecode, info = git_run_command(None, cmdargs, emlout.encode('utf-8'))
+ ecode, info = git_run_command(None, cmdargs, stdin)
if ecode > 0:
logger.debug('ERROR: Could not get mailinfo')
return
- ihasher = hashlib.sha256()
+ i = hashlib.sha256()
+ m = hashlib.sha256()
+ p = hashlib.sha256()
for line in info.split('\n'):
# We don't use the "Date:" field because it is likely to be
# mangled between when git-format-patch generates it and
# when it is sent out by git-send-email (or other tools).
if re.search(r'^(Author|Email|Subject):', line):
- ihasher.update((line + '\n').encode('utf-8'))
- i = ihasher.hexdigest()
+ i.update((line + '\n').encode())
- with open(msg_out[1], 'r') as mfh:
+ with open(msg_out[1], 'rb') as mfh:
msg = mfh.read()
- mhasher = hashlib.sha256()
- mhasher.update(msg.encode('utf-8'))
- m = mhasher.hexdigest()
+ m.update(msg)
os.unlink(msg_out[1])
- p = None
with open(patch_out[1], 'rb') as pfh:
patch = pfh.read().decode(self.charset, errors='replace')
if len(patch.strip()):
diff = LoreMessage.get_clean_diff(patch)
- phasher = hashlib.sha256()
- phasher.update(diff.encode('utf-8'))
- p = phasher.hexdigest()
+ p.update(diff.encode())
self.pwhash = LoreMessage.get_patchwork_hash(patch)
# Load the indexes, if we have them
self.blob_indexes = LoreMessage.get_indexes(diff)
+ else:
+ p = None
+
os.unlink(patch_out[1])
if i and m and p:
@@ -1367,30 +1360,6 @@ class LoreMessage:
am_msg.set_charset('utf-8')
return am_msg
- def _load_attestation(self, lore_lookup=True):
- self.load_hashes()
- if self.attestation:
- self.attestation.validate(lore_lookup=lore_lookup)
-
- def get_attestation(self, lore_lookup=True, exact_from_match=True):
- self._load_attestation(lore_lookup=lore_lookup)
- if not self.attestation or not self.attestation.passing:
- return None
-
- for attdoc in self.attestation.attdocs:
- if not exact_from_match:
- # We return the first hit
- return attdoc
- # Does this doc have an exact match?
- uid = attdoc.lsig.attestor.get_matching_uid(self.fromemail)
- if uid[1] == self.fromemail:
- return attdoc
- # stick an error in the first available attdoc saying
- # that exact from match failed
- self.attestation.attdocs[0].errors.add('Exact UID match failed for %s' % self.fromemail)
-
- return None
-
class LoreSubject:
def __init__(self, subject):
@@ -1488,11 +1457,13 @@ class LoreAttestor:
global SUBKEY_DATA
if self.keyid not in SUBKEY_DATA:
gpgargs = ['--with-colons', '--list-keys', self.keyid]
- ecode, keyinfo = gpg_run_command(gpgargs)
+ ecode, out, err = gpg_run_command(gpgargs)
if ecode > 0:
logger.critical('ERROR: Unable to get UIDs list matching key %s', self.keyid)
return
+ keyinfo = out.decode()
+
uids = list()
for line in keyinfo.split('\n'):
if line[:4] != 'uid:':
@@ -1538,6 +1509,7 @@ class LoreAttestationSignature:
self.good = False
self.valid = False
self.trusted = False
+ self.passing = False
self.sigdate = None
self.attestor = None
self.errors = set()
@@ -1560,6 +1532,7 @@ class LoreAttestationSignature:
if ts_matches:
logger.debug(' TRUST_%s', ts_matches.groups()[0])
self.trusted = True
+ self.passing = True
else:
self.errors.add('Insufficient trust (model=%s): %s (%s)'
% (trustmodel, keyid, puid))
@@ -1585,176 +1558,121 @@ class LoreAttestationSignature:
return '\n'.join(out)
-class LoreAttestationDocument:
- def __init__(self, source, sigdata):
- self.source = source
- self.lsig = None
- self.passing = False
- self.hashes = set()
- self.errors = set()
-
- gpgargs = ['--verify', '--status-fd=1']
- config = get_main_config()
- if config['attestation-trust-model'] == 'tofu':
- gpgargs += ['--trust-model', 'tofu', '--tofu-default-policy', 'good']
-
- logger.debug('Validating document obtained from %s', self.source)
- ecode, output = gpg_run_command(gpgargs, stdin=sigdata.encode('utf-8'))
- self.lsig = LoreAttestationSignature(output, config['attestation-trust-model'])
- self.errors.update(self.lsig.errors)
+class LoreAttestation:
+ def __init__(self, _i, _m, _p):
+ self.i = _i.hexdigest()
+ self.m = _m.hexdigest()
+ self.p = _p.hexdigest()
+ self.ib = base64.b64encode(_i.digest()).decode()
+ self.mb = base64.b64encode(_m.digest()).decode()
+ self.pb = base64.b64encode(_p.digest()).decode()
- if self.lsig.good and self.lsig.valid and self.lsig.trusted:
- self.passing = True
- else:
- # Not going any further
- return
+ self.hashes_header_name = 'X-Patch-Hashes'
+ self.sig_header_name = 'X-Patch-Sig'
- if source.find('http') == 0:
- # We only cache known-good attestations obtained from remote
- save_cache(sigdata, source, suffix='attestation')
-
- hg = [None, None, None]
- for line in sigdata.split('\n'):
- # It's a yaml document, but we don't parse it as yaml for safety reasons
- line = line.rstrip()
- if re.search(r'^([0-9a-f-]{26}:|-----BEGIN.*)$', line):
- if None not in hg:
- self.hashes.add(tuple(hg))
- hg = [None, None, None]
- continue
- matches = re.search(r'^\s+([imp]):\s*([0-9a-f]{64})$', line)
- if matches:
- t, v = matches.groups()
- if t == 'i':
- hg[0] = v
- elif t == 'm':
- hg[1] = v
- elif t == 'p':
- hg[2] = v
+ self.lsig = None
+ self.passing = False
+ self.iv = False
+ self.mv = False
+ self.pv = False
def __repr__(self):
out = list()
- out.append(' source: %s' % self.source)
- out.append(' --- validation errors ---')
- for error in self.errors:
- out.append(' | %s' % error)
- out.append(' --- hashes ---')
- for hg in self.hashes:
- out.append(' | %s-%s-%s' % (hg[0][:8], hg[1][:8], hg[2][:8]))
- ret = '\n'.join(out) + '\n' + str(self.lsig)
- return ret
+ out.append(' i: %s' % self.i)
+ out.append(' m: %s' % self.m)
+ out.append(' p: %s' % self.p)
+ out.append(' ib: %s' % self.ib)
+ out.append(' mb: %s' % self.mb)
+ out.append(' pb: %s' % self.pb)
+ out.append(' iv: %s' % self.iv)
+ out.append(' mv: %s' % self.mv)
+ out.append(' pv: %s' % self.pv)
+ return '\n'.join(out)
@staticmethod
- def get_from_cache(attid):
- cachedir = get_cache_dir()
- attdocs = list()
- for entry in os.listdir(cachedir):
- if entry.find('.attestation') <= 0:
- continue
- fullpath = os.path.join(cachedir, entry)
- with open(fullpath, 'r') as fh:
- content = fh.read()
- # Can't be 0, because it has to have pgp ascii wrapper
- if content.find(attid) > 0:
- attdoc = LoreAttestationDocument(fullpath, content)
- attdocs.append(attdoc)
- return attdocs
+ def verify_identity_domain(msg, identity: str, domain: str) -> bool:
+ # Domain is supposed to be present in identity
+ if not identity.endswith(domain):
+ logger.debug('domain (d=%s) is not in identity (i=%s)', domain, identity)
+ return False
+ fromeml = email.utils.getaddresses(msg.get_all('from', []))[0][1]
+ if identity.find('@') < 0:
+ logger.debug('identity must contain @ (i=%s)', identity)
+ return False
+ ilocal, idomain = identity.split('@')
+ # identity is supposed to be present in from
+ if not fromeml.endswith(f'@{idomain}'):
+ logger.debug('identity (i=%s) does not match from (from=%s)', identity, fromeml)
+ return False
+ logger.debug('identity and domain match From header')
+ return True
@staticmethod
- def get_from_lore(attid):
- attdocs = list()
- # XXX: Querying this via the Atom feed is a temporary kludge until we have
- # proper search API on lore.kernel.org
- status = None
- cachedata = get_cache(attid, suffix='lookup')
- if cachedata:
- try:
- status = int(cachedata)
- except ValueError:
- pass
- if status is not None and status != 200:
- logger.debug('Cache says looking up %s = %s', attid, status)
- return attdocs
-
+ def get_gpg_attestation(smsg: bytes, dsig: bytes) -> LoreAttestationSignature:
+ # We can't pass both the detached sig and the content on stdin, so
+ # use a temporary file
+ savefile = mkstemp('in-header-pgp-verify')[1]
+ with open(savefile, 'wb') as fh:
+ fh.write(dsig)
+
+ gpgargs = list()
config = get_main_config()
- queryurl = '%s?%s' % (config['attestation-query-url'],
- urllib.parse.urlencode({'q': attid, 'x': 'A', 'o': '-1'}))
- logger.debug('Query URL: %s', queryurl)
- session = get_requests_session()
- resp = session.get(queryurl)
- if resp.status_code != 200:
- # Record this as a bad hit
- save_cache(str(resp.status_code), attid, suffix='lookup')
-
- matches = re.findall(
- r'link\s+href="([^"]+)".*?(-----BEGIN PGP SIGNED MESSAGE-----.*?-----END PGP SIGNATURE-----)',
- resp.content.decode('utf-8'), flags=re.DOTALL
- )
-
- if matches:
- for link, sigdata in matches:
- attdoc = LoreAttestationDocument(link, sigdata)
- attdocs.append(attdoc)
-
- return attdocs
+ if config['attestation-trust-model'] == 'tofu':
+ gpgargs += ['--trust-model', 'tofu', '--tofu-default-policy', 'good']
+ gpgargs += ['--verify', '--status-fd=1', savefile, '-']
+ ecode, out, err = gpg_run_command(gpgargs, stdin=smsg)
+ os.unlink(savefile)
+ return LoreAttestationSignature(out.decode(), config['attestation-trust-model'])
@staticmethod
- def load_from_file(afile):
- global ATTESTATIONS
- with open(afile, 'r') as fh:
- sigdata = fh.read()
- ATTESTATIONS.append(LoreAttestationDocument(afile, sigdata))
+ def dkim_canonicalize_header(hname, hval):
+ hname = hname.lower()
+ hval = hval.strip()
+ hval = re.sub(r'\n', '', hval)
+ hval = re.sub(r'\s+', ' ', hval)
+ return hname, hval
@staticmethod
- def load_from_string(source, content):
- global ATTESTATIONS
- ATTESTATIONS.append(LoreAttestationDocument(source, content))
-
-
-class LoreAttestation:
- def __init__(self, i, m, p):
- self.attid = '%s-%s-%s' % (i[:8], m[:8], p[:8])
- self.i = i
- self.m = m
- self.p = p
- self.passing = False
- self.attdocs = list()
+ def get_parts_from_header(hstr: str) -> dict:
+ hstr = re.sub(r'\s*', '', hstr)
+ hdata = dict()
+ for chunk in hstr.split(';'):
+ parts = chunk.split('=', 1)
+ if len(parts) < 2:
+ continue
+ hdata[parts[0]] = parts[1]
+ return hdata
- def _check_if_passing(self):
- global ATTESTATIONS
- hg = (self.i, self.m, self.p)
- for attdoc in ATTESTATIONS:
- if hg in attdoc.hashes and attdoc.passing:
+ def validate(self, msg):
+ shdr = msg.get(self.sig_header_name)
+ if shdr is None:
+ return None
+ sdata = LoreAttestation.get_parts_from_header(shdr)
+ sig = base64.b64decode(sdata['b'])
+ headers = list()
+ hhname, hhval = LoreAttestation.dkim_canonicalize_header(self.hashes_header_name,
+ str(msg.get(self.hashes_header_name)))
+ headers.append(f'{hhname}:{hhval}')
+ # Now we add the sig header itself, without b= content
+ shname, shval = LoreAttestation.dkim_canonicalize_header(self.sig_header_name, shdr)
+ shval = shval.rsplit('; b=')[0] + '; b='
+ headers.append(f'{shname}:{shval}')
+ payload = ('\r\n'.join(headers)).encode()
+ self.lsig = LoreAttestation.get_gpg_attestation(payload, sig)
+ if self.lsig.passing:
+ hdata = LoreAttestation.get_parts_from_header(hhval)
+ if hdata['i'] == self.ib:
+ self.iv = True
+ if hdata['m'] == self.mb:
+ self.mv = True
+ if hdata['p'] == self.pb:
+ self.pv = True
+
+ if self.iv and self.mv and self.pv:
self.passing = True
- self.attdocs.append(attdoc)
-
- def validate(self, lore_lookup=True):
- global ATTESTATIONS
- self._check_if_passing()
-
- if not len(self.attdocs):
- attdocs = LoreAttestationDocument.get_from_cache(self.attid)
- ATTESTATIONS += attdocs
- self._check_if_passing()
- if not len(self.attdocs) and lore_lookup:
- attdocs = LoreAttestationDocument.get_from_lore(self.attid)
- ATTESTATIONS += attdocs
- self._check_if_passing()
- def __repr__(self):
- out = list()
- out.append(' attid: %s' % self.attid)
- out.append(' i: %s' % self.i)
- out.append(' m: %s' % self.m)
- out.append(' p: %s' % self.p)
- out.append(' --- attdocs ---')
- for attdoc in self.attdocs:
- out.append(str(attdoc))
- return '\n'.join(out)
-
-
-def _run_command(cmdargs, stdin=None, logstderr=False):
+def _run_command(cmdargs, stdin=None):
logger.debug('Running %s' % ' '.join(cmdargs))
sp = subprocess.Popen(cmdargs,
@@ -1764,24 +1682,17 @@ def _run_command(cmdargs, stdin=None, logstderr=False):
(output, error) = sp.communicate(input=stdin)
- output = output.decode('utf-8', errors='replace')
-
- if logstderr and len(error.strip()):
- errout = error.decode('utf-8', errors='replace')
- logger.debug('Stderr: %s', errout)
- output += errout
+ return sp.returncode, output, error
- return sp.returncode, output
-
-def gpg_run_command(args, stdin=None, logstderr=False):
+def gpg_run_command(args, stdin=None):
config = get_main_config()
cmdargs = [config['gpgbin'], '--batch', '--no-auto-key-retrieve', '--no-auto-check-trustdb']
if config['attestation-gnupghome'] is not None:
cmdargs += ['--homedir', config['attestation-gnupghome']]
cmdargs += args
- return _run_command(cmdargs, stdin=stdin, logstderr=logstderr)
+ return _run_command(cmdargs, stdin=stdin)
def git_run_command(gitdir, args, stdin=None, logstderr=False):
@@ -1792,7 +1703,16 @@ def git_run_command(gitdir, args, stdin=None, logstderr=False):
cmdargs += ['--git-dir', gitdir]
cmdargs += args
- return _run_command(cmdargs, stdin=stdin, logstderr=logstderr)
+ ecode, out, err = _run_command(cmdargs, stdin=stdin)
+
+ out = out.decode(errors='replace')
+
+ if logstderr and len(err.strip()):
+ err = err.decode(errors='replace')
+ logger.debug('Stderr: %s', err)
+ out += err
+
+ return ecode, out
def git_get_command_lines(gitdir, args):