aboutsummaryrefslogtreecommitdiff
path: root/b4/__init__.py
diff options
context:
space:
mode:
Diffstat (limited to 'b4/__init__.py')
-rw-r--r--b4/__init__.py1250
1 files changed, 1013 insertions, 237 deletions
diff --git a/b4/__init__.py b/b4/__init__.py
index e0a03db..f25b518 100644
--- a/b4/__init__.py
+++ b/b4/__init__.py
@@ -14,8 +14,10 @@ import email.header
import email.generator
import tempfile
import pathlib
+import argparse
+import smtplib
+import shlex
-import requests
import urllib.parse
import datetime
import time
@@ -25,8 +27,11 @@ import mailbox
# noinspection PyCompatibility
import pwd
+import requests
+
+from pathlib import Path
from contextlib import contextmanager
-from typing import Optional, Tuple, Set, List, TextIO
+from typing import Optional, Tuple, Set, List, TextIO, Union, Sequence
from email import charset
charset.add_charset('utf-8', None)
@@ -44,7 +49,8 @@ try:
except ModuleNotFoundError:
can_patatt = False
-__VERSION__ = '0.8-dev'
+__VERSION__ = '0.10.0-dev'
+PW_REST_API_VERSION = '1.2'
def _dkim_log_filter(record):
@@ -84,25 +90,12 @@ AMHDRS = [
'List-Id',
]
-# You can use bash-style globbing here
-# end with '*' to include any other trailers
-# You can change the default in your ~/.gitconfig, e.g.:
-# [b4]
-# # remember to end with ,*
-# trailer-order=link*,fixes*,cc*,reported*,suggested*,original*,co-*,tested*,reviewed*,acked*,signed-off*,*
-# (another common)
-# trailer-order=fixes*,reported*,suggested*,original*,co-*,signed-off*,tested*,reviewed*,acked*,cc*,link*,*
-#
-# Or use _preserve_ (alias to *) to keep the order unchanged
-
-DEFAULT_TRAILER_ORDER = '*'
-
LOREADDR = 'https://lore.kernel.org'
DEFAULT_CONFIG = {
- 'midmask': LOREADDR + '/r/%s',
+ 'midmask': LOREADDR + '/all/%s',
'linkmask': LOREADDR + '/r/%s',
- 'trailer-order': DEFAULT_TRAILER_ORDER,
+ 'searchmask': LOREADDR + '/all/?x=m&t=1&q=%s',
'listid-preference': '*.feeds.kernel.org,*.linux.dev,*.kernel.org,*',
'save-maildirs': 'no',
# off: do not bother checking attestation
@@ -130,6 +123,8 @@ DEFAULT_CONFIG = {
# git-config for gpg.program, and if that's not set,
# we'll use "gpg" and hope for the better
'gpgbin': None,
+ # When sending mail, use this sendemail identity configuration
+ 'sendemail-identity': None,
}
# This is where we store actual config
@@ -141,6 +136,8 @@ USER_CONFIG = None
REQSESSION = None
# Indicates that we've cleaned cache already
_CACHE_CLEANED = False
+# Used to track mailmap replacements
+MAILMAP_INFO = dict()
class LoreMailbox:
@@ -171,7 +168,7 @@ class LoreMailbox:
return '\n'.join(out)
- def get_by_msgid(self, msgid):
+ def get_by_msgid(self, msgid: str) -> Optional['LoreMessage']:
if msgid in self.msgid_map:
return self.msgid_map[msgid]
return None
@@ -238,7 +235,7 @@ class LoreMailbox:
lser.subject = pser.subject
logger.debug('Reconstituted successfully')
- def get_series(self, revision=None, sloppytrailers=False, reroll=True):
+ def get_series(self, revision=None, sloppytrailers=False, reroll=True) -> Optional['LoreSeries']:
if revision is None:
if not len(self.series):
return None
@@ -307,14 +304,14 @@ class LoreMailbox:
continue
trailers, mismatches = fmsg.get_trailers(sloppy=sloppytrailers)
- for trailer in mismatches:
- lser.trailer_mismatches.add((trailer[0], trailer[1], fmsg.fromname, fmsg.fromemail))
+ for ltr in mismatches:
+ lser.trailer_mismatches.add((ltr.name, ltr.value, fmsg.fromname, fmsg.fromemail))
lvl = 1
while True:
logger.debug('%sParent: %s', ' ' * lvl, pmsg.full_subject)
logger.debug('%sTrailers:', ' ' * lvl)
- for trailer in trailers:
- logger.debug('%s%s: %s', ' ' * (lvl+1), trailer[0], trailer[1])
+ for ltr in trailers:
+ logger.debug('%s%s: %s', ' ' * (lvl+1), ltr.name, ltr.value)
if pmsg.has_diff and not pmsg.reply:
# We found the patch for these trailers
if pmsg.revision != revision:
@@ -333,8 +330,9 @@ class LoreMailbox:
break
if pmsg.in_reply_to and pmsg.in_reply_to in self.msgid_map:
lvl += 1
- for ptrailer in pmsg.trailers:
- trailers.append(tuple(ptrailer + [pmsg]))
+ for pltr in pmsg.trailers:
+ pltr.lmsg = pmsg
+ trailers.append(pltr)
pmsg = self.msgid_map[pmsg.in_reply_to]
continue
break
@@ -348,15 +346,17 @@ class LoreMailbox:
return lser
- def add_message(self, msg):
+ def add_message(self, msg: email.message.Message) -> None:
msgid = LoreMessage.get_clean_msgid(msg)
- if msgid in self.msgid_map:
+ if msgid and msgid in self.msgid_map:
logger.debug('Already have a message with this msgid, skipping %s', msgid)
return
lmsg = LoreMessage(msg)
logger.debug('Looking at: %s', lmsg.full_subject)
- self.msgid_map[lmsg.msgid] = lmsg
+
+ if msgid:
+ self.msgid_map[lmsg.msgid] = lmsg
if lmsg.reply:
# We'll figure out where this belongs later
@@ -393,7 +393,7 @@ class LoreMailbox:
logger.debug('Found new series v%s', lmsg.revision)
# Attempt to auto-number series from the same author who did not bother
- # to set v2, v3, etc in the patch revision
+ # to set v2, v3, etc. in the patch revision
if (lmsg.counter == 1 and lmsg.counters_inferred
and not lmsg.reply and lmsg.lsubject.patch and not lmsg.lsubject.resend):
omsg = self.series[lmsg.revision].patches[lmsg.counter]
@@ -411,18 +411,26 @@ class LoreMailbox:
class LoreSeries:
- def __init__(self, revision, expected):
+ revision: int
+ expected: int
+ patches: List[Optional['LoreMessage']]
+ followups: List['LoreMessage']
+ trailer_mismatches: Set[Tuple[str, str, str, str]]
+ complete: bool = False
+ has_cover: bool = False
+ partial_reroll: bool = False
+ subject: str
+ indexes: Optional[List[Tuple[str, str]]] = None
+ base_commit: Optional[str] = None
+ change_id: Optional[str] = None
+
+ def __init__(self, revision: int, expected: int) -> None:
self.revision = revision
self.expected = expected
self.patches = [None] * (expected+1)
self.followups = list()
self.trailer_mismatches = set()
- self.complete = False
- self.has_cover = False
- self.partial_reroll = False
self.subject = '(untitled)'
- # Used for base matching
- self._indexes = None
def __repr__(self):
out = list()
@@ -431,6 +439,8 @@ class LoreSeries:
out.append(' expected: %s' % self.expected)
out.append(' complete: %s' % self.complete)
out.append(' has_cover: %s' % self.has_cover)
+ out.append(' base_commit: %s' % self.base_commit)
+ out.append(' change_id: %s' % self.change_id)
out.append(' partial_reroll: %s' % self.partial_reroll)
out.append(' patches:')
at = 0
@@ -443,7 +453,7 @@ class LoreSeries:
return '\n'.join(out)
- def add_patch(self, lmsg):
+ def add_patch(self, lmsg: 'LoreMessage') -> None:
while len(self.patches) < lmsg.expected + 1:
self.patches.append(None)
self.expected = lmsg.expected
@@ -457,14 +467,23 @@ class LoreSeries:
else:
self.patches[lmsg.counter] = lmsg
self.complete = not (None in self.patches[1:])
+ if lmsg.counter == 0:
+ # This is a cover letter
+ if '\nbase-commit:' in lmsg.body:
+ matches = re.search(r'^base-commit: .*?([\da-f]+)', lmsg.body, flags=re.I | re.M)
+ if matches:
+ self.base_commit = matches.groups()[0]
+ if '\nchange-id:' in lmsg.body:
+ matches = re.search(r'^change-id:\s+(\S+)', lmsg.body, flags=re.I | re.M)
+ if matches:
+ self.change_id = matches.groups()[0]
+
if self.patches[0] is not None:
- # noinspection PyUnresolvedReferences
self.subject = self.patches[0].subject
elif self.patches[1] is not None:
- # noinspection PyUnresolvedReferences
self.subject = self.patches[1].subject
- def get_slug(self, extended=False):
+ def get_slug(self, extended: bool = False) -> str:
# Find the first non-None entry
lmsg = None
for lmsg in self.patches:
@@ -489,8 +508,18 @@ class LoreSeries:
return slug[:100]
- def get_am_ready(self, noaddtrailers=False, covertrailers=False, trailer_order=None, addmysob=False,
- addlink=False, linkmask=None, cherrypick=None, copyccs=False) -> list:
+ def add_extra_trailers(self, trailers: tuple) -> None:
+ for lmsg in self.patches[1:]:
+ if lmsg is None:
+ continue
+ lmsg.followup_trailers += trailers
+
+ def add_cover_trailers(self) -> None:
+ if self.patches[0] and self.patches[0].followup_trailers: # noqa
+ self.add_extra_trailers(self.patches[0].followup_trailers) # noqa
+
+ def get_am_ready(self, noaddtrailers=False, covertrailers=False, addmysob=False, addlink=False,
+ linkmask=None, cherrypick=None, copyccs=False, allowbadchars=False) -> List[email.message.Message]:
usercfg = get_user_config()
config = get_main_config()
@@ -531,6 +560,9 @@ class LoreSeries:
logger.debug('Attestation info is not the same')
break
+ if covertrailers:
+ self.add_cover_trailers()
+
at = 1
msgs = list()
logger.info('---')
@@ -545,13 +577,13 @@ class LoreSeries:
raise KeyError('Cherrypick not in series')
if lmsg is not None:
- if self.has_cover and covertrailers and self.patches[0].followup_trailers: # noqa
- lmsg.followup_trailers += self.patches[0].followup_trailers # noqa
- if addmysob:
- lmsg.followup_trailers.append(('Signed-off-by',
- '%s <%s>' % (usercfg['name'], usercfg['email']), None, None))
+ extras = list()
if addlink:
- lmsg.followup_trailers.append(('Link', linkmask % lmsg.msgid, None, None))
+ if linkmask is None:
+ linkmask = config.get('linkmask')
+ linkval = linkmask % lmsg.msgid
+ lltr = LoreTrailer(name='Link', value=linkval)
+ extras.append(lltr)
if attsame and not attcrit:
if attmark:
@@ -578,7 +610,8 @@ class LoreSeries:
add_trailers = True
if noaddtrailers:
add_trailers = False
- msg = lmsg.get_am_message(add_trailers=add_trailers, trailer_order=trailer_order, copyccs=copyccs)
+ msg = lmsg.get_am_message(add_trailers=add_trailers, extras=extras, copyccs=copyccs,
+ addmysob=addmysob, allowbadchars=allowbadchars)
msgs.append(msg)
else:
logger.error(' ERROR: missing [%s/%s]!', at, self.expected)
@@ -601,28 +634,31 @@ class LoreSeries:
return msgs
- def check_applies_clean(self, gitdir: str, at: Optional[str] = None) -> Tuple[int, list]:
- if self._indexes is None:
- self._indexes = list()
- seenfiles = set()
- for lmsg in self.patches[1:]:
- if lmsg is None or lmsg.blob_indexes is None:
+ def populate_indexes(self):
+ self.indexes = list()
+ seenfiles = set()
+ for lmsg in self.patches[1:]:
+ if lmsg is None or lmsg.blob_indexes is None:
+ continue
+ for fn, bh in lmsg.blob_indexes:
+ if fn in seenfiles:
+ # if we have seen this file once already, then it's a repeat patch
+                # and it's no longer going to match current hash
continue
- for fn, bh in lmsg.blob_indexes:
- if fn in seenfiles:
- # if we have seen this file once already, then it's a repeat patch
- # and it's no longer going to match current hash
- continue
- seenfiles.add(fn)
- if set(bh) == {'0'}:
- # New file, will for sure apply clean
- continue
- self._indexes.append((fn, bh))
+ seenfiles.add(fn)
+ if set(bh) == {'0'}:
+ # New file, will for sure apply clean
+ continue
+ self.indexes.append((fn, bh))
+
+ def check_applies_clean(self, gitdir: str, at: Optional[str] = None) -> Tuple[int, list]:
+ if self.indexes is None:
+ self.populate_indexes()
mismatches = list()
if at is None:
at = 'HEAD'
- for fn, bh in self._indexes:
+ for fn, bh in self.indexes:
ecode, out = git_run_command(gitdir, ['ls-tree', at, fn])
if ecode == 0 and len(out):
chunks = out.split()
@@ -636,9 +672,9 @@ class LoreSeries:
logger.debug('Could not look up %s:%s', at, fn)
mismatches.append((fn, bh))
- return len(self._indexes), mismatches
+ return len(self.indexes), mismatches
- def find_base(self, gitdir: str, branches: Optional[str] = None, maxdays: int = 30) -> Tuple[str, len, len]:
+ def find_base(self, gitdir: str, branches: Optional[list] = None, maxdays: int = 30) -> Tuple[str, len, len]:
# Find the date of the first patch we have
pdate = datetime.datetime.now()
for lmsg in self.patches:
@@ -647,10 +683,10 @@ class LoreSeries:
pdate = lmsg.date
break
- # Find latest commit on that date
+ # Find the latest commit on that date
guntil = pdate.strftime('%Y-%m-%d')
if branches:
- where = ['--branches', branches]
+ where = branches
else:
where = ['--all']
@@ -677,7 +713,7 @@ class LoreSeries:
for line in lines:
commit = line.split()[0]
logger.debug('commit=%s', commit)
- # We try both that commit and the one preceding it, in case it was a delete
+ # We try both that commit and the one preceding it, in case it was a deletion
# Keep track of the fewest mismatches
for tc in [commit, f'{commit}~1']:
sc, sm = self.check_applies_clean(gitdir, tc)
@@ -695,13 +731,13 @@ class LoreSeries:
break
else:
best = commit
- if fewest == len(self._indexes):
+ if fewest == len(self.indexes):
# None of the blobs matched
raise IndexError
lines = git_get_command_lines(gitdir, ['describe', '--all', best])
if len(lines):
- return lines[0], len(self._indexes), fewest
+ return lines[0], len(self.indexes), fewest
raise IndexError
@@ -813,12 +849,107 @@ class LoreSeries:
def save_cover(self, outfile):
# noinspection PyUnresolvedReferences
- cover_msg = self.patches[0].get_am_message(add_trailers=False, trailer_order=None)
+ cover_msg = self.patches[0].get_am_message(add_trailers=False)
with open(outfile, 'w') as fh:
fh.write(cover_msg.as_string(policy=emlpolicy))
logger.critical('Cover: %s', outfile)
+class LoreTrailer:
+ type: str
+ name: str
+ lname: str
+ value: str
+ extinfo: Optional[str] = None
+ addr: Optional[Tuple[str, str]] = None
+ lmsg = None
+ # Small list of recognized utility trailers
+ _utility: Set[str] = {'fixes', 'link', 'buglink', 'obsoleted-by', 'message-id', 'change-id'}
+
+ def __init__(self, name: Optional[str] = None, value: Optional[str] = None, extinfo: Optional[str] = None,
+ msg: Optional[email.message.Message] = None):
+ if name is None:
+ self.name = 'Signed-off-by'
+ ucfg = get_user_config()
+ self.value = '%s <%s>' % (ucfg['name'], ucfg['email'])
+ self.type = 'person'
+ self.addr = (ucfg['name'], ucfg['email'])
+ else:
+ self.name = name
+ self.value = value
+ if name.lower() in self._utility:
+ self.type = 'utility'
+ elif re.search(r'\S+@\S+\.\S+', value):
+ self.type = 'person'
+ self.addr = email.utils.parseaddr(value)
+ else:
+ self.type = 'unknown'
+ self.lname = self.name.lower()
+ self.extinfo = extinfo
+ self.msg = msg
+
+ def as_string(self, omit_extinfo: bool = False) -> str:
+ ret = f'{self.name}: {self.value}'
+ if not self.extinfo or omit_extinfo:
+ return ret
+    # extinfo can either be [on the next line], or # at the end
+ if self.extinfo.lstrip()[0] == '#':
+ ret += self.extinfo
+ else:
+ ret += f'\n{self.extinfo}'
+
+ return ret
+
+ def email_eq(self, cmp_email: str, fuzzy: bool = True) -> bool:
+ if not self.addr:
+ return False
+ our = self.addr[1].lower()
+ their = cmp_email.lower()
+ if our == their:
+ return True
+ if not fuzzy:
+ return False
+
+ if '@' not in our or '@' not in their:
+ return False
+
+ # Strip extended local parts often added by people, e.g.:
+ # comparing foo@example.com and foo+kernel@example.com should match
+ our = re.sub(r'\+[^@]+@', '@', our)
+ their = re.sub(r'\+[^@]+@', '@', their)
+ if our == their:
+ return True
+
+ # See if domain part of one of the addresses is a subset of the other one,
+ # which should match cases like foo@linux.intel.com and foo@intel.com
+ olocal, odomain = our.split('@', maxsplit=1)
+ tlocal, tdomain = their.split('@', maxsplit=1)
+ if olocal != tlocal:
+ return False
+
+ if (abs(odomain.count('.')-tdomain.count('.')) == 1
+ and (odomain.endswith(f'.{tdomain}') or tdomain.endswith(f'.{odomain}'))):
+ return True
+
+ return False
+
+ def __eq__(self, other):
+        # We never compare extinfo, we just tack it on if we find a match
+ return self.lname == other.lname and self.value.lower() == other.value.lower()
+
+ def __hash__(self):
+ return hash(f'{self.lname}: {self.value}')
+
+ def __repr__(self):
+ out = list()
+ out.append(' type: %s' % self.type)
+ out.append(' name: %s' % self.name)
+ out.append(' value: %s' % self.value)
+ out.append(' extinfo: %s' % self.extinfo)
+
+ return '\n'.join(out)
+
+
class LoreMessage:
def __init__(self, msg):
self.msg = msg
@@ -844,6 +975,7 @@ class LoreMessage:
# Body and body-based info
self.body = None
+ self.message = None
self.charset = 'utf-8'
self.has_diff = False
self.has_diffstat = False
@@ -903,8 +1035,8 @@ class LoreMessage:
if self.date.tzinfo is None:
self.date = self.date.replace(tzinfo=datetime.timezone.utc)
- diffre = re.compile(r'^(---.*\n\+\+\+|GIT binary patch|diff --git \w/\S+ \w/\S+)', re.M | re.I)
- diffstatre = re.compile(r'^\s*\d+ file.*\d+ (insertion|deletion)', re.M | re.I)
+ diffre = re.compile(r'^(---.*\n\+\+\+|GIT binary patch|diff --git \w/\S+ \w/\S+)', flags=re.M | re.I)
+ diffstatre = re.compile(r'^\s*\d+ file.*\d+ (insertion|deletion)', flags=re.M | re.I)
# walk until we find the first text/plain part
mcharset = self.msg.get_content_charset()
@@ -957,41 +1089,33 @@ class LoreMessage:
trailers, others = LoreMessage.find_trailers(self.body, followup=True)
for trailer in trailers:
# These are commonly part of patch/commit metadata
- badtrailers = ('from', 'author', 'cc', 'to')
- if trailer[0].lower() not in badtrailers:
+ badtrailers = {'from', 'author', 'cc', 'to'}
+ if trailer.lname not in badtrailers:
self.trailers.append(trailer)
- def get_trailers(self, sloppy=False):
+ def get_trailers(self, sloppy: bool = False) -> Tuple[List[LoreTrailer], Set[LoreTrailer]]:
trailers = list()
mismatches = set()
- for tname, tvalue, extdata in self.trailers:
- if sloppy or tname.lower() in ('fixes', 'obsoleted-by'):
- trailers.append((tname, tvalue, extdata, self))
+ for ltr in self.trailers:
+ ltr.lmsg = self
+ if sloppy or ltr.type != 'person':
+ trailers.append(ltr)
+ continue
+
+ if ltr.email_eq(self.fromemail):
+ logger.debug(' trailer email match')
+ trailers.append(ltr)
continue
- tmatch = False
- namedata = email.utils.getaddresses([tvalue])[0]
- tfrom = re.sub(r'\+[^@]+@', '@', namedata[1].lower())
- hfrom = re.sub(r'\+[^@]+@', '@', self.fromemail.lower())
- tlname = namedata[0].lower()
- hlname = self.fromname.lower()
- tchunks = tfrom.split('@')
- hchunks = hfrom.split('@')
- if tfrom == hfrom:
- logger.debug(' trailer exact email match')
- tmatch = True
- # See if domain part of one of the addresses is a subset of the other one,
- # which should match cases like @linux.intel.com and @intel.com
- elif (len(tchunks) == 2 and len(hchunks) == 2
- and tchunks[0] == hchunks[0]
- and (tchunks[1].find(hchunks[1]) >= 0 or hchunks[1].find(tchunks[1]) >= 0)):
- logger.debug(' trailer fuzzy email match')
- tmatch = True
# Does the name match, at least?
- elif tlname == hlname:
+ nmatch = False
+ tlname = ltr.addr[0].lower()
+ hlname = self.fromname.lower()
+
+ if tlname == hlname:
logger.debug(' trailer exact name match')
- tmatch = True
+ nmatch = True
# Finally, see if the header From has a comma in it and try to find all
# parts in the trailer name
elif hlname.find(',') > 0:
@@ -1000,13 +1124,13 @@ class LoreMessage:
if hlname.find(nchunk.strip()) < 0:
nmatch = False
break
- if nmatch:
- logger.debug(' trailer fuzzy name match')
- tmatch = True
- if tmatch:
- trailers.append((tname, tvalue, extdata, self))
- else:
- mismatches.add((tname, tvalue, extdata, self))
+ if nmatch:
+ logger.debug(' trailer fuzzy name match')
+ trailers.append(ltr)
+ continue
+
+ logger.debug('trailer did not match: %s: %s', ltr.name, ltr.value)
+ mismatches.add(ltr)
return trailers, mismatches
@@ -1115,7 +1239,11 @@ class LoreMessage:
config = get_main_config()
sources = config.get('keyringsrc')
if not sources:
- sources = ['ref:::.keys', 'ref:::.local-keys', 'ref::refs/meta/keyring:']
+ # fallback to patatt's keyring if none is specified for b4
+ patatt_config = patatt.get_config_from_git(r'patatt\..*', multivals=['keyringsrc'])
+ sources = patatt_config.get('keyringsrc')
+ if not sources:
+ sources = ['ref:::.keys', 'ref:::.local-keys', 'ref::refs/meta/keyring:']
if pdir not in sources:
sources.append(pdir)
@@ -1193,9 +1321,27 @@ class LoreMessage:
if attpolicy == 'hardfail':
critical = True
else:
+ passing = False
if not checkmark:
checkmark = attestor.checkmark
if attestor.check_identity(self.fromemail):
+ passing = True
+ else:
+ # Do we have an x-original-from?
+ xofh = self.msg.get('X-Original-From')
+ if xofh:
+ logger.debug('Using X-Original-From for identity check')
+ xpair = email.utils.getaddresses([xofh])[0]
+ if attestor.check_identity(xpair[1]):
+ passing = True
+ # Fix our fromname and fromemail, mostly for thanks-tracking
+ self.fromname = xpair[0]
+ self.fromemail = xpair[1]
+ # Drop the reply-to header if it's exactly the same
+ for header in list(self.msg._headers): # noqa
+ if header[0].lower() == 'reply-to' and header[1].find(xpair[1]) > 0:
+ self.msg._headers.remove(header) # noqa
+ if passing:
trailers.append('%s Signed: %s' % (attestor.checkmark, attestor.trailer))
else:
trailers.append('%s Signed: %s (From: %s)' % (attestor.checkmark, attestor.trailer,
@@ -1306,6 +1452,14 @@ class LoreMessage:
return msg2
@staticmethod
+ def get_patch_id(diff: str) -> Optional[str]:
+ gitargs = ['patch-id', '--stable']
+ ecode, out = git_run_command(None, gitargs, stdin=diff.encode())
+ if ecode > 0 or not len(out.strip()):
+ return None
+ return out.split(maxsplit=1)[0]
+
+ @staticmethod
def get_patchwork_hash(diff: str) -> str:
"""Generate a hash from a diff. Lifted verbatim from patchwork."""
@@ -1359,13 +1513,13 @@ class LoreMessage:
if matches and matches.groups()[0] == matches.groups()[1]:
curfile = matches.groups()[0]
continue
- matches = re.search(r'^index\s+([0-9a-f]+)\.\.[0-9a-f]+.*$', line)
+ matches = re.search(r'^index\s+([\da-f]+)\.\.[\da-f]+.*$', line)
if matches and curfile is not None:
indexes.add((curfile, matches.groups()[0]))
return indexes
@staticmethod
- def find_trailers(body, followup=False):
+ def find_trailers(body: str, followup: bool = False) -> Tuple[List[LoreTrailer], List[str]]:
ignores = {'phone', 'email'}
headers = {'subject', 'date', 'from'}
nonperson = {'fixes', 'subject', 'date', 'link', 'buglink', 'obsoleted-by'}
@@ -1374,7 +1528,7 @@ class LoreMessage:
# Fix some more common copypasta trailer wrapping
# Fixes: abcd0123 (foo bar
# baz quux)
- body = re.sub(r'^(\S+:\s+[0-9a-f]+\s+\([^)]+)\n([^\n]+\))', r'\1 \2', body, flags=re.M)
+ body = re.sub(r'^(\S+:\s+[\da-f]+\s+\([^)]+)\n([^\n]+\))', r'\1 \2', body, flags=re.M)
# Signed-off-by: Long Name
# <email.here@example.com>
body = re.sub(r'^(\S+:\s+[^<]+)\n(<[^>]+>)$', r'\1 \2', body, flags=re.M)
@@ -1388,31 +1542,47 @@ class LoreMessage:
was_trailer = False
for line in body.split('\n'):
line = line.strip('\r')
- matches = re.search(r'^(\w\S+):\s+(\S.*)', line, flags=re.I)
+ matches = re.search(r'^\s*(\w\S+):\s+(\S.*)', line, flags=re.I)
if matches:
- groups = list(matches.groups())
+ oname, ovalue = list(matches.groups())
# We only accept headers if we haven't seen any non-trailer lines
- tname = groups[0].lower()
- if tname in ignores:
+ lname = oname.lower()
+ if lname in ignores:
logger.debug('Ignoring known non-trailer: %s', line)
continue
- if len(others) and tname in headers:
+ if len(others) and lname in headers:
logger.debug('Ignoring %s (header after other content)', line)
continue
if followup:
- mperson = re.search(r'\S+@\S+\.\S+', groups[1])
- if not mperson and tname not in nonperson:
+ if not lname.isascii():
+ logger.debug('Ignoring known non-ascii follow-up trailer: %s', lname)
+ continue
+ mperson = re.search(r'\S+@\S+\.\S+', ovalue)
+ if not mperson and lname not in nonperson:
logger.debug('Ignoring %s (not a recognized non-person trailer)', line)
continue
+ if re.search(r'https?://', ovalue):
+ logger.debug('Ignoring %s (not a recognized link trailer)', line)
+ continue
+
+ extinfo = None
+ mextinfo = re.search(r'(.*\S+)(\s+#[^#]+)$', ovalue)
+ if mextinfo:
+ logger.debug('Trailer contains hashtag extinfo: %s', line)
+ # Found extinfo of the hashtag genre
+ egr = mextinfo.groups()
+ ovalue = egr[0]
+ extinfo = egr[1]
+
was_trailer = True
- groups.append(None)
- trailers.append(groups)
+ ltrailer = LoreTrailer(name=oname, value=ovalue, extinfo=extinfo)
+ trailers.append(ltrailer)
continue
# Is it an extended info line, e.g.:
# Signed-off-by: Foo Foo <foo@foo.com>
# [for the foo bits]
- if len(line) > 2 and line[0] == '[' and line[-1] == ']' and was_trailer:
- trailers[-1][2] = line
+ if len(line) > 2 and was_trailer and re.search(r'^\s*\[[^]]+]\s*$', line):
+ trailers[-1].extinfo = line
was_trailer = False
continue
was_trailer = False
@@ -1421,7 +1591,7 @@ class LoreMessage:
return trailers, others
@staticmethod
- def get_body_parts(body):
+ def get_body_parts(body: str) -> Tuple[List[LoreTrailer], str, List[LoreTrailer], str, str]:
# remove any starting/trailing blank lines
body = body.replace('\r', '')
body = body.strip('\n')
@@ -1484,13 +1654,28 @@ class LoreMessage:
return githeaders, message, trailers, basement, signature
- def fix_trailers(self, trailer_order=None, copyccs=False):
+ def fix_trailers(self, extras: Optional[List[LoreTrailer]] = None,
+ copyccs: bool = False, addmysob: bool = False) -> None:
+
config = get_main_config()
- attpolicy = config['attestation-policy']
bheaders, message, btrailers, basement, signature = LoreMessage.get_body_parts(self.body)
- # Now we add mix-in trailers
- trailers = btrailers + self.followup_trailers
+
+ sobtr = LoreTrailer()
+ hasmysob = False
+ if sobtr in btrailers:
+ # Our own signoff always moves to the bottom of all trailers
+ hasmysob = True
+ btrailers.remove(sobtr)
+
+ new_trailers = self.followup_trailers
+ if extras:
+ new_trailers += extras
+
+ if sobtr in new_trailers:
+ # Our own signoff always moves to the bottom of all trailers
+ new_trailers.remove(sobtr)
+ addmysob = True
if copyccs:
alldests = email.utils.getaddresses([str(x) for x in self.msg.get_all('to', [])])
@@ -1499,74 +1684,98 @@ class LoreMessage:
alldests.sort(key=lambda x: x[1].find('@') > 0 and x[1].split('@')[1] + x[1].split('@')[0] or x[1])
for pair in alldests:
found = False
- for ftr in trailers:
- if ftr[1].lower().find(pair[1].lower()) >= 0:
+ for fltr in btrailers + new_trailers:
+ if fltr.email_eq(pair[1]):
# already present
found = True
break
if not found:
if len(pair[0]):
- trailers.append(('Cc', f'{pair[0]} <{pair[1]}>', None, None)) # noqa
+ altr = LoreTrailer(name='Cc', value=f'{pair[0]} <{pair[1]}>')
else:
- trailers.append(('Cc', pair[1], None, None)) # noqa
+ altr = LoreTrailer(name='Cc', value=pair[1])
+ new_trailers.append(altr)
+
+ torder = config.get('trailer-order')
+ if torder and torder != '*':
+ # this only applies to trailers within our chain of custody, so walk existing
+ # body trailers backwards and stop at the outermost Signed-off-by we find (if any)
+ for bltr in reversed(btrailers):
+ if bltr.lname == 'signed-off-by':
+ break
+ btrailers.remove(bltr)
+ new_trailers.insert(0, bltr)
- fixtrailers = list()
- if trailer_order is None:
- trailer_order = DEFAULT_TRAILER_ORDER
- elif trailer_order in ('preserve', '_preserve_'):
- trailer_order = '*'
+ ordered_trailers = list()
+ for glob in [x.strip().lower() for x in torder.split(',')]:
+ if not len(new_trailers):
+ break
+ for ltr in list(new_trailers):
+ if fnmatch.fnmatch(ltr.lname, glob):
+ ordered_trailers.append(ltr)
+ new_trailers.remove(ltr)
+ if len(new_trailers):
+ # Tack them to the bottom
+ ordered_trailers += new_trailers
+ new_trailers = ordered_trailers
- for trailermatch in trailer_order:
- for trailer in trailers:
- if list(trailer[:3]) in fixtrailers:
- # Dupe
- continue
- if fnmatch.fnmatch(trailer[0].lower(), trailermatch.strip()):
- fixtrailers.append(list(trailer[:3]))
- if trailer[:3] not in btrailers:
- extra = ''
- if trailer[3] is not None:
- fmsg = trailer[3]
- for attestor in fmsg.attestors: # noqa
- if attestor.passing:
- extra = ' (%s %s)' % (attestor.checkmark, attestor.trailer)
- elif attpolicy in ('hardfail', 'softfail'):
- extra = ' (%s %s)' % (attestor.checkmark, attestor.trailer)
- if attpolicy == 'hardfail':
- import sys
- logger.critical('---')
- logger.critical('Exiting due to attestation-policy: hardfail')
- sys.exit(1)
-
- logger.info(' + %s: %s%s', trailer[0], trailer[1], extra)
- else:
- logger.debug(' . %s: %s', trailer[0], trailer[1])
+ attpolicy = config['attestation-policy']
+ fixtrailers = btrailers
+
+ for ltr in new_trailers:
+ if ltr in fixtrailers:
+ continue
+
+ fixtrailers.append(ltr)
+ extra = ''
+ if ltr.lmsg is not None:
+ for attestor in ltr.lmsg.attestors:
+ if attestor.passing:
+ extra = ' (%s %s)' % (attestor.checkmark, attestor.trailer)
+ elif attpolicy in ('hardfail', 'softfail'):
+ extra = ' (%s %s)' % (attestor.checkmark, attestor.trailer)
+ if attpolicy == 'hardfail':
+ import sys
+ logger.critical('---')
+ logger.critical('Exiting due to attestation-policy: hardfail')
+ sys.exit(1)
+
+ logger.info(' + %s%s', ltr.as_string(omit_extinfo=True), extra)
+
+ if addmysob or hasmysob:
+ # Tack on our signoff at the bottom
+ fixtrailers.append(sobtr)
+ if not hasmysob:
+ logger.info(' + %s', sobtr.as_string(omit_extinfo=True))
# Reconstitute the message
self.body = ''
if bheaders:
- for bheader in bheaders:
+ for bltr in bheaders:
# There is no [extdata] in git headers, so we ignore bheader[2]
- self.body += '%s: %s\n' % (bheader[0], bheader[1])
+ self.body += bltr.as_string(omit_extinfo=True) + '\n'
self.body += '\n'
+ newmessage = ''
if len(message):
- self.body += message.rstrip('\r\n') + '\n'
+ newmessage += message.rstrip('\r\n') + '\n'
if len(fixtrailers):
- self.body += '\n'
+ newmessage += '\n'
if len(fixtrailers):
- for trailer in fixtrailers:
- self.body += '%s: %s\n' % (trailer[0], trailer[1])
- if trailer[2]:
- self.body += '%s\n' % trailer[2]
+ for ltr in fixtrailers:
+ newmessage += ltr.as_string() + '\n'
+
+ self.message = self.subject + '\n\n' + newmessage
+ self.body += newmessage
+
if len(basement):
self.body += '---\n'
- self.body += basement.rstrip('\r\n') + '\n\n'
+ self.body += basement.rstrip('\r\n') + '\n'
if len(signature):
self.body += '-- \n'
- self.body += signature.rstrip('\r\n') + '\n\n'
+ self.body += signature.rstrip('\r\n') + '\n'
def get_am_subject(self, indicate_reroll=True):
# Return a clean patch subject
@@ -1588,11 +1797,39 @@ class LoreMessage:
return '[%s] %s' % (' '.join(parts), self.lsubject.subject)
- def get_am_message(self, add_trailers=True, trailer_order=None, copyccs=False):
+ def get_am_message(self, add_trailers=True, addmysob=False, extras=None, copyccs=False, allowbadchars=False):
if add_trailers:
- self.fix_trailers(trailer_order=trailer_order, copyccs=copyccs)
+ self.fix_trailers(copyccs=copyccs, addmysob=addmysob, extras=extras)
+ bbody = self.body.encode()
+ # Look through the body to make sure there aren't any suspicious unicode control flow chars
+ # First, encode into ascii and compare for a quickie utf8 presence test
+ if not allowbadchars and self.body.encode('ascii', errors='replace') != bbody:
+ import unicodedata
+ logger.debug('Body contains non-ascii characters. Running Unicode Cf char tests.')
+ for line in self.body.split('\n'):
+ # Does this line have any unicode?
+ if line.encode() == line.encode('ascii', errors='replace'):
+ continue
+ ucats = {unicodedata.category(ch) for ch in line.rstrip('\r')}
+            # If we have Cf (format control characters) but not Lo ("letter other") characters,
+ # indicating a language other than latin, then there's likely something funky going on
+ if 'Cf' in ucats and 'Lo' not in ucats:
+ # find the offending char
+ at = 0
+ for c in line.rstrip('\r'):
+ if unicodedata.category(c) == 'Cf':
+ logger.critical('---')
+ logger.critical('WARNING: Message contains suspicious unicode control characters!')
+ logger.critical(' Subject: %s', self.full_subject)
+ logger.critical(' Line: %s', line.rstrip('\r'))
+ logger.critical(' ------%s^', '-'*at)
+ logger.critical(' Char: %s (%s)', unicodedata.name(c), hex(ord(c)))
+ logger.critical(' If you are sure about this, rerun with the right flag to allow.')
+ sys.exit(1)
+ at += 1
+
am_msg = email.message.EmailMessage()
- am_msg.set_payload(self.body.encode())
+ am_msg.set_payload(bbody)
am_msg.add_header('Subject', self.get_am_subject(indicate_reroll=False))
if self.fromname:
am_msg.add_header('From', f'{self.fromname} <{self.fromemail}>')
@@ -1680,9 +1917,11 @@ class LoreSubject:
subject = re.sub(r'^\s*\[[^]]*]\s*', '', subject)
self.subject = subject
- def get_slug(self):
- unsafe = '%04d_%s' % (self.counter, self.subject)
- return re.sub(r'\W+', '_', unsafe).strip('_').lower()
+ def get_slug(self, sep='_', with_counter: bool = True):
+ unsafe = self.subject
+ if with_counter:
+ unsafe = '%04d%s%s' % (self.counter, sep, unsafe)
+ return re.sub(r'\W+', sep, unsafe).strip(sep).lower()
def __repr__(self):
out = list()
@@ -1742,7 +1981,7 @@ class LoreAttestor:
else:
mode = self.mode
- return '%s/%s' % (mode, self.identity)
+ return '%s/%s' % (mode, self.identity.lower())
def check_time_drift(self, emldate, maxdays: int = 30) -> bool:
if not self.passing or self.signtime is None:
@@ -1763,13 +2002,13 @@ class LoreAttestor:
return False
if self.level == 'domain':
- if emlfrom.endswith('@' + self.identity):
+ if emlfrom.lower().endswith('@' + self.identity.lower()):
logger.debug('PASS : sig domain %s matches from identity %s', self.identity, emlfrom)
return True
self.errors.append('signing domain %s does not match From: %s' % (self.identity, emlfrom))
return False
- if emlfrom == self.identity:
+ if emlfrom.lower() == self.identity.lower():
logger.debug('PASS : sig identity %s matches from identity %s', self.identity, emlfrom)
return True
self.errors.append('signing identity %s does not match From: %s' % (self.identity, emlfrom))
@@ -1805,8 +2044,11 @@ class LoreAttestorDKIM(LoreAttestor):
self.keysrc = 'DNS'
self.signtime = signtime
self.passing = passing
- self.identity = identity.lstrip('@')
self.errors = errors
+ if identity.find('@') >= 0:
+ self.identity = identity.split('@')[1]
+ else:
+ self.identity = identity
class LoreAttestorPatatt(LoreAttestor):
@@ -1827,10 +2069,21 @@ class LoreAttestorPatatt(LoreAttestor):
self.have_key = True
-def _run_command(cmdargs: list, stdin: Optional[bytes] = None) -> Tuple[int, bytes, bytes]:
+def _run_command(cmdargs: List[str], stdin: Optional[bytes] = None,
+ rundir: Optional[str] = None) -> Tuple[int, bytes, bytes]:
+ if rundir:
+ logger.debug('Changing dir to %s', rundir)
+ curdir = os.getcwd()
+ os.chdir(rundir)
+ else:
+ curdir = None
+
logger.debug('Running %s' % ' '.join(cmdargs))
sp = subprocess.Popen(cmdargs, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE)
(output, error) = sp.communicate(input=stdin)
+ if curdir:
+ logger.debug('Changing back into %s', curdir)
+ os.chdir(curdir)
return sp.returncode, output, error
@@ -1846,7 +2099,7 @@ def gpg_run_command(args: List[str], stdin: Optional[bytes] = None) -> Tuple[int
def git_run_command(gitdir: Optional[str], args: List[str], stdin: Optional[bytes] = None,
- logstderr: bool = False) -> Tuple[int, str]:
+ logstderr: bool = False, decode: bool = True) -> Tuple[int, Union[str, bytes]]:
cmdargs = ['git', '--no-pager']
if gitdir:
if os.path.exists(os.path.join(gitdir, '.git')):
@@ -1856,10 +2109,12 @@ def git_run_command(gitdir: Optional[str], args: List[str], stdin: Optional[byte
ecode, out, err = _run_command(cmdargs, stdin=stdin)
- out = out.decode(errors='replace')
+ if decode:
+ out = out.decode(errors='replace')
if logstderr and len(err.strip()):
- err = err.decode(errors='replace')
+ if decode:
+ err = err.decode(errors='replace')
logger.debug('Stderr: %s', err)
out += err
@@ -1878,6 +2133,13 @@ def git_get_command_lines(gitdir: Optional[str], args: list) -> List[str]:
return lines
+def git_get_repo_status(gitdir: Optional[str] = None, untracked: bool = False) -> List[str]:
+ args = ['status', '--porcelain=v1']
+ if not untracked:
+ args.append('--untracked-files=no')
+ return git_get_command_lines(gitdir, args)
+
+
@contextmanager
def git_temp_worktree(gitdir=None, commitish=None):
"""Context manager that creates a temporary work tree and chdirs into it. The
@@ -1893,7 +2155,7 @@ def git_temp_worktree(gitdir=None, commitish=None):
yield dfn
finally:
if dfn is not None:
- git_run_command(gitdir, ['worktree', 'remove', dfn])
+ git_run_command(gitdir, ['worktree', 'remove', '--force', dfn])
@contextmanager
@@ -1926,6 +2188,12 @@ def in_directory(dirname):
os.chdir(cdir)
+def git_set_config(fullpath: Optional[str], param: str, value: str, operation: str = '--replace-all'):
+ args = ['config', operation, param, value]
+ ecode, out = git_run_command(fullpath, args)
+ return ecode
+
+
def get_config_from_git(regexp: str, defaults: Optional[dict] = None, multivals: Optional[list] = None) -> dict:
if multivals is None:
multivals = list()
@@ -1962,9 +2230,6 @@ def get_main_config() -> dict:
config = get_config_from_git(r'b4\..*', defaults=DEFAULT_CONFIG, multivals=['keyringsrc'])
# Legacy name was get-lore-mbox, so load those as well
config = get_config_from_git(r'get-lore-mbox\..*', defaults=config)
- config['trailer-order'] = config['trailer-order'].split(',')
- config['trailer-order'].remove('*')
- config['trailer-order'].append('*')
config['listid-preference'] = config['listid-preference'].split(',')
config['listid-preference'].remove('*')
config['listid-preference'].append('*')
@@ -2073,7 +2338,7 @@ def get_requests_session():
return REQSESSION
-def get_msgid_from_stdin():
+def get_msgid_from_stdin() -> Optional[str]:
if not sys.stdin.isatty():
from email.parser import BytesParser
message = BytesParser().parsebytes(
@@ -2082,7 +2347,7 @@ def get_msgid_from_stdin():
return None
-def get_msgid(cmdargs) -> Optional[str]:
+def get_msgid(cmdargs: argparse.Namespace) -> Optional[str]:
if not cmdargs.msgid:
logger.debug('Getting Message-ID from stdin')
msgid = get_msgid_from_stdin()
@@ -2094,9 +2359,24 @@ def get_msgid(cmdargs) -> Optional[str]:
msgid = msgid.strip('<>')
# Handle the case when someone pastes a full URL to the message
+ # Is this a patchwork URL?
+ matches = re.search(r'^https?://.*/project/.*/patch/([^/]+@[^/]+)', msgid, re.IGNORECASE)
+ if matches:
+ logger.debug('Looks like a patchwork URL')
+ chunks = matches.groups()
+ msgid = urllib.parse.unquote(chunks[0])
+ return msgid
+
+ # Does it look like a public-inbox URL?
matches = re.search(r'^https?://[^/]+/([^/]+)/([^/]+@[^/]+)', msgid, re.IGNORECASE)
if matches:
chunks = matches.groups()
+ config = get_main_config()
+ myloc = urllib.parse.urlparse(config['midmask'])
+ wantloc = urllib.parse.urlparse(msgid)
+ if myloc.netloc != wantloc.netloc:
+ logger.debug('Overriding midmask with passed url parameters')
+ config['midmask'] = f'{wantloc.scheme}://{wantloc.netloc}/{chunks[0]}/%s'
msgid = urllib.parse.unquote(chunks[1])
# Infer the project name from the URL, if possible
if chunks[0] != 'r':
@@ -2108,8 +2388,9 @@ def get_msgid(cmdargs) -> Optional[str]:
return msgid
-def get_strict_thread(msgs, msgid):
+def get_strict_thread(msgs, msgid, noparent=False):
want = {msgid}
+ ignore = set()
got = set()
seen = set()
maybe = dict()
@@ -2117,6 +2398,8 @@ def get_strict_thread(msgs, msgid):
while True:
for msg in msgs:
c_msgid = LoreMessage.get_clean_msgid(msg)
+ if c_msgid in ignore:
+ continue
seen.add(c_msgid)
if c_msgid in got:
continue
@@ -2128,7 +2411,16 @@ def get_strict_thread(msgs, msgid):
msgrefs += email.utils.getaddresses([str(x) for x in msg.get_all('in-reply-to', [])])
if msg.get('References', None):
msgrefs += email.utils.getaddresses([str(x) for x in msg.get_all('references', [])])
+ # If noparent is set, we pretend the message we got passed has no references, and add all
+ # parent references of this message to ignore
+ if noparent and msgid == c_msgid:
+ logger.info('Breaking thread to remove parents of %s', msgid)
+ ignore = set([x[1] for x in msgrefs])
+ msgrefs = list()
+
for ref in set([x[1] for x in msgrefs]):
+ if ref in ignore:
+ continue
if ref in got or ref in want:
want.add(c_msgid)
elif len(ref):
@@ -2166,7 +2458,7 @@ def get_strict_thread(msgs, msgid):
return None
if len(msgs) > len(strict):
- logger.debug('Reduced mbox to strict matches only (%s->%s)', len(msgs), len(strict))
+ logger.debug('Reduced thread to requested matches only (%s->%s)', len(msgs), len(strict))
return strict
@@ -2186,52 +2478,113 @@ def mailsplit_bytes(bmbox: bytes, outdir: str) -> list:
return msgs
-def get_pi_thread_by_url(t_mbx_url, nocache=False):
+def get_pi_search_results(query: str, nocache: bool = False):
+ config = get_main_config()
+ searchmask = config.get('searchmask')
+ if not searchmask:
+ logger.critical('b4.searchmask is not defined')
+ return None
msgs = list()
- cachedir = get_cache_file(t_mbx_url, 'pi.msgs')
+ query = urllib.parse.quote_plus(query)
+ query_url = searchmask % query
+ cachedir = get_cache_file(query_url, 'pi.msgs')
if os.path.exists(cachedir) and not nocache:
logger.debug('Using cached copy: %s', cachedir)
for msg in os.listdir(cachedir):
with open(os.path.join(cachedir, msg), 'rb') as fh:
msgs.append(email.message_from_binary_file(fh))
- else:
- logger.critical('Grabbing thread from %s', t_mbx_url.split('://')[1])
- session = get_requests_session()
- resp = session.get(t_mbx_url)
- if resp.status_code != 200:
- logger.critical('Server returned an error: %s', resp.status_code)
- return None
- t_mbox = gzip.decompress(resp.content)
- resp.close()
- if not len(t_mbox):
- logger.critical('No messages found for that query')
- return None
- # Convert into individual files using git-mailsplit
- with tempfile.TemporaryDirectory(suffix='-mailsplit') as tfd:
- msgs = mailsplit_bytes(t_mbox, tfd)
- if os.path.exists(cachedir):
- shutil.rmtree(cachedir)
- shutil.copytree(tfd, cachedir)
+ return msgs
+
+ loc = urllib.parse.urlparse(query_url)
+ logger.info('Grabbing search results from %s', loc.netloc)
+ session = get_requests_session()
+ # For the query to retrieve a mbox file, we need to send a POST request
+ resp = session.post(query_url, data='')
+ if resp.status_code == 404:
+ logger.info('Nothing matching that query.')
+ return None
+ if resp.status_code != 200:
+ logger.info('Server returned an error: %s', resp.status_code)
+ return None
+ t_mbox = gzip.decompress(resp.content)
+ resp.close()
+ if not len(t_mbox):
+ logger.critical('No messages found for that query')
+ return None
+
+ return split_and_dedupe_pi_results(t_mbox, cachedir=cachedir)
+
+
+def split_and_dedupe_pi_results(t_mbox: bytes, cachedir: Optional[str] = None) -> List[email.message.Message]:
+ # Convert into individual files using git-mailsplit
+ with tempfile.TemporaryDirectory(suffix='-mailsplit') as tfd:
+ msgs = mailsplit_bytes(t_mbox, tfd)
deduped = dict()
+
for msg in msgs:
msgid = LoreMessage.get_clean_msgid(msg)
if msgid in deduped:
deduped[msgid] = LoreMessage.get_preferred_duplicate(deduped[msgid], msg)
continue
deduped[msgid] = msg
- return list(deduped.values())
+
+ msgs = list(deduped.values())
+ if cachedir:
+ if os.path.exists(cachedir):
+ shutil.rmtree(cachedir)
+ pathlib.Path(cachedir).mkdir(parents=True, exist_ok=True)
+ for at, msg in enumerate(msgs):
+ with open(os.path.join(cachedir, '%04d' % at), 'wb') as fh:
+ fh.write(msg.as_bytes())
+
+ return msgs
-def get_pi_thread_by_msgid(msgid, useproject=None, nocache=False, onlymsgids: Optional[set] = None):
+def get_pi_thread_by_url(t_mbx_url: str, nocache: bool = False):
+ msgs = list()
+ cachedir = get_cache_file(t_mbx_url, 'pi.msgs')
+ if os.path.exists(cachedir) and not nocache:
+ logger.debug('Using cached copy: %s', cachedir)
+ for msg in os.listdir(cachedir):
+ with open(os.path.join(cachedir, msg), 'rb') as fh:
+ msgs.append(email.message_from_binary_file(fh))
+ return msgs
+
+ logger.critical('Grabbing thread from %s', t_mbx_url.split('://')[1])
+ session = get_requests_session()
+ resp = session.get(t_mbx_url)
+ if resp.status_code == 404:
+ logger.critical('That message-id is not known.')
+ return None
+ if resp.status_code != 200:
+ logger.critical('Server returned an error: %s', resp.status_code)
+ return None
+ t_mbox = gzip.decompress(resp.content)
+ resp.close()
+ if not len(t_mbox):
+ logger.critical('No messages found for that query')
+ return None
+
+ return split_and_dedupe_pi_results(t_mbox, cachedir=cachedir)
+
+
+def get_pi_thread_by_msgid(msgid: str, useproject: Optional[str] = None, nocache: bool = False,
+ onlymsgids: Optional[set] = None) -> Optional[list]:
qmsgid = urllib.parse.quote_plus(msgid)
config = get_main_config()
- # Grab the head from lore, to see where we are redirected
- midmask = config['midmask'] % qmsgid
- loc = urllib.parse.urlparse(midmask)
+ loc = urllib.parse.urlparse(config['midmask'])
+ # The public-inbox instance may provide a unified index at /all/.
+ # In fact, /all/ naming is arbitrary, but for now we are going to
+ # hardcode it to lore.kernel.org settings and maybe make it configurable
+ # in the future, if necessary.
+ if loc.path.startswith('/all/') and not useproject:
+ useproject = 'all'
if useproject:
projurl = '%s://%s/%s' % (loc.scheme, loc.netloc, useproject)
else:
+ # Grab the head from lore, to see where we are redirected
+ midmask = config['midmask'] % qmsgid
logger.info('Looking up %s', midmask)
session = get_requests_session()
resp = session.head(midmask)
@@ -2264,21 +2617,106 @@ def get_pi_thread_by_msgid(msgid, useproject=None, nocache=False, onlymsgids: Op
return strict
-@contextmanager
-def git_format_patches(gitdir, start, end, prefixes=None, extraopts=None):
- with tempfile.TemporaryDirectory() as tmpd:
- gitargs = ['format-patch', '--cover-letter', '-o', tmpd, '--signature', f'b4 {__VERSION__}']
- if prefixes is not None and len(prefixes):
- gitargs += ['--subject-prefix', ' '.join(prefixes)]
- if extraopts:
- gitargs += extraopts
- gitargs += ['%s..%s' % (start, end)]
- ecode, out = git_run_command(gitdir, gitargs)
+def git_range_to_patches(gitdir: Optional[str], start: str, end: str,
+ covermsg: Optional[email.message.EmailMessage] = None,
+ prefixes: Optional[List[str]] = None,
+ msgid_tpt: Optional[str] = None,
+ seriests: Optional[int] = None,
+ mailfrom: Optional[Tuple[str, str]] = None,
+ extrahdrs: Optional[List[Tuple[str, str]]] = None,
+ ignore_commits: Optional[Set[str]] = None,
+ thread: bool = False,
+ keepdate: bool = False) -> List[Tuple[str, email.message.Message]]:
+ patches = list()
+ commits = git_get_command_lines(gitdir, ['rev-list', '--reverse', f'{start}..{end}'])
+ if not commits:
+ raise RuntimeError(f'Could not run rev-list {start}..{end}')
+ if ignore_commits is None:
+ ignore_commits = set()
+ for commit in commits:
+ if commit in ignore_commits:
+ logger.debug('Ignoring commit %s', commit)
+ continue
+ ecode, out = git_run_command(gitdir, ['show', '--format=email', '--encoding=utf-8', commit], decode=False)
if ecode > 0:
- logger.critical('ERROR: Could not convert pull request into patches')
- logger.critical(out)
- yield None
- yield tmpd
+ raise RuntimeError(f'Could not get a patch out of {commit}')
+ msg = email.message_from_bytes(out)
+ msg.set_charset('utf-8')
+ msg.replace_header('Content-Transfer-Encoding', '8bit')
+ logger.debug(' %s', msg.get('Subject'))
+
+ patches.append((commit, msg))
+
+ startfrom = 1
+ fullcount = len(patches)
+ patches.insert(0, (None, covermsg))
+ if covermsg:
+ startfrom = 0
+
+ # Go through and apply any outstanding fixes
+ if prefixes:
+ prefixes = ' ' + ' '.join(prefixes)
+ else:
+ prefixes = ''
+
+ for counter in range(startfrom, fullcount+1):
+ msg = patches[counter][1]
+ subject = msg.get('Subject')
+ csubject = re.sub(r'^\[PATCH]\s*', '', subject)
+ pline = '[PATCH%s %s/%s]' % (prefixes, str(counter).zfill(len(str(fullcount))), fullcount)
+ msg.replace_header('Subject', f'{pline} {csubject}')
+ inbodyhdrs = list()
+ if mailfrom:
+ # Move the original From and Date into the body
+ origfrom = msg.get('From')
+ if origfrom:
+ origfrom = LoreMessage.clean_header(origfrom)
+ origpair = email.utils.parseaddr(origfrom)
+ if origpair[1] != mailfrom[1]:
+ msg.replace_header('From', format_addrs([mailfrom]))
+ inbodyhdrs.append(f'From: {origfrom}')
+ else:
+ msg.add_header('From', format_addrs([mailfrom]))
+
+ if seriests:
+ patchts = seriests + counter
+ origdate = msg.get('Date')
+ if origdate:
+ if keepdate:
+ inbodyhdrs.append(f'Date: {origdate}')
+ msg.replace_header('Date', email.utils.formatdate(patchts, localtime=True))
+ else:
+ msg.add_header('Date', email.utils.formatdate(patchts, localtime=True))
+
+ payload = msg.get_payload()
+ if inbodyhdrs:
+ payload = '\n'.join(inbodyhdrs) + '\n\n' + payload
+ if not payload.find('\n-- \n') > 0:
+ payload += f'\n-- \nb4 {__VERSION__}\n'
+ msg.set_payload(payload, charset='utf-8')
+
+ if extrahdrs is None:
+ extrahdrs = list()
+ for hdrname, hdrval in extrahdrs:
+ try:
+ msg.replace_header(hdrname, hdrval)
+ except KeyError:
+ msg.add_header(hdrname, hdrval)
+
+ if msgid_tpt:
+ msg.add_header('Message-Id', msgid_tpt % str(counter))
+ refto = None
+ if counter > 0 and covermsg:
+ # Thread to the cover letter
+ refto = msgid_tpt % str(0)
+ if counter > 1 and not covermsg:
+ # Thread to the first patch
+ refto = msgid_tpt % str(1)
+ if refto and thread:
+ msg.add_header('References', refto)
+ msg.add_header('In-Reply-To', refto)
+
+ return patches
def git_commit_exists(gitdir, commit_id):
@@ -2362,16 +2800,16 @@ def check_gpg_status(status: str) -> Tuple[bool, bool, bool, Optional[str], Opti
signtime = None
# Do we have a BADSIG?
- bs_matches = re.search(r'^\[GNUPG:] BADSIG ([0-9A-F]+)\s+(.*)$', status, flags=re.M)
+ bs_matches = re.search(r'^\[GNUPG:] BADSIG ([\dA-F]+)\s+(.*)$', status, flags=re.M)
if bs_matches:
keyid = bs_matches.groups()[0]
return good, valid, trusted, keyid, signtime
- gs_matches = re.search(r'^\[GNUPG:] GOODSIG ([0-9A-F]+)\s+(.*)$', status, flags=re.M)
+ gs_matches = re.search(r'^\[GNUPG:] GOODSIG ([\dA-F]+)\s+(.*)$', status, flags=re.M)
if gs_matches:
good = True
keyid = gs_matches.groups()[0]
- vs_matches = re.search(r'^\[GNUPG:] VALIDSIG ([0-9A-F]+) (\d{4}-\d{2}-\d{2}) (\d+)', status, flags=re.M)
+ vs_matches = re.search(r'^\[GNUPG:] VALIDSIG ([\dA-F]+) (\d{4}-\d{2}-\d{2}) (\d+)', status, flags=re.M)
if vs_matches:
valid = True
signtime = vs_matches.groups()[2]
@@ -2459,3 +2897,341 @@ def get_mailinfo(bmsg: bytes, scissors: bool = False) -> Tuple[dict, bytes, byte
with open(p_out, 'rb') as pfh:
p = pfh.read()
return i, m, p
+
+
+def read_template(tptfile):
+ # bubbles up FileNotFound
+ tpt = ''
+ if tptfile.find('~') >= 0:
+ tptfile = os.path.expanduser(tptfile)
+ if tptfile.find('$') >= 0:
+ tptfile = os.path.expandvars(tptfile)
+ with open(tptfile, 'r', encoding='utf-8') as fh:
+ for line in fh:
+ if len(line) and line[0] == '#':
+ continue
+ tpt += line
+ return tpt
+
+
+def get_smtp(identity: Optional[str] = None,
+ dryrun: bool = False) -> Tuple[Union[smtplib.SMTP, smtplib.SMTP_SSL, list, None], str]:
+ # Get the default settings first
+ _basecfg = get_config_from_git(r'sendemail\.[^.]+$')
+ if identity:
+ # Use this identity to override what we got from the default one
+ sconfig = get_config_from_git(rf'sendemail\.{identity}\..*', defaults=_basecfg)
+ sectname = f'sendemail.{identity}'
+ else:
+ sconfig = _basecfg
+ sectname = 'sendemail'
+ if not len(sconfig):
+ raise smtplib.SMTPException('Unable to find %s settings in any applicable git config' % sectname)
+
+ # Limited support for smtp settings to begin with, but should cover the vast majority of cases
+ fromaddr = sconfig.get('from')
+ server = sconfig.get('smtpserver', 'localhost')
+ port = sconfig.get('smtpserverport', 0)
+ try:
+ port = int(port)
+ except ValueError:
+ raise smtplib.SMTPException('Invalid smtpport entry in %s' % sectname)
+
+ # If server contains slashes, then it's a local command
+ if '/' in server:
+ server = os.path.expanduser(os.path.expandvars(server))
+ sp = shlex.shlex(server, posix=True)
+ sp.whitespace_split = True
+ smtp = list(sp)
+ return smtp, fromaddr
+
+ encryption = sconfig.get('smtpencryption')
+ if dryrun:
+ return None, fromaddr
+
+ logger.info('Connecting to %s:%s', server, port)
+ # We only authenticate if we have encryption
+ if encryption:
+ if encryption in ('tls', 'starttls'):
+ # We do starttls
+ smtp = smtplib.SMTP(server, port)
+ # Introduce ourselves
+ smtp.ehlo()
+ # Start encryption
+ smtp.starttls()
+ # Introduce ourselves again to get new criteria
+ smtp.ehlo()
+ elif encryption in ('ssl', 'smtps'):
+ # We do TLS from the get-go
+ smtp = smtplib.SMTP_SSL(server, port)
+ else:
+ raise smtplib.SMTPException('Unclear what to do with smtpencryption=%s' % encryption)
+
+ # If we got to this point, we should do authentication.
+ auser = sconfig.get('smtpuser')
+ apass = sconfig.get('smtppass')
+ if auser and apass:
+ # Let any exceptions bubble up
+ smtp.login(auser, apass)
+ else:
+ # We assume you know what you're doing if you don't need encryption
+ smtp = smtplib.SMTP(server, port)
+
+ return smtp, fromaddr
+
+
+def get_patchwork_session(pwkey: str, pwurl: str) -> Tuple[requests.Session, str]:
+ session = requests.session()
+ session.headers.update({
+ 'User-Agent': 'b4/%s' % __VERSION__,
+ 'Authorization': f'Token {pwkey}',
+ })
+ url = '/'.join((pwurl.rstrip('/'), 'api', PW_REST_API_VERSION))
+ logger.debug('pw url=%s', url)
+ return session, url
+
+
+def patchwork_set_state(msgids: List[str], state: str) -> bool:
+ # Do we have a pw-key defined in config?
+ config = get_main_config()
+ pwkey = config.get('pw-key')
+ pwurl = config.get('pw-url')
+ pwproj = config.get('pw-project')
+ if not (pwkey and pwurl and pwproj):
+ logger.debug('Patchwork support not configured')
+ return False
+ pses, url = get_patchwork_session(pwkey, pwurl)
+ patches_url = '/'.join((url, 'patches'))
+ tochange = list()
+ seen = set()
+ for msgid in msgids:
+ if msgid in seen:
+ continue
+ # Two calls, first to look up the patch-id, second to update its state
+ params = [
+ ('project', pwproj),
+ ('archived', 'false'),
+ ('msgid', msgid),
+ ]
+ try:
+ logger.debug('looking up patch_id of msgid=%s', msgid)
+ rsp = pses.get(patches_url, params=params, stream=False)
+ rsp.raise_for_status()
+ pdata = rsp.json()
+ for entry in pdata:
+ patch_id = entry.get('id')
+ if patch_id:
+ title = entry.get('name')
+ if entry.get('state') != state:
+ seen.add(msgid)
+ tochange.append((patch_id, title))
+ except requests.exceptions.RequestException as ex:
+ logger.debug('Patchwork REST error: %s', ex)
+
+ if tochange:
+ logger.info('---')
+ loc = urllib.parse.urlparse(pwurl)
+ logger.info('Patchwork: setting state on %s/%s', loc.netloc, pwproj)
+ for patch_id, title in tochange:
+ patchid_url = '/'.join((patches_url, str(patch_id), ''))
+ logger.debug('patchid_url=%s', patchid_url)
+ data = [
+ ('state', state),
+ ]
+ try:
+ rsp = pses.patch(patchid_url, data=data, stream=False)
+ rsp.raise_for_status()
+ newdata = rsp.json()
+ if newdata.get('state') == state:
+ logger.info(' -> %s : %s', state, title)
+ except requests.exceptions.RequestException as ex:
+ logger.debug('Patchwork REST error: %s', ex)
+
+
+def send_mail(smtp: Union[smtplib.SMTP, smtplib.SMTP_SSL, None], msgs: Sequence[email.message.Message],
+ fromaddr: Optional[str], destaddrs: Optional[Union[set, list]] = None,
+ patatt_sign: bool = False, dryrun: bool = False,
+ maxheaderlen: Optional[int] = None, output_dir: Optional[str] = None,
+ use_web_endpoint: bool = False) -> Optional[int]:
+
+ tosend = list()
+ if output_dir is not None:
+ dryrun = True
+ for msg in msgs:
+ if not msg.get('X-Mailer'):
+ msg.add_header('X-Mailer', f'b4 {__VERSION__}')
+ msg.set_charset('utf-8')
+ msg.replace_header('Content-Transfer-Encoding', '8bit')
+ msg.policy = email.policy.EmailPolicy(utf8=True, cte_type='8bit')
+ # Python's sendmail implementation seems to have some logic problems where 8-bit messages are involved.
+ # As far as I understand the difference between 8BITMIME (supported by nearly all smtp servers) and
+ # SMTPUTF8 (supported by very few), SMTPUTF8 is only required when the addresses specified in either
+ # "MAIL FROM" or "RCPT TO" lines of the _protocol exchange_ themselves have 8bit characters, not
+ # anything in the From: header of the DATA payload. Python's smtplib seems to always try to encode
+ # strings as ascii regardless of what policy was specified.
+ # Work around this by getting the payload as string and then encoding to bytes ourselves.
+ if maxheaderlen is None:
+ if dryrun:
+ # Make it fit the terminal window, but no wider than 120 minus visual padding
+ ts = shutil.get_terminal_size((120, 20))
+ maxheaderlen = ts.columns - 8
+ if maxheaderlen > 112:
+ maxheaderlen = 112
+ else:
+ # Use a sane-ish default (we don't need to stick to 80, but
+ # we need to make sure it's shorter than 255)
+ maxheaderlen = 120
+
+ emldata = msg.as_string(maxheaderlen=maxheaderlen)
+ bdata = emldata.encode()
+ subject = msg.get('Subject', '')
+ ls = LoreSubject(subject)
+ if patatt_sign:
+ import patatt
+ # patatt.logger = logger
+ bdata = patatt.rfc2822_sign(bdata)
+ if dryrun:
+ if output_dir:
+ filen = '%s.eml' % ls.get_slug(sep='-')
+ logger.info(' %s', filen)
+ write_to = os.path.join(output_dir, filen)
+ with open(write_to, 'wb') as fh:
+ fh.write(bdata)
+ continue
+ logger.info(' --- DRYRUN: message follows ---')
+ logger.info(' | ' + bdata.decode().rstrip().replace('\n', '\n | '))
+ logger.info(' --- DRYRUN: message ends ---')
+ continue
+ if not destaddrs:
+ alldests = email.utils.getaddresses([str(x) for x in msg.get_all('to', [])])
+ alldests += email.utils.getaddresses([str(x) for x in msg.get_all('cc', [])])
+ destaddrs = {x[1] for x in alldests}
+
+ tosend.append((destaddrs, bdata, ls))
+
+ if not len(tosend):
+ return 0
+
+ logger.info('---')
+ # Do we have an endpoint defined?
+ config = get_main_config()
+ endpoint = config.get('send-endpoint-web')
+ if use_web_endpoint and endpoint:
+ logger.info('Sending via web endpoint %s', endpoint)
+ req = {
+ 'action': 'receive',
+ 'messages': [x[1].decode() for x in tosend],
+ }
+ ses = get_requests_session()
+ res = ses.post(endpoint, json=req)
+ try:
+ rdata = res.json()
+ if rdata.get('result') == 'success':
+ return len(tosend)
+ except Exception as ex: # noqa
+ logger.critical('Odd response from the endpoint: %s', res.text)
+ return 0
+
+ if rdata.get('result') == 'error':
+ logger.critical('Error from endpoint: %s', rdata.get('message'))
+ return 0
+
+ sent = 0
+ if isinstance(smtp, list):
+ # This is a local command
+ logger.info('Sending via "%s"', ' '.join(smtp))
+ for destaddrs, bdata, lsubject in tosend:
+ logger.info(' %s', lsubject.full_subject)
+ ecode, out, err = _run_command(smtp, stdin=bdata)
+ if ecode > 0:
+ raise RuntimeError('Error running %s: %s' % (' '.join(smtp), err.decode()))
+ sent += 1
+
+ elif smtp:
+ for destaddrs, bdata, lsubject in tosend:
+ # Force compliant eols
+ bdata = re.sub(rb'\r\n|\n|\r(?!\n)', b'\r\n', bdata)
+ logger.info(' %s', lsubject.full_subject)
+ smtp.sendmail(fromaddr, destaddrs, bdata)
+ sent += 1
+
+ return sent
+
+
+def git_get_current_branch(gitdir: Optional[str] = None, short: bool = True) -> Optional[str]:
+ gitargs = ['symbolic-ref', '-q', 'HEAD']
+ ecode, out = git_run_command(gitdir, gitargs)
+ if ecode > 0:
+ logger.critical('Not able to get current branch (git symbolic-ref HEAD)')
+ return None
+ mybranch = out.strip()
+ if short:
+ return re.sub(r'^refs/heads/', '', mybranch)
+ return mybranch
+
+
+def get_excluded_addrs() -> Set[str]:
+ config = get_main_config()
+ excludes = set()
+ c_excludes = config.get('email-exclude')
+ if c_excludes:
+ for entry in c_excludes.split(','):
+ excludes.add(entry.strip())
+
+ return excludes
+
+
+def cleanup_email_addrs(addresses: List[Tuple[str, str]], excludes: Set[str],
+ gitdir: Optional[str]) -> List[Tuple[str, str]]:
+ global MAILMAP_INFO
+ for entry in list(addresses):
+ # Only qualified addresses, please
+ if not len(entry[1].strip()) or '@' not in entry[1]:
+ addresses.remove(entry)
+ continue
+ # Check if it's in excludes
+ removed = False
+ for exclude in excludes:
+ if fnmatch.fnmatch(entry[1], exclude):
+ logger.debug('Removed %s due to matching %s', entry[1], exclude)
+ addresses.remove(entry)
+ removed = True
+ break
+ if removed:
+ continue
+ # Check if it's mailmap-replaced
+ if entry[1] in MAILMAP_INFO:
+ if MAILMAP_INFO[entry[1]]:
+ addresses.remove(entry)
+ addresses.append(MAILMAP_INFO[entry[1]])
+ continue
+ logger.debug('Checking if %s is mailmap-replaced', entry[1])
+ args = ['check-mailmap', f'<{entry[1]}>']
+ ecode, out = git_run_command(gitdir, args)
+ if ecode != 0:
+ MAILMAP_INFO[entry[1]] = None
+ continue
+ replacement = email.utils.getaddresses([out.strip()])
+ if len(replacement) == 1:
+ if entry[1] == replacement[0][1]:
+ MAILMAP_INFO[entry[1]] = None
+ continue
+ logger.debug('Replaced %s with mailmap-updated %s', entry[1], replacement[0][1])
+ MAILMAP_INFO[entry[1]] = replacement[0]
+ addresses.remove(entry)
+ addresses.append(replacement[0])
+
+ return addresses
+
+
+def get_email_signature() -> str:
+ usercfg = get_user_config()
+ # Do we have a .signature file?
+ sigfile = os.path.join(str(Path.home()), '.signature')
+ if os.path.exists(sigfile):
+ with open(sigfile, 'r', encoding='utf-8') as fh:
+ signature = fh.read()
+ else:
+ signature = '%s <%s>' % (usercfg['name'], usercfg['email'])
+
+ return signature