aboutsummaryrefslogtreecommitdiff
path: root/b4/attest.py
blob: 5e3d3846dfce4f9222e75b3a857970509c6b58c3 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# SPDX-License-Identifier: GPL-2.0-or-later
# Copyright (C) 2020 by the Linux Foundation
#

import sys
import email
import email.utils
import email.message
import email.header
import b4
import argparse
import base64

# Use the logger object exposed by the b4 module for all output in this file
logger = b4.logger


def in_header_attest(lmsg: b4.LoreMessage, mode: str = 'pgp', replace: bool = False) -> None:
    """Add in-header attestation (hashes and signature headers) to ``lmsg.msg``.

    The hashes header carries the git patch-id plus the ``ib``/``mb``/``pb``
    values of ``lmsg.attestation``. The signature is calculated over the
    canonicalized hashes header followed by the canonicalized signature
    header with an empty ``b=`` field; the base64-encoded signature is then
    appended to the signature header value.

    :param lmsg: message to attest; ``lmsg.msg`` is modified in place
    :param mode: signing mode; only ``'pgp'`` is implemented
    :param replace: if the message already carries a hashes header, remove the
        existing attestation headers and attest again instead of returning
    :raises RuntimeError: if ``lmsg.attestation`` is not set, if no
        ``signingkey`` is found in the user config, or if gpg fails
    :raises NotImplementedError: for any ``mode`` other than ``'pgp'``
    """
    if lmsg.msg.get(b4.HDR_PATCH_HASHES):
        if not replace:
            logger.info(' attest: message already attested')
            return
        # Deleting a header that is not present is a no-op, so this is safe
        # even if only the hashes header exists
        del lmsg.msg[b4.HDR_PATCH_HASHES]
        del lmsg.msg[b4.HDR_PATCH_SIG]

    logger.info(' attest: generating attestation hashes')
    if not lmsg.attestation:
        raise RuntimeError('Could not calculate patch attestation')

    # Canonicalized "name:value" lines that make up the signed payload
    headers = list()
    hparts = [
        'v=1',
        'h=sha256',
        f'g={lmsg.git_patch_id}',
        f'i={lmsg.attestation.ib}',
        f'm={lmsg.attestation.mb}',
        f'p={lmsg.attestation.pb}',
    ]
    hhname, hhval = b4.dkim_canonicalize_header(b4.HDR_PATCH_HASHES, '; '.join(hparts))
    headers.append(f'{hhname}:{hhval}')

    logger.debug('Signing with mode=%s', mode)
    if mode == 'pgp':
        usercfg = b4.get_user_config()
        keyid = usercfg.get('signingkey')
        if not keyid:
            raise RuntimeError('Please set user.signingKey to use this feature')

        # gpg receives the key id exactly as configured; any trailing '!' is
        # only stripped from the value advertised in the s= field
        sigkey = keyid.rstrip('!')
        logger.debug('Using i=%s, s=0x%s', lmsg.fromemail, sigkey)
        gpgargs = ['-b', '-u', f'{keyid}']

        hparts = [
            'm=pgp',
            f'i={lmsg.fromemail}',
            f's=0x{sigkey}',
            'b=',
        ]

        shname, shval = b4.dkim_canonicalize_header(b4.HDR_PATCH_SIG, '; '.join(hparts))
        headers.append(f'{shname}:{shval}')
        payload = '\r\n'.join(headers).encode()
        ecode, out, err = b4.gpg_run_command(gpgargs, payload)
        # Any non-zero status is a failure. NOTE(review): presumably a gpg
        # process killed by a signal is reported as a negative code, which a
        # plain "> 0" test would treat as success -- confirm against
        # b4.gpg_run_command.
        if ecode != 0:
            logger.critical('Running gpg failed')
            # gpg's stderr is not guaranteed to be valid UTF-8; never let a
            # decoding error hide the real failure
            logger.critical(err.decode(errors='replace'))
            raise RuntimeError('Running gpg failed')
        bdata = base64.b64encode(out).decode()
        # shval ends with the empty "b=" field, so the data lands right after it
        shval += header_splitter(bdata)
    else:
        raise NotImplementedError('Mode %s not implemented' % mode)

    hhdr = email.header.make_header([(hhval.encode(), 'us-ascii')], maxlinelen=78)
    shdr = email.header.make_header([(shval.encode(), 'us-ascii')], maxlinelen=78)
    lmsg.msg[b4.HDR_PATCH_HASHES] = hhdr
    lmsg.msg[b4.HDR_PATCH_SIG] = shdr


def header_splitter(longstr: str, limit: int = 77) -> str:
    """Break ``longstr`` into space-separated chunks.

    While more than ``limit`` characters remain, a chunk is cut off the
    front: the first chunk is ``limit - 2`` characters long, every later one
    is ``limit`` characters long. Whatever is left over becomes the final
    chunk. A string no longer than ``limit`` is returned unchanged.

    :param longstr: the string to split
    :param limit: maximum chunk length
    :return: the chunks joined with single spaces
    """
    chunks = []
    remainder = longstr
    # Only the very first cut is two characters short of the limit
    width = limit - 2
    while len(remainder) > limit:
        chunks.append(remainder[:width])
        remainder = remainder[width:]
        width = limit
    chunks.append(remainder)
    return ' '.join(chunks)


def attest_patches(cmdargs: argparse.Namespace) -> None:
    """Attest every patch file listed in ``cmdargs.patchfile``.

    Each file is parsed as an email message, attested with
    ``in_header_attest(..., replace=True)`` and written back to the same
    path. Files for which no attestation could be loaded are skipped and
    left untouched.

    :param cmdargs: parsed command-line arguments; ``patchfile`` is iterated
        as a collection of file paths
    """
    for pf in cmdargs.patchfile:
        with open(pf, 'rb') as fh:
            msg = email.message_from_bytes(fh.read())
        lmsg = b4.LoreMessage(msg)
        lmsg.load_hashes()
        if not lmsg.attestation:
            # Pass the file name so the %s placeholder is actually filled in
            logger.debug('Nothing to attest in %s, skipped', pf)
            continue
        logger.info('Attesting: %s', pf)
        in_header_attest(lmsg, replace=True)
        # The file is overwritten in place with the attested message
        with open(pf, 'wb') as fh:
            fh.write(lmsg.msg.as_bytes())


def mutt_filter() -> None:
    """Filter a message from stdin to stdout, annotating valid attestation.

    If the message has an ``x-patch-sig`` header and its attestation
    validates, an ``Attested-By`` header is added and the ``x-patch-hashes``
    and ``x-patch-sig`` headers are removed. If anything goes wrong while
    processing or re-serializing the message, the original input bytes are
    written out unchanged.

    Exits with status 1 if stdin is a terminal.
    """
    if sys.stdin.isatty():
        logger.error('Error: Mutt mode expects a message on stdin')
        sys.exit(1)
    inb = sys.stdin.buffer.read()
    try:
        msg = email.message_from_bytes(inb)
        if msg.get('x-patch-sig'):
            lmsg = b4.LoreMessage(msg)
            lmsg.load_hashes()
            latt = lmsg.attestation
            if latt and latt.validate(msg):
                trailer = latt.lsig.attestor.get_trailer(lmsg.fromemail)
                msg.add_header('Attested-By', trailer)
                # Delete the x-patch-hashes and x-patch-sig headers so
                # they don't boggle up the view. Header deletion matches
                # names case-insensitively and removes every occurrence.
                del msg['x-patch-hashes']
                del msg['x-patch-sig']
        # Serialize inside the guarded region so that a failure here also
        # falls back to the raw input instead of crashing the filter
        outb = msg.as_bytes(policy=b4.emlpolicy)
    except:  # noqa
        # Don't prevent email from being displayed even if we died horribly.
        # The catch-all is deliberate: nothing may stop the message from
        # reaching the pager.
        outb = inb
    sys.stdout.buffer.write(outb)