From 8ddcf5c0ee341a1155a86141f44069819c05f68c Mon Sep 17 00:00:00 2001
From: Konstantin Ryabitsev
Date: Mon, 7 Dec 2020 13:48:12 -0500
Subject: Try all DKIM headers if failed on the first

dkim.verify will only try the topmost DKIM-Signature header, so in case of a failure, pop the failed header and retry with the next one (if any).

Signed-off-by: Konstantin Ryabitsev
---
 b4/__init__.py | 57 ++++++++++++++++++++++++++++++++++-----------------------
 1 file changed, 34 insertions(+), 23 deletions(-)

diff --git a/b4/__init__.py b/b4/__init__.py
index 28d22a6..1cc4577 100644
--- a/b4/__init__.py
+++ b/b4/__init__.py
@@ -1690,33 +1690,44 @@ class LoreAttestationSignatureDKIM(LoreAttestationSignature):
 # self.native_verify()
 # return
 
- dks = self.msg.get('dkim-signature')
- if not dks:
- return
+ while True:
+ dks = self.msg.get('dkim-signature')
+ if not dks:
+ logger.debug('No DKIM-Signature headers in the message')
+ return
 
- self.present = True
+ self.present = True
 
- ddata = get_parts_from_header(dks)
- self.attestor = LoreAttestorDKIM(ddata['d'])
- # Do we have a resolve method?
- if _resolver and hasattr(_resolver, 'resolve'):
- res = dkim.verify(self.msg.as_bytes(), dnsfunc=dkim_get_txt)
- else:
- res = dkim.verify(self.msg.as_bytes())
- if not res:
- logger.debug('DKIM signature did NOT verify')
- return
- self.good = True
+ ddata = get_parts_from_header(dks)
+ self.attestor = LoreAttestorDKIM(ddata['d'])
+ # Do we have a resolve method?
+ if _resolver and hasattr(_resolver, 'resolve'):
+ res = dkim.verify(self.msg.as_bytes(), dnsfunc=dkim_get_txt)
+ else:
+ res = dkim.verify(self.msg.as_bytes())
+ if not res:
+ logger.debug('DKIM signature did NOT verify')
+ logger.debug('Retrying with the next DKIM-Signature header, if any')
+ at = 0
+ for header in self.msg._headers: # noqa
+ if header[0].lower() == 'dkim-signature':
+ del(self.msg._headers[at]) # noqa
+ break
+ at += 1
+ continue
 
- # Grab toplevel signature that we just verified
- self.valid = True
- self.trusted = True
- self.passing = True
+ self.good = True
 
- if ddata.get('t'):
- self.sigdate = datetime.datetime.utcfromtimestamp(int(ddata['t'])).replace(tzinfo=datetime.timezone.utc)
- else:
- self.sigdate = email.utils.parsedate_to_datetime(str(self.msg['Date']))
+ # Grab toplevel signature that we just verified
+ self.valid = True
+ self.trusted = True
+ self.passing = True
+
+ if ddata.get('t'):
+ self.sigdate = datetime.datetime.utcfromtimestamp(int(ddata['t'])).replace(tzinfo=datetime.timezone.utc)
+ else:
+ self.sigdate = email.utils.parsedate_to_datetime(str(self.msg['Date']))
+ return
 
 # def native_verify(self):
 # dks = self.msg.get('dkim-signature')
--
cgit v1.2.3
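Review bot: With the retry loop, `self.trusted = True` is now set for whichever DKIM-Signature header happens to verify, and nothing in this hunk checks that its `d=` domain (`ddata['d']`) is aligned with the message's From: address, so one valid signature from an unrelated domain further down the header list is enough to pass. If alignment is enforced by the caller or inside LoreAttestorDKIM (not visible here), say so in a comment above the loop, e.g. `# any verifying signature is accepted here; d= vs From: alignment is checked in <caller>`; otherwise compare the domains before setting `trusted`. Separately, `ddata['d']` raises KeyError on a header that lacks a `d=` tag, which aborts the loop instead of moving on to the next header; treating a missing tag like a failed verification (same pop-and-`continue` path) would keep the fallback working. The deletion from `self.msg._headers` also permanently strips failed signatures from the message object, which deserves a one-line comment if later code re-reads or saves `self.msg`. The enclosing method begins above the hunk.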