diff options
Diffstat (limited to 'b4/ez.py')
-rw-r--r-- | b4/ez.py | 219 |
1 files changed, 146 insertions, 73 deletions
@@ -24,9 +24,6 @@ import base64 import textwrap import gzip -# from nacl.signing import SigningKey -# from nacl.encoding import Base64Encoder - from typing import Optional, Tuple, List from email import utils from string import Template @@ -71,76 +68,146 @@ Changes in v${newrev}: """ -# def auth_new(cmdargs: argparse.Namespace) -> None: -# # Check if we have a patatt signingkey already defined -# endpoint, name, email, ptskey = get_configs() -# skey, pkey = get_patatt_ed25519keys(ptskey) -# logger.info('Will submit a new email authorization request to:') -# logger.info(' Endpoint: %s', endpoint) -# logger.info(' Name: %s', name) -# logger.info(' Email: %s', email) -# logger.info(' Key: %s (%s)', pkey, ptskey) -# logger.info('---') -# confirm = input('Confirm selection [y/N]: ') -# if confirm != 'y': -# logger.info('Exiting') -# sys.exit(0) -# req = { -# 'action': 'auth-new', -# 'name': name, -# 'email': email, -# 'key': pkey, -# } -# ses = b4.get_requests_session() -# res = ses.post(endpoint, json=req) -# logger.info('---') -# if res.status_code == 200: -# try: -# rdata = res.json() -# if rdata.get('result') == 'success': -# logger.info('Challenge generated and sent to %s', email) -# logger.info('Once you receive it, run b4 submit --web-auth-verify [challenge-string]') -# sys.exit(0) -# -# except Exception as ex: # noqa -# logger.critical('Odd response from the endpoint: %s', res.text) -# sys.exit(1) -# -# logger.critical('500 response from the endpoint: %s', res.text) -# sys.exit(1) -# -# -# def auth_verify(cmdargs: argparse.Namespace) -> None: -# endpoint, name, email, ptskey = get_configs() -# skey, pkey = get_patatt_ed25519keys(ptskey) -# challenge = cmdargs.auth_verify -# logger.info('Signing challenge using key %s', ptskey) -# sk = SigningKey(skey.encode(), encoder=Base64Encoder) -# bdata = sk.sign(challenge.encode(), encoder=Base64Encoder) -# req = { -# 'action': 'auth-verify', -# 'name': name, -# 'email': email, -# 'challenge': challenge, -# 'sigdata': bdata.decode(), -# } -# ses = b4.get_requests_session() -# res = ses.post(endpoint, json=req) -# logger.info('---') -# if res.status_code == 200: -# try: -# rdata = res.json() -# if rdata.get('result') == 'success': -# logger.info('Challenge successfully verified for %s', email) -# logger.info('You may now use this endpoint for submitting patches.') -# sys.exit(0) -# -# except Exception as ex: # noqa -# logger.critical('Odd response from the endpoint: %s', res.text) -# sys.exit(1) -# -# logger.critical('500 response from the endpoint: %s', res.text) -# sys.exit(1) + +def get_auth_configs() -> Tuple[str, str, str, str, str, str]: + config = b4.get_main_config() + endpoint = config.get('send-endpoint-web') + if not endpoint: + raise RuntimeError('No web submission endpoint defined, set b4.send-endpoint-web') + + usercfg = b4.get_user_config() + myemail = usercfg.get('email') + if not myemail: + raise RuntimeError('No email configured, set user.email') + myname = usercfg.get('name') + pconfig = patatt.get_main_config() + selector = pconfig.get('selector', 'default') + algo, keydata = patatt.get_algo_keydata(pconfig) + return endpoint, myname, myemail, selector, algo, keydata + + +def auth_new(cmdargs: argparse.Namespace) -> None: + try: + endpoint, myname, myemail, selector, algo, keydata = get_auth_configs() + except patatt.NoKeyError as ex: + logger.critical('CRITICAL: no usable signing key configured') + logger.critical(' %s', ex) + sys.exit(1) + except RuntimeError as ex: + logger.critical('CRITICAL: unable to set up web authentication') + logger.critical(' %s', ex) + sys.exit(1) + + if algo == 'openpgp': + gpgargs = ['--export', '--export-options', 'export-minimal', '-a', keydata] + ecode, out, err = b4.gpg_run_command(gpgargs) + if ecode > 0: + logger.critical('CRITICAL: unable to get PGP public key for %s:%s', algo, keydata) + sys.exit(1) + pubkey = out.decode() + elif algo == 'ed25519': + from nacl.signing import SigningKey + from nacl.encoding import Base64Encoder + sk = SigningKey(keydata, encoder=Base64Encoder) + pubkey = base64.b64encode(sk.verify_key.encode()).decode() + else: + logger.critical('CRITICAL: algorithm %s not currently supported for web endpoint submission', algo) + sys.exit(1) + + logger.info('Will submit a new email authorization request to:') + logger.info(' Endpoint: %s', endpoint) + logger.info(' Name: %s', myname) + logger.info(' Identity: %s', myemail) + logger.info(' Selector: %s', selector) + if algo == 'openpgp': + logger.info(' Pubkey: %s:%s', algo, keydata) + else: + logger.info(' Pubkey: %s:%s', algo, pubkey) + logger.info('---') + try: + input('Press Enter to confirm or Ctrl-C to abort') + except KeyboardInterrupt: + logger.info('') + sys.exit(130) + + req = { + 'action': 'auth-new', + 'name': myname, + 'identity': myemail, + 'selector': selector, + 'pubkey': pubkey, + } + logger.info('Submitting new auth request to %s', endpoint) + ses = b4.get_requests_session() + try: + res = ses.post(endpoint, json=req) + res.raise_for_status() + except Exception as ex: + logger.critical('CRITICAL: unable to send endpoint request') + logger.critical(' %s', ex) + sys.exit(1) + logger.info('---') + if res.status_code == 200: + try: + rdata = res.json() + if rdata.get('result') == 'success': + logger.info('Challenge generated and sent to %s', myemail) + logger.info('Once you receive it, run b4 send --web-auth-verify [challenge-string]') + sys.exit(0) + + except Exception as ex: # noqa + logger.critical('Odd response from the endpoint: %s', res.text) + sys.exit(1) + + logger.critical('500 response from the endpoint: %s', res.text) + sys.exit(1) + + +def auth_verify(cmdargs: argparse.Namespace) -> None: + vstr = cmdargs.auth_verify + endpoint, myname, myemail, selector, algo, keydata = get_auth_configs() + logger.info('Signing challenge') + # Create a minimal message + cmsg = email.message.EmailMessage() + cmsg.add_header('From', myemail) + cmsg.add_header('Subject', 'b4-send-verify') + cmsg.set_payload(f'verify:{vstr}\n') + bdata = cmsg.as_bytes(policy=b4.emlpolicy) + try: + bdata = patatt.rfc2822_sign(bdata).decode() + except patatt.SigningError as ex: + logger.critical('CRITICAL: Unable to sign verification message') + logger.critical(' %s', ex) + sys.exit(1) + + req = { + 'action': 'auth-verify', + 'msg': bdata.encode(), + } + logger.info('Submitting verification to %s', endpoint) + ses = b4.get_requests_session() + try: + res = ses.post(endpoint, json=req) + res.raise_for_status() + except Exception as ex: + logger.critical('CRITICAL: unable to send endpoint request') + logger.critical(' %s', ex) + sys.exit(1) + logger.info('---') + if res.status_code == 200: + try: + rdata = res.json() + if rdata.get('result') == 'success': + logger.info('Challenge successfully verified for %s', myemail) + logger.info('You may now use this endpoint for submitting patches.') + sys.exit(0) + + except Exception as ex: # noqa + logger.critical('Odd response from the endpoint: %s', res.text) + sys.exit(1) + + logger.critical('500 response from the endpoint: %s', res.text) + sys.exit(1) def get_rev_count(revrange: str, maxrevs: Optional[int] = 500) -> int: @@ -865,6 +932,12 @@ def format_patch(output_dir: str) -> None: def cmd_send(cmdargs: argparse.Namespace) -> None: + if cmdargs.auth_new: + auth_new(cmdargs) + return + if cmdargs.auth_verify: + auth_verify(cmdargs) + return # Check if the cover letter has 'EDITME' in it cover, tracking = load_cover(strip_comments=True) if 'EDITME' in cover: |