"""Arborist CLI: ingest / search / verify / stats."""
from __future__ import annotations
import argparse
import json
import os
import sys
from pathlib import Path
from arborist import __version__
from arborist.ingest import ingest_source, verify_random_sample
from arborist.progress import Progress
from arborist.search import FTS5Backend
from arborist.sources import WikipediaCurDump
from arborist.store import (
DEFAULT_DB_PATH,
append_audit,
connect,
connect_query,
connect_readonly,
stats,
transaction,
)
def _cmd_ingest(args: argparse.Namespace) -> int:
"""Ingest a corpus (Wikipedia, HTML, git, etc.) into a shard."""
if args.source in ("wikipedia_cur", "wikipedia_old"):
if not args.path:
print(f"--path is required for {args.source}", file=sys.stderr)
return 2
from arborist.sources import WikipediaSqlDump
table = "cur" if args.source == "wikipedia_cur" else "old"
shard = None
if args.shard:
rank_str, total_str = args.shard.split("/", 1)
shard = (int(rank_str), int(total_str))
src = WikipediaSqlDump(path=args.path, table=table, shard=shard)
elif args.source == "html": # noqa: SIM114 — keep branch shape
try:
from arborist.sources import HtmlPageSource
except ImportError:
print(
"html source requires extras: pip install 'arborist[html]'",
file=sys.stderr,
)
return 2
urls: list[str] = list(args.url or [])
if args.urls_from:
urls.extend(
line.strip()
for line in Path(args.urls_from).read_text(encoding="utf-8").splitlines()
if line.strip() and not line.lstrip().startswith("#")
)
if not urls:
print("html source needs --url or --urls-from", file=sys.stderr)
return 2
src = HtmlPageSource(
urls,
respect_robots=not args.no_robots,
default_author=getattr(args, "author", None),
)
elif args.source in ("grok_export", "grok_media"):
if not args.path:
print(f"--path is required for {args.source}", file=sys.stderr)
return 2
from arborist.sources import GrokExportSource, GrokMediaPostsSource
cls = GrokExportSource if args.source == "grok_export" else GrokMediaPostsSource
src = cls(path=args.path)
elif args.source == "textbook_tex":
# PG-style LaTeX source ingest (#000031). Each --url or --bundle
# points at a TeX URL (e.g., PG eBook /files/N/N-t/N-t.tex).
# `--urls-from FILE` for a list. The strip-tex pipeline produces
# plain prose suitable for the standard 512-token chunker.
urls: list[str] = list(getattr(args, "url", None) or [])
if getattr(args, "bundle", None):
urls.extend(args.bundle)
if args.urls_from:
urls.extend(
line.strip()
for line in Path(args.urls_from).read_text(encoding="utf-8").splitlines()
if line.strip() and not line.lstrip().startswith("#")
)
if not urls:
print(
"textbook_tex source needs --url, --bundle, or --urls-from",
file=sys.stderr,
)
return 2
from arborist.sources import TextbookTexSource
src = TextbookTexSource(
urls,
default_author=getattr(args, "author", None),
)
elif args.source == "claim_pack":
# Companion JSON bundles (axiom + theorem packs) — see ticket
# #000029. --bundle is repeatable so axiom-bundle and theorem-bundle
# parse together, allowing cross-bundle pillar_reference edges to
# resolve to specific record URIs. A single --path also works for
# one-bundle ingest.
paths: list[str] = []
if getattr(args, "bundle", None):
paths.extend(args.bundle)
if args.path:
paths.append(args.path)
if not paths:
print(
"claim_pack needs --bundle (repeatable) or --path",
file=sys.stderr,
)
return 2
from arborist.sources import ClaimPackSource
src = ClaimPackSource(paths)
elif args.source in ("wikipedia_xml", "wikipedia_xml_history", "wikipedia_abstract"):
if not args.path:
print(f"--path is required for {args.source}", file=sys.stderr)
return 2
from arborist.sources import WikipediaAbstractDump, WikipediaXmlDump
if args.source == "wikipedia_abstract":
src = WikipediaAbstractDump(path=args.path)
else:
shard = None
if args.shard:
rank_str, total_str = args.shard.split("/", 1)
shard = (int(rank_str), int(total_str))
src = WikipediaXmlDump(
path=args.path,
shard=shard,
multi_revision=(args.source == "wikipedia_xml_history"),
)
elif args.source in ("git_repo", "hg_repo"):
if not args.path:
print(f"--path is required for {args.source}", file=sys.stderr)
return 2
from arborist.sources import GitRepoSource, MercurialRepoSource
cls = GitRepoSource if args.source == "git_repo" else MercurialRepoSource
src = cls(repo_path=args.path)
elif args.source == "providence":
# Self-reference: promote STRICT live providence_cache records
# past the kindergarten window into the document corpus.
# See docs/self-reference-design.md.
#
# NOTE: ``connect`` lives at module scope (line 19). Re-importing
# it inside this branch makes Python's compiler treat ``connect``
# as a function-local for the entire ``_cmd_ingest`` body, which
# breaks the module-level binding used at line 177 for *every*
# non-providence source. Don't add a local re-import here.
from arborist.sources.providence import (
DEFAULT_KINDERGARTEN_SECONDS,
ProvidenceSource,
)
# The source reads from the SAME shard it's writing into —
# promote each shard's own STRICT records to its own
# documents table. Cross-shard promotion runs as a separate
# invocation per shard.
target_db_for_read = args.db
if args.shards_dir and args.shard:
rank_str, total_str = args.shard.split("/", 1)
rank = int(rank_str)
total = int(total_str)
digits = max(3, len(str(total - 1)))
target_db_for_read = Path(args.shards_dir) / f"{rank:0{digits}d}.db"
if not target_db_for_read:
print("--db or --shards-dir + --shard required for providence source", file=sys.stderr)
return 2
kg_seconds = int(getattr(args, "kindergarten_seconds", None) or DEFAULT_KINDERGARTEN_SECONDS)
# Open a separate connection for reading; ingest opens its own
# write connection downstream.
read_conn = connect(target_db_for_read)
src = ProvidenceSource(read_conn, kindergarten_seconds=kg_seconds)
else:
print(f"unknown source: {args.source}", file=sys.stderr)
return 2
# Resolve target DB: if --shards-dir is set with --shard, write to a
# per-shard file. Each shard owns its own SQLite file, so N parallel
# ingests have ZERO writer-lock contention.
target_db = args.db
if args.shards_dir:
if not args.shard:
print(
"--shards-dir requires --shard rank/total",
file=sys.stderr,
)
return 2
rank_str, total_str = args.shard.split("/", 1)
rank = int(rank_str)
total = int(total_str)
shards_dir = Path(args.shards_dir)
shards_dir.mkdir(parents=True, exist_ok=True)
digits = max(3, len(str(total - 1)))
target_db = shards_dir / f"{rank:0{digits}d}.db"
progress: Progress | None = None
if not args.quiet:
prefix = ""
if args.shard:
prefix = f"[shard {args.shard}] "
progress = Progress(
interval=args.progress_interval,
total_estimate=args.total_estimate,
prefix=prefix,
)
conn = connect(target_db)
embedded = None
try:
result = ingest_source(
conn,
src,
chunker_name=args.chunker,
limit=args.limit,
batch_size=args.batch_size,
resume=args.resume,
progress=progress,
loss_report_enabled=not args.no_loss_report,
loss_report_excerpts=not args.no_loss_excerpts,
loss_report_max_excerpt_bytes=args.loss_excerpt_bytes,
)
if getattr(args, "embed", False):
# Eager opt-in: after the chunk+Merkle-commit pass, embed the
# chunks this run added (incremental — only chunk_ids not
# already in chunk_vecs). Default ingest does NOT embed; the
# lazy out-of-band pass (`arborist embed`, or a cron, or the
# Prometheus-Σ unconscious sweep) is the usual path. See #000039.
from arborist.search.vec import embed_documents
embedded = embed_documents(conn, incremental=True)
finally:
conn.close()
out = dict(result.__dict__)
if embedded is not None:
out["chunks_embedded"] = embedded
print(json.dumps(out, indent=2, ensure_ascii=False))
return 0
def _cmd_search(args: argparse.Namespace) -> int:
    """Run a lexical (FTS5) or semantic (vec) query and print the hits.

    ``backend == "vec"`` needs a populated ``chunk_vecs`` table. When it
    is empty, a hint goes to stderr and the exit code is 1. Otherwise the
    hits are printed as a JSON list (``--json``) or as a plain-text
    listing, and the exit code is 0.
    """
    want_vec = getattr(args, "backend", "fts5") == "vec"
    if args.global_shards_dir:
        conn = connect_query(args.db, shards_dir=args.global_shards_dir)
    else:
        conn = connect(args.db)
    try:
        if want_vec:
            from arborist.search import VecBackend  # type: ignore
            backend = VecBackend(conn)
            if not backend.populated():
                print(
                    "chunk_vecs is empty or missing on this DB — run "
                    "`arborist --db <db> embed` first.",
                    file=sys.stderr,
                )
                return 1
        else:
            backend = FTS5Backend(conn)
        hits = backend.search(args.query, limit=args.limit)
    finally:
        conn.close()

    if not args.json:
        for hit in hits:
            label = hit.title or hit.document_uri
            print(f"[{hit.audit_mode.value}] {hit.score:7.3f} {label}")
            print(f" chunk {hit.chunk_idx} root={hit.document_root[:16]}…")
            print(f" {hit.snippet}")
            print()
        return 0

    records = [
        {
            "document_root": hit.document_root,
            "document_uri": hit.document_uri,
            "chunk_idx": hit.chunk_idx,
            "snippet": hit.snippet,
            "score": hit.score,
            "audit_mode": hit.audit_mode.value,
            "title": hit.title,
        }
        for hit in hits
    ]
    print(json.dumps(records, indent=2, ensure_ascii=False))
    return 0
def _cmd_embed(args: argparse.Namespace) -> int:
    """Populate chunk_vecs (semantic embeddings) for the --db shard.

    Writes a sibling vec0 virtual table and leaves chunks, documents and
    the audit chain untouched. The first run loads (and downloads) the
    fastembed bge-small-en-v1.5 ONNX model (~130 MB).

    Progress lines go to stderr at most once per second, plus one final
    line. A summary JSON object goes to stdout. Returns 2 when
    ``embed_documents`` raises ValueError, otherwise 0.
    """
    from time import monotonic
    from arborist.search.vec import embed_documents, vec_backend_version

    quant = getattr(args, "quant", "float32")
    rebuild = getattr(args, "rebuild", False)
    conn = connect(args.db)
    started = monotonic()
    last_report = 0.0

    def _progress(done: int, total: int) -> None:
        nonlocal last_report
        now = monotonic()
        # Throttle to one line per second, but always report completion.
        if now - last_report < 1.0 and done != total:
            return
        pct = (100.0 * done / total) if total else 100.0
        rate = done / max(now - started, 1e-6)
        print(
            f" embedded {done}/{total} chunks ({pct:.1f}%, {rate:.0f}/s)",
            file=sys.stderr,
        )
        last_report = now

    try:
        n = embed_documents(
            conn,
            limit=args.limit,
            batch_size=args.batch_size,
            incremental=not rebuild,
            rebuild=rebuild,
            quant=quant,
            progress=_progress,
        )
    except ValueError as e:
        print(f"embed error: {e}", file=sys.stderr)
        return 2
    finally:
        conn.close()

    elapsed = monotonic() - started
    summary = {
        "db": str(args.db),
        "backend_version": vec_backend_version(quant),
        "quant": quant,
        "mode": "rebuild" if rebuild else "incremental",
        "chunks_embedded": n,
        "elapsed_s": round(elapsed, 2),
        "rate_per_s": round(n / max(elapsed, 1e-6), 1),
    }
    print(json.dumps(summary, indent=2))
    return 0
def _cmd_verify(args: argparse.Namespace) -> int:
    """Round-trip Merkle proofs on a random sample of documents.

    Checks chunk 0 of ``args.n`` random documents via
    ``verify_random_sample``, using the sharded query connection when
    ``--global-shards-dir`` is set. Prints the report as JSON and exits 0
    only when its ``failed`` count is 0.
    """
    shards = args.global_shards_dir
    conn = connect_query(args.db, shards_dir=shards) if shards else connect(args.db)
    try:
        report = verify_random_sample(conn, n=args.n)
    finally:
        conn.close()
    print(json.dumps(report, indent=2, ensure_ascii=False))
    failures = report["failed"]
    return 1 if failures != 0 else 0
def _cmd_distill(args: argparse.Namespace) -> int:
    """Distill existing documents into cores (surface→core, or core→core+1).

    With ``--global-shards-dir`` every discovered shard is distilled in
    place. Cores stay in their source shard so the per-shard
    audit/derivation chains remain self-contained. The output is a JSON
    object with summed counters plus a per-shard breakdown. Without it,
    ``args.db`` is distilled and its result printed. Returns 2 for an
    unknown distiller or an empty shards directory, otherwise 0.
    """
    from arborist.distill import get_distiller
    from arborist.distill.runner import distill_existing
    from arborist.store import discover_shards

    try:
        distiller = get_distiller(args.process)
    except ValueError as e:
        print(str(e), file=sys.stderr)
        return 2

    def _distill_one(db_path) -> dict:
        # One connection per DB, always closed even if distillation raises.
        conn = connect(db_path)
        try:
            return distill_existing(
                conn,
                distiller,
                kind=args.kind,
                source_type=args.source_type,
                limit=args.limit,
                chunker_name=args.chunker,
                batch_size=args.batch_size,
            )
        finally:
            conn.close()

    if not args.global_shards_dir:
        print(json.dumps(_distill_one(args.db), indent=2, ensure_ascii=False))
        return 0

    shard_paths = discover_shards(args.global_shards_dir)
    if not shard_paths:
        print(f"no shards in {args.global_shards_dir}", file=sys.stderr)
        return 2
    counters = dict.fromkeys(
        ("scanned", "distilled", "skipped_existing", "skipped_cold", "skipped_empty"),
        0,
    )
    per_shard: list[dict] = []
    for shard_path in shard_paths:
        shard_result = _distill_one(shard_path)
        per_shard.append({"shard": shard_path.name, **shard_result})
        for key in counters:
            counters[key] += shard_result[key]
    print(json.dumps({**counters, "shards": per_shard}, indent=2, ensure_ascii=False))
    return 0
def _cmd_ask(args: argparse.Namespace) -> int:
    """Ask a question against one document; verifier classifies the answer.

    Endpoint and model come from ``args.endpoint`` / ``args.model``, falling
    back to ``ARBORIST_LLM_ENDPOINT`` / ``ARBORIST_LLM_MODEL``. Revision,
    quantization and API key are read from ``ARBORIST_LLM_REVISION``,
    ``ARBORIST_LLM_QUANTIZATION`` and ``ARBORIST_LLM_API_KEY``. With
    ``args.dry_run`` a StubClient replaces the HTTP client.

    Prints the result dict as JSON. Returns 0 when its ``status`` is
    ``cache_hit`` or ``cache_miss_then_written``, else 1.
    """
    # All imports happen before the DB connection is opened, so a failing
    # import can never leave the connection unclosed. ``os`` is the
    # module-level import.
    from arborist.qa import ask
    from arborist.qa.client import OpenAICompatibleClient, StubClient
    from arborist.qa.runner import DEFAULT_POLICY as _DEFAULT_ASK_POLICY

    base_url = args.endpoint or os.environ.get(
        "ARBORIST_LLM_ENDPOINT", "https://hermes.ai.unturf.com/v1"
    )
    model = args.model or os.environ.get(
        "ARBORIST_LLM_MODEL",
        "adamo1139/Hermes-3-Llama-3.1-8B-FP8-Dynamic",
    )
    revision = os.environ.get("ARBORIST_LLM_REVISION", "")
    quantization = os.environ.get("ARBORIST_LLM_QUANTIZATION", "fp8-dynamic")
    api_key = os.environ.get("ARBORIST_LLM_API_KEY")

    client: object
    if args.dry_run:
        client = StubClient(
            answer=f"[STUB] would have answered '{args.question}' against root {args.document_root[:16]}…"
        )
    else:
        client = OpenAICompatibleClient(base_url=base_url, api_key=api_key)

    # Per-call policy override for --answer-mode. Other knobs flow from
    # DEFAULT_POLICY. The default dict is copied, never mutated.
    call_policy = dict(_DEFAULT_ASK_POLICY)
    if getattr(args, "answer_mode", None):
        call_policy["answer_mode"] = args.answer_mode

    conn = (
        connect_query(args.db, shards_dir=args.global_shards_dir)
        if args.global_shards_dir
        else connect(args.db)
    )
    try:
        result = ask(
            conn,
            document_root=args.document_root,
            question=args.question,
            client=client,
            model_id=model,
            revision=revision,
            quantization=quantization,
            policy=call_policy,
        )
    finally:
        conn.close()
    print(json.dumps(result, indent=2, ensure_ascii=False))
    return 0 if result.get("status") in ("cache_hit", "cache_miss_then_written") else 1
def _apply_query_policy_overrides(policy: dict, args: argparse.Namespace) -> dict:
    """Fold per-call ``query`` CLI flags into *policy* (mutated and returned).

    Every flag is read with ``getattr``, so a Namespace without an
    attribute leaves the matching policy field untouched. Assignment
    order matters. ``apply_quantifier_caps`` runs after ``allow_broad``
    and wins when both are set. ``crosslang_translate`` also enables
    ``crosslang_guard_enabled``.
    """
    if getattr(args, "question_dedup", None):
        policy["question_dedup"] = args.question_dedup
    if getattr(args, "answer_mode", None):
        policy["answer_mode"] = args.answer_mode
    if getattr(args, "repair", False):
        # Mechanical-only repair when --repair is set; --repair-reprompts
        # adds the optional re-prompt tier on top. Both default off so
        # `make query` stays single-shot unless a knob is flipped.
        policy["repair_enabled"] = True
        policy["repair_max_reprompts"] = max(
            0, int(getattr(args, "repair_reprompts", 0))
        )
    # Ticket #000008 Phase 4: quantifier-guard CLI overrides. These flags
    # are Level 2 (per-call CLI) of the six-level disable hierarchy; the
    # Level 3 policy fields stay reachable through the policy dict itself.
    if getattr(args, "no_quantifier_guard", False):
        policy["quantifier_guard_enabled"] = False
    if getattr(args, "allow_broad", False):
        # Keeps the classifier on (telemetry stays useful) but zeroes out
        # the apply_caps gate so broad shapes don't get clipped during
        # emergent search.
        policy["quantifier_guard_apply_caps"] = False
    if getattr(args, "reject_broad", False):
        # The actual rejection happens inside query() via this policy
        # field (see arborist/qa/query.py for the early-return path).
        policy["quantifier_reject_broad"] = True
    if getattr(args, "apply_quantifier_caps", False):
        # Operator opts in to flipping the dry-run gate per-call.
        # Bench-first per §10.11.3; this flag is the path from dry-run to
        # live-cap.
        policy["quantifier_guard_apply_caps"] = True
    if getattr(args, "crosslang_guard", False):
        # Ticket #000001 §7 Phase 0: opt in to the cross-language guard
        # per-call (default OFF). The behaviour lives in query().
        policy["crosslang_guard_enabled"] = True
    if getattr(args, "crosslang_translate", False):
        # Ticket #000056: Operation Sandwich implies the Phase-0 guard
        # (it rides the same non-English signal).
        policy["crosslang_guard_enabled"] = True
        policy["crosslang_translate_enabled"] = True
    # Ticket #000010: meta-cognition CLI overrides.
    if getattr(args, "no_preflight", False):
        policy["metacognition_enabled"] = False
    if getattr(args, "block_on_contradiction", False):
        # Strict mode: hard-block on lexical contradictions instead of
        # label-only.
        policy["metacognition_block_on_contradiction"] = True
    if getattr(args, "soft_preflight", False):
        # Ticket #000011: opt in to the model-assisted soft preflight
        # sidecar. Adds one short LLM round-trip; NEVER gates
        # admissibility (D1 preserved).
        policy["soft_preflight_enabled"] = True
    # Ticket #000028: multi-modality witness override. CLI > policy.
    witness_override = getattr(args, "witness_override", None)
    if witness_override == "on":
        policy["canonical_witness_enabled"] = True
    elif witness_override == "off":
        policy["canonical_witness_enabled"] = False
    if getattr(args, "no_canonical_preflight", False):
        # Disable the math/logic π* short-circuit per-call. Forces RAG even
        # on pure-arithmetic / pure-propositional input, which is useful
        # when bench-comparing against the LLM path.
        policy["canonical_projection_preflight"] = False
    return policy


def _cmd_query(args: argparse.Namespace) -> int:
    """Multi-source RAG: question -> top-K corpus docs -> Hermes -> cache.

    Builds the chat client (a StubClient under ``args.dry_run``) and
    resolves the QA cache DB. That is ``args.qa_db``, else
    ``<global_shards_dir>/qa.db``, else ``~/.arborist/qa.db``. It applies
    per-call policy overrides and runs ``query``, then writes a
    best-effort unfirehose journal. The result is printed as JSON
    (``--json``) or human-readable text.

    Returns 0 for ``cache_hit``, ``cache_miss_then_written`` or
    ``canonical_projection`` statuses, else 1.
    """
    # ``os`` is the module-level import.
    from arborist.qa.client import OpenAICompatibleClient, StubClient
    from arborist.qa.progress import from_env as _progress_from_env
    from arborist.qa.query import DEFAULT_QUERY_POLICY, query

    base_url = args.endpoint or os.environ.get(
        "ARBORIST_LLM_ENDPOINT", "https://hermes.ai.unturf.com/v1"
    )
    model = args.model or os.environ.get(
        "ARBORIST_LLM_MODEL",
        "adamo1139/Hermes-3-Llama-3.1-8B-FP8-Dynamic",
    )
    revision = os.environ.get("ARBORIST_LLM_REVISION", "")
    quantization = os.environ.get("ARBORIST_LLM_QUANTIZATION", "fp8-dynamic")
    api_key = os.environ.get("ARBORIST_LLM_API_KEY")

    client: object
    if args.dry_run:
        client = StubClient(
            answer="[STUB] dry-run: would have asked Hermes-3 with the assembled context."
        )
    else:
        client = OpenAICompatibleClient(base_url=base_url, api_key=api_key)

    qa_db = args.qa_db
    if qa_db is None:
        if args.global_shards_dir:
            qa_db = Path(args.global_shards_dir) / "qa.db"
        else:
            qa_db = Path.home() / ".arborist" / "qa.db"
    qa_db = Path(qa_db)
    shards_dir = (
        Path(args.global_shards_dir) if args.global_shards_dir else None
    )
    single_db = None if shards_dir else args.db

    # Per-call policy overrides on top of a copy of the default.
    # `fidelity` is a function-level kwarg, not a policy field.
    call_policy = _apply_query_policy_overrides(dict(DEFAULT_QUERY_POLICY), args)

    progress = _progress_from_env(
        cli_override=getattr(args, "progress_override", None),
    )
    result = query(
        question=args.question,
        qa_db=qa_db,
        chat_client=client,
        model_id=model,
        revision=revision,
        quantization=quantization,
        shards_dir=shards_dir,
        single_db=single_db,
        top_k=args.top_k,
        over_fetch=args.over_fetch,
        max_context_chars=args.max_context_chars,
        policy=call_policy,
        fidelity=getattr(args, "fidelity", None),
        burn_existing=bool(getattr(args, "burn", False)),
        retrieval_keywords=getattr(args, "retrieval_keywords", None),
        progress=progress,
    )
    # Emit an unfirehose-compatible session journal: one JSONL file per
    # invocation, picked up by unfirehose's native-harness watcher.
    # Journaling is best-effort and must NEVER break the query path, so
    # any failure is deliberately swallowed.
    try:
        _emit_query_journal(args.question, result, model)
    except Exception:  # pragma: no cover — best-effort journaling
        pass
    # Ticket #000031 Phase 3: stash shards_dir on the result dict so the
    # render-layer warrant-chain tail can look up warrant-resolver
    # derivations without needing args. `shards_dir` is read with getattr
    # because not every Namespace carries it.
    _shards_dir = args.global_shards_dir or getattr(args, "shards_dir", None)
    if _shards_dir:
        result["_shards_dir"] = str(_shards_dir)
    if args.json:
        # Internal-only `_`-prefixed fields are stripped so the JSON
        # shape stays stable.
        json_result = {k: v for k, v in result.items() if not k.startswith("_")}
        print(json.dumps(json_result, indent=2, ensure_ascii=False))
    else:
        print(_render_query_human(result, args.question))
    return (
        0
        if result.get("status") in (
            "cache_hit",
            "cache_miss_then_written",
            "canonical_projection",
        )
        else 1
    )
def _emit_query_journal(question: str, result: dict, model: str) -> None:
    """Write one unfirehose/1.0 session for this query invocation.

    The session holds the user question and one assistant message. That
    message carries the answer text, the model id, and an
    ``arborist_meta`` summary of the result: audit fields, cache info,
    truncated violation rationales (160 chars), source summaries, timings
    and the policy's answer_mode.
    """
    from arborist.journal import SessionWriter

    timings = result.get("timings") or {}
    answer = result.get("answer_text") or ""
    violation_summaries = [
        {"kind": viol.get("kind"), "rationale": (viol.get("rationale") or "")[:160]}
        for viol in (result.get("violations") or [])
    ]
    source_summaries = [
        {
            "title": src.get("title"),
            "uri": src.get("document_uri"),
            "used": src.get("used"),
            "role": src.get("source_role"),
        }
        for src in (result.get("sources") or [])
    ]
    policy = result.get("policy") or {}
    meta = {
        "audit_mode": result.get("audit_mode"),
        "verifier_method": result.get("verifier_method"),
        "n_quotes": result.get("n_quotes"),
        "n_verified": result.get("n_verified"),
        "cache_key": result.get("cache_key"),
        "cache_status": result.get("status"),
        "lookup_path": result.get("lookup_path"),
        "violations": violation_summaries,
        "sources": source_summaries,
        "timings_ms": timings,
        "answer_mode": policy.get("answer_mode"),
    }
    with SessionWriter(first_prompt=question) as session:
        session.user_message(question)
        # A missing or zero total_ms is recorded as duration_ms=None.
        total_ms = int(timings.get("total_ms") or 0)
        session.assistant_message(
            answer,
            model=model,
            provider="hermes",
            stop_reason="end_turn",
            duration_ms=total_ms or None,
            arborist_meta=meta,
        )
# Violation kinds that soft-demote a claim-lattice verdict without
# rejecting the pointer outright. `_ladder_rung_for_lattice` maps their
# presence to the ANCHOR-WARRANTED rung. The hard demotes
# (WARRANT_MISSING / TITLE_MISMATCH / DEFLECTION_DETECTED) are checked
# separately because they decide POINTER-LINKED vs ANCHOR-WARRANTED.
_SOFT_DEMOTE_VIOLATION_KINDS = frozenset(
    (
        "LAZY_ANCHOR_DEMOTED",
        "POINTER_OVERFLOW_TRIMMED",
        "TOO_MANY_CLAIMS",
        "BARE_NAME_CLAIM",
        # The model abandoned the claim_lattice_pointer protocol
        # (multi-line prose, zero [E\d+] tags). It pairs with the bottom
        # UNGROUNDED rung when the parser found nothing groundable, but at
        # least surfaces the collapse cause at audit-line glance.
        "FORMAT_COLLAPSED",
        # Ticket #000008 Phase 4, broad-quantifier soft demotes (§10.3).
        # They cap at ANCHOR-WARRANTED instead of minting a new audit_mode
        # token; `_render_warrant_tail` names which one fired.
        "BROAD_QUANTIFIER_RUNAWAY",  # raw_line_count >> pointer_count
        "BROAD_QUANTIFIER_CAP_APPLIED",  # preflight cap fired below default
        "BROAD_QUANTIFIER_SCOPE_UNBOUND",  # unbounded universal reached the LLM
        # BROAD_QUANTIFIER_REJECTED is deliberately NOT in this set: it is
        # a HARD demote (UNGROUNDED via the early-return path).
    )
)
def _ladder_rung_for_lattice(audit_mode: str, violations: list[dict] | None) -> str:
"""Map (audit_mode, violations) → four-rung ladder for claim-lattice
methods. Renderer-only transformation; schema column unchanged.
The ladder names a strictly stronger property at each rung:
POINTER-LINKED pointer/source/chunk verified;
warrant either didn't apply or failed
ANCHOR-WARRANTED pointer-linked AND cited evidence contains
required anchors (warrant ran & passed);
other soft-demote violations may be present
EVIDENCE-WARRANTED anchor-warranted AND no soft demotes
UNGROUNDED n_verified == 0 (existing audit_mode)
HYBRID adds `-PARTIAL` suffix to whichever rung applies.
The rung logic uses the violations list as the signal — no new
verifier output field needed. Three discriminators:
- WARRANT_MISSING in violations → warrant ran and failed at
least one claim → POINTER-LINKED (pointer ok but warrant
didn't anchor the claim)
- any soft-demote kind in violations → ANCHOR-WARRANTED (the
rung is reached but other demotes pulled it back from STRICT)
- no warrant miss AND no soft demotes → EVIDENCE-WARRANTED
"""
if audit_mode == "UNGROUNDED":
return "UNGROUNDED"
kinds = {v.get("kind") for v in (violations or [])}
# Hard demotes: WARRANT_MISSING, TITLE_MISMATCH, DEFLECTION_DETECTED
# all indicate the citation is structurally misaligned with the
# claim or the answer is structurally off-topic — pointer resolved
# but the cited evidence (span, source, or whole answer) doesn't
# actually support the user's question. All three drop to
# POINTER-LINKED. (DEFLECTION_DETECTED was previously a soft demote
# to ANCHOR-WARRANTED; the comeliness/fetish/investitures emergent
# case showed that "the model totally shifted topic but anchored
# the new topic" earned ANCHOR-WARRANTED unfairly. Off-topic
# belongs at the lower rung.)
if (
"WARRANT_MISSING" in kinds
or "TITLE_MISMATCH" in kinds
or "DEFLECTION_DETECTED" in kinds
):
rung = "POINTER-LINKED"
elif kinds & _SOFT_DEMOTE_VIOLATION_KINDS:
rung = "ANCHOR-WARRANTED"
else:
rung = "EVIDENCE-WARRANTED"
if audit_mode == "HYBRID":
rung = rung + "-PARTIAL"
return rung
def _render_audit_label(
audit_mode: str,
verifier_method: str,
violations: list[dict] | None = None,
) -> str:
"""Map (audit_mode, verifier_method, violations) → human-readable
display label.
Schema-level audit_mode names what the lexical verifier could
confirm; the display label names what THAT means in honesty
terms. STRICT in claim_lattice mode = "every pointer resolves
to a valid evidence object whose source_role is allowed AND
every cited span passes the citation-overlap coverage check."
That is NOT full semantic entailment. The display label spells
out the actual property so users don't read STRICT as "the
answer is correct."
Four-rung ladder for claim-lattice methods (per ticket #000005):
POINTER-LINKED pointer verified; warrant either didn't
apply or failed for some claim
ANCHOR-WARRANTED pointer-linked + warrant passed where
it ran; other soft demotes may apply
EVIDENCE-WARRANTED anchor-warranted + no soft demotes
UNGROUNDED no verified pairs
HYBRID audits get a `-PARTIAL` suffix on whichever rung applies.
Plus the verifier-method tail (`· via claim_lattice`,
`· via claim_lattice_pointer`) so the user can see WHICH
verifier path produced the verdict.
For quote / span / entity / paraphrase mode the audit_mode
labels carry less risk of overclaiming (they verify against
pinned spans, not synthesis) — keep them as-is.
`violations` defaults to None for backward compatibility with
callers that don't have access to the violation list. With
None, the ladder falls back to EVIDENCE-WARRANTED (the most
optimistic rung) — operators get the same surface as before
until callers thread violations through.
"""
is_claim_lattice = verifier_method.startswith("claim_lattice")
if is_claim_lattice:
rung = _ladder_rung_for_lattice(audit_mode, violations)
return f"{rung} · via {verifier_method}"
if verifier_method == "canonical_projection":
# Display label for #000027 — persisted canonical answers
# (math/logic π* outputs). The line-rendering site appends
# pi_star_ref as a tail since this helper doesn't have it.
return "CANONICAL · via canonical_projection"
# Quote / span / entity / paraphrase: keep audit_mode as the
# primary token; append method for clarity.
return f"{audit_mode} · via {verifier_method}"
def _render_warrant_tail(result: dict) -> str:
    """Build the tail appended to the audit-line label.

    Returns ``""`` when nothing applies. Otherwise it returns
    ``" · "`` followed by ``" · "``-joined fragments, in this fixed order:

    1. verifier violations: ``warrant missing`` (WARRANT_MISSING);
       ``warrant proven via chain ×N`` when ``warrant_proven_claim_idxs``
       is non-empty (#000031 Phase 3: claims saved by a Merkle-bound
       warrant chain); ``title mismatch``; ``format collapsed``;
    2. at most one broad-quantifier fragment (#000008 Phase 4), checked
       in this order: rejected, cap (with ``claim_cap_applied`` when
       truthy), unbounded, runaway;
    3. meta-cognition logical statuses from
       ``result["question_state"]["logical_statuses"]`` (#000010);
    4. the advisory soft-preflight hint (#000011), skipped for
       SOFT_DISABLED / SOFT_PARSE_FAIL / SOFT_WELL_FORMED;
    5. the authorship warrant tier (#000026 Phase 3), skipped for
       NO_AUTHORSHIP_SIGNAL.
    """
    kinds = {v.get("kind") for v in (result.get("violations") or [])}
    tail: list[str] = []

    if "WARRANT_MISSING" in kinds:
        tail.append("warrant missing")
    proven = result.get("warrant_proven_claim_idxs") or []
    if proven:
        tail.append(f"warrant proven via chain ×{len(proven)}")
    for kind, label in (
        ("TITLE_MISMATCH", "title mismatch"),
        ("FORMAT_COLLAPSED", "format collapsed"),
    ):
        if kind in kinds:
            tail.append(label)

    # Broad-quantifier preflight: the first match wins.
    if "BROAD_QUANTIFIER_REJECTED" in kinds:
        tail.append("broad rejected")
    elif "BROAD_QUANTIFIER_CAP_APPLIED" in kinds:
        cap = result.get("claim_cap_applied")
        tail.append(f"broad cap {cap}" if cap else "broad cap")
    elif "BROAD_QUANTIFIER_SCOPE_UNBOUND" in kinds:
        tail.append("broad unbounded")
    elif "BROAD_QUANTIFIER_RUNAWAY" in kinds:
        tail.append("broad runaway")

    statuses = set((result.get("question_state") or {}).get("logical_statuses") or [])
    tail.extend(
        label
        for status, label in (
            ("false_premise_suspected", "false premise"),
            ("contradictory_question", "contradictory"),
            ("stale_risk", "stale risk"),
            ("out_of_corpus_risk", "out of corpus"),
            ("reference_frame_ambiguous", "frame ambiguous"),
        )
        if status in statuses
    )

    # e.g. SOFT_FALSE_PREMISE_SUSPECTED -> "soft: false premise suspected"
    soft_label = (result.get("soft_preflight_hint") or {}).get("classifier_label") or ""
    if soft_label and soft_label not in (
        "SOFT_DISABLED", "SOFT_PARSE_FAIL", "SOFT_WELL_FORMED",
    ):
        tail.append("soft: " + soft_label.removeprefix("SOFT_").lower().replace("_", " "))

    # e.g. AUTHOR_COPYRIGHT_FOOTER -> "warrant: copyright-footer"
    tier = (result.get("authorship") or {}).get("tier")
    if tier and tier != "NO_AUTHORSHIP_SIGNAL":
        tail.append("warrant: " + tier.removeprefix("AUTHOR_").lower().replace("_", "-"))

    return " · " + " · ".join(tail) if tail else ""
def _maybe_render_json_envelope_as_bullets(answer: str) -> str:
    """If `answer` is a claim_lattice JSON envelope, render it as bullets.

    JSON-mode runs that land UNGROUNDED have no verified claims. The
    runtime's bullet renderer then produces empty text, and `answer_text`
    falls back to the raw model output, a `{"claims":[...]}` envelope.
    Users would see raw JSON for failed runs and bullets for successful
    ones. This helper detects that shape, parses it leniently, and
    renders each claim's `text` as a bullet, tagged with its string
    evidence_ids when there are any. The surface then stays consistent
    across grounded and ungrounded outcomes.

    Returns the input unchanged if:
      - it doesn't look like JSON (no leading `{` or code fence), or
        never mentions "claims";
      - the lenient parser can't be imported or raises;
      - the parsed value isn't a dict with a non-empty `claims` list;
      - no claim has non-empty string `text`.
    """
    stripped = (answer or "").lstrip()
    # Cheap pre-filters before invoking the parser.
    if not stripped.startswith(("{", "```")) or "claims" not in stripped:
        return answer
    try:
        from arborist.qa.verify import _lenient_json_parse
        parsed, _fixups = _lenient_json_parse(answer)
    except Exception:
        # Best-effort rendering: any import/parse failure keeps the raw text.
        return answer
    raw_claims = parsed.get("claims") if isinstance(parsed, dict) else None
    if not isinstance(raw_claims, list) or not raw_claims:
        return answer
    out_lines: list[str] = []
    for claim in raw_claims:
        if not isinstance(claim, dict):
            continue
        text = claim.get("text") or ""
        if not isinstance(text, str) or not text.strip():
            continue
        eids = claim.get("evidence_ids") or []
        ids = (
            ",".join(x for x in eids if isinstance(x, str) and x)
            if isinstance(eids, list)
            else ""
        )
        # Tag only when at least one usable string id exists. This keeps
        # a list of non-string ids from rendering an empty
        # "[: unverified]" tag.
        if ids:
            out_lines.append(f"- {text.strip()} [{ids}: unverified]")
        else:
            out_lines.append(f"- {text.strip()}")
    return "\n".join(out_lines) if out_lines else answer
def _render_query_human(result: dict, question: str) -> str:
    """Pretty-print a query result for terminal reading.

    Layout:
        question
         AUDIT_MODE N/M verified via verifier_method Xs (cached|fresh)

        answer text...

        sources (K):
         [1] Title — host/path (shard.db)
         [2] ...

        unverified (J):
         - "..."

        cache_key: 35ab7d33… <run with --json for full record>

    Dispatch on ``result["status"]``:
      - ``broad_quantifier_rejected``: preflight rejection. Returns an
        UNGROUNDED header line plus the rejection ``answer_text``.
      - ``canonical_projection``: math/logic short-circuit. Returns a
        CANONICAL header and the projected answer. When ``witness`` is
        present, also a per-modality witness block.
      - ``cache_hit`` / ``cache_miss_then_written``: the full layout
        above, plus optional sections for retrieval purity, partially
        grounded quotes, lazy-anchor smell, capacity, timings and
        reference frame.
      - anything else: a single ``<status>: <msg>`` line.

    Args:
        result: the query result dict.
        question: the question text, echoed as the first line.

    Returns:
        The rendered multi-line string.
    """
    status = result.get("status")
    if status == "broad_quantifier_rejected":
        # Phase 4 reject-broad early-return path. The result carries
        # an answer_text with the rejection rationale + a violations
        # list; render both so the operator sees WHY without --json.
        answer_text = result.get("answer_text") or ""
        violations = result.get("violations") or []
        kind = next(
            (v.get("kind") for v in violations
             if v.get("kind") == "BROAD_QUANTIFIER_REJECTED"),
            "BROAD_QUANTIFIER_REJECTED",
        )
        intensity = result.get("quantifier_intensity") or "?"
        token = result.get("quantifier_matched_token") or "?"
        cap = result.get("claim_cap_applied")
        cap_str = f" · cap was {cap}" if cap else ""
        return (
            f"{question}\n"
            f" UNGROUNDED · via {kind} · {intensity} (\"{token}\")"
            f"{cap_str} 0/0 0.0s (preflight)\n\n"
            f"{answer_text}"
        )
    if status == "canonical_projection":
        # Math/logic π* short-circuited the RAG path — the canonical
        # bytes ARE the answer. No retrieval, no LLM, no cache.
        pi_star_ref = result.get("pi_star_ref", "?")
        answer_text = result.get("answer_text") or ""
        timings = result.get("timings") or {}
        total_ms = timings.get("total_ms")
        elapsed = (
            f"{total_ms / 1000:.1f}s"
            if isinstance(total_ms, (int, float))
            else "?"
        )
        # Ticket #000028 — render-layer witness tail. When witness ran,
        # surface the agreement label + per-modality status. Pure
        # render-only; cache_key / governance_policy_hash unaffected.
        witness = result.get("witness")
        witness_tail = ""
        witness_block = ""
        if witness:
            label = witness.get("agreement_label", "?")
            mods = witness.get("modalities") or {}
            ground_truth_hex = witness.get("canonical_answer_bytes_hex") or ""
            # Non-kernel modalities that succeeded AND reproduced the
            # canonical bytes exactly.
            agree_count = sum(
                1 for name, m in mods.items()
                if name != "kernel"
                and m.get("ok")
                and (m.get("canonical_bytes_hex") or "") == ground_truth_hex
            )
            # Denominator: non-kernel modalities that were present at all.
            checkable = sum(
                1 for name, m in mods.items()
                if name != "kernel" and m.get("error") != "ABSENT"
            )
            witness_tail = f" [{label} · {agree_count}/{checkable} modalities agree]"
            lines = ["", "witness:"]
            # Fixed display order; modalities missing from the dict are skipped.
            for name in ("kernel", "cache", "llm"):
                m = mods.get(name)
                if not m:
                    continue
                raw = m.get("raw_answer")
                err = m.get("error")
                ms = int(m.get("elapsed_ms") or 0)
                if err == "ABSENT":
                    detail = "absent"
                elif err == "PIS_REJECT":
                    detail = f'rejected: "{raw}"'
                elif err == "TIMEOUT":
                    detail = "timeout"
                elif err and err.startswith("LLM_ERROR"):
                    detail = err.lower()
                else:
                    detail = f'"{raw}"'
                lines.append(f" {name:<7} {detail} ({ms}ms)")
            witness_block = "\n".join(lines)
        return (
            f"{question}\n"
            f" CANONICAL · via {pi_star_ref}{witness_tail} {elapsed} (projected)\n\n"
            f"{answer_text}"
            f"{witness_block}"
        )
    # Any status other than the two cache outcomes is an error or
    # no-source path: collapse to one status line.
    if status not in ("cache_hit", "cache_miss_then_written"):
        msg = result.get("msg") or status or "unknown error"
        return f" {status or 'error'}: {msg}"
    audit = result.get("audit_mode", "?")
    n_quotes = result.get("n_quotes", 0) or 0
    n_verified = result.get("n_verified", 0) or 0
    method = result.get("verifier_method", "?")
    timings = result.get("timings") or {}
    total_ms = timings.get("total_ms")
    elapsed = f"{total_ms / 1000:.1f}s" if isinstance(total_ms, (int, float)) else "?"
    cache_status = "cached" if status == "cache_hit" else "fresh"
    lookup_path = result.get("lookup_path")
    # Annotate cache_hits when they came from a fallback ckey, not the
    # primary one — useful when an agent ran with fidelity=equivalence_class
    # and reused another agent's record.
    if lookup_path and lookup_path.endswith("_fallback"):
        cache_status = f"cached via {lookup_path}"
    # Render-layer label honesty: schema audit_mode (STRICT/HYBRID/
    # UNGROUNDED) describes what the lexical verifier checked, not
    # full semantic entailment. The display label combines audit_mode
    # with verifier_method so the user sees what was actually
    # verified. Four-rung ladder for claim-lattice methods (#000005):
    # POINTER-LINKED pointer verified; warrant didn't apply
    # or failed for some claim
    # ANCHOR-WARRANTED pointer-linked + warrant passed where
    # it ran; other soft demotes may apply
    # EVIDENCE-WARRANTED anchor-warranted + no soft demotes
    # UNGROUNDED no verified pairs
    # Schema column stays unchanged; pure display.
    display_label = _render_audit_label(audit, method, result.get("violations"))
    warrant_tail = _render_warrant_tail(result)
    # Ticket #000031 Phase 3 — when a cited source has a warrant-
    # resolver derivation row tying it to a primary-source surface
    # chunk, surface a `· warrant: N proven` tail so the user sees
    # the chain-of-custody. Pure render layer; cache_key /
    # audit_mode / governance_policy_hash all unchanged. The
    # _shards_dir is stashed on the result dict by `_cmd_query`
    # before render time (the CLI has it; the renderer doesn't).
    warrant_chain_tail = _render_warrant_chain_tail(result)
    lines: list[str] = []
    lines.append(question)
    lines.append(
        f" {display_label}{warrant_tail}{warrant_chain_tail} {n_verified}/{n_quotes} "
        f"{elapsed} ({cache_status})"
    )
    lines.append("")
    answer = result.get("answer_text") or ""
    # When JSON-mode runs land UNGROUNDED, rendered_text is empty and
    # answer_text falls back to the raw model output — a JSON envelope.
    # Parse it and render each claim as a bullet so the user gets the
    # same shape whether the run grounded or not. Falls back to raw
    # display if parse fails or output isn't JSON-shaped.
    answer = _maybe_render_json_envelope_as_bullets(answer)
    lines.append(answer)
    lines.append("")
    sources = result.get("sources") or []
    if sources:
        lines.append(f"sources ({len(sources)}):")
        for i, s in enumerate(sources, start=1):
            uri = s.get("document_uri", "")
            # Prefer the stored title; fall back to the URI's last segment.
            title = (s.get("title") or "").strip() or _short_path(uri)
            shard = s.get("shard")
            shard_part = f" ({shard})" if shard else ""
            # Render-layer source-role display + used/unused annotation
            # (claim_lattice modes only — quote-mode results don't carry
            # the per-source `used` flag). Honestly surfaces "the system
            # retrieved noise but did not rely on it" so the user can
            # see the model ignoring distractors instead of having to
            # infer it. Pre-2026-05-01 the source list showed every
            # retrieved doc indistinguishably.
            role = s.get("source_role")
            used = s.get("used")
            pointer_ids = s.get("used_pointer_ids") or []
            annotations: list[str] = []
            if role:
                annotations.append(role)
            # `used` is tri-state: True / False / absent (None). Only the
            # explicit booleans produce an annotation.
            if used is True:
                if pointer_ids:
                    annotations.append(f"used ({','.join(pointer_ids)})")
                else:
                    annotations.append("used")
            elif used is False:
                annotations.append("unused")
            if annotations:
                annotation_part = " — " + " — ".join(annotations)
            else:
                annotation_part = ""
            lines.append(
                f" [{i}] {title}{annotation_part} — {_strip_scheme(uri)}{shard_part}"
            )
        lines.append("")
    # Retrieval-purity one-line summary (claim_lattice modes only).
    # "primary at #R · used N/M sources · M-N noise unused"
    # Surfaces noise-resistance at a glance without making the
    # user count rows themselves.
    purity = result.get("retrieval_purity")
    if purity:
        primary_rank = purity.get("primary_rank") or 0
        # NOTE(review): rebinds `used` from the sources loop above; harmless
        # because that loop has already finished.
        used = purity.get("used_sources", 0)
        total = purity.get("total_sources", 0)
        noise_unused = (
            purity.get("noise_sources_count", 0)
            - purity.get("noise_sources_used", 0)
        )
        primary_part = (
            f"primary at #{primary_rank}"
            if primary_rank > 0 else "no primary in top-K"
        )
        noise_part = (
            f" · {noise_unused} noise unused" if noise_unused else ""
        )
        lines.append(
            f" retrieval purity: {primary_part} · used "
            f"{used}/{total} sources{noise_part}"
        )
        lines.append("")
    # Quote lists are truncated to 100 chars each for terminal width.
    partially = result.get("partially_verified_quotes") or []
    if partially:
        lines.append(f"partially grounded ({len(partially)}):")
        for q in partially:
            qtxt = q if len(q) <= 100 else q[:97] + "..."
            lines.append(f' - "{qtxt}"')
        lines.append("")
    unverified = result.get("unverified_quotes") or []
    if unverified:
        lines.append(f"unverified ({len(unverified)}):")
        for q in unverified:
            qtxt = q if len(q) <= 100 else q[:97] + "..."
            lines.append(f' - "{qtxt}"')
        lines.append("")
    # Anchor-smell sidecar (claim_lattice mode only). Soft signal —
    # never persisted, never in cache_key. Surfaces when ≥50% of
    # verified claim-pointer pairs share one pointer_id AND there
    # are at least 3 verified pairs to compare; below 3 the ratio is
    # vacuous (1/1 always = 1.00 even when nothing is wrong).
    ratio = result.get("lazy_anchor_ratio")
    distribution = result.get("pointer_id_distribution") or {}
    total_pairs = sum(distribution.values()) if distribution else 0
    if (
        method == "claim_lattice"
        and isinstance(ratio, (int, float))
        and ratio >= 0.5
        and total_pairs >= 3
    ):
        # total_pairs >= 3 guarantees `distribution` is non-empty here.
        top_pid, top_count = max(distribution.items(), key=lambda kv: kv[1])
        lines.append(
            f"lazy-anchor smell: {top_count} of {total_pairs} verified "
            f"pairs cite [{top_pid}] (ratio {ratio:.2f}); "
            f"distinct pointers cited: {len(distribution)}"
        )
        lines.append("")
    pc = result.get("prompt_chars") or {}
    if pc:
        # Compact one-liner — operator at a glance: did STRICT come
        # from a tight prompt or a context-stuffed one?
        lines.append(
            f"capacity: prompt {pc.get('messages_total', 0):,} chars "
            f"(sys {pc.get('system_prompt', 0):,} + "
            f"reminder {pc.get('grounding_reminder', 0):,} + "
            f"evidence {pc.get('evidence_or_context', 0):,} + "
            f"question {pc.get('user_question', 0):,}) → "
            f"answer {result.get('answer_chars', 0):,} chars"
        )
    # Per-phase timings — surfaces where the per-call cost lands.
    # Hermes-bound queries should show `llm_ms` dominating; if
    # search_ms or persist_ms creeps up that's a retrieval / WAL
    # signal the operator wants visible. Cache-hit rows skip llm
    # entirely so the breakdown also tells you which path you're
    # paying for.
    timings = result.get("timings") or {}
    if timings:
        parts: list[str] = []
        order = (
            ("cache_lookup_ms", "cache"),
            ("search_ms", "search"),
            ("context_ms", "context"),
            ("llm_ms", "llm"),
            ("persist_ms", "persist"),
        )
        # Zero / missing / non-numeric phases are omitted.
        for key, label in order:
            v = timings.get(key)
            if isinstance(v, (int, float)) and v > 0:
                parts.append(f"{label} {v / 1000:.2f}s")
        total = timings.get("total_ms")
        if isinstance(total, (int, float)):
            # NOTE(review): the `**…**` markers print literally on a plain
            # terminal — confirm they are intended.
            parts.append(f"**total {total / 1000:.2f}s**")
        if parts:
            lines.append("timings: " + " · ".join(parts))
    # Reference-frame notes (Ticket #000002). When detect_frame
    # classified the query as `reference`, surface the named work
    # so an operator knows the substrate routed to a fictional
    # source. Skipped for literal / no-phrase-route / ambiguous
    # rows — the line only appears when there's something to say.
    fd = result.get("frame_detection") or {}
    if fd.get("kind") == "reference" and fd.get("reference_title"):
        lines.append("")
        lines.append(f"reference frame: {fd['reference_title']}")
        # NOTE(review): reference_uri only gates this fixed line; the URI
        # itself is never printed — confirm that is intended.
        if fd.get("reference_uri"):
            lines.append(f" cited as the named work in the answer")
    # Only the first 8 chars of the cache key are shown.
    cache_key = (result.get("cache_key") or "")[:8]
    lines.append(f"cache_key: {cache_key}… <run with --json for full record>")
    return "\n".join(lines)
def _strip_scheme(uri: str) -> str:
"""`https://en.wikipedia.org/wiki/X` -> `en.wikipedia.org/wiki/X`."""
for prefix in ("https://", "http://"):
if uri.startswith(prefix):
return uri[len(prefix):]
return uri
def _short_path(uri: str) -> str:
    """Return the final path segment of *uri* as a fallback display name.

    The scheme and any trailing slashes are removed first. A value with
    no ``/`` left is returned whole.
    """
    trimmed = _strip_scheme(uri).rstrip("/")
    # rpartition yields ("", "", trimmed) when no "/" exists, so the
    # tail element covers both cases.
    return trimmed.rpartition("/")[2]
def _cmd_inspect(args: argparse.Namespace) -> int:
    """Sidecar diagnostic: classify the unverified spans of one cache_key.

    Pulls the source chunks behind ``args.cache_key`` and diagnoses each
    unverified span via ``inspect_cache_key``. Read-only; no audit events,
    no providence_cache mutations.

    qa.db location: ``--qa-db`` if given, otherwise
    ``<global_shards_dir>/qa.db``, otherwise ``~/.arborist/qa.db``.
    Output is JSON with ``--json``, else the human renderer.

    Returns 0 when the inspect status is ``"ok"``, 1 otherwise.
    """
    from arborist.qa.inspect import inspect_cache_key

    shards_dir = Path(args.global_shards_dir) if args.global_shards_dir else None

    qa_db = args.qa_db
    if qa_db is None:
        if args.global_shards_dir:
            qa_db = Path(args.global_shards_dir) / "qa.db"
        else:
            qa_db = Path.home() / ".arborist" / "qa.db"

    # A shards directory takes precedence; --db is only consulted without one.
    if shards_dir is not None:
        single_db = None
    elif args.db:
        single_db = Path(args.db)
    else:
        single_db = None

    result = inspect_cache_key(
        args.cache_key,
        qa_db=Path(qa_db),
        shards_dir=shards_dir,
        single_db=single_db,
    )
    if args.json:
        rendered = json.dumps(result, indent=2, ensure_ascii=False)
    else:
        rendered = _render_inspect_human(result)
    print(rendered)
    return 0 if result.get("status") == "ok" else 1
def _render_inspect_human(result: dict) -> str:
"""Pretty-print an inspect result so an operator can scan
paraphrase vs invention vs trailing-artifact at a glance."""
if result.get("status") != "ok":
return f" {result.get('status', 'error')}: cache_key={result.get('cache_key', '?')}"
rec = result["record"]
ctx = result["context"]
sources = result["sources"]
diagnoses = result["unverified"]
lines: list[str] = []
lines.append(rec["question_text"])
lines.append(
f" {rec['audit_mode']} {rec['n_verified']}/{rec['n_quotes']} verified "
f"via {rec['verifier_method']} state={rec['falsification_state']}"
)
lines.append("")
lines.append(
f"context: {ctx['raw_chars']:,} raw -> {ctx['base_chars']:,} base "
f"(wikitext-strip {'on' if ctx['wikitext_strip_active'] else 'off'})"
)
if sources:
lines.append(f"sources ({len(sources)}):")
for i, s in enumerate(sources, start=1):
lines.append(
f" [{i}] {s.get('title') or '(untitled)'} — "
f"{s.get('chunk_count', '?')} chunks, "
f"{s.get('raw_chars', 0):,} chars"
)
lines.append("")
coh = result.get("coherence") or {}
if coh.get("kind") not in (None, "ok", "empty"):
ev = coh.get("evidence") or {}
if coh["kind"] == "phrase_component_reuse":
detail = f" (reuses {ev.get('reused_tokens')} from a quoted phrase)"
else: # circular | vacuous
detail = f" (subject={ev.get('subject_tokens')})"
lines.append(f" · incoherent: {coh['kind']}{detail}")
lines.append(f" sentence: {_short(ev.get('sentence', ''), 140)}")
lines.append("")
if not diagnoses:
lines.append("(no unverified spans)")
return "\n".join(lines)
lines.append(f"unverified diagnoses ({len(diagnoses)}):")
for i, d in enumerate(diagnoses, start=1):
span = d.get("span", "")
diag = d.get("diagnosis", "?")
lines.append("")
lines.append(f" [{i}] {diag}")
lines.append(f" span: {_short(span, 140)}")
if diag == "trailing_artifact":
lines.append(f" matched_prefix_chars: {d.get('matched_prefix_chars')}")
lines.append(f" trailing_artifact: {_short(d.get('trailing_artifact', ''), 100)}")
elif diag == "synthetic_elision_inside_quote":
lines.append(
f" [...] inserted by model — "
f"{d.get('prefix_chars', 0)} prefix chars "
f"({'in source' if d.get('prefix_in_source') else 'NOT in source'}), "
f"{d.get('suffix_chars', 0)} suffix chars "
f"({'in source' if d.get('suffix_in_source') else 'NOT in source'})"
)
elif diag == "interior_elision":
lines.append(
f" matched: {d.get('matched_prefix_chars')} prefix + "
f"{d.get('matched_suffix_chars')} suffix chars (parenthetical aside dropped)"
)
lines.append(f" dropped_aside: {_short(d.get('dropped_aside', ''), 120)}")
elif diag in ("paraphrase", "partial_paraphrase"):
lines.append(f" token_coverage: {d.get('token_coverage')}")
counts = d.get("token_counts", {})
if counts:
top = ", ".join(f"{k}×{v}" for k, v in list(counts.items())[:6])
lines.append(f" tokens_in_base: {top}")
missing = d.get("missing_tokens", [])
if missing:
lines.append(f" missing_tokens: {missing[:8]}")
elif diag == "no_overlap":
lines.append(f" tokens_checked: {d.get('tokens_checked', '?')}")
lines.append(f" tokens_present: {d.get('tokens_present', '?')}")
repair = d.get("repair")
if repair:
action = repair.get("action", "?")
reason = repair.get("reason", "")
line = f" repair: {action}"
if reason:
line += f" ({reason})"
lines.append(line)
return "\n".join(lines)
def _short(s: str, n: int) -> str:
"""Truncate string to n chars with ellipsis."""
return s if len(s) <= n else s[: n - 3] + "..."
def _cmd_losses(args: argparse.Namespace) -> int:
    """List or summarise ``adapter_loss_reports`` rows (sidecar diagnostic).

    Filters ``--document-root``, ``--chunk-id``, ``--kind`` (loss_kind)
    and ``--stage`` are ANDed together. ``--summary`` aggregates matching
    rows per (stage, loss_kind, loss_mode) with byte/occurrence totals.
    Otherwise up to ``--limit`` raw rows are listed. ``--json`` prints
    either view as a JSON array. See ticket #000022 §3.6.

    Returns 0 on every non-raising path. The connection is always closed.
    """
    # NOTE(review): intended as read-only, but the single-db path opens via
    # `connect` rather than `connect_readonly` — confirm `connect` never
    # creates or migrates the database.
    if args.global_shards_dir:
        conn = connect_query(args.db, shards_dir=args.global_shards_dir)
    else:
        conn = connect(args.db)
    try:
        # (column, value, active). chunk_id is tested against None so that
        # a chunk_id of 0 still filters; the others use truthiness.
        filters = (
            ("document_root", args.document_root, bool(args.document_root)),
            ("chunk_id", args.chunk_id, args.chunk_id is not None),
            ("loss_kind", args.kind, bool(args.kind)),
            ("stage", args.stage, bool(args.stage)),
        )
        active = [(column, value) for column, value, on in filters if on]
        params: list = [value for _column, value in active]
        clause = (
            " WHERE " + " AND ".join(f"{column} = ?" for column, _value in active)
            if active
            else ""
        )

        if args.summary:
            summary_sql = (
                "SELECT stage, loss_kind, loss_mode, "
                " SUM(bytes_dropped) AS total_bytes, "
                " SUM(occurrence_count) AS total_occ, "
                " COUNT(*) AS row_count "
                "FROM adapter_loss_reports"
                + clause +
                " GROUP BY stage, loss_kind, loss_mode "
                "ORDER BY stage, total_bytes DESC"
            )
            grouped = conn.execute(summary_sql, params).fetchall()
            if args.json:
                print(json.dumps([dict(g) for g in grouped], indent=2, ensure_ascii=False))
                return 0
            if not grouped:
                print("(no loss rows match)")
                return 0
            print(f"{'stage':<18} {'kind':<28} {'mode':<11} {'occ':>7} {'bytes':>10} {'rows':>6}")
            for g in grouped:
                print(
                    f"{g['stage']:<18} {g['loss_kind']:<28} "
                    f"{g['loss_mode']:<11} {g['total_occ']:>7,} "
                    f"{g['total_bytes']:>10,} {g['row_count']:>6,}"
                )
            return 0

        detail_sql = (
            "SELECT chunk_id, document_root, stage, canonicalization_version, "
            " loss_kind, loss_mode, bytes_dropped, occurrence_count, "
            " input_length_bytes, output_length_bytes, sample_excerpt, "
            " sample_hash, adapter_name, adapter_version, "
            " loss_report_policy_hash, created_at "
            "FROM adapter_loss_reports"
            + clause +
            " ORDER BY chunk_id, stage, loss_kind LIMIT ?"
        )
        params.append(args.limit)
        records = conn.execute(detail_sql, params).fetchall()
        if args.json:
            print(json.dumps([dict(rec) for rec in records], indent=2, ensure_ascii=False))
            return 0
        if not records:
            print("(no loss rows match)")
            return 0
        for rec in records:
            print(
                f"chunk={rec['chunk_id']} doc={rec['document_root'][:12]}.. "
                f"stage={rec['stage']} kind={rec['loss_kind']} "
                f"mode={rec['loss_mode']} occ={rec['occurrence_count']} "
                f"bytes={rec['bytes_dropped']}"
            )
            # Prefer the excerpt; fall back to the hash when no excerpt was kept.
            if rec["sample_excerpt"]:
                print(f" excerpt: {_short(rec['sample_excerpt'], 120)!r}")
            elif rec["sample_hash"]:
                print(f" sample_hash: {rec['sample_hash'][:16]}..")
        return 0
    finally:
        conn.close()
def _falsify_cache_key(
    cache_key_value: str,
    *,
    state: str,
    reason: str,
    by_actor: str,
    shards_dir: Path | None,
    db_path: Path | None,
) -> dict:
    """Mark one providence_cache record failed / stale / quarantined.

    Shards are scanned (every shard under ``shards_dir``, else just
    ``db_path``) until the one holding ``cache_key_value`` is found. On
    that shard, a single transaction does three things:
      - appends a 'falsify' audit event
      - stamps the row's falsification_state and audit_event_hash
      - appends a falsifications log entry

    Returns:
        ``{"status": "falsified", ...}`` with the state transition and
        audit hash; ``{"status": "invalid_state", ...}`` for an
        unsupported ``state``; or ``{"status": "not_found", ...}``.
    """
    import time as _time
    from arborist.store import append_audit, discover_shards, transaction

    accepted_states = ("failed", "stale", "quarantined")
    if state not in accepted_states:
        return {"status": "invalid_state", "value": state}

    shard_paths: list[Path] = (
        discover_shards(shards_dir) if shards_dir else [Path(db_path)]
    )
    for shard_path in shard_paths:
        conn = connect(shard_path)
        try:
            existing = conn.execute(
                "SELECT cache_key, falsification_state FROM providence_cache "
                "WHERE cache_key = ?",
                (cache_key_value,),
            ).fetchone()
            if existing is None:
                continue  # not on this shard; `finally` still closes conn
            previous_state = existing["falsification_state"]
            stamped_at = int(_time.time())
            # The audit hash must exist before the UPDATE/INSERT can record it.
            with transaction(conn):
                audit_hash = append_audit(
                    conn,
                    event_type="falsify",
                    subject_root=cache_key_value,
                    body={
                        "cache_key": cache_key_value,
                        "from_state": previous_state,
                        "to_state": state,
                        "reason": reason,
                        "by_actor": by_actor,
                    },
                    ts=stamped_at,
                )
                conn.execute(
                    "UPDATE providence_cache "
                    "SET falsification_state = ?, audit_event_hash = ? "
                    "WHERE cache_key = ?",
                    (state, audit_hash, cache_key_value),
                )
                conn.execute(
                    "INSERT INTO falsifications "
                    "(cache_key, state, reason, by_actor, at, audit_event_hash) "
                    "VALUES (?, ?, ?, ?, ?, ?)",
                    (cache_key_value, state, reason, by_actor, stamped_at, audit_hash),
                )
            return {
                "status": "falsified",
                "cache_key": cache_key_value,
                "shard": shard_path.name,
                "from_state": previous_state,
                "to_state": state,
                "reason": reason,
                "by_actor": by_actor,
                "audit_event_hash": audit_hash,
                "ts": stamped_at,
            }
        finally:
            conn.close()
    return {"status": "not_found", "cache_key": cache_key_value}
def _burn_cache_key(
    cache_key_value: str,
    *,
    reason: str,
    by_actor: str,
    shards_dir: Path | None,
    db_path: Path | None,
    force: bool = False,
) -> dict:
    """Delete a providence_cache leaf, but only if it has no children.

    "Kindergarten of a tree's genesis" — early/scratch use. Falsify keeps
    history; burn removes the row. Children today = falsifications rows
    referencing this cache_key. If any exist, the burn is refused unless
    ``force`` is set.

    On the shard holding the key, the DELETE and a 'providence_burn'
    audit event are written in one transaction, so the chain records
    that a leaf was removed and why. Use ``arborist providence
    --falsify`` instead when downstream consumers may have built on
    this answer.

    Returns:
        A dict whose ``status`` is ``"burned"``,
        ``"refused_has_children"`` or ``"not_found"``.
    """
    import time as _time
    from arborist.store import append_audit, discover_shards, transaction

    shard_paths: list[Path] = (
        discover_shards(shards_dir) if shards_dir else [Path(db_path)]
    )
    for shard_path in shard_paths:
        conn = connect(shard_path)
        try:
            leaf = conn.execute(
                "SELECT cache_key, audit_mode, n_verified, falsification_state, "
                " question_text FROM providence_cache WHERE cache_key = ?",
                (cache_key_value,),
            ).fetchone()
            if leaf is None:
                continue  # not on this shard; `finally` still closes conn
            n_children = conn.execute(
                "SELECT COUNT(*) FROM falsifications WHERE cache_key = ?",
                (cache_key_value,),
            ).fetchone()[0]
            has_children = n_children > 0
            if has_children and not force:
                return {
                    "status": "refused_has_children",
                    "cache_key": cache_key_value,
                    "shard": shard_path.name,
                    "child_falsifications": int(n_children),
                    "hint": "use --force to burn anyway, or 'providence --falsify' to keep history",
                }
            burned_at = int(_time.time())
            # Row removal and its audit event commit together or not at all.
            with transaction(conn):
                conn.execute(
                    "DELETE FROM providence_cache WHERE cache_key = ?",
                    (cache_key_value,),
                )
                audit_hash = append_audit(
                    conn,
                    event_type="providence_burn",
                    subject_root=cache_key_value,
                    body={
                        "cache_key": cache_key_value,
                        "burned_audit_mode": leaf["audit_mode"],
                        "burned_n_verified": int(leaf["n_verified"]),
                        "burned_state": leaf["falsification_state"],
                        "question_text": leaf["question_text"],
                        "reason": reason,
                        "by_actor": by_actor,
                        "child_falsifications_at_burn": int(n_children),
                        "forced": bool(has_children and force),
                    },
                    ts=burned_at,
                )
            return {
                "status": "burned",
                "cache_key": cache_key_value,
                "shard": shard_path.name,
                "burned_audit_mode": leaf["audit_mode"],
                "reason": reason,
                "by_actor": by_actor,
                "audit_event_hash": audit_hash,
                "ts": burned_at,
            }
        finally:
            conn.close()
    return {"status": "not_found", "cache_key": cache_key_value}
def _count_document_children(c, document_root: str) -> dict:
"""Count outbound child references that 'burn' must protect.
For a document/core leaf, "children" = anything downstream that built on
this row. Specifically:
- derivations rows where ``src_root = root`` (a core was distilled
from this — burning would orphan or silently cascade-truncate the
derivation, leaving the descendant core dangling).
- edges rows where ``dst_root = root`` (other documents link to this
one; burning leaves dangling references).
- providence_cache rows where ``source_root = root`` (Q&A grounded
in this document).
NOTE on schema: derivations has ON DELETE CASCADE on BOTH ``core_root``
AND ``src_root``. Without this gate, a bare DELETE FROM documents would
silently cascade-prune derivations and orphan downstream cores.
"""
derivations_downstream = c.execute(
"SELECT COUNT(*) FROM derivations WHERE src_root = ?",
(document_root,),
).fetchone()[0]
incoming_edges = c.execute(
"SELECT COUNT(*) FROM edges WHERE dst_root = ?",
(document_root,),
).fetchone()[0]
providence_refs = c.execute(
"SELECT COUNT(*) FROM providence_cache WHERE source_root = ?",
(document_root,),
).fetchone()[0]
return {
"derivations_downstream": int(derivations_downstream),
"incoming_edges": int(incoming_edges),
"providence_refs": int(providence_refs),
}
def _burn_document_root(
    document_root_value: str,
    *,
    reason: str,
    by_actor: str,
    shards_dir: Path | None,
    db_path: Path | None,
    force: bool = False,
) -> dict:
    """Delete a surface document leaf, but only if it has no children.

    Shards are scanned in ``discover_shards`` order (or just ``db_path``
    when no ``shards_dir`` is given). The first shard holding a
    ``kind='surface'`` row for ``document_root_value`` is the one acted
    on. One of ``shards_dir`` / ``db_path`` must be provided;
    ``Path(None)`` raises otherwise.

    Children (counted by ``_count_document_children``):
      - derivations.src_root = root (downstream cores derived from it)
      - edges.dst_root = root (other docs link to it)
      - providence_cache.source_root = root (Q&A grounded in it)

    On burn, inside one transaction and in this order:
      - DELETE FROM chunks_fts per chunk_id. FTS5 has no FK, so it is
        cleared before the chunks vanish.
      - DELETE FROM edges WHERE src_root = root (outbound edges; no FK).
      - DELETE FROM documents. This cascades to chunks, merkle_nodes and
        derivations via FK.
      - Append a 'document_burn' audit event recording counts + forced flag.

    With ``force``, incoming edges and providence_cache rows that reference
    the root are left in place (nothing here deletes them).

    Refuses with status='refused_has_children' (and skips the audit event)
    when any child count > 0 and ``--force`` is not set, so callers can fix
    state and retry idempotently.

    Returns:
        A dict whose ``status`` is ``"burned"``, ``"refused_has_children"``
        or ``"not_found"``.
    """
    import time as _time
    from arborist.store import append_audit, discover_shards, transaction
    paths: list[Path] = (
        discover_shards(shards_dir) if shards_dir else [Path(db_path)]
    )
    for sp in paths:
        c = connect(sp)
        try:
            row = c.execute(
                "SELECT document_root, document_uri, kind, title, source_type, "
                " chunking_version, canonicalization_version, schema_version "
                "FROM documents WHERE document_root = ? AND kind = 'surface'",
                (document_root_value,),
            ).fetchone()
            if row is None:
                # Not on this shard (or not a surface row); `finally` closes c.
                continue
            counts = _count_document_children(c, document_root_value)
            total_children = sum(counts.values())
            if total_children > 0 and not force:
                return {
                    "status": "refused_has_children",
                    "document_root": document_root_value,
                    "kind": "surface",
                    "shard": sp.name,
                    **counts,
                    "hint": "use --force to burn anyway (orphans descendants); "
                    "prefer evict for cold-tier compression",
                }
            # Counted before the delete so the audit event can record it.
            chunk_count = c.execute(
                "SELECT COUNT(*) FROM chunks WHERE document_root = ?",
                (document_root_value,),
            ).fetchone()[0]
            now = int(_time.time())
            with transaction(c):
                # FTS5 has no FK to chunks; clear by chunk_id before the
                # CASCADE on documents wipes the rows that resolve them.
                for cr in c.execute(
                    "SELECT chunk_id FROM chunks WHERE document_root = ?",
                    (document_root_value,),
                ).fetchall():
                    c.execute(
                        "DELETE FROM chunks_fts WHERE rowid = ?",
                        (cr["chunk_id"],),
                    )
                # Outbound edges (src_root = this) carry no FK; clean them
                # explicitly so we don't leave half-edges pointing from a
                # ghost. Incoming edges (dst_root = this) are already gated
                # above by the children check.
                c.execute(
                    "DELETE FROM edges WHERE src_root = ?",
                    (document_root_value,),
                )
                # documents -> chunks/merkle_nodes/derivations cascade via FK.
                c.execute(
                    "DELETE FROM documents WHERE document_root = ?",
                    (document_root_value,),
                )
                # Audit event is written in the same transaction as the deletes.
                event_hash = append_audit(
                    c,
                    event_type="document_burn",
                    subject_root=document_root_value,
                    body={
                        "document_root": document_root_value,
                        "document_uri": row["document_uri"],
                        "kind": "surface",
                        "title": row["title"],
                        "source_type": row["source_type"],
                        "burned_chunk_count": int(chunk_count),
                        "child_counts_at_burn": counts,
                        "reason": reason,
                        "by_actor": by_actor,
                        "forced": bool(total_children > 0 and force),
                    },
                    ts=now,
                )
            return {
                "status": "burned",
                "document_root": document_root_value,
                "kind": "surface",
                "shard": sp.name,
                "burned_chunk_count": int(chunk_count),
                "child_counts_at_burn": counts,
                "reason": reason,
                "by_actor": by_actor,
                "audit_event_hash": event_hash,
                "ts": now,
            }
        finally:
            c.close()
    return {"status": "not_found", "document_root": document_root_value, "kind": "surface"}
def _burn_core_root(
    document_root_value: str,
    *,
    reason: str,
    by_actor: str,
    shards_dir: Path | None,
    db_path: Path | None,
    force: bool = False,
) -> dict:
    """Delete a core document leaf, but only if it has no children.

    Same children gates as ``_burn_document_root`` (derivations.src_root,
    edges.dst_root, providence_cache.source_root). The "PLUS no further
    derivations build cores from this core" rule from the spec is
    structurally identical to derivations.src_root > 0 — a core acts as a
    src_root only when something deeper distilled from it.

    CLAUDE.md says "Cores never evict" — that's the eviction subsystem,
    which only touches kind='surface'. Burn is operator-driven removal:
    cores CAN be burned, but the children gate is enforced.

    Shards are scanned in ``discover_shards`` order (or just ``db_path``).
    The first shard holding a ``kind='core'`` row for the root is acted
    on. The delete sequence inside the transaction mirrors the surface
    burn:
      - chunks_fts rows
      - outbound edges
      - the documents row (FK cascade)
      - a 'core_burn' audit event
    The audit event also records how many inbound derivations (this
    core's own provenance, ``core_root = root``) were removed.

    Audit event type is 'core_burn' so chain consumers can distinguish
    surface vs core leaf removals at a glance.

    Returns:
        A dict whose ``status`` is ``"burned"``, ``"refused_has_children"``
        or ``"not_found"``.
    """
    import time as _time
    from arborist.store import append_audit, discover_shards, transaction
    paths: list[Path] = (
        discover_shards(shards_dir) if shards_dir else [Path(db_path)]
    )
    for sp in paths:
        c = connect(sp)
        try:
            row = c.execute(
                "SELECT document_root, document_uri, kind, title, source_type, "
                " compression_depth, chunking_version, "
                " canonicalization_version, schema_version "
                "FROM documents WHERE document_root = ? AND kind = 'core'",
                (document_root_value,),
            ).fetchone()
            if row is None:
                # Not on this shard (or not a core row); `finally` closes c.
                continue
            counts = _count_document_children(c, document_root_value)
            total_children = sum(counts.values())
            if total_children > 0 and not force:
                return {
                    "status": "refused_has_children",
                    "document_root": document_root_value,
                    "kind": "core",
                    "shard": sp.name,
                    **counts,
                    "hint": "use --force to burn anyway; cores carry "
                    "downstream derivations that will be orphaned",
                }
            chunk_count = c.execute(
                "SELECT COUNT(*) FROM chunks WHERE document_root = ?",
                (document_root_value,),
            ).fetchone()[0]
            # Inbound derivations (where this core is core_root, i.e. its
            # binding back to source surfaces). These are NOT children —
            # they're the core's own provenance and cascade-delete with it.
            inbound_derivations = c.execute(
                "SELECT COUNT(*) FROM derivations WHERE core_root = ?",
                (document_root_value,),
            ).fetchone()[0]
            now = int(_time.time())
            with transaction(c):
                # FTS5 has no FK to chunks; clear by chunk_id while the
                # chunk rows still exist (before the documents DELETE).
                for cr in c.execute(
                    "SELECT chunk_id FROM chunks WHERE document_root = ?",
                    (document_root_value,),
                ).fetchall():
                    c.execute(
                        "DELETE FROM chunks_fts WHERE rowid = ?",
                        (cr["chunk_id"],),
                    )
                # Outbound edges carry no FK; remove them explicitly.
                c.execute(
                    "DELETE FROM edges WHERE src_root = ?",
                    (document_root_value,),
                )
                # FK cascade from documents removes dependent rows,
                # including the inbound derivations counted above.
                c.execute(
                    "DELETE FROM documents WHERE document_root = ?",
                    (document_root_value,),
                )
                event_hash = append_audit(
                    c,
                    event_type="core_burn",
                    subject_root=document_root_value,
                    body={
                        "document_root": document_root_value,
                        "document_uri": row["document_uri"],
                        "kind": "core",
                        "title": row["title"],
                        "source_type": row["source_type"],
                        "compression_depth": int(row["compression_depth"]),
                        "burned_chunk_count": int(chunk_count),
                        "burned_inbound_derivations": int(inbound_derivations),
                        "child_counts_at_burn": counts,
                        "reason": reason,
                        "by_actor": by_actor,
                        "forced": bool(total_children > 0 and force),
                    },
                    ts=now,
                )
            return {
                "status": "burned",
                "document_root": document_root_value,
                "kind": "core",
                "shard": sp.name,
                "burned_chunk_count": int(chunk_count),
                "burned_inbound_derivations": int(inbound_derivations),
                "child_counts_at_burn": counts,
                "reason": reason,
                "by_actor": by_actor,
                "audit_event_hash": event_hash,
                "ts": now,
            }
        finally:
            c.close()
    return {"status": "not_found", "document_root": document_root_value, "kind": "core"}
def _cmd_burn_kindergarten(args: argparse.Namespace) -> int:
    """Mass-burn live providence_cache rows created inside the kindergarten window.

    Test-ergonomic helper: after iterating on retrieval/verifier knobs,
    wipe the recent runs without hunting down each cache_key. The window
    mirrors the one `mesh sync` uses, so rows that are still "private"
    (un-broadcast) are exactly the rows safe to bust without confusing
    peers.

    A ``kindergarten_seconds`` of zero or less disables the time gate and
    targets every live row.

    Every candidate is routed through ``_burn_cache_key`` so the children
    gate is still honored (``--force`` overrides it for the whole batch).
    Each successful burn writes one ``providence_burn`` audit event; chain
    integrity can be checked afterwards with `make chain-check-shards`.

    With ``--dry-run`` candidates are only listed, never burned. Prints a
    JSON summary (``items`` truncated to ``args.verbose``) and returns 0.
    """
    import time as _time
    from arborist.store import discover_shards

    now = int(_time.time())
    # A window <= 0 means "no time gate": match every live row. Treating
    # 0 literally (`cutoff = now - 0`) made a same-second window, which
    # made test_burn_kindergarten_zero_seconds_burns_everything flaky
    # whenever the wall-clock second rolled over between seed and burn.
    window = max(0, args.kindergarten_seconds)
    ungated = window == 0
    cutoff = 0 if ungated else now - window

    shards_dir = Path(args.global_shards_dir) if args.global_shards_dir else None
    single_db = Path(args.db) if args.db else None
    targets: list[Path] = discover_shards(shards_dir) if shards_dir else [single_db]

    actor = args.by_actor or os.environ.get("USER", "unknown")
    reason = args.reason or f"burn-kindergarten window={args.kindergarten_seconds}s"

    n_examined = n_burned = n_refused = n_missing = 0
    items: list[dict] = []

    for shard_path in targets:
        # Snapshot the candidate list first and close the shard before
        # burning; each burn opens its own connection.
        conn = connect(shard_path)
        try:
            if ungated:
                candidates = conn.execute(
                    "SELECT cache_key, created_at, audit_mode "
                    "FROM providence_cache "
                    "WHERE falsification_state = 'live' "
                    "ORDER BY created_at DESC"
                ).fetchall()
            else:
                candidates = conn.execute(
                    "SELECT cache_key, created_at, audit_mode "
                    "FROM providence_cache "
                    "WHERE created_at >= ? AND falsification_state = 'live' "
                    "ORDER BY created_at DESC",
                    (cutoff,),
                ).fetchall()
        finally:
            conn.close()

        for cand in candidates:
            n_examined += 1
            if args.dry_run:
                items.append(
                    {
                        "cache_key": cand["cache_key"],
                        "audit_mode": cand["audit_mode"],
                        "created_at": cand["created_at"],
                        "would_burn": True,
                    }
                )
                continue
            outcome = _burn_cache_key(
                cand["cache_key"],
                reason=reason,
                by_actor=actor,
                shards_dir=shards_dir,
                db_path=single_db,
                force=bool(args.force),
            )
            status = outcome.get("status")
            if status == "burned":
                n_burned += 1
            elif status == "refused_has_children":
                n_refused += 1
            else:
                n_missing += 1
            items.append(outcome)

    report = {
        "status": "dry_run" if args.dry_run else "burned",
        "kindergarten_seconds": args.kindergarten_seconds,
        "cutoff_at": cutoff,
        "now": now,
        "examined": n_examined,
        "burned": n_burned,
        "refused_has_children": n_refused,
        "not_found": n_missing,
        "items": items[: args.verbose],
    }
    print(json.dumps(report, indent=2, ensure_ascii=False))
    return 0
def _cmd_burn(args: argparse.Namespace) -> int:
    """CLI: burn a single leaf that has no children.

    ``--kind`` selects the target type (default ``providence``, which
    keeps the original ``--cache-key``-only surface so existing scripts
    keep working); ``document`` and ``core`` take ``--root`` instead.

    Returns 0 when the helper reports ``status == "burned"``, 1 for any
    other helper status, and 2 for usage errors (unknown kind or missing
    target flag).
    """
    kind = getattr(args, "kind", "providence") or "providence"
    burn_kwargs = {
        "reason": args.reason or "",
        "by_actor": args.by_actor or os.environ.get("USER", "unknown"),
        "shards_dir": Path(args.global_shards_dir) if args.global_shards_dir else None,
        "db_path": Path(args.db) if args.db else None,
        "force": bool(args.force),
    }

    # Which flag names the target for each kind.
    if kind == "providence":
        flag, target = "--cache-key", getattr(args, "cache_key", None)
    elif kind in ("document", "core"):
        flag, target = "--root", getattr(args, "root", None)
    else:
        print(f"unknown burn kind: {kind}", file=sys.stderr)
        return 2

    if not target:
        print(f"burn --kind {kind} requires {flag}", file=sys.stderr)
        return 2

    # Resolve the helper only after validation so usage errors never
    # depend on the burn helpers.
    if kind == "providence":
        burner = _burn_cache_key
    elif kind == "document":
        burner = _burn_document_root
    else:
        burner = _burn_core_root

    outcome = burner(target, **burn_kwargs)
    print(json.dumps(outcome, indent=2, ensure_ascii=False))
    return 0 if outcome.get("status") == "burned" else 1
def _cmd_providence(args: argparse.Namespace) -> int:
    """List providence_cache records, falsify one, or show its preflight payload.

    Modes, checked in order:
      * ``args.falsify`` — mark that cache_key with ``args.state`` via
        ``_falsify_cache_key``; exit 0 only when the helper reports
        ``status == "falsified"``, else 1.
      * ``args.show_preflight`` — delegate to
        ``_cmd_providence_show_preflight`` (Ticket #000009 §7.2).
      * otherwise list rows newest-first, filtered by ``args.document_uri``
        or ``args.source_root`` (unbounded), or the ``args.limit`` most
        recent rows when neither filter is given; exit 0.
    """
    if getattr(args, "falsify", None):
        outcome = _falsify_cache_key(
            args.falsify,
            state=args.state,
            reason=args.reason or "",
            by_actor=args.by_actor or os.environ.get("USER", "unknown"),
            shards_dir=Path(args.global_shards_dir) if args.global_shards_dir else None,
            db_path=Path(args.db) if args.db else None,
        )
        print(json.dumps(outcome, indent=2, ensure_ascii=False))
        return 0 if outcome.get("status") == "falsified" else 1

    if getattr(args, "show_preflight", None):
        # Operator tool: inspect the policy state that governed a cached row.
        return _cmd_providence_show_preflight(
            cache_key_prefix=args.show_preflight,
            shards_dir=args.global_shards_dir,
            db=args.db,
        )

    if args.global_shards_dir:
        conn = connect_query(args.db, shards_dir=args.global_shards_dir)
    else:
        conn = connect(args.db)

    select = (
        "SELECT cache_key, question_text, answer_text, falsification_state, "
        " hit_count, created_at FROM providence_cache "
    )
    try:
        if args.document_uri:
            tail = "WHERE document_uri = ? ORDER BY created_at DESC"
            params = (args.document_uri,)
        elif args.source_root:
            tail = "WHERE source_root = ? ORDER BY created_at DESC"
            params = (args.source_root,)
        else:
            tail = "ORDER BY created_at DESC LIMIT ?"
            params = (args.limit,)
        records = conn.execute(select + tail, params).fetchall()
    finally:
        conn.close()

    listing = []
    for rec in records:
        listing.append(
            {
                "cache_key": rec["cache_key"],
                "state": rec["falsification_state"],
                "hit_count": rec["hit_count"],
                "question": rec["question_text"],
                "answer": rec["answer_text"],
                "created_at": rec["created_at"],
            }
        )
    print(json.dumps(listing, indent=2, ensure_ascii=False))
    return 0
def _cmd_providence_show_preflight(
    *,
    cache_key_prefix: str,
    shards_dir: str | None,
    db: str | None,
) -> int:
    """Render the ``preflight`` stage payload for a cached row.

    Ticket #000009 §7.2. Pulls ``run_dag_blob`` for the matching cache
    row, parses the JSON, finds the ``preflight`` node, and pretty-prints
    the persisted ``preflight_payload`` (per the ticket, the five nested
    CTI clauses — classifier, answer_contract, prompt_contract,
    evidence_contract, policy_refs — plus the metacognition
    QuestionState). A hash check compares the payload against the stored
    preflight leaf.

    Matching is a literal prefix match on ``cache_key``. ``%`` and ``_``
    in the prefix are escaped, so they are not LIKE wildcards. The 12-char
    prefix that bench rows and `_render_query_human` print is enough. When
    several rows match (at most 5 are fetched), the most recent is
    rendered and a hint goes to stderr.

    Returns 0 on success. Returns 1 on a miss, a missing/legacy blob, a
    blob that is not a JSON object, or a DAG without a preflight stage.
    """
    # Escape LIKE metacharacters so the operator's prefix matches literally.
    like_pattern = (
        cache_key_prefix.replace("\\", "\\\\")
        .replace("%", "\\%")
        .replace("_", "\\_")
        + "%"
    )
    conn = (
        connect_query(db, shards_dir=shards_dir)
        if shards_dir
        else connect(db)
    )
    try:
        rows = conn.execute(
            "SELECT cache_key, question_text, run_dag_blob "
            "FROM providence_cache WHERE cache_key LIKE ? ESCAPE '\\' "
            "ORDER BY created_at DESC LIMIT 5",
            (like_pattern,),
        ).fetchall()
    finally:
        conn.close()
    if not rows:
        print(
            f" no providence_cache row matching cache_key prefix "
            f"'{cache_key_prefix}'", file=sys.stderr,
        )
        return 1
    if len(rows) > 1:
        print(
            f" {len(rows)} rows match prefix '{cache_key_prefix}'; "
            "rendering most recent. Pass a longer prefix to disambiguate.",
            file=sys.stderr,
        )
    row = rows[0]
    short_key = row["cache_key"][:12]
    blob = row["run_dag_blob"]
    if not blob:
        print(
            f" cache_key {short_key}: no run_dag_blob "
            "(legacy row, predates #000009)",
            file=sys.stderr,
        )
        return 1
    try:
        parsed = json.loads(blob)
    except json.JSONDecodeError as exc:
        print(f" run_dag_blob parse error: {exc}", file=sys.stderr)
        return 1
    if not isinstance(parsed, dict):
        # A valid JSON scalar/array would otherwise crash on .get().
        print(
            f" run_dag_blob parse error: expected a JSON object, "
            f"got {type(parsed).__name__}",
            file=sys.stderr,
        )
        return 1
    nodes = parsed.get("nodes") or []
    if not isinstance(nodes, list):
        nodes = []
    preflight_node = next(
        (n for n in nodes if isinstance(n, dict)
         and n.get("stage") == "preflight"),
        None,
    )
    if preflight_node is None:
        print(
            f" cache_key {short_key}: run_dag has no "
            "preflight stage (predates #000009 binding)",
            file=sys.stderr,
        )
        return 1
    # The stored hash may be present but null; normalize once so every
    # slice below is safe.
    stored_hash = preflight_node.get("hash")
    stored_12 = (stored_hash or "")[:12]
    # Pull the full preflight payload (Ticket #000009 §7.2 — payload is
    # persisted alongside nodes via build_run_dag's preflight_payload
    # kwarg). Fall back to a hash-only render for legacy rows whose blob
    # predates the payload-storage commit.
    payload = parsed.get("preflight_payload")
    out: dict = {
        "cache_key": short_key,
        "question": row["question_text"],
        "preflight_stage_hash": stored_hash,
        "preflight_hash_12": stored_12,
        "run_dag_root": parsed.get("root"),
        "run_dag_stages": [
            n.get("stage") if isinstance(n, dict) else None for n in nodes
        ],
    }
    if payload is not None:
        # Verify the persisted payload hashes to the persisted leaf. A
        # mismatch indicates post-write tampering or serialization drift;
        # surface it explicitly so an auditor can detect it.
        from arborist.qa.dag import _canonical_json, _sha256_hex
        recomputed = _sha256_hex(_canonical_json(payload))
        out["preflight_payload"] = payload
        out["payload_hash_check"] = (
            "ok" if recomputed == stored_hash
            else f"MISMATCH (recomputed {recomputed[:12]} != stored {stored_12})"
        )
    else:
        out["preflight_payload"] = None
        out["payload_hash_check"] = (
            "unavailable: legacy row predates preflight_payload "
            "persistence (Ticket #000009 §7.2)"
        )
    print(json.dumps(out, indent=2, ensure_ascii=False))
    return 0
def _cmd_controller_events(args: argparse.Namespace) -> int:
    """List ``controller_events`` rows from one shard or every shard.

    Read-only inspector for the Phase 2 advisory writes
    (``arborist.qa.runner._emit_qa_controller_advisory``). Surfaces
    decision / difficulty / budget_allocation rows so operators can
    measure the Retrigger 1 signal (#000045 §3) without raw SQL.

    Sources: every ``*.db`` under ``args.global_shards_dir`` (sorted by
    name), else ``args.db``, else ``DEFAULT_DB_PATH``. Each database is
    opened read-only. Files that cannot be opened, are not SQLite
    databases, or lack a usable ``controller_events`` table are skipped.

    Filters (``args.kind``, ``args.organism_prefix``,
    ``args.since_seconds``) are applied in SQL. Up to ``args.limit`` rows
    are read from each shard. They are then merged newest-first by
    ``recorded_at`` and truncated to ``args.limit``, so the result is the
    most recent rows across *all* shards rather than whichever shard
    sorts first. The per-kind summary counts only the rows returned.

    Prints JSON (``args.json``) or a text table. Returns 0.
    """
    import sqlite3 as _sqlite3
    import time as _time
    from contextlib import closing

    shards_dir = args.global_shards_dir
    db_path = args.db
    if shards_dir:
        shard_paths = sorted(Path(shards_dir).glob("*.db"))
    elif db_path:
        shard_paths = [Path(db_path)]
    else:
        shard_paths = [Path(DEFAULT_DB_PATH)]
    where: list[str] = []
    params: list = []
    if args.kind:
        where.append("event_kind = ?")
        params.append(args.kind)
    if args.organism_prefix:
        where.append("organism_root LIKE ?")
        params.append(args.organism_prefix + "%")
    if args.since_seconds is not None:
        where.append("recorded_at >= ?")
        params.append(int(_time.time()) - args.since_seconds)
    where_sql = (" WHERE " + " AND ".join(where)) if where else ""
    sql = (
        "SELECT event_id, organism_root, branch_id, event_kind, label,"
        " entropy, difficulty, allocation, body_blob, recorded_at"
        " FROM controller_events"
        + where_sql
        + " ORDER BY recorded_at DESC, event_id DESC LIMIT ?"
    )

    limit = args.limit
    collected: list[dict] = []
    # A non-positive limit returns nothing; never hand it to SQL, where
    # LIMIT -1 would mean "unlimited".
    for sp in shard_paths if limit > 0 else []:
        if sp.suffix != ".db" or "-shm" in sp.name or "-wal" in sp.name:
            continue
        try:
            # as_uri() percent-encodes the path, so '?', '#' and '%' in a
            # filename cannot corrupt the SQLite URI.
            conn = _sqlite3.connect(sp.resolve().as_uri() + "?mode=ro", uri=True)
        except _sqlite3.DatabaseError:
            continue
        with closing(conn):
            conn.row_factory = _sqlite3.Row
            try:
                has_table = conn.execute(
                    "SELECT name FROM sqlite_master "
                    "WHERE type='table' AND name='controller_events'"
                ).fetchone()
                if not has_table:
                    continue
                rows = conn.execute(sql, [*params, limit]).fetchall()
            except _sqlite3.DatabaseError:
                # Not a SQLite file, or a table without the expected
                # columns: skip the shard rather than abort the listing.
                continue
        for r in rows:
            recorded_at = r["recorded_at"]
            entry = {
                "shard": sp.name,
                "event_id": r["event_id"],
                "kind": r["event_kind"],
                "organism_root": r["organism_root"],
                "branch_id": r["branch_id"],
                "label": r["label"],
                "entropy": r["entropy"],
                "difficulty": r["difficulty"],
                "allocation": r["allocation"],
                "recorded_at": recorded_at,
                # gmtime(None) would silently render "now"; keep NULL as NULL.
                "recorded_at_iso": (
                    _time.strftime("%Y-%m-%dT%H:%M:%SZ", _time.gmtime(recorded_at))
                    if recorded_at is not None
                    else None
                ),
            }
            if args.body:
                try:
                    entry["body"] = json.loads(r["body_blob"])
                except (TypeError, ValueError):
                    entry["body"] = r["body_blob"]
            collected.append(entry)

    # Global newest-first merge. The sort is stable (also with
    # reverse=True), so ties keep shard order and each shard's own
    # event_id DESC order.
    collected.sort(
        key=lambda e: e["recorded_at"] if e["recorded_at"] is not None else float("-inf"),
        reverse=True,
    )
    out = collected[:limit]
    summary: dict[str, int] = {}
    for e in out:
        summary[e["kind"]] = summary.get(e["kind"], 0) + 1

    if args.json:
        print(json.dumps(
            {"summary": summary, "rows": out},
            indent=2, ensure_ascii=False,
        ))
        return 0
    if not out:
        print("(no controller_events rows matched)")
        return 0
    print(f"# controller_events — {len(out)} row(s) across "
          f"{len({e['shard'] for e in out})} shard(s)")
    for kind, n in sorted(summary.items(), key=lambda kv: -kv[1]):
        print(f"# {kind}: {n}")
    print()
    print(
        f"{'shard':<10} {'kind':<29} {'label':<10} "
        f"{'diff':>6} {'alloc':>6} {'recorded_at_iso'} organism"
    )
    for e in out:
        org = (e["organism_root"] or "")[:48]
        diff = "-" if e["difficulty"] is None else f"{e['difficulty']:.2f}"
        alloc = "-" if e["allocation"] is None else f"{e['allocation']:.2f}"
        print(
            f"{e['shard']:<10} {e['kind']:<29} "
            f"{(e['label'] or '-'):<10} {diff:>6} {alloc:>6} "
            f"{e['recorded_at_iso'] or '-'} {org}"
        )
    return 0
def _load_record_context(row, shards_dir, qa_db):
    """Reassemble the source context a providence record was answered from.

    ``row["merkle_proof"]`` is parsed as JSON and its ``sources`` list is
    read. Each source names a ``shard`` file and a ``document_root``. Each
    document's text is loaded and the texts are joined with blank lines.
    Shard files are resolved under ``shards_dir`` when given, otherwise
    next to ``qa_db``.

    Returns None ("cannot faithfully reconstruct; skip this record") when
    any of these hold:
      * the proof is NULL, not valid JSON, or not an object;
      * the proof lists no sources;
      * a source lacks a shard name or document root;
      * a shard file is missing;
      * any document has no loadable (hot) text, i.e. it is cold.
    """
    try:
        proof = json.loads(row["merkle_proof"] or "null")
    except (TypeError, ValueError):
        return None
    if not isinstance(proof, dict):
        return None
    sources = proof.get("sources") or []
    if not sources:
        return None
    base_dir = shards_dir if shards_dir else qa_db.parent
    # Validate and resolve every source before reading any text, so a
    # single bad entry costs no document loads.
    targets: list[tuple] = []
    for src in sources:
        if not isinstance(src, dict):
            return None
        shard_name = src.get("shard")
        document_root = src.get("document_root")
        if not shard_name or not document_root:
            return None
        shard_path = base_dir / shard_name
        if not shard_path.exists():
            return None
        targets.append((shard_path, document_root))
    from arborist.qa.query import _load_doc_text
    parts: list[str] = []
    for shard_path, document_root in targets:
        text = _load_doc_text(str(shard_path), document_root)
        if not text:
            return None
        parts.append(text)
    return "\n\n".join(parts)
def _cmd_reclassify(args: argparse.Namespace) -> int:
    """Re-run the layered verifier against existing live providence records.

    Reads each record's answer, reassembles its context from
    merkle_proof.sources, runs verify_quotes(), and updates the row only
    if the verdict differs from what's stored. No LLM calls — this just
    relabels existing answers under the current verifier.

    Cold-source records (where any source doc has no hot chunks), and
    records whose context cannot be reassembled at all, are skipped: we
    can't faithfully reclassify without the original context. Run
    `arborist rehydrate` first if you want cold records covered too.

    `--compare` runs every policy in ENTITY_POLICIES side-by-side without
    writing. Use it to see what each policy would produce on real data
    before committing to one. `--entity-policy X` writes under a single
    policy (default: DEFAULT_ENTITY_POLICY).

    Each changed record gets one 'providence_reclassify' audit event
    with old & new state for chain-of-custody. The event is written in the
    same transaction as the row update, which also stores the event hash.

    Rows are read from ``args.qa_db``. If unset, it defaults to
    ``<global_shards_dir>/qa.db`` in sharded mode, else
    ``~/.arborist/qa.db``. ``args.limit`` caps how many of the newest live
    rows are examined.

    Returns 0 on success and 2 for an unknown ``--entity-policy``. Prints
    a JSON summary (or the comparison grid).
    """
    import time
    from collections import defaultdict
    from arborist.qa.verify import (
        DEFAULT_ENTITY_POLICY,
        ENTITY_POLICIES,
        verify_quotes,
    )
    qa_db = args.qa_db
    if qa_db is None:
        if args.global_shards_dir:
            qa_db = Path(args.global_shards_dir) / "qa.db"
        else:
            qa_db = Path.home() / ".arborist" / "qa.db"
    qa_db = Path(qa_db)
    # Source shards are resolved relative to this dir (or qa_db's parent
    # when None) by _load_record_context.
    shards_dir = (
        Path(args.global_shards_dir) if args.global_shards_dir else None
    )
    conn = connect(qa_db)
    try:
        sql = (
            "SELECT cache_key, answer_text, merkle_proof, audit_mode, "
            " verifier_method, n_quotes, n_verified, unverified_quotes, "
            " question_text "
            "FROM providence_cache "
            "WHERE falsification_state = 'live' "
            "ORDER BY created_at DESC"
        )
        if args.limit:
            # int() coercion keeps the interpolated LIMIT injection-safe.
            sql += f" LIMIT {int(args.limit)}"
        rows = conn.execute(sql).fetchall()
        if args.compare:
            # Run all policies side-by-side, no DB write. Output is a
            # per-record grid + a per-policy distribution summary so fox can
            # eyeball where the policies disagree.
            grid = []
            # policy -> "AUDIT_MODE/verifier_method" label -> record count
            distribution: dict[str, dict[str, int]] = {
                p: defaultdict(int) for p in ENTITY_POLICIES
            }
            skipped_cold = 0
            for row in rows:
                context = _load_record_context(row, shards_dir, qa_db)
                if context is None:
                    skipped_cold += 1
                    continue
                per_policy = {}
                for p in ENTITY_POLICIES:
                    v = verify_quotes(row["answer_text"], context, entity_policy=p)
                    label = f"{v['audit_mode']}/{v['verifier_method']}"
                    per_policy[p] = label
                    distribution[p][label] += 1
                # Truncated key/question keep the grid readable; each policy
                # name becomes its own column.
                grid.append({
                    "cache_key": row["cache_key"][:16] + "…",
                    "question": row["question_text"][:55],
                    **per_policy,
                })
            print(json.dumps({
                "examined": len(grid),
                "skipped_cold": skipped_cold,
                "distribution": {p: dict(d) for p, d in distribution.items()},
                "records": grid,
            }, indent=2, ensure_ascii=False))
            return 0
        # Single-policy reclassify. Default tracks DEFAULT_ENTITY_POLICY
        # so the CLI always matches the verifier's current contract.
        policy_name = args.entity_policy or DEFAULT_ENTITY_POLICY
        if policy_name not in ENTITY_POLICIES:
            print(
                f"--entity-policy must be one of {ENTITY_POLICIES}, "
                f"got {policy_name!r}",
                file=sys.stderr,
            )
            return 2
        summary = {
            "examined": 0,
            "changed": 0,
            "skipped_cold": 0,
            "unchanged": 0,
            "entity_policy": policy_name,
            # "old_mode/old_method -> new_mode/new_method" -> count
            "transitions": defaultdict(int),
        }
        for row in rows:
            summary["examined"] += 1
            context = _load_record_context(row, shards_dir, qa_db)
            if context is None:
                summary["skipped_cold"] += 1
                continue
            verdict = verify_quotes(
                row["answer_text"], context, entity_policy=policy_name
            )
            # Normalize both sides so NULL-in-DB and "no unverified quotes"
            # compare equal as the JSON literal "null". The new value is
            # compact JSON; NOTE(review): presumably rows were written with
            # the same separators — otherwise formatting differences alone
            # would register as a change.
            old_unverified = row["unverified_quotes"] or "null"
            new_unverified_blob = (
                json.dumps(verdict["unverified_quotes"], separators=(",", ":"))
                if verdict["unverified_quotes"]
                else None
            )
            new_unverified_for_compare = new_unverified_blob or "null"
            unchanged = (
                verdict["audit_mode"] == row["audit_mode"]
                and verdict["verifier_method"] == row["verifier_method"]
                and verdict["n_quotes"] == row["n_quotes"]
                and verdict["n_verified"] == row["n_verified"]
                and old_unverified == new_unverified_for_compare
            )
            if unchanged:
                summary["unchanged"] += 1
                continue
            summary["changed"] += 1
            transition = (
                f"{row['audit_mode']}/{row['verifier_method']} "
                f"-> {verdict['audit_mode']}/{verdict['verifier_method']}"
            )
            summary["transitions"][transition] += 1
            # Dry run still tallies changes/transitions but writes nothing.
            if args.dry_run:
                continue
            now = int(time.time())
            # Audit event first, then the row update referencing its hash,
            # in one transaction so the chain and the row cannot diverge.
            with transaction(conn):
                event_hash = append_audit(
                    conn,
                    event_type="providence_reclassify",
                    subject_root=row["cache_key"],
                    body={
                        "old_audit_mode": row["audit_mode"],
                        "new_audit_mode": verdict["audit_mode"],
                        "old_method": row["verifier_method"],
                        "new_method": verdict["verifier_method"],
                        "old_n_verified": row["n_verified"],
                        "new_n_verified": verdict["n_verified"],
                        "entity_policy": policy_name,
                    },
                    ts=now,
                )
                conn.execute(
                    "UPDATE providence_cache SET "
                    " audit_mode = ?, n_quotes = ?, n_verified = ?, "
                    " unverified_quotes = ?, verifier_method = ?, "
                    " audit_event_hash = ? "
                    "WHERE cache_key = ?",
                    (
                        verdict["audit_mode"],
                        verdict["n_quotes"],
                        verdict["n_verified"],
                        new_unverified_blob,
                        verdict["verifier_method"],
                        event_hash,
                        row["cache_key"],
                    ),
                )
    finally:
        conn.close()
    # defaultdict -> plain dict for JSON output.
    summary["transitions"] = dict(summary["transitions"])
    summary["dry_run"] = bool(args.dry_run)
    print(json.dumps(summary, indent=2, ensure_ascii=False))
    return 0
def _cmd_emergent(args: argparse.Namespace) -> int:
    """Surface emergent claims from UNGROUNDED/HYBRID providence records.

    Emergent claims are spans the model produced that don't appear
    verbatim in the corpus — candidate ingest targets. A quote that keeps
    showing up unverified signals knowledge the model has from training
    that the corpus is missing.

    Only live records are considered.
      * ``args.aggregate``: tally every stored unverified quote and print
        the ``args.limit`` most frequent as ``{"quote", "count"}``
        (ties keep first-seen order).
      * otherwise: list the ``args.limit`` most recent such records.
    Returns 0.
    """
    if args.global_shards_dir:
        conn = connect_query(args.db, shards_dir=args.global_shards_dir)
    else:
        conn = connect(args.db)
    try:
        if not args.aggregate:
            recent = conn.execute(
                "SELECT cache_key, audit_mode, verifier_method, question_text, "
                " n_quotes, n_verified, unverified_quotes, created_at "
                "FROM providence_cache "
                "WHERE audit_mode IN ('UNGROUNDED','HYBRID') "
                " AND falsification_state = 'live' "
                "ORDER BY created_at DESC LIMIT ?",
                (args.limit,),
            ).fetchall()
            payload = []
            for rec in recent:
                raw_quotes = rec["unverified_quotes"]
                payload.append(
                    {
                        "cache_key": rec["cache_key"],
                        "audit_mode": rec["audit_mode"],
                        "verifier_method": rec["verifier_method"],
                        "question": rec["question_text"],
                        "n_quotes": rec["n_quotes"],
                        "n_verified": rec["n_verified"],
                        "unverified_quotes": json.loads(raw_quotes) if raw_quotes else [],
                        "created_at": rec["created_at"],
                    }
                )
        else:
            stored = conn.execute(
                "SELECT unverified_quotes FROM providence_cache "
                "WHERE audit_mode IN ('UNGROUNDED','HYBRID') "
                " AND falsification_state = 'live' "
                " AND unverified_quotes IS NOT NULL"
            ).fetchall()
            tally: dict[str, int] = {}
            for rec in stored:
                for quote in json.loads(rec["unverified_quotes"]):
                    tally[quote] = tally.get(quote, 0) + 1
            # Stable sort (also with reverse=True): equal counts keep
            # first-seen order.
            ranked = sorted(tally.items(), key=lambda pair: pair[1], reverse=True)
            payload = [{"quote": quote, "count": n} for quote, n in ranked[: args.limit]]
    finally:
        conn.close()
    print(json.dumps(payload, indent=2, ensure_ascii=False))
    return 0
def _cmd_evict(args: argparse.Namespace) -> int:
    """Demote surface chunks from hot to cold (NULL content); cores never evict.

    Thin wrapper over ``arborist.evict.evict_to_cold``. The filters come
    from ``args.source_type``, ``args.older_than_days`` and
    ``args.document_root``; an empty root list is passed as None. Prints
    the helper's result as JSON and returns 0.
    """
    from arborist.evict import evict_to_cold

    # NOTE(review): in sharded mode this uses connect_query, which the
    # snapshot code describes as an in-memory UNION view for reads;
    # confirm that evict_to_cold's writes actually reach the shards.
    if args.global_shards_dir:
        conn = connect_query(args.db, shards_dir=args.global_shards_dir)
    else:
        conn = connect(args.db)
    try:
        outcome = evict_to_cold(
            conn,
            source_type=args.source_type,
            older_than_days=args.older_than_days,
            document_roots=args.document_root or None,
        )
    finally:
        conn.close()
    print(json.dumps(outcome, indent=2, ensure_ascii=False))
    return 0
def _cmd_rehydrate(args: argparse.Namespace) -> int:
    """Rehydrate cold chunks from their source documents.

    Targets every document with a ``tier = 'cold'`` chunk when
    ``args.all_cold`` is set, otherwise the roots in
    ``args.document_root``. Prints one result dict per root, tagged with
    ``document_root``.

    Exit codes:
      * 1 if any result reports ``status == "drift_detected"``, else 0;
      * 2 when no roots were named and ``--all-cold`` was not given.
    An ``--all-cold`` run with nothing cold simply prints ``[]``.
    """
    from arborist.evict import rehydrate

    if args.global_shards_dir:
        conn = connect_query(args.db, shards_dir=args.global_shards_dir)
    else:
        conn = connect(args.db)
    try:
        if args.all_cold:
            cold_rows = conn.execute(
                "SELECT DISTINCT document_root FROM chunks WHERE tier = 'cold'"
            ).fetchall()
            roots = [cr["document_root"] for cr in cold_rows]
        else:
            roots = list(args.document_root or [])
            if not roots:
                print(
                    "rehydrate needs --document-root R or --all-cold",
                    file=sys.stderr,
                )
                return 2
        results = []
        for root in roots:
            outcome = rehydrate(conn, root)
            outcome["document_root"] = root
            results.append(outcome)
    finally:
        conn.close()
    print(json.dumps(results, indent=2, ensure_ascii=False))
    drifted = any(res.get("status") == "drift_detected" for res in results)
    return 1 if drifted else 0
def _cmd_activity(args: argparse.Namespace) -> int:
    """Summarize recent activity and corpus state across shards.

    Meant for an agent to read before deciding its next action. It covers:
      * what was just asked (providence_cache rows);
      * what was just integrated (``ingest`` / ``derive`` audit events);
      * what was just falsified;
      * aggregate corpus counts.

    Scope is every shard from ``discover_shards(args.global_shards_dir)``,
    or the single ``args.db``. A truthy ``args.since_seconds`` restricts
    the recent lists to that window; otherwise they cover all time. Each
    list is capped at ``args.limit`` per shard, merged newest-first, and
    capped again. Answers are previewed to ``args.preview_chars``
    characters.

    Prints one JSON document and returns 0.
    """
    import time as _time
    from arborist.store import discover_shards

    if args.global_shards_dir:
        shard_paths: list[Path] = discover_shards(args.global_shards_dir)
    else:
        shard_paths = [Path(args.db)]

    cutoff_ts = int(_time.time()) - args.since_seconds if args.since_seconds else 0
    window = (cutoff_ts, args.limit)

    # (corpus key, COUNT query): summed across shards, in this order.
    count_queries = (
        ("documents_total", "SELECT COUNT(*) FROM documents"),
        ("documents_surface", "SELECT COUNT(*) FROM documents WHERE kind='surface'"),
        ("documents_core", "SELECT COUNT(*) FROM documents WHERE kind='core'"),
        ("providence_total", "SELECT COUNT(*) FROM providence_cache"),
        ("providence_live",
         "SELECT COUNT(*) FROM providence_cache WHERE falsification_state='live'"),
        ("providence_stale",
         "SELECT COUNT(*) FROM providence_cache WHERE falsification_state='stale'"),
        ("providence_failed",
         "SELECT COUNT(*) FROM providence_cache WHERE falsification_state='failed'"),
        ("audit_events_total", "SELECT COUNT(*) FROM audit_events"),
    )
    corpus = {key: 0 for key, _ in count_queries}

    def preview(text: str) -> str:
        # Truncate long answers, marking the cut with an ellipsis.
        if len(text) <= args.preview_chars:
            return text
        return text[: args.preview_chars] + "…"

    def decode(raw):
        # Audit bodies are JSON text; a missing body reads as {}.
        return json.loads(raw) if raw else {}

    recent_qa: list[dict] = []
    recent_ingests: list[dict] = []
    recent_derives: list[dict] = []
    recent_falsifications: list[dict] = []

    for shard_path in shard_paths:
        conn = connect(shard_path)
        try:
            for key, query in count_queries:
                corpus[key] += conn.execute(query).fetchone()[0]

            qa_rows = conn.execute(
                "SELECT cache_key, question_text, answer_text, "
                " falsification_state, hit_count, created_at, last_hit_at, "
                " document_uri FROM providence_cache "
                "WHERE created_at >= ? ORDER BY created_at DESC LIMIT ?",
                window,
            ).fetchall()
            recent_qa.extend(
                {
                    "ts": q["created_at"],
                    "shard": shard_path.name,
                    "cache_key": q["cache_key"],
                    "question": q["question_text"],
                    "answer_preview": preview(q["answer_text"] or ""),
                    "sources_uri": q["document_uri"],
                    "state": q["falsification_state"],
                    "hit_count": q["hit_count"],
                    "last_hit_at": q["last_hit_at"],
                }
                for q in qa_rows
            )

            ingest_rows = conn.execute(
                "SELECT subject_root, body, ts FROM audit_events "
                "WHERE event_type = 'ingest' AND ts >= ? "
                "ORDER BY ts DESC LIMIT ?",
                window,
            ).fetchall()
            for ev in ingest_rows:
                payload = decode(ev["body"])
                recent_ingests.append(
                    {
                        "ts": ev["ts"],
                        "shard": shard_path.name,
                        "document_root": ev["subject_root"],
                        "document_uri": payload.get("document_uri"),
                        "source_type": payload.get("source_type"),
                        "chunks": payload.get("chunks"),
                        "supersedes": payload.get("supersedes"),
                    }
                )

            # Derive events are distillations into core documents.
            derive_rows = conn.execute(
                "SELECT subject_root, body, ts FROM audit_events "
                "WHERE event_type = 'derive' AND ts >= ? "
                "ORDER BY ts DESC LIMIT ?",
                window,
            ).fetchall()
            for ev in derive_rows:
                payload = decode(ev["body"])
                recent_derives.append(
                    {
                        "ts": ev["ts"],
                        "shard": shard_path.name,
                        "core_root": ev["subject_root"],
                        "src_root": payload.get("src_root"),
                        "process_id": payload.get("process_id"),
                        "compression_ratio": payload.get("compression_ratio"),
                        "compression_depth": payload.get("compression_depth"),
                    }
                )

            fals_rows = conn.execute(
                "SELECT cache_key, state, reason, by_actor, at FROM falsifications "
                "WHERE at >= ? ORDER BY at DESC LIMIT ?",
                window,
            ).fetchall()
            recent_falsifications.extend(
                {
                    "ts": fz["at"],
                    "shard": shard_path.name,
                    "cache_key": fz["cache_key"],
                    "state": fz["state"],
                    "reason": fz["reason"],
                    "by_actor": fz["by_actor"],
                }
                for fz in fals_rows
            )
        finally:
            conn.close()

    for bucket in (recent_qa, recent_ingests, recent_derives, recent_falsifications):
        bucket.sort(key=lambda item: -item["ts"])

    print(
        json.dumps(
            {
                "as_of": int(_time.time()),
                "shards": [str(p) for p in shard_paths],
                "corpus": corpus,
                "recent_qa": recent_qa[: args.limit],
                "recent_ingests": recent_ingests[: args.limit],
                "recent_derives": recent_derives[: args.limit],
                "recent_falsifications": recent_falsifications[: args.limit],
            },
            indent=2, ensure_ascii=False
        )
    )
    return 0
def _cmd_canon(args: argparse.Namespace) -> int:
    """Direct π* canonicalization — no shards, no LLM, no audit chain.

    Modes:
        ``arborist canon --list``          — print ``key<TAB>domain`` per registry entry.
        ``arborist canon <key> "<input>"`` — canonicalize INPUT and write the bytes.

    With ``args.json`` the result is emitted as a JSON object that also
    carries the SHA-256 of the canonical bytes. Otherwise the raw
    canonical bytes plus a newline go to stdout.

    Returns 0 on success. Returns 1 when the π* rejects the input or is
    not implemented, and 2 on usage errors (missing operands, unknown key).
    """
    import hashlib
    from arborist.pi_star import PiStarError, get, list_keys

    if args.list:
        from arborist.pi_star import REGISTRY
        for key in sorted(list_keys()):
            print(f"{key}\t{REGISTRY[key].domain}")
        return 0

    if args.input is None or not args.key:
        print(
            "error: KEY and INPUT required (or --list to see registry)",
            file=sys.stderr,
        )
        return 2

    try:
        pi_star = get(args.key)
    except KeyError:
        print(f"error: unknown π*: {args.key!r}", file=sys.stderr)
        print(f"available: {', '.join(sorted(list_keys()))}", file=sys.stderr)
        return 2

    try:
        canonical = pi_star.canonicalize(args.input.encode("utf-8"))
    except PiStarError as exc:
        print(f"error: {exc}", file=sys.stderr)
        return 1
    except NotImplementedError as exc:
        print(f"error: {args.key} not yet implemented ({exc})", file=sys.stderr)
        return 1

    if not args.json:
        # Raw bytes: canonical forms need not be valid UTF-8 text.
        out = sys.stdout.buffer
        out.write(canonical)
        out.write(b"\n")
        return 0

    print(json.dumps({
        "pi_star_ref": args.key,
        "input": args.input,
        "canonical": canonical.decode("utf-8", errors="replace"),
        "canonical_sha256": hashlib.sha256(canonical).hexdigest(),
    }, ensure_ascii=False))
    return 0
def _cmd_stats(args: argparse.Namespace) -> int:
    """Print corpus counts as JSON and return 0.

    Counts cover documents, chunks (by tier), edges, providence and audit
    events, as computed by ``stats()``. In sharded mode they run over the
    shard union; otherwise over the single ``args.db``.
    """
    if args.global_shards_dir:
        conn = connect_query(args.db, shards_dir=args.global_shards_dir)
    else:
        conn = connect(args.db)
    try:
        counts = stats(conn)
    finally:
        conn.close()
    print(json.dumps(counts, indent=2, ensure_ascii=False))
    return 0
def _check_audit_chain(conn: sqlite3.Connection) -> tuple[int, int]:
    """Walk the audit chain in ``conn`` and count integrity breaks.

    Events are read from ``audit_events`` in ``seq`` order. Each event
    gets two checks, and each check adds at most one break:

    * hash: ``event_hash`` must equal
      ``sha256(bytes.fromhex(prev_event_hash) + body)``, with the body
      UTF-8 encoded unless it is already bytes. The prev-hash bytes are
      skipped when ``prev_event_hash`` is empty/NULL. A non-hex
      ``prev_event_hash`` or a NULL ``body`` cannot be rehashed and
      counts as a hash break instead of aborting the walk.
    * link: ``prev_event_hash`` must equal the previous event's
      ``event_hash`` (NULL for the first event).

    Returns ``(events_checked, breaks)``.
    """
    import hashlib

    rows = conn.execute(
        "SELECT seq, event_hash, prev_event_hash, body FROM audit_events ORDER BY seq"
    ).fetchall()
    prev = None
    breaks = 0
    for r in rows:
        body = r["body"]
        try:
            h = hashlib.sha256()
            if r["prev_event_hash"]:
                h.update(bytes.fromhex(r["prev_event_hash"]))
            h.update(body if isinstance(body, bytes) else body.encode("utf-8"))
            recomputed = h.hexdigest()
        except (ValueError, TypeError, AttributeError):
            # Unverifiable event: for an integrity checker that is a
            # break, not a reason to crash the whole walk.
            recomputed = None
        if recomputed is None or recomputed != r["event_hash"]:
            breaks += 1
        if r["prev_event_hash"] != prev:
            breaks += 1
        prev = r["event_hash"]
    return len(rows), breaks
def _cmd_analyze(args: argparse.Namespace) -> int:
    """Report compression spectrum, depth distribution and audit-chain integrity.

    Sections of the printed JSON report:
      * compression_depth_histogram: document count per compression_depth.
      * distillers: per (process_id, src_kind) over ``derive`` audit
        events, the event count and mean/min/max compression_ratio
        (rounded to 4 places, None when NULL).
      * documents_by_source_kind: source_type x kind counts.
      * chunks_by_tier: chunk count per tier.
      * audit_chain: events/breaks from ``_check_audit_chain``. In
        sharded mode every shard has its own chain (genesis -> latest),
        so each is walked independently over a read-only connection and
        aggregated with a per-shard breakdown.
      * gravity_top_inbound: the ``args.gravity_top`` most-linked
        resolved wikilink destinations, with uri/title when the
        destination document is present.

    Returns 0.
    """
    from arborist.store import discover_shards

    def _r4(value):
        # Aggregates over only-NULL ratios come back NULL.
        return None if value is None else round(value, 4)

    audit_summary: dict | None = None
    if args.global_shards_dir:
        per_shard: list[dict] = []
        for shard_path in discover_shards(args.global_shards_dir):
            # Read-only: an audit-chain walk must not run migration DDL
            # or take a write lock on the shard (see connect_readonly).
            ro = connect_readonly(shard_path)
            try:
                n_events, n_breaks = _check_audit_chain(ro)
            finally:
                ro.close()
            per_shard.append(
                {"shard": shard_path.name, "events": n_events, "breaks": n_breaks}
            )
        audit_summary = {
            "events": sum(s["events"] for s in per_shard),
            "breaks": sum(s["breaks"] for s in per_shard),
            "shards": per_shard,
        }

    if args.global_shards_dir:
        conn = connect_query(args.db, shards_dir=args.global_shards_dir)
    else:
        conn = connect(args.db)
    try:
        depth_rows = conn.execute(
            "SELECT compression_depth, COUNT(*) AS n "
            "FROM documents GROUP BY compression_depth ORDER BY 1"
        ).fetchall()
        distiller_rows = conn.execute(
            "SELECT json_extract(body, '$.process_id') AS process_id, "
            " json_extract(body, '$.src_kind') AS src_kind, "
            " AVG(CAST(json_extract(body, '$.compression_ratio') AS REAL)) AS mean_ratio, "
            " MIN(CAST(json_extract(body, '$.compression_ratio') AS REAL)) AS min_ratio, "
            " MAX(CAST(json_extract(body, '$.compression_ratio') AS REAL)) AS max_ratio, "
            " COUNT(*) AS n_events "
            "FROM audit_events WHERE event_type='derive' "
            "GROUP BY process_id, src_kind"
        ).fetchall()
        kind_rows = conn.execute(
            "SELECT source_type, kind, COUNT(*) AS n "
            "FROM documents GROUP BY source_type, kind ORDER BY 3 DESC"
        ).fetchall()
        tier_rows = conn.execute(
            "SELECT tier, COUNT(*) AS n FROM chunks GROUP BY tier"
        ).fetchall()
        # Gravity wells: inbound wikilinks counted per resolved destination
        # (dst_root). idx_edges_dst_root serves this as an index-ordered
        # scan with a streaming GROUP BY, so memory stays bounded. Grouping
        # by the raw dst_uri string has no index and hash-aggregates over
        # every target, red links included, which is unbounded RSS at
        # corpus scale. documents is joined only for the top-N survivors,
        # to fetch titles.
        gravity_rows = conn.execute(
            "SELECT g.root AS root, g.inbound AS inbound, "
            " d.document_uri AS uri, d.title AS title "
            "FROM (SELECT dst_root AS root, COUNT(*) AS inbound "
            " FROM edges "
            " WHERE edge_type='wikilink' AND dst_root <> '' "
            " GROUP BY dst_root ORDER BY inbound DESC LIMIT ?) g "
            "LEFT JOIN documents d ON d.document_root = g.root "
            "ORDER BY g.inbound DESC",
            (args.gravity_top,),
        ).fetchall()
        if audit_summary is None:
            n_events, n_breaks = _check_audit_chain(conn)
            audit_summary = {"events": n_events, "breaks": n_breaks}

        distillers = [
            {
                "process_id": dr["process_id"],
                "src_kind": dr["src_kind"],
                "n_events": dr["n_events"],
                "compression_ratio": {
                    "mean": _r4(dr["mean_ratio"]),
                    "min": _r4(dr["min_ratio"]),
                    "max": _r4(dr["max_ratio"]),
                },
            }
            for dr in distiller_rows
        ]
        report = {
            "compression_depth_histogram": [
                {"depth": dr["compression_depth"], "count": dr["n"]} for dr in depth_rows
            ],
            "distillers": distillers,
            "documents_by_source_kind": [
                {"source_type": kr["source_type"], "kind": kr["kind"], "count": kr["n"]}
                for kr in kind_rows
            ],
            "chunks_by_tier": {tr["tier"]: tr["n"] for tr in tier_rows},
            "audit_chain": audit_summary,
            "gravity_top_inbound": [
                {
                    "root": gr["root"],
                    "uri": gr["uri"],
                    "title": gr["title"],
                    "inbound": gr["inbound"],
                }
                for gr in gravity_rows
            ],
        }
    finally:
        conn.close()
    print(json.dumps(report, indent=2, ensure_ascii=False))
    return 0
def _cmd_snapshot_create(args: argparse.Namespace) -> int:
    """Compute snapshot_root over the read scope and persist it into ``args.db``.

    Single-DB mode (no shards dir): read and write share one connection;
    delegate to the snapshot module's ``create_snapshot()``.

    Cross-shard mode (shards dir + ``--db``): read against the in-memory
    UNION view to get the cluster-level Merkle root, then persist into
    ``args.db``, a dedicated snapshots store, conventionally
    `~/.arborist/shards/snapshots.db`. The writer's own documents table
    is irrelevant to the snapshot value; only the union scope counts.

    Three writes share one transaction: the parent lookup (``args.parent``,
    else the most recently taken snapshot in ``args.db``), the
    ``snapshot_create`` audit event, and the ``snapshots`` row. A failed
    insert therefore cannot leave an orphan audit event on the chain. The
    insert is ``INSERT OR IGNORE``: an existing conflicting row
    (presumably the same ``snapshot_root``) is left as-is.

    Prints the snapshot record as JSON and returns 0.
    """
    import time as _time
    from arborist.snapshot import compute_snapshot_root, create_snapshot
    if args.global_shards_dir is None:
        conn = connect(args.db)
        try:
            result = create_snapshot(
                conn, reason=args.reason, parent_snapshot=args.parent,
            )
        finally:
            conn.close()
        print(json.dumps(result, indent=2, ensure_ascii=False))
        return 0
    # Cross-shard: compute against UNION, write to args.db.
    read_conn = connect_query(args.db, shards_dir=args.global_shards_dir)
    try:
        snapshot_root, doc_count = compute_snapshot_root(read_conn)
    finally:
        read_conn.close()
    write_conn = connect(args.db)
    try:
        # Audit append + row insert are one atomic unit, matching how the
        # burn/reclassify paths pair append_audit with their mutation.
        with transaction(write_conn):
            parent = args.parent
            if parent is None:
                row = write_conn.execute(
                    "SELECT snapshot_root FROM snapshots ORDER BY taken_at DESC LIMIT 1"
                ).fetchone()
                if row is not None:
                    parent = row["snapshot_root"]
            now = int(_time.time())
            body = {
                "snapshot_root": snapshot_root,
                "doc_count": doc_count,
                "parent_snapshot": parent,
                "reason": args.reason,
                "scope": "shards-union",
            }
            audit_event_hash = append_audit(
                write_conn,
                event_type="snapshot_create",
                body=body,
                subject_root=snapshot_root,
                ts=now,
            )
            write_conn.execute(
                "INSERT OR IGNORE INTO snapshots "
                "(snapshot_root, taken_at, audit_event_hash, doc_count, "
                " parent_snapshot, reason) VALUES (?, ?, ?, ?, ?, ?)",
                (
                    snapshot_root,
                    now,
                    audit_event_hash,
                    doc_count,
                    parent,
                    args.reason,
                ),
            )
    finally:
        write_conn.close()
    print(
        json.dumps(
            {
                "snapshot_root": snapshot_root,
                "doc_count": doc_count,
                "parent_snapshot": parent,
                "audit_event_hash": audit_event_hash,
                "taken_at": now,
                "reason": args.reason,
                "scope": "shards-union",
            },
            indent=2, ensure_ascii=False
        )
    )
    return 0
def _cmd_snapshot_list(args: argparse.Namespace) -> int:
    """Print recent corpus snapshots as JSON, newest first.

    The number of rows is bounded by ``--limit``, which is passed
    straight through to ``list_snapshots``.
    """
    from arborist.snapshot import list_snapshots

    db_conn = connect(args.db)
    try:
        snapshots = list_snapshots(db_conn, limit=args.limit)
    finally:
        db_conn.close()
    rendered = json.dumps(snapshots, indent=2, ensure_ascii=False)
    print(rendered)
    return 0
def _cmd_snapshot_verify(args: argparse.Namespace) -> int:
    """Re-derive a snapshot root from the current corpus and compare.

    Reads through the cross-shard UNION view when ``--shards-dir`` is set,
    otherwise through ``--db`` alone. Prints the verification result as
    JSON and exits 0 when ``result["matches"]`` is truthy, 1 on drift.
    """
    from arborist.snapshot import verify_snapshot

    if args.global_shards_dir:
        conn = connect_query(args.db, shards_dir=args.global_shards_dir)
    else:
        conn = connect(args.db)
    try:
        outcome = verify_snapshot(conn, args.snapshot_root)
    finally:
        conn.close()
    print(json.dumps(outcome, indent=2, ensure_ascii=False))
    if outcome["matches"]:
        return 0
    return 1
def _cmd_snapshot_diff(args: argparse.Namespace) -> int:
    """Print the diff between a stored snapshot and the current corpus.

    Uses the cross-shard UNION view when ``--shards-dir`` is set, otherwise
    ``--db`` alone. The result of ``diff_against_current`` is printed as
    JSON; the exit code is always 0.
    """
    from arborist.snapshot import diff_against_current

    if args.global_shards_dir:
        conn = connect_query(args.db, shards_dir=args.global_shards_dir)
    else:
        conn = connect(args.db)
    try:
        delta = diff_against_current(conn, args.snapshot_root)
    finally:
        conn.close()
    print(json.dumps(delta, indent=2, ensure_ascii=False))
    return 0
def _cmd_substrate_score(args: argparse.Namespace) -> int:
    """Compute the substrate ForkScore over (parent, child) bench-result JSON files.

    Loads the parent and child bench-result payloads (plus an optional
    ``--weights`` JSON file, else ``DEFAULT_WEIGHTS``), scores them with
    ``fork_score`` and prints the artifact as JSON. Optionally:

    * ``--out PATH`` also writes the artifact to disk (parent dirs created).
    * ``--branch-set ID`` persists the score via ``persist_branch_score``
      into ``--persist-shard`` (default ``--db``). It requires
      ``--parent-root`` plus ``--branch-id`` or ``--child-root``.

    Returns:
        0 for an ACCEPT or MARGINAL verdict, 1 for any other verdict (so CI
        can gate on REJECT), 2 when ``--branch-set`` lacks its companion
        flags.
    """
    from arborist.substrate import (
        bench_result_to_metrics,
        fork_score,
    )
    from arborist.substrate.weights import DEFAULT_WEIGHTS, from_dict as weights_from_dict
    with open(args.parent, "r", encoding="utf-8") as fh:
        parent_payload = json.load(fh)
    with open(args.child, "r", encoding="utf-8") as fh:
        child_payload = json.load(fh)
    if args.weights:
        with open(args.weights, "r", encoding="utf-8") as fh:
            weights = weights_from_dict(json.load(fh))
    else:
        weights = DEFAULT_WEIGHTS
    parent_metrics = bench_result_to_metrics(parent_payload)
    child_metrics = bench_result_to_metrics(child_payload)
    scored = fork_score(
        parent_metrics,
        child_metrics,
        weights=weights,
        capital_delta=float(args.capital_delta),
        selfmodel_calibration_gain=float(args.selfmodel_calibration_gain),
        audit_completeness=float(args.audit_completeness),
        validator_diversity=float(args.validator_diversity),
        security_risk=float(args.security_risk),
        complexity_delta=float(args.complexity_delta),
        memory_invalidation_count=float(args.memory_invalidation_count),
    )
    artifact_json = json.dumps(
        scored.to_dict(), indent=2, ensure_ascii=False, default=str
    )
    print(artifact_json)
    # Phase 1b — also write to disk so CI / mesh peers / downstream
    # graders can pick up the artifact without parsing stdout.
    out_path = getattr(args, "out", None)
    if out_path:
        # Deliberately uses the module-level ``Path``. A function-local
        # ``from pathlib import Path`` here would make ``Path`` local to
        # the whole function, and the ``--branch-set`` branch below would
        # raise UnboundLocalError whenever ``--out`` was not given.
        p = Path(out_path)
        p.parent.mkdir(parents=True, exist_ok=True)
        p.write_text(artifact_json + "\n", encoding="utf-8")
    # Phase 1c — branch-set persistence (#000012). Default off:
    # writes only when --branch-set is present. --branch-id falls
    # back to --child-root so a single-flag CLI works for the common
    # case (one branch per child-root identity).
    branch_set_id = getattr(args, "branch_set", None)
    if branch_set_id:
        from arborist.substrate.fork_score import persist_branch_score
        parent_root = getattr(args, "parent_root", None)
        child_root = getattr(args, "child_root", None)
        branch_id = getattr(args, "branch_id", None) or child_root
        if not parent_root or not branch_id:
            sys.stderr.write(
                "--branch-set requires --parent-root + (--branch-id "
                "or --child-root)\n"
            )
            return 2
        persist_shard = getattr(args, "persist_shard", None) or args.db
        weights_id = getattr(args, "weights_id", None) or (
            "default" if not args.weights else Path(args.weights).stem
        )
        p_conn = connect(persist_shard)
        try:
            with transaction(p_conn):
                persist_branch_score(
                    p_conn,
                    branch_set_id=branch_set_id,
                    branch_id=branch_id,
                    parent_root=parent_root,
                    child_root=child_root,
                    scored=scored,
                    weights_id=weights_id,
                )
        finally:
            p_conn.close()
    # Non-zero exit on REJECT so CI can gate on it.
    return 0 if scored.verdict in ("ACCEPT", "MARGINAL") else 1
def _cmd_memory_snapshot(args: argparse.Namespace) -> int:
    """Snapshot the current store state into a memory record.

    Building and storing the snapshot happen in one transaction. The new
    root is printed as ``{"memory_root": ...}`` JSON.
    """
    from arborist.memory import snapshot, store_snapshot

    db_conn = connect(args.db)
    try:
        with transaction(db_conn):
            memory_root = store_snapshot(db_conn, snapshot(db_conn))
    finally:
        db_conn.close()
    print(json.dumps({"memory_root": memory_root}, indent=2, ensure_ascii=False))
    return 0
def _cmd_memory_show(args: argparse.Namespace) -> int:
    """Print one memory record as JSON.

    ``--root`` selects a specific record via ``load``; without it the
    record returned by ``latest`` is shown. When nothing is found an
    ``{"error": ...}`` object is printed and 1 is returned. The
    ``branch_summaries_blob`` column is emitted as ``body`` (bytes decoded
    as UTF-8 with replacement), followed by the attached ``branches``.
    """
    from arborist.memory import branches_for, latest, load

    passthrough_columns = (
        "memory_root",
        "state",
        "schema_version",
        "parent_memory_root",
        "audit_events_high_water",
        "audit_event_hash",
        "created_at",
        "falsified_at",
        "falsified_reason",
    )
    conn = connect(args.db)
    try:
        row = load(conn, args.root) if args.root else latest(conn)
        if row is None:
            print(json.dumps({"error": "no memory snapshot found"}, indent=2))
            return 1
        blob = row["branch_summaries_blob"]
        if isinstance(blob, (bytes, bytearray)):
            blob = blob.decode("utf-8", errors="replace")
        out = {column: row[column] for column in passthrough_columns}
        out["body"] = blob
        out["branches"] = branches_for(conn, row["memory_root"])
    finally:
        conn.close()
    print(json.dumps(out, indent=2, ensure_ascii=False, default=str))
    return 0
def _cmd_memory_branches(args: argparse.Namespace) -> int:
    """Print the branch summaries attached to a memory_root as JSON.

    Uses ``--root`` when given, otherwise the root of ``latest``. If there
    is no memory record at all an empty JSON list is printed. Each
    ``summary_blob`` that is bytes is decoded to text (UTF-8, with
    replacement) so the output is human-readable.
    """
    from arborist.memory import branches_for, latest

    conn = connect(args.db)
    try:
        target_root = args.root
        if not target_root:
            newest = latest(conn)
            if newest is None:
                print(json.dumps([], indent=2))
                return 0
            target_root = newest["memory_root"]
        branches = branches_for(conn, target_root)
    finally:
        conn.close()

    def _decoded(branch) -> dict:
        # Copy into a plain dict so the blob can be swapped for text.
        record = dict(branch)
        blob = record["summary_blob"]
        if isinstance(blob, (bytes, bytearray)):
            record["summary_blob"] = blob.decode("utf-8", errors="replace")
        return record

    rendered = [_decoded(branch) for branch in branches]
    print(json.dumps(rendered, indent=2, ensure_ascii=False, default=str))
    return 0
def _cmd_memory_falsify(args: argparse.Namespace) -> int:
    """Mark a memory_root falsified inside a transaction.

    Prints the audit event hash returned by ``falsify``. An empty-string
    hash is reported as ``"audit_event_hash": null`` with ``"noop": true``.
    """
    from arborist.memory import falsify

    conn = connect(args.db)
    try:
        with transaction(conn):
            event_hash = falsify(
                conn,
                args.root,
                reason=args.reason,
                triggering_branch_id=args.branch_id,
            )
    finally:
        conn.close()
    payload = {
        "memory_root": args.root,
        "audit_event_hash": event_hash or None,
        "noop": event_hash == "",
    }
    print(json.dumps(payload, indent=2, ensure_ascii=False))
    return 0
def _cmd_capital_summary(args: argparse.Namespace) -> int:
    """Print aggregate capital_ledger totals (per-form sums + row count).

    ``--op-type`` and ``--since`` are passed through to ``capital.summary``.
    """
    from arborist.capital import summary as capital_summary

    db_conn = connect(args.db)
    try:
        totals = capital_summary(db_conn, op_type=args.op_type, since=args.since)
    finally:
        db_conn.close()
    print(json.dumps(totals, indent=2, ensure_ascii=False))
    return 0
def _cmd_capital_op_cost(args: argparse.Namespace) -> int:
    """Print per-form capital totals for the single ``--op-type`` given."""
    from arborist.capital import op_cost

    db_conn = connect(args.db)
    try:
        per_form = op_cost(db_conn, args.op_type)
    finally:
        db_conn.close()
    print(json.dumps(per_form, indent=2, ensure_ascii=False))
    return 0
def _cmd_capital_top(args: argparse.Namespace) -> int:
    """Print the top ``--limit`` op_types by contribution to one capital ``--form``."""
    from arborist.capital import top_by_form

    db_conn = connect(args.db)
    try:
        ranking = top_by_form(db_conn, args.form, limit=args.limit)
    finally:
        db_conn.close()
    print(json.dumps(ranking, indent=2, ensure_ascii=False))
    return 0
def _cmd_selfmodel_snapshot(args: argparse.Namespace) -> int:
    """Build a SelfModel from current store state and persist it.

    Reads the latest providence_cache row + audit_events to derive the
    canonical fields (model_profile_hash, governance_policy_hash,
    verifier_method_root, etc.) and writes one row to selfmodel_records
    + emits a selfmodel_snapshot_landed audit event. Building and storing
    run in a single transaction. The new root is printed as JSON.
    """
    from arborist.selfmodel import snapshot, store_snapshot

    db_conn = connect(args.db)
    try:
        with transaction(db_conn):
            selfmodel_root = store_snapshot(db_conn, snapshot(db_conn))
    finally:
        db_conn.close()
    print(json.dumps({"selfmodel_root": selfmodel_root}, indent=2, ensure_ascii=False))
    return 0
def _cmd_selfmodel_show(args: argparse.Namespace) -> int:
    """Print a SelfModel record by root, or the latest live one.

    ``--root`` selects a record via ``load``; otherwise ``latest`` is used.
    When nothing is found an ``{"error": ...}`` object is printed and 1 is
    returned.

    Output fields: the record's columns, ``body`` (``body_blob`` decoded
    as UTF-8 text with replacement, mirroring ``memory show``'s ``body``
    field) and the record's ``claims``. The blob is decoded precisely so
    it can be emitted; ``body`` is added after the existing keys, so
    existing consumers are unaffected.
    """
    from arborist.selfmodel import latest, load
    from arborist.selfmodel.store import claims_for
    conn = connect(args.db)
    try:
        if args.root:
            row = load(conn, args.root)
        else:
            row = latest(conn)
        if row is None:
            print(json.dumps({"error": "no SelfModel found"}, indent=2))
            return 1
        body = row["body_blob"]
        if isinstance(body, (bytes, bytearray)):
            body = body.decode("utf-8", errors="replace")
        out = {
            "selfmodel_root": row["selfmodel_root"],
            "state": row["state"],
            "schema_version": row["schema_version"],
            "parent_selfmodel_root": row["parent_selfmodel_root"],
            "model_profile_hash": row["model_profile_hash"],
            "verifier_method_root": row["verifier_method_root"],
            "governance_policy_hash": row["governance_policy_hash"],
            "canonicalization_version": row["canonicalization_version"],
            "chunking_version": row["chunking_version"],
            "memory_root": row["memory_root"],
            "audit_event_hash": row["audit_event_hash"],
            "created_at": row["created_at"],
            "falsified_at": row["falsified_at"],
            "falsified_reason": row["falsified_reason"],
            "body": body,
            "claims": claims_for(conn, row["selfmodel_root"]),
        }
    finally:
        conn.close()
    print(json.dumps(out, indent=2, ensure_ascii=False, default=str))
    return 0
def _cmd_selfmodel_falsify(args: argparse.Namespace) -> int:
    """Mark a SelfModel falsified (in a transaction), citing a reason.

    ``--claim-hash`` is forwarded as the triggering claim. An empty-string
    event hash from ``falsify`` is reported as a no-op.
    """
    from arborist.selfmodel import falsify

    conn = connect(args.db)
    try:
        with transaction(conn):
            event_hash = falsify(
                conn,
                args.root,
                reason=args.reason,
                triggering_claim_hash=args.claim_hash,
            )
    finally:
        conn.close()
    payload = {
        "selfmodel_root": args.root,
        "audit_event_hash": event_hash or None,
        "noop": event_hash == "",
    }
    print(json.dumps(payload, indent=2, ensure_ascii=False))
    return 0
def _cmd_selfmodel_list(args: argparse.Namespace) -> int:
    """Print up to ``--limit`` selfmodel_records rows, newest ``created_at`` first."""
    query = (
        "SELECT selfmodel_root, state, model_profile_hash,"
        " governance_policy_hash, created_at,"
        " falsified_at, falsified_reason"
        " FROM selfmodel_records "
        " ORDER BY created_at DESC LIMIT ?"
    )
    db_conn = connect(args.db)
    try:
        records = db_conn.execute(query, (args.limit,)).fetchall()
    finally:
        db_conn.close()
    as_dicts = [dict(record) for record in records]
    print(json.dumps(as_dicts, indent=2, ensure_ascii=False, default=str))
    return 0
def _render_warrant_chain_tail(result: dict) -> str:
    """Render a tail counting cited sources with proven warrant chains.

    A cited source counts when it has a `warrant-resolver-v1` derivations
    chain back to a primary-source surface (ticket #000031 Phase 3).

    Returns ``""`` (silence) when ``result`` carries no ``_shards_dir``,
    has no ``sources``, the resolver lookup raises, or no chains are
    found. Otherwise returns e.g. ``" · warrant: 1 proven"`` /
    ``" · warrants: 3 proven"``.

    This is a positive-signal render addition only: the ladder upgrade
    (POINTER-LINKED → ANCHOR-WARRANTED → EVIDENCE-WARRANTED) is
    intentionally NOT done here, and audit_mode + violations stay exactly
    as the verifier produced them.

    ``result["_shards_dir"]`` is stashed by ``_cmd_query`` before render
    time, because the render function only receives ``(result, question)``.
    """
    shards_dir = result.get("_shards_dir")
    if not shards_dir:
        return ""
    sources = result.get("sources") or []
    if not sources:
        return ""
    try:
        from arborist.qa.warrant_resolver import warrant_chains_for_sources

        chains = warrant_chains_for_sources(sources, shards_dir)
    except Exception:
        # Best effort: a failed lookup must never break answer rendering.
        return ""
    if not chains:
        return ""
    total = len(chains)
    noun = {1: "warrant"}.get(total, "warrants")
    return f" · {noun}: {total} proven"
def _cmd_warrant_status(args: argparse.Namespace) -> int:
    """Read-only: show which surface chunks the citation resolver finds.

    For each claim-pack record returned by ``warrant_status`` (up to
    ``--limit``) this prints its citations, its candidate surface matches
    and whether a derivation already exists, all as JSON. No DB writes.
    See ``arborist.qa.warrant_resolver`` for ticket #000031 Phase 2.
    Exits 2 when no shards directory is configured.
    """
    from arborist.qa.warrant_resolver import warrant_status

    shards_dir = args.global_shards_dir or args.shards_dir
    if not shards_dir:
        print("--shards-dir is required for warrant-status", file=sys.stderr)
        return 2

    def _citation_view(c) -> dict:
        return {
            "title": c.title,
            "authors": list(c.authors),
            "year": c.year,
            "section": c.section,
        }

    def _match_view(m) -> dict:
        return {
            "shard": m.shard_path,
            "document_root": m.document_root,
            "document_title": m.document_title,
            "chunk_id": m.chunk_id,
            "score": m.score,
            "snippet": m.snippet,
        }

    records = [
        {
            "record_root": rec.record_root,
            "record_title": rec.record_title,
            "source_reference": rec.source_reference,
            "citations": [_citation_view(c) for c in rec.citations],
            "matches": [_match_view(m) for m in rec.matches],
            "has_derivation": rec.has_derivation,
        }
        for rec in warrant_status(shards_dir, limit=args.limit)
    ]
    print(json.dumps(records, indent=2, ensure_ascii=False))
    return 0
def _cmd_warrant_resolve(args: argparse.Namespace) -> int:
    """Run the warrant resolver across all claim-pack records.

    Dry-run by default (summary counts only). With ``--write`` the
    resolver computes Merkle inclusion proofs for top matches and writes
    ``derivations`` rows, idempotently at the database layer.
    ``--use-aliases`` is forwarded when present. Exits 2 when no shards
    directory is configured.
    """
    from arborist.qa.warrant_resolver import warrant_resolve

    shards_dir = args.global_shards_dir or args.shards_dir
    if not shards_dir:
        print("--shards-dir is required for warrant-resolve", file=sys.stderr)
        return 2
    options = {
        "write": args.write,
        "limit": args.limit,
        "use_aliases": getattr(args, "use_aliases", False),
    }
    summary = warrant_resolve(shards_dir, **options)
    print(json.dumps(summary, indent=2))
    return 0
def _aliases_db_path(args: argparse.Namespace) -> str | None:
    """Resolve which SQLite file holds the alias tables.

    Precedence: an explicit ``--aliases-db`` wins. Otherwise the aliases
    live in shard ``000.db`` of the shards cluster (alongside the
    claim-pack records), taken from ``--shards-dir`` (global flag first,
    then the sub-command flag). Returns ``None`` when neither is set.
    """
    override = getattr(args, "aliases_db", None)
    if override:
        return str(override)
    cluster_dir = args.global_shards_dir or getattr(args, "shards_dir", None)
    return str(Path(cluster_dir) / "000.db") if cluster_dir else None
def _cmd_alias_citation_add(args: argparse.Namespace) -> int:
    """Add a citation alias (original reference -> substitute reference).

    Refuses without --by; audit-discipline fail-closed at the API surface.
    Any ``ValueError`` raised by ``add_citation_alias`` is reported on
    stderr with exit code 2. Also exits 2 when no aliases DB can be
    resolved.
    """
    from arborist.qa.aliases import add_citation_alias

    db = _aliases_db_path(args)
    if not db:
        print("--shards-dir or --aliases-db is required", file=sys.stderr)
        return 2
    alias_fields = dict(
        original_ref=args.original,
        substitute_ref=args.substitute,
        substitute_authors=args.author or [],
        substitute_title=args.title or "",
        decision_by=args.by,
        decision_rationale=args.rationale or "",
    )
    try:
        inserted = add_citation_alias(db, **alias_fields)
    except ValueError as exc:
        print(f"alias add failed: {exc}", file=sys.stderr)
        return 2
    print(json.dumps({"inserted": inserted, "db": db}, indent=2))
    return 0
def _cmd_alias_citation_list(args: argparse.Namespace) -> int:
    """Print citation aliases as JSON, optionally filtered by ``--filter``.

    Exits 2 when no aliases DB can be resolved.
    """
    from arborist.qa.aliases import list_citation_aliases

    db = _aliases_db_path(args)
    if not db:
        print("--shards-dir or --aliases-db is required", file=sys.stderr)
        return 2
    entries = [
        {
            "original_ref": alias.original_ref,
            "substitute_ref": alias.substitute_ref,
            "substitute_authors": list(alias.substitute_authors),
            "substitute_title": alias.substitute_title,
            "decision_at": alias.decision_at,
            "decision_by": alias.decision_by,
            "decision_rationale": alias.decision_rationale,
        }
        for alias in list_citation_aliases(db, original_filter=args.filter)
    ]
    print(json.dumps(entries, indent=2, ensure_ascii=False))
    return 0
def _cmd_alias_citation_remove(args: argparse.Namespace) -> int:
    """Remove one (original, substitute) citation alias and report the result.

    Exits 2 when no aliases DB can be resolved.
    """
    from arborist.qa.aliases import remove_citation_alias

    db = _aliases_db_path(args)
    if not db:
        print("--shards-dir or --aliases-db is required", file=sys.stderr)
        return 2
    outcome = {"removed": remove_citation_alias(db, args.original, args.substitute), "db": db}
    print(json.dumps(outcome, indent=2))
    return 0
def _cmd_alias_term_add(args: argparse.Namespace) -> int:
    """Add a term alias (term -> alternate term, scoped to ``--domain``).

    Refuses without --by; audit-discipline fail-closed at the API surface.
    Any ``ValueError`` raised by ``add_term_alias`` is reported on stderr
    with exit code 2. Also exits 2 when no aliases DB can be resolved.
    """
    from arborist.qa.aliases import add_term_alias

    db = _aliases_db_path(args)
    if not db:
        print("--shards-dir or --aliases-db is required", file=sys.stderr)
        return 2
    alias_fields = dict(
        term=args.term,
        alternate_term=args.alternate,
        domain=args.domain,
        decision_by=args.by,
        decision_rationale=args.rationale or "",
    )
    try:
        inserted = add_term_alias(db, **alias_fields)
    except ValueError as exc:
        print(f"alias add failed: {exc}", file=sys.stderr)
        return 2
    print(json.dumps({"inserted": inserted, "db": db}, indent=2))
    return 0
def _cmd_alias_term_list(args: argparse.Namespace) -> int:
    """Print term aliases as JSON, optionally narrowed by ``--domain`` / ``--filter``.

    Exits 2 when no aliases DB can be resolved.
    """
    from arborist.qa.aliases import list_term_aliases

    db = _aliases_db_path(args)
    if not db:
        print("--shards-dir or --aliases-db is required", file=sys.stderr)
        return 2
    entries = [
        {
            "term": alias.term,
            "alternate_term": alias.alternate_term,
            "domain": alias.domain,
            "decision_at": alias.decision_at,
            "decision_by": alias.decision_by,
            "decision_rationale": alias.decision_rationale,
        }
        for alias in list_term_aliases(db, domain=args.domain, term_filter=args.filter)
    ]
    print(json.dumps(entries, indent=2, ensure_ascii=False))
    return 0
def _cmd_alias_term_remove(args: argparse.Namespace) -> int:
    """Remove one (term, alternate, domain) term alias and report the result.

    Exits 2 when no aliases DB can be resolved.
    """
    from arborist.qa.aliases import remove_term_alias

    db = _aliases_db_path(args)
    if not db:
        print("--shards-dir or --aliases-db is required", file=sys.stderr)
        return 2
    outcome = {
        "removed": remove_term_alias(db, args.term, args.alternate, args.domain),
        "db": db,
    }
    print(json.dumps(outcome, indent=2))
    return 0
def _cmd_sweep(args: argparse.Namespace) -> int:
    """Unconscious sweep — drain the meta-cognition backlog.

    Implements the warrant-resolver-only fragment of #000037 §3.1
    Target B (documents that bypassed meta-cognition at ingest
    time). The full bicameral sweep (canonical-projection probe
    / freshness probe / document-content witness fan-out) is
    gated on the §12 phase trigger and a small schema migration
    (`documents.last_swept_at`); `--target warrants` here is
    the no-schema-change increment available today.

    Idempotent: re-running on the same shards is a no-op at the
    database layer (PK collision on
    `(core_root, src_root, process_id)` per the existing
    `derivations` table). Operators can run on a cron / systemd
    timer without worrying about row proliferation.

    Exit codes: 0 for ``warrants`` and for the deferred ``all`` target,
    2 for a missing shards directory or an unknown target.
    """
    from arborist.qa.warrant_resolver import warrant_resolve

    shards_dir = args.global_shards_dir or args.shards_dir
    if not shards_dir:
        print("--shards-dir is required for sweep", file=sys.stderr)
        return 2
    target = args.target
    if target == "all":
        # Full bicameral sweep awaits #000037 §12 phase trigger
        # (canonical-projection probe + freshness probe + document-
        # content witness fan-out + documents.last_swept_at schema
        # migration). Today this is a no-op with an honest message.
        deferred = {
            "target": "all",
            "status": "deferred",
            "reason": (
                "Full bicameral sweep requires #000037 §12 "
                "phase trigger + documents.last_swept_at "
                "migration. Use --target warrants for the "
                "available increment today."
            ),
        }
        print(json.dumps(deferred, indent=2))
        return 0
    if target != "warrants":
        print(f"unknown sweep target: {args.target}", file=sys.stderr)
        return 2
    summary = warrant_resolve(
        shards_dir,
        write=args.write,
        limit=args.limit,
        use_aliases=getattr(args, "use_aliases", False),
    )
    summary["target"] = "warrants"
    summary["mode"] = "write" if args.write else "dry-run"
    print(json.dumps(summary, indent=2))
    return 0
def _cmd_mesh_status(args: argparse.Namespace) -> int:
    """Print mesh state as JSON: enabled flag, identity, current epoch, roster.

    ``identity`` is ``null`` when no identity has been initialised, and
    the roster is empty when there is no current epoch. Public keys are
    rendered as hex strings.
    """
    from arborist.mesh import current_epoch, is_enabled, load_identity
    from arborist.mesh.state import roster_at

    conn = connect(args.db)
    try:
        ident = load_identity(conn)
        epoch = current_epoch(conn)
        members = [] if epoch is None else roster_at(conn, epoch)
        enabled = is_enabled(conn)
        identity_view = None
        if ident:
            identity_view = {
                "member_id": ident.member_id,
                "group_name": ident.group_name,
                "sign_pub_hex": ident.sign_pub.hex(),
                "dh_pub_hex": ident.dh_pub.hex(),
                "created_at": ident.created_at,
            }
        roster_view = [
            {
                "member_id": member.member_id,
                "role": member.role,
                "sign_pub_hex": member.sign_pub.hex(),
                "dh_pub_hex": member.dh_pub.hex(),
            }
            for member in members
        ]
        out = {
            "enabled": enabled,
            "identity": identity_view,
            "current_epoch": epoch,
            "roster": roster_view,
        }
    finally:
        conn.close()
    print(json.dumps(out, indent=2, ensure_ascii=False))
    return 0
def _cmd_mesh_init(args: argparse.Namespace) -> int:
    """Create this node's mesh identity and print its public keys as JSON.

    Calls ``init_identity`` with ``--group`` / ``--member-id``. A
    ``RuntimeError`` from it is reported as ``error: ...`` on stderr and
    the command exits 2. The printed note reminds the operator that
    ``mesh enable`` is still needed to flip the gating flag on.
    """
    from arborist.mesh import init_identity
    conn = connect(args.db)
    try:
        ident = init_identity(conn, group_name=args.group, member_id=args.member_id)
    except RuntimeError as e:
        print(f"error: {e}", file=sys.stderr)
        return 2
    finally:
        # Single close point for both the success and the error path
        # (closing in the except branch as well closed it twice).
        conn.close()
    print(
        json.dumps(
            {
                "member_id": ident.member_id,
                "group_name": ident.group_name,
                "sign_pub_hex": ident.sign_pub.hex(),
                "dh_pub_hex": ident.dh_pub.hex(),
                "note": "share sign_pub_hex + dh_pub_hex with the founder of any group "
                "you want to join. Run 'mesh enable' to flip the gating flag on.",
            },
            indent=2, ensure_ascii=False
        )
    )
    return 0
def _cmd_mesh_enable(args: argparse.Namespace) -> int:
    """Turn the mesh gating flag on and echo the new state as JSON."""
    from arborist.mesh import set_enabled

    db_conn = connect(args.db)
    try:
        set_enabled(db_conn, True)
    finally:
        db_conn.close()
    state = {"enabled": True}
    print(json.dumps(state, indent=2, ensure_ascii=False))
    return 0
def _cmd_mesh_disable(args: argparse.Namespace) -> int:
    """Turn the mesh gating flag off and echo the new state as JSON."""
    from arborist.mesh import set_enabled

    db_conn = connect(args.db)
    try:
        set_enabled(db_conn, False)
    finally:
        db_conn.close()
    state = {"enabled": False}
    print(json.dumps(state, indent=2, ensure_ascii=False))
    return 0
def _cmd_mesh_members(args: argparse.Namespace) -> int:
    """Print the roster of the current mesh epoch as JSON.

    Prints ``{"error": "mesh not initialized"}`` and exits 2 when there is
    no current epoch.
    """
    from arborist.mesh import current_epoch
    from arborist.mesh.state import roster_at

    conn = connect(args.db)
    try:
        epoch = current_epoch(conn)
        if epoch is None:
            print(json.dumps({"error": "mesh not initialized"}, indent=2, ensure_ascii=False))
            return 2
        roster = roster_at(conn, epoch)
    finally:
        conn.close()
    members = [
        {
            "member_id": member.member_id,
            "role": member.role,
            "sign_pub_hex": member.sign_pub.hex(),
            "dh_pub_hex": member.dh_pub.hex(),
        }
        for member in roster
    ]
    print(json.dumps({"epoch": epoch, "members": members}, indent=2, ensure_ascii=False))
    return 0
def _cmd_mesh_add(args: argparse.Namespace) -> int:
    """Add a member to the mesh roster, advancing the epoch.

    ``--sign-pub`` / ``--dh-pub`` must be hex strings that decode to
    exactly 32 bytes each; otherwise exits 2 before touching the DB.
    ``PermissionError`` / ``RuntimeError`` / ``ValueError`` from
    ``add_member`` are reported on stderr with exit code 2.
    """
    from arborist.mesh.members import add_member

    try:
        sign_pub, dh_pub = bytes.fromhex(args.sign_pub), bytes.fromhex(args.dh_pub)
    except ValueError:
        print("error: --sign-pub and --dh-pub must be hex-encoded 32-byte keys", file=sys.stderr)
        return 2
    if any(len(key) != 32 for key in (sign_pub, dh_pub)):
        print("error: keys must decode to exactly 32 bytes", file=sys.stderr)
        return 2
    conn = connect(args.db)
    try:
        new_epoch = add_member(
            conn,
            member_id=args.member_id,
            sign_pub=sign_pub,
            dh_pub=dh_pub,
            role=args.role,
        )
    except (PermissionError, RuntimeError, ValueError) as exc:
        print(f"error: {exc}", file=sys.stderr)
        return 2
    finally:
        conn.close()
    report = {"new_epoch": new_epoch, "added": args.member_id}
    print(json.dumps(report, indent=2, ensure_ascii=False))
    return 0
def _cmd_mesh_kick(args: argparse.Namespace) -> int:
    """Remove a member from the mesh roster (with a reason), advancing the epoch.

    ``PermissionError`` / ``RuntimeError`` / ``ValueError`` from
    ``kick_member`` are reported on stderr with exit code 2.
    """
    from arborist.mesh.members import kick_member

    conn = connect(args.db)
    try:
        new_epoch = kick_member(conn, member_id=args.member_id, reason=args.reason)
    except (PermissionError, RuntimeError, ValueError) as exc:
        print(f"error: {exc}", file=sys.stderr)
        return 2
    finally:
        conn.close()
    report = {"new_epoch": new_epoch, "kicked": args.member_id, "reason": args.reason}
    print(json.dumps(report, indent=2, ensure_ascii=False))
    return 0
def _cmd_mesh_rotate(args: argparse.Namespace) -> int:
    """Perform a scheduled key rotation, reporting the new epoch as JSON.

    ``PermissionError`` / ``RuntimeError`` from ``scheduled_rotate`` are
    reported on stderr with exit code 2.
    """
    from arborist.mesh.members import scheduled_rotate

    conn = connect(args.db)
    try:
        new_epoch = scheduled_rotate(conn, reason=args.reason)
    except (PermissionError, RuntimeError) as exc:
        print(f"error: {exc}", file=sys.stderr)
        return 2
    finally:
        conn.close()
    report = {"new_epoch": new_epoch, "reason": args.reason}
    print(json.dumps(report, indent=2, ensure_ascii=False))
    return 0
def _cmd_mesh_serve(args: argparse.Namespace) -> int:
    """Run the HTTP gossip server until SIGINT.

    Refuses (exit 2) unless a mesh identity exists and ``mesh.enabled`` is
    on. Prints a ``serving`` status line (flushed immediately) before
    blocking, and a ``stopped`` line on KeyboardInterrupt. The server is
    always stopped on exit.
    """
    from arborist.mesh import is_enabled, load_identity
    from arborist.mesh.wire import MeshWireServer

    conn = connect(args.db)
    try:
        problem = None
        if load_identity(conn) is None:
            problem = "error: mesh not initialized; run 'arborist mesh init' first"
        elif not is_enabled(conn):
            problem = "error: mesh.enabled is off; run 'arborist mesh enable' first"
    finally:
        conn.close()
    if problem is not None:
        print(problem, file=sys.stderr)
        return 2
    server = MeshWireServer(args.db, host=args.host, port=args.port)
    banner = {"status": "serving", "url": server.url, "db": str(args.db)}
    print(json.dumps(banner, ensure_ascii=False))
    sys.stdout.flush()
    try:
        server.serve()
    except KeyboardInterrupt:
        print(json.dumps({"status": "stopped", "reason": "SIGINT"}, ensure_ascii=False))
    finally:
        server.stop()
    return 0
def _cmd_mesh_sync(args: argparse.Namespace) -> int:
    """Push local roots + falsifications to a peer.

    Two pushes happen by default (unless ``--no-roots`` /
    ``--no-falsifications`` opts one out):

    1. **ANNOUNCE_ROOT** for the most-recent ``--limit`` documents
       older than the kindergarten window. Receivers dedup by
       ``documents.document_root``.
    2. **ANNOUNCE_FALSIFICATION** for the most-recent ``--limit``
       falsifications older than the kindergarten window. Burns
       deliberately NOT propagated — local kindergarten cleanup.

    **Kindergarten window.** Records younger than
    ``--kindergarten-seconds`` (default 3600 = 1 hour) are NOT
    broadcast. Gives the operator time to inspect a fresh ingest or
    falsification & burn it before the network sees it. Override per
    invocation; ``--kindergarten-seconds 0`` broadcasts everything
    (cron-friendly opt-out for operators who prefer immediate
    propagation). The window is sender-side discipline; receivers
    don't enforce it because they have no view into when the sender
    created the record.

    The ``kindergarten_held_*`` report fields count rows still inside
    the window, capped at ``--limit``. Fresh rows are the newest ones, so
    that is how many would otherwise have gone out in this run. The
    count is exact for falsifications (ordered by ``at``) and
    approximate for roots (ordered by rowid).

    Receivers verify the Ed25519 signature, run per-peer chain-of-
    claims fork detection, then write one ``mesh_received`` audit
    event per accepted envelope. Duplicate broadcasts produce
    duplicate audit-log entries on the receiver but no state
    corruption.

    Returns:
        0 when every announce succeeded, 1 if any announce raised, 2 when
        the mesh is not initialized / not enabled or the peer is
        unreachable.
    """
    import time as _time
    from arborist.mesh import is_enabled, load_identity
    from arborist.mesh.wire import MeshWireClient
    now_ts = int(_time.time())
    cutoff_ts = now_ts - max(0, args.kindergarten_seconds)
    conn = connect(args.db)
    try:
        if load_identity(conn) is None:
            print("error: mesh not initialized", file=sys.stderr)
            return 2
        if not is_enabled(conn):
            print("error: mesh.enabled is off", file=sys.stderr)
            return 2
        # Rows still inside the kindergarten window (held back this run).
        fresh_roots = 0
        fresh_falsifications = 0
        root_rows: list = []
        falsification_rows: list = []
        if not args.no_roots:
            fresh_roots = conn.execute(
                "SELECT COUNT(*) FROM documents WHERE ingest_ts > ?",
                (cutoff_ts,),
            ).fetchone()[0]
            root_rows = conn.execute(
                "SELECT document_root, document_uri, chunking_version, "
                "       canonicalization_version, schema_version "
                "FROM documents WHERE ingest_ts <= ? "
                "ORDER BY rowid DESC LIMIT ?",
                (cutoff_ts, args.limit),
            ).fetchall()
        if not args.no_falsifications:
            fresh_falsifications = conn.execute(
                "SELECT COUNT(*) FROM falsifications WHERE at > ?",
                (cutoff_ts,),
            ).fetchone()[0]
            falsification_rows = conn.execute(
                "SELECT cache_key, reason FROM falsifications "
                "WHERE at <= ? "
                "ORDER BY at DESC LIMIT ?",
                (cutoff_ts, args.limit),
            ).fetchall()
    finally:
        conn.close()
    # Count fresh rows directly instead of deriving "held" from
    # min(total, limit) - sent: that difference is 0 whenever older rows
    # alone fill --limit, even when fresh rows were in fact held back.
    fresh_roots_held = max(0, min(fresh_roots, args.limit))
    fresh_falsifications_held = max(0, min(fresh_falsifications, args.limit))
    sent_roots: list[dict] = []
    sent_falsifications: list[dict] = []
    errors: list[dict] = []
    with MeshWireClient(args.db, args.peer) as client:
        try:
            peer_info = client.info()
        except Exception as e:
            print(json.dumps({"status": "peer_unreachable", "peer": args.peer, "error": str(e)}, indent=2, ensure_ascii=False))
            return 2
        for r in root_rows:
            try:
                resp = client.announce_root(
                    document_root=r["document_root"],
                    source_uri=r["document_uri"],
                    chunking_version=r["chunking_version"],
                    canonicalization_version=r["canonicalization_version"],
                    schema_version=r["schema_version"],
                )
                sent_roots.append({"document_root": r["document_root"], "ack": resp})
            except Exception as e:
                # Per-item best effort: record and keep announcing the rest.
                errors.append({"document_root": r["document_root"], "error": str(e)})
        for f in falsification_rows:
            try:
                resp = client.announce_falsification(
                    cache_key=f["cache_key"],
                    reason=f["reason"] or "",
                )
                sent_falsifications.append({"cache_key": f["cache_key"], "ack": resp})
            except Exception as e:
                errors.append({"cache_key": f["cache_key"], "error": str(e)})
    print(json.dumps({
        "status": "synced",
        "peer": args.peer,
        "peer_member_id": peer_info.get("member_id"),
        "peer_epoch": peer_info.get("current_epoch"),
        "kindergarten_seconds": args.kindergarten_seconds,
        "announced_roots": len(sent_roots),
        "announced_falsifications": len(sent_falsifications),
        "kindergarten_held_roots": fresh_roots_held,
        "kindergarten_held_falsifications": fresh_falsifications_held,
        "errors": len(errors),
        "sent_roots": sent_roots[: args.verbose],
        "sent_falsifications": sent_falsifications[: args.verbose],
        "error_samples": errors[:5],
    }, indent=2, ensure_ascii=False))
    return 0 if not errors else 1
def _cmd_mesh_pull(args: argparse.Namespace) -> int:
    """Pull a single document body from a peer by document_root.

    Closes the request half of the gossip loop. The wire client already
    verifies the peer's signature and re-derives the Merkle root from the
    delivered leaves before returning. This verb then re-ingests the
    delivered text through the standard ingest path so chunking_version /
    canonicalization_version stay consistent — and rejects with rc=2 if
    the local re-ingest produces a different document_root than requested.

    Returns:
        0 when ``args.root`` is already present locally (nothing is
        fetched), or when the pull + local re-ingest reproduced
        ``args.root`` (a ``mesh_pulled`` audit event is then appended).
        2 when the mesh is not initialized or not enabled, the peer
        request raises, or the re-ingested root differs from ``args.root``.

    NOTE(review): on a root mismatch this returns 2 *after*
    ``ingest_source`` has already run, and nothing in this function
    removes the document that was ingested under the other root. Whether
    that write is rolled back depends on ``ingest_source`` — confirm
    before relying on the "fail closed" wording below.
    """
    from arborist.document import Document
    from arborist.ingest import ingest_source
    from arborist.mesh import is_enabled, load_identity
    from arborist.mesh.wire import MeshWireClient
    # Phase 1: gating checks, and short-circuit if the root is already held.
    conn = connect(args.db)
    try:
        if load_identity(conn) is None:
            print("error: mesh not initialized", file=sys.stderr)
            return 2
        if not is_enabled(conn):
            print("error: mesh.enabled is off", file=sys.stderr)
            return 2
        already = conn.execute(
            "SELECT document_root, document_uri FROM documents WHERE document_root=?",
            (args.root,),
        ).fetchone()
    finally:
        conn.close()
    if already is not None:
        print(json.dumps({
            "status": "already_present",
            "document_root": already["document_root"],
            "document_uri": already["document_uri"],
            "shard": str(args.db),
        }, indent=2, ensure_ascii=False))
        return 0
    # Phase 2: fetch the body from the peer; any client failure is rc=2.
    try:
        with MeshWireClient(args.db, args.peer) as client:
            body = client.request_body(root=args.root)
    except Exception as e:
        print(f"error: pull failed: {e}", file=sys.stderr)
        return 2
    delivered_uri = body.get("document_uri") or ""
    delivered_text = body.get("text") or ""
    # Minimal one-document source so the delivered text goes through the
    # standard ingest path (same chunking / canonicalization as local data).
    class _PulledSource:
        source_type = "mesh_pull"
        def iter_documents(self):
            yield Document(
                uri=delivered_uri,
                content=delivered_text,
                source_type="mesh_pull",
                title=None,
            )
    # Phase 3: re-ingest locally, then confirm the requested root exists.
    conn = connect(args.db)
    try:
        ingest_source(conn, _PulledSource())
        row = conn.execute(
            "SELECT document_root FROM documents WHERE document_root=?",
            (args.root,),
        ).fetchone()
        if row is None:
            # Re-ingest produced a different root than the peer claimed.
            # The pulled text doesn't reproduce the requested root under
            # this node's local chunker/canonicalization. Fail closed.
            # The newest row for the delivered URI is reported as the
            # root that was actually produced (None if there is none).
            actual = conn.execute(
                "SELECT document_root FROM documents WHERE document_uri=? "
                "ORDER BY ingest_ts DESC LIMIT 1",
                (delivered_uri,),
            ).fetchone()
            actual_root = actual["document_root"] if actual else None
            print(
                "error: local re-ingest produced "
                f"{actual_root!r}, expected {args.root!r}",
                file=sys.stderr,
            )
            return 2
        # Root verified: record provenance of the pull in the audit chain.
        with transaction(conn):
            event_hash = append_audit(
                conn,
                event_type="mesh_pulled",
                body={
                    "document_root": args.root,
                    "document_uri": delivered_uri,
                    "peer": args.peer,
                },
                subject_root=args.root,
            )
    finally:
        conn.close()
    print(json.dumps({
        "status": "pulled",
        "document_root": args.root,
        "document_uri": delivered_uri,
        "shard": str(args.db),
        "audit_event_hash": event_hash,
    }, indent=2, ensure_ascii=False))
    return 0
def _cmd_crawl(args: argparse.Namespace) -> int:
    """Discover same-domain URLs breadth-first from ``args.seed_url``.

    Two modes:

    - Without ``--ingest``: write the discovered URLs to stdout, one per
      line. They can then be fed to ``arborist ingest --source html``.
    - With ``--ingest``: pass the URLs straight to ``ingest_crawled``
      against ``args.db``. Then print a JSON summary containing seed, depth,
      max_pages, the discovered count, and every key returned by
      ``ingest_crawled``. Per the original design notes, this path records
      ETag / Last-Modified per page for later ``crawler recrawl-check`` runs.

    Status lines go to stderr. Returns 2 if the crawler bridge cannot be
    imported, otherwise 0.
    """
    try:
        from arborist.sources.crawler.bridge import crawl_seed, ingest_crawled
    except ImportError as exc:
        print(f"error: {exc}", file=sys.stderr)
        return 2
    from arborist.progress import Progress

    def _status(line: str) -> None:
        # Chatter goes to stderr so stdout stays a clean URL list.
        print(line, file=sys.stderr, flush=True)

    page_limit = "no cap" if args.max_pages == 0 else f"max {args.max_pages}"
    pace = "polite" if not args.fast else "fast"
    _status(
        f" crawl: seed={args.seed_url} depth={args.depth} {page_limit} ({pace})"
    )

    discovery_progress = Progress(prefix="crawl ")
    discovered = crawl_seed(
        args.seed_url,
        max_depth=args.depth,
        max_pages=args.max_pages,
        progress=discovery_progress,
        fast=args.fast,
    )
    _status(f" crawl: discovery done — {len(discovered)} URLs")

    if not args.ingest:
        for url in discovered:
            print(url)
        return 0

    _status(f" ingest: starting on {len(discovered)} URLs")
    ingest_progress = Progress(prefix="ingest ", total_estimate=len(discovered))
    conn = connect(args.db)
    try:
        outcome = ingest_crawled(
            conn,
            discovered,
            progress=ingest_progress,
            default_author=getattr(args, "author", None),
        )
    finally:
        conn.close()

    summary = {
        "status": "crawled_and_ingested",
        "seed": args.seed_url,
        "depth": args.depth,
        "max_pages": args.max_pages,
        "discovered": len(discovered),
    }
    # Keys reported by ingest_crawled are layered on top (and win on clash).
    summary.update(outcome)
    print(json.dumps(summary, indent=2, ensure_ascii=False))
    return 0
def _cmd_crawler_recrawl_check(args: argparse.Namespace) -> int:
    """Re-check previously crawled documents with conditional HEAD requests.

    Delegates to the crawler bridge's ``recrawl_check`` against ``args.db``,
    filtered by ``args.domain`` and capped at ``args.limit``, and prints the
    returned result as indented JSON.

    Per the original design notes, each document is reported as fresh (304),
    stale (200, body changed), gone (404/410), or unreachable. The notes also
    say ``document_http_meta.last_status`` / ``last_checked_at`` are updated,
    so successive runs visit the least-recently-checked documents first.

    Returns 2 if the crawler bridge cannot be imported, otherwise 0.
    """
    try:
        from arborist.sources.crawler.bridge import recrawl_check
    except ImportError as exc:
        print(f"error: {exc}", file=sys.stderr)
        return 2

    db_conn = connect(args.db)
    try:
        report = recrawl_check(db_conn, domain=args.domain, limit=args.limit)
    finally:
        db_conn.close()

    rendered = json.dumps(report, indent=2, ensure_ascii=False)
    print(rendered)
    return 0
[docs]
def build_parser() -> argparse.ArgumentParser:
p = argparse.ArgumentParser(
prog="arborist",
description="An arborist for trees and forests of cross-linked information.",
)
p.add_argument("--version", action="version", version=f"arborist {__version__}")
p.add_argument(
"--db",
type=Path,
default=DEFAULT_DB_PATH,
help=f"path to arborist SQLite db (default: {DEFAULT_DB_PATH})",
)
p.add_argument(
"--shards-dir",
dest="global_shards_dir",
default=None,
help=(
"for read commands: attach all shards in this directory and "
"expose them as UNION views over the standard tables"
),
)
sub = p.add_subparsers(dest="cmd", required=True)
# Optional sqlite-vec backend (#000039): only surface --embed /
# `arborist embed` / `search --backend vec` when the [vec] extra
# is installed.
try:
from arborist.search import VEC_AVAILABLE as _vec_ok
except Exception: # pragma: no cover
_vec_ok = False
ingest = sub.add_parser("ingest", help="ingest documents from a source")
ingest.add_argument(
"--source",
required=True,
choices=[
"wikipedia_cur",
"wikipedia_old",
"wikipedia_xml",
"wikipedia_xml_history",
"wikipedia_abstract",
"html",
"grok_export",
"grok_media",
"git_repo",
"hg_repo",
"providence",
"claim_pack",
"textbook_tex",
],
help="source type",
)
ingest.add_argument(
"--kindergarten-seconds",
type=int,
default=None,
help=(
"(providence source only) records younger than this many "
"seconds stay opaque to ingestion — fresh thoughts cool "
"before they become substrate. Default 3600s (1h)."
),
)
ingest.add_argument(
"--path",
help=(
"path to dump file (wikipedia) or to xAI export root / "
"prod-grok-backend.json (grok_export, grok_media)"
),
)
ingest.add_argument(
"--url", action="append", help="URL to ingest (html source; repeatable)"
)
ingest.add_argument(
"--bundle",
action="append",
help=(
"JSON bundle path (claim_pack source; repeatable). Provide axiom "
"and theorem bundles together so cross-bundle pillar_reference "
"edges resolve to specific record URIs."
),
)
ingest.add_argument(
"--urls-from",
dest="urls_from",
help="file with one URL per line (html source)",
)
ingest.add_argument(
"--no-robots",
dest="no_robots",
action="store_true",
help="do not consult robots.txt (use only for explicitly authorized sites)",
)
ingest.add_argument(
"--author",
default=None,
help=(
"default author surname for the html / textbook_tex sources "
"(#000031 Phase 1 follow-up). Appended to document title as "
"'<title>, by <author>' so the warrant resolver's "
"_shard_matches_citation heuristic finds the surname in "
"the title haystack. Wikisource and PG HTML pages rarely "
"include author in <title>; the manifest carries it instead."
),
)
ingest.add_argument(
"--chunker", default=None, help="chunker name (default: tok-512-v1)"
)
ingest.add_argument(
"--limit", type=int, default=None, help="cap number of documents"
)
ingest.add_argument(
"--batch-size",
dest="batch_size",
type=int,
default=200,
help="documents per SQLite transaction (default 200)",
)
ingest.add_argument(
"--shard",
default=None,
help=(
"rank/total — yield only every N-th doc for parallel ingest. "
"spawn N processes, each with --shard 0/N, 1/N, ... they "
"parallelize parser CPU and serialize writes via WAL"
),
)
ingest.add_argument(
"--shards-dir",
dest="shards_dir",
default=None,
help=(
"directory for attach-forever sharding. With --shard rank/total, "
"writes to shards-dir/<rank>.db instead of --db, removing the "
"WAL writer-lock contention entirely. Reads via arborist --shards-dir"
),
)
ingest.add_argument(
"--resume",
action="store_true",
help=(
"rsync-style: read each source's last high-water mark from this "
"DB's meta table and skip rows whose id is <= it. Safe to kill "
"and restart at any time"
),
)
ingest.add_argument(
"--quiet",
action="store_true",
help="suppress periodic stderr progress output",
)
ingest.add_argument(
"--progress-interval",
dest="progress_interval",
type=float,
default=2.0,
help="seconds between stderr progress lines (default 2.0)",
)
ingest.add_argument(
"--total-estimate",
dest="total_estimate",
type=int,
default=None,
help=(
"estimated total docs the source will yield. enables percent "
"+ ETA in progress output"
),
)
ingest.add_argument(
"--no-loss-report",
dest="no_loss_report",
action="store_true",
help=(
"disable adapter LossReport sidecar (ticket #000022). "
"Default: enabled. Sidecar — toggling does NOT invalidate "
"QA cache_keys"
),
)
ingest.add_argument(
"--no-loss-excerpts",
dest="no_loss_excerpts",
action="store_true",
help=(
"drop sample_excerpt content from LossReport rows "
"(PII-paranoid mode). sample_hash stays populated"
),
)
ingest.add_argument(
"--loss-excerpt-bytes",
dest="loss_excerpt_bytes",
type=int,
default=200,
help="max excerpt byte length (default 200)",
)
if _vec_ok:
ingest.add_argument(
"--embed", action="store_true",
help=(
"after ingest, embed the chunks this run added (incremental "
"chunk_vecs population — #000039; default ingest does NOT "
"embed, the lazy `arborist embed` pass / cron / Prometheus-Σ "
"sweep is the usual path)"
),
)
ingest.set_defaults(func=_cmd_ingest)
search = sub.add_parser("search", help="keyword/semantic search (UNGROUNDED audit mode)")
search.add_argument("query", help="query string")
search.add_argument("--limit", type=int, default=20)
search.add_argument("--json", action="store_true", help="output JSON")
_search_backends = ["fts5"] + (["vec"] if _vec_ok else [])
search.add_argument(
"--backend", choices=_search_backends, default="fts5",
help=(
"retrieval backend: 'fts5' (BM25 lexical, default) or 'vec' "
"(semantic ANN over chunk_vecs — requires [vec] extra + a populated "
"shard via `arborist embed`)" if "vec" in _search_backends
else "retrieval backend: 'fts5' (BM25 lexical; 'vec' needs the [vec] extra)"
),
)
search.set_defaults(func=_cmd_search)
if "vec" in _search_backends:
embed_cmd = sub.add_parser(
"embed",
help="populate chunk_vecs (semantic embeddings) for --db [vec extra]",
)
embed_cmd.add_argument(
"--limit", type=int, default=None,
help="cap the number of chunks embedded (smoke-test on a real shard)",
)
embed_cmd.add_argument("--batch-size", type=int, default=256)
embed_cmd.add_argument(
"--quant", choices=["float32", "int8"], default="float32",
help=(
"vector quantization: float32 (default — +25%% corpus tax, max "
"fidelity) or int8 (~4x smaller, +6%% tax, ~1-3%% recall hit — "
"the recommended production default; switching quant on an "
"existing chunk_vecs requires --rebuild)"
),
)
embed_cmd.add_argument(
"--rebuild", action="store_true",
help=(
"DROP + recreate chunk_vecs (at --quant), then full re-embed — "
"the clean version-bump path and the only way to switch quant "
"(default is incremental: embed only chunks not already in "
"chunk_vecs)"
),
)
embed_cmd.set_defaults(func=_cmd_embed)
verify = sub.add_parser(
"verify", help="round-trip Merkle proofs for N random documents"
)
verify.add_argument("-n", type=int, default=10)
verify.set_defaults(func=_cmd_verify)
distill = sub.add_parser(
"distill",
help="compress docs into Merkle-signed cores (surface->core or core->core)",
)
distill.add_argument(
"--process", default="first-sentence-v1", help="distiller name"
)
distill.add_argument(
"--kind",
choices=["surface", "core"],
default="surface",
help="source kind to scan; 'core' runs recursive distillation",
)
distill.add_argument(
"--source-type",
dest="source_type",
default=None,
help="restrict to one source_type",
)
distill.add_argument(
"--chunker", default=None, help="chunker for the core doc"
)
distill.add_argument(
"--limit", type=int, default=None, help="cap number of docs scanned"
)
distill.add_argument(
"--batch-size",
dest="batch_size",
type=int,
default=200,
help="cores written per SQLite transaction (default 200)",
)
distill.set_defaults(func=_cmd_distill)
ask_cmd = sub.add_parser(
"ask",
help="answer a question about a document (cache-first, STRICT)",
)
ask_cmd.add_argument(
"--document-root",
dest="document_root",
required=True,
help="document_root to ask about",
)
ask_cmd.add_argument(
"--question", required=True, help="question text"
)
ask_cmd.add_argument(
"--model",
default=None,
help="model_id (default $ARBORIST_LLM_MODEL or hermes-3)",
)
ask_cmd.add_argument(
"--endpoint",
default=None,
help="OpenAI-compatible base URL (default $ARBORIST_LLM_ENDPOINT)",
)
ask_cmd.add_argument(
"--dry-run",
dest="dry_run",
action="store_true",
help="use StubClient — no network call",
)
ask_cmd.add_argument(
"--answer-mode", dest="answer_mode", default=None,
choices=["quote", "claim_lattice_pointer", "claim_lattice"],
help=(
"answer schema. See `query --answer-mode` for full semantics. "
"Default 'quote'; 'claim_lattice_pointer' enables quote-by-pointer; "
"'claim_lattice' is the JSON variant (vLLM guided_json + lenient "
"pre-parser; pairs with grammar-constrained inference)."
),
)
ask_cmd.set_defaults(func=_cmd_ask)
query_cmd = sub.add_parser(
"query",
help="multi-source RAG: question -> top-K corpus docs -> Hermes -> cache",
)
query_cmd.add_argument("question", help="the question to ask")
query_cmd.add_argument(
"--top-k", dest="top_k", type=int, default=8,
help="max distinct source documents in context (default 8)",
)
query_cmd.add_argument(
"--over-fetch", dest="over_fetch", type=int, default=32,
help="FTS5 hits to fetch per shard before dedup (default 32)",
)
query_cmd.add_argument(
"--max-context-chars", dest="max_context_chars", type=int, default=None,
help=(
"cap on assembled context bytes. When omitted, falls back to "
"the per-mode default in DEFAULT_QUERY_POLICY['max_context_chars_by_mode'] "
"(quote=24000, claim_lattice_pointer=24000, claim_lattice=48000). "
"Sprint 1b 2026-05-02 — peaks measured per mode."
),
)
query_cmd.add_argument(
"--question-dedup", dest="question_dedup", default=None,
choices=["strict", "equivalence_class"],
help=(
"write-time question canonicalization. 'equivalence_class' "
"(default) collapses articles + trailing-punct + case so "
"variants share cache_keys. 'strict' keeps every variant "
"distinct (audit-grade)."
),
)
query_cmd.add_argument(
"--fidelity", dest="fidelity", default=None,
choices=["strict", "equivalence_class"],
help=(
"lookup tolerance. 'equivalence_class' (default) tries the "
"primary cache_key then falls back to the alternate dedup "
"mode's cache_key. 'strict' refuses fallback."
),
)
query_cmd.add_argument(
"--qa-db", dest="qa_db", default=None,
help=(
"providence_cache target DB. default: <shards-dir>/qa.db, or "
"~/.arborist/qa.db when no shards-dir"
),
)
query_cmd.add_argument(
"--model", default=None,
help="model_id (default $ARBORIST_LLM_MODEL or hermes-3)",
)
query_cmd.add_argument(
"--endpoint", default=None,
help="OpenAI-compatible base URL (default $ARBORIST_LLM_ENDPOINT)",
)
query_cmd.add_argument(
"--dry-run", dest="dry_run", action="store_true",
help="use StubClient — assembles context but skips the LLM call",
)
query_cmd.add_argument(
"--json", action="store_true",
help="emit the raw record as indented JSON (default: human render)",
)
query_cmd.add_argument(
"--burn", action="store_true",
help=(
"delete the matching live providence_cache row BEFORE lookup, "
"forcing a fresh inference. Test-ergonomic — see new behavior "
"without finding cache_keys by hand. Writes a providence_burn "
"audit event."
),
)
query_cmd.add_argument(
"--repair", action="store_true",
help=(
"enable mechanical repair after first verify (off by "
"default). When the verdict is HYBRID/UNGROUNDED, applies "
"synthetic_elision split, trailing_artifact trim, and "
"no_overlap drop deterministically; persists the repaired "
"answer with a providence_repair audit event."
),
)
query_cmd.add_argument(
"--repair-reprompts", dest="repair_reprompts", type=int, default=0,
help=(
"max LLM re-prompt iterations after mechanical repair "
"(default 0 = no re-prompt). Each iteration sends a feedback "
"turn naming failed quotes; the model is asked to rewrite "
"using only verbatim citations. Requires --repair."
),
)
query_cmd.add_argument(
"--answer-mode", dest="answer_mode", default=None,
choices=["quote", "claim_lattice_pointer", "claim_lattice"],
help=(
"answer schema. 'quote' (default): model writes prose with "
"verbatim quote spans inline. 'claim_lattice_pointer' (G0 "
"/ CTI quote-by-pointer): runtime builds a labeled evidence "
"map (E1, E2, …); model writes pointer-line prose ('Claim. "
"[E12]'); renderer interpolates literal spans. Synthetic-"
"elision-by-construction-impossible. No repair loop "
"(one-shot discipline). 'claim_lattice' is the JSON variant "
"(vLLM guided_json + lenient pre-parser; pairs with grammar-"
"constrained inference like Qwen 3.6 reasoner / Claude / GPT-4)."
),
)
query_cmd.add_argument(
"--retrieval-keywords", dest="retrieval_keywords", default=None,
help=(
"operator-supplied keywords appended to the question for "
"FTS5 retrieval ONLY — never sent to the LLM, never enters "
"cache_key, never reaches the verifier. Use to narrow "
"OR-mode retrieval on long discursive questions whose "
"content tokens get diluted by template phrasing. Example: "
"make query Q='what tech may enable one person to "
"reconstruct another person's thoughts...' "
"K='transcranial knowledge acquisition'. Pair with --burn "
"to force fresh inference (keywords are session-only and "
"cache-hits ignore them)."
),
)
# Ticket #000008 Phase 4 — quantifier-guard CLI flags. Each
# corresponds to a level of the §10.11.2 disable hierarchy.
query_cmd.add_argument(
"--no-quantifier-guard",
dest="no_quantifier_guard", action="store_true",
help=(
"Disable the broad-quantifier preflight guard for this "
"call. Overrides quantifier_guard_enabled in policy. "
"Bench-side telemetry (quantifier_intensity, etc.) goes "
"to None for the row. Use when the guard misclassifies."
),
)
query_cmd.add_argument(
"--allow-broad",
dest="allow_broad", action="store_true",
help=(
"Emergent-search mode: keep the classifier on (telemetry "
"stays useful) but don't apply caps. For broad questions "
"where the operator wants exploratory enumeration, not "
"grounded completeness."
),
)
query_cmd.add_argument(
"--reject-broad",
dest="reject_broad", action="store_true",
help=(
"Strict mode: when intensity is ALL/COMPREHENSIVE/"
"OPEN_REQUEST AND scope_bound_hint is unbounded, return "
"UNGROUNDED before the LLM call with a "
"BROAD_QUANTIFIER_REJECTED violation. Saves ~10-15s on "
"rejected runs. Bounded universals (e.g. all members of "
"the Beatles) are NOT rejected."
),
)
query_cmd.add_argument(
"--apply-quantifier-caps",
dest="apply_quantifier_caps", action="store_true",
help=(
"Flip the dry-run gate per-call. By default Phase 2 "
"lands with quantifier_guard_apply_caps=False so the "
"cap is reported on the result but not applied to the "
"verifier. This flag enables actual cap enforcement "
"for one call. Use after dry-run bench review confirms "
"the classifier output across the question set."
),
)
# Ticket #000001 §7 Phase 0 — cross-language guard (default OFF;
# dry-run rollout discipline). Enable per-call to experiment.
query_cmd.add_argument(
"--crosslang-guard",
dest="crosslang_guard", action="store_true",
help=(
"Enable the deterministic cross-language guard for this "
"call (ticket #000001 §7 Phase 0, default OFF). A query "
"with a non-English signal (¿/¡/non-ASCII letter) has "
"es-v1 function words stripped from the retrieval set so "
"they can't drive an OR-mode full-corpus FTS5 scan; if no "
"corpus-language content token survives, returns "
"UNGROUNDED before retrieval/LLM with a "
"CROSS_LANGUAGE_UNSUPPORTED violation. Retrieval-side "
"only — never touches the verifier / cache_key / "
"question_hash. Provably inert on English."
),
)
query_cmd.add_argument(
"--crosslang-translate",
dest="crosslang_translate", action="store_true",
help=(
"Operation Sandwich (ticket #000056, default OFF; implies "
"--crosslang-guard). When the non-English signal fires, "
"translate the query es→en (local [mt] opus-mt; the "
"English article ranks primary AND the LLM is prompted in "
"English), ground the English answer with the UNTOUCHED "
"verifier, then render the verified English answer back "
"es as DISPLAY-ONLY (banner-labelled, zero grounding). "
"Needs the [mt] extra; degrades to the Phase-0 guard if "
"absent. Never touches the verifier / cache_key / "
"question_hash / governance_policy_hash."
),
)
# Ticket #000010 — meta-cognition CLI flags.
query_cmd.add_argument(
"--no-preflight",
dest="no_preflight", action="store_true",
help=(
"Disable the meta-cognition preflight guard for this "
"call. Skips temporal / contradiction / false-premise / "
"out-of-corpus detectors. The QuestionState surfaces a "
"stub with empty logical_statuses so bench rows stay "
"column-aligned."
),
)
query_cmd.add_argument(
"--block-on-contradiction",
dest="block_on_contradiction", action="store_true",
help=(
"Hard-block on lexical contradictions (default: label-"
"only). Strict mode: questions like 'which unmarried "
"spouse is X married to' return PREFLIGHT_BLOCKED."
),
)
query_cmd.add_argument(
"--soft-preflight",
dest="soft_preflight", action="store_true",
help=(
"Ticket #000011 — opt-in to the model-assisted soft "
"preflight sidecar. Adds one short LLM round-trip "
"(~200ms median) before the main answer call; the model "
"classifies the question shape and returns a SOFT_* "
"advisory hint that surfaces as `· soft: <label>` on "
"the audit-line tail. NEVER enters the verifier proof "
"path; cannot create PREFLIGHT_OK or PREFLIGHT_BLOCKED."
),
)
query_cmd.add_argument(
"--no-canonical-preflight",
dest="no_canonical_preflight", action="store_true",
help=(
"Disable the math/logic π* short-circuit. Pure-arithmetic "
"(e.g. '0.1 + 0.2') and pure-propositional ('A IMPL B') "
"questions normally bypass RAG and answer via "
"arithmetic@v1 / logic-kernel@v1 directly. Pass this "
"flag to force RAG against the LLM (useful when bench-"
"comparing the canonical answer with the model's reply)."
),
)
witness_group = query_cmd.add_mutually_exclusive_group()
witness_group.add_argument(
"--witness", dest="witness_override", action="store_const",
const="on",
help=(
"Ticket #000028 — multi-modality witness. On canonical-shape "
"questions (arithmetic@v1, logic-kernel@v1, future kernels), "
"fan out kernel + cache + LLM in parallel and verify cross-"
"modality agreement via kernel-canonicalization. Adds one LLM "
"call (~2-5s) on top of the canonical fast path; default OFF "
"to keep arithmetic queries at ~10ms."
),
)
witness_group.add_argument(
"--no-witness", dest="witness_override", action="store_const",
const="off",
help="Disable the witness path even when policy enables it.",
)
progress_group = query_cmd.add_mutually_exclusive_group()
progress_group.add_argument(
"--progress", dest="progress_override", action="store_const",
const="on",
help=(
"Force per-stage state-machine emission to stderr (per-shard "
"× per-route search lines, llm.start/done, verify, persist, "
"etc.). Default: on at TTY, off when stderr is piped. "
"Format: [arborist NN.NNs] stage.name key=value …"
),
)
progress_group.add_argument(
"--no-progress", dest="progress_override", action="store_const",
const="off",
help=(
"Suppress state-machine emission even when stderr is a TTY. "
"Use when running interactively but piping output through "
"tools that misbehave on stderr noise."
),
)
query_cmd.set_defaults(
func=_cmd_query, progress_override=None, witness_override=None,
)
inspect_cmd = sub.add_parser(
"inspect",
help=(
"sidecar diagnostic for a providence_cache record — pulls "
"source chunks and classifies each unverified span "
"(paraphrase / trailing_artifact / interior_elision / "
"synthetic_elision_inside_quote / "
"no_overlap). Read-only, "
"no audit events, no v9.8 field changes."
),
)
inspect_cmd.add_argument(
"--cache-key", dest="cache_key", required=True,
help="64-char hex cache_key of the providence record to inspect",
)
inspect_cmd.add_argument(
"--qa-db", dest="qa_db", default=None,
help="path to qa.db (default: <shards>/qa.db or ~/.arborist/qa.db)",
)
inspect_cmd.add_argument(
"--json", action="store_true",
help="emit raw diagnosis as JSON (default: human render)",
)
inspect_cmd.set_defaults(func=_cmd_inspect)
losses_cmd = sub.add_parser(
"losses",
help=(
"list adapter LossReport sidecar rows (ticket #000022). "
"Read-only; never enters proof path"
),
)
losses_cmd.add_argument(
"--document-root", dest="document_root", default=None,
help="filter to one document_root (hex)",
)
losses_cmd.add_argument(
"--chunk-id", dest="chunk_id", type=int, default=None,
help="filter to one chunk_id",
)
losses_cmd.add_argument(
"--kind", default=None,
help="filter to one loss_kind (ref_tag, file_link, html_chrome, ...)",
)
losses_cmd.add_argument(
"--stage", default=None,
help="filter to one stage (wikitext_base, html_normalize, ingest)",
)
losses_cmd.add_argument(
"--summary", action="store_true",
help="aggregate by (stage, loss_kind, loss_mode)",
)
losses_cmd.add_argument(
"--limit", type=int, default=200,
help="row cap for non-summary mode (default 200)",
)
losses_cmd.add_argument(
"--json", action="store_true",
help="emit raw rows as JSON (default: human render)",
)
losses_cmd.set_defaults(func=_cmd_losses)
prov_cmd = sub.add_parser(
"providence",
help="list or falsify providence_cache records",
)
prov_cmd.add_argument("--document-uri", dest="document_uri", default=None)
prov_cmd.add_argument("--source-root", dest="source_root", default=None)
prov_cmd.add_argument("--limit", type=int, default=20)
prov_cmd.add_argument(
"--falsify",
default=None,
metavar="CACHE_KEY",
help=(
"mark a providence_cache record as failed/stale/quarantined. "
"Lookups will skip it. Audit chain records the act"
),
)
prov_cmd.add_argument(
"--state",
default="failed",
choices=["failed", "stale", "quarantined"],
help="falsification state to set (default: failed)",
)
prov_cmd.add_argument(
"--reason",
default=None,
help="reason text stored in falsifications log",
)
prov_cmd.add_argument(
"--by-actor",
dest="by_actor",
default=None,
help="who is falsifying (default: $USER)",
)
prov_cmd.add_argument(
"--show-preflight",
dest="show_preflight",
default=None,
metavar="CACHE_KEY_PREFIX",
help=(
"Pull the preflight stage payload from a row's "
"run_dag_blob. Match by 12-char prefix. Renders the "
"preflight stage hash + run-DAG stage list. Operator "
"tool for inspecting the policy state that governed a "
"cached row (#000009 §7.2)."
),
)
prov_cmd.set_defaults(func=_cmd_providence)
ce_cmd = sub.add_parser(
"controller-events",
help=(
"list controller_events advisory rows from #000037 Phase 2 "
"(QA-runner → controller decision/difficulty/budget rows)"
),
)
ce_cmd.add_argument("--limit", type=int, default=20)
ce_cmd.add_argument(
"--kind",
choices=[
"controller_decision",
"controller_difficulty",
"controller_budget_allocation",
"controller_falsification_proposal",
],
default=None,
help="filter to one event_kind",
)
ce_cmd.add_argument(
"--organism-prefix",
dest="organism_prefix",
default=None,
help='match organism_root LIKE prefix (e.g. "qa:" for QA-runner advisories)',
)
ce_cmd.add_argument(
"--since-seconds",
dest="since_seconds",
type=int,
default=None,
help="only rows recorded within the last N seconds",
)
ce_cmd.add_argument(
"--body",
action="store_true",
help="include the JSON body_blob in JSON output (off by default — bodies are bulky)",
)
ce_cmd.add_argument(
"--json",
action="store_true",
help="emit JSON instead of a terminal table",
)
ce_cmd.set_defaults(func=_cmd_controller_events)
burn_cmd = sub.add_parser(
"burn",
help=(
"delete a leaf with no children — providence_cache, document, "
"or core (kindergarten use; falsify/evict are audit-preserving)"
),
)
burn_cmd.add_argument(
"--kind",
choices=("providence", "document", "core"),
default="providence",
help="leaf kind to burn (default: providence — backwards-compatible)",
)
burn_cmd.add_argument(
"--cache-key",
dest="cache_key",
default=None,
help="cache_key (hex) of the providence record to burn (kind=providence)",
)
burn_cmd.add_argument(
"--root",
dest="root",
default=None,
help="document_root (hex) of the document/core to burn (kind=document|core)",
)
burn_cmd.add_argument(
"--reason",
default=None,
help="reason text recorded in the burn audit event",
)
burn_cmd.add_argument(
"--by-actor",
dest="by_actor",
default=None,
help="who is burning (default: $USER)",
)
burn_cmd.add_argument(
"--force",
action="store_true",
help="burn even if children exist; not recommended",
)
burn_cmd.set_defaults(func=_cmd_burn)
burn_kg_cmd = sub.add_parser(
"burn-kindergarten",
help=(
"burn every providence_cache record younger than the "
"kindergarten window — test-ergonomic mass cleanup that "
"matches the mesh-sync kindergarten window so only "
"un-broadcast records get busted"
),
)
burn_kg_cmd.add_argument(
"--kindergarten-seconds",
dest="kindergarten_seconds",
type=int,
default=3600,
help=(
"burn rows younger than this many seconds (default: 3600 = 1 "
"hour, mirrors mesh sync default). 0 = burn everything live."
),
)
burn_kg_cmd.add_argument(
"--reason", default=None,
help="reason text recorded in each providence_burn audit event",
)
burn_kg_cmd.add_argument(
"--by-actor", dest="by_actor", default=None,
help="who is burning (default: $USER)",
)
burn_kg_cmd.add_argument(
"--force", action="store_true",
help="burn even if rows have falsification children",
)
burn_kg_cmd.add_argument(
"--dry-run", dest="dry_run", action="store_true",
help="report what would burn without writing",
)
burn_kg_cmd.add_argument(
"--verbose", type=int, default=10,
help="include this many items in the result JSON (default: 10)",
)
burn_kg_cmd.set_defaults(func=_cmd_burn_kindergarten)
reclassify_cmd = sub.add_parser(
"reclassify",
help="re-run the verifier against existing live providence records "
"(no LLM calls; relabels stale classifications)",
)
reclassify_cmd.add_argument(
"--qa-db", dest="qa_db", default=None,
help="path to qa.db (default: <shards>/qa.db or ~/.arborist/qa.db)",
)
reclassify_cmd.add_argument(
"--limit", type=int, default=0,
help="reclassify at most N records (0 = unlimited)",
)
reclassify_cmd.add_argument(
"--dry-run", dest="dry_run", action="store_true",
help="report what would change without writing",
)
reclassify_cmd.add_argument(
"--entity-policy", dest="entity_policy", default=None,
choices=["strict", "hybrid", "drop", "proximity"],
help=(
"how the entity path classifies: 'strict' (legacy, overclaims), "
"'hybrid' (default — caps at HYBRID), 'drop' (skip entity path → "
"UNGROUNDED), 'proximity' (STRICT only if N entities cluster within "
"W chars in source)"
),
)
reclassify_cmd.add_argument(
"--compare", dest="compare", action="store_true",
help="run all four entity policies side-by-side without writing",
)
reclassify_cmd.set_defaults(func=_cmd_reclassify)
emergent_cmd = sub.add_parser(
"emergent",
help="surface UNGROUNDED/HYBRID claims — corpus-growth signal",
)
emergent_cmd.add_argument(
"--aggregate",
action="store_true",
help="rank unverified quotes by frequency (vs per-record list)",
)
emergent_cmd.add_argument("--limit", type=int, default=20)
emergent_cmd.set_defaults(func=_cmd_emergent)
evict_cmd = sub.add_parser(
"evict",
help="demote surface chunks hot→cold (NULL content, retain leaf_hash)",
)
evict_cmd.add_argument(
"--source-type",
dest="source_type",
default=None,
help="restrict to one source_type",
)
evict_cmd.add_argument(
"--older-than-days",
dest="older_than_days",
type=int,
default=None,
help="only evict docs older than N days",
)
evict_cmd.add_argument(
"--document-root",
action="append",
default=None,
help="explicit document_root(s) to evict; repeatable",
)
evict_cmd.set_defaults(func=_cmd_evict)
rehydrate_cmd = sub.add_parser(
"rehydrate",
help="refetch URI, verify leaves, restore cold content if root matches",
)
rehydrate_cmd.add_argument(
"--document-root",
action="append",
default=None,
help="explicit document_root(s) to rehydrate; repeatable",
)
rehydrate_cmd.add_argument(
"--all-cold",
dest="all_cold",
action="store_true",
help="rehydrate every document with cold chunks",
)
rehydrate_cmd.set_defaults(func=_cmd_rehydrate)
activity_cmd = sub.add_parser(
"activity",
help="recent Q&A + freshly cached docs (agent-readable timeline)",
)
activity_cmd.add_argument(
"--limit", type=int, default=10,
help="max items per category (default 10)",
)
activity_cmd.add_argument(
"--since-seconds",
dest="since_seconds",
type=int,
default=0,
help="only events newer than this many seconds (0 = all time, default)",
)
activity_cmd.add_argument(
"--preview-chars",
dest="preview_chars",
type=int,
default=240,
help="answer preview length (default 240 chars)",
)
activity_cmd.set_defaults(func=_cmd_activity)
stats_cmd = sub.add_parser("stats", help="counts: docs, chunks, edges, audit")
stats_cmd.set_defaults(func=_cmd_stats)
canon_cmd = sub.add_parser(
"canon",
help=(
"canonicalize input via a registered π* (direct projection, "
"no RAG, no LLM)"
),
)
canon_cmd.add_argument(
"key", nargs="?",
help="π* key, e.g. arithmetic@v1, logic-kernel@v1, code-py-ast@v1",
)
canon_cmd.add_argument(
"input", nargs="?",
help="raw input string to canonicalize (UTF-8)",
)
canon_cmd.add_argument(
"--list", action="store_true",
help="list registered π* keys + their domains and exit",
)
canon_cmd.add_argument(
"--json", action="store_true",
help="emit {pi_star_ref, input, canonical, canonical_sha256}",
)
canon_cmd.set_defaults(func=_cmd_canon)
analyze_cmd = sub.add_parser(
"analyze",
help="compression spectrum, depth distribution, audit chain integrity",
)
analyze_cmd.add_argument(
"--gravity-top",
dest="gravity_top",
type=int,
default=10,
help="N top inbound-linked documents to report (default 10)",
)
analyze_cmd.set_defaults(func=_cmd_analyze)
# ----- snapshot subcommands ----------------------------------------------
snap_cmd = sub.add_parser(
"snapshot",
help="corpus-level Merkle snapshots: pin a forest state by single root",
)
snap_sub = snap_cmd.add_subparsers(dest="snap_op", required=True)
snap_create = snap_sub.add_parser(
"create", help="compute snapshot_root from current corpus, persist + audit"
)
snap_create.add_argument("--reason", default="manual")
snap_create.add_argument(
"--parent",
default=None,
help="explicit parent_snapshot hex (default: auto-link to latest prior snapshot)",
)
snap_create.set_defaults(func=_cmd_snapshot_create)
snap_list = snap_sub.add_parser("list", help="recent snapshots, newest first")
snap_list.add_argument("--limit", type=int, default=20)
snap_list.set_defaults(func=_cmd_snapshot_list)
snap_verify = snap_sub.add_parser(
"verify",
help="recompute root from current corpus; matches=True iff nothing has changed",
)
snap_verify.add_argument("snapshot_root", help="hex snapshot_root to verify")
snap_verify.set_defaults(func=_cmd_snapshot_verify)
snap_diff = snap_sub.add_parser(
"diff",
help="coarse drift signal between a snapshot and the current corpus",
)
snap_diff.add_argument("snapshot_root", help="hex snapshot_root to diff against current")
snap_diff.set_defaults(func=_cmd_snapshot_diff)
# ----- substrate subcommands (ticket #000012 Phase 1a + future) ----------
# Was `arborist v8 score` until 2026-05-10; renamed for naming-consistency
# with the arborist/substrate/ dir, which is itself the post-rename home
# of what used to live under arborist/v8/. The ``v`` in ``v8`` referred
# to the substrate-paper version, which collided with the v9.8 SQLite
# schema version and confused readers.
substrate_cmd = sub.add_parser(
"substrate",
help="Merkle-AGI substrate primitives (ForkScore + future paper specs)",
)
substrate_sub = substrate_cmd.add_subparsers(dest="substrate_op", required=True)
substrate_score = substrate_sub.add_parser(
"score",
help="ForkScore over (parent, child) bench-result JSON files (#000012)",
)
substrate_score.add_argument(
"--parent", required=True,
help="path to parent bench-result JSON (from `bench.batteries.runner --all`)",
)
substrate_score.add_argument(
"--child", required=True,
help="path to child bench-result JSON",
)
substrate_score.add_argument(
"--weights", default=None,
help="optional path to a weights JSON file; falls through to DEFAULT_WEIGHTS",
)
substrate_score.add_argument(
"--capital-delta", dest="capital_delta", type=float, default=0.0,
help="capital cost delta from #000020 ledger; positive = child costs more",
)
substrate_score.add_argument(
"--selfmodel-calibration-gain",
dest="selfmodel_calibration_gain", type=float, default=0.0,
help="SelfModel calibration improvement (parent→child); 0 if unmeasured",
)
substrate_score.add_argument(
"--audit-completeness", dest="audit_completeness", type=float, default=0.0,
help="fraction of state-changes with audit-event in 0..1",
)
substrate_score.add_argument(
"--validator-diversity", dest="validator_diversity", type=float, default=0.0,
help="multi-validator diversity score; 0 in single-validator mode",
)
substrate_score.add_argument(
"--security-risk", dest="security_risk", type=float, default=0.0,
help="reserved; 0 in Phase 1a (no security-bench yet)",
)
substrate_score.add_argument(
"--complexity-delta", dest="complexity_delta", type=float, default=0.0,
help="reserved; 0 in Phase 1a",
)
substrate_score.add_argument(
"--memory-invalidation-count",
dest="memory_invalidation_count", type=float, default=0.0,
help="count of memory_records the fork would falsify",
)
substrate_score.add_argument(
"--out", default=None,
help=(
"optional output file path; ScoredFork JSON is also written "
"here in addition to stdout. Default: stdout only. The CI "
"/ make bench-fork-score targets pin this to "
"bench/results/fork_score_report.json so downstream graders "
"/ ForkScore-aware mesh peers can ingest the artifact."
),
)
# ----- Phase 1c (#000012 §7) — branch-set persistence ---------------
# Default off. When --branch-set is present a row is written to
# ``fork_score_branches``; absent ⇒ pure-function semantics
# (Phase 1a behavior preserved).
substrate_score.add_argument(
"--branch-set", dest="branch_set", default=None,
help=(
"checkpoint identity (e.g. parent_root + ts); when present, "
"writes one row to fork_score_branches sibling table"
),
)
substrate_score.add_argument(
"--branch-id", dest="branch_id", default=None,
help="fork identifier; defaults to --child-root when omitted",
)
substrate_score.add_argument(
"--parent-root", dest="parent_root", default=None,
help="shared parent root (required when --branch-set is given)",
)
substrate_score.add_argument(
"--child-root", dest="child_root", default=None,
help="child root (nullable for in-flight branches)",
)
substrate_score.add_argument(
"--persist-shard", dest="persist_shard", default=None,
help=(
"SQLite path to write fork_score_branches row to; defaults "
"to --db (the current arborist target shard)"
),
)
substrate_score.add_argument(
"--weights-id", dest="weights_id", default=None,
help=(
"opaque label for the WeightSet used; defaults to "
"'default' or the basename of --weights"
),
)
substrate_score.set_defaults(func=_cmd_substrate_score)
# ----- memory subcommands (ticket #000017) --------------------------------
memory_cmd = sub.add_parser(
"memory",
help="lifelong-learning audit summary (ticket #000017)",
)
memory_sub = memory_cmd.add_subparsers(
dest="memory_op", required=True
)
mem_snap = memory_sub.add_parser(
"snapshot",
help="build a memory snapshot from current store state",
)
mem_snap.set_defaults(func=_cmd_memory_snapshot)
mem_show = memory_sub.add_parser(
"show",
help="print a memory record by root, or the latest live one",
)
mem_show.add_argument(
"--root",
default=None,
help="hex memory_root (default: latest live)",
)
mem_show.set_defaults(func=_cmd_memory_show)
mem_branches = memory_sub.add_parser(
"branches",
help="list branch summaries attached to a memory_root",
)
mem_branches.add_argument(
"--root",
default=None,
help="hex memory_root (default: latest live)",
)
mem_branches.set_defaults(func=_cmd_memory_branches)
mem_fals = memory_sub.add_parser(
"falsify", help="mark a memory_root falsified"
)
mem_fals.add_argument("root", help="hex memory_root to falsify")
mem_fals.add_argument(
"--reason", required=True, help="why this memory is falsified"
)
mem_fals.add_argument(
"--branch-id",
dest="branch_id",
default=None,
help="optional triggering branch_id",
)
mem_fals.set_defaults(func=_cmd_memory_falsify)
# ----- capital subcommands (ticket #000020) -------------------------------
capital_cmd = sub.add_parser(
"capital",
help="8-capital-form cost ledger (ticket #000020)",
)
capital_sub = capital_cmd.add_subparsers(
dest="capital_op", required=True
)
cap_summary = capital_sub.add_parser(
"summary",
help="aggregate per-form sums across the ledger",
)
cap_summary.add_argument(
"--op-type",
dest="op_type",
default=None,
help="filter to one op_type",
)
cap_summary.add_argument(
"--since",
type=int,
default=None,
help="only rows with recorded_at >= this Unix timestamp",
)
cap_summary.set_defaults(func=_cmd_capital_summary)
cap_op = capital_sub.add_parser(
"op-cost", help="per-form totals for one op_type"
)
cap_op.add_argument("op_type", help="e.g. ingest|qa|distill")
cap_op.set_defaults(func=_cmd_capital_op_cost)
cap_top = capital_sub.add_parser(
"top", help="top-N op_types by total in one capital form"
)
cap_top.add_argument(
"--form",
required=True,
choices=[
"living",
"material",
"financial",
"intellectual",
"experiential",
"social",
"cultural",
"spiritual",
],
)
cap_top.add_argument("--limit", type=int, default=10)
cap_top.set_defaults(func=_cmd_capital_top)
# ----- selfmodel subcommands (ticket #000014) -----------------------------
selfmodel_cmd = sub.add_parser(
"selfmodel",
help="agent identity record: capability claims, falsification (ticket #000014)",
)
selfmodel_sub = selfmodel_cmd.add_subparsers(
dest="selfmodel_op", required=True
)
sm_snap = selfmodel_sub.add_parser(
"snapshot",
help="build a SelfModel from current store state and persist it",
)
sm_snap.set_defaults(func=_cmd_selfmodel_snapshot)
sm_show = selfmodel_sub.add_parser(
"show",
help="print a SelfModel by root, or the latest live one",
)
sm_show.add_argument(
"--root",
default=None,
help="hex selfmodel_root (default: latest live)",
)
sm_show.set_defaults(func=_cmd_selfmodel_show)
sm_fals = selfmodel_sub.add_parser(
"falsify",
help="mark a SelfModel falsified with a reason",
)
sm_fals.add_argument("root", help="hex selfmodel_root to falsify")
sm_fals.add_argument(
"--reason", required=True, help="why this SelfModel is falsified"
)
sm_fals.add_argument(
"--claim-hash",
dest="claim_hash",
default=None,
help="optional triggering capability-claim hash",
)
sm_fals.set_defaults(func=_cmd_selfmodel_falsify)
sm_list = selfmodel_sub.add_parser(
"list", help="list recent SelfModel rows, newest first"
)
sm_list.add_argument("--limit", type=int, default=20)
sm_list.set_defaults(func=_cmd_selfmodel_list)
# ----- warrant resolver (#000031 Phase 2) --------------------------------
warrant_status_cmd = sub.add_parser(
"warrant-status",
help="show citation-resolver matches per claim-pack record (read-only)",
)
warrant_status_cmd.add_argument(
"--shards-dir",
dest="shards_dir",
default=None,
help="shards directory (overrides --shards-dir from global)",
)
warrant_status_cmd.add_argument(
"--limit", type=int, default=3, help="max candidate matches per record"
)
warrant_status_cmd.set_defaults(func=_cmd_warrant_status)
warrant_resolve_cmd = sub.add_parser(
"warrant-resolve",
help="run citation resolver; with --write, write derivations rows binding "
"claim-pack records to surface chunks (Merkle proof)",
)
warrant_resolve_cmd.add_argument(
"--shards-dir",
dest="shards_dir",
default=None,
help="shards directory (overrides --shards-dir from global)",
)
warrant_resolve_cmd.add_argument(
"--write",
action="store_true",
help="actually write derivations rows (default: dry-run summary)",
)
warrant_resolve_cmd.add_argument(
"--limit", type=int, default=1, help="how many top matches per record (default 1)"
)
warrant_resolve_cmd.add_argument(
"--use-aliases",
dest="use_aliases",
action="store_true",
help=(
"look up registered citation + term aliases at resolve "
"time (#000041 + #000042); alias-resolved derivations "
"carry process_id 'warrant-resolver-v1+alias'"
),
)
warrant_resolve_cmd.set_defaults(func=_cmd_warrant_resolve)
# ----- unconscious sweep (#000037 §3.1 partial) --------------------------
sweep_cmd = sub.add_parser(
"sweep",
help="unconscious sweep: re-run cross-checks on data that bypassed "
"meta-cognition at ingest time (#000037 §3.1)",
)
sweep_cmd.add_argument(
"--shards-dir",
dest="shards_dir",
default=None,
help="shards directory (overrides --shards-dir from global)",
)
sweep_cmd.add_argument(
"--target",
choices=["warrants", "all"],
default="warrants",
help=(
"sweep target: 'warrants' = re-run warrant resolver against every "
"claim-pack record (#000031), idempotent at PK level. 'all' "
"reserved for the full bicameral sweep landing later "
"(canonical-projection probe / freshness probe / document-content "
"witness — schema-bumped per #000037 §12 trigger)."
),
)
sweep_cmd.add_argument(
"--write",
action="store_true",
help="materialize derivations rows for newly-resolved records "
"(default: dry-run; report what would land)",
)
sweep_cmd.add_argument(
"--limit", type=int, default=1,
help="how many top matches per record to consider (default 1)",
)
sweep_cmd.add_argument(
"--use-aliases",
dest="use_aliases",
action="store_true",
help="apply citation + term aliases (#000041 + #000042) at resolve time",
)
sweep_cmd.set_defaults(func=_cmd_sweep)
# ----- alias subcommands (#000041 + #000042) -----------------------------
alias_cmd = sub.add_parser(
"alias",
help="curated citation + term aliases (audit-disciplined; opt-in)",
)
alias_sub = alias_cmd.add_subparsers(dest="alias_op", required=True)
# alias citation {add,list,remove}
alias_cit = alias_sub.add_parser(
"citation",
help="citation aliases (#000041): substitute textbook for proprietary cite",
)
alias_cit_sub = alias_cit.add_subparsers(dest="alias_cit_op", required=True)
cit_add = alias_cit_sub.add_parser(
"add", help="add a citation alias (refuses without --by)"
)
cit_add.add_argument("original", help="original source_reference string")
cit_add.add_argument(
"--substitute", required=True, help="replacement citation string"
)
cit_add.add_argument(
"--author", action="append", default=None,
help="author of the substitute work (repeatable)",
)
cit_add.add_argument(
"--title", default="", help="title of the substitute work"
)
cit_add.add_argument(
"--by", required=True,
help="who decided this alias (audit field; refuses if empty)",
)
cit_add.add_argument(
"--rationale", default="", help="why this alias is appropriate"
)
cit_add.add_argument(
"--aliases-db", default=None,
help="aliases DB path (default: <shards-dir>/000.db)",
)
cit_add.set_defaults(func=_cmd_alias_citation_add)
cit_list = alias_cit_sub.add_parser(
"list", help="list registered citation aliases"
)
cit_list.add_argument(
"--filter", default=None, help="substring filter on original_ref"
)
cit_list.add_argument("--aliases-db", default=None)
cit_list.set_defaults(func=_cmd_alias_citation_list)
cit_rm = alias_cit_sub.add_parser(
"remove", help="remove a citation alias by (original, substitute)"
)
cit_rm.add_argument("original")
cit_rm.add_argument("--substitute", required=True)
cit_rm.add_argument("--aliases-db", default=None)
cit_rm.set_defaults(func=_cmd_alias_citation_remove)
# alias term {add,list,remove}
alias_term = alias_sub.add_parser(
"term",
help="term aliases (#000042): vocabulary bridge for old vs modern words",
)
alias_term_sub = alias_term.add_subparsers(dest="alias_term_op", required=True)
term_add = alias_term_sub.add_parser(
"add", help="add a term alias (refuses without --by)"
)
term_add.add_argument("term", help="modern term used in claim-pack records")
term_add.add_argument(
"alternate", help="historical / foreign / alternate term used in textbook prose"
)
term_add.add_argument(
"--domain", required=True,
help="domain string (e.g., 'geometry', 'logic')",
)
term_add.add_argument(
"--by", required=True,
help="who decided this alias (audit field; refuses if empty)",
)
term_add.add_argument(
"--rationale", default="", help="why this alias is appropriate"
)
term_add.add_argument("--aliases-db", default=None)
term_add.set_defaults(func=_cmd_alias_term_add)
term_list = alias_term_sub.add_parser(
"list", help="list registered term aliases"
)
term_list.add_argument("--domain", default=None)
term_list.add_argument(
"--filter", default=None, help="substring filter on term"
)
term_list.add_argument("--aliases-db", default=None)
term_list.set_defaults(func=_cmd_alias_term_list)
term_rm = alias_term_sub.add_parser(
"remove", help="remove a term alias by (term, alternate, domain)"
)
term_rm.add_argument("term")
term_rm.add_argument("alternate")
term_rm.add_argument("--domain", required=True)
term_rm.add_argument("--aliases-db", default=None)
term_rm.set_defaults(func=_cmd_alias_term_remove)
# ----- mesh subcommands (off by default) ---------------------------------
mesh_cmd = sub.add_parser(
"mesh",
help="federation/gossip layer (off by default; opt-in via 'mesh enable')",
)
mesh_sub = mesh_cmd.add_subparsers(dest="mesh_op", required=True)
mesh_status = mesh_sub.add_parser("status", help="show enabled flag, identity, current epoch + roster")
mesh_status.set_defaults(func=_cmd_mesh_status)
mesh_init = mesh_sub.add_parser("init", help="generate this peer's keys; create epoch 0")
mesh_init.add_argument("--group", required=True, help="group name")
mesh_init.add_argument("--member-id", dest="member_id", default=None, help="optional fixed member id (default: random 8-hex)")
mesh_init.set_defaults(func=_cmd_mesh_init)
mesh_enable = mesh_sub.add_parser("enable", help="flip the mesh.enabled flag on")
mesh_enable.set_defaults(func=_cmd_mesh_enable)
mesh_disable = mesh_sub.add_parser("disable", help="flip the mesh.enabled flag off")
mesh_disable.set_defaults(func=_cmd_mesh_disable)
mesh_members = mesh_sub.add_parser("members", help="list current epoch's roster")
mesh_members.set_defaults(func=_cmd_mesh_members)
mesh_add = mesh_sub.add_parser("add", help="admin-only: add a peer to the roster (bumps epoch)")
mesh_add.add_argument("--member-id", dest="member_id", required=True)
mesh_add.add_argument("--sign-pub", dest="sign_pub", required=True, help="hex Ed25519 pubkey (32 bytes / 64 hex chars)")
mesh_add.add_argument("--dh-pub", dest="dh_pub", required=True, help="hex X25519 pubkey")
mesh_add.add_argument("--role", choices=["admin", "member"], default="member")
mesh_add.set_defaults(func=_cmd_mesh_add)
mesh_kick = mesh_sub.add_parser("kick", help="admin-only: evict a peer (bumps epoch; old signatures stay valid, new gossip is opaque to them)")
mesh_kick.add_argument("--member-id", dest="member_id", required=True)
mesh_kick.add_argument("--reason", required=True)
mesh_kick.set_defaults(func=_cmd_mesh_kick)
mesh_rotate = mesh_sub.add_parser("rotate", help="refresh epoch secret without changing roster")
mesh_rotate.add_argument("--reason", default="scheduled")
mesh_rotate.set_defaults(func=_cmd_mesh_rotate)
mesh_serve = mesh_sub.add_parser(
"serve",
help="run the HTTP gossip server (blocks until SIGINT)",
)
mesh_serve.add_argument("--host", default="127.0.0.1", help="bind host (default: 127.0.0.1)")
mesh_serve.add_argument("--port", type=int, default=8400, help="bind port (default: 8400)")
mesh_serve.set_defaults(func=_cmd_mesh_serve)
mesh_sync = mesh_sub.add_parser(
"sync",
help="announce local document_roots to a peer's gossip server",
)
mesh_sync.add_argument("--peer", required=True, help="peer URL, e.g. http://other.example.com:8400")
mesh_sync.add_argument("--limit", type=int, default=100, help="announce at most N most-recent items per category (default: 100)")
mesh_sync.add_argument("--verbose", type=int, default=10, help="include this many ack details in output (default: 10)")
mesh_sync.add_argument(
"--no-roots",
dest="no_roots",
action="store_true",
help="skip ANNOUNCE_ROOT broadcast (only push falsifications)",
)
mesh_sync.add_argument(
"--no-falsifications",
dest="no_falsifications",
action="store_true",
help="skip ANNOUNCE_FALSIFICATION broadcast (only push roots)",
)
mesh_sync.add_argument(
"--kindergarten-seconds",
dest="kindergarten_seconds",
type=int,
default=3600,
help=(
"hold records younger than this many seconds back from the "
"broadcast (default: 3600 = 1 hour). Gives operators time to "
"burn or falsify before peers see it. 0 = broadcast everything."
),
)
mesh_sync.set_defaults(func=_cmd_mesh_sync)
mesh_pull = mesh_sub.add_parser(
"pull",
help="pull one document body from a peer by document_root",
)
mesh_pull.add_argument("--root", required=True, help="64-char hex document_root to pull")
mesh_pull.add_argument("--peer", required=True, help="peer URL, e.g. http://other.example.com:8400")
mesh_pull.set_defaults(func=_cmd_mesh_pull)
crawl_cmd = sub.add_parser(
"crawl",
help=(
"BFS-discover same-domain URLs from a seed; optionally ingest "
"and store ETag/Last-Modified for cheap recrawl-checks "
"(requires arborist[crawler] extras)"
),
)
crawl_cmd.add_argument("--seed-url", dest="seed_url", required=True)
crawl_cmd.add_argument("--depth", type=int, default=2, help="max BFS depth (default: 2)")
crawl_cmd.add_argument(
"--max-pages",
dest="max_pages",
type=int,
default=0,
help="cap discovery at N URLs (0 = no cap, depth is the only bound; default: 0)",
)
crawl_cmd.add_argument(
"--ingest",
action="store_true",
help="ingest the discovered pages into --db (default: print URL list only)",
)
crawl_cmd.add_argument(
"--fast",
action="store_true",
help=(
"fast_mode: 5s timeouts, CPU*3 parallel page workers, ignore "
"robots.txt crawl-delay (Disallow is still honored). Use only "
"against domains where aggressive fetching is acceptable."
),
)
crawl_cmd.add_argument(
"--author",
default=None,
help=(
"default author surname (#000031 Phase 1 follow-up). "
"Appended to ingested document titles when the HTML's "
"<title> doesn't already carry the surname. Threads into "
"_shard_matches_citation's title-haystack for warrant "
"resolution. Only relevant with --ingest."
),
)
crawl_cmd.set_defaults(func=_cmd_crawl)
crawler_cmd = sub.add_parser(
"crawler",
help="crawler maintenance verbs (recrawl-check, ...)",
)
crawler_sub = crawler_cmd.add_subparsers(dest="crawler_op", required=True)
recrawl_check_cmd = crawler_sub.add_parser(
"recrawl-check",
help=(
"send conditional HEAD requests for ingested documents and "
"classify each as fresh/stale/gone/unreachable"
),
)
recrawl_check_cmd.add_argument(
"--domain",
default=None,
help="restrict to documents whose URI contains this domain",
)
recrawl_check_cmd.add_argument(
"--limit",
type=int,
default=100,
help="check at most N documents (oldest checks first; default: 100)",
)
recrawl_check_cmd.set_defaults(func=_cmd_crawler_recrawl_check)
return p
def main(argv: list[str] | None = None) -> int:
    """Parse ``argv`` and dispatch to the selected subcommand's handler.

    Each subcommand registers its handler via ``set_defaults(func=...)`` in
    ``build_parser()``; the handler receives the parsed namespace and returns
    an integer process exit status.

    Args:
        argv: Argument list to parse. ``None`` makes argparse fall back to
            ``sys.argv[1:]``.

    Returns:
        The handler's exit status, or 2 when no subcommand handler was
        selected (usage is printed to stderr in that case).
    """
    parser = build_parser()
    args = parser.parse_args(argv)
    handler = getattr(args, "func", None)
    if handler is None:
        # When no subcommand is given and the top-level subparsers are not
        # marked required, argparse sets no ``func`` default. Show usage
        # instead of crashing with AttributeError.
        parser.print_help(sys.stderr)
        return 2
    return handler(args)
if __name__ == "__main__":
    # Script entry point: main()'s integer return becomes the process exit
    # status (sys.exit raises SystemExit with that code).
    sys.exit(main())