aboutsummaryrefslogtreecommitdiff
path: root/b4/attest.py
diff options
context:
space:
mode:
authorKonstantin Ryabitsev <konstantin@linuxfoundation.org>2020-10-02 21:03:25 -0400
committerKonstantin Ryabitsev <konstantin@linuxfoundation.org>2020-10-02 21:03:25 -0400
commit0d759ee9113b371a678c4acafffafa79570e9511 (patch)
tree5f2b074ea2c81208802c1db1efef29c0a1c438b1 /b4/attest.py
parentca6d35e7728c17b505e6be62ec3b6687aa5bf26b (diff)
downloadb4-0d759ee9113b371a678c4acafffafa79570e9511.tar.gz
Reimplement attestation for in-header hashes
Rewrite attestation to implement in-header hashing and signing. For now, just implementing mode=pgp, but other modes are coming next. Signed-off-by: Konstantin Ryabitsev <konstantin@linuxfoundation.org>
Diffstat (limited to 'b4/attest.py')
-rw-r--r--b4/attest.py344
1 files changed, 186 insertions, 158 deletions
diff --git a/b4/attest.py b/b4/attest.py
index 5f9a2b4..4391b19 100644
--- a/b4/attest.py
+++ b/b4/attest.py
@@ -5,178 +5,206 @@
#
import sys
-import os
-import re
import email
import email.utils
import email.message
+import email.header
import smtplib
-import mailbox
import b4
+import argparse
+import base64
+import logging
logger = b4.logger
-def create_attestation(cmdargs):
- attlines = list()
- subject = 'Patch attestation'
- for patchfile in cmdargs.patchfile:
- with open(patchfile, 'r', encoding='utf-8') as fh:
- content = fh.read()
- if content.find('From') != 0:
- logger.info('SKIP | %s', os.path.basename(patchfile))
- continue
- msg = email.message_from_string(content)
- lmsg = b4.LoreMessage(msg)
- lmsg.load_hashes()
- att = lmsg.attestation
- if att is None:
- logger.info('SKIP | %s', os.path.basename(patchfile))
- # See if it's a cover letter
- if lmsg.counters_inferred or lmsg.counter > 0:
- # No
- continue
- newprefs = list()
- for prefix in lmsg.lsubject.prefixes:
- if prefix.lower() == 'patch':
- newprefs.append('PSIGN')
- elif prefix == '%s/%s' % (lmsg.counter, lmsg.expected):
- newprefs.append('X/%s' % lmsg.expected)
- else:
- newprefs.append(prefix)
- subject = '[%s] %s' % (' '.join(newprefs), lmsg.subject)
- continue
- logger.info('HASH | %s', os.path.basename(patchfile))
- attlines.append('%s:' % att.attid)
- attlines.append(' i: %s' % att.i)
- attlines.append(' m: %s' % att.m)
- attlines.append(' p: %s' % att.p)
-
- payload = '\n'.join(attlines)
-
- usercfg = b4.get_user_config()
- gpgargs = list()
- if 'signingkey' in usercfg:
- gpgargs += ['-u', usercfg['signingkey']]
- gpgargs += ['--clearsign',
- '--comment', 'att-fmt-ver: %s' % b4.ATTESTATION_FORMAT_VER,
- '--comment', 'att-hash: sha256',
- ]
-
- ecode, signed = b4.gpg_run_command(gpgargs, stdin=payload.encode('utf-8'))
- if ecode > 0:
- config = b4.get_main_config()
- logger.critical('ERROR: Unable to sign using %s', config['gpgbin'])
+def in_header_attest(lmsg: b4.LoreMessage, mode: str = 'pgp', replace: bool = False) -> None:
+ if lmsg.msg.get(lmsg.attestation.hashes_header_name):
+ if not replace:
+ logger.info(' attest: message already attested')
+ return
+ del lmsg.msg[lmsg.attestation.hashes_header_name]
+ del lmsg.msg[lmsg.attestation.sig_header_name]
+
+ logger.info(' attest: generating attestation hashes')
+ if not lmsg.attestation:
+ raise RuntimeError('Could not calculate patch attestation')
+
+ headers = list()
+ hparts = [
+ 'v=1',
+ 'h=sha256',
+ f'g={lmsg.git_patch_id}',
+ f'i={lmsg.attestation.ib}',
+ f'm={lmsg.attestation.mb}',
+ f'p={lmsg.attestation.pb}',
+ ]
+ hhname, hhval = b4.LoreAttestation.dkim_canonicalize_header(lmsg.attestation.hashes_header_name, '; '.join(hparts))
+ headers.append(f'{hhname}:{hhval}')
+
+ logger.debug('Signing with mode=%s', mode)
+ if mode == 'pgp':
+ usercfg = b4.get_user_config()
+ keyid = usercfg.get('signingkey')
+ if not keyid:
+ raise RuntimeError('Please set user.signingKey to use this feature')
+
+ logger.debug('Using i=%s, s=0x%s', lmsg.fromemail, keyid.rstrip('!'))
+ gpgargs = ['-b', '-u', f'{keyid}']
+
+ hparts = [
+ 'm=pgp',
+ f'i={lmsg.fromemail}',
+ 's=0x%s' % keyid.rstrip('!'),
+ 'b=',
+ ]
+
+ shname, shval = b4.LoreAttestation.dkim_canonicalize_header(lmsg.attestation.sig_header_name, '; '.join(hparts))
+ headers.append(f'{shname}:{shval}')
+ payload = '\r\n'.join(headers).encode()
+ ecode, out, err = b4.gpg_run_command(gpgargs, payload)
+ if ecode > 0:
+ logger.critical('Running gpg failed')
+ logger.critical(err.decode())
+ raise RuntimeError('Running gpg failed')
+ bdata = base64.b64encode(out).decode()
+ shval += header_splitter(bdata)
+ else:
+ raise NotImplementedError('Mode %s not implemented' % mode)
+
+ hhdr = email.header.make_header([(hhval.encode(), 'us-ascii')], maxlinelen=78)
+ shdr = email.header.make_header([(shval.encode(), 'us-ascii')], maxlinelen=78)
+ lmsg.msg[lmsg.attestation.hashes_header_name] = hhdr
+ lmsg.msg[lmsg.attestation.sig_header_name] = shdr
+
+
+def header_splitter(longstr: str, limit: int = 77) -> str:
+ splitstr = list()
+ first = True
+ while len(longstr) > limit:
+ at = limit
+ if first:
+ first = False
+ at -= 2
+ splitstr.append(longstr[:at])
+ longstr = longstr[at:]
+ splitstr.append(longstr)
+ return ' '.join(splitstr)
+
+
+def attest_and_send(cmdargs: argparse.Namespace):
+ # Grab the message from stdin as bytes
+ if sys.stdin.isatty():
+ logger.critical('Pass the message to attest as stdin')
sys.exit(1)
- att_msg = email.message.EmailMessage()
- att_msg.set_payload(signed.encode('utf-8'))
- sender = cmdargs.sender
- if '>' not in sender:
- sender = '<%s>' % sender
- att_msg['From'] = sender
- att_msg['To'] = '<signatures@kernel.org>'
- att_msg['Message-Id'] = email.utils.make_msgid(domain='kernel.org')
- att_msg['Subject'] = subject
-
- logger.info('---')
- if not cmdargs.nosubmit:
- # Try to deliver it via mail.kernel.org
- try:
- mailserver = smtplib.SMTP('mail.kernel.org', 587)
- # identify ourselves to smtp gmail client
- mailserver.ehlo()
- # secure our email with tls encryption
- mailserver.starttls()
- # re-identify ourselves as an encrypted connection
- mailserver.ehlo()
- logger.info('Delivering via mail.kernel.org')
- mailserver.sendmail('devnull@kernel.org', 'signatures@kernel.org', att_msg.as_string())
- mailserver.quit()
- sys.exit(0)
- except Exception as ex:
- logger.info('Could not deliver: %s', ex)
-
- # Future iterations will also be able to submit this to a RESTful URL
- # at git.kernel.org, in order not to depend on avaialbility of SMTP gateways
- with open(cmdargs.output, 'wb') as fh:
- fh.write(att_msg.as_bytes())
-
- logger.info('Wrote %s', cmdargs.output)
- logger.info('You can send it using:')
- logger.info(' sendmail -oi signatures@kernel.org < %s', cmdargs.output)
- logger.info(' mutt -H %s', cmdargs.output)
-
-
-def verify_attestation(cmdargs):
- config = b4.get_main_config()
- if cmdargs.tofu:
- config['attestation-trust-model'] = 'tofu'
-
- exact_from_match = True
- if cmdargs.ignorefrom:
- exact_from_match = False
-
- mbx = mailbox.mbox(cmdargs.mbox[0])
- if cmdargs.attfile:
- b4.LoreAttestationDocument.load_from_file(cmdargs.attfile)
- eligible = list()
- for msg in mbx:
- lmsg = b4.LoreMessage(msg)
- if lmsg.has_diff:
- eligible.append(lmsg)
- continue
- # See if body has "att-fmt-ver
- if re.search(r'^Comment: att-fmt-ver:', lmsg.body, re.I | re.M):
- logger.debug('Found attestation message')
- b4.LoreAttestationDocument.load_from_string(lmsg.msgid, lmsg.body)
+ inbytes = sys.stdin.buffer.read()
+ msg = email.message_from_bytes(inbytes)
+ lmsg = b4.LoreMessage(msg)
+ lmsg.load_hashes()
+ if not lmsg.attestation:
+ logger.debug('Nothing to attest in %s, sending as-is')
+ outbytes = inbytes
+ else:
+ in_header_attest(lmsg)
+ outbytes = lmsg.msg.as_bytes()
- logger.debug('SKIP | %s', msg['Subject'])
+ if cmdargs.nosend:
+ logger.info('--- MESSAGE FOLLOWS ---')
+ sys.stdout.buffer.write(outbytes)
+ return
- if not len(eligible):
- logger.error('No patches found in %s', cmdargs.mbox[0])
- sys.exit(1)
+ if cmdargs.identity:
+ cfgname = f'sendemail\\.{cmdargs.identity}\\..*'
+ else:
+ cfgname = 'sendemail\\..*'
- logger.info('---')
- attrailers = set()
- ecode = 1
- if config['attestation-checkmarks'] == 'fancy':
- attpass = b4.PASS_FANCY
- attfail = b4.FAIL_FANCY
+ scfg = b4.get_config_from_git(cfgname)
+ sserver = scfg.get('smtpserver')
+ if not sserver:
+ logger.critical('MISSING: smtpserver option in %s', cfgname)
+ sys.exit(1)
+ if sserver[0] == '/':
+ args = [sserver, '-i'] + cmdargs.recipients
+ extraopts = scfg.get('smtpserveroption')
+ if extraopts:
+ args += extraopts.split()
+ ecode, out, err = b4._run_command(args, outbytes) # noqa
+ sys.stdout.buffer.write(out)
+ sys.stderr.buffer.write(err)
+ sys.exit(ecode)
+
+ sport = int(scfg.get('smtpserverport', '0'))
+ sdomain = scfg.get('smtpdomain')
+ suser = scfg.get('smtpuser')
+ spass = scfg.get('smtppass')
+ senc = scfg.get('smtpencryption', 'tls')
+ sfrom = scfg.get('from')
+ if not sfrom:
+ sfrom = lmsg.fromemail
+
+ logger.info('Connecting to %s', sserver)
+ if senc == 'ssl':
+ sconn = smtplib.SMTP_SSL(host=sserver, port=sport, local_hostname=sdomain)
else:
- attpass = b4.PASS_SIMPLE
- attfail = b4.FAIL_SIMPLE
-
- for lmsg in eligible:
- attdoc = lmsg.get_attestation(lore_lookup=True, exact_from_match=exact_from_match)
- if not attdoc:
- logger.critical('%s %s', attfail, lmsg.full_subject)
- if not cmdargs.nofast:
- logger.critical('Aborting due to failure.')
- ecode = 1
- break
- else:
- ecode = 128
- continue
- if ecode != 128:
- ecode = 0
- logger.critical('%s %s', attpass, lmsg.full_subject)
- attrailers.add(attdoc.lsig.attestor.get_trailer(lmsg.fromemail))
-
- logger.critical('---')
- if ecode > 0:
- logger.critical('Attestation verification failed.')
- errors = set()
- for attdoc in b4.ATTESTATIONS:
- errors.update(attdoc.errors)
- if len(errors):
- logger.critical('---')
- logger.critical('The validation process reported the following errors:')
- for error in errors:
- logger.critical(' %s %s', attfail, error)
+ sconn = smtplib.SMTP(host=sserver, port=sport, local_hostname=sdomain)
+ if senc == 'tls':
+ sconn.starttls()
+ if suser:
+ logger.info('Logging in as user %s', suser)
+ sconn.login(suser, spass)
+
+ logger.info('Sending %s', lmsg.full_subject)
+ sconn.sendmail(sfrom, cmdargs.recipients, outbytes)
+
+
+def attest_patches(cmdargs: argparse.Namespace) -> None:
+ for pf in cmdargs.patchfile:
+ with open(pf, 'rb') as fh:
+ msg = email.message_from_bytes(fh.read())
+ lmsg = b4.LoreMessage(msg)
+ lmsg.load_hashes()
+ if not lmsg.attestation:
+ logger.debug('Nothing to attest in %s, skipped')
+ continue
+ logger.info('Attesting: %s', pf)
+ in_header_attest(lmsg, replace=True)
+ with open(pf, 'wb') as fh:
+ fh.write(lmsg.msg.as_bytes())
+
+
+if __name__ == '__main__':
+ # Special mode for running b4-send-email
+ # noinspection PyTypeChecker
+ parser = argparse.ArgumentParser(
+ prog='b4-send-email',
+ description='A drop-in wrapper for git-send-email to attest patches before sending',
+ formatter_class=argparse.ArgumentDefaultsHelpFormatter,
+ )
+ parser.add_argument('-d', '--debug', action='store_true', default=False,
+ help='Add more debugging info to the output')
+ parser.add_argument('-q', '--quiet', action='store_true', default=False,
+ help='Output critical information only')
+ parser.add_argument('-i', dest='_compat', action='store_true', default=True,
+ help='Sendmail compatibility thingamabob')
+ parser.add_argument('--identity', default='b4',
+ help='The sendemail identity to use for real smtp/sendmail settings')
+ parser.add_argument('-n', '--no-send', dest='nosend', action='store_true', default=False,
+ help='Do not send, just output what would be sent')
+ parser.add_argument('recipients', nargs='+', help='Message recipients')
+ _cmdargs = parser.parse_args()
+ logger.setLevel(logging.DEBUG)
+
+ ch = logging.StreamHandler()
+ formatter = logging.Formatter('b4: %(message)s')
+ ch.setFormatter(formatter)
+
+ if _cmdargs.quiet:
+ ch.setLevel(logging.CRITICAL)
+ elif _cmdargs.debug:
+ ch.setLevel(logging.DEBUG)
else:
- logger.critical('All patches passed attestation:')
- for attrailer in attrailers:
- logger.critical(' %s %s', attpass, attrailer)
+ ch.setLevel(logging.INFO)
- sys.exit(ecode)
+ logger.addHandler(ch)
+ attest_and_send(_cmdargs)