# Source code for arborist.mesh.state

"""Mesh state persisted in the standard arborist DB.

Three tables (defined in `arborist.store.SCHEMA_SQL`):
  mesh_identity  — this peer's keys + group name (singleton)
  mesh_roster    — per-epoch (member_id, sign_pub, dh_pub, role) tuples
  mesh_epochs    — epoch lifecycle: started_at, audit linkage, secret envelope, reason

The `meta.mesh.enabled` flag gates everything. Default off: only the exact
value '1' counts as enabled. Mesh-related CLI commands and any future network
code paths short-circuit whenever the flag is unset or holds anything else
(e.g. '0').
"""

from __future__ import annotations

import base64
import json
import os
import sqlite3
import time
import uuid
from dataclasses import dataclass, field

from arborist.mesh.crypto import (
    aead_decrypt,
    aead_encrypt,
    ecdh_shared_secret,
    generate_dh_keypair,
    generate_signing_keypair,
)
from arborist.store import (
    append_audit,
    get_meta,
    set_meta,
    transaction,
)


# Key of the mesh feature flag, read via get_meta() and written via set_meta().
# is_enabled() treats only the exact value "1" as enabled.
MESH_ENABLED_KEY = "mesh.enabled"


@dataclass
class MeshIdentity:
    """This peer's mesh identity, as stored in the singleton `mesh_identity` row.

    The private keys are declared with ``field(repr=False)`` so that logging,
    f-string formatting, or a traceback that renders this object cannot leak
    key material. They still participate in ``__init__`` (same positional
    order as before) and in ``__eq__``.
    """

    member_id: str  # id this peer is known by in rosters and envelopes
    sign_priv: bytes = field(repr=False)  # signing private key — secret
    sign_pub: bytes  # signing public key
    dh_priv: bytes = field(repr=False)  # DH private key (wraps/unwraps epoch secrets) — secret
    dh_pub: bytes  # DH public key
    group_name: str
    created_at: int  # Unix seconds, int(time.time()) at init
@dataclass
class MeshRosterEntry:
    """Public record of one member within a single epoch's roster."""

    member_id: str  # roster key; also the envelope slot name
    sign_pub: bytes  # member's signing public key
    dh_pub: bytes  # member's DH public key; epoch secrets are wrapped to it
    role: str  # 'admin' | 'member'
# --------------------------------------------------------------------------- # Enable / disable flag # ---------------------------------------------------------------------------
def is_enabled(conn: sqlite3.Connection) -> bool:
    """Report whether the mesh feature flag is switched on.

    Only the exact stored string ``"1"`` counts as enabled. Any other value
    returned by ``get_meta`` (e.g. ``"0"``) reads as disabled.
    NOTE(review): the unset-key case also reads as disabled provided
    ``get_meta`` returns a sentinel such as None rather than raising —
    confirm against arborist.store.
    """
    flag = get_meta(conn, MESH_ENABLED_KEY)
    return flag == "1"
def set_enabled(conn: sqlite3.Connection, enabled: bool) -> None:
    """Persist the mesh feature flag and record the change in the audit chain.

    Stores ``"1"``/``"0"`` under MESH_ENABLED_KEY and appends a
    ``mesh_enable``/``mesh_disable`` audit event. Both writes happen inside a
    single ``transaction`` block.
    """
    stored_value = "1" if enabled else "0"
    audit_type = "mesh_enable" if enabled else "mesh_disable"
    with transaction(conn):
        set_meta(conn, MESH_ENABLED_KEY, stored_value)
        append_audit(conn, event_type=audit_type, body={"enabled": bool(enabled)})
# --------------------------------------------------------------------------- # Identity # ---------------------------------------------------------------------------
def init_identity(
    conn: sqlite3.Connection,
    *,
    group_name: str,
    member_id: str | None = None,
) -> MeshIdentity:
    """Generate this peer's keys and seed epoch 0 with this peer as founding admin.

    Not idempotent: if an identity already exists this raises RuntimeError.
    The identity row is written with id = 1, a singleton. Callers that want
    "create if missing" should check ``load_identity()`` first.

    All writes happen in ONE transaction:
      * the identity row;
      * the epoch-0 roster, where the founder is the sole admin;
      * the epoch-0 envelope;
      * the ``mesh_init`` audit event;
      * the epoch's ``started_event_hash`` linkage.
    Previously the linkage was backfilled in a second transaction. A failure
    in between left epoch 0 with an empty hash, which
    ``recover_epoch_secret`` rejects, making the genesis secret
    unrecoverable.

    Args:
        conn: open connection to the arborist DB.
        group_name: human-readable mesh group name stored with the identity.
        member_id: explicit member id. A falsy value (None or "") means
            generate a random one via ``_short_id()``.

    Returns:
        The freshly created MeshIdentity.

    Raises:
        RuntimeError: a mesh identity is already initialized.
    """
    if load_identity(conn) is not None:
        raise RuntimeError("mesh identity already initialized")

    sign_priv, sign_pub = generate_signing_keypair()
    dh_priv, dh_pub = generate_dh_keypair()
    member_id = member_id or _short_id()
    now = int(time.time())

    with transaction(conn):
        conn.execute(
            "INSERT INTO mesh_identity "
            "(id, member_id, sign_priv, sign_pub, dh_priv, dh_pub, group_name, created_at) "
            "VALUES (1, ?, ?, ?, ?, ?, ?, ?)",
            (member_id, sign_priv, sign_pub, dh_priv, dh_pub, group_name, now),
        )
        # Epoch 0 — founder is sole admin.
        conn.execute(
            "INSERT INTO mesh_roster (epoch_id, member_id, sign_pub, dh_pub, role) "
            "VALUES (0, ?, ?, ?, 'admin')",
            (member_id, sign_pub, dh_pub),
        )
        # Epoch 0 secret: random 32-byte symmetric key, wrapped to the
        # founder's own DH key. The one-member envelope is degenerate, but it
        # keeps the structure uniform with later rotations.
        secret = os.urandom(32)
        envelope = _wrap_secret_for_members(
            secret,
            members=[(member_id, dh_pub)],
            sender_dh_priv=dh_priv,
        )
        conn.execute(
            "INSERT INTO mesh_epochs "
            "(epoch_id, started_at, started_event_hash, secret_envelope, reason) "
            "VALUES (0, ?, '', ?, 'genesis')",
            (now, json.dumps(envelope, separators=(",", ":"), sort_keys=True)),
        )
        event_hash = append_audit(
            conn,
            event_type="mesh_init",
            body={
                "group_name": group_name,
                "founder": member_id,
                "sign_pub_hex": sign_pub.hex(),
                "dh_pub_hex": dh_pub.hex(),
            },
        )
        # Link the epoch to its audit event inside the same transaction, so
        # epoch 0 can never be committed with an empty started_event_hash.
        conn.execute(
            "UPDATE mesh_epochs SET started_event_hash = ? WHERE epoch_id = 0",
            (event_hash,),
        )

    return MeshIdentity(
        member_id=member_id,
        sign_priv=sign_priv,
        sign_pub=sign_pub,
        dh_priv=dh_priv,
        dh_pub=dh_pub,
        group_name=group_name,
        created_at=now,
    )
def load_identity(conn: sqlite3.Connection) -> MeshIdentity | None:
    """Load this peer's mesh identity, or return None when none was initialized.

    Reads the singleton ``mesh_identity`` row (id = 1). Columns are accessed
    by name, so ``conn.row_factory`` must yield name-indexable rows (e.g.
    ``sqlite3.Row``). Key columns are normalized to ``bytes`` and
    ``created_at`` to ``int``.
    """
    found = conn.execute(
        "SELECT member_id, sign_priv, sign_pub, dh_priv, dh_pub, group_name, created_at "
        "FROM mesh_identity WHERE id = 1"
    ).fetchone()
    if found is not None:
        return MeshIdentity(
            member_id=found["member_id"],
            sign_priv=bytes(found["sign_priv"]),
            sign_pub=bytes(found["sign_pub"]),
            dh_priv=bytes(found["dh_priv"]),
            dh_pub=bytes(found["dh_pub"]),
            group_name=found["group_name"],
            created_at=int(found["created_at"]),
        )
    return None
# --------------------------------------------------------------------------- # Roster + epoch queries # ---------------------------------------------------------------------------
def current_epoch(conn: sqlite3.Connection) -> int | None:
    """Return the highest epoch id in ``mesh_epochs``, or None if it is empty.

    Epoch 0 is a valid result and is distinguished from "no epochs".
    """
    latest = conn.execute("SELECT MAX(epoch_id) AS e FROM mesh_epochs").fetchone()
    if latest is not None and latest["e"] is not None:
        return int(latest["e"])
    return None
def roster_at(conn: sqlite3.Connection, epoch_id: int) -> list[MeshRosterEntry]:
    """Return the roster recorded for `epoch_id`, sorted by member_id.

    An epoch with no roster rows (including an unknown epoch id) yields an
    empty list rather than an error.
    """
    rows = conn.execute(
        "SELECT member_id, sign_pub, dh_pub, role "
        "FROM mesh_roster WHERE epoch_id = ? ORDER BY member_id",
        (epoch_id,),
    ).fetchall()
    return [
        MeshRosterEntry(
            member_id=r["member_id"],
            sign_pub=bytes(r["sign_pub"]),
            dh_pub=bytes(r["dh_pub"]),
            role=r["role"],
        )
        for r in rows
    ]


# ---------------------------------------------------------------------------
# Epoch rotation (also used for join + kick)
# ---------------------------------------------------------------------------


def rotate_epoch(
    conn: sqlite3.Connection,
    *,
    new_members: list[MeshRosterEntry],
    reason: str,
    actor_member_id: str,
) -> int:
    """Create a new epoch with the given roster and return its epoch id.

    `new_members` is the FULL post-rotation roster, not a diff:
      * eviction = omit a member from new_members;
      * join = include a new member.
    A fresh 32-byte epoch secret is wrapped to every member's DH pubkey via
    ECDH from this peer's own DH private key.

    Because the envelope is always wrapped with *this* peer's DH key, and
    `recover_epoch_secret` finds the wrapping key by looking up
    `actor_member_id` in the new epoch's roster, the roster must contain the
    actor exactly once with this peer's DH pubkey. Otherwise no member could
    ever unwrap the secret. Such rosters are rejected before anything is
    written.

    The following are written inside a single `transaction` block:
      * the roster rows;
      * the epoch row;
      * the audit event;
      * the epoch's `started_event_hash` linkage.
    Previously the linkage was written in a separate, later transaction, so a
    failure in between could leave an epoch that `recover_epoch_secret`
    rejects for lack of audit linkage.

    Caller is responsible for verifying authority (e.g. admin role) before
    calling.

    Raises:
        RuntimeError: no mesh identity, or no prior epoch.
        ValueError: `new_members` has duplicate member ids, omits the actor,
            or gives the actor a dh_pub other than this peer's.
    """
    me = load_identity(conn)
    if me is None:
        raise RuntimeError("no mesh identity; call init_identity first")
    prior = current_epoch(conn)
    if prior is None:
        raise RuntimeError("no prior epoch; call init_identity first")
    # Fail fast on rosters that would produce an epoch nobody can unwrap.
    _check_rotation_roster(
        new_members,
        actor_member_id=actor_member_id,
        wrapper_dh_pub=me.dh_pub,
    )
    new_epoch = prior + 1

    secret = os.urandom(32)
    envelope = _wrap_secret_for_members(
        secret,
        members=[(m.member_id, m.dh_pub) for m in new_members],
        sender_dh_priv=me.dh_priv,
    )

    with transaction(conn):
        for m in new_members:
            conn.execute(
                "INSERT INTO mesh_roster (epoch_id, member_id, sign_pub, dh_pub, role) "
                "VALUES (?, ?, ?, ?, ?)",
                (new_epoch, m.member_id, m.sign_pub, m.dh_pub, m.role),
            )
        conn.execute(
            "INSERT INTO mesh_epochs "
            "(epoch_id, started_at, started_event_hash, secret_envelope, reason) "
            "VALUES (?, ?, '', ?, ?)",
            (
                new_epoch,
                int(time.time()),
                json.dumps(envelope, separators=(",", ":"), sort_keys=True),
                reason,
            ),
        )
        event_hash = append_audit(
            conn,
            event_type="mesh_epoch_rotate",
            body={
                "actor": actor_member_id,
                "new_epoch": new_epoch,
                "members": [m.member_id for m in new_members],
                "reason": reason,
            },
        )
        # Link the epoch to its audit event in the same transaction, so the
        # epoch can never be committed with an empty started_event_hash.
        conn.execute(
            "UPDATE mesh_epochs SET started_event_hash = ? WHERE epoch_id = ?",
            (event_hash, new_epoch),
        )
    return new_epoch


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------


def _short_id() -> str:
    """8-hex-char member id (a prefix of a random UUID4's hex).

    Collisions across a small mesh are negligible.
    """
    return uuid.uuid4().hex[:8]


def _check_rotation_roster(
    new_members: list[MeshRosterEntry],
    *,
    actor_member_id: str,
    wrapper_dh_pub: bytes,
) -> None:
    """Validate a post-rotation roster before any rotation state is written.

    `recover_epoch_secret` locates the envelope's wrapping key by looking up
    the audit `actor` in the epoch's roster. The actor must therefore appear
    in `new_members`, and must carry the DH pubkey the envelope is actually
    wrapped with (`wrapper_dh_pub`). Duplicate ids are rejected too: the
    envelope dict would silently keep only one slot per id.

    Raises:
        ValueError: on any of the conditions above.
    """
    ids = [m.member_id for m in new_members]
    if len(set(ids)) != len(ids):
        raise ValueError("new_members contains duplicate member_id values")
    actor = next((m for m in new_members if m.member_id == actor_member_id), None)
    if actor is None:
        raise ValueError(
            f"actor {actor_member_id!r} must be in new_members; otherwise no "
            "member can locate the key the epoch secret is wrapped with"
        )
    if actor.dh_pub != wrapper_dh_pub:
        raise ValueError(
            f"actor {actor_member_id!r} dh_pub does not match this peer's DH "
            "key; rotate_epoch always wraps with this peer's key, so the actor "
            "must be this peer"
        )


def _wrap_secret_for_members(
    secret: bytes,
    *,
    members: list[tuple[str, bytes]],
    sender_dh_priv: bytes,
) -> dict[str, dict[str, str]]:
    """Per-member ECDH+AEAD wrap of the epoch secret.

    Each member gets a dict {nonce_b64, ct_b64}. A fresh 12-byte nonce is
    drawn per member, and the member_id (UTF-8) is used as AEAD associated
    data, binding each ciphertext to its slot.

    Decrypt path: derive the same shared secret via
    ECDH(member_dh_priv, sender_dh_pub), then AEAD-decrypt
    (ChaCha20-Poly1305 per the crypto module) with the recorded nonce.
    """
    envelope: dict[str, dict[str, str]] = {}
    for member_id, member_dh_pub in members:
        shared = ecdh_shared_secret(sender_dh_priv, member_dh_pub)
        nonce = os.urandom(12)
        ct = aead_encrypt(shared, nonce, secret, aad=member_id.encode("utf-8"))
        envelope[member_id] = {
            "nonce_b64": base64.b64encode(nonce).decode("ascii"),
            "ct_b64": base64.b64encode(ct).decode("ascii"),
        }
    return envelope


def unwrap_secret_for_self(
    conn: sqlite3.Connection,
    *,
    epoch_id: int,
    sender_dh_pub: bytes,
) -> bytes:
    """Recover the symmetric epoch secret using this peer's DH private key.

    `sender_dh_pub` is the wrapping peer's DH pubkey, typically the epoch's
    rotator.

    Raises:
        RuntimeError: no mesh identity.
        ValueError: unknown epoch, or this peer has no entry in the epoch's
            envelope (i.e. it was evicted).
        An AEAD tag mismatch surfaces as whatever `aead_decrypt` raises.
        NOTE(review): the documented contract is ValueError; confirm
        against arborist.mesh.crypto.
    """
    me = load_identity(conn)
    if me is None:
        raise RuntimeError("no mesh identity")
    row = conn.execute(
        "SELECT secret_envelope FROM mesh_epochs WHERE epoch_id = ?",
        (epoch_id,),
    ).fetchone()
    if row is None:
        raise ValueError(f"unknown epoch: {epoch_id}")
    envelope = json.loads(row["secret_envelope"])
    entry = envelope.get(me.member_id)
    if entry is None:
        raise ValueError(
            f"this peer ({me.member_id}) is not in epoch {epoch_id}'s envelope"
        )
    nonce = base64.b64decode(entry["nonce_b64"])
    ct = base64.b64decode(entry["ct_b64"])
    shared = ecdh_shared_secret(me.dh_priv, sender_dh_pub)
    return aead_decrypt(shared, nonce, ct, aad=me.member_id.encode("utf-8"))


def recover_epoch_secret(
    conn: sqlite3.Connection,
    *,
    epoch_id: int,
) -> bytes:
    """Convenience: locate the rotator's dh_pub locally and unwrap our slot.

    The rotator is recorded in the epoch's audit event body:
      * `actor` for rotations (epoch >= 1);
      * `founder` for the genesis epoch 0.
    We resolve that member's dh_pub via `mesh_roster` at the same epoch. The
    rotator is in the post-rotation roster: `init_identity` seeds the founder
    into epoch 0, and `rotate_epoch` refuses rosters that omit the actor.
    Then we hand off to `unwrap_secret_for_self`.

    This path is used by both sides:
      * senders, re-unwrapping their own slot to AEAD-encrypt outbound gossip;
      * receivers, decrypting an inbound encrypted body.
    For senders it is the simplest answer to "where does the sender get the
    epoch secret" without adding a caching layer.

    Raises:
        RuntimeError: mesh identity not initialized.
        ValueError: any of the following:
            * the epoch row is missing or has no audit linkage;
            * the audit event is missing or names no actor/founder;
            * the actor is not in the epoch's roster;
            * this peer has no slot in the envelope (evicted);
            * the AEAD tag mismatches (per `unwrap_secret_for_self`'s
              contract).
    """
    row = conn.execute(
        "SELECT started_event_hash FROM mesh_epochs WHERE epoch_id = ?",
        (epoch_id,),
    ).fetchone()
    if row is None:
        raise ValueError(f"unknown epoch: {epoch_id}")
    started_event_hash = row["started_event_hash"]
    if not started_event_hash:
        raise ValueError(f"epoch {epoch_id} has no audit linkage")
    ev = conn.execute(
        "SELECT body FROM audit_events WHERE event_hash = ?",
        (started_event_hash,),
    ).fetchone()
    if ev is None:
        raise ValueError(
            f"epoch {epoch_id} audit event {started_event_hash!r} not found"
        )
    body = json.loads(ev["body"])
    # Genesis writes "founder"; rotations write "actor".
    actor = body.get("actor") or body.get("founder")
    if not actor:
        raise ValueError(
            f"epoch {epoch_id} audit body has no actor/founder field"
        )
    rotator = next(
        (m for m in roster_at(conn, epoch_id) if m.member_id == actor),
        None,
    )
    if rotator is None:
        raise ValueError(
            f"rotator {actor!r} not in epoch {epoch_id} roster"
        )
    return unwrap_secret_for_self(
        conn, epoch_id=epoch_id, sender_dh_pub=rotator.dh_pub
    )