aboutsummaryrefslogtreecommitdiff
path: root/b4/attest.py
diff options
context:
space:
mode:
Diffstat (limited to 'b4/attest.py')
-rw-r--r--b4/attest.py128
1 files changed, 25 insertions, 103 deletions
diff --git a/b4/attest.py b/b4/attest.py
index 1b464d0..5e3d384 100644
--- a/b4/attest.py
+++ b/b4/attest.py
@@ -9,11 +9,9 @@ import email
import email.utils
import email.message
import email.header
-import smtplib
import b4
import argparse
import base64
-import logging
logger = b4.logger
@@ -92,72 +90,6 @@ def header_splitter(longstr: str, limit: int = 77) -> str:
return ' '.join(splitstr)
-def attest_and_send(cmdargs: argparse.Namespace):
- # Grab the message from stdin as bytes
- if sys.stdin.isatty():
- logger.critical('Pass the message to attest as stdin')
- sys.exit(1)
-
- inbytes = sys.stdin.buffer.read()
- msg = email.message_from_bytes(inbytes)
- lmsg = b4.LoreMessage(msg)
- lmsg.load_hashes()
- if not lmsg.attestation:
- logger.debug('Nothing to attest in %s, sending as-is')
- outbytes = inbytes
- else:
- in_header_attest(lmsg)
- outbytes = lmsg.msg.as_bytes()
-
- if cmdargs.nosend:
- logger.info('--- MESSAGE FOLLOWS ---')
- sys.stdout.buffer.write(outbytes)
- return
-
- if cmdargs.identity:
- cfgname = f'sendemail\\.{cmdargs.identity}\\..*'
- else:
- cfgname = 'sendemail\\..*'
-
- scfg = b4.get_config_from_git(cfgname)
- sserver = scfg.get('smtpserver')
- if not sserver:
- logger.critical('MISSING: smtpserver option in %s', cfgname)
- sys.exit(1)
- if sserver[0] == '/':
- args = [sserver, '-i'] + cmdargs.recipients
- extraopts = scfg.get('smtpserveroption')
- if extraopts:
- args += extraopts.split()
- ecode, out, err = b4._run_command(args, outbytes) # noqa
- sys.stdout.buffer.write(out)
- sys.stderr.buffer.write(err)
- sys.exit(ecode)
-
- sport = int(scfg.get('smtpserverport', '0'))
- sdomain = scfg.get('smtpdomain')
- suser = scfg.get('smtpuser')
- spass = scfg.get('smtppass')
- senc = scfg.get('smtpencryption', 'tls')
- sfrom = scfg.get('from')
- if not sfrom:
- sfrom = lmsg.fromemail
-
- logger.info('Connecting to %s', sserver)
- if senc == 'ssl':
- sconn = smtplib.SMTP_SSL(host=sserver, port=sport, local_hostname=sdomain)
- else:
- sconn = smtplib.SMTP(host=sserver, port=sport, local_hostname=sdomain)
- if senc == 'tls':
- sconn.starttls()
- if suser:
- logger.info('Logging in as user %s', suser)
- sconn.login(suser, spass)
-
- logger.info('Sending %s', lmsg.full_subject)
- sconn.sendmail(sfrom, cmdargs.recipients, outbytes)
-
-
def attest_patches(cmdargs: argparse.Namespace) -> None:
for pf in cmdargs.patchfile:
with open(pf, 'rb') as fh:
@@ -173,38 +105,28 @@ def attest_patches(cmdargs: argparse.Namespace) -> None:
fh.write(lmsg.msg.as_bytes())
-if __name__ == '__main__':
- # Special mode for running b4-send-email
- # noinspection PyTypeChecker
- parser = argparse.ArgumentParser(
- prog='b4-send-email',
- description='A drop-in wrapper for git-send-email to attest patches before sending',
- formatter_class=argparse.ArgumentDefaultsHelpFormatter,
- )
- parser.add_argument('-d', '--debug', action='store_true', default=False,
- help='Add more debugging info to the output')
- parser.add_argument('-q', '--quiet', action='store_true', default=False,
- help='Output critical information only')
- parser.add_argument('-i', dest='_compat', action='store_true', default=True,
- help='Sendmail compatibility thingamabob')
- parser.add_argument('--identity', default='b4',
- help='The sendemail identity to use for real smtp/sendmail settings')
- parser.add_argument('-n', '--no-send', dest='nosend', action='store_true', default=False,
- help='Do not send, just output what would be sent')
- parser.add_argument('recipients', nargs='+', help='Message recipients')
- _cmdargs = parser.parse_args()
- logger.setLevel(logging.DEBUG)
-
- ch = logging.StreamHandler()
- formatter = logging.Formatter('b4: %(message)s')
- ch.setFormatter(formatter)
-
- if _cmdargs.quiet:
- ch.setLevel(logging.CRITICAL)
- elif _cmdargs.debug:
- ch.setLevel(logging.DEBUG)
- else:
- ch.setLevel(logging.INFO)
-
- logger.addHandler(ch)
- attest_and_send(_cmdargs)
+def mutt_filter() -> None:
+ if sys.stdin.isatty():
+ logger.error('Error: Mutt mode expects a message on stdin')
+ sys.exit(1)
+ inb = sys.stdin.buffer.read()
+ try:
+ msg = email.message_from_bytes(inb)
+ if msg.get('x-patch-sig'):
+ lmsg = b4.LoreMessage(msg)
+ lmsg.load_hashes()
+ latt = lmsg.attestation
+ if latt and latt.validate(msg):
+ trailer = latt.lsig.attestor.get_trailer(lmsg.fromemail)
+ msg.add_header('Attested-By', trailer)
+ # Delete the x-patch-hashes and x-patch-sig headers so
+ # they don't boggle up the view
+ for i in reversed(range(len(msg._headers))): # noqa
+ hdrName = msg._headers[i][0].lower() # noqa
+ if hdrName in ('x-patch-hashes', 'x-patch-sig'):
+ del msg._headers[i] # noqa
+ except: # noqa
+ # Don't prevent email from being displayed even if we died horribly
+ sys.stdout.buffer.write(inb)
+ return
+ sys.stdout.buffer.write(msg.as_bytes(policy=b4.emlpolicy))