1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
|
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# SPDX-License-Identifier: GPL-2.0-or-later
# Copyright (C) 2020 by the Linux Foundation
#
import sys
import email
import email.utils
import email.message
import email.header
import b4
import argparse
import base64
logger = b4.logger
def in_header_attest(lmsg: b4.LoreMessage, mode: str = 'pgp', replace: bool = False) -> None:
if lmsg.msg.get(b4.HDR_PATCH_HASHES):
if not replace:
logger.info(' attest: message already attested')
return
del lmsg.msg[b4.HDR_PATCH_HASHES]
del lmsg.msg[b4.HDR_PATCH_SIG]
logger.info(' attest: generating attestation hashes')
if not lmsg.attestation:
raise RuntimeError('Could not calculate patch attestation')
headers = list()
hparts = [
'v=1',
'h=sha256',
f'i={lmsg.attestation.ib}',
f'm={lmsg.attestation.mb}',
f'p={lmsg.attestation.pb}',
]
if lmsg.git_patch_id:
hparts.append(f'g={lmsg.git_patch_id}')
hhname, hhval = b4.dkim_canonicalize_header(b4.HDR_PATCH_HASHES, '; '.join(hparts))
headers.append(f'{hhname}:{hhval}')
logger.debug('Signing with mode=%s', mode)
if mode == 'pgp':
usercfg = b4.get_user_config()
keyid = usercfg.get('signingkey')
identity = usercfg.get('email')
if not identity:
raise RuntimeError('Please set user.email to use this feature')
if not keyid:
raise RuntimeError('Please set user.signingKey to use this feature')
logger.debug('Using i=%s, s=0x%s', identity, keyid.rstrip('!'))
gpgargs = ['-b', '-u', f'{keyid}']
hparts = [
'm=pgp',
f'i={identity}',
's=0x%s' % keyid.rstrip('!'),
'b=',
]
shname, shval = b4.dkim_canonicalize_header(b4.HDR_PATCH_SIG, '; '.join(hparts))
headers.append(f'{shname}:{shval}')
payload = '\r\n'.join(headers).encode()
ecode, out, err = b4.gpg_run_command(gpgargs, payload)
if ecode > 0:
logger.critical('Running gpg failed')
logger.critical(err.decode())
raise RuntimeError('Running gpg failed')
bdata = base64.b64encode(out).decode()
shval += header_splitter(bdata)
else:
raise NotImplementedError('Mode %s not implemented' % mode)
hhdr = email.header.make_header([(hhval.encode(), 'us-ascii')], maxlinelen=78)
shdr = email.header.make_header([(shval.encode(), 'us-ascii')], maxlinelen=78)
lmsg.msg[b4.HDR_PATCH_HASHES] = hhdr
lmsg.msg[b4.HDR_PATCH_SIG] = shdr
def header_splitter(longstr: str, limit: int = 77) -> str:
splitstr = list()
first = True
while len(longstr) > limit:
at = limit
if first:
first = False
at -= 2
splitstr.append(longstr[:at])
longstr = longstr[at:]
splitstr.append(longstr)
return ' '.join(splitstr)
def attest_patches(cmdargs: argparse.Namespace) -> None:
for pf in cmdargs.patchfile:
with open(pf, 'rb') as fh:
msg = email.message_from_bytes(fh.read())
lmsg = b4.LoreMessage(msg)
lmsg.load_hashes()
if not lmsg.attestation:
logger.debug('Nothing to attest in %s, skipped')
continue
logger.info('Attesting: %s', pf)
in_header_attest(lmsg, replace=True)
with open(pf, 'wb') as fh:
fh.write(lmsg.msg.as_bytes())
def mutt_filter() -> None:
if sys.stdin.isatty():
logger.error('Error: Mutt mode expects a message on stdin')
sys.exit(1)
inb = sys.stdin.buffer.read()
# Quick exit if we don't find x-patch-sig
if inb.find(b'X-Patch-Sig:') < 0:
sys.stdout.buffer.write(inb)
return
msg = email.message_from_bytes(inb)
try:
if msg.get('x-patch-sig'):
lmsg = b4.LoreMessage(msg)
lmsg.load_hashes()
latt = lmsg.attestation
if latt:
if latt.validate(msg):
trailer = latt.lsig.attestor.get_trailer(lmsg.fromemail)
msg.add_header('Attested-By', trailer)
elif latt.lsig:
if not latt.lsig.errors:
failed = list()
if not latt.pv:
failed.append('patch content')
if not latt.mv:
failed.append('commit message')
if not latt.iv:
failed.append('patch metadata')
latt.lsig.errors.add('signature failed (%s)' % ', '.join(failed))
msg.add_header('Attestation-Failed', ', '.join(latt.lsig.errors))
# Delete the x-patch-hashes and x-patch-sig headers so
# they don't boggle up the view
for i in reversed(range(len(msg._headers))): # noqa
hdrName = msg._headers[i][0].lower() # noqa
if hdrName in ('x-patch-hashes', 'x-patch-sig'):
del msg._headers[i] # noqa
except: # noqa
# Don't prevent email from being displayed even if we died horribly
sys.stdout.buffer.write(inb)
return
sys.stdout.buffer.write(msg.as_bytes(policy=b4.emlpolicy))
|