# Source code for arborist.mesh.crypto
"""Cryptographic primitives for the mesh layer.

- Ed25519 for signing (membership events, gossip envelopes).
- X25519 for key agreement (wrap epoch secrets per-member).
- ChaCha20-Poly1305 for AEAD (optional payload encryption, epoch-secret
  wrapping).

All key material is bytes (raw 32-byte forms) so the storage layer can
keep keys in BLOB columns without serialization. The `cryptography`
library is the audited backend; this module is a thin wrapper that
hides the import surface and enforces consistent error handling.
"""
from __future__ import annotations
from cryptography.exceptions import InvalidSignature, InvalidTag
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import ed25519, x25519
from cryptography.hazmat.primitives.ciphers.aead import ChaCha20Poly1305
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
# -----------------------------------------------------------------------------
# Ed25519 — signing
# -----------------------------------------------------------------------------
[docs]
def generate_signing_keypair() -> tuple[bytes, bytes]:
    """Create a fresh Ed25519 signing keypair.

    Returns:
        A ``(priv_bytes, pub_bytes)`` tuple holding the raw private key
        bytes first and the matching raw public key bytes second.
    """
    signing_key = ed25519.Ed25519PrivateKey.generate()
    verifying_key = signing_key.public_key()
    # Both halves are exported in their raw byte form (no PEM/DER wrapping).
    return signing_key.private_bytes_raw(), verifying_key.public_bytes_raw()
[docs]
def sign(priv_bytes: bytes, message: bytes) -> bytes:
    """Produce an Ed25519 signature (64 bytes) over ``message``.

    Args:
        priv_bytes: Raw Ed25519 private key bytes.
        message: The bytes to sign.

    Returns:
        The signature bytes.

    Any exception raised by the backend while loading the key or signing
    propagates unchanged; nothing is caught here.
    """
    signing_key = ed25519.Ed25519PrivateKey.from_private_bytes(priv_bytes)
    signature = signing_key.sign(message)
    return signature
[docs]
def verify(pub_bytes: bytes, signature: bytes, message: bytes) -> bool:
    """Ed25519 verify. Returns True/False — never raises on bad input.

    Args:
        pub_bytes: Raw Ed25519 public key (32 bytes).
        signature: Candidate signature (64 bytes).
        message: The bytes the signature is claimed to cover.

    Returns:
        True only when the signature is valid for ``message`` under
        ``pub_bytes``. Malformed input (wrong length, wrong type) and
        invalid signatures all yield False.
    """
    try:
        # An Ed25519 public key is always 32 bytes and a signature always
        # 64 bytes; anything else can never verify, so reject it before
        # handing it to the backend. ``len()`` on a non-sized value raises
        # TypeError, which is handled below like any other bad input.
        if len(pub_bytes) != 32 or len(signature) != 64:
            return False
        pub = ed25519.Ed25519PublicKey.from_public_bytes(pub_bytes)
        pub.verify(signature, message)
    except (InvalidSignature, ValueError, TypeError):
        # TypeError covers non-bytes arguments (e.g. None or str), which
        # would otherwise escape and break the "never raises" contract.
        return False
    return True
# -----------------------------------------------------------------------------
# X25519 — key agreement for wrapping epoch secrets per-member
# -----------------------------------------------------------------------------
[docs]
def generate_dh_keypair() -> tuple[bytes, bytes]:
    """Create a fresh X25519 key-agreement keypair.

    Returns:
        A ``(priv_bytes, pub_bytes)`` tuple holding the raw private key
        bytes first and the matching raw public key bytes second.
    """
    dh_private = x25519.X25519PrivateKey.generate()
    dh_public = dh_private.public_key()
    # Both halves are exported in their raw byte form (no PEM/DER wrapping).
    return dh_private.private_bytes_raw(), dh_public.public_bytes_raw()
[docs]
def ecdh_shared_secret(priv_bytes: bytes, peer_pub_bytes: bytes) -> bytes:
    """Derive a 32-byte key from an X25519 exchange with a peer.

    The raw X25519 output is not returned directly; it is passed through
    HKDF-SHA256 with a fixed salt and info string.

    Args:
        priv_bytes: Our raw X25519 private key bytes.
        peer_pub_bytes: The peer's raw X25519 public key bytes.

    Returns:
        The 32-byte HKDF output.
    """
    own_key = x25519.X25519PrivateKey.from_private_bytes(priv_bytes)
    peer_key = x25519.X25519PublicKey.from_public_bytes(peer_pub_bytes)
    exchanged = own_key.exchange(peer_key)

    # Fixed salt; the info string separates this derivation from any other
    # protocol layer that might run ECDH over the same pair of peers.
    kdf = HKDF(
        algorithm=hashes.SHA256(),
        length=32,
        salt=b"arborist.mesh.epoch.v1",
        info=b"epoch-secret-wrap",
    )
    return kdf.derive(exchanged)
# -----------------------------------------------------------------------------
# AEAD — ChaCha20-Poly1305 for envelope payloads + secret wrapping
# -----------------------------------------------------------------------------
[docs]
def aead_encrypt(key: bytes, nonce: bytes, plaintext: bytes, aad: bytes = b"") -> bytes:
    """Encrypt and authenticate ``plaintext`` with ChaCha20-Poly1305.

    Args:
        key: 32-byte AEAD key.
        nonce: 12-byte nonce. It is supplied by the caller; this function
            neither generates nor tracks nonces.
        plaintext: Data to encrypt.
        aad: Additional data that is authenticated but not encrypted.

    Returns:
        The ciphertext with the authentication tag appended.

    Raises:
        ValueError: If ``key`` is not 32 bytes or ``nonce`` is not 12 bytes.
    """
    # Validate the key first, then the nonce, before touching the cipher.
    length_rules = (
        (key, 32, "AEAD key must be 32 bytes"),
        (nonce, 12, "ChaCha20-Poly1305 nonce must be 12 bytes"),
    )
    for value, required_len, complaint in length_rules:
        if len(value) != required_len:
            raise ValueError(complaint)
    return ChaCha20Poly1305(key).encrypt(nonce, plaintext, aad)
[docs]
def aead_decrypt(key: bytes, nonce: bytes, ciphertext: bytes, aad: bytes = b"") -> bytes:
    """Verify and decrypt a ChaCha20-Poly1305 ciphertext.

    Args:
        key: 32-byte AEAD key.
        nonce: 12-byte nonce.
        ciphertext: Ciphertext with its authentication tag appended.
        aad: Additional authenticated data.

    Returns:
        The decrypted plaintext.

    Raises:
        ValueError: If ``key`` is not 32 bytes, ``nonce`` is not 12 bytes,
            or authentication fails. The backend's ``InvalidTag`` is
            translated to ``ValueError`` so callers see one error type.
    """
    # Validate the key first, then the nonce, before touching the cipher.
    length_rules = (
        (key, 32, "AEAD key must be 32 bytes"),
        (nonce, 12, "ChaCha20-Poly1305 nonce must be 12 bytes"),
    )
    for value, required_len, complaint in length_rules:
        if len(value) != required_len:
            raise ValueError(complaint)

    aead = ChaCha20Poly1305(key)
    try:
        recovered = aead.decrypt(nonce, ciphertext, aad)
    except InvalidTag as exc:
        raise ValueError("AEAD authentication failed") from exc
    return recovered