aboutsummaryrefslogtreecommitdiff
path: root/b4/__init__.py
diff options
context:
space:
mode:
Diffstat (limited to 'b4/__init__.py')
-rw-r--r--b4/__init__.py53
1 files changed, 50 insertions, 3 deletions
diff --git a/b4/__init__.py b/b4/__init__.py
index 1897699..41100ad 100644
--- a/b4/__init__.py
+++ b/b4/__init__.py
@@ -1045,6 +1045,32 @@ class LoreMessage:
# No exact domain matches, so return everything we have
self._attestors += seenatts
+ def _trim_body(self) -> None:
+ # Get the length specified in the X-Developer-Signature header
+ xdsh = self.msg.get('X-Developer-Signature')
+ if not xdsh:
+ return
+ matches = re.search(r'\s+l=(\d+)', xdsh)
+ if not matches:
+ return
+ bl = int(matches.groups()[0])
+ with tempfile.TemporaryDirectory() as tfd:
+ m_out = os.path.join(tfd, 'm')
+ p_out = os.path.join(tfd, 'p')
+ cmdargs = ['mailinfo', '--encoding=UTF-8', '--no-scissors', m_out, p_out]
+ ecode, info = git_run_command(None, cmdargs, self.msg.as_bytes())
+ if not len(info.strip()):
+ return
+ with open(m_out, 'rb') as mfh:
+ bm = mfh.read()
+ with open(p_out, 'rb') as pfh:
+ bp = pfh.read()
+ bb = b''
+ for line in re.sub(rb'[\r\n]*$', b'', bm + bp).split(b'\n'):
+ bb += re.sub(rb'[\r\n]*$', b'', line) + b'\r\n'
+ if len(bb) > bl:
+ self.body = bb[:bl].decode()
+
def _load_patatt_attestors(self) -> None:
if not can_patatt:
logger.debug('Message has %s headers, but can_patatt is off', DEVSIG_HDR)
@@ -1066,7 +1092,24 @@ class LoreMessage:
logger.debug('Loading patatt attestations with sources=%s', str(sources))
- attestations = patatt.validate_message(self.msg.as_bytes(), sources)
+ success = False
+ trim_body = False
+ while True:
+ attestations = patatt.validate_message(self.msg.as_bytes(), sources, trim_body=trim_body)
+ # Do we have any successes?
+ for attestation in attestations:
+ if attestation[0] == patatt.RES_VALID:
+ success = True
+ break
+ if success:
+ if trim_body:
+ # set the body to the trimmed version
+ self._trim_body()
+ break
+ if not success and trim_body:
+ break
+ trim_body = True
+
for result, identity, signtime, keysrc, keyalgo, errors in attestations:
if keysrc and keysrc.startswith('(default keyring)/'):
fpr = keysrc.split('/', 1)[1]
@@ -1081,7 +1124,10 @@ class LoreMessage:
parts = email.utils.parseaddr(uids[0])
identity = parts[1]
- signdt = LoreAttestor.parse_ts(signtime)
+ if signtime:
+ signdt = LoreAttestor.parse_ts(signtime)
+ else:
+ signdt = None
attestor = LoreAttestorPatatt(result, identity, signdt, keysrc, keyalgo, errors)
self._attestors.append(attestor)
@@ -1105,7 +1151,8 @@ class LoreMessage:
# This is not critical even in hardfail
continue
elif attpolicy in ('softfail', 'hardfail'):
- checkmark = attestor.checkmark
+ if not checkmark:
+ checkmark = attestor.checkmark
trailers.append('%s BADSIG: %s' % (attestor.checkmark, attestor.trailer))
if attpolicy == 'hardfail':