#!/usr/bin/env python3 # noinspection PyUnresolvedReferences import falcon import os import sys import logging import logging.handlers import json import sqlalchemy as sa import patatt import smtplib import email import email.header import email.policy import re import ezpi import copy from configparser import ConfigParser, ExtendedInterpolation from string import Template from email import utils from typing import Tuple, Union from email import charset charset.add_charset('utf-8', None) emlpolicy = email.policy.EmailPolicy(utf8=True, cte_type='8bit', max_line_length=None) DB_VERSION = 1 logger = logging.getLogger('b4-send-receive') logger.setLevel(logging.DEBUG) # noinspection PyBroadException, PyMethodMayBeStatic class SendReceiveListener(object): def __init__(self, _engine, _config) -> None: self._engine = _engine self._config = _config # You shouldn't use this in production if self._engine.driver == 'pysqlite': self._init_sa_db() logfile = _config['main'].get('logfile') loglevel = _config['main'].get('loglevel', 'info') if logfile: self._init_logger(logfile, loglevel) def _init_logger(self, logfile: str, loglevel: str) -> None: global logger lch = logging.handlers.WatchedFileHandler(os.path.expanduser(logfile)) lfmt = logging.Formatter('[%(process)d] %(asctime)s - %(levelname)s - %(message)s') lch.setFormatter(lfmt) if loglevel == 'critical': lch.setLevel(logging.CRITICAL) elif loglevel == 'debug': lch.setLevel(logging.DEBUG) else: lch.setLevel(logging.INFO) logger.addHandler(lch) def _init_sa_db(self) -> None: logger.info('Setting up SQLite database') conn = self._engine.connect() md = sa.MetaData() meta = sa.Table('meta', md, sa.Column('version', sa.Integer()) ) auth = sa.Table('auth', md, sa.Column('auth_id', sa.Integer(), primary_key=True), sa.Column('created', sa.DateTime(), nullable=False, server_default=sa.sql.func.now()), sa.Column('identity', sa.Text(), nullable=False), sa.Column('selector', sa.Text(), nullable=False), sa.Column('pubkey', sa.Text(), nullable=False), sa.Column('challenge', sa.Text(), nullable=True), sa.Column('verified', sa.Integer(), nullable=False), ) sa.Index('idx_identity_selector', auth.c.identity, auth.c.selector, unique=True) md.create_all(self._engine) q = sa.insert(meta).values(version=DB_VERSION) conn.execute(q) conn.close() def on_get(self, req, resp): # noqa resp.status = falcon.HTTP_200 resp.content_type = falcon.MEDIA_TEXT resp.text = "We don't serve GETs here\n" def send_error(self, resp, message: str) -> None: resp.status = falcon.HTTP_500 logger.critical('Returning error: %s', message) resp.text = json.dumps({'result': 'error', 'message': message}) def send_success(self, resp, message: str) -> None: resp.status = falcon.HTTP_200 logger.debug('Returning success: %s', message) resp.text = json.dumps({'result': 'success', 'message': message}) def get_smtp(self) -> Tuple[Union[smtplib.SMTP, smtplib.SMTP_SSL, None], Tuple[str, str]]: sconfig = self._config['sendemail'] server = sconfig.get('smtpserver', 'localhost') port = sconfig.get('smtpserverport', 0) encryption = sconfig.get('smtpencryption') logger.debug('Connecting to %s:%s', server, port) # We only authenticate if we have encryption if encryption: if encryption in ('tls', 'starttls'): # We do startssl smtp = smtplib.SMTP(server, port) # Introduce ourselves smtp.ehlo() # Start encryption smtp.starttls() # Introduce ourselves again to get new criteria smtp.ehlo() elif encryption in ('ssl', 'smtps'): # We do TLS from the get-go smtp = smtplib.SMTP_SSL(server, port) else: raise smtplib.SMTPException('Unclear what to do with smtpencryption=%s' % encryption) # If we got to this point, we should do authentication. auser = sconfig.get('smtpuser') apass = sconfig.get('smtppass') if auser and apass: # Let any exceptions bubble up smtp.login(auser, apass) else: # We assume you know what you're doing if you don't need encryption smtp = smtplib.SMTP(server, port) frompair = utils.getaddresses([sconfig.get('from')])[0] return smtp, frompair def auth_new(self, jdata, resp) -> None: # Is it already authorized? conn = self._engine.connect() md = sa.MetaData() identity = jdata.get('identity') selector = jdata.get('selector') logger.info('New authentication request for %s/%s', identity, selector) pubkey = jdata.get('pubkey') t_auth = sa.Table('auth', md, autoload=True, autoload_with=self._engine) q = sa.select([t_auth.c.auth_id]).where(t_auth.c.identity == identity, t_auth.c.selector == selector, t_auth.c.verified == 1) rp = conn.execute(q) if len(rp.fetchall()): self.send_error(resp, message='i=%s;s=%s is already authorized' % (identity, selector)) return # delete any existing challenges for this and create a new one q = sa.delete(t_auth).where(t_auth.c.identity == identity, t_auth.c.selector == selector, t_auth.c.verified == 0) conn.execute(q) # create new challenge import uuid cstr = str(uuid.uuid4()) q = sa.insert(t_auth).values(identity=identity, selector=selector, pubkey=pubkey, challenge=cstr, verified=0) conn.execute(q) logger.info('Created new challenge for %s/%s: %s', identity, selector, cstr) conn.close() smtp, frompair = self.get_smtp() cmsg = email.message.EmailMessage() fromname, fromaddr = frompair if len(fromname): cmsg.add_header('From', f'{fromname} <{fromaddr}>') else: cmsg.add_header('From', fromaddr) tpt_subject = self._config['templates']['verify-subject'].strip() tpt_body = self._config['templates']['verify-body'].strip() signature = self._config['templates']['signature'].strip() subject = Template(tpt_subject).safe_substitute({'identity': jdata.get('identity')}) cmsg.add_header('Subject', subject) name = jdata.get('name', 'Anonymous Llama') cmsg.add_header('To', f'{name} <{identity}>') cmsg.add_header('Message-Id', utils.make_msgid('b4-verify')) vals = { 'name': name, 'myurl': self._config['main'].get('myurl'), 'challenge': cstr, } body = Template(tpt_body).safe_substitute(vals) body += '\n-- \n' body += Template(signature).safe_substitute(vals) body += '\n' cmsg.set_payload(body, charset='utf-8') bdata = cmsg.as_bytes(policy=emlpolicy) destaddrs = [identity] alwaysbcc = self._config['main'].get('alwayscc') if alwaysbcc: destaddrs += [x[1] for x in utils.getaddresses(alwaysbcc)] logger.info('Sending challenge to %s', identity) smtp.sendmail(fromaddr, [identity], bdata) smtp.close() self.send_success(resp, message=f'Challenge generated and sent to {identity}') def validate_message(self, conn, t_auth, bdata, verified=1) -> Tuple[str, str, int]: # Returns auth_id of the matching record pm = patatt.PatattMessage(bdata) if not pm.signed: raise patatt.ValidationError('Message is not signed') auth_id = identity = selector = pubkey = None for ds in pm.get_sigs(): selector = 'default' identity = '' i = ds.get_field('i') if i: identity = i.decode() s = ds.get_field('s') if s: selector = s.decode() logger.debug('i=%s; s=%s', identity, selector) q = sa.select([t_auth.c.auth_id, t_auth.c.pubkey]).where(t_auth.c.identity == identity, t_auth.c.selector == selector, t_auth.c.verified == verified) rp = conn.execute(q) res = rp.fetchall() if res: auth_id, pubkey = res[0] break if not auth_id: logger.debug('Did not find a matching identity!') raise patatt.NoKeyError('No match for this identity') logger.debug('Found matching %s/%s with auth_id=%s', identity, selector, auth_id) pm.validate(identity, pubkey.encode()) return identity, selector, auth_id def auth_verify(self, jdata, resp) -> None: msg = jdata.get('msg') if msg.find('\nverify:') < 0: self.send_error(resp, message='Invalid verification message') return conn = self._engine.connect() md = sa.MetaData() t_auth = sa.Table('auth', md, autoload=True, autoload_with=self._engine) bdata = msg.encode() try: identity, selector, auth_id = self.validate_message(conn, t_auth, bdata, verified=0) except Exception as ex: self.send_error(resp, message='Signature validation failed: %s' % ex) return logger.debug('Message validation passed for %s/%s with auth_id=%s', identity, selector, auth_id) # Now compare the challenge to what we received q = sa.select([t_auth.c.challenge]).where(t_auth.c.auth_id == auth_id) rp = conn.execute(q) res = rp.fetchall() challenge = res[0][0] if msg.find(f'\nverify:{challenge}') < 0: self.send_error(resp, message='Challenge verification for %s/%s did not match' % (identity, selector)) return logger.info('Successfully verified challenge for %s/%s with auth_id=%s', identity, selector, auth_id) q = sa.update(t_auth).where(t_auth.c.auth_id == auth_id).values(challenge=None, verified=1) conn.execute(q) conn.close() self.send_success(resp, message='Challenge verified for %s/%s' % (identity, selector)) def auth_delete(self, jdata, resp) -> None: msg = jdata.get('msg') if msg.find('\nauth-delete') < 0: self.send_error(resp, message='Invalid key delete message') return conn = self._engine.connect() md = sa.MetaData() t_auth = sa.Table('auth', md, autoload=True, autoload_with=self._engine) bdata = msg.encode() try: identity, selector, auth_id = self.validate_message(conn, t_auth, bdata) except Exception as ex: self.send_error(resp, message='Signature validation failed: %s' % ex) return logger.info('Deleting record for %s/%s with auth_id=%s', identity, selector, auth_id) q = sa.delete(t_auth).where(t_auth.c.auth_id == auth_id) conn.execute(q) conn.close() self.send_success(resp, message='Record deleted for %s/%s' % (identity, selector)) def clean_header(self, hdrval: str) -> str: if hdrval is None: return '' decoded = '' for hstr, hcs in email.header.decode_header(hdrval): if hcs is None: hcs = 'utf-8' try: decoded += hstr.decode(hcs, errors='replace') except LookupError: # Try as utf-u decoded += hstr.decode('utf-8', errors='replace') except (UnicodeDecodeError, AttributeError): decoded += hstr new_hdrval = re.sub(r'\n?\s+', ' ', decoded) return new_hdrval.strip() def receive(self, jdata, resp) -> None: servicename = self._config['main'].get('myname') if not servicename: servicename = 'Web Endpoint' umsgs = jdata.get('messages') if not umsgs: self.send_error(resp, message='Missing the messages array') return logger.debug('Received a request for %s messages', len(umsgs)) diffre = re.compile(r'^(---.*\n\+\+\+|GIT binary patch|diff --git \w/\S+ \w/\S+)', flags=re.M | re.I) diffstatre = re.compile(r'^\s*\d+ file.*\d+ (insertion|deletion)', flags=re.M | re.I) msgs = list() conn = self._engine.connect() md = sa.MetaData() t_auth = sa.Table('auth', md, autoload=True, autoload_with=self._engine) mustdest = self._config['main'].get('mustdest') # First, validate all messages seenid = identity = selector = None for umsg in umsgs: try: identity, selector, auth_id = self.validate_message(conn, t_auth, umsg.encode()) except patatt.NoKeyError as ex: # noqa self.send_error(resp, message='No matching record found, maybe you need to auth-verify first?') return except Exception as ex: self.send_error(resp, message='Signature validation failed: %s' % ex) return # Make sure only a single auth_id is used within a receive session if seenid is None: seenid = auth_id elif seenid != auth_id: self.send_error(resp, message='We only support a single signing identity across patch series.') return msg = email.message_from_string(umsg) logger.debug('Checking sanity on message: %s', msg.get('Subject')) # Some quick sanity checking: # - Subject has to start with [PATCH # - Content-type may ONLY be text/plain # - Has to include a diff or a diffstat passes = True if not msg.get('Subject', '').startswith('[PATCH '): passes = False if passes: cte = msg.get_content_type() if cte.lower() != 'text/plain': passes = False if passes: payload = msg.get_payload() if not (diffre.search(payload) or diffstatre.search(payload)): passes = False if not passes: self.send_error(resp, message='This service only accepts patches') return # Make sure that From, Date, Subject, and Message-Id headers exist if not msg.get('From') or not msg.get('Date') or not msg.get('Subject') or not msg.get('Message-Id'): self.send_error(resp, message='Message is missing some required headers.') return # Check that To/Cc have a mailing list we recognize alldests = utils.getaddresses([str(x) for x in msg.get_all('to', [])]) alldests += utils.getaddresses([str(x) for x in msg.get_all('cc', [])]) destaddrs = {x[1] for x in alldests} if mustdest: matched = False for destaddr in destaddrs: if re.search(mustdest, destaddr, flags=re.I): matched = True break if not matched: self.send_error(resp, message='Destinations must include a mailing list we recognize.') return msg.add_header('X-Endpoint-Received', f'by {servicename} for {identity}/{selector} with auth_id={auth_id}') msgs.append((msg, destaddrs)) conn.close() # All signatures verified. Prepare messages for sending. cfgdomains = self._config['main'].get('mydomains') if cfgdomains is not None: mydomains = [x.strip() for x in cfgdomains.split(',')] else: mydomains = list() smtp, frompair = self.get_smtp() bccaddrs = set() _bcc = self._config['main'].get('alwaysbcc') if _bcc: bccaddrs.update([x[1] for x in utils.getaddresses([_bcc])]) repo = listid = None if 'public-inbox' in self._config and self._config['public-inbox'].get('repo'): repo = self._config['public-inbox'].get('repo') listid = self._config['public-inbox'].get('listid') if not os.path.isdir(repo): repo = None logger.info('Sending %s messages for %s/%s', len(msgs), identity, selector) for msg, destaddrs in msgs: subject = self.clean_header(msg.get('Subject')) if repo: pmsg = copy.deepcopy(msg) if pmsg.get('List-Id'): pmsg.replace_header('List-Id', listid) else: pmsg.add_header('List-Id', listid) ezpi.add_rfc822(repo, pmsg) logger.debug('Wrote %s to public-inbox at %s', subject, repo) origfrom = self.clean_header(msg.get('From')) origpair = utils.getaddresses([origfrom])[0] origaddr = origpair[1] # Does it match one of our domains mydomain = False for _domain in mydomains: if origaddr.endswith(f'@{_domain}'): mydomain = True break if mydomain: logger.debug('%s matches mydomain, no substitution required', origaddr) fromaddr = origaddr else: logger.debug('%s does not match mydomain, substitution required', origaddr) fromaddr = frompair[1] # We can't just send this as-is due to DMARC policies. Therefore, we set # Reply-To and X-Original-From. origname = origpair[0] if not origname: origname = origpair[1] msg.replace_header('From', f'{origname} via {servicename} <{fromaddr}>') if msg.get('X-Original-From'): msg.replace_header('X-Original-From', origfrom) else: msg.add_header('X-Original-From', origfrom) if msg.get('Reply-To'): msg.replace_header('Reply-To', f'<{origpair[1]}>') else: msg.add_header('Reply-To', f'<{origpair[1]}>') body = msg.get_payload() # Parse it as a message and see if we get a From: header cmsg = email.message_from_string(body) if cmsg.get('From') is None: cmsg.add_header('From', origfrom) msg.set_payload(cmsg.as_string(policy=emlpolicy, maxheaderlen=0), charset='utf-8') if bccaddrs: destaddrs.update(bccaddrs) bdata = msg.as_string(policy=emlpolicy).encode() if not self._config['main'].getboolean('dryrun'): smtp.sendmail(fromaddr, list(destaddrs), bdata) logger.info('Sent: %s', subject) else: logger.info('---DRYRUN MSG START---') logger.info(msg) logger.info('---DRYRUN MSG END---') smtp.close() if repo: # run it once after writing all messages logger.debug('Running public-inbox repo hook (if present)') ezpi.run_hook(repo) logger.info('Sent %s messages for %s/%s', len(msgs), identity, selector) self.send_success(resp, message=f'Sent {len(msgs)} messages for {identity}/{selector}') def on_post(self, req, resp): if not req.content_length: resp.status = falcon.HTTP_500 resp.content_type = falcon.MEDIA_TEXT resp.text = 'Payload required\n' return raw = req.bounded_stream.read() try: jdata = json.loads(raw) except: resp.status = falcon.HTTP_500 resp.content_type = falcon.MEDIA_TEXT resp.text = 'Failed to parse the request\n' return action = jdata.get('action') if not action: logger.critical('Action not set from %s', req.remote_addr) logger.info('Action: %s; from: %s', action, req.remote_addr) if action == 'auth-new': self.auth_new(jdata, resp) return if action == 'auth-verify': self.auth_verify(jdata, resp) return if action == 'auth-delete': self.auth_delete(jdata, resp) return if action == 'receive': self.receive(jdata, resp) return resp.status = falcon.HTTP_500 resp.content_type = falcon.MEDIA_TEXT resp.text = 'Unknown action: %s\n' % action parser = ConfigParser(interpolation=ExtendedInterpolation()) cfgfile = os.getenv('CONFIG') if not cfgfile or not os.path.exists(cfgfile): sys.stderr.write('CONFIG env var is not set or is not valid') sys.exit(1) parser.read(cfgfile) gpgbin = parser['main'].get('gpgbin') if gpgbin: patatt.GPGBIN = gpgbin dburl = parser['main'].get('dburl') engine = sa.create_engine(dburl) srl = SendReceiveListener(engine, parser) app = falcon.App() mp = os.getenv('MOUNTPOINT', '/_b4_submit') app.add_route(mp, srl) if __name__ == '__main__': from wsgiref.simple_server import make_server logger.setLevel(logging.DEBUG) ch = logging.StreamHandler() formatter = logging.Formatter('%(message)s') ch.setFormatter(formatter) ch.setLevel(logging.DEBUG) logger.addHandler(ch) with make_server('', 8000, app) as httpd: logger.info('Serving on port 8000...') # Serve until process is killed httpd.serve_forever()