aboutsummaryrefslogtreecommitdiff
path: root/b4/__init__.py
diff options
context:
space:
mode:
authorKonstantin Ryabitsev <konstantin@linuxfoundation.org>2022-08-31 15:09:47 -0400
committerKonstantin Ryabitsev <konstantin@linuxfoundation.org>2022-08-31 15:09:47 -0400
commit63d5313664c96cdfcee345f5c880f9973a3ee1da (patch)
tree896d310748c1a079552199eb67c4e77ed5bfb6b3 /b4/__init__.py
parente7564108c7e68094f1a6320f4166185a3e29cac1 (diff)
downloadb4-63d5313664c96cdfcee345f5c880f9973a3ee1da.tar.gz
Refactor how we handle trailers
With the addition of b4 trailers it became pretty obvious that the way we originally implemented trailers didn't age well. This refactor does the following: - introduces LoreTrailer class to replace passing trailers as tuples - reimplements trailer-order with strict adherence to chain-of-custody rules - adds tests to most common trailer follow-up/ordering cases Signed-off-by: Konstantin Ryabitsev <konstantin@linuxfoundation.org>
Diffstat (limited to 'b4/__init__.py')
-rw-r--r--b4/__init__.py356
1 files changed, 238 insertions, 118 deletions
diff --git a/b4/__init__.py b/b4/__init__.py
index 84ef073..e8e31fe 100644
--- a/b4/__init__.py
+++ b/b4/__init__.py
@@ -304,14 +304,14 @@ class LoreMailbox:
continue
trailers, mismatches = fmsg.get_trailers(sloppy=sloppytrailers)
- for trailer in mismatches:
- lser.trailer_mismatches.add((trailer[0], trailer[1], fmsg.fromname, fmsg.fromemail))
+ for ltr in mismatches:
+ lser.trailer_mismatches.add((ltr.name, ltr.value, fmsg.fromname, fmsg.fromemail))
lvl = 1
while True:
logger.debug('%sParent: %s', ' ' * lvl, pmsg.full_subject)
logger.debug('%sTrailers:', ' ' * lvl)
- for trailer in trailers:
- logger.debug('%s%s: %s', ' ' * (lvl+1), trailer[0], trailer[1])
+ for ltr in trailers:
+ logger.debug('%s%s: %s', ' ' * (lvl+1), ltr.name, ltr.value)
if pmsg.has_diff and not pmsg.reply:
# We found the patch for these trailers
if pmsg.revision != revision:
@@ -330,9 +330,9 @@ class LoreMailbox:
break
if pmsg.in_reply_to and pmsg.in_reply_to in self.msgid_map:
lvl += 1
- for ptrailer in pmsg.trailers:
- print(ptrailer)
- trailers.append(tuple(ptrailer + (pmsg,)))
+ for pltr in pmsg.trailers:
+ pltr.lmsg = pmsg
+ trailers.append(pltr)
pmsg = self.msgid_map[pmsg.in_reply_to]
continue
break
@@ -557,11 +557,13 @@ class LoreSeries:
raise KeyError('Cherrypick not in series')
if lmsg is not None:
+ extras = list()
if addlink:
- lmsg.followup_trailers.append(('Link', linkmask % lmsg.msgid, None, None))
- if addmysob:
- lmsg.followup_trailers.append(('Signed-off-by',
- '%s <%s>' % (usercfg['name'], usercfg['email']), None, None))
+ if linkmask is None:
+ linkmask = config.get('linkmask')
+ linkval = linkmask % lmsg.msgid
+ lltr = LoreTrailer(name='Link', value=linkval)
+ extras.append(lltr)
if attsame and not attcrit:
if attmark:
@@ -588,7 +590,8 @@ class LoreSeries:
add_trailers = True
if noaddtrailers:
add_trailers = False
- msg = lmsg.get_am_message(add_trailers=add_trailers, copyccs=copyccs, allowbadchars=allowbadchars)
+ msg = lmsg.get_am_message(add_trailers=add_trailers, extras=extras, copyccs=copyccs,
+ addmysob=addmysob, allowbadchars=allowbadchars)
msgs.append(msg)
else:
logger.error(' ERROR: missing [%s/%s]!', at, self.expected)
@@ -832,6 +835,101 @@ class LoreSeries:
logger.critical('Cover: %s', outfile)
+class LoreTrailer:
+ type: str
+ name: str
+ lname: str
+ value: str
+ extinfo: Optional[str] = None
+ addr: Optional[Tuple[str, str]] = None
+ lmsg = None
+ # Small list of recognized utility trailers
+ _utility: Set[str] = {'fixes', 'link', 'buglink', 'obsoleted-by', 'message-id', 'change-id'}
+
+ def __init__(self, name: Optional[str] = None, value: Optional[str] = None, extinfo: Optional[str] = None,
+ msg: Optional[email.message.Message] = None):
+ if name is None:
+ self.name = 'Signed-off-by'
+ ucfg = get_user_config()
+ self.value = '%s <%s>' % (ucfg['name'], ucfg['email'])
+ self.type = 'person'
+ self.addr = (ucfg['name'], ucfg['email'])
+ else:
+ self.name = name
+ self.value = value
+ if name.lower() in self._utility:
+ self.type = 'utility'
+ elif re.search(r'\S+@\S+\.\S+', value):
+ self.type = 'person'
+ self.addr = email.utils.parseaddr(value)
+ else:
+ self.type = 'unknown'
+ self.lname = self.name.lower()
+ self.extinfo = extinfo
+ self.msg = msg
+
+ def as_string(self, omit_extinfo: bool = False) -> str:
+ ret = f'{self.name}: {self.value}'
+ if not self.extinfo or omit_extinfo:
+ return ret
+ # extinfo can be either be [on the next line], or # at the end
+ if self.extinfo.lstrip()[0] == '#':
+ ret += self.extinfo
+ else:
+ ret += f'\n{self.extinfo}'
+
+ return ret
+
+ def email_eq(self, cmp_email: str, fuzzy: bool = True) -> bool:
+ if not self.addr:
+ return False
+ our = self.addr[1].lower()
+ their = cmp_email.lower()
+ if our == their:
+ return True
+ if not fuzzy:
+ return False
+
+ if '@' not in our or '@' not in their:
+ return False
+
+ # Strip extended local parts often added by people, e.g.:
+ # comparing foo@example.com and foo+kernel@example.com should match
+ our = re.sub(r'\+[^@]+@', '@', our)
+ their = re.sub(r'\+[^@]+@', '@', their)
+ if our == their:
+ return True
+
+ # See if domain part of one of the addresses is a subset of the other one,
+ # which should match cases like foo@linux.intel.com and foo@intel.com
+ olocal, odomain = our.split('@', maxsplit=1)
+ tlocal, tdomain = their.split('@', maxsplit=1)
+ if olocal != tlocal:
+ return False
+
+ if (abs(odomain.count('.')-tdomain.count('.')) == 1
+ and (odomain.endswith(f'.{tdomain}') or tdomain.endswith(f'.{odomain}'))):
+ return True
+
+ return False
+
+ def __eq__(self, other):
+ # We never compare extinfo, we just tack it if we find a match
+ return self.lname == other.lname and self.value.lower() == other.value.lower()
+
+ def __hash__(self):
+ return hash(f'{self.lname}: {self.value}')
+
+ def __repr__(self):
+ out = list()
+ out.append(' type: %s' % self.type)
+ out.append(' name: %s' % self.name)
+ out.append(' value: %s' % self.value)
+ out.append(' extinfo: %s' % self.extinfo)
+
+ return '\n'.join(out)
+
+
class LoreMessage:
def __init__(self, msg):
self.msg = msg
@@ -971,41 +1069,32 @@ class LoreMessage:
trailers, others = LoreMessage.find_trailers(self.body, followup=True)
for trailer in trailers:
# These are commonly part of patch/commit metadata
- badtrailers = ('from', 'author', 'cc', 'to')
- if trailer[0].lower() not in badtrailers:
+ badtrailers = {'from', 'author', 'cc', 'to'}
+ if trailer.lname not in badtrailers:
self.trailers.append(trailer)
- def get_trailers(self, sloppy=False):
+ def get_trailers(self, sloppy: bool = False) -> Tuple[List[LoreTrailer], List[LoreTrailer]]:
trailers = list()
- mismatches = set()
+ mismatches = list()
- for tname, tvalue, extdata in self.trailers:
- if sloppy or tname.lower() in ('fixes', 'obsoleted-by'):
- trailers.append((tname, tvalue, extdata, self))
+ for ltr in self.trailers:
+ ltr.lmsg = self
+ if sloppy or ltr.type != 'person':
+ trailers.append(ltr)
continue
- tmatch = False
- namedata = email.utils.getaddresses([tvalue])[0]
- tfrom = re.sub(r'\+[^@]+@', '@', namedata[1].lower())
- hfrom = re.sub(r'\+[^@]+@', '@', self.fromemail.lower())
- tlname = namedata[0].lower()
- hlname = self.fromname.lower()
- tchunks = tfrom.split('@')
- hchunks = hfrom.split('@')
- if tfrom == hfrom:
- logger.debug(' trailer exact email match')
- tmatch = True
- # See if domain part of one of the addresses is a subset of the other one,
- # which should match cases like @linux.intel.com and @intel.com
- elif (len(tchunks) == 2 and len(hchunks) == 2
- and tchunks[0] == hchunks[0]
- and (tchunks[1].find(hchunks[1]) >= 0 or hchunks[1].find(tchunks[1]) >= 0)):
- logger.debug(' trailer fuzzy email match')
- tmatch = True
+ if ltr.email_eq(self.fromemail):
+ logger.debug(' trailer email match')
+ trailers.append(ltr)
+
# Does the name match, at least?
- elif tlname == hlname:
+ nmatch = False
+ tlname = ltr.addr[0].lower()
+ hlname = self.fromname.lower()
+
+ if tlname == hlname:
logger.debug(' trailer exact name match')
- tmatch = True
+ nmatch = True
# Finally, see if the header From has a comma in it and try to find all
# parts in the trailer name
elif hlname.find(',') > 0:
@@ -1014,13 +1103,12 @@ class LoreMessage:
if hlname.find(nchunk.strip()) < 0:
nmatch = False
break
- if nmatch:
- logger.debug(' trailer fuzzy name match')
- tmatch = True
- if tmatch:
- trailers.append((tname, tvalue, extdata, self))
- else:
- mismatches.add((tname, tvalue, extdata, self))
+ if nmatch:
+ logger.debug(' trailer fuzzy name match')
+ trailers.append(ltr)
+ continue
+
+ mismatches.append(ltr)
return trailers, mismatches
@@ -1409,7 +1497,7 @@ class LoreMessage:
return indexes
@staticmethod
- def find_trailers(body: str, followup: bool = False) -> Tuple[List[Tuple], List[str]]:
+ def find_trailers(body: str, followup: bool = False) -> Tuple[List[LoreTrailer], List[str]]:
ignores = {'phone', 'email'}
headers = {'subject', 'date', 'from'}
nonperson = {'fixes', 'subject', 'date', 'link', 'buglink', 'obsoleted-by'}
@@ -1434,43 +1522,51 @@ class LoreMessage:
line = line.strip('\r')
matches = re.search(r'^\s*(\w\S+):\s+(\S.*)', line, flags=re.I)
if matches:
- groups = list(matches.groups())
+ oname, ovalue = list(matches.groups())
# We only accept headers if we haven't seen any non-trailer lines
- tname = groups[0].lower()
- if tname in ignores:
+ lname = oname.lower()
+ if lname in ignores:
logger.debug('Ignoring known non-trailer: %s', line)
continue
- if len(others) and tname in headers:
+ if len(others) and lname in headers:
logger.debug('Ignoring %s (header after other content)', line)
continue
if followup:
- if not tname.isascii():
- logger.debug('Ignoring known non-ascii follow-up trailer: %s', tname)
+ if not lname.isascii():
+ logger.debug('Ignoring known non-ascii follow-up trailer: %s', lname)
continue
- mperson = re.search(r'\S+@\S+\.\S+', groups[1])
- if not mperson and tname not in nonperson:
+ mperson = re.search(r'\S+@\S+\.\S+', ovalue)
+ if not mperson and lname not in nonperson:
logger.debug('Ignoring %s (not a recognized non-person trailer)', line)
continue
+
+ extinfo = None
+ mextinfo = re.search(r'(.*\S+)(\s+#[^#]+)$', ovalue)
+ if mextinfo:
+ logger.debug('Trailer contains hashtag extinfo: ', line)
+ # Found extinfo of the hashtag genre
+ egr = mextinfo.groups()
+ ovalue = egr[0]
+ extinfo = egr[1]
+
was_trailer = True
- groups.append(None)
- trailers.append(groups)
+ ltrailer = LoreTrailer(name=oname, value=ovalue, extinfo=extinfo)
+ trailers.append(ltrailer)
continue
# Is it an extended info line, e.g.:
# Signed-off-by: Foo Foo <foo@foo.com>
# [for the foo bits]
- if len(line) > 2 and line[0] == '[' and line[-1] == ']' and was_trailer:
- trailers[-1][2] = line
+ if len(line) > 2 and was_trailer and re.search(r'^\s*\[[^]]+]\s*$', line):
+ trailers[-1].extinfo = line
was_trailer = False
continue
was_trailer = False
others.append(line)
- # convert to tuples for ease of matching
- ttrailers = [tuple(x) for x in trailers]
- return ttrailers, others
+ return trailers, others
@staticmethod
- def get_body_parts(body):
+ def get_body_parts(body: str) -> Tuple[List[LoreTrailer], str, List[LoreTrailer], str, str]:
# remove any starting/trailing blank lines
body = body.replace('\r', '')
body = body.strip('\n')
@@ -1533,13 +1629,28 @@ class LoreMessage:
return githeaders, message, trailers, basement, signature
- def fix_trailers(self, copyccs=False, signoff=None):
+ def fix_trailers(self, extras: Optional[List[LoreTrailer]] = None,
+ copyccs: bool = False, addmysob: bool = False) -> None:
+
config = get_main_config()
- attpolicy = config['attestation-policy']
bheaders, message, btrailers, basement, signature = LoreMessage.get_body_parts(self.body)
- # Now we add mix-in trailers
- trailers = btrailers + self.followup_trailers
+
+ sobtr = LoreTrailer()
+ hasmysob = False
+ if sobtr in btrailers:
+ # Our own signoff always moves to the bottom of all trailers
+ hasmysob = True
+ btrailers.remove(sobtr)
+
+ new_trailers = self.followup_trailers
+ if extras:
+ new_trailers += extras
+
+ if sobtr in new_trailers:
+ # Our own signoff always moves to the bottom of all trailers
+ new_trailers.remove(sobtr)
+ addmysob = True
if copyccs:
alldests = email.utils.getaddresses([str(x) for x in self.msg.get_all('to', [])])
@@ -1548,66 +1659,77 @@ class LoreMessage:
alldests.sort(key=lambda x: x[1].find('@') > 0 and x[1].split('@')[1] + x[1].split('@')[0] or x[1])
for pair in alldests:
found = False
- for ftr in trailers:
- if ftr[1].lower().find(pair[1].lower()) >= 0:
+ for fltr in btrailers + new_trailers:
+ if fltr.email_eq(pair[1]):
# already present
found = True
break
if not found:
if len(pair[0]):
- trailers.append(('Cc', f'{pair[0]} <{pair[1]}>', None, None)) # noqa
+ altr = LoreTrailer(name='Cc', value=f'{pair[0]} <{pair[1]}>')
else:
- trailers.append(('Cc', pair[1], None, None)) # noqa
-
- fixtrailers = list()
- # If we received a signoff trailer:
- # - if it's already present, we move it to the bottom
- # - if not already present, we add it
- new_signoff = True
- for trailer in trailers:
- if list(trailer[:3]) in fixtrailers:
- # Dupe
- continue
+ altr = LoreTrailer(name='Cc', value=pair[1])
+ new_trailers.append(altr)
+
+ torder = config.get('trailer-order')
+ if torder and torder != '*':
+ # this only applies to trailers within our chain of custody, so walk existing
+ # body trailers backwards and stop at the outermost Signed-off-by we find (if any)
+ for bltr in reversed(btrailers):
+ if bltr.lname == 'signed-off-by':
+ break
+ btrailers.remove(bltr)
+ new_trailers.insert(0, bltr)
+
+ ordered_trailers = list()
+ for glob in [x.strip().lower() for x in torder.split(',')]:
+ if not len(new_trailers):
+ break
+ for ltr in list(new_trailers):
+ if fnmatch.fnmatch(ltr.lname, glob):
+ ordered_trailers.append(ltr)
+ new_trailers.remove(ltr)
+ if len(new_trailers):
+ # Tack them to the bottom
+ ordered_trailers += new_trailers
+ new_trailers = ordered_trailers
- if signoff and tuple(trailer[:3]) == tuple(signoff):
- # Skip it, we'll add it at the bottom
- new_signoff = False
- logger.debug(' . %s: %s', signoff[0], signoff[1])
+ attpolicy = config['attestation-policy']
+ fixtrailers = btrailers
+
+ for ltr in new_trailers:
+ if ltr in fixtrailers:
continue
- fixtrailers.append(list(trailer[:3]))
- if trailer[:3] not in btrailers:
- extra = ''
- if trailer[3] is not None:
- fmsg = trailer[3]
- for attestor in fmsg.attestors: # noqa
- if attestor.passing:
- extra = ' (%s %s)' % (attestor.checkmark, attestor.trailer)
- elif attpolicy in ('hardfail', 'softfail'):
- extra = ' (%s %s)' % (attestor.checkmark, attestor.trailer)
- if attpolicy == 'hardfail':
- import sys
- logger.critical('---')
- logger.critical('Exiting due to attestation-policy: hardfail')
- sys.exit(1)
-
- logger.info(' + %s: %s%s', trailer[0], trailer[1], extra)
- else:
- logger.debug(' . %s: %s', trailer[0], trailer[1])
+ fixtrailers.append(ltr)
+ extra = ''
+ if ltr.lmsg is not None:
+ for attestor in ltr.lmsg.attestors:
+ if attestor.passing:
+ extra = ' (%s %s)' % (attestor.checkmark, attestor.trailer)
+ elif attpolicy in ('hardfail', 'softfail'):
+ extra = ' (%s %s)' % (attestor.checkmark, attestor.trailer)
+ if attpolicy == 'hardfail':
+ import sys
+ logger.critical('---')
+ logger.critical('Exiting due to attestation-policy: hardfail')
+ sys.exit(1)
+
+ logger.info(' + %s%s', ltr.as_string(omit_extinfo=True), extra)
- if signoff:
+ if addmysob:
# Tack on our signoff at the bottom
- fixtrailers.append(list(signoff))
- if new_signoff:
- logger.info(' + %s: %s', signoff[0], signoff[1])
+ fixtrailers.append(sobtr)
+ if not hasmysob:
+ logger.info(' + %s', sobtr.as_string(omit_extinfo=True))
# Reconstitute the message
self.body = ''
if bheaders:
- for bheader in bheaders:
+ for bltr in bheaders:
# There is no [extdata] in git headers, so we ignore bheader[2]
- self.body += '%s: %s\n' % (bheader[0], bheader[1])
+ self.body += bltr.as_string(omit_extinfo=True) + '\n'
self.body += '\n'
newmessage = ''
@@ -1617,20 +1739,18 @@ class LoreMessage:
newmessage += '\n'
if len(fixtrailers):
- for trailer in fixtrailers:
- newmessage += '%s: %s\n' % (trailer[0], trailer[1])
- if trailer[2]:
- newmessage += '%s\n' % trailer[2]
+ for ltr in fixtrailers:
+ newmessage += ltr.as_string() + '\n'
self.message = self.subject + '\n\n' + newmessage
self.body += newmessage
if len(basement):
self.body += '---\n'
- self.body += basement.rstrip('\r\n') + '\n\n'
+ self.body += basement.rstrip('\r\n') + '\n'
if len(signature):
self.body += '-- \n'
- self.body += signature.rstrip('\r\n') + '\n\n'
+ self.body += signature.rstrip('\r\n') + '\n'
def get_am_subject(self, indicate_reroll=True):
# Return a clean patch subject
@@ -1652,9 +1772,9 @@ class LoreMessage:
return '[%s] %s' % (' '.join(parts), self.lsubject.subject)
- def get_am_message(self, add_trailers=True, copyccs=False, allowbadchars=False):
+ def get_am_message(self, add_trailers=True, addmysob=False, extras=None, copyccs=False, allowbadchars=False):
if add_trailers:
- self.fix_trailers(copyccs=copyccs)
+ self.fix_trailers(copyccs=copyccs, addmysob=addmysob, extras=extras)
bbody = self.body.encode()
# Look through the body to make sure there aren't any suspicious unicode control flow chars
# First, encode into ascii and compare for a quickie utf8 presence test