aboutsummaryrefslogtreecommitdiff
path: root/b4/__init__.py
diff options
context:
space:
mode:
authorKonstantin Ryabitsev <konstantin@linuxfoundation.org>2020-03-14 21:54:40 -0400
committerKonstantin Ryabitsev <konstantin@linuxfoundation.org>2020-03-14 21:54:40 -0400
commitae57d6ea0b7abb7f945cac6010f5c9b28b041dde (patch)
treee21b608b991c7d1be5c4d94c1e51feecefbcc628 /b4/__init__.py
downloadb4-ae57d6ea0b7abb7f945cac6010f5c9b28b041dde.tar.gz
Initial commit after porting from korg-helpers
This is the beginning of a new tool that inherits from get-lore-mbox and attest-patches. Signed-off-by: Konstantin Ryabitsev <konstantin@linuxfoundation.org>
Diffstat (limited to 'b4/__init__.py')
-rw-r--r--b4/__init__.py1226
1 files changed, 1226 insertions, 0 deletions
diff --git a/b4/__init__.py b/b4/__init__.py
new file mode 100644
index 0000000..217508e
--- /dev/null
+++ b/b4/__init__.py
@@ -0,0 +1,1226 @@
+# -*- coding: utf-8 -*-
+# SPDX-License-Identifier: GPL-2.0-or-later
+# Copyright (C) 2020 by the Linux Foundation
+import subprocess
+import logging
+import hashlib
+import re
+import os
+import fnmatch
+import time
+import email.utils
+import email.policy
+import requests
+import urllib.parse
+
+from tempfile import mkstemp
+
+from email import charset
+charset.add_charset('utf-8', None)
+emlpolicy = email.policy.EmailPolicy(utf8=True, cte_type='8bit', max_line_length=None)
+
VERSION = '0.3.0'
# Version of the attestation document format we produce/consume
ATTESTATION_FORMAT_VER = '0.1'

logger = logging.getLogger('b4')

# Matches a unified-diff hunk header; the two groups capture the optional
# old/new line counts
HUNK_RE = re.compile(r'^@@ -\d+(?:,(\d+))? \+\d+(?:,(\d+))? @@')
# Matches the ---/+++ filename lines of a unified diff
FILENAME_RE = re.compile(r'^(---|\+\+\+) (\S+)')

# Attestation check-mark markers: plain ASCII and ANSI-coloured variants
PASS_SIMPLE = '[P]'
FAIL_SIMPLE = '[F]'
PASS_FANCY = '[\033[32m✓\033[0m]'
FAIL_FANCY = '[\033[31m✗\033[0m]'

# You can use bash-style globbing here
# Headers to carry over into the am-ready message, in this order
WANTHDRS = [
    'sender',
    'from',
    'to',
    'cc',
    'subject',
    'date',
    'message-id',
    'resent-message-id',
    'reply-to',
    'in-reply-to',
    'references',
    'list-id',
    'errors-to',
    'x-mailing-list',
    'resent-to',
]

# You can use bash-style globbing here
# end with '*' to include any other trailers
# You can change the default in your ~/.gitconfig, e.g.:
# [b4]
#   # remember to end with ,*
#   trailer-order=link*,fixes*,cc*,reported*,suggested*,original*,co-*,tested*,reviewed*,acked*,signed-off*,*
DEFAULT_TRAILER_ORDER = 'fixes*,reported*,suggested*,original*,co-*,signed-off*,tested*,reviewed*,acked*,cc*,link*,*'

LOREADDR = 'https://lore.kernel.org'

# Fall-back configuration values; expected to be overridden by [b4]
# section values from git-config
DEFAULT_CONFIG = {
    'midmask': LOREADDR + '/r/%s',
    'linkmask': LOREADDR + '/r/%s',
    'trailer-order': DEFAULT_TRAILER_ORDER,
    # off: do not bother checking attestation
    # check: print an attaboy when attestation is found
    # softfail: print a warning when no attestation found
    # hardfail: exit with an error when no attestation found
    'attestation-policy': 'check',
    # "gpg" (whatever gpg is configured to do) or "tofu" to force tofu mode
    'attestation-trust-model': 'gpg',
    # strict: must match one of the uids on the key to pass
    # loose: any valid and trusted key will be accepted
    'attestation-uid-match': 'loose',
    # NB! This whole behaviour will change once public-inbox
    # gains support for cross-list searches
    'attestation-query-url': LOREADDR + '/signatures',
    # We'll use the default gnupg homedir, unless you set it here
    'attestation-gnupghome': None,
    # Do you like simple or fancy checkmarks?
    'attestation-checkmarks': 'fancy',
    # If this is not set, we'll use what we find in
    # git-config for gpg.program, and if that's not set,
    # we'll use "gpg" and hope for the better
    'gpgbin': None,
}

# This is where we store actual config
MAIN_CONFIG = None
# This is git-config user.*
USER_CONFIG = None

# Used for tracking attestations we have already looked up
ATTESTATIONS = list()
# Used for keeping a cache of subkey lookups to minimize shelling out to gpg
SUBKEY_DATA = dict()
# Used for storing our requests session
REQSESSION = None
+
+
class LoreMailbox:
    """Collects parsed mailing-list messages and sorts them into patch
    series, follow-up (trailer-carrying) replies, and unknowns.
    """

    def __init__(self):
        # Message-Id -> LoreMessage for every message we have ingested
        self.msgid_map = dict()
        # revision number -> LoreSeries
        self.series = dict()
        # Trailer-carrying replies not yet attached to a specific patch
        self.followups = list()
        # Messages we could not classify
        self.unknowns = list()

    def __repr__(self):
        out = list()
        for key, lser in self.series.items():
            out.append(str(lser))
        out.append('--- Followups ---')
        for lmsg in self.followups:
            out.append(' %s' % lmsg.full_subject)
        out.append('--- Unknowns ---')
        for lmsg in self.unknowns:
            out.append(' %s' % lmsg.full_subject)

        return '\n'.join(out)

    def get_by_msgid(self, msgid):
        """Return the LoreMessage with this msgid, or None if we have not seen it."""
        if msgid in self.msgid_map:
            return self.msgid_map[msgid]
        return None

    def get_series(self, revision=None):
        """Return the requested (or highest) revision as a LoreSeries, with
        follow-up trailers folded into the matching patches.

        Returns None when the revision is unknown or every patch is missing.
        """
        if revision is None:
            if not len(self.series):
                return None
            # Use the highest revision
            revision = max(self.series.keys())
        elif revision not in self.series.keys():
            return None

        lser = self.series[revision]

        # Is it empty?
        empty = True
        for lmsg in lser.patches:
            if lmsg is not None:
                empty = False
                break
        if empty:
            logger.critical('All patches in series v%s are missing.', lser.revision)
            return None

        # Do we have a cover letter for it?
        if not lser.has_cover:
            # Let's find the first patch with an in-reply-to and see if that
            # is our cover letter
            for member in lser.patches:
                if member is not None and member.in_reply_to is not None:
                    potential = self.get_by_msgid(member.in_reply_to)
                    if potential is not None and potential.has_diffstat and not potential.has_diff:
                        # This is *probably* the cover letter
                        lser.patches[0] = potential
                        lser.has_cover = True
                        break

        # Do we have any follow-ups?
        for fmsg in self.followups:
            logger.debug('Analyzing follow-up: %s (%s)', fmsg.full_subject, fmsg.fromemail)
            # If there are no trailers in this one, ignore it
            if not len(fmsg.trailers):
                logger.debug(' no trailers found, skipping')
                continue
            # if it's for the wrong revision, ignore it
            if not fmsg.revision_inferred and lser.revision != fmsg.revision:
                logger.debug(' follow-up for the wrong revision, skipping')
                continue
            # Go up through the follow-ups and tally up trailers until
            # we either run out of in-reply-tos, or we find a patch in
            # our series
            if fmsg.in_reply_to is None:
                # Check if there's something matching in References
                refs = fmsg.msg.get('References', '')
                pmsg = None
                for ref in refs.split():
                    refid = ref.strip('<>')
                    if refid in self.msgid_map and refid != fmsg.msgid:
                        pmsg = self.msgid_map[refid]
                        break
                if pmsg is None:
                    # Can't find the message we're replying to here
                    continue
            else:
                pmsg = self.msgid_map[fmsg.in_reply_to]

            # Walk up the reply chain, accumulating trailers as we go, until
            # we reach a patch that belongs to this series
            trailers = fmsg.trailers
            lvl = 1
            while True:
                logger.debug('%sParent: %s', ' ' * lvl, pmsg.full_subject)
                logger.debug('%sTrailers:', ' ' * lvl)
                for trailer in set(trailers):
                    logger.debug('%s%s: %s', ' ' * (lvl+1), trailer[0], trailer[1])
                found = False
                if lser.revision != pmsg.revision:
                    break
                for lmsg in lser.patches:
                    if lmsg is not None and lmsg.msgid == pmsg.msgid:
                        # Confirmed, this is our parent patch
                        lmsg.followup_trailers += trailers
                        found = True
                        break
                if found:
                    break
                elif pmsg.in_reply_to and pmsg.in_reply_to in self.msgid_map:
                    lvl += 1
                    trailers += pmsg.trailers
                    pmsg = self.msgid_map[pmsg.in_reply_to]
                else:
                    break

        return lser

    def add_message(self, msg):
        """Parse one raw email message and file it as a patch, cover letter,
        follow-up, attestation document, or unknown."""
        lmsg = LoreMessage(msg)
        logger.debug('Looking at: %s', lmsg.full_subject)
        self.msgid_map[lmsg.msgid] = lmsg

        if lmsg.has_diff or lmsg.has_diffstat:
            if lmsg.revision not in self.series:
                self.series[lmsg.revision] = LoreSeries(lmsg.revision, lmsg.expected)
                if len(self.series) > 1:
                    logger.info('Found new series v%s', lmsg.revision)
            if lmsg.has_diff:
                # Attempt to auto-number series from the same author who did not bother
                # to set v2, v3, etc in the patch revision
                if (lmsg.counter == 1 and lmsg.counters_inferred
                        and not lmsg.reply and lmsg.lsubject.patch and not lmsg.lsubject.resend):
                    omsg = self.series[lmsg.revision].patches[lmsg.counter]
                    if (omsg is not None and omsg.counters_inferred and lmsg.fromemail == omsg.fromemail
                            and omsg.date < lmsg.date):
                        lmsg.revision = len(self.series) + 1
                        self.series[lmsg.revision] = LoreSeries(lmsg.revision, lmsg.expected)
                        logger.info('Assuming new revision: v%s (%s)', lmsg.revision, lmsg.full_subject)
                logger.debug(' adding as patch')
                self.series[lmsg.revision].add_patch(lmsg)
            elif lmsg.counter == 0 and lmsg.has_diffstat:
                # Bona-fide cover letter
                logger.debug(' adding as cover letter')
                self.series[lmsg.revision].add_cover(lmsg)
            elif lmsg.reply:
                # We'll figure out where this belongs later
                logger.debug(' adding to followups')
                self.followups.append(lmsg)
        elif lmsg.reply:
            logger.debug(' adding to followups')
            self.followups.append(lmsg)
        elif re.search(r'^Comment: att-fmt-ver:', lmsg.body, re.I | re.M):
            logger.debug('Found attestation message')
            LoreAttestationDocument.load_from_string(lmsg.msgid, lmsg.body)
            # We don't keep it, because it's not useful for us beyond this point
        else:
            logger.debug(' adding to unknowns')
            self.unknowns.append(lmsg)
+
+
class LoreSeries:
    """A single revision of a patch series.

    Slot 0 of self.patches is reserved for the cover letter; slots
    1..expected hold the actual patches (None when a patch is missing).
    """

    def __init__(self, revision, expected):
        # Which vN revision this series represents
        self.revision = revision
        # Number of patches the [PATCH n/m] prefix says to expect
        self.expected = expected
        # Slot 0 is the cover letter, hence expected+1 slots
        self.patches = [None] * (expected+1)
        self.followups = list()
        self.complete = False
        self.has_cover = False

    def __repr__(self):
        out = list()
        if self.has_cover:
            out.append('- Series: [v%s] %s' % (self.revision, self.patches[0].subject))
        elif self.patches[1] is not None:
            out.append('- Series: [v%s] %s' % (self.revision, self.patches[1].subject))
        else:
            out.append('- Series: [v%s] (untitled)' % self.revision)

        out.append(' revision: %s' % self.revision)
        out.append(' expected: %s' % self.expected)
        out.append(' complete: %s' % self.complete)
        out.append(' has_cover: %s' % self.has_cover)
        out.append(' patches:')
        at = 0
        for member in self.patches:
            if member is not None:
                out.append(' [%s/%s] %s' % (at, self.expected, member.subject))
                if member.followup_trailers:
                    # followup_trailers holds (name, value) tuples; format each
                    # one before joining -- str.join on the raw tuples would
                    # raise a TypeError
                    out.append(' Add: %s' % ', '.join(
                        '%s: %s' % (tname, tvalue) for tname, tvalue in member.followup_trailers))
            else:
                out.append(' [%s/%s] MISSING' % (at, self.expected))
            at += 1

        return '\n'.join(out)

    def add_patch(self, lmsg):
        """File lmsg into its n/m slot, growing the series when lmsg claims
        more patches than previously expected."""
        while len(self.patches) < lmsg.expected + 1:
            self.patches.append(None)
            self.expected = lmsg.expected
        if self.patches[lmsg.counter] is not None:
            # Okay, weird, is the one in there a reply?
            omsg = self.patches[lmsg.counter]
            if omsg.reply or (omsg.counters_inferred and not lmsg.counters_inferred):
                # Replace that one with this one
                logger.debug(' replacing existing: %s', omsg.subject)
                self.patches[lmsg.counter] = lmsg
        else:
            self.patches[lmsg.counter] = lmsg
        # Complete when every real patch slot (1..expected) is filled
        self.complete = not (None in self.patches[1:])

    def add_cover(self, lmsg):
        """File lmsg as the cover letter (slot 0) and mark the series covered."""
        self.add_patch(lmsg)
        self.has_cover = True

    def get_slug(self):
        """Return a '[vN_]YYYYMMDD_author' identifier string for this series."""
        # Find the first non-None entry
        lmsg = None
        for lmsg in self.patches:
            if lmsg is not None:
                break

        if lmsg is None:
            return 'undefined'

        prefix = time.strftime('%Y%m%d', lmsg.date[:9])
        authorline = email.utils.getaddresses(lmsg.msg.get_all('from', []))[0]
        author = re.sub(r'\W+', '_', authorline[1]).strip('_').lower()
        slug = '%s_%s' % (prefix, author)
        if self.revision != 1:
            slug = 'v%s_%s' % (self.revision, slug)

        return slug

    def save_am_mbox(self, mbx, noaddtrailers, covertrailers,
                     trailer_order=None, addmysob=False, addlink=False, linkmask=None):
        """Write an am-ready rendition of this series into mbx.

        Optionally folds cover-letter trailers, the user's Signed-off-by
        and a Link: trailer into each patch, and reports per-patch
        attestation status according to the configured policy.
        Returns mbx; exits with code 128 on 'hardfail' policy failures.
        """
        usercfg = get_user_config()
        config = get_main_config()

        if addmysob:
            if 'name' not in usercfg or 'email' not in usercfg:
                logger.critical('WARNING: Unable to add your Signed-off-by: git returned no user.name or user.email')
                addmysob = False

        # One attestation trailer slot per expected patch
        attdata = [None] * self.expected
        attpolicy = config['attestation-policy']
        exact_from_match = False
        if config['attestation-uid-match'] == 'strict':
            exact_from_match = True

        if config['attestation-checkmarks'] == 'fancy':
            attpass = PASS_FANCY
            attfail = FAIL_FANCY
        else:
            attpass = PASS_SIMPLE
            attfail = FAIL_SIMPLE

        at = 1
        for lmsg in self.patches[1:]:
            if lmsg is not None:
                if self.has_cover and covertrailers and self.patches[0].followup_trailers:
                    lmsg.followup_trailers += self.patches[0].followup_trailers
                if addmysob:
                    lmsg.followup_trailers.append(('Signed-off-by', '%s <%s>' % (usercfg['name'], usercfg['email'])))
                if addlink:
                    lmsg.followup_trailers.append(('Link', linkmask % lmsg.msgid))

                if attpolicy != 'off':
                    lore_lookup = False
                    if at == 1:
                        # We only hit lore on the first patch
                        lore_lookup = True
                    attdoc = lmsg.get_attestation(lore_lookup=lore_lookup, exact_from_match=exact_from_match)
                    if attdoc is None:
                        if attpolicy in ('softfail', 'hardfail'):
                            logger.info(' %s %s', attfail, lmsg.full_subject)
                        else:
                            logger.info(' %s', lmsg.full_subject)
                    else:
                        logger.info(' %s %s', attpass, lmsg.full_subject)
                        attdata[at-1] = attdoc.attestor.get_trailer(lmsg.fromemail)
                else:
                    logger.info(' %s', lmsg.full_subject)

                add_trailers = True
                if noaddtrailers:
                    add_trailers = False
                msg = lmsg.get_am_message(add_trailers=add_trailers, trailer_order=trailer_order)
                # Pass a policy that avoids most legacy encoding horrors
                mbx.add(msg.as_bytes(policy=emlpolicy))
            else:
                logger.error(' ERROR: missing [%s/%s]!', at, self.expected)
            at += 1

        if attpolicy == 'off':
            return mbx
        failed = None in attdata
        if not failed:
            logger.info(' ---')
            for trailer in set(attdata):
                logger.info(' %s %s', attpass, trailer)
            return mbx

        errors = set()
        for attdoc in ATTESTATIONS:
            errors.update(attdoc.errors)

        if errors:
            logger.critical(' ---')
            logger.critical(' Attestation is available, but did not succeed:')
            for error in errors:
                logger.critical(' %s %s', attfail, error)

        if attpolicy == 'hardfail':
            import sys
            sys.exit(128)

        return mbx

    def save_cover(self, outfile):
        """Write the cover letter, without trailer rewriting, to outfile."""
        cover_msg = self.patches[0].get_am_message(add_trailers=False, trailer_order=None)
        with open(outfile, 'w') as fh:
            fh.write(cover_msg.as_string(policy=emlpolicy))
        logger.critical('Cover: %s', outfile)
+
+
class LoreMessage:
    """A single mailing-list message, parsed into subject, header, body,
    diff/trailer information, and (optionally) attestation hashes."""

    def __init__(self, msg):
        # The original email.message object
        self.msg = msg
        self.msgid = None

        # Subject-based info (mirrored from LoreSubject below)
        self.lsubject = None
        self.full_subject = None
        self.subject = None
        self.reply = False
        self.revision = 1
        self.counter = 1
        self.expected = 1
        self.revision_inferred = True
        self.counters_inferred = True

        # Header-based info
        self.in_reply_to = None
        self.fromname = None
        self.fromemail = None
        self.date = None

        # Body and body-based info
        self.body = None
        self.has_diff = False
        self.has_diffstat = False
        self.trailers = list()
        self.followup_trailers = list()

        # Filled in by load_hashes()
        self.attestation = None

        self.msgid = LoreMessage.get_clean_msgid(self.msg)
        self.lsubject = LoreSubject(msg['Subject'])
        # Copy them into this object for convenience
        self.full_subject = self.lsubject.full_subject
        self.subject = self.lsubject.subject
        self.reply = self.lsubject.reply
        self.revision = self.lsubject.revision
        self.counter = self.lsubject.counter
        self.expected = self.lsubject.expected
        self.revision_inferred = self.lsubject.revision_inferred
        self.counters_inferred = self.lsubject.counters_inferred

        # Handle [PATCH 6/5]
        if self.counter > self.expected:
            self.expected = self.counter

        self.in_reply_to = LoreMessage.get_clean_msgid(self.msg, header='In-Reply-To')

        try:
            fromdata = email.utils.getaddresses(self.msg.get_all('from', []))[0]
            self.fromname = fromdata[0]
            self.fromemail = fromdata[1]
        except IndexError:
            # No usable From: header; fromname/fromemail stay None
            pass

        self.date = email.utils.parsedate_tz(str(self.msg['Date']))

        diffre = re.compile(r'^(---.*\n\+\+\+|GIT binary patch)', re.M | re.I)
        diffstatre = re.compile(r'^\s*\d+ file.*\d+ (insertion|deletion)', re.M | re.I)

        # walk until we find the first text/plain part
        mcharset = self.msg.get_content_charset()
        if not mcharset:
            mcharset = 'utf-8'

        for part in msg.walk():
            cte = part.get_content_type()
            if cte.find('/plain') < 0 and cte.find('/x-patch') < 0:
                continue
            payload = part.get_payload(decode=True)
            if payload is None:
                continue
            pcharset = part.get_content_charset()
            if not pcharset:
                pcharset = mcharset
            payload = payload.decode(pcharset, errors='replace')
            if self.body is None:
                self.body = payload
                continue
            # If we already found a body, but we now find something that contains a diff,
            # then we prefer this part
            if diffre.search(payload):
                self.body = payload

        # NOTE(review): assumes at least one text part was found; a message
        # with no text/plain or text/x-patch part leaves self.body as None
        # and the searches below would raise TypeError -- confirm upstream
        if diffstatre.search(self.body):
            self.has_diffstat = True
        if diffre.search(self.body):
            self.has_diff = True

        # We only pay attention to trailers that are sent in reply
        if self.reply:
            # Do we have something that looks like a person-trailer?
            matches = re.findall(r'^\s*([\w-]+):[ \t]+(.*<\S+>)\s*$', self.body, re.MULTILINE)
            if matches:
                # Basic sanity checking -- the trailer must match the name or the email
                # in the From header, to avoid false-positive trailer parsing errors
                for tname, tvalue in matches:
                    tmatch = False
                    namedata = email.utils.getaddresses([tvalue])[0]
                    # Normalize away +suffix addressing before comparing
                    tfrom = re.sub(r'\+[^@]+@', '@', namedata[1].lower())
                    hfrom = re.sub(r'\+[^@]+@', '@', self.fromemail.lower())
                    tlname = namedata[0].lower()
                    hlname = self.fromname.lower()
                    tchunks = tfrom.split('@')
                    hchunks = hfrom.split('@')
                    if tfrom == hfrom:
                        logger.debug(' trailer exact email match')
                        tmatch = True
                    # See if domain part of one of the addresses is a subset of the other one,
                    # which should match cases like @linux.intel.com and @intel.com
                    elif (len(tchunks) == 2 and len(hchunks) == 2
                            and tchunks[0] == hchunks[0]
                            and (tchunks[1].find(hchunks[1]) >= 0 or hchunks[1].find(tchunks[1]) >= 0)):
                        logger.debug(' trailer fuzzy email match')
                        tmatch = True
                    # Does the name match, at least?
                    elif tlname == hlname:
                        logger.debug(' trailer exact name match')
                        tmatch = True
                    # Finally, see if the header From has a comma in it and try to find all
                    # parts in the trailer name
                    elif hlname.find(',') > 0:
                        nmatch = True
                        for nchunk in hlname.split(','):
                            # NOTE(review): this searches hlname for chunks of
                            # hlname itself, which always succeeds; it looks
                            # like tlname.find() was intended -- confirm
                            if hlname.find(nchunk.strip()) < 0:
                                nmatch = False
                                break
                        if nmatch:
                            logger.debug(' trailer fuzzy name match')
                            tmatch = True
                    if tmatch:
                        self.trailers.append((tname, tvalue))
                    else:
                        logger.debug(' ignoring "%s: %s" due to from mismatch (from: %s %s)', tname, tvalue,
                                     self.fromname, self.fromemail)

    def __repr__(self):
        out = list()
        out.append('msgid: %s' % self.msgid)
        out.append(str(self.lsubject))

        out.append(' fromname: %s' % self.fromname)
        out.append(' fromemail: %s' % self.fromemail)
        out.append(' date: %s' % str(self.date))
        out.append(' in_reply_to: %s' % self.in_reply_to)

        # Header-based info
        out.append(' --- begin body ---')
        for line in self.body.split('\n'):
            out.append(' |%s' % line)
        out.append(' --- end body ---')

        # Body and body-based info
        out.append(' has_diff: %s' % self.has_diff)
        out.append(' has_diffstat: %s' % self.has_diffstat)
        out.append(' --- begin my trailers ---')
        for trailer in self.trailers:
            out.append(' |%s' % str(trailer))
        out.append(' --- begin followup trailers ---')
        for trailer in self.followup_trailers:
            out.append(' |%s' % str(trailer))
        out.append(' --- end trailers ---')

        return '\n'.join(out)

    @staticmethod
    def clean_header(hdrval):
        """Collapse a (possibly folded) header value into a single
        whitespace-normalized line."""
        uval = hdrval.replace('\n', ' ')
        new_hdrval = re.sub(r'\s+', ' ', uval)
        return new_hdrval.strip()

    @staticmethod
    def get_clean_msgid(msg, header='Message-Id'):
        """Return the msgid from the given header with angle brackets
        stripped, or None when the header is absent/malformed."""
        msgid = None
        raw = msg.get(header)
        if raw:
            matches = re.search(r'<([^>]+)>', LoreMessage.clean_header(raw))
            if matches:
                msgid = matches.groups()[0]
        return msgid

    @staticmethod
    def get_patch_hash(diff):
        # The aim is to represent the patch as if you did the following:
        # git diff HEAD~.. | dos2unix | sha256sum
        #
        # This subroutine removes anything at the beginning of diff data, like
        # diffstat or any other auxiliary data, and anything trailing at the end
        # XXX: This currently doesn't work for git binary patches
        #
        diff = diff.replace('\r', '')
        diff = diff.strip() + '\n'

        # For keeping a buffer of lines preceding @@ ... @@
        buflines = list()

        phasher = hashlib.sha256()

        # Used for counting where we are in the patch
        pp = 0
        for line in diff.split('\n'):
            hunk_match = HUNK_RE.match(line)
            if hunk_match:
                # logger.debug('Crunching %s', line)
                mlines, plines = hunk_match.groups()
                pp = int(plines)
                addlines = list()
                for bline in reversed(buflines):
                    # Go backward and add lines until we get to the start
                    # or encounter a blank line
                    if len(bline.strip()) == 0:
                        break
                    addlines.append(bline)
                if addlines:
                    phasher.update(('\n'.join(reversed(addlines)) + '\n').encode('utf-8'))
                buflines = list()
                # Feed this line to the hasher
                phasher.update((line + '\n').encode('utf-8'))
                continue
            if pp > 0:
                # Inside the patch
                phasher.update((line + '\n').encode('utf-8'))
                # NOTE(review): an empty line here would make line[0] raise
                # IndexError; appears to rely on hunk context lines always
                # starting with ' ', '+' or '-' -- confirm
                if line[0] != '-':
                    pp -= 1
                continue
            # Not anything we recognize, so stick into buflines
            buflines.append(line)

        return phasher.hexdigest()

    def load_hashes(self):
        """Compute the i (metadata), m (message) and p (patch) attestation
        hashes via git-mailinfo and store them as self.attestation."""
        # NOTE(review): the file descriptors returned by mkstemp() are never
        # closed, only the paths are unlinked -- confirm whether this leaks fds
        msg_out = mkstemp()
        patch_out = mkstemp()
        cmdargs = ['mailinfo', '--encoding=UTF-8', msg_out[1], patch_out[1]]
        ecode, info = git_run_command(None, cmdargs, self.msg.as_bytes())
        if ecode > 0:
            logger.debug('ERROR: Could not get mailinfo')
            return
        ihasher = hashlib.sha256()

        for line in info.split('\n'):
            # We don't use the "Date:" field because it is likely to be
            # mangled between when git-format-patch generates it and
            # when it is sent out by git-send-email (or other tools).
            if re.search(r'^(Author|Email|Subject):', line):
                ihasher.update((line + '\n').encode('utf-8'))
        i = ihasher.hexdigest()

        with open(msg_out[1], 'r') as mfh:
            msg = mfh.read()
            mhasher = hashlib.sha256()
            mhasher.update(msg.encode('utf-8'))
            m = mhasher.hexdigest()
        os.unlink(msg_out[1])

        p = None
        with open(patch_out[1], 'r') as pfh:
            patch = pfh.read()
            if len(patch.strip()):
                p = LoreMessage.get_patch_hash(patch)
        os.unlink(patch_out[1])

        if i and m and p:
            self.attestation = LoreAttestation(i, m, p)

    def fix_trailers(self, trailer_order=None):
        """Rewrite the trailer block just before the first '---' line,
        merging self.followup_trailers in the configured trailer order."""
        bodylines = self.body.split('\n')
        # Get existing trailers
        # 1. Find the first ---
        # 2. Go backwards and grab everything matching ^[\w-]+:\s.*$ until a blank line
        fixlines = list()
        trailersdone = False
        for line in bodylines:
            if trailersdone:
                fixlines.append(line)
                continue

            if line.strip() == '---':
                # Start going backwards in fixlines
                btrailers = list()
                for rline in reversed(fixlines):
                    if not len(rline.strip()):
                        break
                    matches = re.search(r'^([\w-]+):\s+(.*)', rline)
                    if not matches:
                        break
                    fixlines.pop()
                    btrailers.append(matches.groups())

                # Now we add mix-in trailers
                btrailers.reverse()
                trailers = btrailers + self.followup_trailers
                added = list()
                if trailer_order is None:
                    trailer_order = DEFAULT_TRAILER_ORDER
                # NOTE(review): when trailer_order falls back to the default
                # *string*, this iterates per character and only the final '*'
                # entry effectively matches; callers appear to pass a pre-split
                # list -- confirm
                for trailermatch in trailer_order:
                    for trailer in trailers:
                        if trailer in added:
                            continue
                        if fnmatch.fnmatch(trailer[0].lower(), trailermatch.strip()):
                            fixlines.append('%s: %s' % trailer)
                            if trailer not in btrailers:
                                logger.info(' Added: %s: %s' % trailer)
                            else:
                                logger.debug(' Kept: %s: %s' % trailer)
                            added.append(trailer)
                trailersdone = True
            fixlines.append(line)
        self.body = '\n'.join(fixlines)

    def get_am_message(self, add_trailers=True, trailer_order=None):
        """Return an am-ready EmailMessage copy of this message, keeping
        only the WANTHDRS headers and (optionally) rewritten trailers."""
        if add_trailers:
            self.fix_trailers(trailer_order=trailer_order)
        am_body = self.body
        am_msg = email.message.EmailMessage()
        am_msg.set_payload(am_body.encode('utf-8'))
        # Clean up headers
        for hdrname, hdrval in self.msg.items():
            lhdrname = hdrname.lower()
            wanthdr = False
            for hdrmatch in WANTHDRS:
                if fnmatch.fnmatch(lhdrname, hdrmatch):
                    wanthdr = True
                    break
            if wanthdr:
                new_hdrval = LoreMessage.clean_header(hdrval)
                # noinspection PyBroadException
                try:
                    am_msg.add_header(hdrname, new_hdrval)
                except:
                    # A broad except to handle any potential weird header conditions
                    pass
        am_msg.set_charset('utf-8')
        return am_msg

    def _load_attestation(self, lore_lookup=True):
        # Compute our hashes first, then check them against known documents
        self.load_hashes()
        if self.attestation:
            self.attestation.validate(lore_lookup=lore_lookup)

    def get_attestation(self, lore_lookup=True, exact_from_match=True):
        """Return a passing attestation document for this message, or None.

        With exact_from_match, the document must carry a UID whose email
        exactly matches the From address."""
        self._load_attestation(lore_lookup=lore_lookup)
        if not self.attestation or not self.attestation.passing:
            return None

        for attdoc in self.attestation.attdocs:
            if not exact_from_match:
                # We return the first hit
                return attdoc
            # Does this doc have an exact match?
            uid = attdoc.attestor.get_matching_uid(self.fromemail)
            if uid[1] == self.fromemail:
                return attdoc
        # stick an error in the first available attdoc saying
        # that exact from match failed
        self.attestation.attdocs[0].errors.add('Exact UID match failed for %s' % self.fromemail)

        return None
+
+
class LoreSubject:
    """Parses a patch subject line into its components: bracketed prefixes
    ([PATCH], [RFC], vN, n/m counters), reply/resend flags, and the clean
    subject text."""

    def __init__(self, subject):
        # Subject-based info
        self.full_subject = None
        self.subject = None
        self.reply = False
        self.resend = False
        self.patch = False
        self.rfc = False
        self.revision = 1
        self.counter = 1
        self.expected = 1
        # True until we see an explicit vN / n/m in the prefixes
        self.revision_inferred = True
        self.counters_inferred = True
        self.prefixes = list()

        subject = re.sub(r'\s+', ' ', LoreMessage.clean_header(subject)).strip()
        # Remove any leading [] that don't have "patch", "resend" or "rfc" in them
        # NOTE(review): '(:?patch' looks like a typo for the non-capturing
        # group '(?:patch' -- as written it optionally matches a literal
        # colon before the keyword; confirm intent
        while True:
            oldsubj = subject
            subject = re.sub(r'^\s*\[[^\]]*\]\s*(\[[^\]]*(:?patch|resend|rfc).*)', '\\1', subject, flags=re.IGNORECASE)
            if oldsubj == subject:
                break

        # Remove any brackets inside brackets
        while True:
            oldsubj = subject
            subject = re.sub(r'^\s*\[([^\]]*)\[([^\]]*)\]', '[\\1\\2]', subject)
            subject = re.sub(r'^\s*\[([^\]]*)\]([^\]]*)\]', '[\\1\\2]', subject)
            if oldsubj == subject:
                break

        self.full_subject = subject
        # Is it a reply?
        if re.search(r'^(Re|Aw|Fwd):', subject, re.I) or re.search(r'^\w{2,3}:\s*\[', subject):
            self.reply = True
            # Strip the reply prefix so the [] parsing below still works
            subject = re.sub(r'^\w+:\s*\[', '[', subject)

        # Find all [foo] in the title
        while subject.find('[') == 0:
            matches = re.search(r'^\[([^\]]*)\]', subject)
            for chunk in matches.groups()[0].split():
                # Remove any trailing commas or semicolons
                chunk = chunk.strip(',;')
                if re.search(r'^\d{1,3}/\d{1,3}$', chunk):
                    counters = chunk.split('/')
                    self.counter = int(counters[0])
                    self.expected = int(counters[1])
                    self.counters_inferred = False
                elif re.search(r'^v\d+$', chunk, re.IGNORECASE):
                    self.revision = int(chunk[1:])
                    self.revision_inferred = False
                elif chunk.lower().find('rfc') == 0:
                    self.rfc = True
                elif chunk.lower().find('resend') == 0:
                    self.resend = True
                elif chunk.lower().find('patch') == 0:
                    self.patch = True
                self.prefixes.append(chunk)
            subject = re.sub(r'^\s*\[[^\]]*\]\s*', '', subject)
        self.subject = subject

    def __repr__(self):
        out = list()
        out.append(' full_subject: %s' % self.full_subject)
        out.append(' subject: %s' % self.subject)
        out.append(' reply: %s' % self.reply)
        out.append(' resend: %s' % self.resend)
        out.append(' patch: %s' % self.patch)
        out.append(' rfc: %s' % self.rfc)
        out.append(' revision: %s' % self.revision)
        out.append(' revision_inferred: %s' % self.revision_inferred)
        out.append(' counter: %s' % self.counter)
        out.append(' expected: %s' % self.expected)
        out.append(' counters_inferred: %s' % self.counters_inferred)
        out.append(' prefixes: %s' % ', '.join(self.prefixes))

        return '\n'.join(out)
+
+
class LoreAttestor:
    """The owner of a PGP key used to sign an attestation document.

    Holds the keyid and the (realname, email) UID pairs found on the key,
    cached module-wide in SUBKEY_DATA to avoid repeated gpg invocations.
    """

    def __init__(self, keyid):
        self.keyid = keyid
        self.uids = list()
        self.load_subkey_uids()

    def load_subkey_uids(self):
        """Populate self.uids from gpg --list-keys output, using the cache."""
        global SUBKEY_DATA
        if self.keyid not in SUBKEY_DATA:
            ecode, keyinfo = gpg_run_command(['--with-colons', '--list-keys', self.keyid])
            if ecode > 0:
                logger.critical('ERROR: Unable to get UIDs list matching key %s', self.keyid)
                return

            uids = list()
            for line in keyinfo.split('\n'):
                if not line.startswith('uid:'):
                    continue
                fields = line.split(':')
                # Field 1 is validity; skip revoked UIDs
                if fields[1] in ('r',):
                    continue
                uids.append(fields[9])
            SUBKEY_DATA[self.keyid] = email.utils.getaddresses(uids)

        self.uids = SUBKEY_DATA[self.keyid]

    def get_primary_uid(self):
        """Return the first (primary) UID on the key."""
        return self.uids[0]

    def get_matching_uid(self, fromaddr):
        """Return the UID whose email equals fromaddr, or the primary UID."""
        exact = [uid for uid in self.uids if fromaddr == uid[1]]
        if exact:
            return exact[0]

        logger.debug('No exact match, returning primary UID')
        return self.uids[0]

    def get_trailer(self, fromaddr):
        """Return an Attestation-by: trailer string for this key."""
        uid = self.get_matching_uid(fromaddr) if fromaddr else self.uids[0]
        return 'Attestation-by: %s <%s> (pgp: %s)' % (uid[0], uid[1], self.keyid)

    def __repr__(self):
        lines = [' keyid: %s' % self.keyid]
        lines.extend(' uid: %s <%s>' % uid for uid in self.uids)
        return '\n'.join(lines)
+
+
class LoreAttestationDocument:
    """A PGP-signed document listing (i, m, p) hash triplets, together with
    the verification outcome of its signature."""

    def __init__(self, source, sigdata):
        # Where this document came from (file path, msgid, or lore URL)
        self.source = source
        # GOODSIG: the signature matched a key we have
        self.good = False
        # VALIDSIG: the signature is cryptographically valid
        self.valid = False
        # TRUST_FULLY / TRUST_ULTIMATE on the signing key
        self.trusted = False
        # good + valid + trusted
        self.passing = False
        self.attestor = None
        # Set of (i, m, p) hash triplets parsed out of the document
        self.hashes = set()
        self.errors = set()

        gpgargs = ['--verify', '--status-fd=1']
        config = get_main_config()
        if config['attestation-trust-model'] == 'tofu':
            gpgargs += ['--trust-model', 'tofu', '--tofu-default-policy', 'good']

        logger.debug('Validating document obtained from %s', self.source)
        ecode, output = gpg_run_command(gpgargs, stdin=sigdata.encode('utf-8'))
        if ecode == 0:
            # We're looking for both GOODSIG and VALIDSIG
            gs_matches = re.search(r'^\[GNUPG:\] GOODSIG ([0-9A-F]+)\s+.*$', output, re.M)
            if gs_matches:
                logger.debug(' GOODSIG')
                self.good = True
                keyid = gs_matches.groups()[0]
                self.attestor = LoreAttestor(keyid)
                puid = '%s <%s>' % self.attestor.get_primary_uid()
                if re.search(r'^\[GNUPG:\] VALIDSIG', output, re.M):
                    logger.debug(' VALIDSIG')
                    self.valid = True
                    # Do we have a TRUST_(FULLY|ULTIMATE)?
                    matches = re.search(r'^\[GNUPG:\] TRUST_(FULLY|ULTIMATE)', output, re.M)
                    if matches:
                        logger.debug(' TRUST_%s', matches.groups()[0])
                        self.trusted = True
                    else:
                        self.errors.add('Insufficient trust on key: %s (%s)' % (keyid, puid))
                else:
                    self.errors.add('Signature not valid from key: %s (%s)' % (keyid, puid))
        else:
            # Are we missing a key?
            matches = re.search(r'^\[GNUPG:\] NO_PUBKEY ([0-9A-F]+)$', output, re.M)
            if matches:
                self.errors.add('Missing public key: %s' % matches.groups()[0])
            else:
                logger.debug('NOGOOD: Signature on %s failed to verify', self.source)
            # Verification failed; nothing further to parse
            return

        if self.good and self.valid and self.trusted:
            self.passing = True

        # Collect (i, m, p) triplets from the signed payload
        hg = [None, None, None]
        for line in sigdata.split('\n'):
            # It's a yaml document, but we don't parse it as yaml for safety reasons
            line = line.rstrip()
            if re.search(r'^([0-9a-f-]{26}:|-----BEGIN.*)$', line):
                # A new record (or the signature block) starts; flush any
                # complete triplet we have collected so far
                if None not in hg:
                    self.hashes.add(tuple(hg))
                    hg = [None, None, None]
                continue
            matches = re.search(r'^\s+([imp]):\s*([0-9a-f]{64})$', line)
            if matches:
                t, v = matches.groups()
                if t == 'i':
                    hg[0] = v
                elif t == 'm':
                    hg[1] = v
                elif t == 'p':
                    hg[2] = v

    def __repr__(self):
        out = list()
        out.append(' source: %s' % self.source)
        out.append(' good: %s' % self.good)
        out.append(' valid: %s' % self.valid)
        out.append(' trusted: %s' % self.trusted)
        if self.attestor is not None:
            out.append(' attestor: %s' % self.attestor.keyid)

        out.append(' --- validation errors ---')
        for error in self.errors:
            out.append(' | %s' % error)
        out.append(' --- hashes ---')
        for hg in self.hashes:
            out.append(' | %s-%s-%s' % (hg[0][:8], hg[1][:8], hg[2][:8]))
        return '\n'.join(out)

    @staticmethod
    def get_from_lore(attid):
        """Query lore.kernel.org for signed attestation documents matching
        attid and return them as a list of LoreAttestationDocument objects."""
        # XXX: Querying this via the Atom feed is a temporary kludge until we have
        #      proper search API on lore.kernel.org
        config = get_main_config()
        queryurl = '%s?%s' % (config['attestation-query-url'],
                              urllib.parse.urlencode({'q': attid, 'x': 'A', 'o': '-1'}))
        logger.debug('Query URL: %s', queryurl)
        session = get_requests_session()
        resp = session.get(queryurl)
        content = resp.content.decode('utf-8')
        matches = re.findall(
            r'link\s+href="([^"]+)".*?(-----BEGIN PGP SIGNED MESSAGE-----.*?-----END PGP SIGNATURE-----)',
            content, flags=re.DOTALL
        )

        attdocs = list()
        if matches:
            for link, sigdata in matches:
                attdocs.append(LoreAttestationDocument(link, sigdata))

        return attdocs

    @staticmethod
    def load_from_file(afile):
        """Verify the signed document in afile and add it to ATTESTATIONS."""
        global ATTESTATIONS
        with open(afile, 'r') as fh:
            sigdata = fh.read()
            ATTESTATIONS.append(LoreAttestationDocument(afile, sigdata))

    @staticmethod
    def load_from_string(source, content):
        """Verify a signed document given as a string and add it to ATTESTATIONS."""
        global ATTESTATIONS
        ATTESTATIONS.append(LoreAttestationDocument(source, content))
+
+
class LoreAttestation:
    """Ties one patch's (i, m, p) hash triplet to the attestation
    documents that vouch for it."""

    def __init__(self, i, m, p):
        # Short id (first 8 hex chars of each hash) used for lore lookups
        self.attid = '%s-%s-%s' % (i[:8], m[:8], p[:8])
        self.i = i
        self.m = m
        self.p = p
        self.passing = False
        self.attdocs = list()

    def _match_docs(self, docs):
        # Record every passing document that attests our exact hash triplet
        triplet = (self.i, self.m, self.p)
        for attdoc in docs:
            if attdoc.passing and triplet in attdoc.hashes:
                self.passing = True
                self.attdocs.append(attdoc)

    def validate(self, lore_lookup=True):
        """Check already-loaded documents first; optionally query lore for more."""
        global ATTESTATIONS
        self._match_docs(ATTESTATIONS)
        if self.attdocs or not lore_lookup:
            return
        fresh = LoreAttestationDocument.get_from_lore(self.attid)
        ATTESTATIONS += fresh
        self._match_docs(fresh)

    def __repr__(self):
        """Multi-line debugging summary, including any matched documents."""
        lines = [
            ' attid: %s' % self.attid,
            ' i: %s' % self.i,
            ' m: %s' % self.m,
            ' p: %s' % self.p,
            ' --- attdocs ---',
        ]
        lines.extend(str(attdoc) for attdoc in self.attdocs)
        return '\n'.join(lines)
+
+
def _run_command(cmdargs, stdin=None, logstderr=False):
    """Run an external command and capture its output.

    :param cmdargs: argv list for the command
    :param stdin: optional bytes fed to the process on stdin
    :param logstderr: when True, log decoded stderr at debug level
    :return: tuple of (exit code, stdout decoded as utf-8 with replacement)
    """
    # Lazy %-style args: the join only happens if debug logging is enabled
    # (the original eagerly formatted with '%' before the call)
    logger.debug('Running %s', ' '.join(cmdargs))

    sp = subprocess.Popen(cmdargs,
                          stdout=subprocess.PIPE,
                          stdin=subprocess.PIPE,
                          stderr=subprocess.PIPE)

    (output, error) = sp.communicate(input=stdin)

    output = output.decode('utf-8', errors='replace')

    # Stderr is intentionally discarded unless the caller asks for it
    if logstderr and error.strip():
        logger.debug('Stderr: %s', error.decode('utf-8', errors='replace'))

    return sp.returncode, output
+
+
def gpg_run_command(args, stdin=None, logstderr=False):
    """Invoke the configured gpg binary with standard batch-mode flags."""
    config = get_main_config()
    cmdargs = [config['gpgbin'], '--batch', '--no-auto-key-retrieve', '--no-auto-check-trustdb']
    gnupghome = config['attestation-gnupghome']
    if gnupghome is not None:
        cmdargs += ['--homedir', gnupghome]
    return _run_command(cmdargs + args, stdin=stdin, logstderr=logstderr)
+
+
def git_run_command(gitdir, args, stdin=None, logstderr=False):
    """Run a git command, optionally against an explicit --git-dir."""
    base = ['git', '--no-pager']
    if gitdir:
        base.extend(['--git-dir', gitdir])
    return _run_command(base + args, stdin=stdin, logstderr=logstderr)
+
+
def git_get_command_lines(gitdir, args):
    """Run a git command and return its stdout as a list of non-empty lines."""
    ecode, out = git_run_command(gitdir, args)
    if not out:
        return list()
    return [line for line in out.split('\n') if line != '']
+
+
def get_config_from_git(regexp, defaults=None):
    """Load git-config values matching regexp into a dict.

    :param regexp: regex passed to ``git config --get-regexp``
    :param defaults: optional dict of fallback values; never mutated
    :return: dict mapping the last dotted component of each matching config
             key (lowercased) to its value, layered on top of defaults
    """
    args = ['config', '-z', '--get-regexp', regexp]
    ecode, out = git_run_command(None, args)
    # Copy the defaults so we never mutate the caller's dict -- the previous
    # code wrote into it in place, silently altering e.g. DEFAULT_CONFIG
    gitconfig = dict(defaults) if defaults else dict()
    if not out:
        return gitconfig

    for line in out.split('\x00'):
        if not line:
            continue
        try:
            # A key set with no value is emitted as just "key\0" (no '\n'),
            # so this unpacking raises ValueError -- it must live inside the
            # try block for the except clause below to ever fire
            key, value = line.split('\n', 1)
            chunks = key.split('.')
            cfgkey = chunks[-1]
            gitconfig[cfgkey.lower()] = value
        except ValueError:
            logger.debug('Ignoring git config entry %s', line)

    return gitconfig
+
+
def get_main_config():
    """Return the b4 configuration dict, loading it from git-config once."""
    global MAIN_CONFIG
    if MAIN_CONFIG is not None:
        return MAIN_CONFIG
    config = get_config_from_git(r'b4\..*', defaults=DEFAULT_CONFIG)
    # Legacy name was get-lore-mbox, so load those as well
    config = get_config_from_git(r'get-lore-mbox\..*', defaults=config)
    config['trailer-order'] = config['trailer-order'].split(',')
    if config['gpgbin'] is None:
        # Fall back to whatever gpg.program is set to, or plain "gpg"
        gpgcfg = get_config_from_git(r'gpg\..*', {'program': 'gpg'})
        config['gpgbin'] = gpgcfg['program']
    MAIN_CONFIG = config
    return MAIN_CONFIG
+
+
def get_user_config():
    """Return the user.* git configuration, cached after the first load."""
    global USER_CONFIG
    if USER_CONFIG is not None:
        return USER_CONFIG
    USER_CONFIG = get_config_from_git(r'user\..*')
    return USER_CONFIG
+
+
def get_requests_session():
    """Return a singleton requests session carrying the b4 user-agent."""
    global REQSESSION
    if REQSESSION is not None:
        return REQSESSION
    REQSESSION = requests.session()
    REQSESSION.headers.update({'User-Agent': 'b4/%s' % VERSION})
    return REQSESSION
+
+
+def save_strict_thread(in_mbx, out_mbx, msgid):
+ want = {msgid}
+ got = set()
+ seen = set()
+ while True:
+ for msg in in_mbx:
+ c_msgid = LoreMessage.get_clean_msgid(msg)
+ seen.add(c_msgid)
+ if c_msgid in got:
+ continue
+
+ refs = list()
+ for ref in msg.get('References', msg.get('In-Reply-To', '')).split():
+ ref = ref.strip().strip('<>')
+ if ref in got or ref in want:
+ want.add(c_msgid)
+ elif len(ref):
+ refs.append(ref)
+
+ if c_msgid in want:
+ out_mbx.add(msg)
+ got.add(c_msgid)
+ want.update(refs)
+ want.discard(c_msgid)
+ logger.debug('Kept in thread: %s', c_msgid)
+
+ # Remove any entries not in "seen" (missing messages)
+ for c_msgid in set(want):
+ if c_msgid not in seen:
+ want.remove(c_msgid)
+ if not len(want):
+ break
+
+ if not len(out_mbx):
+ return None
+
+ if len(in_mbx) > len(out_mbx):
+ logger.info('Reduced thread to strict matches only (%s->%s)', len(in_mbx), len(out_mbx))