summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKonstantin Ryabitsev <konstantin@linuxfoundation.org>2021-05-11 14:56:05 -0400
committerKonstantin Ryabitsev <konstantin@linuxfoundation.org>2021-05-11 14:56:05 -0400
commitf1a2700e70018349d5c63f2053ba4b0e7ebe351a (patch)
treed3bea7c91ff7d679cbe9d4614b02f40e09cac94c
parent31348a14afdb1d39e7faf9576eaddea1ced76e19 (diff)
downloadb4-f1a2700e70018349d5c63f2053ba4b0e7ebe351a.tar.gz
Reimplement attestation code one more time
Move end-to-end attestation code into its own library: patatt. See https://git.kernel.org/pub/scm/utils/patatt/patatt.git/about/ It is included into b4 as a submodule, but you will need to init it first: git submodule update --init This change significantly simplifies our attestation code, dropping thousands of lines of rather hairy code. Notably, patatt-style attestation is incompatible with previous attestation implementations done directly in b4, but that's just as well -- we've always marked it as "experimental" and the lack of adoption was proving that we weren't on the right path. Next to come is keyring management and documentation. Signed-off-by: Konstantin Ryabitsev <konstantin@linuxfoundation.org>
-rw-r--r--.gitmodules3
-rw-r--r--README.rst93
-rwxr-xr-xb4.sh2
-rw-r--r--b4/__init__.py1149
-rw-r--r--b4/attest.py154
-rw-r--r--b4/command.py8
-rw-r--r--b4/mbox.py3
-rw-r--r--b4/pr.py29
m---------patatt0
-rw-r--r--requirements.txt8
10 files changed, 456 insertions, 993 deletions
diff --git a/.gitmodules b/.gitmodules
new file mode 100644
index 0000000..58f0214
--- /dev/null
+++ b/.gitmodules
@@ -0,0 +1,3 @@
+[submodule "patatt"]
+ path = patatt
+ url = https://git.kernel.org/pub/scm/utils/patatt/patatt.git
diff --git a/README.rst b/README.rst
index 166eb9a..0fe5222 100644
--- a/README.rst
+++ b/README.rst
@@ -34,52 +34,21 @@ Setting up a symlink should also be possible.
Patch attestation (EXPERIMENTAL)
--------------------------------
-Starting with version 0.6, b4 implements in-header patch attestation,
-following the approach proposed here:
-
-https://git.kernel.org/pub/scm/linux/kernel/git/mricon/patch-attestation-poc.git/tree/README.rst
-
-At this time, only PGP mode is implemented, but further work is expected
-in future versions of b4.
-
-Attesting your own patches
-~~~~~~~~~~~~~~~~~~~~~~~~~~
-Patch attestation is done via message headers and stays out of the way
-of usual code submission and review workflow. At this time, only
-maintainers using b4 to retrieve patches and patch series will benefit
-from patch attestation, but everyone is encouraged to submit
-cryptographic patch attestation with their work anyway, in hopes that it
-becomes a common and widely used procedure.
-
-To start attesting your own patches:
-
-1. Make sure you have b4 version 0.6.0 or above:
- ``b4 --version``
-2. If you don't already have a PGP key, you can follow the following
- guide on how to generate it:
- https://www.kernel.org/doc/html/latest/process/maintainer-pgp-guide.html
-3. It is strongly recommended to use ed25519 as your signing key
- algorithm, as it will result in much smaller signatures, preventing
- unnecessary email header bloat.
-4. Make sure your ``user.email`` and ``user.signingkey`` are set either
- globally, or in the repository you will be using for attestation.
-5. Add the ``sendemail-validate`` hook to each repository you want
- enabled for attestation, with the following single line of content as
- the hook body:
- ``b4 attest $1``.
-
-If you are using b4 from git checkout, you can use a symlink instead::
-
- ln -s path/to/b4/hooks/sendemail-validate-attestation-hook \
- .git/hooks/sendemail-validate
-
-(Note, that there's a second "E" in send*E*mail.)
-
-Next time you run ``git send-email``, b4 will automatically add
-attestation headers to all patches before they go out.
-
-Verifying attestation on received patches
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+B4 implements two attestation verification mechanisms:
+
+- DKIM attestation using the dkimpy library
+- X-Developer-Signature attestation using the patatt library
+
+If you installed from pip, you should have pulled both of these
+dependencies in automatically. Alternatively, you can install dkimpy
+from your OS packaging and then run "git submodule update --init" to
+clone patatt as a submodule of b4.
+
+For attesting your outgoing patches, see patatt documentation.
+https://git.kernel.org/pub/scm/utils/patatt/patatt.git/about/
+
+Showing attestation on received patches
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
There are three attestation verification policies in b4:
- check (default)
@@ -101,38 +70,8 @@ You can set the preferred policy via the git configuration file::
[b4]
attestation-policy = softfail
-Using with mutt
-~~~~~~~~~~~~~~~
-You can show patch attestation data with mutt, using the following
-configuration parameters::
-
- set display_filter="b4 -q attest -m"
- ignore *
- unignore from date subject to cc list-id:
- unignore x-patch-hashes: x-patch-sig:
- unignore attested-by: attestation-failed:
-
-When displaying a message containing in-header PGP attestation
-signatures, mutt will display either the "Attested-By" or the
-"Attestation-Failed" headers, e.g.::
-
- Date: Mon, 23 Nov 2020 13:38:50 -0500
- From: Konstantin Ryabitsev <konstantin@linuxfoundation.org>
- To: mricon@kernel.org
- Subject: [PATCH 3/5] Fix in-header attestation code
- Attested-By: Konstantin Ryabitsev <konstantin@linuxfoundation.org> (pgp: B6C41CE35664996C)
-
-or::
-
- Date: Mon, 23 Nov 2020 13:38:48 -0500
- From: Konstantin Ryabitsev <konstantin@linuxfoundation.org>
- To: mricon@kernel.org
- Subject: [PATCH 1/5] Add not very simple dkim key caching
- Attestation-Failed: signature failed (commit message, patch metadata)
-
-
Support
-------
For support or with any other questions, please email
tools@linux.kernel.org, or browse the list archive at
-https://linux.kernel.org/g/tools.
+https://lore.kernel.org/tools.
diff --git a/b4.sh b/b4.sh
index e1f6c4e..10d33b7 100755
--- a/b4.sh
+++ b/b4.sh
@@ -6,4 +6,4 @@
REAL_SCRIPT=$(realpath -e ${BASH_SOURCE[0]})
SCRIPT_TOP="${SCRIPT_TOP:-$(dirname ${REAL_SCRIPT})}"
-exec env PYTHONPATH="${SCRIPT_TOP}" python3 "${SCRIPT_TOP}/b4/command.py" "${@}"
+exec env PYTHONPATH="${SCRIPT_TOP}:${SCRIPT_TOP}/patatt" python3 "${SCRIPT_TOP}/b4/command.py" "${@}"
diff --git a/b4/__init__.py b/b4/__init__.py
index 32b5c02..6a09a47 100644
--- a/b4/__init__.py
+++ b/b4/__init__.py
@@ -18,25 +18,27 @@ import time
import shutil
import mailbox
import pwd
-import base64
from pathlib import Path
from tempfile import mkstemp, TemporaryDirectory
from contextlib import contextmanager
+from typing import Optional, Tuple, Set
from email import charset
charset.add_charset('utf-8', None)
emlpolicy = email.policy.EmailPolicy(utf8=True, cte_type='8bit', max_line_length=None)
try:
- import dns.resolver
import dkim
+ can_dkim = True
+except ModuleNotFoundError:
+ can_dkim = False
- can_dkim_verify = True
- _resolver = dns.resolver.get_default_resolver()
+try:
+ import patatt
+ can_patatt = True
except ModuleNotFoundError:
- can_dkim_verify = False
- _resolver = None
+ can_patatt = False
__VERSION__ = '0.7.0-dev'
@@ -45,15 +47,12 @@ logger = logging.getLogger('b4')
HUNK_RE = re.compile(r'^@@ -\d+(?:,(\d+))? \+\d+(?:,(\d+))? @@')
FILENAME_RE = re.compile(r'^(---|\+\+\+) (\S+)')
-PASS_SIMPLE = '[P]'
-WEAK_SIMPLE = '[D]'
-FAIL_SIMPLE = '[F]'
-PASS_FANCY = '\033[32m\u2714\033[0m'
-WEAK_FANCY = '\033[32m\u2713\033[0m'
-FAIL_FANCY = '\033[31m\u2717\033[0m'
+ATT_PASS_SIMPLE = 'v'
+ATT_FAIL_SIMPLE = 'x'
+ATT_PASS_FANCY = '\033[32m\u2713\033[0m'
+ATT_FAIL_FANCY = '\033[31m\u2717\033[0m'
-HDR_PATCH_HASHES = 'X-Patch-Hashes'
-HDR_PATCH_SIG = 'X-Patch-Sig'
+DEVSIG_HDR = 'X-Developer-Signature'
# You can use bash-style globbing here
WANTHDRS = [
@@ -130,19 +129,19 @@ MAIN_CONFIG = None
# This is git-config user.*
USER_CONFIG = None
-# Used for tracking attestations we have already looked up
-ATTESTATIONS = list()
-# Used for keeping a cache of subkey lookups to minimize shelling out to gpg
-SUBKEY_DATA = dict()
# Used for storing our requests session
REQSESSION = None
# Indicates that we've cleaned cache already
_CACHE_CLEANED = False
-# Used for dkim key lookups
-_DKIM_DNS_CACHE = dict()
class LoreMailbox:
+ msgid_map: dict[str]
+ series: dict
+ covers: dict
+ followups: list
+ unknowns: list
+
def __init__(self):
self.msgid_map = dict()
self.series = dict()
@@ -312,17 +311,18 @@ class LoreMailbox:
for trailer in trailers:
logger.debug('%s%s: %s', ' ' * (lvl+1), trailer[0], trailer[1])
if pmsg.has_diff and not pmsg.reply:
+ # TODO: See below
# We found the patch for these trailers
- if pmsg.revision != revision:
- # add this into our trailer map to carry over trailers from
- # previous revisions to current revision if patch/metadata did
- # not change
- pmsg.load_hashes()
- if pmsg.attestation:
- attid = pmsg.attestation.attid
- if attid not in self.trailer_map:
- self.trailer_map[attid] = list()
- self.trailer_map[attid] += trailers
+ # if pmsg.revision != revision:
+ # # add this into our trailer map to carry over trailers from
+ # # previous revisions to current revision if patch/metadata did
+ # # not change
+ # pmsg.load_hashes()
+ # if pmsg.attestation:
+ # attid = pmsg.attestation.attid
+ # if attid not in self.trailer_map:
+ # self.trailer_map[attid] = list()
+ # self.trailer_map[attid] += trailers
pmsg.followup_trailers += trailers
break
if not pmsg.reply:
@@ -337,12 +337,13 @@ class LoreMailbox:
break
# Carry over trailers from previous series if patch/metadata did not change
- for lmsg in lser.patches:
- if lmsg is None or lmsg.attestation is None:
- continue
- lmsg.load_hashes()
- if lmsg.attestation.attid in self.trailer_map:
- lmsg.followup_trailers += self.trailer_map[lmsg.attestation.attid]
+ # TODO: This needs fixing, because we no longer generate the three hashes
+ # TODO: and patchwork_hash only gives us the hash of the actual patch
+ # for lmsg in lser.patches:
+ # if lmsg is None or lmsg.patchwork_hash is None:
+ # continue
+ # if lmsg.patchwork_hash in self.trailer_map:
+ # lmsg.followup_trailers += self.trailer_map[lmsg.patchwork_hash]
return lser
@@ -496,20 +497,33 @@ class LoreSeries:
logger.critical('WARNING: Unable to add your Signed-off-by: git returned no user.name or user.email')
addmysob = False
- attdata = [(None, None)] * len(self.patches[1:])
attpolicy = config['attestation-policy']
- if config['attestation-checkmarks'] == 'fancy':
- attpass = PASS_FANCY
- attfail = FAIL_FANCY
- attweak = WEAK_FANCY
- else:
- attpass = PASS_SIMPLE
- attfail = FAIL_SIMPLE
- attweak = WEAK_SIMPLE
+ # Loop through all patches and see if attestation is the same for all of them,
+ # since it usually is
+ attref = None
+ attsame = True
+ attmark = None
+ attcrit = False
+ if attpolicy != 'off':
+ logger.info('Checking attestation on all messages, may take a moment...')
+ for lmsg in self.patches[1:]:
+ if lmsg is None:
+ attsame = False
+ break
+
+ checkmark, trailers, attcrit = lmsg.get_attestation_trailers(attpolicy)
+ if attref is None:
+ attref = trailers
+ attmark = checkmark
+ continue
+ if set(trailers) == set(attref):
+ continue
+ attsame = False
+ logger.debug('Attestation info is not the same')
+ break
at = 1
- atterrors = list()
for lmsg in self.patches[1:]:
if cherrypick is not None:
if at not in cherrypick:
@@ -529,35 +543,23 @@ class LoreSeries:
if addlink:
lmsg.followup_trailers.append(('Link', linkmask % lmsg.msgid, None, None))
- if attpolicy != 'off':
- lmsg.load_hashes()
- latt = lmsg.attestation
- if latt and latt.validate(lmsg.msg):
- if latt.lsig.attestor and latt.lsig.attestor.mode == 'domain':
- logger.info(' %s %s', attweak, lmsg.full_subject)
- attdata[at-1] = (latt.lsig.attestor.get_trailer(lmsg.fromemail), attweak) # noqa
- else:
- logger.info(' %s %s', attpass, lmsg.full_subject)
- attdata[at-1] = (latt.lsig.attestor.get_trailer(lmsg.fromemail), attpass) # noqa
+ if attsame and not attcrit:
+ if attmark:
+ logger.info(' %s %s', attmark, lmsg.full_subject)
else:
- if latt and latt.lsig and attpolicy in ('softfail', 'hardfail'):
- logger.info(' %s %s', attfail, lmsg.full_subject)
- if latt and latt.lsig and latt.lsig.attestor and latt.lsig.attestor.mode == 'domain':
- atterrors.append('Failed %s attestation' % latt.lsig.attestor.get_trailer())
- elif latt and latt.lsig and latt.lsig.attestor:
- failed = list()
- if not latt.pv:
- failed.append('patch content')
- if not latt.mv:
- failed.append('commit message')
- if not latt.iv:
- failed.append('patch metadata')
- atterrors.append('Patch %s/%s failed attestation (%s)' % (at, lmsg.expected,
- ', '.join(failed)))
- else:
- logger.info(' %s', lmsg.full_subject)
+ logger.info(' %s', lmsg.full_subject)
+
else:
- logger.info(' %s', lmsg.full_subject)
+ checkmark, trailers, critical = lmsg.get_attestation_trailers(attpolicy)
+ logger.info(' %s %s', checkmark, lmsg.full_subject)
+ for trailer in trailers:
+ logger.info(' %s', trailer)
+
+ if critical:
+ import sys
+ logger.critical('---')
+ logger.critical('Exiting due to attestation-policy: hardfail')
+ sys.exit(128)
add_trailers = True
if noaddtrailers:
@@ -572,29 +574,17 @@ class LoreSeries:
if attpolicy == 'off':
return mbx
- failed = (None, None) in attdata
- if not failed:
+ if attsame and attref:
logger.info(' ---')
- for trailer, attmode in set(attdata):
- logger.info(' %s Attestation-by: %s', attmode, trailer)
- return mbx
- elif not can_dkim_verify and config.get('attestation-check-dkim') == 'yes':
- logger.info(' ---')
- logger.info(' NOTE: install dkimpy for DKIM signature verification')
-
- errors = set(atterrors)
- for attdoc in ATTESTATIONS:
- errors.update(attdoc.errors)
-
- if errors:
- logger.critical(' ---')
- logger.critical(' Attestation is available, but did not succeed:')
- for error in errors:
- logger.critical(' %s %s', attfail, error)
+ for trailer in attref:
+ logger.info(' %s', trailer)
- if attpolicy == 'hardfail':
- import sys
- sys.exit(128)
+ if not (can_dkim and can_patatt):
+ logger.info(' ---')
+ if not can_dkim:
+ logger.info(' NOTE: install dkimpy for DKIM signature verification')
+ if not can_patatt:
+ logger.info(' NOTE: install patatt for end-to-end signature verification')
return mbx
@@ -691,7 +681,6 @@ class LoreSeries:
logger.critical('ERROR: v%s series incomplete; unable to create a fake-am range', self.revision)
return None, None
logger.debug('Looking at %s', lmsg.full_subject)
- lmsg.load_hashes()
if not lmsg.blob_indexes:
logger.critical('ERROR: some patches do not have indexes')
logger.critical(' unable to create a fake-am range')
@@ -795,11 +784,8 @@ class LoreMessage:
self.pr_tip_commit = None
self.pr_remote_tip_commit = None
- self.attestation = None
# Patchwork hash
self.pwhash = None
- # Git patch-id
- self.git_patch_id = None
# Blob indexes
self.blob_indexes = None
@@ -815,6 +801,9 @@ class LoreMessage:
self.revision_inferred = self.lsubject.revision_inferred
self.counters_inferred = self.lsubject.counters_inferred
+ # Loaded when attestors property is called
+ self._attestors = None
+
# Handle [PATCH 6/5]
if self.counter > self.expected:
self.expected = self.counter
@@ -884,6 +873,8 @@ class LoreMessage:
self.has_diffstat = True
if diffre.search(self.body):
self.has_diff = True
+ self.pwhash = LoreMessage.get_patchwork_hash(self.body)
+ self.blob_indexes = LoreMessage.get_indexes(self.body)
# We only pay attention to trailers that are sent in reply
if self.reply:
@@ -945,6 +936,124 @@ class LoreMessage:
return trailers, mismatches
+ @property
+ def attestors(self):
+ if self._attestors is not None:
+ return self._attestors
+
+ self._attestors = list()
+
+ config = get_main_config()
+ if config['attestation-policy'] == 'off':
+ return self._attestors
+
+ if self.msg.get(DEVSIG_HDR):
+ self._load_patatt_attestors()
+ if self.msg.get('dkim-signature'):
+ self._load_dkim_attestors()
+
+ return self._attestors
+
+ def _load_dkim_attestors(self) -> None:
+ if not can_dkim:
+ logger.debug('Message has DKIM signatures, but can_dkim is off')
+ return
+
+ # Yank out all DKIM-Signature headers and try them in reverse order
+ # until we come to a passing one
+ dkhdrs = list()
+ for header in list(self.msg._headers): # noqa
+ if header[0].lower() == 'dkim-signature':
+ dkhdrs.append(header)
+ self.msg._headers.remove(header) # noqa
+ dkhdrs.reverse()
+
+ seenatts = list()
+ for hn, hval in dkhdrs:
+ errors = list()
+ hdata = LoreMessage.get_parts_from_header(hval)
+ logger.debug('Loading DKIM attestation for d=%s, s=%s', hdata['d'], hdata['s'])
+
+ signtime = hdata.get('t')
+ identity = hdata.get('i')
+ if not identity:
+ identity = hdata.get('d')
+
+ self.msg._headers.append((hn, hval)) # noqa
+ res = dkim.verify(self.msg.as_bytes())
+
+ attestor = LoreAttestorDKIM(res, identity, signtime, errors)
+ logger.debug('DKIM verify results: %s=%s', identity, res)
+ if attestor.check_identity(self.fromemail):
+ # use this one, regardless of any other DKIM signatures
+ self._attestors.append(attestor)
+ return
+
+ self.msg._headers.pop(-1) # noqa
+ seenatts.append(attestor)
+
+ # No exact domain matches, so return everything we have
+ self._attestors += seenatts
+
+ def _load_patatt_attestors(self) -> None:
+ if not can_patatt:
+ logger.debug('Message has %s headers, but can_patatt is off', DEVSIG_HDR)
+ return
+
+ # load our key sources if necessary
+ ddir = get_data_dir()
+ pdir = os.path.join(ddir, 'keyring')
+ config = get_main_config()
+ sources = config.get('keyringsrc')
+ if not sources:
+ sources = ['ref:::.keys', 'ref:::.local-keys', 'ref::refs/meta/keyring:']
+ if pdir not in sources:
+ sources.append(pdir)
+
+ # Push our logger and GPGBIN into patatt
+ patatt.logger = logger
+ patatt.GPGBIN = config['gpgbin']
+
+ logger.debug('Loading patatt attestations with sources=%s', str(sources))
+
+ attestations = patatt.validate_message(self.msg.as_bytes(), sources)
+ for passing, identity, signtime, keysrc, keyalgo, errors in attestations:
+ attestor = LoreAttestorPatatt(passing, identity, signtime, keysrc, keyalgo, errors)
+ self._attestors.append(attestor)
+
+ def get_attestation_trailers(self, attpolicy: str) -> Tuple[str, list, bool]:
+ trailers = list()
+ checkmark = None
+ critical = False
+ for attestor in self.attestors:
+ if not attestor.passing:
+ # Is it a person-trailer for which we have a key?
+ if attestor.level == 'person':
+ if attestor.have_key:
+ # This was signed, and we have a key, but it's failing
+ trailers.append('%s BADSIG: %s' % (attestor.checkmark, attestor.trailer))
+ checkmark = attestor.checkmark
+ elif attpolicy in ('softfail', 'hardfail'):
+ trailers.append('%s No key: %s' % (attestor.checkmark, attestor.trailer))
+ # This is not critical even in hardfail
+ continue
+ elif attpolicy in ('softfail', 'hardfail'):
+ checkmark = attestor.checkmark
+ trailers.append('%s BADSIG: %s' % (attestor.checkmark, attestor.trailer))
+
+ if attpolicy == 'hardfail':
+ critical = True
+ else:
+ if not checkmark:
+ checkmark = attestor.checkmark
+ if attestor.check_identity(self.fromemail):
+ trailers.append('%s Signed: %s' % (attestor.checkmark, attestor.trailer))
+ else:
+ trailers.append('%s Signed: %s (From: %s)' % (attestor.checkmark, attestor.trailer,
+ self.fromemail))
+
+ return checkmark, trailers, critical
+
def __repr__(self):
out = list()
out.append('msgid: %s' % self.msgid)
@@ -971,6 +1080,10 @@ class LoreMessage:
for trailer in self.followup_trailers:
out.append(' |%s' % str(trailer))
out.append(' --- end trailers ---')
+ out.append(' --- begin attestors ---')
+ for attestor in self.attestors:
+ out.append(' |%s' % str(attestor))
+ out.append(' --- end attestors ---')
return '\n'.join(out)
@@ -994,6 +1107,17 @@ class LoreMessage:
return new_hdrval.strip()
@staticmethod
+ def get_parts_from_header(hstr: str) -> dict:
+ hstr = re.sub(r'\s*', '', hstr)
+ hdata = dict()
+ for chunk in hstr.split(';'):
+ parts = chunk.split('=', 1)
+ if len(parts) < 2:
+ continue
+ hdata[parts[0]] = parts[1]
+ return hdata
+
+ @staticmethod
def get_clean_msgid(msg, header='Message-Id'):
msgid = None
raw = msg.get(header)
@@ -1004,9 +1128,7 @@ class LoreMessage:
return msgid
@staticmethod
- def get_patchwork_hash(diff):
- # Make sure we just have the diff without any extraneous content.
- diff = LoreMessage.get_clean_diff(diff)
+ def get_patchwork_hash(diff: str) -> str:
"""Generate a hash from a diff. Lifted verbatim from patchwork."""
prefixes = ['-', '+', ' ']
@@ -1049,7 +1171,7 @@ class LoreMessage:
return hashed.hexdigest()
@staticmethod
- def get_indexes(diff):
+ def get_indexes(diff: str) -> Set[tuple]:
indexes = set()
curfile = None
for line in diff.split('\n'):
@@ -1065,135 +1187,6 @@ class LoreMessage:
return indexes
@staticmethod
- def get_clean_diff(diff):
- diff = diff.replace('\r', '')
-
- # For keeping a buffer of lines preceding @@ ... @@
- buflines = list()
- difflines = ''
-
- # Used for counting where we are in the patch
- pp = mm = 0
- inside_binary_chunk = False
- for line in diff.split('\n'):
- if not len(line):
- if inside_binary_chunk:
- inside_binary_chunk = False
- # add all buflines to difflines
- difflines += '\n'.join(buflines) + '\n\n'
- buflines = list()
- continue
- buflines.append(line)
- continue
- elif inside_binary_chunk:
- buflines.append(line)
- continue
- # If line starts with 'index ' and previous line starts with 'deleted ', then
- # it's a file delete and therefore doesn't have a regular hunk.
- if line.find('index ') == 0 and len(buflines) > 1 and buflines[-1].find('deleted ') == 0:
- # add this and 2 preceding lines to difflines and reset buflines
- buflines.append(line)
- difflines += '\n'.join(buflines[-3:]) + '\n'
- buflines = list()
- continue
- if line.find('delta ') == 0 or line.find('literal ') == 0:
- # we are inside a binary patch
- inside_binary_chunk = True
- buflines.append(line)
- continue
- hunk_match = HUNK_RE.match(line)
- if hunk_match:
- # logger.debug('Crunching %s', line)
- mlines, plines = hunk_match.groups()
- try:
- pp = int(plines)
- except TypeError:
- pp = 1
- try:
- mm = int(mlines)
- except TypeError:
- mm = 1
- addlines = list()
- for bline in reversed(buflines):
- # Go backward and add lines until we get to the start
- # or encounter a blank line
- if len(bline.strip()) == 0:
- break
- addlines.append(bline)
- if addlines:
- difflines += '\n'.join(reversed(addlines)) + '\n'
- buflines = list()
- # Feed this line to the hasher
- difflines += line + '\n'
- continue
- if pp > 0 or mm > 0:
- # Inside the patch
- difflines += line + '\n'
- if line[0] in (' ', '-'):
- mm -= 1
- if line[0] in (' ', '+'):
- pp -= 1
- continue
- # Not anything we recognize, so stick into buflines
- buflines.append(line)
- return difflines
-
- def load_hashes(self):
- if self.attestation is not None:
- return
- logger.debug('Calculating hashes for: %s', self.full_subject)
- # Calculate git-patch-id first
- cmdargs = ['patch-id', '--stable']
- msg = self.get_am_message(add_trailers=False)
- stdin = msg.as_string(policy=emlpolicy).encode()
- ecode, out = git_run_command(None, cmdargs, stdin)
- if ecode > 0:
- # Git doesn't think there's a patch there
- return
- fline = out.split('\n')[0]
- if len(fline) >= 40:
- self.git_patch_id = fline[:40]
-
- msg_out = mkstemp()
- patch_out = mkstemp()
- cmdargs = ['mailinfo', '--encoding=UTF-8', msg_out[1], patch_out[1]]
- ecode, info = git_run_command(None, cmdargs, stdin)
- if ecode > 0:
- logger.debug('ERROR: Could not get mailinfo')
- return
- i = hashlib.sha256()
- m = hashlib.sha256()
- p = hashlib.sha256()
-
- for line in info.split('\n'):
- # We don't use the "Date:" field because it is likely to be
- # mangled between when git-format-patch generates it and
- # when it is sent out by git-send-email (or other tools).
- if re.search(r'^(Author|Email|Subject):', line):
- i.update((line + '\n').encode())
-
- with open(msg_out[1], 'rb') as mfh:
- msg = mfh.read()
- m.update(msg)
- os.unlink(msg_out[1])
-
- with open(patch_out[1], 'rb') as pfh:
- patch = pfh.read().decode(self.charset, errors='replace')
- if len(patch.strip()):
- diff = LoreMessage.get_clean_diff(patch)
- p.update(diff.encode())
- self.pwhash = LoreMessage.get_patchwork_hash(patch)
- # Load the indexes, if we have them
- self.blob_indexes = LoreMessage.get_indexes(diff)
- else:
- p = None
-
- os.unlink(patch_out[1])
-
- if i and m and p:
- self.attestation = LoreAttestation(i, m, p)
-
- @staticmethod
def find_trailers(body, followup=False):
headers = ('subject', 'date', 'from')
nonperson = ('fixes', 'subject', 'date', 'link', 'buglink')
@@ -1311,13 +1304,6 @@ class LoreMessage:
config = get_main_config()
attpolicy = config['attestation-policy']
- if config['attestation-checkmarks'] == 'fancy':
- attfail = FAIL_FANCY
- attweak = WEAK_FANCY
- else:
- attfail = FAIL_SIMPLE
- attweak = WEAK_SIMPLE
-
bheaders, message, btrailers, basement, signature = LoreMessage.get_body_parts(self.body)
# Now we add mix-in trailers
trailers = btrailers + self.followup_trailers
@@ -1346,6 +1332,7 @@ class LoreMessage:
trailer_order = DEFAULT_TRAILER_ORDER
elif trailer_order in ('preserve', '_preserve_'):
trailer_order = '*'
+
for trailermatch in trailer_order:
for trailer in trailers:
if list(trailer[:3]) in fixtrailers:
@@ -1355,15 +1342,19 @@ class LoreMessage:
fixtrailers.append(list(trailer[:3]))
if trailer[:3] not in btrailers:
extra = ''
- if can_dkim_verify and config.get('attestation-check-dkim') == 'yes' and attpolicy != 'off':
- if len(trailer) > 3 and trailer[3] is not None:
- fmsg = trailer[3]
- attsig = LoreAttestationSignatureDKIM(fmsg.msg) # noqa
- if attsig.present:
- if attsig.passing:
- extra = ' (%s %s)' % (attweak, attsig.attestor.get_trailer())
- elif attpolicy in ('softfail', 'hardfail'):
- extra = ' (%s %s)' % (attfail, attsig.attestor.get_trailer())
+ if len(trailer) > 3 and trailer[3] is not None:
+ fmsg = trailer[3]
+ for attestor in fmsg.attestors: # noqa
+ if attestor.passing:
+ extra = ' (%s %s)' % (attestor.checkmark, attestor.trailer)
+ elif attpolicy in ('hardfail', 'softfail'):
+ extra = ' (%s %s)' % (attestor.checkmark, attestor.trailer)
+ if attpolicy == 'hardfail':
+ import sys
+ logger.critical('---')
+ logger.critical('Exiting due to attestation-policy: hardfail')
+ sys.exit(1)
+
logger.info(' + %s: %s%s', trailer[0], trailer[1], extra)
else:
logger.debug(' . %s: %s', trailer[0], trailer[1])
@@ -1507,458 +1498,134 @@ class LoreSubject:
class LoreAttestor:
- def __init__(self, keyid):
- self.keyid = keyid
- self.uids = list()
-
- def __repr__(self):
- out = list()
- out.append(' keyid: %s' % self.keyid)
- for uid in self.uids:
- out.append(' uid: %s <%s>' % uid)
- return '\n'.join(out)
-
-
-class LoreAttestorDKIM(LoreAttestor):
- def __init__(self, keyid):
- self.mode = 'domain'
- super().__init__(keyid)
-
- def get_trailer(self, fromaddr=None): # noqa
- if fromaddr:
- return 'DKIM/%s (From: %s)' % (self.keyid, fromaddr)
- return 'DKIM/%s' % self.keyid
-
-
-class LoreAttestorPGP(LoreAttestor):
- def __init__(self, keyid):
- super().__init__(keyid)
- self.mode = 'person'
- self.load_subkey_uids()
-
- def load_subkey_uids(self):
- global SUBKEY_DATA
- if self.keyid not in SUBKEY_DATA:
- gpgargs = ['--with-colons', '--list-keys', self.keyid]
- ecode, out, err = gpg_run_command(gpgargs)
- if ecode > 0:
- logger.critical('ERROR: Unable to get UIDs list matching key %s', self.keyid)
- return
-
- keyinfo = out.decode()
-
- uids = list()
- for line in keyinfo.split('\n'):
- if line[:4] != 'uid:':
- continue
- chunks = line.split(':')
- if chunks[1] in ('r',):
- # Revoked UID, ignore
- continue
- uids.append(chunks[9])
- SUBKEY_DATA[self.keyid] = email.utils.getaddresses(uids)
-
- self.uids = SUBKEY_DATA[self.keyid]
-
- def get_primary_uid(self):
- return self.uids[0]
-
- def get_matching_uid(self, fromaddr):
- for uid in self.uids:
- if fromaddr == uid[1]:
- return uid
-
- logger.debug('No exact match, returning primary UID')
- return self.uids[0]
-
- def get_trailer(self, fromaddr):
- if fromaddr:
- uid = self.get_matching_uid(fromaddr)
- else:
- uid = self.uids[0]
-
- return '%s <%s> (pgp: %s)' % (uid[0], uid[1], self.keyid)
-
-
-class LoreAttestationSignature:
- def __init__(self, msg):
- self.msg = msg
+ mode: Optional[str]
+ level: Optional[str]
+ identity: Optional[str]
+ signtime: Optional[str]
+ keysrc: Optional[str]
+ keyalgo: Optional[str]
+ passing: bool
+ have_key: bool
+ errors: list
+
+ def __init__(self) -> None:
self.mode = None
- self.present = False
- self.good = False
- self.valid = False
- self.trusted = False
+ self.level = None
+ self.identity = None
+ self.signtime = None
+ self.keysrc = None
+ self.keyalgo = None
self.passing = False
- self.sigdate = None
- self.attestor = None
- self.errors = set()
+ self.have_key = False
+ self.errors = list()
+ @property
+ def checkmark(self) -> str:
config = get_main_config()
- try:
- driftd = int(config['attestation-staleness-days'])
- except ValueError:
- driftd = 30
-
- self.maxdrift = datetime.timedelta(days=driftd)
-
- def verify_time_drift(self) -> None:
- msgdt = email.utils.parsedate_to_datetime(str(self.msg['Date']))
- sdrift = self.sigdate - msgdt
- if sdrift > self.maxdrift:
- self.passing = False
- self.errors.add('Time drift between Date and t too great (%s)' % sdrift)
- return
- logger.debug('PASS : time drift between Date and t (%s)', sdrift)
-
- def verify_identity_domain(self, identity: str, domain: str):
- # Domain is supposed to be present in identity
- if not identity.endswith(domain):
- logger.debug('domain (d=%s) is not in identity (i=%s)', domain, identity)
- self.passing = False
- return
- fromeml = email.utils.getaddresses(self.msg.get_all('from', []))[0][1]
- if identity.find('@') < 0:
- logger.debug('identity must contain @ (i=%s)', identity)
- self.passing = False
- return
- ilocal, idomain = identity.split('@')
- # identity is supposed to be present in from
- if not fromeml.endswith(f'@{idomain}'):
- self.errors.add('identity (i=%s) does not match from (from=%s)' % (identity, fromeml))
- self.passing = False
- return
- logger.debug('identity and domain match From header')
-
- # @staticmethod
- # def get_dkim_key(domain: str, selector: str, timeout: int = 5) -> Tuple[str, str]:
- # global DNSCACHE
- # if (domain, selector) in DNSCACHE:
- # return DNSCACHE[(domain, selector)]
- #
- # name = f'{selector}._domainkey.{domain}.'
- # logger.debug('DNS-lookup: %s', name)
- # keydata = None
- # try:
- # a = dns.resolver.resolve(name, dns.rdatatype.TXT, raise_on_no_answer=False, lifetime=timeout) # noqa
- # # Find v=DKIM1
- # for r in a.response.answer:
- # if r.rdtype == dns.rdatatype.TXT:
- # for item in r.items:
- # # Concatenate all strings
- # txtdata = b''.join(item.strings)
- # if txtdata.find(b'v=DKIM1') >= 0:
- # keydata = txtdata.decode()
- # break
- # if keydata:
- # break
- # except dns.resolver.NXDOMAIN: # noqa
- # raise LookupError('Domain %s does not exist', name)
- #
- # if not keydata:
- # raise LookupError('Domain %s does not contain a DKIM record', name)
- #
- # parts = get_parts_from_header(keydata)
- # if 'p' not in parts:
- # raise LookupError('Domain %s does not contain a DKIM key', name)
- # if 'k' not in parts:
- # raise LookupError('Domain %s does not indicate key time', name)
- #
- # DNSCACHE[(domain, selector)] = (parts['k'], parts['p'])
- # logger.debug('k=%s, p=%s', parts['k'], parts['p'])
- # return parts['k'], parts['p']
-
- def __repr__(self):
- out = list()
- out.append(' mode: %s' % self.mode)
- out.append('present: %s' % self.present)
- out.append(' good: %s' % self.good)
- out.append(' valid: %s' % self.valid)
- out.append('trusted: %s' % self.trusted)
- if self.attestor is not None:
- out.append(' attestor: %s' % self.attestor.keyid)
-
- out.append(' --- validation errors ---')
- for error in self.errors:
- out.append(' | %s' % error)
- return '\n'.join(out)
-
-
-class LoreAttestationSignatureDKIM(LoreAttestationSignature):
- def __init__(self, msg):
- super().__init__(msg)
- self.mode = 'dkim'
- # Doesn't quite work right, so just use dkimpy's native
- # self.native_verify()
- # return
-
- ejected = set()
- while True:
- dks = self.msg.get('dkim-signature')
- if not dks:
- logger.debug('No DKIM-Signature headers in the message')
- return
-
- self.present = True
-
- ddata = get_parts_from_header(dks)
- self.attestor = LoreAttestorDKIM(ddata['d'])
- # Do we have a resolve method?
- if _resolver and hasattr(_resolver, 'resolve'):
- res = dkim.verify(self.msg.as_bytes(), dnsfunc=dkim_get_txt)
- else:
- res = dkim.verify(self.msg.as_bytes())
- if not res:
- # is list-archive or archived-at part of h=?
- hline = ddata.get('h')
- if hline:
- hsigned = set(hline.lower().split(':'))
- if 'list-archive' in hsigned or 'archived-at' in hsigned:
- # Public-inbox inserts additional List-Archive and Archived-At headers,
- # which breaks DKIM signatures if these headers are included in the hash.
- # Eject the ones created by public-inbox and try again.
- # XXX: This may no longer be necessary at some point if public-inbox takes care
- # of this scenario automatically:
- # https://public-inbox.org/meta/20201210202145.7agtcmrtl5jec42d@chatter.i7.local
- logger.debug('Ejecting extra List-Archive headers and retrying')
- changed = False
- for header in reversed(self.msg._headers): # noqa
- hl = header[0].lower()
- if hl in ('list-archive', 'archived-at') and hl not in ejected:
- self.msg._headers.remove(header) # noqa
- ejected.add(hl)
- changed = True
- break
- if changed:
- # go for another round
- continue
-
- logger.debug('DKIM signature did NOT verify')
- logger.debug('Retrying with the next DKIM-Signature header, if any')
- at = 0
- for header in self.msg._headers: # noqa
- if header[0].lower() == 'dkim-signature':
- del(self.msg._headers[at]) # noqa
- break
- at += 1
- continue
+ if config['attestation-checkmarks'] == 'fancy':
+ if self.passing:
+ return ATT_PASS_FANCY
+ return ATT_FAIL_FANCY
+ if self.passing:
+ return ATT_PASS_SIMPLE
+ return ATT_FAIL_SIMPLE
- self.good = True
+ @property
+ def trailer(self):
+ if self.keyalgo:
+ mode = self.keyalgo
+ else:
+ mode = self.mode
- # Grab toplevel signature that we just verified
- self.valid = True
- self.trusted = True
- self.passing = True
+ return '%s/%s' % (mode, self.identity)
- if ddata.get('t'):
- self.sigdate = datetime.datetime.utcfromtimestamp(int(ddata['t'])).replace(tzinfo=datetime.timezone.utc)
- else:
- self.sigdate = email.utils.parsedate_to_datetime(str(self.msg['Date']))
- return
+ def check_time_drift(self, emldate, maxdays: int = 7) -> bool:
+ if not self.passing or self.signtime is None:
+ return False
- # def native_verify(self):
- # dks = self.msg.get('dkim-signature')
- # ddata = get_parts_from_header(dks)
- # try:
- # kt, kp = LoreAttestationSignature.get_dkim_key(ddata['d'], ddata['s'])
- # if kt not in ('rsa',): # 'ed25519'):
- # logger.debug('DKIM key type %s not supported', kt)
- # return
- # pk = base64.b64decode(kp)
- # sig = base64.b64decode(ddata['b'])
- # except (LookupError, binascii.Error) as ex:
- # logger.debug('Unable to look up DKIM key: %s', ex)
- # return
- #
- # headers = list()
- #
- # for header in ddata['h'].split(':'):
- # # For the POC, we assume 'relaxed/'
- # hval = self.msg.get(header)
- # if hval is None:
- # # Missing headers are omitted by the DKIM RFC
- # continue
- # if ddata['c'].startswith('relaxed/'):
- # hname, hval = dkim_canonicalize_header(header, str(self.msg.get(header)))
- # else:
- # hname = header
- # hval = str(self.msg.get(header))
- # headers.append(f'{hname}:{hval}')
- #
- # # Now we add the dkim-signature header itself, without b= content
- # if ddata['c'].startswith('relaxed/'):
- # dname, dval = dkim_canonicalize_header('dkim-signature', dks)
- # else:
- # dname = 'DKIM-Signature'
- # dval = dks
- #
- # dval = dval.rsplit('; b=')[0] + '; b='
- # headers.append(f'{dname}:{dval}')
- # payload = ('\r\n'.join(headers)).encode()
- # key = RSA.import_key(pk)
- # hashed = SHA256.new(payload)
- # try:
- # # noinspection PyTypeChecker
- # pkcs1_15.new(key).verify(hashed, sig)
- # except (ValueError, TypeError):
- # logger.debug('DKIM signature did not verify')
- # self.errors.add('The DKIM signature did NOT verify!')
- # return
- #
- # self.good = True
- # if not ddata.get('i'):
- # ddata['i'] = '@' + ddata['d']
- #
- # logger.debug('PASS : DKIM signature for d=%s, s=%s', ddata['d'], ddata['s'])
- #
- # self.attestor = LoreAttestorDKIM(ddata['d'])
- # self.valid = True
- # self.trusted = True
- # self.passing = True
- #
- # self.verify_identity_domain(ddata['i'], ddata['d'])
- # if ddata.get('t'):
- # self.sigdate = datetime.datetime.utcfromtimestamp(int(ddata['t'])).replace(tzinfo=datetime.timezone.utc)
- # self.verify_time_drift()
- # else:
- # self.sigdate = email.utils.parsedate_to_datetime(str(self.msg['Date']))
-
-
-class LoreAttestationSignaturePGP(LoreAttestationSignature):
- def __init__(self, msg):
- super().__init__(msg)
- self.mode = 'pgp'
+ try:
+ sigdate = datetime.datetime.utcfromtimestamp(int(self.signtime))
+ except: # noqa
+ self.errors.append('failed parsing signature date: %s' % self.signtime)
+ return False
- shdr = msg.get(HDR_PATCH_SIG)
- if not shdr:
- return
+ maxdrift = datetime.timedelta(days=maxdays)
- self.present = True
- sdata = get_parts_from_header(shdr)
- hhdr = msg.get(HDR_PATCH_HASHES)
- sig = base64.b64decode(sdata['b'])
- headers = list()
- hhname, hhval = dkim_canonicalize_header(HDR_PATCH_HASHES, str(hhdr))
- headers.append(f'{hhname}:{hhval}')
- # Now we add the sig header itself, without b= content
- shname, shval = dkim_canonicalize_header(HDR_PATCH_SIG, shdr)
- shval = shval.rsplit('; b=')[0] + '; b='
- headers.append(f'{shname}:{shval}')
- payload = ('\r\n'.join(headers)).encode()
- savefile = mkstemp('in-header-pgp-verify')[1]
- with open(savefile, 'wb') as fh:
- fh.write(sig)
-
- gpgargs = list()
- config = get_main_config()
- trustmodel = config.get('attestation-trust-model', 'tofu')
- if trustmodel == 'tofu':
- gpgargs += ['--trust-model', 'tofu', '--tofu-default-policy', 'good']
- gpgargs += ['--verify', '--status-fd=1', savefile, '-']
- ecode, out, err = gpg_run_command(gpgargs, stdin=payload)
- os.unlink(savefile)
- output = out.decode()
-
- self.good, self.valid, self.trusted, self.attestor, self.sigdate, self.errors = \
- validate_gpg_signature(output, trustmodel)
-
- if self.good and self.valid and self.trusted:
- self.passing = True
- self.verify_time_drift()
- # XXX: Need to verify identity domain
+ sdrift = sigdate - emldate
+ if sdrift > maxdrift:
+ self.errors.append('Time drift between Date and t too great (%s)' % sdrift)
+ return False
+ logger.debug('PASS : time drift between Date and t (%s)', sdrift)
+ return True
-class LoreAttestation:
- def __init__(self, _i, _m, _p):
- self.i = _i.hexdigest()
- self.m = _m.hexdigest()
- self.p = _p.hexdigest()
- self.ib = base64.b64encode(_i.digest()).decode()
- self.mb = base64.b64encode(_m.digest()).decode()
- self.pb = base64.b64encode(_p.digest()).decode()
+ def check_identity(self, emlfrom: str) -> bool:
+ if not self.passing or not emlfrom:
+ return False
- self.lsig = None
- self.passing = False
- self.iv = False
- self.mv = False
- self.pv = False
+ if self.level == 'domain':
+ if emlfrom.endswith('@' + self.identity):
+ logger.debug('PASS : sig domain %s matches from identity %s', self.identity, emlfrom)
+ return True
+ self.errors.append('signing domain %s does not match From: %s' % (self.identity, emlfrom))
+ return False
- @property
- def attid(self):
- return '%s-%s-%s' % (self.i[:8], self.m[:8], self.p[:8])
+ if emlfrom == self.identity:
+ logger.debug('PASS : sig identity %s matches from identity %s', self.identity, emlfrom)
+ return True
+ self.errors.append('signing identity %s does not match From: %s' % (self.identity, emlfrom))
+ return False
def __repr__(self):
out = list()
- out.append(' i: %s' % self.i)
- out.append(' m: %s' % self.m)
- out.append(' p: %s' % self.p)
- out.append(' ib: %s' % self.ib)
- out.append(' mb: %s' % self.mb)
- out.append(' pb: %s' % self.pb)
- out.append(' iv: %s' % self.iv)
- out.append(' mv: %s' % self.mv)
- out.append(' pv: %s' % self.pv)
- out.append(' pass: %s' % self.passing)
+ out.append(' mode: %s' % self.mode)
+ out.append(' level: %s' % self.level)
+ out.append('identity: %s' % self.identity)
+ out.append('signtime: %s' % self.signtime)
+ out.append(' keysrc: %s' % self.keysrc)
+ out.append(' keyalgo: %s' % self.keyalgo)
+ out.append(' passing: %s' % self.passing)
+ out.append('have_key: %s' % self.have_key)
+ out.append(' errors: %s' % ','.join(self.errors))
return '\n'.join(out)
- def validate(self, msg):
- # Check if we have a X-Patch-Sig header. At this time, we only support two modes:
- # - GPG mode, which we check for fist
- # - Plain DKIM mode, which we check as fall-back
- # More modes may be coming in the future, depending on feedback.
- shdr = msg.get(HDR_PATCH_SIG)
- hhdr = msg.get(HDR_PATCH_HASHES)
- if hhdr is None:
- # Do we have a dkim signature header?
- if can_dkim_verify and msg.get('DKIM-Signature'):
- config = get_main_config()
- if config.get('attestation-check-dkim') == 'yes':
- self.lsig = LoreAttestationSignatureDKIM(msg)
- if self.lsig.passing:
- self.passing = True
- self.iv = True
- self.mv = True
- self.pv = True
- return self.passing
- return None
-
- if shdr is None:
- return None
-
- sdata = get_parts_from_header(shdr)
- if sdata.get('m') == 'pgp':
- self.lsig = LoreAttestationSignaturePGP(msg)
- if self.lsig.passing:
- hdata = get_parts_from_header(hhdr)
- if hdata['i'] == self.ib:
- self.iv = True
- if hdata['m'] == self.mb:
- self.mv = True
- if hdata['p'] == self.pb:
- self.pv = True
-
- if self.iv and self.mv and self.pv:
- self.passing = True
-
- if self.lsig is None:
- return None
- return self.passing
+class LoreAttestorDKIM(LoreAttestor):
+ def __init__(self, passing: bool, identity: str, signtime: str, errors: list) -> None:
+ super().__init__()
+ self.mode = 'DKIM'
+ self.level = 'domain'
+ self.keysrc = 'DNS'
+ self.signtime = signtime
+ self.passing = passing
+ self.identity = identity.lstrip('@')
+ self.errors = errors
+
+
+class LoreAttestorPatatt(LoreAttestor):
+ def __init__(self, result: bool, identity: str, signtime: str, keysrc: str, keyalgo: str, errors: list) -> None:
+ super().__init__()
+ self.mode = 'patatt'
+ self.level = 'person'
+ self.identity = identity
+ self.signtime = signtime
+ self.keysrc = keysrc
+ self.keyalgo = keyalgo
+ self.errors = errors
+ if result == patatt.RES_VALID:
+ self.passing = True
+ self.have_key = True
-def _run_command(cmdargs, stdin=None):
+def _run_command(cmdargs: list, stdin: Optional[bytes] = None) -> Tuple[int, bytes, bytes]:
logger.debug('Running %s' % ' '.join(cmdargs))
-
- sp = subprocess.Popen(cmdargs,
- stdout=subprocess.PIPE,
- stdin=subprocess.PIPE,
- stderr=subprocess.PIPE)
-
+ sp = subprocess.Popen(cmdargs, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE)
(output, error) = sp.communicate(input=stdin)
return sp.returncode, output, error
-def gpg_run_command(args, stdin=None):
+def gpg_run_command(args: list[str], stdin: Optional[bytes] = None) -> Tuple[int, bytes, bytes]:
config = get_main_config()
cmdargs = [config['gpgbin'], '--batch', '--no-auto-key-retrieve', '--no-auto-check-trustdb']
if config['attestation-gnupghome'] is not None:
@@ -1968,7 +1635,8 @@ def gpg_run_command(args, stdin=None):
return _run_command(cmdargs, stdin=stdin)
-def git_run_command(gitdir, args, stdin=None, logstderr=False):
+def git_run_command(gitdir: Optional[str], args: list[str], stdin: Optional[bytes] = None,
+ logstderr: bool = False) -> Tuple[int, str]:
cmdargs = ['git', '--no-pager']
if gitdir:
if os.path.isdir(os.path.join(gitdir, '.git')):
@@ -1988,7 +1656,7 @@ def git_run_command(gitdir, args, stdin=None, logstderr=False):
return ecode, out
-def git_get_command_lines(gitdir, args):
+def git_get_command_lines(gitdir: Optional[str], args: list) -> list[str]:
ecode, out = git_run_command(gitdir, args)
lines = list()
if out:
@@ -2048,7 +1716,9 @@ def in_directory(dirname):
os.chdir(cdir)
-def get_config_from_git(regexp, defaults=None):
+def get_config_from_git(regexp: str, defaults: Optional[dict] = None, multivals: Optional[list] = None) -> dict:
+ if multivals is None:
+ multivals = list()
args = ['config', '-z', '--get-regexp', regexp]
ecode, out = git_run_command(None, args)
gitconfig = defaults
@@ -2063,18 +1733,23 @@ def get_config_from_git(regexp, defaults=None):
key, value = line.split('\n', 1)
try:
chunks = key.split('.')
- cfgkey = chunks[-1]
- gitconfig[cfgkey.lower()] = value
+ cfgkey = chunks[-1].lower()
+ if cfgkey in multivals:
+ if cfgkey not in gitconfig:
+ gitconfig[cfgkey] = list()
+ gitconfig[cfgkey].append(value)
+ else:
+ gitconfig[cfgkey] = value
except ValueError:
logger.debug('Ignoring git config entry %s', line)
return gitconfig
-def get_main_config():
+def get_main_config() -> dict:
global MAIN_CONFIG
if MAIN_CONFIG is None:
- config = get_config_from_git(r'b4\..*', defaults=DEFAULT_CONFIG)
+ config = get_config_from_git(r'b4\..*', defaults=DEFAULT_CONFIG, multivals=['keyringsrc'])
# Legacy name was get-lore-mbox, so load those as well
config = get_config_from_git(r'get-lore-mbox\..*', defaults=config)
config['trailer-order'] = config['trailer-order'].split(',')
@@ -2085,23 +1760,23 @@ def get_main_config():
return MAIN_CONFIG
-def get_data_dir():
+def get_data_dir(appname: str = 'b4') -> str:
if 'XDG_DATA_HOME' in os.environ:
datahome = os.environ['XDG_DATA_HOME']
else:
datahome = os.path.join(str(Path.home()), '.local', 'share')
- datadir = os.path.join(datahome, 'b4')
+ datadir = os.path.join(datahome, appname)
Path(datadir).mkdir(parents=True, exist_ok=True)
return datadir
-def get_cache_dir():
+def get_cache_dir(appname: str = 'b4') -> str:
global _CACHE_CLEANED
if 'XDG_CACHE_HOME' in os.environ:
cachehome = os.environ['XDG_CACHE_HOME']
else:
cachehome = os.path.join(str(Path.home()), '.cache')
- cachedir = os.path.join(cachehome, 'b4')
+ cachedir = os.path.join(cachehome, appname)
Path(cachedir).mkdir(parents=True, exist_ok=True)
if _CACHE_CLEANED:
return cachedir
@@ -2426,83 +2101,43 @@ def parse_int_range(intrange, upper=None):
logger.critical('Unknown range value specified: %s', n)
-def dkim_canonicalize_header(hname, hval):
- hname = hname.lower()
- hval = hval.strip()
- hval = re.sub(r'\n', '', hval)
- hval = re.sub(r'\s+', ' ', hval)
- return hname, hval
-
-
-def get_parts_from_header(hstr: str) -> dict:
- hstr = re.sub(r'\s*', '', hstr)
- hdata = dict()
- for chunk in hstr.split(';'):
- parts = chunk.split('=', 1)
- if len(parts) < 2:
- continue
- hdata[parts[0]] = parts[1]
- return hdata
-
-
-def validate_gpg_signature(output, trustmodel):
+def check_gpg_status(status: str) -> Tuple[bool, bool, bool, str, str]:
good = False
valid = False
trusted = False
- attestor = None
- sigdate = None
- errors = set()
- gs_matches = re.search(r'^\[GNUPG:] GOODSIG ([0-9A-F]+)\s+.*$', output, re.M)
+ keyid = None
+ signtime = ''
+
+ gs_matches = re.search(r'^\[GNUPG:] GOODSIG ([0-9A-F]+)\s+(.*)$', status, flags=re.M)
if gs_matches:
- logger.debug(' GOODSIG')
good = True
- keyid = gs_matches.groups()[0]
- attestor = LoreAttestorPGP(keyid)
- puid = '%s <%s>' % attestor.get_primary_uid()
- vs_matches = re.search(r'^\[GNUPG:] VALIDSIG ([0-9A-F]+) (\d{4}-\d{2}-\d{2}) (\d+)', output, re.M)
- if vs_matches:
- logger.debug(' VALIDSIG')
- valid = True
- ymd = vs_matches.groups()[1]
- sigdate = datetime.datetime.strptime(ymd, '%Y-%m-%d').replace(tzinfo=datetime.timezone.utc)
- # Do we have a TRUST_(FULLY|ULTIMATE)?
- ts_matches = re.search(r'^\[GNUPG:] TRUST_(FULLY|ULTIMATE)', output, re.M)
- if ts_matches:
- logger.debug(' TRUST_%s', ts_matches.groups()[0])
- trusted = True
- else:
- errors.add('Insufficient trust (model=%s): %s (%s)' % (trustmodel, keyid, puid))
- else:
- errors.add('Signature not valid from key: %s (%s)' % (attestor.keyid, puid))
- else:
- # Are we missing a key?
- matches = re.search(r'^\[GNUPG:] NO_PUBKEY ([0-9A-F]+)$', output, re.M)
- if matches:
- errors.add('Missing public key: %s' % matches.groups()[0])
- # Is the key expired?
- matches = re.search(r'^\[GNUPG:] EXPKEYSIG (.*)$', output, re.M)
- if matches:
- errors.add('Expired key: %s' % matches.groups()[0])
-
- return good, valid, trusted, attestor, sigdate, errors
-
-
-def dkim_get_txt(name: bytes, timeout: int = 5):
- global _DKIM_DNS_CACHE
- if name not in _DKIM_DNS_CACHE:
- lookup = name.decode()
- logger.debug('DNS-lookup: %s', lookup)
- try:
- a = _resolver.resolve(lookup, dns.rdatatype.TXT, raise_on_no_answer=False, lifetime=timeout, search=True)
- for r in a.response.answer:
- if r.rdtype == dns.rdatatype.TXT:
- for item in r.items:
- # Concatenate all strings
- txtdata = b''.join(item.strings)
- if txtdata.find(b'p=') >= 0:
- _DKIM_DNS_CACHE[name] = txtdata
- return txtdata
- except dns.resolver.NXDOMAIN:
- pass
- _DKIM_DNS_CACHE[name] = None
- return _DKIM_DNS_CACHE[name]
+ vs_matches = re.search(r'^\[GNUPG:] VALIDSIG ([0-9A-F]+) (\d{4}-\d{2}-\d{2}) (\d+)', status, flags=re.M)
+ if vs_matches:
+ valid = True
+ keyid = vs_matches.groups()[0]
+ signtime = vs_matches.groups()[2]
+ ts_matches = re.search(r'^\[GNUPG:] TRUST_(FULLY|ULTIMATE)', status, flags=re.M)
+ if ts_matches:
+ trusted = True
+
+ return good, valid, trusted, keyid, signtime
+
+
+def get_gpg_uids(keyid: str) -> list:
+ gpgargs = ['--with-colons', '--list-keys', keyid]
+ ecode, out, err = gpg_run_command(gpgargs)
+ if ecode > 0:
+ raise KeyError('Unable to get UIDs list matching key %s' % keyid)
+
+ keyinfo = out.decode()
+ uids = list()
+ for line in keyinfo.split('\n'):
+ if line[:4] != 'uid:':
+ continue
+ chunks = line.split(':')
+ if chunks[1] in ('r',):
+ # Revoked UID, ignore
+ continue
+ uids.append(chunks[9])
+
+ return uids \ No newline at end of file
diff --git a/b4/attest.py b/b4/attest.py
index 2b9ccab..a6acb28 100644
--- a/b4/attest.py
+++ b/b4/attest.py
@@ -5,150 +5,28 @@
#
import sys
-import email
-import email.utils
-import email.message
-import email.header
import b4
import argparse
-import base64
+try:
+ import patatt
+ can_patatt = True
+except ModuleNotFoundError:
+ can_patatt = False
-logger = b4.logger
-
-
-def in_header_attest(lmsg: b4.LoreMessage, mode: str = 'pgp', replace: bool = False) -> None:
- if lmsg.msg.get(b4.HDR_PATCH_HASHES):
- if not replace:
- logger.info(' attest: message already attested')
- return
- del lmsg.msg[b4.HDR_PATCH_HASHES]
- del lmsg.msg[b4.HDR_PATCH_SIG]
-
- logger.info(' attest: generating attestation hashes')
- if not lmsg.attestation:
- raise RuntimeError('Could not calculate patch attestation')
-
- headers = list()
- hparts = [
- 'v=1',
- 'h=sha256',
- f'i={lmsg.attestation.ib}',
- f'm={lmsg.attestation.mb}',
- f'p={lmsg.attestation.pb}',
- ]
- if lmsg.git_patch_id:
- hparts.append(f'g={lmsg.git_patch_id}')
-
- hhname, hhval = b4.dkim_canonicalize_header(b4.HDR_PATCH_HASHES, '; '.join(hparts))
- headers.append(f'{hhname}:{hhval}')
-
- logger.debug('Signing with mode=%s', mode)
- if mode == 'pgp':
- usercfg = b4.get_user_config()
- keyid = usercfg.get('signingkey')
- identity = usercfg.get('email')
- if not identity:
- raise RuntimeError('Please set user.email to use this feature')
- if not keyid:
- raise RuntimeError('Please set user.signingKey to use this feature')
-
- logger.debug('Using i=%s, s=0x%s', identity, keyid.rstrip('!'))
- gpgargs = ['-b', '-u', f'{keyid}']
+from collections import namedtuple
- hparts = [
- 'm=pgp',
- f'i={identity}',
- 's=0x%s' % keyid.rstrip('!'),
- 'b=',
- ]
-
- shname, shval = b4.dkim_canonicalize_header(b4.HDR_PATCH_SIG, '; '.join(hparts))
- headers.append(f'{shname}:{shval}')
- payload = '\r\n'.join(headers).encode()
- ecode, out, err = b4.gpg_run_command(gpgargs, payload)
- if ecode > 0:
- logger.critical('Running gpg failed')
- logger.critical(err.decode())
- raise RuntimeError('Running gpg failed')
- bdata = base64.b64encode(out).decode()
- shval += header_splitter(bdata)
- else:
- raise NotImplementedError('Mode %s not implemented' % mode)
-
- hhdr = email.header.make_header([(hhval.encode(), 'us-ascii')], maxlinelen=78)
- shdr = email.header.make_header([(shval.encode(), 'us-ascii')], maxlinelen=78)
- lmsg.msg[b4.HDR_PATCH_HASHES] = hhdr
- lmsg.msg[b4.HDR_PATCH_SIG] = shdr
-
-
-def header_splitter(longstr: str, limit: int = 77) -> str:
- splitstr = list()
- first = True
- while len(longstr) > limit:
- at = limit
- if first:
- first = False
- at -= 2
- splitstr.append(longstr[:at])
- longstr = longstr[at:]
- splitstr.append(longstr)
- return ' '.join(splitstr)
+logger = b4.logger
def attest_patches(cmdargs: argparse.Namespace) -> None:
- for pf in cmdargs.patchfile:
- with open(pf, 'rb') as fh:
- msg = email.message_from_bytes(fh.read())
- lmsg = b4.LoreMessage(msg)
- lmsg.load_hashes()
- if not lmsg.attestation:
- logger.debug('Nothing to attest in %s, skipped')
- continue
- logger.info('Attesting: %s', pf)
- in_header_attest(lmsg, replace=True)
- with open(pf, 'wb') as fh:
- fh.write(lmsg.msg.as_bytes())
-
-
-def mutt_filter() -> None:
- if sys.stdin.isatty():
- logger.error('Error: Mutt mode expects a message on stdin')
+ if not can_patatt:
+ logger.critical('ERROR: b4 now uses patatt for patch attestation. See:')
+ logger.critical(' https://git.kernel.org/pub/scm/utils/patatt/patatt.git/about/')
sys.exit(1)
- inb = sys.stdin.buffer.read()
- # Quick exit if we don't find x-patch-sig
- if inb.find(b'X-Patch-Sig:') < 0:
- sys.stdout.buffer.write(inb)
- return
- msg = email.message_from_bytes(inb)
- try:
- if msg.get('x-patch-sig'):
- lmsg = b4.LoreMessage(msg)
- lmsg.load_hashes()
- latt = lmsg.attestation
- if latt:
- if latt.validate(msg):
- trailer = latt.lsig.attestor.get_trailer(lmsg.fromemail)
- msg.add_header('Attested-By', trailer)
- elif latt.lsig:
- if not latt.lsig.errors:
- failed = list()
- if not latt.pv:
- failed.append('patch content')
- if not latt.mv:
- failed.append('commit message')
- if not latt.iv:
- failed.append('patch metadata')
- latt.lsig.errors.add('signature failed (%s)' % ', '.join(failed))
- msg.add_header('Attestation-Failed', ', '.join(latt.lsig.errors))
- # Delete the x-patch-hashes and x-patch-sig headers so
- # they don't boggle up the view
- for i in reversed(range(len(msg._headers))): # noqa
- hdrName = msg._headers[i][0].lower() # noqa
- if hdrName in ('x-patch-hashes', 'x-patch-sig'):
- del msg._headers[i] # noqa
- except: # noqa
- # Don't prevent email from being displayed even if we died horribly
- sys.stdout.buffer.write(inb)
- return
- sys.stdout.buffer.write(msg.as_bytes(policy=b4.emlpolicy))
+ # directly invoke cmd_sign in patatt
+ config = patatt.get_config_from_git(r'patatt\..*', multivals=['keyringsrc'])
+ fakeargs = namedtuple('Struct', ['hookmode', 'msgfile'])
+ fakeargs.hookmode = True
+ fakeargs.msgfile = cmdargs.patchfile
+ patatt.cmd_sign(fakeargs, config)
diff --git a/b4/command.py b/b4/command.py
index 59fefbf..ff457dd 100644
--- a/b4/command.py
+++ b/b4/command.py
@@ -42,9 +42,7 @@ def cmd_am(cmdargs):
def cmd_attest(cmdargs):
import b4.attest
- if cmdargs.mutt_filter:
- b4.attest.mutt_filter()
- elif len(cmdargs.patchfile):
+ if len(cmdargs.patchfile):
b4.attest.attest_patches(cmdargs)
else:
logger.critical('ERROR: missing patches to attest')
@@ -128,8 +126,8 @@ def cmd():
help='OBSOLETE: this option does nothing and will be removed')
sp_att.add_argument('-o', '--output', default=None,
help='OBSOLETE: this option does nothing and will be removed')
- sp_att.add_argument('-m', '--mutt-filter', action='store_true', default=False,
- help='Run in mutt filter mode')
+ sp_att.add_argument('-m', '--mutt-filter', default=None,
+ help='OBSOLETE: this option does nothign and will be removed')
sp_att.add_argument('patchfile', nargs='*', help='Patches to attest')
sp_att.set_defaults(func=cmd_attest)
diff --git a/b4/mbox.py b/b4/mbox.py
index d3bde25..275a42a 100644
--- a/b4/mbox.py
+++ b/b4/mbox.py
@@ -273,8 +273,7 @@ def thanks_record_am(lser, cherrypick=None):
at += 1
continue
- pmsg.load_hashes()
- if pmsg.attestation is None:
+ if pmsg.pwhash is None:
logger.debug('Unable to get hashes for all patches, not tracking for thanks')
return
diff --git a/b4/pr.py b/b4/pr.py
index 5e6c7a1..374d8ed 100644
--- a/b4/pr.py
+++ b/b4/pr.py
@@ -122,11 +122,11 @@ def attest_fetch_head(gitdir, lmsg):
config = b4.get_main_config()
attpolicy = config['attestation-policy']
if config['attestation-checkmarks'] == 'fancy':
- attpass = b4.PASS_FANCY
- attfail = b4.FAIL_FANCY
+ attpass = b4.ATT_PASS_FANCY
+ attfail = b4.ATT_FAIL_FANCY
else:
- attpass = b4.PASS_SIMPLE
- attfail = b4.FAIL_SIMPLE
+ attpass = b4.ATT_PASS_SIMPLE
+ attfail = b4.ATT_FAIL_SIMPLE
# Is FETCH_HEAD a tag or a commit?
htype = b4.git_get_command_lines(gitdir, ['cat-file', '-t', 'FETCH_HEAD'])
passing = False
@@ -139,17 +139,30 @@ def attest_fetch_head(gitdir, lmsg):
elif otype == 'commit':
ecode, out = b4.git_run_command(gitdir, ['verify-commit', '--raw', 'FETCH_HEAD'], logstderr=True)
- good, valid, trusted, attestor, sigdate, errors = b4.validate_gpg_signature(out, 'pgp')
-
- if good and valid and trusted:
+ good, valid, trusted, keyid, sigtime = b4.check_gpg_status(out)
+ try:
+ uids = b4.get_gpg_uids(keyid)
+ signer = None
+ for uid in uids:
+ if uid.find(f'<{lmsg.fromemail}') >= 0:
+ signer = uid
+ break
+ if not signer:
+ signer = uids[0]
+
+ except KeyError:
+ signer = f'{lmsg.fromname} <{lmsg.fromemail}'
+
+ if good and valid:
passing = True
out = out.strip()
+ errors = set()
if not len(out) and attpolicy != 'check':
errors.add('Remote %s is not signed!' % otype)
if passing:
- trailer = attestor.get_trailer(lmsg.fromemail)
+ trailer = 'Signed: %s' % signer
logger.info(' ---')
logger.info(' %s %s', attpass, trailer)
return
diff --git a/patatt b/patatt
new file mode 160000
+Subproject e68ab474eb9baa62df43161d4cc46a512c853d5
diff --git a/requirements.txt b/requirements.txt
index 11d400d..a9a4be4 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,7 +1,5 @@
-requests ~= 2.24.0
+requests~=2.25.0
# These are optional, needed for attestation features
-dnspython~=2.0.0
+dnspython~=2.1.0
dkimpy~=1.0.5
-# These may be required in the future for other patch attestation features
-#pycryptodomex~=3.9.9
-#PyNaCl~=1.4.0
+patatt>=0.2.0,<2.0