|
#!/usr/bin/env python3 |
|
|
|
""" |
|
Crude example trying to make sense of ECDHE / X25519 key exchange. |
|
|
|
We use ED25519 keys with COSE to mutually authenticate the peers. |
|
We then use ephemeral X25519 keys to do the actual key exchange. |
|
""" |
|
|
|
# BSD 3-Clause License |
|
# |
|
# © Stuart Longland <[email protected]> |
|
# All rights reserved. |
|
# |
|
# Redistribution and use in source and binary forms, with or without |
|
# modification, are permitted provided that the following conditions are met: |
|
# |
|
# * Redistributions of source code must retain the above copyright notice, this |
|
# list of conditions and the following disclaimer. |
|
# |
|
# * Redistributions in binary form must reproduce the above copyright notice, |
|
# this list of conditions and the following disclaimer in the documentation |
|
# and/or other materials provided with the distribution. |
|
# |
|
# * Neither the name of the copyright holder nor the names of its |
|
# contributors may be used to endorse or promote products derived from |
|
# this software without specific prior written permission. |
|
# |
|
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" |
|
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
|
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE |
|
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE |
|
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL |
|
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR |
|
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER |
|
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, |
|
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE |
|
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
|
|
|
import logging |
|
import argparse |
|
import os.path |
|
import os |
|
|
|
from pycose.algorithms import EdDSA, HMAC256 |
|
from pycose.headers import Algorithm, KID |
|
from pycose.keys import OKPKey, CoseKey |
|
from pycose.keys.keyparam import KpKty, SymKpK, KpKeyOps |
|
from pycose.keys.keytype import KtySymmetric |
|
from pycose.keys.keyops import MacCreateOp, MacVerifyOp |
|
from pycose.messages import Mac0Message, Sign1Message, CoseMessage |
|
|
|
from cryptography.hazmat.primitives import hashes |
|
from cryptography.hazmat.primitives.asymmetric.x25519 import ( |
|
X25519PrivateKey, |
|
X25519PublicKey, |
|
) |
|
from cryptography.hazmat.primitives.kdf.hkdf import HKDF |
|
|
|
import cbor2 |
|
|
|
|
|
# COSE key I/O functions |
|
|
|
|
|
def load_key(path): |
|
with open(path, "rb") as f: |
|
return CoseKey.decode(f.read()) |
|
|
|
|
|
def save_key(path, key): |
|
with open(path, "wb") as f: |
|
f.write(key.encode()) |
|
|
|
|
|
def dump_keyhex(key): |
|
return key.encode().hex() |
|
|
|
|
|
def read_keyhex(prompt, keyhex=None): |
|
if not keyhex: |
|
keyhex = input(prompt) |
|
|
|
keydata = bytes.fromhex(keyhex) |
|
return CoseKey.decode(keydata) |
|
|
|
|
|
def get_pubkey(privkey): |
|
return OKPKey.from_dict( |
|
{ |
|
"CURVE": privkey.crv, |
|
"X": privkey.x, |
|
} |
|
) |
|
|
|
|
|
# Message Authentication |
|
|
|
|
|
class Sign1Authenticator(object): |
|
""" |
|
This class simply manages the COSE OKPKey and |
|
abstracts signing and verification of COSE Sign1 messages |
|
""" |
|
|
|
def __init__(self, log): |
|
self._log = log |
|
self._private_key = None |
|
|
|
def load_private_key(self, path): |
|
""" |
|
Load a private key from a file. |
|
""" |
|
self._private_key = load_key(path) |
|
self._log.info("Private key loaded from %s", path) |
|
|
|
def save_private_key(self, path): |
|
""" |
|
Save a private key to a file. |
|
""" |
|
save_key(path, self._private_key) |
|
self._log.info("Private key saved to %s", path) |
|
|
|
def generate_private_key(self, crv="ED25519"): |
|
""" |
|
Generate a private key. |
|
""" |
|
self._log.info("Private key generated with curve %s", crv) |
|
self._private_key = OKPKey.generate_key(crv=crv) |
|
|
|
@property |
|
def private_key(self): |
|
""" |
|
Return the private key. |
|
""" |
|
return self._private_key |
|
|
|
@property |
|
def public_key(self): |
|
""" |
|
Return the public key. |
|
""" |
|
return get_pubkey(self._private_key) |
|
|
|
def encode_sign1(self, payload, kid=None): |
|
""" |
|
Encode a Sign1 message with the given payload. |
|
""" |
|
phdr = {Algorithm: EdDSA} |
|
|
|
if kid is not None: |
|
phdr[KID] = kid |
|
|
|
msg = Sign1Message(phdr=phdr, payload=bytes(payload)) |
|
msg.key = self._private_key |
|
return msg.encode() |
|
|
|
def decode_sign1(self, encoded, key=None): |
|
""" |
|
Decode and verify a Sign1 message. |
|
""" |
|
decoded = CoseMessage.decode(encoded) |
|
|
|
if key: |
|
self.verify_sign1(decoded, key) |
|
|
|
return decoded |
|
|
|
def verify_sign1(self, decoded, key): |
|
""" |
|
Verify a previously-decoded Sign1 message. |
|
""" |
|
decoded.key = key |
|
|
|
if not decoded.verify_signature(): |
|
raise ValueError("Bad signature") |
|
|
|
|
|
def encode_sign1(authn, args, log, **kwargs): |
|
if args.kid: |
|
kid = args.kid.encode() |
|
else: |
|
kid = None |
|
|
|
if args.payload: |
|
payload = args.payload |
|
else: |
|
payload = input("Payload: ") |
|
|
|
encoded = authn.encode_sign1(payload.encode(), kid) |
|
log.info("Encoded message: %s", encoded.hex()) |
|
|
|
|
|
def decode_sign1(authn, args, log, **kwargs): |
|
if args.key: |
|
keydata = bytes.fromhex(args.key) |
|
else: |
|
keydata = bytes.fromhex(input("Public key: ")) |
|
|
|
# Extract the key if given |
|
if keydata: |
|
key = CoseKey.decode(keydata) |
|
else: |
|
log.warning("Message will be unverified!") |
|
key = None |
|
|
|
if args.encoded: |
|
encoded = bytes.fromhex(args.encoded) |
|
else: |
|
encoded = bytes.fromhex(input("Encoded message: ")) |
|
|
|
decoded = authn.decode_sign1(encoded, key) |
|
log.info("Decoded protected header: %r", decoded.phdr) |
|
log.info("Decoded unprotected header: %r", decoded.uhdr) |
|
log.info("Decoded message: %s", decoded.payload.decode()) |
|
|
|
|
|
# X25519 Key exchange |
|
|
|
|
|
class X25519KeyExchange(object): |
|
""" |
|
X25519 Key Exchange manager. This handles the generation of ephemeral |
|
keys. It works with an authenticator class to protect against spoofing |
|
attacks. |
|
|
|
The protocol used here uses human-readable strings for the purpose of |
|
making a functional educational implementation… a real-life use case would |
|
probably use byte strings for this. This implementation also provides no |
|
protection against replay attacks! |
|
""" |
|
|
|
# Size of the nonce generated in bytes |
|
NONCE_SZ = 32 |
|
|
|
# Size of the salt generated in bytes |
|
SALT_SZ = 32 |
|
|
|
# Size of the info generated in bytes |
|
INFO_SZ = 256 |
|
|
|
# Size of the derived key in bytes |
|
DERIVED_KEY_SZ = 32 |
|
|
|
def __init__(self, authn, log): |
|
self._authn = authn |
|
self._log = log |
|
|
|
# Peer authentication public key |
|
self._peer_auth_key = None |
|
|
|
# Peer ephemeral public key |
|
self._peer_eph_key = None |
|
|
|
# Ephemeral X25519 private key |
|
self._private_key = None |
|
|
|
# Some data that we'll throw in to authenticate the key exchange |
|
self._info = None |
|
|
|
# A salt for protecting the key exchange |
|
self._salt = None |
|
|
|
# Derived key |
|
self._derived_key = None |
|
|
|
# A nonce used to prove we have successfully encoded a shared secret. |
|
self._nonce = None |
|
|
|
# XXX: the python-cryptography example seems to run the X25519 key |
|
# exchange algorithm twice, but the `derived_key` produced the first |
|
# time around is not used. It's not clear why they do this! We'll do |
|
# it just once for educational purposes. |
|
# |
|
# https://cryptography.io/en/latest/hazmat/primitives/asymmetric/x25519/ |
|
|
|
def _generate_key(self): |
|
""" |
|
Generate an ephemeral key for the key agreement. |
|
""" |
|
self._log.info("Generating ephemeral X25519 key") |
|
self._private_key = X25519PrivateKey.generate() |
|
|
|
def initiator_encode_request(self): |
|
""" |
|
Encode a connection request. This encodes a message to be sent to the |
|
peer we wish to communicate with, containing the authentication and |
|
ephemeral keys. |
|
""" |
|
# Begin by generating our ephemeral key |
|
self._generate_key() |
|
|
|
# NB: we're using strings here for readability, but an efficent |
|
# implementation would probably use dedicated byte value constants! |
|
# |
|
# We will send, in a message signed by us, two keys: |
|
# - the authentication key (secret key persisted) that we use to |
|
# "authenticate" ourselves to the remote end and will be used to |
|
# sign all key exchange traffic. |
|
# - the public part of the ephemeral key that we will use to negotiate |
|
# a shared secret. |
|
# |
|
# We will wrap this up in a CBOR-encoded map, with a type field (T) |
|
# which will indicate a connection request to the peer. |
|
|
|
payload = cbor2.dumps( |
|
{ |
|
# Message type: connection request… we wish to initiate a peer |
|
# connection with the recipient of this message. |
|
"T": "CONN-RQ", |
|
# Authentication key: we will sign our traffic with the private |
|
# counterpart of this key. Send the public key so they can verify |
|
# us. |
|
"AK": self._authn.public_key.encode(), |
|
# Ephemeral key: this is the key we'll use for X25519 key |
|
# exchange. It will be discarded when we are done! |
|
"EK": self._private_key.public_key().public_bytes_raw(), |
|
} |
|
) |
|
|
|
# Encode and sign this payload with our key |
|
self._log.info("Encoding CONN-RQ") |
|
return self._authn.encode_sign1(payload) |
|
|
|
def responder_decode_request(self, encoded): |
|
""" |
|
Decode an incoming request. Pull out the public keys being used. |
|
""" |
|
# Decode the message, we don't know what authentication key they're |
|
# using yet, so decode without. |
|
|
|
request_msg = self._authn.decode_sign1(encoded) |
|
request = cbor2.loads(request_msg.payload) |
|
self._log.debug("Received %r", request) |
|
|
|
# Assert that this is a valid connection request |
|
assert isinstance(request, dict), "Malformed message" |
|
assert request["T"] == "CONN-RQ", "Not a connection request" |
|
self._log.info("Got a CONN-RQ from peer") |
|
|
|
# Validate the message against the authentication key |
|
peer_auth_key = CoseKey.decode(request["AK"]) |
|
self._authn.verify_sign1(request_msg, peer_auth_key) |
|
|
|
# XXX: In a real application, we'd then inspect request["AK"] and |
|
# decide whether we "trust" this peer. Is this a public key we know? |
|
# Let's assume for now, we did some checks and everything is fine. We |
|
# store the peer's public authentication key |
|
self._peer_auth_key = peer_auth_key |
|
|
|
# Begin by generating our ephemeral key |
|
self._generate_key() |
|
|
|
# Decode and store their public key |
|
self._peer_eph_key = X25519PublicKey.from_public_bytes(request["EK"]) |
|
|
|
# We trust this peer, so generate some data and send them our |
|
# ephemeral key. |
|
self._salt = os.urandom(self.SALT_SZ) |
|
self._info = os.urandom(self.INFO_SZ) |
|
|
|
# Also generate a nonce for the peer to encode |
|
self._nonce = os.urandom(self.NONCE_SZ) |
|
|
|
payload = cbor2.dumps( |
|
{ |
|
# Message type: connection acknowlegement… we accept their |
|
# authentication key, and will send them an ephemeral key with |
|
# which to do key agreement with. |
|
"T": "CONN-ACK", |
|
# Authentication key: we will sign our traffic with the private |
|
# counterpart of this key. Send the public key so they can verify |
|
# us. |
|
"AK": self._authn.public_key.encode(), |
|
# Ephemeral key: this is the key we'll use for X25519 key |
|
# exchange. It will be discarded when we are done! |
|
"EK": self._private_key.public_key().public_bytes_raw(), |
|
# Information and salt for protecting the key exchange |
|
"I": self._info, |
|
"S": self._salt, |
|
# The nonce we want them to send back |
|
"N": self._nonce, |
|
} |
|
) |
|
|
|
# Encode and sign this payload with our key |
|
self._log.info( |
|
"Encoding CONN-ACK with info=%s salt=%s nonce=%s", |
|
self._info.hex(), |
|
self._salt.hex(), |
|
self._nonce.hex(), |
|
) |
|
return self._authn.encode_sign1(payload) |
|
|
|
def _exchange_and_derive_key(self): |
|
""" |
|
Derive the key from the peer and our own keys. |
|
""" |
|
self._log.info( |
|
"Deriving shared key with info=%s salt=%s", |
|
self._info.hex(), |
|
self._salt.hex(), |
|
) |
|
shared_key = self._private_key.exchange(self._peer_eph_key) |
|
self._log.debug("Shared key is %s", shared_key.hex()) |
|
self._derived_key = HKDF( |
|
algorithm=hashes.SHA256(), |
|
length=self.DERIVED_KEY_SZ, |
|
salt=self._salt, |
|
info=self._info, |
|
).derive(shared_key) |
|
self._log.debug("Derived key is %s", self._derived_key.hex()) |
|
|
|
@property |
|
def derived_cose_key(self): |
|
""" |
|
Return the derived key as a COSE key |
|
""" |
|
return CoseKey.from_dict( |
|
{ |
|
KpKty: KtySymmetric, |
|
SymKpK: self._derived_key, |
|
KpKeyOps: [MacCreateOp, MacVerifyOp], |
|
} |
|
) |
|
|
|
def _encode_nonce(self, nonce): |
|
""" |
|
Encode the given nonce in a MAC0 |
|
""" |
|
verify_msg = Mac0Message(phdr={Algorithm: HMAC256}, payload=nonce) |
|
verify_msg.key = self.derived_cose_key |
|
return verify_msg.encode() |
|
|
|
def initiator_decode_ack(self, encoded): |
|
""" |
|
Decode the incoming CONN-ACK. Derive the shared secret at our end |
|
then send a MAC0 reply to confirm we accept their public key and we |
|
have determined a shared secret. |
|
""" |
|
# Decode the message, we don't know what authentication key they're |
|
# using yet, so decode without. |
|
|
|
response_msg = self._authn.decode_sign1(encoded) |
|
response = cbor2.loads(response_msg.payload) |
|
self._log.debug("Received %r", response) |
|
|
|
# Assert that this is a valid connection response |
|
assert isinstance(response, dict), "Malformed message" |
|
assert response["T"] == "CONN-ACK", "Not a connection response" |
|
|
|
# Validate the message against the authentication key |
|
peer_auth_key = CoseKey.decode(response["AK"]) |
|
self._authn.verify_sign1(response_msg, peer_auth_key) |
|
|
|
# XXX: In a real application, we'd then inspect response["AK"] and |
|
# decide whether we "trust" this peer. Is this a public key we know? |
|
# Let's assume for now, we did some checks and everything is fine. We |
|
# store the peer's public authentication key |
|
self._peer_auth_key = peer_auth_key |
|
|
|
# Decode and store their public key, salt and info |
|
self._peer_eph_key = X25519PublicKey.from_public_bytes(response["EK"]) |
|
self._salt = response["S"] |
|
self._info = response["I"] |
|
|
|
# Peer's nonce |
|
nonce = response["N"] |
|
|
|
# Perform the key exchange |
|
self._exchange_and_derive_key() |
|
|
|
# Prove we generated something by sending back their nonce value. |
|
encoded_nonce = self._encode_nonce(nonce) |
|
|
|
# Generate a nonce for the peer to do the same! |
|
self._nonce = os.urandom(self.NONCE_SZ) |
|
|
|
# Send back our proof and counter-challenge |
|
payload = cbor2.dumps( |
|
{ |
|
# Message type: connection verification… we accept their |
|
# authentication key, have derived a secret, and wish to check |
|
# they got the same numbers as us. |
|
"T": "CONN-VER", |
|
# The nonce we want them to send back |
|
"N": self._nonce, |
|
# Our challenge verification |
|
"V": encoded_nonce, |
|
} |
|
) |
|
|
|
# Encode and sign this payload with our key |
|
self._log.info( |
|
"Encoding CONN-VER with nonce=%s verification=%s", |
|
self._nonce.hex(), |
|
encoded_nonce.hex(), |
|
) |
|
return self._authn.encode_sign1(payload) |
|
|
|
def responder_verify_nonce(self, encoded): |
|
""" |
|
Decode the incoming nonce challenge response and verify it to ensure |
|
we got the same shared key as them. |
|
""" |
|
|
|
# The key that signed this message should be the same as before |
|
response_msg = self._authn.decode_sign1(encoded, self._peer_auth_key) |
|
response = cbor2.loads(response_msg.payload) |
|
self._log.debug("Received %r", response) |
|
|
|
# Assert that this is a valid connection verification message |
|
assert isinstance(response, dict), "Malformed message" |
|
assert response["T"] == "CONN-VER", "Not a connection verification" |
|
|
|
# Peer's nonce |
|
nonce = response["N"] |
|
|
|
# Perform the key exchange |
|
self._exchange_and_derive_key() |
|
self._log.info("Our shared key is %s", self._derived_key.hex()) |
|
|
|
# Validate the peer's challenge response |
|
challenge_msg = CoseMessage.decode(response["V"]) |
|
challenge_msg.key = self.derived_cose_key |
|
if not challenge_msg.verify_tag() or (challenge_msg.payload != self._nonce): |
|
# This did not work, reject the connection |
|
return self._reject_conn("BAD-NONCE") |
|
|
|
# All good… we do the same, send them back their nonce to prove we |
|
# came up with the same numbers. |
|
encoded_nonce = self._encode_nonce(nonce) |
|
payload = cbor2.dumps( |
|
{ |
|
# Message type: connection accepted. |
|
"T": "CONN-ACCEPT", |
|
"V": encoded_nonce, |
|
} |
|
) |
|
|
|
# Encode and sign this payload with our key |
|
self._log.info("Encoding CONN-ACCEPT with verification=%s", encoded_nonce.hex()) |
|
return self._authn.encode_sign1(payload) |
|
|
|
def initiator_verify_accept(self, encoded): |
|
""" |
|
Check the peer validated our connection attempt. |
|
""" |
|
# The key that signed this message should be the same as before |
|
response_msg = self._authn.decode_sign1(encoded, self._peer_auth_key) |
|
response = cbor2.loads(response_msg.payload) |
|
self._log.debug("Received %r", response) |
|
|
|
# Assert that this is a valid connection verification message |
|
assert isinstance(response, dict), "Malformed message" |
|
assert response["T"] == "CONN-ACCEPT", "Not a connection accept" |
|
|
|
# Validate the peer's challenge response |
|
challenge_msg = CoseMessage.decode(response["V"]) |
|
challenge_msg.key = self.derived_cose_key |
|
if (not challenge_msg.verify_tag()) or (challenge_msg.payload != self._nonce): |
|
# This did not work, reject the connection |
|
return self._reject_conn("BAD-NONCE") |
|
|
|
self._log.info("Our shared key is %s", self._derived_key.hex()) |
|
payload = cbor2.dumps( |
|
{ |
|
# Message type: connection confirmation. |
|
"T": "CONN-CONFIRM" |
|
} |
|
) |
|
|
|
# Encode and sign this payload with our key |
|
self._log.info("Encoding CONN-CONFIRM") |
|
return self._authn.encode_sign1(payload) |
|
|
|
def responder_confirm(self, encoded): |
|
""" |
|
Validate the confirmation message |
|
""" |
|
# The key that signed this message should be the same as before |
|
response_msg = self._authn.decode_sign1(encoded, self._peer_auth_key) |
|
response = cbor2.loads(response_msg.payload) |
|
self._log.debug("Received %r", response) |
|
|
|
# Assert that this is a valid connection verification message |
|
assert isinstance(response, dict), "Malformed message" |
|
assert response["T"] == "CONN-CONFIRM", "Not a connection confirmation" |
|
|
|
def _reject_conn(self, reason): |
|
""" |
|
Encode and return a connection rejection message. |
|
""" |
|
payload = cbor2.dumps( |
|
{ |
|
# Message type: connection rejection. |
|
"T": "CONN-REJ", |
|
"R": reason, |
|
} |
|
) |
|
|
|
# Encode and sign this payload with our key |
|
self._log.error("Rejecting connection: %s", reason) |
|
return self._authn.encode_sign1(payload) |
|
|
|
|
|
def x25519_initiate(x25519, args, log, **kwargs): |
|
""" |
|
Interactive X25519 key exchange initiator. |
|
""" |
|
msg = x25519.initiator_encode_request() |
|
print("\n\nSend to peer: %s\n\n" % msg.hex()) |
|
|
|
response = bytes.fromhex(input("Peer acknowledgement: ")) |
|
msg = x25519.initiator_decode_ack(response) |
|
print("\n\nSend to peer: %s\n\n" % msg.hex()) |
|
|
|
response = bytes.fromhex(input("Peer acceptance: ")) |
|
msg = x25519.initiator_verify_accept(response) |
|
print("\n\nSend to peer: %s\n\n" % msg.hex()) |
|
|
|
log.info("Derived key: %s", x25519.derived_cose_key.encode().hex()) |
|
|
|
|
|
def x25519_respond(x25519, args, log, **kwargs): |
|
""" |
|
Interactive X25519 key exchange responder. |
|
""" |
|
request = bytes.fromhex(input("Peer request: ")) |
|
msg = x25519.responder_decode_request(request) |
|
print("\n\nSend to peer: %s\n\n" % msg.hex()) |
|
|
|
response = bytes.fromhex(input("Peer verification: ")) |
|
msg = x25519.responder_verify_nonce(response) |
|
print("\n\nSend to peer: %s\n\n" % msg.hex()) |
|
|
|
response = bytes.fromhex(input("Peer verification: ")) |
|
x25519.responder_confirm(response) |
|
|
|
log.info("Derived key: %s", x25519.derived_cose_key.encode().hex()) |
|
|
|
|
|
# Command line |
|
|
|
|
|
if __name__ == "__main__": |
|
logging.basicConfig(level=logging.INFO) |
|
log = logging.getLogger("x25519") |
|
|
|
authn = Sign1Authenticator(log=log.getChild("authn")) |
|
x25519 = X25519KeyExchange(authn, log=log.getChild("keyexchange")) |
|
|
|
ap = argparse.ArgumentParser() |
|
ap.add_argument("--private-key", help="Private key file") |
|
ap.add_argument("--key-curve", help="Key curve", default="ED25519") |
|
|
|
subparsers = ap.add_subparsers(help="sub-command help", required=True) |
|
|
|
ap_encode_sign1 = subparsers.add_parser( |
|
"encode-sign1", help="Sign and encode message" |
|
) |
|
ap_encode_sign1.add_argument("--kid", help="Include a key ID") |
|
ap_encode_sign1.add_argument("payload", default="", type=str, help="Data to encode") |
|
ap_encode_sign1.set_defaults(fn=encode_sign1) |
|
|
|
ap_decode_sign1 = subparsers.add_parser( |
|
"decode-sign1", help="Verify and decode message" |
|
) |
|
ap_decode_sign1.add_argument( |
|
"key", |
|
default="", |
|
type=str, |
|
help="Public key", |
|
nargs="?", |
|
) |
|
ap_decode_sign1.add_argument( |
|
"encoded", default="", type=str, help="Encoded message", nargs="?" |
|
) |
|
ap_decode_sign1.set_defaults(fn=decode_sign1) |
|
|
|
ap_x25519_initiate = subparsers.add_parser("x25519-initiate", help="Initiate ECDHE") |
|
ap_x25519_initiate.set_defaults(fn=x25519_initiate) |
|
|
|
ap_x25519_respond = subparsers.add_parser("x25519-respond", help="Respond to ECDHE") |
|
ap_x25519_respond.set_defaults(fn=x25519_respond) |
|
|
|
args = ap.parse_args() |
|
|
|
if args.private_key: |
|
if os.path.exists(args.private_key): |
|
authn.load_private_key(args.private_key) |
|
else: |
|
authn.generate_private_key(args.key_curve) |
|
authn.save_private_key(args.private_key) |
|
log.info("My Public key: %s", authn.public_key.encode().hex()) |
|
|
|
if args.fn: |
|
args.fn(authn=authn, x25519=x25519, args=args, log=log.getChild("cmd")) |