path: root/misc/send-receive.py
diff options
Diffstat (limited to 'misc/send-receive.py')
1 files changed, 545 insertions, 0 deletions
diff --git a/misc/send-receive.py b/misc/send-receive.py
new file mode 100644
index 0000000..c15e12a
--- /dev/null
+++ b/misc/send-receive.py
@@ -0,0 +1,545 @@
+#!/usr/bin/env python3
+# noinspection PyUnresolvedReferences
+import falcon
+import os
+import sys
+import logging
+import logging.handlers
+import json
+import sqlalchemy as sa
+import patatt
+import smtplib
+import email
+import email.header
+import email.policy
+import re
+import ezpi
+import copy
+from configparser import ConfigParser, ExtendedInterpolation
+from string import Template
+from email import utils
+from typing import Tuple, Union
+from email import charset
+charset.add_charset('utf-8', None)
+emlpolicy = email.policy.EmailPolicy(utf8=True, cte_type='8bit', max_line_length=None)
+logger = logging.getLogger('b4-send-receive')
+# noinspection PyBroadException, PyMethodMayBeStatic
+class SendReceiveListener(object):
+ def __init__(self, _engine, _config) -> None:
+ self._engine = _engine
+ self._config = _config
+ # You shouldn't use this in production
+ if self._engine.driver == 'pysqlite':
+ self._init_sa_db()
+ logfile = _config['main'].get('logfile')
+ loglevel = _config['main'].get('loglevel', 'info')
+ if logfile:
+ self._init_logger(logfile, loglevel)
+ def _init_logger(self, logfile: str, loglevel: str) -> None:
+ global logger
+ lch = logging.handlers.WatchedFileHandler(os.path.expanduser(logfile))
+ lfmt = logging.Formatter('[%(process)d] %(asctime)s - %(levelname)s - %(message)s')
+ lch.setFormatter(lfmt)
+ if loglevel == 'critical':
+ lch.setLevel(logging.CRITICAL)
+ elif loglevel == 'debug':
+ lch.setLevel(logging.DEBUG)
+ else:
+ lch.setLevel(logging.INFO)
+ logger.addHandler(lch)
+ def _init_sa_db(self) -> None:
+ logger.info('Setting up SQLite database')
+ conn = self._engine.connect()
+ md = sa.MetaData()
+ meta = sa.Table('meta', md,
+ sa.Column('version', sa.Integer())
+ )
+ auth = sa.Table('auth', md,
+ sa.Column('auth_id', sa.Integer(), primary_key=True),
+ sa.Column('created', sa.DateTime(), nullable=False, server_default=sa.sql.func.now()),
+ sa.Column('identity', sa.Text(), nullable=False),
+ sa.Column('selector', sa.Text(), nullable=False),
+ sa.Column('pubkey', sa.Text(), nullable=False),
+ sa.Column('challenge', sa.Text(), nullable=True),
+ sa.Column('verified', sa.Integer(), nullable=False),
+ )
+ sa.Index('idx_identity_selector', auth.c.identity, auth.c.selector, unique=True)
+ md.create_all(self._engine)
+ q = sa.insert(meta).values(version=DB_VERSION)
+ conn.execute(q)
+ conn.close()
+ def on_get(self, req, resp): # noqa
+ resp.status = falcon.HTTP_200
+ resp.content_type = falcon.MEDIA_TEXT
+ resp.text = "We don't serve GETs here\n"
+ def send_error(self, resp, message: str) -> None:
+ resp.status = falcon.HTTP_500
+ logger.critical('Returning error: %s', message)
+ resp.text = json.dumps({'result': 'error', 'message': message})
+ def send_success(self, resp, message: str) -> None:
+ resp.status = falcon.HTTP_200
+ logger.debug('Returning success: %s', message)
+ resp.text = json.dumps({'result': 'success', 'message': message})
+ def get_smtp(self) -> Tuple[Union[smtplib.SMTP, smtplib.SMTP_SSL, None], Tuple[str, str]]:
+ sconfig = self._config['sendemail']
+ server = sconfig.get('smtpserver', 'localhost')
+ port = sconfig.get('smtpserverport', 0)
+ encryption = sconfig.get('smtpencryption')
+ logger.debug('Connecting to %s:%s', server, port)
+ # We only authenticate if we have encryption
+ if encryption:
+ if encryption in ('tls', 'starttls'):
+ # We do startssl
+ smtp = smtplib.SMTP(server, port)
+ # Introduce ourselves
+ smtp.ehlo()
+ # Start encryption
+ smtp.starttls()
+ # Introduce ourselves again to get new criteria
+ smtp.ehlo()
+ elif encryption in ('ssl', 'smtps'):
+ # We do TLS from the get-go
+ smtp = smtplib.SMTP_SSL(server, port)
+ else:
+ raise smtplib.SMTPException('Unclear what to do with smtpencryption=%s' % encryption)
+ # If we got to this point, we should do authentication.
+ auser = sconfig.get('smtpuser')
+ apass = sconfig.get('smtppass')
+ if auser and apass:
+ # Let any exceptions bubble up
+ smtp.login(auser, apass)
+ else:
+ # We assume you know what you're doing if you don't need encryption
+ smtp = smtplib.SMTP(server, port)
+ frompair = utils.getaddresses([sconfig.get('from')])[0]
+ return smtp, frompair
+ def auth_new(self, jdata, resp) -> None:
+ # Is it already authorized?
+ conn = self._engine.connect()
+ md = sa.MetaData()
+ identity = jdata.get('identity')
+ selector = jdata.get('selector')
+ logger.info('New authentication request for %s/%s', identity, selector)
+ pubkey = jdata.get('pubkey')
+ t_auth = sa.Table('auth', md, autoload=True, autoload_with=self._engine)
+ q = sa.select([t_auth.c.auth_id]).where(t_auth.c.identity == identity, t_auth.c.selector == selector,
+ t_auth.c.verified == 1)
+ rp = conn.execute(q)
+ if len(rp.fetchall()):
+ self.send_error(resp, message='i=%s;s=%s is already authorized' % (identity, selector))
+ return
+ # delete any existing challenges for this and create a new one
+ q = sa.delete(t_auth).where(t_auth.c.identity == identity, t_auth.c.selector == selector,
+ t_auth.c.verified == 0)
+ conn.execute(q)
+ # create new challenge
+ import uuid
+ cstr = str(uuid.uuid4())
+ q = sa.insert(t_auth).values(identity=identity, selector=selector, pubkey=pubkey, challenge=cstr,
+ verified=0)
+ conn.execute(q)
+ logger.info('Created new challenge for %s/%s: %s', identity, selector, cstr)
+ conn.close()
+ smtp, frompair = self.get_smtp()
+ cmsg = email.message.EmailMessage()
+ fromname, fromaddr = frompair
+ if len(fromname):
+ cmsg.add_header('From', f'{fromname} <{fromaddr}>')
+ else:
+ cmsg.add_header('From', fromaddr)
+ tpt_subject = self._config['templates']['verify-subject'].strip()
+ tpt_body = self._config['templates']['verify-body'].strip()
+ signature = self._config['templates']['signature'].strip()
+ subject = Template(tpt_subject).safe_substitute({'identity': jdata.get('identity')})
+ cmsg.add_header('Subject', subject)
+ name = jdata.get('name', 'Anonymous Llama')
+ cmsg.add_header('To', f'{name} <{identity}>')
+ cmsg.add_header('Message-Id', utils.make_msgid('b4-verify'))
+ vals = {
+ 'name': name,
+ 'myurl': self._config['main'].get('myurl'),
+ 'challenge': cstr,
+ }
+ body = Template(tpt_body).safe_substitute(vals)
+ body += '\n-- \n'
+ body += Template(signature).safe_substitute(vals)
+ body += '\n'
+ cmsg.set_payload(body, charset='utf-8')
+ bdata = cmsg.as_bytes(policy=emlpolicy)
+ destaddrs = [identity]
+ alwaysbcc = self._config['main'].get('alwayscc')
+ if alwaysbcc:
+ destaddrs += [x[1] for x in utils.getaddresses(alwaysbcc)]
+ logger.info('Sending challenge to %s', identity)
+ smtp.sendmail(fromaddr, [identity], bdata)
+ smtp.close()
+ self.send_success(resp, message=f'Challenge generated and sent to {identity}')
+ def validate_message(self, conn, t_auth, bdata, verified=1) -> Tuple[str, str, int]:
+ # Returns auth_id of the matching record
+ pm = patatt.PatattMessage(bdata)
+ if not pm.signed:
+ raise patatt.ValidationError('Message is not signed')
+ auth_id = identity = selector = pubkey = None
+ for ds in pm.get_sigs():
+ selector = 'default'
+ identity = ''
+ i = ds.get_field('i')
+ if i:
+ identity = i.decode()
+ s = ds.get_field('s')
+ if s:
+ selector = s.decode()
+ logger.debug('i=%s; s=%s', identity, selector)
+ q = sa.select([t_auth.c.auth_id, t_auth.c.pubkey]).where(t_auth.c.identity == identity,
+ t_auth.c.selector == selector,
+ t_auth.c.verified == verified)
+ rp = conn.execute(q)
+ res = rp.fetchall()
+ if res:
+ auth_id, pubkey = res[0]
+ break
+ if not auth_id:
+ logger.debug('Did not find a matching identity!')
+ raise patatt.NoKeyError('No match for this identity')
+ logger.debug('Found matching %s/%s with auth_id=%s', identity, selector, auth_id)
+ pm.validate(identity, pubkey.encode())
+ return identity, selector, auth_id
+ def auth_verify(self, jdata, resp) -> None:
+ msg = jdata.get('msg')
+ if msg.find('\nverify:') < 0:
+ self.send_error(resp, message='Invalid verification message')
+ return
+ conn = self._engine.connect()
+ md = sa.MetaData()
+ t_auth = sa.Table('auth', md, autoload=True, autoload_with=self._engine)
+ bdata = msg.encode()
+ try:
+ identity, selector, auth_id = self.validate_message(conn, t_auth, bdata, verified=0)
+ except Exception as ex:
+ self.send_error(resp, message='Signature validation failed: %s' % ex)
+ return
+ logger.debug('Message validation passed for %s/%s with auth_id=%s', identity, selector, auth_id)
+ # Now compare the challenge to what we received
+ q = sa.select([t_auth.c.challenge]).where(t_auth.c.auth_id == auth_id)
+ rp = conn.execute(q)
+ res = rp.fetchall()
+ challenge = res[0][0]
+ if msg.find(f'\nverify:{challenge}') < 0:
+ self.send_error(resp, message='Challenge verification for %s/%s did not match' % (identity, selector))
+ return
+ logger.info('Successfully verified challenge for %s/%s with auth_id=%s', identity, selector, auth_id)
+ q = sa.update(t_auth).where(t_auth.c.auth_id == auth_id).values(challenge=None, verified=1)
+ conn.execute(q)
+ conn.close()
+ self.send_success(resp, message='Challenge verified for %s/%s' % (identity, selector))
+ def auth_delete(self, jdata, resp) -> None:
+ msg = jdata.get('msg')
+ if msg.find('\nauth-delete') < 0:
+ self.send_error(resp, message='Invalid key delete message')
+ return
+ conn = self._engine.connect()
+ md = sa.MetaData()
+ t_auth = sa.Table('auth', md, autoload=True, autoload_with=self._engine)
+ bdata = msg.encode()
+ try:
+ identity, selector, auth_id = self.validate_message(conn, t_auth, bdata)
+ except Exception as ex:
+ self.send_error(resp, message='Signature validation failed: %s' % ex)
+ return
+ logger.info('Deleting record for %s/%s with auth_id=%s', identity, selector, auth_id)
+ q = sa.delete(t_auth).where(t_auth.c.auth_id == auth_id)
+ conn.execute(q)
+ conn.close()
+ self.send_success(resp, message='Record deleted for %s/%s' % (identity, selector))
+ def clean_header(self, hdrval: str) -> str:
+ if hdrval is None:
+ return ''
+ decoded = ''
+ for hstr, hcs in email.header.decode_header(hdrval):
+ if hcs is None:
+ hcs = 'utf-8'
+ try:
+ decoded += hstr.decode(hcs, errors='replace')
+ except LookupError:
+ # Try as utf-u
+ decoded += hstr.decode('utf-8', errors='replace')
+ except (UnicodeDecodeError, AttributeError):
+ decoded += hstr
+ new_hdrval = re.sub(r'\n?\s+', ' ', decoded)
+ return new_hdrval.strip()
+ def receive(self, jdata, resp) -> None:
+ servicename = self._config['main'].get('myname')
+ if not servicename:
+ servicename = 'Web Endpoint'
+ umsgs = jdata.get('messages')
+ if not umsgs:
+ self.send_error(resp, message='Missing the messages array')
+ return
+ logger.debug('Received a request for %s messages', len(umsgs))
+ diffre = re.compile(r'^(---.*\n\+\+\+|GIT binary patch|diff --git \w/\S+ \w/\S+)', flags=re.M | re.I)
+ diffstatre = re.compile(r'^\s*\d+ file.*\d+ (insertion|deletion)', flags=re.M | re.I)
+ msgs = list()
+ conn = self._engine.connect()
+ md = sa.MetaData()
+ t_auth = sa.Table('auth', md, autoload=True, autoload_with=self._engine)
+ mustdest = self._config['main'].get('mustdest')
+ # First, validate all messages
+ seenid = identity = selector = None
+ for umsg in umsgs:
+ try:
+ identity, selector, auth_id = self.validate_message(conn, t_auth, umsg.encode())
+ except patatt.NoKeyError as ex: # noqa
+ self.send_error(resp, message='No matching record found, maybe you need to auth-verify first?')
+ return
+ except Exception as ex:
+ self.send_error(resp, message='Signature validation failed: %s' % ex)
+ return
+ # Make sure only a single auth_id is used within a receive session
+ if seenid is None:
+ seenid = auth_id
+ elif seenid != auth_id:
+ self.send_error(resp, message='We only support a single signing identity across patch series.')
+ return
+ msg = email.message_from_string(umsg)
+ logger.debug('Checking sanity on message: %s', msg.get('Subject'))
+ # Some quick sanity checking:
+ # - Subject has to start with [PATCH
+ # - Content-type may ONLY be text/plain
+ # - Has to include a diff or a diffstat
+ passes = True
+ if not msg.get('Subject', '').startswith('[PATCH '):
+ passes = False
+ if passes:
+ cte = msg.get_content_type()
+ if cte.lower() != 'text/plain':
+ passes = False
+ if passes:
+ payload = msg.get_payload()
+ if not (diffre.search(payload) or diffstatre.search(payload)):
+ passes = False
+ if not passes:
+ self.send_error(resp, message='This service only accepts patches')
+ return
+ # Make sure that From, Date, Subject, and Message-Id headers exist
+ if not msg.get('From') or not msg.get('Date') or not msg.get('Subject') or not msg.get('Message-Id'):
+ self.send_error(resp, message='Message is missing some required headers.')
+ return
+ # Check that To/Cc have a mailing list we recognize
+ alldests = utils.getaddresses([str(x) for x in msg.get_all('to', [])])
+ alldests += utils.getaddresses([str(x) for x in msg.get_all('cc', [])])
+ destaddrs = {x[1] for x in alldests}
+ if mustdest:
+ matched = False
+ for destaddr in destaddrs:
+ if re.search(mustdest, destaddr, flags=re.I):
+ matched = True
+ break
+ if not matched:
+ self.send_error(resp, message='Destinations must include a mailing list we recognize.')
+ return
+ msg.add_header('X-Endpoint-Received', f'by {servicename} for {identity}/{selector} with auth_id={auth_id}')
+ msgs.append((msg, destaddrs))
+ conn.close()
+ # All signatures verified. Prepare messages for sending.
+ cfgdomains = self._config['main'].get('mydomains')
+ if cfgdomains is not None:
+ mydomains = [x.strip() for x in cfgdomains.split(',')]
+ else:
+ mydomains = list()
+ smtp, frompair = self.get_smtp()
+ bccaddrs = set()
+ _bcc = self._config['main'].get('alwaysbcc')
+ if _bcc:
+ bccaddrs.update([x[1] for x in utils.getaddresses([_bcc])])
+ repo = listid = None
+ if 'public-inbox' in self._config and self._config['public-inbox'].get('repo'):
+ repo = self._config['public-inbox'].get('repo')
+ listid = self._config['public-inbox'].get('listid')
+ if not os.path.isdir(repo):
+ repo = None
+ logger.info('Sending %s messages for %s/%s', len(msgs), identity, selector)
+ for msg, destaddrs in msgs:
+ subject = self.clean_header(msg.get('Subject'))
+ if repo:
+ pmsg = copy.deepcopy(msg)
+ if pmsg.get('List-Id'):
+ pmsg.replace_header('List-Id', listid)
+ else:
+ pmsg.add_header('List-Id', listid)
+ ezpi.add_rfc822(repo, pmsg)
+ logger.debug('Wrote %s to public-inbox at %s', subject, repo)
+ origfrom = self.clean_header(msg.get('From'))
+ origpair = utils.getaddresses([origfrom])[0]
+ origaddr = origpair[1]
+ # Does it match one of our domains
+ mydomain = False
+ for _domain in mydomains:
+ if origaddr.endswith(f'@{_domain}'):
+ mydomain = True
+ break
+ if mydomain:
+ logger.debug('%s matches mydomain, no substitution required', origaddr)
+ fromaddr = origaddr
+ else:
+ logger.debug('%s does not match mydomain, substitution required', origaddr)
+ fromaddr = frompair[1]
+ # We can't just send this as-is due to DMARC policies. Therefore, we set
+ # Reply-To and X-Original-From.
+ origname = origpair[0]
+ if not origname:
+ origname = origpair[1]
+ msg.replace_header('From', f'{origname} via {servicename} <{fromaddr}>')
+ if msg.get('X-Original-From'):
+ msg.replace_header('X-Original-From', origfrom)
+ else:
+ msg.add_header('X-Original-From', origfrom)
+ if msg.get('Reply-To'):
+ msg.replace_header('Reply-To', f'<{origpair[1]}>')
+ else:
+ msg.add_header('Reply-To', f'<{origpair[1]}>')
+ body = msg.get_payload()
+ # Parse it as a message and see if we get a From: header
+ cmsg = email.message_from_string(body)
+ if cmsg.get('From') is None:
+ cmsg.add_header('From', origfrom)
+ msg.set_payload(cmsg.as_string(policy=emlpolicy, maxheaderlen=0), charset='utf-8')
+ if bccaddrs:
+ destaddrs.update(bccaddrs)
+ bdata = msg.as_string(policy=emlpolicy).encode()
+ if not self._config['main'].getboolean('dryrun'):
+ smtp.sendmail(fromaddr, list(destaddrs), bdata)
+ logger.info('Sent: %s', subject)
+ else:
+ logger.info('---DRYRUN MSG START---')
+ logger.info(msg)
+ logger.info('---DRYRUN MSG END---')
+ smtp.close()
+ if repo:
+ # run it once after writing all messages
+ logger.debug('Running public-inbox repo hook (if present)')
+ ezpi.run_hook(repo)
+ logger.info('Sent %s messages for %s/%s', len(msgs), identity, selector)
+ self.send_success(resp, message=f'Sent {len(msgs)} messages for {identity}/{selector}')
+ def on_post(self, req, resp):
+ if not req.content_length:
+ resp.status = falcon.HTTP_500
+ resp.content_type = falcon.MEDIA_TEXT
+ resp.text = 'Payload required\n'
+ return
+ raw = req.bounded_stream.read()
+ try:
+ jdata = json.loads(raw)
+ except:
+ resp.status = falcon.HTTP_500
+ resp.content_type = falcon.MEDIA_TEXT
+ resp.text = 'Failed to parse the request\n'
+ return
+ action = jdata.get('action')
+ if not action:
+ logger.critical('Action not set from %s', req.remote_addr)
+ logger.info('Action: %s; from: %s', action, req.remote_addr)
+ if action == 'auth-new':
+ self.auth_new(jdata, resp)
+ return
+ if action == 'auth-verify':
+ self.auth_verify(jdata, resp)
+ return
+ if action == 'auth-delete':
+ self.auth_delete(jdata, resp)
+ return
+ if action == 'receive':
+ self.receive(jdata, resp)
+ return
+ resp.status = falcon.HTTP_500
+ resp.content_type = falcon.MEDIA_TEXT
+ resp.text = 'Unknown action: %s\n' % action
+parser = ConfigParser(interpolation=ExtendedInterpolation())
+cfgfile = os.getenv('CONFIG')
+if not cfgfile or not os.path.exists(cfgfile):
+ sys.stderr.write('CONFIG env var is not set or is not valid')
+ sys.exit(1)
+gpgbin = parser['main'].get('gpgbin')
+if gpgbin:
+ patatt.GPGBIN = gpgbin
+dburl = parser['main'].get('dburl')
+# By default, recycle db connections after 5 min
+db_pool_recycle = parser['main'].getint('dbpoolrecycle', 300)
+engine = sa.create_engine(dburl, pool_recycle=db_pool_recycle)
+srl = SendReceiveListener(engine, parser)
+app = falcon.App()
+mp = os.getenv('MOUNTPOINT', '/_b4_submit')
+app.add_route(mp, srl)
+if __name__ == '__main__':
+ from wsgiref.simple_server import make_server
+ logger.setLevel(logging.DEBUG)
+ ch = logging.StreamHandler()
+ formatter = logging.Formatter('%(message)s')
+ ch.setFormatter(formatter)
+ ch.setLevel(logging.DEBUG)
+ logger.addHandler(ch)
+ with make_server('', 8000, app) as httpd:
+ logger.info('Serving on port 8000...')
+ # Serve until process is killed
+ httpd.serve_forever()