"""Mesh state persisted in the standard arborist DB.
Three tables (defined in `arborist.store.SCHEMA_SQL`):
mesh_identity — this peer's keys + group name (singleton)
mesh_roster — per-epoch (member_id, sign_pub, dh_pub, role) tuples
mesh_epochs — epoch lifecycle: started_at, audit linkage, secret envelope
The `meta.mesh.enabled` flag gates everything. Default off. Mesh-related
CLI commands and any future network code paths short-circuit when the
flag is unset / '0'.
"""
from __future__ import annotations
import base64
import json
import os
import sqlite3
import time
import uuid
from dataclasses import dataclass
from arborist.mesh.crypto import (
aead_decrypt,
aead_encrypt,
ecdh_shared_secret,
generate_dh_keypair,
generate_signing_keypair,
)
from arborist.store import (
append_audit,
get_meta,
set_meta,
transaction,
)
MESH_ENABLED_KEY = "mesh.enabled"
[docs]
@dataclass
class MeshIdentity:
member_id: str
sign_priv: bytes
sign_pub: bytes
dh_priv: bytes
dh_pub: bytes
group_name: str
created_at: int
[docs]
@dataclass
class MeshRosterEntry:
member_id: str
sign_pub: bytes
dh_pub: bytes
role: str # 'admin' | 'member'
# ---------------------------------------------------------------------------
# Enable / disable flag
# ---------------------------------------------------------------------------
[docs]
def is_enabled(conn: sqlite3.Connection) -> bool:
return get_meta(conn, MESH_ENABLED_KEY) == "1"
[docs]
def set_enabled(conn: sqlite3.Connection, enabled: bool) -> None:
with transaction(conn):
set_meta(conn, MESH_ENABLED_KEY, "1" if enabled else "0")
append_audit(
conn,
event_type="mesh_enable" if enabled else "mesh_disable",
body={"enabled": bool(enabled)},
)
# ---------------------------------------------------------------------------
# Identity
# ---------------------------------------------------------------------------
[docs]
def init_identity(
conn: sqlite3.Connection,
*,
group_name: str,
member_id: str | None = None,
) -> MeshIdentity:
"""Generate this peer's keys and seed epoch 0 with this peer as founding admin.
Idempotent on re-call only in the sense that it raises — the schema
enforces a singleton via PK = 1. Caller is expected to check
load_identity() first.
"""
if load_identity(conn) is not None:
raise RuntimeError("mesh identity already initialized")
sign_priv, sign_pub = generate_signing_keypair()
dh_priv, dh_pub = generate_dh_keypair()
member_id = member_id or _short_id()
now = int(time.time())
with transaction(conn):
conn.execute(
"INSERT INTO mesh_identity "
"(id, member_id, sign_priv, sign_pub, dh_priv, dh_pub, group_name, created_at) "
"VALUES (1, ?, ?, ?, ?, ?, ?, ?)",
(member_id, sign_priv, sign_pub, dh_priv, dh_pub, group_name, now),
)
# Epoch 0 — founder is sole admin.
conn.execute(
"INSERT INTO mesh_roster (epoch_id, member_id, sign_pub, dh_pub, role) "
"VALUES (0, ?, ?, ?, 'admin')",
(member_id, sign_pub, dh_pub),
)
# Epoch 0 secret: random 32-byte symmetric key, wrapped to founder's
# own DH key. (One-member envelope is degenerate but the structure
# is consistent — every later rotate appends a fresh entry.)
secret = os.urandom(32)
envelope = _wrap_secret_for_members(
secret,
members=[(member_id, dh_pub)],
sender_dh_priv=dh_priv,
)
conn.execute(
"INSERT INTO mesh_epochs "
"(epoch_id, started_at, started_event_hash, secret_envelope, reason) "
"VALUES (0, ?, '', ?, 'genesis')",
(now, json.dumps(envelope, separators=(",", ":"), sort_keys=True)),
)
event_hash = append_audit(
conn,
event_type="mesh_init",
body={
"group_name": group_name,
"founder": member_id,
"sign_pub_hex": sign_pub.hex(),
"dh_pub_hex": dh_pub.hex(),
},
)
# Backfill the epoch's audit linkage.
with transaction(conn):
conn.execute(
"UPDATE mesh_epochs SET started_event_hash = ? WHERE epoch_id = 0",
(event_hash,),
)
return MeshIdentity(
member_id=member_id,
sign_priv=sign_priv,
sign_pub=sign_pub,
dh_priv=dh_priv,
dh_pub=dh_pub,
group_name=group_name,
created_at=now,
)
[docs]
def load_identity(conn: sqlite3.Connection) -> MeshIdentity | None:
row = conn.execute(
"SELECT member_id, sign_priv, sign_pub, dh_priv, dh_pub, group_name, created_at "
"FROM mesh_identity WHERE id = 1"
).fetchone()
if row is None:
return None
return MeshIdentity(
member_id=row["member_id"],
sign_priv=bytes(row["sign_priv"]),
sign_pub=bytes(row["sign_pub"]),
dh_priv=bytes(row["dh_priv"]),
dh_pub=bytes(row["dh_pub"]),
group_name=row["group_name"],
created_at=int(row["created_at"]),
)
# ---------------------------------------------------------------------------
# Roster + epoch queries
# ---------------------------------------------------------------------------
[docs]
def current_epoch(conn: sqlite3.Connection) -> int | None:
row = conn.execute("SELECT MAX(epoch_id) AS e FROM mesh_epochs").fetchone()
if row is None or row["e"] is None:
return None
return int(row["e"])
def roster_at(conn: sqlite3.Connection, epoch_id: int) -> list[MeshRosterEntry]:
rows = conn.execute(
"SELECT member_id, sign_pub, dh_pub, role "
"FROM mesh_roster WHERE epoch_id = ? ORDER BY member_id",
(epoch_id,),
).fetchall()
return [
MeshRosterEntry(
member_id=r["member_id"],
sign_pub=bytes(r["sign_pub"]),
dh_pub=bytes(r["dh_pub"]),
role=r["role"],
)
for r in rows
]
# ---------------------------------------------------------------------------
# Epoch rotation (also used for join + kick)
# ---------------------------------------------------------------------------
def rotate_epoch(
conn: sqlite3.Connection,
*,
new_members: list[MeshRosterEntry],
reason: str,
actor_member_id: str,
) -> int:
"""Create a new epoch with the given roster.
`new_members` is the FULL post-rotation roster (not a diff). Eviction =
omit a member from new_members. Joins = include a new member. The
secret envelope is regenerated and wrapped to every new member's DH
pubkey via ECDH from this peer's own DH private key.
Caller is responsible for verifying authority (e.g., admin role) before
calling. Audit chain records the rotation rationale.
"""
me = load_identity(conn)
if me is None:
raise RuntimeError("no mesh identity; call init_identity first")
prior = current_epoch(conn)
if prior is None:
raise RuntimeError("no prior epoch; call init_identity first")
new_epoch = prior + 1
secret = os.urandom(32)
envelope = _wrap_secret_for_members(
secret,
members=[(m.member_id, m.dh_pub) for m in new_members],
sender_dh_priv=me.dh_priv,
)
with transaction(conn):
for m in new_members:
conn.execute(
"INSERT INTO mesh_roster (epoch_id, member_id, sign_pub, dh_pub, role) "
"VALUES (?, ?, ?, ?, ?)",
(new_epoch, m.member_id, m.sign_pub, m.dh_pub, m.role),
)
conn.execute(
"INSERT INTO mesh_epochs "
"(epoch_id, started_at, started_event_hash, secret_envelope, reason) "
"VALUES (?, ?, '', ?, ?)",
(
new_epoch,
int(time.time()),
json.dumps(envelope, separators=(",", ":"), sort_keys=True),
reason,
),
)
event_hash = append_audit(
conn,
event_type="mesh_epoch_rotate",
body={
"actor": actor_member_id,
"new_epoch": new_epoch,
"members": [m.member_id for m in new_members],
"reason": reason,
},
)
with transaction(conn):
conn.execute(
"UPDATE mesh_epochs SET started_event_hash = ? WHERE epoch_id = ?",
(event_hash, new_epoch),
)
return new_epoch
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _short_id() -> str:
"""8-hex-char member id. Collisions across a small mesh are negligible."""
return uuid.uuid4().hex[:8]
def _wrap_secret_for_members(
secret: bytes,
*,
members: list[tuple[str, bytes]],
sender_dh_priv: bytes,
) -> dict[str, dict[str, str]]:
"""Per-member ECDH+AEAD wrap of the epoch secret.
Each member gets a dict {nonce_b64, ct_b64}. Decrypt path: derive the
same shared secret via ECDH(member_dh_priv, sender_dh_pub), then
ChaCha20-Poly1305 decrypt with the recorded nonce.
"""
envelope: dict[str, dict[str, str]] = {}
for member_id, member_dh_pub in members:
shared = ecdh_shared_secret(sender_dh_priv, member_dh_pub)
nonce = os.urandom(12)
ct = aead_encrypt(shared, nonce, secret, aad=member_id.encode("utf-8"))
envelope[member_id] = {
"nonce_b64": base64.b64encode(nonce).decode("ascii"),
"ct_b64": base64.b64encode(ct).decode("ascii"),
}
return envelope
def unwrap_secret_for_self(
conn: sqlite3.Connection,
*,
epoch_id: int,
sender_dh_pub: bytes,
) -> bytes:
"""Recover the symmetric epoch secret using this peer's DH private key.
`sender_dh_pub` is the wrapping peer's X25519 pubkey (typically the
epoch's rotator). Raises ValueError if this peer has no entry in the
epoch's envelope (i.e. they were evicted) or if the AEAD tag mismatches.
"""
me = load_identity(conn)
if me is None:
raise RuntimeError("no mesh identity")
row = conn.execute(
"SELECT secret_envelope FROM mesh_epochs WHERE epoch_id = ?",
(epoch_id,),
).fetchone()
if row is None:
raise ValueError(f"unknown epoch: {epoch_id}")
envelope = json.loads(row["secret_envelope"])
entry = envelope.get(me.member_id)
if entry is None:
raise ValueError(
f"this peer ({me.member_id}) is not in epoch {epoch_id}'s envelope"
)
nonce = base64.b64decode(entry["nonce_b64"])
ct = base64.b64decode(entry["ct_b64"])
shared = ecdh_shared_secret(me.dh_priv, sender_dh_pub)
return aead_decrypt(shared, nonce, ct, aad=me.member_id.encode("utf-8"))
def recover_epoch_secret(
conn: sqlite3.Connection,
*,
epoch_id: int,
) -> bytes:
"""Convenience: locate the rotator's dh_pub locally and unwrap our slot.
The rotator is recorded in the epoch's audit event body — `actor` for
rotations (epoch >= 1), `founder` for the genesis epoch 0. We resolve
the actor's dh_pub via `mesh_roster` at that epoch (the actor was a
member of the post-rotation roster by construction). Then we hand off
to `unwrap_secret_for_self`.
This is the path used by both senders (re-unwrapping their own slot
to AEAD-encrypt outbound gossip) and receivers (decrypting an inbound
encrypted body). Sender-side it's the simplest answer to "where does
the sender get the epoch secret" without a new caching layer.
Raises:
RuntimeError if mesh identity not initialized.
ValueError if the epoch row is missing, the audit event is missing,
the actor is not in the epoch's roster, this peer has no slot in
the envelope (evicted), or the AEAD tag mismatches.
"""
row = conn.execute(
"SELECT started_event_hash FROM mesh_epochs WHERE epoch_id = ?",
(epoch_id,),
).fetchone()
if row is None:
raise ValueError(f"unknown epoch: {epoch_id}")
started_event_hash = row["started_event_hash"]
if not started_event_hash:
raise ValueError(f"epoch {epoch_id} has no audit linkage")
ev = conn.execute(
"SELECT body FROM audit_events WHERE event_hash = ?",
(started_event_hash,),
).fetchone()
if ev is None:
raise ValueError(
f"epoch {epoch_id} audit event {started_event_hash!r} not found"
)
body = json.loads(ev["body"])
# Genesis writes "founder"; rotations write "actor".
actor = body.get("actor") or body.get("founder")
if not actor:
raise ValueError(
f"epoch {epoch_id} audit body has no actor/founder field"
)
rotator = next(
(m for m in roster_at(conn, epoch_id) if m.member_id == actor),
None,
)
if rotator is None:
raise ValueError(
f"rotator {actor!r} not in epoch {epoch_id} roster"
)
return unwrap_secret_for_self(
conn, epoch_id=epoch_id, sender_dh_pub=rotator.dh_pub
)