summaryrefslogtreecommitdiff
path: root/b4/attest.py
diff options
context:
space:
mode:
Diffstat (limited to 'b4/attest.py')
-rw-r--r--b4/attest.py154
1 files changed, 16 insertions, 138 deletions
diff --git a/b4/attest.py b/b4/attest.py
index 2b9ccab..a6acb28 100644
--- a/b4/attest.py
+++ b/b4/attest.py
@@ -5,150 +5,28 @@
#
import sys
-import email
-import email.utils
-import email.message
-import email.header
import b4
import argparse
-import base64
+try:
+ import patatt
+ can_patatt = True
+except ModuleNotFoundError:
+ can_patatt = False
-logger = b4.logger
-
-
-def in_header_attest(lmsg: b4.LoreMessage, mode: str = 'pgp', replace: bool = False) -> None:
- if lmsg.msg.get(b4.HDR_PATCH_HASHES):
- if not replace:
- logger.info(' attest: message already attested')
- return
- del lmsg.msg[b4.HDR_PATCH_HASHES]
- del lmsg.msg[b4.HDR_PATCH_SIG]
-
- logger.info(' attest: generating attestation hashes')
- if not lmsg.attestation:
- raise RuntimeError('Could not calculate patch attestation')
-
- headers = list()
- hparts = [
- 'v=1',
- 'h=sha256',
- f'i={lmsg.attestation.ib}',
- f'm={lmsg.attestation.mb}',
- f'p={lmsg.attestation.pb}',
- ]
- if lmsg.git_patch_id:
- hparts.append(f'g={lmsg.git_patch_id}')
-
- hhname, hhval = b4.dkim_canonicalize_header(b4.HDR_PATCH_HASHES, '; '.join(hparts))
- headers.append(f'{hhname}:{hhval}')
-
- logger.debug('Signing with mode=%s', mode)
- if mode == 'pgp':
- usercfg = b4.get_user_config()
- keyid = usercfg.get('signingkey')
- identity = usercfg.get('email')
- if not identity:
- raise RuntimeError('Please set user.email to use this feature')
- if not keyid:
- raise RuntimeError('Please set user.signingKey to use this feature')
-
- logger.debug('Using i=%s, s=0x%s', identity, keyid.rstrip('!'))
- gpgargs = ['-b', '-u', f'{keyid}']
+from collections import namedtuple
- hparts = [
- 'm=pgp',
- f'i={identity}',
- 's=0x%s' % keyid.rstrip('!'),
- 'b=',
- ]
-
- shname, shval = b4.dkim_canonicalize_header(b4.HDR_PATCH_SIG, '; '.join(hparts))
- headers.append(f'{shname}:{shval}')
- payload = '\r\n'.join(headers).encode()
- ecode, out, err = b4.gpg_run_command(gpgargs, payload)
- if ecode > 0:
- logger.critical('Running gpg failed')
- logger.critical(err.decode())
- raise RuntimeError('Running gpg failed')
- bdata = base64.b64encode(out).decode()
- shval += header_splitter(bdata)
- else:
- raise NotImplementedError('Mode %s not implemented' % mode)
-
- hhdr = email.header.make_header([(hhval.encode(), 'us-ascii')], maxlinelen=78)
- shdr = email.header.make_header([(shval.encode(), 'us-ascii')], maxlinelen=78)
- lmsg.msg[b4.HDR_PATCH_HASHES] = hhdr
- lmsg.msg[b4.HDR_PATCH_SIG] = shdr
-
-
-def header_splitter(longstr: str, limit: int = 77) -> str:
- splitstr = list()
- first = True
- while len(longstr) > limit:
- at = limit
- if first:
- first = False
- at -= 2
- splitstr.append(longstr[:at])
- longstr = longstr[at:]
- splitstr.append(longstr)
- return ' '.join(splitstr)
+logger = b4.logger
def attest_patches(cmdargs: argparse.Namespace) -> None:
- for pf in cmdargs.patchfile:
- with open(pf, 'rb') as fh:
- msg = email.message_from_bytes(fh.read())
- lmsg = b4.LoreMessage(msg)
- lmsg.load_hashes()
- if not lmsg.attestation:
- logger.debug('Nothing to attest in %s, skipped')
- continue
- logger.info('Attesting: %s', pf)
- in_header_attest(lmsg, replace=True)
- with open(pf, 'wb') as fh:
- fh.write(lmsg.msg.as_bytes())
-
-
-def mutt_filter() -> None:
- if sys.stdin.isatty():
- logger.error('Error: Mutt mode expects a message on stdin')
+ if not can_patatt:
+ logger.critical('ERROR: b4 now uses patatt for patch attestation. See:')
+ logger.critical(' https://git.kernel.org/pub/scm/utils/patatt/patatt.git/about/')
sys.exit(1)
- inb = sys.stdin.buffer.read()
- # Quick exit if we don't find x-patch-sig
- if inb.find(b'X-Patch-Sig:') < 0:
- sys.stdout.buffer.write(inb)
- return
- msg = email.message_from_bytes(inb)
- try:
- if msg.get('x-patch-sig'):
- lmsg = b4.LoreMessage(msg)
- lmsg.load_hashes()
- latt = lmsg.attestation
- if latt:
- if latt.validate(msg):
- trailer = latt.lsig.attestor.get_trailer(lmsg.fromemail)
- msg.add_header('Attested-By', trailer)
- elif latt.lsig:
- if not latt.lsig.errors:
- failed = list()
- if not latt.pv:
- failed.append('patch content')
- if not latt.mv:
- failed.append('commit message')
- if not latt.iv:
- failed.append('patch metadata')
- latt.lsig.errors.add('signature failed (%s)' % ', '.join(failed))
- msg.add_header('Attestation-Failed', ', '.join(latt.lsig.errors))
- # Delete the x-patch-hashes and x-patch-sig headers so
- # they don't boggle up the view
- for i in reversed(range(len(msg._headers))): # noqa
- hdrName = msg._headers[i][0].lower() # noqa
- if hdrName in ('x-patch-hashes', 'x-patch-sig'):
- del msg._headers[i] # noqa
- except: # noqa
- # Don't prevent email from being displayed even if we died horribly
- sys.stdout.buffer.write(inb)
- return
- sys.stdout.buffer.write(msg.as_bytes(policy=b4.emlpolicy))
+ # directly invoke cmd_sign in patatt
+ config = patatt.get_config_from_git(r'patatt\..*', multivals=['keyringsrc'])
+ fakeargs = namedtuple('Struct', ['hookmode', 'msgfile'])
+ fakeargs.hookmode = True
+ fakeargs.msgfile = cmdargs.patchfile
+ patatt.cmd_sign(fakeargs, config)