aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKonstantin Ryabitsev <konstantin@linuxfoundation.org>2020-10-02 21:03:25 -0400
committerKonstantin Ryabitsev <konstantin@linuxfoundation.org>2020-10-02 21:03:25 -0400
commit0d759ee9113b371a678c4acafffafa79570e9511 (patch)
tree5f2b074ea2c81208802c1db1efef29c0a1c438b1
parentca6d35e7728c17b505e6be62ec3b6687aa5bf26b (diff)
downloadb4-0d759ee9113b371a678c4acafffafa79570e9511.tar.gz
Reimplement attestation for in-header hashes
Rewrite attestation to implement in-header hashing and signing. For now, just implementing mode=pgp, but other modes are coming next. Signed-off-by: Konstantin Ryabitsev <konstantin@linuxfoundation.org>
-rwxr-xr-xb4-send-email.sh9
-rw-r--r--b4/__init__.py408
-rw-r--r--b4/attest.py344
-rw-r--r--b4/command.py15
4 files changed, 366 insertions, 410 deletions
diff --git a/b4-send-email.sh b/b4-send-email.sh
new file mode 100755
index 0000000..9dd4640
--- /dev/null
+++ b/b4-send-email.sh
@@ -0,0 +1,9 @@
+#!/usr/bin/env bash
+#
+# Wrapper for running b4-send-email from checkout
+#
+
+REAL_SCRIPT=$(realpath -e ${BASH_SOURCE[0]})
+SCRIPT_TOP="${SCRIPT_TOP:-$(dirname ${REAL_SCRIPT})}"
+
+exec env PYTHONPATH="${SCRIPT_TOP}" python3 "${SCRIPT_TOP}/b4/attest.py" "${@}"
diff --git a/b4/__init__.py b/b4/__init__.py
index dc393a9..9cd1f87 100644
--- a/b4/__init__.py
+++ b/b4/__init__.py
@@ -19,6 +19,7 @@ import time
import shutil
import mailbox
import pwd
+import base64
from pathlib import Path
from tempfile import mkstemp, TemporaryDirectory
@@ -353,12 +354,6 @@ class LoreMailbox:
self.covers[lmsg.revision] = lmsg
return
- if re.search(r'^Comment: att-fmt-ver:', lmsg.body, re.I | re.M):
- logger.debug('Found attestation message')
- LoreAttestationDocument.load_from_string(lmsg.msgid, lmsg.body)
- # We don't keep it, because it's not useful for us beyond this point
- return
-
if lmsg.has_diff:
if lmsg.revision not in self.series:
if lmsg.revision_inferred and lmsg.in_reply_to:
@@ -490,9 +485,9 @@ class LoreSeries:
attstaled = int(config['attestation-staleness-days'])
except ValueError:
attstaled = 30
- exact_from_match = False
- if config['attestation-uid-match'] == 'strict':
- exact_from_match = True
+ # exact_from_match = False
+ # if config['attestation-uid-match'] == 'strict':
+ # exact_from_match = True
if config['attestation-checkmarks'] == 'fancy':
attpass = PASS_FANCY
@@ -522,50 +517,35 @@ class LoreSeries:
lmsg.followup_trailers.append(('Link', linkmask % lmsg.msgid, None))
if attpolicy != 'off':
- lore_lookup = False
- if at == 1:
- # We only hit lore on the first patch
- lore_lookup = True
- attdoc = lmsg.get_attestation(lore_lookup=lore_lookup, exact_from_match=exact_from_match)
- if attdoc is None:
+ lmsg.load_hashes()
+ latt = lmsg.attestation
+ latt.validate(lmsg.msg)
+ if latt.passing:
+ # Make sure it's not too old compared to the message date
+ # Timezone doesn't matter as we calculate whole days
+ tdelta = lmsg.date.replace(tzinfo=None) - latt.lsig.sigdate
+ if tdelta.days > attstaled:
+ # Uh-oh, attestation is too old!
+ logger.info(' %s %s', attfail, lmsg.full_subject)
+ atterrors.append('Attestation for %s/%s is over %sd old: %sd' % (at, lmsg.expected,
+ attstaled, tdelta.days))
+ else:
+ logger.info(' %s %s', attpass, lmsg.full_subject)
+ attdata[at - 1] = latt.lsig.attestor.get_trailer(lmsg.fromemail)
+ else:
if attpolicy in ('softfail', 'hardfail'):
logger.info(' %s %s', attfail, lmsg.full_subject)
- # Which part failed?
- fi = fm = fp = True
- for attdoc in ATTESTATIONS:
- for i, m, p in attdoc.hashes:
- if p == lmsg.attestation.p:
- fp = False
- if m == lmsg.attestation.m:
- fm = False
- if i == lmsg.attestation.i:
- fi = False
failed = list()
- if fp:
+ if not latt.pv:
failed.append('patch content')
- if fm:
+ if not latt.pm:
failed.append('commit message')
- if fi:
+ if not latt.pi:
failed.append('patch metadata')
atterrors.append('Patch %s/%s failed attestation (%s)' % (at, lmsg.expected,
', '.join(failed)))
else:
logger.info(' %s', lmsg.full_subject)
- else:
- if attpolicy == 'check':
- # switch to softfail policy now that we have at least one hit
- attpolicy = 'softfail'
- # Make sure it's not too old compared to the message date
- # Timezone doesn't matter as we calculate whole days
- tdelta = lmsg.date.replace(tzinfo=None) - attdoc.lsig.sigdate
- if tdelta.days > attstaled:
- # Uh-oh, attestation is too old!
- logger.info(' %s %s', attfail, lmsg.full_subject)
- atterrors.append('Attestation for %s/%s is over %sd old: %sd' % (at, lmsg.expected,
- attstaled, tdelta.days))
- else:
- logger.info(' %s %s', attpass, lmsg.full_subject)
- attdata[at-1] = attdoc.lsig.attestor.get_trailer(lmsg.fromemail)
else:
logger.info(' %s', lmsg.full_subject)
@@ -803,6 +783,8 @@ class LoreMessage:
self.attestation = None
# Patchwork hash
self.pwhash = None
+ # Git patch-id
+ self.git_patch_id = None
# Blob indexes
self.blob_indexes = None
@@ -1140,42 +1122,53 @@ class LoreMessage:
if self.attestation is not None:
return
logger.debug('Calculating hashes for: %s', self.full_subject)
+ # Calculate git-patch-id first
+ cmdargs = ['patch-id', '--stable']
+ stdin = self.msg.as_string(policy=emlpolicy).encode()
+ ecode, out = git_run_command(None, cmdargs, stdin)
+ if ecode > 0:
+ # Git doesn't think there's a patch there
+ return
+ fline = out.split('\n')[0]
+ if len(fline) < 40:
+ # Not sure what that is
+ return
+ self.git_patch_id = fline[:40]
+
msg_out = mkstemp()
patch_out = mkstemp()
cmdargs = ['mailinfo', '--encoding=UTF-8', msg_out[1], patch_out[1]]
- emlout = self.msg.as_string(policy=emlpolicy)
- ecode, info = git_run_command(None, cmdargs, emlout.encode('utf-8'))
+ ecode, info = git_run_command(None, cmdargs, stdin)
if ecode > 0:
logger.debug('ERROR: Could not get mailinfo')
return
- ihasher = hashlib.sha256()
+ i = hashlib.sha256()
+ m = hashlib.sha256()
+ p = hashlib.sha256()
for line in info.split('\n'):
# We don't use the "Date:" field because it is likely to be
# mangled between when git-format-patch generates it and
# when it is sent out by git-send-email (or other tools).
if re.search(r'^(Author|Email|Subject):', line):
- ihasher.update((line + '\n').encode('utf-8'))
- i = ihasher.hexdigest()
+ i.update((line + '\n').encode())
- with open(msg_out[1], 'r') as mfh:
+ with open(msg_out[1], 'rb') as mfh:
msg = mfh.read()
- mhasher = hashlib.sha256()
- mhasher.update(msg.encode('utf-8'))
- m = mhasher.hexdigest()
+ m.update(msg)
os.unlink(msg_out[1])
- p = None
with open(patch_out[1], 'rb') as pfh:
patch = pfh.read().decode(self.charset, errors='replace')
if len(patch.strip()):
diff = LoreMessage.get_clean_diff(patch)
- phasher = hashlib.sha256()
- phasher.update(diff.encode('utf-8'))
- p = phasher.hexdigest()
+ p.update(diff.encode())
self.pwhash = LoreMessage.get_patchwork_hash(patch)
# Load the indexes, if we have them
self.blob_indexes = LoreMessage.get_indexes(diff)
+ else:
+ p = None
+
os.unlink(patch_out[1])
if i and m and p:
@@ -1367,30 +1360,6 @@ class LoreMessage:
am_msg.set_charset('utf-8')
return am_msg
- def _load_attestation(self, lore_lookup=True):
- self.load_hashes()
- if self.attestation:
- self.attestation.validate(lore_lookup=lore_lookup)
-
- def get_attestation(self, lore_lookup=True, exact_from_match=True):
- self._load_attestation(lore_lookup=lore_lookup)
- if not self.attestation or not self.attestation.passing:
- return None
-
- for attdoc in self.attestation.attdocs:
- if not exact_from_match:
- # We return the first hit
- return attdoc
- # Does this doc have an exact match?
- uid = attdoc.lsig.attestor.get_matching_uid(self.fromemail)
- if uid[1] == self.fromemail:
- return attdoc
- # stick an error in the first available attdoc saying
- # that exact from match failed
- self.attestation.attdocs[0].errors.add('Exact UID match failed for %s' % self.fromemail)
-
- return None
-
class LoreSubject:
def __init__(self, subject):
@@ -1488,11 +1457,13 @@ class LoreAttestor:
global SUBKEY_DATA
if self.keyid not in SUBKEY_DATA:
gpgargs = ['--with-colons', '--list-keys', self.keyid]
- ecode, keyinfo = gpg_run_command(gpgargs)
+ ecode, out, err = gpg_run_command(gpgargs)
if ecode > 0:
logger.critical('ERROR: Unable to get UIDs list matching key %s', self.keyid)
return
+ keyinfo = out.decode()
+
uids = list()
for line in keyinfo.split('\n'):
if line[:4] != 'uid:':
@@ -1538,6 +1509,7 @@ class LoreAttestationSignature:
self.good = False
self.valid = False
self.trusted = False
+ self.passing = False
self.sigdate = None
self.attestor = None
self.errors = set()
@@ -1560,6 +1532,7 @@ class LoreAttestationSignature:
if ts_matches:
logger.debug(' TRUST_%s', ts_matches.groups()[0])
self.trusted = True
+ self.passing = True
else:
self.errors.add('Insufficient trust (model=%s): %s (%s)'
% (trustmodel, keyid, puid))
@@ -1585,176 +1558,121 @@ class LoreAttestationSignature:
return '\n'.join(out)
-class LoreAttestationDocument:
- def __init__(self, source, sigdata):
- self.source = source
- self.lsig = None
- self.passing = False
- self.hashes = set()
- self.errors = set()
-
- gpgargs = ['--verify', '--status-fd=1']
- config = get_main_config()
- if config['attestation-trust-model'] == 'tofu':
- gpgargs += ['--trust-model', 'tofu', '--tofu-default-policy', 'good']
-
- logger.debug('Validating document obtained from %s', self.source)
- ecode, output = gpg_run_command(gpgargs, stdin=sigdata.encode('utf-8'))
- self.lsig = LoreAttestationSignature(output, config['attestation-trust-model'])
- self.errors.update(self.lsig.errors)
+class LoreAttestation:
+ def __init__(self, _i, _m, _p):
+ self.i = _i.hexdigest()
+ self.m = _m.hexdigest()
+ self.p = _p.hexdigest()
+ self.ib = base64.b64encode(_i.digest()).decode()
+ self.mb = base64.b64encode(_m.digest()).decode()
+ self.pb = base64.b64encode(_p.digest()).decode()
- if self.lsig.good and self.lsig.valid and self.lsig.trusted:
- self.passing = True
- else:
- # Not going any further
- return
+ self.hashes_header_name = 'X-Patch-Hashes'
+ self.sig_header_name = 'X-Patch-Sig'
- if source.find('http') == 0:
- # We only cache known-good attestations obtained from remote
- save_cache(sigdata, source, suffix='attestation')
-
- hg = [None, None, None]
- for line in sigdata.split('\n'):
- # It's a yaml document, but we don't parse it as yaml for safety reasons
- line = line.rstrip()
- if re.search(r'^([0-9a-f-]{26}:|-----BEGIN.*)$', line):
- if None not in hg:
- self.hashes.add(tuple(hg))
- hg = [None, None, None]
- continue
- matches = re.search(r'^\s+([imp]):\s*([0-9a-f]{64})$', line)
- if matches:
- t, v = matches.groups()
- if t == 'i':
- hg[0] = v
- elif t == 'm':
- hg[1] = v
- elif t == 'p':
- hg[2] = v
+ self.lsig = None
+ self.passing = False
+ self.iv = False
+ self.mv = False
+ self.pv = False
def __repr__(self):
out = list()
- out.append(' source: %s' % self.source)
- out.append(' --- validation errors ---')
- for error in self.errors:
- out.append(' | %s' % error)
- out.append(' --- hashes ---')
- for hg in self.hashes:
- out.append(' | %s-%s-%s' % (hg[0][:8], hg[1][:8], hg[2][:8]))
- ret = '\n'.join(out) + '\n' + str(self.lsig)
- return ret
+ out.append(' i: %s' % self.i)
+ out.append(' m: %s' % self.m)
+ out.append(' p: %s' % self.p)
+ out.append(' ib: %s' % self.ib)
+ out.append(' mb: %s' % self.mb)
+ out.append(' pb: %s' % self.pb)
+ out.append(' iv: %s' % self.iv)
+ out.append(' mv: %s' % self.mv)
+ out.append(' pv: %s' % self.pv)
+ return '\n'.join(out)
@staticmethod
- def get_from_cache(attid):
- cachedir = get_cache_dir()
- attdocs = list()
- for entry in os.listdir(cachedir):
- if entry.find('.attestation') <= 0:
- continue
- fullpath = os.path.join(cachedir, entry)
- with open(fullpath, 'r') as fh:
- content = fh.read()
- # Can't be 0, because it has to have pgp ascii wrapper
- if content.find(attid) > 0:
- attdoc = LoreAttestationDocument(fullpath, content)
- attdocs.append(attdoc)
- return attdocs
+ def verify_identity_domain(msg, identity: str, domain: str) -> bool:
+ # Domain is supposed to be present in identity
+ if not identity.endswith(domain):
+ logger.debug('domain (d=%s) is not in identity (i=%s)', domain, identity)
+ return False
+ fromeml = email.utils.getaddresses(msg.get_all('from', []))[0][1]
+ if identity.find('@') < 0:
+ logger.debug('identity must contain @ (i=%s)', identity)
+ return False
+ ilocal, idomain = identity.split('@')
+ # identity is supposed to be present in from
+ if not fromeml.endswith(f'@{idomain}'):
+ logger.debug('identity (i=%s) does not match from (from=%s)', identity, fromeml)
+ return False
+ logger.debug('identity and domain match From header')
+ return True
@staticmethod
- def get_from_lore(attid):
- attdocs = list()
- # XXX: Querying this via the Atom feed is a temporary kludge until we have
- # proper search API on lore.kernel.org
- status = None
- cachedata = get_cache(attid, suffix='lookup')
- if cachedata:
- try:
- status = int(cachedata)
- except ValueError:
- pass
- if status is not None and status != 200:
- logger.debug('Cache says looking up %s = %s', attid, status)
- return attdocs
-
+ def get_gpg_attestation(smsg: bytes, dsig: bytes) -> LoreAttestationSignature:
+ # We can't pass both the detached sig and the content on stdin, so
+ # use a temporary file
+ savefile = mkstemp('in-header-pgp-verify')[1]
+ with open(savefile, 'wb') as fh:
+ fh.write(dsig)
+
+ gpgargs = list()
config = get_main_config()
- queryurl = '%s?%s' % (config['attestation-query-url'],
- urllib.parse.urlencode({'q': attid, 'x': 'A', 'o': '-1'}))
- logger.debug('Query URL: %s', queryurl)
- session = get_requests_session()
- resp = session.get(queryurl)
- if resp.status_code != 200:
- # Record this as a bad hit
- save_cache(str(resp.status_code), attid, suffix='lookup')
-
- matches = re.findall(
- r'link\s+href="([^"]+)".*?(-----BEGIN PGP SIGNED MESSAGE-----.*?-----END PGP SIGNATURE-----)',
- resp.content.decode('utf-8'), flags=re.DOTALL
- )
-
- if matches:
- for link, sigdata in matches:
- attdoc = LoreAttestationDocument(link, sigdata)
- attdocs.append(attdoc)
-
- return attdocs
+ if config['attestation-trust-model'] == 'tofu':
+ gpgargs += ['--trust-model', 'tofu', '--tofu-default-policy', 'good']
+ gpgargs += ['--verify', '--status-fd=1', savefile, '-']
+ ecode, out, err = gpg_run_command(gpgargs, stdin=smsg)
+ os.unlink(savefile)
+ return LoreAttestationSignature(out.decode(), config['attestation-trust-model'])
@staticmethod
- def load_from_file(afile):
- global ATTESTATIONS
- with open(afile, 'r') as fh:
- sigdata = fh.read()
- ATTESTATIONS.append(LoreAttestationDocument(afile, sigdata))
+ def dkim_canonicalize_header(hname, hval):
+ hname = hname.lower()
+ hval = hval.strip()
+ hval = re.sub(r'\n', '', hval)
+ hval = re.sub(r'\s+', ' ', hval)
+ return hname, hval
@staticmethod
- def load_from_string(source, content):
- global ATTESTATIONS
- ATTESTATIONS.append(LoreAttestationDocument(source, content))
-
-
-class LoreAttestation:
- def __init__(self, i, m, p):
- self.attid = '%s-%s-%s' % (i[:8], m[:8], p[:8])
- self.i = i
- self.m = m
- self.p = p
- self.passing = False
- self.attdocs = list()
+ def get_parts_from_header(hstr: str) -> dict:
+ hstr = re.sub(r'\s*', '', hstr)
+ hdata = dict()
+ for chunk in hstr.split(';'):
+ parts = chunk.split('=', 1)
+ if len(parts) < 2:
+ continue
+ hdata[parts[0]] = parts[1]
+ return hdata
- def _check_if_passing(self):
- global ATTESTATIONS
- hg = (self.i, self.m, self.p)
- for attdoc in ATTESTATIONS:
- if hg in attdoc.hashes and attdoc.passing:
+ def validate(self, msg):
+ shdr = msg.get(self.sig_header_name)
+ if shdr is None:
+ return None
+ sdata = LoreAttestation.get_parts_from_header(shdr)
+ sig = base64.b64decode(sdata['b'])
+ headers = list()
+ hhname, hhval = LoreAttestation.dkim_canonicalize_header(self.hashes_header_name,
+ str(msg.get(self.hashes_header_name)))
+ headers.append(f'{hhname}:{hhval}')
+ # Now we add the sig header itself, without b= content
+ shname, shval = LoreAttestation.dkim_canonicalize_header(self.sig_header_name, shdr)
+ shval = shval.rsplit('; b=')[0] + '; b='
+ headers.append(f'{shname}:{shval}')
+ payload = ('\r\n'.join(headers)).encode()
+ self.lsig = LoreAttestation.get_gpg_attestation(payload, sig)
+ if self.lsig.passing:
+ hdata = LoreAttestation.get_parts_from_header(hhval)
+ if hdata['i'] == self.ib:
+ self.iv = True
+ if hdata['m'] == self.mb:
+ self.mv = True
+ if hdata['p'] == self.pb:
+ self.pv = True
+
+ if self.iv and self.mv and self.pv:
self.passing = True
- self.attdocs.append(attdoc)
-
- def validate(self, lore_lookup=True):
- global ATTESTATIONS
- self._check_if_passing()
-
- if not len(self.attdocs):
- attdocs = LoreAttestationDocument.get_from_cache(self.attid)
- ATTESTATIONS += attdocs
- self._check_if_passing()
- if not len(self.attdocs) and lore_lookup:
- attdocs = LoreAttestationDocument.get_from_lore(self.attid)
- ATTESTATIONS += attdocs
- self._check_if_passing()
- def __repr__(self):
- out = list()
- out.append(' attid: %s' % self.attid)
- out.append(' i: %s' % self.i)
- out.append(' m: %s' % self.m)
- out.append(' p: %s' % self.p)
- out.append(' --- attdocs ---')
- for attdoc in self.attdocs:
- out.append(str(attdoc))
- return '\n'.join(out)
-
-
-def _run_command(cmdargs, stdin=None, logstderr=False):
+def _run_command(cmdargs, stdin=None):
logger.debug('Running %s' % ' '.join(cmdargs))
sp = subprocess.Popen(cmdargs,
@@ -1764,24 +1682,17 @@ def _run_command(cmdargs, stdin=None, logstderr=False):
(output, error) = sp.communicate(input=stdin)
- output = output.decode('utf-8', errors='replace')
-
- if logstderr and len(error.strip()):
- errout = error.decode('utf-8', errors='replace')
- logger.debug('Stderr: %s', errout)
- output += errout
+ return sp.returncode, output, error
- return sp.returncode, output
-
-def gpg_run_command(args, stdin=None, logstderr=False):
+def gpg_run_command(args, stdin=None):
config = get_main_config()
cmdargs = [config['gpgbin'], '--batch', '--no-auto-key-retrieve', '--no-auto-check-trustdb']
if config['attestation-gnupghome'] is not None:
cmdargs += ['--homedir', config['attestation-gnupghome']]
cmdargs += args
- return _run_command(cmdargs, stdin=stdin, logstderr=logstderr)
+ return _run_command(cmdargs, stdin=stdin)
def git_run_command(gitdir, args, stdin=None, logstderr=False):
@@ -1792,7 +1703,16 @@ def git_run_command(gitdir, args, stdin=None, logstderr=False):
cmdargs += ['--git-dir', gitdir]
cmdargs += args
- return _run_command(cmdargs, stdin=stdin, logstderr=logstderr)
+ ecode, out, err = _run_command(cmdargs, stdin=stdin)
+
+ out = out.decode(errors='replace')
+
+ if logstderr and len(err.strip()):
+ err = err.decode(errors='replace')
+ logger.debug('Stderr: %s', err)
+ out += err
+
+ return ecode, out
def git_get_command_lines(gitdir, args):
diff --git a/b4/attest.py b/b4/attest.py
index 5f9a2b4..4391b19 100644
--- a/b4/attest.py
+++ b/b4/attest.py
@@ -5,178 +5,206 @@
#
import sys
-import os
-import re
import email
import email.utils
import email.message
+import email.header
import smtplib
-import mailbox
import b4
+import argparse
+import base64
+import logging
logger = b4.logger
-def create_attestation(cmdargs):
- attlines = list()
- subject = 'Patch attestation'
- for patchfile in cmdargs.patchfile:
- with open(patchfile, 'r', encoding='utf-8') as fh:
- content = fh.read()
- if content.find('From') != 0:
- logger.info('SKIP | %s', os.path.basename(patchfile))
- continue
- msg = email.message_from_string(content)
- lmsg = b4.LoreMessage(msg)
- lmsg.load_hashes()
- att = lmsg.attestation
- if att is None:
- logger.info('SKIP | %s', os.path.basename(patchfile))
- # See if it's a cover letter
- if lmsg.counters_inferred or lmsg.counter > 0:
- # No
- continue
- newprefs = list()
- for prefix in lmsg.lsubject.prefixes:
- if prefix.lower() == 'patch':
- newprefs.append('PSIGN')
- elif prefix == '%s/%s' % (lmsg.counter, lmsg.expected):
- newprefs.append('X/%s' % lmsg.expected)
- else:
- newprefs.append(prefix)
- subject = '[%s] %s' % (' '.join(newprefs), lmsg.subject)
- continue
- logger.info('HASH | %s', os.path.basename(patchfile))
- attlines.append('%s:' % att.attid)
- attlines.append(' i: %s' % att.i)
- attlines.append(' m: %s' % att.m)
- attlines.append(' p: %s' % att.p)
-
- payload = '\n'.join(attlines)
-
- usercfg = b4.get_user_config()
- gpgargs = list()
- if 'signingkey' in usercfg:
- gpgargs += ['-u', usercfg['signingkey']]
- gpgargs += ['--clearsign',
- '--comment', 'att-fmt-ver: %s' % b4.ATTESTATION_FORMAT_VER,
- '--comment', 'att-hash: sha256',
- ]
-
- ecode, signed = b4.gpg_run_command(gpgargs, stdin=payload.encode('utf-8'))
- if ecode > 0:
- config = b4.get_main_config()
- logger.critical('ERROR: Unable to sign using %s', config['gpgbin'])
+def in_header_attest(lmsg: b4.LoreMessage, mode: str = 'pgp', replace: bool = False) -> None:
+ if lmsg.msg.get(lmsg.attestation.hashes_header_name):
+ if not replace:
+ logger.info(' attest: message already attested')
+ return
+ del lmsg.msg[lmsg.attestation.hashes_header_name]
+ del lmsg.msg[lmsg.attestation.sig_header_name]
+
+ logger.info(' attest: generating attestation hashes')
+ if not lmsg.attestation:
+ raise RuntimeError('Could not calculate patch attestation')
+
+ headers = list()
+ hparts = [
+ 'v=1',
+ 'h=sha256',
+ f'g={lmsg.git_patch_id}',
+ f'i={lmsg.attestation.ib}',
+ f'm={lmsg.attestation.mb}',
+ f'p={lmsg.attestation.pb}',
+ ]
+ hhname, hhval = b4.LoreAttestation.dkim_canonicalize_header(lmsg.attestation.hashes_header_name, '; '.join(hparts))
+ headers.append(f'{hhname}:{hhval}')
+
+ logger.debug('Signing with mode=%s', mode)
+ if mode == 'pgp':
+ usercfg = b4.get_user_config()
+ keyid = usercfg.get('signingkey')
+ if not keyid:
+ raise RuntimeError('Please set user.signingKey to use this feature')
+
+ logger.debug('Using i=%s, s=0x%s', lmsg.fromemail, keyid.rstrip('!'))
+ gpgargs = ['-b', '-u', f'{keyid}']
+
+ hparts = [
+ 'm=pgp',
+ f'i={lmsg.fromemail}',
+ 's=0x%s' % keyid.rstrip('!'),
+ 'b=',
+ ]
+
+ shname, shval = b4.LoreAttestation.dkim_canonicalize_header(lmsg.attestation.sig_header_name, '; '.join(hparts))
+ headers.append(f'{shname}:{shval}')
+ payload = '\r\n'.join(headers).encode()
+ ecode, out, err = b4.gpg_run_command(gpgargs, payload)
+ if ecode > 0:
+ logger.critical('Running gpg failed')
+ logger.critical(err.decode())
+ raise RuntimeError('Running gpg failed')
+ bdata = base64.b64encode(out).decode()
+ shval += header_splitter(bdata)
+ else:
+ raise NotImplementedError('Mode %s not implemented' % mode)
+
+ hhdr = email.header.make_header([(hhval.encode(), 'us-ascii')], maxlinelen=78)
+ shdr = email.header.make_header([(shval.encode(), 'us-ascii')], maxlinelen=78)
+ lmsg.msg[lmsg.attestation.hashes_header_name] = hhdr
+ lmsg.msg[lmsg.attestation.sig_header_name] = shdr
+
+
+def header_splitter(longstr: str, limit: int = 77) -> str:
+ splitstr = list()
+ first = True
+ while len(longstr) > limit:
+ at = limit
+ if first:
+ first = False
+ at -= 2
+ splitstr.append(longstr[:at])
+ longstr = longstr[at:]
+ splitstr.append(longstr)
+ return ' '.join(splitstr)
+
+
+def attest_and_send(cmdargs: argparse.Namespace):
+ # Grab the message from stdin as bytes
+ if sys.stdin.isatty():
+ logger.critical('Pass the message to attest as stdin')
sys.exit(1)
- att_msg = email.message.EmailMessage()
- att_msg.set_payload(signed.encode('utf-8'))
- sender = cmdargs.sender
- if '>' not in sender:
- sender = '<%s>' % sender
- att_msg['From'] = sender
- att_msg['To'] = '<signatures@kernel.org>'
- att_msg['Message-Id'] = email.utils.make_msgid(domain='kernel.org')
- att_msg['Subject'] = subject
-
- logger.info('---')
- if not cmdargs.nosubmit:
- # Try to deliver it via mail.kernel.org
- try:
- mailserver = smtplib.SMTP('mail.kernel.org', 587)
- # identify ourselves to smtp gmail client
- mailserver.ehlo()
- # secure our email with tls encryption
- mailserver.starttls()
- # re-identify ourselves as an encrypted connection
- mailserver.ehlo()
- logger.info('Delivering via mail.kernel.org')
- mailserver.sendmail('devnull@kernel.org', 'signatures@kernel.org', att_msg.as_string())
- mailserver.quit()
- sys.exit(0)
- except Exception as ex:
- logger.info('Could not deliver: %s', ex)
-
- # Future iterations will also be able to submit this to a RESTful URL
- # at git.kernel.org, in order not to depend on avaialbility of SMTP gateways
- with open(cmdargs.output, 'wb') as fh:
- fh.write(att_msg.as_bytes())
-
- logger.info('Wrote %s', cmdargs.output)
- logger.info('You can send it using:')
- logger.info(' sendmail -oi signatures@kernel.org < %s', cmdargs.output)
- logger.info(' mutt -H %s', cmdargs.output)
-
-
-def verify_attestation(cmdargs):
- config = b4.get_main_config()
- if cmdargs.tofu:
- config['attestation-trust-model'] = 'tofu'
-
- exact_from_match = True
- if cmdargs.ignorefrom:
- exact_from_match = False
-
- mbx = mailbox.mbox(cmdargs.mbox[0])
- if cmdargs.attfile:
- b4.LoreAttestationDocument.load_from_file(cmdargs.attfile)
- eligible = list()
- for msg in mbx:
- lmsg = b4.LoreMessage(msg)
- if lmsg.has_diff:
- eligible.append(lmsg)
- continue
- # See if body has "att-fmt-ver
- if re.search(r'^Comment: att-fmt-ver:', lmsg.body, re.I | re.M):
- logger.debug('Found attestation message')
- b4.LoreAttestationDocument.load_from_string(lmsg.msgid, lmsg.body)
+ inbytes = sys.stdin.buffer.read()
+ msg = email.message_from_bytes(inbytes)
+ lmsg = b4.LoreMessage(msg)
+ lmsg.load_hashes()
+ if not lmsg.attestation:
+ logger.debug('Nothing to attest in %s, sending as-is')
+ outbytes = inbytes
+ else:
+ in_header_attest(lmsg)
+ outbytes = lmsg.msg.as_bytes()
- logger.debug('SKIP | %s', msg['Subject'])
+ if cmdargs.nosend:
+ logger.info('--- MESSAGE FOLLOWS ---')
+ sys.stdout.buffer.write(outbytes)
+ return
- if not len(eligible):
- logger.error('No patches found in %s', cmdargs.mbox[0])
- sys.exit(1)
+ if cmdargs.identity:
+ cfgname = f'sendemail\\.{cmdargs.identity}\\..*'
+ else:
+ cfgname = 'sendemail\\..*'
- logger.info('---')
- attrailers = set()
- ecode = 1
- if config['attestation-checkmarks'] == 'fancy':
- attpass = b4.PASS_FANCY
- attfail = b4.FAIL_FANCY
+ scfg = b4.get_config_from_git(cfgname)
+ sserver = scfg.get('smtpserver')
+ if not sserver:
+ logger.critical('MISSING: smtpserver option in %s', cfgname)
+ sys.exit(1)
+ if sserver[0] == '/':
+ args = [sserver, '-i'] + cmdargs.recipients
+ extraopts = scfg.get('smtpserveroption')
+ if extraopts:
+ args += extraopts.split()
+ ecode, out, err = b4._run_command(args, outbytes) # noqa
+ sys.stdout.buffer.write(out)
+ sys.stderr.buffer.write(err)
+ sys.exit(ecode)
+
+ sport = int(scfg.get('smtpserverport', '0'))
+ sdomain = scfg.get('smtpdomain')
+ suser = scfg.get('smtpuser')
+ spass = scfg.get('smtppass')
+ senc = scfg.get('smtpencryption', 'tls')
+ sfrom = scfg.get('from')
+ if not sfrom:
+ sfrom = lmsg.fromemail
+
+ logger.info('Connecting to %s', sserver)
+ if senc == 'ssl':
+ sconn = smtplib.SMTP_SSL(host=sserver, port=sport, local_hostname=sdomain)
else:
- attpass = b4.PASS_SIMPLE
- attfail = b4.FAIL_SIMPLE
-
- for lmsg in eligible:
- attdoc = lmsg.get_attestation(lore_lookup=True, exact_from_match=exact_from_match)
- if not attdoc:
- logger.critical('%s %s', attfail, lmsg.full_subject)
- if not cmdargs.nofast:
- logger.critical('Aborting due to failure.')
- ecode = 1
- break
- else:
- ecode = 128
- continue
- if ecode != 128:
- ecode = 0
- logger.critical('%s %s', attpass, lmsg.full_subject)
- attrailers.add(attdoc.lsig.attestor.get_trailer(lmsg.fromemail))
-
- logger.critical('---')
- if ecode > 0:
- logger.critical('Attestation verification failed.')
- errors = set()
- for attdoc in b4.ATTESTATIONS:
- errors.update(attdoc.errors)
- if len(errors):
- logger.critical('---')
- logger.critical('The validation process reported the following errors:')
- for error in errors:
- logger.critical(' %s %s', attfail, error)
+ sconn = smtplib.SMTP(host=sserver, port=sport, local_hostname=sdomain)
+ if senc == 'tls':
+ sconn.starttls()
+ if suser:
+ logger.info('Logging in as user %s', suser)
+ sconn.login(suser, spass)
+
+ logger.info('Sending %s', lmsg.full_subject)
+ sconn.sendmail(sfrom, cmdargs.recipients, outbytes)
+
+
+def attest_patches(cmdargs: argparse.Namespace) -> None:
+ for pf in cmdargs.patchfile:
+ with open(pf, 'rb') as fh:
+ msg = email.message_from_bytes(fh.read())
+ lmsg = b4.LoreMessage(msg)
+ lmsg.load_hashes()
+ if not lmsg.attestation:
+ logger.debug('Nothing to attest in %s, skipped')
+ continue
+ logger.info('Attesting: %s', pf)
+ in_header_attest(lmsg, replace=True)
+ with open(pf, 'wb') as fh:
+ fh.write(lmsg.msg.as_bytes())
+
+
+if __name__ == '__main__':
+ # Special mode for running b4-send-email
+ # noinspection PyTypeChecker
+ parser = argparse.ArgumentParser(
+ prog='b4-send-email',
+ description='A drop-in wrapper for git-send-email to attest patches before sending',
+ formatter_class=argparse.ArgumentDefaultsHelpFormatter,
+ )
+ parser.add_argument('-d', '--debug', action='store_true', default=False,
+ help='Add more debugging info to the output')
+ parser.add_argument('-q', '--quiet', action='store_true', default=False,
+ help='Output critical information only')
+ parser.add_argument('-i', dest='_compat', action='store_true', default=True,
+ help='Sendmail compatibility thingamabob')
+ parser.add_argument('--identity', default='b4',
+ help='The sendemail identity to use for real smtp/sendmail settings')
+ parser.add_argument('-n', '--no-send', dest='nosend', action='store_true', default=False,
+ help='Do not send, just output what would be sent')
+ parser.add_argument('recipients', nargs='+', help='Message recipients')
+ _cmdargs = parser.parse_args()
+ logger.setLevel(logging.DEBUG)
+
+ ch = logging.StreamHandler()
+ formatter = logging.Formatter('b4: %(message)s')
+ ch.setFormatter(formatter)
+
+ if _cmdargs.quiet:
+ ch.setLevel(logging.CRITICAL)
+ elif _cmdargs.debug:
+ ch.setLevel(logging.DEBUG)
else:
- logger.critical('All patches passed attestation:')
- for attrailer in attrailers:
- logger.critical(' %s %s', attpass, attrailer)
+ ch.setLevel(logging.INFO)
- sys.exit(ecode)
+ logger.addHandler(ch)
+ attest_and_send(_cmdargs)
diff --git a/b4/command.py b/b4/command.py
index ffa58ef..d3624e9 100644
--- a/b4/command.py
+++ b/b4/command.py
@@ -42,7 +42,7 @@ def cmd_am(cmdargs):
def cmd_attest(cmdargs):
import b4.attest
- b4.attest.create_attestation(cmdargs)
+ b4.attest.attest_patches(cmdargs)
def cmd_verify(cmdargs):
@@ -116,14 +116,13 @@ def cmd():
sp_am.set_defaults(func=cmd_am)
# b4 attest
- sp_att = subparsers.add_parser('attest', help='Submit cryptographic attestation for patches')
- # GDPR-proofing: by default, we add as little PII-sensitive info as possible
- sp_att.add_argument('-f', '--from', dest='sender', default='devnull@kernel.org',
- help='Use a custom From field')
+ sp_att = subparsers.add_parser('attest', help='Create cryptographic attestation for a set of patches')
+ sp_att.add_argument('-f', '--from', dest='sender', default=None,
+ help='OBSOLETE: this option does nothing and will be removed')
sp_att.add_argument('-n', '--no-submit', dest='nosubmit', action='store_true', default=False,
- help='Do not submit attestation, just save the message ready to send')
- sp_att.add_argument('-o', '--output', default='xxxx-attestation-letter.patch',
- help='Save attestation message in this file if not submitting it')
+ help='OBSOLETE: this option does nothing and will be removed')
+ sp_att.add_argument('-o', '--output', default=None,
+ help='OBSOLETE: this option does nothing and will be removed')
sp_att.add_argument('patchfile', nargs='+', help='Patches to attest')
sp_att.set_defaults(func=cmd_attest)