Source code for arborist.mesh.crypto

"""Cryptographic primitives for the mesh layer.

Ed25519 for signing (membership events, gossip envelopes).
X25519 for key agreement (wrap epoch secrets per-member).
ChaCha20-Poly1305 for AEAD (optional payload encryption).

All key material is bytes (raw 32-byte forms) so the storage layer can
keep keys in BLOB columns without serialization. The `cryptography`
library is the audited backend; this module is a thin wrapper that
hides the import surface and enforces consistent error handling.
"""

from __future__ import annotations

from cryptography.exceptions import InvalidSignature, InvalidTag
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import ed25519, x25519
from cryptography.hazmat.primitives.ciphers.aead import ChaCha20Poly1305
from cryptography.hazmat.primitives.kdf.hkdf import HKDF


# -----------------------------------------------------------------------------
# Ed25519 — signing
# -----------------------------------------------------------------------------


def generate_signing_keypair() -> tuple[bytes, bytes]:
    """Create a fresh Ed25519 signing keypair in raw byte form.

    Returns:
        A ``(priv_bytes, pub_bytes)`` tuple: the raw private key followed
        by the raw public key derived from it.
    """
    signing_key = ed25519.Ed25519PrivateKey.generate()
    verify_key = signing_key.public_key()
    return signing_key.private_bytes_raw(), verify_key.public_bytes_raw()
def sign(priv_bytes: bytes, message: bytes) -> bytes:
    """Produce an Ed25519 signature over ``message``.

    Args:
        priv_bytes: Raw Ed25519 private key bytes.
        message: The bytes to sign.

    Returns:
        The 64-byte Ed25519 signature.

    Any exception raised while loading the key or signing propagates to
    the caller unchanged.
    """
    signing_key = ed25519.Ed25519PrivateKey.from_private_bytes(priv_bytes)
    signature = signing_key.sign(message)
    return signature
def verify(pub_bytes: bytes, signature: bytes, message: bytes) -> bool:
    """Check an Ed25519 signature. Returns True/False and never raises on bad input.

    Args:
        pub_bytes: Raw Ed25519 public key bytes of the claimed signer.
        signature: The signature to check.
        message: The bytes the signature is supposed to cover.

    Returns:
        True if the signature verifies under ``pub_bytes``; False if the
        signature is invalid, or if loading the key or verifying reported
        a ``ValueError`` (e.g. a malformed key) or a ``TypeError`` (an
        argument that is not bytes-like).
    """
    try:
        pub = ed25519.Ed25519PublicKey.from_public_bytes(pub_bytes)
        pub.verify(signature, message)
    except (InvalidSignature, ValueError, TypeError):
        # TypeError is included so wrong-typed input (e.g. None or str
        # from a badly decoded envelope) is rejected as "not verified"
        # instead of crashing the caller, matching the no-raise contract.
        return False
    return True
# -----------------------------------------------------------------------------
# X25519 — key agreement for wrapping epoch secrets per-member
# -----------------------------------------------------------------------------
def generate_dh_keypair() -> tuple[bytes, bytes]:
    """Create a fresh X25519 key-agreement keypair in raw byte form.

    Returns:
        A ``(priv_bytes, pub_bytes)`` tuple: the raw private key followed
        by the raw public key derived from it.
    """
    dh_private = x25519.X25519PrivateKey.generate()
    dh_public = dh_private.public_key()
    return dh_private.private_bytes_raw(), dh_public.public_bytes_raw()
def ecdh_shared_secret(priv_bytes: bytes, peer_pub_bytes: bytes) -> bytes:
    """Derive a 32-byte key from an X25519 exchange with a peer.

    The raw X25519 output is not returned directly; it is passed through
    HKDF-SHA256 first.

    Args:
        priv_bytes: Our raw X25519 private key bytes.
        peer_pub_bytes: The peer's raw X25519 public key bytes.

    Returns:
        The 32-byte HKDF output.

    Any exception raised while loading the keys, performing the exchange,
    or deriving the key propagates to the caller unchanged.
    """
    local_key = x25519.X25519PrivateKey.from_private_bytes(priv_bytes)
    peer_key = x25519.X25519PublicKey.from_public_bytes(peer_pub_bytes)
    exchanged = local_key.exchange(peer_key)

    # The salt is a fixed constant. The info string separates this key from
    # any other key that might later be derived from ECDH between the same
    # two peers (for example, by an additional protocol layer).
    kdf = HKDF(
        algorithm=hashes.SHA256(),
        length=32,
        salt=b"arborist.mesh.epoch.v1",
        info=b"epoch-secret-wrap",
    )
    return kdf.derive(exchanged)
# -----------------------------------------------------------------------------
# AEAD — ChaCha20-Poly1305 for envelope payloads + secret wrapping
# -----------------------------------------------------------------------------
def aead_encrypt(key: bytes, nonce: bytes, plaintext: bytes, aad: bytes = b"") -> bytes:
    """Encrypt and authenticate ``plaintext`` with ChaCha20-Poly1305.

    Args:
        key: AEAD key; must be exactly 32 bytes.
        nonce: Nonce; must be exactly 12 bytes.
        plaintext: Data to encrypt.
        aad: Additional data that is authenticated but not encrypted.

    Returns:
        The ciphertext with the authentication tag appended.

    Raises:
        ValueError: If ``key`` is not 32 bytes or ``nonce`` is not 12 bytes.
            The key length is checked first.
    """
    key_len = len(key)
    if key_len != 32:
        raise ValueError("AEAD key must be 32 bytes")

    nonce_len = len(nonce)
    if nonce_len != 12:
        raise ValueError("ChaCha20-Poly1305 nonce must be 12 bytes")

    return ChaCha20Poly1305(key).encrypt(nonce, plaintext, aad)
def aead_decrypt(key: bytes, nonce: bytes, ciphertext: bytes, aad: bytes = b"") -> bytes:
    """Verify and decrypt a ChaCha20-Poly1305 ciphertext.

    Args:
        key: AEAD key; must be exactly 32 bytes.
        nonce: Nonce; must be exactly 12 bytes.
        ciphertext: Ciphertext with the authentication tag appended.
        aad: Additional authenticated data to verify alongside the ciphertext.

    Returns:
        The decrypted plaintext.

    Raises:
        ValueError: If ``key`` is not 32 bytes, ``nonce`` is not 12 bytes,
            or authentication fails. An authentication failure is reported
            with the original ``InvalidTag`` chained as the cause, and no
            plaintext is returned.
    """
    key_len = len(key)
    if key_len != 32:
        raise ValueError("AEAD key must be 32 bytes")

    nonce_len = len(nonce)
    if nonce_len != 12:
        raise ValueError("ChaCha20-Poly1305 nonce must be 12 bytes")

    aead = ChaCha20Poly1305(key)
    try:
        plaintext = aead.decrypt(nonce, ciphertext, aad)
    except InvalidTag as exc:
        # Present tag mismatches as ValueError so callers only need to
        # handle one exception type for every rejection from this function.
        raise ValueError("AEAD authentication failed") from exc
    return plaintext