From f1a2700e70018349d5c63f2053ba4b0e7ebe351a Mon Sep 17 00:00:00 2001 From: Konstantin Ryabitsev Date: Tue, 11 May 2021 14:56:05 -0400 Subject: Reimplement attestation code one more time Move end-to-end attestation code into its own library: patatt. See https://git.kernel.org/pub/scm/utils/patatt/patatt.git/about/ It is included into b4 as a submodule, but you will need to init it first: git submodule update --init This change significantly simplifies our attestation code, dropping thousands of lines of rather hairy code. Notably, patatt-style attestation is incompatible with previous attestation implementations done directly in b4, but that's just as well -- we've always marked it as "experimental" and the lack of adoption was proving that we weren't on the right path. Next to come is keyring management and documentation. Signed-off-by: Konstantin Ryabitsev --- .gitmodules | 3 + README.rst | 93 +---- b4.sh | 2 +- b4/__init__.py | 1149 +++++++++++++++++++----------------------------------- b4/attest.py | 154 +------- b4/command.py | 8 +- b4/mbox.py | 3 +- b4/pr.py | 29 +- patatt | 1 + requirements.txt | 8 +- 10 files changed, 457 insertions(+), 993 deletions(-) create mode 100644 .gitmodules create mode 160000 patatt diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000..58f0214 --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "patatt"] + path = patatt + url = https://git.kernel.org/pub/scm/utils/patatt/patatt.git diff --git a/README.rst b/README.rst index 166eb9a..0fe5222 100644 --- a/README.rst +++ b/README.rst @@ -34,52 +34,21 @@ Setting up a symlink should also be possible. Patch attestation (EXPERIMENTAL) -------------------------------- -Starting with version 0.6, b4 implements in-header patch attestation, -following the approach proposed here: - -https://git.kernel.org/pub/scm/linux/kernel/git/mricon/patch-attestation-poc.git/tree/README.rst - -At this time, only PGP mode is implemented, but further work is expected -in future versions of b4. - -Attesting your own patches -~~~~~~~~~~~~~~~~~~~~~~~~~~ -Patch attestation is done via message headers and stays out of the way -of usual code submission and review workflow. At this time, only -maintainers using b4 to retrieve patches and patch series will benefit -from patch attestation, but everyone is encouraged to submit -cryptographic patch attestation with their work anyway, in hopes that it -becomes a common and widely used procedure. - -To start attesting your own patches: - -1. Make sure you have b4 version 0.6.0 or above: - ``b4 --version`` -2. If you don't already have a PGP key, you can follow the following - guide on how to generate it: - https://www.kernel.org/doc/html/latest/process/maintainer-pgp-guide.html -3. It is strongly recommended to use ed25519 as your signing key - algorithm, as it will result in much smaller signatures, preventing - unnecessary email header bloat. -4. Make sure your ``user.email`` and ``user.signingkey`` are set either - globally, or in the repository you will be using for attestation. -5. Add the ``sendemail-validate`` hook to each repository you want - enabled for attestation, with the following single line of content as - the hook body: - ``b4 attest $1``. - -If you are using b4 from git checkout, you can use a symlink instead:: - - ln -s path/to/b4/hooks/sendemail-validate-attestation-hook \ - .git/hooks/sendemail-validate - -(Note, that there's a second "E" in send*E*mail.) - -Next time you run ``git send-email``, b4 will automatically add -attestation headers to all patches before they go out. - -Verifying attestation on received patches -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +B4 implements two attestation verification mechanisms: + +- DKIM attestation using the dkimpy library +- X-Developer-Signature attestation using the patatt library + +If you installed from pip, you should have pulled both of these +dependencies in automatically. Alternatively, you can install dkimpy +from your OS packaging and then run "git submodule update --init" to +clone patatt as a submodule of b4. + +For attesting your outgoing patches, see patatt documentation. +https://git.kernel.org/pub/scm/utils/patatt/patatt.git/about/ + +Showing attestation on received patches +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ There are three attestation verification policies in b4: - check (default) @@ -101,38 +70,8 @@ You can set the preferred policy via the git configuration file:: [b4] attestation-policy = softfail -Using with mutt -~~~~~~~~~~~~~~~ -You can show patch attestation data with mutt, using the following -configuration parameters:: - - set display_filter="b4 -q attest -m" - ignore * - unignore from date subject to cc list-id: - unignore x-patch-hashes: x-patch-sig: - unignore attested-by: attestation-failed: - -When displaying a message containing in-header PGP attestation -signatures, mutt will display either the "Attested-By" or the -"Attestation-Failed" headers, e.g.:: - - Date: Mon, 23 Nov 2020 13:38:50 -0500 - From: Konstantin Ryabitsev - To: mricon@kernel.org - Subject: [PATCH 3/5] Fix in-header attestation code - Attested-By: Konstantin Ryabitsev (pgp: B6C41CE35664996C) - -or:: - - Date: Mon, 23 Nov 2020 13:38:48 -0500 - From: Konstantin Ryabitsev - To: mricon@kernel.org - Subject: [PATCH 1/5] Add not very simple dkim key caching - Attestation-Failed: signature failed (commit message, patch metadata) - - Support ------- For support or with any other questions, please email tools@linux.kernel.org, or browse the list archive at -https://linux.kernel.org/g/tools. +https://lore.kernel.org/tools. diff --git a/b4.sh b/b4.sh index e1f6c4e..10d33b7 100755 --- a/b4.sh +++ b/b4.sh @@ -6,4 +6,4 @@ REAL_SCRIPT=$(realpath -e ${BASH_SOURCE[0]}) SCRIPT_TOP="${SCRIPT_TOP:-$(dirname ${REAL_SCRIPT})}" -exec env PYTHONPATH="${SCRIPT_TOP}" python3 "${SCRIPT_TOP}/b4/command.py" "${@}" +exec env PYTHONPATH="${SCRIPT_TOP}:${SCRIPT_TOP}/patatt" python3 "${SCRIPT_TOP}/b4/command.py" "${@}" diff --git a/b4/__init__.py b/b4/__init__.py index 32b5c02..6a09a47 100644 --- a/b4/__init__.py +++ b/b4/__init__.py @@ -18,25 +18,27 @@ import time import shutil import mailbox import pwd -import base64 from pathlib import Path from tempfile import mkstemp, TemporaryDirectory from contextlib import contextmanager +from typing import Optional, Tuple, Set from email import charset charset.add_charset('utf-8', None) emlpolicy = email.policy.EmailPolicy(utf8=True, cte_type='8bit', max_line_length=None) try: - import dns.resolver import dkim + can_dkim = True +except ModuleNotFoundError: + can_dkim = False - can_dkim_verify = True - _resolver = dns.resolver.get_default_resolver() +try: + import patatt + can_patatt = True except ModuleNotFoundError: - can_dkim_verify = False - _resolver = None + can_patatt = False __VERSION__ = '0.7.0-dev' @@ -45,15 +47,12 @@ logger = logging.getLogger('b4') HUNK_RE = re.compile(r'^@@ -\d+(?:,(\d+))? \+\d+(?:,(\d+))? @@') FILENAME_RE = re.compile(r'^(---|\+\+\+) (\S+)') -PASS_SIMPLE = '[P]' -WEAK_SIMPLE = '[D]' -FAIL_SIMPLE = '[F]' -PASS_FANCY = '\033[32m\u2714\033[0m' -WEAK_FANCY = '\033[32m\u2713\033[0m' -FAIL_FANCY = '\033[31m\u2717\033[0m' +ATT_PASS_SIMPLE = 'v' +ATT_FAIL_SIMPLE = 'x' +ATT_PASS_FANCY = '\033[32m\u2713\033[0m' +ATT_FAIL_FANCY = '\033[31m\u2717\033[0m' -HDR_PATCH_HASHES = 'X-Patch-Hashes' -HDR_PATCH_SIG = 'X-Patch-Sig' +DEVSIG_HDR = 'X-Developer-Signature' # You can use bash-style globbing here WANTHDRS = [ @@ -130,19 +129,19 @@ MAIN_CONFIG = None # This is git-config user.* USER_CONFIG = None -# Used for tracking attestations we have already looked up -ATTESTATIONS = list() -# Used for keeping a cache of subkey lookups to minimize shelling out to gpg -SUBKEY_DATA = dict() # Used for storing our requests session REQSESSION = None # Indicates that we've cleaned cache already _CACHE_CLEANED = False -# Used for dkim key lookups -_DKIM_DNS_CACHE = dict() class LoreMailbox: + msgid_map: dict[str] + series: dict + covers: dict + followups: list + unknowns: list + def __init__(self): self.msgid_map = dict() self.series = dict() @@ -312,17 +311,18 @@ class LoreMailbox: for trailer in trailers: logger.debug('%s%s: %s', ' ' * (lvl+1), trailer[0], trailer[1]) if pmsg.has_diff and not pmsg.reply: + # TODO: See below # We found the patch for these trailers - if pmsg.revision != revision: - # add this into our trailer map to carry over trailers from - # previous revisions to current revision if patch/metadata did - # not change - pmsg.load_hashes() - if pmsg.attestation: - attid = pmsg.attestation.attid - if attid not in self.trailer_map: - self.trailer_map[attid] = list() - self.trailer_map[attid] += trailers + # if pmsg.revision != revision: + # # add this into our trailer map to carry over trailers from + # # previous revisions to current revision if patch/metadata did + # # not change + # pmsg.load_hashes() + # if pmsg.attestation: + # attid = pmsg.attestation.attid + # if attid not in self.trailer_map: + # self.trailer_map[attid] = list() + # self.trailer_map[attid] += trailers pmsg.followup_trailers += trailers break if not pmsg.reply: @@ -337,12 +337,13 @@ class LoreMailbox: break # Carry over trailers from previous series if patch/metadata did not change - for lmsg in lser.patches: - if lmsg is None or lmsg.attestation is None: - continue - lmsg.load_hashes() - if lmsg.attestation.attid in self.trailer_map: - lmsg.followup_trailers += self.trailer_map[lmsg.attestation.attid] + # TODO: This needs fixing, because we no longer generate the three hashes + # TODO: and patchwork_hash only gives us the hash of the actual patch + # for lmsg in lser.patches: + # if lmsg is None or lmsg.patchwork_hash is None: + # continue + # if lmsg.patchwork_hash in self.trailer_map: + # lmsg.followup_trailers += self.trailer_map[lmsg.patchwork_hash] return lser @@ -496,20 +497,33 @@ class LoreSeries: logger.critical('WARNING: Unable to add your Signed-off-by: git returned no user.name or user.email') addmysob = False - attdata = [(None, None)] * len(self.patches[1:]) attpolicy = config['attestation-policy'] - if config['attestation-checkmarks'] == 'fancy': - attpass = PASS_FANCY - attfail = FAIL_FANCY - attweak = WEAK_FANCY - else: - attpass = PASS_SIMPLE - attfail = FAIL_SIMPLE - attweak = WEAK_SIMPLE + # Loop through all patches and see if attestation is the same for all of them, + # since it usually is + attref = None + attsame = True + attmark = None + attcrit = False + if attpolicy != 'off': + logger.info('Checking attestation on all messages, may take a moment...') + for lmsg in self.patches[1:]: + if lmsg is None: + attsame = False + break + + checkmark, trailers, attcrit = lmsg.get_attestation_trailers(attpolicy) + if attref is None: + attref = trailers + attmark = checkmark + continue + if set(trailers) == set(attref): + continue + attsame = False + logger.debug('Attestation info is not the same') + break at = 1 - atterrors = list() for lmsg in self.patches[1:]: if cherrypick is not None: if at not in cherrypick: @@ -529,35 +543,23 @@ class LoreSeries: if addlink: lmsg.followup_trailers.append(('Link', linkmask % lmsg.msgid, None, None)) - if attpolicy != 'off': - lmsg.load_hashes() - latt = lmsg.attestation - if latt and latt.validate(lmsg.msg): - if latt.lsig.attestor and latt.lsig.attestor.mode == 'domain': - logger.info(' %s %s', attweak, lmsg.full_subject) - attdata[at-1] = (latt.lsig.attestor.get_trailer(lmsg.fromemail), attweak) # noqa - else: - logger.info(' %s %s', attpass, lmsg.full_subject) - attdata[at-1] = (latt.lsig.attestor.get_trailer(lmsg.fromemail), attpass) # noqa + if attsame and not attcrit: + if attmark: + logger.info(' %s %s', attmark, lmsg.full_subject) else: - if latt and latt.lsig and attpolicy in ('softfail', 'hardfail'): - logger.info(' %s %s', attfail, lmsg.full_subject) - if latt and latt.lsig and latt.lsig.attestor and latt.lsig.attestor.mode == 'domain': - atterrors.append('Failed %s attestation' % latt.lsig.attestor.get_trailer()) - elif latt and latt.lsig and latt.lsig.attestor: - failed = list() - if not latt.pv: - failed.append('patch content') - if not latt.mv: - failed.append('commit message') - if not latt.iv: - failed.append('patch metadata') - atterrors.append('Patch %s/%s failed attestation (%s)' % (at, lmsg.expected, - ', '.join(failed))) - else: - logger.info(' %s', lmsg.full_subject) + logger.info(' %s', lmsg.full_subject) + else: - logger.info(' %s', lmsg.full_subject) + checkmark, trailers, critical = lmsg.get_attestation_trailers(attpolicy) + logger.info(' %s %s', checkmark, lmsg.full_subject) + for trailer in trailers: + logger.info(' %s', trailer) + + if critical: + import sys + logger.critical('---') + logger.critical('Exiting due to attestation-policy: hardfail') + sys.exit(128) add_trailers = True if noaddtrailers: @@ -572,29 +574,17 @@ class LoreSeries: if attpolicy == 'off': return mbx - failed = (None, None) in attdata - if not failed: + if attsame and attref: logger.info(' ---') - for trailer, attmode in set(attdata): - logger.info(' %s Attestation-by: %s', attmode, trailer) - return mbx - elif not can_dkim_verify and config.get('attestation-check-dkim') == 'yes': - logger.info(' ---') - logger.info(' NOTE: install dkimpy for DKIM signature verification') - - errors = set(atterrors) - for attdoc in ATTESTATIONS: - errors.update(attdoc.errors) - - if errors: - logger.critical(' ---') - logger.critical(' Attestation is available, but did not succeed:') - for error in errors: - logger.critical(' %s %s', attfail, error) + for trailer in attref: + logger.info(' %s', trailer) - if attpolicy == 'hardfail': - import sys - sys.exit(128) + if not (can_dkim and can_patatt): + logger.info(' ---') + if not can_dkim: + logger.info(' NOTE: install dkimpy for DKIM signature verification') + if not can_patatt: + logger.info(' NOTE: install patatt for end-to-end signature verification') return mbx @@ -691,7 +681,6 @@ class LoreSeries: logger.critical('ERROR: v%s series incomplete; unable to create a fake-am range', self.revision) return None, None logger.debug('Looking at %s', lmsg.full_subject) - lmsg.load_hashes() if not lmsg.blob_indexes: logger.critical('ERROR: some patches do not have indexes') logger.critical(' unable to create a fake-am range') @@ -795,11 +784,8 @@ class LoreMessage: self.pr_tip_commit = None self.pr_remote_tip_commit = None - self.attestation = None # Patchwork hash self.pwhash = None - # Git patch-id - self.git_patch_id = None # Blob indexes self.blob_indexes = None @@ -815,6 +801,9 @@ class LoreMessage: self.revision_inferred = self.lsubject.revision_inferred self.counters_inferred = self.lsubject.counters_inferred + # Loaded when attestors property is called + self._attestors = None + # Handle [PATCH 6/5] if self.counter > self.expected: self.expected = self.counter @@ -884,6 +873,8 @@ class LoreMessage: self.has_diffstat = True if diffre.search(self.body): self.has_diff = True + self.pwhash = LoreMessage.get_patchwork_hash(self.body) + self.blob_indexes = LoreMessage.get_indexes(self.body) # We only pay attention to trailers that are sent in reply if self.reply: @@ -945,6 +936,124 @@ class LoreMessage: return trailers, mismatches + @property + def attestors(self): + if self._attestors is not None: + return self._attestors + + self._attestors = list() + + config = get_main_config() + if config['attestation-policy'] == 'off': + return self._attestors + + if self.msg.get(DEVSIG_HDR): + self._load_patatt_attestors() + if self.msg.get('dkim-signature'): + self._load_dkim_attestors() + + return self._attestors + + def _load_dkim_attestors(self) -> None: + if not can_dkim: + logger.debug('Message has DKIM signatures, but can_dkim is off') + return + + # Yank out all DKIM-Signature headers and try them in reverse order + # until we come to a passing one + dkhdrs = list() + for header in list(self.msg._headers): # noqa + if header[0].lower() == 'dkim-signature': + dkhdrs.append(header) + self.msg._headers.remove(header) # noqa + dkhdrs.reverse() + + seenatts = list() + for hn, hval in dkhdrs: + errors = list() + hdata = LoreMessage.get_parts_from_header(hval) + logger.debug('Loading DKIM attestation for d=%s, s=%s', hdata['d'], hdata['s']) + + signtime = hdata.get('t') + identity = hdata.get('i') + if not identity: + identity = hdata.get('d') + + self.msg._headers.append((hn, hval)) # noqa + res = dkim.verify(self.msg.as_bytes()) + + attestor = LoreAttestorDKIM(res, identity, signtime, errors) + logger.debug('DKIM verify results: %s=%s', identity, res) + if attestor.check_identity(self.fromemail): + # use this one, regardless of any other DKIM signatures + self._attestors.append(attestor) + return + + self.msg._headers.pop(-1) # noqa + seenatts.append(attestor) + + # No exact domain matches, so return everything we have + self._attestors += seenatts + + def _load_patatt_attestors(self) -> None: + if not can_patatt: + logger.debug('Message has %s headers, but can_patatt is off', DEVSIG_HDR) + return + + # load our key sources if necessary + ddir = get_data_dir() + pdir = os.path.join(ddir, 'keyring') + config = get_main_config() + sources = config.get('keyringsrc') + if not sources: + sources = ['ref:::.keys', 'ref:::.local-keys', 'ref::refs/meta/keyring:'] + if pdir not in sources: + sources.append(pdir) + + # Push our logger and GPGBIN into patatt + patatt.logger = logger + patatt.GPGBIN = config['gpgbin'] + + logger.debug('Loading patatt attestations with sources=%s', str(sources)) + + attestations = patatt.validate_message(self.msg.as_bytes(), sources) + for passing, identity, signtime, keysrc, keyalgo, errors in attestations: + attestor = LoreAttestorPatatt(passing, identity, signtime, keysrc, keyalgo, errors) + self._attestors.append(attestor) + + def get_attestation_trailers(self, attpolicy: str) -> Tuple[str, list, bool]: + trailers = list() + checkmark = None + critical = False + for attestor in self.attestors: + if not attestor.passing: + # Is it a person-trailer for which we have a key? + if attestor.level == 'person': + if attestor.have_key: + # This was signed, and we have a key, but it's failing + trailers.append('%s BADSIG: %s' % (attestor.checkmark, attestor.trailer)) + checkmark = attestor.checkmark + elif attpolicy in ('softfail', 'hardfail'): + trailers.append('%s No key: %s' % (attestor.checkmark, attestor.trailer)) + # This is not critical even in hardfail + continue + elif attpolicy in ('softfail', 'hardfail'): + checkmark = attestor.checkmark + trailers.append('%s BADSIG: %s' % (attestor.checkmark, attestor.trailer)) + + if attpolicy == 'hardfail': + critical = True + else: + if not checkmark: + checkmark = attestor.checkmark + if attestor.check_identity(self.fromemail): + trailers.append('%s Signed: %s' % (attestor.checkmark, attestor.trailer)) + else: + trailers.append('%s Signed: %s (From: %s)' % (attestor.checkmark, attestor.trailer, + self.fromemail)) + + return checkmark, trailers, critical + def __repr__(self): out = list() out.append('msgid: %s' % self.msgid) @@ -971,6 +1080,10 @@ class LoreMessage: for trailer in self.followup_trailers: out.append(' |%s' % str(trailer)) out.append(' --- end trailers ---') + out.append(' --- begin attestors ---') + for attestor in self.attestors: + out.append(' |%s' % str(attestor)) + out.append(' --- end attestors ---') return '\n'.join(out) @@ -993,6 +1106,17 @@ class LoreMessage: new_hdrval = re.sub(r'\n?\s+', ' ', decoded) return new_hdrval.strip() + @staticmethod + def get_parts_from_header(hstr: str) -> dict: + hstr = re.sub(r'\s*', '', hstr) + hdata = dict() + for chunk in hstr.split(';'): + parts = chunk.split('=', 1) + if len(parts) < 2: + continue + hdata[parts[0]] = parts[1] + return hdata + @staticmethod def get_clean_msgid(msg, header='Message-Id'): msgid = None @@ -1004,9 +1128,7 @@ class LoreMessage: return msgid @staticmethod - def get_patchwork_hash(diff): - # Make sure we just have the diff without any extraneous content. - diff = LoreMessage.get_clean_diff(diff) + def get_patchwork_hash(diff: str) -> str: """Generate a hash from a diff. Lifted verbatim from patchwork.""" prefixes = ['-', '+', ' '] @@ -1049,7 +1171,7 @@ class LoreMessage: return hashed.hexdigest() @staticmethod - def get_indexes(diff): + def get_indexes(diff: str) -> Set[tuple]: indexes = set() curfile = None for line in diff.split('\n'): @@ -1064,135 +1186,6 @@ class LoreMessage: indexes.add((curfile, matches.groups()[0])) return indexes - @staticmethod - def get_clean_diff(diff): - diff = diff.replace('\r', '') - - # For keeping a buffer of lines preceding @@ ... @@ - buflines = list() - difflines = '' - - # Used for counting where we are in the patch - pp = mm = 0 - inside_binary_chunk = False - for line in diff.split('\n'): - if not len(line): - if inside_binary_chunk: - inside_binary_chunk = False - # add all buflines to difflines - difflines += '\n'.join(buflines) + '\n\n' - buflines = list() - continue - buflines.append(line) - continue - elif inside_binary_chunk: - buflines.append(line) - continue - # If line starts with 'index ' and previous line starts with 'deleted ', then - # it's a file delete and therefore doesn't have a regular hunk. - if line.find('index ') == 0 and len(buflines) > 1 and buflines[-1].find('deleted ') == 0: - # add this and 2 preceding lines to difflines and reset buflines - buflines.append(line) - difflines += '\n'.join(buflines[-3:]) + '\n' - buflines = list() - continue - if line.find('delta ') == 0 or line.find('literal ') == 0: - # we are inside a binary patch - inside_binary_chunk = True - buflines.append(line) - continue - hunk_match = HUNK_RE.match(line) - if hunk_match: - # logger.debug('Crunching %s', line) - mlines, plines = hunk_match.groups() - try: - pp = int(plines) - except TypeError: - pp = 1 - try: - mm = int(mlines) - except TypeError: - mm = 1 - addlines = list() - for bline in reversed(buflines): - # Go backward and add lines until we get to the start - # or encounter a blank line - if len(bline.strip()) == 0: - break - addlines.append(bline) - if addlines: - difflines += '\n'.join(reversed(addlines)) + '\n' - buflines = list() - # Feed this line to the hasher - difflines += line + '\n' - continue - if pp > 0 or mm > 0: - # Inside the patch - difflines += line + '\n' - if line[0] in (' ', '-'): - mm -= 1 - if line[0] in (' ', '+'): - pp -= 1 - continue - # Not anything we recognize, so stick into buflines - buflines.append(line) - return difflines - - def load_hashes(self): - if self.attestation is not None: - return - logger.debug('Calculating hashes for: %s', self.full_subject) - # Calculate git-patch-id first - cmdargs = ['patch-id', '--stable'] - msg = self.get_am_message(add_trailers=False) - stdin = msg.as_string(policy=emlpolicy).encode() - ecode, out = git_run_command(None, cmdargs, stdin) - if ecode > 0: - # Git doesn't think there's a patch there - return - fline = out.split('\n')[0] - if len(fline) >= 40: - self.git_patch_id = fline[:40] - - msg_out = mkstemp() - patch_out = mkstemp() - cmdargs = ['mailinfo', '--encoding=UTF-8', msg_out[1], patch_out[1]] - ecode, info = git_run_command(None, cmdargs, stdin) - if ecode > 0: - logger.debug('ERROR: Could not get mailinfo') - return - i = hashlib.sha256() - m = hashlib.sha256() - p = hashlib.sha256() - - for line in info.split('\n'): - # We don't use the "Date:" field because it is likely to be - # mangled between when git-format-patch generates it and - # when it is sent out by git-send-email (or other tools). - if re.search(r'^(Author|Email|Subject):', line): - i.update((line + '\n').encode()) - - with open(msg_out[1], 'rb') as mfh: - msg = mfh.read() - m.update(msg) - os.unlink(msg_out[1]) - - with open(patch_out[1], 'rb') as pfh: - patch = pfh.read().decode(self.charset, errors='replace') - if len(patch.strip()): - diff = LoreMessage.get_clean_diff(patch) - p.update(diff.encode()) - self.pwhash = LoreMessage.get_patchwork_hash(patch) - # Load the indexes, if we have them - self.blob_indexes = LoreMessage.get_indexes(diff) - else: - p = None - - os.unlink(patch_out[1]) - - if i and m and p: - self.attestation = LoreAttestation(i, m, p) - @staticmethod def find_trailers(body, followup=False): headers = ('subject', 'date', 'from') @@ -1311,13 +1304,6 @@ class LoreMessage: config = get_main_config() attpolicy = config['attestation-policy'] - if config['attestation-checkmarks'] == 'fancy': - attfail = FAIL_FANCY - attweak = WEAK_FANCY - else: - attfail = FAIL_SIMPLE - attweak = WEAK_SIMPLE - bheaders, message, btrailers, basement, signature = LoreMessage.get_body_parts(self.body) # Now we add mix-in trailers trailers = btrailers + self.followup_trailers @@ -1346,6 +1332,7 @@ class LoreMessage: trailer_order = DEFAULT_TRAILER_ORDER elif trailer_order in ('preserve', '_preserve_'): trailer_order = '*' + for trailermatch in trailer_order: for trailer in trailers: if list(trailer[:3]) in fixtrailers: @@ -1355,15 +1342,19 @@ class LoreMessage: fixtrailers.append(list(trailer[:3])) if trailer[:3] not in btrailers: extra = '' - if can_dkim_verify and config.get('attestation-check-dkim') == 'yes' and attpolicy != 'off': - if len(trailer) > 3 and trailer[3] is not None: - fmsg = trailer[3] - attsig = LoreAttestationSignatureDKIM(fmsg.msg) # noqa - if attsig.present: - if attsig.passing: - extra = ' (%s %s)' % (attweak, attsig.attestor.get_trailer()) - elif attpolicy in ('softfail', 'hardfail'): - extra = ' (%s %s)' % (attfail, attsig.attestor.get_trailer()) + if len(trailer) > 3 and trailer[3] is not None: + fmsg = trailer[3] + for attestor in fmsg.attestors: # noqa + if attestor.passing: + extra = ' (%s %s)' % (attestor.checkmark, attestor.trailer) + elif attpolicy in ('hardfail', 'softfail'): + extra = ' (%s %s)' % (attestor.checkmark, attestor.trailer) + if attpolicy == 'hardfail': + import sys + logger.critical('---') + logger.critical('Exiting due to attestation-policy: hardfail') + sys.exit(1) + logger.info(' + %s: %s%s', trailer[0], trailer[1], extra) else: logger.debug(' . %s: %s', trailer[0], trailer[1]) @@ -1507,458 +1498,134 @@ class LoreSubject: class LoreAttestor: - def __init__(self, keyid): - self.keyid = keyid - self.uids = list() - - def __repr__(self): - out = list() - out.append(' keyid: %s' % self.keyid) - for uid in self.uids: - out.append(' uid: %s <%s>' % uid) - return '\n'.join(out) - - -class LoreAttestorDKIM(LoreAttestor): - def __init__(self, keyid): - self.mode = 'domain' - super().__init__(keyid) - - def get_trailer(self, fromaddr=None): # noqa - if fromaddr: - return 'DKIM/%s (From: %s)' % (self.keyid, fromaddr) - return 'DKIM/%s' % self.keyid - - -class LoreAttestorPGP(LoreAttestor): - def __init__(self, keyid): - super().__init__(keyid) - self.mode = 'person' - self.load_subkey_uids() - - def load_subkey_uids(self): - global SUBKEY_DATA - if self.keyid not in SUBKEY_DATA: - gpgargs = ['--with-colons', '--list-keys', self.keyid] - ecode, out, err = gpg_run_command(gpgargs) - if ecode > 0: - logger.critical('ERROR: Unable to get UIDs list matching key %s', self.keyid) - return - - keyinfo = out.decode() - - uids = list() - for line in keyinfo.split('\n'): - if line[:4] != 'uid:': - continue - chunks = line.split(':') - if chunks[1] in ('r',): - # Revoked UID, ignore - continue - uids.append(chunks[9]) - SUBKEY_DATA[self.keyid] = email.utils.getaddresses(uids) - - self.uids = SUBKEY_DATA[self.keyid] - - def get_primary_uid(self): - return self.uids[0] - - def get_matching_uid(self, fromaddr): - for uid in self.uids: - if fromaddr == uid[1]: - return uid - - logger.debug('No exact match, returning primary UID') - return self.uids[0] - - def get_trailer(self, fromaddr): - if fromaddr: - uid = self.get_matching_uid(fromaddr) - else: - uid = self.uids[0] - - return '%s <%s> (pgp: %s)' % (uid[0], uid[1], self.keyid) - - -class LoreAttestationSignature: - def __init__(self, msg): - self.msg = msg + mode: Optional[str] + level: Optional[str] + identity: Optional[str] + signtime: Optional[str] + keysrc: Optional[str] + keyalgo: Optional[str] + passing: bool + have_key: bool + errors: list + + def __init__(self) -> None: self.mode = None - self.present = False - self.good = False - self.valid = False - self.trusted = False + self.level = None + self.identity = None + self.signtime = None + self.keysrc = None + self.keyalgo = None self.passing = False - self.sigdate = None - self.attestor = None - self.errors = set() + self.have_key = False + self.errors = list() + @property + def checkmark(self) -> str: config = get_main_config() - try: - driftd = int(config['attestation-staleness-days']) - except ValueError: - driftd = 30 - - self.maxdrift = datetime.timedelta(days=driftd) - - def verify_time_drift(self) -> None: - msgdt = email.utils.parsedate_to_datetime(str(self.msg['Date'])) - sdrift = self.sigdate - msgdt - if sdrift > self.maxdrift: - self.passing = False - self.errors.add('Time drift between Date and t too great (%s)' % sdrift) - return - logger.debug('PASS : time drift between Date and t (%s)', sdrift) - - def verify_identity_domain(self, identity: str, domain: str): - # Domain is supposed to be present in identity - if not identity.endswith(domain): - logger.debug('domain (d=%s) is not in identity (i=%s)', domain, identity) - self.passing = False - return - fromeml = email.utils.getaddresses(self.msg.get_all('from', []))[0][1] - if identity.find('@') < 0: - logger.debug('identity must contain @ (i=%s)', identity) - self.passing = False - return - ilocal, idomain = identity.split('@') - # identity is supposed to be present in from - if not fromeml.endswith(f'@{idomain}'): - self.errors.add('identity (i=%s) does not match from (from=%s)' % (identity, fromeml)) - self.passing = False - return - logger.debug('identity and domain match From header') - - # @staticmethod - # def get_dkim_key(domain: str, selector: str, timeout: int = 5) -> Tuple[str, str]: - # global DNSCACHE - # if (domain, selector) in DNSCACHE: - # return DNSCACHE[(domain, selector)] - # - # name = f'{selector}._domainkey.{domain}.' - # logger.debug('DNS-lookup: %s', name) - # keydata = None - # try: - # a = dns.resolver.resolve(name, dns.rdatatype.TXT, raise_on_no_answer=False, lifetime=timeout) # noqa - # # Find v=DKIM1 - # for r in a.response.answer: - # if r.rdtype == dns.rdatatype.TXT: - # for item in r.items: - # # Concatenate all strings - # txtdata = b''.join(item.strings) - # if txtdata.find(b'v=DKIM1') >= 0: - # keydata = txtdata.decode() - # break - # if keydata: - # break - # except dns.resolver.NXDOMAIN: # noqa - # raise LookupError('Domain %s does not exist', name) - # - # if not keydata: - # raise LookupError('Domain %s does not contain a DKIM record', name) - # - # parts = get_parts_from_header(keydata) - # if 'p' not in parts: - # raise LookupError('Domain %s does not contain a DKIM key', name) - # if 'k' not in parts: - # raise LookupError('Domain %s does not indicate key time', name) - # - # DNSCACHE[(domain, selector)] = (parts['k'], parts['p']) - # logger.debug('k=%s, p=%s', parts['k'], parts['p']) - # return parts['k'], parts['p'] - - def __repr__(self): - out = list() - out.append(' mode: %s' % self.mode) - out.append('present: %s' % self.present) - out.append(' good: %s' % self.good) - out.append(' valid: %s' % self.valid) - out.append('trusted: %s' % self.trusted) - if self.attestor is not None: - out.append(' attestor: %s' % self.attestor.keyid) - - out.append(' --- validation errors ---') - for error in self.errors: - out.append(' | %s' % error) - return '\n'.join(out) - - -class LoreAttestationSignatureDKIM(LoreAttestationSignature): - def __init__(self, msg): - super().__init__(msg) - self.mode = 'dkim' - # Doesn't quite work right, so just use dkimpy's native - # self.native_verify() - # return - - ejected = set() - while True: - dks = self.msg.get('dkim-signature') - if not dks: - logger.debug('No DKIM-Signature headers in the message') - return - - self.present = True - - ddata = get_parts_from_header(dks) - self.attestor = LoreAttestorDKIM(ddata['d']) - # Do we have a resolve method? - if _resolver and hasattr(_resolver, 'resolve'): - res = dkim.verify(self.msg.as_bytes(), dnsfunc=dkim_get_txt) - else: - res = dkim.verify(self.msg.as_bytes()) - if not res: - # is list-archive or archived-at part of h=? - hline = ddata.get('h') - if hline: - hsigned = set(hline.lower().split(':')) - if 'list-archive' in hsigned or 'archived-at' in hsigned: - # Public-inbox inserts additional List-Archive and Archived-At headers, - # which breaks DKIM signatures if these headers are included in the hash. - # Eject the ones created by public-inbox and try again. - # XXX: This may no longer be necessary at some point if public-inbox takes care - # of this scenario automatically: - # https://public-inbox.org/meta/20201210202145.7agtcmrtl5jec42d@chatter.i7.local - logger.debug('Ejecting extra List-Archive headers and retrying') - changed = False - for header in reversed(self.msg._headers): # noqa - hl = header[0].lower() - if hl in ('list-archive', 'archived-at') and hl not in ejected: - self.msg._headers.remove(header) # noqa - ejected.add(hl) - changed = True - break - if changed: - # go for another round - continue - - logger.debug('DKIM signature did NOT verify') - logger.debug('Retrying with the next DKIM-Signature header, if any') - at = 0 - for header in self.msg._headers: # noqa - if header[0].lower() == 'dkim-signature': - del(self.msg._headers[at]) # noqa - break - at += 1 - continue + if config['attestation-checkmarks'] == 'fancy': + if self.passing: + return ATT_PASS_FANCY + return ATT_FAIL_FANCY + if self.passing: + return ATT_PASS_SIMPLE + return ATT_FAIL_SIMPLE - self.good = True + @property + def trailer(self): + if self.keyalgo: + mode = self.keyalgo + else: + mode = self.mode - # Grab toplevel signature that we just verified - self.valid = True - self.trusted = True - self.passing = True + return '%s/%s' % (mode, self.identity) - if ddata.get('t'): - self.sigdate = datetime.datetime.utcfromtimestamp(int(ddata['t'])).replace(tzinfo=datetime.timezone.utc) - else: - self.sigdate = email.utils.parsedate_to_datetime(str(self.msg['Date'])) - return + def check_time_drift(self, emldate, maxdays: int = 7) -> bool: + if not self.passing or self.signtime is None: + return False - # def native_verify(self): - # dks = self.msg.get('dkim-signature') - # ddata = get_parts_from_header(dks) - # try: - # kt, kp = LoreAttestationSignature.get_dkim_key(ddata['d'], ddata['s']) - # if kt not in ('rsa',): # 'ed25519'): - # logger.debug('DKIM key type %s not supported', kt) - # return - # pk = base64.b64decode(kp) - # sig = base64.b64decode(ddata['b']) - # except (LookupError, binascii.Error) as ex: - # logger.debug('Unable to look up DKIM key: %s', ex) - # return - # - # headers = list() - # - # for header in ddata['h'].split(':'): - # # For the POC, we assume 'relaxed/' - # hval = self.msg.get(header) - # if hval is None: - # # Missing headers are omitted by the DKIM RFC - # continue - # if ddata['c'].startswith('relaxed/'): - # hname, hval = dkim_canonicalize_header(header, str(self.msg.get(header))) - # else: - # hname = header - # hval = str(self.msg.get(header)) - # headers.append(f'{hname}:{hval}') - # - # # Now we add the dkim-signature header itself, without b= content - # if ddata['c'].startswith('relaxed/'): - # dname, dval = dkim_canonicalize_header('dkim-signature', dks) - # else: - # dname = 'DKIM-Signature' - # dval = dks - # - # dval = dval.rsplit('; b=')[0] + '; b=' - # headers.append(f'{dname}:{dval}') - # payload = ('\r\n'.join(headers)).encode() - # key = RSA.import_key(pk) - # hashed = SHA256.new(payload) - # try: - # # noinspection PyTypeChecker - # pkcs1_15.new(key).verify(hashed, sig) - # except (ValueError, TypeError): - # logger.debug('DKIM signature did not verify') - # self.errors.add('The DKIM signature did NOT verify!') - # return - # - # self.good = True - # if not ddata.get('i'): - # ddata['i'] = '@' + ddata['d'] - # - # logger.debug('PASS : DKIM signature for d=%s, s=%s', ddata['d'], ddata['s']) - # - # self.attestor = LoreAttestorDKIM(ddata['d']) - # self.valid = True - # self.trusted = True - # self.passing = True - # - # self.verify_identity_domain(ddata['i'], ddata['d']) - # if ddata.get('t'): - # self.sigdate = datetime.datetime.utcfromtimestamp(int(ddata['t'])).replace(tzinfo=datetime.timezone.utc) - # self.verify_time_drift() - # else: - # self.sigdate = email.utils.parsedate_to_datetime(str(self.msg['Date'])) - - -class LoreAttestationSignaturePGP(LoreAttestationSignature): - def __init__(self, msg): - super().__init__(msg) - self.mode = 'pgp' + try: + sigdate = datetime.datetime.utcfromtimestamp(int(self.signtime)) + except: # noqa + self.errors.append('failed parsing signature date: %s' % self.signtime) + return False - shdr = msg.get(HDR_PATCH_SIG) - if not shdr: - return + maxdrift = datetime.timedelta(days=maxdays) - self.present = True - sdata = get_parts_from_header(shdr) - hhdr = msg.get(HDR_PATCH_HASHES) - sig = base64.b64decode(sdata['b']) - headers = list() - hhname, hhval = dkim_canonicalize_header(HDR_PATCH_HASHES, str(hhdr)) - headers.append(f'{hhname}:{hhval}') - # Now we add the sig header itself, without b= content - shname, shval = dkim_canonicalize_header(HDR_PATCH_SIG, shdr) - shval = shval.rsplit('; b=')[0] + '; b=' - headers.append(f'{shname}:{shval}') - payload = ('\r\n'.join(headers)).encode() - savefile = mkstemp('in-header-pgp-verify')[1] - with open(savefile, 'wb') as fh: - fh.write(sig) - - gpgargs = list() - config = get_main_config() - trustmodel = config.get('attestation-trust-model', 'tofu') - if trustmodel == 'tofu': - gpgargs += ['--trust-model', 'tofu', '--tofu-default-policy', 'good'] - gpgargs += ['--verify', '--status-fd=1', savefile, '-'] - ecode, out, err = gpg_run_command(gpgargs, stdin=payload) - os.unlink(savefile) - output = out.decode() - - self.good, self.valid, self.trusted, self.attestor, self.sigdate, self.errors = \ - validate_gpg_signature(output, trustmodel) - - if self.good and self.valid and self.trusted: - self.passing = True - self.verify_time_drift() - # XXX: Need to verify identity domain + sdrift = sigdate - emldate + if sdrift > maxdrift: + self.errors.append('Time drift between Date and t too great (%s)' % sdrift) + return False + logger.debug('PASS : time drift between Date and t (%s)', sdrift) + return True -class LoreAttestation: - def __init__(self, _i, _m, _p): - self.i = _i.hexdigest() - self.m = _m.hexdigest() - self.p = _p.hexdigest() - self.ib = base64.b64encode(_i.digest()).decode() - self.mb = base64.b64encode(_m.digest()).decode() - self.pb = base64.b64encode(_p.digest()).decode() + def check_identity(self, emlfrom: str) -> bool: + if not self.passing or not emlfrom: + return False - self.lsig = None - self.passing = False - self.iv = False - self.mv = False - self.pv = False + if self.level == 'domain': + if emlfrom.endswith('@' + self.identity): + logger.debug('PASS : sig domain %s matches from identity %s', self.identity, emlfrom) + return True + self.errors.append('signing domain %s does not match From: %s' % (self.identity, emlfrom)) + return False - @property - def attid(self): - return '%s-%s-%s' % (self.i[:8], self.m[:8], self.p[:8]) + if emlfrom == self.identity: + logger.debug('PASS : sig identity %s matches from identity %s', self.identity, emlfrom) + return True + self.errors.append('signing identity %s does not match From: %s' % (self.identity, emlfrom)) + return False def __repr__(self): out = list() - out.append(' i: %s' % self.i) - out.append(' m: %s' % self.m) - out.append(' p: %s' % self.p) - out.append(' ib: %s' % self.ib) - out.append(' mb: %s' % self.mb) - out.append(' pb: %s' % self.pb) - out.append(' iv: %s' % self.iv) - out.append(' mv: %s' % self.mv) - out.append(' pv: %s' % self.pv) - out.append(' pass: %s' % self.passing) + out.append(' mode: %s' % self.mode) + out.append(' level: %s' % self.level) + out.append('identity: %s' % self.identity) + out.append('signtime: %s' % self.signtime) + out.append(' keysrc: %s' % self.keysrc) + out.append(' keyalgo: %s' % self.keyalgo) + out.append(' passing: %s' % self.passing) + out.append('have_key: %s' % self.have_key) + out.append(' errors: %s' % ','.join(self.errors)) return '\n'.join(out) - def validate(self, msg): - # Check if we have a X-Patch-Sig header. At this time, we only support two modes: - # - GPG mode, which we check for fist - # - Plain DKIM mode, which we check as fall-back - # More modes may be coming in the future, depending on feedback. - shdr = msg.get(HDR_PATCH_SIG) - hhdr = msg.get(HDR_PATCH_HASHES) - if hhdr is None: - # Do we have a dkim signature header? - if can_dkim_verify and msg.get('DKIM-Signature'): - config = get_main_config() - if config.get('attestation-check-dkim') == 'yes': - self.lsig = LoreAttestationSignatureDKIM(msg) - if self.lsig.passing: - self.passing = True - self.iv = True - self.mv = True - self.pv = True - return self.passing - return None - - if shdr is None: - return None - - sdata = get_parts_from_header(shdr) - if sdata.get('m') == 'pgp': - self.lsig = LoreAttestationSignaturePGP(msg) - if self.lsig.passing: - hdata = get_parts_from_header(hhdr) - if hdata['i'] == self.ib: - self.iv = True - if hdata['m'] == self.mb: - self.mv = True - if hdata['p'] == self.pb: - self.pv = True - - if self.iv and self.mv and self.pv: - self.passing = True - - if self.lsig is None: - return None - return self.passing +class LoreAttestorDKIM(LoreAttestor): + def __init__(self, passing: bool, identity: str, signtime: str, errors: list) -> None: + super().__init__() + self.mode = 'DKIM' + self.level = 'domain' + self.keysrc = 'DNS' + self.signtime = signtime + self.passing = passing + self.identity = identity.lstrip('@') + self.errors = errors + + +class LoreAttestorPatatt(LoreAttestor): + def __init__(self, result: bool, identity: str, signtime: str, keysrc: str, keyalgo: str, errors: list) -> None: + super().__init__() + self.mode = 'patatt' + self.level = 'person' + self.identity = identity + self.signtime = signtime + self.keysrc = keysrc + self.keyalgo = keyalgo + self.errors = errors + if result == patatt.RES_VALID: + self.passing = True + self.have_key = True -def _run_command(cmdargs, stdin=None): +def _run_command(cmdargs: list, stdin: Optional[bytes] = None) -> Tuple[int, bytes, bytes]: logger.debug('Running %s' % ' '.join(cmdargs)) - - sp = subprocess.Popen(cmdargs, - stdout=subprocess.PIPE, - stdin=subprocess.PIPE, - stderr=subprocess.PIPE) - + sp = subprocess.Popen(cmdargs, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE) (output, error) = sp.communicate(input=stdin) return sp.returncode, output, error -def gpg_run_command(args, stdin=None): +def gpg_run_command(args: list[str], stdin: Optional[bytes] = None) -> Tuple[int, bytes, bytes]: config = get_main_config() cmdargs = [config['gpgbin'], '--batch', '--no-auto-key-retrieve', '--no-auto-check-trustdb'] if config['attestation-gnupghome'] is not None: @@ -1968,7 +1635,8 @@ def gpg_run_command(args, stdin=None): return _run_command(cmdargs, stdin=stdin) -def git_run_command(gitdir, args, stdin=None, logstderr=False): +def git_run_command(gitdir: Optional[str], args: list[str], stdin: Optional[bytes] = None, + logstderr: bool = False) -> Tuple[int, str]: cmdargs = ['git', '--no-pager'] if gitdir: if os.path.isdir(os.path.join(gitdir, '.git')): @@ -1988,7 +1656,7 @@ def git_run_command(gitdir, args, stdin=None, logstderr=False): return ecode, out -def git_get_command_lines(gitdir, args): +def git_get_command_lines(gitdir: Optional[str], args: list) -> list[str]: ecode, out = git_run_command(gitdir, args) lines = list() if out: @@ -2048,7 +1716,9 @@ def in_directory(dirname): os.chdir(cdir) -def get_config_from_git(regexp, defaults=None): +def get_config_from_git(regexp: str, defaults: Optional[dict] = None, multivals: Optional[list] = None) -> dict: + if multivals is None: + multivals = list() args = ['config', '-z', '--get-regexp', regexp] ecode, out = git_run_command(None, args) gitconfig = defaults @@ -2063,18 +1733,23 @@ def get_config_from_git(regexp, defaults=None): key, value = line.split('\n', 1) try: chunks = key.split('.') - cfgkey = chunks[-1] - gitconfig[cfgkey.lower()] = value + cfgkey = chunks[-1].lower() + if cfgkey in multivals: + if cfgkey not in gitconfig: + gitconfig[cfgkey] = list() + gitconfig[cfgkey].append(value) + else: + gitconfig[cfgkey] = value except ValueError: logger.debug('Ignoring git config entry %s', line) return gitconfig -def get_main_config(): +def get_main_config() -> dict: global MAIN_CONFIG if MAIN_CONFIG is None: - config = get_config_from_git(r'b4\..*', defaults=DEFAULT_CONFIG) + config = get_config_from_git(r'b4\..*', defaults=DEFAULT_CONFIG, multivals=['keyringsrc']) # Legacy name was get-lore-mbox, so load those as well config = get_config_from_git(r'get-lore-mbox\..*', defaults=config) config['trailer-order'] = config['trailer-order'].split(',') @@ -2085,23 +1760,23 @@ def get_main_config(): return MAIN_CONFIG -def get_data_dir(): +def get_data_dir(appname: str = 'b4') -> str: if 'XDG_DATA_HOME' in os.environ: datahome = os.environ['XDG_DATA_HOME'] else: datahome = os.path.join(str(Path.home()), '.local', 'share') - datadir = os.path.join(datahome, 'b4') + datadir = os.path.join(datahome, appname) Path(datadir).mkdir(parents=True, exist_ok=True) return datadir -def get_cache_dir(): +def get_cache_dir(appname: str = 'b4') -> str: global _CACHE_CLEANED if 'XDG_CACHE_HOME' in os.environ: cachehome = os.environ['XDG_CACHE_HOME'] else: cachehome = os.path.join(str(Path.home()), '.cache') - cachedir = os.path.join(cachehome, 'b4') + cachedir = os.path.join(cachehome, appname) Path(cachedir).mkdir(parents=True, exist_ok=True) if _CACHE_CLEANED: return cachedir @@ -2426,83 +2101,43 @@ def parse_int_range(intrange, upper=None): logger.critical('Unknown range value specified: %s', n) -def dkim_canonicalize_header(hname, hval): - hname = hname.lower() - hval = hval.strip() - hval = re.sub(r'\n', '', hval) - hval = re.sub(r'\s+', ' ', hval) - return hname, hval - - -def get_parts_from_header(hstr: str) -> dict: - hstr = re.sub(r'\s*', '', hstr) - hdata = dict() - for chunk in hstr.split(';'): - parts = chunk.split('=', 1) - if len(parts) < 2: - continue - hdata[parts[0]] = parts[1] - return hdata - - -def validate_gpg_signature(output, trustmodel): +def check_gpg_status(status: str) -> Tuple[bool, bool, bool, str, str]: good = False valid = False trusted = False - attestor = None - sigdate = None - errors = set() - gs_matches = re.search(r'^\[GNUPG:] GOODSIG ([0-9A-F]+)\s+.*$', output, re.M) + keyid = None + signtime = '' + + gs_matches = re.search(r'^\[GNUPG:] GOODSIG ([0-9A-F]+)\s+(.*)$', status, flags=re.M) if gs_matches: - logger.debug(' GOODSIG') good = True - keyid = gs_matches.groups()[0] - attestor = LoreAttestorPGP(keyid) - puid = '%s <%s>' % attestor.get_primary_uid() - vs_matches = re.search(r'^\[GNUPG:] VALIDSIG ([0-9A-F]+) (\d{4}-\d{2}-\d{2}) (\d+)', output, re.M) - if vs_matches: - logger.debug(' VALIDSIG') - valid = True - ymd = vs_matches.groups()[1] - sigdate = datetime.datetime.strptime(ymd, '%Y-%m-%d').replace(tzinfo=datetime.timezone.utc) - # Do we have a TRUST_(FULLY|ULTIMATE)? - ts_matches = re.search(r'^\[GNUPG:] TRUST_(FULLY|ULTIMATE)', output, re.M) - if ts_matches: - logger.debug(' TRUST_%s', ts_matches.groups()[0]) - trusted = True - else: - errors.add('Insufficient trust (model=%s): %s (%s)' % (trustmodel, keyid, puid)) - else: - errors.add('Signature not valid from key: %s (%s)' % (attestor.keyid, puid)) - else: - # Are we missing a key? - matches = re.search(r'^\[GNUPG:] NO_PUBKEY ([0-9A-F]+)$', output, re.M) - if matches: - errors.add('Missing public key: %s' % matches.groups()[0]) - # Is the key expired? - matches = re.search(r'^\[GNUPG:] EXPKEYSIG (.*)$', output, re.M) - if matches: - errors.add('Expired key: %s' % matches.groups()[0]) - - return good, valid, trusted, attestor, sigdate, errors - - -def dkim_get_txt(name: bytes, timeout: int = 5): - global _DKIM_DNS_CACHE - if name not in _DKIM_DNS_CACHE: - lookup = name.decode() - logger.debug('DNS-lookup: %s', lookup) - try: - a = _resolver.resolve(lookup, dns.rdatatype.TXT, raise_on_no_answer=False, lifetime=timeout, search=True) - for r in a.response.answer: - if r.rdtype == dns.rdatatype.TXT: - for item in r.items: - # Concatenate all strings - txtdata = b''.join(item.strings) - if txtdata.find(b'p=') >= 0: - _DKIM_DNS_CACHE[name] = txtdata - return txtdata - except dns.resolver.NXDOMAIN: - pass - _DKIM_DNS_CACHE[name] = None - return _DKIM_DNS_CACHE[name] + vs_matches = re.search(r'^\[GNUPG:] VALIDSIG ([0-9A-F]+) (\d{4}-\d{2}-\d{2}) (\d+)', status, flags=re.M) + if vs_matches: + valid = True + keyid = vs_matches.groups()[0] + signtime = vs_matches.groups()[2] + ts_matches = re.search(r'^\[GNUPG:] TRUST_(FULLY|ULTIMATE)', status, flags=re.M) + if ts_matches: + trusted = True + + return good, valid, trusted, keyid, signtime + + +def get_gpg_uids(keyid: str) -> list: + gpgargs = ['--with-colons', '--list-keys', keyid] + ecode, out, err = gpg_run_command(gpgargs) + if ecode > 0: + raise KeyError('Unable to get UIDs list matching key %s' % keyid) + + keyinfo = out.decode() + uids = list() + for line in keyinfo.split('\n'): + if line[:4] != 'uid:': + continue + chunks = line.split(':') + if chunks[1] in ('r',): + # Revoked UID, ignore + continue + uids.append(chunks[9]) + + return uids \ No newline at end of file diff --git a/b4/attest.py b/b4/attest.py index 2b9ccab..a6acb28 100644 --- a/b4/attest.py +++ b/b4/attest.py @@ -5,150 +5,28 @@ # import sys -import email -import email.utils -import email.message -import email.header import b4 import argparse -import base64 +try: + import patatt + can_patatt = True +except ModuleNotFoundError: + can_patatt = False -logger = b4.logger - - -def in_header_attest(lmsg: b4.LoreMessage, mode: str = 'pgp', replace: bool = False) -> None: - if lmsg.msg.get(b4.HDR_PATCH_HASHES): - if not replace: - logger.info(' attest: message already attested') - return - del lmsg.msg[b4.HDR_PATCH_HASHES] - del lmsg.msg[b4.HDR_PATCH_SIG] - - logger.info(' attest: generating attestation hashes') - if not lmsg.attestation: - raise RuntimeError('Could not calculate patch attestation') - - headers = list() - hparts = [ - 'v=1', - 'h=sha256', - f'i={lmsg.attestation.ib}', - f'm={lmsg.attestation.mb}', - f'p={lmsg.attestation.pb}', - ] - if lmsg.git_patch_id: - hparts.append(f'g={lmsg.git_patch_id}') - - hhname, hhval = b4.dkim_canonicalize_header(b4.HDR_PATCH_HASHES, '; '.join(hparts)) - headers.append(f'{hhname}:{hhval}') - - logger.debug('Signing with mode=%s', mode) - if mode == 'pgp': - usercfg = b4.get_user_config() - keyid = usercfg.get('signingkey') - identity = usercfg.get('email') - if not identity: - raise RuntimeError('Please set user.email to use this feature') - if not keyid: - raise RuntimeError('Please set user.signingKey to use this feature') - - logger.debug('Using i=%s, s=0x%s', identity, keyid.rstrip('!')) - gpgargs = ['-b', '-u', f'{keyid}'] +from collections import namedtuple - hparts = [ - 'm=pgp', - f'i={identity}', - 's=0x%s' % keyid.rstrip('!'), - 'b=', - ] - - shname, shval = b4.dkim_canonicalize_header(b4.HDR_PATCH_SIG, '; '.join(hparts)) - headers.append(f'{shname}:{shval}') - payload = '\r\n'.join(headers).encode() - ecode, out, err = b4.gpg_run_command(gpgargs, payload) - if ecode > 0: - logger.critical('Running gpg failed') - logger.critical(err.decode()) - raise RuntimeError('Running gpg failed') - bdata = base64.b64encode(out).decode() - shval += header_splitter(bdata) - else: - raise NotImplementedError('Mode %s not implemented' % mode) - - hhdr = email.header.make_header([(hhval.encode(), 'us-ascii')], maxlinelen=78) - shdr = email.header.make_header([(shval.encode(), 'us-ascii')], maxlinelen=78) - lmsg.msg[b4.HDR_PATCH_HASHES] = hhdr - lmsg.msg[b4.HDR_PATCH_SIG] = shdr - - -def header_splitter(longstr: str, limit: int = 77) -> str: - splitstr = list() - first = True - while len(longstr) > limit: - at = limit - if first: - first = False - at -= 2 - splitstr.append(longstr[:at]) - longstr = longstr[at:] - splitstr.append(longstr) - return ' '.join(splitstr) +logger = b4.logger def attest_patches(cmdargs: argparse.Namespace) -> None: - for pf in cmdargs.patchfile: - with open(pf, 'rb') as fh: - msg = email.message_from_bytes(fh.read()) - lmsg = b4.LoreMessage(msg) - lmsg.load_hashes() - if not lmsg.attestation: - logger.debug('Nothing to attest in %s, skipped') - continue - logger.info('Attesting: %s', pf) - in_header_attest(lmsg, replace=True) - with open(pf, 'wb') as fh: - fh.write(lmsg.msg.as_bytes()) - - -def mutt_filter() -> None: - if sys.stdin.isatty(): - logger.error('Error: Mutt mode expects a message on stdin') + if not can_patatt: + logger.critical('ERROR: b4 now uses patatt for patch attestation. See:') + logger.critical(' https://git.kernel.org/pub/scm/utils/patatt/patatt.git/about/') sys.exit(1) - inb = sys.stdin.buffer.read() - # Quick exit if we don't find x-patch-sig - if inb.find(b'X-Patch-Sig:') < 0: - sys.stdout.buffer.write(inb) - return - msg = email.message_from_bytes(inb) - try: - if msg.get('x-patch-sig'): - lmsg = b4.LoreMessage(msg) - lmsg.load_hashes() - latt = lmsg.attestation - if latt: - if latt.validate(msg): - trailer = latt.lsig.attestor.get_trailer(lmsg.fromemail) - msg.add_header('Attested-By', trailer) - elif latt.lsig: - if not latt.lsig.errors: - failed = list() - if not latt.pv: - failed.append('patch content') - if not latt.mv: - failed.append('commit message') - if not latt.iv: - failed.append('patch metadata') - latt.lsig.errors.add('signature failed (%s)' % ', '.join(failed)) - msg.add_header('Attestation-Failed', ', '.join(latt.lsig.errors)) - # Delete the x-patch-hashes and x-patch-sig headers so - # they don't boggle up the view - for i in reversed(range(len(msg._headers))): # noqa - hdrName = msg._headers[i][0].lower() # noqa - if hdrName in ('x-patch-hashes', 'x-patch-sig'): - del msg._headers[i] # noqa - except: # noqa - # Don't prevent email from being displayed even if we died horribly - sys.stdout.buffer.write(inb) - return - sys.stdout.buffer.write(msg.as_bytes(policy=b4.emlpolicy)) + # directly invoke cmd_sign in patatt + config = patatt.get_config_from_git(r'patatt\..*', multivals=['keyringsrc']) + fakeargs = namedtuple('Struct', ['hookmode', 'msgfile']) + fakeargs.hookmode = True + fakeargs.msgfile = cmdargs.patchfile + patatt.cmd_sign(fakeargs, config) diff --git a/b4/command.py b/b4/command.py index 59fefbf..ff457dd 100644 --- a/b4/command.py +++ b/b4/command.py @@ -42,9 +42,7 @@ def cmd_am(cmdargs): def cmd_attest(cmdargs): import b4.attest - if cmdargs.mutt_filter: - b4.attest.mutt_filter() - elif len(cmdargs.patchfile): + if len(cmdargs.patchfile): b4.attest.attest_patches(cmdargs) else: logger.critical('ERROR: missing patches to attest') @@ -128,8 +126,8 @@ def cmd(): help='OBSOLETE: this option does nothing and will be removed') sp_att.add_argument('-o', '--output', default=None, help='OBSOLETE: this option does nothing and will be removed') - sp_att.add_argument('-m', '--mutt-filter', action='store_true', default=False, - help='Run in mutt filter mode') + sp_att.add_argument('-m', '--mutt-filter', default=None, + help='OBSOLETE: this option does nothign and will be removed') sp_att.add_argument('patchfile', nargs='*', help='Patches to attest') sp_att.set_defaults(func=cmd_attest) diff --git a/b4/mbox.py b/b4/mbox.py index d3bde25..275a42a 100644 --- a/b4/mbox.py +++ b/b4/mbox.py @@ -273,8 +273,7 @@ def thanks_record_am(lser, cherrypick=None): at += 1 continue - pmsg.load_hashes() - if pmsg.attestation is None: + if pmsg.pwhash is None: logger.debug('Unable to get hashes for all patches, not tracking for thanks') return diff --git a/b4/pr.py b/b4/pr.py index 5e6c7a1..374d8ed 100644 --- a/b4/pr.py +++ b/b4/pr.py @@ -122,11 +122,11 @@ def attest_fetch_head(gitdir, lmsg): config = b4.get_main_config() attpolicy = config['attestation-policy'] if config['attestation-checkmarks'] == 'fancy': - attpass = b4.PASS_FANCY - attfail = b4.FAIL_FANCY + attpass = b4.ATT_PASS_FANCY + attfail = b4.ATT_FAIL_FANCY else: - attpass = b4.PASS_SIMPLE - attfail = b4.FAIL_SIMPLE + attpass = b4.ATT_PASS_SIMPLE + attfail = b4.ATT_FAIL_SIMPLE # Is FETCH_HEAD a tag or a commit? htype = b4.git_get_command_lines(gitdir, ['cat-file', '-t', 'FETCH_HEAD']) passing = False @@ -139,17 +139,30 @@ def attest_fetch_head(gitdir, lmsg): elif otype == 'commit': ecode, out = b4.git_run_command(gitdir, ['verify-commit', '--raw', 'FETCH_HEAD'], logstderr=True) - good, valid, trusted, attestor, sigdate, errors = b4.validate_gpg_signature(out, 'pgp') - - if good and valid and trusted: + good, valid, trusted, keyid, sigtime = b4.check_gpg_status(out) + try: + uids = b4.get_gpg_uids(keyid) + signer = None + for uid in uids: + if uid.find(f'<{lmsg.fromemail}') >= 0: + signer = uid + break + if not signer: + signer = uids[0] + + except KeyError: + signer = f'{lmsg.fromname} <{lmsg.fromemail}' + + if good and valid: passing = True out = out.strip() + errors = set() if not len(out) and attpolicy != 'check': errors.add('Remote %s is not signed!' % otype) if passing: - trailer = attestor.get_trailer(lmsg.fromemail) + trailer = 'Signed: %s' % signer logger.info(' ---') logger.info(' %s %s', attpass, trailer) return diff --git a/patatt b/patatt new file mode 160000 index 0000000..e68ab47 --- /dev/null +++ b/patatt @@ -0,0 +1 @@ +Subproject commit e68ab474eb9baa62df43161d4cc46a512c853d52 diff --git a/requirements.txt b/requirements.txt index 11d400d..a9a4be4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,7 +1,5 @@ -requests ~= 2.24.0 +requests~=2.25.0 # These are optional, needed for attestation features -dnspython~=2.0.0 +dnspython~=2.1.0 dkimpy~=1.0.5 -# These may be required in the future for other patch attestation features -#pycryptodomex~=3.9.9 -#PyNaCl~=1.4.0 +patatt>=0.2.0,<2.0 -- cgit v1.2.3